diff --git a/rivetkit-typescript/packages/rivetkit-native/index.d.ts b/rivetkit-typescript/packages/rivetkit-native/index.d.ts index 7b9659b72e..3f618fb1f3 100644 --- a/rivetkit-typescript/packages/rivetkit-native/index.d.ts +++ b/rivetkit-typescript/packages/rivetkit-native/index.d.ts @@ -18,7 +18,7 @@ export interface QueryResult { rows: Array> } /** Open a native SQLite database backed by the envoy's KV channel. */ -export declare function openDatabaseFromEnvoy(jsHandle: JsEnvoyHandle, actorId: string): Promise +export declare function openDatabaseFromEnvoy(jsHandle: JsEnvoyHandle, actorId: string, preloadedEntries?: Array | undefined | null): Promise /** Configuration for starting the native envoy client. */ export interface JsEnvoyConfig { endpoint: string diff --git a/rivetkit-typescript/packages/rivetkit-native/src/bridge_actor.rs b/rivetkit-typescript/packages/rivetkit-native/src/bridge_actor.rs index f96e0b1a65..1735910a16 100644 --- a/rivetkit-typescript/packages/rivetkit-native/src/bridge_actor.rs +++ b/rivetkit-typescript/packages/rivetkit-native/src/bridge_actor.rs @@ -64,7 +64,7 @@ impl EnvoyCallbacks for BridgeCallbacks { actor_id: String, generation: u32, config: protocol::ActorConfig, - _preloaded_kv: Option, + preloaded_kv: Option, ) -> BoxFuture> { let response_map = self.response_map.clone(); let event_cb = self.event_cb.clone(); @@ -79,6 +79,7 @@ impl EnvoyCallbacks for BridgeCallbacks { "key": config.key, "createTs": config.create_ts, "input": config.input.map(|v| base64_encode(&v)), + "preloadedKv": preloaded_kv.as_ref().map(encode_preloaded_kv), "responseId": response_id, }); @@ -318,3 +319,20 @@ fn base64_decode(data: &str) -> Option> { use base64::Engine; base64::engine::general_purpose::STANDARD.decode(data).ok() } + +fn encode_preloaded_kv(preloaded_kv: &protocol::PreloadedKv) -> serde_json::Value { + serde_json::json!({ + "entries": preloaded_kv.entries.iter().map(|entry| { + serde_json::json!({ + "key": base64_encode(&entry.key), + "value": base64_encode(&entry.value), + "metadata": { + "version": base64_encode(&entry.metadata.version), + "updateTs": entry.metadata.update_ts, + }, + }) + }).collect::>(), + "requestedGetKeys": preloaded_kv.requested_get_keys.iter().map(|key| base64_encode(key)).collect::>(), + "requestedPrefixes": preloaded_kv.requested_prefixes.iter().map(|key| base64_encode(key)).collect::>(), + }) +} diff --git a/rivetkit-typescript/packages/rivetkit-native/src/database.rs b/rivetkit-typescript/packages/rivetkit-native/src/database.rs index eea6cb0794..779d0de7be 100644 --- a/rivetkit-typescript/packages/rivetkit-native/src/database.rs +++ b/rivetkit-typescript/packages/rivetkit-native/src/database.rs @@ -19,6 +19,7 @@ use rivetkit_sqlite_native::vfs::{KvVfs, NativeDatabase}; use tokio::runtime::Handle; use crate::envoy_handle::JsEnvoyHandle; +use crate::types::JsKvEntry; /// SqliteKv adapter that routes operations through the envoy handle's KV methods. pub struct EnvoyKv { @@ -501,12 +502,24 @@ fn exec_statements(db: *mut sqlite3, sql: &str) -> napi::Result { pub async fn open_database_from_envoy( js_handle: &JsEnvoyHandle, actor_id: String, + preloaded_entries: Option>, ) -> napi::Result { let envoy_kv = Arc::new(EnvoyKv::new(js_handle.handle.clone(), actor_id.clone())); + let preloaded_entries = preloaded_entries + .unwrap_or_default() + .into_iter() + .map(|entry| (entry.key.to_vec(), entry.value.to_vec())) + .collect(); let rt_handle = Handle::current(); let db = tokio::task::spawn_blocking(move || { let vfs_name = format!("envoy-kv-{}", actor_id); - let vfs = KvVfs::register(&vfs_name, envoy_kv, actor_id.clone(), rt_handle) + let vfs = KvVfs::register( + &vfs_name, + envoy_kv, + actor_id.clone(), + rt_handle, + preloaded_entries, + ) .map_err(|e| napi::Error::from_reason(format!("failed to register VFS: {}", e)))?; rivetkit_sqlite_native::vfs::open_database(vfs, &actor_id) diff --git a/rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts b/rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts index 2f6a48bb3c..165c9719e7 100644 --- a/rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts +++ b/rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts @@ -126,6 +126,7 @@ export declare function startEnvoy(config: EnvoyConfig): Promise; export declare function openDatabaseFromEnvoy( handle: EnvoyHandle, actorId: string, + preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null, ): Promise; export interface NativeRawDatabase { @@ -139,6 +140,7 @@ export interface NativeRawDatabase { export declare function openRawDatabaseFromEnvoy( handle: EnvoyHandle, actorId: string, + preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null, ): Promise; export declare const utils: {}; diff --git a/rivetkit-typescript/packages/rivetkit-native/wrapper.js b/rivetkit-typescript/packages/rivetkit-native/wrapper.js index 09c4fa6804..8d62c32a8f 100644 --- a/rivetkit-typescript/packages/rivetkit-native/wrapper.js +++ b/rivetkit-typescript/packages/rivetkit-native/wrapper.js @@ -158,9 +158,40 @@ async function startEnvoy(config) { /** * Open a native database backed by envoy KV. */ -async function openDatabaseFromEnvoy(handle, actorId) { +async function openDatabaseFromEnvoy(handle, actorId, preloadedEntries) { const rawHandle = handle._raw || handle; - return native.openDatabaseFromEnvoy(rawHandle, actorId); + const nativePreloadedEntries = preloadedEntries + ? preloadedEntries.map(([key, value]) => ({ + key: Buffer.from(key), + value: Buffer.from(value), + })) + : null; + return native.openDatabaseFromEnvoy( + rawHandle, + actorId, + nativePreloadedEntries, + ); +} + +function decodePreloadedKv(preloadedKv) { + if (!preloadedKv) { + return null; + } + + const decodeBytes = (value) => Uint8Array.from(Buffer.from(value, "base64")); + + return { + entries: (preloadedKv.entries || []).map((entry) => ({ + key: decodeBytes(entry.key), + value: decodeBytes(entry.value), + metadata: { + version: decodeBytes(entry.metadata.version), + updateTs: entry.metadata.updateTs, + }, + })), + requestedGetKeys: (preloadedKv.requestedGetKeys || []).map(decodeBytes), + requestedPrefixes: (preloadedKv.requestedPrefixes || []).map(decodeBytes), + }; } function isPlainObject(value) { @@ -278,8 +309,12 @@ function wrapNativeStorageError(nativeDb, error) { ); } -async function openRawDatabaseFromEnvoy(handle, actorId) { - const nativeDb = await openDatabaseFromEnvoy(handle, actorId); +async function openRawDatabaseFromEnvoy(handle, actorId, preloadedEntries) { + const nativeDb = await openDatabaseFromEnvoy( + handle, + actorId, + preloadedEntries, + ); let closed = false; const ensureOpen = () => { @@ -356,7 +391,7 @@ function handleEvent(event, config, wrappedHandle) { event.actorId, event.generation, actorConfig, - null, // preloadedKv + decodePreloadedKv(event.preloadedKv), ), ).then( async () => { diff --git a/rivetkit-typescript/packages/rivetkit/src/db/config.ts b/rivetkit-typescript/packages/rivetkit/src/db/config.ts index aef1866d7e..8ccf20521d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/config.ts @@ -17,7 +17,10 @@ export interface NativeSqliteConfig { * Replaces the transport-config-based NativeSqliteConfig seam. */ export interface NativeDatabaseProvider { - open(actorId: string): Promise; + open( + actorId: string, + preloadedEntries?: [Uint8Array, Uint8Array][], + ): Promise; } /** diff --git a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts index 3f6ba54c30..1eef4ff448 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts @@ -52,7 +52,10 @@ export function db({ // path where databases are opened from a live runtime handle // (e.g., the native envoy client). if (ctx.nativeDatabaseProvider) { - return await ctx.nativeDatabaseProvider.open(ctx.actorId); + return await ctx.nativeDatabaseProvider.open( + ctx.actorId, + ctx.preloadedEntries, + ); } const { database: db, kvStore } = await openActorDatabase(ctx); diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index 17905defbd..9486ddb5d0 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -580,10 +580,14 @@ export class EngineActorDriver implements ActorDriver { const envoy = this.#envoy; return { - open: async (actorId: string) => { + open: async ( + actorId: string, + preloadedEntries?: [Uint8Array, Uint8Array][], + ) => { return await nativeMod.openRawDatabaseFromEnvoy( envoy, actorId, + preloadedEntries, ); }, }; diff --git a/rivetkit-typescript/packages/sqlite-native/src/vfs.rs b/rivetkit-typescript/packages/sqlite-native/src/vfs.rs index c63ef0cd4e..a7669dff03 100644 --- a/rivetkit-typescript/packages/sqlite-native/src/vfs.rs +++ b/rivetkit-typescript/packages/sqlite-native/src/vfs.rs @@ -118,6 +118,48 @@ fn read_cache_enabled() -> bool { }) } +type StartupPreloadEntries = Vec<(Vec, Vec)>; + +fn sort_startup_preload(entries: &mut StartupPreloadEntries) { + entries.sort_by(|a, b| a.0.cmp(&b.0)); +} + +fn startup_preload_search( + entries: &StartupPreloadEntries, + key: &[u8], +) -> Result { + entries.binary_search_by(|(candidate, _)| candidate.as_slice().cmp(key)) +} + +fn startup_preload_get<'a>( + entries: &'a StartupPreloadEntries, + key: &[u8], +) -> Option<&'a [u8]> { + startup_preload_search(entries, key) + .ok() + .map(|idx| entries[idx].1.as_slice()) +} + +fn startup_preload_put(entries: &mut StartupPreloadEntries, key: &[u8], value: &[u8]) { + if let Ok(idx) = startup_preload_search(entries, key) { + entries[idx].1 = value.to_vec(); + } +} + +fn startup_preload_delete(entries: &mut StartupPreloadEntries, key: &[u8]) { + if let Ok(idx) = startup_preload_search(entries, key) { + entries.remove(idx); + } +} + +fn startup_preload_delete_range( + entries: &mut StartupPreloadEntries, + start: &[u8], + end: &[u8], +) { + entries.retain(|(key, _)| key.as_slice() < start || key.as_slice() >= end); +} + // MARK: VFS Metrics /// Per-VFS-callback operation metrics for diagnosing native vs WASM performance. @@ -157,6 +199,8 @@ struct VfsContext { kv: Arc, actor_id: String, main_file_name: String, + // Bounded startup entries shipped with actor start. This is not the opt-in read cache. + startup_preload: Mutex>, read_cache_enabled: bool, last_error: Mutex>, rt_handle: Handle, @@ -231,13 +275,58 @@ impl VfsContext { } } + fn update_startup_preload( + &self, + f: impl FnOnce(&mut StartupPreloadEntries), + ) { + if let Ok(mut guard) = self.startup_preload.lock() { + if let Some(entries) = guard.as_mut() { + f(entries); + } + } + } + fn kv_get(&self, keys: Vec>) -> Result { let key_count = keys.len(); let start = std::time::Instant::now(); - let result = self - .rt_handle - .block_on(self.kv.batch_get(&self.actor_id, keys)) - .map_err(|err| self.report_kv_error(err)); + let (preloaded_keys, preloaded_values, miss_keys) = if let Ok(guard) = + self.startup_preload.lock() + { + if let Some(entries) = guard.as_ref() { + let mut hit_keys = Vec::new(); + let mut hit_values = Vec::new(); + let mut misses = Vec::new(); + for key in keys { + if let Some(value) = startup_preload_get(entries, key.as_slice()) { + hit_keys.push(key); + hit_values.push(value.to_vec()); + } else { + misses.push(key); + } + } + (hit_keys, hit_values, misses) + } else { + (Vec::new(), Vec::new(), keys) + } + } else { + (Vec::new(), Vec::new(), keys) + }; + let result = if miss_keys.is_empty() { + Ok(KvGetResult { + keys: preloaded_keys, + values: preloaded_values, + }) + } else { + self + .rt_handle + .block_on(self.kv.batch_get(&self.actor_id, miss_keys)) + .map(|mut result| { + result.keys.extend(preloaded_keys); + result.values.extend(preloaded_values); + result + }) + .map_err(|err| self.report_kv_error(err)) + }; if result.is_ok() { self.clear_last_error(); } @@ -255,10 +344,15 @@ impl VfsContext { let start = std::time::Instant::now(); let result = self .rt_handle - .block_on(self.kv.batch_put(&self.actor_id, keys, values)) + .block_on(self.kv.batch_put(&self.actor_id, keys.clone(), values.clone())) .map_err(|err| self.report_kv_error(err)); if result.is_ok() { self.clear_last_error(); + self.update_startup_preload(|entries| { + for (key, value) in keys.iter().zip(values.iter()) { + startup_preload_put(entries, key.as_slice(), value.as_slice()); + } + }); } let elapsed = start.elapsed(); tracing::debug!( @@ -274,10 +368,15 @@ impl VfsContext { let start = std::time::Instant::now(); let result = self .rt_handle - .block_on(self.kv.batch_delete(&self.actor_id, keys)) + .block_on(self.kv.batch_delete(&self.actor_id, keys.clone())) .map_err(|err| self.report_kv_error(err)); if result.is_ok() { self.clear_last_error(); + self.update_startup_preload(|entries| { + for key in &keys { + startup_preload_delete(entries, key.as_slice()); + } + }); } let elapsed = start.elapsed(); tracing::debug!( @@ -290,12 +389,21 @@ impl VfsContext { fn kv_delete_range(&self, start: Vec, end: Vec) -> Result<(), String> { let start_time = std::time::Instant::now(); + let preload_start = start.clone(); + let preload_end = end.clone(); let result = self .rt_handle .block_on(self.kv.delete_range(&self.actor_id, start, end)) .map_err(|err| self.report_kv_error(err)); if result.is_ok() { self.clear_last_error(); + self.update_startup_preload(|entries| { + startup_preload_delete_range( + entries, + preload_start.as_slice(), + preload_end.as_slice(), + ); + }); } let elapsed = start_time.elapsed(); tracing::debug!( @@ -1274,6 +1382,7 @@ impl KvVfs { kv: Arc, actor_id: String, rt_handle: Handle, + mut startup_preload: StartupPreloadEntries, ) -> Result { let mut io_methods: sqlite3_io_methods = unsafe { std::mem::zeroed() }; io_methods.iVersion = 1; @@ -1291,10 +1400,12 @@ impl KvVfs { io_methods.xDeviceCharacteristics = Some(kv_io_device_characteristics); let vfs_metrics = Arc::new(VfsMetrics::new()); + sort_startup_preload(&mut startup_preload); let ctx = Box::new(VfsContext { kv, actor_id: actor_id.clone(), main_file_name: actor_id, + startup_preload: Mutex::new((!startup_preload.is_empty()).then_some(startup_preload)), read_cache_enabled: read_cache_enabled(), last_error: Mutex::new(None), rt_handle, @@ -1520,4 +1631,46 @@ mod tests { .iter() .all(|byte| *byte == 0)); } + + #[test] + fn startup_preload_helpers_use_exact_key_matches() { + let mut entries = vec![ + (vec![3], vec![30]), + (vec![1], vec![10]), + (vec![2], vec![20]), + ]; + sort_startup_preload(&mut entries); + + assert_eq!(startup_preload_get(&entries, &[1]), Some(&[10][..])); + assert_eq!(startup_preload_get(&entries, &[2]), Some(&[20][..])); + assert_eq!(startup_preload_get(&entries, &[4]), None); + } + + #[test] + fn startup_preload_helpers_update_without_growing() { + let mut entries = vec![(vec![1], vec![10]), (vec![2], vec![20])]; + sort_startup_preload(&mut entries); + + startup_preload_put(&mut entries, &[2], &[99]); + startup_preload_put(&mut entries, &[3], &[30]); + startup_preload_delete(&mut entries, &[1]); + startup_preload_delete(&mut entries, &[7]); + + assert_eq!(entries, vec![(vec![2], vec![99])]); + } + + #[test] + fn startup_preload_helpers_delete_range_is_half_open() { + let mut entries = vec![ + (vec![1], vec![10]), + (vec![2], vec![20]), + (vec![3], vec![30]), + (vec![4], vec![40]), + ]; + sort_startup_preload(&mut entries); + + startup_preload_delete_range(&mut entries, &[2], &[4]); + + assert_eq!(entries, vec![(vec![1], vec![10]), (vec![4], vec![40])]); + } }