Airflow: The Missing Context

Written by jondot | Published 2017/10/12
Tech Story Tags: python | big-data | data-science | engineering | spark


There are a number of ways to do data workflows:

  • Custom scripts
  • Cron, running each discrete task in a workflow based on timing and run-time of each task
  • Jenkins, modeling each task as a job, and using job dependencies
  • Oozie, Luigi, and other tailor made workflow management systems born into the Hadoop ecosystem
  • Airflow

We’re going to take a look at some of these approaches and end up with a discussion of why Airflow is the better choice.

Our use case

Let’s build a price tracking platform that tracks a popular ecommerce website, say eBay’s fashion section, so that we can cross-reference products there with other fashion sites like Zappos and Zara. Of course, we’re talking “in theory” here, because crawling websites that don’t allow it is against their TOS. Moving on, here are the high-level steps.

Deep crawl over one or more stores. In this step we’re crawling one or more websites. We can crawl and dump everything we see, or scrape most of the data we need, dumping semi-structured, semi-clean data to disk.

Cleansing, normalization and structured data extraction. In this step we’ll get rid of blank, null, corrupted, and redundant data points. We’ll try our best to normalize URLs, domains, and the like to their canonical form, potentially discovering duplicate records.

Enrichment. Although we’re getting data from the source, there’s still auxiliary data such as currency rates, inventory, blacklists, and so on that we need for enriching our existing data. This kind of data can be had by running more crawls, fetching data from APIs, or joining against a database.

Merge (match enrichment to items). The previous steps left us with data points that are clean and that we can join; perhaps we need to convert an item’s currency based on an exchange rate we’ve fetched, or fill in a missing field on some items from another, auxiliary sub-crawl that we ran as part of the enrichment step.

Aggregation and trend tracking. In this step we care about deduplicating data and figuring out the identity of items, so that for every item we can cross-reference it with previous runs and see if it really changed. If it did, we output a change record, which lets us model change tracking and then detect trends in each item individually, based on the rate of change and the properties of that change.

As a workflow, we can execute these steps linearly as listed, but because a good part of the steps don’t depend on one another, they form a graph that lends itself to optimization. For example, we can run the main crawl while running the enrichment crawls, and continue when both have finished.

Workflows with Make

Assuming we want to track fashion items on eBay, here’s a clunky workflow with make.

# deep crawl of the main store
crawl-main:
    scrapy crawl ebay_wear

# enrichment crawls, limited to recent items, run after the main crawl
crawl-enrich: crawl-main
    RECENCY_ONLY=1 IN_7=1 scrapy crawl zara
    RECENCY_ONLY=1 IN_30=1 scrapy crawl h_n_m
    RECENCY_ONLY=1 IN_90=1 scrapy crawl zappos

# merge and aggregate the results of this run
agg-last: crawl-enrich
    python process_results.py

This (dangerously) works for the happy path (and the sad developer): there’s no failure recovery, no operational story at all, and maintainability is horrible. It’s run-and-pray, then troubleshoot by hand.

Some tasks are spelled out verbatim in the Makefile, such as the crawl- tasks, with knobs appearing as environment variables. Other tasks are more cryptic, like agg-last, where we shell out to Python because whatever we tried doing is too complex to stay in make’s realm.

And we run this by saying:

JOB_ID=run-11 make agg-last

Running make this way synthesizes a concept of a unit of work, so that all spawned processes are aware of it and build artifacts under it. By saying JOB_ID=run-11, each one of the processes must now implicitly be aware of it and generate its output under an agreed-upon run-11 folder.

/out
  /run-10
    ebay_wear.csv
    zara.csv
    h_n_m.csv
    zappos.csv
    merged.csv
    agged.csv
  /run-11
    ebay_wear.csv
    zara.csv
    h_n_m.csv
    zappos.csv
    merged.csv
    agged.csv
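
For illustration, here’s a minimal sketch of what each spawned process has to do to honor that convention; the helper name and paths are assumptions, not the original scripts.

import os

# Hypothetical helper used by every script in the workflow: resolve the
# output directory from the JOB_ID environment variable that make passes down.
def job_output_path(filename):
    job_id = os.environ["JOB_ID"]           # e.g. "run-11"
    out_dir = os.path.join("out", job_id)   # e.g. "out/run-11"
    os.makedirs(out_dir, exist_ok=True)
    return os.path.join(out_dir, filename)

# a crawler would then write its results to job_output_path("ebay_wear.csv")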

But what was the core idea in going for make in the first place? make knows how to execute a tree of dependencies, it’s already proven for highly complex builds with tons of dependencies, and heck, we can also play with making sentinel crawl-main, crawl-enrich, and agg-last files so that make skips tasks we’ve already run successfully.

Bottom line, the intuition might be there, but the impedance mismatch between a build system and a data workflow management system has to result in a clunky solution sporting bad ergonomics that is hard to maintain and troubleshoot.

Workflows with Code

Here’s how some of the workflow looks in pure code. The main takeaways are that we generate a job ID, and around it a working directory for that job, manually. We then have no choice but to build our own operators, such as crawl, so that the workflow is somewhat readable. There’s still plenty that is repetitive and subpar.

https://medium.com/media/cf2a4d74f414ed4fd2a1aaa700212e46/href
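
Since the embed doesn’t render here, a rough sketch of such a script follows; the helper names and paths are my own assumptions, not the author’s code.

import os
import subprocess
from datetime import datetime

# Manually synthesize a job ID and a working directory for this run.
job_id = "run-" + datetime.now().strftime("%Y%m%d%H%M")
workdir = os.path.join("out", job_id)
if not os.path.isdir(workdir):
    os.makedirs(workdir)

def crawl(spider, **knobs):
    # Home-grown "operator": shell out to scrapy, passing knobs as env vars.
    env = dict(os.environ, JOB_ID=job_id, **{k: str(v) for k, v in knobs.items()})
    subprocess.check_call(["scrapy", "crawl", spider], env=env)

crawl("ebay_wear")
crawl("zara", RECENCY_ONLY=1, IN_7=1)
crawl("h_n_m", RECENCY_ONLY=1, IN_30=1)
crawl("zappos", RECENCY_ONLY=1, IN_90=1)

# Aggregate the results of this run.
subprocess.check_call(["python", "process_results.py"],
                      env=dict(os.environ, JOB_ID=job_id))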

This marks a move away from our declarative attempt with make and toward a programmatic attempt with a Python script. There’s nothing special about Python here, aside from the fact that both the crawling and data wrangling frameworks used later are in Python as well, so it makes sense to stay with it.

Workflows with Jenkins

Building workflows with Jenkins can be great, and for a while I ignored Luigi, Oozie and Azkaban in favor of using Jenkins. It can offer everything Airflow provides, and more, in the form of Jenkins plugins and its ecosystem; before you’re even done modeling data jobs, you can integrate your existing CI jobs for the code that wrangles your data directly with your data pipeline.

You get dependency modeling between jobs; job IDs and isolation; artifacts, history, Gantt charts, visualizations and more; and lastly a distributed worker model that’s battle-proven over Mesos, Kubernetes or what-have-you, including built-in, robust scheduling and triggering mechanisms.

If your data pipeline work doesn’t require dynamic workflows, complex workflows, and extreme flexibility, then by all means, there’s no reason why you can’t model your data pipeline workflows in Jenkins. It’ll be simpler and closer to what you probably already know.

Otherwise, it’d be tedious, and it won’t be a platform you can meaningfully evolve while remaining within your data realm. For example, while you could build plugins to customize your workflow platform (read: Jenkins), it’d be a horrible experience to build, and probably a nightmare to get a grasp of Jenkins’ internal APIs, compared to Airflow’s small API surface area and its ‘add a script and import’ ergonomics.

Workflows with Airflow

Here’s a minimal DAG in Airflow, with some naive configuration to keep this example readable.

https://medium.com/media/494de811ced8a3f6d05764a7595e6d01/href
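
Again, the embed doesn’t render in this text, so here is a minimal DAG in the same spirit; the DAG and task names are mine, and the imports assume an Airflow 1.8-era install.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    "owner": "airflow",
    "start_date": datetime(2017, 10, 1),
    "retries": 1,
}

# Naive configuration on purpose: one DAG run per day.
dag = DAG("price_tracking", default_args=default_args, schedule_interval="@daily")

crawl_main = BashOperator(task_id="crawl_ebay_wear",
                          bash_command="scrapy crawl ebay_wear",
                          dag=dag)
crawl_zara = BashOperator(task_id="crawl_zara",
                          bash_command="RECENCY_ONLY=1 scrapy crawl zara",
                          dag=dag)
agg = BashOperator(task_id="agg",
                   bash_command="python process_results.py",
                   dag=dag)

# The enrichment crawl can run alongside the main crawl; both feed aggregation.
crawl_main.set_downstream(agg)
crawl_zara.set_downstream(agg)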

I like to abstract operator creation, as it ultimately makes for a more readable code block and allows extra configuration to generate dynamic tasks; so here we have crawl, combine, agg and show, and all of them can take parameters.

Here’s the crawl function.

https://medium.com/media/0e43fb7e34beb81d9c702a6a5d1ad149/href
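
The embed is missing here too; a sketch of what such a crawl helper can look like follows (the spider naming and the JOB_ID convention are assumptions):

from airflow.operators.bash_operator import BashOperator

def crawl(spider, dag):
    # '{{ ds }}' is rendered by Airflow into the execution date (e.g. 2017-10-12)
    # before the command runs, and doubles as our job ID / output folder.
    return BashOperator(task_id="crawl_" + spider,
                        bash_command="JOB_ID={{ ds }} scrapy crawl " + spider,
                        dag=dag)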

The odd-looking {{ ds }} bit is what makes our job ID. This is a special template variable that Airflow injects for us for free: the bash_command parameter is actually a string template, passed into Airflow, rendered, and then executed as a Bash command. Because Airflow makes time a first-class citizen, you can look at plenty more of these special parameters here.

Hidden from our view are the metadata store, the scheduler, and the executor. The metadata store is a database that records previous task runs and DAG runs, which keeps tasks from repeating if they’ve already run, and lets us retry, force-run tasks and DAGs, and more. The executor can be a local executor using local processes, or a distributed one using a job queue or any other scheduler we can build.
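
For reference, the metadata store and the executor are wired up in airflow.cfg; a minimal local setup, with an assumed connection string, looks roughly like this:

[core]
# metadata store: where task and DAG run state is recorded
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
# run tasks as local processes; swap in CeleryExecutor to distribute work
executor = LocalExecutor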

DAG authoring is ignorant of its own operational aspects: state and execution. This means that if we wanted to let data scientists write DAGs, devops maintain the database, and data engineers work on the execution engine and DAG building blocks (operators, sensors), we could.

The Good Parts

DAGs

Although Airflow uses the term “DAG” everywhere, it doesn’t make that much of a difference. The term is making its odd hype comeback, I guess. You build and use DAGs every day when you’re sorting out your builds and their dependencies. DAGs are everywhere, regardless of framework or whether you’re dealing with data. But if you didn’t happen to go over graph theory in university, then it’ll probably make a difference in your thinking.

That said, plenty of your workflow tasks will be linear at first, and those that aren’t will require plenty of fiddling, and that will all happen as you go. Most often a job will start as one task, break into a linear series of tasks, perhaps running on machines of different capacity, and then you’d optimize it to run bits in parallel, converge, and have steps communicate via a distributed file system.

DAG Control

The ability to visualize a DAG in more than one way (graph, tree, Gantt) and, more importantly, to interact with such a DAG (selecting a node and forcing a repeat, re-running a failed node, force-running an entire DAG, looking at a node’s progress and logs as part of a DAG run) does make a difference, and stands out from any other system that uses “DAG” to promote itself.
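
Most of these interactions also have rough CLI equivalents; the DAG and task names below are made up:

# clear the state of a failed task so the scheduler picks it up again
airflow clear price_tracking -t crawl_zara -s 2017-10-10 -e 2017-10-10

# force-run an entire DAG outside of its schedule
airflow trigger_dag price_tracking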

Ergonomics

The API feels programmatic. Although it isn’t the most concise one, generating dynamic workflows and building new APIs on top of it almost begs to happen as you start working with Airflow. Operators, Sensors and Hooks are simple concepts that nonetheless carry a lot of weight, so that when you subclass them you get a ton of functionality.

Dynamic workflows are the killer feature. It’s nice to spin up a for-loop that generates an array of parallel tasks and saves you some typing, but also think of Airflow and its malleability as a library you can use in your own projects.

With Airflow, you can have self-assembling workflows, dynamic and parameter-bound, and you can build one of those cool data-shipping startups that hose data from one place to another, effectively building a multi-tenant workflow system and executor as a service, like AWS Data Pipeline. For example, you can generate DAGs based on configuration files or database rows, where each belongs to a different client.
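
As a hedged sketch of that idea, here’s a DAG that assembles one small pipeline per client out of a plain config dict; everything in it (names, commands, the config itself) is made up for illustration.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Hypothetical per-client configuration; in practice this could come from
# YAML files or database rows, one per client.
CLIENTS = {
    "acme": {"spider": "acme_store"},
    "globex": {"spider": "globex_store"},
}

dag = DAG("multi_tenant_crawl",
          default_args={"owner": "airflow", "start_date": datetime(2017, 10, 1)},
          schedule_interval="@daily")

for client, cfg in CLIENTS.items():
    crawl = BashOperator(task_id="crawl_" + client,
                         bash_command="scrapy crawl " + cfg["spider"],
                         dag=dag)
    process = BashOperator(task_id="process_" + client,
                           bash_command="python process_results.py --client " + client,
                           dag=dag)
    crawl.set_downstream(process)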

Time

Time is a first-class citizen. More abstractly, timing is versioning: if you’re dealing with software, it’s probable that you have some versioning scheme built in, from which you do delivery through periodic iterations, branches, patch and hotfix strategies, and perhaps a feature matrix.

When you’re dealing with data ingress and organic traffic, it’s most probable that a time window is your de facto version. For example, you’ve probably synthesized something like dt=20170908 as a folder name, file name, and log tag, to model a “version” of the current data iteration.

The impedance mismatch between time representing a temporal thing and a version representing a discrete, incremental thing will vary as you move from a not-quite-a-workflow-management-system like Jenkins, to workflow management systems like Luigi, to data workflow management systems like Airflow.

In Airflow, there’s a strong understanding of time as the natural cadence of data moving forward. For example, a timestamp should be your “job ID”. Once we commit to specializing the versioning concept over data as a time window, we can talk about time-specific operations such as “backfilling” based on start and end dates, time intervals, scheduling, and so on. In other systems, where time plays an equal role to any other concept, you’d have to make that work manually.
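
That commitment is what makes a one-liner like the following possible; assuming a DAG id of price_tracking, a backfill over a date range is roughly:

airflow backfill price_tracking -s 2017-09-01 -e 2017-09-08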

Growth

Hooks, Operators and Sensors are the basic building blocks that Airflow relies on people extending for the project to grow and gain adoption. I feel there’s almost a bet that Airflow will prosper as a project iff people build more of these, and they’re not too hard to build.
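
They really aren’t hard to build; a minimal, made-up operator is little more than a class with an execute method (imports assume Airflow 1.8-era paths):

import logging

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class PriceDiffOperator(BaseOperator):
    """Hypothetical operator: compare a run's prices against the previous run's."""

    @apply_defaults
    def __init__(self, items_path, *args, **kwargs):
        super(PriceDiffOperator, self).__init__(*args, **kwargs)
        self.items_path = items_path

    def execute(self, context):
        # 'context' carries the execution date ('ds') and other template values;
        # real change-detection logic would go here.
        logging.info("diffing %s for %s", self.items_path, context["ds"])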

Actually running a task on an Airflow worker’s CPU cycles, versus having an Airflow worker trigger a task in a remote, more powerful cluster, is a trade-off that allows for simplicity. More people can use the first case, even with a hybrid model that puts inline Spark (PySpark, at least) code in the DAG file, which is easy and therefore attractive, and later move to the latter case, where an Airflow task merely triggers a job on a more resourceful machine.

The Bad Parts

Airflow has some bad parts but, to take some of the sting out of what follows, I think whoever uses Airflow is still in the initial circle of early adopters; this is a bleeding-edge product, so expect to bleed.

Logs

Airflow is still clunky in some parts. There’s implicitness in the automatic loading of DAGs, and the components are separate from each other (which is a good thing): web server, scheduler, executor, database, and more. All this means that when something goes wrong, there’s plenty of sifting through these components to understand what went wrong and how to fix it; locating the right task and the right log is time-consuming.

The exciting part is that this feels like low-hanging fruit; one idea is to show errors on the DAG visualization itself, in the form of a badge with an error count, where clicking through gets you to the exact point in the logs.

Context

Although the documentation feels comprehensive, it’s not. There’s a lot of missing context (hopefully this article closes some of that gap), and it feels like everything is there, in the form of examples and building blocks, but no one has taken the time yet to do the appropriate write-ups.

It feels that with some more time it’ll be more complete; but on the other hand, the Airflow adoption train is moving ahead, and I keep spotting confused souls I’ve had to give this context to.

Bugs

The fact that Airflow makes such a good fit for data workflows, and is a no-brainer decision to adopt, obscures the fact that it’s still a young product tackling a complex topic. There are bugs and plenty of hidden knobs to bump into (as of v1.8.1), even when you’re just starting out. For the hidden knobs, I recommend giving the API reference a thorough read; for the bugs, make sure to get the latest version and be ready to go to GitHub issues every time you feel something is odd.

Lastly, don’t forget that because of the previous couple of bad parts (logs and context), you’ll be short on ways to get a bearing when something goes wrong; for that, reading Airflow’s code is a good way to ground yourself with some more intuition.

Conclusion

This wasn’t an Airflow tutorial or a setup guide; there are plenty of those already. It’s an attempt to hit the gray area where word-of-mouth lives, and give you some good context for what to expect and how to prepare when you decide to adopt Airflow.

Airflow makes for a good match for data-specific workflows because it understands time, the use case, and how we build workflows, and it makes a great tool for data engineering.
