Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rivetkit-typescript/packages/rivetkit-native/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* tslint:disable */

Check failure on line 1 in rivetkit-typescript/packages/rivetkit-native/index.d.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:
/* eslint-disable */

/* auto-generated by NAPI-RS */
Expand All @@ -18,7 +18,7 @@
rows: Array<Array<any>>
}
/** Open a native SQLite database backed by the envoy's KV channel. */
export declare function openDatabaseFromEnvoy(jsHandle: JsEnvoyHandle, actorId: string): Promise<JsNativeDatabase>
export declare function openDatabaseFromEnvoy(jsHandle: JsEnvoyHandle, actorId: string, preloadedEntries?: Array<JsKvEntry> | undefined | null): Promise<JsNativeDatabase>
/** Configuration for starting the native envoy client. */
export interface JsEnvoyConfig {
endpoint: string
Expand Down
20 changes: 19 additions & 1 deletion rivetkit-typescript/packages/rivetkit-native/src/bridge_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl EnvoyCallbacks for BridgeCallbacks {
actor_id: String,
generation: u32,
config: protocol::ActorConfig,
_preloaded_kv: Option<protocol::PreloadedKv>,
preloaded_kv: Option<protocol::PreloadedKv>,
) -> BoxFuture<anyhow::Result<()>> {
let response_map = self.response_map.clone();
let event_cb = self.event_cb.clone();
Expand All @@ -79,6 +79,7 @@ impl EnvoyCallbacks for BridgeCallbacks {
"key": config.key,
"createTs": config.create_ts,
"input": config.input.map(|v| base64_encode(&v)),
"preloadedKv": preloaded_kv.as_ref().map(encode_preloaded_kv),
"responseId": response_id,
});

Expand Down Expand Up @@ -318,3 +319,20 @@ fn base64_decode(data: &str) -> Option<Vec<u8>> {
use base64::Engine;
base64::engine::general_purpose::STANDARD.decode(data).ok()
}

fn encode_preloaded_kv(preloaded_kv: &protocol::PreloadedKv) -> serde_json::Value {
serde_json::json!({
"entries": preloaded_kv.entries.iter().map(|entry| {
serde_json::json!({
"key": base64_encode(&entry.key),
"value": base64_encode(&entry.value),
"metadata": {
"version": base64_encode(&entry.metadata.version),
"updateTs": entry.metadata.update_ts,
},
})
}).collect::<Vec<_>>(),
"requestedGetKeys": preloaded_kv.requested_get_keys.iter().map(|key| base64_encode(key)).collect::<Vec<_>>(),
"requestedPrefixes": preloaded_kv.requested_prefixes.iter().map(|key| base64_encode(key)).collect::<Vec<_>>(),
})
}
15 changes: 14 additions & 1 deletion rivetkit-typescript/packages/rivetkit-native/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use rivetkit_sqlite_native::vfs::{KvVfs, NativeDatabase};
use tokio::runtime::Handle;

use crate::envoy_handle::JsEnvoyHandle;
use crate::types::JsKvEntry;

/// SqliteKv adapter that routes operations through the envoy handle's KV methods.
pub struct EnvoyKv {
Expand Down Expand Up @@ -501,12 +502,24 @@ fn exec_statements(db: *mut sqlite3, sql: &str) -> napi::Result<QueryResult> {
pub async fn open_database_from_envoy(
js_handle: &JsEnvoyHandle,
actor_id: String,
preloaded_entries: Option<Vec<JsKvEntry>>,
) -> napi::Result<JsNativeDatabase> {
let envoy_kv = Arc::new(EnvoyKv::new(js_handle.handle.clone(), actor_id.clone()));
let preloaded_entries = preloaded_entries
.unwrap_or_default()
.into_iter()
.map(|entry| (entry.key.to_vec(), entry.value.to_vec()))
.collect();
let rt_handle = Handle::current();
let db = tokio::task::spawn_blocking(move || {
let vfs_name = format!("envoy-kv-{}", actor_id);
let vfs = KvVfs::register(&vfs_name, envoy_kv, actor_id.clone(), rt_handle)
let vfs = KvVfs::register(
&vfs_name,
envoy_kv,
actor_id.clone(),
rt_handle,
preloaded_entries,
)
.map_err(|e| napi::Error::from_reason(format!("failed to register VFS: {}", e)))?;

rivetkit_sqlite_native::vfs::open_database(vfs, &actor_id)
Expand Down
2 changes: 2 additions & 0 deletions rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { JsNativeDatabase, JsKvEntry, JsKvListOptions } from "./index";

Check failure on line 1 in rivetkit-typescript/packages/rivetkit-native/wrapper.d.ts

View workflow job for this annotation

GitHub Actions / RivetKit / Quality Check

format

Formatter would have printed the following content:

export type { JsNativeDatabase, JsKvEntry, JsKvListOptions };

Expand Down Expand Up @@ -126,6 +126,7 @@
export declare function openDatabaseFromEnvoy(
handle: EnvoyHandle,
actorId: string,
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
): Promise<JsNativeDatabase>;

export interface NativeRawDatabase {
Expand All @@ -139,6 +140,7 @@
export declare function openRawDatabaseFromEnvoy(
handle: EnvoyHandle,
actorId: string,
preloadedEntries?: readonly [Uint8Array, Uint8Array][] | null,
): Promise<NativeRawDatabase>;

export declare const utils: {};
45 changes: 40 additions & 5 deletions rivetkit-typescript/packages/rivetkit-native/wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,40 @@ async function startEnvoy(config) {
/**
* Open a native database backed by envoy KV.
*/
async function openDatabaseFromEnvoy(handle, actorId) {
async function openDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
const rawHandle = handle._raw || handle;
return native.openDatabaseFromEnvoy(rawHandle, actorId);
const nativePreloadedEntries = preloadedEntries
? preloadedEntries.map(([key, value]) => ({
key: Buffer.from(key),
value: Buffer.from(value),
}))
: null;
return native.openDatabaseFromEnvoy(
rawHandle,
actorId,
nativePreloadedEntries,
);
}

function decodePreloadedKv(preloadedKv) {
if (!preloadedKv) {
return null;
}

const decodeBytes = (value) => Uint8Array.from(Buffer.from(value, "base64"));

return {
entries: (preloadedKv.entries || []).map((entry) => ({
key: decodeBytes(entry.key),
value: decodeBytes(entry.value),
metadata: {
version: decodeBytes(entry.metadata.version),
updateTs: entry.metadata.updateTs,
},
})),
requestedGetKeys: (preloadedKv.requestedGetKeys || []).map(decodeBytes),
requestedPrefixes: (preloadedKv.requestedPrefixes || []).map(decodeBytes),
};
}

function isPlainObject(value) {
Expand Down Expand Up @@ -278,8 +309,12 @@ function wrapNativeStorageError(nativeDb, error) {
);
}

async function openRawDatabaseFromEnvoy(handle, actorId) {
const nativeDb = await openDatabaseFromEnvoy(handle, actorId);
async function openRawDatabaseFromEnvoy(handle, actorId, preloadedEntries) {
const nativeDb = await openDatabaseFromEnvoy(
handle,
actorId,
preloadedEntries,
);
let closed = false;

const ensureOpen = () => {
Expand Down Expand Up @@ -356,7 +391,7 @@ function handleEvent(event, config, wrappedHandle) {
event.actorId,
event.generation,
actorConfig,
null, // preloadedKv
decodePreloadedKv(event.preloadedKv),
),
).then(
async () => {
Expand Down
5 changes: 4 additions & 1 deletion rivetkit-typescript/packages/rivetkit/src/db/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ export interface NativeSqliteConfig {
* Replaces the transport-config-based NativeSqliteConfig seam.
*/
export interface NativeDatabaseProvider {
open(actorId: string): Promise<RawAccess>;
open(
actorId: string,
preloadedEntries?: [Uint8Array, Uint8Array][],
): Promise<RawAccess>;
}

/**
Expand Down
5 changes: 4 additions & 1 deletion rivetkit-typescript/packages/rivetkit/src/db/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ export function db({
// path where databases are opened from a live runtime handle
// (e.g., the native envoy client).
if (ctx.nativeDatabaseProvider) {
return await ctx.nativeDatabaseProvider.open(ctx.actorId);
return await ctx.nativeDatabaseProvider.open(
ctx.actorId,
ctx.preloadedEntries,
);
}

const { database: db, kvStore } = await openActorDatabase(ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -580,10 +580,14 @@ export class EngineActorDriver implements ActorDriver {

const envoy = this.#envoy;
return {
open: async (actorId: string) => {
open: async (
actorId: string,
preloadedEntries?: [Uint8Array, Uint8Array][],
) => {
return await nativeMod.openRawDatabaseFromEnvoy(
envoy,
actorId,
preloadedEntries,
);
},
};
Expand Down
Loading
Loading