Skip to content

Commit 4e380c8

Browse files
committed
fix(sqlite-native): surface kv errors from native sqlite
1 parent ff117f9 commit 4e380c8

5 files changed

Lines changed: 144 additions & 15 deletions

File tree

rivetkit-typescript/packages/rivetkit-native/index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ export declare function startEnvoySyncJs(config: JsEnvoyConfig, eventCallback: (
5959
export declare function startEnvoyJs(config: JsEnvoyConfig, eventCallback: (event: any) => void): JsEnvoyHandle
6060
/** Native SQLite database handle exposed to JavaScript. */
6161
export declare class JsNativeDatabase {
62+
takeLastKvError(): string | null
6263
run(sql: string, params?: Array<JsBindParam> | undefined | null): Promise<ExecuteResult>
6364
query(sql: string, params?: Array<JsBindParam> | undefined | null): Promise<QueryResult>
6465
exec(sql: string): Promise<QueryResult>

rivetkit-typescript/packages/rivetkit-native/src/database.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ impl EnvoyKv {
3434

3535
#[async_trait]
3636
impl SqliteKv for EnvoyKv {
37+
fn on_error(&self, actor_id: &str, error: &SqliteKvError) {
38+
tracing::error!(%actor_id, %error, "native sqlite kv operation failed");
39+
}
40+
3741
async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> {
3842
Ok(())
3943
}
@@ -116,6 +120,14 @@ impl JsNativeDatabase {
116120
.and_then(|guard| guard.as_ref().map(NativeDatabase::as_ptr))
117121
.unwrap_or(ptr::null_mut())
118122
}
123+
124+
fn take_last_kv_error_inner(&self) -> Option<String> {
125+
self
126+
.db
127+
.lock()
128+
.ok()
129+
.and_then(|guard| guard.as_ref().and_then(NativeDatabase::take_last_kv_error))
130+
}
119131
}
120132

121133
#[napi(object)]
@@ -140,6 +152,11 @@ pub struct QueryResult {
140152

141153
#[napi]
142154
impl JsNativeDatabase {
155+
#[napi]
156+
pub fn take_last_kv_error(&self) -> Option<String> {
157+
self.take_last_kv_error_inner()
158+
}
159+
143160
#[napi]
144161
pub async fn run(
145162
&self,

rivetkit-typescript/packages/rivetkit-native/wrapper.js

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,20 @@ function mapRows(rows, columns) {
264264
});
265265
}
266266

267+
async function wrapNativeStorageError(nativeDb, error) {
268+
const lastKvError =
269+
typeof nativeDb.takeLastKvError === "function"
270+
? await nativeDb.takeLastKvError()
271+
: null;
272+
if (!lastKvError) {
273+
throw error;
274+
}
275+
throw new Error(
276+
`Database query failed because the underlying storage is no longer available (${lastKvError}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`,
277+
{ cause: error instanceof Error ? error : undefined },
278+
);
279+
}
280+
267281
async function openRawDatabaseFromEnvoy(handle, actorId) {
268282
const nativeDb = await openDatabaseFromEnvoy(handle, actorId);
269283
let closed = false;
@@ -288,15 +302,29 @@ async function openRawDatabaseFromEnvoy(handle, actorId) {
288302
/\bRETURNING\b/i.test(query);
289303

290304
if (returnsRows) {
291-
const result = await nativeDb.query(query, bindings);
305+
let result;
306+
try {
307+
result = await nativeDb.query(query, bindings);
308+
} catch (error) {
309+
await wrapNativeStorageError(nativeDb, error);
310+
}
292311
return mapRows(result.rows, result.columns);
293312
}
294313

295-
await nativeDb.run(query, bindings);
314+
try {
315+
await nativeDb.run(query, bindings);
316+
} catch (error) {
317+
await wrapNativeStorageError(nativeDb, error);
318+
}
296319
return [];
297320
}
298321

299-
const result = await nativeDb.exec(query);
322+
let result;
323+
try {
324+
result = await nativeDb.exec(query);
325+
} catch (error) {
326+
await wrapNativeStorageError(nativeDb, error);
327+
}
300328
return mapRows(result.rows, result.columns);
301329
},
302330
close: async () => {

rivetkit-typescript/packages/sqlite-native/src/sqlite_kv.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ pub struct KvGetResult {
6868
/// at a higher level.
6969
#[async_trait]
7070
pub trait SqliteKv: Send + Sync {
71+
/// Called when a KV operation fails inside a VFS callback before the
72+
/// original error is collapsed into a generic SQLite IO error code.
73+
fn on_error(&self, _actor_id: &str, _error: &SqliteKvError) {}
74+
7175
/// Called when an actor's database is opened.
7276
async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> {
7377
Ok(())

rivetkit-typescript/packages/sqlite-native/src/vfs.rs

Lines changed: 91 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ use std::ffi::{c_char, c_int, c_void, CStr, CString};
88
use std::ptr;
99
use std::slice;
1010
use std::sync::atomic::{AtomicU64, Ordering};
11-
use std::sync::{Arc, OnceLock};
11+
use std::sync::{Arc, Mutex, OnceLock};
1212

1313
use libsqlite3_sys::*;
1414
use tokio::runtime::Handle;
1515

1616
use crate::kv;
17-
use crate::sqlite_kv::{KvGetResult, SqliteKv};
17+
use crate::sqlite_kv::{KvGetResult, SqliteKv, SqliteKvError};
1818

1919
// MARK: Panic Guard
2020

@@ -158,12 +158,32 @@ struct VfsContext {
158158
actor_id: String,
159159
main_file_name: String,
160160
read_cache_enabled: bool,
161+
last_error: Mutex<Option<String>>,
161162
rt_handle: Handle,
162163
io_methods: Box<sqlite3_io_methods>,
163164
vfs_metrics: Arc<VfsMetrics>,
164165
}
165166

166167
impl VfsContext {
168+
fn clear_last_error(&self) {
169+
if let Ok(mut last_error) = self.last_error.lock() {
170+
*last_error = None;
171+
}
172+
}
173+
174+
fn set_last_error(&self, message: String) {
175+
if let Ok(mut last_error) = self.last_error.lock() {
176+
*last_error = Some(message);
177+
}
178+
}
179+
180+
fn report_kv_error(&self, err: SqliteKvError) -> String {
181+
let message = err.to_string();
182+
self.set_last_error(message.clone());
183+
self.kv.on_error(&self.actor_id, &err);
184+
message
185+
}
186+
167187
fn resolve_file_tag(&self, path: &str) -> Option<u8> {
168188
if path == self.main_file_name {
169189
return Some(kv::FILE_TAG_MAIN);
@@ -187,7 +207,10 @@ impl VfsContext {
187207
let result = self
188208
.rt_handle
189209
.block_on(self.kv.batch_get(&self.actor_id, keys))
190-
.map_err(|e| e.to_string());
210+
.map_err(|err| self.report_kv_error(err));
211+
if result.is_ok() {
212+
self.clear_last_error();
213+
}
191214
let elapsed = start.elapsed();
192215
tracing::debug!(
193216
op = %format_args!("get({key_count}keys)"),
@@ -208,7 +231,10 @@ impl VfsContext {
208231
let result = self
209232
.rt_handle
210233
.block_on(self.kv.batch_put(&self.actor_id, keys, values))
211-
.map_err(|e| e.to_string());
234+
.map_err(|err| self.report_kv_error(err));
235+
if result.is_ok() {
236+
self.clear_last_error();
237+
}
212238
let elapsed = start.elapsed();
213239
tracing::debug!(
214240
op = %format_args!("put({key_count}keys)"),
@@ -229,7 +255,10 @@ impl VfsContext {
229255
let result = self
230256
.rt_handle
231257
.block_on(self.kv.batch_delete(&self.actor_id, keys))
232-
.map_err(|e| e.to_string());
258+
.map_err(|err| self.report_kv_error(err));
259+
if result.is_ok() {
260+
self.clear_last_error();
261+
}
233262
let elapsed = start.elapsed();
234263
tracing::debug!(
235264
op = %format_args!("del({key_count}keys)"),
@@ -249,7 +278,10 @@ impl VfsContext {
249278
let result = self
250279
.rt_handle
251280
.block_on(self.kv.delete_range(&self.actor_id, start, end))
252-
.map_err(|e| e.to_string());
281+
.map_err(|err| self.report_kv_error(err));
282+
if result.is_ok() {
283+
self.clear_last_error();
284+
}
253285
let elapsed = start_time.elapsed();
254286
tracing::debug!(
255287
op = "delRange",
@@ -1180,11 +1212,29 @@ unsafe extern "C" fn kv_vfs_current_time(_p_vfs: *mut sqlite3_vfs, p_time_out: *
11801212
}
11811213

11821214
unsafe extern "C" fn kv_vfs_get_last_error(
1183-
_p_vfs: *mut sqlite3_vfs,
1184-
_n_byte: c_int,
1185-
_z_err_msg: *mut c_char,
1215+
p_vfs: *mut sqlite3_vfs,
1216+
n_byte: c_int,
1217+
z_err_msg: *mut c_char,
11861218
) -> c_int {
1187-
vfs_catch_unwind!(SQLITE_IOERR, SQLITE_OK)
1219+
vfs_catch_unwind!(SQLITE_IOERR, {
1220+
if n_byte <= 0 || z_err_msg.is_null() {
1221+
return 0;
1222+
}
1223+
1224+
let ctx = get_vfs_ctx(p_vfs);
1225+
let last_error = ctx.last_error.lock().ok().and_then(|guard| guard.clone());
1226+
let Some(message) = last_error else {
1227+
*z_err_msg = 0;
1228+
return 0;
1229+
};
1230+
1231+
let bytes = message.as_bytes();
1232+
let max_len = (n_byte as usize).saturating_sub(1);
1233+
let copy_len = bytes.len().min(max_len);
1234+
ptr::copy_nonoverlapping(bytes.as_ptr(), z_err_msg.cast::<u8>(), copy_len);
1235+
*z_err_msg.add(copy_len) = 0;
1236+
0
1237+
})
11881238
}
11891239

11901240
// MARK: KvVfs
@@ -1199,6 +1249,16 @@ unsafe impl Send for KvVfs {}
11991249
unsafe impl Sync for KvVfs {}
12001250

12011251
impl KvVfs {
1252+
fn take_last_kv_error(&self) -> Option<String> {
1253+
unsafe {
1254+
(*self.ctx_ptr)
1255+
.last_error
1256+
.lock()
1257+
.ok()
1258+
.and_then(|mut last_error| last_error.take())
1259+
}
1260+
}
1261+
12021262
pub fn register(
12031263
name: &str,
12041264
kv: Arc<dyn SqliteKv>,
@@ -1226,6 +1286,7 @@ impl KvVfs {
12261286
actor_id: actor_id.clone(),
12271287
main_file_name: actor_id,
12281288
read_cache_enabled: read_cache_enabled(),
1289+
last_error: Mutex::new(None),
12291290
rt_handle,
12301291
io_methods: Box::new(io_methods),
12311292
vfs_metrics,
@@ -1295,6 +1356,10 @@ impl NativeDatabase {
12951356
pub fn as_ptr(&self) -> *mut sqlite3 {
12961357
self.db
12971358
}
1359+
1360+
pub fn take_last_kv_error(&self) -> Option<String> {
1361+
self._vfs.take_last_kv_error()
1362+
}
12981363
}
12991364

13001365
impl Drop for NativeDatabase {
@@ -1307,6 +1372,18 @@ impl Drop for NativeDatabase {
13071372
}
13081373
}
13091374

1375+
fn sqlite_error_message(db: *mut sqlite3) -> String {
1376+
unsafe {
1377+
if db.is_null() {
1378+
"unknown sqlite error".to_string()
1379+
} else {
1380+
CStr::from_ptr(sqlite3_errmsg(db))
1381+
.to_string_lossy()
1382+
.into_owned()
1383+
}
1384+
}
1385+
}
1386+
13101387
pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, String> {
13111388
let c_name = CString::new(file_name).map_err(|err| err.to_string())?;
13121389
let mut db: *mut sqlite3 = ptr::null_mut();
@@ -1320,12 +1397,13 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, Stri
13201397
)
13211398
};
13221399
if rc != SQLITE_OK {
1400+
let message = sqlite_error_message(db);
13231401
if !db.is_null() {
13241402
unsafe {
13251403
sqlite3_close(db);
13261404
}
13271405
}
1328-
return Err(format!("sqlite3_open_v2 failed with code {rc}"));
1406+
return Err(format!("sqlite3_open_v2 failed with code {rc}: {message}"));
13291407
}
13301408

13311409
for pragma in &[
@@ -1340,10 +1418,11 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, Stri
13401418
let rc =
13411419
unsafe { sqlite3_exec(db, c_sql.as_ptr(), None, ptr::null_mut(), ptr::null_mut()) };
13421420
if rc != SQLITE_OK {
1421+
let message = sqlite_error_message(db);
13431422
unsafe {
13441423
sqlite3_close(db);
13451424
}
1346-
return Err(format!("{pragma} failed with code {rc}"));
1425+
return Err(format!("{pragma} failed with code {rc}: {message}"));
13471426
}
13481427
}
13491428

0 commit comments

Comments
 (0)