From c00f12f39da4d75f6cea601327d22ec77d740250 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Fri, 8 May 2026 17:05:35 -0400 Subject: [PATCH] use many connections on the benchmarks server Signed-off-by: Connor Tsui --- Cargo.lock | 1 + benchmarks-website/server/Cargo.toml | 1 + benchmarks-website/server/src/app.rs | 2 +- benchmarks-website/server/src/db.rs | 40 ++++++++++++++++++------- benchmarks-website/server/src/ingest.rs | 34 +++++++++++++++++++++ benchmarks-website/server/src/lib.rs | 6 ++-- 6 files changed, 69 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf8c74d7058..9520911ca5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10438,6 +10438,7 @@ dependencies = [ "flate2", "insta", "maud", + "parking_lot", "reqwest 0.13.3", "serde", "serde_json", diff --git a/benchmarks-website/server/Cargo.toml b/benchmarks-website/server/Cargo.toml index 420ee082ff9..abc94925fb5 100644 --- a/benchmarks-website/server/Cargo.toml +++ b/benchmarks-website/server/Cargo.toml @@ -29,6 +29,7 @@ base64 = "0.22" # track vortex-duckdb's bundled engine version (build.rs) duckdb = { version = "1.10502", features = ["bundled"] } maud = { version = "0.27", features = ["axum"] } +parking_lot = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } subtle = "2.6" diff --git a/benchmarks-website/server/src/app.rs b/benchmarks-website/server/src/app.rs index d013bfe9ad7..81f742f6d43 100644 --- a/benchmarks-website/server/src/app.rs +++ b/benchmarks-website/server/src/app.rs @@ -35,7 +35,7 @@ use crate::ingest; /// or a small `String`). #[derive(Clone)] pub struct AppState { - /// Mutex-guarded DuckDB connection. See [`crate::db`]. + /// Shared DuckDB handle. See [`crate::db`]. pub db: DbHandle, /// Bearer token expected on `/api/ingest`. Compared via constant-time eq. 
pub bearer_token: Arc, diff --git a/benchmarks-website/server/src/db.rs b/benchmarks-website/server/src/db.rs index ce503a701ae..3ed7952f1e5 100644 --- a/benchmarks-website/server/src/db.rs +++ b/benchmarks-website/server/src/db.rs @@ -3,9 +3,10 @@ //! DuckDB connection management plus the deterministic `measurement_id` hash. //! -//! The server holds a single [`duckdb::Connection`] inside an async -//! [`tokio::sync::Mutex`]. All DB work runs inside `spawn_blocking` so the -//! Tokio runtime is never blocked on synchronous DuckDB calls. +//! The server keeps one root [`duckdb::Connection`] and clones a fresh +//! connection from it for each blocking DB task. All DB work runs inside +//! `spawn_blocking` so the Tokio runtime is never blocked on synchronous +//! DuckDB calls. //! //! `measurement_id` is a server-internal xxhash64 over `commit_sha` plus //! each table's dimensional tuple. Including `commit_sha` makes every @@ -21,7 +22,7 @@ use std::sync::Arc; use anyhow::Context as _; use anyhow::Result; use duckdb::Connection; -use tokio::sync::Mutex; +use parking_lot::Mutex; use twox_hash::XxHash64; use crate::records::CompressionSize; @@ -31,8 +32,25 @@ use crate::records::RandomAccessTime; use crate::records::VectorSearchRun; use crate::schema::SCHEMA_DDL; -/// A connection guard the rest of the crate hands around. -pub type DbHandle = Arc>; +/// Shared DuckDB handle. Cloning the handle is cheap; each DB task clones a +/// task-local [`Connection`] before doing work. +#[derive(Clone)] +pub struct DbHandle { + root: Arc>, +} + +impl DbHandle { + fn new(root: Connection) -> Self { + Self { + root: Arc::new(Mutex::new(root)), + } + } + + fn connection(&self) -> Result { + let root = self.root.lock(); + root.try_clone().context("cloning DuckDB connection") + } +} /// Open the DuckDB file at `path` (creating it if absent) and apply the /// schema DDL. Returns a handle ready to be cloned into the Axum state. 
@@ -41,11 +59,11 @@ pub fn open>(path: P) -> Result { .with_context(|| format!("opening DuckDB at {}", path.as_ref().display()))?; conn.execute_batch(SCHEMA_DDL) .context("applying schema DDL")?; - Ok(Arc::new(Mutex::new(conn))) + Ok(DbHandle::new(conn)) } -/// Run a synchronous DB operation on the blocking pool, holding the connection -/// mutex for the duration of the call. +/// Run a synchronous DB operation on the blocking pool using a task-local +/// DuckDB connection cloned from the shared database handle. pub async fn run_blocking(handle: &DbHandle, f: F) -> Result where F: FnOnce(&mut Connection) -> Result + Send + 'static, @@ -53,8 +71,8 @@ where { let handle = handle.clone(); tokio::task::spawn_blocking(move || { - let mut guard = handle.blocking_lock(); - f(&mut guard) + let mut conn = handle.connection()?; + f(&mut conn) }) .await .context("DB task panicked")? diff --git a/benchmarks-website/server/src/ingest.rs b/benchmarks-website/server/src/ingest.rs index b97401bea8a..b5bd6d8bc4d 100644 --- a/benchmarks-website/server/src/ingest.rs +++ b/benchmarks-website/server/src/ingest.rs @@ -57,6 +57,9 @@ use crate::records::Record; use crate::records::VectorSearchRun; use crate::schema::SCHEMA_VERSION; +// Unless we start merging 128 PRs every second, we are not hitting this max. +const WRITE_CONFLICT_ATTEMPTS: usize = 128; +/// Successful ingest response body. 
#[derive(Debug, Serialize)] pub struct IngestResponse { @@ -102,7 +105,38 @@ fn validate_envelope(env: &Envelope) -> Result<(), IngestError> { Ok(()) } +fn retry_write_conflicts(mut op: F) -> Result +where + F: FnMut() -> Result, +{ + for attempt in 1..=WRITE_CONFLICT_ATTEMPTS { + match op() { + Ok(value) => return Ok(value), + Err(err) if attempt < WRITE_CONFLICT_ATTEMPTS && is_retryable_write_conflict(&err) => { + std::thread::yield_now(); + } + Err(err) => return Err(err), + } + } + unreachable!("loop either returns a value or the final error") +} + +fn is_retryable_write_conflict(err: &anyhow::Error) -> bool { + err.chain().any(|cause| { + let message = cause.to_string().to_ascii_lowercase(); + message.contains("conflict") + && (message.contains("transaction") + || message.contains("write") + || message.contains("tuple") + || message.contains("update")) + }) +} + fn apply_envelope(conn: &mut Connection, env: Envelope) -> Result { + retry_write_conflicts(|| apply_envelope_once(conn, &env)) +} + +fn apply_envelope_once(conn: &mut Connection, env: &Envelope) -> Result { let tx = conn.transaction().context("begin transaction")?; upsert_commit(&tx, &env.commit).context("upsert commit")?; diff --git a/benchmarks-website/server/src/lib.rs b/benchmarks-website/server/src/lib.rs index 3174aa50696..7cda68338f1 100644 --- a/benchmarks-website/server/src/lib.rs +++ b/benchmarks-website/server/src/lib.rs @@ -35,7 +35,7 @@ //! |---------------|---------------------------------------------------------------------------------------------| //! | [`app`] | [`app::AppState`] (DB handle + bearer + path) and the Axum router composition. | //! | [`auth`] | Bearer-token middleware for `/api/ingest`. | -//! | [`db`] | [`db::DbHandle`] connection wrapper + the per-fact-table `measurement_id_*` hash functions. | +//! | [`db`] | [`db::DbHandle`] task-local connection cloning + the per-fact-table `measurement_id_*` hash functions. | //! 
| [`schema`] | DuckDB DDL ([`schema::SCHEMA_DDL`]) and the wire schema version. | //! | [`records`] | Wire shapes for `POST /api/ingest`. | //! | [`ingest`] | `POST /api/ingest` handler — envelope validation, transaction, upsert dispatch. | @@ -51,8 +51,8 @@ //! routes skip auth. //! 3. The handler parses body / path / query into typed inputs (e.g. //! [`slug::ChartKey::from_slug`]). -//! 4. The handler hands a closure to [`db::run_blocking`], which acquires -//! the connection mutex and runs the synchronous DuckDB call on +//! 4. The handler hands a closure to [`db::run_blocking`], which clones a +//! task-local DuckDB connection and runs the synchronous call on //! `tokio::task::spawn_blocking` so the runtime stays free. //! 5. The closure returns `Result`. Errors are mapped //! into [`error::IngestError`] / [`error::ApiError`] with the right