Replacing RocksDB with ScyllaDB in Kafka Streams by Almog Gavra

ScyllaDB · 55 slides · Mar 05, 2025

About This Presentation

Learn how Responsive replaced embedded RocksDB with ScyllaDB in Kafka Streams, simplifying the architecture and unlocking massive availability and scale. The talk covers unbundling stream processors, key ScyllaDB features tested, and lessons learned from the transition.


Slide Content

A ScyllaDB Community
Replacing RocksDB with
ScyllaDB in Kafka Streams
Almog Gavra
Co-Founder, Responsive

Realtime Search Ingest @
Stream Processing @
Co-Founder @
Almog Gavra

About Kafka Streams
Why Introduce ScyllaDB?
Architecture Deep Dive
Lessons Learned & Practical Tips
Agenda

About Kafka Streams


Kafka
Kafka Streams
A storage backend optimized for storing
ordered “event” data
A library for building event-driven
applications: realtime, responsive & stateful

Kafka Streams is Everywhere
Realtime Inference
Walmart uses Kafka Streams
to power fraud detection and
purchase recommendations
Logistics
Michelin uses Kafka Streams to
handle their tier distribution, ensuring
delivery is tracked in realtime
Liquidity Management

Why Introduce ScyllaDB?


Original Design Goals
Just a Library
Kafka Streams should be easy
to integrate with existing apps.
Complete API
All stream processing use cases
should be possible to write using
Kafka Streams.
Depend only on Kafka
There should be no dependencies on
external systems (such as HDFS or
YARN).


Original Design Goals
Availability
Source of truth is in a “changelog” topic in Kafka. If
assignment changes, state must be restored.
Flexibility
Dynamic scaling is impractical. Most deployments
are provisioned for peak throughput.
Performance
Difficult to properly attribute resources to the
stream processing vs. RocksDB storage subsystem

What Changed?
2016
Kafka Streams is released with
Apache Kafka 0.10
2017
Cloud databases gain
mainstream adoption
2019
Kubernetes wins out as de
facto orchestration system
2020
Kafka Streams popularity soars,
widening possible use cases
Today
Some assumptions guiding the original
Kafka Streams design are outdated

Lifting the “Kafka Only Requirement”


Deep Dive: Metronome
Metronome powers billing for companies like
OpenAI, NVIDIA and Databricks using Kafka
Streams.

Mission Critical Feature: Realtime Spend Limits

Key Results Migrating from RocksDB to Scylla
Availability: 99.99%
Going from regular incidents to “not thinking about Kafka Streams”
Throughput Growth: 3x
ScyllaDB scaled without a hiccup as their data size and throughput grew
Scale Potential
Decoupled compute and storage means we’ve been able to scale the number of Kafka partitions and ScyllaDB cluster size independently

Architecture Deep Dive

Data Model
CREATE TABLE key_value (
    partitionKey INT,
    dataKey BLOB,
    dataValue BLOB,
    epoch BIGINT,
    offset BIGINT,
    PRIMARY KEY ((partitionKey), dataKey)
);

Raw Data
Since Kafka Streams deals only in serialized bytes (users supply the serializers), storing the data is easy and general purpose.
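The “raw bytes” point can be sketched in a few lines: because the store only ever sees bytes, a single BLOB schema serves every use case. The serializer below is a hypothetical stand-in for a user-supplied Serde, not a real Kafka Streams API.

```python
# Sketch: Kafka Streams hands the state store already-serialized bytes
# (users supply the Serdes), so the table only needs opaque BLOB columns.
import json

def user_serde(obj):
    # hypothetical user-supplied serializer: any bytes-producing function works
    return json.dumps(obj, sort_keys=True).encode()

row = {
    "partitionKey": 7,                      # the Kafka partition number
    "dataKey": user_serde({"user": 1}),     # opaque bytes to the store
    "dataValue": user_serde({"count": 3}),  # opaque bytes to the store
}
assert isinstance(row["dataKey"], bytes)    # the store never parses payloads
```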

Data Model

Primary Key
Using the Kafka partition as the partition key lets us implement LWTs per partition, and the data key as the clustering key enables efficient lookups and range scans.

Data Model

Restoring State
Storing the offset with a sentinel dataKey enables
efficient client hand-offs in failure scenarios
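A minimal sketch of the sentinel idea (the names below are assumptions for illustration, not Responsive’s actual code): the committed Kafka offset lives under a reserved dataKey in the same partition as the data, so a node taking over a partition reads it and resumes exactly where the previous owner left off, with no changelog replay.

```python
OFFSET_SENTINEL = b"\x00__offset__"  # hypothetical reserved dataKey

class PartitionStore:
    """Toy stand-in for one ScyllaDB partition (dataKey -> value rows)."""
    def __init__(self):
        self.rows = {}

    def commit(self, records, offset):
        # Data writes and the offset update land in the same partition,
        # so one conditional batch could cover both.
        for key, value in records:
            self.rows[key] = value
        self.rows[OFFSET_SENTINEL] = offset

    def restore_point(self):
        # A new owner reads the sentinel to know where to resume in Kafka.
        return self.rows.get(OFFSET_SENTINEL, -1)

store = PartitionStore()
store.commit([(b"user:1", b"v1"), (b"user:2", b"v2")], offset=41)
store.commit([(b"user:1", b"v3")], offset=42)
print(store.restore_point())  # prints 42: resume from offset 42
```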

Data Model
What’s this epoch column? Its purpose is explained by the zombie-fencing scenario below.

Fencing Zombies
Node A: process → Kafka commit → write to ScyllaDB.
Node B takes over and does the same: process → Kafka commit → write to ScyllaDB. The Kafka commit prevents other nodes from committing to Kafka (but not writing to Scylla!).
Node A, now a zombie, issues a late write to ScyllaDB, which might overwrite data from the previous ScyllaDB write!
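The race above can be simulated with a toy epoch fence. This is a sketch of the general technique, not the exact implementation: each writer carries the epoch of its partition ownership, and the store rejects any write from an older epoch.

```python
class FencedStore:
    """Toy store that fences writers by epoch."""
    def __init__(self):
        self.value = None
        self.max_epoch = 0

    def write(self, value, epoch):
        if epoch < self.max_epoch:
            return False          # stale epoch: zombie writer fenced out
        self.max_epoch = epoch    # remember the newest owner seen so far
        self.value = value
        return True

store = FencedStore()
assert store.write("from node A", epoch=1)            # A owns the partition
assert store.write("from node B", epoch=2)            # B takes over, bumps epoch
assert not store.write("late zombie write", epoch=1)  # A's stale write rejected
assert store.value == "from node B"
```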

Data Model

BEGIN BATCH;
-- starts an atomic batch (not for speed, but for LWT)
UPDATE key_value
   SET epoch = 12
 WHERE partitionKey = 1
   AND dataKey = metadata_key
    IF epoch <= 12;
INSERT INTO key_value VALUES …;
INSERT INTO key_value VALUES …;
INSERT INTO key_value VALUES …;
INSERT INTO key_value VALUES …;
APPLY BATCH;
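The batch’s semantics can be mimicked in plain Python (a hypothetical helper, not the driver API; the real mechanism is ScyllaDB’s conditional atomic batch): either the epoch condition holds and every write in the batch lands, or nothing is applied, so a zombie can’t slip data in.

```python
def apply_conditional_batch(table, partition, writes, writer_epoch):
    """All-or-nothing: apply `writes` only if writer_epoch is still current."""
    meta = table.setdefault((partition, "metadata_key"), {"epoch": 0})
    if meta["epoch"] > writer_epoch:
        return False                 # IF epoch <= N failed: whole batch rejected
    meta["epoch"] = writer_epoch     # the UPDATE ... SET epoch = N
    for data_key, value in writes:   # the batched INSERTs
        table[(partition, data_key)] = value
    return True

table = {}
assert apply_conditional_batch(table, 1, [(b"k1", b"v1")], writer_epoch=12)
assert apply_conditional_batch(table, 1, [(b"k1", b"v2")], writer_epoch=13)
assert not apply_conditional_batch(table, 1, [(b"k1", b"stale")], writer_epoch=12)
assert table[(1, b"k1")] == b"v2"   # the stale batch changed nothing
```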

Lessons Learned

Latency Issues? Check Disk!
Bloom Filter Usage
An increase in disk reads is often a symptom that something else is wrong. For us, there were a few times when Bloom Filters were not properly constructed (once due to a config, once due to a bug)!

Use LWT Only When Necessary
Cost of Atomic Batches
Atomic Batches significantly slow down write
throughput, even if they’re contained to only a
single partition
(Chart: throughput before/after removing LWTs)

Selecting Node Size
Node Size
How to choose your ScyllaDB Cloud node type?

Consistency
Don’t Be Inconsistent!
Make sure you’ve set your read/write consistency levels. The default is ONE/ONE, which can give inconsistent results!
(Chart axes: favor fast reads vs. favor fast writes; inconsistent vs. consistent)
ONE/ONE: fast, but inconsistent
QUORUM/QUORUM: consistent
ONE/ALL: risky, but has niche use cases
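The chart boils down to the classic quorum-overlap rule of thumb: a read observes the latest write when the read replica count plus the write replica count exceeds the replication factor. A sketch, assuming RF=3:

```python
def is_consistent(read_cl, write_cl, rf=3):
    """Quorum-overlap rule: reads see the latest write when R + W > RF."""
    replicas = {"ONE": 1, "QUORUM": rf // 2 + 1, "ALL": rf}
    return replicas[read_cl] + replicas[write_cl] > rf

assert not is_consistent("ONE", "ONE")    # the default: fast but inconsistent
assert is_consistent("QUORUM", "QUORUM")  # the safe middle ground
assert is_consistent("ONE", "ALL")        # fast reads, but every write must
                                          # reach all replicas (fragile)
```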

Consistency
Running QUORUM/QUORUM but need faster reads? One option: use ONE/ALL temporarily.
1. Migrate to ALL/ALL: an intermediate state necessary to maintain consistency.
2. Run repair: ensure all data written during the QUORUM/QUORUM period is available on all nodes.
3. Enable ONE/ALL: speedy reads! This can help you temporarily, and migrating back to QUORUM/QUORUM is safe.

Stay in Touch
Almog Gavra
[email protected]
@almog.ai
@agavra
/in/agavra/