Skip to content

Commit 2be63d0

Browse files
committed
fix(sqlite-native): restore kv error hook
1 parent 22df032 commit 2be63d0

6 files changed

Lines changed: 178 additions & 20 deletions

File tree

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,9 @@ members = [
515515
[workspace.dependencies.rivet-envoy-protocol]
516516
path = "engine/sdks/rust/envoy-protocol"
517517

518+
[workspace.dependencies.rivetkit-sqlite-native]
519+
path = "rivetkit-typescript/packages/sqlite-native"
520+
518521
[workspace.dependencies.epoxy-protocol]
519522
path = "engine/sdks/rust/epoxy-protocol"
520523

rivetkit-typescript/packages/rivetkit-native/index.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export interface JsEnvoyConfig {
2727
poolName: string
2828
version: number
2929
metadata?: any
30+
notGlobal: boolean
3031
/**
3132
* Log level for the Rust tracing subscriber (e.g. "trace", "debug", "info", "warn", "error").
3233
* Falls back to RIVET_LOG_LEVEL, then LOG_LEVEL, then RUST_LOG env vars. Defaults to "warn".
@@ -59,6 +60,7 @@ export declare function startEnvoySyncJs(config: JsEnvoyConfig, eventCallback: (
5960
export declare function startEnvoyJs(config: JsEnvoyConfig, eventCallback: (event: any) => void): JsEnvoyHandle
6061
/** Native SQLite database handle exposed to JavaScript. */
6162
export declare class JsNativeDatabase {
63+
takeLastKvError(): string | null
6264
run(sql: string, params?: Array<JsBindParam> | undefined | null): Promise<ExecuteResult>
6365
query(sql: string, params?: Array<JsBindParam> | undefined | null): Promise<QueryResult>
6466
exec(sql: string): Promise<QueryResult>

rivetkit-typescript/packages/rivetkit-native/src/database.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ impl EnvoyKv {
3434

3535
#[async_trait]
3636
impl SqliteKv for EnvoyKv {
37+
fn on_error(&self, actor_id: &str, error: &SqliteKvError) {
38+
tracing::error!(%actor_id, %error, "native sqlite kv operation failed");
39+
}
40+
3741
async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> {
3842
Ok(())
3943
}
@@ -115,6 +119,13 @@ impl JsNativeDatabase {
115119
.and_then(|guard| guard.as_ref().map(NativeDatabase::as_ptr))
116120
.unwrap_or(ptr::null_mut())
117121
}
122+
123+
fn take_last_kv_error_inner(&self) -> Option<String> {
124+
self.db
125+
.lock()
126+
.ok()
127+
.and_then(|guard| guard.as_ref().and_then(NativeDatabase::take_last_kv_error))
128+
}
118129
}
119130

120131
#[napi(object)]
@@ -139,6 +150,11 @@ pub struct QueryResult {
139150

140151
#[napi]
141152
impl JsNativeDatabase {
153+
#[napi]
154+
pub fn take_last_kv_error(&self) -> Option<String> {
155+
self.take_last_kv_error_inner()
156+
}
157+
142158
#[napi]
143159
pub async fn run(
144160
&self,

rivetkit-typescript/packages/rivetkit-native/wrapper.js

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ function startEnvoySync(config) {
134134
poolName: config.poolName,
135135
version: config.version,
136136
metadata: config.metadata || null,
137-
notGlobal: config.notGlobal,
137+
notGlobal: config.notGlobal ?? false,
138138
},
139139
(event) => {
140140
handleEvent(event, config, wrappedHandle);
@@ -264,6 +264,20 @@ function mapRows(rows, columns) {
264264
});
265265
}
266266

267+
function wrapNativeStorageError(nativeDb, error) {
268+
const lastKvError =
269+
typeof nativeDb.takeLastKvError === "function"
270+
? nativeDb.takeLastKvError()
271+
: null;
272+
if (!lastKvError) {
273+
throw error;
274+
}
275+
throw new Error(
276+
`Database query failed because the underlying storage is no longer available (${lastKvError}). This usually means the actor is stopping. Use c.abortSignal to cancel long-running work before the actor shuts down.`,
277+
{ cause: error },
278+
);
279+
}
280+
267281
async function openRawDatabaseFromEnvoy(handle, actorId) {
268282
const nativeDb = await openDatabaseFromEnvoy(handle, actorId);
269283
let closed = false;
@@ -288,16 +302,28 @@ async function openRawDatabaseFromEnvoy(handle, actorId) {
288302
/\bRETURNING\b/i.test(query);
289303

290304
if (returnsRows) {
291-
const result = await nativeDb.query(query, bindings);
292-
return mapRows(result.rows, result.columns);
305+
try {
306+
const result = await nativeDb.query(query, bindings);
307+
return mapRows(result.rows, result.columns);
308+
} catch (error) {
309+
wrapNativeStorageError(nativeDb, error);
310+
}
293311
}
294312

295-
await nativeDb.run(query, bindings);
313+
try {
314+
await nativeDb.run(query, bindings);
315+
} catch (error) {
316+
wrapNativeStorageError(nativeDb, error);
317+
}
296318
return [];
297319
}
298320

299-
const result = await nativeDb.exec(query);
300-
return mapRows(result.rows, result.columns);
321+
try {
322+
const result = await nativeDb.exec(query);
323+
return mapRows(result.rows, result.columns);
324+
} catch (error) {
325+
wrapNativeStorageError(nativeDb, error);
326+
}
301327
},
302328
close: async () => {
303329
if (closed) {

rivetkit-typescript/packages/sqlite-native/src/sqlite_kv.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ pub struct KvGetResult {
6868
/// at a higher level.
6969
#[async_trait]
7070
pub trait SqliteKv: Send + Sync {
71+
/// Called when a KV operation fails inside a VFS callback before the
72+
/// original error is collapsed into a generic SQLite IO error code.
73+
fn on_error(&self, _actor_id: &str, _error: &SqliteKvError) {}
74+
7175
/// Called when an actor's database is opened.
7276
async fn on_open(&self, _actor_id: &str) -> Result<(), SqliteKvError> {
7377
Ok(())

rivetkit-typescript/packages/sqlite-native/src/vfs.rs

Lines changed: 121 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ use std::ffi::{c_char, c_int, c_void, CStr, CString};
88
use std::ptr;
99
use std::slice;
1010
use std::sync::atomic::{AtomicU64, Ordering};
11-
use std::sync::{Arc, OnceLock};
11+
use std::sync::{Arc, Mutex, OnceLock};
1212

1313
use libsqlite3_sys::*;
1414
use tokio::runtime::Handle;
1515

1616
use crate::kv;
17-
use crate::sqlite_kv::{KvGetResult, SqliteKv};
17+
use crate::sqlite_kv::{KvGetResult, SqliteKv, SqliteKvError};
1818

1919
// MARK: Panic Guard
2020

@@ -158,12 +158,62 @@ struct VfsContext {
158158
actor_id: String,
159159
main_file_name: String,
160160
read_cache_enabled: bool,
161+
last_error: Mutex<Option<String>>,
161162
rt_handle: Handle,
162163
io_methods: Box<sqlite3_io_methods>,
163164
vfs_metrics: Arc<VfsMetrics>,
164165
}
165166

166167
impl VfsContext {
168+
fn clear_last_error(&self) {
169+
match self.last_error.lock() {
170+
Ok(mut last_error) => {
171+
*last_error = None;
172+
}
173+
Err(err) => {
174+
tracing::warn!(%err, "native sqlite last_error mutex poisoned");
175+
}
176+
}
177+
}
178+
179+
fn set_last_error(&self, message: String) {
180+
match self.last_error.lock() {
181+
Ok(mut last_error) => {
182+
*last_error = Some(message);
183+
}
184+
Err(err) => {
185+
tracing::warn!(%err, "native sqlite last_error mutex poisoned");
186+
}
187+
}
188+
}
189+
190+
fn clone_last_error(&self) -> Option<String> {
191+
match self.last_error.lock() {
192+
Ok(last_error) => last_error.clone(),
193+
Err(err) => {
194+
tracing::warn!(%err, "native sqlite last_error mutex poisoned");
195+
None
196+
}
197+
}
198+
}
199+
200+
fn take_last_error(&self) -> Option<String> {
201+
match self.last_error.lock() {
202+
Ok(mut last_error) => last_error.take(),
203+
Err(err) => {
204+
tracing::warn!(%err, "native sqlite last_error mutex poisoned");
205+
None
206+
}
207+
}
208+
}
209+
210+
fn report_kv_error(&self, err: SqliteKvError) -> String {
211+
let message = err.to_string();
212+
self.set_last_error(message.clone());
213+
self.kv.on_error(&self.actor_id, &err);
214+
message
215+
}
216+
167217
fn resolve_file_tag(&self, path: &str) -> Option<u8> {
168218
if path == self.main_file_name {
169219
return Some(kv::FILE_TAG_MAIN);
@@ -187,7 +237,10 @@ impl VfsContext {
187237
let result = self
188238
.rt_handle
189239
.block_on(self.kv.batch_get(&self.actor_id, keys))
190-
.map_err(|e| e.to_string());
240+
.map_err(|err| self.report_kv_error(err));
241+
if result.is_ok() {
242+
self.clear_last_error();
243+
}
191244
let elapsed = start.elapsed();
192245
tracing::debug!(
193246
op = %format_args!("get({key_count}keys)"),
@@ -203,7 +256,10 @@ impl VfsContext {
203256
let result = self
204257
.rt_handle
205258
.block_on(self.kv.batch_put(&self.actor_id, keys, values))
206-
.map_err(|e| e.to_string());
259+
.map_err(|err| self.report_kv_error(err));
260+
if result.is_ok() {
261+
self.clear_last_error();
262+
}
207263
let elapsed = start.elapsed();
208264
tracing::debug!(
209265
op = %format_args!("put({key_count}keys)"),
@@ -219,7 +275,10 @@ impl VfsContext {
219275
let result = self
220276
.rt_handle
221277
.block_on(self.kv.batch_delete(&self.actor_id, keys))
222-
.map_err(|e| e.to_string());
278+
.map_err(|err| self.report_kv_error(err));
279+
if result.is_ok() {
280+
self.clear_last_error();
281+
}
223282
let elapsed = start.elapsed();
224283
tracing::debug!(
225284
op = %format_args!("del({key_count}keys)"),
@@ -234,7 +293,10 @@ impl VfsContext {
234293
let result = self
235294
.rt_handle
236295
.block_on(self.kv.delete_range(&self.actor_id, start, end))
237-
.map_err(|e| e.to_string());
296+
.map_err(|err| self.report_kv_error(err));
297+
if result.is_ok() {
298+
self.clear_last_error();
299+
}
238300
let elapsed = start_time.elapsed();
239301
tracing::debug!(
240302
op = "delRange",
@@ -574,7 +636,10 @@ unsafe extern "C" fn kv_io_write(
574636
let chunk_key = kv::get_chunk_key(file.file_tag, chunk_idx as u32).to_vec();
575637
let cached_chunk = if needs_existing && ctx.read_cache_enabled {
576638
let state = get_file_state(file.state);
577-
state.read_cache.get(chunk_key.as_slice()).cloned()
639+
state
640+
.read_cache
641+
.as_ref()
642+
.and_then(|read_cache| read_cache.get(chunk_key.as_slice()).cloned())
578643
} else {
579644
None
580645
};
@@ -616,7 +681,7 @@ unsafe extern "C" fn kv_io_write(
616681
let existing_chunk = plan.cached_chunk.as_deref().or_else(|| {
617682
plan.existing_chunk_index
618683
.and_then(|idx| existing_chunks.get(idx))
619-
.and_then(|value| value.as_ref())
684+
.and_then(|value| value.as_deref())
620685
});
621686

622687
let mut new_chunk = if let Some(existing_chunk) = existing_chunk {
@@ -1164,11 +1229,30 @@ unsafe extern "C" fn kv_vfs_current_time(_p_vfs: *mut sqlite3_vfs, p_time_out: *
11641229
}
11651230

11661231
unsafe extern "C" fn kv_vfs_get_last_error(
1167-
_p_vfs: *mut sqlite3_vfs,
1168-
_n_byte: c_int,
1169-
_z_err_msg: *mut c_char,
1232+
p_vfs: *mut sqlite3_vfs,
1233+
n_byte: c_int,
1234+
z_err_msg: *mut c_char,
11701235
) -> c_int {
1171-
vfs_catch_unwind!(SQLITE_IOERR, SQLITE_OK)
1236+
vfs_catch_unwind!(SQLITE_IOERR, {
1237+
if n_byte <= 0 || z_err_msg.is_null() {
1238+
return 0;
1239+
}
1240+
1241+
let ctx = get_vfs_ctx(p_vfs);
1242+
let last_error = ctx.clone_last_error();
1243+
let Some(message) = last_error else {
1244+
*z_err_msg = 0;
1245+
return 0;
1246+
};
1247+
1248+
let bytes = message.as_bytes();
1249+
let max_len = (n_byte as usize).saturating_sub(1);
1250+
let copy_len = bytes.len().min(max_len);
1251+
let dst = z_err_msg.cast::<u8>();
1252+
ptr::copy_nonoverlapping(bytes.as_ptr(), dst, copy_len);
1253+
*dst.add(copy_len) = 0u8;
1254+
0
1255+
})
11721256
}
11731257

11741258
// MARK: KvVfs
@@ -1183,6 +1267,10 @@ unsafe impl Send for KvVfs {}
11831267
unsafe impl Sync for KvVfs {}
11841268

11851269
impl KvVfs {
1270+
fn take_last_kv_error(&self) -> Option<String> {
1271+
unsafe { (*self.ctx_ptr).take_last_error() }
1272+
}
1273+
11861274
pub fn register(
11871275
name: &str,
11881276
kv: Arc<dyn SqliteKv>,
@@ -1210,6 +1298,7 @@ impl KvVfs {
12101298
actor_id: actor_id.clone(),
12111299
main_file_name: actor_id,
12121300
read_cache_enabled: read_cache_enabled(),
1301+
last_error: Mutex::new(None),
12131302
rt_handle,
12141303
io_methods: Box::new(io_methods),
12151304
vfs_metrics,
@@ -1279,6 +1368,10 @@ impl NativeDatabase {
12791368
pub fn as_ptr(&self) -> *mut sqlite3 {
12801369
self.db
12811370
}
1371+
1372+
pub fn take_last_kv_error(&self) -> Option<String> {
1373+
self._vfs.take_last_kv_error()
1374+
}
12821375
}
12831376

12841377
impl Drop for NativeDatabase {
@@ -1291,6 +1384,18 @@ impl Drop for NativeDatabase {
12911384
}
12921385
}
12931386

1387+
fn sqlite_error_message(db: *mut sqlite3) -> String {
1388+
unsafe {
1389+
if db.is_null() {
1390+
"unknown sqlite error".to_string()
1391+
} else {
1392+
CStr::from_ptr(sqlite3_errmsg(db))
1393+
.to_string_lossy()
1394+
.into_owned()
1395+
}
1396+
}
1397+
}
1398+
12941399
pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, String> {
12951400
let c_name = CString::new(file_name).map_err(|err| err.to_string())?;
12961401
let mut db: *mut sqlite3 = ptr::null_mut();
@@ -1304,12 +1409,13 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, Stri
13041409
)
13051410
};
13061411
if rc != SQLITE_OK {
1412+
let message = sqlite_error_message(db);
13071413
if !db.is_null() {
13081414
unsafe {
13091415
sqlite3_close(db);
13101416
}
13111417
}
1312-
return Err(format!("sqlite3_open_v2 failed with code {rc}"));
1418+
return Err(format!("sqlite3_open_v2 failed with code {rc}: {message}"));
13131419
}
13141420

13151421
for pragma in &[
@@ -1324,10 +1430,11 @@ pub fn open_database(vfs: KvVfs, file_name: &str) -> Result<NativeDatabase, Stri
13241430
let rc =
13251431
unsafe { sqlite3_exec(db, c_sql.as_ptr(), None, ptr::null_mut(), ptr::null_mut()) };
13261432
if rc != SQLITE_OK {
1433+
let message = sqlite_error_message(db);
13271434
unsafe {
13281435
sqlite3_close(db);
13291436
}
1330-
return Err(format!("{pragma} failed with code {rc}"));
1437+
return Err(format!("{pragma} failed with code {rc}: {message}"));
13311438
}
13321439
}
13331440

0 commit comments

Comments
 (0)