Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmarks-website/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ base64 = "0.22"
# track vortex-duckdb's bundled engine version (build.rs)
duckdb = { version = "1.10502", features = ["bundled"] }
maud = { version = "0.27", features = ["axum"] }
parking_lot = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
subtle = "2.6"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks-website/server/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::ingest;
/// or a small `String`).
#[derive(Clone)]
pub struct AppState {
/// Mutex-guarded DuckDB connection. See [`crate::db`].
/// Shared DuckDB handle. See [`crate::db`].
pub db: DbHandle,
/// Bearer token expected on `/api/ingest`. Compared via constant-time eq.
pub bearer_token: Arc<String>,
Expand Down
40 changes: 29 additions & 11 deletions benchmarks-website/server/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

//! DuckDB connection management plus the deterministic `measurement_id` hash.
//!
//! The server holds a single [`duckdb::Connection`] inside an async
//! [`tokio::sync::Mutex`]. All DB work runs inside `spawn_blocking` so the
//! Tokio runtime is never blocked on synchronous DuckDB calls.
//! The server keeps one root [`duckdb::Connection`] and clones a fresh
//! connection from it for each blocking DB task. All DB work runs inside
//! `spawn_blocking` so the Tokio runtime is never blocked on synchronous
//! DuckDB calls.
//!
//! `measurement_id` is a server-internal xxhash64 over `commit_sha` plus
//! each table's dimensional tuple. Including `commit_sha` makes every
Expand All @@ -21,7 +22,7 @@ use std::sync::Arc;
use anyhow::Context as _;
use anyhow::Result;
use duckdb::Connection;
use tokio::sync::Mutex;
use parking_lot::Mutex;
use twox_hash::XxHash64;

use crate::records::CompressionSize;
Expand All @@ -31,8 +32,25 @@ use crate::records::RandomAccessTime;
use crate::records::VectorSearchRun;
use crate::schema::SCHEMA_DDL;

/// A connection guard the rest of the crate hands around.
pub type DbHandle = Arc<Mutex<Connection>>;
/// Shared DuckDB handle. Cloning the handle is cheap (it only bumps an
/// `Arc` refcount); each DB task clones its own task-local [`Connection`]
/// from the root before doing any work.
#[derive(Clone)]
pub struct DbHandle {
    // Root connection. The mutex is held only for the brief `try_clone`
    // call that hands out a per-task connection, never for query work.
    root: Arc<Mutex<Connection>>,
}

impl DbHandle {
    /// Wrap a freshly opened root connection in a shared, clonable handle.
    fn new(root: Connection) -> Self {
        let root = Arc::new(Mutex::new(root));
        Self { root }
    }

