Data Structures Handling Trillions of Daily Streaming Events by Evan Chan

ScyllaDB | 29 slides | Mar 04, 2025

About This Presentation

Conviva built a high-performance streaming analytics pipeline processing trillions of events daily, then made it programmable for diverse workloads. This talk covers how they boosted throughput and memory efficiency more than 5x with Rust, zero-copy data, better execution models, and smarter data structures.


Slide Content

A ScyllaDB Community
Data Structures Handling Trillions of Daily Streaming Events
Evan Chan
Principal Software Engineer

Evan Chan (he/him)
■20+ years of high-performance data processing
■Creator of FiloDB, a distributed time series DB
■Used in production at Apple: trillions of time series, PBs of data
■Rust/Scala/Spark/Cassandra/etc.

Conviva

Our Streaming Architecture
[Architecture diagram: many devices with APIs send events through a Gateway into a custom stream processor performing Time-State Analytics, which feeds Kafka, a data warehouse, an API, and a UI.]

Stream Processing Evolution
■Many iterations of stream processing technologies
■Custom Java -> Spark Streaming -> Flink -> Scala/Akka -> Rust
■Complex stateful event processing and metrics calculations
■Why Rust?
■SQL is not well suited to complex stateful processing
■>3x more cost effective than JVM/Scala
■Much better control over memory and data structures (compared to the JVM)
■More safety than C++
■More efficient than Go, and no GC pauses

DAGs, Timelines, and Joins

Timelines for Easier Stateful Computation

Timeline Data Structure
[Diagram: Timeline A owns a list of spans and a list of events, each requiring a new heap allocation. Span1 (t1, t2) holds Event1; Span2 (t2, t3) holds Event2; Span3 (t3, t4) holds Event3 and Event4.]
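Below is a minimal Rust sketch of the layout described above; the type names (Timeline, Span, Event) and their fields are assumptions for illustration, not Conviva's actual definitions. It shows why each timeline costs at least two separate heap allocations: one Vec for the spans and one for the events.

```rust
// Sketch only: assumed names and fields, not the production code.
#[derive(Debug, Clone)]
struct Event {
    name: String,
    timestamp: u64,
}

#[derive(Debug, Clone)]
struct Span {
    start: u64,                // span covers (start, end)
    end: u64,
    event_indices: Vec<usize>, // indices into Timeline::events
}

#[derive(Debug, Default)]
struct Timeline {
    spans: Vec<Span>,   // heap allocation #1
    events: Vec<Event>, // heap allocation #2
}

fn main() {
    let mut timeline = Timeline::default();
    timeline.events.push(Event { name: "Event1".into(), timestamp: 1 });
    timeline.spans.push(Span { start: 1, end: 2, event_indices: vec![0] });
    println!("{} spans, {} events", timeline.spans.len(), timeline.events.len());
}
```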

DAGs of Timelines
[Diagram: EventA and EventB each feed a Count operator; the resulting timelines are combined by an Add operator, and every timeline carries its own watermark.]
Complicated joins: multiple watermarks and intersecting span time intervals must be reconciled.

More Efficient Timelines: Using SmallVec
[Diagram: Timeline A stores Span1 (t1, t2) with Event1, Span2 (t2, t3) with Event2, and Span3 (t3, t4) with Event3 and Event4 inline in SmallVecs.]
■SmallVec allows one to inline a fixed number of elements, avoiding a heap allocation unless the list grows beyond the inline limit (see the sketch below)
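A minimal sketch of a SmallVec-backed timeline using the smallvec crate; the inline capacity of 4 and the field layout are illustrative choices, not Conviva's actual configuration.

```rust
// Sketch only: inline capacities chosen arbitrarily for illustration.
use smallvec::SmallVec;

struct Span {
    start: u64,
    end: u64,
}

struct Timeline {
    // Up to 4 elements live inline inside the Timeline itself; a heap
    // allocation happens only if a list grows beyond that limit.
    spans: SmallVec<[Span; 4]>,
    events: SmallVec<[u64; 4]>,
}

fn main() {
    let mut t = Timeline { spans: SmallVec::new(), events: SmallVec::new() };
    t.spans.push(Span { start: 1, end: 2 });
    t.events.push(1);
    // Still inline: nothing has spilled to the heap yet.
    assert!(!t.spans.spilled() && !t.events.spilled());
    println!("{} spans, {} events", t.spans.len(), t.events.len());
}
```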

Simpler Data and Execution Model, Joins
■Most joins are simple
■We simplified binary operators (joins) by processing one event at a time across the entire DAG, so operands are always aligned in time
■A "global queue" resolves interval/span differences (see the sketch below)
■No more need for watermarks
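The talk does not show the global queue itself, so the following is only a sketch of the idea using a timestamp-ordered min-heap; QueuedEvent and its fields are hypothetical names. Draining events strictly in time order is what keeps binary operators aligned without watermarks.

```rust
// Sketch only: a global queue that hands events to the DAG in timestamp order.
use std::cmp::Reverse;
use std::collections::BinaryHeap;

#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct QueuedEvent {
    timestamp: u64, // primary ordering key: earliest event first
    node_id: u32,   // which DAG node should process this event
}

fn main() {
    // Reverse turns the std max-heap into a min-heap keyed on timestamp.
    let mut global_queue: BinaryHeap<Reverse<QueuedEvent>> = BinaryHeap::new();
    global_queue.push(Reverse(QueuedEvent { timestamp: 30, node_id: 2 }));
    global_queue.push(Reverse(QueuedEvent { timestamp: 10, node_id: 1 }));
    global_queue.push(Reverse(QueuedEvent { timestamp: 20, node_id: 1 }));

    // Every operator in the DAG sees events one at a time, in time order.
    while let Some(Reverse(event)) = global_queue.pop() {
        println!("t={} -> node {}", event.timestamp, event.node_id);
    }
}
```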

Improving CPU and Throughput
■Scala/Akka, custom processing code: 1x throughput (baseline)
■Rust programmable platform, SmallVec-based timelines: 3x end to end, >=4x CPU (no I/O)
■Rust, simplified data model + joins: 8-20x CPU improvement (no I/O)

HashMaps, Copy-on-Write, Immutability

Nested HashMaps are Everywhere
Input: {
  "Context": {
    "Build": {
      "Os": "iOS",
      "Version": "17.6",
    },
    "geoInfo": {
      "Latitude": "37.185…",
      ...
    }
  }
}
●Our events are complex, deeply nested JSON objects
●Many intermediate transforms of JSON objects are often necessary
●Output events and metrics are also usually nested

Updating Nested Maps is $$$
■Malloc and clone calls for each key and value, plus hashing = very $$$

Immutable HashMaps
This helped reduce the deep cloning significantly.
●Immutable hashmaps are copy-on-write: clones are cheap until you modify a clone (see the sketch below)
●Only a subtree needs to be cloned and modified, which is much cheaper
●In Rust, implemented using a trie, similar to a B-Tree
○Memory efficiency depends partly on the size of each node and the number of elements
●We found immutable hashmaps reduced CPU (much less time spent in allocations) but still used up lots of memory in practice
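A minimal sketch of that copy-on-write behavior using the im crate; the keys and values are illustrative, not Conviva's event fields.

```rust
// Sketch only: im::HashMap clones are O(1) and share structure until modified.
use im::HashMap;

fn main() {
    let mut base: HashMap<String, String> = HashMap::new();
    base.insert("Os".to_string(), "iOS".to_string());
    base.insert("Version".to_string(), "17.6".to_string());

    // Cloning only bumps reference counts; no keys or values are copied.
    let mut snapshot = base.clone();

    // Modifying the clone copies just the affected trie path (structural sharing).
    snapshot.insert("Version".to_string(), "18.0".to_string());

    assert_eq!(base.get("Version").map(String::as_str), Some("17.6"));
    assert_eq!(snapshot.get("Version").map(String::as_str), Some("18.0"));
}
```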

HashMaps vs BTrees
im::HashMap still used up a significant amount of memory, even though not cloning helps with CPU and allocations:
■We have lots of deeply nested but small maps
■Immutable hashmaps are much more complex in order to track structural sharing, and must store hashes as well
■Older versions of the map kept alive in im::HashMap retain more memory
■im::HashMap has a larger node size than BTreeMap (32 vs. 11 entries per node)
■Prefer BTreeMap to HashMap when possible

Copy-on-write BTrees and Ref Counting
Solution:
■Use smaller-sized BTrees as a base
■Use an Arc (atomic ref count) for copy-on-write (see the sketch below)
■Ended up using >5x less memory
[Diagram: a ref-counted BTree; a clone just adds another atomic ref to the same tree, and a deep clone happens only on write.]
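A minimal sketch of the Arc-based copy-on-write pattern with a standard BTreeMap; Arc::make_mut deep-clones the map only when it is actually shared. The keys and values are illustrative.

```rust
// Sketch only: Arc gives cheap clones, and the BTreeMap is deep-cloned on write.
use std::collections::BTreeMap;
use std::sync::Arc;

fn main() {
    let mut original: Arc<BTreeMap<String, String>> = Arc::new(BTreeMap::new());
    // Refcount is 1 here, so make_mut mutates in place without cloning.
    Arc::make_mut(&mut original).insert("Os".into(), "iOS".into());

    // "Cloning" just bumps the atomic refcount; the tree itself is shared.
    let mut copy = Arc::clone(&original);

    // The map is shared now, so make_mut deep-clones it before this insert.
    Arc::make_mut(&mut copy).insert("Version".into(), "17.6".into());

    assert!(original.get("Version").is_none());
    assert_eq!(copy.get("Version").map(String::as_str), Some("17.6"));
}
```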

Vecs and Schemas - No More Keys!
■Schemas for most maps are fixed
■Use an external schema with a heterogeneous typed list: a "virtual struct"
■Map from key in the schema to an index in the list
■No keys stored per record: faster, less memory (see the sketch below)
[Example: Schema = 0: req_type: string, 1: url: string, 2: duration: int; "Struct" values = "Login", "http://…", 50]
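A minimal sketch of the "virtual struct" idea; Schema, Value, and VirtualStruct are assumed names for illustration, not Conviva's types. Field names live once in the shared schema and each record is just a positional Vec of values.

```rust
// Sketch only: keys live in a shared schema, records store only values.
use std::collections::HashMap;

#[derive(Debug, Clone)]
enum Value {
    Str(String),
    Int(i64),
}

struct Schema {
    index: HashMap<&'static str, usize>, // field name -> position in the value list
}

struct VirtualStruct {
    values: Vec<Value>, // no per-record keys, just positional values
}

impl VirtualStruct {
    fn get<'a>(&'a self, schema: &Schema, field: &str) -> Option<&'a Value> {
        schema.index.get(field).and_then(|&i| self.values.get(i))
    }
}

fn main() {
    let schema = Schema {
        index: HashMap::from([("req_type", 0), ("url", 1), ("duration", 2)]),
    };
    let record = VirtualStruct {
        values: vec![
            Value::Str("Login".into()),
            Value::Str("http://...".into()),
            Value::Int(50),
        ],
    };
    println!("{:?}", record.get(&schema, "duration")); // Some(Int(50))
}
```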

Improving Memory Usage >10x
■Changing from im::HashMap to copy-on-write BTree to Vecs helped a huge amount in memory consumption:

Map Data Structure Used  | Total Retained Memory | Improvement Factor
im::HashMap              | 134 GiB               | 1x
Copy-on-write BTreeMap   | 17 GiB                | 7.8x
Vec with external schema | 9 GiB                 | 14.8x

Zero-Copy for the Win

Why Serialization Matters
■Deserialization is very $$$: multiple rounds, many small objects
[Diagram: input JSON bytes are first deserialized to an intermediate representation (a tree of JsonObjects with keys and string values), then converted from the IR into final data structures such as DeviceInfo, UserAgent, Version, and GeoInfo.]

"Zero-copy" is a bit of misnomer.
Copying bits around - a bit copy - is actually really efficient and fast.
What is not fast are all the steps of deserialization - because the input cannot
be used as the data structure.
●Examining EACH input value
●Translating EACH input bytes into its native type
●LOTS and LOTS of allocations!
Solution - structure binary data format in a way that lets one read directly the
bytes!

“Zero-Copy”: No More Deserialization?

Zero-Copy In Action
■No allocations: the reader extracts types directly from the bytes!
[Diagram: FlatBuffers (.fbs) bytes feed a FlatBuffers reader exposing accessors such as get_user_agent() -> &str and get_geo_prop("lat") -> &str.]
●The reader just "interprets" the bytes
●It provides methods to extract typed references to data in the bytes, e.g. string references (&str in Rust)
●MUCH easier in native languages (Rust, C++)
○Native UTF-8 string support
○Smart pointers in Rust
●FlatBuffers, Cap'n Proto, others (a hand-rolled sketch of the idea is below)
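The slide points at FlatBuffers; as a hedged illustration of the underlying idea only (not the FlatBuffers API), here is a tiny hand-rolled reader whose accessors return &str slices borrowing directly from the input buffer, so nothing is allocated or deserialized. The length-prefixed layout and method names are assumptions.

```rust
// Sketch only: a hand-rolled zero-copy reader over a length-prefixed buffer.
struct EventReader<'a> {
    bytes: &'a [u8],
}

impl<'a> EventReader<'a> {
    fn new(bytes: &'a [u8]) -> Self {
        EventReader { bytes }
    }

    // Reads a length-prefixed UTF-8 string at `offset`, returning a reference
    // into the original buffer instead of allocating a new String.
    fn str_at(&self, offset: usize) -> Option<&'a str> {
        let len = *self.bytes.get(offset)? as usize;
        let field = self.bytes.get(offset + 1..offset + 1 + len)?;
        std::str::from_utf8(field).ok()
    }

    fn user_agent(&self) -> Option<&'a str> {
        self.str_at(0) // assume the user agent is the first field
    }
}

fn main() {
    // [len = 10] followed by the string bytes "AppleTV/17".
    let mut buf = vec![10u8];
    buf.extend_from_slice(b"AppleTV/17");

    let reader = EventReader::new(&buf);
    // The returned &str points into `buf`; no bytes were copied.
    assert_eq!(reader.user_agent(), Some("AppleTV/17"));
}
```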

Zero-Copy and HashMap Transformations?
[Diagram: JSON bytes are read through a zero-copy reader (get_user_agent() -> &str, get_geo_prop("lat") -> &str), but updating a field means deserializing to a HashMap and then adding/updating the field.]
●What if I want to update a field in my HashMap?
●(Yes, some ZC formats support maps)
●Well, zero-copy input formats are read-only!
●I must now deserialize fully and update my in-memory data structure

Zero-Copy Data Structures = Win!
●Zero-copy data structure: reference the original struct bytes and add the new field alongside them. No copying! (See the sketch below.)
[Diagram: zero-copy bytes (.fbs, etc.) are read through a zero-copy reader (get_geo_info_struct() -> &StructReader, get_user_agent() -> &str); an UpdateStruct holds a reference to the original GeoInfo/UserAgent struct bytes plus the new field.]
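A minimal sketch of this pattern, assuming the original record stays read-only: a wrapper borrows the original struct bytes and layers the new or overridden fields on top, so nothing is deep-copied or re-serialized. UpdateStruct and its fields are illustrative names, and the byte-level fallback accessor for unchanged fields is elided.

```rust
// Sketch only: new fields ride alongside a borrowed reference to the original bytes.
use std::collections::BTreeMap;

struct UpdateStruct<'a> {
    original: &'a [u8],                        // reference to the original struct bytes
    overrides: BTreeMap<&'static str, String>, // only the new/changed fields
}

impl<'a> UpdateStruct<'a> {
    fn new(original: &'a [u8]) -> Self {
        UpdateStruct { original, overrides: BTreeMap::new() }
    }

    fn set(&mut self, field: &'static str, value: String) {
        self.overrides.insert(field, value);
    }

    // Overridden fields win; unchanged fields would be read straight out of
    // `self.original` by a zero-copy accessor (elided in this sketch).
    fn get(&self, field: &str) -> Option<&str> {
        self.overrides.get(field).map(String::as_str)
    }
}

fn main() {
    let original_bytes: &[u8] = b"...zero-copy encoded GeoInfo...";
    let mut update = UpdateStruct::new(original_bytes);
    update.set("city", "Cupertino".to_string());
    assert_eq!(update.get("city"), Some("Cupertino"));
    assert_eq!(update.original.len(), original_bytes.len()); // original bytes untouched
}
```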

Conclusions

Conclusion

■The biggest improvement came from simplifying the data model and execution model (2-5x)
■Improving HashMaps helps up to 8x, and >10x if we avoid maps altogether
■Minimize allocations and object trees
■Use copy-on-write and zero-copy when possible
■Rust is the new King of Data Processing

Stay in Touch
Evan Chan
https://velvia.github.io/about
X/Twitter: @evanfchan
Github.com/velvia
IG: @platypus.arts