Filter messages to a separate topic in real-time
[Diagram] Topic: Blue and Red Widgets (Partitions 0-2)
→ STREAM PROCESSING: Filters
→ Topic: Blue Widgets Only (Partitions 0-2)
Easily merge and join topics to one another
[Diagram] Topic: Blue and Red Widgets (Partitions 0-2)
+ Topic: Green and Yellow Widgets (Partitions 0-2)
→ STREAM PROCESSING: Joins
→ Topic: Blue and Yellow Widgets (Partitions 0-2)
Joins
CREATE STREAM enriched_readings AS
  SELECT reading, area, brand_name
  FROM readings
  INNER JOIN brands b
    ON b.sensor = readings.sensor
  EMIT CHANGES;
Aggregate streams into tables and capture summary statistics
[Diagram] Topic: Blue and Red Widgets (Partitions 0-2)
→ STREAM PROCESSING: Aggregate
→ Table: Widget Count

Widget Color   Count
Blue           15
Red            9
Aggregate

CREATE TABLE avg_readings AS
  SELECT sensor,
         AVG(reading) AS avg_reading
  FROM readings
  GROUP BY sensor
  EMIT CHANGES;
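The aggregation above can be sketched in plain Python (a toy batch version for intuition only; ksqlDB maintains the averages incrementally as new events arrive):

```python
from collections import defaultdict

# Toy batch version of "SELECT sensor, AVG(reading) ... GROUP BY sensor".
def avg_readings(readings):
    total = defaultdict(float)
    count = defaultdict(int)
    for sensor, reading in readings:
        total[sensor] += reading
        count[sensor] += 1
    return {sensor: total[sensor] / count[sensor] for sensor in total}

print(avg_readings([("s1", 10.0), ("s1", 20.0), ("s2", 5.0)]))
# {'s1': 15.0, 's2': 5.0}
```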
Workshop
How the training works
• You will work in Zoom and a browser (instructions, the ksqlDB console, and Confluent Control Center).
• If you have questions, you can post them via the Zoom chat.
• Don't worry if you get stuck: use the "Raise hand" button in Zoom and a Confluent engineer will help you.
• Avoid jumping ahead by copying and pasting. Most people learn better when they actually type the code into the console, and you can learn from your mistakes.
• Streams = INSERT only
  Immutable, append-only
• Tables = INSERT, UPDATE, DELETE
  Mutable, the row key (event.key) identifies which row
The key to mutability is … the event.key!
                                                  Stream    Table
Has unique key constraint?                        No        Yes
First event with key 'alice' arrives              INSERT    INSERT
Another event with key 'alice' arrives            INSERT    UPDATE
Event with key 'alice' and value == null arrives  INSERT    DELETE
Event with key == null arrives                    INSERT    <ignored>
RDBMS analogy: A Stream is ~ a Table that has no unique key and is append-only.
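The table semantics above can be sketched in Python (a minimal illustration of the upsert/tombstone rules, not ksqlDB's actual implementation):

```python
# Materialize a table from a stream of (key, value) events,
# following the semantics in the table above.
def materialize(events):
    table = {}
    for key, value in events:
        if key is None:
            continue              # null key: ignored by the table
        if value is None:
            table.pop(key, None)  # null value ("tombstone"): DELETE
        else:
            table[key] = value    # INSERT or UPDATE (upsert)
    return table

events = [("alice", "Paris"), ("bob", "Sydney"),
          ("alice", "Rome"),     # UPDATE for 'alice'
          ("bob", None),         # DELETE for 'bob'
          (None, "ignored")]     # null key: ignored
print(materialize(events))       # {'alice': 'Rome'}
```

A stream, by contrast, would simply keep all five events in order.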
Creating a table from a stream or topic
Aggregating a stream (COUNT example)
KSQL for Data Exploration
An easy way to inspect your data in Kafka
SHOW TOPICS;

SELECT page, user_id, status, bytes
  FROM clickstream
  WHERE user_agent LIKE 'Mozilla/5.0%';

PRINT 'my-topic' FROM BEGINNING;
KSQL for Data Transformation
Quickly make derivations of existing data in Kafka
CREATE STREAM clicks_by_user_id
  WITH (PARTITIONS=6,
        TIMESTAMP='view_time',
        VALUE_FORMAT='JSON') AS
  SELECT * FROM clickstream
  PARTITION BY user_id;

1. Change number of partitions
2. Convert data to JSON
3. Repartition the data
Hands on
4.3 …
4.4 Query …
8 …
KSQL for Real-Time, Streaming ETL
Filter, cleanse, process data while it is in motion
CREATE STREAM clicks_from_vip_users AS
  SELECT user_id, u.country, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.user_id = u.user_id
  WHERE u.level = 'Platinum';

1. Pick only VIP users
CDC: only after state
The JSON data shows information pulled from MySQL via Debezium CDC. Notice that there is no "BEFORE" data (it is null). This means the record was just created, without any update; for example, a new customer was added for the first time.
CDC: before and after
Now that there has been an update to the customer record, there is some "BEFORE" data.
KSQL for Anomaly Detection
Aggregate data to identify patterns and anomalies in real-time
CREATE TABLE possible_fraud AS
  SELECT card_number, COUNT(*)
  FROM authorization_attempts
  WINDOW TUMBLING (SIZE 30 SECONDS)
  GROUP BY card_number
  HAVING COUNT(*) > 3;

1. Aggregate data
2. … per 30-second windows
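The windowed count can be sketched in Python (a toy batch version; ksqlDB computes this continuously and emits changes as events arrive):

```python
from collections import defaultdict

# Count events per key per 30-second tumbling window and flag keys
# that exceed a threshold, mirroring the query above.
WINDOW_SIZE = 30  # seconds

def possible_fraud(events, threshold=3):
    # events: iterable of (timestamp_in_seconds, card_number)
    counts = defaultdict(int)
    for ts, card in events:
        window_start = (ts // WINDOW_SIZE) * WINDOW_SIZE
        counts[(card, window_start)] += 1
    return {key: c for key, c in counts.items() if c > threshold}

events = [(1, "4111"), (5, "4111"), (12, "4111"), (20, "4111"),  # 4 hits in window [0, 30)
          (35, "4111"),                                          # new window, count resets
          (2, "5500")]
print(possible_fraud(events))   # {('4111', 0): 4}
```

Because the window tumbles, a burst that straddles a window boundary is split across two counts; that is the trade-off of tumbling versus hopping windows.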
KSQL for Real-Time Monitoring
Derive insights from events (IoT, sensors, etc.) and turn them into actions
CREATE TABLE failing_vehicles AS
  SELECT vehicle, COUNT(*)
  FROM vehicle_monitoring_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  WHERE event_type = 'ERROR'
  GROUP BY vehicle
  HAVING COUNT(*) >= 5;

1. Now we know to alert, and whom
Confluent Control Center
C3 - Connector
ksqlDB - Cloud UI (1/2)
ksqlDB - Cloud UI (2/2)
Monitoring ksqlDB applications
Data flow (1/2)
Monitoring ksqlDB applications
Data flow (2/2)
ksqlDB Internals
Storage Layer
(Brokers)
Processing Layer
(ksqlDB, KStreams,
etc.)
Partitions play a central role in Kafka
Topics are partitioned. Partitions enable scalability, elasticity, and fault-tolerance.
[Diagram] Data is stored in, replicated based on, ordered based on, joined based on, read from and written to, and processed based on partitions (storage layer: brokers; processing layer: KSQL, Kafka Streams).
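How an event maps to a partition can be sketched as follows (a simplification: Kafka's default partitioner hashes the key with murmur2; here we use CRC32 just to illustrate the "hash the key, mod the partition count" idea):

```python
import zlib

# Simplified key-to-partition mapping. The real default partitioner
# uses murmur2, but the structure is the same.
def partition_for(key: bytes, num_partitions: int) -> int:
    return zlib.crc32(key) % num_partitions

# The same key always lands in the same partition, which is what gives
# Kafka per-key ordering and makes joins and aggregations possible.
p1 = partition_for(b"alice", 3)
p2 = partition_for(b"alice", 3)
assert p1 == p2
print(p1)
```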
Topics vs. Streams and Tables

• Topic (storage layer, brokers): raw bytes, e.g. 001001110111000000110010000110
• Stream: the topic plus a schema (serdes), e.g. (alice, Paris), (bob, Sydney), (alice, Rome)
• Table: the stream plus aggregation, e.g. (alice, 2), (bob, 1)
Kafka Processing
Data is processed per-partition
[Diagram] Topic 'payments' with partitions P1-P4 (storage layer); App Instance 1 and App Instance 2 of the application (processing layer) read the partitions over the network as consumer group 'my-app'.
What can you do with ksqlDB?
• Streaming ETL
• Anomaly detection
• Real-time monitoring and analytics
• Sensor data and IoT
• Customer 360-view
https://docs.ksqldb.io/en/latest/#what-can-i-do-with-ksqldb
Example: Streaming ETL pipeline
* Full example here
• Apache Kafka is a popular choice for powering data pipelines.
• ksqlDB makes it simple to transform data within the pipeline, preparing the messages for consumption by another system.
Example: Anomaly detection
• Identify patterns and spot anomalies in real-time data with millisecond latency, enabling you to properly surface out-of-the-ordinary events and to handle fraudulent activities separately.
* Full example here
Free eBooks
Kafka: The Definitive Guide
Neha Narkhede, Gwen Shapira, Todd
Palino
Making Sense of Stream Processing
Martin Kleppmann
I ❤ Logs
Jay Kreps
Designing Event-Driven Systems
Ben Stopford
http://cnfl.io/book-bundle
Stay in touch with Confluent
Confluent Blog
cnfl.io/blog
Confluent Cloud
cnfl.io/confluent-cloud
Community
cnfl.io/meetups
Max processing parallelism = #input partitions
[Diagram] Topic with partitions P1-P4; Application Instances 1-4 each process one partition, while Application Instances 5 and 6 sit idle.
→ Need higher parallelism? Increase the original topic's partition count.
→ Higher parallelism for just one use case? Derive a new topic from the original with a higher partition count. Lower its retention to save storage.
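The idle-instance effect can be sketched as follows (a simplification of consumer-group partition assignment; real assignors are pluggable, but a partition is never shared between two instances of one group):

```python
# Assign partitions to consumer instances, at most one consumer per
# partition: with more instances than partitions, the extras go idle.
def assign(partitions, instances):
    assignment = {inst: [] for inst in instances}
    for i, p in enumerate(partitions):
        assignment[instances[i % len(instances)]].append(p)
    return assignment

parts = ["P1", "P2", "P3", "P4"]
insts = [f"Instance {n}" for n in range(1, 7)]
a = assign(parts, insts)
print(a["Instance 5"], a["Instance 6"])   # [] [] -> idle
```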
How to increase # of partitions when needed
KSQL example: the statement below creates a new stream with the desired number of partitions.

CREATE STREAM products_repartitioned
  WITH (PARTITIONS=30) AS
  SELECT * FROM products;
‘Hot’ partitions is a problem, often caused by
1. Events not evenly distributed across partitions
2. Events evenly distributed, but certain events take longer to process

Strategies to address hot partitions include
1a. Ingress: Find a better partitioning function ƒ(event.key) for producers
1b. Storage: Re-partition data into a new topic if you can’t change the original
2. Scale processing vertically, e.g. more powerful CPU instances
Joining Streams and Tables
Data must be ‘co-partitioned’
[Diagram] Stream + Table → Join Output (Stream)
Joining Streams and Tables
Data must be ‘co-partitioned’
Scenario 1
[Diagram] The event (alice, Paris) from the stream's P2 has a matching entry for alice (female) in the table's P2, so the join produces (alice, Paris, female).
Joining Streams and Tables
Data is looked up in same partition number
Scenario 2
[Diagram] Key 'alice' exists in multiple table partitions (male in P1, female in P2). The entry in P2 (female) is used, because the stream-side event (alice, Paris) comes from the stream's P2.
Joining Streams and Tables
Data is looked up in same partition number
Scenario 3
[Diagram] Key 'alice' exists only in the table's P1, which != P2, but the stream event (alice, Paris) comes from P2: no match, so the join result is null.
Data co-partitioning requirements in detail
1. Same keying scheme for both input sides
2. Same number of partitions
3. Same partitioning function ƒ(event.key)

Further reading on joining streams and tables:
https://www.confluent.io/kafka-summit-sf18/zen-and-the-art-of-streaming-joins
https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html
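A minimal sketch of why these requirements matter: when both sides use the same partitioning function and partition count, matching keys land in the same partition number, so a per-partition join finds them (illustration only, not ksqlDB's implementation):

```python
import zlib

N = 3  # both sides must have the same partition count

def partition_for(key: str, num_partitions: int) -> int:
    # The same partitioning function must be used on both sides.
    return zlib.crc32(key.encode()) % num_partitions

def partitioned(records, num_partitions):
    # Spread (key, value) records across partitions by key hash.
    parts = [dict() for _ in range(num_partitions)]
    for key, value in records:
        parts[partition_for(key, num_partitions)][key] = value
    return parts

table = partitioned([("alice", "female"), ("bob", "male")], N)

def join(stream_event, table_parts):
    key, value = stream_event
    p = partition_for(key, N)        # look up in the SAME partition number
    match = table_parts[p].get(key)  # found only if data is co-partitioned
    return (key, value, match)

print(join(("alice", "Paris"), table))  # ('alice', 'Paris', 'female')
```

If the table had been built with a different partition count or hash function, the lookup in partition `p` could miss even though the key exists somewhere in the table.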
Why is that so?
Because of how input data is mapped to stream tasks

[Diagram] Stream topic (P1-P3) and table topic (P1-P3) in storage; Stream Task 2 reads the stream's P2 and the table's P2 over the network and keeps the processing state for that partition.
How to re-partition your data when needed
KSQL example: the statement below creates a new stream with a changed number of partitions and a new field as the event.key, so that its data is correctly co-partitioned for joining.

CREATE STREAM products_repartitioned
  WITH (PARTITIONS=42) AS
  SELECT * FROM products
  PARTITION BY product_id;