From 3a41da6118d07655536b3571abf5f8527b4a7f2a Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 30 Apr 2026 15:24:31 -0700 Subject: [PATCH 1/8] wip --- crates/core/src/host/module_host.rs | 330 ++++++++++++++++++++-------- crates/core/src/host/scheduler.rs | 39 ++++ 2 files changed, 281 insertions(+), 88 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 88c45a3f867..5c8697e625a 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -352,9 +352,19 @@ enum ModuleHostInner { Js(Box), } +/// Wasm has two instance managers: one for procedures and one for reducers/views. +/// +/// Reducers are executed serially by the `SingleCoreExecutor`, so the instance pool +/// contains only a single instance. +/// +/// This is not the case for procedures which can be executed concurrently. +/// +/// Both managers need to be able to create fresh instances from the same compiled module, +/// so they share the module via `Arc`. struct WasmtimeModuleHost { executor: SingleCoreExecutor, - instance_manager: ModuleInstanceManager, + main_instance: ModuleInstanceManager>, + procedure_instances: ModuleInstanceManager>, } struct V8ModuleHost { @@ -406,6 +416,16 @@ impl GenericModule for super::v8::JsModule { } } +impl GenericModule for Arc { + type Instance = M::Instance; + async fn create_instance(&self) -> Self::Instance { + (**self).create_instance().await + } + fn host_type(&self) -> HostType { + (**self).host_type() + } +} + impl GenericModuleInstance for super::v8::JsInstance { fn trapped(&self) -> bool { self.trapped() @@ -747,102 +767,87 @@ impl CallProcedureParams { /// Holds a [`Module`] and a set of [`Instance`]s from it, /// and allocates the [`Instance`]s to be used for function calls. /// -/// Capable of managing and allocating multiple instances of the same module, -/// but this functionality is currently unused, as only one reducer runs at a time. -/// When we introduce procedures, it will be necessary to have multiple instances, -/// as each procedure invocation will have its own sandboxed instance, -/// and multiple procedures can run concurrently with up to one reducer. +/// This can either back a single long-lived serialized execution lane +/// or a pool that grows to support concurrent procedure execution. struct ModuleInstanceManager { instances: Mutex>, module: M, - module_instances_metric: ModuleInstancesMetric, - create_instance_time_metric: CreateInstanceTimeMetric, + metrics: InstanceManagerMetrics, } -/// Handle on the `spacetime_module_create_instance_time_seconds` label for a particular database -/// which calls `remove_label_values` to clean up on drop. -struct CreateInstanceTimeMetric { - metric: Histogram, - host_type: HostType, - database_identity: Identity, +/// The [`ModuleHost`] now manages two instance pools for wasm: one for +/// reducers/views and another for procedures, and both write to the same +/// metric labels. Therefore both must share the same handles or else each +/// copy would try to unregister the same metric labels independently on drop. +#[derive(Clone)] +struct InstanceManagerMetrics { + inner: Arc, } -/// Handle on the `spacetime_module_instances` label for a particular database -/// which calls `remove_label_values` to clean up on drop. -struct ModuleInstancesMetric { - metric: IntGauge, +/// Shared metric state for all instance managers of a module host. +struct InstanceManagerMetricsInner { + module_instances_metric: IntGauge, + create_instance_time_metric: Histogram, host_type: HostType, database_identity: Identity, - count: std::sync::Mutex, } -impl Drop for CreateInstanceTimeMetric { +impl Drop for InstanceManagerMetricsInner { fn drop(&mut self) { let _ = WORKER_METRICS .module_create_instance_time_seconds .remove_label_values(&self.database_identity, &self.host_type); - } -} - -impl Drop for ModuleInstancesMetric { - fn drop(&mut self) { let _ = WORKER_METRICS .module_instances .remove_label_values(&self.database_identity, &self.host_type); } } -impl ModuleInstancesMetric { - fn inc(&self) { - let mut count = self.count.lock().unwrap(); - *count += 1; - self.metric.set(*count); +impl InstanceManagerMetrics { + fn new(host_type: HostType, database_identity: Identity) -> Self { + Self { + inner: Arc::new(InstanceManagerMetricsInner { + module_instances_metric: WORKER_METRICS + .module_instances + .with_label_values(&database_identity, &host_type), + create_instance_time_metric: WORKER_METRICS + .module_create_instance_time_seconds + .with_label_values(&database_identity, &host_type), + host_type, + database_identity, + }), + } } - fn dec(&self) { - let mut count = self.count.lock().unwrap(); - if *count == 0 { - return; - } - *count -= 1; - self.metric.set(*count); + fn inc_instances(&self) { + self.inner.module_instances_metric.inc(); } -} -impl CreateInstanceTimeMetric { - fn observe(&self, duration: std::time::Duration) { - self.metric.observe(duration.as_secs_f64()); + fn dec_instances(&self) { + self.inner.module_instances_metric.dec(); + } + + fn observe_create_instance_time(&self, duration: std::time::Duration) { + self.inner.create_instance_time_metric.observe(duration.as_secs_f64()); } } impl ModuleInstanceManager { - fn new(module: M, init_inst: Option, database_identity: Identity) -> Self { + fn new(module: M, init_inst: Option, metrics: InstanceManagerMetrics) -> Self { let host_type = module.host_type(); - let module_instances_metric = ModuleInstancesMetric { - metric: WORKER_METRICS - .module_instances - .with_label_values(&database_identity, &host_type), - host_type, - database_identity, - count: std::sync::Mutex::new(1), - }; - - let create_instance_time_metric = CreateInstanceTimeMetric { - metric: WORKER_METRICS - .module_create_instance_time_seconds - .with_label_values(&database_identity, &host_type), - host_type, - database_identity, - }; + debug_assert_eq!(metrics.inner.host_type, host_type); let mut instances = VecDeque::new(); + let has_init_inst = init_inst.is_some(); instances.extend(init_inst); + if has_init_inst { + metrics.inc_instances(); + } Self { instances: Mutex::new(instances), module, - module_instances_metric, - create_instance_time_metric, + metrics, } } @@ -861,8 +866,8 @@ impl ModuleInstanceManager { let start_time = std::time::Instant::now(); let res = self.module.create_instance().await; let elapsed_time = start_time.elapsed(); - self.create_instance_time_metric.observe(elapsed_time); - self.module_instances_metric.inc(); + self.metrics.observe_create_instance_time(elapsed_time); + self.metrics.inc_instances(); res } } @@ -872,7 +877,7 @@ impl ModuleInstanceManager { // Don't return trapped instances; // they may have left internal data structures in the guest `Instance` // (WASM linear memory, V8 global scope) in a bad state. - self.module_instances_metric.dec(); + self.metrics.dec_instances(); return; } @@ -1082,16 +1087,21 @@ impl ModuleHost { init_inst, } => { info = module.info(); - let instance_manager = ModuleInstanceManager::new(module, Some(init_inst), database_identity); + let module = Arc::new(module); + let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); + let instance_manager = ModuleInstanceManager::new(module.clone(), Some(init_inst), metrics.clone()); + let procedure_instances = ModuleInstanceManager::new(module, None, metrics); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { executor, - instance_manager, + main_instance: instance_manager, + procedure_instances, }))) } ModuleWithInstance::Js { module, init_inst } => { info = module.info(); let instance_lane = super::v8::JsInstanceLane::new(module.clone(), init_inst); - let procedure_instances = ModuleInstanceManager::new(module.clone(), None, database_identity); + let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); + let procedure_instances = ModuleInstanceManager::new(module.clone(), None, metrics); Arc::new(ModuleHostInner::Js(Box::new(V8ModuleHost { module, instance_lane, @@ -1232,7 +1242,7 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(wasm) => { let executor = &wasm.executor; - let instance_manager = &wasm.instance_manager; + let instance_manager = &wasm.main_instance; instance_manager .with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await) .await @@ -1285,9 +1295,8 @@ impl ModuleHost { /// Run a function for this module using pooled instances. /// - /// For WASM, this is identical to [`Self::call`]. - /// For V8/JS, this uses the pooled procedure instances instead of the - /// single instance lane. + /// For WASM and V8/JS, this uses the pooled procedure instances instead of the + /// serialized reducer/view execution lane. async fn call_pooled( &self, label: &str, @@ -1309,7 +1318,7 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - host.instance_manager + host.procedure_instances .with_instance(async |mut inst| { host.executor .run_job(async move || { @@ -1333,13 +1342,37 @@ impl ModuleHost { } async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result { - self.call_pooled( - label, - cmd, - async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)), - async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd).await), - ) - .await? + self.guard_closed()?; + let timer_guard = self.start_call_timer(label); + + scopeguard::defer_on_unwind!({ + log::warn!("pooled operation {label} panicked"); + (self.on_panic)(); + }); + + match &*self.inner { + ModuleHostInner::Wasm(host) => { + host.main_instance + .with_instance(async |mut inst| { + host.executor + .run_job(async move || { + drop(timer_guard); + (Ok::<_, ViewCallError>(inst.call_view(cmd)), inst) + }) + .await + }) + .await + } + ModuleHostInner::Js(host) => { + host.procedure_instances + .with_instance(async |inst| { + drop(timer_guard); + let res = Ok::<_, ViewCallError>(inst.call_view(cmd).await); + (res, inst) + }) + .await + } + } } pub async fn disconnect_client(&self, client_id: ClientActorId) { @@ -1940,13 +1973,44 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.call_pooled( - "unknown scheduled function", - params, - async move |params, inst| Ok(inst.call_scheduled_function(params).await), - async move |params, inst| Ok(inst.call_scheduled_function(params).await), - ) - .await? + self.guard_closed()?; + let label = "unknown scheduled function"; + let timer_guard = self.start_call_timer(label); + + scopeguard::defer_on_unwind!({ + log::warn!("pooled operation {label} panicked"); + (self.on_panic)(); + }); + + match &*self.inner { + ModuleHostInner::Wasm(host) => { + let manager = if params.uses_procedure_pool(&self.info) { + &host.procedure_instances + } else { + &host.main_instance + }; + + manager + .with_instance(async |mut inst| { + host.executor + .run_job(async move || { + drop(timer_guard); + (Ok(inst.call_scheduled_function(params).await), inst) + }) + .await + }) + .await + } + ModuleHostInner::Js(host) => { + host.procedure_instances + .with_instance(async |inst| { + drop(timer_guard); + let res = Ok(inst.call_scheduled_function(params).await); + (res, inst) + }) + .await + } + } } /// Materializes the views return by the `view_collector`, if not already materialized, @@ -2510,14 +2574,14 @@ impl ModuleHost { pub(crate) fn replica_ctx(&self) -> &ReplicaContext { match &*self.inner { - ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(), + ModuleHostInner::Wasm(wasm) => wasm.main_instance.module.replica_ctx(), ModuleHostInner::Js(js) => js.module.replica_ctx(), } } fn scheduler(&self) -> &Scheduler { match &*self.inner { - ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.scheduler(), + ModuleHostInner::Wasm(wasm) => wasm.main_instance.module.scheduler(), ModuleHostInner::Js(js) => js.module.scheduler(), } } @@ -2550,7 +2614,9 @@ fn args_error_log_message(function_kind: &str, function_name: &str) -> String { #[cfg(test)] mod tests { - use super::ModuleHost; + use super::{ + GenericModule, GenericModuleInstance, HostType, InstanceManagerMetrics, ModuleHost, ModuleInstanceManager, + }; use crate::client::{ ClientActorId, ClientConfig, ClientConnectionReceiver, ClientConnectionSender, OutboundMessage, Protocol, WsVersion, @@ -2561,8 +2627,38 @@ mod tests { use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{AlgebraicType, Identity}; use spacetimedb_sats::product; + use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; + #[derive(Clone)] + struct TestModule { + next_id: Arc, + } + + struct TestInstance { + id: usize, + trapped: bool, + } + + impl GenericModule for TestModule { + type Instance = TestInstance; + + async fn create_instance(&self) -> Self::Instance { + let id = self.next_id.fetch_add(1, Ordering::SeqCst); + TestInstance { id, trapped: false } + } + + fn host_type(&self) -> HostType { + HostType::Wasm + } + } + + impl GenericModuleInstance for TestInstance { + fn trapped(&self) -> bool { + self.trapped + } + } + fn v2_client_config() -> ClientConfig { ClientConfig { protocol: Protocol::Binary, @@ -2656,4 +2752,62 @@ mod tests { Ok(()) } + + #[test] + fn wasm_serialized_instance_is_reused_until_trap() -> anyhow::Result<()> { + let runtime = tokio::runtime::Runtime::new()?; + let module = TestModule { + next_id: Arc::new(AtomicUsize::new(1)), + }; + let metrics = InstanceManagerMetrics::new(HostType::Wasm, Identity::ZERO); + let manager = ModuleInstanceManager::new(module, Some(TestInstance { id: 0, trapped: false }), metrics); + + let first = runtime.block_on(manager.with_instance(async |inst| (inst.id, inst))); + let second = runtime.block_on(manager.with_instance(async |inst| (inst.id, inst))); + assert_eq!(first, 0); + assert_eq!(second, 0); + + let trapped = runtime.block_on(manager.with_instance(async |mut inst| { + inst.trapped = true; + (inst.id, inst) + })); + assert_eq!(trapped, 0); + + let replacement = runtime.block_on(manager.with_instance(async |inst| (inst.id, inst))); + assert_eq!(replacement, 1); + + Ok(()) + } + + #[test] + fn wasm_procedure_pool_does_not_consume_serialized_instance() -> anyhow::Result<()> { + let runtime = tokio::runtime::Runtime::new()?; + let module = Arc::new(TestModule { + next_id: Arc::new(AtomicUsize::new(1)), + }); + let metrics = InstanceManagerMetrics::new(HostType::Wasm, Identity::ZERO); + let serialized = ModuleInstanceManager::new( + module.clone(), + Some(TestInstance { id: 0, trapped: false }), + metrics.clone(), + ); + let procedures = ModuleInstanceManager::new(module, None, metrics); + + let (serialized_id, procedure_id, serialized_reuse) = runtime.block_on(async { + let serialized_inst = serialized.get_instance().await; + let procedure_inst = procedures.get_instance().await; + let serialized_id = serialized_inst.id; + let procedure_id = procedure_inst.id; + serialized.return_instance(serialized_inst).await; + procedures.return_instance(procedure_inst).await; + let serialized_reuse = serialized.with_instance(async |inst| (inst.id, inst)).await; + (serialized_id, procedure_id, serialized_reuse) + }); + + assert_eq!(serialized_id, 0); + assert_eq!(procedure_id, 1); + assert_eq!(serialized_reuse, 0); + + Ok(()) + } } diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index d3b285e9f16..908d187ed70 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -277,6 +277,45 @@ enum QueueItem { #[derive(Clone)] pub(crate) struct ScheduledFunctionParams(QueueItem); +impl ScheduledFunctionParams { + pub(crate) fn uses_procedure_pool(&self, module: &ModuleInfo) -> bool { + match &self.0 { + QueueItem::VolatileNonatomicImmediate { function_name, .. } => { + module.module_def.procedure_full(function_name.as_str()).is_some() + } + QueueItem::Id { id, .. } => { + let db = &**module.relational_db(); + let tx = db.begin_tx(Workload::Internal); + let table_id_col = StScheduledFields::TableId.col_id(); + let function_name_col = StScheduledFields::ReducerName.col_id(); + + match db.iter_by_col_eq(&tx, ST_SCHEDULED_ID, table_id_col, &id.table_id.into()) { + Ok(mut rows) => match rows.next() { + Some(row) => match row.read_col::>(function_name_col) { + Ok(function_name) => module.module_def.procedure_full(function_name.as_ref()).is_some(), + Err(err) => { + log::warn!( + "failed to read scheduled function name for table {}: {err:#}", + id.table_id + ); + false + } + }, + None => false, + }, + Err(err) => { + log::warn!( + "failed to scan scheduled function metadata for table {}: {err:#}", + id.table_id + ); + false + } + } + } + } + } +} + #[derive(thiserror::Error, Debug)] pub(crate) enum CallScheduledFunctionError { #[error(transparent)] From 5060fe6345e2448850f3b8f6790585766657e80b Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 30 Apr 2026 16:41:04 -0700 Subject: [PATCH 2/8] more wip --- crates/core/src/host/module_host.rs | 257 +++++++++++----------------- crates/core/src/host/scheduler.rs | 9 +- crates/core/src/host/v8/mod.rs | 16 ++ 3 files changed, 122 insertions(+), 160 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 5c8697e625a..b5533e03d8f 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -367,6 +367,12 @@ struct WasmtimeModuleHost { procedure_instances: ModuleInstanceManager>, } +#[derive(Clone, Copy)] +enum InstanceKind { + Main, + Procedure, +} + struct V8ModuleHost { module: super::v8::JsModule, instance_lane: super::v8::JsInstanceLane, @@ -1074,6 +1080,16 @@ pub struct RefInstance<'a, I: WasmInstance> { } impl ModuleHost { + fn wasm_instance_manager<'a>( + host: &'a WasmtimeModuleHost, + kind: InstanceKind, + ) -> &'a ModuleInstanceManager> { + match kind { + InstanceKind::Main => &host.main_instance, + InstanceKind::Procedure => &host.procedure_instances, + } + } + pub(super) fn new( module: ModuleWithInstance, on_panic: impl Fn() + Send + Sync + 'static, @@ -1213,44 +1229,6 @@ impl ModuleHost { }) } - /// Run a function for this module which has access to the module instance. - async fn with_instance( - &self, - kind: &str, - label: &str, - arg: A, - timer: impl FnOnce(&str) -> Guard, - work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box, A) -> (R, Box), - work_js: impl AsyncFnOnce(Guard, &super::v8::JsInstanceLane, A) -> R, - ) -> Result { - self.guard_closed()?; - let timer_guard = timer(label); - - // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentially take a long time - // and partially because interacting with the database requires taking - // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. - // This will bubble up any panic that may occur. - - // If a function call panics, we **must** ensure to call `self.on_panic` - // so that the module is discarded by the host controller. - scopeguard::defer_on_unwind!({ - log::warn!("{kind} {label} panicked"); - (self.on_panic)(); - }); - - Ok(match &*self.inner { - ModuleHostInner::Wasm(wasm) => { - let executor = &wasm.executor; - let instance_manager = &wasm.main_instance; - instance_manager - .with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await) - .await - } - ModuleHostInner::Js(js) => work_js(timer_guard, &js.instance_lane, arg).await, - }) - } - /// Run a function for this module which has access to the module instance. /// /// For WASM, the function is run on the module's JobThread. @@ -1266,113 +1244,32 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.with_instance( + self.call_with_selected_instance( "reducer", label, + InstanceKind::Main, arg, - |l| self.start_call_timer(l), - // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentially take a long time - // and partially because interacting with the database requires taking a blocking lock. - // So, we run `work` on a dedicated thread with `self.executor`. - // This will bubble up any panic that may occur. - async move |timer_guard, executor, mut inst, arg| { - executor - .run_job(async move || { - drop(timer_guard); - (wasm(arg, &mut inst).await, inst) - }) - .await - }, - async move |timer_guard, inst, arg| { + wasm, + async move |arg, lane| { super::v8::assert_not_on_js_module_thread(label); - drop(timer_guard); - js(arg, inst).await + js(arg, lane).await }, + async move |_arg, _inst| unreachable!("main-instance call should not use pooled JS instances"), ) .await } - /// Run a function for this module using pooled instances. - /// - /// For WASM and V8/JS, this uses the pooled procedure instances instead of the - /// serialized reducer/view execution lane. - async fn call_pooled( - &self, - label: &str, - arg: A, - wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, - js: impl AsyncFnOnce(A, &JsInstance) -> R, - ) -> Result - where - R: Send + 'static, - A: Send + 'static, - { - self.guard_closed()?; - let timer_guard = self.start_call_timer(label); - - scopeguard::defer_on_unwind!({ - log::warn!("pooled operation {label} panicked"); - (self.on_panic)(); - }); - - Ok(match &*self.inner { - ModuleHostInner::Wasm(host) => { - host.procedure_instances - .with_instance(async |mut inst| { - host.executor - .run_job(async move || { - drop(timer_guard); - (wasm(arg, &mut inst).await, inst) - }) - .await - }) - .await - } - ModuleHostInner::Js(host) => { - host.procedure_instances - .with_instance(async |inst| { - drop(timer_guard); - let res = js(arg, &inst).await; - (res, inst) - }) - .await - } - }) - } - async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result { - self.guard_closed()?; - let timer_guard = self.start_call_timer(label); - - scopeguard::defer_on_unwind!({ - log::warn!("pooled operation {label} panicked"); - (self.on_panic)(); - }); - - match &*self.inner { - ModuleHostInner::Wasm(host) => { - host.main_instance - .with_instance(async |mut inst| { - host.executor - .run_job(async move || { - drop(timer_guard); - (Ok::<_, ViewCallError>(inst.call_view(cmd)), inst) - }) - .await - }) - .await - } - ModuleHostInner::Js(host) => { - host.procedure_instances - .with_instance(async |inst| { - drop(timer_guard); - let res = Ok::<_, ViewCallError>(inst.call_view(cmd).await); - (res, inst) - }) - .await - } - } + self.call_with_selected_instance( + "view operation", + label, + InstanceKind::Main, + cmd, + async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)), + async |cmd, lane| lane.call_view(cmd).await, + async |_cmd, _inst| unreachable!("main-instance view call should not use pooled JS instances"), + ) + .await? } pub async fn disconnect_client(&self, client_id: ClientActorId) { @@ -1960,57 +1857,101 @@ impl ModuleHost { name: &str, params: CallProcedureParams, ) -> Result { - self.call_pooled( + self.call_with_selected_instance( + "pooled operation", name, + InstanceKind::Procedure, params, async move |params, inst| inst.call_procedure(params).await, + async move |_params, _lane| unreachable!("procedure call should not use the serialized JS lane"), async move |params, inst| inst.call_procedure(params).await, ) .await } - pub(super) async fn call_scheduled_function( + pub(super) async fn call_scheduled_reducer( + &self, + params: ScheduledFunctionParams, + ) -> Result { + self.call_with_selected_instance( + "scheduled operation", + "scheduled reducer", + InstanceKind::Main, + params, + async |params, inst| Ok(inst.call_scheduled_function(params).await), + async |params, lane| Ok(lane.call_scheduled_function(params).await), + async |_params, _inst| unreachable!("scheduled reducer should not use pooled JS instances"), + ) + .await? + } + + pub(super) async fn call_scheduled_procedure( &self, params: ScheduledFunctionParams, ) -> Result { + self.call_with_selected_instance( + "scheduled operation", + "scheduled procedure", + InstanceKind::Procedure, + params, + async |params, inst| Ok(inst.call_scheduled_function(params).await), + async |_params, _lane| unreachable!("scheduled procedure should not use the serialized JS lane"), + async |params, inst| Ok(inst.call_scheduled_function(params).await), + ) + .await? + } + + async fn call_with_selected_instance( + &self, + kind: &str, + label: &str, + instance_kind: InstanceKind, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js_main: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R, + js_procedure: impl AsyncFnOnce(A, &JsInstance) -> R, + ) -> Result + where + R: Send + 'static, + A: Send + 'static, + { self.guard_closed()?; - let label = "unknown scheduled function"; let timer_guard = self.start_call_timer(label); scopeguard::defer_on_unwind!({ - log::warn!("pooled operation {label} panicked"); + log::warn!("{kind} {label} panicked"); (self.on_panic)(); }); - match &*self.inner { + Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - let manager = if params.uses_procedure_pool(&self.info) { - &host.procedure_instances - } else { - &host.main_instance - }; - - manager + Self::wasm_instance_manager(host, instance_kind) .with_instance(async |mut inst| { host.executor .run_job(async move || { drop(timer_guard); - (Ok(inst.call_scheduled_function(params).await), inst) + (wasm(arg, &mut inst).await, inst) }) .await }) .await } - ModuleHostInner::Js(host) => { - host.procedure_instances - .with_instance(async |inst| { - drop(timer_guard); - let res = Ok(inst.call_scheduled_function(params).await); - (res, inst) - }) - .await - } - } + ModuleHostInner::Js(host) => match instance_kind { + InstanceKind::Main => { + drop(timer_guard); + js_main(arg, &host.instance_lane).await + } + InstanceKind::Procedure => { + host.procedure_instances + .with_instance(async |inst| { + drop(timer_guard); + let res = js_procedure(arg, &inst).await; + (res, inst) + }) + .await + } + }, + }) } /// Materializes the views return by the `view_collector`, if not already materialized, diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 908d187ed70..017a2f20e4f 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -278,7 +278,7 @@ enum QueueItem { pub(crate) struct ScheduledFunctionParams(QueueItem); impl ScheduledFunctionParams { - pub(crate) fn uses_procedure_pool(&self, module: &ModuleInfo) -> bool { + pub(crate) fn is_procedure(&self, module: &ModuleInfo) -> bool { match &self.0 { QueueItem::VolatileNonatomicImmediate { function_name, .. } => { module.module_def.procedure_full(function_name.as_str()).is_some() @@ -379,7 +379,12 @@ impl SchedulerActor { return; }; - let result = module_host.call_scheduled_function(ScheduledFunctionParams(item)).await; + let params = ScheduledFunctionParams(item); + let result = if params.is_procedure(module_host.info()) { + module_host.call_scheduled_procedure(params).await + } else { + module_host.call_scheduled_reducer(params).await + }; match result { // If the module already exited, leave the `ScheduledFunction` in diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index 287d1eedf6b..92a23746b78 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -1258,6 +1258,22 @@ impl JsInstanceLane { .await .map_err(|_| ViewCallError::InternalError(instance_lane_worker_error("call_view"))) } + + /// Run a scheduled function on the instance lane exactly once. + /// + /// If the worker disappears before replying, we replace it for future + /// requests and panic so the host discards the broken module generation. + pub(in crate::host) async fn call_scheduled_function( + &self, + params: ScheduledFunctionParams, + ) -> CallScheduledFunctionResult { + self.run_once("call_scheduled_function", |inst: JsInstance| async move { + inst.send_request(|reply_tx| JsWorkerRequest::CallScheduledFunction { reply_tx, params }) + .await + }) + .await + .unwrap_or_else(|_| panic!("{}", instance_lane_worker_error("call_scheduled_function"))) + } } /// Performs some of the startup work of [`spawn_instance_worker`]. From 2b5ab31aebe1cc2ea2969e6da6d640ac84327ba8 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 30 Apr 2026 17:22:23 -0700 Subject: [PATCH 3/8] yet more wip --- crates/core/src/host/instance_env.rs | 7 + crates/core/src/host/module_host.rs | 214 ++++++++------------------- crates/core/src/host/scheduler.rs | 127 +++++++--------- 3 files changed, 120 insertions(+), 228 deletions(-) diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 0d3d41632b1..86751aae70c 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -401,6 +401,12 @@ impl InstanceEnv { table_id: TableId, row_ptr: RowPointer, ) -> Result<(), NodesError> { + let table = stdb.schema_for_table_mut(tx, table_id)?; + let schedule = table + .schedule + .as_ref() + .expect("schedule_row should only be called when we know its a scheduler table"); + let function_name: Box = (&schedule.function_name[..]).into(); let (id_column, at_column) = stdb .table_scheduled_id_and_at(tx, table_id)? .expect("schedule_row should only be called when we know its a scheduler table"); @@ -414,6 +420,7 @@ impl InstanceEnv { .schedule( table_id, schedule_id, + function_name, schedule_at, id_column, at_column, diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index b5533e03d8f..d87edf01c2d 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1080,10 +1080,10 @@ pub struct RefInstance<'a, I: WasmInstance> { } impl ModuleHost { - fn wasm_instance_manager<'a>( - host: &'a WasmtimeModuleHost, + fn wasm_instance_manager( + host: &WasmtimeModuleHost, kind: InstanceKind, - ) -> &'a ModuleInstanceManager> { + ) -> &ModuleInstanceManager> { match kind { InstanceKind::Main => &host.main_instance, InstanceKind::Procedure => &host.procedure_instances, @@ -1229,6 +1229,59 @@ impl ModuleHost { }) } + async fn with_instance( + &self, + kind: &str, + label: &str, + instance_kind: InstanceKind, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js_main: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R, + js_procedure: impl AsyncFnOnce(A, &JsInstance) -> R, + ) -> Result + where + R: Send + 'static, + A: Send + 'static, + { + self.guard_closed()?; + let timer_guard = self.start_call_timer(label); + + scopeguard::defer_on_unwind!({ + log::warn!("{kind} {label} panicked"); + (self.on_panic)(); + }); + + Ok(match &*self.inner { + ModuleHostInner::Wasm(host) => { + Self::wasm_instance_manager(host, instance_kind) + .with_instance(async |mut inst| { + host.executor + .run_job(async move || { + drop(timer_guard); + (wasm(arg, &mut inst).await, inst) + }) + .await + }) + .await + } + ModuleHostInner::Js(host) => match instance_kind { + InstanceKind::Main => { + drop(timer_guard); + js_main(arg, &host.instance_lane).await + } + InstanceKind::Procedure => { + host.procedure_instances + .with_instance(async |inst| { + drop(timer_guard); + let res = js_procedure(arg, &inst).await; + (res, inst) + }) + .await + } + }, + }) + } + /// Run a function for this module which has access to the module instance. /// /// For WASM, the function is run on the module's JobThread. @@ -1244,7 +1297,7 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.call_with_selected_instance( + self.with_instance( "reducer", label, InstanceKind::Main, @@ -1260,7 +1313,7 @@ impl ModuleHost { } async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result { - self.call_with_selected_instance( + self.with_instance( "view operation", label, InstanceKind::Main, @@ -1857,7 +1910,7 @@ impl ModuleHost { name: &str, params: CallProcedureParams, ) -> Result { - self.call_with_selected_instance( + self.with_instance( "pooled operation", name, InstanceKind::Procedure, @@ -1873,7 +1926,7 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.call_with_selected_instance( + self.with_instance( "scheduled operation", "scheduled reducer", InstanceKind::Main, @@ -1889,7 +1942,7 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.call_with_selected_instance( + self.with_instance( "scheduled operation", "scheduled procedure", InstanceKind::Procedure, @@ -1901,59 +1954,6 @@ impl ModuleHost { .await? } - async fn call_with_selected_instance( - &self, - kind: &str, - label: &str, - instance_kind: InstanceKind, - arg: A, - wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, - js_main: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R, - js_procedure: impl AsyncFnOnce(A, &JsInstance) -> R, - ) -> Result - where - R: Send + 'static, - A: Send + 'static, - { - self.guard_closed()?; - let timer_guard = self.start_call_timer(label); - - scopeguard::defer_on_unwind!({ - log::warn!("{kind} {label} panicked"); - (self.on_panic)(); - }); - - Ok(match &*self.inner { - ModuleHostInner::Wasm(host) => { - Self::wasm_instance_manager(host, instance_kind) - .with_instance(async |mut inst| { - host.executor - .run_job(async move || { - drop(timer_guard); - (wasm(arg, &mut inst).await, inst) - }) - .await - }) - .await - } - ModuleHostInner::Js(host) => match instance_kind { - InstanceKind::Main => { - drop(timer_guard); - js_main(arg, &host.instance_lane).await - } - InstanceKind::Procedure => { - host.procedure_instances - .with_instance(async |inst| { - drop(timer_guard); - let res = js_procedure(arg, &inst).await; - (res, inst) - }) - .await - } - }, - }) - } - /// Materializes the views return by the `view_collector`, if not already materialized, /// and updates `st_view_sub` accordingly. /// @@ -2555,9 +2555,7 @@ fn args_error_log_message(function_kind: &str, function_name: &str) -> String { #[cfg(test)] mod tests { - use super::{ - GenericModule, GenericModuleInstance, HostType, InstanceManagerMetrics, ModuleHost, ModuleInstanceManager, - }; + use super::ModuleHost; use crate::client::{ ClientActorId, ClientConfig, ClientConnectionReceiver, ClientConnectionSender, OutboundMessage, Protocol, WsVersion, @@ -2568,38 +2566,8 @@ mod tests { use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{AlgebraicType, Identity}; use spacetimedb_sats::product; - use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - #[derive(Clone)] - struct TestModule { - next_id: Arc, - } - - struct TestInstance { - id: usize, - trapped: bool, - } - - impl GenericModule for TestModule { - type Instance = TestInstance; - - async fn create_instance(&self) -> Self::Instance { - let id = self.next_id.fetch_add(1, Ordering::SeqCst); - TestInstance { id, trapped: false } - } - - fn host_type(&self) -> HostType { - HostType::Wasm - } - } - - impl GenericModuleInstance for TestInstance { - fn trapped(&self) -> bool { - self.trapped - } - } - fn v2_client_config() -> ClientConfig { ClientConfig { protocol: Protocol::Binary, @@ -2693,62 +2661,4 @@ mod tests { Ok(()) } - - #[test] - fn wasm_serialized_instance_is_reused_until_trap() -> anyhow::Result<()> { - let runtime = tokio::runtime::Runtime::new()?; - let module = TestModule { - next_id: Arc::new(AtomicUsize::new(1)), - }; - let metrics = InstanceManagerMetrics::new(HostType::Wasm, Identity::ZERO); - let manager = ModuleInstanceManager::new(module, Some(TestInstance { id: 0, trapped: false }), metrics); - - let first = runtime.block_on(manager.with_instance(async |inst| (inst.id, inst))); - let second = runtime.block_on(manager.with_instance(async |inst| (inst.id, inst))); - assert_eq!(first, 0); - assert_eq!(second, 0); - - let trapped = runtime.block_on(manager.with_instance(async |mut inst| { - inst.trapped = true; - (inst.id, inst) - })); - assert_eq!(trapped, 0); - - let replacement = runtime.block_on(manager.with_instance(async |inst| (inst.id, inst))); - assert_eq!(replacement, 1); - - Ok(()) - } - - #[test] - fn wasm_procedure_pool_does_not_consume_serialized_instance() -> anyhow::Result<()> { - let runtime = tokio::runtime::Runtime::new()?; - let module = Arc::new(TestModule { - next_id: Arc::new(AtomicUsize::new(1)), - }); - let metrics = InstanceManagerMetrics::new(HostType::Wasm, Identity::ZERO); - let serialized = ModuleInstanceManager::new( - module.clone(), - Some(TestInstance { id: 0, trapped: false }), - metrics.clone(), - ); - let procedures = ModuleInstanceManager::new(module, None, metrics); - - let (serialized_id, procedure_id, serialized_reuse) = runtime.block_on(async { - let serialized_inst = serialized.get_instance().await; - let procedure_inst = procedures.get_instance().await; - let serialized_id = serialized_inst.id; - let procedure_id = procedure_inst.id; - serialized.return_instance(serialized_inst).await; - procedures.return_instance(procedure_inst).await; - let serialized_reuse = serialized.with_instance(async |inst| (inst.id, inst)).await; - (serialized_id, procedure_id, serialized_reuse) - }); - - assert_eq!(serialized_id, 0); - assert_eq!(procedure_id, 1); - assert_eq!(serialized_reuse, 0); - - Ok(()) - } } diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs index 017a2f20e4f..105ef9fc4fb 100644 --- a/crates/core/src/host/scheduler.rs +++ b/crates/core/src/host/scheduler.rs @@ -13,7 +13,7 @@ use rustc_hash::FxHashMap; use spacetimedb_client_api_messages::energy::EnergyQuanta; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; -use spacetimedb_datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID}; +use spacetimedb_datastore::system_tables::{StScheduledFields, ST_SCHEDULED_ID}; use spacetimedb_datastore::traits::IsolationLevel; use spacetimedb_lib::scheduler::ScheduleAt; use spacetimedb_lib::Timestamp; @@ -53,7 +53,8 @@ enum MsgOrExit { enum SchedulerMessage { Schedule { id: ScheduledFunctionId, - /// The timestamp we'll tell the reducer it is. + function_name: Box, + /// The timestamp we'll tell the scheduled function it is. effective_at: Timestamp, /// The actual instant we're scheduling for. real_at: Instant, @@ -64,11 +65,6 @@ enum SchedulerMessage { }, } -pub struct ScheduledFunction { - function: Box, - bsatn_args: Vec, -} - #[derive(Clone)] pub struct Scheduler { tx: mpsc::UnboundedSender>, @@ -107,6 +103,7 @@ impl SchedulerStarter { // Find all Scheduled tables for st_scheduled_row in self.db.iter(&tx, ST_SCHEDULED_ID)? { let table_id = st_scheduled_row.read_col(StScheduledFields::TableId)?; + let function_name = st_scheduled_row.read_col::>(StScheduledFields::ReducerName)?; let (id_column, at_column) = self .db .table_scheduled_id_and_at(&tx, table_id)? @@ -118,7 +115,7 @@ impl SchedulerStarter { // Insert each entry (row) in the scheduled table into `queue`. for scheduled_row in self.db.iter(&tx, table_id)? { let (schedule_id, schedule_at) = get_schedule_from_row(&scheduled_row, id_column, at_column)?; - // calculate duration left to call the scheduled reducer + // Calculate the remaining delay before calling the scheduled function. let duration = schedule_at.to_duration_from(now_ts); let at = schedule_at.to_timestamp_from(now_ts); let id = ScheduledFunctionId { @@ -127,7 +124,8 @@ impl SchedulerStarter { id_column, at_column, }; - let key = queue.insert_at(QueueItem::Id { id, at }, now_instant + duration); + let function_name = function_name.clone(); + let key = queue.insert_at(QueueItem::Id { id, function_name, at }, now_instant + duration); // This should never happen as duplicate entries should be gated by unique // constraint violation in scheduled tables. @@ -155,7 +153,7 @@ impl SchedulerStarter { } } -/// The maximum `Duration` into the future that we can schedule a reducer. +/// The maximum `Duration` into the future that we can schedule a function. /// /// `tokio_utils::time::DelayQueue`, as of version 0.7.8, /// limits its scheduling to at most approx. 2 years into the future. @@ -198,10 +196,12 @@ impl Scheduler { /// Schedule a reducer/procedure to run from a scheduled table. /// /// `fn_start` is the timestamp of the start of the current reducer/procedure. + #[allow(clippy::too_many_arguments)] pub(super) fn schedule( &self, table_id: TableId, schedule_id: u64, + function_name: Box, schedule_at: ScheduleAt, id_column: ColId, at_column: ColId, @@ -238,6 +238,7 @@ impl Scheduler { id_column, at_column, }, + function_name, effective_at, real_at, })); @@ -270,50 +271,33 @@ struct SchedulerActor { #[derive(Clone)] enum QueueItem { - Id { id: ScheduledFunctionId, at: Timestamp }, - VolatileNonatomicImmediate { function_name: String, args: FunctionArgs }, + Id { + id: ScheduledFunctionId, + // Carry the function name on the queued item so dispatch can classify + // reducer vs procedure without re-reading `st_scheduled`. + function_name: Box, + at: Timestamp, + }, + VolatileNonatomicImmediate { + function_name: String, + args: FunctionArgs, + }, } #[derive(Clone)] pub(crate) struct ScheduledFunctionParams(QueueItem); impl ScheduledFunctionParams { - pub(crate) fn is_procedure(&self, module: &ModuleInfo) -> bool { + fn function_name(&self) -> &str { match &self.0 { - QueueItem::VolatileNonatomicImmediate { function_name, .. } => { - module.module_def.procedure_full(function_name.as_str()).is_some() - } - QueueItem::Id { id, .. } => { - let db = &**module.relational_db(); - let tx = db.begin_tx(Workload::Internal); - let table_id_col = StScheduledFields::TableId.col_id(); - let function_name_col = StScheduledFields::ReducerName.col_id(); - - match db.iter_by_col_eq(&tx, ST_SCHEDULED_ID, table_id_col, &id.table_id.into()) { - Ok(mut rows) => match rows.next() { - Some(row) => match row.read_col::>(function_name_col) { - Ok(function_name) => module.module_def.procedure_full(function_name.as_ref()).is_some(), - Err(err) => { - log::warn!( - "failed to read scheduled function name for table {}: {err:#}", - id.table_id - ); - false - } - }, - None => false, - }, - Err(err) => { - log::warn!( - "failed to scan scheduled function metadata for table {}: {err:#}", - id.table_id - ); - false - } - } - } + QueueItem::Id { function_name, .. } => function_name.as_ref(), + QueueItem::VolatileNonatomicImmediate { function_name, .. } => function_name.as_str(), } } + + pub(crate) fn is_procedure(&self, module: &ModuleInfo) -> bool { + module.module_def.procedure_full(self.function_name()).is_some() + } } #[derive(thiserror::Error, Debug)] @@ -346,6 +330,7 @@ impl SchedulerActor { match msg { SchedulerMessage::Schedule { id, + function_name, effective_at, real_at, } => { @@ -353,7 +338,14 @@ impl SchedulerActor { if let Some(key) = self.key_map.get(&id) { self.queue.remove(key); } - let key = self.queue.insert_at(QueueItem::Id { id, at: effective_at }, real_at); + let key = self.queue.insert_at( + QueueItem::Id { + id, + function_name, + at: effective_at, + }, + real_at, + ); self.key_map.insert(id, key); } SchedulerMessage::ScheduleImmediate { function_name, args } => { @@ -379,7 +371,7 @@ impl SchedulerActor { return; }; - let params = ScheduledFunctionParams(item); + let params = ScheduledFunctionParams(item.clone()); let result = if params.is_procedure(module_host.info()) { module_host.call_scheduled_procedure(params).await } else { @@ -396,9 +388,16 @@ impl SchedulerActor { Ok(CallScheduledFunctionResult { reschedule: Some(Reschedule { at_ts, at_real }), }) => { - if let Some(id) = id { + if let QueueItem::Id { id, function_name, .. } = item { // If this was repeated, we need to add it back to the queue. - let key = self.queue.insert_at(QueueItem::Id { id, at: at_ts }, at_real); + let key = self.queue.insert_at( + QueueItem::Id { + id, + function_name, + at: at_ts, + }, + at_real, + ); self.key_map.insert(id, key); } } @@ -650,15 +649,13 @@ fn call_params_for_queued_item( item: QueueItem, ) -> anyhow::Result> { Ok(Some(match item { - QueueItem::Id { id, at } => { + QueueItem::Id { id, function_name, at } => { let Some(schedule_row) = get_schedule_row_mut(tx, db, id)? else { // If the row is not found, it means the schedule is cancelled by the user. return Ok(None); }; - let ScheduledFunction { function, bsatn_args } = process_schedule(tx, db, id.table_id, &schedule_row)?; - - let fun_args = FunctionArgs::Bsatn(bsatn_args.into()); - function_to_call_params(module, &function, fun_args, Some(at))? + let fun_args = FunctionArgs::Bsatn(schedule_row.to_bsatn_vec()?.into()); + function_to_call_params(module, function_name.as_ref(), fun_args, Some(at))? } QueueItem::VolatileNonatomicImmediate { function_name, args } => { function_to_call_params(module, &function_name, args, None)? @@ -711,28 +708,6 @@ fn function_to_call_params( Ok((ts, instant, params)) } -/// Generate [`ScheduledFunction`] for given [`ScheduledFunctionId`]. -fn process_schedule( - tx: &MutTxId, - db: &RelationalDB, - table_id: TableId, - schedule_row: &RowRef<'_>, -) -> Result { - // Get reducer name from `ST_SCHEDULED` table. - let table_id_col = StScheduledFields::TableId.col_id(); - let function_name_col = StScheduledFields::ReducerName.col_id(); - let st_scheduled_row = db - .iter_by_col_eq_mut(tx, ST_SCHEDULED_ID, table_id_col, &table_id.into())? - .next() - .ok_or_else(|| anyhow!("Scheduled table with id {table_id} entry does not exist in `st_scheduled`"))?; - let function = st_scheduled_row.read_col::>(function_name_col)?; - - Ok(ScheduledFunction { - function, - bsatn_args: schedule_row.to_bsatn_vec()?, - }) -} - /// Returns the `schedule_row` for `id`. fn get_schedule_row_mut<'a>( tx: &'a MutTxId, From 18a0928845a4a193706524a96f040f612b181f07 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 30 Apr 2026 17:37:55 -0700 Subject: [PATCH 4/8] with main instance --- crates/core/src/host/module_host.rs | 104 ++++++++++++++++------------ 1 file changed, 60 insertions(+), 44 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index d87edf01c2d..bee6167b848 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1229,15 +1229,13 @@ impl ModuleHost { }) } - async fn with_instance( + async fn with_main_instance( &self, kind: &str, label: &str, - instance_kind: InstanceKind, arg: A, wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, - js_main: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R, - js_procedure: impl AsyncFnOnce(A, &JsInstance) -> R, + js: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R, ) -> Result where R: Send + 'static, @@ -1253,7 +1251,7 @@ impl ModuleHost { Ok(match &*self.inner { ModuleHostInner::Wasm(host) => { - Self::wasm_instance_manager(host, instance_kind) + Self::wasm_instance_manager(host, InstanceKind::Main) .with_instance(async |mut inst| { host.executor .run_job(async move || { @@ -1264,21 +1262,55 @@ impl ModuleHost { }) .await } - ModuleHostInner::Js(host) => match instance_kind { - InstanceKind::Main => { - drop(timer_guard); - js_main(arg, &host.instance_lane).await - } - InstanceKind::Procedure => { - host.procedure_instances - .with_instance(async |inst| { - drop(timer_guard); - let res = js_procedure(arg, &inst).await; - (res, inst) - }) - .await - } - }, + ModuleHostInner::Js(host) => { + drop(timer_guard); + js(arg, &host.instance_lane).await + } + }) + } + + async fn with_procedure_instance( + &self, + kind: &str, + label: &str, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js: impl AsyncFnOnce(A, &JsInstance) -> R, + ) -> Result + where + R: Send + 'static, + A: Send + 'static, + { + self.guard_closed()?; + let timer_guard = self.start_call_timer(label); + + scopeguard::defer_on_unwind!({ + log::warn!("{kind} {label} panicked"); + (self.on_panic)(); + }); + + Ok(match &*self.inner { + ModuleHostInner::Wasm(host) => { + Self::wasm_instance_manager(host, InstanceKind::Procedure) + .with_instance(async |mut inst| { + host.executor + .run_job(async move || { + drop(timer_guard); + (wasm(arg, &mut inst).await, inst) + }) + .await + }) + .await + } + ModuleHostInner::Js(host) => { + host.procedure_instances + .with_instance(async |inst| { + drop(timer_guard); + let res = js(arg, &inst).await; + (res, inst) + }) + .await + } }) } @@ -1297,30 +1329,20 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.with_instance( - "reducer", - label, - InstanceKind::Main, - arg, - wasm, - async move |arg, lane| { - super::v8::assert_not_on_js_module_thread(label); - js(arg, lane).await - }, - async move |_arg, _inst| unreachable!("main-instance call should not use pooled JS instances"), - ) + self.with_main_instance("reducer", label, arg, wasm, async move |arg, lane| { + super::v8::assert_not_on_js_module_thread(label); + js(arg, lane).await + }) .await } async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result { - self.with_instance( + self.with_main_instance( "view operation", label, - InstanceKind::Main, cmd, async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)), async |cmd, lane| lane.call_view(cmd).await, - async |_cmd, _inst| unreachable!("main-instance view call should not use pooled JS instances"), ) .await? } @@ -1910,13 +1932,11 @@ impl ModuleHost { name: &str, params: CallProcedureParams, ) -> Result { - self.with_instance( + self.with_procedure_instance( "pooled operation", name, - InstanceKind::Procedure, params, async move |params, inst| inst.call_procedure(params).await, - async move |_params, _lane| unreachable!("procedure call should not use the serialized JS lane"), async move |params, inst| inst.call_procedure(params).await, ) .await @@ -1926,14 +1946,12 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.with_instance( + self.with_main_instance( "scheduled operation", "scheduled reducer", - InstanceKind::Main, params, async |params, inst| Ok(inst.call_scheduled_function(params).await), async |params, lane| Ok(lane.call_scheduled_function(params).await), - async |_params, _inst| unreachable!("scheduled reducer should not use pooled JS instances"), ) .await? } @@ -1942,13 +1960,11 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.with_instance( + self.with_procedure_instance( "scheduled operation", "scheduled procedure", - InstanceKind::Procedure, params, async |params, inst| Ok(inst.call_scheduled_function(params).await), - async |_params, _lane| unreachable!("scheduled procedure should not use the serialized JS lane"), async |params, inst| Ok(inst.call_scheduled_function(params).await), ) .await? From 8caaa47c3a6e9af260a0a4f593e26ec84627e77a Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 30 Apr 2026 17:45:05 -0700 Subject: [PATCH 5/8] is this better --- crates/core/src/host/module_host.rs | 95 +++++++++++++++-------------- 1 file changed, 49 insertions(+), 46 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index bee6167b848..58274e28d5f 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -373,6 +373,37 @@ enum InstanceKind { Procedure, } +macro_rules! with_instance_impl { + ($self:expr, $kind:expr, $label:expr, $instance_kind:expr, $arg:ident, $wasm:expr, |$host:ident, $timer_guard:ident, $arg_js:ident| $js_body:expr) => {{ + $self.guard_closed()?; + let $timer_guard = $self.start_call_timer($label); + + scopeguard::defer_on_unwind!({ + log::warn!("{} {} panicked", $kind, $label); + ($self.on_panic)(); + }); + + Ok(match &*$self.inner { + ModuleHostInner::Wasm(host) => { + Self::wasm_instance_manager(host, $instance_kind) + .with_instance(async |mut inst| { + host.executor + .run_job(async move || { + drop($timer_guard); + (($wasm)($arg, &mut inst).await, inst) + }) + .await + }) + .await + } + ModuleHostInner::Js($host) => { + let $arg_js = $arg; + $js_body + } + }) + }}; +} + struct V8ModuleHost { module: super::v8::JsModule, instance_lane: super::v8::JsInstanceLane, @@ -1241,32 +1272,18 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.guard_closed()?; - let timer_guard = self.start_call_timer(label); - - scopeguard::defer_on_unwind!({ - log::warn!("{kind} {label} panicked"); - (self.on_panic)(); - }); - - Ok(match &*self.inner { - ModuleHostInner::Wasm(host) => { - Self::wasm_instance_manager(host, InstanceKind::Main) - .with_instance(async |mut inst| { - host.executor - .run_job(async move || { - drop(timer_guard); - (wasm(arg, &mut inst).await, inst) - }) - .await - }) - .await - } - ModuleHostInner::Js(host) => { + with_instance_impl!( + self, + kind, + label, + InstanceKind::Main, + arg, + wasm, + |host, timer_guard, arg| { drop(timer_guard); js(arg, &host.instance_lane).await } - }) + ) } async fn with_procedure_instance( @@ -1281,28 +1298,14 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.guard_closed()?; - let timer_guard = self.start_call_timer(label); - - scopeguard::defer_on_unwind!({ - log::warn!("{kind} {label} panicked"); - (self.on_panic)(); - }); - - Ok(match &*self.inner { - ModuleHostInner::Wasm(host) => { - Self::wasm_instance_manager(host, InstanceKind::Procedure) - .with_instance(async |mut inst| { - host.executor - .run_job(async move || { - drop(timer_guard); - (wasm(arg, &mut inst).await, inst) - }) - .await - }) - .await - } - ModuleHostInner::Js(host) => { + with_instance_impl!( + self, + kind, + label, + InstanceKind::Procedure, + arg, + wasm, + |host, timer_guard, arg| { host.procedure_instances .with_instance(async |inst| { drop(timer_guard); @@ -1311,7 +1314,7 @@ impl ModuleHost { }) .await } - }) + ) } /// Run a function for this module which has access to the module instance. From c82f362123d48117a2034ef52fea831ff003c6dd Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 30 Apr 2026 17:48:01 -0700 Subject: [PATCH 6/8] bring back call_pooled --- crates/core/src/host/module_host.rs | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 58274e28d5f..f939afcb5fd 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1332,16 +1332,30 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.with_main_instance("reducer", label, arg, wasm, async move |arg, lane| { + self.with_main_instance("main-instance operation", label, arg, wasm, async move |arg, lane| { super::v8::assert_not_on_js_module_thread(label); js(arg, lane).await }) .await } + async fn call_pooled( + &self, + label: &str, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js: impl AsyncFnOnce(A, &JsInstance) -> R, + ) -> Result + where + R: Send + 'static, + A: Send + 'static, + { + self.with_procedure_instance("pooled operation", label, arg, wasm, js) + .await + } + async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result { - self.with_main_instance( - "view operation", + self.call( label, cmd, async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)), @@ -1935,8 +1949,7 @@ impl ModuleHost { name: &str, params: CallProcedureParams, ) -> Result { - self.with_procedure_instance( - "pooled operation", + self.call_pooled( name, params, async move |params, inst| inst.call_procedure(params).await, @@ -1949,8 +1962,7 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.with_main_instance( - "scheduled operation", + self.call( "scheduled reducer", params, async |params, inst| Ok(inst.call_scheduled_function(params).await), @@ -1963,8 +1975,7 @@ impl ModuleHost { &self, params: ScheduledFunctionParams, ) -> Result { - self.with_procedure_instance( - "scheduled operation", + self.call_pooled( "scheduled procedure", params, async |params, inst| Ok(inst.call_scheduled_function(params).await), From d2c9f7835d41c4d47797d1bf9cd7ef5b24e80a46 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Thu, 30 Apr 2026 22:38:07 -0700 Subject: [PATCH 7/8] checkout wasm instance **after** enqueuing job onto SingleCoreExecutor --- crates/core/src/host/module_host.rs | 65 +++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 17 deletions(-) diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index f939afcb5fd..fd5f4dc3f82 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -354,17 +354,25 @@ enum ModuleHostInner { /// Wasm has two instance managers: one for procedures and one for reducers/views. /// +/// Both managers need to be able to create fresh instances from the same compiled module, +/// so they share the module via `Arc`. +/// /// Reducers are executed serially by the `SingleCoreExecutor`, so the instance pool /// contains only a single instance. /// /// This is not the case for procedures which can be executed concurrently. /// -/// Both managers need to be able to create fresh instances from the same compiled module, -/// so they share the module via `Arc`. +/// Note, for reducers it is important that we check out the instance **after** enqueuing +/// the job onto the `SingleCoreExecutor`, otherwise concurrent callers would contend on +/// the instance manager, potentially checking out multiple instances before their +/// execution was serialized. +/// +/// TODO: We should bound the instance pool for reducers so that attempting to checkout +/// more than one instance results in an error. struct WasmtimeModuleHost { executor: SingleCoreExecutor, - main_instance: ModuleInstanceManager>, - procedure_instances: ModuleInstanceManager>, + main_instance: Arc>>, + procedure_instances: Arc>>, } #[derive(Clone, Copy)] @@ -374,10 +382,26 @@ enum InstanceKind { } macro_rules! with_instance_impl { - ($self:expr, $kind:expr, $label:expr, $instance_kind:expr, $arg:ident, $wasm:expr, |$host:ident, $timer_guard:ident, $arg_js:ident| $js_body:expr) => {{ + ( + $self:expr, + $kind:expr, + $label:expr, + $instance_kind:expr, + $arg:ident, + $wasm:expr, + |$host:ident, $timer_guard:ident, $arg_js:ident| $js_body:expr + ) => {{ $self.guard_closed()?; let $timer_guard = $self.start_call_timer($label); + // Operations on module instances (e.g. calling reducers) is blocking, + // partially because the computation can potentially take a long time + // and partially because interacting with the database requires taking + // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. + // This will bubble up any panic that may occur. + + // If a function call panics, we **must** ensure to call `self.on_panic` + // so that the module is discarded by the host controller. scopeguard::defer_on_unwind!({ log::warn!("{} {} panicked", $kind, $label); ($self.on_panic)(); @@ -385,13 +409,16 @@ macro_rules! with_instance_impl { Ok(match &*$self.inner { ModuleHostInner::Wasm(host) => { - Self::wasm_instance_manager(host, $instance_kind) - .with_instance(async |mut inst| { - host.executor - .run_job(async move || { - drop($timer_guard); - (($wasm)($arg, &mut inst).await, inst) - }) + let executor = host.executor.clone(); + let instance_manager = Self::wasm_instance_manager(host, $instance_kind); + executor + .run_job(async move || { + // Queue first, then check out the instance from inside the queued job. + // This is required for both the serialized main lane and the procedure + // pool so that executor scheduling happens before instance allocation. + drop($timer_guard); + instance_manager + .with_instance(async move |mut inst| (($wasm)($arg, &mut inst).await, inst)) .await }) .await @@ -1114,10 +1141,10 @@ impl ModuleHost { fn wasm_instance_manager( host: &WasmtimeModuleHost, kind: InstanceKind, - ) -> &ModuleInstanceManager> { + ) -> Arc>> { match kind { - InstanceKind::Main => &host.main_instance, - InstanceKind::Procedure => &host.procedure_instances, + InstanceKind::Main => host.main_instance.clone(), + InstanceKind::Procedure => host.procedure_instances.clone(), } } @@ -1136,8 +1163,12 @@ impl ModuleHost { info = module.info(); let module = Arc::new(module); let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); - let instance_manager = ModuleInstanceManager::new(module.clone(), Some(init_inst), metrics.clone()); - let procedure_instances = ModuleInstanceManager::new(module, None, metrics); + let instance_manager = Arc::new(ModuleInstanceManager::new( + module.clone(), + Some(init_inst), + metrics.clone(), + )); + let procedure_instances = Arc::new(ModuleInstanceManager::new(module, None, metrics)); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { executor, main_instance: instance_manager, From fb4b0efb2767127430d8d50e30b94d9d6a6034b9 Mon Sep 17 00:00:00 2001 From: joshua-spacetime Date: Fri, 1 May 2026 10:29:42 -0700 Subject: [PATCH 8/8] pipeline wasm --- crates/core/src/client/client_connection.rs | 188 +-- crates/core/src/client/message_handlers_v1.rs | 42 +- crates/core/src/client/message_handlers_v2.rs | 16 +- crates/core/src/host/module_host.rs | 1011 +++++++++++++---- .../src/host/wasm_common/module_host_actor.rs | 30 + crates/core/src/util/jobs.rs | 21 + crates/testing/src/modules.rs | 12 +- 7 files changed, 911 insertions(+), 409 deletions(-) diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 0a7a7f1a11b..34d337fb973 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -7,18 +7,14 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Instant, SystemTime}; -use super::messages::{OneOffQueryResponseMessage, ProcedureResultMessage}; +use super::messages::OneOffQueryResponseMessage; use super::{message_handlers, ClientActorId, MessageHandleError, OutboundMessage}; use crate::db::relational_db::RelationalDB; use crate::error::DBError; use crate::host::module_host::ClientConnectedError; -use crate::host::{ - CallProcedureReturn, FunctionArgs, ModuleHost, NoSuchModule, ProcedureCallResult, ReducerCallError, - ReducerCallResult, -}; +use crate::host::{FunctionArgs, ModuleHost, NoSuchModule, ReducerCallError}; use crate::subscription::module_subscription_manager::BroadcastError; use crate::subscription::row_list_builder_pool::JsonRowListBuilderFakePool; -use crate::util::asyncify; use crate::util::prometheus_handle::IntGaugeExt; use crate::worker_metrics::WORKER_METRICS; use bytes::Bytes; @@ -30,8 +26,7 @@ use spacetimedb_auth::identity::{ConnectionAuthCtx, SpacetimeIdentityClaims}; use spacetimedb_client_api_messages::websocket::{common as ws_common, v1 as ws_v1, v2 as ws_v2}; use spacetimedb_durability::{DurableOffset, TxOffset}; use spacetimedb_lib::identity::{AuthCtx, RequestId}; -use spacetimedb_lib::metrics::ExecutionMetrics; -use spacetimedb_lib::{bsatn, Identity, TimeDuration, Timestamp}; +use spacetimedb_lib::Identity; use tokio::sync::mpsc::error::{SendError, TrySendError}; use tokio::sync::{mpsc, oneshot, watch}; use tokio::task::AbortHandle; @@ -841,7 +836,7 @@ impl ClientConnection { request_id: RequestId, timer: Instant, flags: ws_v1::CallReducerFlags, - ) -> Result { + ) -> Result<(), ReducerCallError> { let caller = match flags { ws_v1::CallReducerFlags::FullUpdate => Some(self.sender()), // Setting `sender = None` causes `eval_updates` to skip sending to the caller @@ -850,7 +845,7 @@ impl ClientConnection { }; self.module() - .call_reducer( + .call_reducer_ws( self.id.identity, Some(self.id.connection_id), caller, @@ -869,9 +864,9 @@ impl ClientConnection { request_id: RequestId, timer: Instant, _flags: ws_v2::CallReducerFlags, - ) -> Result { + ) -> Result<(), ReducerCallError> { self.module() - .call_reducer( + .call_reducer_ws( self.id.identity, Some(self.id.connection_id), Some(self.sender()), @@ -890,22 +885,17 @@ impl ClientConnection { request_id: RequestId, timer: Instant, ) -> Result<(), BroadcastError> { - let CallProcedureReturn { result, tx_offset } = self - .module() - .call_procedure( + self.module() + .call_procedure_ws( + self.sender(), + request_id, self.id.identity, Some(self.id.connection_id), Some(timer), procedure, args, ) - .await; - - let message = ProcedureResultMessage::from_result(&result, request_id); - - self.module() - .subscriptions() - .send_procedure_message(self.sender(), message, tx_offset) + .await } pub async fn call_procedure_v2( @@ -916,152 +906,58 @@ impl ClientConnection { timer: Instant, _flags: ws_v2::CallProcedureFlags, ) -> Result<(), BroadcastError> { - let CallProcedureReturn { result, tx_offset } = self - .module() - .call_procedure( + self.module() + .call_procedure_ws_v2( + self.sender(), + request_id, self.id.identity, Some(self.id.connection_id), Some(timer), procedure, FunctionArgs::Bsatn(args), ) - .await; - - let (status, timestamp, execution_duration) = match result { - Ok(ProcedureCallResult { - return_val, - execution_duration, - start_timestamp, - }) => ( - ws_v2::ProcedureStatus::Returned( - bsatn::to_vec(&return_val) - .expect("Procedure return value failed to serialize to BSATN") - .into(), - ), - start_timestamp, - TimeDuration::from(execution_duration), - ), - Err(err) => ( - ws_v2::ProcedureStatus::InternalError(err.to_string().into()), - Timestamp::UNIX_EPOCH, - TimeDuration::ZERO, - ), - }; - - let message = ws_v2::ProcedureResult { - status, - timestamp, - total_host_execution_duration: execution_duration, - request_id, - }; - - self.module() - .subscriptions() - .send_procedure_message_v2(self.sender(), message, tx_offset) + .await } - pub async fn subscribe_single( - &self, - subscription: ws_v1::SubscribeSingle, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn subscribe_single(&self, subscription: ws_v1::SubscribeSingle, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("subscribe_single", async move || { - let host = me.module(); - host.subscriptions() - .add_single_subscription(Some(&host), me.sender, me.auth.clone(), subscription, timer, None) - .await - }) - .await? + .call_view_add_single_subscription_ws(self.sender(), self.auth.clone(), subscription, timer) + .await } - pub async fn unsubscribe( - &self, - request: ws_v1::Unsubscribe, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); - asyncify(move || { - me.module() - .subscriptions() - .remove_single_subscription(me.sender, me.auth.clone(), request, timer) - }) - .await + pub async fn unsubscribe(&self, request: ws_v1::Unsubscribe, timer: Instant) -> Result<(), DBError> { + self.module() + .call_view_remove_single_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn subscribe_v2( - &self, - request: ws_v2::Subscribe, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn subscribe_v2(&self, request: ws_v2::Subscribe, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("subscribe_v2", async move || { - let host = me.module(); - host.subscriptions() - .add_v2_subscription(Some(&host), me.sender, me.auth.clone(), request, timer, None) - .await - }) - .await? + .call_view_add_v2_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn subscribe_multi( - &self, - request: ws_v1::SubscribeMulti, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn subscribe_multi(&self, request: ws_v1::SubscribeMulti, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("subscribe_multi", async move || { - let host = me.module(); - host.subscriptions() - .add_multi_subscription(Some(&host), me.sender, me.auth.clone(), request, timer, None) - .await - }) - .await? + .call_view_add_multi_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn unsubscribe_multi( - &self, - request: ws_v1::UnsubscribeMulti, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn unsubscribe_multi(&self, request: ws_v1::UnsubscribeMulti, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread("unsubscribe_multi", move || { - me.module() - .subscriptions() - .remove_multi_subscription(me.sender, me.auth.clone(), request, timer) - }) - .await? + .call_view_remove_multi_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn unsubscribe_v2( - &self, - request: ws_v2::Unsubscribe, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn unsubscribe_v2(&self, request: ws_v2::Unsubscribe, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("unsubscribe_v2", async move || { - let host = me.module(); - host.subscriptions() - .remove_v2_subscription(Some(&host), me.sender, me.auth.clone(), request, timer) - .await - }) - .await? + .call_view_remove_v2_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn subscribe(&self, subscription: ws_v1::Subscribe, timer: Instant) -> Result { - let me = self.clone(); + pub async fn subscribe(&self, subscription: ws_v1::Subscribe, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("subscribe", async move || { - let host = me.module(); - host.subscriptions() - .add_legacy_subscriber(Some(&host), me.sender, me.auth.clone(), subscription, timer, None) - .await - }) - .await? + .call_view_add_legacy_subscription_ws(self.sender(), self.auth.clone(), subscription, timer) + .await } pub async fn one_off_query_json( @@ -1071,7 +967,7 @@ impl ClientConnection { timer: Instant, ) -> Result<(), anyhow::Error> { self.module() - .one_off_query::( + .one_off_query_ws::( self.auth.clone(), query.to_owned(), self.sender.clone(), @@ -1091,7 +987,7 @@ impl ClientConnection { ) -> Result<(), anyhow::Error> { let bsatn_rlb_pool = self.module().replica_ctx().subscriptions.bsatn_rlb_pool.clone(); self.module() - .one_off_query::( + .one_off_query_ws::( self.auth.clone(), query.to_owned(), self.sender.clone(), @@ -1106,7 +1002,7 @@ impl ClientConnection { pub async fn one_off_query_v2(&self, query: &str, request_id: u32, timer: Instant) -> Result<(), anyhow::Error> { let bsatn_rlb_pool = self.module().replica_ctx().subscriptions.bsatn_rlb_pool.clone(); self.module() - .one_off_query_v2( + .one_off_query_v2_ws( self.auth.clone(), query.to_owned(), self.sender.clone(), diff --git a/crates/core/src/client/message_handlers_v1.rs b/crates/core/src/client/message_handlers_v1.rs index d6cf8fc6257..a6bc7a1c397 100644 --- a/crates/core/src/client/message_handlers_v1.rs +++ b/crates/core/src/client/message_handlers_v1.rs @@ -40,17 +40,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst let mod_info = module.info(); let mod_metrics = &mod_info.metrics; let database_identity = mod_info.database_identity; - let db = module.relational_db(); - let record_metrics = |wl| { - move |metrics| { - if let Some(metrics) = metrics { - db.exec_counters_for(wl).record(&metrics); - } - } - }; - let sub_metrics = record_metrics(WorkloadType::Subscribe); - let unsub_metrics = record_metrics(WorkloadType::Unsubscribe); - type HandleResult<'a> = Result<(), (Option<&'a RawIdentifier>, Option, anyhow::Error)>; let res: HandleResult<'_> = match message { ws_v1::ClientMessage::CallReducer(ws_v1::CallReducer { @@ -59,49 +48,52 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst request_id, flags, }) => { - let res = client.call_reducer(reducer, args, request_id, timer, flags).await; + let res = client + .call_reducer(reducer, args, request_id, timer, flags) + .await + .map_err(|e| { + ( + Some(reducer), + mod_info.module_def.reducer_full(&**reducer).map(|(id, _)| id), + e.into(), + ) + }); WORKER_METRICS .request_round_trip .with_label_values(&WorkloadType::Reducer, &database_identity, reducer) .observe(timer.elapsed().as_secs_f64()); - res.map(drop).map_err(|e| { - ( - Some(reducer), - mod_info.module_def.reducer_full(&**reducer).map(|(id, _)| id), - e.into(), - ) - }) + res } ws_v1::ClientMessage::SubscribeMulti(subscription) => { - let res = client.subscribe_multi(subscription, timer).await.map(sub_metrics); + let res = client.subscribe_multi(subscription, timer).await; mod_metrics .request_round_trip_subscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v1::ClientMessage::UnsubscribeMulti(request) => { - let res = client.unsubscribe_multi(request, timer).await.map(unsub_metrics); + let res = client.unsubscribe_multi(request, timer).await; mod_metrics .request_round_trip_unsubscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v1::ClientMessage::SubscribeSingle(subscription) => { - let res = client.subscribe_single(subscription, timer).await.map(sub_metrics); + let res = client.subscribe_single(subscription, timer).await; mod_metrics .request_round_trip_subscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v1::ClientMessage::Unsubscribe(request) => { - let res = client.unsubscribe(request, timer).await.map(unsub_metrics); + let res = client.unsubscribe(request, timer).await; mod_metrics .request_round_trip_unsubscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v1::ClientMessage::Subscribe(subscription) => { - let res = client.subscribe(subscription, timer).await.map(Some).map(sub_metrics); + let res = client.subscribe(subscription, timer).await; mod_metrics .request_round_trip_subscribe .observe(timer.elapsed().as_secs_f64()); @@ -134,7 +126,7 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst if let Err(e) = res { log::warn!("Procedure call failed: {e:#}"); } - // `ClientConnection::call_procedure` handles sending the error message to the client if the call fails, + // `ClientConnection::call_procedure` handles sending the procedure result message itself, // so we don't need to return an `Err` here. Ok(()) } diff --git a/crates/core/src/client/message_handlers_v2.rs b/crates/core/src/client/message_handlers_v2.rs index 2db523e472d..8c918763380 100644 --- a/crates/core/src/client/message_handlers_v2.rs +++ b/crates/core/src/client/message_handlers_v2.rs @@ -32,27 +32,17 @@ pub(super) async fn handle_decoded_message( let mod_info = module.info(); let mod_metrics = &mod_info.metrics; let database_identity = mod_info.database_identity; - let db = module.relational_db(); - let record_metrics = |wl| { - move |metrics| { - if let Some(metrics) = metrics { - db.exec_counters_for(wl).record(&metrics); - } - } - }; - let sub_metrics = record_metrics(WorkloadType::Subscribe); - let unsub_metrics = record_metrics(WorkloadType::Unsubscribe); type HandleResult<'a> = Result<(), (Option<&'a RawIdentifier>, Option, anyhow::Error)>; let res: HandleResult<'_> = match message { ws_v2::ClientMessage::Subscribe(subscribe) => { - let res = client.subscribe_v2(subscribe, timer).await.map(sub_metrics); + let res = client.subscribe_v2(subscribe, timer).await; mod_metrics .request_round_trip_subscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v2::ClientMessage::Unsubscribe(unsubscribe) => { - let res = client.unsubscribe_v2(unsubscribe, timer).await.map(unsub_metrics); + let res = client.unsubscribe_v2(unsubscribe, timer).await; mod_metrics .request_round_trip_unsubscribe .observe(timer.elapsed().as_secs_f64()); @@ -80,7 +70,7 @@ pub(super) async fn handle_decoded_message( .with_label_values(&WorkloadType::Reducer, &database_identity, reducer) .observe(timer.elapsed().as_secs_f64()); match res { - Ok(_) => { + Ok(()) => { // If this was not a success, we would have already sent an error message. Ok(()) } diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index fd5f4dc3f82..b72bdb9ad54 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,8 +1,8 @@ use super::{ - ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ReducerCallResult, ReducerId, - ReducerOutcome, Scheduler, + ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ProcedureCallResult, + ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, }; -use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage}; +use crate::client::messages::{OneOffQueryResponseMessage, ProcedureResultMessage, SerializableMessage}; use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger::{DatabaseLogger, LogLevel, Record}; use crate::db::relational_db::RelationalDB; @@ -23,7 +23,7 @@ use crate::sql::ast::SchemaViewer; use crate::sql::execute::SqlResult; use crate::sql::parser::RowLevelExpr; use crate::subscription::module_subscription_actor::ModuleSubscriptions; -pub use crate::subscription::module_subscription_manager::TransactionOffset; +pub use crate::subscription::module_subscription_manager::{BroadcastError, TransactionOffset}; use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::subscription::{execute_plan, execute_plan_for_view}; @@ -47,7 +47,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashSet}; use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; -use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; pub use spacetimedb_durability::{DurabilityExited, DurableOffset}; use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; @@ -56,7 +56,7 @@ use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; -use spacetimedb_lib::{ConnectionId, Timestamp}; +use spacetimedb_lib::{bsatn, ConnectionId, TimeDuration, Timestamp}; use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId}; use spacetimedb_query::compile_subscription; use spacetimedb_sats::raw_identifier::RawIdentifier; @@ -740,12 +740,24 @@ pub enum ViewCommand { request: ws_v1::SubscribeSingle, _timer: Instant, }, + RemoveSingleSubscription { + sender: Arc, + auth: AuthCtx, + request: ws_v1::Unsubscribe, + timer: Instant, + }, AddMultiSubscription { sender: Arc, auth: AuthCtx, request: ws_v1::SubscribeMulti, _timer: Instant, }, + RemoveMultiSubscription { + sender: Arc, + auth: AuthCtx, + request: ws_v1::UnsubscribeMulti, + timer: Instant, + }, AddLegacySubscription { sender: Arc, auth: AuthCtx, @@ -1291,6 +1303,62 @@ impl ModuleHost { }) } + fn enqueue_wasm_job(&self, kind: &str, label: &str, f: F) -> Result<(), NoSuchModule> + where + F: AsyncFnOnce() + Send + 'static, + { + self.guard_closed()?; + + let timer_guard = self.start_call_timer(label); + let kind = kind.to_owned(); + let label = label.to_owned(); + let on_panic = self.on_panic.clone(); + + match &*self.inner { + ModuleHostInner::Wasm(host) => { + let executor = host.executor.clone(); + executor.enqueue_job(async move || { + scopeguard::defer_on_unwind!({ + log::warn!("{} {} panicked", kind, label); + on_panic(); + }); + + drop(timer_guard); + f().await; + }); + Ok(()) + } + ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_job should only be used for wasm"), + } + } + + fn enqueue_wasm_instance( + &self, + kind: &str, + label: &str, + instance_kind: InstanceKind, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, + ) -> Result<(), NoSuchModule> + where + A: Send + 'static, + { + match &*self.inner { + ModuleHostInner::Wasm(host) => { + let instance_manager = Self::wasm_instance_manager(host, instance_kind); + self.enqueue_wasm_job(kind, label, async move || { + instance_manager + .with_instance(async move |mut inst| { + wasm(arg, &mut inst).await; + ((), inst) + }) + .await; + }) + } + ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_instance should only be used for wasm"), + } + } + async fn with_main_instance( &self, kind: &str, @@ -1395,6 +1463,52 @@ impl ModuleHost { .await? } + async fn call_view_command_ws(&self, label: &str, cmd: ViewCommand) -> Result<(), ViewCallError> { + match &*self.inner { + ModuleHostInner::Wasm(_) => { + match self.enqueue_wasm_instance( + "main-instance operation", + label, + InstanceKind::Main, + cmd, + async |cmd, inst| { + let _ = inst.call_view(cmd); + }, + ) { + Ok(()) => Ok(()), + Err(err) => Err(err.into()), + } + } + ModuleHostInner::Js(_) => self.call_view_command(label, cmd).await.map(drop), + } + } + + fn unwrap_subscription_command_result( + label: &str, + res: Result, + ) -> Result, DBError> { + match res.map_err(|e| DBError::Other(anyhow::anyhow!(e)))? { + ViewCommandResult::Subscription { result } => result, + ViewCommandResult::Sql { .. } => { + unreachable!("unexpected SQL result in {label}") + } + } + } + + async fn call_subscription_command( + &self, + label: &str, + cmd: ViewCommand, + ) -> Result, DBError> { + Self::unwrap_subscription_command_result(label, self.call_view_command(label, cmd).await) + } + + async fn call_subscription_command_ws(&self, label: &str, cmd: ViewCommand) -> Result<(), DBError> { + self.call_view_command_ws(label, cmd) + .await + .map_err(|e| DBError::Other(anyhow::anyhow!(e))) + } + pub async fn disconnect_client(&self, client_id: ClientActorId) { log::trace!("disconnecting client {client_id}"); if let Err(e) = self @@ -1636,6 +1750,30 @@ impl ModuleHost { }) } + fn call_procedure_params( + module: &ModuleInfo, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_id: ProcedureId, + procedure_def: &ProcedureDef, + args: FunctionArgs, + ) -> Result { + let args = args + .into_tuple_for_def(&module.module_def, procedure_def) + .map_err(InvalidProcedureArguments)?; + let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); + + Ok(CallProcedureParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + timer, + procedure_id, + args, + }) + } + async fn call_reducer_inner( &self, caller_identity: Identity, @@ -1721,6 +1859,82 @@ impl ModuleHost { res } + pub async fn call_reducer_ws( + &self, + caller_identity: Identity, + caller_connection_id: Option, + client: Option>, + request_id: Option, + timer: Option, + reducer_name: &str, + args: FunctionArgs, + ) -> Result<(), ReducerCallError> { + match &*self.inner { + ModuleHostInner::Js(_) => self + .call_reducer( + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_name, + args, + ) + .await + .map(drop), + ModuleHostInner::Wasm(_) => { + let res = (|| { + let (reducer_id, reducer_def) = self + .info + .module_def + .reducer_full(reducer_name) + .ok_or(ReducerCallError::NoSuchReducer)?; + if let Some(lifecycle) = reducer_def.lifecycle { + return Err(ReducerCallError::LifecycleReducer(lifecycle)); + } + + if reducer_def.visibility.is_private() && !self.is_database_owner(caller_identity) { + return Err(ReducerCallError::NoSuchReducer); + } + + let params = Self::call_reducer_params( + &self.info, + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + reducer_def, + args, + )?; + + self.enqueue_wasm_instance( + "main-instance operation", + &reducer_def.name, + InstanceKind::Main, + params, + async |params, inst| { + let _ = inst.call_reducer(params); + }, + )?; + Ok(()) + })(); + + let log_message = match &res { + Err(ReducerCallError::NoSuchReducer) => Some(no_such_function_log_message("reducer", reducer_name)), + Err(ReducerCallError::Args(_)) => Some(args_error_log_message("reducer", reducer_name)), + _ => None, + }; + if let Some(log_message) = log_message { + self.inject_logs(LogLevel::Error, reducer_name, &log_message) + } + + res + } + } + } + pub async fn call_view_add_single_subscription( &self, sender: Arc, @@ -1734,19 +1948,25 @@ impl ModuleHost { request, _timer: timer, }; - - let res = self - .call_view_command("call_view_add_single_subscription", cmd) + self.call_subscription_command("call_view_add_single_subscription", cmd) .await - //TODO: handle error better - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_add_single_subscription") - } - } + pub async fn call_view_add_single_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::SubscribeSingle, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::AddSingleSubscription { + sender, + auth, + request, + _timer: timer, + }; + self.call_subscription_command_ws("call_view_add_single_subscription", cmd) + .await } pub async fn call_view_add_v2_subscription( @@ -1762,19 +1982,25 @@ impl ModuleHost { request, _timer: timer, }; - - let res = self - .call_view_command("call_view_add_multi_subscription", cmd) + self.call_subscription_command("call_view_add_multi_subscription", cmd) .await - //TODO: handle error better - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_add_single_subscription") - } - } + pub async fn call_view_add_v2_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v2::Subscribe, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::AddSubscriptionV2 { + sender, + auth, + request, + _timer: timer, + }; + self.call_subscription_command_ws("call_view_add_multi_subscription", cmd) + .await } pub async fn call_view_remove_v2_subscription( @@ -1790,18 +2016,25 @@ impl ModuleHost { request, timer, }; - - let res = self - .call_view_command("call_view_remove_v2_subscription", cmd) + self.call_subscription_command("call_view_remove_v2_subscription", cmd) .await - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_remove_v2_subscription") - } - } + pub async fn call_view_remove_v2_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v2::Unsubscribe, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::RemoveSubscriptionV2 { + sender, + auth, + request, + timer, + }; + self.call_subscription_command_ws("call_view_remove_v2_subscription", cmd) + .await } pub async fn call_view_add_multi_subscription( @@ -1817,19 +2050,59 @@ impl ModuleHost { request, _timer: timer, }; + self.call_subscription_command("call_view_add_multi_subscription", cmd) + .await + } - let res = self - .call_view_command("call_view_add_multi_subscription", cmd) + pub async fn call_view_remove_single_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::Unsubscribe, + timer: Instant, + ) -> Result, DBError> { + let cmd = ViewCommand::RemoveSingleSubscription { + sender, + auth, + request, + timer, + }; + self.call_subscription_command("call_view_remove_single_subscription", cmd) .await - //TODO: handle error better - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_add_single_subscription") - } - } + pub async fn call_view_remove_single_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::Unsubscribe, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::RemoveSingleSubscription { + sender, + auth, + request, + timer, + }; + self.call_subscription_command_ws("call_view_remove_single_subscription", cmd) + .await + } + + pub async fn call_view_add_multi_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::SubscribeMulti, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::AddMultiSubscription { + sender, + auth, + request, + _timer: timer, + }; + self.call_subscription_command_ws("call_view_add_multi_subscription", cmd) + .await } pub async fn call_view_add_legacy_subscription( @@ -1845,19 +2118,59 @@ impl ModuleHost { subscribe, _timer: timer, }; + self.call_subscription_command("call_view_add_legacy_subscription", cmd) + .await + } - let res = self - .call_view_command("call_view_add_legacy_subscription", cmd) + pub async fn call_view_remove_multi_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::UnsubscribeMulti, + timer: Instant, + ) -> Result, DBError> { + let cmd = ViewCommand::RemoveMultiSubscription { + sender, + auth, + request, + timer, + }; + self.call_subscription_command("call_view_remove_multi_subscription", cmd) .await - //TODO: handle error better - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_add_single_subscription") - } - } + pub async fn call_view_remove_multi_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::UnsubscribeMulti, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::RemoveMultiSubscription { + sender, + auth, + request, + timer, + }; + self.call_subscription_command_ws("call_view_remove_multi_subscription", cmd) + .await + } + + pub async fn call_view_add_legacy_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + subscribe: ws_v1::Subscribe, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::AddLegacySubscription { + sender, + auth, + subscribe, + _timer: timer, + }; + self.call_subscription_command_ws("call_view_add_legacy_subscription", cmd) + .await } pub async fn call_view_sql( @@ -1944,6 +2257,149 @@ impl ModuleHost { ret } + pub async fn call_procedure_ws( + &self, + sender: Arc, + request_id: RequestId, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_name: &str, + args: FunctionArgs, + ) -> Result<(), BroadcastError> { + let sender = sender.clone(); + self.call_procedure_ws_impl( + caller_identity, + caller_connection_id, + timer, + procedure_name, + args, + move |subscriptions, ret| { + let message = ProcedureResultMessage::from_result(&ret.result, request_id); + subscriptions.send_procedure_message(sender.clone(), message, ret.tx_offset) + }, + ) + .await + } + + pub async fn call_procedure_ws_v2( + &self, + sender: Arc, + request_id: RequestId, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_name: &str, + args: FunctionArgs, + ) -> Result<(), BroadcastError> { + let sender = sender.clone(); + self.call_procedure_ws_impl( + caller_identity, + caller_connection_id, + timer, + procedure_name, + args, + move |subscriptions, ret| { + let message = Self::procedure_result_message_v2(ret.result, request_id); + subscriptions.send_procedure_message_v2(sender.clone(), message, ret.tx_offset) + }, + ) + .await + } + + async fn call_procedure_ws_impl( + &self, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_name: &str, + args: FunctionArgs, + send_result: F, + ) -> Result<(), BroadcastError> + where + F: Fn(ModuleSubscriptions, CallProcedureReturn) -> Result<(), BroadcastError> + Clone + Send + 'static, + { + let call_return_err = |err| CallProcedureReturn { + result: Err(err), + tx_offset: None, + }; + + if let ModuleHostInner::Js(_) = &*self.inner { + let ret = self + .call_procedure(caller_identity, caller_connection_id, timer, procedure_name, args) + .await; + return send_result(self.subscriptions().clone(), ret); + } + + if let Err(err) = self.guard_closed() { + let ret = call_return_err(err.into()); + let subscriptions = self.subscriptions().clone(); + return send_result(subscriptions, ret); + } + + let (procedure_id, procedure_def) = match self.info.module_def.procedure_full(procedure_name) { + Some(procedure) => procedure, + None => { + let ret = call_return_err(ProcedureCallError::NoSuchProcedure); + let subscriptions = self.subscriptions().clone(); + return send_result(subscriptions, ret); + } + }; + + if procedure_def.visibility.is_private() && !self.is_database_owner(caller_identity) { + let ret = call_return_err(ProcedureCallError::NoSuchProcedure); + let subscriptions = self.subscriptions().clone(); + return send_result(subscriptions, ret); + } + + let params = match Self::call_procedure_params( + &self.info, + caller_identity, + caller_connection_id, + timer, + procedure_id, + procedure_def, + args, + ) { + Ok(params) => params, + Err(err) => { + let ret = call_return_err(err.into()); + let subscriptions = self.subscriptions().clone(); + return send_result(subscriptions, ret); + } + }; + + match &*self.inner { + ModuleHostInner::Wasm(_) => { + let subscriptions = self.subscriptions().clone(); + let procedure_label = procedure_def.name.to_string(); + let send_result_in_job = send_result.clone(); + let enqueue_res = self.enqueue_wasm_instance( + "pooled operation", + &procedure_label, + InstanceKind::Procedure, + (subscriptions, params), + async move |(subscriptions, params), inst| { + let ret = inst.call_procedure(params).await; + if let Err(err) = send_result_in_job(subscriptions, ret) { + log::warn!("Procedure call failed: {err:#}"); + } + }, + ); + + match enqueue_res { + Ok(()) => Ok(()), + Err(err) => { + let ret = call_return_err(err.into()); + let subscriptions = self.subscriptions().clone(); + send_result(subscriptions, ret) + } + } + } + ModuleHostInner::Js(_) => unreachable!("handled before wasm-specific procedure setup"), + } + } + async fn call_procedure_inner( &self, caller_identity: Identity, @@ -1970,6 +2426,39 @@ impl ModuleHost { Ok(self.call_procedure_with_params(&procedure_def.name, params).await?) } + fn procedure_result_message_v2( + result: Result, + request_id: RequestId, + ) -> ws_v2::ProcedureResult { + let (status, timestamp, execution_duration) = match result { + Ok(ProcedureCallResult { + return_val, + execution_duration, + start_timestamp, + }) => ( + ws_v2::ProcedureStatus::Returned( + bsatn::to_vec(&return_val) + .expect("Procedure return value failed to serialize to BSATN") + .into(), + ), + start_timestamp, + TimeDuration::from(execution_duration), + ), + Err(err) => ( + ws_v2::ProcedureStatus::InternalError(err.to_string().into()), + Timestamp::UNIX_EPOCH, + TimeDuration::ZERO, + ), + }; + + ws_v2::ProcedureResult { + status, + timestamp, + total_host_execution_duration: execution_duration, + request_id, + } + } + //TODO(shub) #4195: Also allow for collaborators along with owner fn is_database_owner(&self, caller_identity: Identity) -> bool { self.info.owner_identity == caller_identity @@ -2295,100 +2784,17 @@ impl ModuleHost { log::debug!("One-off query: {query}"); let metrics = self .on_module_thread("one_off_query", move || { - let (tx_offset_sender, tx_offset_receiver) = oneshot::channel(); - let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { - let (tx_offset, tx_metrics, reducer) = db.release_tx(tx); - let _ = tx_offset_sender.send(tx_offset); - db.report_read_tx_metrics(reducer, tx_metrics); - }); - - // We wrap the actual query in a closure so we can use ? to handle errors without making - // the entire transaction abort with an error. - let result: Result<(ws_v1::OneOffTable, ExecutionMetrics), anyhow::Error> = (|| { - let tx = SchemaViewer::new(&*tx, &auth); - - let ( - // A query may compile down to several plans. - // This happens when there are multiple RLS rules per table. - // The original query is the union of these plans. - plans, - _, - table_name, - _, - ) = compile_subscription(&query, &tx, &auth)?; - - // Optimize each fragment - let optimized = plans - .into_iter() - .map(|plan| plan.optimize(&auth)) - .collect::, _>>()?; - - check_row_limit( - &optimized, - &db, - &tx, - // Estimate the number of rows this query will scan - |plan, tx| estimate_rows_scanned(tx, plan), - &auth, - )?; - - let return_table = || optimized.first().and_then(|plan| plan.return_table()); - - let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table()); - let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default(); - let num_private_cols = return_table() - .map(|schema| schema.num_private_cols()) - .unwrap_or_default(); - - let optimized = optimized - .into_iter() - // Convert into something we can execute - .map(PipelinedProject::from) - .collect::>(); - - let table_name = table_name.into(); - - if returns_view_table && num_private_cols > 0 { - let optimized = optimized - .into_iter() - .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) - .collect::>(); - // Execute the union and return the results - return execute_plan_for_view::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| (ws_v1::OneOffTable { table_name, rows }, metrics)) - .context("One-off queries are not allowed to modify the database"); - } - - // Execute the union and return the results - execute_plan::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| (ws_v1::OneOffTable { table_name, rows }, metrics)) - .context("One-off queries are not allowed to modify the database") - })(); - - let total_host_execution_duration = timer.elapsed().into(); - let (message, metrics): (SerializableMessage, Option) = match result { - Ok((rows, metrics)) => ( - into_message(OneOffQueryResponseMessage { - message_id, - error: None, - results: vec![rows], - total_host_execution_duration, - }), - Some(metrics), - ), - Err(err) => ( - into_message(OneOffQueryResponseMessage { - message_id, - error: Some(format!("{err}")), - results: vec![], - total_host_execution_duration, - }), - None, - ), - }; - - subscriptions.send_client_message(client, message, (&*tx, tx_offset_receiver))?; - Ok::, anyhow::Error>(metrics) + Self::one_off_query_inner( + db, + subscriptions, + auth, + query, + client, + message_id, + timer, + rlb_pool, + into_message, + ) }) .await??; @@ -2402,6 +2808,47 @@ impl ModuleHost { Ok(()) } + pub async fn one_off_query_ws( + &self, + auth: AuthCtx, + query: String, + client: Arc, + message_id: Vec, + timer: Instant, + rlb_pool: impl 'static + Send + RowListBuilderSource, + into_message: impl FnOnce(OneOffQueryResponseMessage) -> SerializableMessage + Send + 'static, + ) -> Result<(), anyhow::Error> { + match &*self.inner { + ModuleHostInner::Js(_) => { + self.one_off_query(auth, query, client, message_id, timer, rlb_pool, into_message) + .await + } + ModuleHostInner::Wasm(_) => { + let db = self.relational_db().clone(); + let subscriptions = self.replica_ctx().subscriptions.clone(); + log::debug!("One-off query: {query}"); + match self.enqueue_wasm_job("module-thread operation", "one_off_query", async move || { + if let Err(err) = Self::one_off_query_inner( + db, + subscriptions, + auth, + query, + client, + message_id, + timer, + rlb_pool, + into_message, + ) { + log::warn!("One-off query failed: {err:#}"); + } + }) { + Ok(()) => Ok(()), + Err(err) => Err(err.into()), + } + } + } + } + /// Execute a one-off query for v2 clients and send the results to the given client. /// /// This only returns an error if there is a db-level problem. @@ -2435,6 +2882,79 @@ impl ModuleHost { Ok(()) } + pub async fn one_off_query_v2_ws( + &self, + auth: AuthCtx, + query: String, + client: Arc, + request_id: u32, + timer: Instant, + rlb_pool: impl 'static + Send + RowListBuilderSource, + ) -> Result<(), anyhow::Error> { + match &*self.inner { + ModuleHostInner::Js(_) => { + self.one_off_query_v2(auth, query, client, request_id, timer, rlb_pool) + .await + } + ModuleHostInner::Wasm(_) => { + let db = self.relational_db().clone(); + let subscriptions = self.replica_ctx().subscriptions.clone(); + log::debug!("One-off query: {query}"); + match self.enqueue_wasm_job("module-thread operation", "one_off_query_v2", async move || { + if let Err(err) = + Self::one_off_query_v2_inner(db, subscriptions, auth, query, client, request_id, rlb_pool) + { + log::warn!("One-off query failed: {err:#}"); + } + }) { + Ok(()) => Ok(()), + Err(err) => Err(err.into()), + } + } + } + } + + fn one_off_query_inner( + db: Arc, + subscriptions: ModuleSubscriptions, + auth: AuthCtx, + query: String, + client: Arc, + message_id: Vec, + timer: Instant, + rlb_pool: impl 'static + Send + RowListBuilderSource, + into_message: impl FnOnce(OneOffQueryResponseMessage) -> SerializableMessage, + ) -> Result, anyhow::Error> { + Self::send_one_off_query_result( + db, + subscriptions, + auth, + query, + client, + rlb_pool, + |table_name, rows| ws_v1::OneOffTable { table_name, rows }, + move |subscriptions, client, result, tx_offset, tx| { + let total_host_execution_duration = timer.elapsed().into(); + let message = match result { + Ok(rows) => into_message(OneOffQueryResponseMessage { + message_id, + error: None, + results: vec![rows], + total_host_execution_duration, + }), + Err(err) => into_message(OneOffQueryResponseMessage { + message_id, + error: Some(err), + results: vec![], + total_host_execution_duration, + }), + }; + + subscriptions.send_client_message(client, message, (tx, tx_offset)) + }, + ) + } + fn one_off_query_v2_inner( db: Arc, subscriptions: ModuleSubscriptions, @@ -2444,6 +2964,57 @@ impl ModuleHost { request_id: u32, rlb_pool: impl 'static + Send + RowListBuilderSource, ) -> Result, anyhow::Error> { + Self::send_one_off_query_result( + db, + subscriptions, + auth, + query, + client, + rlb_pool, + |table_name, rows| ws_v2::SingleTableRows { + table: table_name, + rows, + }, + move |subscriptions, client, result, tx_offset, _tx| { + let message = match result { + Ok(rows) => ws_v2::OneOffQueryResult { + request_id, + result: Ok(ws_v2::QueryRows { + tables: vec![rows].into_boxed_slice(), + }), + }, + Err(err) => ws_v2::OneOffQueryResult { + request_id, + result: Err(err.into()), + }, + }; + + subscriptions.send_one_off_query_message_v2(client, message, tx_offset) + }, + ) + } + + fn send_one_off_query_result( + db: Arc, + subscriptions: ModuleSubscriptions, + auth: AuthCtx, + query: String, + client: Arc, + rlb_pool: impl 'static + Send + RowListBuilderSource, + wrap_rows: WrapRows, + send_result: SendResult, + ) -> Result, anyhow::Error> + where + F: BuildableWebsocketFormat, + WrapRows: Fn(RawIdentifier, F::List) -> T, + SendResult: FnOnce( + ModuleSubscriptions, + Arc, + Result, + TransactionOffset, + &TxId, + ) -> Result<(), BroadcastError>, + { let (tx_offset_sender, tx_offset_receiver) = oneshot::channel(); let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { let (tx_offset, tx_metrics, reducer) = db.release_tx(tx); @@ -2451,82 +3022,76 @@ impl ModuleHost { db.report_read_tx_metrics(reducer, tx_metrics); }); - let result: Result<(ws_v2::SingleTableRows, ExecutionMetrics), anyhow::Error> = (|| { - let tx = SchemaViewer::new(&*tx, &auth); + let (result, metrics) = match Self::execute_one_off_query(&db, &tx, &auth, &query, &rlb_pool, wrap_rows) { + Ok((rows, metrics)) => (Ok(rows), Some(metrics)), + Err(err) => (Err(err.to_string()), None), + }; - let (plans, _, table_name, _) = compile_subscription(&query, &tx, &auth)?; + send_result(subscriptions, client, result, tx_offset_receiver, &tx)?; + Ok(metrics) + } - let optimized = plans - .into_iter() - .map(|plan| plan.optimize(&auth)) - .collect::, _>>()?; - - check_row_limit(&optimized, &db, &tx, |plan, tx| estimate_rows_scanned(tx, plan), &auth)?; - - let return_table = || optimized.first().and_then(|plan| plan.return_table()); - - let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table()); - let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default(); - let num_private_cols = return_table() - .map(|schema| schema.num_private_cols()) - .unwrap_or_default(); - - let optimized = optimized.into_iter().map(PipelinedProject::from).collect::>(); - - let table_name = table_name.into(); - - if returns_view_table && num_private_cols > 0 { - let optimized = optimized - .into_iter() - .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) - .collect::>(); - return execute_plan_for_view::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| { - ( - ws_v2::SingleTableRows { - table: table_name, - rows, - }, - metrics, - ) - }) - .context("One-off queries are not allowed to modify the database"); - } + fn execute_one_off_query( + db: &Arc, + tx: &TxId, + auth: &AuthCtx, + query: &str, + rlb_pool: &impl RowListBuilderSource, + wrap_rows: WrapRows, + ) -> Result<(T, ExecutionMetrics), anyhow::Error> + where + F: BuildableWebsocketFormat, + WrapRows: Fn(RawIdentifier, F::List) -> T, + { + let schema_viewer = SchemaViewer::new(tx, auth); + + let ( + // A query may compile down to several plans. + // This happens when there are multiple RLS rules per table. + // The original query is the union of these plans. + plans, + _, + table_name, + _, + ) = compile_subscription(query, &schema_viewer, auth)?; + + let optimized = plans + .into_iter() + .map(|plan| plan.optimize(auth)) + .collect::, _>>()?; + + check_row_limit( + &optimized, + db, + &schema_viewer, + |plan, tx| estimate_rows_scanned(tx, plan), + auth, + )?; - execute_plan::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| { - ( - ws_v2::SingleTableRows { - table: table_name, - rows, - }, - metrics, - ) - }) - .context("One-off queries are not allowed to modify the database") - })(); + let return_table = || optimized.first().and_then(|plan| plan.return_table()); - let (message, metrics) = match result { - Ok((rows, metrics)) => ( - ws_v2::OneOffQueryResult { - request_id, - result: Ok(ws_v2::QueryRows { - tables: vec![rows].into_boxed_slice(), - }), - }, - Some(metrics), - ), - Err(err) => ( - ws_v2::OneOffQueryResult { - request_id, - result: Err(err.to_string().into()), - }, - None, - ), - }; + let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table()); + let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default(); + let num_private_cols = return_table() + .map(|schema| schema.num_private_cols()) + .unwrap_or_default(); - subscriptions.send_one_off_query_message_v2(client, message, tx_offset_receiver)?; - Ok(metrics) + let optimized = optimized.into_iter().map(PipelinedProject::from).collect::>(); + let table_name = table_name.into(); + + if returns_view_table && num_private_cols > 0 { + let optimized = optimized + .into_iter() + .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) + .collect::>(); + return execute_plan_for_view::(&optimized, &DeltaTx::from(tx), rlb_pool) + .map(|(rows, _, metrics)| (wrap_rows(table_name, rows), metrics)) + .context("One-off queries are not allowed to modify the database"); + } + + execute_plan::(&optimized, &DeltaTx::from(tx), rlb_pool) + .map(|(rows, _, metrics)| (wrap_rows(table_name, rows), metrics)) + .context("One-off queries are not allowed to modify the database") } /// FIXME(jgilles): this is a temporary workaround for deleting not currently being supported diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 3b709fc433e..501976cad1b 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1056,6 +1056,21 @@ impl InstanceCommon { Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false), } } + ViewCommand::RemoveSingleSubscription { + sender, + auth, + request, + timer, + } => { + let res = info + .subscriptions + .remove_single_subscription(sender, auth, request, timer); + + match res { + Ok(metrics) => (ViewCommandResult::Subscription { result: Ok(metrics) }, false), + Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false), + } + } ViewCommand::AddLegacySubscription { sender, auth, @@ -1121,6 +1136,21 @@ impl InstanceCommon { Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false), } } + ViewCommand::RemoveMultiSubscription { + sender, + auth, + request, + timer, + } => { + let res = info + .subscriptions + .remove_multi_subscription(sender, auth, request, timer); + + match res { + Ok(metrics) => (ViewCommandResult::Subscription { result: Ok(metrics) }, false), + Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false), + } + } ViewCommand::Sql { db, sql_text, diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs index 6f4f6b8bf73..403d7c560f7 100644 --- a/crates/core/src/util/jobs.rs +++ b/crates/core/src/util/jobs.rs @@ -337,6 +337,27 @@ impl SingleCoreExecutor { Self::spawn(AllocatedJobCore::default()) } + /// Enqueue a job on this database executor and return immediately without + /// waiting for the job to finish. + pub fn enqueue_job(&self, f: F) + where + F: AsyncFnOnce() + Send + 'static, + { + let span = tracing::Span::current(); + + self.inner + .job_tx + .send(Box::new(move || { + async move { + if AssertUnwindSafe(f().instrument(span)).catch_unwind().await.is_err() { + tracing::warn!("uncaught panic on `SingleCoreExecutor`") + } + } + .boxed_local() + })) + .unwrap_or_else(|_| panic!("job thread exited")); + } + /// Run a job for this database executor. pub async fn run_job(&self, f: F) -> R where diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 318fff8d0cf..caf6ac1feb1 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -22,7 +22,6 @@ use spacetimedb::client::{ClientActorId, ClientConfig, ClientConnection, DataMes use spacetimedb::db::{Config, Storage}; use spacetimedb::host::FunctionArgs; use spacetimedb_client_api::{ControlStateReadAccess, ControlStateWriteAccess, DatabaseDef, NodeDelegate}; -use spacetimedb_client_api_messages::websocket::v1 as ws_v1; use spacetimedb_lib::{bsatn, sats}; pub use spacetimedb::database_logger::LogLevel; @@ -59,7 +58,16 @@ impl ModuleHandle { async fn call_reducer(&self, reducer: &str, args: FunctionArgs) -> anyhow::Result<()> { let result = self .client - .call_reducer(reducer, args, 0, Instant::now(), ws_v1::CallReducerFlags::FullUpdate) + .module() + .call_reducer( + self.client.id.identity, + Some(self.client.id.connection_id), + Some(self.client.sender()), + Some(0), + Some(Instant::now()), + reducer, + args, + ) .await; let result = match result { Ok(result) => result.into(),