How we implemented "Exactly Once" semantics in our high-performance database

supercoco9 · 61 slides · Jul 04, 2024

About This Presentation

Distributed systems are hard. High-performance distributed systems, even more so. Network latencies, unacknowledged messages, server restarts, hardware failures, software bugs, problematic releases, timeouts... there are plenty of reasons why it is very...


Slide Content

How we implemented
"Exactly Once" semantics
in our high-performance database

Javier Ramírez
@supercoco9
Database Advocate

Agenda.
If you dislike technical details, this is the wrong presentation
●Intro to Fast & Streaming Data
●Overview of QuestDB Storage
●The Problem with Exactly Once
●Common solutions
●The QuestDB implementation

Not all data problems are the same

●a factory floor with 500 machines, or
●a fleet with 500 vehicles, or
●50 trains, with 10 cars each, or
●500 users with a mobile phone

Sending data every second
How to be a (data) billionaire

86,400
* Seconds in one day

604,800
* Seconds in one week

2,628,288
* Seconds in one month. Well, in the average month of 30.42 days anyway

43,200,000 rows a day
302,400,000 rows a week
1,314,144,000 rows a month

How I made my first billion
* See? With streaming data, it is kind of easy to get your first billion data points

●Optimised for fast append-only ingestion
●Data lifecycle policies
●Analytics over chunks of time
●Time-based aggregations
●Often power real-time dashboards

Time-series database basics
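
As an illustration of the "analytics over chunks of time" and "time-based aggregations" bullets above, here is a minimal sketch of a time-based aggregation using QuestDB's SAMPLE BY SQL extension; the table and column names are hypothetical, not taken from the slides:

-- Downsample raw readings to one average per device per hour,
-- restricted to the last day of data
SELECT ts, device, avg(value) AS avg_value
FROM readings
WHERE ts > dateadd('d', -1, now())
SAMPLE BY 1h;

SAMPLE BY buckets rows by their designated timestamp, which is the kind of query that powers the real-time dashboards mentioned in the last bullet.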

QuestDB would like to be known for:
●Performance
○Also with smaller machines

●Developer Experience
○Multiple protocols and client libraries. Sensible SQL extensions

●Proudly Open Source
○(Apache 2.0 license)

We have 400k smart meters, each sending a record every 5 minutes.

~120 million rows per day

Real request from potential user

Quick demo

https://github.com/questdb/time-series-streaming-analytics-template

QuestDB ingestion and storage layer
●Data always stored by incremental timestamp.

●Data partitioned by time units and stored in tabular columnar format.

●No indexes needed*. Data is immediately available after writing.

●Predictable ingestion rate, even under demanding workloads (millions/second).

●Built-in event deduplication.

●Optimized data types (Symbol, geohash, ipv4, uuid…).

●Row updates and upserts supported.
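
To make the "optimized data types" bullet concrete, here is a hedged sketch of a table definition using several of them; the table name, column names and geohash precision are illustrative, not from the slides:

CREATE TABLE IF NOT EXISTS sensor_events (
  device SYMBOL,          -- repeated strings interned and stored as integers
  location GEOHASH(8c),   -- geohash with 8-character precision
  source_ip IPV4,
  event_id UUID,
  value DOUBLE,
  ts TIMESTAMP
) TIMESTAMP(ts) PARTITION BY DAY WAL;

The designated timestamp and PARTITION BY clause give the time-ordered, time-partitioned layout described above; WAL tables are the ones that get the parallel ingestion and deduplication discussed next.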

Parallel Write Ahead Log (WAL)

Parallelism in the Write-Ahead Log
[Diagram: client connections C1, C2 and C3 each write through their own WAL writer (W1, W2, W3). Every writer appends its transactions (tx01–tx12) independently, and the Sequencer assigns a single global order across writer segments: W1[0], W1[1], W2[0], W3[0], …]

Out-of-order Merge
[Diagram: the per-writer transactions are applied in the sequencer order W1[0], W1[1], W2[0], W3[0], … The two transactions below overlap in time, so their rows have to be merged into timestamp order.]
tx01

ts price symbol qty

ts01 178.08 AAPL 1000
ts02 148.66 GOOGL 400
ts03 424.86 MSFT 5000
ts10 178.09 AMZN 100
ts11 505.08 META 2500
ts12 394.14 GS 2000
… … … …
tx02

ts price symbol qty

ts04 192.42 JPM 5000
ts05 288.78 V 300
ts06 156.40 JNJ 6500
ts07 181.62 AMD 7800
ts08 37.33 BAC 1500
ts09 60.83 KO 4000
… … … …


WAL gets applied to the storage.

Deduplication happens at this moment.

Storage Engine - file system layout
[Diagram: one directory per partition (2022-04-11, 2022-04-12). A fixed-size column is a single .d file (e.g. price.d); a var-size column has a .d data file plus a .i index file (e.g. string.d and string.i).]

Storage Engine - Partitioning

Incoming rows, in arrival order:

Ticker   Side  Price     Amount      Time
ETH-USD  sell  2615.54   0.00044     18:00
ETH-USD  sell  2617.13   0.001       19:00
BTC-USD  buy   39269.98  0.000245    18:02
ETH-USD  buy   2640.21   0.14810976  18:07
BTC-USD  buy   39270.01  0.000245    19:05
BTC-USD  sell  39250.21  0.00046562  18:05
ETH-USD  sell  2621.03   0.000127    19:09
ETH-USD  buy   2640.18   0.02593599  19:06
BTC-USD  buy   39270.01  0.0127959   18:10

Stored sorted by time, split into hourly partitions:

18H
ETH-USD  sell  2615.54   0.00044     18:00
BTC-USD  buy   39269.98  0.000245    18:02
BTC-USD  sell  39250.21  0.00046562  18:05
ETH-USD  buy   2640.21   0.14810976  18:07
BTC-USD  buy   39270.01  0.0127959   18:10

19H
ETH-USD  sell  2617.13   0.001       19:00
BTC-USD  buy   39270.01  0.000245    19:05
ETH-USD  buy   2640.18   0.02593599  19:06
ETH-USD  sell  2621.03   0.000127    19:09

SQL:
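The SQL from the slide is not in this transcript; the following is a hedged sketch of the kind of DDL that produces the hourly partitions shown above (the table name and the WAL keyword are assumptions, the columns follow the example):

CREATE TABLE trades_by_hour (
  ticker SYMBOL,
  side SYMBOL,
  price DOUBLE,
  amount DOUBLE,
  ts TIMESTAMP       -- the slide's Time column, used as the designated timestamp
) TIMESTAMP(ts) PARTITION BY HOUR WAL;

Rows land in the partition matching their designated timestamp (18H or 19H above), regardless of the order in which they arrive.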

Storage Engine - var-size
[Diagram: the .i index file holds offsets O1, O2, O3, O4 that point into the .d data file.]
●Index contains 64-bit offsets into data file
●Data entries are length prefixed
●Index contains N+1 elements, so the extra offset marks the end of the last entry

Storage Engine - Ingestion logic
●Choice of slow and fast data paths
●Fast path is append
●Slow path is merge. It might affect data freshness, but not ingestion rate.
●New data is analyzed and partitioned into copy-tasks
●Copy tasks are executed concurrently

●Partitions are versioned

●Columns are versioned within partition

●Merge operation will create a new partition with new transaction index

●Queries will switch over to new snapshot when they are ready

Storage Engine - snapshots
[Diagram: two versions of the same partition directory, 2022-04-11T18.9901 with ticker.d.10031 and 2022-04-11T18.9945 with ticker.d.10049.]

QuestDB Storage* Model
* Time-sorted. No indexes needed. https://questdb.io/docs/concept/storage-model/

Storage Engine - abstraction API
●Distinct and separate Writer and Reader API
●Low-level de-coupling of storage from compute
●Lock-free data structures
●Coordinate consistent data access via optimistic read
●Coordinate across processes

We have 400k smart meters, each sending a record every 5 minutes.

~120 million rows per day

Real request from potential user

When you design it

When you operate it

Sometimes the sender cannot be sure if an event was received
●Application errors
●Connectivity issues
●Network timeout/server busy
●Component temporarily offline/restarting/updating
●Hardware failure
●Full disk
●Just how protocols work

QuestDB at a glance
[Diagram: Network API (ILP over TCP socket or HTTP, REST, PG Wire), Compute API (Bulk Loader, SQL Engine), and Storage API on top of the Storage Engine. Data ingress reaches the Storage Engine through the Writer API; data egress is served through Reader APIs.]

TCP over Socket.

You might experience data loss.

TCP Backpressure*

It might cause problems on the producer.

* It gets worse with HTTP or sync protocols

Three possible options. It depends on your requirements
●At most once
○you might lose data
●At least once
○you might get duplicates
●Exactly Once (or effectively once)
○you might lose your mind

Exactly once. How hard can it be?
Kafka design document (23 pages):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
Kafka design document. Whole discussion (67 pages):
https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.xq0ee1vnpz4o
Distributed Snapshots: Determining Global States of a Distributed System (13 pages):
https://www.microsoft.com/en-us/research/publication/distributed-snapshots-determining-global-states-distributed-system/?from=https://research.microsoft.com/en-us/um/people/lamport/pubs/chandy.pdf&type=exact

Snapshot without Channels.
Happy path

Snapshot without Channels.
Message duplication

Snapshot without Channels.
Message Loss

Chandy-Lamport.
Snapshotting Channels and Using Markers

Basic Snapshot.
With actual duplicate

Chandy-Lamport.
With actual duplicate

Chandy-Lamport
With actual duplicate across snapshots

Apache Flink uses Chandy-Lamport
https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/learn-flink/fault_tolerance/

Job Graph

Execution Graph

Task Lifecycle Diagram.

Yet, it gets no dupes

Exactly once in Kafka.

All producers, the broker, and consumers need to coordinate state.

QuestDB Deduplication

Enabling/disabling Deduplication
CREATE TABLE IF NOT EXISTS 'trades' (
symbol SYMBOL capacity 256 CACHE,
side SYMBOL capacity 256 CACHE,
price DOUBLE,
amount DOUBLE,
timestamp TIMESTAMP
) timestamp (timestamp) PARTITION BY DAY WAL
DEDUP UPSERT KEYS(timestamp, symbol, side);

ALTER TABLE trades DEDUP DISABLE;

ALTER TABLE trades DEDUP ENABLE UPSERT KEYS(timestamp, symbol, side);
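
A hedged sketch of what the upsert keys buy you, assuming the trades table was created with WAL and DEDUP as shown above (the inserted values are made up):

-- First write for the key (timestamp, symbol, side)
INSERT INTO trades VALUES ('AAPL', 'buy', 178.08, 100, '2024-07-04T10:00:00.000000Z');

-- A retry of the same event: same key, possibly refreshed non-key values
INSERT INTO trades VALUES ('AAPL', 'buy', 178.10, 100, '2024-07-04T10:00:00.000000Z');

-- With deduplication enabled the retry upserts instead of duplicating,
-- so this returns 1 (and the row keeps the last non-key values received)
SELECT count() FROM trades
WHERE timestamp = '2024-07-04T10:00:00.000000Z' AND symbol = 'AAPL' AND side = 'buy';

This is the point of the talk: clients can keep retrying (at-least-once delivery) and the storage engine turns it into effectively-once results.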

Where to find the implementation
https://github.com/questdb/questdb/pull/3524

https://github.com/questdb/questdb/pull/3566

The key part is done at:
https://github.com/questdb/questdb/blob/master/core/src/main/c/share/dedup.cpp
https://github.com/questdb/questdb/blob/9626172376e59e2de743753b1bf8d6f0356e9199/core/src/main/c/share/util.h

QuestDB Deduplication and Upsert.

High Level Overview

template<typename LambdaDiff>
inline int64_t conventional_branching_search(const index_t *array, int64_t count,
                                             int64_t value_index, LambdaDiff compare) {
    int64_t low = 0;
    int64_t high = count - 1;
    while (low <= high) {
        int64_t mid = (low + high) / 2;
        auto diff = compare(value_index, array[mid].i);
        if (diff == 0) {
            return mid; // Found the element
        } else if (diff < 0) {
            high = mid - 1; // Search in the left half
        } else {
            low = mid + 1; // Search in the right half
        }
    }
    return -1; // Element not found
}
Binary search by timestamp (not actual implementation)

QuestDB Query engine internals
●Our Java codebase has zero dependencies. No garbage collection on the hot path. As close to the hardware as possible.

●We research the latest trends. Our code takes advantage of the state of the art in CPU, storage design, and data structures.

●We implement our own Just-in-Time compiler to make query execution as parallel (using SIMD and multi-core) and as fast as possible.

●We spend weeks of development to save microseconds or nanoseconds in many operations.
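
A quick way to peek at some of this from SQL is EXPLAIN, which prints the execution plan. This is a hedged sketch (the table is the trades example from earlier, and the exact plan output depends on the QuestDB version), but for a simple filter the plan lets you see whether it was JIT-compiled and parallelized:

EXPLAIN
SELECT symbol, max(price)
FROM trades
WHERE price > 100.0;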

Modern CPU Pipeline

CPU Branch Prediction

template<typename LambdaDiff>
inline int64_t branch_free_search(const index_t *array, int64_t count,
                                  int64_t value_index, LambdaDiff compare) {
    const index_t *base = array;
    int64_t n = count;
    while (n > 1) {
        int64_t half = n / 2;
        // Prefetch the midpoints of both possible next halves,
        // so the next probe is likely in cache either way
        MM_PREFETCH_T0(base + half / 2);
        MM_PREFETCH_T0(base + half + half / 2);
        auto diff = compare(value_index, base[half].i);
        // Branchless select (typically a conditional move): no misprediction penalty
        base = (diff > 0) ? base + half : base;
        n -= half;
    }
    if (compare(value_index, base[0].i) == 0) {
        return base - array;
    }
    if (base - array + 1 < count && compare(value_index, base[1].i) == 0) {
        return base - array + 1;
    }
    return -1;
}
“Branch-free” Binary search by timestamp

Quick deduplication demo

https://github.com/questdb/time-series-streaming-analytics-template

Deduplication Performance
We will ingest 15 uncompressed CSV files, each containing 12,614,400 rows, for a total of 189,216,000 rows representing 12 years of hourly data, on an AWS EC2 m6a.4xlarge instance (16 CPUs, 64 GiB of RAM, GP3 EBS volume).

The total size of the raw CSVs is about 17 GB, and we are reading from a RAM disk to minimize the impact of reading the files. We will be reading/parsing/ingesting from up to 8 files in parallel. The experiment's scripts are written in Python. We could optimize ingestion by reducing CSV parsing time using a different programming language or advanced techniques.
https://github.com/javier/deduplication-stats-questdb

What we discussed.
If you dislike technical details, it is probably too late now
●Intro to Fast & Streaming Data
●Overview of QuestDB Storage
●The Problem with Exactly Once
●Common solutions
●The QuestDB implementation

QuestDB OSS
Open Source. Self-managed. Suitable for
production workloads.
https://github.com/questdb/questdb


QuestDB Enterprise
Licensed. Self-managed. Enterprise features like
RBAC, compression, replication, TLS on all
protocols, cold storage, K8s operator…
https://questdb.io/enterprise/

QuestDB Cloud
Fully managed, pay per usage environment,
with enterprise-grade features.
https://questdb.io/cloud/

●github.com/questdb/questdb
●https://questdb.io
●https://demo.questdb.io
●https://slack.questdb.io/
●https://github.com/questdb/time-series-streaming-analytics-template
We ❤️ contributions and GitHub ⭐ stars
Javier Ramírez
@supercoco9
Database Advocate