Skip to content

Commit db38027

Browse files
committed
fix(sqlite-native): restore native startup kv preload
1 parent 12c0baa commit db38027

9 files changed

Lines changed: 255 additions & 18 deletions

File tree

rivetkit-typescript/packages/rivetkit-native/index.d.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export interface QueryResult {
1818
rows: Array<Array<any>>
1919
}
2020
/** Open a native SQLite database backed by the envoy's KV channel. */
21-
export declare function openDatabaseFromEnvoy(jsHandle: JsEnvoyHandle, actorId: string): Promise<JsNativeDatabase>
21+
export declare function openDatabaseFromEnvoy(jsHandle: JsEnvoyHandle, actorId: string, preloadedEntries?: Array<JsKvEntry> | undefined | null): Promise<JsNativeDatabase>
2222
/** Configuration for starting the native envoy client. */
2323
export interface JsEnvoyConfig {
2424
endpoint: string

rivetkit-typescript/packages/rivetkit-native/src/bridge_actor.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ impl EnvoyCallbacks for BridgeCallbacks {
6464
actor_id: String,
6565
generation: u32,
6666
config: protocol::ActorConfig,
67-
_preloaded_kv: Option<protocol::PreloadedKv>,
67+
preloaded_kv: Option<protocol::PreloadedKv>,
6868
) -> BoxFuture<anyhow::Result<()>> {
6969
let response_map = self.response_map.clone();
7070
let event_cb = self.event_cb.clone();
@@ -79,6 +79,7 @@ impl EnvoyCallbacks for BridgeCallbacks {
7979
"key": config.key,
8080
"createTs": config.create_ts,
8181
"input": config.input.map(|v| base64_encode(&v)),
82+
"preloadedKv": preloaded_kv.as_ref().map(encode_preloaded_kv),
8283
"responseId": response_id,
8384
});
8485

@@ -318,3 +319,20 @@ fn base64_decode(data: &str) -> Option<Vec<u8>> {
318319
use base64::Engine;
319320
base64::engine::general_purpose::STANDARD.decode(data).ok()
320321
}
322+
323+
fn encode_preloaded_kv(preloaded_kv: &protocol::PreloadedKv) -> serde_json::Value {
324+
serde_json::json!({
325+
"entries": preloaded_kv.entries.iter().map(|entry| {
326+
serde_json::json!({
327+
"key": base64_encode(&entry.key),
328+
"value": base64_encode(&entry.value),
329+
"metadata": {
330+
"version": base64_encode(&entry.metadata.version),
331+
"updateTs": entry.metadata.update_ts,
332+
},
333+
})
334+
}).collect::<Vec<_>>(),
335+
"requestedGetKeys": preloaded_kv.requested_get_keys.iter().map(|key| base64_encode(key)).collect::<Vec<_>>(),
336+
"requestedPrefixes": preloaded_kv.requested_prefixes.iter().map(|key| base64_encode(key)).collect::<Vec<_>>(),
337+
})
338+
}

rivetkit-typescript/packages/rivetkit-native/src/database.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use rivetkit_sqlite_native::vfs::{KvVfs, NativeDatabase};
1919
use tokio::runtime::Handle;
2020

2121
use crate::envoy_handle::JsEnvoyHandle;
22+
use crate::types::JsKvEntry;
2223

2324
/// SqliteKv adapter that routes operations through the envoy handle's KV methods.
2425
pub struct EnvoyKv {
@@ -511,12 +512,24 @@ fn exec_statements(db: *mut sqlite3, sql: &str) -> napi::Result<QueryResult> {
511512
pub async fn open_database_from_envoy(
512513
js_handle: &JsEnvoyHandle,
513514
actor_id: String,
515+
preloaded_entries: Option<Vec<JsKvEntry>>,
514516
) -> napi::Result<JsNativeDatabase> {
515517
let envoy_kv = Arc::new(EnvoyKv::new(js_handle.handle.clone(), actor_id.clone()));
518+
let preloaded_entries = preloaded_entries
519+
.unwrap_or_default()
520+
.into_iter()
521+
.map(|entry| (entry.key.to_vec(), entry.value.to_vec()))
522+
.collect();
516523
let rt_handle = Handle::current();
517524
let db = tokio::task::spawn_blocking(move || {
518525
let vfs_name = format!("envoy-kv-{}", actor_id);
519-
let vfs = KvVfs::register(&vfs_name, envoy_kv, actor_id.clone(), rt_handle)
526+
let vfs = KvVfs::register(
527+
&vfs_name,
528+
envoy_kv,
529+
actor_id.clone(),
530+
rt_handle,
531+
preloaded_entries,
532+
)
520533
.map_err(|e| napi::Error::from_reason(format!("failed to register VFS: {}", e)))?;
521534

522535
rivetkit_sqlite_native::vfs::open_database(vfs, &actor_id)

rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ export declare function startEnvoy(config: EnvoyConfig): Promise<EnvoyHandle>;
126126
export declare function openDatabaseFromEnvoy(
127127
handle: EnvoyHandle,
128128
actorId: string,
129+
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
129130
): Promise<JsNativeDatabase>;
130131

131132
export interface NativeRawDatabase {
@@ -139,6 +140,7 @@ export interface NativeRawDatabase {
139140
export declare function openRawDatabaseFromEnvoy(
140141
handle: EnvoyHandle,
141142
actorId: string,
143+
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
142144
): Promise<NativeRawDatabase>;
143145

144146
export declare const utils: {};

rivetkit-typescript/packages/rivetkit-native/wrapper.js

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,40 @@ async function startEnvoy(config) {
158158
/**
159159
* Open a native database backed by envoy KV.
160160
*/
161-
async function openDatabaseFromEnvoy(handle, actorId) {
161+
async function openDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
162162
const rawHandle = handle._raw || handle;
163-
return native.openDatabaseFromEnvoy(rawHandle, actorId);
163+
const nativePreloadedEntries = preloadedEntries
164+
? preloadedEntries.map(([key, value]) => ({
165+
key: Buffer.from(key),
166+
value: Buffer.from(value),
167+
}))
168+
: null;
169+
return native.openDatabaseFromEnvoy(
170+
rawHandle,
171+
actorId,
172+
nativePreloadedEntries,
173+
);
174+
}
175+
176+
function decodePreloadedKv(preloadedKv) {
177+
if (!preloadedKv) {
178+
return null;
179+
}
180+
181+
const decodeBytes = (value) => Uint8Array.from(Buffer.from(value, "base64"));
182+
183+
return {
184+
entries: (preloadedKv.entries || []).map((entry) => ({
185+
key: decodeBytes(entry.key),
186+
value: decodeBytes(entry.value),
187+
metadata: {
188+
version: decodeBytes(entry.metadata.version),
189+
updateTs: entry.metadata.updateTs,
190+
},
191+
})),
192+
requestedGetKeys: (preloadedKv.requestedGetKeys || []).map(decodeBytes),
193+
requestedPrefixes: (preloadedKv.requestedPrefixes || []).map(decodeBytes),
194+
};
164195
}
165196

166197
function isPlainObject(value) {
@@ -264,8 +295,12 @@ function mapRows(rows, columns) {
264295
});
265296
}
266297

267-
async function openRawDatabaseFromEnvoy(handle, actorId) {
268-
const nativeDb = await openDatabaseFromEnvoy(handle, actorId);
298+
async function openRawDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
299+
const nativeDb = await openDatabaseFromEnvoy(
300+
handle,
301+
actorId,
302+
preloadedEntries,
303+
);
269304
let closed = false;
270305

271306
const ensureOpen = () => {
@@ -330,7 +365,7 @@ function handleEvent(event, config, wrappedHandle) {
330365
event.actorId,
331366
event.generation,
332367
actorConfig,
333-
null, // preloadedKv
368+
decodePreloadedKv(event.preloadedKv),
334369
),
335370
).then(
336371
async () => {

rivetkit-typescript/packages/rivetkit/src/db/config.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ export interface NativeSqliteConfig {
1717
* Replaces the transport-config-based NativeSqliteConfig seam.
1818
*/
1919
export interface NativeDatabaseProvider {
20-
open(actorId: string): Promise<RawAccess>;
20+
open(
21+
actorId: string,
22+
preloadedEntries?: [Uint8Array, Uint8Array][],
23+
): Promise<RawAccess>;
2124
}
2225

2326
/**

rivetkit-typescript/packages/rivetkit/src/db/mod.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,10 @@ export function db({
5252
// path where databases are opened from a live runtime handle
5353
// (e.g., the native envoy client).
5454
if (ctx.nativeDatabaseProvider) {
55-
return await ctx.nativeDatabaseProvider.open(ctx.actorId);
55+
return await ctx.nativeDatabaseProvider.open(
56+
ctx.actorId,
57+
ctx.preloadedEntries,
58+
);
5659
}
5760

5861
const { database: db, kvStore } = await openActorDatabase(ctx);

rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -580,10 +580,14 @@ export class EngineActorDriver implements ActorDriver {
580580

581581
const envoy = this.#envoy;
582582
return {
583-
open: async (actorId: string) => {
583+
open: async (
584+
actorId: string,
585+
preloadedEntries?: [Uint8Array, Uint8Array][],
586+
) => {
584587
return await nativeMod.openRawDatabaseFromEnvoy(
585588
envoy,
586589
actorId,
590+
preloadedEntries,
587591
);
588592
},
589593
};

0 commit comments

Comments
 (0)