Migrating 50TB Data From a Home-Grown Database to ScyllaDB, Fast by Terence Liu
ScyllaDB
About This Presentation
Terence shares how Clearview AI's infra needs evolved and why they chose ScyllaDB after first-principles research. From fast ingestion to production queries, the talk explores their journey with Rust, embedded DB readers, and the ScyllaDB Rust driver—plus config tips for bulk ingestion and achieving data parity.
Added: Mar 11, 2025
Slide Content
A ScyllaDB Community
Migrating 50TB Data From a Home-Grown Database to ScyllaDB, Fast
Terence Liu
Head of ML
Terence Liu
■Computer Vision, vector search, data pipelines / databases @ Clearview AI since 2021
■Previously, Platform (CIS) @ Bloomberg LP
■Clearview AI: high vectors (60B+) to engineers ratio :)
■In-house database & its utility
■The search for a fuller-featured NoSQL database
■Snapshot bulk ingestion (with RocksDB iterator & Scylla Rust driver)
■Transferring to PROD to bring it alive
■Reaching data parity (dual write + diff backfill)
Presentation Agenda
In-house Database & its Utility
●Based on RocksDB
●Distributed KV store
●Fills up a shard and moves on to a new one
●Keys are already hashes
●Not resizable
●Metadata for point lookup
●Has suited our needs until now
The Search for a Fuller-Featured NoSQL Database
●TiKV
○Almost a drop-in replacement
○Key hashing, Raft, auto-shard placement
○However, we would like a little more data modeling & indexability
●Cassandra
○Leaderless with tunable consistency
○Wide-column store with data modeling & indexability
●ScyllaDB
○Drop-in replacement for Cassandra
○C++ & Shard-per-core for performance
We started by ingesting the most recent snapshot of data from all shards of the home-grown database.
■Deletions are very rare, so we could afford snapshot ingestion
■Allows us to test writes safely
■Repeatable if needed
■Leads to an equivalent data collection in the new DB
■Can test read performance at the target scale
Snapshot Bulk Ingestion
■Each shard of the home-grown DB contains multiple RocksDB databases.
■Each RocksDB database is modeled as a UDT in Scylla (a sketch of the assumed schema follows this slide).
■Use the RocksDB Rust binding to iterate over key ranges in parallel.
■Use the Scylla Rust driver to write to Scylla.
■Monitor the client host's CPUs and disk/network IO so the client doesn't become the bottleneck.
Snapshot Bulk Ingestion (Client-Side)
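The slides show the driver-side model but not the CQL schema itself. Below is a minimal sketch of the keyspace, UDT, and table behind the INSERT INTO metadata.metadata (blob_id, image) statement, created through the Scylla Rust driver; the contact point, replication settings, blob key type, and frozen choice are assumptions, not taken from the talk.

use scylla::{Session, SessionBuilder};

async fn create_schema() -> anyhow::Result<Session> {
    // Connect to the cluster; the contact point is a placeholder.
    let session: Session = SessionBuilder::new()
        .known_node("scylla-node-1:9042")
        .build()
        .await?;

    // Assumed keyspace; RF=3 is a guess, not from the talk.
    session
        .query_unpaged(
            "CREATE KEYSPACE IF NOT EXISTS metadata \
             WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3}",
            &[],
        )
        .await?;

    // UDT mirroring the ImageUDT struct shown just below (only `url` is known from the talk).
    session
        .query_unpaged("CREATE TYPE IF NOT EXISTS metadata.image (url text)", &[])
        .await?;

    // Table targeted by the bulk-ingestion INSERT; `blob_id blob` is an assumption
    // (keys in the home-grown DB are already hashes).
    session
        .query_unpaged(
            "CREATE TABLE IF NOT EXISTS metadata.metadata \
             (blob_id blob PRIMARY KEY, image frozen<image>)",
            &[],
        )
        .await?;

    Ok(session)
}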
#[derive(Debug, FromUserType, SerializeValue)]
pub struct ImageUDT {
    /// URL of the image file
    pub url: String,
}
Snapshot Bulk Ingestion (Code)
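For the read side of the ingestion described on the client-side slide, here is a minimal sketch of one worker walking a RocksDB key range and writing rows through the driver. This is not the talk's full sample code: the path, key-range handling, value decoding, and per-row (unbatched) writes are simplifying assumptions, and it assumes a recent rust-rocksdb whose iterator yields Result items.

use rocksdb::{Direction, IteratorMode, Options, DB};
use scylla::Session;

/// Hypothetical decoder: the home-grown DB's value encoding is not described in the talk.
fn decode_image(raw: &[u8]) -> anyhow::Result<ImageUDT> {
    Ok(ImageUDT { url: String::from_utf8(raw.to_vec())? })
}

/// Ingest one key range of one RocksDB database into metadata.metadata.
fn ingest_range(
    runtime: &tokio::runtime::Runtime,
    session: &Session,
    db_path: &str,
    start_key: &[u8],
    end_key: &[u8],
) -> anyhow::Result<()> {
    // Open the shard's RocksDB read-only so the source system is untouched.
    let db = DB::open_for_read_only(&Options::default(), db_path, false)?;

    let insert_stmt = runtime.block_on(
        session.prepare("INSERT INTO metadata.metadata (blob_id, image) VALUES (?, ?)"),
    )?;

    // Seek to the start of this worker's range and scan forward until the end key.
    for item in db.iterator(IteratorMode::From(start_key, Direction::Forward)) {
        let (key, value) = item?;
        if key.as_ref() >= end_key {
            break; // the rest of the keyspace belongs to another worker
        }
        let image = decode_image(&value)?;
        // The real pipeline aggregates rows into batches (next slide); one row at a time here.
        runtime.block_on(session.execute_unpaged(&insert_stmt, (key.to_vec(), image)))?;
    }
    Ok(())
}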
let prepared_stmt = session
    .prepare("INSERT INTO metadata.metadata (blob_id, image) VALUES (?, ?)")
    .await?;

let mut prepared_batch = Batch::default();
for _ in 0..N_BATCH {
    prepared_batch.append_statement(prepared_stmt.clone());
}

// ... aggregate batch content into values
if let Err(e) = session.batch(&prepared_batch, &values).await {
    warn!("Error: {}. Sending one-by-one", e);
    for (blob_id, col_data) in values {
        session
            .execute_unpaged(&prepared_stmt, (blob_id, col_data))
            .await
            .unwrap();
    }
}
Full sample code
Throughput: 700k/s
CPU-bound
Snapshot Bulk Ingestion (Monitoring)
/// Generates N evenly distributed token ranges covering ScyllaDB's entire token range.
/// Returns a vector of tuples where each tuple is (start_token, end_token).
pub fn generate_token_ranges(n_ranges: u32) -> Vec<(i64, i64)> {
    // ScyllaDB uses a signed 64-bit integer range for tokens
    const MIN_TOKEN: i64 = i64::MIN;
    const MAX_TOKEN: i64 = i64::MAX;

    // Calculate the size of each range.
    // Using i128 for intermediate calculations to avoid overflow.
    let total_range: i128 = (MAX_TOKEN as i128) - (MIN_TOKEN as i128);
    let range_size: i128 = total_range / (n_ranges as i128);

    // Generate the ranges
    let mut ranges = Vec::with_capacity(n_ranges as usize);
    for i in 0..n_ranges {
        let start = (MIN_TOKEN as i128 + (i as i128 * range_size)) as i64;
        let end = if i == n_ranges - 1 {
            MAX_TOKEN
        } else {
            (MIN_TOKEN as i128 + ((i as i128 + 1) * range_size)) as i64
        };
        ranges.push((start, end));
    }
    ranges
}
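Not from the talk, but a quick sanity check that the generated ranges are contiguous and cover the full token space before launching tens of thousands of count queries:

let ranges = generate_token_ranges(65_536);
assert_eq!(ranges.first().unwrap().0, i64::MIN);
assert_eq!(ranges.last().unwrap().1, i64::MAX);
for pair in ranges.windows(2) {
    // Each range ends exactly where the next one begins, so (start, end] ranges never overlap.
    assert_eq!(pair[0].1, pair[1].0);
}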
Total Count Validation
let prepared_stmt = session
    .prepare(format!(
        "SELECT count(*) FROM {}.{} WHERE token(blob_id) > ? AND token(blob_id) <= ? BYPASS CACHE",
        keyspace_name, table_name
    ))
    .await?;

let token_ranges = generate_token_ranges(65536);
let total = token_ranges
    .into_par_iter()
    .map(|(start, end)| {
        runtime.block_on(async {
            let res = session
                .execute_unpaged(&prepared_stmt, (start, end))
                .await
                .unwrap();
            let row = res.single_row_typed::<(i64,)>().unwrap();
            row.0 as usize
        })
    })
    .reduce(|| 0_usize, |acc, c| acc + c);
●Run a range scan for the total count as validation (4–5 hrs)
●Save the SSTables directory to S3
●Benchmark single-node read performance against this copy
●Run nodetool refresh on the full cluster to import (takes days)
Bring-Alive
●Configure the application to write to both old and new databases
●Backfill the difference between the initial snapshot and the start of dual writes
●Test read integrity
●Disable read/write path to old DB in application
●Decommission old DB
Reaching Data Parity
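A minimal sketch of what the dual-write step could look like in application code. The legacy client interface below is entirely hypothetical (the home-grown DB's API is not shown in the talk), and retry and backfill bookkeeping are omitted.

use scylla::Session;
use scylla::prepared_statement::PreparedStatement;

/// Stand-in for the home-grown database's client; the real interface is not public.
trait LegacyKvClient {
    fn put(&self, key: &[u8], value: &[u8]) -> anyhow::Result<()>;
}

/// While both stores coexist, every application write goes to the old DB and to ScyllaDB.
/// Once reads are verified against ScyllaDB, the legacy write path can be removed.
async fn dual_write(
    legacy: &impl LegacyKvClient,
    session: &Session,
    insert_stmt: &PreparedStatement,
    blob_id: Vec<u8>,
    image: ImageUDT,
    raw_value: &[u8],
) -> anyhow::Result<()> {
    // Old path stays authoritative until parity is reached.
    legacy.put(&blob_id, raw_value)?;
    // New path receives the same write through the prepared INSERT from the ingestion slides.
    session
        .execute_unpaged(insert_stmt, (blob_id, image))
        .await?;
    Ok(())
}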
Thank you! Stay in Touch
Terence Liu [email protected]
@terencezliu
@terencezl
https://www.linkedin.com/in/terence-z-liu