diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 331aff0ca06..36cdc201f79 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -47,7 +47,7 @@ use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::{HashCollectionExt as _, HashSet}; use spacetimedb_datastore::error::DatastoreError; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; -use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo}; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, ViewCallInfo, ViewInstanceArgs}; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; pub use spacetimedb_durability::{DurabilityExited, DurableOffset}; use spacetimedb_engine::sql::rls::RowLevelExpr; @@ -60,7 +60,7 @@ use spacetimedb_lib::http::{Request as HttpRequest, Response as HttpResponse}; use spacetimedb_lib::identity::{AuthCtx, RequestId}; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::{bsatn, ConnectionId, TimeDuration, Timestamp}; -use spacetimedb_primitives::{ArgId, HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId}; +use spacetimedb_primitives::{HttpHandlerId, ProcedureId, TableId, ViewFnPtr, ViewId}; use spacetimedb_query::compile_subscription; use spacetimedb_sats::raw_identifier::RawIdentifier; use spacetimedb_sats::{AlgebraicType, AlgebraicTypeRef, ProductValue}; @@ -1103,17 +1103,6 @@ pub(crate) fn resolve_view_for_refresh<'a>( ) })?; - let is_anonymous = view_call.sender.is_none(); - - if st_view.is_anonymous != is_anonymous { - return Err(anyhow::anyhow!( - "found is_anonymous={} in st_view, but {} in readset when updating view `{}`", - st_view.is_anonymous, - is_anonymous, - view_name, - )); - } - let is_anonymous = view_def.is_anonymous; if st_view.is_anonymous != is_anonymous { @@ -2849,11 +2838,10 @@ impl ModuleHost { } /// Materializes the views return by the `view_collector`, if not already materialized, - /// and updates `st_view_sub` accordingly. + /// and updates view lifecycle state accordingly. /// - /// Passing [`Workload::Sql`] will update `st_view_sub.last_called`. - /// Passing [`Workload::Subscribe`] will also increment `st_view_sub.num_subscribers`, - /// in addition to updating `st_view_sub.last_called`. + /// Passing [`Workload::Sql`] will update the instance's last-used timestamp. + /// Passing [`Workload::Subscribe`] will also increment the subscriber's refcount. pub fn materialize_views( mut tx: MutTxId, instance: &mut RefInstance<'_, I>, @@ -2870,12 +2858,14 @@ impl ModuleHost { let view_id = st_view_row.view_id; let table_id = st_view_row.table_id.ok_or(ViewCallError::TableDoesNotExist(view_id))?; let is_anonymous = st_view_row.is_anonymous; - let sender = if is_anonymous { None } else { Some(caller) }; - let is_materialized = if is_anonymous { - tx.is_anonymous_view_materialized(view_id)? + let args = if is_anonymous { + ViewInstanceArgs::Anonymous } else { - tx.is_view_materialized(view_id, ArgId::SENTINEL, caller)? + ViewInstanceArgs::Sender(caller) }; + let view_call = ViewCallInfo::from_args(view_id, args); + let sender = args.sender(); + let is_materialized = tx.is_view_materialized(&view_call)?; if !is_materialized { let (res, trapped) = Self::call_view(instance, tx, &view_name, view_id, table_id, Nullary, caller, sender)?; @@ -2886,11 +2876,11 @@ impl ModuleHost { } // If this is a sql call, we only update this view's "last called" timestamp if let Workload::Sql = workload { - tx.update_view_timestamp(view_id, ArgId::SENTINEL, caller)?; + tx.update_view_timestamp(view_call.clone(), args)?; } // If this is a subscribe call, we also increment this view's subscriber count if let Workload::Subscribe = workload { - tx.subscribe_view(view_id, ArgId::SENTINEL, caller)?; + tx.subscribe_view(view_call, args, caller)?; } } Ok((tx, false)) @@ -2926,7 +2916,6 @@ impl ModuleHost { let mut abi_duration = Duration::ZERO; let mut trapped = false; for view_call in tx.views_for_refresh().cloned().collect::>() { - let sender = view_call.sender; let resolved = match resolve_view_for_refresh(&tx, module_def, &view_call) { Ok(resolved) => resolved, Err(err) => { @@ -2934,6 +2923,16 @@ impl ModuleHost { break; } }; + let sender = match tx.view_instance_args(&view_call) { + Some(args) => args.sender(), + None => { + outcome = ViewOutcome::Failed(format!( + "failed to look up materialized view args for view {}", + view_call.view_id + )); + break; + } + }; let ResolvedViewForRefresh { view_id, table_id, diff --git a/crates/core/src/host/v8/syscall/common.rs b/crates/core/src/host/v8/syscall/common.rs index a8afb679a26..e6e287c545b 100644 --- a/crates/core/src/host/v8/syscall/common.rs +++ b/crates/core/src/host/v8/syscall/common.rs @@ -763,10 +763,22 @@ fn refresh_views( let view_def = resolved.view_def; let view_name = &view_def.name; let fn_ptr = view_def.fn_ptr; + let sender = tx + .as_ref() + .expect("procedure tx missing while looking up refreshed view args") + .view_instance_args(&view_call) + .ok_or_else(|| { + TypeError(format!( + "failed to look up materialized view args for view {}", + view_call.view_id + )) + .throw(scope) + })? + .sender(); let current_tx = tx.take().expect("procedure tx missing during view refresh"); let (next_tx, call_result) = tx_slot.set(current_tx, || { - call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr) + call_view(scope, hooks, &view_call, view_name, table_id, fn_ptr, sender) }); tx = Some(next_tx); let return_data = call_result?; @@ -851,6 +863,7 @@ fn call_view( view_name: &Identifier, table_id: TableId, fn_ptr: ViewFnPtr, + sender: Option, ) -> SysCallResult { let prev_func_type = get_env(scope)? .instance_env @@ -858,7 +871,7 @@ fn call_view( let result = { let args = crate::host::ArgsTuple::nullary(); - match view_call.sender { + match sender { Some(sender) => call_call_view( scope, hooks, diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index a800a3654b5..4bb2d2eedad 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -748,7 +748,7 @@ impl InstanceCommon { } } - /// Re-evaluates all views which have entries in `st_view_subs`. + /// Re-evaluates all materialized view instances tracked in view lifecycle state. fn evaluate_subscribed_views( &mut self, tx: MutTxId, @@ -1346,7 +1346,10 @@ impl InstanceCommon { (Ok(raw), sender) => { // This is wrapped in a closure to simplify error handling. let outcome: Result = (|| { - let view_call = ViewCallInfo { view_id, sender }; + let view_call = match sender { + Some(sender) => ViewCallInfo::sender(view_id, sender), + None => ViewCallInfo::anonymous(view_id), + }; let result = ViewResult::from_return_data(raw).context("Error parsing view result")?; let typespace = self.info.module_def.typespace(); let row_product_type = typespace @@ -1496,10 +1499,10 @@ fn collect_subscribed_view_calls( let table_id = st_view .table_id .ok_or_else(|| anyhow::anyhow!("view {} does not have a backing table in database", &view_name))?; - let subs = tx.lookup_st_view_subs(view_id)?; + let view_instances = tx.materialized_view_instances_for_view(view_id); if *is_anonymous { - if subs.is_empty() { + if view_instances.is_empty() { continue; } view_calls.push(CallViewParams { @@ -1516,14 +1519,17 @@ fn collect_subscribed_view_calls( continue; } - for sub in subs { + for args in view_instances { + let Some(sender) = args.sender() else { + continue; + }; view_calls.push(CallViewParams { view_name: view_name.clone(), view_id, table_id, fn_ptr: *fn_ptr, caller: owner_identity, - sender: Some(sub.identity.into()), + sender: Some(sender), args: ArgsTuple::nullary(), row_type: *product_type_ref, timestamp: Timestamp::now(), @@ -1770,10 +1776,7 @@ impl InstanceOp for ViewOp<'_> { } fn call_type(&self) -> FuncCallType { - FuncCallType::View(ViewCallInfo { - view_id: self.view_id, - sender: Some(*self.sender), - }) + FuncCallType::View(ViewCallInfo::sender(self.view_id, *self.sender)) } } @@ -1798,10 +1801,7 @@ impl InstanceOp for AnonymousViewOp<'_> { } fn call_type(&self) -> FuncCallType { - FuncCallType::View(ViewCallInfo { - view_id: self.view_id, - sender: None, - }) + FuncCallType::View(ViewCallInfo::anonymous(self.view_id)) } } @@ -1899,9 +1899,9 @@ impl InstanceOp for HttpHandlerOp { mod tests { use super::collect_subscribed_view_calls; use crate::db::relational_db::tests_utils::{begin_mut_tx, TestDB}; + use spacetimedb_datastore::locking_tx_datastore::{ViewCallInfo, ViewInstanceArgs}; use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9Builder; use spacetimedb_lib::{AlgebraicType, Identity, ProductType}; - use spacetimedb_primitives::ArgId; use spacetimedb_sats::raw_identifier::RawIdentifier; use spacetimedb_schema::def::ModuleDef; @@ -1939,8 +1939,9 @@ mod tests { let mut tx = begin_mut_tx(&stdb); let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?; - tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?; - tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?; + let view_call = ViewCallInfo::anonymous(view_id); + tx.subscribe_view(view_call.clone(), ViewInstanceArgs::Anonymous, Identity::ZERO)?; + tx.subscribe_view(view_call, ViewInstanceArgs::Anonymous, Identity::ONE)?; // Two subscriber rows exist, but anonymous views should still be reevaluated once // because they share a single materialization. @@ -1968,8 +1969,10 @@ mod tests { let mut tx = begin_mut_tx(&stdb); let (view_id, _table_id) = stdb.create_view(&mut tx, &module_def, view_def)?; - tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ZERO)?; - tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?; + let zero_args = ViewInstanceArgs::Sender(Identity::ZERO); + let one_args = ViewInstanceArgs::Sender(Identity::ONE); + tx.subscribe_view(ViewCallInfo::from_args(view_id, zero_args), zero_args, Identity::ZERO)?; + tx.subscribe_view(ViewCallInfo::from_args(view_id, one_args), one_args, Identity::ONE)?; // Sender-backed views keep one materialization per sender, so reevaluation must // preserve both callers. diff --git a/crates/core/src/host/wasmtime/wasm_instance_env.rs b/crates/core/src/host/wasmtime/wasm_instance_env.rs index 3158b852158..e429be2056e 100644 --- a/crates/core/src/host/wasmtime/wasm_instance_env.rs +++ b/crates/core/src/host/wasmtime/wasm_instance_env.rs @@ -1777,10 +1777,22 @@ impl WasmInstanceEnv { let view_def = resolved.view_def; let view_name = &view_def.name; let fn_ptr = view_def.fn_ptr; + let sender = tx + .as_ref() + .expect("procedure tx missing while looking up refreshed view args") + .view_instance_args(&view_call) + .ok_or_else(|| { + anyhow!( + "failed to look up materialized view args for view {}", + view_call.view_id + ) + })? + .sender(); let current_tx = tx.take().expect("procedure tx missing during view refresh"); - let (next_tx, call_result) = - tx_slot.set(current_tx, || Self::call_view(caller, &view_call, view_name, fn_ptr)); + let (next_tx, call_result) = tx_slot.set(current_tx, || { + Self::call_view(caller, &view_call, view_name, fn_ptr, sender) + }); tx = Some(next_tx); let return_data = call_result?; @@ -1836,6 +1848,7 @@ impl WasmInstanceEnv { view_call: &ViewCallInfo, view_name: &Identifier, fn_ptr: ViewFnPtr, + sender: Option, ) -> anyhow::Result { let prev_func_type = caller .data_mut() @@ -1863,7 +1876,7 @@ impl WasmInstanceEnv { call_view_anon, view_name, fn_ptr.0, - view_call.sender, + sender, args_source.0, result_sink, true, diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index d43cf96662d..21d20a2aefe 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -32,7 +32,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, HashMap, HashSet} use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; -use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId}; +use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId, ViewCallInfo}; use spacetimedb_datastore::traits::{IsolationLevel, TxData}; use spacetimedb_durability::TxOffset; use spacetimedb_execution::ExecutionParams; @@ -42,7 +42,6 @@ use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; use spacetimedb_lib::{bsatn, identity::AuthCtx}; use spacetimedb_physical_plan::plan::ProjectPlan; -use spacetimedb_primitives::ArgId; use spacetimedb_schema::def::RawModuleDefVersion; use spacetimedb_table::static_assert_size; use std::{ @@ -1855,7 +1854,7 @@ impl ModuleSubscriptions { /// and subsequently downgrades to a read-only transaction. /// /// Unlike [`Self::materialize_views_and_downgrade_tx`] which populates the views' backing tables, - /// this method just decrements the subscriber count in `st_view_sub`. + /// this method just decrements the subscriber count in view lifecycle state. /// Views without any subscribers are cleaned up async. fn unsubscribe_views_and_downgrade_tx( &self, @@ -1869,7 +1868,7 @@ impl ModuleSubscriptions { Ok(self.guard_tx(tx, opts)) } - /// We unsubscribe from views by decrementing the subscriber count in `st_view_sub`. + /// We unsubscribe from views by decrementing the subscriber count in the view lifecycle state. /// Views without any subscribers are cleaned up async. fn _unsubscribe_views( tx: &mut MutTxId, @@ -1879,7 +1878,13 @@ impl ModuleSubscriptions { let mut view_ids = HashSet::new(); view_collector.collect_views(&mut view_ids); for view_id in view_ids { - tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender)?; + let is_anonymous = tx.lookup_st_view(view_id)?.is_anonymous; + let view_call = if is_anonymous { + ViewCallInfo::anonymous(view_id) + } else { + ViewCallInfo::sender(view_id, sender) + }; + tx.unsubscribe_view(view_call, sender)?; } Ok(()) } diff --git a/crates/datastore/src/locking_tx_datastore/committed_state.rs b/crates/datastore/src/locking_tx_datastore/committed_state.rs index 797b265bac8..46e2e60131f 100644 --- a/crates/datastore/src/locking_tx_datastore/committed_state.rs +++ b/crates/datastore/src/locking_tx_datastore/committed_state.rs @@ -10,7 +10,11 @@ use crate::{ db_metrics::DB_METRICS, error::TableError, execution_context::ExecutionContext, - locking_tx_datastore::{mut_tx::ViewReadSets, state_view::ScanOrIndex, IterByColRangeTx}, + locking_tx_datastore::{ + mut_tx::{ViewInstanceState, ViewInstanceTxState, ViewReadSets}, + state_view::ScanOrIndex, + IterByColRangeTx, + }, system_tables::{ system_tables, StColumnRow, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, SystemTable, ST_CLIENT_ID, ST_CLIENT_IDX, ST_COLUMN_ID, ST_COLUMN_IDX, ST_COLUMN_NAME, ST_CONSTRAINT_ID, ST_CONSTRAINT_IDX, @@ -32,10 +36,10 @@ use crate::{ }; use anyhow::anyhow; use core::{convert::Infallible, ops::RangeBounds}; -use spacetimedb_data_structures::map::{IntMap, IntSet}; +use spacetimedb_data_structures::map::{HashMap, HashSet, IntMap, IntSet}; use spacetimedb_durability::TxOffset; use spacetimedb_lib::{db::auth::StTableType, Identity}; -use spacetimedb_primitives::{ColList, IndexId, TableId, ViewId}; +use spacetimedb_primitives::{ColList, IndexId, TableId}; use spacetimedb_sats::memory_usage::MemoryUsage; use spacetimedb_sats::{AlgebraicValue, ProductValue}; use spacetimedb_schema::schema::TableSchema; @@ -83,6 +87,9 @@ pub struct CommittedState { /// and its read set will be updated accordingly. read_sets: ViewReadSets, + /// Ephemeral materialized view lifecycle state keyed by `(view_id, arg_hash)`. + view_instances: HashMap, + /// Tables which do not need to be made persistent. /// These include: /// - system tables: `st_view_sub`, `st_view_arg` @@ -121,6 +128,7 @@ impl MemoryUsage for CommittedState { page_pool: _, datastore_page_bytes, read_sets, + view_instances, ephemeral_tables, } = self; // NOTE(centril): We do not want to include the heap usage of `page_pool` as it's a shared resource. @@ -130,6 +138,7 @@ impl MemoryUsage for CommittedState { + index_id_map.heap_usage() + datastore_page_bytes.heap_usage() + read_sets.heap_usage() + + view_instances.heap_usage() + ephemeral_tables.heap_usage() } } @@ -198,6 +207,7 @@ impl CommittedState { blob_store: <_>::default(), index_id_map: <_>::default(), read_sets: <_>::default(), + view_instances: <_>::default(), page_pool, datastore_page_bytes: 0, ephemeral_tables: <_>::default(), @@ -506,11 +516,47 @@ impl CommittedState { tx_data.has_rows_or_connect_disconnect(ctx.reducer_context().map(|rcx| &rcx.name)) } - pub(super) fn drop_view_from_read_sets(&mut self, view_id: ViewId, sender: Option) { - self.read_sets.remove_view(view_id, sender) + pub(super) fn view_instance(&self, call: &ViewCallInfo) -> Option<&ViewInstanceState> { + self.view_instances.get(call) + } + + pub(super) fn active_view_calls_for_subscriber(&self, subscriber: Identity) -> HashSet { + self.view_instances + .iter() + .filter(|(_, state)| { + state + .active_subscribers + .get(&subscriber) + .is_some_and(|count| *count > 0) + }) + .map(|(call, _)| call.clone()) + .collect() } - pub(super) fn merge(&mut self, tx_state: TxState, read_sets: ViewReadSets, ctx: &ExecutionContext) -> TxData { + pub(super) fn view_instances(&self) -> impl Iterator { + self.view_instances.iter() + } + + fn merge_view_instances(&mut self, view_instances: ViewInstanceTxState) { + for (call, state) in view_instances.into_changes() { + match state { + Some(state) => { + self.view_instances.insert(call, state); + } + None => { + self.view_instances.remove(&call); + } + } + } + } + + pub(super) fn merge( + &mut self, + tx_state: TxState, + read_sets: ViewReadSets, + view_instances: ViewInstanceTxState, + ctx: &ExecutionContext, + ) -> TxData { let mut tx_data = TxData::default(); let mut truncates = IntSet::default(); @@ -539,6 +585,7 @@ impl CommittedState { // which implies `tx_data` already contains inserts and deletes for view tables // so that we can pass updated set of table ids. self.merge_read_sets(read_sets); + self.merge_view_instances(view_instances); // Store in `tx_data` which of the updated tables are ephemeral. // NOTE: This must be called before `tx_consumes_offset`, so that diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index d07714379f4..ffc662f3ce5 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -932,6 +932,7 @@ impl MutTx for Locking { tx_state: TxState::default(), lock_wait_time, read_sets: <_>::default(), + view_instances: <_>::default(), timer, ctx, metrics, @@ -967,6 +968,7 @@ impl Locking { tx_state: TxState::default(), lock_wait_time, read_sets: <_>::default(), + view_instances: <_>::default(), timer, ctx, metrics, diff --git a/crates/datastore/src/locking_tx_datastore/mod.rs b/crates/datastore/src/locking_tx_datastore/mod.rs index 8f77b462bdd..2eebaf4e619 100644 --- a/crates/datastore/src/locking_tx_datastore/mod.rs +++ b/crates/datastore/src/locking_tx_datastore/mod.rs @@ -3,7 +3,7 @@ pub mod committed_state; pub mod datastore; mod mut_tx; -pub use mut_tx::{FuncCallType, IndexScanPointOrRange, MutTxId, ViewCallInfo}; +pub use mut_tx::{FuncCallType, IndexScanPointOrRange, MutTxId, ViewCallInfo, ViewInstanceArgs}; mod sequence; pub mod state_view; pub use state_view::{IterByColEqTx, IterByColRangeTx}; diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index 66591569b52..3debc259afe 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -11,8 +11,8 @@ use super::{ use crate::{ error::ViewError, system_tables::{ - system_tables, ConnectionIdViaU128, IdentityViaU256, StConnectionCredentialsFields, StConnectionCredentialsRow, - StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubFields, StViewSubRow, + system_tables, ConnectionIdViaU128, StConnectionCredentialsFields, StConnectionCredentialsRow, + StViewColumnFields, StViewFields, StViewParamFields, StViewParamRow, StViewSubFields, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_COLUMN_ID, ST_VIEW_ID, ST_VIEW_PARAM_ID, ST_VIEW_SUB_ID, }, }; @@ -48,7 +48,7 @@ use spacetimedb_lib::{ sender_view_arg_hash_value, ConnectionId, Identity, Timestamp, }; use spacetimedb_primitives::{ - col_list, ArgId, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, + col_list, ColId, ColList, ColSet, ConstraintId, IndexId, ScheduleId, SequenceId, TableId, ViewId, }; use spacetimedb_sats::{ bsatn::to_writer, memory_usage::MemoryUsage, raw_identifier::RawIdentifier, ser::Serialize, AlgebraicValue, @@ -83,7 +83,143 @@ use std::{ #[derive(Clone, Debug, Eq, PartialEq, Hash)] pub struct ViewCallInfo { pub view_id: ViewId, - pub sender: Option, + pub arg_hash: AlgebraicValue, +} + +impl ViewCallInfo { + pub fn anonymous(view_id: ViewId) -> Self { + Self { + view_id, + arg_hash: MutTxId::anonymous_view_arg_hash(), + } + } + + pub fn sender(view_id: ViewId, sender: Identity) -> Self { + Self { + view_id, + arg_hash: MutTxId::view_arg_hash(sender), + } + } + + pub fn from_args(view_id: ViewId, args: ViewInstanceArgs) -> Self { + match args { + ViewInstanceArgs::Anonymous => Self::anonymous(view_id), + ViewInstanceArgs::Sender(sender) => Self::sender(view_id, sender), + } + } +} + +impl MemoryUsage for ViewCallInfo { + fn heap_usage(&self) -> usize { + self.arg_hash.heap_usage() + } +} + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ViewInstanceArgs { + Anonymous, + Sender(Identity), +} + +impl ViewInstanceArgs { + pub fn sender(self) -> Option { + match self { + Self::Anonymous => None, + Self::Sender(sender) => Some(sender), + } + } +} + +impl MemoryUsage for ViewInstanceArgs {} + +#[derive(Clone, Debug)] +pub(super) struct ViewInstanceState { + pub(super) args: ViewInstanceArgs, + pub(super) active_subscribers: HashMap, + pub(super) last_used: Timestamp, +} + +impl MemoryUsage for ViewInstanceState { + fn heap_usage(&self) -> usize { + self.active_subscribers.capacity() * std::mem::size_of::<(Identity, u64)>() + } +} + +impl ViewInstanceState { + fn new(args: ViewInstanceArgs, last_used: Timestamp) -> Self { + Self { + args, + active_subscribers: HashMap::default(), + last_used, + } + } + + fn has_subscribers(&self) -> bool { + !self.active_subscribers.is_empty() + } +} + +/// Transaction-local overlay for materialized view lifecycle state. +/// +/// The overlay stores a full post-image for each touched view instead of a delta. +/// This keeps same-transaction reads, rollback, and commit simple. +/// +/// TODO: This will clone the `active_subscribers` from the committed state, +/// which for a highly subscribed-to view can be expensive. +/// Look into optimizing this later. +#[derive(Default)] +pub(super) struct ViewInstanceTxState { + changes: HashMap>, +} + +impl ViewInstanceTxState { + fn get<'a>(&'a self, committed_state: &'a CommittedState, call: &ViewCallInfo) -> Option<&'a ViewInstanceState> { + match self.changes.get(call) { + Some(Some(state)) => Some(state), + Some(None) => None, + None => committed_state.view_instance(call), + } + } + + fn get_cloned(&self, committed_state: &CommittedState, call: &ViewCallInfo) -> Option { + self.get(committed_state, call).cloned() + } + + fn set(&mut self, call: ViewCallInfo, state: ViewInstanceState) { + self.changes.insert(call, Some(state)); + } + + fn remove(&mut self, call: ViewCallInfo) { + self.changes.insert(call, None); + } + + fn active_view_calls_for_subscriber( + &self, + committed_state: &CommittedState, + subscriber: Identity, + ) -> HashSet { + let mut calls = committed_state.active_view_calls_for_subscriber(subscriber); + for (call, state) in &self.changes { + match state { + Some(state) + if state + .active_subscribers + .get(&subscriber) + .is_some_and(|count| *count > 0) => + { + calls.insert(call.clone()); + } + _ => { + calls.remove(call); + } + } + } + calls + } + + pub(super) fn into_changes(self) -> HashMap> { + self.changes + } } /// A data structure for tracking the database rows/keys that are read by views @@ -91,6 +227,8 @@ pub struct ViewCallInfo { pub struct ViewReadSets { tables: IntMap, replacements: HashSet, + view_removals: HashSet, + removals: HashSet, } impl MemoryUsage for ViewReadSets { @@ -102,7 +240,10 @@ impl MemoryUsage for ViewReadSets { impl ViewReadSets { /// Returns whether there are no read sets recorded. pub fn is_empty(&self) -> bool { - self.tables.is_empty() && self.replacements.is_empty() + self.tables.is_empty() + && self.replacements.is_empty() + && self.view_removals.is_empty() + && self.removals.is_empty() } /// Returns the views that perform a full scan of this table @@ -126,14 +267,24 @@ impl ViewReadSets { self.replacements.insert(call); } - /// Removes keys for `view_id` from the read set - pub fn remove_view(&mut self, view_id: ViewId, sender: Option) { + /// Removes keys for `view_id` from the read set. + pub fn remove_view(&mut self, view_id: ViewId) { self.tables.retain(|_, readset| { - readset.remove_view(view_id, sender); + readset.remove_view(view_id); !readset.is_empty() }); } + /// On commit, removes keys for `call` from the committed read set. + pub fn remove_view_on_commit(&mut self, call: ViewCallInfo) { + self.removals.insert(call); + } + + /// On commit, removes all keys for `view_id` from the committed read set. + pub fn remove_view_id_on_commit(&mut self, view_id: ViewId) { + self.view_removals.insert(view_id); + } + /// Removes keys for exactly `call` from the read set. fn remove_view_call(&mut self, call: &ViewCallInfo) { self.tables.retain(|_, readset| { @@ -144,6 +295,14 @@ impl ViewReadSets { /// Merge or union read sets together pub fn merge(&mut self, readset: Self) { + for view_id in readset.view_removals { + self.remove_view(view_id); + } + + for call in readset.removals { + self.remove_view_call(&call); + } + for call in readset.replacements { self.remove_view_call(&call); } @@ -216,11 +375,9 @@ impl TableReadSet { self.table_scans.is_empty() && self.index_reads.is_empty() } - /// Removes keys for `view_id` from the read set, optionally filtering by `sender` - fn remove_view(&mut self, view_id: ViewId, sender: Option) { - let matches_call = |call: &ViewCallInfo| { - call.view_id == view_id && sender.as_ref().is_none_or(|s| call.sender.as_ref() == Some(s)) - }; + /// Removes keys for `view_id` from the read set. + fn remove_view(&mut self, view_id: ViewId) { + let matches_call = |call: &ViewCallInfo| call.view_id == view_id; // Remove from table_scans self.table_scans.retain(|call| !matches_call(call)); @@ -282,6 +439,7 @@ pub struct MutTxId { pub(super) sequence_state_lock: SharedMutexGuard, pub(super) lock_wait_time: Duration, pub(super) read_sets: ViewReadSets, + pub(super) view_instances: ViewInstanceTxState, // TODO(cloutiertyler): The below were made `pub` for the datastore split. We should // make these private again. pub timer: Instant, @@ -291,7 +449,7 @@ pub struct MutTxId { pub(crate) _not_send: PhantomData>, } -static_assert_size!(MutTxId, 464); +static_assert_size!(MutTxId, 560); impl MutTxId { /// Record that a view performs a table scan in this transaction's read set @@ -438,16 +596,15 @@ impl MutTxId { } Either::Right(res.into_iter()) } - /// Removes keys for `view_id` from the committed read set. + /// Removes keys for `view_id` from the committed read set on commit. /// Used for dropping views in an auto-migration. pub fn drop_view_from_committed_read_set(&mut self, view_id: ViewId) { - self.committed_state_write_lock.drop_view_from_read_sets(view_id, None) + self.read_sets.remove_view_id_on_commit(view_id) } - /// Removes a specific view call from the committed read set. - pub fn drop_view_with_sender_from_committed_read_set(&mut self, view_id: ViewId, sender: Identity) { - self.committed_state_write_lock - .drop_view_from_read_sets(view_id, Some(sender)) + /// Removes a specific view call from the committed read set on commit. + pub fn drop_view_call_from_committed_read_set(&mut self, call: ViewCallInfo) { + self.read_sets.remove_view_on_commit(call) } } @@ -640,6 +797,9 @@ impl MutTxId { self.drop_st_view_param(view_id)?; self.drop_st_view_column(view_id)?; self.drop_st_view_sub(view_id)?; + for call in self.effective_view_instances_for_view(view_id).into_keys() { + self.view_instances.remove(call); + } self.drop_view_from_committed_read_set(view_id); // Drop the view's backing table if materialized @@ -987,7 +1147,7 @@ impl MutTxId { self.delete_col_eq(ST_VIEW_COLUMN_ID, StViewColumnFields::ViewId.col_id(), &view_id.into()) } - /// Drops the rows in `st_view_sub` for this `view_id` + /// Drops any legacy rows in `st_view_sub` for this `view_id`. fn drop_st_view_sub(&mut self, view_id: ViewId) -> Result<()> { self.delete_col_eq(ST_VIEW_SUB_ID, StViewSubFields::ViewId.col_id(), &view_id.into()) } @@ -2332,9 +2492,9 @@ impl MutTxId { before_release: impl FnOnce(&Arc), ) -> (TxOffset, Arc, TxMetrics, Option, u64) { let tx_offset = self.committed_state_write_lock.next_tx_offset; - let tx_data = self - .committed_state_write_lock - .merge(self.tx_state, self.read_sets, &self.ctx); + let tx_data = + self.committed_state_write_lock + .merge(self.tx_state, self.read_sets, self.view_instances, &self.ctx); // Compute and keep enough info that we can // record metrics after the transaction has ended @@ -2398,9 +2558,9 @@ impl MutTxId { workload: Workload, before_downgrade: impl FnOnce(&Arc), ) -> (Arc, TxMetrics, TxId, u64) { - let tx_data = self - .committed_state_write_lock - .merge(self.tx_state, self.read_sets, &self.ctx); + let tx_data = + self.committed_state_write_lock + .merge(self.tx_state, self.read_sets, self.view_instances, &self.ctx); // Compute and keep enough info that we can // record metrics after the transaction has ended @@ -2603,283 +2763,184 @@ impl MutTxId { sender_view_arg_hash_value(sender) } - /// Does this caller have an entry for `view_id` in `st_view_sub`? - pub fn is_view_materialized(&self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result { - use StViewSubFields::*; - let sender = IdentityViaU256(sender); - let cols = col_list![ViewId, ArgId, Identity]; - let value = AlgebraicValue::product([view_id.into(), arg_id.into(), sender.into()]); - Ok(self.iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)?.next().is_some()) + fn get_view_instance(&self, call: &ViewCallInfo) -> Option<&ViewInstanceState> { + self.view_instances.get(&self.committed_state_write_lock, call) + } + + fn get_view_instance_cloned(&self, call: &ViewCallInfo) -> Option { + self.view_instances.get_cloned(&self.committed_state_write_lock, call) + } + + fn effective_view_instances(&self) -> HashMap { + let mut instances = self + .committed_state_write_lock + .view_instances() + .map(|(call, state)| (call.clone(), state.clone())) + .collect::>(); + + for (call, state) in &self.view_instances.changes { + match state { + Some(state) => { + instances.insert(call.clone(), state.clone()); + } + None => { + instances.remove(call); + } + } + } + + instances + } + + fn effective_view_instances_for_view(&self, view_id: ViewId) -> HashMap { + self.effective_view_instances() + .into_iter() + .filter(|(call, _)| call.view_id == view_id) + .collect() + } + + /// Is this view argument currently materialized? + pub fn is_view_materialized(&self, call: &ViewCallInfo) -> Result { + Ok(self.get_view_instance(call).is_some()) + } + + /// Returns the stored arguments needed to execute this materialized view argument. + pub fn view_instance_args(&self, call: &ViewCallInfo) -> Option { + self.get_view_instance(call).map(|state| state.args) } - /// Does any `st_view_sub` row exist for this anonymous view? - pub fn is_anonymous_view_materialized(&self, view_id: ViewId) -> Result { - let cols = StViewSubFields::ViewId; - let value = view_id.into(); - Ok(self.iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)?.next().is_some()) + /// Returns all materialized view instances for `view_id`. + pub fn materialized_view_instances_for_view(&self, view_id: ViewId) -> Vec { + self.effective_view_instances_for_view(view_id) + .into_values() + .map(|state| state.args) + .collect() } - /// Updates the `last_called` timestamp in `st_view_sub`. - /// Inserts a row into `st_view_sub` with no subscribers if the row does not exist. + /// Returns active subscribers for a materialized view. + #[cfg(any(test, feature = "test"))] + pub fn active_subscribers_for_view(&self, view_id: ViewId) -> Vec<(Identity, u64)> { + self.effective_view_instances_for_view(view_id) + .into_values() + .flat_map(|state| state.active_subscribers.into_iter()) + .collect() + } + + /// Updates the `last_used` timestamp for a materialized view argument. /// /// This is invoked when calling a view, but not subscribing to it. /// Such is the case for the sql http api. - pub fn update_view_timestamp(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { - self.update_view_timestamp_at(view_id, arg_id, sender, Timestamp::now()) + pub fn update_view_timestamp(&mut self, call: ViewCallInfo, args: ViewInstanceArgs) -> Result<()> { + self.update_view_timestamp_at(call, args, Timestamp::now()) } - /// Updates the `last_called` timestamp in `st_view_sub` to an explicit value. + /// Updates the `last_used` timestamp for a materialized view argument to an explicit value. pub fn update_view_timestamp_at( &mut self, - view_id: ViewId, - arg_id: ArgId, - sender: Identity, - last_called: Timestamp, + call: ViewCallInfo, + args: ViewInstanceArgs, + last_used: Timestamp, ) -> Result<()> { - use StViewSubFields::*; + let mut state = self + .get_view_instance_cloned(&call) + .unwrap_or_else(|| ViewInstanceState::new(args, last_used)); + state.args = args; + state.last_used = last_used; + self.view_instances.set(call, state); + Ok(()) + } - let identity = IdentityViaU256(sender); - let cols = col_list![ViewId, ArgId, Identity]; - let value = AlgebraicValue::product([view_id.into(), arg_id.into(), identity.into()]); - let last_called = last_called.into(); + /// Increment this subscriber's refcount for a materialized view argument. + pub fn subscribe_view(&mut self, call: ViewCallInfo, args: ViewInstanceArgs, subscriber: Identity) -> Result<()> { + let mut state = self + .get_view_instance_cloned(&call) + .unwrap_or_else(|| ViewInstanceState::new(args, Timestamp::now())); + state.args = args; + *state.active_subscribers.entry(subscriber).or_default() += 1; + state.last_used = Timestamp::now(); + self.view_instances.set(call, state); + Ok(()) + } - // Update `last_called` of `st_view_sub` row - if let Some((row, ptr)) = self - .iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? - .next() - .map(|row_ref| StViewSubRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) - .transpose()? - { - self.delete(ST_VIEW_SUB_ID, ptr)?; - self.insert_via_serialize_bsatn(ST_VIEW_SUB_ID, &StViewSubRow { last_called, ..row })?; + /// Decrement this subscriber's refcount for a materialized view argument. + pub fn unsubscribe_view(&mut self, call: ViewCallInfo, subscriber: Identity) -> Result<()> { + let Some(mut state) = self.get_view_instance_cloned(&call) else { return Ok(()); + }; + + if let Some(count) = state.active_subscribers.get_mut(&subscriber) { + *count = count.saturating_sub(1); + if *count == 0 { + state.active_subscribers.remove(&subscriber); + } + state.last_used = Timestamp::now(); + self.view_instances.set(call, state); } - // Insert `st_view_sub` row with 0 subscribers - self.insert_via_serialize_bsatn( - ST_VIEW_SUB_ID, - &StViewSubRow { - view_id, - arg_id, - identity, - num_subscribers: 0, - has_subscribers: false, - last_called, - }, - )?; Ok(()) } - /// Increment `num_subscribers` in `st_view_sub` to effectively subscribe a caller to a view. - /// We insert a row if there are no current subscribers and the row does not exist. - pub fn subscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { - use StViewSubFields::*; - - let identity = IdentityViaU256(sender); - let cols = col_list![ViewId, ArgId, Identity]; - let value = AlgebraicValue::product([view_id.into(), arg_id.into(), identity.into()]); - let last_called = Timestamp::now().into(); + /// Decrement this subscriber's refcount for all of their subscribed view arguments. + pub fn unsubscribe_views(&mut self, subscriber: Identity) -> Result<()> { + let calls = self + .view_instances + .active_view_calls_for_subscriber(&self.committed_state_write_lock, subscriber); - // Update `last_called` of `st_view_sub` row - if let Some((row, ptr)) = self - .iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? - .next() - .map(|row_ref| StViewSubRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) - .transpose()? - { - self.delete(ST_VIEW_SUB_ID, ptr)?; - self.insert_via_serialize_bsatn( - ST_VIEW_SUB_ID, - &StViewSubRow { - num_subscribers: row.num_subscribers + 1, - has_subscribers: true, - last_called, - ..row - }, - )?; - return Ok(()); + for call in calls { + self.unsubscribe_view(call, subscriber)?; } - // Insert `st_view_sub` row with 1 subscriber - self.insert_via_serialize_bsatn( - ST_VIEW_SUB_ID, - &StViewSubRow { - view_id, - arg_id, - identity, - num_subscribers: 1, - has_subscribers: true, - last_called, - }, - )?; Ok(()) } - /// Clean up views that have no subscribers and haven’t been called recently. - /// - /// This function will scan for subscription entries in `st_view_sub` where: - /// - `has_subscribers == false`, `num_subscribers == 0`. - /// - `last_called` is older than `expiration_duration`. - /// - /// For each such expired row: - /// 1. It deletes the expired `st_view_sub` row. - /// 2. If that row was the last remaining materialization entry for the view, - /// it clears the backing table and removes the view from the committed read set. - /// - /// The cleanup is bounded by a total `max_duration`. The function stops when either: - /// - all expired views have been processed, or - /// - the `max_duration` budget is reached. + /// Clean up materialized view arguments that have no subscribers and haven’t been used recently. /// /// Returns a tuple `(cleaned, total_expired)`: - /// - `cleaned`: Number of expired `st_view_sub` rows deleted in this run. - /// - `total_expired`: Total number of expired rows found (even if not all were cleaned due to time budget). + /// - `cleaned`: Number of expired materialized view arguments deleted in this run. + /// - `total_expired`: Total number of expired materialized view arguments found. pub fn clear_expired_views( &mut self, expiration_duration: Duration, max_duration: Duration, ) -> Result<(usize, usize)> { let start = std::time::Instant::now(); - let now = Timestamp::now(); - let expiration_threshold = now - expiration_duration; + let expiration_threshold = Timestamp::now() - expiration_duration; let mut cleaned_count = 0; - // Collect all expired views from st_view_sub - let expired_items: Vec<(ViewId, Identity, RowPointer)> = self - .iter_by_col_eq( - ST_VIEW_SUB_ID, - StViewSubFields::HasSubscribers, - &AlgebraicValue::from(false), - )? - .filter_map(|row_ref| { - let row = StViewSubRow::try_from(row_ref).expect("Failed to deserialize st_view_sub row"); - - if !row.has_subscribers && row.num_subscribers == 0 && row.last_called.0 < expiration_threshold { - Some((row.view_id, row.identity.into(), row_ref.pointer())) - } else { - None - } + let expired_items = self + .effective_view_instances() + .into_iter() + .filter_map(|(call, state)| { + (!state.has_subscribers() && state.last_used < expiration_threshold).then_some(call) }) - .collect(); + .collect::>(); let total_expired = expired_items.len(); - // For each expired subscription row, clear the backing table only if that row - // was the last remaining entry for the shared materialization. - for (view_id, sender, sub_row_ptr) in expired_items { - // Check if we've exceeded our time budget + for call in expired_items { if start.elapsed() >= max_duration { break; } - let StViewRow { - table_id, is_anonymous, .. - } = self.lookup_st_view(view_id)?; + let StViewRow { table_id, .. } = self.lookup_st_view(call.view_id)?; let table_id = table_id.expect("views have backing table"); + let rows_to_delete = self + .iter_by_col_eq(table_id, VIEW_ARG_HASH_COL, &call.arg_hash)? + .map(|res| res.pointer()) + .collect::>(); - let drop_materialization = !is_anonymous || !self.has_other_st_view_sub_entries(view_id, sub_row_ptr)?; - if drop_materialization { - let arg_hash = if is_anonymous { - Self::anonymous_view_arg_hash() - } else { - Self::view_arg_hash(sender) - }; - let rows_to_delete = self - .iter_by_col_eq(table_id, VIEW_ARG_HASH_COL, &arg_hash)? - .map(|res| res.pointer()) - .collect::>(); - - for row_ptr in rows_to_delete { - self.delete(table_id, row_ptr)?; - } - - if is_anonymous { - self.drop_view_from_committed_read_set(view_id); - } else { - self.drop_view_with_sender_from_committed_read_set(view_id, sender); - } + for row_ptr in rows_to_delete { + self.delete(table_id, row_ptr)?; } - // Finally, delete the subscription row - self.delete(ST_VIEW_SUB_ID, sub_row_ptr)?; + self.drop_view_call_from_committed_read_set(call.clone()); + self.view_instances.remove(call); cleaned_count += 1; } Ok((cleaned_count, total_expired)) } - /// Decrement `num_subscribers` in `st_view_sub` to effectively unsubscribe a caller from a view. - pub fn unsubscribe_view(&mut self, view_id: ViewId, arg_id: ArgId, sender: Identity) -> Result<()> { - use StViewSubFields::*; - - let identity = IdentityViaU256(sender); - let cols = col_list![ViewId, ArgId, Identity]; - let value = AlgebraicValue::product([view_id.into(), arg_id.into(), identity.into()]); - let last_called = Timestamp::now().into(); - - // Update `last_called` of `st_view_sub` row - if let Some((row, ptr)) = self - .iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? - .next() - .map(|row_ref| StViewSubRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) - .transpose()? - { - self.delete(ST_VIEW_SUB_ID, ptr)?; - self.insert_via_serialize_bsatn( - ST_VIEW_SUB_ID, - &StViewSubRow { - num_subscribers: row.num_subscribers - 1, - has_subscribers: row.num_subscribers > 1, - last_called, - ..row - }, - )?; - } - Ok(()) - } - - /// To effectively unsubscribe a caller from all of their subscribed views, - /// we decrement `num_subscribers` in `st_view_sub` for all of a caller's views. - pub fn unsubscribe_views(&mut self, sender: Identity) -> Result<()> { - let sender = IdentityViaU256(sender); - let cols = col_list![StViewSubFields::Identity]; - let value = sender.into(); - - // Collect the rows for this identity. - // These are rows for which we will decrement the subscriber count. - let rows_to_delete = self - .iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? - .map(|row_ref| StViewSubRow::try_from(row_ref).map(|row| (row, row_ref.pointer()))) - .filter(|result| match result { - Ok((row, _)) => row.has_subscribers && row.num_subscribers > 0, - _ => true, - }) - .collect::>>()?; - - // Copy the rows to delete and decrement their subscriber count. - // These are the rows that we will insert. - let rows_to_insert = rows_to_delete - .iter() - .map(|(row, _)| row.clone()) - .map(|row| StViewSubRow { - num_subscribers: row.num_subscribers - 1, - has_subscribers: row.num_subscribers > 1, - ..row - }) - .collect::>(); - - // Delete the old rows - for (_, ptr) in rows_to_delete { - self.delete(ST_VIEW_SUB_ID, ptr)?; - } - - // Insert the new rows - for row in rows_to_insert { - self.insert_via_serialize_bsatn(ST_VIEW_SUB_ID, &row)?; - } - - Ok(()) - } - /// Clear all rows from all view tables without dropping them. pub fn clear_all_views(&mut self) -> Result<()> { for table_id in self @@ -2894,25 +2955,6 @@ impl MutTxId { Ok(()) } - /// Get all view subscriptions for a given view. - pub fn lookup_st_view_subs(&self, view_id: ViewId) -> Result> { - let cols = StViewSubFields::ViewId; - let value = view_id.into(); - self.iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? - .map(StViewSubRow::try_from) - .collect::>>() - } - - /// Does this `view_id` have other entries in `st_view_sub` besides `current_ptr`? - /// Can be true for anonymous views with multiple subscribers. - fn has_other_st_view_sub_entries(&self, view_id: ViewId, current_ptr: RowPointer) -> Result { - let cols = StViewSubFields::ViewId; - let value = view_id.into(); - Ok(self - .iter_by_col_eq(ST_VIEW_SUB_ID, cols, &value)? - .any(|row_ref| row_ref.pointer() != current_ptr)) - } - /// Lookup a row in `st_view` by its primary key fn st_view_row(&self, view_id: ViewId) -> Result> { self.iter_by_col_eq(ST_VIEW_ID, col_list![StViewFields::ViewId], &view_id.into())? diff --git a/crates/engine/src/relational_db.rs b/crates/engine/src/relational_db.rs index d12bb0c090e..5ba71da15cb 100644 --- a/crates/engine/src/relational_db.rs +++ b/crates/engine/src/relational_db.rs @@ -1657,11 +1657,7 @@ impl RelationalDB { view_call: ViewCallInfo, rows: Vec, ) -> Result<(), DBError> { - let arg_hash = match view_call.sender { - Some(sender) => MutTxId::view_arg_hash(sender), - None => MutTxId::anonymous_view_arg_hash(), - }; - self.materialize_view_arg_hash(tx, table_id, arg_hash, rows)?; + self.materialize_view_arg_hash(tx, table_id, view_call.arg_hash.clone(), rows)?; tx.replace_view_read_set(view_call); Ok(()) @@ -2369,6 +2365,7 @@ mod tests { use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::error::{DatastoreError, IndexError}; use spacetimedb_datastore::execution_context::ReducerContext; + use spacetimedb_datastore::locking_tx_datastore::ViewInstanceArgs; use spacetimedb_datastore::system_tables::{ system_tables, StConstraintRow, StIndexRow, StSequenceRow, StTableRow, ST_CONSTRAINT_ID, ST_INDEX_ID, ST_SEQUENCE_ID, ST_TABLE_ID, @@ -2524,7 +2521,8 @@ mod tests { let row_pv = |v: u8| product![v]; let mut tx = begin_mut_tx(stdb); - tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?; + let args = ViewInstanceArgs::Sender(sender); + tx.subscribe_view(ViewCallInfo::from_args(view_id, args), args, sender)?; stdb.materialize_view(&mut tx, table_id, sender, vec![row_pv(v)])?; stdb.commit_tx(tx)?; @@ -2561,9 +2559,12 @@ mod tests { .collect() } - fn update_last_called(stdb: &TestDB, view_id: ViewId, sender: Identity, last_called: Timestamp) -> ResultTest<()> { + fn update_last_called(stdb: &TestDB, view_call: ViewCallInfo, last_called: Timestamp) -> ResultTest<()> { let mut tx = begin_mut_tx(stdb); - tx.update_view_timestamp_at(view_id, ArgId::SENTINEL, sender, last_called)?; + let args = tx + .view_instance_args(&view_call) + .expect("view instance should exist before updating last_called"); + tx.update_view_timestamp_at(view_call, args, last_called)?; stdb.commit_tx(tx)?; Ok(()) } @@ -2593,10 +2594,10 @@ mod tests { ); let tx = begin_mut_tx(&stdb); - let subs_rows = tx.lookup_st_view_subs(view_id)?; + let subscribers = tx.active_subscribers_for_view(view_id); assert!( - subs_rows.is_empty(), - "st_view_subs should be empty after reopening the database" + subscribers.is_empty(), + "view lifecycle subscribers should be empty after reopening the database" ); Ok(()) } @@ -2618,7 +2619,8 @@ mod tests { }; let mut tx = begin_mut_tx(&stdb); - tx.subscribe_view(view_id, ArgId::SENTINEL, Identity::ONE)?; + let args = ViewInstanceArgs::Sender(Identity::ONE); + tx.subscribe_view(ViewCallInfo::from_args(view_id, args), args, Identity::ONE)?; stdb.materialize_view(&mut tx, table_id, Identity::ONE, vec![product![10u8]])?; let (tx_offset_2, tx_data, ..) = stdb.commit_tx(tx)?.unwrap(); @@ -2660,10 +2662,10 @@ mod tests { ); let tx = begin_mut_tx(&stdb); - let subs_rows = tx.lookup_st_view_subs(view_id)?; + let subscribers = tx.active_subscribers_for_view(view_id); assert!( - subs_rows.is_empty(), - "st_view_subs should be empty after reopening the database" + subscribers.is_empty(), + "view lifecycle subscribers should be empty after reopening the database" ); Ok(()) } @@ -2709,8 +2711,11 @@ mod tests { ); let tx = begin_mut_tx(&stdb); - let st = tx.lookup_st_view_subs(view_id)?; - assert!(st.is_empty(), "st_view_subs should also be cleared after restart"); + let subscribers = tx.active_subscribers_for_view(view_id); + assert!( + subscribers.is_empty(), + "view lifecycle subscribers should also be cleared after restart" + ); stdb.commit_tx(tx)?; // Reinsert after restart @@ -2740,11 +2745,11 @@ mod tests { "Sender 2 row should remain" ); - // And st_view_subs must reflect only sender2 + // And lifecycle state must reflect only sender2. let tx = begin_mut_tx(&stdb); - let st_after = tx.lookup_st_view_subs(view_id)?; - assert_eq!(st_after.len(), 1); - assert_eq!(st_after[0].identity.0, sender2); + let mut subscribers = tx.active_subscribers_for_view(view_id); + subscribers.sort_by_key(|(identity, _)| identity.to_u256()); + assert_eq!(subscribers, vec![(sender2, 1)]); Ok(()) } @@ -2763,25 +2768,21 @@ mod tests { let live_sender = Identity::ZERO; let mut tx = begin_mut_tx(&stdb); - tx.subscribe_view(view_id, ArgId::SENTINEL, stale_sender)?; - tx.subscribe_view(view_id, ArgId::SENTINEL, live_sender)?; - stdb.materialize_view_call( - &mut tx, - table_id, - ViewCallInfo { view_id, sender: None }, - vec![product![42u8]], - )?; + let view_call = ViewCallInfo::anonymous(view_id); + tx.subscribe_view(view_call.clone(), ViewInstanceArgs::Anonymous, stale_sender)?; + tx.subscribe_view(view_call.clone(), ViewInstanceArgs::Anonymous, live_sender)?; + stdb.materialize_view_call(&mut tx, table_id, view_call, vec![product![42u8]])?; stdb.commit_tx(tx)?; let mut tx = begin_mut_tx(&stdb); - tx.unsubscribe_view(view_id, ArgId::SENTINEL, stale_sender)?; + tx.unsubscribe_view(ViewCallInfo::anonymous(view_id), stale_sender)?; stdb.commit_tx(tx)?; // Make one row definitely expired without relying on wall-clock sleeps. - update_last_called(&stdb, view_id, stale_sender, Timestamp::UNIX_EPOCH)?; + update_last_called(&stdb, ViewCallInfo::anonymous(view_id), Timestamp::UNIX_EPOCH)?; let mut tx = begin_mut_tx(&stdb); - tx.update_view_timestamp(view_id, ArgId::SENTINEL, live_sender)?; + tx.update_view_timestamp(ViewCallInfo::anonymous(view_id), ViewInstanceArgs::Anonymous)?; stdb.commit_tx(tx)?; // Cleanup should remove only the stale subscriber row and keep the shared @@ -2797,11 +2798,9 @@ mod tests { ); let tx = begin_mut_tx(&stdb); - let st_after = tx.lookup_st_view_subs(view_id)?; - assert_eq!(st_after.len(), 1); - assert_eq!(st_after[0].identity.0, live_sender); - assert!(st_after[0].has_subscribers); - assert_eq!(st_after[0].num_subscribers, 1); + let mut subscribers = tx.active_subscribers_for_view(view_id); + subscribers.sort_by_key(|(identity, _)| identity.to_u256()); + assert_eq!(subscribers, vec![(live_sender, 1)]); Ok(()) } @@ -2818,21 +2817,17 @@ mod tests { let sender = Identity::ONE; let mut tx = begin_mut_tx(&stdb); - tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?; - stdb.materialize_view_call( - &mut tx, - table_id, - ViewCallInfo { view_id, sender: None }, - vec![product![42u8]], - )?; + let view_call = ViewCallInfo::anonymous(view_id); + tx.subscribe_view(view_call.clone(), ViewInstanceArgs::Anonymous, sender)?; + stdb.materialize_view_call(&mut tx, table_id, view_call, vec![product![42u8]])?; stdb.commit_tx(tx)?; let mut tx = begin_mut_tx(&stdb); - tx.unsubscribe_view(view_id, ArgId::SENTINEL, sender)?; + tx.unsubscribe_view(ViewCallInfo::anonymous(view_id), sender)?; stdb.commit_tx(tx)?; // Mark the unsubscribed row as expired so cleanup can process it immediately. - update_last_called(&stdb, view_id, sender, Timestamp::UNIX_EPOCH)?; + update_last_called(&stdb, ViewCallInfo::anonymous(view_id), Timestamp::UNIX_EPOCH)?; // With no remaining subscriber rows, cleanup should drop the shared // anonymous materialization and remove the bookkeeping row. @@ -2846,8 +2841,8 @@ mod tests { ); let tx = begin_mut_tx(&stdb); - let st_after = tx.lookup_st_view_subs(view_id)?; - assert!(st_after.is_empty()); + let subscribers = tx.active_subscribers_for_view(view_id); + assert!(subscribers.is_empty()); Ok(()) }