Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 14 additions & 56 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics;
use spacetimedb_datastore::locking_tx_datastore::state_view::{
IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, StateView,
};
use spacetimedb_datastore::locking_tx_datastore::{MutTxId, TxId};
use spacetimedb_datastore::locking_tx_datastore::{ApplyHistoryCounters, MutTxId, TxId};
use spacetimedb_datastore::system_tables::{
system_tables, StModuleRow, ST_CLIENT_ID, ST_CONNECTION_CREDENTIALS_ID, ST_VIEW_SUB_ID,
};
Expand Down Expand Up @@ -1645,62 +1645,20 @@ impl RelationalDB {
}
}

fn apply_history<H>(datastore: &Locking, database_identity: Identity, history: H) -> Result<(), DBError>
where
H: durability::History<TxData = Txdata>,
{
log::info!("[{database_identity}] DATABASE: applying transaction history...");

// TODO: Revisit once we actually replay history suffixes, ie. starting
// from an offset larger than the history's min offset.
// TODO: We may want to require that a `tokio::runtime::Handle` is
// always supplied when constructing a `RelationalDB`. This would allow
// to spawn a timer task here which just prints the progress periodically
// in case the history is finite but very long.
let (_, max_tx_offset) = history.tx_range_hint();
let mut last_logged_percentage = 0;
let progress = |tx_offset: u64| {
if let Some(max_tx_offset) = max_tx_offset {
let percentage = f64::floor((tx_offset as f64 / max_tx_offset as f64) * 100.0) as i32;
if percentage > last_logged_percentage && percentage % 10 == 0 {
log::info!("[{database_identity}] Loaded {percentage}% ({tx_offset}/{max_tx_offset})");
last_logged_percentage = percentage;
}
// Print _something_ even if we don't know what's still ahead.
} else if tx_offset.is_multiple_of(10_000) {
log::info!("[{database_identity}] Loading transaction {tx_offset}");
}
fn apply_history(
datastore: &Locking,
database_identity: Identity,
history: impl durability::History<TxData = Txdata>,
) -> Result<(), DBError> {
let counters = ApplyHistoryCounters {
replay_commitlog_time_seconds: WORKER_METRICS
.replay_commitlog_time_seconds
.with_label_values(&database_identity),
replay_commitlog_num_commits: WORKER_METRICS
.replay_commitlog_num_commits
.with_label_values(&database_identity),
};

let time_before = std::time::Instant::now();

let mut replay = datastore.replay(
progress,
// We don't want to instantiate an incorrect state;
// if the commitlog contains an inconsistency we'd rather get a hard error than showing customers incorrect data.
spacetimedb_datastore::locking_tx_datastore::datastore::ErrorBehavior::FailFast,
);
let start_tx_offset = replay.next_tx_offset();
history
.fold_transactions_from(start_tx_offset, &mut replay)
.map_err(anyhow::Error::from)?;

let time_elapsed = time_before.elapsed();
WORKER_METRICS
.replay_commitlog_time_seconds
.with_label_values(&database_identity)
.set(time_elapsed.as_secs_f64());

let end_tx_offset = replay.next_tx_offset();
WORKER_METRICS
.replay_commitlog_num_commits
.with_label_values(&database_identity)
.set((end_tx_offset - start_tx_offset) as _);

log::info!("[{database_identity}] DATABASE: applied transaction history");
datastore.rebuild_state_after_replay()?;
log::info!("[{database_identity}] DATABASE: rebuilt state after replay");

spacetimedb_datastore::locking_tx_datastore::apply_history(datastore, database_identity, history, counters)?;
Ok(())
}

Expand Down
Loading
Loading