diff --git a/Cargo.lock b/Cargo.lock index 6f9e9ae7a3e..d3c2f04d231 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8205,6 +8205,26 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "spacetimedb-dst" +version = "2.6.0" +dependencies = [ + "anyhow", + "clap 4.5.50", + "spacetimedb-commitlog", + "spacetimedb-datastore", + "spacetimedb-durability", + "spacetimedb-engine", + "spacetimedb-lib", + "spacetimedb-primitives", + "spacetimedb-runtime", + "spacetimedb-sats", + "spacetimedb-schema", + "spacetimedb-table", + "tracing", + "tracing-subscriber", +] + [[package]] name = "spacetimedb-durability" version = "2.6.0" diff --git a/Cargo.toml b/Cargo.toml index 3f877c82330..5f2646c26ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,7 @@ members = [ "crates/bindings-typescript/test-app/server", "crates/bindings-typescript/test-react-router-app/server", "crates/bindings-typescript/test-solid-router/server", - "crates/query-builder", + "crates/query-builder", "crates/dst", ] default-members = ["crates/cli", "crates/standalone", "crates/update"] # cargo feature graph resolver. v3 is default in edition2024 but workspace diff --git a/crates/dst/Cargo.toml b/crates/dst/Cargo.toml new file mode 100644 index 00000000000..2eadb7d99df --- /dev/null +++ b/crates/dst/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "spacetimedb-dst" +version.workspace = true +edition.workspace = true +rust-version.workspace = true + +[dependencies] +anyhow.workspace = true +clap.workspace = true +spacetimedb-datastore = { path = "../datastore", default-features = false, features = ["simulation"] } +spacetimedb-commitlog.workspace = true +spacetimedb-durability = { path = "../durability", default-features = false, features = ["simulation"] } +spacetimedb-engine = { path = "../engine", default-features = false, features = ["simulation"] } +spacetimedb-lib.workspace = true +spacetimedb-primitives.workspace = true +spacetimedb-runtime = { path = "../runtime/", default-features = false, features = ["simulation"] } +spacetimedb-sats.workspace = true +spacetimedb-schema.workspace = true +spacetimedb-table = { path = "../table", default-features = false } +tracing.workspace = true +tracing-subscriber.workspace = true + +[lints] +workspace = true diff --git a/crates/dst/README.md b/crates/dst/README.md new file mode 100644 index 00000000000..1a4dfcb2396 --- /dev/null +++ b/crates/dst/README.md @@ -0,0 +1,20 @@ +# SpacetimeDB DST + +Deterministic Simulation Testing framework for SpacetimeDB. + +## Test + +```sh +cargo test -p spacetimedb-dst +``` + +## Run + +```sh +cargo run -p spacetimedb-dst -- run --seed 42 --max-interactions 1000 +``` + +Options: + +- `--seed ` — RNG seed (defaults to wall-clock nanos) +- `--max-interactions ` — interaction budget diff --git a/crates/dst/src/engine.rs b/crates/dst/src/engine.rs new file mode 100644 index 00000000000..8dcfaab46a7 --- /dev/null +++ b/crates/dst/src/engine.rs @@ -0,0 +1,303 @@ +use std::{io, sync::Arc}; + +use spacetimedb_commitlog::SizeOnDisk; +use spacetimedb_datastore::execution_context::Workload; +use spacetimedb_datastore::traits::{IsolationLevel, TxData}; +use spacetimedb_engine::error::{DBError, DatastoreError, IndexError}; +use spacetimedb_engine::persistence::{DiskSizeFn, Durability as EngineDurability, Persistence}; +use spacetimedb_engine::relational_db::{MutTx, RelationalDB}; +use spacetimedb_lib::{Identity, RawModuleDef}; +use spacetimedb_primitives::TableId; +use spacetimedb_runtime::sim::{Rng, Runtime as SimRuntime}; +use spacetimedb_runtime::Handle; +use spacetimedb_schema::def::ModuleDef; +use spacetimedb_schema::schema::{Schema, TableSchema}; +use spacetimedb_table::page_pool::PagePool; + +mod model; +mod properties; +mod workload; + +use self::workload::{ + normalize_rows, row_to_bytes, CommitDelta, CountState, InsertOutcome, Interaction, Observation, TableDelta, + TableRowCount, +}; + +use crate::engine::model::Model; +use crate::engine::properties::EngineProperties; +use crate::engine::workload::WorkloadGen; +use crate::schema::{default_schema, to_raw_def, SchemaPlan}; +use crate::sim::commitlog::{InMemoryCommitlog, InMemoryCommitlogHandle}; +use crate::traits::{TargetDriver, TestSuite}; + +pub struct EngineTarget { + db: Option, + table_ids: Vec, + active_mut_tx: Option, + commitlog: InMemoryCommitlog, + runtime_handle: Handle, +} + +impl EngineTarget { + pub fn init(schema: SchemaPlan, runtime_seed: u64) -> anyhow::Result { + let runtime = SimRuntime::new(runtime_seed); + let runtime_handle = Handle::simulation(runtime.handle()); + let commitlog = InMemoryCommitlog::new(); + let db = Self::open_db(&commitlog, runtime_handle.clone())?; + + Self::install_schema(&db, &schema)?; + let table_ids = Self::load_table_ids(&db, &schema)?; + + Ok(Self { + db: Some(db), + table_ids, + active_mut_tx: None, + commitlog, + runtime_handle, + }) + } + + fn open_db(commitlog: &InMemoryCommitlog, runtime_handle: Handle) -> anyhow::Result { + let history = commitlog.open_handle()?; + let persistence = Self::persistence(history.clone(), runtime_handle); + let (db, connected_clients) = RelationalDB::open( + Identity::ZERO, + Identity::ZERO, + history, + Some(persistence), + None, + PagePool::new_for_test(), + )?; + anyhow::ensure!(connected_clients.is_empty(), "replay produced connected clients"); + Ok(db) + } + + fn persistence(handle: InMemoryCommitlogHandle, runtime_handle: Handle) -> Persistence { + let durability: Arc = Arc::new(handle); + let disk_size: DiskSizeFn = Arc::new(|| { + io::Result::Ok(SizeOnDisk { + total_bytes: 0, + total_blocks: 0, + }) + }); + Persistence { + durability, + disk_size, + snapshots: None, + runtime: runtime_handle, + } + } + + fn install_schema(db: &RelationalDB, schema: &SchemaPlan) -> anyhow::Result<()> { + let raw = to_raw_def(schema); + let raw_module_def = RawModuleDef::V10(raw); + let module_def = + ModuleDef::try_from(raw_module_def).map_err(|e| anyhow::anyhow!("schema validation failed: {e}"))?; + + db.with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> { + for table_def in module_def.tables() { + let tbl_schema = TableSchema::from_module_def(&module_def, table_def, (), TableId::SENTINEL); + db.create_table(tx, tbl_schema)?; + } + Ok(()) + })?; + + Ok(()) + } + + fn load_table_ids(db: &RelationalDB, schema: &SchemaPlan) -> anyhow::Result> { + let mut table_ids = Vec::with_capacity(schema.tables.len()); + db.with_auto_commit(Workload::Internal, |tx| -> Result<(), DBError> { + for table_plan in &schema.tables { + let id = db + .table_id_from_name_mut(tx, &table_plan.name)? + .ok_or_else(|| anyhow::anyhow!("table '{}' not found after creation", table_plan.name))?; + table_ids.push(id); + } + Ok(()) + })?; + Ok(table_ids) + } + + fn reopen_from_commitlog(&mut self) -> anyhow::Result<()> { + let db = self + .db + .take() + .ok_or_else(|| anyhow::anyhow!("replay without open database"))?; + + drop(db); + + self.db = Some(Self::open_db(&self.commitlog, self.runtime_handle.clone())?); + Ok(()) + } + + fn count_state(&self) -> anyhow::Result { + let db = self + .db + .as_ref() + .ok_or_else(|| anyhow::anyhow!("database is not open"))?; + let tx = db.begin_tx(Workload::Internal); + let mut row_counts = Vec::with_capacity(self.table_ids.len()); + + for (table, table_id) in self.table_ids.iter().enumerate() { + let count = match db.iter(&tx, *table_id) { + Ok(iter) => iter.count() as u64, + Err(err) => { + let _ = db.release_tx(tx); + return Err(err.into()); + } + }; + row_counts.push(TableRowCount { table, count }); + } + + let _ = db.release_tx(tx); + Ok(CountState { row_counts }) + } + + fn is_unique_constraint_violation(error: &DBError) -> bool { + matches!( + error, + DBError::Datastore(DatastoreError::Index(IndexError::UniqueConstraintViolation(_))) + ) + } + + fn commit_delta_from_tx_data(&self, tx_data: &TxData) -> CommitDelta { + let mut tables = Vec::new(); + + for (table_id, entry) in tx_data.iter_table_entries() { + let Some(table) = self.table_ids.iter().position(|id| *id == table_id) else { + continue; + }; + + let inserts = normalize_rows(entry.inserts.iter().cloned().collect()); + let deletes = normalize_rows(entry.deletes.iter().cloned().collect()); + if inserts.is_empty() && deletes.is_empty() && !entry.truncated { + continue; + } + + tables.push(TableDelta { + table, + inserts, + deletes, + truncated: entry.truncated, + }); + } + + tables.sort_by_key(|delta| delta.table); + CommitDelta { tables } + } + + pub fn execute(&mut self, interaction: &Interaction) -> anyhow::Result { + tracing::debug!(?interaction, "executing interaction"); + + let observation = match interaction { + Interaction::BeginMutTx => { + anyhow::ensure!( + self.active_mut_tx.is_none(), + "begin mutable transaction while one is already active" + ); + let db = self + .db + .as_ref() + .ok_or_else(|| anyhow::anyhow!("database is not open"))?; + self.active_mut_tx = Some(db.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal)); + Ok(Observation::BeganMutTx) + } + Interaction::Insert { table, row } => { + let table_id = self.table_ids[*table]; + let bytes = row_to_bytes(row); + let db = self + .db + .as_ref() + .ok_or_else(|| anyhow::anyhow!("database is not open"))?; + let tx = self + .active_mut_tx + .as_mut() + .ok_or_else(|| anyhow::anyhow!("insert without active mutable transaction"))?; + let outcome = match db.insert(tx, table_id, &bytes) { + Ok((_generated_columns, row, _flags)) => InsertOutcome::Accepted(row.to_product_value()), + // Generated rows can intentionally hit unique constraints; the oracle validates that rejection. + Err(error) if Self::is_unique_constraint_violation(&error) => { + InsertOutcome::UniqueConstraintViolation + } + Err(error) => return Err(error.into()), + }; + Ok(Observation::Inserted { outcome }) + } + Interaction::Delete { table, row } => { + let table_id = self.table_ids[*table]; + let db = self + .db + .as_ref() + .ok_or_else(|| anyhow::anyhow!("database is not open"))?; + let tx = self + .active_mut_tx + .as_mut() + .ok_or_else(|| anyhow::anyhow!("delete without active mutable transaction"))?; + db.delete_by_rel(tx, table_id, [row.clone()]); + Ok(Observation::Deleted) + } + Interaction::CommitTx => { + let tx = self + .active_mut_tx + .take() + .ok_or_else(|| anyhow::anyhow!("commit without active mutable transaction"))?; + let db = self + .db + .as_ref() + .ok_or_else(|| anyhow::anyhow!("database is not open"))?; + let Some((_tx_offset, tx_data, _tx_metrics, _reducer)) = db.commit_tx(tx)? else { + anyhow::bail!("commit produced no transaction data"); + }; + Ok(Observation::Committed { + delta: self.commit_delta_from_tx_data(&tx_data), + }) + } + Interaction::Replay => { + let _ = self.active_mut_tx.take(); + self.reopen_from_commitlog()?; + Ok(Observation::Replayed { + state: self.count_state()?, + }) + } + }; + + match &observation { + Ok(observation) => tracing::debug!(?observation, "observed interaction"), + Err(error) => tracing::error!(?interaction, %error, "interaction failed"), + } + + observation + } +} + +impl TargetDriver for EngineTarget { + type Observation = Observation; + + fn execute(&mut self, interaction: &Interaction) -> Result { + EngineTarget::execute(self, interaction) + } +} +pub struct EngineTest; + +impl TestSuite for EngineTest { + type Interaction = Interaction; + + type Interactions = WorkloadGen; + + type Target = EngineTarget; + + type Properties = EngineProperties; + + fn build(&self, rng: Rng) -> Result<(Self::Interactions, Self::Target, Self::Properties), anyhow::Error> { + let schema = default_schema(rng.clone()); + let runtime_seed = rng.next_u64(); + let target = EngineTarget::init(schema.clone(), runtime_seed)?; + let properties = EngineProperties::new(schema.clone()); + + let model = Model::new(schema); + let interactions = WorkloadGen::new(rng, model); + + Ok((interactions, target, properties)) + } +} diff --git a/crates/dst/src/engine/model.rs b/crates/dst/src/engine/model.rs new file mode 100644 index 00000000000..5d6913212e0 --- /dev/null +++ b/crates/dst/src/engine/model.rs @@ -0,0 +1,358 @@ +use super::workload::{ + normalize_rows, CommitDelta, CountState, InsertOutcome, Interaction, Observation, Row, TableDelta, TableRowCount, +}; +use crate::schema::SchemaPlan; + +#[derive(Debug)] +pub struct Model { + schema: SchemaPlan, + committed_tables: Vec, + pending_tx: Option, +} + +#[derive(Debug)] +struct TableState { + rows: Vec, +} + +#[derive(Debug)] +struct PendingTx { + tables: Vec, +} + +// Keep mutable transactions as an overlay: committed rows stay shared, while +// pending tables record only new rows and delete markers. +#[derive(Debug, Default)] +struct PendingTable { + inserts: Vec, + deletes: Vec, +} + +impl PendingTable { + fn is_deleted(&self, row: &Row) -> bool { + self.deletes.iter().any(|deleted| deleted == row) + } +} + +impl PendingTx { + fn new(table_count: usize) -> Self { + Self { + tables: (0..table_count).map(|_| PendingTable::default()).collect(), + } + } +} + +impl Model { + pub fn new(schema: SchemaPlan) -> Self { + let committed_tables = schema.tables.iter().map(|_| TableState { rows: vec![] }).collect(); + Self { + schema, + committed_tables, + pending_tx: None, + } + } + + pub fn schema(&self) -> &SchemaPlan { + &self.schema + } + + fn pending_table(&self, table: usize) -> Option<&PendingTable> { + self.pending_tx.as_ref().map(|pending_tx| &pending_tx.tables[table]) + } + + fn pending_table_mut(&mut self, table: usize) -> &mut PendingTable { + debug_assert!(self.pending_tx.is_some()); + &mut self.pending_tx.as_mut().expect("active transaction").tables[table] + } + + fn visible_committed_rows(&self, table: usize) -> impl Iterator + '_ { + let pending_table = self.pending_table(table); + self.committed_tables[table] + .rows + .iter() + .filter(move |row| pending_table.is_none_or(|pending_table| !pending_table.is_deleted(row))) + } + + // Visibility is committed rows minus delete markers, followed by pending inserts. + fn visible_rows(&self, table: usize) -> impl Iterator + '_ { + self.visible_committed_rows(table).chain( + self.pending_table(table) + .into_iter() + .flat_map(|pending_table| pending_table.inserts.iter()), + ) + } + + fn visible_count(&self, table: usize) -> u64 { + self.visible_rows(table).count() as u64 + } + + fn any_visible_row(&self, table: usize, matches: impl FnMut(&Row) -> bool) -> bool { + self.visible_rows(table).any(matches) + } + + fn violates_unique_constraint(&self, table: usize, row: &Row) -> bool { + let table_plan = &self.schema.tables[table]; + for constraint in &table_plan.unique_constraints { + if self.any_visible_row(table, |visible_row| { + constraint + .columns + .iter() + .all(|&col| visible_row.elements[col] == row.elements[col]) + }) { + return true; + } + } + false + } + + pub fn apply(&mut self, interaction: &Interaction) -> Observation { + match interaction { + Interaction::BeginMutTx => { + debug_assert!(self.pending_tx.is_none()); + self.pending_tx = Some(PendingTx::new(self.committed_tables.len())); + Observation::BeganMutTx + } + Interaction::Insert { table, row } => { + debug_assert!(self.pending_tx.is_some()); + // Properties feed the target-returned row here, so sequence-generated + // values become part of the oracle before commit/replay checks run. + if self.any_visible_row(*table, |visible_row| visible_row == row) { + return Observation::Inserted { + outcome: InsertOutcome::Accepted(row.clone()), + }; + } + + if self.violates_unique_constraint(*table, row) { + return Observation::Inserted { + outcome: InsertOutcome::UniqueConstraintViolation, + }; + } + + self.pending_table_mut(*table).inserts.push(row.clone()); + Observation::Inserted { + outcome: InsertOutcome::Accepted(row.clone()), + } + } + Interaction::Delete { table, row } => { + debug_assert!(self.pending_tx.is_some()); + if self.any_visible_row(*table, |visible_row| visible_row == row) { + let committed_has_row = self.visible_committed_rows(*table).any(|committed| committed == row); + let pending_table = self.pending_table_mut(*table); + pending_table.inserts.retain(|inserted| inserted != row); + if committed_has_row && !pending_table.is_deleted(row) { + pending_table.deletes.push(row.clone()); + } + } + Observation::Deleted + } + Interaction::CommitTx => { + debug_assert!(self.pending_tx.is_some()); + let pending_tx = self.pending_tx.take().expect("active transaction"); + let delta = self.commit_pending(pending_tx); + Observation::Committed { delta } + } + Interaction::Replay => { + self.pending_tx = None; + Observation::Replayed { + state: self.count_state(), + } + } + } + } + + fn commit_pending(&mut self, pending_tx: PendingTx) -> CommitDelta { + let mut tables = Vec::new(); + + for (table, pending_table) in pending_tx.tables.into_iter().enumerate() { + if pending_table.inserts.is_empty() && pending_table.deletes.is_empty() { + continue; + } + + let before_rows = &self.committed_tables[table].rows; + let inserts = normalize_rows( + pending_table + .inserts + .iter() + .filter(|inserted| !before_rows.contains(inserted)) + .cloned() + .collect(), + ); + // A delete followed by the same insert leaves the committed set unchanged. + let deletes = normalize_rows( + before_rows + .iter() + .filter(|before| pending_table.is_deleted(before) && !pending_table.inserts.contains(before)) + .cloned() + .collect(), + ); + let after_count = before_rows + .iter() + .filter(|before| !pending_table.is_deleted(before)) + .count() + + pending_table.inserts.len(); + let truncated = !before_rows.is_empty() && after_count == 0 && !deletes.is_empty(); + + if !inserts.is_empty() || !deletes.is_empty() || truncated { + tables.push(TableDelta { + table, + inserts, + deletes, + truncated, + }); + } + + let committed_rows = &mut self.committed_tables[table].rows; + committed_rows.retain(|row| !pending_table.is_deleted(row)); + committed_rows.extend(pending_table.inserts); + } + + CommitDelta { tables } + } + + pub fn in_mut_tx(&self) -> bool { + self.pending_tx.is_some() + } + + pub fn row_count(&self, table: usize) -> usize { + self.visible_count(table) as usize + } + + pub fn row(&self, table: usize, row: usize) -> Option<&Row> { + self.visible_rows(table).nth(row) + } + + #[cfg(test)] + pub fn rows(&self, table: usize) -> Vec { + self.visible_rows(table).cloned().collect() + } + + fn count_state(&self) -> CountState { + let row_counts = (0..self.schema.tables.len()) + .map(|table| TableRowCount { + table, + count: self.visible_count(table), + }) + .collect(); + CountState { row_counts } + } +} + +#[cfg(test)] +mod tests { + use spacetimedb_lib::AlgebraicValue; + + use super::*; + use crate::schema::{ColumnPlan, IndexAlgorithm, IndexPlan, TablePlan, Type, UniqueConstraintPlan}; + + fn schema() -> SchemaPlan { + SchemaPlan { + tables: vec![TablePlan { + name: "items".into(), + columns: vec![ColumnPlan { + name: "id".into(), + ty: Type::U64, + }], + primary_key: Some(0), + indexes: vec![IndexPlan { + columns: vec![0], + algorithm: IndexAlgorithm::BTree, + }], + unique_constraints: vec![UniqueConstraintPlan { columns: vec![0] }], + sequences: vec![], + is_public: true, + }], + } + } + + fn row(id: u64) -> Row { + Row { + elements: vec![AlgebraicValue::U64(id)].into(), + } + } + + #[test] + fn begin_mut_tx_does_not_clone_committed_tables() { + let mut model = Model::new(schema()); + model.committed_tables[0].rows.push(row(1)); + + model.apply(&Interaction::BeginMutTx); + + let pending_tx = model.pending_tx.as_ref().expect("active transaction"); + assert!(pending_tx + .tables + .iter() + .all(|table| table.inserts.is_empty() && table.deletes.is_empty())); + assert_eq!(model.rows(0), vec![row(1)]); + } + + #[test] + fn insert_records_delta_without_cloning_committed_rows() { + let mut model = Model::new(schema()); + model.committed_tables[0].rows.push(row(1)); + + model.apply(&Interaction::BeginMutTx); + model.apply(&Interaction::Insert { table: 0, row: row(2) }); + + let pending_table = &model.pending_tx.as_ref().expect("active transaction").tables[0]; + assert_eq!(pending_table.inserts, vec![row(2)]); + assert!(pending_table.deletes.is_empty()); + assert_eq!(model.committed_tables[0].rows, vec![row(1)]); + assert_eq!(model.rows(0), vec![row(1), row(2)]); + } + + #[test] + fn delete_records_marker_without_cloning_committed_rows() { + let mut model = Model::new(schema()); + model.committed_tables[0].rows.push(row(1)); + model.committed_tables[0].rows.push(row(2)); + + model.apply(&Interaction::BeginMutTx); + model.apply(&Interaction::Delete { table: 0, row: row(1) }); + + let pending_table = &model.pending_tx.as_ref().expect("active transaction").tables[0]; + assert!(pending_table.inserts.is_empty()); + assert_eq!(pending_table.deletes, vec![row(1)]); + assert_eq!(model.committed_tables[0].rows, vec![row(1), row(2)]); + assert_eq!(model.rows(0), vec![row(2)]); + } + + #[test] + fn insert_is_visible_before_commit_and_replay_rolls_back() { + let mut model = Model::new(schema()); + + model.apply(&Interaction::BeginMutTx); + model.apply(&Interaction::Insert { table: 0, row: row(1) }); + assert_eq!(model.row_count(0), 1); + + model.apply(&Interaction::Replay); + model.apply(&Interaction::BeginMutTx); + assert_eq!(model.row_count(0), 0); + } + + #[test] + fn commit_applies_only_pending_overlay() { + let mut model = Model::new(schema()); + + model.apply(&Interaction::BeginMutTx); + model.apply(&Interaction::Insert { table: 0, row: row(1) }); + let observation = model.apply(&Interaction::CommitTx); + + let Observation::Committed { delta, .. } = observation else { + panic!("expected commit observation"); + }; + assert_eq!(delta.tables.len(), 1); + assert_eq!(delta.tables[0].inserts, vec![row(1)]); + assert_eq!(model.committed_tables[0].rows, vec![row(1)]); + } + + #[test] + fn delete_is_visible_before_commit() { + let mut model = Model::new(schema()); + model.committed_tables[0].rows.push(row(1)); + + model.apply(&Interaction::BeginMutTx); + model.apply(&Interaction::Delete { table: 0, row: row(1) }); + + assert_eq!(model.row_count(0), 0); + } +} diff --git a/crates/dst/src/engine/properties.rs b/crates/dst/src/engine/properties.rs new file mode 100644 index 00000000000..667eec09510 --- /dev/null +++ b/crates/dst/src/engine/properties.rs @@ -0,0 +1,173 @@ +use super::model::Model; +use super::workload::{InsertOutcome, Interaction, Observation, Row}; +use crate::schema::SchemaPlan; +use crate::traits::Properties; + +pub struct EngineProperties { + oracle: EngineOracle, + properties: Vec>, +} + +impl EngineProperties { + pub fn new(schema: SchemaPlan) -> Self { + Self { + oracle: EngineOracle::new(schema), + properties: vec![ + Box::new(InsertMatches), + Box::new(CommitMatches), + Box::new(ReplayMatchesModel), + ], + } + } +} + +impl Properties for EngineProperties { + fn observe(&mut self, interaction: &Interaction, observation: &Observation) -> Result<(), anyhow::Error> { + let expected = self.oracle.apply(interaction, observation)?; + + for property in &self.properties { + if property.observes(interaction) { + property.check(interaction, observation, &expected)?; + } + } + + Ok(()) + } +} + +trait EngineProperty { + fn observes(&self, interaction: &Interaction) -> bool; + + fn check(&self, interaction: &Interaction, observation: &Observation, expected: &Observation) + -> anyhow::Result<()>; +} + +struct EngineOracle { + model: Model, +} + +impl EngineOracle { + fn new(schema: SchemaPlan) -> Self { + Self { + model: Model::new(schema), + } + } + + fn apply(&mut self, interaction: &Interaction, observation: &Observation) -> anyhow::Result { + let observation = match (interaction, observation) { + ( + Interaction::Insert { table, .. }, + Observation::Inserted { + outcome: InsertOutcome::Accepted(row), + }, + ) => self.apply_insert(*table, row), + ( + Interaction::Insert { .. }, + Observation::Inserted { + outcome: InsertOutcome::UniqueConstraintViolation, + }, + ) => self.model.apply(interaction), + (Interaction::Insert { .. }, _) => anyhow::bail!("insert produced unexpected observation"), + _ => self.model.apply(interaction), + }; + + Ok(observation) + } + + fn apply_insert(&mut self, table: usize, row: &Row) -> Observation { + self.model.apply(&Interaction::Insert { + table, + row: row.clone(), + }) + } +} + +struct InsertMatches; + +impl EngineProperty for InsertMatches { + fn observes(&self, interaction: &Interaction) -> bool { + matches!(interaction, Interaction::Insert { .. }) + } + + fn check( + &self, + _interaction: &Interaction, + observation: &Observation, + expected: &Observation, + ) -> anyhow::Result<()> { + let Observation::Inserted { outcome } = observation else { + anyhow::bail!("insert_matches: insert produced unexpected observation"); + }; + let Observation::Inserted { outcome: expected } = expected else { + unreachable!("InsertMatches only subscribes to insert interactions"); + }; + + match (outcome, expected) { + (InsertOutcome::Accepted(row), InsertOutcome::Accepted(expected)) => { + anyhow::ensure!(row == expected, "insert_matches: accepted row diverged from model"); + } + (InsertOutcome::UniqueConstraintViolation, InsertOutcome::UniqueConstraintViolation) => {} + (InsertOutcome::Accepted(_), InsertOutcome::UniqueConstraintViolation) => { + anyhow::bail!("insert_matches: target accepted row rejected by model"); + } + (InsertOutcome::UniqueConstraintViolation, InsertOutcome::Accepted(_)) => { + anyhow::bail!("insert_matches: target rejected row accepted by model"); + } + } + + Ok(()) + } +} + +struct CommitMatches; + +impl EngineProperty for CommitMatches { + fn observes(&self, interaction: &Interaction) -> bool { + matches!(interaction, Interaction::CommitTx) + } + + fn check( + &self, + _interaction: &Interaction, + observation: &Observation, + expected: &Observation, + ) -> anyhow::Result<()> { + let Observation::Committed { delta, .. } = observation else { + anyhow::bail!("commit_matches: commit produced unexpected observation"); + }; + let Observation::Committed { delta: expected, .. } = expected else { + unreachable!("CommitMatches only subscribes to commit interactions"); + }; + + anyhow::ensure!(delta == expected, "commit_matches: committed delta diverged from model"); + Ok(()) + } +} + +struct ReplayMatchesModel; + +impl EngineProperty for ReplayMatchesModel { + fn observes(&self, interaction: &Interaction) -> bool { + matches!(interaction, Interaction::Replay) + } + + fn check( + &self, + _interaction: &Interaction, + observation: &Observation, + expected: &Observation, + ) -> anyhow::Result<()> { + let Observation::Replayed { state } = observation else { + anyhow::bail!("replay_matches_model: replay produced unexpected observation"); + }; + let Observation::Replayed { state: expected } = expected else { + unreachable!("ReplayMatchesModel only subscribes to replay interactions"); + }; + + anyhow::ensure!( + state == expected, + "replay_matches_model: replayed state diverged from model" + ); + Ok(()) + } +} diff --git a/crates/dst/src/engine/workload.rs b/crates/dst/src/engine/workload.rs new file mode 100644 index 00000000000..bc9e4fcc36e --- /dev/null +++ b/crates/dst/src/engine/workload.rs @@ -0,0 +1,308 @@ +use std::fmt::{Debug, Error, Formatter}; + +use spacetimedb_lib::bsatn::to_vec; +use spacetimedb_lib::{AlgebraicValue, ProductValue}; +use spacetimedb_runtime::sim::Rng; +use spacetimedb_sats::ArrayValue; + +use super::model::Model; +use crate::schema::{SchemaPlan, TablePlan, Type}; + +pub type Row = ProductValue; + +#[derive(Debug, Clone)] +pub enum Interaction { + BeginMutTx, + Insert { table: usize, row: Row }, + Delete { table: usize, row: Row }, + CommitTx, + Replay, +} + +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct InteractionCounts { + pub total: usize, + pub begin_mut_tx: usize, + pub insert: usize, + pub delete: usize, + pub commit_tx: usize, + pub replay: usize, +} + +impl InteractionCounts { + pub fn record(&mut self, interaction: &Interaction) { + self.total += 1; + + match interaction { + Interaction::BeginMutTx => self.begin_mut_tx += 1, + Interaction::Insert { .. } => self.insert += 1, + Interaction::Delete { .. } => self.delete += 1, + Interaction::CommitTx => self.commit_tx += 1, + Interaction::Replay => self.replay += 1, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Observation { + BeganMutTx, + Inserted { outcome: InsertOutcome }, + Deleted, + Committed { delta: CommitDelta }, + Replayed { state: CountState }, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum InsertOutcome { + Accepted(Row), + UniqueConstraintViolation, +} + +#[derive(Debug, Clone, Copy)] +pub struct InteractionWeights { + pub insert: u64, + pub delete: u64, + pub commit_tx: u64, + pub replay: u64, +} + +impl Default for InteractionWeights { + fn default() -> Self { + Self { + insert: 50, + delete: 20, + commit_tx: 29, + replay: 1, + } + } +} + +#[derive(Debug, Clone, Copy)] +enum InteractionChoice { + Insert, + Delete, + CommitTx, + Replay, +} + +pub struct WorkloadGen { + rng: Rng, + model: Model, + stats: InteractionCounts, + weights: InteractionWeights, +} + +impl WorkloadGen { + pub fn new(rng: Rng, model: Model) -> Self { + Self { + rng, + model, + stats: InteractionCounts::default(), + weights: InteractionWeights::default(), + } + } + + pub fn stats(&self) -> InteractionCounts { + self.stats + } + + fn schema(&self) -> &SchemaPlan { + self.model.schema() + } + + fn gen_value(&self, ty: Type) -> AlgebraicValue { + match ty { + Type::Bool => AlgebraicValue::Bool(self.rng.next_u64().is_multiple_of(2)), + Type::I64 => AlgebraicValue::I64(self.rng.next_u64() as i64), + Type::U64 => AlgebraicValue::U64(self.rng.next_u64()), + Type::String => AlgebraicValue::String(format!("v_{}", self.rng.next_u64()).into()), + Type::Bytes => { + let len = (self.rng.next_u64() % 16) as usize; + let bytes: Vec = (0..len).map(|_| self.rng.next_u64() as u8).collect(); + AlgebraicValue::Array(ArrayValue::U8(bytes.into())) + } + } + } + + fn gen_row(&self, table: &TablePlan) -> Row { + table + .columns + .iter() + .map(|c| self.gen_value(c.ty)) + .collect::() + } + + fn gen_insert_row(&self, table_idx: usize) -> Row { + let table = &self.schema().tables[table_idx]; + let mut row = self.gen_row(table); + + if let Some(sequence) = table.sequences.first() { + row.elements[sequence.column] = match table.columns[sequence.column].ty { + Type::I64 => AlgebraicValue::I64(0), + Type::U64 => AlgebraicValue::U64(0), + _ => unreachable!("sequence columns are integral"), + }; + } + + row + } + + fn non_auto_inc_table_idx(&self) -> Option { + let auto_inc_table = self + .schema() + .auto_inc_table_and_column() + .map(|(table_idx, _)| table_idx); + + (0..self.schema().tables.len()).find(|&table_idx| Some(table_idx) != auto_inc_table) + } + + pub fn next_interaction(&mut self) -> Interaction { + let choice = self.pick_interaction_choice(); + let interaction = self.interaction_from_choice(choice); + + self.model.apply(&interaction); + self.stats.record(&interaction); + + interaction + } + + fn interaction_from_choice(&mut self, choice: InteractionChoice) -> Interaction { + if !self.model.in_mut_tx() { + return match choice { + InteractionChoice::Replay => Interaction::Replay, + + // Insert/Delete/CommitTx are not legal outside a mutable tx. + // Treat those weighted choices as pressure to start one. + InteractionChoice::Insert | InteractionChoice::Delete | InteractionChoice::CommitTx => { + Interaction::BeginMutTx + } + }; + } + + match choice { + InteractionChoice::Replay => Interaction::Replay, + + InteractionChoice::Insert => { + let table = self.insert_table_idx(); + + Interaction::Insert { + table, + row: self.gen_insert_row(table), + } + } + + InteractionChoice::Delete => { + let Some(table) = self.deletable_table_idx() else { + return Interaction::CommitTx; + }; + + let row_index = self.rng.index(self.model.row_count(table)); + + Interaction::Delete { + table, + row: self + .model + .row(table, row_index) + .expect("row index is in bounds") + .clone(), + } + } + + InteractionChoice::CommitTx => Interaction::CommitTx, + } + } + + fn pick_interaction_choice(&mut self) -> InteractionChoice { + let weights = self.weights; + + match self.pick_weighted(&[weights.insert, weights.delete, weights.commit_tx, weights.replay]) { + 0 => InteractionChoice::Insert, + 1 => InteractionChoice::Delete, + 2 => InteractionChoice::CommitTx, + 3 => InteractionChoice::Replay, + _ => unreachable!(), + } + } + + fn pick_weighted(&mut self, weights: &[u64]) -> usize { + let total: u64 = weights.iter().sum(); + + assert!(total > 0, "at least one interaction weight must be non-zero"); + + let mut selected = self.rng.next_u64() % total; + + for (idx, weight) in weights.iter().copied().enumerate() { + if selected < weight { + return idx; + } + + selected -= weight; + } + + unreachable!("selected value is always inside total weight") + } + + fn insert_table_idx(&self) -> usize { + let auto_inc_table_idx = self + .schema() + .auto_inc_table_and_column() + .map(|(table_idx, _)| table_idx); + + match auto_inc_table_idx { + Some(table_idx) if !self.rng.next_u64().is_multiple_of(3) => table_idx, + _ => self.rng.index(self.schema().tables.len()), + } + } + + fn deletable_table_idx(&self) -> Option { + self.non_auto_inc_table_idx() + .filter(|&table_idx| self.model.row_count(table_idx) > 0) + } +} + +impl Debug for WorkloadGen { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + write!(f, "{:?}", self.stats()) + } +} + +impl Iterator for WorkloadGen { + type Item = Interaction; + + fn next(&mut self) -> Option { + Some(self.next_interaction()) + } +} + +pub fn row_to_bytes(row: &Row) -> Vec { + to_vec(row).expect("row serialization must not fail") +} + +pub fn normalize_rows(mut rows: Vec) -> Vec { + rows.sort_by_key(row_to_bytes); + rows +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CountState { + pub row_counts: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct TableRowCount { + pub table: usize, + pub count: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CommitDelta { + pub tables: Vec, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct TableDelta { + pub table: usize, + pub inserts: Vec, + pub deletes: Vec, + pub truncated: bool, +} diff --git a/crates/dst/src/main.rs b/crates/dst/src/main.rs new file mode 100644 index 00000000000..e208f921a83 --- /dev/null +++ b/crates/dst/src/main.rs @@ -0,0 +1,91 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use clap::{Args, Parser, Subcommand}; +use spacetimedb_runtime::sim::Rng; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +mod engine; +mod schema; +mod sim; +mod traits; + +use crate::{engine::EngineTest, traits::TestSuite}; + +#[derive(Parser, Debug)] +#[command(name = "spacetimedb-dst")] +#[command(about = "Run deterministic simulation targets")] +struct Cli { + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand, Debug)] +enum Command { + Run(RunArgs), +} + +#[derive(Args, Debug)] +struct RunArgs { + #[arg(long, help = "Seed for generated choices. Defaults to wall-clock time.")] + seed: Option, + #[arg(long, help = "Deterministic interaction budget.")] + max_interactions: Option, +} + +fn main() -> anyhow::Result<()> { + init_tracing(); + match Cli::parse().command { + Command::Run(args) => run_command(args), + } +} + +fn init_tracing() { + let timer = tracing_subscriber::fmt::time(); + let format = tracing_subscriber::fmt::format::Format::default() + .with_timer(timer) + .with_line_number(true) + .with_file(true) + .with_target(false) + .compact(); + let fmt_layer = tracing_subscriber::fmt::Layer::default() + .event_format(format) + .with_writer(std::io::stderr); + let env_filter_layer = tracing_subscriber::EnvFilter::from_default_env(); + + let _ = tracing_subscriber::Registry::default() + .with(fmt_layer) + .with(env_filter_layer) + .try_init(); +} + +fn run_command(args: RunArgs) -> anyhow::Result<()> { + let seed = resolve_seed(args.seed); + let config = RunConfig { + max_interactions: args.max_interactions, + seed, + }; + + tracing::info!(?config, "initial run config"); + + // Generate schema from seed. + let rng = Rng::new(config.seed); + + let test = EngineTest {}; + test.run(rng, config.max_interactions)?; + Ok(()) +} + +fn resolve_seed(seed: Option) -> u64 { + seed.unwrap_or_else(|| { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("time went backwards") + .as_nanos() as u64 + }) +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct RunConfig { + pub max_interactions: Option, + pub seed: u64, +} diff --git a/crates/dst/src/schema.rs b/crates/dst/src/schema.rs new file mode 100644 index 00000000000..641281db3c3 --- /dev/null +++ b/crates/dst/src/schema.rs @@ -0,0 +1,475 @@ +use spacetimedb_lib::db::raw_def::v10::*; +use spacetimedb_lib::db::raw_def::v9::{RawIndexAlgorithm, TableAccess, TableType}; +use spacetimedb_primitives::{ColId, ColList}; +use spacetimedb_runtime::sim::Rng; +use spacetimedb_sats::{AlgebraicType, ArrayType, ProductType, ProductTypeElement}; + +pub fn default_schema(rng: Rng) -> SchemaPlan { + let profile = SchemaProfile::default(); + let mut plan = SchemaGenerator::new(rng, profile).gen_schema(); + plan.ensure_auto_inc_table(); + plan +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum Type { + Bool, + I64, + U64, + String, + Bytes, +} + +impl Type { + pub const ALL: &'static [Type] = &[Type::Bool, Type::I64, Type::U64, Type::String, Type::Bytes]; + + pub fn to_algebraic(self) -> AlgebraicType { + match self { + Type::Bool => AlgebraicType::Bool, + Type::I64 => AlgebraicType::I64, + Type::U64 => AlgebraicType::U64, + Type::String => AlgebraicType::String, + Type::Bytes => AlgebraicType::Array(ArrayType { + elem_ty: Box::new(AlgebraicType::U8), + }), + } + } + + pub fn is_integral(self) -> bool { + matches!(self, Type::I64 | Type::U64) + } +} + +// Schema plan — the canonical source of truth. +// This Schema should be able to translate to valid `RawModuleDefV10`. +#[derive(Debug, Clone)] +pub struct SchemaPlan { + pub tables: Vec, +} + +impl SchemaPlan { + pub fn auto_inc_table_and_column(&self) -> Option<(usize, usize)> { + self.tables + .iter() + .enumerate() + .find_map(|(table_idx, table)| table.sequences.first().map(|sequence| (table_idx, sequence.column))) + } + + pub fn ensure_auto_inc_table(&mut self) { + if self.auto_inc_table_and_column().is_some() { + return; + } + + let table = self.tables.first_mut().expect("schema must contain at least one table"); + if table.columns.is_empty() { + table.columns.push(ColumnPlan { + name: "id".into(), + ty: Type::U64, + }); + } else { + table.columns[0].ty = Type::U64; + } + + table.primary_key = Some(0); + if !table + .unique_constraints + .iter() + .any(|constraint| constraint.columns == [0]) + { + table.unique_constraints.push(UniqueConstraintPlan { columns: vec![0] }); + } + if !table.indexes.iter().any(|index| index.columns == [0]) { + table.indexes.push(IndexPlan { + columns: vec![0], + algorithm: IndexAlgorithm::BTree, + }); + } + table.sequences = vec![SequencePlan::new(0, Type::U64).expect("u64 is integral")]; + } +} + +#[derive(Debug, Clone)] +pub struct TablePlan { + pub name: String, + pub columns: Vec, + pub primary_key: Option, + pub indexes: Vec, + pub unique_constraints: Vec, + pub sequences: Vec, + pub is_public: bool, +} + +#[derive(Debug, Clone)] +pub struct ColumnPlan { + pub name: String, + pub ty: Type, +} + +#[derive(Debug, Clone)] +pub struct IndexPlan { + /// Indices into `TablePlan.columns`. + pub columns: Vec, + pub algorithm: IndexAlgorithm, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum IndexAlgorithm { + BTree, + Hash, +} + +#[derive(Debug, Clone)] +pub struct UniqueConstraintPlan { + /// Indices into `TablePlan.columns`. Non-empty. + pub columns: Vec, +} + +/// A sequence on a specific integral column. +#[derive(Debug, Clone)] +pub struct SequencePlan { + /// Index into `TablePlan.columns`. + pub column: usize, +} + +impl SequencePlan { + /// Create a sequence plan. Returns `None` if the type is not integral. + pub fn new(column: usize, ty: Type) -> Option { + if !ty.is_integral() { + return None; + } + Some(Self { column }) + } +} + +// Lowering into RawModuleDefV10. +pub fn to_raw_def(schema: &SchemaPlan) -> RawModuleDefV10 { + let mut builder = RawModuleDefV10Builder::new(); + builder.set_case_conversion_policy(CaseConversionPolicy::None); + + for table in &schema.tables { + to_raw_def_table(&mut builder, table); + } + + builder.finish() +} + +fn to_raw_def_table(builder: &mut RawModuleDefV10Builder, table: &TablePlan) { + let product_type = ProductType { + elements: table + .columns + .iter() + .map(|col| ProductTypeElement { + name: Some(col.name.clone().into()), + algebraic_type: col.ty.to_algebraic(), + }) + .collect(), + }; + + let mut tbl = builder.build_table_with_new_type(table.name.clone(), product_type, true); + + tbl = tbl.with_type(TableType::User); + tbl = tbl.with_access(if table.is_public { + TableAccess::Public + } else { + TableAccess::Private + }); + // Primary key. + if let Some(pk) = table.primary_key { + tbl = tbl.with_primary_key(ColId(pk as u16)); + } + + // Unique constraints — all of them, including PK-matching. + for constraint in &table.unique_constraints { + let col_list: ColList = constraint.columns.iter().map(|&c| ColId(c as u16)).collect(); + tbl = tbl.with_unique_constraint(col_list); + } + + // Indexes. + for index in &table.indexes { + let col_list: ColList = index.columns.iter().map(|&c| ColId(c as u16)).collect(); + + let algorithm = match index.algorithm { + IndexAlgorithm::BTree => RawIndexAlgorithm::BTree { columns: col_list }, + IndexAlgorithm::Hash => RawIndexAlgorithm::Hash { columns: col_list }, + }; + + let source_name = format!( + "{}_{}_idx", + table.name, + index + .columns + .iter() + .map(|&c| table.columns[c].name.as_str()) + .collect::>() + .join("_") + ); + + tbl = tbl.with_index_no_accessor_name(algorithm, source_name); + } + + // Sequences — all of them. + for seq in &table.sequences { + tbl = tbl.with_column_sequence(ColId(seq.column as u16)); + } + + tbl.finish(); +} + +/// Controls the shape of generated schemas. +#[derive(Debug, Clone)] +pub struct SchemaProfile { + pub table_count: (usize, usize), + pub columns: (usize, usize), + pub pk_prob: f64, + pub auto_inc_prob: f64, + pub indexes: (usize, usize), + pub unique_constraints: (usize, usize), + pub btree_prob: f64, + pub private_prob: f64, +} + +impl Default for SchemaProfile { + fn default() -> Self { + Self { + table_count: (1, 100), + columns: (1, 10), + pk_prob: 0.7, + auto_inc_prob: 0.3, + indexes: (0, 3), + unique_constraints: (0, 2), + btree_prob: 0.7, + private_prob: 0.1, + } + } +} + +pub struct SchemaGenerator { + rng: Rng, + profile: SchemaProfile, +} + +impl SchemaGenerator { + pub fn new(rng: Rng, profile: SchemaProfile) -> Self { + Self { rng, profile } + } + + fn range(&self, (lo, hi): (usize, usize)) -> usize { + if lo >= hi { + return lo; + } + lo + (self.rng.next_u64() as usize % (hi - lo + 1)) + } + + fn gen_type(&self) -> Type { + Type::ALL[self.rng.index(Type::ALL.len())] + } + + fn gen_columns(&self) -> Vec { + let n = self.range(self.profile.columns); + let mut names = Vec::with_capacity(n); + let mut seen = Vec::with_capacity(n); + for _ in 0..n { + let name = self.gen_column_name(&seen); + seen.push(name.clone()); + names.push(ColumnPlan { + name, + ty: self.gen_type(), + }); + } + names + } + + fn gen_ident(&self) -> String { + const CHARS: &[u8] = b"abcdefghijklmnopqrstuvwxyz0123456789_"; + const FIRST: &[u8] = b"abcdefghijklmnopqrstuvwxyz_"; + let len = 4 + (self.rng.next_u64() as usize % 12); + let mut s = String::with_capacity(len); + s.push(FIRST[self.rng.index(FIRST.len())] as char); + for _ in 1..len { + s.push(CHARS[self.rng.index(CHARS.len())] as char); + } + s + } + + fn gen_column_name(&self, seen: &[String]) -> String { + loop { + let name = self.gen_ident(); + if !seen.contains(&name) { + return name; + } + } + } + + fn gen_unique_constraints(&self, columns: &[ColumnPlan], pk: &Option) -> Vec { + let n = self.range(self.profile.unique_constraints); + let mut seen: Vec> = Vec::new(); + let mut result = Vec::new(); + for _ in 0..n { + let num_cols = 1 + self.rng.index(columns.len().min(3)); + let mut cols: Vec = (0..num_cols).map(|_| self.rng.index(columns.len())).collect(); + cols.sort(); + cols.dedup(); + if !cols.is_empty() && !seen.contains(&cols) { + seen.push(cols.clone()); + result.push(UniqueConstraintPlan { columns: cols }); + } + } + // Ensure PK has a matching unique constraint. + if let Some(pk) = pk + && !seen.iter().any(|cols| cols.len() == 1 && cols[0] == *pk) + { + result.push(UniqueConstraintPlan { columns: vec![*pk] }); + } + result + } + + fn gen_indexes( + &self, + columns: &[ColumnPlan], + unique_constraints: &[UniqueConstraintPlan], + pk: &Option, + ) -> Vec { + // Every unique constraint and PK needs a matching index. + let mut seen_cols: Vec> = Vec::new(); + let mut indexes: Vec = Vec::new(); + + // Index for PK. + if let Some(pk) = pk { + seen_cols.push(vec![*pk]); + indexes.push(IndexPlan { + columns: vec![*pk], + algorithm: IndexAlgorithm::BTree, + }); + } + + // Indexes for unique constraints. + for constraint in unique_constraints { + if seen_cols.contains(&constraint.columns) { + continue; + } + seen_cols.push(constraint.columns.clone()); + indexes.push(IndexPlan { + columns: constraint.columns.clone(), + algorithm: IndexAlgorithm::BTree, + }); + } + + // Additional random indexes. + let n = self.range(self.profile.indexes); + for _ in 0..n { + let num_cols = 1 + self.rng.index(columns.len().min(3)); + let mut cols: Vec = (0..num_cols).map(|_| self.rng.index(columns.len())).collect(); + cols.sort(); + cols.dedup(); + if cols.is_empty() || seen_cols.contains(&cols) { + continue; + } + seen_cols.push(cols.clone()); + let algorithm = if self.rng.sample_probability(self.profile.btree_prob) { + IndexAlgorithm::BTree + } else { + IndexAlgorithm::Hash + }; + indexes.push(IndexPlan { + columns: cols, + algorithm, + }); + } + + indexes + } + + fn gen_table(&self, _table_index: usize) -> TablePlan { + let columns = self.gen_columns(); + + let primary_key = if self.rng.sample_probability(self.profile.pk_prob) && !columns.is_empty() { + Some(self.rng.index(columns.len())) + } else { + None + }; + + let unique_constraints = self.gen_unique_constraints(&columns, &primary_key); + + let sequences = if let Some(pk) = primary_key { + if columns[pk].ty.is_integral() && self.rng.sample_probability(self.profile.auto_inc_prob) { + SequencePlan::new(pk, columns[pk].ty).into_iter().collect() + } else { + vec![] + } + } else { + vec![] + }; + + let indexes = self.gen_indexes(&columns, &unique_constraints, &primary_key); + + let name = format!("tbl_{}", self.gen_ident()); + + TablePlan { + name, + columns, + primary_key, + indexes, + unique_constraints, + sequences, + is_public: !self.rng.sample_probability(self.profile.private_prob), + } + } + + pub fn gen_schema(&self) -> SchemaPlan { + let table_count = self.range(self.profile.table_count); + let tables = (0..table_count).map(|i| self.gen_table(i)).collect(); + SchemaPlan { tables } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn lower_single_table() { + let schema = SchemaPlan { + tables: vec![TablePlan { + name: "users".into(), + columns: vec![ + ColumnPlan { + name: "id".into(), + ty: Type::U64, + }, + ColumnPlan { + name: "name".into(), + ty: Type::String, + }, + ColumnPlan { + name: "score".into(), + ty: Type::I64, + }, + ], + primary_key: Some(0), + indexes: vec![IndexPlan { + columns: vec![2], + algorithm: IndexAlgorithm::BTree, + }], + unique_constraints: vec![UniqueConstraintPlan { columns: vec![0] }], + sequences: vec![SequencePlan::new(0, Type::U64).unwrap()], + is_public: true, + }], + }; + + let raw = to_raw_def(&schema); + + // Should have Typespace, Types, and Tables sections. + assert!(raw.typespace().is_some()); + assert!(raw.types().is_some()); + let tables = raw.tables().unwrap(); + assert_eq!(tables.len(), 1); + + let t = &tables[0]; + assert_eq!(t.source_name.as_ref(), "users"); + assert_eq!(t.table_type, TableType::User); + assert_eq!(t.table_access, TableAccess::Public); + assert_eq!(t.primary_key.len(), 1); + assert_eq!(t.indexes.len(), 1); + assert_eq!(t.sequences.len(), 1); + } +} diff --git a/crates/dst/src/sim/commitlog.rs b/crates/dst/src/sim/commitlog.rs new file mode 100644 index 00000000000..dbf6d004a4c --- /dev/null +++ b/crates/dst/src/sim/commitlog.rs @@ -0,0 +1,494 @@ +use std::{ + collections::{btree_map, BTreeMap}, + fmt, io, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, RwLock, + }, +}; + +use spacetimedb_commitlog::{ + repo::{CompressOnce, CompressionStats, Repo, RepoWithoutLockFile, SegmentLen, SegmentReader, TxOffset}, + segment::{FileLike, Header}, + Commitlog, Decoder, Options, Transaction, +}; +use spacetimedb_durability::{Close, Durability, DurableOffset, History, PreparedTx}; +use spacetimedb_engine::relational_db::Txdata; +use spacetimedb_runtime::sync::watch; + +#[derive(Clone, Debug)] +pub struct InMemoryCommitlog { + repo: Memory, + options: Options, +} + +impl InMemoryCommitlog { + pub fn new() -> Self { + Self { + repo: Memory::unlimited(), + options: Options::default(), + } + } + + pub fn open_handle(&self) -> io::Result { + InMemoryCommitlogHandle::open(self.repo.clone(), self.options) + } +} + +#[derive(Clone)] +pub struct InMemoryCommitlogHandle { + inner: Arc, +} + +struct HandleInner { + log: Commitlog, + durable_tx: watch::Sender>, + closed: AtomicBool, +} + +impl InMemoryCommitlogHandle { + fn open(repo: Memory, options: Options) -> io::Result { + let log = Commitlog::open_with_repo(repo, options)?; + let (durable_tx, _) = watch::channel(log.max_committed_offset()); + Ok(Self { + inner: Arc::new(HandleInner { + log, + durable_tx, + closed: AtomicBool::new(false), + }), + }) + } +} + +impl Durability for InMemoryCommitlogHandle { + type TxData = Txdata; + + fn append_tx(&self, tx: PreparedTx) { + assert!( + !self.inner.closed.load(Ordering::Acquire), + "in-memory commitlog durability is closed" + ); + + let tx = tx.into_transaction(); + self.inner.log.commit([tx]).expect("in-memory commitlog append failed"); + let durable_offset = self + .inner + .log + .flush_and_sync() + .expect("in-memory commitlog flush failed"); + let _ = self.inner.durable_tx.send(durable_offset); + } + + fn durable_tx_offset(&self) -> DurableOffset { + self.inner.durable_tx.subscribe().into() + } + + fn close(&self) -> Close { + self.inner.closed.store(true, Ordering::Release); + let durable_offset = self.inner.log.max_committed_offset(); + let _ = self.inner.durable_tx.send(durable_offset); + Box::pin(async move { durable_offset }) + } +} + +impl History for InMemoryCommitlogHandle { + type TxData = Txdata; + + fn fold_transactions_from(&self, offset: TxOffset, decoder: D) -> Result<(), D::Error> + where + D: Decoder, + D::Error: From, + { + self.inner.log.fold_transactions_from(offset, decoder) + } + + fn transactions_from<'a, D>( + &self, + offset: TxOffset, + decoder: &'a D, + ) -> impl Iterator, D::Error>> + where + D: Decoder, + D::Error: From, + Self::TxData: 'a, + { + self.inner.log.transactions_from(offset, decoder) + } + + fn tx_range_hint(&self) -> (TxOffset, Option) { + let min = self.inner.log.min_committed_offset().unwrap_or_default(); + let max = self.inner.log.max_committed_offset(); + + (min, max) + } +} + +const PAGE_SIZE: usize = 4096; + +type SharedLock = Arc>; +type SpaceOnDevice = Arc>; + +#[derive(Clone, Debug)] +pub struct Memory { + space: SpaceOnDevice, + segments: SharedLock>>, +} + +impl Memory { + pub fn new(total_space: u64) -> Self { + Self { + space: Arc::new(Mutex::new(total_space)), + segments: <_>::default(), + } + } + + pub fn unlimited() -> Self { + Self::new(u64::MAX) + } +} + +impl fmt::Display for Memory { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") + } +} + +impl Repo for Memory { + type SegmentWriter = Segment; + type SegmentReader = ReadOnlySegment; + + fn create_segment(&self, offset: u64, header: Header) -> io::Result { + let mut inner = self.segments.write().unwrap(); + let mut segment = match inner.entry(offset) { + btree_map::Entry::Occupied(entry) => { + let entry = entry.get(); + if entry.read().unwrap().is_empty() { + Segment::from_shared(self.space.clone(), entry.clone()) + } else { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("segment {offset} already exists"), + )); + } + } + btree_map::Entry::Vacant(entry) => { + let storage = entry.insert(Arc::new(RwLock::new(Storage::new()))); + Segment::from_shared(self.space.clone(), storage.clone()) + } + }; + header.write(&mut segment)?; + + Ok(segment) + } + + fn open_segment_reader(&self, offset: u64) -> io::Result { + self.open_segment_writer(offset).map(Into::into) + } + + fn open_segment_writer(&self, offset: u64) -> io::Result { + let inner = self.segments.read().unwrap(); + let Some(buf) = inner.get(&offset) else { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("segment {offset} does not exist"), + )); + }; + Ok(Segment::from_shared(self.space.clone(), buf.clone())) + } + + fn remove_segment(&self, offset: u64) -> io::Result<()> { + let mut inner = self.segments.write().unwrap(); + if inner.remove(&offset).is_none() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("segment {offset} does not exist"), + )); + } + + Ok(()) + } + + fn compress_segment_with(&self, _: u64, _: impl CompressOnce) -> io::Result { + Ok(<_>::default()) + } + + fn existing_offsets(&self) -> io::Result> { + Ok(self.segments.read().unwrap().keys().copied().collect()) + } +} + +impl RepoWithoutLockFile for Memory {} + +#[derive(Debug)] +struct Storage { + alloc: u64, + buf: Vec, +} + +impl Storage { + fn new() -> Self { + Self { + alloc: 0, + buf: Vec::with_capacity(PAGE_SIZE), + } + } + + const fn len(&self) -> usize { + self.buf.len() + } + + const fn is_empty(&self) -> bool { + self.buf.is_empty() + } +} + +#[derive(Clone, Debug)] +pub struct Segment { + pos: u64, + storage: SharedLock, + space: SpaceOnDevice, +} + +impl Segment { + fn from_shared(space: SpaceOnDevice, storage: SharedLock) -> Self { + Self { pos: 0, storage, space } + } + + fn len(&self) -> usize { + self.storage.read().unwrap().len() + } +} + +impl io::Write for Segment { + fn write(&mut self, buf: &[u8]) -> io::Result { + if buf.is_empty() { + return Ok(0); + } + + let mut storage = self.storage.write().unwrap(); + let requested_end = self + .pos + .checked_add(buf.len() as u64) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "write position overflow"))?; + + if requested_end > storage.alloc { + let mut avail = self.space.lock().unwrap(); + + if self.pos >= storage.alloc { + let minimum_alloc = next_page_multiple( + self.pos + .checked_add(1) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "write position overflow"))?, + )?; + let needed = minimum_alloc - storage.alloc; + if *avail < needed { + return Err(enospc()); + } + } + + let target_alloc = next_page_multiple(requested_end)?; + let wanted = target_alloc - storage.alloc; + let available = wanted.min(*avail); + + storage.alloc += available; + *avail -= available; + } + + debug_assert!(self.pos < storage.alloc); + let writable = buf.len().min((storage.alloc - self.pos) as usize); + let start = self.pos as usize; + let end = start + writable; + + if storage.buf.len() < start { + storage.buf.resize(start, 0); + } + if storage.buf.len() < end { + storage.buf.resize(end, 0); + } + storage.buf[start..end].copy_from_slice(&buf[..writable]); + self.pos += writable as u64; + + Ok(writable) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl io::Read for Segment { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let storage = self.storage.read().unwrap(); + + let Some(remaining) = storage.len().checked_sub(self.pos as usize) else { + return Ok(0); + }; + let want = remaining.min(buf.len()); + let pos = self.pos as usize; + buf[..want].copy_from_slice(&storage.buf[pos..pos + want]); + self.pos += want as u64; + + Ok(want) + } +} + +impl io::Seek for Segment { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let (base_pos, offset) = match pos { + io::SeekFrom::Start(n) => { + self.pos = n; + return Ok(n); + } + io::SeekFrom::End(n) => (self.len() as u64, n), + io::SeekFrom::Current(n) => (self.pos, n), + }; + match base_pos.checked_add_signed(offset) { + Some(n) => { + self.pos = n; + Ok(n) + } + None => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )), + } + } +} + +impl SegmentLen for Segment { + fn segment_len(&mut self) -> io::Result { + Ok(self.len() as u64) + } +} + +impl FileLike for Segment { + fn fsync(&mut self) -> io::Result<()> { + Ok(()) + } + + fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { + let mut storage = self.storage.write().unwrap(); + let mut avail = self.space.lock().unwrap(); + + if size > storage.alloc { + if *avail == 0 { + return Err(enospc()); + } + + let want = size.next_multiple_of(PAGE_SIZE as u64) - storage.alloc; + let have = want.min(*avail); + + storage.alloc += have; + *avail -= have; + storage.buf.resize(size as usize, 0); + + if want > have { + return Err(enospc()); + } + } else { + let alloc = size.next_multiple_of(PAGE_SIZE as u64); + *avail += storage.alloc - alloc; + storage.alloc = alloc; + storage.buf.resize(size as usize, 0); + } + + Ok(()) + } +} + +pub struct ReadOnlySegment { + inner: io::BufReader, +} + +impl From for ReadOnlySegment { + fn from(inner: Segment) -> Self { + Self { + inner: io::BufReader::new(inner), + } + } +} + +impl SegmentReader for ReadOnlySegment { + fn sealed(&self) -> bool { + false + } +} + +impl io::Read for ReadOnlySegment { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.read(buf) + } +} + +impl io::BufRead for ReadOnlySegment { + fn fill_buf(&mut self) -> io::Result<&[u8]> { + self.inner.fill_buf() + } + + fn consume(&mut self, amount: usize) { + self.inner.consume(amount); + } +} + +impl io::Seek for ReadOnlySegment { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + self.inner.seek(pos) + } +} + +impl SegmentLen for ReadOnlySegment {} + +fn next_page_multiple(size: u64) -> io::Result { + let page = PAGE_SIZE as u64; + let remainder = size % page; + if remainder == 0 { + return Ok(size); + } + + size.checked_add(page - remainder) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "allocation size overflow")) +} + +fn enospc() -> io::Error { + io::Error::new(io::ErrorKind::StorageFull, "no space left on device") +} + +#[cfg(test)] +mod tests { + use std::io::{Read, Seek, Write}; + + use super::*; + + fn segment() -> Segment { + Segment::from_shared(Arc::new(Mutex::new(u64::MAX)), Arc::new(RwLock::new(Storage::new()))) + } + + #[test] + fn write_overwrites_at_seek_position() { + let mut segment = segment(); + + segment.write_all(b"abcdef").unwrap(); + segment.seek(io::SeekFrom::Start(2)).unwrap(); + segment.write_all(b"XY").unwrap(); + + let mut bytes = Vec::new(); + segment.seek(io::SeekFrom::Start(0)).unwrap(); + segment.read_to_end(&mut bytes).unwrap(); + + assert_eq!(bytes, b"abXYef"); + } + + #[test] + fn write_after_end_fills_gap_with_zeroes() { + let mut segment = segment(); + + segment.seek(io::SeekFrom::Start(4)).unwrap(); + segment.write_all(&[1, 2]).unwrap(); + + let mut bytes = Vec::new(); + segment.seek(io::SeekFrom::Start(0)).unwrap(); + segment.read_to_end(&mut bytes).unwrap(); + + assert_eq!(bytes, &[0, 0, 0, 0, 1, 2]); + } +} diff --git a/crates/dst/src/sim/mod.rs b/crates/dst/src/sim/mod.rs new file mode 100644 index 00000000000..3a448644d48 --- /dev/null +++ b/crates/dst/src/sim/mod.rs @@ -0,0 +1 @@ +pub mod commitlog; diff --git a/crates/dst/src/traits.rs b/crates/dst/src/traits.rs new file mode 100644 index 00000000000..d84aafa97db --- /dev/null +++ b/crates/dst/src/traits.rs @@ -0,0 +1,51 @@ +use anyhow::Error; +use spacetimedb_runtime::sim::Rng; + +/// This should be implemented by System under test. +pub trait TargetDriver { + type Observation; + + fn execute(&mut self, interaction: &I) -> Result; +} + +/// Ensures if Output of `TargetDrive` is expected for the input +pub trait Properties { + fn observe(&mut self, interaction: &I, observation: &O) -> Result<(), Error>; +} + +pub type TestSuiteParts = ( + ::Interactions, + ::Target, + ::Properties, +); + +pub trait TestSuite { + type Interaction; + type Interactions: Iterator + std::fmt::Debug; + type Target: TargetDriver; + type Properties: Properties>::Observation>; + + fn build(&self, rng: Rng) -> Result, Error> + where + Self: Sized; + + fn run(&self, rng: Rng, max_interactions: Option) -> Result<(), Error> + where + Self: Sized, + { + let (mut interactions, mut target, mut properties) = self.build(rng)?; + + let result = (|| { + for interaction in interactions.by_ref().take(max_interactions.unwrap_or(usize::MAX)) { + let observation = target.execute(&interaction)?; + properties.observe(&interaction, &observation)?; + } + + Ok(()) + })(); + + tracing::info!(interaction_counts = ?interactions, "final interaction counts"); + + result + } +}