We love leveraging the power of Airflow with Kubernetes. Horizontally scalable, dynamic data pipelines: who doesn’t want that? If you want to get started running Airflow on Kubernetes, containerizing your workloads, and getting the most out of both platforms, then this post will show you how to do that in three different ways.
Code samples and configuration can be found here.
If you’d rather watch the accompanying video, I previously did a Show ‘n Tell webinar on this topic, which you can rewatch here.
Some prior knowledge of Airflow and Kubernetes is required.
The KubernetesPodOperator is a built-in Airflow operator that you can use as a building block within your DAGs.
DAG stands for Directed Acyclic Graph and is basically your pipeline definition / workflow written in pure Python. In your DAG you specify your pipeline steps as tasks using operators and define their flow (upstream and downstream dependencies). This DAG then gets scheduled by the Airflow scheduler and executed by the executor.
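To make the idea of upstream and downstream dependencies concrete, here is a minimal sketch that uses only the Python standard library (`graphlib`, Python 3.9+), not Airflow itself. The task names are hypothetical, and this is only an illustration of how a directed acyclic graph yields a valid run order, not how the Airflow scheduler is implemented.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical pipeline: each task maps to the set of tasks it depends on
# (its upstream tasks).
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))
# ['extract', 'transform', 'load', 'report']
```

Because the graph has no cycles, every task can run after all of its upstream tasks have finished, which is exactly the property a scheduler relies on.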
There are many different operators available, from ones that run your own Python code to MySQL, Azure, Spark, cloud storage, or serverless operators.
The KubernetesPodOperator enables you to run containerized workloads as pods on Kubernetes from your DAG.
It is perfect for when you want to use/re-use some existing containers in your ecosystem but want to schedule them from Airflow and incorporate them into your workflow. This is by far the easiest way to get started running container workloads from Airflow on Kubernetes.
We want to ensure that we have Airflow running on our cluster. For this, we are using a simple deployment consisting of the Airflow webserver, scheduler/executor, and a separate PostgreSQL database deployment for the Airflow metadata DB.
airflow.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      automountServiceAccountToken: true
      containers:
      - args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      - args:
        - scheduler
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: eu.gcr.io/fullstaq-st-tim/st-airflow:latest
        imagePullPolicy: Always
        name: airflow-scheduler
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      initContainers:
      - args:
        - initdb
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
I stripped out some of the configuration here, like livenessProbe and readinessProbe, to make it a bit less verbose, but the full code sample can be found in the repository.
Airflow requires at least a web server and scheduler component. The init container is responsible for bootstrapping the database.
airflow-db.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow-db
  template:
    metadata:
      labels:
        name: airflow-db
    spec:
      containers:
      - env:
        - name: POSTGRES_PASSWORD
          value: password
        image: postgres:9.6
        imagePullPolicy: IfNotPresent
        name: airflow-db
        volumeMounts:
        - mountPath: /var/lib/postgresql/data
          mountPropagation: None
          name: postgresql-data
      restartPolicy: Always
      schedulerName: default-scheduler
      terminationGracePeriodSeconds: 30
      volumes:
      - emptyDir: {}
        name: postgresql-data
A simple PostgreSQL database setup is required, with an internal service that enables Airflow to connect to it:
airflow-db-svc.yaml
apiVersion: v1
kind: Service
metadata:
  name: airflow-db
  namespace: airflow-k8spodoperator
spec:
  clusterIP: None
  ports:
  - port: 5432
    protocol: TCP
    targetPort: 5432
  selector:
    name: airflow-db
  sessionAffinity: None
  type: ClusterIP
status:
  loadBalancer: {}
We also need a service account and service account token available in our cluster to ensure the Operator can authenticate and is allowed to run Pods on the namespace we provide.
serviceaccount.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow-k8spodoperator
  namespace: airflow-k8spodoperator
When these two prerequisites are met, we can start running containerized workloads on Kubernetes from our DAGs using the KubernetesPodOperator.
As an example, take the DAG below:
import logging
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

log = logging.getLogger(__name__)

dag = DAG(
    "example_using_k8s_pod_operator",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "admin",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)

with dag:
    task_1 = KubernetesPodOperator(
        image="ubuntu:16.04",
        namespace="airflow-k8spodoperator",
        cmds=["bash", "-cx"],
        arguments=["echo", "10"],
        labels={"foo": "bar"},
        name="test-using-k8spodoperator-task-1",
        task_id="task-1-echo",
        is_delete_operator_pod=False,
        in_cluster=True,
    )
    task_2 = KubernetesPodOperator(
        image="ubuntu:16.04",
        namespace="airflow-k8spodoperator",
        cmds=["sleep"],
        arguments=["300"],
        labels={"foo": "bar"},
        name="test-using-k8spodoperator-task-2",
        task_id="task-2-sleep",
        is_delete_operator_pod=False,
        in_cluster=True,
    )

    task_1 >> task_2
Here we are using the KubernetesPodOperator to run a container as a pod from our DAG.
The KubernetesPodOperator has some required parameters like image, namespace, cmds, name, and task_id, but the full Kubernetes pod API is supported. We are also specifying to look for the in_cluster authentication configuration (which uses our service account token) and to keep completed pods with is_delete_operator_pod.
Since the full Kubernetes pod API is supported, we can supply anything our pod might need as arguments to the operator:
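# Import paths as used in the Airflow 1.10 series; they differ in other versions.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.secret import Secret
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount
from airflow.kubernetes.pod import Port

# Expose Kubernetes secrets as a mounted file, a single env var, or all keys as env vars
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env = Secret('env', 'SQL_CONN', 'airflow-secrets', 'sql_alchemy_conn')
secret_all_keys = Secret('env', None, 'airflow-secrets-2')

volume_mount = VolumeMount('test-volume',
                           mount_path='/root/mount_file',
                           sub_path=None,
                           read_only=True)
port = Port('http', 80)
configmaps = ['test-configmap-1', 'test-configmap-2']

# The volume is backed by an existing PersistentVolumeClaim
volume_config = {
    'persistentVolumeClaim':
        {
            'claimName': 'test-volume'
        }
}
volume = Volume(name='test-volume', configs=volume_config)

# affinity and tolerations are assumed to be defined elsewhere (omitted here)
k = KubernetesPodOperator(namespace='default',
                          image="ubuntu:16.04",
                          cmds=["bash", "-cx"],
                          arguments=["echo", "10"],
                          labels={"foo": "bar"},
                          secrets=[secret_file, secret_env, secret_all_keys],
                          ports=[port],
                          volumes=[volume],
                          volume_mounts=[volume_mount],
                          name="test",
                          task_id="task",
                          affinity=affinity,
                          is_delete_operator_pod=True,
                          hostnetwork=False,
                          tolerations=tolerations,
                          configmaps=configmaps
                          )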
When this DAG is scheduled to run and executed by the Airflow executor, our operator will create pods, using the name parameter to construct the pod name:
NAME READY STATUS RESTARTS AGE
airflow-56f875bb-dk6vq 2/2 Running 0 46h
airflow-db-57548fc4d-qvbgf 1/1 Running 0 47h
test-using-k8spodoperator-task-1-5a055bae 0/1 Completed 0 8s
test-using-k8spodoperator-task-1-60caa2f3 0/1 Completed 0 8s
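The pod names in the listing above consist of the name parameter followed by a short suffix. The sketch below shows one way such a name could be built. It is an assumption for illustration: the helper `make_pod_name` is hypothetical, and the actual naming logic inside Airflow may differ.

```python
import uuid


def make_pod_name(name: str, suffix_length: int = 8) -> str:
    """Append a random hex suffix so repeated runs get distinct pod names.

    Hypothetical helper; it only mimics the shape of the names shown above.
    """
    suffix = uuid.uuid4().hex[:suffix_length]
    return f"{name}-{suffix}"


print(make_pod_name("test-using-k8spodoperator-task-1"))
```

A unique suffix matters here because, with is_delete_operator_pod set to False, completed pods stay around and a retry or later run needs a name that does not collide with them.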
The KubernetesPodOperator is by far the easiest way to get started running containerized workloads from Airflow on Kubernetes. Setup and management are minimal, and since you can customize parameters and arguments per workload there is a high level of flexibility. The KubernetesPodOperator is useful when you already have some workloads as containers: maybe you have some custom Java or Go code that you want to include in your pipeline, or you want to start transferring some container workloads to be managed by Airflow.
The downside of this approach is that highly customized containers with lots of dependencies will have to be translated into arguments that are passed to the operator. This may take some research and trial and error to get right.
If you want to run any task in your DAG natively as a Kubernetes pod, you are better off with the KubernetesExecutor.
The KubernetesExecutor is an abstraction layer that enables any task in your DAG to be run as a pod on your Kubernetes infrastructure. You configure this executor as part of your Airflow deployment just like you would any other executor, although some additional configuration options are required.
For this example, we again use the simple Airflow deployment that we used for the KubernetesPodOperator, with some additional configuration that is required by the KubernetesExecutor.
airflow.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow-k8sexecutor
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      automountServiceAccountToken: true
      containers:
      - args:
        - webserver
        - -p
        - "8000"
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      - args:
        - scheduler
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: eu.gcr.io/fullstaq-st-tim/st-airflow-executor
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: latest
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        - name: AIRFLOW__KUBERNETES__RUN_AS_USER
          value: '50000'
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
        name: airflow-scheduler
        volumeMounts:
        - mountPath: /opt/airflow/logs/
          mountPropagation: None
          name: airflow-logs
      initContainers:
      - args:
        - initdb
        env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          value: postgresql://postgres:password@airflow-db:5432/postgres
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          value: airflow-k8sexecutor
        - name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
          value: default
        - name: AIRFLOW__KUBERNETES__IN_CLUSTER
          value: 'true'
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
          value: apache/airflow
        - name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
          value: 1.10.10
        - name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
          value: 'true'
        image: apache/airflow:1.10.12
        imagePullPolicy: Always
Again, some configuration was omitted for brevity. Full code examples can be found here.
To enable the KubernetesExecutor, we need to set the following additional configuration parameters in our deployment:
The executor we want to use with Airflow, set to KubernetesExecutor.
- name: AIRFLOW__CORE__EXECUTOR
  value: KubernetesExecutor
The namespace in which to run our worker pods.
- name: AIRFLOW__KUBERNETES__NAMESPACE
  value: airflow-k8sexecutor
The Kubernetes service account name to use for our workers.
- name: AIRFLOW__KUBERNETES__WORKER_SERVICE_ACCOUNT_NAME
  value: default
Whether to use the Kubernetes authentication configuration that is present in the cluster.
- name: AIRFLOW__KUBERNETES__IN_CLUSTER
  value: 'true'
The container registry and image name to use for our worker pod containers, plus a separate environment variable for the tag to use. Since we are possibly going to be running any supplied Airflow operator as a task in a Kubernetes pod, we need to make sure that the dependencies for these operators are met in our worker image. Because of that, it is a good idea to use the Airflow Docker image as your base.
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
  value: eu.gcr.io/fullstaq-st-tim/st-airflow-executor
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
  value: latest
Next, we need to specify how Airflow and Kubernetes get access to our DAGs. We can use git-sync, a shared volume, or bake the DAGs into our Airflow images. In this case, we are doing the latter and tell Airflow to check for DAGs in our image. When going this route you will have to build your own Airflow image, using the Airflow image as a base and adding a folder with your DAGs. (Example Dockerfiles can be found in the repository.)
- name: AIRFLOW__KUBERNETES__DAGS_IN_IMAGE
  value: 'true'
Finally, we tell Airflow which user to run as: the ‘airflow’ user. This setting is necessary; without it the KubernetesExecutor will not be able to run your workloads.
- name: AIRFLOW__KUBERNETES__RUN_AS_USER
  value: '50000'
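All of these settings follow Airflow's environment variable convention, AIRFLOW__{SECTION}__{KEY}, where each variable overrides one key in one section of the Airflow configuration. The sketch below illustrates that mapping with plain Python. The function `parse_airflow_env` is a hypothetical helper written for this post, not part of Airflow.

```python
def parse_airflow_env(name: str):
    """Split AIRFLOW__SECTION__KEY into ("section", "key").

    Returns None for variables that do not follow the convention.
    Hypothetical helper for illustration only.
    """
    prefix = "AIRFLOW__"
    if not name.startswith(prefix):
        return None
    section, sep, key = name[len(prefix):].partition("__")
    if not sep or not section or not key:
        return None
    return section.lower(), key.lower()


settings = {
    "AIRFLOW__CORE__EXECUTOR": "KubernetesExecutor",
    "AIRFLOW__KUBERNETES__NAMESPACE": "airflow-k8sexecutor",
    "AIRFLOW__KUBERNETES__IN_CLUSTER": "true",
    "AIRFLOW__KUBERNETES__RUN_AS_USER": "50000",
}

for env_name, value in settings.items():
    section, key = parse_airflow_env(env_name)
    print(f"[{section}] {key} = {value}")
```

Reading the variables this way makes it easy to see which section of the configuration each one belongs to: the executor lives under core, everything else here under kubernetes.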
With this configuration, we have set up Airflow to use the KubernetesExecutor. On to our DAG:
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

log = logging.getLogger(__name__)

dag = DAG(
    "example_using_k8s_executor",
    schedule_interval="0 1 * * *",
    catchup=False,
    default_args={
        "owner": "airflow",
        "depends_on_past": False,
        "start_date": datetime(2020, 8, 7),
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": timedelta(seconds=30),
        "sla": timedelta(hours=23),
    },
)


def use_airflow_binary():
    # Check that the airflow binary is available in the worker image
    rc = os.system("airflow -h")
    assert rc == 0


with dag:
    task_1 = PythonOperator(
        task_id="task-1",
        python_callable=use_airflow_binary,
    )
    task_2 = PythonOperator(
        task_id="task-2",
        python_callable=use_airflow_binary,
    )

    task_1 >> task_2
This example shows a simple DAG with two tasks using the PythonOperator. When we run it, the KubernetesExecutor watches the Airflow task queue, picks up any task in that queue, and constructs a Kubernetes pod out of it. The executor then watches the pod for its status and syncs that status back to the scheduler, which then knows whether the task should be rescheduled or retried, or whether the next downstream task can be scheduled.
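The watch-and-sync loop described above can be pictured as a mapping from Kubernetes pod phases to a decision about the task. The sketch below is a simplified illustration, not the executor's actual code: the pod phases are the standard Kubernetes ones, while the function names, the returned action strings, and the retry rule are assumptions made for this example.

```python
from typing import Optional

# Kubernetes pod phases mapped to a simplified task outcome.
# None means "no final result yet, keep watching the pod".
PHASE_TO_TASK_STATE = {
    "Pending": None,
    "Running": None,
    "Succeeded": "success",
    "Failed": "failed",
}


def task_state_for(phase: str) -> Optional[str]:
    """Translate a pod phase into a task state (hypothetical helper)."""
    if phase not in PHASE_TO_TASK_STATE:
        raise ValueError(f"unexpected pod phase: {phase}")
    return PHASE_TO_TASK_STATE[phase]


def next_action(phase: str, tries_so_far: int, retries: int = 2) -> str:
    """Decide what should happen next, given the pod phase.

    tries_so_far counts attempts including the current one; retries=2
    matches the DAG above and allows three attempts in total.
    """
    state = task_state_for(phase)
    if state is None:
        return "wait"
    if state == "success":
        return "schedule_downstream"
    return "retry" if tries_so_far <= retries else "fail"


print(next_action("Running", 1))    # wait
print(next_action("Failed", 1))     # retry
print(next_action("Failed", 3))     # fail
print(next_action("Succeeded", 1))  # schedule_downstream
```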
When we run this DAG, our task will be run in our worker pod by the KubernetesExecutor and cleaned up after success or failure. If you do not want your worker pods to be cleaned up, you can add the environment variable AIRFLOW__KUBERNETES__DELETE_WORKER_PODS to your Airflow configuration and set it to false.
NAME READY STATUS RESTARTS AGE
airflow-76c4f47d78-hw52g 2/2 Running 0 5d17h
airflow-db-6944c99c7c-fsmnz 1/1 Running 0 5d17h
exampleusingk8sexecutortask1-f1f0316e08bf49b3a4f97f10a12db542 0/1 ContainerCreating 0 2s
NAME READY STATUS RESTARTS AGE
airflow-76c4f47d78-hw52g 2/2 Running 0 5d17h
airflow-db-6944c99c7c-fsmnz 1/1 Running 0 5d17h
exampleusingk8sexecutortask1-f1f0316e08bf49b3a4f97f10a12db542 1/1 Running 0 5s
The KubernetesExecutor is great because you get the native elasticity of Kubernetes together with all the good stuff from Airflow. It is no longer necessary to build your own containers with custom workloads, as any task in your DAG is going to be run as a pod. There is some additional initial configuration required when setting up Airflow, but the setup remains fairly easy to manage.
Do take into account that when you are going to be scheduling lots of tasks in parallel, this could very quickly become expensive in terms of cluster resources. Always keep an eye on this or set limits by using Airflow Pools (they can be set via ENV vars or via the UI).
Also, there is some startup and shutdown overhead every time a task is spun up as a pod. Depending on your expectations or requirements, this may be negligible.
And finally, by far my favorite:
Here we utilize a CNCF Sandbox project originally developed by the Microsoft Azure Functions team: KEDA.
KEDA stands for Kubernetes Event Driven Autoscaler. It enables us to scale deployments on Kubernetes based on external events/metrics, including scaling to and from zero. This, together with Airflow’s CeleryExecutor, brings us the best of both worlds: Kubernetes’ pedigree at running elastically scaling workloads and CeleryWorkers for low-overhead, continuous execution of Airflow DAG tasks.
KEDA works by using a new custom resource definition (CRD) called ScaledObject. The scaled object is the workhorse for scaling deployments up and down.
scaledobject.yaml
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
  name: airflow-worker
spec:
  scaleTargetRef:
    deploymentName: airflow-worker
  pollingInterval: 10   # Optional. Default: 30 seconds
  cooldownPeriod: 30    # Optional. Default: 300 seconds
  maxReplicaCount: 10   # Optional. Default: 100
  triggers:
    - type: postgresql
      metadata:
        connection: AIRFLOW_CONN_AIRFLOW_DB
        query: "SELECT ceil(COUNT(*)::decimal / 4) FROM task_instance WHERE state='running' OR state='queued'"
        targetQueryValue: "1"
How it works is that the scaled object has a deployment target to scale in case a trigger is met. The trigger is called a Scaler and, in this case, is a query executed on the Airflow PostgreSQL metadata database. The query is executed at an interval, and the result of the query determines whether the deployment has to be scaled up or down. In this case, we are checking the number of Airflow task instances which have either a running or queued state.
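To get a feeling for the numbers, the sketch below mirrors the arithmetic of the query in plain Python: one worker per four running or queued task instances, capped at maxReplicaCount. It is a simplification under stated assumptions. The function names are hypothetical, and treating the replica count as the query result divided by targetQueryValue is our reading of the configuration, not KEDA's actual implementation.

```python
import math


def query_result(active_tasks: int, tasks_per_worker: int = 4) -> int:
    """Mirror of the SQL: ceil(COUNT(*) / 4) over running or queued tasks."""
    return math.ceil(active_tasks / tasks_per_worker)


def desired_workers(active_tasks: int,
                    target_query_value: int = 1,
                    max_replicas: int = 10) -> int:
    """Approximate worker count, capped at maxReplicaCount (assumed model)."""
    wanted = math.ceil(query_result(active_tasks) / target_query_value)
    return min(wanted, max_replicas)


for tasks in (0, 1, 5, 20, 100):
    print(tasks, "active tasks ->", desired_workers(tasks), "workers")
```

With no running or queued tasks the result is zero workers, which is what allows the deployment to scale down to nothing when Airflow is idle.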
If you want to get started with KEDA for scaling your deployments there are already multiple scalers available, like a scaler for Azure Blob Storage, one for scaling based on RabbitMQ messages, a MySQL scaler, or a scaler that scales based on Prometheus metrics.
For the full list check the documentation at keda.sh
For Airflow, KEDA works in combination with the CeleryExecutor. Celery is a task queue implementation in Python, and together with KEDA it enables Airflow to dynamically run tasks in Celery workers in parallel, scaling CeleryWorkers up and down as necessary based on queued or running tasks.
This has the advantage that the CeleryWorkers generally have less overhead when running tasks sequentially, as there is no per-task pod startup as there is with the KubernetesExecutor.
So you get the elasticity of Kubernetes, together with all the advantages Celery has to offer in terms of performance.
For this setup, we are utilizing the Helm chart developed by Astronomer to deploy Airflow + KEDA on Kubernetes.
I am referencing the official setup documentation from Astronomer, which can be found here.
helm repo add kedacore https://kedacore.github.io/charts
helm repo add astronomer https://helm.astronomer.io
helm repo update
kubectl create namespace keda
helm install keda \
--namespace keda kedacore/keda
kubectl create namespace airflow
helm install airflow \
--set executor=CeleryExecutor \
--set workers.keda.enabled=true \
--set workers.persistence.enabled=false \
--namespace airflow \
astronomer/airflow
After we have deployed Airflow with KEDA using the steps listed above we now have two namespaces.
namespace=airflow
NAME READY STATUS RESTARTS AGE
airflow-flower-5966c99975-7vh9r 1/1 Running 0 93s
airflow-postgresql-0 1/1 Running 0 92s
airflow-redis-0 1/1 Running 0 92s
airflow-scheduler-7f64b6cd67-lr95p 2/2 Running 0 93s
airflow-statsd-f7647597-9xdzv 1/1 Running 0 93s
airflow-webserver-74b794d767-rzp49 1/1 Running 0 93s
Since we are using Celery as our executor, we have some additional components in our Airflow deployment: flower, the management/monitoring UI for Celery; redis, for brokering messages to our CeleryWorkers; and a statsd server, which is included in the Helm chart for gathering Airflow metrics.
namespace=keda
NAME READY STATUS RESTARTS AGE
keda-operator-5d44c49879-wpknv 1/1 Running 0 3m16s
keda-operator-metrics-apiserver-86d8bbc4df-884cr 1/1 Running 0 2m49s
In our keda namespace, we have the keda-operator, which is responsible for monitoring our trigger (the PostgreSQL Airflow DB) and scaling our deployment accordingly, and the keda-operator-metrics-apiserver, which is responsible for serving metrics to the HorizontalPodAutoscaler. The native Kubernetes HorizontalPodAutoscaler is used to scale out our target CeleryWorker deployment.
KEDA is leveraging this already existing Autoscaler to scale out our deployment making KEDA a very lean component. Pretty cool.
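For reference, the HorizontalPodAutoscaler's documented scaling rule is desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue), with a small tolerance around a ratio of 1 inside which it does nothing. The sketch below is a bare-bones version of that rule. The function name is hypothetical, the 10% tolerance is the Kubernetes default, and details such as stabilization windows and min/max bounds are left out.

```python
import math


def hpa_desired_replicas(current_replicas: int,
                         current_metric: float,
                         target_metric: float,
                         tolerance: float = 0.1) -> int:
    """Simplified HPA rule: scale replicas by the metric-to-target ratio.

    Within the tolerance band around 1.0 the replica count is unchanged.
    """
    ratio = current_metric / target_metric
    if abs(ratio - 1.0) <= tolerance:
        return current_replicas
    return math.ceil(current_replicas * ratio)


print(hpa_desired_replicas(2, 3.0, 1.0))   # 6
print(hpa_desired_replicas(4, 1.05, 1.0))  # 4
print(hpa_desired_replicas(4, 0.5, 1.0))   # 2
```

Note that this rule multiplies by the current replica count, so it cannot move a deployment away from zero replicas on its own; that first step from zero is the part KEDA takes care of.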
Now take this DAG:
Here we are going to generate 20 tasks. The DAG will run continuously and keep on generating new tasks for our CeleryWorkers to process.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


def my_custom_function(ts, **kwargs):
    """
    This can be any python code you want and is called from the python operator. The code is not executed until
    the task is run by the airflow scheduler.
    """
    print(f"I am task number {kwargs['task_number']}. This DAG Run execution date is {ts} and the current time is {datetime.now()}")
    print('Here is the full DAG Run context. It is available because provide_context=True')
    print(kwargs)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG('example_dag_generated',
         start_date=datetime(2019, 1, 1),
         max_active_runs=3,
         schedule_interval=timedelta(minutes=30),
         default_args=default_args,
         ) as dag:

    t0 = DummyOperator(
        task_id='start'
    )

    # generate tasks with a loop. task_id must be unique
    for task in range(20):
        tn = PythonOperator(
            task_id=f'python_print_date_{task}',
            python_callable=my_custom_function,
            op_kwargs={'task_number': task},
            provide_context=True
        )

        t0 >> tn
When we run this DAG the following happens.
Airflow starts scheduling tasks, which will enter the queued state. Our KEDA operator is monitoring for running and queued tasks and will trigger because of the queued tasks.
kubectl logs -f -n keda keda-operator…
{"level":"info","ts":1603266764.262705,"logger":"scalehandler","msg":"Successfully updated deployment","ScaledObject.Namespace":"airflow","ScaledObject.Name":"airflow-worker","ScaledObject.ScaleType":"deployment","Deployment.Namespace":"airflow","Deployment.Name":"airflow-worker","Original Replicas Count":0,"New Replicas Count":1}
{"level":"info","ts":1603266771.0487137,"logger":"controller_scaledobject","msg":"Reconciling ScaledObject","Request.Namespace":"airflow","Request.Name":"airflow-worker"}
{"level":"info","ts":1603266771.04885,"logger":"controller_scaledobject","msg":"Detected ScaleType = Deployment","Request.Namespace":"airflow","Request.Name":"airflow-worker"}
Because it is triggered, it will signal that the deployment target, airflow-worker, has to be scaled.
airflow-worker-dc75d6597-xtlhq 0/1 PodInitializing 0 22s
The deployment is scaled and Celery will broker messages to this new CeleryWorker. The CeleryWorker will start processing tasks.
This process runs continuously, and KEDA will ensure that when more tasks get scheduled, more workers are spun up.
airflow-worker-dc75d6597-8kvg5 0/1 Init:0/1 0 6s
airflow-worker-dc75d6597-glcd8 0/1 Init:0/1 0 6s
airflow-worker-dc75d6597-nxlwc 0/1 PodInitializing 0 6s
airflow-worker-dc75d6597-xtlhq 1/1 Running 0 81s
---
airflow-worker-dc75d6597-8kvg5 1/1 Running 0 84s
airflow-worker-dc75d6597-glcd8 1/1 Running 0 84s
airflow-worker-dc75d6597-nxlwc 1/1 Running 0 84s
airflow-worker-dc75d6597-xtlhq 1/1 Running 0 2m39s
Once there are no more tasks to be processed, KEDA will scale down our worker deployment to zero.
kubectl logs -f -n keda keda-operator…
{"level":"info","ts":1603274700.8434458,"logger":"scalehandler","msg":"Successfully scaled deployment to 0 replicas","ScaledObject.Namespace":"airflow","ScaledObject.Name":"airflow-worker","ScaledObject.ScaleType":"deployment","Deployment.Namespace":"airflow","Deployment.Name":"airflow-worker"}
Running Airflow with KEDA brings the best of both worlds together: the elasticity of Kubernetes, unlocked through KEDA, in combination with the fast-follow, low-overhead CeleryExecutor and workers.
Setup is straightforward using the Helm chart from Astronomer although the added complexity of additional components will require a more intensive debugging flow when something does break somewhere down the line.
Altogether I am a big fan of running Airflow together with KEDA and am looking forward to a bright future for Airflow on Kubernetes in combination with KEDA.