Airflow on Kubernetes

We love leveraging the power of Airflow with Kubernetes. Horizontally scalable, dynamic data pipelines, who doesn’t want that? If you want to get started running Airflow on Kubernetes, containerizing your workloads, and getting the most out of both platforms, then this post will show you how to do that in three different ways.


Code samples and configuration can be found here.

If you’d rather watch the accompanying video to this post, I previously did a Show ‘n Tell webinar on this topic, which you can rewatch here.


Some prior knowledge of Airflow and Kubernetes is required.

Using the KubernetesPodOperator

The KubernetesPodOperator is an Airflow built-in operator that you can use as a building block within your DAGs.

DAG stands for Directed Acyclic Graph and is basically your pipeline definition/workflow written in pure Python. In your DAG you specify your pipeline steps as tasks using operators and define their flow (upstream and downstream dependencies). This DAG then gets scheduled by the Airflow scheduler and executed by the executor.
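
As a tiny, illustrative sketch (the DAG name and schedule here are arbitrary, not from the examples later in this post), a DAG with two tasks and one dependency looks roughly like this:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Two no-op tasks; "extract >> load" makes load a downstream dependency of extract.
dag = DAG("minimal_example", start_date=datetime(2020, 8, 7), schedule_interval="@daily")

with dag:
    extract = DummyOperator(task_id="extract")
    load = DummyOperator(task_id="load")

    extract >> load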

There are many different operators available, from ones that run your own Python code to operators for MySQL, Azure, Spark, cloud storage, and serverless platforms.

The KubernetesPodOperator enables you to run containerized workloads as pods on Kubernetes from your DAG.

It is perfect for when you want to use or re-use some existing containers in your ecosystem but want to schedule them from Airflow and incorporate them into your workflow. This is by far the easiest way to get started running container workloads from Airflow on Kubernetes.

Setting it up

We want to ensure that we have Airflow running on our cluster. For this, we are using a simple deployment consisting of the Airflow webserver, scheduler/executor, and a separate PostgreSQL database deployment for the Airflow metadata DB.

airflow.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      automountServiceAccountToken: true
      containers:
      - args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        name: airflow-webserver
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      - args:
        - scheduler
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: eu.gcr.io/fullstaq-st-tim/st-airflow:latest
        imagePullPolicy: Always
        name: airflow-scheduler
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      initContainers:
      - args:
        - initdb
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        name: airflow-initdb

I stripped out some of the configuration here, like the livenessProbe and readinessProbe, to keep it a bit more concise; the full code sample can be found in the repository.

Airflow requires at least a web server and scheduler component. The init container is responsible for bootstrapping the database.

airflow-db.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow-db
  template:
    metadata:
      labels:
        name: airflow-db
    spec:
      containers:
      - env:
        - name: POSTGRES_PASSWORD
          value: password
        image: postgres:9.6
        imagePullPolicy: IfNotPresent
        name: airflow-db
        volumeMounts:
        - mountPath: /var/lib/postgresql/data
          mountPropagation: None
          name: postgresql-data
      restartPolicy: Always
      schedulerName: default-scheduler
      terminationGracePeriodSeconds: 30
      volumes:
      - emptyDir: {}
        name: postgresql-data

This simple PostgreSQL database setup needs an internal Service to enable Airflow to connect to it:

airflow-db-svc.yaml

apiVersion: v1
kind: Service
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  clusterIP: None
  ports:
  - port: 5432
    protocol: TCP
    targetPort: 5432
  selector:
    name: airflow-db
  sessionAffinity: None
  type: ClusterIP

We also need a service account and service account token available in our cluster to ensure the operator can authenticate and is allowed to run pods in the namespace we provide.

serviceaccount.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-k8spodoperator
  namespace: airflow-k8spodoperator
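
Note that the ServiceAccount on its own only provides an identity. In a cluster with RBAC enabled, it also needs permission to manage pods in the namespace. A minimal sketch (the Role and RoleBinding names here are just examples; check your own cluster's RBAC policies) could look like this:

rbac.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: airflow-pod-launcher
  namespace: airflow-k8spodoperator
rules:
# Allow creating, watching and cleaning up pods, plus reading their logs.
- apiGroups: [""]
  resources: ["pods", "pods/log"]
  verbs: ["create", "get", "list", "watch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow-pod-launcher
  namespace: airflow-k8spodoperator
subjects:
- kind: ServiceAccount
  name: airflow-k8spodoperator
  namespace: airflow-k8spodoperator
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: airflow-pod-launcher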

When these prerequisites are met, we can start running containerized workloads on Kubernetes from our DAGs using the KubernetesPodOperator.

As an example, take the DAG below:

import logging
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

log = logging.getLogger(__name__)

dag = DAG(
    "example_using_k8s_pod_operator",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "admin",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)

with dag:
    task_1 = KubernetesPodOperator(
        image="ubuntu:16.04",
        namespace="airflow-k8spodoperator",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        name="test-using-k8spodoperator-task-1",
        task_id="task-1-echo",
        is_delete_operator_pod=False,
        in_cluster=True,
    )
    task_2 = KubernetesPodOperator(
        image="ubuntu:16.04",
        namespace="airflow-k8spodoperator",
        cmds=["sleep"],
        arguments=["300"],
        labels={"foo": "bar"},
        name="test-using-k8spodoperator-task-2",
        task_id="task-2-sleep",
        is_delete_operator_pod=False,
        in_cluster=True,
    )

task_1 >> task_2

Here we are using the KubernetesPodOperator to run a container as a pod from our DAG.

The KubernetesPodOperator has some required parameters like image, namespace, cmds, name, and task_id, but the full Kubernetes pod API is supported. We also specify in_cluster=True to use the in-cluster authentication configuration (which uses our service account token) and is_delete_operator_pod=False to keep completed pods around.

Since the full Kubernetes pod API is supported, we can supply anything our pod might need as arguments to the operator:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# In Airflow 1.10.x these helper classes live under airflow.kubernetes
# (older 1.10 releases expose them under airflow.contrib.kubernetes instead).
from airflow.kubernetes.pod import Port
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount

secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
secret_all_keys = Secret('env', None, 'airflow-secrets-2')
volume_mount = VolumeMount('test-volume',
                           mount_path='/root/mount_file',
                           sub_path=None,
                           read_only=True)
port = Port('http', 80)
configmaps = ['test-configmap-1', 'test-configmap-2']

volume_config = {
    'persistentVolumeClaim': {
        'claimName': 'test-volume'
    }
}
volume = Volume(name='test-volume', configs=volume_config)

# Placeholder scheduling constraints; fill these in with your own
# node affinity rules and tolerations if your cluster needs them.
affinity = {}
tolerations = []

k = KubernetesPodOperator(namespace='default',
                          image="ubuntu:16.04",
                          cmds=["bash", "-cx"],
                          arguments=["echo", "10"],
                          labels={"foo": "bar"},
                          secrets=[secret_file, secret_env, secret_all_keys],
                          ports=[port],
                          volumes=[volume],
                          volume_mounts=[volume_mount],
                          name="test",
                          task_id="task",
                          affinity=affinity,
                          is_delete_operator_pod=True,
                          hostnetwork=False,
                          tolerations=tolerations,
                          configmaps=configmaps,
                          )

When this DAG is scheduled to run and executed by the Airflow executor, our operator will create pods, using the name parameter to construct the pod names:

NAME                                        READY   STATUS      RESTARTS   AGE
airflow-56f875bb-dk6vq                      2/2     Running     0          46h
airflow-db-57548fc4d-qvbgf                  1/1     Running     0          47h
test-using-k8spodoperator-task-1-5a055bae   0/1     Completed   0          8s
test-using-k8spodoperator-task-1-60caa2f3   0/1     Completed   0          8s

The KubernetesPodOperator is by far the easiest way to get started running containerized workloads from Airflow on Kubernetes. Setup and management are minimal, and since you can customize parameters and arguments per workload there is a high level of flexibility. The KubernetesPodOperator is useful when you already have some workloads as containers; maybe you have some custom Java or Go code which you want to include in your pipeline, or you want to start moving some container workloads over to be managed by Airflow.

The downside of this approach is that highly customized containers with lots of dependencies will have to be translated into arguments that are passed to the operator. This may take some research and trial and error to get right.


If you want to run any task in your DAG natively as a Kubernetes pod, you are better off:

Using the KubernetesExecutor

The KubernetesExecutor is an abstraction layer that enables any task in your DAG to be run as a pod on your Kubernetes infrastructure. You configure this executor as part of your Airflow deployment just like you would any other executor, although some additional configuration options are required.

Setting it up

For this example, we are again utilizing the simple Airflow deployment that we used for the KubernetesPodOperator, with some additional configuration that is required by the KubernetesExecutor.

airflow.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8sexecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      automountServiceAccountToken: true
      containers:
      - args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        name: airflow-webserver
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      - args:
        - scheduler
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: eu.gcr.io/fullstaq-st-tim/st-airflow-executor
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: latest
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        - name: AIRFLOW__KUBERNETES__RUN_AS_USER
          value: '50000'
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        name: airflow-scheduler
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      initContainers:
      - args:
        - initdb
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: apache/airflow
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: 1.10.10
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        name: airflow-initdb

Again, some configuration was omitted for brevity. Full code examples can be found here.

To enable the KubernetesExecutor we configure it in our deployment with the following additional parameters. We need to set:

The executor Airflow should use, which we set to KubernetesExecutor.

        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor

The namespace in which to run our worker pods.

        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor

The Kubernetes service account name to use for our workers.

        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default

Whether our Kubernetes authentication configuration should be picked up from inside the cluster.

        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'

The container registry and container image name to use for our worker pods, and a separate environment variable for the image tag. Since any supplied Airflow operator may end up running as a task in a Kubernetes pod, we need to make sure that the dependencies for these operators are met in our worker image. Because of that, it is a good idea to use the Airflow Docker image as your base.

        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: apache/airflow
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: 1.10.12

Next, we need to specify how Airflow and Kubernetes get access to our DAGs. We can use git-sync, a shared volume, or bake the DAGs into our Airflow image. In this case we are doing the latter and tell Airflow to check for DAGs in our image. When going this route you will have to bake your own Airflow image, using the Airflow base image and adding a folder with your DAGs; a sketch follows below, and example Dockerfiles can be found in the repository.

        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
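
Baking the DAGs in can be as simple as copying your DAG folder on top of the Airflow base image. A minimal sketch, assuming your DAGs live in a local dags/ folder (the example Dockerfiles in the repository are the actual reference):

Dockerfile

FROM apache/airflow:1.10.12

# Copy your DAG definitions into the folder Airflow scans for DAGs.
COPY dags/ /opt/airflow/dags/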

Finally, we tell Airflow which user to run the workers as; UID 50000 is the ‘airflow’ user in the official image. This one is necessary: without it the KubernetesExecutor will not be able to run your workloads.

        - name: AIRFLOW__KUBERNETES__RUN_AS_USER
          value: '50000'

With this configuration we have set up Airflow to use the KubernetesExecutor. On to our DAG:

import logging
import os
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

log = logging.getLogger(__name__)

dag = DAG(
    "example_using_k8s_executor",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)

def use_airflow_binary():
    rc = os.system("airflow -h")
    assert rc == 0

with dag:
    task_1 = PythonOperator(
        task_id="task-1",
        python_callable=use_airflow_binary,
    )
    task_2 = PythonOperator(
        task_id="task-2",
        python_callable=use_airflow_binary,
    )

task_1 >> task_2

This example shows a simple DAG with two tasks using the PythonOperator. What happens when we run it is that the KubernetesExecutor watches the Airflow task queue, picks up any task in that queue, and constructs a Kubernetes pod for it. The executor then watches the pod's status and syncs it back to the scheduler, which then knows whether the task should be rescheduled, retried, or whether the next downstream task can be scheduled.

When we run this DAG, our tasks will be run in worker pods by the KubernetesExecutor and cleaned up after success or failure. If you do not want your worker pods to be cleaned up, you can add the environment variable AIRFLOW__KUBERNETES__DELETE_WORKER_PODS to your Airflow configuration and set it to false.
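
Following the same pattern as the other configuration snippets, that would look like this (a sketch, added alongside the other Kubernetes settings in the env section):

        - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
          value: 'false'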

NAME                                                            READY   STATUS              RESTARTS   AGE
airflow-76c4f47d78-hw52g                                        2/2     Running             0          5d17h
airflow-db-6944c99c7c-fsmnz                                     1/1     Running             0          5d17h
exampleusingk8sexecutortask1-f1f0316e08bf49b3a4f97f10a12db542   0/1     ContainerCreating   0          2s
NAME                                                            READY   STATUS    RESTARTS   AGE
airflow-76c4f47d78-hw52g                                        2/2     Running   0          5d17h
airflow-db-6944c99c7c-fsmnz                                     1/1     Running   0          5d17h
exampleusingk8sexecutortask1-f1f0316e08bf49b3a4f97f10a12db542   1/1     Running   0          5s

The KubernetesExecutor is great because you get the native elasticity of Kubernetes together with all the good stuff from Airflow. It is no longer necessary to build your own containers with custom workloads, as any task in your DAG is going to run as a pod. There is some additional initial configuration required when setting up Airflow, but the setup remains fairly easy to manage.

Do take into account that when you are going to be scheduling lots of tasks in parallel, this can become very expensive for your cluster resources very quickly. Always keep an eye on this, or set limits by using Airflow Pools (they can be configured via the UI or the Airflow CLI), as in the sketch below.
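
A hedged sketch of capping parallelism with a pool; the pool name and its slot count are hypothetical, and the pool itself has to be created first under Admin -> Pools in the UI or with the Airflow CLI:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG("example_pool_limited", start_date=datetime(2020, 8, 7), schedule_interval=None)

with dag:
    for i in range(20):
        PythonOperator(
            task_id=f"task-{i}",
            python_callable=lambda: print("hello from a pooled task"),
            # All 20 tasks share the (hypothetical) pool, so only as many tasks run
            # at once as the pool has slots, capping the number of worker pods.
            pool="k8s_executor_pool",
        )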

Also, there is some startup and shutdown overhead every time a task gets spun up as a pod, but depending on your expectations or requirements this may be negligible.


And finally, by far my favorite:

Using KEDA with Airflow

Here we utilize a CNCF Sandbox project originally developed by the Microsoft Azure Functions team: KEDA.

KEDA stands for Kubernetes Event Driven Autoscaler. It enables us to scale deployments on Kubernetes based on external events/metrics, including scaling to and from zero. Together with Airflow’s CeleryExecutor this brings us the best of both worlds: Kubernetes’ pedigree at running elastically scaling workloads, and CeleryWorkers for low-overhead, continuous execution of Airflow DAG tasks.

KEDA works by introducing a new custom resource definition (CRD) called ScaledObject. The ScaledObject is the workhorse for scaling deployments up and down.

scaledobject.yaml

apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: airflow-worker
spec:
  scaleTargetRef:
    deploymentName: airflow-worker
  pollingInterval: 10   # Optional. Default: 30 seconds
  cooldownPeriod: 30    # Optional. Default: 300 seconds
  maxReplicaCount: 10   # Optional. Default: 100
  triggers:
    - type: postgresql
      metadata:
        connection: AIRFLOW_CONN_AIRFLOW_DB
        query: "SELECT ceil(COUNT(*)::decimal / 4) FROM task_instance WHERE state='running' OR state='queued'"
        targetQueryValue: "1"

How it works is that the ScaledObject has a deployment target to scale when a trigger is met. The trigger is called a scaler and, in this case, is a query executed against the Airflow PostgreSQL metadata database. The query runs on an interval and its result determines whether the deployment has to be scaled up or down. Here we count the Airflow task instances in the running or queued state and divide by four, the number of tasks a single CeleryWorker is expected to handle concurrently, to arrive at the desired number of workers.

If you want to get started with KEDA for scaling your deployments there are already multiple scalers available, like a scaler for Azure Blob Storage, one for scaling based on RabbitMQ messages, a MySQL scaler, or a scaler that scales based on Prometheus metrics.

For the full list check the documentation at keda.sh

For Airflow, KEDA works in combination with the CeleryExecutor. Celery is a task queue implementation in Python, and together with KEDA it enables Airflow to dynamically run tasks in Celery workers in parallel, scaling CeleryWorkers up and down as necessary based on queued or running tasks.

This has the advantage that the CeleryWorkers generally have less overhead when running tasks back to back, as there is no per-task pod startup like with the KubernetesExecutor.

So you get the elasticity of Kubernetes, together with all the advantages Celery has to offer in terms of performance.

Setting it up

For this setup, we are utilizing the Helm chart developed by Astronomer to deploy Airflow + KEDA on Kubernetes.

I am referencing the official setup documentation from Astronomer Airflow, which can be found here.

helm repo add kedacore https://kedacore.github.io/charts
helm repo add astronomer https://helm.astronomer.io

helm repo update

kubectl create namespace keda
helm install keda \
    --namespace keda kedacore/keda

kubectl create namespace airflow

helm install airflow \
    --set executor=CeleryExecutor \
    --set workers.keda.enabled=true \
    --set workers.persistence.enabled=false \
    --namespace airflow \
    astronomer/airflow

After deploying Airflow with KEDA using the steps listed above, we have two namespaces.

namespace=airflow

NAME                                 READY   STATUS    RESTARTS   AGE
airflow-flower-5966c99975-7vh9r      1/1     Running   0          93s
airflow-postgresql-0                 1/1     Running   0          92s
airflow-redis-0                      1/1     Running   0          92s
airflow-scheduler-7f64b6cd67-lr95p   2/2     Running   0          93s
airflow-statsd-f7647597-9xdzv        1/1     Running   0          93s
airflow-webserver-74b794d767-rzp49   1/1     Running   0          93s

Since we are using Celery as our executor, we have some additional components in our Airflow deployment: Flower, the management/monitoring UI for Celery; Redis, for brokering messages to our CeleryWorkers; and a StatsD server, included in the Helm chart for gathering Airflow metrics.

namespace=keda

NAME                                               READY   STATUS    RESTARTS   AGE
keda-operator-5d44c49879-wpknv                     1/1     Running   0          3m16s
keda-operator-metrics-apiserver-86d8bbc4df-884cr   1/1     Running   0          2m49s

In our keda namespace we have the keda-operator, which is responsible for monitoring our trigger (the PostgreSQL Airflow DB) and scaling our deployment accordingly, and the keda-operator-metrics-apiserver, which is responsible for serving metrics to the HorizontalPodAutoscaler. The native Kubernetes HorizontalPodAutoscaler is used to scale out our target CeleryWorker deployment.

KEDA leverages this already existing autoscaler to scale out our deployment, which makes KEDA a very lean component. Pretty cool.

Now take the DAG below. Here we are going to generate 20 tasks. The DAG will run continuously and keep on generating new tasks for our CeleryWorkers to process.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


def my_custom_function(ts,**kwargs):
    """
    This can be any python code you want and is called from the python operator. The code is not executed until
    the task is run by the airflow scheduler.
    """
    print(f"I am task number {kwargs['task_number']}. This DAG Run execution date is {ts} and the current time is {datetime.now()}")
    print('Here is the full DAG Run context. It is available because provide_context=True')
    print(kwargs)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('example_dag_generated',
         start_date=datetime(2019, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=30),
         default_args=default_args,
         ) as dag:

    t0 = DummyOperator(
        task_id='start'
    )

    # generate tasks with a loop. task_id must be unique
    for task in range(20):
        tn = PythonOperator(
            task_id=f'python_print_date_{task}',
            python_callable=my_custom_function,
            op_kwargs={'task_number': task},
            provide_context=True
        )


        t0 >> tn

When we run this DAG the following happens.

Airflow starts scheduling tasks, which will enter the queued state. Our KEDA operator is monitoring for running and queued tasks and will trigger because of the queued tasks.

kubectl logs -f -n keda keda-operator…

{"level":"info","ts":1603266764.262705,"logger":"scalehandler","msg":"Successfully updated deployment","ScaledObject.Namespace":"airflow","ScaledObject.Name":"airflow-worker","ScaledObject.ScaleType":"deployment","Deployment.Namespace":"airflow","Deployment.Name":"airflow-worker","Original Replicas Count":0,"New Replicas Count":1}
{"level":"info","ts":1603266771.0487137,"logger":"controller_scaledobject","msg":"Reconciling ScaledObject","Request.Namespace":"airflow","Request.Name":"airflow-worker"}
{"level":"info","ts":1603266771.04885,"logger":"controller_scaledobject","msg":"Detected ScaleType = Deployment","Request.Namespace":"airflow","Request.Name":"airflow-worker"}

Because the trigger fired, KEDA scales up the deployment target, airflow-worker.

airflow-worker-dc75d6597-xtlhq       0/1     PodInitializing   0          22s

The deployment is scaled and Celery will broker messages to this new CeleryWorker. The CeleryWorker will start processing tasks. 

(Screenshot: the CeleryWorker process, an example of completed workers.)

This process runs continuously, and KEDA will ensure that when more tasks get scheduled, more workers get spun up.

airflow-worker-dc75d6597-8kvg5       0/1     Init:0/1          0          6s
airflow-worker-dc75d6597-glcd8       0/1     Init:0/1          0          6s
airflow-worker-dc75d6597-nxlwc       0/1     PodInitializing   0          6s
airflow-worker-dc75d6597-xtlhq       1/1     Running           0          81s
---
airflow-worker-dc75d6597-8kvg5       1/1     Running   0          84s
airflow-worker-dc75d6597-glcd8       1/1     Running   0          84s
airflow-worker-dc75d6597-nxlwc       1/1     Running   0          84s
airflow-worker-dc75d6597-xtlhq       1/1     Running   0          2m39s

Once there are no more tasks to be processed, KEDA will scale the worker deployment back down to zero.

kubectl logs -f -n keda keda-operator…

{"level":"info","ts":1603274700.8434458,"logger":"scalehandler","msg":"Successfully scaled deployment to 0 replicas","ScaledObject.Namespace":"airflow","ScaledObject.Name":"airflow-worker","ScaledObject.ScaleType":"deployment","Deployment.Namespace":"airflow","Deployment.Name":"airflow-worker"}

Running Airflow with KEDA brings the best of both worlds together: the elasticity of Kubernetes, unlocked through KEDA, combined with the fast, low-overhead CeleryExecutor and its workers.

Setup is straightforward using the Helm chart from Astronomer although the added complexity of additional components will require a more intensive debugging flow when something does break somewhere down the line.

Altogether I am a big fan of running Airflow together with KEDA and am looking forward to a bright future for Airflow on Kubernetes in combination with KEDA.

Tim van de Keer, Dev / DataOps and Data Engineering enthusiast. Tim rolled into a data engineering role from a DevOps background and has been building cloud-native data platforms for the past three years.

He's an open-minded and positive person who is always looking to share knowledge and challenge himself with new technologies and paradigms. He is always open to helping and is active in several open source communities.