Getting Started with Spark Structured Streaming - Current 22

Dustin Vannoy · 56 slides · Oct 05, 2022

About This Presentation

An introduction to Spark Structured Streaming, presented at the Current 22 conference.


Slide Content

Getting Started with Spark Structured Streaming · Dustin Vannoy · dustinvannoy.com

Dustin Vannoy, Consultant – Data Engineer. /in/dustinvannoy · youtube.com/DustinVannoy · dustinvannoy.com · @dustinvannoy. Data Engineering SD meetup. Technologies: Azure & AWS, Apache Spark, Apache Kafka, Azure Synapse Analytics, Python & Scala.

Apache Spark

Why Spark? A batch and streaming framework with a large community and wide adoption. Spark is a leader in data processing that scales across many machines, and it is relatively easy to get started.

Yeah, but what is Spark? Spark consists of a programming API and an execution engine (a master coordinating many workers). For example:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

song_df = spark.read \
    .option('sep', '\t') \
    .option("inferSchema", "true") \
    .csv("/databricks-datasets/songs/data-001/part-0000*")

tempo_df = song_df.select(
    col('_c4').alias('artist_name'),
    col('_c14').alias('tempo'),
)

avg_tempo_df = tempo_df \
    .groupBy('artist_name') \
    .avg('tempo') \
    .orderBy('avg(tempo)', ascending=False)

avg_tempo_df.show(truncate=False)

Simple code, parallel compute (diagram: a controller coordinating multiple workers).

What is Spark? A fast, general-purpose engine for large-scale data processing. It replaces MapReduce as Hadoop's parallel programming API. Many options: YARN / Spark cluster / local; Scala / Python / Java / R; Spark Core / SQL / Streaming / MLlib / GraphX.

What is Spark Structured Streaming? An alternative to traditional Spark Streaming, which used DStreams. If you are familiar with Spark, it is best to think of Structured Streaming as the Spark SQL API, but for streaming. The API lives in pyspark.sql.streaming (org.apache.spark.sql.streaming in Scala).

What is Spark Structured Streaming? Tathagata Das ("TD"), lead developer on Spark Streaming: “Fast, fault-tolerant, exactly-once stateful stream processing without having to reason about streaming.” “The simplest way to perform streaming analytics is not having to reason about streaming at all.” Think of it as a table that is constantly appended with each micro-batch. Reference: https://youtu.be/rl8dIzTpxrI
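The "constantly appended table" idea is easy to see with the built-in rate source, which generates rows on a schedule. The following is a minimal sketch, not from the slides; it assumes a local SparkSession and prints each micro-batch to the console.

# Minimal sketch: the "rate" source emits rows continuously, so the streaming
# DataFrame behaves like a table that keeps growing with every micro-batch.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rate_df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .load()  # columns: timestamp, value

query = rate_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

query.awaitTermination()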

Lazy Evaluation with Query Optimization: transformations are not run immediately. Spark builds up a DAG, optimizes the steps, and runs them only when an action is called.
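For example, a hedged sketch reusing the songs dataset path from the earlier slide: the select and groupBy calls only build a plan, and nothing executes until show() is called.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Transformations: these only add steps to the query plan (the DAG).
song_df = spark.read.option("sep", "\t") \
    .csv("/databricks-datasets/songs/data-001/part-0000*")
tempo_df = song_df.select(
    col("_c4").alias("artist_name"),
    col("_c14").cast("double").alias("tempo"))
avg_tempo_df = tempo_df.groupBy("artist_name").avg("tempo")

# Action: only now does Spark optimize the plan and run it on the cluster.
avg_tempo_df.show()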

Why use Spark Structured Streaming? Run Spark as batch or streaming. Open source with many connectors. Stateful streaming and joins. Kafka is not required for every stream.

Apache Spark Options

Databricks: unified analytics platform, managed Apache Spark, performance optimizations, auto-scale and auto-terminate; available on AWS, Azure, and GCP.

Cloud-specific options: Amazon EMR, Azure Synapse Spark, Google Cloud Dataproc.

Local: install on Mac or Linux (use WSL on Windows), a Docker container, or the PySpark package.
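A quick way to try things locally is the PySpark package from PyPI. This is a hedged sketch, not from the talk; it assumes Python and a Java runtime are already installed and that pip install pyspark has been run.

from pyspark.sql import SparkSession

# local[*] runs Spark inside this process using all available cores; no cluster needed.
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("structured-streaming-local") \
    .getOrCreate()

print(spark.version)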

Hadoop / YARN: Spark and storage together, many containers per worker; use what you have.

Kubernetes: on-premises or multi-cloud, scale with familiar tools; use what you know.

Apache Spark Syntax

Spark Example - Read

Batch:
df = spark.read \
    .format("kafka") \
    .options(**config) \
    .load()

Streaming:
df = spark.readStream \
    .format("kafka") \
    .options(**config) \
    .load()

Spark Example - Write

Batch:
df.write.format("kafka") …

Streaming:
df.writeStream.format("kafka") …

Structured Streaming - Read

df = spark.readStream \
    .format("delta") \
    .options(**delta_config) \
    .load(path)

Structured Streaming - Transformations

new_df = df.withColumn("new_column", concat("col1", "col2"))

Structured Streaming - Transformations

new_df = df.selectExpr(
    "*",
    "concat(col1, '-', col2) as new_col",
    "current_timestamp() as load_tm")

Structured Streaming - Actions

writeStream.format("console").start()
writeStream.format("kafka").start()
writeStream.foreach(…).start()
writeStream.foreachBatch(…).start()
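foreachBatch is useful when the destination has no streaming connector: each micro-batch arrives as a regular DataFrame plus a batch id, so the ordinary batch writer can be used. A hedged sketch follows; the function name and output path are illustrative, and df is assumed to be a streaming DataFrame from the earlier read examples.

# Hedged sketch: write each micro-batch with the ordinary batch writer.
def write_micro_batch(batch_df, batch_id):
    # batch_df is a normal (non-streaming) DataFrame for this trigger interval
    batch_df.write.mode("append").format("parquet").save("/tmp/stream_output")

query = df.writeStream \
    .foreachBatch(write_micro_batch) \
    .option("checkpointLocation", "/tmp/cp_foreachbatch") \
    .start()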

Structured Streaming - Write

df.writeStream \
    .format("kafka") \
    .options(**producer_config) \
    .option("checkpointLocation", "/tmp/cp001") \
    .start()

Structured Streaming – Output Mode: how to output data when triggered. Append: just keep adding the newest records. Update: only rows updated since the last trigger. Complete: output the latest state of the table (useful for aggregation results).

Example from docs (Structured Streaming Programming Guide)

Structured Streaming – Write with Output Mode

df.writeStream \
    .format("kafka") \
    .outputMode("complete") \
    .options(**producer_config) \
    .option("checkpointLocation", "/tmp/cp001") \
    .start()

Structured Streaming - Triggers: specify how often to check for new data. Default: micro-batch mode, as soon as possible. Fixed interval: micro-batch mode, wait the specified time. Only once: just one micro-batch, then the job stops. Continuous: continuously read from the source; experimental and only works for certain types of queries.

Structured Streaming – Write with Trigger

df.writeStream \
    .format("kafka") \
    .trigger(processingTime="5 seconds") \
    .options(**producer_config) \
    .option("checkpointLocation", "/tmp/cp001") \
    .start()
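The "only once" trigger listed above looks like this. A hedged sketch: once=True is the long-standing option (Spark 3.3+ also adds availableNow=True), and the sink format and paths are illustrative.

# Hedged sketch: process whatever is available as a single micro-batch, then stop.
query = df.writeStream \
    .format("parquet") \
    .trigger(once=True) \
    .option("checkpointLocation", "/tmp/cp_once") \
    .start("/tmp/once_output")

query.awaitTermination()  # returns once the single micro-batch has finished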

Structured Streaming - Windows: calculate over time using windows, e.g. average trips every 5 minutes. Tumbling window: a specific period of time, such as 1:00 to 1:30. Sliding window: fixed length, but can track multiple overlapping groups... 1:00 – 1:30, 1:05 – 1:35, 1:10 – 1:40, etc. Session window: similar to an inactive session timeout; events are grouped together until a long enough gap in time between events.

Comparing windows (Structured Streaming Programming Guide)

Window Example

df2.groupBy(
        col("VendorID"),
        window(col("pickup_dt"), "10 min")) \
    .avg("trip_distance")
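For comparison, a hedged sketch of the other two window types from the earlier slide, reusing the same column names; session_window requires Spark 3.2 or later.

from pyspark.sql.functions import col, window, session_window

# Sliding window: 10-minute windows that start every 5 minutes (overlapping).
sliding_df = df2.groupBy(
        col("VendorID"),
        window(col("pickup_dt"), "10 minutes", "5 minutes")) \
    .avg("trip_distance")

# Session window: a new group starts after a 5-minute gap with no events.
session_df = df2.groupBy(
        col("VendorID"),
        session_window(col("pickup_dt"), "5 minutes")) \
    .avg("trip_distance")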

Streaming Joins. Stream-static: simple join, does not track state; options: inner, left outer, left semi. Stream-stream: complex join, data may arrive early or late; options: inner, left/right outer, full outer, left semi.

Stream to Static DataFrame Join

zdf = spark.read.format("delta").load(path)

trip_df = (
    tdf.join(
        zdf,
        tdf["PULocId"] == zdf["LocID"],
        how="left"))

Stream to Stream Join

trip_df = (
    tdf.withWatermark("eventTime", "1 minute")
    .join(
        pmt.withWatermark("pTime", "1 hour"),
        expr("""
            LocId = PULocId AND
            zTime >= eventTime AND
            zTime <= eventTime + interval 1 hour
        """),
        how="leftOuter"))

Stateful Streaming: aggregations, dropDuplicates, and joins require storing state. Custom stateful operations are possible. State is kept in the HDFS-backed or RocksDB state store (RocksDB added in Spark 3.2). If retaining many keys, HDFSBackedStateStore is likely to have JVM memory issues.
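Switching to the RocksDB state store is a single Spark configuration; a hedged sketch for open-source Spark 3.2 or later:

# Hedged sketch: enable the RocksDB state store provider (Spark 3.2+), which keeps
# state in native memory and local disk instead of the JVM heap used by HDFSBackedStateStore.
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")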

Example Structured Streaming Application

Databricks Notebook - Setup

Databricks Notebook – Confluent Cloud Config
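The notebook screenshot is not reproduced here. For the Kafka source, a Confluent Cloud configuration typically looks roughly like the hedged sketch below; the bootstrap server, API key, secret, and topic are placeholders, not values from the talk.

# Hedged sketch: Kafka source options for Confluent Cloud (SASL_SSL with PLAIN auth).
kafka_config = {
    "kafka.bootstrap.servers": "<cluster>.confluent.cloud:9092",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.sasl.jaas.config": (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        "username='<api_key>' password='<api_secret>';"
    ),
    "subscribe": "<topic_name>",
    "startingOffsets": "earliest",
}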

Databricks Notebook – Spark Schema

Databricks Notebook – Read Kafka Record

Databricks Notebook – Convert JSON
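Converting the raw Kafka record usually means casting the binary value to a string and parsing it with from_json against a declared schema. A hedged sketch follows; the schema fields are illustrative and raw_df is assumed to be the DataFrame returned by the Kafka readStream.

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Hedged sketch: illustrative schema, not the one from the notebook.
trip_schema = StructType([
    StructField("VendorID", StringType()),
    StructField("pickup_dt", TimestampType()),
    StructField("trip_distance", DoubleType()),
])

# Kafka delivers key/value as binary; cast the value to string, then parse the JSON.
json_df = raw_df.select(
    from_json(col("value").cast("string"), trip_schema).alias("data"))
trip_df = json_df.select("data.*")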

Databricks Notebook – Read Lookup Table

Databricks Notebook – Join and write

Databricks Notebook – Check destination

Monitoring

SQL UI

SQL UI - Detail

Structured Streaming UI

Structured Streaming UI

Notebook Streaming UI - Timeseries

Notebook Streaming UI – Query Progress
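The progress information shown in the notebook UI is also available programmatically. A hedged sketch; query is assumed to be the StreamingQuery handle returned by start().

# Hedged sketch: inspect streaming progress from the StreamingQuery handle.
print(query.status)          # whether a trigger is active or the query is waiting for data
print(query.lastProgress)    # metrics for the most recent micro-batch (dict)
for progress in query.recentProgress:        # rolling list of recent micro-batch metrics
    print(progress["batchId"], progress["numInputRows"])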

Cloud Logging Services

Why Kafka? Streaming data directly from one system to another is often problematic. Kafka serves as a scalable broker, keeping up with producers and persisting data for all consumers.

The Log: “It is an append-only, totally-ordered sequence of records ordered by time.” – Jay Kreps. Reference: “The Log” by Jay Kreps.