Airflow: Save Tons of Money by Using Deferrable Operators

kaxil · 30 slides · Sep 14, 2022

About This Presentation

This talk is from Open Source Summit 2022

Apache Airflow 2.2 introduced the concept of Deferrable Tasks, which use Python's async (asyncio) features.

Airflow sensors and poll-based operators can be heavily optimized to save tons of money by freeing up worker slots while they poll.

This session will c...


Slide Content

Airflow: Deferrable
“Async” Operators
How you can save tons of money!!

Kaxil Naik
Open Source Summit 2022

Who am I?
●Committer & PMC Member of Apache Airflow
●Director of Airflow Engineering @ Astronomer


@kaxil

What is Apache Airflow?

A platform to programmatically author,
schedule, and monitor workflows

Example DAG
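A DAG is a set of tasks plus the dependencies between them, and a task only runs once everything upstream of it has finished. The sketch below is plain Python, not Airflow code: it uses the standard-library `graphlib` module to find a valid run order for a small, hypothetical pipeline (all task names are made up for illustration).

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each task maps to the set of tasks it depends on.
dag = {
    "wait_for_s3_file": set(),
    "run_spark_job": {"wait_for_s3_file"},
    "wait_for_gcs_file": set(),
    "load_to_bigquery": {"run_spark_job", "wait_for_gcs_file"},
    "send_report": {"load_to_bigquery"},
}


def run_order(graph: dict[str, set[str]]) -> list[str]:
    """Return one valid execution order: every task comes after its upstream tasks."""
    return list(TopologicalSorter(graph).static_order())


order = run_order(dag)
print(" -> ".join(order))
```

In Airflow the same dependency information is declared on operators inside a DAG file, and the Scheduler decides when each task is ready.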

Why deferrable operators?
The problem with current operators & sensors


What are they?
And how do they work?


Available async operators
How to find & use the available async operators

Why deferrable operators?


Typical Operator (diagram): the Worker submits a Spark job, then keeps polling the Spark cluster until the job completes.

Typical Sensor (diagram): the Worker waits for files to arrive in S3 until they have arrived.

What a waste! (diagram): the operator (submit Spark job, poll Spark cluster for job completion) and the sensor (wait for files to arrive in S3) each hold a Worker slot for the whole wait, and that time is wasted resources.

Imagine the Cost! (diagram): with multiple operators and sensors (polling for Spark job completion, polling for BigQuery job completion, waiting for files to arrive in S3 or GCS), each one blocks a worker slot until it is done, so multiple worker slots are blocked!!
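To see why blocked slots add up, here is a plain-Python simulation (not Airflow code) that models a worker with a fixed number of slots as a thread pool. The slot count, poke interval, and function names are hypothetical. Each "sensor" sleeps between pokes but still occupies its thread, so sensors beyond the slot count have to queue.

```python
import time
from concurrent.futures import ThreadPoolExecutor

WORKER_SLOTS = 2  # hypothetical: a worker with two task slots


def blocking_sensor(poke_interval: float, pokes: int) -> None:
    """Poll-style wait that keeps its thread (slot) busy for the whole wait."""
    for _ in range(pokes):
        time.sleep(poke_interval)  # idle, but the slot is still held


def run_sensors(n_sensors: int, poke_interval: float = 0.05, pokes: int = 4) -> float:
    """Run n sensors on WORKER_SLOTS threads and return the wall-clock time."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=WORKER_SLOTS) as pool:
        for _ in range(n_sensors):
            pool.submit(blocking_sensor, poke_interval, pokes)
    # Leaving the `with` block waits for every submitted sensor to finish.
    return time.perf_counter() - start


elapsed = run_sensors(4)
print(f"4 sensors on {WORKER_SLOTS} slots took {elapsed:.2f}s")
```

Each sensor waits about 0.2 s, but with only two slots the four sensors run in two batches, so the whole run takes roughly twice as long even though the sensors do almost no work.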

What are deferrable operators?

Diagram: the Scheduler and Worker timeline, now with a new component, the Triggerer.

Async Operator (diagram): the Worker submits the Spark job and then frees its slot; the Triggerer polls the Spark cluster; on job completion the task returns to the Worker.

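Worker Slot, Sync vs Async (diagram): with a sync operator, one worker slot runs Task 1 and then Task 2, each holding the slot while it waits; with an async operator, the same slot serves Task 1, Task 2, Task 3, Task 2, Task 1, Task 4, because deferred tasks release the slot while they wait and come back later.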

Task runs on the Worker, then “defers” itself
Example: submits an API call and stores the job_id in the DB


Runs on Triggerer
Polls asynchronously until the criterion is met, then stores the response in the DB


Back to the Worker
To show the response in the logs and set the task state
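The three phases can be simulated in plain Python. This is a minimal sketch, not Airflow's implementation: `EXTERNAL_JOBS` stands in for an external system, `DB` for the metadata database, and the function names and state strings are all hypothetical. The point is that only phases 1 and 3 need a worker; phase 2 is an async loop that a Triggerer can run alongside many others.

```python
import asyncio
import itertools

# Hypothetical stand-ins (not Airflow APIs): a fake external system and a dict as the "DB".
_job_ids = itertools.count(1)
EXTERNAL_JOBS: dict[str, int] = {}  # job_id -> polls left before the job is "done"
DB: dict[str, dict] = {}            # task_id -> persisted task state


def worker_execute(task_id: str) -> None:
    """Phase 1 (Worker): submit the job, store its job_id, then defer."""
    job_id = f"job-{next(_job_ids)}"
    EXTERNAL_JOBS[job_id] = 3  # pretend the job needs three polls to finish
    DB[task_id] = {"state": "deferred", "job_id": job_id}
    # From here on the worker slot is free.


async def triggerer_poll(task_id: str) -> None:
    """Phase 2 (Triggerer): poll asynchronously until done, then store the response."""
    job_id = DB[task_id]["job_id"]
    while EXTERNAL_JOBS[job_id] > 0:
        EXTERNAL_JOBS[job_id] -= 1
        await asyncio.sleep(0.01)  # other coroutines can run while this one waits
    DB[task_id]["event"] = {"job_id": job_id, "status": "success"}
    DB[task_id]["state"] = "ready_to_resume"


def worker_resume(task_id: str) -> str:
    """Phase 3 (Worker): log the stored response and set the final task state."""
    event = DB[task_id]["event"]
    print(f"{task_id}: {event['job_id']} finished with status {event['status']}")
    DB[task_id]["state"] = "success" if event["status"] == "success" else "failed"
    return DB[task_id]["state"]


worker_execute("spark_task")
asyncio.run(triggerer_poll("spark_task"))
final_state = worker_resume("spark_task")
```

Because the job_id is persisted in phase 1, phase 2 can run anywhere, and phase 3 only needs whatever response phase 2 stored.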

Trigger: a new concept (the polling step that runs on the Triggerer in the Async Operator diagram)

A Trigger is different from an Operator

Must be asynchronous & quick
So the Triggerer can run thousands of them per CPU core


Should not persist state
So we can shuffle them around between Triggerers as needed


Must support running multiple copies of itself
For reliability during network partitions
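The three rules above can be illustrated with a toy, plain-Python sketch. It is not Airflow's `BaseTrigger`; `SleepTrigger`, `rebuild`, and the `accepted` set are hypothetical. The trigger is async (thousands share one thread), keeps all of its state in the kwargs returned by `serialize()` (so a copy can be rebuilt elsewhere), and the receiving side ignores a duplicate event from a second running copy.

```python
import asyncio
import importlib
import time


class SleepTrigger:
    """Hypothetical toy trigger: waits `delay` seconds, then fires once."""

    def __init__(self, trigger_id: str, delay: float):
        self.trigger_id = trigger_id
        self.delay = delay

    def serialize(self) -> tuple[str, dict]:
        # All state lives in these kwargs, so any process can rebuild the trigger.
        classpath = f"{type(self).__module__}.{type(self).__qualname__}"
        return classpath, {"trigger_id": self.trigger_id, "delay": self.delay}

    async def run(self) -> str:
        await asyncio.sleep(self.delay)  # yields to the event loop instead of blocking
        return self.trigger_id


def rebuild(classpath: str, kwargs: dict):
    """Re-create a trigger from its serialized (classpath, kwargs) form."""
    module_name, _, class_name = classpath.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)(**kwargs)


accepted: set[str] = set()  # hypothetical "DB" of triggers that already fired
duplicates = 0


async def run_and_record(trigger) -> None:
    global duplicates
    trigger_id = await trigger.run()
    if trigger_id in accepted:
        duplicates += 1  # a second copy fired too: treat it as a no-op
    else:
        accepted.add(trigger_id)


async def main(n: int) -> None:
    triggers = [SleepTrigger(f"t{i}", 0.5) for i in range(n)]
    # Simulate two copies of one trigger running at once (e.g. after a network partition).
    triggers.append(rebuild(*triggers[0].serialize()))
    await asyncio.gather(*(run_and_record(t) for t in triggers))


start = time.perf_counter()
asyncio.run(main(2_000))
elapsed = time.perf_counter() - start
print(f"{len(accepted)} triggers fired, {duplicates} duplicate ignored, {elapsed:.2f}s")
```

All 2,000 half-second waits overlap on a single thread, so the run takes about half a second rather than the sum of the waits.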

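import asyncio
import datetime
from datetime import timedelta

from airflow.sensors.base import BaseSensorOperator
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils import timezone


class DateTimeTrigger(BaseTrigger):
    """Fires a single event once the given (timezone-aware) moment has passed."""

    def __init__(self, moment: datetime.datetime):
        super().__init__()
        self.moment = moment

    def serialize(self):
        # (classpath, kwargs): everything needed to rebuild this trigger on any Triggerer
        return ("mymodule.DateTimeTrigger", {"moment": self.moment})

    async def run(self):
        # Non-blocking wait: control returns to the event loop between checks
        while self.moment > timezone.utcnow():
            await asyncio.sleep(1)
        yield TriggerEvent(self.moment)


class WaitOneHourSensor(BaseSensorOperator):

    def execute(self, context):
        # Hand the wait over to the Triggerer and free this worker slot
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(hours=1)),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None):
        # We have no more work to do here. Mark as complete.
        return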

Available Deferrable Operators

Core Airflow & Providers
https://github.com/apache/airflow/


Astronomer Providers
https://github.com/astronomer/astronomer-providers
50+ async operators

Built and maintained with ❤ by Astronomer
Apache 2.0 licensed & open source
Drop-in replacements of “sync” Operators
OpenLineage support

Async Operators available for:
●AWS, Google Cloud, Microsoft Azure
●Databricks, Snowflake
●Kubernetes, Apache Livy, Apache Hive
●HTTP, Filesystem

from astronomer.providers.amazon.aws.sensors.s3 import S3KeySensorAsync as S3KeySensor

waiting_for_s3_key = S3KeySensor(
task_id="waiting_for_s3_key",
bucket_key="sample_key.txt",
wildcard_match=False,
bucket_name="sample-bucket",
)

Caveats when building one

The longer the polling takes, the bigger the cost benefit
We've seen over a 90% reduction in resources for a 10-minute wait


Not everything can be deferred
It must wait on an external event or system that can be tracked by a unique identifier (e.g. a job ID)


Triggerer logs are not visible in the Webserver/UI
Only task logs from workers are displayed in the UI
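The first caveat follows from simple arithmetic. A minimal sketch, assuming a synchronous task holds its worker slot for submit + wait + resume, while a deferred task holds it only for submit + resume. The function name and the timings are hypothetical, not measurements from the talk.

```python
def slot_savings(wait_s: float, submit_s: float, resume_s: float) -> float:
    """Fraction of worker-slot time saved by deferring instead of polling in place.

    Sync:     the slot is held for submit + wait + resume.
    Deferred: the slot is held for submit + resume only; the wait runs on the Triggerer.
    """
    sync = submit_s + wait_s + resume_s
    deferred = submit_s + resume_s
    return 1 - deferred / sync


# Hypothetical timings: a few seconds of real work on each side of the wait.
for wait in (60, 600):
    print(f"{wait:>4}s wait: {slot_savings(wait, submit_s=5, resume_s=5):.1%} slot time saved")
```

With fixed submit and resume costs, the saved fraction grows as the wait gets longer, which is why short waits gain little from deferring.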

Thank You
@kaxil