KSQL - Stream Processing simplified!


About This Presentation

KSQL is a streaming SQL engine that enables stream processing on top of Apache Kafka. KSQL is based on Kafka Streams and provides capabilities for consuming messages from Kafka, analysing these messages in near real-time with a SQL-like language and producing results back to a Kafka topic. By ...


Slide Content

KSQL
Stream Processing made simple!
Guido Schmutz
DOAG Big Data 2018 – 20.9.2018

@gschmutz guidoschmutz.wordpress.com

Guido Schmutz
Working at Trivadis for more than 21 years
Oracle ACE Director for Fusion Middleware and SOA
Consultant, Trainer, Software Architect for Java, Oracle, SOA and
Big Data / Fast Data
Head of Trivadis Architecture Board
Technology Manager @ Trivadis

More than 30 years of software development experience

Contact: [email protected]
Blog: http://guidoschmutz.wordpress.com
Slideshare: http://www.slideshare.net/gschmutz
Twitter: gschmutz

Agenda
1. What is Apache Kafka?
2. KSQL in Action
3. Summary



What is Apache Kafka?




Apache Kafka – A Streaming Platform
High-Level Architecture








Distributed Log at the Core
Scale-Out Architecture








Logs do not (necessarily) forget

Hold Data for Long-Term – Data Retention
(diagram: Producer 1 writing to Broker 1, Broker 2, Broker 3)

1. Never
2. Time based (TTL): log.retention.{ms | minutes | hours}
3. Size based: log.retention.bytes
4. Log compaction based (entries with same key are removed):

kafka-topics.sh --zookeeper zk:2181 \
  --create --topic customers \
  --replication-factor 1 \
  --partitions 1 \
  --config cleanup.policy=compact

Keep Topics in Compacted Form
Before compaction:
Offset | 0  | 1  | 2  | 3  | 4  | 5  | 6  | 7  | 8  | 9   | 10  | 11
Key    | K1 | K2 | K1 | K1 | K3 | K2 | K4 | K5 | K5 | K2  | K6  | K2
Value  | V1 | V2 | V3 | V4 | V5 | V6 | V7 | V8 | V9 | V10 | V11 |

After compaction:
Offset | 3  | 4  | 6  | 8  | 9   | 10
Key    | K1 | K3 | K4 | K5 | K2  | K6
Value  | V4 | V5 | V7 | V9 | V10 | V11

Demo (I)
(diagram: the Testdata-Generator by Hortonworks produces Driving Info events to the Kafka topic truck_driving_info and Position events to the Kafka topic truck_position; a console consumer reads the topics)

truck_driving_info:
{"timestamp":1537343400827,"truckId":87,"driverId":13,"routeId":987179512,"eventType":"Normal","correlationId":"-3208700263746910537"}

truck_position:
{"timestamp":1537342514539,"truckId":87,"latitude":38.65,"longitude":-90.21}

Demo (I) – Create Kafka Topic
$ kafka-topics --zookeeper zookeeper:2181 --create \
    --topic truck_position --partitions 8 --replication-factor 1

$ kafka-topics --zookeeper zookeeper:2181 --list
__consumer_offsets
_confluent-metrics
_schemas
docker-connect-configs
docker-connect-offsets
docker-connect-status
truck_position

Demo (I) – Run Producer and Kafka-Console-Consumer

Demo (I) – Java Producer to "truck_position"
Constructing a Kafka Producer







private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker-1:9092);
kafkaProps.put("key.serializer", "...StringSerializer");
kafkaProps.put("value.serializer", "...StringSerializer");

producer = new KafkaProducer<String, String>(kafkaProps );
ProducerRecord<String, String> record =
new ProducerRecord<>("truck_position", driverId, eventData);
try {
metadata = producer.send(record) .get();
} catch (Exception e) {}

Apache Kafka – wait there is more!
(diagram: Source Connector → Kafka Broker (trucking_driver topic) → Sink Connector, with Stream Processing on top)
Kafka Connect - Overview
(diagram: Source Connector → Kafka → Sink Connector)

Choosing the Right API

Consumer, Producer API
• Java, C#, C++, Scala, Python, Node.js, Go, PHP
• subscribe(), poll(), send(), flush()
• Anything Kafka

Kafka Streams
• Fluent Java API
• mapValues(), filter(), flush()
• Stream Analytics

KSQL
• SQL dialect
• SELECT … FROM, JOIN ... WHERE, GROUP BY
• Stream Analytics

Kafka Connect
• Declarative
• Configuration, REST API, out-of-the-box connectors
• Stream Integration

Flexibility ←→ Simplicity
Source: adapted from Confluent

Demo (II) – Connect to MQTT through Kafka Connect
(diagram: the Testdata-Generator publishes Position events to the MQTT topic truck/nn/position and Driving Info events to the MQTT topic truck/nn/driving-info; "mqtt to kafka" connectors forward them to the Kafka topics truck_position and truck_driving_info)

truck_driving_info:
{"timestamp":1537343400827,"truckId":87,"driverId":13,"routeId":987179512,"eventType":"Normal","correlationId":"-3208700263746910537"}

truck_position:
{"timestamp":1537342514539,"truckId":87,"latitude":38.65,"longitude":-90.21}

KSQL in Action




KSQL: a Streaming SQL Engine for Apache Kafka
• Enables stream processing with zero coding required
• The simplest way to process streams of data in real-time
• Powered by Kafka and Kafka Streams: scalable, distributed, mature
• All you need is Kafka – no complex deployments
• Available as developer preview!

• STREAM and TABLE as first-class citizens
• STREAM = data in motion
• TABLE = collected state of a stream
• You can join a STREAM and a TABLE

KSQL Architecture & Components
KSQL Server
• runs the engine that executes KSQL queries
• includes processing, reading, and writing data to and from the target Kafka cluster
• KSQL servers form KSQL clusters and can run in containers, virtual machines, and bare-metal machines
• You can add and remove servers to/from the same KSQL cluster during live operations to elastically scale KSQL’s processing capacity as desired
• You can deploy different KSQL clusters to achieve workload isolation

KSQL CLI
• You can interactively write KSQL queries by using the KSQL command line interface (CLI)
• The KSQL CLI acts as a client to the KSQL server
• For production scenarios you may also configure KSQL servers to run in a non-interactive “headless” configuration, thereby preventing KSQL CLI access

Demo (IV) - Start Kafka KSQL
$ docker-compose exec ksql-cli ksql-cli local --bootstrap-server broker-1:9092
======================================
= _ __ _____ ____ _ =
= | |/ // ____| / __ \| | =
= | ' / | (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____ / \___\_\______| =
= =
= Streaming SQL Engine for Kafka =
Copyright 2017 Confluent Inc.

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

Terminology
Stream

•an unbounded sequence of structured data
(“facts”)
•Facts in a stream are immutable: new facts
can be inserted to a stream, but existing
facts can never be updated or deleted •Streams can be created from a Kafka topic
or derived from an existing stream
•A stream’s underlying data is durably stored
(persisted) within a Kafka topic on the Kafka
brokers

Table

•materialized View of events with only the
latest value for a key
•a view of a stream, or another table, and
represents a collection of evolving facts
•the equivalent of a traditional database table
but enriched by streaming semantics such
as windowing •Facts in a table are mutable: new facts can
be inserted to the table, and existing facts
can be updated or deleted
•Tables can be created from a Kafka topic or
derived from existing streams and tables

CREATE STREAM
Create a new stream, backed by a Kafka topic, with the specified columns and
properties




Supported column data types:
•BOOLEAN, INTEGER, BIGINT, DOUBLE, VARCHAR or STRING
•ARRAY<ArrayType>
•MAP<VARCHAR, ValueType>
•STRUCT<FieldName FieldType, ...>

Supports the following serialization formats: CSV, JSON, AVRO

KSQL adds the implicit columns ROWTIME and ROWKEY to every stream

CREATE STREAM stream_name ( { column_name data_type } [, ...] )
WITH ( property_name = expression [, ...] );
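
As an illustration of the complex column types, a hedged sketch (stream, topic, and column names are invented for this example and are not part of the demo):

ksql> CREATE STREAM truck_telemetry_s \
        (truckId VARCHAR, \
         sensorReadings ARRAY<DOUBLE>, \
         attributes MAP<VARCHAR, VARCHAR>, \
         position STRUCT<latitude DOUBLE, longitude DOUBLE>) \
      WITH (kafka_topic='truck_telemetry', \
            value_format='JSON');

Fields of a STRUCT column can then be addressed with the -> operator, e.g. position->latitude.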

CREATE TABLE
Create a new table with the specified columns and properties




Supports same data types as CREATE STREAM

KSQL adds the implicit columns ROWTIME and ROWKEY to every table as well

KSQL currently has the following requirements for creating a table from a Kafka topic:
• the message key must also be present as a field/column in the Kafka message value
• the message key must be in VARCHAR aka STRING format
CREATE TABLE table_name ( { column_name data_type } [, ...] ) WITH (
property_name = expression [, ...] );
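
A minimal sketch that satisfies these requirements (topic and column names are illustrative, not taken from the demo); the key property names the column that duplicates the message key:

ksql> CREATE TABLE truck_t \
        (truckId VARCHAR, \
         model VARCHAR, \
         maxSpeed DOUBLE) \
      WITH (kafka_topic='truck', \
            value_format='JSON', \
            key='truckId');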

Demo (III) – Create a STREAM on truck_driving_info
(diagram: same pipeline as in Demo (II); a KSQL Stream is now defined on top of the truck_driving_info topic; same example payloads as before)

Demo (III) - Create a STREAM on truck_driving_info
ksql> CREATE STREAM truck_driving_info_s \
(ts VARCHAR, \
truckId VARCHAR, \
driverId BIGINT, \
routeId BIGINT, \
eventType VARCHAR, \
correlationId VARCHAR) \
WITH (kafka_topic='truck_driving_info', \
value_format='JSON');

Message
----------------
Stream created

Demo (III) - Create a STREAM on truck_driving_info
ksql> describe truck_position_s;

Field | Type
---------------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
TS | VARCHAR(STRING)
TRUCKID | VARCHAR(STRING)
DRIVERID | BIGINT
ROUTEID | BIGINT
EVENTTYPE | VARCHAR(STRING)
LATITUDE | DOUBLE
LONGITUDE | DOUBLE
CORRELATIONID | VARCHAR(STRING)

SELECT
Selects rows from a KSQL stream or table

Result of this statement will not be persisted in a Kafka topic and will only be printed out
in the console








from_item is one of the following: stream_name, table_name
SELECT select_expr [, ...]
FROM from_item
[ LEFT JOIN join_table ON join_criteria ]
[ WINDOW window_expression ]
[ WHERE condition ]
[ GROUP BY grouping_expression ]
[ HAVING having_expression ]
[ LIMIT count ];
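
To show how the optional clauses combine, a hedged example against the stream created in Demo (III) above (the window size and threshold are arbitrary):

ksql> SELECT eventType, COUNT(*) \
      FROM truck_driving_info_s \
      WINDOW TUMBLING (SIZE 60 SECONDS) \
      WHERE eventType != 'Normal' \
      GROUP BY eventType \
      HAVING COUNT(*) > 3 \
      LIMIT 10;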

Demo (III) – Use SELECT to browse from Stream
(diagram: same pipeline as before, with the KSQL CLI running a SELECT against the Stream defined on truck_driving_info; same example payloads as before)

Demo (III) – Use SELECT to browse from Stream
ksql> SELECT * FROM truck_position_s;
1522847870317 | truck/13/position | 1522847870310 | 44 | 13 | 1390372503 | Normal | 41.71 | -91.32 | -2458274393837068406
1522847870376 | truck/14/position | 1522847870370 | 35 | 14 | 1961634315 | Normal | 37.66 | -94.3 | -2458274393837068406
1522847870418 | truck/21/position | 1522847870410 | 58 | 21 | 137128276 | Normal | 36.17 | -95.99 | -2458274393837068406
1522847870397 | truck/29/position | 1522847870390 | 18 | 29 | 1090292248 | Normal | 41.67 | -91.24 | -2458274393837068406

ksql> SELECT * FROM truck_position_s WHERE eventType != 'Normal';
1522847914246 | truck/11/position | 1522847914240 | 54 | 11 | 1198242881 | Lane Departure | 40.86 | -89.91 | -2458274393837068406
1522847915125 | truck/10/position | 1522847915120 | 93 | 10 | 1384345811 | Overspeed | 40.38 | -89.17 | -2458274393837068406
1522847919216 | truck/12/position | 1522847919210 | 75 | 12 | 24929475 | Overspeed | 42.23 | -91.78 | -2458274393837068406

CREATE STREAM … AS SELECT …
Create a new KSQL stream along with the corresponding Kafka topic, and continuously stream the
result of the SELECT query into the stream and its topic

WINDOW clause can only be used if the from_item is a stream
CREATE STREAM stream_name
[WITH ( property_name = expression [, ...] )]
AS SELECT select_expr [, ...]
FROM from_stream [ LEFT | FULL | INNER ]
JOIN [join_table | join_stream]
[ WITHIN [(before TIMEUNIT, after TIMEUNIT) | N TIMEUNIT] ] ON join_criteria
[ WHERE condition ]
[PARTITION BY column_name];
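
A hedged sketch of the PARTITION BY variant (the new stream and topic names are invented for illustration); it re-keys the resulting topic by truckId:

ksql> CREATE STREAM dangerous_driving_by_truck_s \
      WITH (kafka_topic='dangerous_driving_by_truck', \
            value_format='JSON') \
      AS SELECT * FROM truck_driving_info_s \
      WHERE eventType != 'Normal' \
      PARTITION BY truckId;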

INSERT INTO … SELECT …
Stream the result of the SELECT query into an existing stream and its underlying topic

schema and partitioning column produced by the query must match the stream’s
schema and key

If the schema and partitioning column are incompatible with the stream, then the
statement will return an error

stream_name and from_item must both
refer to a Stream. Tables are not supported!
CREATE STREAM stream_name ...;

INSERT INTO stream_name
SELECT select_expr [., ...]
FROM from_stream
[ WHERE condition ]
[ PARTITION BY column_name ];
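
A minimal sketch of how the two statements could be combined (the target stream, topic, and columns are invented for illustration, and the all_incidents topic is assumed to exist already); the SELECT list must line up with the schema given in CREATE STREAM:

ksql> CREATE STREAM all_incidents_s \
        (driverId BIGINT, eventType VARCHAR) \
      WITH (kafka_topic='all_incidents', value_format='JSON');

ksql> INSERT INTO all_incidents_s \
      SELECT driverId, eventType \
      FROM truck_driving_info_s \
      WHERE eventType != 'Normal';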

Demo (IV) – CREATE AS … SELECT …
(diagram: a detect_dangerous_driving KSQL query reads from the Stream on truck_driving_info and writes matching events to a new Stream backed by the dangerous_driving topic; same example payloads as before)

Demo (IV) – CREATE AS … SELECT …
ksql> CREATE STREAM dangerous_driving_s \
WITH (kafka_topic='dangerous_driving_s', \
value_format= 'JSON') \
AS SELECT * FROM truck_position_s \
WHERE eventtype != 'Normal';

Message
----------------------------
Stream created and running
ksql> select * from dangerous_driving_s;
1522848286143 | truck/15/position | 1522848286125 | 98 | 15 | 987179512 | Overspeed | 34.78 | -92.31 | -2458274393837068406
1522848295729 | truck/11/position | 1522848295720 | 54 | 11 | 1198242881 | Unsafe following distance | 38.43 | -90.35 | -2458274393837068406
1522848313018 | truck/11/position | 1522848313000 | 54 | 11 | 1198242881 | Overspeed | 41.87 | -87.67 | -2458274393837068406

Functions
Scalar Functions
• ABS, ROUND, CEIL, FLOOR
• ARRAYCONTAINS
• CONCAT, SUBSTRING, TRIM
• EXTRACTJSONFIELD
• GEO_DISTANCE
• LCASE, UCASE
• MASK, MASK_KEEP_LEFT, MASK_KEEP_RIGHT, MASK_LEFT, MASK_RIGHT
• RANDOM
• STRINGTOTIMESTAMP, TIMESTAMPTOSTRING

Aggregate Functions
• COUNT
• MAX
• MIN
• SUM
• TOPK
• TOPKDISTINCT

User-Defined Functions (UDF) and User-Defined Aggregate Functions (UDAF)
• Currently only supported using Java
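
A small, hedged example combining a few of these functions on the position stream from the demos (the reference coordinates and the format string are arbitrary):

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), \
             ROUND(GEO_DISTANCE(latitude, longitude, 41.88, -87.63, 'km')) \
      FROM truck_position_s;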

Windowing
Introduction to Stream Processing
Since streams are unbounded, you need some meaningful time frames to do computations (i.e. aggregations).

Computations over events are done using windows of data.

Windows give you the power to keep a working memory and look back at recent data efficiently.

Windows are tracked per unique key.

• Sliding Window (aka Hopping Window) – uses eviction and trigger policies that are based on time: window length and sliding interval length
• Fixed Window (aka Tumbling Window) – eviction policy always based on the window being full and trigger policy based on either the count of items in the window or time
• Session Window – composed of sequences of temporally related events terminated by a gap of inactivity greater than some timeout
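
A hedged sketch of how the hopping and session window types look in KSQL, using the dangerous_driving_s stream created in Demo (IV); the tumbling window is shown in the demo that follows. The window sizes, advance interval, and inactivity gap are arbitrary:

Hopping ("sliding") window, 60-second windows advancing every 20 seconds:

ksql> SELECT eventType, COUNT(*) \
      FROM dangerous_driving_s \
      WINDOW HOPPING (SIZE 60 SECONDS, ADVANCE BY 20 SECONDS) \
      GROUP BY eventType;

Session window, a gap of more than 5 minutes per key starts a new session:

ksql> SELECT driverId, COUNT(*) \
      FROM dangerous_driving_s \
      WINDOW SESSION (5 MINUTES) \
      GROUP BY driverId;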

Demo (IV) – Aggregate and Window
(diagram: in addition to detect_dangerous_driving, a count_by_eventType query aggregates the dangerous_driving Stream into a Table backed by the count_by_event_type topic; same example payloads as before)

Demo (IV) – SELECT COUNT … GROUP BY
ksql> CREATE TABLE dangerous_driving_count AS \
SELECT eventType, count(*) nof \
FROM dangerous_driving_s \
WINDOW TUMBLING (SIZE 30 SECONDS) \
GROUP BY eventType;

Message
----------------------------
Table created and running
ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS'), \
eventType, nof \
FROM dangerous_driving_count;
2018-09-19 20:10:59.587 | Overspeed | 1
2018-09-19 20:11:15.713 | Unsafe following distance | 1
2018-09-19 20:11:39.662 | Unsafe tail distance | 1
2018-09-19 20:12:03.870 | Unsafe following distance | 1
2018-09-19 20:12:04.502 | Overspeed | 1
2018-09-19 20:12:05.856 | Lane Departure | 1

Joining
Introduction to Stream Processing
Challenges of joining streams:

1. Data streams need to be aligned as they come because they have different timestamps
2. Since streams are never-ending, the joins must be limited; otherwise the join will never end
3. The join needs to produce results continuously as there is no end to the data

• Stream to Static (Table) Join
• Stream to Stream Join (one window join)
• Stream to Stream Join (two window join)

Demo (V) – Join Table to enrich with Driver data
(diagram: a jdbc-source connector copies the driver table from the trucking_driver database into the truck_driver topic – e.g. the row "27, Walter, Ward, Y, 24-JUL-85, 2017-10-02 15:19:00" becomes {"id":27,"firstName":"Walter","lastName":"Ward","available":"Y","birthdate":"24-JUL-85","last_update":1506923052012}; the join_dangerous_driving_driver query joins the dangerous_driving Stream with this driver Table into the dangerous_driving_driver Stream; same example payloads as before)

Demo (V) – Join Table to enrich with Driver data
#!/bin/bash
curl -X "POST" "http://192.168.69.138:8083/connectors " \
-H "Content-Type: application/json " \
-d $'{
"name": "jdbc-driver-source",
"config": {
"connector.class": "JdbcSourceConnector",
"connection.url":"jdbc:postgresql://db/sample?user=sample&password=sample ",

"mode": "timestamp",
"timestamp.column.name ":"last_update",
"table.whitelist":"driver",
"validate.non.null":"false",
"topic.prefix":"truck_",
"key.converter":"org.apache.kafka.connect.json.JsonConverter ",
"key.converter.schemas.enable ": "false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter ",
"value.converter.schemas.enable ": "false",
"name": "jdbc-driver-source",
"transforms":"createKey,extractInt ",
"transforms.createKey.type ":"org.apache.kafka.connect.transforms.ValueToKey ",
"transforms.createKey.fields ":"id",
"transforms.extractInt.type ":"org.apache.kafka.connect.transforms.ExtractField$Key ",
"transforms.extractInt.field ":"id"
}
}'

Demo (V) - Create Table with Driver State
ksql> CREATE TABLE driver_t \
(id BIGINT, \
first_name VARCHAR, \
last_name VARCHAR, \
available VARCHAR) \
WITH (kafka_topic='truck_driver', \
value_format= 'JSON', \
key='id');

Message
----------------
Table created

Demo (V) - Create Table with Driver State
ksql> CREATE STREAM dangerous_driving_and_driver_s \
WITH (kafka_topic='dangerous_driving_and_driver_s', \
value_format='JSON') \
AS SELECT driverId, first_name, last_name, truckId, routeId, eventtype \
FROM truck_position_s \
LEFT JOIN driver_t \
ON truck_position_s.driverId = driver_t.id;

Message
----------------------------
Stream created and running

ksql> select * from dangerous_driving_and_driver_s;
1511173352906 | 21 | 21 | Lila | Page | 58 | 1594289134 | Unsafe tail distance
1511173353669 | 12 | 12 | Laurence | Lindsey | 93 | 1384345811 | Lane Departure
1511173435385 | 11 | 11 | Micky | Isaacson | 22 | 1198242881 | Unsafe tail distance

Demo (VI) – Stream-to-Stream Join
(diagram: as in Demo (V), plus a join_dangerous_and_position query that joins the dangerous_driving_driver Stream with the truck_position Stream into a dangerous_driving_position Stream; same example payloads as before)

Demo (VI) - Stream-to-Stream Join
ksql> CREATE STREAM truck_position_s \
(timestamp VARCHAR, \
truckId VARCHAR, \
latitude DOUBLE, \
longitude DOUBLE) \
WITH (kafka_topic='truck_position', \
value_format='JSON');

ksql> SELECT ddad.driverid, ddad.first_name, ddad.last_name, ddad.truckid, \
ddad.routeid, ddad.eventtype, tp.latitude, tp.longitude \
FROM dangerous_driving_and_driver_s ddad \
INNER JOIN truck_position_s tp \
WITHIN 1 MINUTE \
ON tp.truckid = ddad.truckid;

11 | Micky | Isaacson | 47 | 1961634315 | Unsafe tail distance | 38.99 | -94.38
11 | Micky | Isaacson | 47 | 1961634315 | Unsafe tail distance | 38.67 | -94.38
12 | Laurence | Lindsey | 52 | 1198242881 | Lane Departure | 38.0 | -94.37

Summary

Summary
KSQL is another way to work with data in Kafka => you can (re)use some of your SQL
knowledge

Similar semantics to SQL, but is for queries on continuous, streaming data

Well-suited for structured data (that is the "S" in KSQL)

KSQL is dependent on “Kafka core”
•KSQL consumes from Kafka broker
•KSQL produces to Kafka broker

KSQL runs as a Java application and can be deployed to various resource managers

Use Kafka Connect or any other Stream Data Integration tool to bring your data into
Kafka first

Technology on its own won't help you.
You need to know how to use it properly.