Getting Started With Spark Structured Streaming With Dustin Vannoy | Current 2022

Hosted by Confluent | Oct 21, 2022

About This Presentation

Many data pipelines still default to processing data nightly or hourly, but information is created all the time and should be available much sooner. While the move to stream processing adds complexity, Spark Structured Streaming...


Slide Content

Getting Started with
Spark Structured Streaming
Dustin Vannoy
dustinvannoy.com

Dustin Vannoy
Consultant – Data Engineer
/in/dustinvannoy
@dustinvannoy
youtube.com/DustinVannoy
dustinvannoy.com
Data Engineering SD meetup

Technologies
➢ Azure & AWS
➢ Apache Spark
➢ Apache Kafka
➢ Azure Synapse Analytics
➢ Python & Scala

Apache Spark

Why Spark?
A batch and streaming framework with a large community and wide adoption. Spark is a leader in data processing that scales across many machines, and it is relatively easy to get started.

Yeah, but what is Spark?
Spark consists of a programming API and execution engine
(Diagram: a Master node coordinating multiple Worker nodes)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

song_df = spark.read \
    .option('sep', '\t') \
    .option("inferSchema", "true") \
    .csv("/databricks-datasets/songs/data-001/part-0000*")

tempo_df = song_df.select(
    col('_c4').alias('artist_name'),
    col('_c14').alias('tempo'),
)

avg_tempo_df = tempo_df \
    .groupBy('artist_name') \
    .avg('tempo') \
    .orderBy('avg(tempo)', ascending=False)

avg_tempo_df.show(truncate=False)

Simple code, parallel compute
(Diagram: a Controller distributing work across multiple Workers)

What is Spark?
● Fast, general-purpose engine for large-scale data processing
● Replaces MapReduce as the Hadoop parallel programming API
● Many options:
  ○ YARN / Spark cluster / local
  ○ Scala / Python / Java / R
  ○ Spark Core / SQL / Streaming / MLlib / GraphX

What is Spark Structured Streaming?
● Alternative to traditional Spark Streaming, which used DStreams
● If you are familiar with Spark, it is best to think of Structured Streaming as the Spark SQL API, but for streaming
● The streaming classes live in pyspark.sql.streaming
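A minimal sketch of the API in PySpark, using the built-in rate test source so it runs without external dependencies (the source choice and option values are illustrative, not from the slide):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The "rate" source emits timestamp/value rows continuously; handy for demos.
stream_df = (
    spark.readStream
        .format("rate")
        .option("rowsPerSecond", 5)
        .load()
)

# Print each micro-batch to stdout until stopped.
query = stream_df.writeStream.format("console").start()
query.awaitTermination()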

What is Spark Structured Streaming?
Tathagata Das "TD" – Lead Developer on Spark Streaming
● "Fast, fault-tolerant, exactly-once stateful stream processing without having to reason about streaming"
● "The simplest way to perform streaming analytics is not having to reason about streaming at all"
● A table that is constantly appended with each micro-batch
Reference: https://youtu.be/rl8dIzTpxrI

Lazy Evaluation with Query Optimization
● Transformations are not run immediately
● Builds up a DAG
● Optimizes steps
● Runs when an Action is called
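A small sketch of what this means in practice (file path and column names are assumptions): the filter and select only build the plan; nothing executes until the action on the last line.

# Transformations: each returns a new DataFrame but executes nothing yet.
df = spark.read.csv("/tmp/trips.csv", header=True, inferSchema=True)
filtered = df.filter(df["trip_distance"] > 0)
projected = filtered.select("VendorID", "trip_distance")

# Action: Spark optimizes the accumulated DAG and runs it now.
projected.show(5)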

Why use Spark Structured Streaming?
● Run Spark as batch or streaming
● Open source with many connectors
● Stateful streaming and joins
● Kafka is not required for every stream

Apache Spark Options

Databricks
Unified analytics platform
Managed Apache Spark
Performance optimizations
Auto-scale and auto-terminate
AWS, Azure, GCP

Cloud Specific Options
Amazon EMR
Azure Synapse Spark
Google Cloud Dataproc

Local
Install on Mac or Linux
Use WSL on Windows
Docker container
PySpark package

Hadoop / YARN
Spark and storage together
Many containers per worker
Use what you have

Kubernetes
On-premises or multi-cloud
Scale size with familiar tools
Use what you know

Apache Spark Syntax

Spark Example – Read

Batch:
df = spark.read \
    .format("kafka") \
    .options(**config) \
    .load()

Streaming:
df = spark.readStream \
    .format("kafka") \
    .options(**config) \
    .load()

Spark Example – Write

Batch:
df.write.format("kafka")...

Streaming:
df.writeStream.format("kafka")...

Structured Streaming – Read
df = spark.readStream \
    .format("delta") \
    .options(**delta_config) \
    .load(path)

Structured Streaming – Transformations
new_df = df.withColumn("new_column",
    concat("col1", "col2"))

Structured Streaming – Transformations
new_df = df.selectExpr(
    "*",
    "concat(col1, '-', col2) as new_col",
    "current_timestamp() as load_tm")

Structured Streaming – Actions
● writeStream.format("console").start()
● writeStream.format("kafka").start()
● writeStream.foreach(…).start()
● writeStream.foreachBatch(…).start()
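Of these, foreachBatch is the most flexible: it hands each micro-batch to a regular function as a batch DataFrame, so any batch sink can be used. A minimal sketch (the function name and paths are hypothetical):

# Hypothetical sink function: receives each micro-batch as a normal
# batch DataFrame plus the micro-batch id.
def write_batch(batch_df, batch_id):
    batch_df.write.mode("append").parquet("/tmp/output")  # illustrative path

query = (
    df.writeStream
        .foreachBatch(write_batch)
        .option("checkpointLocation", "/tmp/cp_fb")
        .start()
)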

Structured Streaming – Write
df.writeStream \
    .format("kafka") \
    .options(**producer_config) \
    .option("checkpointLocation", "/tmp/cp001") \
    .start()

Structured Streaming – Output Mode
How to output data when triggered
● Append – just keep adding the newest records
● Update – only records updated since the last trigger
● Complete – output the latest state of the table (useful for aggregation results)

Example from docs (Structured Streaming Programming Guide)

Structured Streaming – Write with Output Mode
df.writeStream \
    .format("kafka") \
    .outputMode("complete") \
    .options(**producer_config) \
    .option("checkpointLocation", "/tmp/cp001") \
    .start()

Structured Streaming – Triggers
Specify how often to check for new data
● Default: micro-batch mode, as soon as possible
● Fixed interval: micro-batch mode, wait the specified time
● Once: just one micro-batch, then the job stops
● Continuous: continuously read from the source; experimental and only works for certain types of queries

Structured Streaming – Write with Trigger
df.writeStream \
    .format("kafka") \
    .trigger(processingTime="5 seconds") \
    .options(**producer_config) \
    .option("checkpointLocation", "/tmp/cp001") \
    .start()

Structured Streaming – Windows
Calculate over time using windows -> avg trips every 5 minutes
● Tumbling window: a specific period of time, such as 1:00 – 1:30
● Sliding window: fixed-size window that can track multiple overlapping groups... 1:00 – 1:30, 1:05 – 1:35, 1:10 – 1:40, etc.
● Session window: similar to an inactive-session timeout... group events together until a long enough gap in time between events

Comparing windows (Structured Streaming Programming Guide)

Window Example
df2.groupBy(
    col("VendorID"),
    window(col("pickup_dt"), "10 min")) \
    .avg("trip_distance")
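For comparison, sketches of the other two window types from the earlier slide, assuming the same columns (session_window requires Spark 3.2+):

from pyspark.sql.functions import col, window, session_window

# Sliding window: 10-minute windows that start every 5 minutes (overlapping).
sliding_df = df2.groupBy(
    col("VendorID"),
    window(col("pickup_dt"), "10 minutes", "5 minutes")) \
    .avg("trip_distance")

# Session window: keep extending a group until a 5-minute gap between events.
session_df = df2.groupBy(
    col("VendorID"),
    session_window(col("pickup_dt"), "5 minutes")) \
    .avg("trip_distance")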

Streaming Joins

Stream-static:
Simple join
Does not track state
Options: Inner, Left Outer, Left Semi

Stream-stream:
Complex join
Data may arrive early or late
Options: Inner, Left/Right Outer, Full Outer, Left Semi

Stream to Static DataFrame Join
zdf = spark.read.format("delta").load(path)

trip_df = (
    tdf.join(zdf,
        tdf["PULocId"] == zdf["LocID"],
        how="left")
)

Stream to Stream Join
trip_df = (
    tdf.withWatermark("eventTime", "1 minute")
        .join(pmt.withWatermark("pTime", "1 hour"),
            expr("""
                LocId = PULocId AND
                zTime >= eventTime AND
                zTime <= eventTime + interval 1 hour
            """),
            how="leftOuter")
)

Stateful Streaming
Aggregations, dropDuplicates, and joins require storing state
● Custom stateful operations are possible
● HDFS-backed or RocksDB state store (RocksDB added in Spark 3.2)
● If retaining many keys, HDFSBackedStateStore is likely to have JVM memory issues
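A one-line sketch of opting into the RocksDB state store (Spark 3.2+), which keeps state off the JVM heap:

# Must be set before the streaming query starts.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")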

Example Structured Streaming Application

Databricks Notebook – Setup
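The original slide is a notebook screenshot that was not transcribed; a minimal sketch of a setup cell under assumed names (all paths and the topic name are hypothetical):

# Hypothetical locations used by the rest of the example application.
checkpoint_path = "/tmp/checkpoints/taxi_stream"
output_path = "/tmp/delta/taxi_enriched"
kafka_topic = "taxi-trips"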

Databricks Notebook – Confluent Cloud Config
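Again the slide is a screenshot; a sketch of a typical Confluent Cloud consumer config for the Spark Kafka source (broker, API key, and secret are placeholders; the kafkashaded prefix on the JAAS class is specific to Databricks runtimes):

consumer_config = {
    "kafka.bootstrap.servers": "<broker>.confluent.cloud:9092",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule "
        "required username='<api-key>' password='<api-secret>';"
    ),
    "subscribe": kafka_topic,
    "startingOffsets": "earliest",
}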

Databricks Notebook – Spark Schema
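A sketch of defining the expected payload schema up front (field names are assumptions based on the taxi columns used elsewhere in the deck):

from pyspark.sql.types import (StructType, StructField, IntegerType,
                               DoubleType, TimestampType)

trip_schema = StructType([
    StructField("VendorID", IntegerType()),
    StructField("pickup_dt", TimestampType()),
    StructField("PULocId", IntegerType()),
    StructField("trip_distance", DoubleType()),
])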

Databricks Notebook – Read Kafka Record
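A sketch of the read step; the Kafka source returns key and value as binary columns alongside topic/partition/offset metadata:

raw_df = (
    spark.readStream
        .format("kafka")
        .options(**consumer_config)
        .load()
)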

Databricks Notebook – Convert JSON
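A sketch of parsing the JSON payload: cast the binary value to string, parse it with the schema above, then flatten the resulting struct:

from pyspark.sql.functions import col, from_json

trip_df = (
    raw_df
        .select(from_json(col("value").cast("string"), trip_schema).alias("trip"))
        .select("trip.*")
)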

Databricks Notebook – Read Lookup Table
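A sketch of loading the static side of the join as an ordinary batch read (the path is illustrative):

# Static lookup table, e.g. taxi zones, read once as a batch DataFrame.
zone_df = spark.read.format("delta").load("/tmp/delta/taxi_zones")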

Databricks Notebook – Join and Write
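A sketch of the stream-static join and the Delta write, reusing the hypothetical names from the earlier cells:

# Enrich each trip with zone details, then write the stream to Delta.
enriched_df = trip_df.join(
    zone_df,
    trip_df["PULocId"] == zone_df["LocID"],
    how="left")

query = (
    enriched_df.writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .start(output_path)
)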

Databricks Notebook – Check Destination
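A sketch of verifying the destination by reading it back as a batch table:

result_df = spark.read.format("delta").load(output_path)
result_df.count()   # row count grows as micro-batches land
result_df.show(5)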

Monitoring

SQL UI

SQL UI – Detail

Structured Streaming UI

Structured Streaming UI

Notebook Streaming UI – Timeseries

Notebook Streaming UI –Query Progress

Cloud Logging Services

References
● https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
● https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html
● https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html

More Content
Website: dustinvannoy.com
Twitter: @dustinvannoy
YouTube: youtube.com/DustinVannoy

Thank you!