    /// Clone a task-local connection off the root one.
    ///
    /// The mutex guards only this short `try_clone` call; the returned
    /// connection is used outside the lock, so contention is limited to
    /// the cloning itself.
    fn connection(&self) -> Result<Connection> {
        self.root
            .lock()
            .try_clone()
            .context("cloning DuckDB connection")
    }
}

/// Open the DuckDB file at `path` (creating it if absent) and apply the
/// schema DDL. Returns a handle ready to be cloned into the Axum state.
Expand All @@ -41,20 +59,20 @@ pub fn open<P: AsRef<Path>>(path: P) -> Result<DbHandle> {
.with_context(|| format!("opening DuckDB at {}", path.as_ref().display()))?;
conn.execute_batch(SCHEMA_DDL)
.context("applying schema DDL")?;
Ok(Arc::new(Mutex::new(conn)))
Ok(DbHandle::new(conn))
}

/// Run a synchronous DB operation on the blocking pool, holding the connection
/// mutex for the duration of the call.
/// Run a synchronous DB operation on the blocking pool using a task-local
/// DuckDB connection cloned from the shared database handle.
pub async fn run_blocking<F, T>(handle: &DbHandle, f: F) -> Result<T>
where
F: FnOnce(&mut Connection) -> Result<T> + Send + 'static,
T: Send + 'static,
{
let handle = handle.clone();
tokio::task::spawn_blocking(move || {
let mut guard = handle.blocking_lock();
f(&mut guard)
let mut conn = handle.connection()?;
f(&mut conn)
})
.await
.context("DB task panicked")?
Expand Down
34 changes: 34 additions & 0 deletions benchmarks-website/server/src/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ use crate::records::Record;
use crate::records::VectorSearchRun;
use crate::schema::SCHEMA_VERSION;

// Unless we start merging 128 PRs every second, we are never hitting this max.
const WRITE_CONFLICT_ATTEMPTS: usize = 128;

/// Successful ingest response body.
#[derive(Debug, Serialize)]
pub struct IngestResponse {
Expand Down Expand Up @@ -102,7 +105,38 @@ fn validate_envelope(env: &Envelope) -> Result<(), IngestError> {
Ok(())
}

/// Run `op`, retrying up to [`WRITE_CONFLICT_ATTEMPTS`] times total while it
/// fails with a retryable DuckDB write conflict. The final attempt's error
/// (retryable or not) is returned as-is.
fn retry_write_conflicts<F, T>(mut op: F) -> Result<T>
where
    F: FnMut() -> Result<T>,
{
    // First attempt happens unconditionally; the loop grants the remaining
    // WRITE_CONFLICT_ATTEMPTS - 1 retries.
    let mut outcome = op();
    for _ in 1..WRITE_CONFLICT_ATTEMPTS {
        match outcome {
            Err(ref err) if is_retryable_write_conflict(err) => {
                // Give competing writers a chance to finish before retrying.
                std::thread::yield_now();
                outcome = op();
            }
            // Success, or a non-retryable error: stop immediately.
            _ => break,
        }
    }
    outcome
}

/// Heuristically classify an error as a DuckDB write-write conflict worth
/// retrying: some cause in the chain must mention "conflict" together with
/// one of the transaction/write-related companion terms.
fn is_retryable_write_conflict(err: &anyhow::Error) -> bool {
    const COMPANION_TERMS: [&str; 4] = ["transaction", "write", "tuple", "update"];
    err.chain().any(|cause| {
        let text = cause.to_string().to_ascii_lowercase();
        text.contains("conflict") && COMPANION_TERMS.iter().any(|term| text.contains(term))
    })
}

/// Apply one ingest envelope, transparently retrying transient DuckDB
/// write conflicts via [`retry_write_conflicts`].
fn apply_envelope(conn: &mut Connection, env: Envelope) -> Result<IngestResponse> {
    let mut run_once = || apply_envelope_once(conn, &env);
    retry_write_conflicts(&mut run_once)
}

fn apply_envelope_once(conn: &mut Connection, env: &Envelope) -> Result<IngestResponse> {
let tx = conn.transaction().context("begin transaction")?;

upsert_commit(&tx, &env.commit).context("upsert commit")?;
Expand Down
6 changes: 3 additions & 3 deletions benchmarks-website/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//! |---------------|---------------------------------------------------------------------------------------------|
//! | [`app`] | [`app::AppState`] (DB handle + bearer + path) and the Axum router composition. |
//! | [`auth`] | Bearer-token middleware for `/api/ingest`. |
//! | [`db`] | [`db::DbHandle`] connection wrapper + the per-fact-table `measurement_id_*` hash functions. |
//! | [`db`] | [`db::DbHandle`] task-local connection cloning + the per-fact-table `measurement_id_*` hash functions. |
//! | [`schema`] | DuckDB DDL ([`schema::SCHEMA_DDL`]) and the wire schema version. |
//! | [`records`] | Wire shapes for `POST /api/ingest`. |
//! | [`ingest`] | `POST /api/ingest` handler — envelope validation, transaction, upsert dispatch. |
Expand All @@ -51,8 +51,8 @@
//! routes skip auth.
//! 3. The handler parses body / path / query into typed inputs (e.g.
//! [`slug::ChartKey::from_slug`]).
//! 4. The handler hands a closure to [`db::run_blocking`], which acquires
//! the connection mutex and runs the synchronous DuckDB call on
//! 4. The handler hands a closure to [`db::run_blocking`], which clones a
//! task-local DuckDB connection and runs the synchronous call on
//! `tokio::task::spawn_blocking` so the runtime stays free.
//! 5. The closure returns `Result<T, anyhow::Error>`. Errors are mapped
//! into [`error::IngestError`] / [`error::ApiError`] with the right
Expand Down
Loading