diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 4cf13096c96..9f0b992855b 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -403,7 +403,7 @@ impl RelationalDB { /// It is an error to call this method on an already-initialized database. /// /// See [`Self::open`] for further information. - pub fn set_initialized(&self, tx: &mut MutTx, host_type: HostType, program: Program) -> Result<(), DBError> { + pub fn set_initialized(&self, tx: &mut MutTx, program: Program) -> Result<(), DBError> { log::trace!( "[{}] DATABASE: set initialized owner={} program_hash={}", self.database_identity, @@ -426,7 +426,7 @@ impl RelationalDB { database_identity: self.database_identity.into(), owner_identity: self.owner_identity.into(), - program_kind: host_type.into(), + program_kind: program.kind, program_hash: program.hash, program_bytes: program.bytes, module_version: ONLY_MODULE_VERSION.into(), @@ -469,8 +469,8 @@ impl RelationalDB { /// the transactional context `tx`. /// - the `__init__` reducer contained in the module has been executed /// within the transactional context `tx`. - pub fn update_program(&self, tx: &mut MutTx, host_type: HostType, program: Program) -> Result<(), DBError> { - Ok(self.inner.update_program(tx, host_type.into(), program)?) + pub fn update_program(&self, tx: &mut MutTx, program: Program) -> Result<(), DBError> { + Ok(self.inner.update_program(tx, program)?) } fn restore_from_snapshot_or_bootstrap( @@ -2174,7 +2174,7 @@ pub mod tests_utils { assert_eq!(connected_clients.len(), expected_num_clients); let db = db.with_row_count(Self::row_count_fn()); db.with_auto_commit(Workload::Internal, |tx| { - db.set_initialized(tx, HostType::Wasm, Program::empty()) + db.set_initialized(tx, Program::empty(HostType::Wasm.into())) })?; Ok(db) } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index b5577c8e7e7..82eb265c393 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -32,7 +32,7 @@ use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::traits::Program; use spacetimedb_durability::{self as durability}; -use spacetimedb_lib::{hash_bytes, AlgebraicValue, Identity, Timestamp}; +use spacetimedb_lib::{AlgebraicValue, Identity, Timestamp}; use spacetimedb_paths::server::{ModuleLogsDir, ServerDataDir}; use spacetimedb_sats::hash::Hash; use spacetimedb_schema::auto_migrate::{ponder_migrate, AutoMigrateError, MigrationPolicy, PrettyPrintStyle}; @@ -403,10 +403,7 @@ impl HostController { program_bytes: Box<[u8]>, policy: MigrationPolicy, ) -> anyhow::Result { - let program = Program { - hash: hash_bytes(&program_bytes), - bytes: program_bytes, - }; + let program = Program::from_bytes(host_type.into(), program_bytes); trace!( "update module host {}/{}: genesis={} update-to={}", database.database_identity, @@ -451,7 +448,6 @@ impl HostController { let update_result = host .update_module( this.runtimes.clone(), - host_type, program, policy, this.energy_monitor.clone(), @@ -477,10 +473,7 @@ impl HostController { program_bytes: Box<[u8]>, style: PrettyPrintStyle, ) -> anyhow::Result { - let program = Program { - hash: hash_bytes(&program_bytes), - bytes: program_bytes, - }; + let program = Program::from_bytes(host_type.into(), program_bytes); trace!( "migrate plan {}/{}: genesis={} update-to={}", database.database_identity, @@ -710,7 +703,6 @@ async fn make_replica_ctx( #[allow(clippy::too_many_arguments)] async fn make_module_host( runtimes: Arc, - host_type: HostType, replica_ctx: Arc, scheduler: Scheduler, program: Program, @@ -732,7 +724,7 @@ async fn make_module_host( energy_monitor, }; - match host_type { + match HostType::from(program.kind) { HostType::Wasm => { asyncify(move || { let start = Instant::now(); @@ -753,12 +745,11 @@ async fn make_module_host( } } -async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result { - let bytes = storage +async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result> { + storage .lookup(hash) .await? - .with_context(|| format!("program {hash} not found"))?; - Ok(Program { hash, bytes }) + .with_context(|| format!("program {hash} not found")) } struct LaunchedModule { @@ -768,49 +759,63 @@ struct LaunchedModule { scheduler_starter: SchedulerStarter, } -#[allow(clippy::too_many_arguments)] -async fn launch_module( +struct ModuleLauncher { database: Database, replica_id: u64, program: Program, - on_panic: impl Fn() + Send + Sync + 'static, + on_panic: F, relational_db: Arc, energy_monitor: Arc, module_logs: Option, runtimes: Arc, core: AllocatedJobCore, bsatn_rlb_pool: BsatnRowListBuilderPool, -) -> anyhow::Result<(Program, LaunchedModule)> { - let db_identity = database.database_identity; - let host_type = database.host_type; +} - let replica_ctx = make_replica_ctx(module_logs, database, replica_id, relational_db, bsatn_rlb_pool) +impl ModuleLauncher { + async fn launch_module(self) -> anyhow::Result<(Program, LaunchedModule)> { + let db_identity = self.database.database_identity; + info!( + "launching module db={} replica={} program={} host_type={}", + db_identity, + self.replica_id, + self.program.hash, + HostType::from(self.program.kind) + ); + + let replica_ctx = make_replica_ctx( + self.module_logs, + self.database, + self.replica_id, + self.relational_db, + self.bsatn_rlb_pool, + ) .await .map(Arc::new)?; - let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone()); - let (program, module_host) = make_module_host( - runtimes.clone(), - host_type, - replica_ctx.clone(), - scheduler.clone(), - program, - energy_monitor.clone(), - on_panic, - core, - ) - .await?; + let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone()); + let (program, module_host) = make_module_host( + self.runtimes.clone(), + replica_ctx.clone(), + scheduler.clone(), + self.program, + self.energy_monitor, + self.on_panic, + self.core, + ) + .await?; - trace!("launched database {} with program {}", db_identity, program.hash); + trace!("launched database {} with program {}", db_identity, program.hash); - Ok(( - program, - LaunchedModule { - replica_ctx, - module_host, - scheduler, - scheduler_starter, - }, - )) + Ok(( + program, + LaunchedModule { + replica_ctx, + module_host, + scheduler, + scheduler_starter, + }, + )) + } } /// Update a module. @@ -944,27 +949,47 @@ impl Host { }; let (program, program_needs_init) = match db.program()? { // Launch module with program from existing database. - Some(program) => (program, false), + Some(program) => { + info!( + "loaded program {} from the database host-type={}", + program.hash, + HostType::from(program.kind) + ); + (program, false) + } // Database is empty, load program from external storage and run // initialization. - None => (load_program(program_storage, database.initial_program).await?, true), + None => { + info!( + "loading program {} from external storage host-type={}", + database.initial_program, database.host_type + ); + let program_bytes = load_program(program_storage, database.initial_program).await?; + let program = Program { + hash: database.initial_program, + bytes: program_bytes, + kind: database.host_type.into(), + }; + (program, true) + } }; - let (program, launched) = launch_module( + let (program, launched) = ModuleLauncher { database, replica_id, program, on_panic, - Arc::new(db), - energy_monitor.clone(), - match config.storage { + relational_db: Arc::new(db), + energy_monitor: energy_monitor.clone(), + module_logs: match config.storage { db::Storage::Memory => None, db::Storage::Disk => Some(replica_dir.module_logs()), }, - runtimes.clone(), - host_controller.db_cores.take(), - bsatn_rlb_pool.clone(), - ) + runtimes: runtimes.clone(), + core: host_controller.db_cores.take(), + bsatn_rlb_pool: bsatn_rlb_pool.clone(), + } + .launch_module() .await?; if program_needs_init { @@ -1045,21 +1070,22 @@ impl Host { page_pool, )?; - launch_module( + ModuleLauncher { database, - 0, + replica_id: 0, program, // No need to register a callback here: // proper publishes use it to unregister a panicked module, // but this module is not registered in the first place. - || log::error!("launch_module on_panic called for temporary publish in-memory instance"), - Arc::new(db), - Arc::new(NullEnergyMonitor), - None, - runtimes.clone(), + on_panic: || log::error!("launch_module on_panic called for temporary publish in-memory instance"), + relational_db: Arc::new(db), + energy_monitor: Arc::new(NullEnergyMonitor), + module_logs: None, + runtimes: runtimes.clone(), core, bsatn_rlb_pool, - ) + } + .launch_module() .await } @@ -1079,7 +1105,6 @@ impl Host { async fn update_module( &mut self, runtimes: Arc, - host_type: HostType, program: Program, policy: MigrationPolicy, energy_monitor: Arc, @@ -1091,7 +1116,6 @@ impl Host { let (program, module) = make_module_host( runtimes, - host_type, replica_ctx.clone(), scheduler.clone(), program, @@ -1266,7 +1290,7 @@ pub(crate) async fn extract_schema_with_pools( ) -> anyhow::Result { let owner_identity = Identity::from_u256(0xdcba_u32.into()); let database_identity = Identity::from_u256(0xabcd_u32.into()); - let program = Program::from_bytes(program_bytes); + let program = Program::from_bytes(host_type.into(), program_bytes); let database = Database { id: 0, diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index eef00858928..75d76f24273 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -464,7 +464,6 @@ fn init_database_inner( let stdb = &*replica_ctx.relational_db; let logger = replica_ctx.logger.system_logger(); let owner_identity = replica_ctx.database.owner_identity; - let host_type = replica_ctx.host_type; let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); let auth_ctx = AuthCtx::for_current(owner_identity); @@ -499,7 +498,7 @@ fn init_database_inner( .with_context(|| format!("failed to create row-level security for table `{table_id}`: `{sql}`",))?; } - stdb.set_initialized(tx, host_type, program)?; + stdb.set_initialized(tx, program)?; anyhow::Ok(()) }) diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index b45a90ef402..310401a42b3 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -612,8 +612,9 @@ impl InstanceCommon { }; let program_hash = program.hash; + let host_type = HostType::from(program.kind); let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); - let (mut tx, _) = stdb.with_auto_rollback(tx, |tx| stdb.update_program(tx, HostType::Wasm, program))?; + let (mut tx, _) = stdb.with_auto_rollback(tx, |tx| stdb.update_program(tx, program))?; system_logger.info(&format!("Updated program to {program_hash}")); let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity); @@ -631,7 +632,7 @@ impl InstanceCommon { } Ok(res) => { system_logger.info("Database updated"); - log::info!("Database updated, {}", stdb.database_identity()); + log::info!("Database updated, {} host-type={}", stdb.database_identity(), host_type); let res: UpdateDatabaseResult = match res { crate::db::update::UpdateResult::Success => UpdateDatabaseResult::UpdatePerformed, crate::db::update::UpdateResult::EvaluateSubscribedViews => { diff --git a/crates/core/src/messages/control_db.rs b/crates/core/src/messages/control_db.rs index c60584af2e2..865be57e410 100644 --- a/crates/core/src/messages/control_db.rs +++ b/crates/core/src/messages/control_db.rs @@ -99,11 +99,21 @@ pub enum HostType { Js = 1, } -impl From for ModuleKind { - fn from(host_type: crate::messages::control_db::HostType) -> Self { +impl From for ModuleKind { + fn from(host_type: HostType) -> Self { match host_type { - crate::messages::control_db::HostType::Wasm => Self::WASM, - crate::messages::control_db::HostType::Js => Self::JS, + HostType::Wasm => Self::WASM, + HostType::Js => Self::JS, + } + } +} + +impl From for HostType { + fn from(kind: ModuleKind) -> Self { + match kind { + ModuleKind::WASM => Self::Wasm, + ModuleKind::JS => Self::Js, + x => unreachable!("missing mapping from module kind {x:?} to host type"), } } } diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index a08b6386b36..53327b55c8b 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -14,8 +14,8 @@ use crate::{ use crate::{ execution_context::ExecutionContext, system_tables::{ - read_bytes_from_col, read_hash_from_col, read_identity_from_col, system_table_schema, ModuleKind, StClientRow, - StModuleFields, StModuleRow, StTableFields, ST_CLIENT_ID, ST_MODULE_ID, ST_TABLE_ID, + read_hash_from_col, read_identity_from_col, system_table_schema, StClientRow, StModuleFields, StModuleRow, + StTableFields, ST_CLIENT_ID, ST_MODULE_ID, ST_TABLE_ID, }, traits::{ DataRow, IsolationLevel, Metadata, MutTx, MutTxDatastore, Program, RowTypeForTable, Tx, TxData, TxDatastore, @@ -472,9 +472,17 @@ impl TxDatastore for Locking { self.iter_tx(tx, ST_MODULE_ID)? .next() .map(|row_ref| { - let hash = read_hash_from_col(row_ref, StModuleFields::ProgramHash)?; - let bytes = read_bytes_from_col(row_ref, StModuleFields::ProgramBytes)?; - Ok(Program { hash, bytes }) + let StModuleRow { + program_kind, + program_hash, + program_bytes, + .. + } = row_ref.try_into()?; + Ok(Program { + hash: program_hash, + bytes: program_bytes, + kind: program_kind, + }) }) .transpose() } @@ -674,7 +682,7 @@ impl MutTxDatastore for Locking { tx.iter(ST_MODULE_ID)?.next().map(metadata_from_row).transpose() } - fn update_program(&self, tx: &mut Self::MutTx, program_kind: ModuleKind, program: Program) -> Result<()> { + fn update_program(&self, tx: &mut Self::MutTx, program: Program) -> Result<()> { let old = tx .iter(ST_MODULE_ID)? .next() @@ -686,7 +694,7 @@ impl MutTxDatastore for Locking { .transpose()?; match old { Some((ptr, mut row)) => { - row.program_kind = program_kind; + row.program_kind = program.kind; row.program_hash = program.hash; row.program_bytes = program.bytes; diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index 9b98c11addf..998cdb859e1 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -505,6 +505,8 @@ pub struct Program { pub hash: Hash, /// The raw bytes of the program. pub bytes: Box<[u8]>, + /// The kind (host type) of the program. + pub kind: ModuleKind, } impl Program { @@ -512,15 +514,15 @@ impl Program { /// /// This computes the hash over `bytes`, so prefer constructing [`Program`] /// directly if the hash is already known. - pub fn from_bytes(bytes: impl Into>) -> Self { + pub fn from_bytes(kind: ModuleKind, bytes: impl Into>) -> Self { let bytes = bytes.into(); let hash = hash_bytes(&bytes); - Self { hash, bytes } + Self { hash, bytes, kind } } /// Create a [`Program`] with no bytes. - pub fn empty() -> Self { - Self::from_bytes([]) + pub fn empty(kind: ModuleKind) -> Self { + Self::from_bytes(kind, []) } } @@ -714,5 +716,5 @@ pub trait MutTxDatastore: TxDatastore + MutTx { fn metadata_mut_tx(&self, tx: &Self::MutTx) -> Result>; /// Update the datastore with the supplied binary program. - fn update_program(&self, tx: &mut Self::MutTx, program_kind: ModuleKind, program: Program) -> Result<()>; + fn update_program(&self, tx: &mut Self::MutTx, program: Program) -> Result<()>; } diff --git a/crates/guard/src/lib.rs b/crates/guard/src/lib.rs index f61244d7706..af937910a54 100644 --- a/crates/guard/src/lib.rs +++ b/crates/guard/src/lib.rs @@ -196,6 +196,12 @@ impl SpacetimeDbGuard { child.id(), host_url ); + // Ensure logs from the killed server are retained. + { + let mut new_logs = logs.lock().unwrap(); + let old_logs = self.logs.lock().unwrap(); + new_logs.insert_str(0, old_logs.as_str()); + } self.child = child; self.logs = logs; diff --git a/crates/smoketests/src/lib.rs b/crates/smoketests/src/lib.rs index 3aca5a2794e..aa54aea04ef 100644 --- a/crates/smoketests/src/lib.rs +++ b/crates/smoketests/src/lib.rs @@ -772,6 +772,8 @@ impl Smoketest { /// Initializes, writes, and publishes a TypeScript module from source. /// + /// Will publish with the `--clear-database` flag. + /// /// The module is initialized at `//spacetimedb`. /// On success this updates `self.database_identity`. pub fn publish_typescript_module_source( @@ -779,6 +781,22 @@ impl Smoketest { project_dir_name: &str, module_name: &str, module_source: &str, + ) -> Result { + self.publish_typescript_module_source_clear(project_dir_name, module_name, module_source, true) + } + + /// Initializes, writes, and publishes a TypeScript module from source. + /// + /// If `clear` is `true`, this will publish with the `--clear-database` flag. + /// + /// The module is initialized at `//spacetimedb`. + /// On success this updates `self.database_identity`. + pub fn publish_typescript_module_source_clear( + &mut self, + project_dir_name: &str, + module_name: &str, + module_source: &str, + clear: bool, ) -> Result { let module_root = self.project_dir.path().join(project_dir_name); let module_root_str = module_root.to_str().context("Invalid TypeScript project path")?; @@ -803,16 +821,19 @@ impl Smoketest { pnpm(&["install", ts_bindings_path], &module_path)?; let module_path_str = module_path.to_str().context("Invalid TypeScript module path")?; - let publish_output = self.spacetime(&[ + let mut publish_args = vec![ "publish", "--server", &self.server_url, "--module-path", module_path_str, "--yes", - "--clear-database", - module_name, - ])?; + ]; + if clear { + publish_args.push("--clear-database"); + } + publish_args.push(module_name); + let publish_output = self.spacetime(&publish_args)?; let re = Regex::new(r"identity: ([0-9a-fA-F]+)").unwrap(); let identity = re diff --git a/crates/smoketests/tests/smoketests/change_host_type.rs b/crates/smoketests/tests/smoketests/change_host_type.rs new file mode 100644 index 00000000000..33efd036dd6 --- /dev/null +++ b/crates/smoketests/tests/smoketests/change_host_type.rs @@ -0,0 +1,85 @@ +use spacetimedb_smoketests::{require_local_server, require_pnpm, Smoketest}; + +const TS_MODULE_BASIC: &str = r#"import { schema, t, table } from "spacetimedb/server"; + +const person = table( + { name: "person", public: true }, + { + id: t.u64().primaryKey().autoInc(), + name: t.string() + } +); +const spacetimedb = schema({ person }); +export default spacetimedb; + +export const add = spacetimedb.reducer({ name: t.string() }, (ctx, { name }) => { + ctx.db.person.insert({ id: 0n, name }); +}); +"#; + +/// Tests that updating a module and also changing the host type works. +/// +/// Note that this test restarts the server. +#[test] +fn test_update_with_different_host_type() { + require_pnpm!(); + require_local_server!(); + + const PERSON_A: &str = "Person A"; + const PERSON_B: &str = "Person B"; + const PERSON_C: &str = "Person C"; + + let mut test = Smoketest::builder() + .precompiled_module("modules-basic") + .autopublish(false) + .build(); + + let database_identity = test.publish_module().unwrap(); + add_person(&test, PERSON_A, "initial"); + + // Publish a TS module. + test.publish_typescript_module_source_clear("modules-basic-ts", &database_identity, TS_MODULE_BASIC, false) + .unwrap(); + add_person(&test, PERSON_B, "post module update"); + + // Restart and assert that the data is still there. + test.restart_server(); + assert_has_rows(&test, &[PERSON_A, PERSON_B], "post restart"); + + // Change back to original module and assert that the data is still there. + test.publish_module_clear(false).unwrap(); + add_person(&test, PERSON_C, "post revert"); + + // Restart once more and assert that the data is still there. + test.restart_server(); + assert_has_rows(&test, &[PERSON_A, PERSON_B, PERSON_C], "post restart 2"); +} + +fn add_person(test: &Smoketest, name: &str, context: &str) { + test.call("add", &[name]).unwrap(); + assert_has_person(test, name, context); +} + +fn assert_has_person(test: &Smoketest, name: &str, context: &str) { + let output = test + .sql_confirmed(&format!("select * from person where name = '{name}'")) + .unwrap(); + assert!( + output.contains(name), + "{}: expected {} to be in result: {}", + context, + name, + output + ); +} + +fn assert_has_rows(test: &Smoketest, names: &[&str], context: &str) { + let output = test.sql_confirmed("select * from person").unwrap(); + assert!( + output + .lines() + .skip(2) + .all(|row| names.iter().any(|name| row.contains(name))), + "{context}: expected all of {names:?} to be in result: {output}" + ) +} diff --git a/crates/smoketests/tests/smoketests/mod.rs b/crates/smoketests/tests/smoketests/mod.rs index 012d7988641..f5053652dd3 100644 --- a/crates/smoketests/tests/smoketests/mod.rs +++ b/crates/smoketests/tests/smoketests/mod.rs @@ -3,6 +3,7 @@ mod add_remove_index; mod auto_inc; mod auto_migration; mod call; +mod change_host_type; mod cli; mod client_connection_errors; mod confirmed_reads; diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index 6c4e61dc9c8..5176e17ff7c 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -124,9 +124,9 @@ pub enum GetLeaderHostError { NoSuchDatabase, #[error("replica does not exist")] NoSuchReplica, - #[error("error starting database")] + #[error("error starting database: {source:#}")] LaunchError { source: anyhow::Error }, - #[error("error accessing controldb")] + #[error("error accessing controldb: {0:#}")] Control(#[from] control_db::Error), } @@ -272,7 +272,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { match existing_db { // The database does not already exist, so we'll create it. None => { - let program = Program::from_bytes(&spec.program_bytes[..]); + let program = Program::from_bytes(spec.host_type.into(), &spec.program_bytes[..]); let database = Database { id: 0, @@ -406,14 +406,14 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { let database_id = database.id; if let Some(program) = spec.program_bytes { + if let Some(host_type) = spec.host_type { + database.host_type = host_type; + } let program_bytes = &program[..]; - let program = Program::from_bytes(program_bytes); + let program = Program::from_bytes(database.host_type.into(), program_bytes); let _hash_for_assert = program.hash; database.initial_program = program.hash; - if let Some(host_type) = spec.host_type { - database.host_type = host_type; - } self.host_controller .check_module_validity(database.clone(), program)