Accelerating Data Ingestion with Databricks Autoloader
databricks
About This Presentation
Tracking which incoming files have been processed has always required thought and design when implementing an ETL framework. The Autoloader feature of Databricks looks to simplify this, taking away the pain of file watching and queue management. However, there can also be a lot of nuance and complexity in setting up Autoloader and managing the process of ingesting data using it. After implementing an automated data loading process in a major US CPMG, Simon has some lessons to share from the experience.
This session will run through the initial setup and configuration of Autoloader in a Microsoft Azure environment, looking at the components used and what is created behind the scenes. We’ll then look at some of the limitations of the feature, before walking through the process of overcoming these limitations. We will build out a practical example that tackles evolving schemas, applying transformations to your stream, extracting telemetry from the process and finally, how to merge the incoming data into a Delta table.
After this session you will be better equipped to use Autoloader in a data ingestion platform, simplifying your production workloads and accelerating the time to realise value in your data!
Size: 2.35 MB
Language: en
Added: Jun 14, 2021
Slides: 46 pages
Slide Content
Accelerating Data Ingestion with Databricks Autoloader
Simon Whiteley
Director of Engineering, Advancing Analytics
Agenda
▪ Why Incremental is Hard
▪ Autoloader Components
▪ Implementation
▪ Evolution
▪ Lessons
Why Incremental is Hard
Incremental Ingestion
[Diagram: LANDING → BRONZE → SILVER, with a "?" over how new files get from landing into bronze]
Incremental Ingestion
▪ Only Read New Files
▪ Don't Miss Files
▪ Trigger Immediately
▪ Repeatable Pattern
▪ Fast over large directories
Existing Patterns – 3) DIY
[Diagram: a blob file trigger fires a Logic App / Azure Function, which calls the Databricks Job API to run a triggered batch read]
Incremental Ingestion Approaches
Approach         | Good At               | Bad At
Metadata ETL     | Repeatable            | Not immediate, requires polling
File Streaming   | Repeatable, Immediate | Slows down over large directories
DIY Architecture | Immediate Triggering  | Not Repeatable
Databricks Autoloader
"Auto Loader is an optimized cloud file source for Apache Spark that loads data continuously and efficiently from cloud storage as new data arrives."
– Prakash Chockalingam, Databricks Engineering Blog
What is Autoloader?
Essentially, Autoloader combines our three approaches of:
• Storing metadata about what has been read
• Using Structured Streaming for immediate processing
• Utilising cloud-native components to optimise identifying arriving files
There are two parts to the Autoloader job:
• CloudFiles DataReader
• CloudNotification Services (optional)
CloudFiles Reader
[Diagram: File 1.json – File 4.json land in Blob Storage; the Blob Storage queue receives a notification such as {"fileAdded": "/landing/File 4.json"}; the CloudFiles reader checks the queue for new files and reads only those specific files from source into a dataframe]
CloudFiles DataReader
df = (spark
  .readStream
  .format("cloudFiles")                            # tells Spark to use Autoloader
  .option("cloudFiles.format", "json")             # tells Autoloader to expect JSON files
  .option("cloudFiles.useNotifications", "true")   # should Autoloader use the notification queue
  .schema(mySchema)
  .load("/mnt/landing/")
)
Low Frequency Streams
[Diagram: one file per day arriving, but the Autoloader stream keeps a cluster running 24/7]
Low Frequency Streams
[Diagram: the same one file per day, but the cluster now runs only a fraction of the time ("1/7") because the job is triggered rather than always-on]
df.writeStream
  .trigger(once=True)
  .start(path)
Autoloader can be combined with trigger.Once – each run finds only the files not processed since the last run.
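As a minimal end-to-end sketch of this pattern (the checkpoint and output paths and the two-column schema are illustrative assumptions, not content from the deck), the cloudFiles reader and the trigger-once writer combine like this:

# Sketch: ingest only newly arrived JSON files into a Bronze Delta table, then stop.
# Paths and schema below are placeholder values for illustration.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

mySchema = StructType([
    StructField("ID", IntegerType()),
    StructField("ProductName", StringType()),
])

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useNotifications", "true")
      .schema(mySchema)
      .load("/mnt/landing/"))

(df.writeStream
   .format("delta")
   .option("checkpointLocation", "/mnt/bronze/_checkpoints/product")  # remembers processed files between runs
   .trigger(once=True)                                                # drain the backlog, then stop
   .start("/mnt/bronze/product"))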
Delta Implementation
▪ Batch ETL Pattern
▪ Merge Statements (sketched below)
▪ Logging State
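To make the merge step concrete, here is a hedged sketch of upserting each Autoloader micro-batch into a Delta table via foreachBatch; df is the cloudFiles stream shown earlier, and the table name and join key are illustrative assumptions:

# Sketch: upsert every micro-batch into a Delta table.
# "bronze.product" and the join key "ID" are assumed names for illustration.
from delta.tables import DeltaTable

def upsert_to_delta(microbatch_df, batch_id):
    target = DeltaTable.forName(spark, "bronze.product")
    (target.alias("t")
        .merge(microbatch_df.alias("s"), "t.ID = s.ID")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(df.writeStream
   .foreachBatch(upsert_to_delta)
   .option("checkpointLocation", "/mnt/bronze/_checkpoints/product_merge")
   .trigger(once=True)
   .start())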
Evolving Schemas
New Features since Databricks Runtime 8.2
What is Schema Evolution?
{"ID": 1, "ProductName": "Belt"}
{"ID": 2, "ProductName": "T-Shirt", "Size": "XL"}
{"ID": 3, "ProductName": "Shirt", "Size": "14",
  "Care": { "DryClean": "Yes",
            "MachineWash": "Don't you dare" }
}
How do we handle Evolution?
1. Fail the Stream
2. Manually Intervene
3. Automatically Evolve
In order to manage schema evolution, we need to know:
• What the schema is expected to be
• What the schema is now
• How we want to handle any changes in schema
Schema Inference
From Databricks Runtime 8.2 onwards, simply don't provide a schema to enable schema inference. The schema is inferred once when the stream starts and stored as metadata.
• cloudFiles.schemaLocation – where to store the schema
• cloudFiles.inferColumnTypes – sample the data to infer column types
• cloudFiles.schemaHints – manually specify data types for certain columns
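A hedged sketch of those options used together; the schema location path and the hint are illustrative assumptions:

# Sketch: schema inference with Auto Loader. No .schema() call is made;
# the schema is inferred on first run and persisted to schemaLocation.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", "/mnt/bronze/_schemas/product")  # assumed path
      .option("cloudFiles.inferColumnTypes", "true")                        # sample data to infer types
      .option("cloudFiles.schemaHints", "ID bigint")                        # assumed hint: pin ID's type
      .load("/mnt/landing/"))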
Schema Evolution – Rescue
ID | ProductName | _rescued_data
1  | Belt        |
2  | T-Shirt     | {"Size":"XL"}
3  | Shirt       | {"Size":"14","Care":{"DryC…
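As a small, assumed example of working with the rescue column (the "Size" field comes from the sample records above; the parsing approach itself is not from the deck):

# Sketch: pull an unexpected field back out of _rescued_data.
from pyspark.sql.functions import col, get_json_object

rescued = df.filter(col("_rescued_data").isNotNull())
with_size = rescued.withColumn("Size", get_json_object(col("_rescued_data"), "$.Size"))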
Autoloader Lessons
▪ EventGrid Quotas & Settings
▪ Streaming Best Practices
▪ Batching Best Practices
EventGrid Quota Lessons
• You can have 500 event subscriptions against a single storage account's system topic – each notification-based stream uses one
• Deleting the checkpoint will reset the stream ID and create a new subscription/queue, leaving an orphaned set behind
• Use the CloudNotification libraries to manage this more closely with custom topics
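One way to keep tighter control, based on Auto Loader's documented file-notification options rather than the deck itself, is to create the Event Grid subscription and queue yourself and point Autoloader at that existing queue; the secret scope, key, and queue name below are illustrative assumptions:

# Sketch: consume a pre-created notification queue instead of letting
# Auto Loader create a new subscription/queue for each stream.
queue_connection = dbutils.secrets.get("ingest-scope", "queue-connection-string")  # assumed secret

df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useNotifications", "true")
      .option("cloudFiles.connectionString", queue_connection)
      .option("cloudFiles.queueName", "landing-events")                      # assumed, self-managed queue
      .option("cloudFiles.schemaLocation", "/mnt/bronze/_schemas/product")   # assumed path
      .load("/mnt/landing/"))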
Streaming Optimisation
• cloudFiles.maxBytesPerTrigger / cloudFiles.maxFilesPerTrigger – manage the size of the streaming micro-batch
• cloudFiles.fetchParallelism – manages the workload on your queue
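A hedged sketch of applying those knobs; the values are arbitrary placeholders, not recommendations from the deck:

# Sketch: bound the micro-batch size and the queue fetch parallelism.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.useNotifications", "true")
      .option("cloudFiles.maxFilesPerTrigger", 500)    # cap files per micro-batch
      .option("cloudFiles.maxBytesPerTrigger", "1g")   # or cap by data volume
      .option("cloudFiles.fetchParallelism", 2)        # threads reading from the queue
      .option("cloudFiles.schemaLocation", "/mnt/bronze/_schemas/product")  # assumed path
      .load("/mnt/landing/"))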
Batch Lessons – Look for Lost Messages
Queue messages expire after 7 days by default – if a triggered batch doesn't run within that window, the notifications for those files are lost.
Databricks Autoloader
▪ Reduces complexity of ingesting files
▪ Has some quirks in implementing ETL processes
▪ Growing number of schema evolution features