From a0f54bb385d7111dccfc1b4acd3b185bf0c782f0 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 12 Apr 2026 20:16:23 -0700 Subject: [PATCH] fix(sqlite-native): restore kv error hook --- Cargo.toml | 3 + .../packages/rivetkit-native/index.d.ts | 1 + .../packages/rivetkit-native/src/database.rs | 16 +++ .../packages/rivetkit-native/wrapper.js | 38 ++++- .../packages/sqlite-native/src/sqlite_kv.rs | 4 + .../packages/sqlite-native/src/vfs.rs | 135 ++++++++++++++++-- 6 files changed, 177 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 76bda6f1b4..7dc249c207 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -515,6 +515,9 @@ members = [ [workspace.dependencies.rivet-envoy-protocol] path = "engine/sdks/rust/envoy-protocol" + [workspace.dependencies.rivetkit-sqlite-native] + path = "rivetkit-typescript/packages/sqlite-native" + [workspace.dependencies.epoxy-protocol] path = "engine/sdks/rust/epoxy-protocol" diff --git a/rivetkit-typescript/packages/rivetkit-native/index.d.ts b/rivetkit-typescript/packages/rivetkit-native/index.d.ts index 8dd3ad8de6..7b9659b72e 100644 --- a/rivetkit-typescript/packages/rivetkit-native/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-native/index.d.ts @@ -60,6 +60,7 @@ export declare function startEnvoySyncJs(config: JsEnvoyConfig, eventCallback: ( export declare function startEnvoyJs(config: JsEnvoyConfig, eventCallback: (event: any) => void): JsEnvoyHandle /** Native SQLite database handle exposed to JavaScript. */ export declare class JsNativeDatabase { + takeLastKvError(): string | null run(sql: string, params?: Array | undefined | null): Promise query(sql: string, params?: Array | undefined | null): Promise exec(sql: string): Promise diff --git a/rivetkit-typescript/packages/rivetkit-native/src/database.rs b/rivetkit-typescript/packages/rivetkit-native/src/database.rs index be98463611..eea6cb0794 100644 --- a/rivetkit-typescript/packages/rivetkit-native/src/database.rs +++ b/rivetkit-typescript/packages/rivetkit-native/src/database.rs @@ -34,6 +34,10 @@ impl EnvoyKv { #[async_trait] impl SqliteKv for EnvoyKv { + fn on_error(&self, actor_id: &str, error: &SqliteKvError) { + tracing::error!(%actor_id, %error, "native sqlite kv operation failed"); + } + async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> { Ok(()) } @@ -115,6 +119,13 @@ impl JsNativeDatabase { .and_then(|guard| guard.as_ref().map(NativeDatabase::as_ptr)) .unwrap_or(ptr::null_mut()) } + + fn take_last_kv_error_inner(&self) -> Option { + self.db + .lock() + .ok() + .and_then(|guard| guard.as_ref().and_then(NativeDatabase::take_last_kv_error)) + } } #[napi(object)] @@ -139,6 +150,11 @@ pub struct QueryResult { #[napi] impl JsNativeDatabase { + #[napi] + pub fn take_last_kv_error(&self) -> Option { + self.take_last_kv_error_inner() + } + #[napi] pub async fn run( &self, diff --git a/rivetkit-typescript/packages/rivetkit-native/wrapper.js b/rivetkit-typescript/packages/rivetkit-native/wrapper.js index b65c635ef2..09c4fa6804 100644 --- a/rivetkit-typescript/packages/rivetkit-native/wrapper.js +++ b/rivetkit-typescript/packages/rivetkit-native/wrapper.js @@ -134,7 +134,7 @@ function startEnvoySync(config) { poolName: config.poolName, version: config.version, metadata: config.metadata || null, - notGlobal: config.notGlobal, + notGlobal: config.notGlobal ?? false, }, (event) => { handleEvent(event, config, wrappedHandle); @@ -264,6 +264,20 @@ function mapRows(rows, columns) { }); } +function wrapNativeStorageError(nativeDb, error) { + const lastKvError = + typeof nativeDb.takeLastKvError === "function" + ? nativeDb.takeLastKvError() + : null; + if (!lastKvError) { + throw error; + } + throw new Error( + `Database query failed because the underlying storage is no longer available (${lastKvError}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`, + { cause: error }, + ); +} + async function openRawDatabaseFromEnvoy(handle, actorId) { const nativeDb = await openDatabaseFromEnvoy(handle, actorId); let closed = false; @@ -288,16 +302,28 @@ async function openRawDatabaseFromEnvoy(handle, actorId) { /\bRETURNING\b/i.test(query); if (returnsRows) { - const result = await nativeDb.query(query, bindings); - return mapRows(result.rows, result.columns); + try { + const result = await nativeDb.query(query, bindings); + return mapRows(result.rows, result.columns); + } catch (error) { + wrapNativeStorageError(nativeDb, error); + } } - await nativeDb.run(query, bindings); + try { + await nativeDb.run(query, bindings); + } catch (error) { + wrapNativeStorageError(nativeDb, error); + } return []; } - const result = await nativeDb.exec(query); - return mapRows(result.rows, result.columns); + try { + const result = await nativeDb.exec(query); + return mapRows(result.rows, result.columns); + } catch (error) { + wrapNativeStorageError(nativeDb, error); + } }, close: async () => { if (closed) { diff --git a/rivetkit-typescript/packages/sqlite-native/src/sqlite_kv.rs b/rivetkit-typescript/packages/sqlite-native/src/sqlite_kv.rs index 4d906b0552..6f7d75059c 100644 --- a/rivetkit-typescript/packages/sqlite-native/src/sqlite_kv.rs +++ b/rivetkit-typescript/packages/sqlite-native/src/sqlite_kv.rs @@ -68,6 +68,10 @@ pub struct KvGetResult { /// at a higher level. #[async_trait] pub trait SqliteKv: Send + Sync { + /// Called when a KV operation fails inside a VFS callback before the + /// original error is collapsed into a generic SQLite IO error code. + fn on_error(&self, _actor_id: &str, _error: &SqliteKvError) {} + /// Called when an actor's database is opened. async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> { Ok(()) diff --git a/rivetkit-typescript/packages/sqlite-native/src/vfs.rs b/rivetkit-typescript/packages/sqlite-native/src/vfs.rs index e8bdd55fad..2e7f0adb8a 100644 --- a/rivetkit-typescript/packages/sqlite-native/src/vfs.rs +++ b/rivetkit-typescript/packages/sqlite-native/src/vfs.rs @@ -8,13 +8,13 @@ use std::ffi::{c_char, c_int, c_void, CStr, CString}; use std::ptr; use std::slice; use std::sync::atomic::{AtomicU64, Ordering}; -use std::sync::{Arc, OnceLock}; +use std::sync::{Arc, Mutex, OnceLock}; use libsqlite3_sys::*; use tokio::runtime::Handle; use crate::kv; -use crate::sqlite_kv::{KvGetResult, SqliteKv}; +use crate::sqlite_kv::{KvGetResult, SqliteKv, SqliteKvError}; // MARK: Panic Guard @@ -158,12 +158,62 @@ struct VfsContext { actor_id: String, main_file_name: String, read_cache_enabled: bool, + last_error: Mutex>, rt_handle: Handle, io_methods: Box, vfs_metrics: Arc, } impl VfsContext { + fn clear_last_error(&self) { + match self.last_error.lock() { + Ok(mut last_error) => { + *last_error = None; + } + Err(err) => { + tracing::warn!(%err, "native sqlite last_error mutex poisoned"); + } + } + } + + fn set_last_error(&self, message: String) { + match self.last_error.lock() { + Ok(mut last_error) => { + *last_error = Some(message); + } + Err(err) => { + tracing::warn!(%err, "native sqlite last_error mutex poisoned"); + } + } + } + + fn clone_last_error(&self) -> Option { + match self.last_error.lock() { + Ok(last_error) => last_error.clone(), + Err(err) => { + tracing::warn!(%err, "native sqlite last_error mutex poisoned"); + None + } + } + } + + fn take_last_error(&self) -> Option { + match self.last_error.lock() { + Ok(mut last_error) => last_error.take(), + Err(err) => { + tracing::warn!(%err, "native sqlite last_error mutex poisoned"); + None + } + } + } + + fn report_kv_error(&self, err: SqliteKvError) -> String { + let message = err.to_string(); + self.set_last_error(message.clone()); + self.kv.on_error(&self.actor_id, &err); + message + } + fn resolve_file_tag(&self, path: &str) -> Option { if path == self.main_file_name { return Some(kv::FILE_TAG_MAIN); @@ -187,7 +237,10 @@ impl VfsContext { let result = self .rt_handle .block_on(self.kv.batch_get(&self.actor_id, keys)) - .map_err(|e| e.to_string()); + .map_err(|err| self.report_kv_error(err)); + if result.is_ok() { + self.clear_last_error(); + } let elapsed = start.elapsed(); tracing::debug!( op = %format_args!("get({key_count}keys)"), @@ -203,7 +256,10 @@ impl VfsContext { let result = self .rt_handle .block_on(self.kv.batch_put(&self.actor_id, keys, values)) - .map_err(|e| e.to_string()); + .map_err(|err| self.report_kv_error(err)); + if result.is_ok() { + self.clear_last_error(); + } let elapsed = start.elapsed(); tracing::debug!( op = %format_args!("put({key_count}keys)"), @@ -219,7 +275,10 @@ impl VfsContext { let result = self .rt_handle .block_on(self.kv.batch_delete(&self.actor_id, keys)) - .map_err(|e| e.to_string()); + .map_err(|err| self.report_kv_error(err)); + if result.is_ok() { + self.clear_last_error(); + } let elapsed = start.elapsed(); tracing::debug!( op = %format_args!("del({key_count}keys)"), @@ -234,7 +293,10 @@ impl VfsContext { let result = self .rt_handle .block_on(self.kv.delete_range(&self.actor_id, start, end)) - .map_err(|e| e.to_string()); + .map_err(|err| self.report_kv_error(err)); + if result.is_ok() { + self.clear_last_error(); + } let elapsed = start_time.elapsed(); tracing::debug!( op = "delRange", @@ -574,7 +636,10 @@ unsafe extern "C" fn kv_io_write( let chunk_key = kv::get_chunk_key(file.file_tag, chunk_idx as u32).to_vec(); let cached_chunk = if needs_existing && ctx.read_cache_enabled { let state = get_file_state(file.state); - state.read_cache.get(chunk_key.as_slice()).cloned() + state + .read_cache + .as_ref() + .and_then(|read_cache| read_cache.get(chunk_key.as_slice()).cloned()) } else { None }; @@ -616,7 +681,7 @@ unsafe extern "C" fn kv_io_write( let existing_chunk = plan.cached_chunk.as_deref().or_else(|| { plan.existing_chunk_index .and_then(|idx| existing_chunks.get(idx)) - .and_then(|value| value.as_ref()) + .and_then(|value| value.as_deref()) }); let mut new_chunk = if let Some(existing_chunk) = existing_chunk { @@ -1164,11 +1229,30 @@ unsafe extern "C" fn kv_vfs_current_time(_p_vfs: *mut sqlite3_vfs, p_time_out: * } unsafe extern "C" fn kv_vfs_get_last_error( - _p_vfs: *mut sqlite3_vfs, - _n_byte: c_int, - _z_err_msg: *mut c_char, + p_vfs: *mut sqlite3_vfs, + n_byte: c_int, + z_err_msg: *mut c_char, ) -> c_int { - vfs_catch_unwind!(SQLITE_IOERR, SQLITE_OK) + vfs_catch_unwind!(SQLITE_IOERR, { + if n_byte <= 0 || z_err_msg.is_null() { + return 0; + } + + let ctx = get_vfs_ctx(p_vfs); + let last_error = ctx.clone_last_error(); + let Some(message) = last_error else { + *z_err_msg = 0; + return 0; + }; + + let bytes = message.as_bytes(); + let max_len = (n_byte as usize).saturating_sub(1); + let copy_len = bytes.len().min(max_len); + let dst = z_err_msg.cast::(); + ptr::copy_nonoverlapping(bytes.as_ptr(), dst, copy_len); + *dst.add(copy_len) = 0u8; + 0 + }) } // MARK: KvVfs @@ -1183,6 +1267,10 @@ unsafe impl Send for KvVfs {} unsafe impl Sync for KvVfs {} impl KvVfs { + fn take_last_kv_error(&self) -> Option { + unsafe { (*self.ctx_ptr).take_last_error() } + } + pub fn register( name: &str, kv: Arc, @@ -1210,6 +1298,7 @@ impl KvVfs { actor_id: actor_id.clone(), main_file_name: actor_id, read_cache_enabled: read_cache_enabled(), + last_error: Mutex::new(None), rt_handle, io_methods: Box::new(io_methods), vfs_metrics, @@ -1279,6 +1368,10 @@ impl NativeDatabase { pub fn as_ptr(&self) -> *mut sqlite3 { self.db } + + pub fn take_last_kv_error(&self) -> Option { + self._vfs.take_last_kv_error() + } } impl Drop for NativeDatabase { @@ -1291,6 +1384,18 @@ impl Drop for NativeDatabase { } } +fn sqlite_error_message(db: *mut sqlite3) -> String { + unsafe { + if db.is_null() { + "unknown sqlite error".to_string() + } else { + CStr::from_ptr(sqlite3_errmsg(db)) + .to_string_lossy() + .into_owned() + } + } +} + pub fn open_database(vfs: KvVfs, file_name: &str) -> Result { let c_name = CString::new(file_name).map_err(|err| err.to_string())?; let mut db: *mut sqlite3 = ptr::null_mut(); @@ -1304,12 +1409,13 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result Result