Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl RelationalDB {
/// It is an error to call this method on an already-initialized database.
///
/// See [`Self::open`] for further information.
pub fn set_initialized(&self, tx: &mut MutTx, host_type: HostType, program: Program) -> Result<(), DBError> {
pub fn set_initialized(&self, tx: &mut MutTx, program: Program) -> Result<(), DBError> {
log::trace!(
"[{}] DATABASE: set initialized owner={} program_hash={}",
self.database_identity,
Expand All @@ -426,7 +426,7 @@ impl RelationalDB {
database_identity: self.database_identity.into(),
owner_identity: self.owner_identity.into(),

program_kind: host_type.into(),
program_kind: program.kind,
program_hash: program.hash,
program_bytes: program.bytes,
module_version: ONLY_MODULE_VERSION.into(),
Expand Down Expand Up @@ -469,8 +469,8 @@ impl RelationalDB {
/// the transactional context `tx`.
/// - the `__init__` reducer contained in the module has been executed
/// within the transactional context `tx`.
pub fn update_program(&self, tx: &mut MutTx, host_type: HostType, program: Program) -> Result<(), DBError> {
Ok(self.inner.update_program(tx, host_type.into(), program)?)
pub fn update_program(&self, tx: &mut MutTx, program: Program) -> Result<(), DBError> {
Ok(self.inner.update_program(tx, program)?)
}

fn restore_from_snapshot_or_bootstrap(
Expand Down Expand Up @@ -2174,7 +2174,7 @@ pub mod tests_utils {
assert_eq!(connected_clients.len(), expected_num_clients);
let db = db.with_row_count(Self::row_count_fn());
db.with_auto_commit(Workload::Internal, |tx| {
db.set_initialized(tx, HostType::Wasm, Program::empty())
db.set_initialized(tx, Program::empty(HostType::Wasm.into()))
})?;
Ok(db)
}
Expand Down
156 changes: 90 additions & 66 deletions crates/core/src/host/host_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS;
use spacetimedb_datastore::db_metrics::DB_METRICS;
use spacetimedb_datastore::traits::Program;
use spacetimedb_durability::{self as durability};
use spacetimedb_lib::{hash_bytes, AlgebraicValue, Identity, Timestamp};
use spacetimedb_lib::{AlgebraicValue, Identity, Timestamp};
use spacetimedb_paths::server::{ModuleLogsDir, ServerDataDir};
use spacetimedb_sats::hash::Hash;
use spacetimedb_schema::auto_migrate::{ponder_migrate, AutoMigrateError, MigrationPolicy, PrettyPrintStyle};
Expand Down Expand Up @@ -403,10 +403,7 @@ impl HostController {
program_bytes: Box<[u8]>,
policy: MigrationPolicy,
) -> anyhow::Result<UpdateDatabaseResult> {
let program = Program {
hash: hash_bytes(&program_bytes),
bytes: program_bytes,
};
let program = Program::from_bytes(host_type.into(), program_bytes);
trace!(
"update module host {}/{}: genesis={} update-to={}",
database.database_identity,
Expand Down Expand Up @@ -451,7 +448,6 @@ impl HostController {
let update_result = host
.update_module(
this.runtimes.clone(),
host_type,
program,
policy,
this.energy_monitor.clone(),
Expand All @@ -477,10 +473,7 @@ impl HostController {
program_bytes: Box<[u8]>,
style: PrettyPrintStyle,
) -> anyhow::Result<MigratePlanResult> {
let program = Program {
hash: hash_bytes(&program_bytes),
bytes: program_bytes,
};
let program = Program::from_bytes(host_type.into(), program_bytes);
trace!(
"migrate plan {}/{}: genesis={} update-to={}",
database.database_identity,
Expand Down Expand Up @@ -710,7 +703,6 @@ async fn make_replica_ctx(
#[allow(clippy::too_many_arguments)]
async fn make_module_host(
runtimes: Arc<HostRuntimes>,
host_type: HostType,
replica_ctx: Arc<ReplicaContext>,
scheduler: Scheduler,
program: Program,
Expand All @@ -732,7 +724,7 @@ async fn make_module_host(
energy_monitor,
};

match host_type {
match HostType::from(program.kind) {
HostType::Wasm => {
asyncify(move || {
let start = Instant::now();
Expand All @@ -753,12 +745,11 @@ async fn make_module_host(
}
}

async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Program> {
let bytes = storage
async fn load_program(storage: &ProgramStorage, hash: Hash) -> anyhow::Result<Box<[u8]>> {
storage
.lookup(hash)
.await?
.with_context(|| format!("program {hash} not found"))?;
Ok(Program { hash, bytes })
.with_context(|| format!("program {hash} not found"))
}

struct LaunchedModule {
Expand All @@ -768,49 +759,63 @@ struct LaunchedModule {
scheduler_starter: SchedulerStarter,
}

#[allow(clippy::too_many_arguments)]
async fn launch_module(
struct ModuleLauncher<F> {
database: Database,
replica_id: u64,
program: Program,
on_panic: impl Fn() + Send + Sync + 'static,
on_panic: F,
relational_db: Arc<RelationalDB>,
energy_monitor: Arc<dyn EnergyMonitor>,
module_logs: Option<ModuleLogsDir>,
runtimes: Arc<HostRuntimes>,
core: AllocatedJobCore,
bsatn_rlb_pool: BsatnRowListBuilderPool,
) -> anyhow::Result<(Program, LaunchedModule)> {
let db_identity = database.database_identity;
let host_type = database.host_type;
}

let replica_ctx = make_replica_ctx(module_logs, database, replica_id, relational_db, bsatn_rlb_pool)
impl<F: Fn() + Send + Sync + 'static> ModuleLauncher<F> {
async fn launch_module(self) -> anyhow::Result<(Program, LaunchedModule)> {
let db_identity = self.database.database_identity;
info!(
"launching module db={} replica={} program={} host_type={}",
db_identity,
self.replica_id,
self.program.hash,
HostType::from(self.program.kind)
);

let replica_ctx = make_replica_ctx(
self.module_logs,
self.database,
self.replica_id,
self.relational_db,
self.bsatn_rlb_pool,
)
.await
.map(Arc::new)?;
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
let (program, module_host) = make_module_host(
runtimes.clone(),
host_type,
replica_ctx.clone(),
scheduler.clone(),
program,
energy_monitor.clone(),
on_panic,
core,
)
.await?;
let (scheduler, scheduler_starter) = Scheduler::open(replica_ctx.relational_db.clone());
let (program, module_host) = make_module_host(
self.runtimes.clone(),
replica_ctx.clone(),
scheduler.clone(),
self.program,
self.energy_monitor,
self.on_panic,
self.core,
)
.await?;

trace!("launched database {} with program {}", db_identity, program.hash);
trace!("launched database {} with program {}", db_identity, program.hash);

Ok((
program,
LaunchedModule {
replica_ctx,
module_host,
scheduler,
scheduler_starter,
},
))
Ok((
program,
LaunchedModule {
replica_ctx,
module_host,
scheduler,
scheduler_starter,
},
))
}
}

/// Update a module.
Expand Down Expand Up @@ -944,27 +949,47 @@ impl Host {
};
let (program, program_needs_init) = match db.program()? {
// Launch module with program from existing database.
Some(program) => (program, false),
Some(program) => {
info!(
"loaded program {} from the database host-type={}",
program.hash,
HostType::from(program.kind)
);
(program, false)
}
// Database is empty, load program from external storage and run
// initialization.
None => (load_program(program_storage, database.initial_program).await?, true),
None => {
info!(
"loading program {} from external storage host-type={}",
database.initial_program, database.host_type
);
let program_bytes = load_program(program_storage, database.initial_program).await?;
let program = Program {
hash: database.initial_program,
bytes: program_bytes,
kind: database.host_type.into(),
};
(program, true)
}
};

let (program, launched) = launch_module(
let (program, launched) = ModuleLauncher {
database,
replica_id,
program,
on_panic,
Arc::new(db),
energy_monitor.clone(),
match config.storage {
relational_db: Arc::new(db),
energy_monitor: energy_monitor.clone(),
module_logs: match config.storage {
db::Storage::Memory => None,
db::Storage::Disk => Some(replica_dir.module_logs()),
},
runtimes.clone(),
host_controller.db_cores.take(),
bsatn_rlb_pool.clone(),
)
runtimes: runtimes.clone(),
core: host_controller.db_cores.take(),
bsatn_rlb_pool: bsatn_rlb_pool.clone(),
}
.launch_module()
.await?;

if program_needs_init {
Expand Down Expand Up @@ -1045,21 +1070,22 @@ impl Host {
page_pool,
)?;

launch_module(
ModuleLauncher {
database,
0,
replica_id: 0,
program,
// No need to register a callback here:
// proper publishes use it to unregister a panicked module,
// but this module is not registered in the first place.
|| log::error!("launch_module on_panic called for temporary publish in-memory instance"),
Arc::new(db),
Arc::new(NullEnergyMonitor),
None,
runtimes.clone(),
on_panic: || log::error!("launch_module on_panic called for temporary publish in-memory instance"),
relational_db: Arc::new(db),
energy_monitor: Arc::new(NullEnergyMonitor),
module_logs: None,
runtimes: runtimes.clone(),
core,
bsatn_rlb_pool,
)
}
.launch_module()
.await
}

Expand All @@ -1079,7 +1105,6 @@ impl Host {
async fn update_module(
&mut self,
runtimes: Arc<HostRuntimes>,
host_type: HostType,
program: Program,
policy: MigrationPolicy,
energy_monitor: Arc<dyn EnergyMonitor>,
Expand All @@ -1091,7 +1116,6 @@ impl Host {

let (program, module) = make_module_host(
runtimes,
host_type,
replica_ctx.clone(),
scheduler.clone(),
program,
Expand Down Expand Up @@ -1266,7 +1290,7 @@ pub(crate) async fn extract_schema_with_pools(
) -> anyhow::Result<ModuleDef> {
let owner_identity = Identity::from_u256(0xdcba_u32.into());
let database_identity = Identity::from_u256(0xabcd_u32.into());
let program = Program::from_bytes(program_bytes);
let program = Program::from_bytes(host_type.into(), program_bytes);

let database = Database {
id: 0,
Expand Down
3 changes: 1 addition & 2 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,6 @@ fn init_database_inner(
let stdb = &*replica_ctx.relational_db;
let logger = replica_ctx.logger.system_logger();
let owner_identity = replica_ctx.database.owner_identity;
let host_type = replica_ctx.host_type;

let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
let auth_ctx = AuthCtx::for_current(owner_identity);
Expand Down Expand Up @@ -499,7 +498,7 @@ fn init_database_inner(
.with_context(|| format!("failed to create row-level security for table `{table_id}`: `{sql}`",))?;
}

stdb.set_initialized(tx, host_type, program)?;
stdb.set_initialized(tx, program)?;

anyhow::Ok(())
})
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,8 +612,9 @@ impl InstanceCommon {
};

let program_hash = program.hash;
let host_type = HostType::from(program.kind);
let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal);
let (mut tx, _) = stdb.with_auto_rollback(tx, |tx| stdb.update_program(tx, HostType::Wasm, program))?;
let (mut tx, _) = stdb.with_auto_rollback(tx, |tx| stdb.update_program(tx, program))?;
system_logger.info(&format!("Updated program to {program_hash}"));

let auth_ctx = AuthCtx::for_current(replica_ctx.database.owner_identity);
Expand All @@ -631,7 +632,7 @@ impl InstanceCommon {
}
Ok(res) => {
system_logger.info("Database updated");
log::info!("Database updated, {}", stdb.database_identity());
log::info!("Database updated, {} host-type={}", stdb.database_identity(), host_type);
let res: UpdateDatabaseResult = match res {
crate::db::update::UpdateResult::Success => UpdateDatabaseResult::UpdatePerformed,
crate::db::update::UpdateResult::EvaluateSubscribedViews => {
Expand Down
18 changes: 14 additions & 4 deletions crates/core/src/messages/control_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,21 @@ pub enum HostType {
Js = 1,
}

impl From<crate::messages::control_db::HostType> for ModuleKind {
fn from(host_type: crate::messages::control_db::HostType) -> Self {
impl From<HostType> for ModuleKind {
fn from(host_type: HostType) -> Self {
match host_type {
crate::messages::control_db::HostType::Wasm => Self::WASM,
crate::messages::control_db::HostType::Js => Self::JS,
HostType::Wasm => Self::WASM,
HostType::Js => Self::JS,
}
}
}

impl From<ModuleKind> for HostType {
fn from(kind: ModuleKind) -> Self {
match kind {
ModuleKind::WASM => Self::Wasm,
ModuleKind::JS => Self::Js,
x => unreachable!("missing mapping from module kind {x:?} to host type"),
}
}
}
Loading
Loading