Scale Your Data Pipelines with Airflow and Kubernetes

Written by tal-peretz | Published 2020/03/15
Tech Story Tags: airflow | data-engineering | kubernetes | software-architecture | big-data | machine-learning | scaling | python

TLDR Tal Peretz is CTO & Chief Data Scientist @Supertools. He developed a workflow engine using Flask, Celery, and Kubernetes. Airflow is both scalable and cost-efficient. We use Git-Sync containers to update the workflows using git alone. We can destroy and re-deploy the entire infrastructure easily. Decoupling of orchestration and execution is a great advantage for Airflow. We are using a template to set up scalable Airflow workflows.

It doesn’t matter if you are running background tasks, preprocessing jobs or ML pipelines. Writing tasks is the easy part. The hard part is the orchestration: managing dependencies among tasks, scheduling workflows, and monitoring their execution is tedious.
Enter Airflow. Your new workflow management platform.

Why Airflow?

A couple of years ago, in Scaling Effectively: when Kubernetes met Celery, I wrote about my own implementation of a workflow engine using Flask, Celery, and Kubernetes. I considered the available solutions, including Airflow. With no satisfying solution in sight, I decided to implement my own framework. Since then, Airflow has come a long way. Here is why I switched to Airflow:
Scalable
When using the right setup, the one we are about to see, Airflow is both scalable and cost-efficient.
Batteries Included
Though the UI is not perfect, it is one of Airflow’s core competencies. And in this case, a picture is worth a thousand words.
Airflow has plenty of integrations both in the form of Operators and in the form of Executors.
It also has an experimental yet indispensable REST API for workflows, which means you can trigger workflows dynamically.
Battle Tested
With so many companies using Airflow, I can rest assured knowing it is going to continuously improve.
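To make the REST API point concrete, here is a minimal sketch of triggering a workflow over HTTP. It assumes an Airflow 1.10-style experimental endpoint (`/api/experimental/dags/<dag_id>/dag_runs`); the host, the `kube-operator` DAG id, and the `build_trigger_request` helper are illustrative names, and the `conf` payload is sent as a JSON-encoded string, which you should check against the Airflow version you run.

```python
import json
from urllib import request


def build_trigger_request(base_url, dag_id, conf=None):
    """Build (but do not send) a POST request that starts a new run of dag_id."""
    url = "{}/api/experimental/dags/{}/dag_runs".format(base_url.rstrip("/"), dag_id)
    payload = {}
    if conf is not None:
        # Assumption: conf is passed as a JSON-encoded string inside the body.
        payload["conf"] = json.dumps(conf)
    return request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Hypothetical host and DAG id, for illustration only.
req = build_trigger_request("http://localhost:8080", "kube-operator", {"source": "manual"})
print(req.get_method(), req.full_url)
```

Sending it is a single extra call, `request.urlopen(req)`, once the webserver is reachable and any authentication your deployment requires has been added.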

The Perfect Setup for Airflow Has

🔥 Disposable Infrastructure
Using Helm and some premade commands, we can destroy and re-deploy the entire infrastructure easily.
🚀 Cost-Efficient Execution
We use Kubernetes as the task engine. The Airflow scheduler runs each task in a new pod and deletes it upon completion, which lets us scale with the workload while using a minimal amount of resources.
🔩 Decoupled Orchestration
Another great advantage of using Kubernetes as the task runner is decoupling orchestration from execution. You can read more about it in We’re All Using Airflow Wrong and How to Fix It.
🏃 Dynamically Updated Workflows
We use Git-Sync containers. Those will allow us to update the workflows using git alone. No need to redeploy Airflow on each workflow change.

Airflow Execution Options

CeleryExecutor + KubernetesPodOperator (recommended)
➕ Decoupling of orchestration and execution.
➖ Extra pods for Celery workers, Redis, and Flower monitoring.
KubernetesExecutor + WhateverOperator
➕ No extra pods.
➖ Weak decoupling. We’ll have to define execution code and dependencies inside the DAGs.
KubernetesExecutor + KubernetesPodOperator
➕ No extra pods.
➕ Decoupling of orchestration and execution.
➖ Unsupported: currently causes recursion of pod startup.

Let’s Set It Up

Prerequisites
> brew install kubectl
> brew install helm
Make sure you have: 
It is also recommended to set up Kubernetes Dashboard.
Setup
cookiecutter https://github.com/talperetz/scalable-airflow-template
To fill in the cookiecutter options, check out the scalable-airflow-template GitHub repo.
make deploy
and voila 🎉

Tasks as Docker Images

I package tasks as Docker images so that Airflow is decoupled from the actual tasks it runs. I can change the underlying task without changing anything in Airflow’s configuration, code, or deployment.
When constructing the image I start with python-cli-template, which provides a fast and intuitive CLI experience.
Python CLI Template on Github
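As a rough illustration of what a task image's entry point can look like, here is a minimal sketch using only the standard library's `argparse`. It is not the layout of python-cli-template; the `greet` subcommand and the `--name` flag are hypothetical names chosen for the example.

```python
import argparse


def build_parser():
    """Create a small CLI with a single hypothetical 'greet' subcommand."""
    parser = argparse.ArgumentParser(prog="task", description="Example task entry point")
    subcommands = parser.add_subparsers(dest="command", required=True)
    greet = subcommands.add_parser("greet", help="print a greeting")
    greet.add_argument("--name", default="world", help="who to greet")
    return parser


def main(argv=None):
    """Parse the arguments and run the selected subcommand."""
    args = build_parser().parse_args(argv)
    if args.command == "greet":
        message = "hello {}".format(args.name)
        print(message)
        return message


if __name__ == "__main__":
    main()
```

If the image runs this script as its entry point, the workflow only needs to pass arguments such as `["greet", "--name", "airflow"]`, so the task's logic can change without touching the DAG.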
An Example Workflow
from datetime import datetime, timedelta

from airflow import DAG
# Import path used by Airflow 1.10; newer releases ship this operator in a provider package.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

# Defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    # With a start_date this far back, Airflow backfills past runs unless catchup is disabled.
    "start_date": datetime(2015, 6, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# One run per day.
example_workflow = DAG('kube-operator',
                       default_args=default_args,
                       schedule_interval=timedelta(days=1))
with example_workflow:
    # Each task starts its own pod, runs one command, and the pod is deleted afterwards.
    # "bash -c" takes the whole command as a single string, so it is passed as one argument.
    t1 = KubernetesPodOperator(namespace='airflow',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo hello world"],
                               labels={'runner': 'airflow'},
                               name="pod1",
                               task_id='pod1',
                               is_delete_operator_pod=True,
                               hostnetwork=False,
                               )

    t2 = KubernetesPodOperator(namespace='airflow',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo hello world"],
                               labels={'runner': 'airflow'},
                               name="pod2",
                               task_id='pod2',
                               is_delete_operator_pod=True,
                               hostnetwork=False,
                               )

    t3 = KubernetesPodOperator(namespace='airflow',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo hello world"],
                               labels={'runner': 'airflow'},
                               name="pod3",
                               task_id='pod3',
                               is_delete_operator_pod=True,
                               hostnetwork=False,
                               )

    t4 = KubernetesPodOperator(namespace='airflow',
                               image="ubuntu:16.04",
                               cmds=["bash", "-cx"],
                               arguments=["echo hello world"],
                               labels={'runner': 'airflow'},
                               name="pod4",
                               task_id='pod4',
                               is_delete_operator_pod=True,
                               hostnetwork=False,
                               )

    # t1 runs first, t2 and t3 run in parallel after it, and t4 runs once both have finished.
    t1 >> [t2, t3] >> t4
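
The four tasks in the example workflow differ only in their name, so the shared settings can be collected in one place. The sketch below is one possible way to do that; `pod_task_kwargs` is a hypothetical helper, not part of Airflow, and it passes the command as a single string because `bash -c` expects the whole command in one argument.

```python
def pod_task_kwargs(name, image="ubuntu:16.04", command="echo hello world"):
    """Return keyword arguments for one pod-based task (hypothetical helper)."""
    return dict(
        namespace="airflow",
        image=image,
        cmds=["bash", "-cx"],
        arguments=[command],  # the whole shell command as one string
        labels={"runner": "airflow"},
        name=name,
        task_id=name,
        is_delete_operator_pod=True,
        hostnetwork=False,
    )


# Settings for pod1 .. pod4, mirroring the four tasks in the example workflow.
task_specs = [pod_task_kwargs("pod{}".format(i)) for i in range(1, 5)]
print([spec["task_id"] for spec in task_specs])
```

Inside the `with example_workflow:` block, each spec can then be expanded into an operator, for example `t1, t2, t3, t4 = [KubernetesPodOperator(**spec) for spec in task_specs]`, and swapping the image or command for one task becomes a one-argument change.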

Advanced Airflow


Written by tal-peretz | CTO & Chief Data Scientist @Supertools
Published by HackerNoon on 2020/03/15