Confluent Workshop Series: Building Streaming Apps with ksqlDB

ConfluentInc 695 views 79 slides Oct 25, 2021


Slide Content

Workshop Series:
Building Streaming Apps with ksqlDB
October 20, 2021, 2 PM

Presenters:
Jupil Hwang
Hyunsoo Kim
Time:
14:00 - 17:00
2

Agenda —ksqlDB ???g??
33
01???g??χ?
02:00 -02:10 PM
05Lab: Hands on ??а
03:00 PM -05:00 PM
02Talk: Kafka, Kafka Streams ?ksqlDB χ?
02:10 -02:30 PM
03Lab: ?׾?ǧ??ԏʻgې??μ
02:30 -02:45 PM
04Lab: ??а?S???P?s?S?+
02:45 -03:00 PM

4
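Workshop notices
•Q&A
•If you have any questions, please submit them through Q&A. The speakers will answer them directly after the presentation.
•Online survey
•Please share your feedback on today's workshop. We will use it to prepare better content in the future.
•The survey link is available (1) in the Zoom chat window and (2) automatically in your web browser after the event ends.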

Confluent Platform & Cloud:
???׹??Ϋ?oʟ?П?ǧ??Ŧ??

??߰???k?o?+?s???wϏ???S???S?3??
AppApp
DWH
Transactional
Databases
Analytics
Databases
Data Flow
DBDB
AppApp
MOM MOM
ETL
ETL
EAI / ESB
?3??
●???׹??oʟ?׻??o?????+?S׻?
●???;ړǧ???????W?Pՠ?o??ߺ???՜
?c
●????/?g??(Pub/sub) ??׹?????׻?
??(Point-to-Point) ?뺫
●?k?o?+׻ψ?,????o?+???{?l
●???׹??O??׻ړǧ˃??
App App

???Wԯ?k?o?+ԏ?gΧ?h?????W?W????ޟ?Hړ??ԏ
NoSQL DBsBig Data Analytics
??????7?Wט???S???
Ϋ??۾E????oʻ?O?S
???k?o?+?S?3??????k
?o?+?K???ӯö??????
?[?Ճ???G?
AppApp
DWH
Transactional
Databases
Analytics
Databases
Data Flow
DBDB
AppApp
MOM MOM
ETL
ETL
EAI / ESB
App App

?o???@: ???׹??k?o?+ړǧ?w???WП?ǧ??Ŧ??
A streaming platform gives every person and system
in the organization a single source of truth
for data.
NoSQL DBsBig Data Analytics
AppApp
DWH
Transactional
Databases
Analytics
Databases
Data Flow
DBDB
AppAppApp App
Streaming Platform

???׹??k?o?+ړǧ?w
???W???Wԯ?#?????
80% +
Fortune 100 ?+???o??
ƫ?S??ΧԤ??s
Apache Kafka
Confluent ?ǸՋ???o
LinkedIn?????
•?k?o?+П?Ƿ?????W?W???Ϋ?oʟ?П?ǧ??Ŧ??
•?k?o?+Producer?;Consumer?w?ǧ(Decouple)
•?o?+ր?S??????W???O???????W?k?o?+߰?????

Event Streaming Platform ?S???K
??Ԥ?o???o?ߺ?, ՠ?[?????x?S??, ??՜?Wμ?3?뺫μ?3?ǧ???, ޿? ???o?k????O?μ?3?뺫
Core LoansCredit CardsPatient
Lending
?k?o?+??ޟ?Hړ???????S????՜?S?k??Ԥ?o???o?߼+?G?3???׻?3ʻ?S?o??/ӯ?0????
??????Ԥ?o???o????
??߰??Data ?뺿?$ʐ:
...Device
Logs... ......
Data StoresLogs3rd Party
Apps
Custom Apps /
Microservices
Real-time
Inventory
Real-time
Fraud
Detection
Real-time
Customer
360
Machine
Learning
Models
Real-time
Data
Transformat
ion
...
Data in Motion Applications
Data-in-Motion Pipeline
Amazon
S3
SaaS
apps
???WԯData in Motion ?$ʐ:
????W??ޟ?Hړ???????, ?Ϋ?, ???׹????L??o??. ??Ԥ?????S?????????׻
?3???ۺ????Wԯ??׃??Пȣ?s????ψ?O?W??

???׹?Streaming Platform : ?k?o?+??׻ψ???7?W׿?O?S??
????߰?d???
??{?WΧԤՋ
????
???׹??oʟ?
???׹??oʟ?П?Ƿ???
A SaleA shipment
A TradeA Customer
Experience
???׹??k?o?+
?+???׃??П
11…and more???׹??k?o?+ړǧ?+????ȣ????П?W???P????
Ԥ
??׃??Пԏ?o???ٺ??k?o?+
ړǧ

12
Event Stream Processing
???׹??7?W??ӯ????oʟ?П?Ƿ??o?;???#??k?o?+?w???S??, ?o???oʟ??????׹??7?W???L?S?+???W
?+?Հ????

What’s stream processing good for?
13
Materialized Cache
?׿???7?W???k?o?????k?o?+?S
μ????ՠview?w?G???S??
???
Streaming ETL Pipeline
??Ϝ??s?oʟ?П?Ƿ???kՌ?S??,
?o?w??S?SSource?;Sink?w?뺫
?S??ΧԤ
Event-Driven
Microservice
?oʟ?П?Ƿ??????+??oʟ??#
?/???+??7?Wʻ?????ǧ?k

Confluent Platform Conceptual Architecture
14
OSS
Apache Kafka
Data
Sink
POJO /
MicroServices
Data
Sink
OSSApacheKafka®?????WMessaging?o??DataIntegration/ETL?????WΧԤ?ӯ?Gа????.
POJO /
MicroServices
Streams
Apps
Source
Connector
Data
Source Sink
Connector
ksqlDBSchema
Registry

Confluent Platform Conceptual Architecture
15
Confluent Platform
(Apache Kafka)
Enterprise
Security
ksqlDB
Replicator
Machine
Learning
Data
Sink
Data
Source
Schema
Registry
Control
Center
Source
Connector
Sink
Connector
Micro
Services
Mobile
Devices
Car/IoT
MQTT
Proxy
REST
Proxy
Sensor
Data
Sink
Confluent Platform
(Apache Kafka)
ConfluentPlatform?SKafkaCluster?w????7?WConnect,Replicator,ksqlDB?REST/MQTTProxy?w?뺫?S??ΧԤ?d????.
Streams
Apps

Confluent?????Wʇ?oʟ?П?ǧ?????+??
Hall of Innovation
CTO Innovation
Award Winner
2019
Enterprise Technology
Innovation
AWARDS
Vision
●Kafka?S?8??Ջ???Ǹ
●Event streaming ????+
??
Category Leadership
●Kafka commits 80% ?oμ
●1?,???׹??oμ?SKafka ?+
?׻?˺???ې??
●5000??oμ?SKafka ?o??
П?+ԯ???s?Sԫ
Value
●Risk ??k
●????ȣ?W?+??g?,
●TCO ??
●Time-to-market ??ψ?O
Product
●Kafka?w?C???W?+??Ԥ??
?7?W?Pՠ
●??S?Software ?
Cloud-Native Service ?
??
16

Confluent??Enterprise Apache Kafka?w????d????
17
???S??ՠχ?;?$???7?W?+???[?Ճ
??????SՋ?-cloud, on-
prem, hybrid, or multi-cloud
??ԏ?k?o?+χП, ???g??l
Χ?o?S?뺫–Connect
?$???Wӌ?S?k?o?+?w??ՠ?S??ړǧ
?S???X޿? ???7?W?Pՠ
Stream processing
application???gې?S????ՠ?gԯ
?$ʐ–KStreams, ksqlDB
????W?oʟ??w???3?S????׃??П
???W?3П??l
???????׻
??Ԥ???W???W??ۏ?o?
ԯ????#?,?????

Confluent?????+???w?o׃?oʟ?П?ǧ??Ŧ????????d????
18
????SՋ?
?????+????S???3?,
Open Source | Community licensed
Fully Managed Cloud ServiceSelf-managed Software
TrainingPartnersEnterprise
Support
Professional
Services
ARCHITECTOPERATORDEVELOPER EXECUTIVE
Confluent Platform
?????s?,???????,
Self-Balancing Clusters | Tiered Storage
????WDevOps Ջ???O
Operator | Ansible
GUI-?+???ǧ?ȣ???+Ǽ
Control Center | Proactive Support
?oʟ?П?ǧ??k?o?+ʛ?oП
ksqlDB
??{?WPre-built ???O??П?W
Connectors | Hub| Schema Registry
??ӌ?Wӳӯ?w߰?W??
Non-Java Clients| REST Proxy
Admin REST APIs
???Wʇ޿? ?,
Multi-Region Clusters | Replicator
Cluster Linking
?k?o?+?3?S?,
Schema Registry| Schema Validation
???+???w?o׃????C
RBAC | Secrets | Audit Logs
TCO / ROI
Revenue / Cost / Risk Impact
Complete Engagement Model
????ȣԯ????
??#???sԯ???+??
??Ջ??Ϋ?,
?????O
ԯ???S????
??????s?+????׃??П?,???????W?೿??

Apache Kafka?{?
19
Kafka ??distributed commit log
•?oʟ?П?Ƿ??Publish?S??Subscribe?d????.
•?Pՠ?,?o??ӯ????ړǧ???o??а????.
•Transaction??׻???S???k?o?+?w?C???S??????d????.
12345678
Append-only
writes
Reads are a single
seek and scan
AppAppApp
Producers
AppAppApp
Consumers
Kafka
Cluster

Kafka Connect ?;Kafka Streams ?{?
Kafka Streams API
•???׹??7?W?k?o?+?wړǧ?S???W??Java ?[?
ǧ???o?S????o?g?W???П??
•Producer/ Consumer APIs ?+?
Kafka Connect API
•???s??П?W??Kafka?S?C????o???Pՠ??
???W߰?d?$ʐ
•?O?$?o??ԏ???G
Orders
Customers
STREAM
PROCESSING
KStreams / KTable

Multi-Language
Development
Confluent???O??{?W??Ջ???O??П?W??????d????
?) Connector?w???W???3????׻??
21
200+ Pre-Built
Connectors
Event Stream
Processing
ksqlDB
/ KStream

Stream Processing by Analogy
Kafka Cluster
Connect APIStream ProcessingConnect API
$ cat < in.txt | grep "ksql" | tr a-z A-Z > out.txt

Confluent?wΧԤ?W3??׻П?Ƿړǧ?$??
23
Kafka Clients:
ConsumerRecords<String, Integer> records = consumer.poll(Duration.ofMillis(100));
// Sum the values per key for this batch
Map<String, Integer> counts = new HashMap<>();
for (ConsumerRecord<String, Integer> record : records) {
    counts.merge(record.key(), record.value(), Integer::sum);
}
// Add each batch total to the state store, retrying on failure
for (Map.Entry<String, Integer> entry : counts.entrySet()) {
    int attempts = 0;
    while (attempts++ < MAX_RETRIES) {
        try {
            int stateCount = stateStore.getValue(entry.getKey());
            stateStore.setValue(entry.getKey(), entry.getValue() + stateCount);
            break;
        } catch (StateStoreException e) {
            RetryUtils.backoff(attempts);
        }
    }
}

Kafka Streams:
builder
    .stream("input-stream", Consumed.with(Serdes.String(), Serdes.String()))
    .groupBy((key, value) -> value)
    .count()
    .toStream()
    .to("counts", Produced.with(Serdes.String(), Serdes.Long()));

ksqlDB:
SELECT x, COUNT(*) FROM stream GROUP BY x EMIT CHANGES;

subscribe(), poll(), send(),
flush(), beginTransaction(), …
KStream, KTable, filter(), map(),
flatMap(), join(), aggregate(),
transform(), …
CREATE STREAM, CREATE TABLE,
SELECT, JOIN, GROUP BY, SUM, …
Stream Processing
KSQL UDFs
24

П?Ƿ???W?3????ޟ?Hړ????O
25
?+?o????П?Ƿ???W?3???????o?3-5 ??S?Ϋ??П?W???gې, ߰?dʺ?ǧ?o?w???а????
DB
CONNECTOR
CONNECTOR
APP
APP
DB
STREAM
PROCESSING
CONNECTORAPPDB
2
3
4
1

П?Ƿ???W?3????ޟ?Hړ????O
26
ksqlDB???oʟ?ܜړ, П?Ƿړǧ, Push ?Pull ??ǧ????????W?S???SϏ???S??????d????
DB
APP
APP
DB
PULL
PUSH
CONNECTORS
STREAM
PROCESSING
STATE STORES
ksqlDB
1 2
APP

Capture data
CREATE SOURCE CONNECTOR jdbcConnector WITH (
  'connector.class' = '...JdbcSourceConnector',
  'connection.url' = '...',
  …);
Perform continuous transformations
CREATE STREAM purchases AS
  SELECT viewtime, userid, pageid, TIMESTAMPTOSTRING(viewtime, 'yyyy-MM-dd')
  FROM pageviews;
Create materialized views
CREATE TABLE orders_by_country AS
  SELECT country, COUNT(*) AS order_count, SUM(order_total) AS order_total
  FROM purchases
  LEFT JOIN user_profiles ON purchases.customer_id = user_profiles.customer_id
  WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY country
  EMIT CHANGES;
Serve lookups against materialized views
SELECT * FROM orders_by_country WHERE country='usa';
Connector ???,
Stream ???,
Table ???,
Query ???
SQL?W?????WП?ǧ??l?G??

Filter messages to a separate topic in real-time
28
Partition 0
Partition 1
Partition 2
Topic: Blue and Red Widgets
Partition 0
Partition 1
Partition 2
Topic: Blue Widgets Only
STREAM
PROCESSING
Filters

29
Filters
CREATE STREAM high_readings AS
  SELECT sensor, reading
  FROM readings
  WHERE reading > 41
  EMIT CHANGES;

Easily merge and join topics to one another
30
Partition 0
Partition 1
Partition 2
Topic: Blue and Red Widgets
Partition 0
Partition 1
Partition 2
Topic: Green and Yellow Widgets
Partition 0
Partition 1
Partition 2
Topic: Blue and Yellow Widgets
STREAM
PROCESSING
Joins

31
Joins
CREATE STREAM enriched_readings AS
  SELECT reading, area, brand_name
  FROM readings
  INNER JOIN brands b
  ON b.sensor = readings.sensor
  EMIT CHANGES;

Aggregate streams into tables and capture
summary statistics
32
Partition 0
Partition 1
Partition 2
Topic: Blue and Red Widgets
STREAM
PROCESSING
Table: Widget Count
|Widget Color |Count |
|Blue         |15    |
|Red          |9     |
Aggregate

33
Aggregate
CREATE TABLE avg_readings AS
  SELECT sensor,
         AVG(reading) AS location
  FROM readings
  GROUP BY sensor
  EMIT CHANGES;

Workshop

35
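How the training works
•You will work in Zoom and a browser (Instructions, ksqlDB console, and Confluent Control Center).
•If you have questions, you can post them through the Zoom chat.
•Don't worry if you get stuck: use the "Raise hand" button in Zoom and a Confluent engineer will help you.
•Avoid racing ahead by copying and pasting. Most people learn better when they actually type the code into the console, and you can learn from your mistakes.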

?׾?ǧ?


37
•?ۺ?k?o?+ʛ?oП??Ճ???h??Χ??Χ?o?
•?ۺ?k?o?+ʛ?oП???_????ʣ????ՠ?d????
•????;??Χ?o????ȣ??w?l???ǧ???wՌ?,?S?????П?w?Ĺ??[?Ճа????
•ǧ????ǧ??Ԥ???o?g?W???П??submit?#
•id?wΧԤ?S??ǧ??????3?k??ۺ???-ǧ????????x??ۺ???
?h??Χ/???h?;ΧԤՋǧ???w???׹??7?Wړǧ?S??
??׃??П???S?w?????S??Ջ?d????.

Use Case -????Sڨ????
38
•?w?{ǧ?????????h?Oՠ???Sڨ??????ӳ??ӯՃа????. ?o?????h/?h??Χ?S?ۺ??????????????S?k, ?ԏ?W
?k?o?+?W??ՠ?d????.
•9/12/19 12:55:05 GMT, 5313, {
"rating_id": 5313,
"user_id": 3,
"stars": 1,
"route_id": 6975,
"rating_time": 1519304105213,
"channel": "web",
"message": "why is it so difficult to keep the bathrooms clean?"
}

Use Case -Approach 1
39
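Move the reviews into a data warehouse.
At the end of each month, process the reviews and pass them on to the departments that received a significant number of comments.
This approach tells you what has already happened.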

Use Case -Approach 2
40
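Process the reviews in real time and provide a dashboard to the airport management team.
The dashboard can sort reviews by topic so that cleanliness-related problems show up quickly.
This approach tells you what is happening right now.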

Use Case -Approach 3
41
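Process the reviews in real time.
Set up an alert for 3 bad reviews related to bathroom cleanliness within the last 10 minutes.
Automatically call the cleaning staff to deal with the problem.
This approach does something based on what is happening.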

Hands on
3. ??а?S???????W?GП?
3.2.1 ?G׻?????S?׼+?ň????

Cluster Architectural Overview
43
MySQL
?ۺ?k?o?+ʛ?oП
Microservice
ΧԤՋǧ??
Website
?Ĺ????*?oՃ????
??o׻
Kafka Connect
Datagen Source
connector
MySQL CDC
connector
Kafka
ksqlDB
transforms
enriches
queries

ksqlDB??Ջگ?o??П?+??????
44
ksqlDB??Kafka Brokers??
????ʿ??node?W?g?,
Confluent Control
Center
ksqlDB Editor &
DataFlow
ksqlDB
CLI
ksqlDB
RESTFul
API

ksqlDB console
45

ksqlDB console
46
> show topics;
> show streams;
> print 'ratings';

Hands on
4. ڦʃ??ksqlDB П?ǧ??[?ǧ???o?S
???߼+
4.2.2 ?G׻?????S?׼+?ň????

Discussion -tables vs streams
48
> describe extended customers;
> select * from customers emit changes;
> select * from customers_flat emit changes;

Stream <-> Table duality
http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple
http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables
49

Streams and Tables
{ "event_ts": "2020-02-17T15:22:00Z",
"person" : "robin",
"location": "Leeds"
}{ "event_ts": "2020-02-17T17:23:00Z",
"person" : "robin",
"location": "London"
}{ "event_ts": "2020-02-17T22:23:00Z",
"person" : "robin",
"location": "Wakefield"
}
{ "event_ts": "2020-02-18T09:00:00Z",
"person" : "robin",
"location": "Leeds"
}
Kafka topic: the four JSON events above

ksqlDB Stream
Stream (append-only series of events): Topic + Schema
+--------------------+-------+---------+
|EVENT_TS            |PERSON |LOCATION |
+--------------------+-------+---------+
|2020-02-17 15:22:00 |robin  |Leeds    |
|2020-02-17 17:23:00 |robin  |London   |
|2020-02-17 22:23:00 |robin  |Wakefield|
|2020-02-18 09:00:00 |robin  |Leeds    |
+--------------------+-------+---------+

ksqlDB Table
Table: state for given key: Topic + Schema
State after the first event:
+-------+---------+
|PERSON |LOCATION |
+-------+---------+
|robin  |Leeds    |
+-------+---------+
State after the second event:
+-------+---------+
|PERSON |LOCATION |
+-------+---------+
|robin  |London   |
+-------+---------+
State after the third event:
+-------+---------+
|PERSON |LOCATION |
+-------+---------+
|robin  |Wakefield|
+-------+---------+
State after the fourth event:
+-------+---------+
|PERSON |LOCATION |
+-------+---------+
|robin  |Leeds    |
+-------+---------+
50

•Streams = INSERT only
Immutable, append-only
•Tables = INSERT, UPDATE, DELETE
Mutable, row key (event.key) identifies which
row
51

The key to mutability is … the event.key!
52
|                                                  |Stream |Table     |
|Has unique key constraint?                        |No     |Yes       |
|First event with key ‘alice’ arrives              |INSERT |INSERT    |
|Another event with key ‘alice’ arrives            |INSERT |UPDATE    |
|Event with key ‘alice’ and value == null arrives  |INSERT |DELETE    |
|Event with key == null arrives                    |INSERT |<ignored> |
RDBMS analogy: A Stream is ~ a Table that has no unique key and is append-only.
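
The stream and table semantics above can be sketched in plain Java without any Kafka dependency. This is only an illustration of the rules on the slide, not how ksqlDB is implemented; the class `StreamTableSketch`, its `Event` type, and the sample events are hypothetical names chosen for this example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StreamTableSketch {

    /** One event; key and value may each be null. */
    static final class Event {
        final String key;
        final String value;

        Event(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stream semantics: every event is an INSERT, so the row count equals the event count. */
    static int streamRowCount(List<Event> events) {
        return events.size();
    }

    /** Table semantics: keep only the latest value per key. */
    static Map<String, String> asTable(List<Event> events) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Event e : events) {
            if (e.key == null) {
                continue;                    // null key: ignored
            }
            if (e.value == null) {
                table.remove(e.key);         // null value: DELETE
            } else {
                table.put(e.key, e.value);   // first time INSERT, afterwards UPDATE
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("alice", "Paris"),
            new Event("alice", "Rome"),
            new Event("bob", "Sydney"),
            new Event("bob", null),
            new Event(null, "Leeds"));
        System.out.println(streamRowCount(events)); // 5
        System.out.println(asTable(events));        // {alice=Rome}
    }
}
```

The same five events yield five stream rows but a single table row, because the table collapses events by key and treats a null value as a delete.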

Creating a table from a stream or topic
streams

Aggregating a stream (COUNT example)
streams

Aggregating a stream (COUNT example)
streams

KSQL for Data Exploration
An easy way to inspect your data in Kafka
SHOW TOPICS;
SELECT page, user_id, status, bytes
  FROM clickstream
  WHERE user_agent LIKE 'Mozilla/5.0%';
PRINT 'my-topic' FROM BEGINNING;
56

KSQL for Data Transformation
Quickly make derivations of existing data in Kafka
CREATE STREAM clicks_by_user_id
  WITH (PARTITIONS=6,            -- (1) Change number of partitions
        TIMESTAMP='view_time',
        VALUE_FORMAT='JSON') AS  -- (2) Convert data to JSON
  SELECT * FROM clickstream
  PARTITION BY user_id;          -- (3) Repartition the data
57

Hands on
4.3 ˃?Ǻۺ??ʿ
4.4 Query ȣ???+Ǽ
8 ?G׻?????S?׼+?ň????

ԫǧ???$??W?w???ع??S?+???o՛???Cۏ?????.
59
•?K??S??W???s?˺???П?W????k?o?+?w???3?;Kafka?W???3?Oа????.
•?o?k?o?+?????o???׹?ʻ?S?????S??Format??????׻??а????.
•?o?K??S?ʿdata streams?wjoin?а????.
•Event Stream?????o׻ψ???7?W???????k?o?+??Query?;?w?S?[?G???oʟ??w???,?S??Query?w
??????а????.
•?oȣ?ۺ~?o???+???w?o׃??ȣ???????$????!

KSQL for Real-Time, Streaming ETL
Filter, cleanse, process data while it is in motion
CREATE STREAM clicks_from_vip_users AS
  SELECT c.user_id, u.country, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.user_id = u.user_id
  WHERE u.level = 'Platinum';  -- (1) Pick only VIP users
60

CDC —only after state
61
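The JSON data shows the information pulled from MySQL through Debezium CDC.
Note that there is no "BEFORE" data here (it is null).
This means the record was just created and has not been updated. A new customer being added for the first time is one example.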

CDC —before and after
62
Now there is some "BEFORE" data, because the customer record has been updated.

KSQL for Anomaly Detection
Aggregate data to identify patterns and anomalies in real-time
CREATE TABLE possible_fraud AS
  SELECT card_number, COUNT(*)
  FROM authorization_attempts
  WINDOW TUMBLING (SIZE 30 SECONDS)  -- (2) … per 30-sec windows
  GROUP BY card_number
  HAVING COUNT(*) > 3;               -- (1) Aggregate data
63
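
To make the windowed aggregation in the possible_fraud statement concrete, here is a minimal plain-Java sketch of the same idea: group attempts by card number and 30-second tumbling window, then keep only groups with more than three attempts. It assumes non-negative millisecond timestamps; the class `TumblingCountSketch` and the sample data are hypothetical, and this does not reflect how ksqlDB executes the query internally.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TumblingCountSketch {

    static final long WINDOW_MS = 30_000L; // 30-second tumbling windows

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    /**
     * Counts attempts per (card, window) group and keeps only groups whose
     * count is greater than the threshold, like HAVING COUNT(*) > threshold.
     */
    static Map<String, Integer> suspicious(String[] cards, long[] timestampsMs, int threshold) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < cards.length; i++) {
            String group = cards[i] + "@" + windowStart(timestampsMs[i]);
            counts.merge(group, 1, Integer::sum);
        }
        counts.values().removeIf(count -> count <= threshold);
        return counts;
    }

    public static void main(String[] args) {
        String[] cards = {"1111", "1111", "1111", "1111", "2222", "1111"};
        long[] timestamps = {1_000, 5_000, 9_000, 29_000, 12_000, 31_000};
        System.out.println(suspicious(cards, timestamps, 3)); // {1111@0=4}
    }
}
```

The attempt at 31,000 ms falls into the next window, so it does not add to the count of the first window.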

KSQL for Real-Time Monitoring
Derive insights from events (IoT, sensors, etc.) and turn them into actions
CREATE TABLE failing_vehicles AS
  SELECT vehicle, COUNT(*)
  FROM vehicle_monitoring_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  WHERE event_type = 'ERROR'
  GROUP BY vehicle
  HAVING COUNT(*) >= 5;  -- (1) Now we know to alert, and whom
64

Confluent Control Center

C3 -Connector ??ǧ

ksqlDB -Cloud UI (1/2)
67

ksqlDB -Cloud UI (2/2)
68

Monitoring ksqlDB applications
Data flow (1/2)
69

Monitoring ksqlDB applications
Data flow (2/2)
70

ksqlDB Internals

Storage Layer
(Brokers)
Processing Layer
(ksqlDB, KStreams,
etc.)
Partitions play a central role in Kafka
72
Topics are partitioned. Partitions enable scalability, elasticity, fault-tolerance.
Data is
•stored in partitions
•replicated based on partitions
•ordered based on partitions
•joined based on partitions
•read from and written to partitions
•processed based on partitions

Topics vs. Streams and Tables
Storage Layer (Brokers)
Topic: 001001110111000000110010000110
Processing Layer (KSQL, KStreams)
Stream (topic plus schema (serdes)): (alice, Paris), (bob, Sydney), (alice, Rome)
Table (stream plus aggregation): (alice, 2), (bob, 1)
73

Kafka Processing
Data is processed per-partition
...
...
...
...
P1
P2
P3
P4
StorageProcessing
read via
network
Topic ‘payments’
Application with consumer group ‘my-app’
App Instance 1
App Instance 2
74

Kafka Processing
Data is processed per-partition
...
...
...
...
P1
P2
P3
P4
StorageProcessingState
Stream Task 1
Stream Task 2
Stream Task 3
Stream Task 4
read via
network
Application Instance 1Topic
Application Instance2
75

Streams and Tables are partitioned, too
...
...
...
...
P1
P2
P3
P4
Stream Task 1
Stream Task 2
Stream Task 3
Stream Task 4
KTable / TABLE
2GB
3GB
5GB
2GB
Application Instance 1
Application Instance 2
76

Kafka Streams Architecture
77

Advanced Features

Windowing
79
“10??o????3??oμ?Sǧ???w??7?o?GǷ”
Windowed Query?????WksqlDB?S??{?W׻????ΧԤ?S?纸??로직을?G???d????.
?o?w߰?oΧ?+??oμ??׻?;??;?3??????WϏ???S???g???[?Ճа????.
Tumbling
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY key
Hopping
WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 MINUTE)
GROUP BY key
Session
WINDOW SESSION (60 SECONDS)
GROUP BY key
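
The difference between the three window types is easiest to see by asking which windows a single event falls into. The following plain-Java sketch illustrates this under simple assumptions: timestamps are non-negative numbers in an arbitrary unit, windows are half-open intervals [start, start + size), and window starts are aligned to multiples of the size or advance. The class `WindowSketch` and its method names are hypothetical and are not part of any Kafka or ksqlDB API.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowSketch {

    /** Tumbling: each event belongs to exactly one window. */
    static long tumblingStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Hopping: start times of all windows [start, start + size) that contain ts. */
    static List<Long> hoppingStarts(long ts, long size, long advance) {
        List<Long> starts = new ArrayList<>();
        long latest = ts - (ts % advance); // latest window start that is <= ts
        for (long s = latest; s > ts - size && s >= 0; s -= advance) {
            starts.add(0, s);              // prepend to keep ascending order
        }
        return starts;
    }

    /** Session: split sorted timestamps wherever the gap to the previous event exceeds the inactivity gap. */
    static List<List<Long>> sessions(long[] sortedTs, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);
                current = new ArrayList<>();
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Units are minutes for the first two calls and seconds for the third.
        System.out.println(tumblingStart(7, 5));                              // 5
        System.out.println(hoppingStarts(7, 5, 1));                           // [3, 4, 5, 6, 7]
        System.out.println(sessions(new long[] {0, 30, 50, 200, 230}, 60));   // [[0, 30, 50], [200, 230]]
    }
}
```

An event at minute 7 lands in one 5-minute tumbling window but in five overlapping hopping windows, while session windows have no fixed size and are defined only by gaps in activity.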

UDF and machine learning
80
“???׹??k?o?+???3???????G??ǧד????Ԥ?S????ӯԏ”
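Built-in functions
ksqlDB provides a number of built-in functions to simplify stream processing. Examples include:
•GEODISTANCE: measures the distance between two latitude/longitude coordinates
•MASK: converts a string into a masked or obfuscated version
•JSON_ARRAY_CONTAINS: checks whether an array contains a search value
User-defined functions
Develop user-defined functions to extend what is available in ksqlDB. A common use case is implementing machine learning algorithms through ksqlDB so that these models can contribute to real-time data transformation.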

ksqlDB ?W?/?????[?Ճ???G?
81
Streaming ETLAnomaly detection
Real-time monitoring
and Analytics
Sensor data and IoTCustomer 360-view
https://docs.ksqldb.io/en/latest/#what-can-i-do-with-ksqldb

Example: Streaming ETL pipeline
82
* Full example here
•Apache Kafka is a popular choice for powering data pipelines
•ksqlDB makes it simple to transform data within the pipeline,
preparing the messages for consumption by another system.

Example: Anomaly detection
83
•Identify patterns and spot anomalies in real-time data with
millisecond latency, enabling you to properly surface out-of-the-
ordinary events and to handle fraudulent activities separately.
* Full example here

Any questions?
87

one more …

Developer https://developer.confluent.io

Tutorials
90
•??է????????S?w????Ԥ??Ճ??ΧԤΧ?;??
•Kafka ?ksqlDB?????W?o?o?w????7?W
Kafka ?ksqlDB?wΧԤ?S???k?o?+???
??׃??П???S?w?o?Gӯ???Ճ?????/??????
??ʿ?[?Ճа?üG?
•ӯ???{?+??Ռ?o?w?[׻ȣ?o?????o?
•Apache Kafka®?????WՋа??w?$?3?S?????.
https://kafka-tutorials.confluent.io/

Free eBooks
Kafka: The Definitive Guide
Neha Narkhede, Gwen Shapira, Todd
Palino
Making Sense of Stream Processing
Martin Kleppmann
I ❤Logs
Jay Kreps
Designing Event-Driven Systems
Ben Stopford
http://cnfl.io/book-bundle

Confluent?S?W??χ???P?s?S???$ʐ
92
Confluent Blog
cnfl.io/blog
Confluent Cloud
cnfl.io/confluent-cloud
Community
cnfl.io/meetups

93

Max processing parallelism = #input partitions
...
...
...
...
P1
P2
P3
P4
Topic Application Instance 1
Application Instance 2
Application Instance 3
Application Instance 4
Application Instance 5 *** idle ***
Application Instance 6 *** idle ***
→ Need higher parallelism? Increase the original topic’s partition count.
→ Higher parallelism for just one usecase? Derive a new topic from the
original with higher partition count. Lower its retention to save storage.
94
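
The limit "max processing parallelism = number of input partitions" can be illustrated with a small plain-Java sketch. It uses a simple round-robin assignment as a stand-in for the real consumer group assignment, which is an assumption made for illustration; the class `ParallelismSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class ParallelismSketch {

    /** Assigns partitions 1..partitions to instances 1..instances in round-robin order. */
    static Map<Integer, List<Integer>> assign(int partitions, int instances) {
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        for (int i = 1; i <= instances; i++) {
            assignment.put(i, new ArrayList<>());
        }
        for (int p = 1; p <= partitions; p++) {
            int owner = ((p - 1) % instances) + 1;
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    /** Instances that received no partition have nothing to process. */
    static int idleInstances(Map<Integer, List<Integer>> assignment) {
        int idle = 0;
        for (List<Integer> owned : assignment.values()) {
            if (owned.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> assignment = assign(4, 6);
        System.out.println(assignment);                // {1=[1], 2=[2], 3=[3], 4=[4], 5=[], 6=[]}
        System.out.println(idleInstances(assignment)); // 2
    }
}
```

With four partitions and six instances, two instances stay idle, which matches the slide: adding instances beyond the partition count does not add parallelism.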

How to increase # of partitions when needed
KSQL example: the statement below creates a new stream with the desired number of partitions.
CREATE STREAM products_repartitioned
  WITH (PARTITIONS=30) AS
  SELECT * FROM products;
95

‘Hot’ partitions are a problem, often caused by
1. Events not evenly distributed across partitions
2. Events evenly distributed but certain events take longer to process
Strategies to address hot partitions include
1a. Ingress: Find better partitioning function ƒ(event.key) for producers
1b. Storage: Re-partition data into new topic if you can’t change the original
2. Scale processing vertically, e.g. more powerful CPU instances
Topic partitions: P1, P2, P3, P4
96

Joining Streams and Tables
Data must be ‘co-partitioned’
TableStream
Join Output
(Stream) 97

Joining Streams and Tables
Data must be ‘co-partitioned’
Stream
P2: (alice, Paris)
Table
P1, P2, P3: (bob, male), (alice, female), (alex, male), (zoie, female), (andrew, male), (mina, female), (natalie, female), (blake, male)
(alice, Paris) from stream’s P2 has a matching entry for alice in the table’s P2.
Join output: (alice, Paris, female)
98

Joining Streams and Tables
Data is looked up in same partition number
99
Scenario 2
Stream
P2: (alice, Paris)
Table
P1: (alice, male)
P2: (alice, female)
P3
Here, key ‘alice’ exists in multiple partitions. But the entry in P2 (female) is used because the stream-side event is from the stream’s partition P2.
Join output: (alice, Paris, female)

Joining Streams and Tables
Data is looked up in same partition number
100
Scenario 3
Stream
P2: (alice, Paris)
Table
P1: (alice, male)
P2
P3
Here, key ‘alice’ exists only in the table’s P1 != P2.
Join output: (alice, Paris, null), no match!

Data co-partitioning requirements in detail
1. Same keying scheme for both input sides
2. Same number of partitions
3. Same partitioning function ƒ(event.key)
Further Reading on Joining Streams and Tables:
https://www.confluent.io/kafka-summit-sf18/zen-and-the-art-of-streaming-joins
https://docs.confluent.io/current/ksql/docs/developer-guide/partition-data.html
101
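
The three requirements follow from the fact that a stream-table join only looks in the table partition with the same number as the stream event's partition. The sketch below illustrates that lookup in plain Java. The partitioning function here is a deliberately simple stand-in based on `String.hashCode()`, chosen for illustration only (Kafka's default partitioner uses a murmur2 hash of the serialized key); the class `CoPartitioningSketch` and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CoPartitioningSketch {

    /** Hypothetical partitioning function f(event.key); not Kafka's actual partitioner. */
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Distributes table rows over partitions using the partitioning function. */
    static List<Map<String, String>> buildTable(Map<String, String> rows, int numPartitions) {
        List<Map<String, String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashMap<>());
        }
        for (Map.Entry<String, String> row : rows.entrySet()) {
            int p = partitionFor(row.getKey(), numPartitions);
            partitions.get(p).put(row.getKey(), row.getValue());
        }
        return partitions;
    }

    /** Join lookup: only the table partition with the stream event's partition number is consulted. */
    static String lookup(List<Map<String, String>> tablePartitions, int streamPartition, String key) {
        if (streamPartition < 0 || streamPartition >= tablePartitions.size()) {
            return null;
        }
        return tablePartitions.get(streamPartition).get(key);
    }

    public static void main(String[] args) {
        Map<String, String> rows = new HashMap<>();
        rows.put("alice", "female");
        rows.put("bob", "male");
        List<Map<String, String>> table = buildTable(rows, 3);

        int p = partitionFor("alice", 3);
        // Co-partitioned: the stream event for alice arrives from the same partition number.
        System.out.println(lookup(table, p, "alice"));           // female
        // Not co-partitioned: the event arrives from a different partition number.
        System.out.println(lookup(table, (p + 1) % 3, "alice")); // null
    }
}
```

When both sides use the same key, partition count, and partitioning function, the lookup always finds the row; as soon as the stream event sits in a different partition number, the join sees no match even though the key exists in the table.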

Why is that so?
Because of how input data is mapped to stream tasks
Storage: Stream Topic (P1, P2, P3) and Table Topic (P1, P2, P3)
Processing and state: Stream Task 2 reads via network from stream’s P2 and from table’s P2
102

How to re-partition your data when needed
KSQL example: the statement below creates a new stream with a changed number of partitions and a new field as event.key (so that its data is now correctly co-partitioned for joining).
CREATE STREAM products_repartitioned
  WITH (PARTITIONS=42) AS
  SELECT * FROM products
  PARTITION BY product_id;
103