Migrating 50TB Data From a Home-Grown Database to ScyllaDB, Fast by Terence Liu

ScyllaDB | Mar 11, 2025

About This Presentation

Terence shares how Clearview AI's infra needs evolved and why they chose ScyllaDB after first-principles research. From fast ingestion to production queries, the talk explores their journey with Rust, embedded DB readers, and the ScyllaDB Rust driver, plus config tips for bulk ingestion and achi...


Slide Content

A ScyllaDB Community
Migrating 50TB Data From a Home-Grown Database to ScyllaDB, Fast
Terence Liu
Head of ML

Terence Liu
■Computer Vision, vector search, data pipelines /
databases @ Clearview AI since 2021
■Previously, Platform (CIS) @ Bloomberg LP
■Clearview AI: high vectors (60B+) to engineers ratio :)

■In-house database & its utility
■The search for a fuller-featured NoSQL database
■Snapshot bulk ingestion (with RocksDB iterator & Scylla Rust driver)
■Transferring to PROD to bring it alive
■Reaching data parity (dual write + diff backfill)
Presentation Agenda

In-house Database & its Utility
●Based on RocksDB
●Distributed KV store
●Fills up a shard and moves on
to a new one
●Keys are already hashes
●Not resizable
●Metadata for point lookup
●Has suited our needs until now

The Search for a Fuller-Featured NoSQL Database
●TiKV
○Almost a drop-in replacement
○Key hashing, Raft, auto-shard placement
○However, we would like a little more data modeling & indexability
●Cassandra
○Leaderless with tunable consistency
○Wide-column store with data modeling & indexability
●ScyllaDB
○Drop-in replacement for Cassandra
○C++ & Shard-per-core for performance

We started by ingesting the most recent snapshot of data from all shards
of the home-grown database.
■Deletions are very rare, so we could afford snapshot ingestion
■Allows us to test writes safely
■Repeatable if needed
■Yields an equivalent data collection in the new DB
■Can test read performance at the target scale

Snapshot Bulk Ingestion

■Each shard contains multiple RocksDB databases in home-grown DB.
■Each RocksDB database is modeled as a UDT in Scylla.
■Use the RocksDB Rust binding to iterate over key ranges in parallel (see the sketch below).
■Use the Scylla Rust driver to write to Scylla.
■Monitor the client host's CPU, disk, and network IO to make sure the client is not the bottleneck.
Snapshot Bulk Ingestion (Client-Side)
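As a rough sketch of that client-side read path (assuming the rust-rocksdb crate as the binding; the shard layout, column families, and the channel feeding the Scylla writers are left out), one range scan could look like the following, with one such scan run per key range in parallel:

use rocksdb::{DB, Direction, IteratorMode, Options};

/// Scan one key range of a read-only RocksDB instance and hand each
/// key/value pair to a sink (e.g. a channel feeding the Scylla writers).
/// Since keys are already hashes, ranges can be split evenly over the key space.
fn scan_range(
    path: &str,
    start: &[u8],
    end: &[u8],
    mut sink: impl FnMut(Box<[u8]>, Box<[u8]>),
) -> Result<(), rocksdb::Error> {
    let db = DB::open_for_read_only(&Options::default(), path, false)?;
    // Newer rust-rocksdb versions yield Result items from the iterator.
    for item in db.iterator(IteratorMode::From(start, Direction::Forward)) {
        let (key, value) = item?;
        if key.as_ref() >= end {
            break; // past this range; the worker owning the next range picks it up
        }
        sink(key, value);
    }
    Ok(())
}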

■i3en.24xlarge - 96 vCPUs, 768 GB mem, 60TB NVMe disks in RAID0
■Recommended by Scylla docs.
■Monitor server’s CPUs, disk/networking IO.
■Scylla Monitoring Stack quite helpful.
■Adjust params (only for bulk ingestion!); see the example after this list:
■Increase thresholds & timeouts: batch_size_fail_threshold_in_kb,
batch_size_warn_threshold_in_kb, write_request_timeout_in_ms,
read_request_timeout_in_ms
■hinted_handoff_enabled: false
Snapshot Bulk Ingestion (Server-Side)
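For reference, these knobs live in scylla.yaml. The values below are only illustrative assumptions for a temporary bulk-ingestion window (the talk does not give exact numbers); the defaults should be restored afterwards.

# scylla.yaml overrides for the bulk-ingestion window only (illustrative values)
batch_size_warn_threshold_in_kb: 1024   # raised from the default to silence large-batch warnings
batch_size_fail_threshold_in_kb: 10240  # allow the oversized multi-row batches used during ingestion
write_request_timeout_in_ms: 30000      # raised from the default
read_request_timeout_in_ms: 30000       # raised from the default
hinted_handoff_enabled: false           # per the slide: skip hints while bulk writing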

Snapshot Bulk Ingestion (Data Modeling)
CREATE KEYSPACE metadata WITH replication = {'class': 'NetworkTopologyStrategy',
'replication_factor': 1} AND tablets = {'initial': 4096};
USE metadata;

CREATE TYPE image (
    url text,
    -- ... other fields
);

CREATE TABLE metadata (
    blob_id ascii PRIMARY KEY,
    image image,
    -- ... other columns
) WITH compression = {'sstable_compression': 'org.apache.cassandra.io.compress.ZstdCompressor'};

// Imports assumed for the scylla Rust driver (module paths may vary by driver version)
use anyhow::Result;
use scylla::macros::{FromUserType, SerializeValue};
use scylla::statement::Consistency;
use scylla::transport::ExecutionProfile;
use scylla::{Session, SessionBuilder};

const N_BATCH: usize = 200;

pub async fn create_session(uri: &str) -> Result<Session> {
    let profile = ExecutionProfile::builder()
        .consistency(Consistency::LocalOne)
        .request_timeout(None) // no request timeout
        .build();

    SessionBuilder::new()
        .known_node(uri)
        .default_execution_profile_handle(profile.into_handle())
        .build()
        .await
        .map_err(From::from)
}

#[derive(Debug, FromUserType, SerializeValue)]
pub struct ImageUDT {
    /// URL of the image file
    pub url: String,
}
Snapshot Bulk Ingestion (Code)
let prepared_stmt = session
    .prepare("INSERT INTO metadata.metadata (blob_id, image) VALUES (?, ?)")
    .await?;

let mut prepared_batch = Batch::default();
for _ in 0..N_BATCH {
    prepared_batch.append_statement(prepared_stmt.clone());
}
// ... aggregate batch content into values
if let Err(e) = session.batch(&prepared_batch, &values).await {
    warn!("Error: {}. Sending one-by-one", e);
    for (blob_id, col_data) in values {
        session
            .execute_unpaged(&prepared_stmt, (blob_id, col_data))
            .await
            .unwrap();
    }
}
Full sample code
Throughput: 700k/s
CPU-bound

Snapshot Bulk Ingestion (Monitoring)

/// Generates N evenly distributed token ranges covering ScyllaDB's entire token range
/// Returns a vector of tuples where each tuple is (start_token, end_token)
pub fn generate_token_ranges(n_ranges: u32) -> Vec<(i64, i64)> {
    // ScyllaDB uses a signed 64-bit integer range for tokens
    const MIN_TOKEN: i64 = i64::MIN;
    const MAX_TOKEN: i64 = i64::MAX;

    // Calculate the size of each range
    // Using i128 for intermediate calculations to avoid overflow
    let total_range: i128 = (MAX_TOKEN as i128) - (MIN_TOKEN as i128);
    let range_size: i128 = total_range / (n_ranges as i128);

    // Generate the ranges
    let mut ranges = Vec::with_capacity(n_ranges as usize);
    for i in 0..n_ranges {
        let start = (MIN_TOKEN as i128 + (i as i128 * range_size)) as i64;
        let end = if i == n_ranges - 1 {
            MAX_TOKEN
        } else {
            (MIN_TOKEN as i128 + ((i as i128 + 1) * range_size)) as i64
        };
        ranges.push((start, end));
    }
    ranges
}
Total Count Validation

Total Count Validation
let prepared_stmt = session
    .prepare(format!(
        "SELECT count(*) FROM {}.{} WHERE token(blob_id) > ? AND token(blob_id) <= ? BYPASS CACHE",
        keyspace_name, table_name
    ))
    .await?;
let token_ranges = generate_token_ranges(65536);

let total = token_ranges
    .into_par_iter()
    .map(|(start, end)| {
        runtime.block_on(async {
            let res = session
                .execute_unpaged(&prepared_stmt, (start, end))
                .await
                .unwrap();
            let row = res.single_row_typed::<(i64,)>().unwrap();
            row.0 as usize
        })
    })
    .reduce(|| 0_usize, |acc, c| acc + c);

●Run a range scan for total count as validation (4-5 hrs)
●Save the SSTables directory to S3
●Benchmark single-node read performance against this copy (sketch below)
●nodetool refresh on a full cluster to import (days)
Bring-Alive
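A minimal sketch of how that single-node read benchmark against the restored copy could look with the same driver session; bench_point_reads and the way blob_ids are sampled are assumptions, not the talk's actual harness:

use scylla::Session;
use std::time::Instant;

/// Time point lookups against the restored single-node copy.
/// `sample_ids` stands in for however blob_ids are sampled.
pub async fn bench_point_reads(session: &Session, sample_ids: &[String]) -> anyhow::Result<()> {
    let stmt = session
        .prepare("SELECT image FROM metadata.metadata WHERE blob_id = ?")
        .await?;
    let start = Instant::now();
    for id in sample_ids {
        session.execute_unpaged(&stmt, (id.as_str(),)).await?;
    }
    let elapsed = start.elapsed();
    println!(
        "{} reads in {:?} ({:.0} reads/s)",
        sample_ids.len(),
        elapsed,
        sample_ids.len() as f64 / elapsed.as_secs_f64()
    );
    Ok(())
}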

●Configure the application to write to both old and new databases (see the dual-write sketch below)
●Backfill the difference between the initial snapshot and the start of dual-write
●Test read integrity
●Disable read/write path to old DB in application
●Decommission old DB
Reaching Data Parity
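A highly simplified dual-write illustration; OldDb is a hypothetical stand-in for the home-grown store's client, and the real write path, error handling, and backfill job are out of scope:

use scylla::prepared_statement::PreparedStatement; // path may differ across driver versions
use scylla::Session;

/// Hypothetical handle to the home-grown RocksDB-based store.
pub trait OldDb {
    fn put(&self, key: &[u8], value: &[u8]) -> anyhow::Result<()>;
}

/// During the transition window every application write goes to both stores;
/// once parity is verified, the old-DB branch is removed.
pub async fn dual_write(
    old_db: &impl OldDb,
    session: &Session,
    insert: &PreparedStatement, // "INSERT INTO metadata.metadata (blob_id, image) VALUES (?, ?)"
    blob_id: &str,
    raw_value: &[u8],  // value as stored in the old KV store
    image: &ImageUDT,  // same record decoded into the Scylla UDT
) -> anyhow::Result<()> {
    // Old path: key/value put keyed by the (already hashed) blob_id.
    old_db.put(blob_id.as_bytes(), raw_value)?;
    // New path: prepared INSERT into the Scylla metadata table.
    session.execute_unpaged(insert, (blob_id, image)).await?;
    Ok(())
}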

Thank you! Stay in Touch
Terence Liu
[email protected]
@terencezliu
@terencezl
https://www.linkedin.com/in/terence-z-liu