How we implemented "Exactly Once" semantics in our high-performance database
supercoco9
56 views
61 slides
Jul 04, 2024
About This Presentation
Distributed systems are hard. High-performance distributed systems, even more so. Network latency, unacknowledged messages, server restarts, hardware failures, software bugs, problematic releases, timeouts... there are plenty of reasons why it is very hard to know whether a message you sent was received and processed correctly at the other end. So, just to be sure, you send the message again... and again... and cross your fingers that the system on the other side can tolerate duplicates.
QuestDB is an open source database designed for high performance. We wanted to make sure we could offer "exactly once" guarantees by deduplicating messages at ingestion time. In this talk I explain how we designed and implemented the DEDUP keyword in QuestDB, enabling deduplication as well as upserts on real-time data, while adding only an 8% processing-time overhead, even on streams with millions of inserts per second.
I will also explain our parallel, multithreaded write-ahead log (WAL) architecture. And of course, all of this comes with demos, so you can see how it works in practice.
Size: 3.15 MB
Language: en
Added: Jul 04, 2024
Slides: 61 pages
Slide Content
How we implemented
"Exactly Once" semantics
in our high-performance database
Javier Ramírez
@supercoco9
Database Advocate
Agenda.
If you dislike technical details, this is
the wrong presentation
●Intro to Fast & Streaming Data
●Overview of QuestDB Storage
●The Problem with Exactly Once
●Common solutions
●The QuestDB implementation
Not all data problems are the same
●a factory floor with 500 machines, or
●a fleet with 500 vehicles, or
●50 trains, with 10 cars each, or
●500 users with a mobile phone
Sending data every second
How to be a (data) billionaire
86,400
* Seconds in one day
604,800
* Seconds in one week
2,628,288
* Seconds in one month. Well, in the average month of 30.42 days anyway
43,200,000 rows a day
302,400,000 rows a week
1,314,144,000 rows a month
How I made my first billion
* See? On streaming data, it is kind of easy to get to your first billion data points
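As a quick sanity check of the arithmetic above, here is a tiny Java sketch (not part of the original deck) that reproduces the figures for 500 devices, each sending one row per second:

// Back-of-the-envelope arithmetic: 500 senders, one row per second each.
public class RowMath {
    public static void main(String[] args) {
        long senders = 500;
        long secondsPerDay = 86_400;
        long secondsPerWeek = 604_800;
        long secondsPerMonth = 2_628_288; // the average month of 30.42 days

        System.out.println(senders * secondsPerDay);   // 43,200,000 rows a day
        System.out.println(senders * secondsPerWeek);  // 302,400,000 rows a week
        System.out.println(senders * secondsPerMonth); // 1,314,144,000 rows a month
    }
}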
●Optimised for fast append-only ingestion
●Data lifecycle policies
●Analytics over chunks of time
●Time-based aggregations
●Often power real-time dashboards
Time-series database basics
QuestDB would like to be known for:
●Performance
○Also with smaller machines
●Developer Experience
○Multiple protocols and client libraries. Sensible SQL extensions
●Proudly Open Source
○(Apache 2.0 license)
We have 400k smart meters, each
sending a record every 5 minutes.
Storage Engine - var-size
[Diagram: the index file (.i) holds offsets O1, O2, O3, O4 pointing into the data file (.d)]
Var size data
●Index contains 64-bit offsets into data file
●Data entries are length prefixed
●Index contains N+1 elements (see the sketch after this slide)
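The layout described above can be modeled roughly as follows. This is only an illustrative Java sketch that uses the offset index to delimit entries; the class and method names are invented and this is not QuestDB's actual storage code:

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative in-memory model of a var-size column: an index of 64-bit offsets
// (the ".i" file) pointing into a single append-only data stream (the ".d" file).
// With N values stored, the index holds N+1 offsets, so value k occupies
// bytes [offsets[k], offsets[k + 1]) of the data stream.
class VarSizeColumnSketch {
    private final List<Long> offsets = new ArrayList<>(List.of(0L));          // ".i"
    private final ByteArrayOutputStream data = new ByteArrayOutputStream();   // ".d"

    void append(String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        data.write(bytes, 0, bytes.length);
        offsets.add((long) data.size());
    }

    String get(int row) {
        byte[] all = data.toByteArray();
        int from = offsets.get(row).intValue();
        int to = offsets.get(row + 1).intValue();
        return new String(Arrays.copyOfRange(all, from, to), StandardCharsets.UTF_8);
    }
}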
Storage Engine - Ingestion logic
●Choice of slow and fast data paths (sketched after this list)
●Fast path is append
●Slow path is merge. It might affect data freshness, but not ingestion rate.
●New data is analyzed and partitioned into copy-tasks
●Copy tasks are executed concurrently
●Partitions are versioned
●Columns are versioned within partition
●Merge operation will create a new partition with new transaction index
●Queries will switch over to new snapshot when they are ready
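A rough Java sketch of the fast/slow path choice described in this list. All names are invented for illustration; the real engine works with partitions, concurrent copy tasks, and versioned snapshots rather than in-memory arrays:

import java.util.Arrays;

// Illustrative decision between the append (fast) path and the merge (slow) path,
// based on whether the incoming rows are all newer than what is already committed.
class IngestionPathSketch {
    private long tableMaxTimestamp = Long.MIN_VALUE; // newest committed designated timestamp

    void ingest(long[] batchTimestamps) {
        long batchMin = Arrays.stream(batchTimestamps).min().orElse(Long.MAX_VALUE);
        if (batchMin >= tableMaxTimestamp) {
            append(batchTimestamps); // fast path: append to the tail of the latest partition
        } else {
            merge(batchTimestamps);  // slow path: out-of-order data triggers a partition merge
        }
    }

    private void append(long[] timestamps) {
        tableMaxTimestamp = Math.max(tableMaxTimestamp,
                Arrays.stream(timestamps).max().orElse(tableMaxTimestamp));
    }

    private void merge(long[] timestamps) {
        // In the real engine this would split the data into per-partition copy tasks,
        // run them concurrently, write new partition versions, and let queries switch
        // over to the new snapshot once it is ready.
        append(timestamps);
    }
}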
QuestDB Storage* Model
* Time-sorted. No indexes needed. https://questdb.io/docs/concept/storage-model/
Storage Engine - abstraction API
●Distinct and separate Writer and Reader API
●Low-level de-coupling of storage from compute
●Lock-free data structures
●Coordinate consistent data access via optimistic read (sketched below)
●Coordinate across processes
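The optimistic read coordination can be pictured with a generic seqlock-style sketch like the one below. This is an illustrative Java example, not QuestDB's actual reader/writer coordination code:

import java.util.concurrent.atomic.AtomicLong;

// Seqlock-style optimistic read: the writer bumps a version counter around each
// commit; readers retry if the version was odd or changed while they were reading.
class OptimisticReadSketch {
    private final AtomicLong version = new AtomicLong(); // even = stable, odd = write in progress
    private volatile long value;                          // stands in for the table state

    void write(long newValue) {
        version.incrementAndGet(); // now odd: concurrent readers will retry
        value = newValue;
        version.incrementAndGet(); // even again: new snapshot is visible
    }

    long read() {
        while (true) {
            long before = version.get();
            long snapshot = value;
            if (before % 2 == 0 && version.get() == before) {
                return snapshot;   // nothing changed while we were reading
            }
            // a write raced with us; retry
        }
    }
}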
We have 400k smart meters, each
sending a record every 5 minutes.
~120 million rows per day
Real request from potential user
When you design it
When you operate it
Sometimes the sender cannot be
sure if an event was received
●Application errors
●Connectivity issues
●Network timeout/server busy
●Component temporarily offline/restarting/updating
●Hardware failure
●Full disk
●Just how protocols work
QuestDB at a glance
[Architecture diagram: the Network API (ILP over TCP socket or HTTP, REST, PG Wire) sits in front of the Compute API (SQL Engine, Bulk Loader) and the Storage API (Storage Engine). Data ingress goes through the Writer API; data egress goes through Reader APIs.]
TCP over Socket.
You might experience data loss.
TCP Backpressure*
It might cause problems on the producer
*It gets worse with HTTP or sync protocols
Three possible options. It depends
on your requirements
●At most once
○you might lose data
●At least once
○you might get duplicates (sketched below)
●Exactly Once (or effectively once)
○you might lose your mind
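To see why at-least-once delivery produces duplicates, here is a minimal, hypothetical Java sketch of a sender that retries on any failure; it cannot tell a lost request apart from a lost acknowledgement, so the receiver may get the same row more than once:

// Hypothetical at-least-once sender: the class and the send() call are invented.
class AtLeastOnceSenderSketch {
    // Network call; may throw even after the server has already stored the row.
    boolean send(String row) { return true; }

    void sendWithRetries(String row, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (send(row)) {
                    return; // acknowledged: done
                }
            } catch (RuntimeException lostAck) {
                // "never arrived" and "arrived but the ack was lost" look identical here,
                // so we retry and accept that the receiver may see the row twice
            }
        }
        throw new IllegalStateException("giving up after " + maxAttempts + " attempts");
    }
}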
Exactly once. How hard can it be?
Kafka design document (23 pages):
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
Kafka design document. Whole discussion (67 pages):
https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#heading=h.xq0ee1vnpz4o
Distributed Snapshots: Determining Global States of a Distributed System (13 pages):
https://www.microsoft.com/en-us/research/publication/distributed-snapshots-determining-global-states-distributed-system/?from=https://research.microsoft.com/en-us/um/people/lamport/pubs/chandy.pdf&type=exact
All producers, the broker, and consumers need to coordinate state
QuestDB Deduplication
Enabling/disabling Deduplication
CREATE TABLE IF NOT EXISTS 'trades' (
symbol SYMBOL capacity 256 CACHE,
side SYMBOL capacity 256 CACHE,
price DOUBLE,
amount DOUBLE,
timestamp TIMESTAMP
) timestamp (timestamp) PARTITION BY DAY WAL
DEDUP UPSERT KEYS(timestamp, symbol, side);
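-- With DEDUP enabled, an incoming row that matches an existing row on all UPSERT KEYS
-- (timestamp, symbol, side) replaces it instead of creating a duplicate; rows with no
-- match are inserted as usual, so the same data can be replayed safely.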
ALTER TABLE trades DEDUP DISABLE;
ALTER TABLE trades DEDUP ENABLE UPSERT KEYS(timestamp, symbol, side);
Where to find the implementation
https://github.com/questdb/questdb/pull/3524
https://github.com/questdb/questdb/pull/3566
The key part is done at:
https://github.com/questdb/questdb/blob/master/core/src/main/c/share/dedup.cpp
https://github.com/questdb/questdb/blob/9626172376e59e2de743753b1bf8d6f0356e9199/core/src/main/c/share/util.h
QuestDB Deduplication and Upsert. High Level Overview
template<typename LambdaDiff>
inline int64_t conventional_branching_search(const index_t *array, int64_t count, int64_t value_index, LambdaDiff compare) {
    int64_t low = 0;
    int64_t high = count - 1;
    while (low <= high) {
        int64_t mid = (low + high) / 2;
        auto diff = compare(value_index, array[mid].i);
        if (diff == 0) {
            return mid; // Found the element
        } else if (diff < 0) {
            high = mid - 1; // Search in the left half
        } else {
            low = mid + 1; // Search in the right half
        }
    }
    return -1; // Element not found
}
Binary search by timestamp (not actual implementation)
QuestDB Query engine internals
●Our Java codebase has zero dependencies. No garbage collection on
the hot path. As close to the hardware as possible.
●We research the latest trends. Our code takes advantage of the
state-of-the-art in CPU, storage design, and data structures.
●We implement our own Just in Time Compiler to make query execution
as parallel (using SIMD and multi-core) and fast as possible.
●We spend weeks of development to save microseconds or
nanoseconds in many operations.
Modern CPU Pipeline
CPU Branch Prediction
template<typename LambdaDiff>
inline int64_t branch_free_search(const index_t *array, int64_t count, int64_t value_index, LambdaDiff compare) {
    const index_t *base = array;
    int64_t n = count;
    while (n > 1) {
        int64_t half = n / 2;
        MM_PREFETCH_T0(base + half / 2);
        MM_PREFETCH_T0(base + half + half / 2);
        auto diff = compare(value_index, base[half].i);
        base = (diff > 0) ? base + half : base;
        n -= half;
    }
    if (compare(value_index, base[0].i) == 0) {
        return base - array;
    }
    if (base - array + 1 < count && compare(value_index, base[1].i) == 0) {
        return base - array + 1;
    }
    return -1;
}
“Branch-free” Binary search by timestamp
Deduplication Performance
We will ingest 15 uncompressed CSV files, each containing 12,614,400 rows, for a total of 189,216,000 rows
representing 12 years of hourly data. On an AWS EC2 instance: m6a.4xlarge, 16 CPUs, 64 Gigs of RAM, GP3 EBS
volume.
The total size of the raw CSVs is about 17GB, and we are reading from a RAM disk to minimize the impact of
reading the files. We will be reading/parsing/ingesting from up to 8 files in parallel. The experiment's scripts are
written in Python. We could optimize ingestion by reducing CSV parsing time using a different programming
language or advanced techniques.
https://github.com/javier/deduplication-stats-questdb
What we discussed.
If you dislike technical details, it is
probably too late now
●Intro to Fast & Streaming Data
●Overview of QuestDB Storage
●The Problem with Exactly Once
●Common solutions
●The QuestDB implementation
QuestDB OSS
Open Source. Self-managed. Suitable for
production workloads.
https://github.com/questdb/questdb
QuestDB Enterprise
Licensed. Self-managed. Enterprise features like
RBAC, compression, replication, TLS on all
protocols, cold storage, K8s operator…
https://questdb.io/enterprise/
QuestDB Cloud
Fully managed, pay per usage environment,
with enterprise-grade features.
https://questdb.io/cloud/
●github.com/questdb/questdb
●https://questdb.io
●https://demo.questdb.io
●https://slack.questdb.io/
●https://github.com/questdb/time-series-streaming-analytics-template
We ❤️ contributions and GitHub ⭐ stars
Javier Ramírez
@supercoco9
Database Advocate