diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index 0a7a7f1a11b..34d337fb973 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -7,18 +7,14 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Instant, SystemTime}; -use super::messages::{OneOffQueryResponseMessage, ProcedureResultMessage}; +use super::messages::OneOffQueryResponseMessage; use super::{message_handlers, ClientActorId, MessageHandleError, OutboundMessage}; use crate::db::relational_db::RelationalDB; use crate::error::DBError; use crate::host::module_host::ClientConnectedError; -use crate::host::{ - CallProcedureReturn, FunctionArgs, ModuleHost, NoSuchModule, ProcedureCallResult, ReducerCallError, - ReducerCallResult, -}; +use crate::host::{FunctionArgs, ModuleHost, NoSuchModule, ReducerCallError}; use crate::subscription::module_subscription_manager::BroadcastError; use crate::subscription::row_list_builder_pool::JsonRowListBuilderFakePool; -use crate::util::asyncify; use crate::util::prometheus_handle::IntGaugeExt; use crate::worker_metrics::WORKER_METRICS; use bytes::Bytes; @@ -30,8 +26,7 @@ use spacetimedb_auth::identity::{ConnectionAuthCtx, SpacetimeIdentityClaims}; use spacetimedb_client_api_messages::websocket::{common as ws_common, v1 as ws_v1, v2 as ws_v2}; use spacetimedb_durability::{DurableOffset, TxOffset}; use spacetimedb_lib::identity::{AuthCtx, RequestId}; -use spacetimedb_lib::metrics::ExecutionMetrics; -use spacetimedb_lib::{bsatn, Identity, TimeDuration, Timestamp}; +use spacetimedb_lib::Identity; use tokio::sync::mpsc::error::{SendError, TrySendError}; use tokio::sync::{mpsc, oneshot, watch}; use tokio::task::AbortHandle; @@ -841,7 +836,7 @@ impl ClientConnection { request_id: RequestId, timer: Instant, flags: ws_v1::CallReducerFlags, - ) -> Result { + ) -> Result<(), ReducerCallError> { let caller = match flags { ws_v1::CallReducerFlags::FullUpdate => Some(self.sender()), // Setting `sender = None` causes `eval_updates` to skip sending to the caller @@ -850,7 +845,7 @@ impl ClientConnection { }; self.module() - .call_reducer( + .call_reducer_ws( self.id.identity, Some(self.id.connection_id), caller, @@ -869,9 +864,9 @@ impl ClientConnection { request_id: RequestId, timer: Instant, _flags: ws_v2::CallReducerFlags, - ) -> Result { + ) -> Result<(), ReducerCallError> { self.module() - .call_reducer( + .call_reducer_ws( self.id.identity, Some(self.id.connection_id), Some(self.sender()), @@ -890,22 +885,17 @@ impl ClientConnection { request_id: RequestId, timer: Instant, ) -> Result<(), BroadcastError> { - let CallProcedureReturn { result, tx_offset } = self - .module() - .call_procedure( + self.module() + .call_procedure_ws( + self.sender(), + request_id, self.id.identity, Some(self.id.connection_id), Some(timer), procedure, args, ) - .await; - - let message = ProcedureResultMessage::from_result(&result, request_id); - - self.module() - .subscriptions() - .send_procedure_message(self.sender(), message, tx_offset) + .await } pub async fn call_procedure_v2( @@ -916,152 +906,58 @@ impl ClientConnection { timer: Instant, _flags: ws_v2::CallProcedureFlags, ) -> Result<(), BroadcastError> { - let CallProcedureReturn { result, tx_offset } = self - .module() - .call_procedure( + self.module() + .call_procedure_ws_v2( + self.sender(), + request_id, self.id.identity, Some(self.id.connection_id), Some(timer), procedure, FunctionArgs::Bsatn(args), ) - .await; - - let (status, timestamp, 
execution_duration) = match result { - Ok(ProcedureCallResult { - return_val, - execution_duration, - start_timestamp, - }) => ( - ws_v2::ProcedureStatus::Returned( - bsatn::to_vec(&return_val) - .expect("Procedure return value failed to serialize to BSATN") - .into(), - ), - start_timestamp, - TimeDuration::from(execution_duration), - ), - Err(err) => ( - ws_v2::ProcedureStatus::InternalError(err.to_string().into()), - Timestamp::UNIX_EPOCH, - TimeDuration::ZERO, - ), - }; - - let message = ws_v2::ProcedureResult { - status, - timestamp, - total_host_execution_duration: execution_duration, - request_id, - }; - - self.module() - .subscriptions() - .send_procedure_message_v2(self.sender(), message, tx_offset) + .await } - pub async fn subscribe_single( - &self, - subscription: ws_v1::SubscribeSingle, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn subscribe_single(&self, subscription: ws_v1::SubscribeSingle, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("subscribe_single", async move || { - let host = me.module(); - host.subscriptions() - .add_single_subscription(Some(&host), me.sender, me.auth.clone(), subscription, timer, None) - .await - }) - .await? + .call_view_add_single_subscription_ws(self.sender(), self.auth.clone(), subscription, timer) + .await } - pub async fn unsubscribe( - &self, - request: ws_v1::Unsubscribe, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); - asyncify(move || { - me.module() - .subscriptions() - .remove_single_subscription(me.sender, me.auth.clone(), request, timer) - }) - .await + pub async fn unsubscribe(&self, request: ws_v1::Unsubscribe, timer: Instant) -> Result<(), DBError> { + self.module() + .call_view_remove_single_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn subscribe_v2( - &self, - request: ws_v2::Subscribe, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn subscribe_v2(&self, request: ws_v2::Subscribe, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("subscribe_v2", async move || { - let host = me.module(); - host.subscriptions() - .add_v2_subscription(Some(&host), me.sender, me.auth.clone(), request, timer, None) - .await - }) - .await? + .call_view_add_v2_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn subscribe_multi( - &self, - request: ws_v1::SubscribeMulti, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn subscribe_multi(&self, request: ws_v1::SubscribeMulti, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("subscribe_multi", async move || { - let host = me.module(); - host.subscriptions() - .add_multi_subscription(Some(&host), me.sender, me.auth.clone(), request, timer, None) - .await - }) - .await? + .call_view_add_multi_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn unsubscribe_multi( - &self, - request: ws_v1::UnsubscribeMulti, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn unsubscribe_multi(&self, request: ws_v1::UnsubscribeMulti, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread("unsubscribe_multi", move || { - me.module() - .subscriptions() - .remove_multi_subscription(me.sender, me.auth.clone(), request, timer) - }) - .await? 
+ .call_view_remove_multi_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn unsubscribe_v2( - &self, - request: ws_v2::Unsubscribe, - timer: Instant, - ) -> Result, DBError> { - let me = self.clone(); + pub async fn unsubscribe_v2(&self, request: ws_v2::Unsubscribe, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("unsubscribe_v2", async move || { - let host = me.module(); - host.subscriptions() - .remove_v2_subscription(Some(&host), me.sender, me.auth.clone(), request, timer) - .await - }) - .await? + .call_view_remove_v2_subscription_ws(self.sender(), self.auth.clone(), request, timer) + .await } - pub async fn subscribe(&self, subscription: ws_v1::Subscribe, timer: Instant) -> Result { - let me = self.clone(); + pub async fn subscribe(&self, subscription: ws_v1::Subscribe, timer: Instant) -> Result<(), DBError> { self.module() - .on_module_thread_async("subscribe", async move || { - let host = me.module(); - host.subscriptions() - .add_legacy_subscriber(Some(&host), me.sender, me.auth.clone(), subscription, timer, None) - .await - }) - .await? + .call_view_add_legacy_subscription_ws(self.sender(), self.auth.clone(), subscription, timer) + .await } pub async fn one_off_query_json( @@ -1071,7 +967,7 @@ impl ClientConnection { timer: Instant, ) -> Result<(), anyhow::Error> { self.module() - .one_off_query::( + .one_off_query_ws::( self.auth.clone(), query.to_owned(), self.sender.clone(), @@ -1091,7 +987,7 @@ impl ClientConnection { ) -> Result<(), anyhow::Error> { let bsatn_rlb_pool = self.module().replica_ctx().subscriptions.bsatn_rlb_pool.clone(); self.module() - .one_off_query::( + .one_off_query_ws::( self.auth.clone(), query.to_owned(), self.sender.clone(), @@ -1106,7 +1002,7 @@ impl ClientConnection { pub async fn one_off_query_v2(&self, query: &str, request_id: u32, timer: Instant) -> Result<(), anyhow::Error> { let bsatn_rlb_pool = self.module().replica_ctx().subscriptions.bsatn_rlb_pool.clone(); self.module() - .one_off_query_v2( + .one_off_query_v2_ws( self.auth.clone(), query.to_owned(), self.sender.clone(), diff --git a/crates/core/src/client/message_handlers_v1.rs b/crates/core/src/client/message_handlers_v1.rs index d6cf8fc6257..a6bc7a1c397 100644 --- a/crates/core/src/client/message_handlers_v1.rs +++ b/crates/core/src/client/message_handlers_v1.rs @@ -40,17 +40,6 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst let mod_info = module.info(); let mod_metrics = &mod_info.metrics; let database_identity = mod_info.database_identity; - let db = module.relational_db(); - let record_metrics = |wl| { - move |metrics| { - if let Some(metrics) = metrics { - db.exec_counters_for(wl).record(&metrics); - } - } - }; - let sub_metrics = record_metrics(WorkloadType::Subscribe); - let unsub_metrics = record_metrics(WorkloadType::Unsubscribe); - type HandleResult<'a> = Result<(), (Option<&'a RawIdentifier>, Option, anyhow::Error)>; let res: HandleResult<'_> = match message { ws_v1::ClientMessage::CallReducer(ws_v1::CallReducer { @@ -59,49 +48,52 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst request_id, flags, }) => { - let res = client.call_reducer(reducer, args, request_id, timer, flags).await; + let res = client + .call_reducer(reducer, args, request_id, timer, flags) + .await + .map_err(|e| { + ( + Some(reducer), + mod_info.module_def.reducer_full(&**reducer).map(|(id, _)| id), + e.into(), + ) + }); WORKER_METRICS .request_round_trip 
.with_label_values(&WorkloadType::Reducer, &database_identity, reducer) .observe(timer.elapsed().as_secs_f64()); - res.map(drop).map_err(|e| { - ( - Some(reducer), - mod_info.module_def.reducer_full(&**reducer).map(|(id, _)| id), - e.into(), - ) - }) + res } ws_v1::ClientMessage::SubscribeMulti(subscription) => { - let res = client.subscribe_multi(subscription, timer).await.map(sub_metrics); + let res = client.subscribe_multi(subscription, timer).await; mod_metrics .request_round_trip_subscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v1::ClientMessage::UnsubscribeMulti(request) => { - let res = client.unsubscribe_multi(request, timer).await.map(unsub_metrics); + let res = client.unsubscribe_multi(request, timer).await; mod_metrics .request_round_trip_unsubscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v1::ClientMessage::SubscribeSingle(subscription) => { - let res = client.subscribe_single(subscription, timer).await.map(sub_metrics); + let res = client.subscribe_single(subscription, timer).await; mod_metrics .request_round_trip_subscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v1::ClientMessage::Unsubscribe(request) => { - let res = client.unsubscribe(request, timer).await.map(unsub_metrics); + let res = client.unsubscribe(request, timer).await; mod_metrics .request_round_trip_unsubscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v1::ClientMessage::Subscribe(subscription) => { - let res = client.subscribe(subscription, timer).await.map(Some).map(sub_metrics); + let res = client.subscribe(subscription, timer).await; mod_metrics .request_round_trip_subscribe .observe(timer.elapsed().as_secs_f64()); @@ -134,7 +126,7 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst if let Err(e) = res { log::warn!("Procedure call failed: {e:#}"); } - // `ClientConnection::call_procedure` handles sending the error message to the client if the call fails, + // `ClientConnection::call_procedure` handles sending the procedure result message itself, // so we don't need to return an `Err` here. 
Ok(()) } diff --git a/crates/core/src/client/message_handlers_v2.rs b/crates/core/src/client/message_handlers_v2.rs index 2db523e472d..8c918763380 100644 --- a/crates/core/src/client/message_handlers_v2.rs +++ b/crates/core/src/client/message_handlers_v2.rs @@ -32,27 +32,17 @@ pub(super) async fn handle_decoded_message( let mod_info = module.info(); let mod_metrics = &mod_info.metrics; let database_identity = mod_info.database_identity; - let db = module.relational_db(); - let record_metrics = |wl| { - move |metrics| { - if let Some(metrics) = metrics { - db.exec_counters_for(wl).record(&metrics); - } - } - }; - let sub_metrics = record_metrics(WorkloadType::Subscribe); - let unsub_metrics = record_metrics(WorkloadType::Unsubscribe); type HandleResult<'a> = Result<(), (Option<&'a RawIdentifier>, Option, anyhow::Error)>; let res: HandleResult<'_> = match message { ws_v2::ClientMessage::Subscribe(subscribe) => { - let res = client.subscribe_v2(subscribe, timer).await.map(sub_metrics); + let res = client.subscribe_v2(subscribe, timer).await; mod_metrics .request_round_trip_subscribe .observe(timer.elapsed().as_secs_f64()); res.map_err(|e| (None, None, e.into())) } ws_v2::ClientMessage::Unsubscribe(unsubscribe) => { - let res = client.unsubscribe_v2(unsubscribe, timer).await.map(unsub_metrics); + let res = client.unsubscribe_v2(unsubscribe, timer).await; mod_metrics .request_round_trip_unsubscribe .observe(timer.elapsed().as_secs_f64()); @@ -80,7 +70,7 @@ pub(super) async fn handle_decoded_message( .with_label_values(&WorkloadType::Reducer, &database_identity, reducer) .observe(timer.elapsed().as_secs_f64()); match res { - Ok(_) => { + Ok(()) => { // If this was not a success, we would have already sent an error message. Ok(()) } diff --git a/crates/core/src/host/instance_env.rs b/crates/core/src/host/instance_env.rs index 0d3d41632b1..86751aae70c 100644 --- a/crates/core/src/host/instance_env.rs +++ b/crates/core/src/host/instance_env.rs @@ -401,6 +401,12 @@ impl InstanceEnv { table_id: TableId, row_ptr: RowPointer, ) -> Result<(), NodesError> { + let table = stdb.schema_for_table_mut(tx, table_id)?; + let schedule = table + .schedule + .as_ref() + .expect("schedule_row should only be called when we know its a scheduler table"); + let function_name: Box = (&schedule.function_name[..]).into(); let (id_column, at_column) = stdb .table_scheduled_id_and_at(tx, table_id)? 
.expect("schedule_row should only be called when we know its a scheduler table"); @@ -414,6 +420,7 @@ impl InstanceEnv { .schedule( table_id, schedule_id, + function_name, schedule_at, id_column, at_column, diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 88c45a3f867..b72bdb9ad54 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -1,8 +1,8 @@ use super::{ - ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ReducerCallResult, ReducerId, - ReducerOutcome, Scheduler, + ArgsTuple, FunctionArgs, InvalidProcedureArguments, InvalidReducerArguments, ProcedureCallResult, + ReducerCallResult, ReducerId, ReducerOutcome, Scheduler, }; -use crate::client::messages::{OneOffQueryResponseMessage, SerializableMessage}; +use crate::client::messages::{OneOffQueryResponseMessage, ProcedureResultMessage, SerializableMessage}; use crate::client::{ClientActorId, ClientConnectionSender}; use crate::database_logger::{DatabaseLogger, LogLevel, Record}; use crate::db::relational_db::RelationalDB; @@ -23,7 +23,7 @@ use crate::sql::ast::SchemaViewer; use crate::sql::execute::SqlResult; use crate::sql::parser::RowLevelExpr; use crate::subscription::module_subscription_actor::ModuleSubscriptions; -pub use crate::subscription::module_subscription_manager::TransactionOffset; +pub use crate::subscription::module_subscription_manager::{BroadcastError, TransactionOffset}; use crate::subscription::tx::DeltaTx; use crate::subscription::websocket_building::{BuildableWebsocketFormat, RowListBuilderSource}; use crate::subscription::{execute_plan, execute_plan_for_view}; @@ -47,7 +47,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashSet}; use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; -use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; pub use spacetimedb_durability::{DurabilityExited, DurableOffset}; use spacetimedb_execution::pipelined::{PipelinedProject, ViewProject}; @@ -56,7 +56,7 @@ use spacetimedb_expr::expr::CollectViews; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; -use spacetimedb_lib::{ConnectionId, Timestamp}; +use spacetimedb_lib::{bsatn, ConnectionId, TimeDuration, Timestamp}; use spacetimedb_primitives::{ArgId, ProcedureId, TableId, ViewFnPtr, ViewId}; use spacetimedb_query::compile_subscription; use spacetimedb_sats::raw_identifier::RawIdentifier; @@ -352,9 +352,83 @@ enum ModuleHostInner { Js(Box), } +/// Wasm has two instance managers: one for procedures and one for reducers/views. +/// +/// Both managers need to be able to create fresh instances from the same compiled module, +/// so they share the module via `Arc`. +/// +/// Reducers are executed serially by the `SingleCoreExecutor`, so the instance pool +/// contains only a single instance. +/// +/// This is not the case for procedures which can be executed concurrently. 
+/// +/// Note, for reducers it is important that we check out the instance **after** enqueuing +/// the job onto the `SingleCoreExecutor`, otherwise concurrent callers would contend on +/// the instance manager, potentially checking out multiple instances before their +/// execution was serialized. +/// +/// TODO: We should bound the instance pool for reducers so that attempting to checkout +/// more than one instance results in an error. struct WasmtimeModuleHost { executor: SingleCoreExecutor, - instance_manager: ModuleInstanceManager, + main_instance: Arc>>, + procedure_instances: Arc>>, +} + +#[derive(Clone, Copy)] +enum InstanceKind { + Main, + Procedure, +} + +macro_rules! with_instance_impl { + ( + $self:expr, + $kind:expr, + $label:expr, + $instance_kind:expr, + $arg:ident, + $wasm:expr, + |$host:ident, $timer_guard:ident, $arg_js:ident| $js_body:expr + ) => {{ + $self.guard_closed()?; + let $timer_guard = $self.start_call_timer($label); + + // Operations on module instances (e.g. calling reducers) is blocking, + // partially because the computation can potentially take a long time + // and partially because interacting with the database requires taking + // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. + // This will bubble up any panic that may occur. + + // If a function call panics, we **must** ensure to call `self.on_panic` + // so that the module is discarded by the host controller. + scopeguard::defer_on_unwind!({ + log::warn!("{} {} panicked", $kind, $label); + ($self.on_panic)(); + }); + + Ok(match &*$self.inner { + ModuleHostInner::Wasm(host) => { + let executor = host.executor.clone(); + let instance_manager = Self::wasm_instance_manager(host, $instance_kind); + executor + .run_job(async move || { + // Queue first, then check out the instance from inside the queued job. + // This is required for both the serialized main lane and the procedure + // pool so that executor scheduling happens before instance allocation. + drop($timer_guard); + instance_manager + .with_instance(async move |mut inst| (($wasm)($arg, &mut inst).await, inst)) + .await + }) + .await + } + ModuleHostInner::Js($host) => { + let $arg_js = $arg; + $js_body + } + }) + }}; } struct V8ModuleHost { @@ -406,6 +480,16 @@ impl GenericModule for super::v8::JsModule { } } +impl GenericModule for Arc { + type Instance = M::Instance; + async fn create_instance(&self) -> Self::Instance { + (**self).create_instance().await + } + fn host_type(&self) -> HostType { + (**self).host_type() + } +} + impl GenericModuleInstance for super::v8::JsInstance { fn trapped(&self) -> bool { self.trapped() @@ -656,12 +740,24 @@ pub enum ViewCommand { request: ws_v1::SubscribeSingle, _timer: Instant, }, + RemoveSingleSubscription { + sender: Arc, + auth: AuthCtx, + request: ws_v1::Unsubscribe, + timer: Instant, + }, AddMultiSubscription { sender: Arc, auth: AuthCtx, request: ws_v1::SubscribeMulti, _timer: Instant, }, + RemoveMultiSubscription { + sender: Arc, + auth: AuthCtx, + request: ws_v1::UnsubscribeMulti, + timer: Instant, + }, AddLegacySubscription { sender: Arc, auth: AuthCtx, @@ -747,102 +843,87 @@ impl CallProcedureParams { /// Holds a [`Module`] and a set of [`Instance`]s from it, /// and allocates the [`Instance`]s to be used for function calls. /// -/// Capable of managing and allocating multiple instances of the same module, -/// but this functionality is currently unused, as only one reducer runs at a time. 
-/// When we introduce procedures, it will be necessary to have multiple instances, -/// as each procedure invocation will have its own sandboxed instance, -/// and multiple procedures can run concurrently with up to one reducer. +/// This can either back a single long-lived serialized execution lane +/// or a pool that grows to support concurrent procedure execution. struct ModuleInstanceManager { instances: Mutex>, module: M, - module_instances_metric: ModuleInstancesMetric, - create_instance_time_metric: CreateInstanceTimeMetric, + metrics: InstanceManagerMetrics, } -/// Handle on the `spacetime_module_create_instance_time_seconds` label for a particular database -/// which calls `remove_label_values` to clean up on drop. -struct CreateInstanceTimeMetric { - metric: Histogram, - host_type: HostType, - database_identity: Identity, +/// The [`ModuleHost`] now manages two instance pools for wasm: one for +/// reducers/views and another for procedures, and both write to the same +/// metric labels. Therefore both must share the same handles or else each +/// copy would try to unregister the same metric labels independently on drop. +#[derive(Clone)] +struct InstanceManagerMetrics { + inner: Arc, } -/// Handle on the `spacetime_module_instances` label for a particular database -/// which calls `remove_label_values` to clean up on drop. -struct ModuleInstancesMetric { - metric: IntGauge, +/// Shared metric state for all instance managers of a module host. +struct InstanceManagerMetricsInner { + module_instances_metric: IntGauge, + create_instance_time_metric: Histogram, host_type: HostType, database_identity: Identity, - count: std::sync::Mutex, } -impl Drop for CreateInstanceTimeMetric { +impl Drop for InstanceManagerMetricsInner { fn drop(&mut self) { let _ = WORKER_METRICS .module_create_instance_time_seconds .remove_label_values(&self.database_identity, &self.host_type); - } -} - -impl Drop for ModuleInstancesMetric { - fn drop(&mut self) { let _ = WORKER_METRICS .module_instances .remove_label_values(&self.database_identity, &self.host_type); } } -impl ModuleInstancesMetric { - fn inc(&self) { - let mut count = self.count.lock().unwrap(); - *count += 1; - self.metric.set(*count); +impl InstanceManagerMetrics { + fn new(host_type: HostType, database_identity: Identity) -> Self { + Self { + inner: Arc::new(InstanceManagerMetricsInner { + module_instances_metric: WORKER_METRICS + .module_instances + .with_label_values(&database_identity, &host_type), + create_instance_time_metric: WORKER_METRICS + .module_create_instance_time_seconds + .with_label_values(&database_identity, &host_type), + host_type, + database_identity, + }), + } } - fn dec(&self) { - let mut count = self.count.lock().unwrap(); - if *count == 0 { - return; - } - *count -= 1; - self.metric.set(*count); + fn inc_instances(&self) { + self.inner.module_instances_metric.inc(); } -} -impl CreateInstanceTimeMetric { - fn observe(&self, duration: std::time::Duration) { - self.metric.observe(duration.as_secs_f64()); + fn dec_instances(&self) { + self.inner.module_instances_metric.dec(); + } + + fn observe_create_instance_time(&self, duration: std::time::Duration) { + self.inner.create_instance_time_metric.observe(duration.as_secs_f64()); } } impl ModuleInstanceManager { - fn new(module: M, init_inst: Option, database_identity: Identity) -> Self { + fn new(module: M, init_inst: Option, metrics: InstanceManagerMetrics) -> Self { let host_type = module.host_type(); - let module_instances_metric = ModuleInstancesMetric { - metric: 
WORKER_METRICS - .module_instances - .with_label_values(&database_identity, &host_type), - host_type, - database_identity, - count: std::sync::Mutex::new(1), - }; - - let create_instance_time_metric = CreateInstanceTimeMetric { - metric: WORKER_METRICS - .module_create_instance_time_seconds - .with_label_values(&database_identity, &host_type), - host_type, - database_identity, - }; + debug_assert_eq!(metrics.inner.host_type, host_type); let mut instances = VecDeque::new(); + let has_init_inst = init_inst.is_some(); instances.extend(init_inst); + if has_init_inst { + metrics.inc_instances(); + } Self { instances: Mutex::new(instances), module, - module_instances_metric, - create_instance_time_metric, + metrics, } } @@ -861,8 +942,8 @@ impl ModuleInstanceManager { let start_time = std::time::Instant::now(); let res = self.module.create_instance().await; let elapsed_time = start_time.elapsed(); - self.create_instance_time_metric.observe(elapsed_time); - self.module_instances_metric.inc(); + self.metrics.observe_create_instance_time(elapsed_time); + self.metrics.inc_instances(); res } } @@ -872,7 +953,7 @@ impl ModuleInstanceManager { // Don't return trapped instances; // they may have left internal data structures in the guest `Instance` // (WASM linear memory, V8 global scope) in a bad state. - self.module_instances_metric.dec(); + self.metrics.dec_instances(); return; } @@ -1069,6 +1150,16 @@ pub struct RefInstance<'a, I: WasmInstance> { } impl ModuleHost { + fn wasm_instance_manager( + host: &WasmtimeModuleHost, + kind: InstanceKind, + ) -> Arc>> { + match kind { + InstanceKind::Main => host.main_instance.clone(), + InstanceKind::Procedure => host.procedure_instances.clone(), + } + } + pub(super) fn new( module: ModuleWithInstance, on_panic: impl Fn() + Send + Sync + 'static, @@ -1082,16 +1173,25 @@ impl ModuleHost { init_inst, } => { info = module.info(); - let instance_manager = ModuleInstanceManager::new(module, Some(init_inst), database_identity); + let module = Arc::new(module); + let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); + let instance_manager = Arc::new(ModuleInstanceManager::new( + module.clone(), + Some(init_inst), + metrics.clone(), + )); + let procedure_instances = Arc::new(ModuleInstanceManager::new(module, None, metrics)); Arc::new(ModuleHostInner::Wasm(Box::new(WasmtimeModuleHost { executor, - instance_manager, + main_instance: instance_manager, + procedure_instances, }))) } ModuleWithInstance::Js { module, init_inst } => { info = module.info(); let instance_lane = super::v8::JsInstanceLane::new(module.clone(), init_inst); - let procedure_instances = ModuleInstanceManager::new(module.clone(), None, database_identity); + let metrics = InstanceManagerMetrics::new(module.host_type(), database_identity); + let procedure_instances = ModuleInstanceManager::new(module.clone(), None, metrics); Arc::new(ModuleHostInner::Js(Box::new(V8ModuleHost { module, instance_lane, @@ -1203,50 +1303,65 @@ impl ModuleHost { }) } - /// Run a function for this module which has access to the module instance. 
- async fn with_instance( - &self, - kind: &str, - label: &str, - arg: A, - timer: impl FnOnce(&str) -> Guard, - work_wasm: impl AsyncFnOnce(Guard, &SingleCoreExecutor, Box, A) -> (R, Box), - work_js: impl AsyncFnOnce(Guard, &super::v8::JsInstanceLane, A) -> R, - ) -> Result { + fn enqueue_wasm_job(&self, kind: &str, label: &str, f: F) -> Result<(), NoSuchModule> + where + F: AsyncFnOnce() + Send + 'static, + { self.guard_closed()?; - let timer_guard = timer(label); - // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentially take a long time - // and partially because interacting with the database requires taking - // a blocking lock. So, we run `f` on a dedicated thread with `self.executor`. - // This will bubble up any panic that may occur. + let timer_guard = self.start_call_timer(label); + let kind = kind.to_owned(); + let label = label.to_owned(); + let on_panic = self.on_panic.clone(); - // If a function call panics, we **must** ensure to call `self.on_panic` - // so that the module is discarded by the host controller. - scopeguard::defer_on_unwind!({ - log::warn!("{kind} {label} panicked"); - (self.on_panic)(); - }); + match &*self.inner { + ModuleHostInner::Wasm(host) => { + let executor = host.executor.clone(); + executor.enqueue_job(async move || { + scopeguard::defer_on_unwind!({ + log::warn!("{} {} panicked", kind, label); + on_panic(); + }); + + drop(timer_guard); + f().await; + }); + Ok(()) + } + ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_job should only be used for wasm"), + } + } - Ok(match &*self.inner { - ModuleHostInner::Wasm(wasm) => { - let executor = &wasm.executor; - let instance_manager = &wasm.instance_manager; - instance_manager - .with_instance(async |inst| work_wasm(timer_guard, executor, inst, arg).await) - .await + fn enqueue_wasm_instance( + &self, + kind: &str, + label: &str, + instance_kind: InstanceKind, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) + Send + 'static, + ) -> Result<(), NoSuchModule> + where + A: Send + 'static, + { + match &*self.inner { + ModuleHostInner::Wasm(host) => { + let instance_manager = Self::wasm_instance_manager(host, instance_kind); + self.enqueue_wasm_job(kind, label, async move || { + instance_manager + .with_instance(async move |mut inst| { + wasm(arg, &mut inst).await; + ((), inst) + }) + .await; + }) } - ModuleHostInner::Js(js) => work_js(timer_guard, &js.instance_lane, arg).await, - }) + ModuleHostInner::Js(_) => unreachable!("enqueue_wasm_instance should only be used for wasm"), + } } - /// Run a function for this module which has access to the module instance. - /// - /// For WASM, the function is run on the module's JobThread. - /// For V8/JS, the function is run in the current task. - async fn call( + async fn with_main_instance( &self, + kind: &str, label: &str, arg: A, wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, @@ -1256,40 +1371,23 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.with_instance( - "reducer", + with_instance_impl!( + self, + kind, label, + InstanceKind::Main, arg, - |l| self.start_call_timer(l), - // Operations on module instances (e.g. calling reducers) is blocking, - // partially because the computation can potentially take a long time - // and partially because interacting with the database requires taking a blocking lock. - // So, we run `work` on a dedicated thread with `self.executor`. - // This will bubble up any panic that may occur. 
- async move |timer_guard, executor, mut inst, arg| { - executor - .run_job(async move || { - drop(timer_guard); - (wasm(arg, &mut inst).await, inst) - }) - .await - }, - async move |timer_guard, inst, arg| { - super::v8::assert_not_on_js_module_thread(label); + wasm, + |host, timer_guard, arg| { drop(timer_guard); - js(arg, inst).await - }, + js(arg, &host.instance_lane).await + } ) - .await } - /// Run a function for this module using pooled instances. - /// - /// For WASM, this is identical to [`Self::call`]. - /// For V8/JS, this uses the pooled procedure instances instead of the - /// single instance lane. - async fn call_pooled( + async fn with_procedure_instance( &self, + kind: &str, label: &str, arg: A, wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, @@ -1299,28 +1397,14 @@ impl ModuleHost { R: Send + 'static, A: Send + 'static, { - self.guard_closed()?; - let timer_guard = self.start_call_timer(label); - - scopeguard::defer_on_unwind!({ - log::warn!("pooled operation {label} panicked"); - (self.on_panic)(); - }); - - Ok(match &*self.inner { - ModuleHostInner::Wasm(host) => { - host.instance_manager - .with_instance(async |mut inst| { - host.executor - .run_job(async move || { - drop(timer_guard); - (wasm(arg, &mut inst).await, inst) - }) - .await - }) - .await - } - ModuleHostInner::Js(host) => { + with_instance_impl!( + self, + kind, + label, + InstanceKind::Procedure, + arg, + wasm, + |host, timer_guard, arg| { host.procedure_instances .with_instance(async |inst| { drop(timer_guard); @@ -1329,19 +1413,102 @@ impl ModuleHost { }) .await } + ) + } + + /// Run a function for this module which has access to the module instance. + /// + /// For WASM, the function is run on the module's JobThread. + /// For V8/JS, the function is run in the current task. + async fn call( + &self, + label: &str, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js: impl AsyncFnOnce(A, &super::v8::JsInstanceLane) -> R, + ) -> Result + where + R: Send + 'static, + A: Send + 'static, + { + self.with_main_instance("main-instance operation", label, arg, wasm, async move |arg, lane| { + super::v8::assert_not_on_js_module_thread(label); + js(arg, lane).await }) + .await + } + + async fn call_pooled( + &self, + label: &str, + arg: A, + wasm: impl AsyncFnOnce(A, &mut ModuleInstance) -> R + Send + 'static, + js: impl AsyncFnOnce(A, &JsInstance) -> R, + ) -> Result + where + R: Send + 'static, + A: Send + 'static, + { + self.with_procedure_instance("pooled operation", label, arg, wasm, js) + .await } async fn call_view_command(&self, label: &str, cmd: ViewCommand) -> Result { - self.call_pooled( + self.call( label, cmd, async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd)), - async |cmd, inst| Ok::<_, ViewCallError>(inst.call_view(cmd).await), + async |cmd, lane| lane.call_view(cmd).await, ) .await? } + async fn call_view_command_ws(&self, label: &str, cmd: ViewCommand) -> Result<(), ViewCallError> { + match &*self.inner { + ModuleHostInner::Wasm(_) => { + match self.enqueue_wasm_instance( + "main-instance operation", + label, + InstanceKind::Main, + cmd, + async |cmd, inst| { + let _ = inst.call_view(cmd); + }, + ) { + Ok(()) => Ok(()), + Err(err) => Err(err.into()), + } + } + ModuleHostInner::Js(_) => self.call_view_command(label, cmd).await.map(drop), + } + } + + fn unwrap_subscription_command_result( + label: &str, + res: Result, + ) -> Result, DBError> { + match res.map_err(|e| DBError::Other(anyhow::anyhow!(e)))? 
{ + ViewCommandResult::Subscription { result } => result, + ViewCommandResult::Sql { .. } => { + unreachable!("unexpected SQL result in {label}") + } + } + } + + async fn call_subscription_command( + &self, + label: &str, + cmd: ViewCommand, + ) -> Result, DBError> { + Self::unwrap_subscription_command_result(label, self.call_view_command(label, cmd).await) + } + + async fn call_subscription_command_ws(&self, label: &str, cmd: ViewCommand) -> Result<(), DBError> { + self.call_view_command_ws(label, cmd) + .await + .map_err(|e| DBError::Other(anyhow::anyhow!(e))) + } + pub async fn disconnect_client(&self, client_id: ClientActorId) { log::trace!("disconnecting client {client_id}"); if let Err(e) = self @@ -1583,6 +1750,30 @@ impl ModuleHost { }) } + fn call_procedure_params( + module: &ModuleInfo, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_id: ProcedureId, + procedure_def: &ProcedureDef, + args: FunctionArgs, + ) -> Result { + let args = args + .into_tuple_for_def(&module.module_def, procedure_def) + .map_err(InvalidProcedureArguments)?; + let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO); + + Ok(CallProcedureParams { + timestamp: Timestamp::now(), + caller_identity, + caller_connection_id, + timer, + procedure_id, + args, + }) + } + async fn call_reducer_inner( &self, caller_identity: Identity, @@ -1668,6 +1859,82 @@ impl ModuleHost { res } + pub async fn call_reducer_ws( + &self, + caller_identity: Identity, + caller_connection_id: Option, + client: Option>, + request_id: Option, + timer: Option, + reducer_name: &str, + args: FunctionArgs, + ) -> Result<(), ReducerCallError> { + match &*self.inner { + ModuleHostInner::Js(_) => self + .call_reducer( + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_name, + args, + ) + .await + .map(drop), + ModuleHostInner::Wasm(_) => { + let res = (|| { + let (reducer_id, reducer_def) = self + .info + .module_def + .reducer_full(reducer_name) + .ok_or(ReducerCallError::NoSuchReducer)?; + if let Some(lifecycle) = reducer_def.lifecycle { + return Err(ReducerCallError::LifecycleReducer(lifecycle)); + } + + if reducer_def.visibility.is_private() && !self.is_database_owner(caller_identity) { + return Err(ReducerCallError::NoSuchReducer); + } + + let params = Self::call_reducer_params( + &self.info, + caller_identity, + caller_connection_id, + client, + request_id, + timer, + reducer_id, + reducer_def, + args, + )?; + + self.enqueue_wasm_instance( + "main-instance operation", + &reducer_def.name, + InstanceKind::Main, + params, + async |params, inst| { + let _ = inst.call_reducer(params); + }, + )?; + Ok(()) + })(); + + let log_message = match &res { + Err(ReducerCallError::NoSuchReducer) => Some(no_such_function_log_message("reducer", reducer_name)), + Err(ReducerCallError::Args(_)) => Some(args_error_log_message("reducer", reducer_name)), + _ => None, + }; + if let Some(log_message) = log_message { + self.inject_logs(LogLevel::Error, reducer_name, &log_message) + } + + res + } + } + } + pub async fn call_view_add_single_subscription( &self, sender: Arc, @@ -1681,19 +1948,25 @@ impl ModuleHost { request, _timer: timer, }; - - let res = self - .call_view_command("call_view_add_single_subscription", cmd) + self.call_subscription_command("call_view_add_single_subscription", cmd) .await - //TODO: handle error better - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - 
ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_add_single_subscription") - } - } + pub async fn call_view_add_single_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::SubscribeSingle, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::AddSingleSubscription { + sender, + auth, + request, + _timer: timer, + }; + self.call_subscription_command_ws("call_view_add_single_subscription", cmd) + .await } pub async fn call_view_add_v2_subscription( @@ -1709,19 +1982,25 @@ impl ModuleHost { request, _timer: timer, }; - - let res = self - .call_view_command("call_view_add_multi_subscription", cmd) + self.call_subscription_command("call_view_add_multi_subscription", cmd) .await - //TODO: handle error better - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_add_single_subscription") - } - } + pub async fn call_view_add_v2_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v2::Subscribe, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::AddSubscriptionV2 { + sender, + auth, + request, + _timer: timer, + }; + self.call_subscription_command_ws("call_view_add_multi_subscription", cmd) + .await } pub async fn call_view_remove_v2_subscription( @@ -1737,18 +2016,25 @@ impl ModuleHost { request, timer, }; - - let res = self - .call_view_command("call_view_remove_v2_subscription", cmd) + self.call_subscription_command("call_view_remove_v2_subscription", cmd) .await - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_remove_v2_subscription") - } - } + pub async fn call_view_remove_v2_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v2::Unsubscribe, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::RemoveSubscriptionV2 { + sender, + auth, + request, + timer, + }; + self.call_subscription_command_ws("call_view_remove_v2_subscription", cmd) + .await } pub async fn call_view_add_multi_subscription( @@ -1764,19 +2050,59 @@ impl ModuleHost { request, _timer: timer, }; + self.call_subscription_command("call_view_add_multi_subscription", cmd) + .await + } - let res = self - .call_view_command("call_view_add_multi_subscription", cmd) + pub async fn call_view_remove_single_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::Unsubscribe, + timer: Instant, + ) -> Result, DBError> { + let cmd = ViewCommand::RemoveSingleSubscription { + sender, + auth, + request, + timer, + }; + self.call_subscription_command("call_view_remove_single_subscription", cmd) .await - //TODO: handle error better - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. 
} => { - unreachable!("unexpected SQL result in call_view_add_single_subscription") - } - } + pub async fn call_view_remove_single_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::Unsubscribe, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::RemoveSingleSubscription { + sender, + auth, + request, + timer, + }; + self.call_subscription_command_ws("call_view_remove_single_subscription", cmd) + .await + } + + pub async fn call_view_add_multi_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::SubscribeMulti, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::AddMultiSubscription { + sender, + auth, + request, + _timer: timer, + }; + self.call_subscription_command_ws("call_view_add_multi_subscription", cmd) + .await } pub async fn call_view_add_legacy_subscription( @@ -1792,19 +2118,59 @@ impl ModuleHost { subscribe, _timer: timer, }; + self.call_subscription_command("call_view_add_legacy_subscription", cmd) + .await + } - let res = self - .call_view_command("call_view_add_legacy_subscription", cmd) + pub async fn call_view_remove_multi_subscription( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::UnsubscribeMulti, + timer: Instant, + ) -> Result, DBError> { + let cmd = ViewCommand::RemoveMultiSubscription { + sender, + auth, + request, + timer, + }; + self.call_subscription_command("call_view_remove_multi_subscription", cmd) .await - //TODO: handle error better - .map_err(|e| DBError::Other(anyhow::anyhow!(e)))?; + } - match res { - ViewCommandResult::Subscription { result } => result, - ViewCommandResult::Sql { .. } => { - unreachable!("unexpected SQL result in call_view_add_single_subscription") - } - } + pub async fn call_view_remove_multi_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + request: ws_v1::UnsubscribeMulti, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::RemoveMultiSubscription { + sender, + auth, + request, + timer, + }; + self.call_subscription_command_ws("call_view_remove_multi_subscription", cmd) + .await + } + + pub async fn call_view_add_legacy_subscription_ws( + &self, + sender: Arc, + auth: AuthCtx, + subscribe: ws_v1::Subscribe, + timer: Instant, + ) -> Result<(), DBError> { + let cmd = ViewCommand::AddLegacySubscription { + sender, + auth, + subscribe, + _timer: timer, + }; + self.call_subscription_command_ws("call_view_add_legacy_subscription", cmd) + .await } pub async fn call_view_sql( @@ -1891,6 +2257,149 @@ impl ModuleHost { ret } + pub async fn call_procedure_ws( + &self, + sender: Arc, + request_id: RequestId, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_name: &str, + args: FunctionArgs, + ) -> Result<(), BroadcastError> { + let sender = sender.clone(); + self.call_procedure_ws_impl( + caller_identity, + caller_connection_id, + timer, + procedure_name, + args, + move |subscriptions, ret| { + let message = ProcedureResultMessage::from_result(&ret.result, request_id); + subscriptions.send_procedure_message(sender.clone(), message, ret.tx_offset) + }, + ) + .await + } + + pub async fn call_procedure_ws_v2( + &self, + sender: Arc, + request_id: RequestId, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_name: &str, + args: FunctionArgs, + ) -> Result<(), BroadcastError> { + let sender = sender.clone(); + self.call_procedure_ws_impl( + caller_identity, + caller_connection_id, + timer, + procedure_name, + args, + move |subscriptions, ret| { + 
let message = Self::procedure_result_message_v2(ret.result, request_id); + subscriptions.send_procedure_message_v2(sender.clone(), message, ret.tx_offset) + }, + ) + .await + } + + async fn call_procedure_ws_impl( + &self, + caller_identity: Identity, + caller_connection_id: Option, + timer: Option, + procedure_name: &str, + args: FunctionArgs, + send_result: F, + ) -> Result<(), BroadcastError> + where + F: Fn(ModuleSubscriptions, CallProcedureReturn) -> Result<(), BroadcastError> + Clone + Send + 'static, + { + let call_return_err = |err| CallProcedureReturn { + result: Err(err), + tx_offset: None, + }; + + if let ModuleHostInner::Js(_) = &*self.inner { + let ret = self + .call_procedure(caller_identity, caller_connection_id, timer, procedure_name, args) + .await; + return send_result(self.subscriptions().clone(), ret); + } + + if let Err(err) = self.guard_closed() { + let ret = call_return_err(err.into()); + let subscriptions = self.subscriptions().clone(); + return send_result(subscriptions, ret); + } + + let (procedure_id, procedure_def) = match self.info.module_def.procedure_full(procedure_name) { + Some(procedure) => procedure, + None => { + let ret = call_return_err(ProcedureCallError::NoSuchProcedure); + let subscriptions = self.subscriptions().clone(); + return send_result(subscriptions, ret); + } + }; + + if procedure_def.visibility.is_private() && !self.is_database_owner(caller_identity) { + let ret = call_return_err(ProcedureCallError::NoSuchProcedure); + let subscriptions = self.subscriptions().clone(); + return send_result(subscriptions, ret); + } + + let params = match Self::call_procedure_params( + &self.info, + caller_identity, + caller_connection_id, + timer, + procedure_id, + procedure_def, + args, + ) { + Ok(params) => params, + Err(err) => { + let ret = call_return_err(err.into()); + let subscriptions = self.subscriptions().clone(); + return send_result(subscriptions, ret); + } + }; + + match &*self.inner { + ModuleHostInner::Wasm(_) => { + let subscriptions = self.subscriptions().clone(); + let procedure_label = procedure_def.name.to_string(); + let send_result_in_job = send_result.clone(); + let enqueue_res = self.enqueue_wasm_instance( + "pooled operation", + &procedure_label, + InstanceKind::Procedure, + (subscriptions, params), + async move |(subscriptions, params), inst| { + let ret = inst.call_procedure(params).await; + if let Err(err) = send_result_in_job(subscriptions, ret) { + log::warn!("Procedure call failed: {err:#}"); + } + }, + ); + + match enqueue_res { + Ok(()) => Ok(()), + Err(err) => { + let ret = call_return_err(err.into()); + let subscriptions = self.subscriptions().clone(); + send_result(subscriptions, ret) + } + } + } + ModuleHostInner::Js(_) => unreachable!("handled before wasm-specific procedure setup"), + } + } + async fn call_procedure_inner( &self, caller_identity: Identity, @@ -1917,6 +2426,39 @@ impl ModuleHost { Ok(self.call_procedure_with_params(&procedure_def.name, params).await?) 
} + fn procedure_result_message_v2( + result: Result, + request_id: RequestId, + ) -> ws_v2::ProcedureResult { + let (status, timestamp, execution_duration) = match result { + Ok(ProcedureCallResult { + return_val, + execution_duration, + start_timestamp, + }) => ( + ws_v2::ProcedureStatus::Returned( + bsatn::to_vec(&return_val) + .expect("Procedure return value failed to serialize to BSATN") + .into(), + ), + start_timestamp, + TimeDuration::from(execution_duration), + ), + Err(err) => ( + ws_v2::ProcedureStatus::InternalError(err.to_string().into()), + Timestamp::UNIX_EPOCH, + TimeDuration::ZERO, + ), + }; + + ws_v2::ProcedureResult { + status, + timestamp, + total_host_execution_duration: execution_duration, + request_id, + } + } + //TODO(shub) #4195: Also allow for collaborators along with owner fn is_database_owner(&self, caller_identity: Identity) -> bool { self.info.owner_identity == caller_identity @@ -1936,15 +2478,28 @@ impl ModuleHost { .await } - pub(super) async fn call_scheduled_function( + pub(super) async fn call_scheduled_reducer( + &self, + params: ScheduledFunctionParams, + ) -> Result { + self.call( + "scheduled reducer", + params, + async |params, inst| Ok(inst.call_scheduled_function(params).await), + async |params, lane| Ok(lane.call_scheduled_function(params).await), + ) + .await? + } + + pub(super) async fn call_scheduled_procedure( &self, params: ScheduledFunctionParams, ) -> Result { self.call_pooled( - "unknown scheduled function", + "scheduled procedure", params, - async move |params, inst| Ok(inst.call_scheduled_function(params).await), - async move |params, inst| Ok(inst.call_scheduled_function(params).await), + async |params, inst| Ok(inst.call_scheduled_function(params).await), + async |params, inst| Ok(inst.call_scheduled_function(params).await), ) .await? } @@ -2229,100 +2784,17 @@ impl ModuleHost { log::debug!("One-off query: {query}"); let metrics = self .on_module_thread("one_off_query", move || { - let (tx_offset_sender, tx_offset_receiver) = oneshot::channel(); - let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { - let (tx_offset, tx_metrics, reducer) = db.release_tx(tx); - let _ = tx_offset_sender.send(tx_offset); - db.report_read_tx_metrics(reducer, tx_metrics); - }); - - // We wrap the actual query in a closure so we can use ? to handle errors without making - // the entire transaction abort with an error. - let result: Result<(ws_v1::OneOffTable, ExecutionMetrics), anyhow::Error> = (|| { - let tx = SchemaViewer::new(&*tx, &auth); - - let ( - // A query may compile down to several plans. - // This happens when there are multiple RLS rules per table. - // The original query is the union of these plans. 
- plans, - _, - table_name, - _, - ) = compile_subscription(&query, &tx, &auth)?; - - // Optimize each fragment - let optimized = plans - .into_iter() - .map(|plan| plan.optimize(&auth)) - .collect::, _>>()?; - - check_row_limit( - &optimized, - &db, - &tx, - // Estimate the number of rows this query will scan - |plan, tx| estimate_rows_scanned(tx, plan), - &auth, - )?; - - let return_table = || optimized.first().and_then(|plan| plan.return_table()); - - let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table()); - let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default(); - let num_private_cols = return_table() - .map(|schema| schema.num_private_cols()) - .unwrap_or_default(); - - let optimized = optimized - .into_iter() - // Convert into something we can execute - .map(PipelinedProject::from) - .collect::>(); - - let table_name = table_name.into(); - - if returns_view_table && num_private_cols > 0 { - let optimized = optimized - .into_iter() - .map(|plan| ViewProject::new(plan, num_cols, num_private_cols)) - .collect::>(); - // Execute the union and return the results - return execute_plan_for_view::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| (ws_v1::OneOffTable { table_name, rows }, metrics)) - .context("One-off queries are not allowed to modify the database"); - } - - // Execute the union and return the results - execute_plan::(&optimized, &DeltaTx::from(&*tx), &rlb_pool) - .map(|(rows, _, metrics)| (ws_v1::OneOffTable { table_name, rows }, metrics)) - .context("One-off queries are not allowed to modify the database") - })(); - - let total_host_execution_duration = timer.elapsed().into(); - let (message, metrics): (SerializableMessage, Option) = match result { - Ok((rows, metrics)) => ( - into_message(OneOffQueryResponseMessage { - message_id, - error: None, - results: vec![rows], - total_host_execution_duration, - }), - Some(metrics), - ), - Err(err) => ( - into_message(OneOffQueryResponseMessage { - message_id, - error: Some(format!("{err}")), - results: vec![], - total_host_execution_duration, - }), - None, - ), - }; - - subscriptions.send_client_message(client, message, (&*tx, tx_offset_receiver))?; - Ok::, anyhow::Error>(metrics) + Self::one_off_query_inner( + db, + subscriptions, + auth, + query, + client, + message_id, + timer, + rlb_pool, + into_message, + ) }) .await??; @@ -2336,6 +2808,47 @@ impl ModuleHost { Ok(()) } + pub async fn one_off_query_ws( + &self, + auth: AuthCtx, + query: String, + client: Arc, + message_id: Vec, + timer: Instant, + rlb_pool: impl 'static + Send + RowListBuilderSource, + into_message: impl FnOnce(OneOffQueryResponseMessage) -> SerializableMessage + Send + 'static, + ) -> Result<(), anyhow::Error> { + match &*self.inner { + ModuleHostInner::Js(_) => { + self.one_off_query(auth, query, client, message_id, timer, rlb_pool, into_message) + .await + } + ModuleHostInner::Wasm(_) => { + let db = self.relational_db().clone(); + let subscriptions = self.replica_ctx().subscriptions.clone(); + log::debug!("One-off query: {query}"); + match self.enqueue_wasm_job("module-thread operation", "one_off_query", async move || { + if let Err(err) = Self::one_off_query_inner( + db, + subscriptions, + auth, + query, + client, + message_id, + timer, + rlb_pool, + into_message, + ) { + log::warn!("One-off query failed: {err:#}"); + } + }) { + Ok(()) => Ok(()), + Err(err) => Err(err.into()), + } + } + } + } + /// Execute a one-off query for v2 clients and send the results to the given 
client. /// /// This only returns an error if there is a db-level problem. @@ -2369,6 +2882,79 @@ impl ModuleHost { Ok(()) } + pub async fn one_off_query_v2_ws( + &self, + auth: AuthCtx, + query: String, + client: Arc, + request_id: u32, + timer: Instant, + rlb_pool: impl 'static + Send + RowListBuilderSource, + ) -> Result<(), anyhow::Error> { + match &*self.inner { + ModuleHostInner::Js(_) => { + self.one_off_query_v2(auth, query, client, request_id, timer, rlb_pool) + .await + } + ModuleHostInner::Wasm(_) => { + let db = self.relational_db().clone(); + let subscriptions = self.replica_ctx().subscriptions.clone(); + log::debug!("One-off query: {query}"); + match self.enqueue_wasm_job("module-thread operation", "one_off_query_v2", async move || { + if let Err(err) = + Self::one_off_query_v2_inner(db, subscriptions, auth, query, client, request_id, rlb_pool) + { + log::warn!("One-off query failed: {err:#}"); + } + }) { + Ok(()) => Ok(()), + Err(err) => Err(err.into()), + } + } + } + } + + fn one_off_query_inner( + db: Arc, + subscriptions: ModuleSubscriptions, + auth: AuthCtx, + query: String, + client: Arc, + message_id: Vec, + timer: Instant, + rlb_pool: impl 'static + Send + RowListBuilderSource, + into_message: impl FnOnce(OneOffQueryResponseMessage) -> SerializableMessage, + ) -> Result, anyhow::Error> { + Self::send_one_off_query_result( + db, + subscriptions, + auth, + query, + client, + rlb_pool, + |table_name, rows| ws_v1::OneOffTable { table_name, rows }, + move |subscriptions, client, result, tx_offset, tx| { + let total_host_execution_duration = timer.elapsed().into(); + let message = match result { + Ok(rows) => into_message(OneOffQueryResponseMessage { + message_id, + error: None, + results: vec![rows], + total_host_execution_duration, + }), + Err(err) => into_message(OneOffQueryResponseMessage { + message_id, + error: Some(err), + results: vec![], + total_host_execution_duration, + }), + }; + + subscriptions.send_client_message(client, message, (tx, tx_offset)) + }, + ) + } + fn one_off_query_v2_inner( db: Arc, subscriptions: ModuleSubscriptions, @@ -2378,6 +2964,57 @@ impl ModuleHost { request_id: u32, rlb_pool: impl 'static + Send + RowListBuilderSource, ) -> Result, anyhow::Error> { + Self::send_one_off_query_result( + db, + subscriptions, + auth, + query, + client, + rlb_pool, + |table_name, rows| ws_v2::SingleTableRows { + table: table_name, + rows, + }, + move |subscriptions, client, result, tx_offset, _tx| { + let message = match result { + Ok(rows) => ws_v2::OneOffQueryResult { + request_id, + result: Ok(ws_v2::QueryRows { + tables: vec![rows].into_boxed_slice(), + }), + }, + Err(err) => ws_v2::OneOffQueryResult { + request_id, + result: Err(err.into()), + }, + }; + + subscriptions.send_one_off_query_message_v2(client, message, tx_offset) + }, + ) + } + + fn send_one_off_query_result( + db: Arc, + subscriptions: ModuleSubscriptions, + auth: AuthCtx, + query: String, + client: Arc, + rlb_pool: impl 'static + Send + RowListBuilderSource, + wrap_rows: WrapRows, + send_result: SendResult, + ) -> Result, anyhow::Error> + where + F: BuildableWebsocketFormat, + WrapRows: Fn(RawIdentifier, F::List) -> T, + SendResult: FnOnce( + ModuleSubscriptions, + Arc, + Result, + TransactionOffset, + &TxId, + ) -> Result<(), BroadcastError>, + { let (tx_offset_sender, tx_offset_receiver) = oneshot::channel(); let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { let (tx_offset, tx_metrics, reducer) = db.release_tx(tx); @@ -2385,82 +3022,76 @@ impl ModuleHost { 
             db.report_read_tx_metrics(reducer, tx_metrics);
         });

-        let result: Result<(ws_v2::SingleTableRows, ExecutionMetrics), anyhow::Error> = (|| {
-            let tx = SchemaViewer::new(&*tx, &auth);
+        let (result, metrics) = match Self::execute_one_off_query(&db, &tx, &auth, &query, &rlb_pool, wrap_rows) {
+            Ok((rows, metrics)) => (Ok(rows), Some(metrics)),
+            Err(err) => (Err(err.to_string()), None),
+        };

-            let (plans, _, table_name, _) = compile_subscription(&query, &tx, &auth)?;
+        send_result(subscriptions, client, result, tx_offset_receiver, &tx)?;
+        Ok(metrics)
+    }

-            let optimized = plans
-                .into_iter()
-                .map(|plan| plan.optimize(&auth))
-                .collect::, _>>()?;
-
-            check_row_limit(&optimized, &db, &tx, |plan, tx| estimate_rows_scanned(tx, plan), &auth)?;
-
-            let return_table = || optimized.first().and_then(|plan| plan.return_table());
-
-            let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table());
-            let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default();
-            let num_private_cols = return_table()
-                .map(|schema| schema.num_private_cols())
-                .unwrap_or_default();
-
-            let optimized = optimized.into_iter().map(PipelinedProject::from).collect::>();
-
-            let table_name = table_name.into();
-
-            if returns_view_table && num_private_cols > 0 {
-                let optimized = optimized
-                    .into_iter()
-                    .map(|plan| ViewProject::new(plan, num_cols, num_private_cols))
-                    .collect::>();
-                return execute_plan_for_view::(&optimized, &DeltaTx::from(&*tx), &rlb_pool)
-                    .map(|(rows, _, metrics)| {
-                        (
-                            ws_v2::SingleTableRows {
-                                table: table_name,
-                                rows,
-                            },
-                            metrics,
-                        )
-                    })
-                    .context("One-off queries are not allowed to modify the database");
-            }
+    fn execute_one_off_query(
+        db: &Arc,
+        tx: &TxId,
+        auth: &AuthCtx,
+        query: &str,
+        rlb_pool: &impl RowListBuilderSource,
+        wrap_rows: WrapRows,
+    ) -> Result<(T, ExecutionMetrics), anyhow::Error>
+    where
+        F: BuildableWebsocketFormat,
+        WrapRows: Fn(RawIdentifier, F::List) -> T,
+    {
+        let schema_viewer = SchemaViewer::new(tx, auth);
+
+        let (
+            // A query may compile down to several plans.
+            // This happens when there are multiple RLS rules per table.
+            // The original query is the union of these plans.
+            plans,
+            _,
+            table_name,
+            _,
+        ) = compile_subscription(query, &schema_viewer, auth)?;
+
+        let optimized = plans
+            .into_iter()
+            .map(|plan| plan.optimize(auth))
+            .collect::, _>>()?;
+
+        check_row_limit(
+            &optimized,
+            db,
+            &schema_viewer,
+            |plan, tx| estimate_rows_scanned(tx, plan),
+            auth,
+        )?;
-            execute_plan::(&optimized, &DeltaTx::from(&*tx), &rlb_pool)
-                .map(|(rows, _, metrics)| {
-                    (
-                        ws_v2::SingleTableRows {
-                            table: table_name,
-                            rows,
-                        },
-                        metrics,
-                    )
-                })
-                .context("One-off queries are not allowed to modify the database")
-        })();
+        let return_table = || optimized.first().and_then(|plan| plan.return_table());

-        let (message, metrics) = match result {
-            Ok((rows, metrics)) => (
-                ws_v2::OneOffQueryResult {
-                    request_id,
-                    result: Ok(ws_v2::QueryRows {
-                        tables: vec![rows].into_boxed_slice(),
-                    }),
-                },
-                Some(metrics),
-            ),
-            Err(err) => (
-                ws_v2::OneOffQueryResult {
-                    request_id,
-                    result: Err(err.to_string().into()),
-                },
-                None,
-            ),
-        };
+        let returns_view_table = optimized.first().is_some_and(|plan| plan.returns_view_table());
+        let num_cols = return_table().map(|schema| schema.num_cols()).unwrap_or_default();
+        let num_private_cols = return_table()
+            .map(|schema| schema.num_private_cols())
+            .unwrap_or_default();

-        subscriptions.send_one_off_query_message_v2(client, message, tx_offset_receiver)?;
-        Ok(metrics)
+        let optimized = optimized.into_iter().map(PipelinedProject::from).collect::>();
+        let table_name = table_name.into();
+
+        if returns_view_table && num_private_cols > 0 {
+            let optimized = optimized
+                .into_iter()
+                .map(|plan| ViewProject::new(plan, num_cols, num_private_cols))
+                .collect::>();
+            return execute_plan_for_view::(&optimized, &DeltaTx::from(tx), rlb_pool)
+                .map(|(rows, _, metrics)| (wrap_rows(table_name, rows), metrics))
+                .context("One-off queries are not allowed to modify the database");
+        }
+
+        execute_plan::(&optimized, &DeltaTx::from(tx), rlb_pool)
+            .map(|(rows, _, metrics)| (wrap_rows(table_name, rows), metrics))
+            .context("One-off queries are not allowed to modify the database")
     }

     /// FIXME(jgilles): this is a temporary workaround for deleting not currently being supported
@@ -2510,14 +3141,14 @@ impl ModuleHost {

     pub(crate) fn replica_ctx(&self) -> &ReplicaContext {
         match &*self.inner {
-            ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.replica_ctx(),
+            ModuleHostInner::Wasm(wasm) => wasm.main_instance.module.replica_ctx(),
             ModuleHostInner::Js(js) => js.module.replica_ctx(),
         }
     }

     fn scheduler(&self) -> &Scheduler {
         match &*self.inner {
-            ModuleHostInner::Wasm(wasm) => wasm.instance_manager.module.scheduler(),
+            ModuleHostInner::Wasm(wasm) => wasm.main_instance.module.scheduler(),
             ModuleHostInner::Js(js) => js.module.scheduler(),
         }
     }
diff --git a/crates/core/src/host/scheduler.rs b/crates/core/src/host/scheduler.rs
index d3b285e9f16..105ef9fc4fb 100644
--- a/crates/core/src/host/scheduler.rs
+++ b/crates/core/src/host/scheduler.rs
@@ -13,7 +13,7 @@ use rustc_hash::FxHashMap;
 use spacetimedb_client_api_messages::energy::EnergyQuanta;
 use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload};
 use spacetimedb_datastore::locking_tx_datastore::MutTxId;
-use spacetimedb_datastore::system_tables::{StFields, StScheduledFields, ST_SCHEDULED_ID};
+use spacetimedb_datastore::system_tables::{StScheduledFields, ST_SCHEDULED_ID};
 use spacetimedb_datastore::traits::IsolationLevel;
 use spacetimedb_lib::scheduler::ScheduleAt;
 use spacetimedb_lib::Timestamp;
@@ -53,7 +53,8 @@ enum SchedulerMessage {
     Schedule {
         id: ScheduledFunctionId,
-        /// The timestamp we'll tell the reducer it is.
+        function_name: Box,
+        /// The timestamp we'll tell the scheduled function it is.
         effective_at: Timestamp,
         /// The actual instant we're scheduling for.
         real_at: Instant,
@@ -64,11 +65,6 @@ enum SchedulerMessage {
     },
 }

-pub struct ScheduledFunction {
-    function: Box,
-    bsatn_args: Vec,
-}
-
 #[derive(Clone)]
 pub struct Scheduler {
     tx: mpsc::UnboundedSender>,
@@ -107,6 +103,7 @@ impl SchedulerStarter {
         // Find all Scheduled tables
         for st_scheduled_row in self.db.iter(&tx, ST_SCHEDULED_ID)? {
             let table_id = st_scheduled_row.read_col(StScheduledFields::TableId)?;
+            let function_name = st_scheduled_row.read_col::>(StScheduledFields::ReducerName)?;
             let (id_column, at_column) = self
                 .db
                 .table_scheduled_id_and_at(&tx, table_id)?
@@ -118,7 +115,7 @@ impl SchedulerStarter {
             // Insert each entry (row) in the scheduled table into `queue`.
             for scheduled_row in self.db.iter(&tx, table_id)? {
                 let (schedule_id, schedule_at) = get_schedule_from_row(&scheduled_row, id_column, at_column)?;
-                // calculate duration left to call the scheduled reducer
+                // Calculate the remaining delay before calling the scheduled function.
                 let duration = schedule_at.to_duration_from(now_ts);
                 let at = schedule_at.to_timestamp_from(now_ts);
                 let id = ScheduledFunctionId {
@@ -127,7 +124,8 @@ impl SchedulerStarter {
                     id_column,
                     at_column,
                 };
-                let key = queue.insert_at(QueueItem::Id { id, at }, now_instant + duration);
+                let function_name = function_name.clone();
+                let key = queue.insert_at(QueueItem::Id { id, function_name, at }, now_instant + duration);

                 // This should never happen as duplicate entries should be gated by unique
                 // constraint violation in scheduled tables.
@@ -155,7 +153,7 @@ impl SchedulerStarter {
     }
 }

-/// The maximum `Duration` into the future that we can schedule a reducer.
+/// The maximum `Duration` into the future that we can schedule a function.
 ///
 /// `tokio_utils::time::DelayQueue`, as of version 0.7.8,
 /// limits its scheduling to at most approx. 2 years into the future.
@@ -198,10 +196,12 @@ impl Scheduler {
     /// Schedule a reducer/procedure to run from a scheduled table.
     ///
     /// `fn_start` is the timestamp of the start of the current reducer/procedure.
+    #[allow(clippy::too_many_arguments)]
     pub(super) fn schedule(
         &self,
         table_id: TableId,
         schedule_id: u64,
+        function_name: Box,
         schedule_at: ScheduleAt,
         id_column: ColId,
         at_column: ColId,
@@ -238,6 +238,7 @@ impl Scheduler {
                 id_column,
                 at_column,
             },
+            function_name,
             effective_at,
             real_at,
         }));
@@ -270,13 +271,35 @@ struct SchedulerActor {

 #[derive(Clone)]
 enum QueueItem {
-    Id { id: ScheduledFunctionId, at: Timestamp },
-    VolatileNonatomicImmediate { function_name: String, args: FunctionArgs },
+    Id {
+        id: ScheduledFunctionId,
+        // Carry the function name on the queued item so dispatch can classify
+        // reducer vs procedure without re-reading `st_scheduled`.
+        function_name: Box,
+        at: Timestamp,
+    },
+    VolatileNonatomicImmediate {
+        function_name: String,
+        args: FunctionArgs,
+    },
 }

 #[derive(Clone)]
 pub(crate) struct ScheduledFunctionParams(QueueItem);

+impl ScheduledFunctionParams {
+    fn function_name(&self) -> &str {
+        match &self.0 {
+            QueueItem::Id { function_name, .. } => function_name.as_ref(),
+            QueueItem::VolatileNonatomicImmediate { function_name, .. } => function_name.as_str(),
+        }
+    }
+
+    pub(crate) fn is_procedure(&self, module: &ModuleInfo) -> bool {
+        module.module_def.procedure_full(self.function_name()).is_some()
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub(crate) enum CallScheduledFunctionError {
     #[error(transparent)]
@@ -307,6 +330,7 @@ impl SchedulerActor {
         match msg {
             SchedulerMessage::Schedule {
                 id,
+                function_name,
                 effective_at,
                 real_at,
             } => {
@@ -314,7 +338,14 @@ impl SchedulerActor {
                 if let Some(key) = self.key_map.get(&id) {
                     self.queue.remove(key);
                 }
-                let key = self.queue.insert_at(QueueItem::Id { id, at: effective_at }, real_at);
+                let key = self.queue.insert_at(
+                    QueueItem::Id {
+                        id,
+                        function_name,
+                        at: effective_at,
+                    },
+                    real_at,
+                );
                 self.key_map.insert(id, key);
             }
             SchedulerMessage::ScheduleImmediate { function_name, args } => {
@@ -340,7 +371,12 @@ impl SchedulerActor {
             return;
         };

-        let result = module_host.call_scheduled_function(ScheduledFunctionParams(item)).await;
+        let params = ScheduledFunctionParams(item.clone());
+        let result = if params.is_procedure(module_host.info()) {
+            module_host.call_scheduled_procedure(params).await
+        } else {
+            module_host.call_scheduled_reducer(params).await
+        };

         match result {
             // If the module already exited, leave the `ScheduledFunction` in
@@ -352,9 +388,16 @@ impl SchedulerActor {
             Ok(CallScheduledFunctionResult {
                 reschedule: Some(Reschedule { at_ts, at_real }),
             }) => {
-                if let Some(id) = id {
+                if let QueueItem::Id { id, function_name, .. } = item {
                     // If this was repeated, we need to add it back to the queue.
-                    let key = self.queue.insert_at(QueueItem::Id { id, at: at_ts }, at_real);
+                    let key = self.queue.insert_at(
+                        QueueItem::Id {
+                            id,
+                            function_name,
+                            at: at_ts,
+                        },
+                        at_real,
+                    );
                     self.key_map.insert(id, key);
                 }
             }
@@ -606,15 +649,13 @@ fn call_params_for_queued_item(
     item: QueueItem,
 ) -> anyhow::Result> {
     Ok(Some(match item {
-        QueueItem::Id { id, at } => {
+        QueueItem::Id { id, function_name, at } => {
             let Some(schedule_row) = get_schedule_row_mut(tx, db, id)? else {
                 // If the row is not found, it means the schedule is cancelled by the user.
                 return Ok(None);
             };
-            let ScheduledFunction { function, bsatn_args } = process_schedule(tx, db, id.table_id, &schedule_row)?;
-
-            let fun_args = FunctionArgs::Bsatn(bsatn_args.into());
-            function_to_call_params(module, &function, fun_args, Some(at))?
+            let fun_args = FunctionArgs::Bsatn(schedule_row.to_bsatn_vec()?.into());
+            function_to_call_params(module, function_name.as_ref(), fun_args, Some(at))?
         }
         QueueItem::VolatileNonatomicImmediate { function_name, args } => {
             function_to_call_params(module, &function_name, args, None)?
@@ -667,28 +708,6 @@ fn function_to_call_params(
     Ok((ts, instant, params))
 }

-/// Generate [`ScheduledFunction`] for given [`ScheduledFunctionId`].
-fn process_schedule(
-    tx: &MutTxId,
-    db: &RelationalDB,
-    table_id: TableId,
-    schedule_row: &RowRef<'_>,
-) -> Result {
-    // Get reducer name from `ST_SCHEDULED` table.
-    let table_id_col = StScheduledFields::TableId.col_id();
-    let function_name_col = StScheduledFields::ReducerName.col_id();
-    let st_scheduled_row = db
-        .iter_by_col_eq_mut(tx, ST_SCHEDULED_ID, table_id_col, &table_id.into())?
-        .next()
-        .ok_or_else(|| anyhow!("Scheduled table with id {table_id} entry does not exist in `st_scheduled`"))?;
-    let function = st_scheduled_row.read_col::>(function_name_col)?;
-
-    Ok(ScheduledFunction {
-        function,
-        bsatn_args: schedule_row.to_bsatn_vec()?,
-    })
-}
-
 /// Returns the `schedule_row` for `id`.
 fn get_schedule_row_mut<'a>(
     tx: &'a MutTxId,
diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs
index 287d1eedf6b..92a23746b78 100644
--- a/crates/core/src/host/v8/mod.rs
+++ b/crates/core/src/host/v8/mod.rs
@@ -1258,6 +1258,22 @@ impl JsInstanceLane {
             .await
             .map_err(|_| ViewCallError::InternalError(instance_lane_worker_error("call_view")))
     }
+
+    /// Run a scheduled function on the instance lane exactly once.
+    ///
+    /// If the worker disappears before replying, we replace it for future
+    /// requests and panic so the host discards the broken module generation.
+    pub(in crate::host) async fn call_scheduled_function(
+        &self,
+        params: ScheduledFunctionParams,
+    ) -> CallScheduledFunctionResult {
+        self.run_once("call_scheduled_function", |inst: JsInstance| async move {
+            inst.send_request(|reply_tx| JsWorkerRequest::CallScheduledFunction { reply_tx, params })
+                .await
+        })
+        .await
+        .unwrap_or_else(|_| panic!("{}", instance_lane_worker_error("call_scheduled_function")))
+    }
 }

 /// Performs some of the startup work of [`spawn_instance_worker`].
diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs
index 3b709fc433e..501976cad1b 100644
--- a/crates/core/src/host/wasm_common/module_host_actor.rs
+++ b/crates/core/src/host/wasm_common/module_host_actor.rs
@@ -1056,6 +1056,21 @@ impl InstanceCommon {
                     Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false),
                 }
             }
+            ViewCommand::RemoveSingleSubscription {
+                sender,
+                auth,
+                request,
+                timer,
+            } => {
+                let res = info
+                    .subscriptions
+                    .remove_single_subscription(sender, auth, request, timer);
+
+                match res {
+                    Ok(metrics) => (ViewCommandResult::Subscription { result: Ok(metrics) }, false),
+                    Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false),
+                }
+            }
             ViewCommand::AddLegacySubscription {
                 sender,
                 auth,
@@ -1121,6 +1136,21 @@ impl InstanceCommon {
                     Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false),
                 }
             }
+            ViewCommand::RemoveMultiSubscription {
+                sender,
+                auth,
+                request,
+                timer,
+            } => {
+                let res = info
+                    .subscriptions
+                    .remove_multi_subscription(sender, auth, request, timer);
+
+                match res {
+                    Ok(metrics) => (ViewCommandResult::Subscription { result: Ok(metrics) }, false),
+                    Err(err) => (ViewCommandResult::Subscription { result: Err(err) }, false),
+                }
+            }
             ViewCommand::Sql {
                 db,
                 sql_text,
diff --git a/crates/core/src/util/jobs.rs b/crates/core/src/util/jobs.rs
index 6f4f6b8bf73..403d7c560f7 100644
--- a/crates/core/src/util/jobs.rs
+++ b/crates/core/src/util/jobs.rs
@@ -337,6 +337,27 @@ impl SingleCoreExecutor {
         Self::spawn(AllocatedJobCore::default())
     }

+    /// Enqueue a job on this database executor and return immediately without
+    /// waiting for the job to finish.
+    pub fn enqueue_job(&self, f: F)
+    where
+        F: AsyncFnOnce() + Send + 'static,
+    {
+        let span = tracing::Span::current();
+
+        self.inner
+            .job_tx
+            .send(Box::new(move || {
+                async move {
+                    if AssertUnwindSafe(f().instrument(span)).catch_unwind().await.is_err() {
+                        tracing::warn!("uncaught panic on `SingleCoreExecutor`")
+                    }
+                }
+                .boxed_local()
+            }))
+            .unwrap_or_else(|_| panic!("job thread exited"));
+    }
+
     /// Run a job for this database executor.
     pub async fn run_job(&self, f: F) -> R
     where
diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs
index 318fff8d0cf..caf6ac1feb1 100644
--- a/crates/testing/src/modules.rs
+++ b/crates/testing/src/modules.rs
@@ -22,7 +22,6 @@ use spacetimedb::client::{ClientActorId, ClientConfig, ClientConnection, DataMes
 use spacetimedb::db::{Config, Storage};
 use spacetimedb::host::FunctionArgs;
 use spacetimedb_client_api::{ControlStateReadAccess, ControlStateWriteAccess, DatabaseDef, NodeDelegate};
-use spacetimedb_client_api_messages::websocket::v1 as ws_v1;
 use spacetimedb_lib::{bsatn, sats};

 pub use spacetimedb::database_logger::LogLevel;
@@ -59,7 +58,16 @@ impl ModuleHandle {
     async fn call_reducer(&self, reducer: &str, args: FunctionArgs) -> anyhow::Result<()> {
         let result = self
             .client
-            .call_reducer(reducer, args, 0, Instant::now(), ws_v1::CallReducerFlags::FullUpdate)
+            .module()
+            .call_reducer(
+                self.client.id.identity,
+                Some(self.client.id.connection_id),
+                Some(self.client.sender()),
+                Some(0),
+                Some(Instant::now()),
+                reducer,
+                args,
+            )
             .await;
         let result = match result {
             Ok(result) => result.into(),