diff --git a/Cargo.toml b/Cargo.toml index 5486f47..119c501 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,11 +70,11 @@ evolve_testapp = { path = "bin/testapp" } # outside deps linkme = { version = "0.3", default-features = false } -commonware-cryptography = "0.0.65" -commonware-runtime = "0.0.65" -commonware-storage = "0.0.65" -commonware-utils = "0.0.65" -commonware-codec = "0.0.65" +commonware-cryptography = "2026.3.0" +commonware-runtime = "2026.3.0" +commonware-storage = "2026.3.0" +commonware-utils = "2026.3.0" +commonware-codec = "2026.3.0" borsh = { features = ["derive"], version = "1.5.5" } clap = { version = "4.5.31", features = ["derive"] } fixed = { version = "1.29", features = ["borsh", "serde"] } diff --git a/crates/rpc/evnode/src/runner.rs b/crates/rpc/evnode/src/runner.rs index 4765a8f..370c6ca 100644 --- a/crates/rpc/evnode/src/runner.rs +++ b/crates/rpc/evnode/src/runner.rs @@ -32,10 +32,11 @@ use evolve_server::{ use evolve_stf_traits::AccountsCodeStorage; use evolve_storage::{Operation, Storage, StorageConfig}; use evolve_tx_eth::TxContext; +use tokio::sync::oneshot; use crate::{ - BlockExecutedInfo, EvnodeServer, EvnodeServerConfig, EvnodeStfExecutor, ExecutorServiceConfig, - OnBlockExecuted, + BlockExecutedInfo, EvnodeError, EvnodeServer, EvnodeServerConfig, EvnodeStfExecutor, + ExecutorServiceConfig, OnBlockExecuted, }; type SharedChainIndex = Arc; @@ -66,11 +67,16 @@ struct ExternalConsensusSinkConfig { } struct ExternalConsensusCommitSink { - sender: Option>, + sender: Option>, worker: Option>, current_height: Arc, } +struct QueuedBlockExecution { + info: BlockExecutedInfo, + result_tx: oneshot::Sender>, +} + impl ExternalConsensusCommitSink { fn spawn( storage: S, @@ -82,7 +88,7 @@ impl ExternalConsensusCommitSink { { const MAX_PENDING_BLOCKS: usize = 16; - let (sender, receiver) = mpsc::sync_channel::(MAX_PENDING_BLOCKS); + let (sender, receiver) = mpsc::sync_channel::(MAX_PENDING_BLOCKS); let current_height = 
Arc::new(AtomicU64::new(config.initial_height)); let current_height_for_worker = Arc::clone(&current_height); @@ -94,54 +100,78 @@ impl ExternalConsensusCommitSink { .build() .expect("failed to build commit sink runtime"); - while let Ok(info) = receiver.recv() { - let state_root = info.state_root; + while let Ok(queued) = receiver.recv() { + let QueuedBlockExecution { info, result_tx } = queued; let operations = state_changes_to_operations(info.state_changes); - runtime.block_on(async { - storage - .batch(operations) - .await - .expect("storage batch failed"); - storage.commit().await.expect("storage commit failed") + let result = runtime.block_on(async { + storage.batch(operations).await.map_err(|err| { + EvnodeError::Storage(format!("storage batch failed: {err:?}")) + })?; + storage.commit().await.map_err(|err| { + EvnodeError::Storage(format!("storage commit failed: {err:?}")) + }) }); - let block_hash = compute_block_hash(info.height, info.timestamp, parent_hash); - let metadata = BlockMetadata::new( - block_hash, - parent_hash, - state_root, - info.timestamp, - config.max_gas, - Address::ZERO, - config.chain_id, - ); - - let block = BlockBuilder::::new() - .number(info.height) - .timestamp(info.timestamp) - .transactions(info.transactions) - .build(); - let (stored_block, stored_txs, stored_receipts) = - build_index_data(&block, &info.block_result, &metadata); - - if config.indexing_enabled { - if let Some(ref index) = chain_index { - if let Err(err) = - index.store_block(stored_block, stored_txs, stored_receipts) - { - tracing::warn!("Failed to index block {}: {:?}", info.height, err); - } else { - tracing::debug!( - "Indexed block {} (hash={}, state_root={})", - info.height, - block_hash, - state_root + + match result { + Ok(commit_hash) => { + let committed_state_root = B256::from_slice(commit_hash.as_bytes()); + if committed_state_root != info.state_root { + tracing::warn!( + height = info.height, + preview_state_root = %info.state_root, + committed_state_root 
= %committed_state_root, + "execution state root differed from committed storage root" ); } + let block_hash = + compute_block_hash(info.height, info.timestamp, parent_hash); + let metadata = BlockMetadata::new( + block_hash, + parent_hash, + committed_state_root, + info.timestamp, + config.max_gas, + Address::ZERO, + config.chain_id, + ); + + let block = BlockBuilder::::new() + .number(info.height) + .timestamp(info.timestamp) + .transactions(info.transactions) + .build(); + let (stored_block, stored_txs, stored_receipts) = + build_index_data(&block, &info.block_result, &metadata); + + if config.indexing_enabled { + if let Some(ref index) = chain_index { + if let Err(err) = + index.store_block(stored_block, stored_txs, stored_receipts) + { + tracing::warn!( + "Failed to index block {}: {:?}", + info.height, + err + ); + } else { + tracing::debug!( + "Indexed block {} (hash={}, state_root={})", + info.height, + block_hash, + committed_state_root + ); + } + } + } + + parent_hash = block_hash; + current_height_for_worker.store(info.height, Ordering::SeqCst); + let _ = result_tx.send(Ok(committed_state_root)); + } + Err(err) => { + let _ = result_tx.send(Err(err)); } } - - parent_hash = block_hash; - current_height_for_worker.store(info.height, Ordering::SeqCst); } }); @@ -160,9 +190,23 @@ impl ExternalConsensusCommitSink { .clone(); Arc::new(move |info| { - sender - .send(info) - .expect("external consensus commit sink stopped unexpectedly"); + let sender = sender.clone(); + Box::pin(async move { + let (result_tx, result_rx) = oneshot::channel(); + sender + .send(QueuedBlockExecution { info, result_tx }) + .map_err(|_| { + EvnodeError::Unavailable( + "external consensus commit sink stopped unexpectedly".to_string(), + ) + })?; + result_rx.await.map_err(|_| { + EvnodeError::Unavailable( + "external consensus commit sink stopped before returning a root" + .to_string(), + ) + })? 
+ }) }) } diff --git a/crates/rpc/evnode/src/service.rs b/crates/rpc/evnode/src/service.rs index 35b85ce..337881d 100644 --- a/crates/rpc/evnode/src/service.rs +++ b/crates/rpc/evnode/src/service.rs @@ -3,6 +3,8 @@ //! This module provides the gRPC server implementation for the EVNode ExecutorService, //! which allows ev-node to interact with the Evolve execution layer. +use std::future::Future; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; @@ -16,7 +18,7 @@ use evolve_stf::results::BlockResult; use evolve_stf_traits::{AccountsCodeStorage, StateChange, Transaction}; use evolve_tx_eth::TxContext; use prost_types::Timestamp; -use tokio::sync::RwLock; +use tokio::sync::{Mutex, RwLock}; use tonic::{Request, Response, Status}; use crate::error::EvnodeError; @@ -27,18 +29,32 @@ use crate::proto::evnode::v1::{ SetFinalResponse, }; -/// Compute a state root from changes. +/// Compute a projected state root from a base root plus pending changes. /// -/// This is a simple hash of all changes for MVP. -/// In production, this would be a proper Merkle root. -pub fn compute_state_root(changes: &[StateChange]) -> B256 { +/// This is not the storage engine's Merkle root. It is a deterministic hash used +/// to track pending execution state between `execute_txs` calls until the storage +/// backend commits the changes. 
+pub fn compute_state_root(base_state_root: B256, changes: &[StateChange]) -> B256 { use alloy_primitives::keccak256; if changes.is_empty() { - return B256::ZERO; + return base_state_root; } - let mut data = Vec::new(); + let estimated_changes_bytes: usize = changes + .iter() + .map(|change| match change { + StateChange::Set { key, value } => { + 1 + std::mem::size_of::() + + key.len() + + std::mem::size_of::() + + value.len() + } + StateChange::Remove { key } => 1 + std::mem::size_of::() + key.len(), + }) + .sum(); + let mut data = Vec::with_capacity(B256::len_bytes() + estimated_changes_bytes); + data.extend_from_slice(base_state_root.as_slice()); for change in changes { match change { StateChange::Set { key, value } => { @@ -133,6 +149,8 @@ struct ExecutorState { finalized_height: AtomicU64, /// Pending state changes (applied but not yet committed). pending_changes: RwLock>, + /// Serializes block execution so validation and root updates stay linear. + execution_lock: Mutex<()>, } impl ExecutorState { @@ -146,6 +164,7 @@ impl ExecutorState { last_state_root: RwLock::new(B256::ZERO), finalized_height: AtomicU64::new(0), pending_changes: RwLock::new(Vec::new()), + execution_lock: Mutex::new(()), } } } @@ -157,6 +176,10 @@ impl ExecutorState { /// Note: The callback receives a reference to the state changes. pub type StateChangeCallback = Arc; +/// Future returned by the block execution callback. +pub type OnBlockExecutedFuture = + Pin> + Send + 'static>>; + /// Information passed to the `OnBlockExecuted` callback after a block is executed. /// /// Contains all data needed for storage commit and chain indexing. @@ -169,7 +192,7 @@ pub struct BlockExecutedInfo { pub state_changes: Vec, /// Full block execution result. pub block_result: BlockResult, - /// State root computed from changes (keccak-based). + /// Projected state root computed from prior state plus pending changes. pub state_root: B256, /// Transactions that were included in the block. 
pub transactions: Vec, @@ -179,8 +202,9 @@ pub struct BlockExecutedInfo { /// /// This provides all data needed for storage commit + chain indexing in one call. /// When set, this replaces the `StateChangeCallback` for `execute_txs` — the caller -/// takes full responsibility for persisting state changes. -pub type OnBlockExecuted = Arc; +/// takes full responsibility for persisting state changes and must return the +/// canonical committed state root that should be exposed to subsequent callers. +pub type OnBlockExecuted = Arc OnBlockExecutedFuture + Send + Sync>; /// ExecutorService implementation for EVNode. /// @@ -313,6 +337,32 @@ where // Store pending changes self.state.pending_changes.write().await.extend(changes); } + + async fn resolve_prev_state_root(&self, prev_state_root: &[u8]) -> Result { + let last_state_root = *self.state.last_state_root.read().await; + + if prev_state_root.is_empty() { + return Ok(last_state_root); + } + + if prev_state_root.len() != B256::len_bytes() { + return Err(EvnodeError::InvalidArgument(format!( + "prev_state_root must be {} bytes, got {}", + B256::len_bytes(), + prev_state_root.len() + ))); + } + + let provided_prev_state_root = B256::from_slice(prev_state_root); + if provided_prev_state_root != last_state_root { + return Err(EvnodeError::InvalidArgument(format!( + "prev_state_root mismatch: expected {}, got {}", + last_state_root, provided_prev_state_root + ))); + } + + Ok(provided_prev_state_root) + } } #[async_trait] @@ -349,7 +399,7 @@ where .map_err(|e| Status::internal(format!("genesis failed: {}", e)))?; // Compute state root - let state_root = compute_state_root(&changes); + let state_root = compute_state_root(B256::ZERO, &changes); // Handle state changes (store pending, call callback) self.handle_state_changes(changes).await; @@ -443,6 +493,13 @@ where } } + let _execute_guard = self.state.execution_lock.lock().await; + + let prev_state_root = self + .resolve_prev_state_root(&req.prev_state_root) + .await + 
.map_err(Status::from)?; + // Build the block let block = evolve_server::BlockBuilder::::new() .number(req.block_height) @@ -451,15 +508,16 @@ where .transactions(transactions) .build(); - // Execute through STF + // Execute through STF while holding the execution lock so storage reads and + // root validation remain linearized. let (result, exec_state) = self.stf.execute_block(&self.storage, &self.codes, &block); let changes = exec_state .into_changes() .map_err(|e| Status::internal(format!("failed to get state changes: {:?}", e)))?; - // Compute updated state root - let updated_state_root = compute_state_root(&changes); + // Compute the projected state root over the previous pending root. + let projected_state_root = compute_state_root(prev_state_root, &changes); // Log execution results (before moving ownership) let executed_tx_count = result.tx_results.len(); @@ -480,31 +538,34 @@ where gas_used ); - // Finalize the block proposal: remove executed txs, return unexecuted in-flight txs. - if let Some(ref mempool) = self.mempool { - let executed_hashes: Vec<[u8; 32]> = block - .transactions - .iter() - .take(executed_tx_count) - .map(|tx| tx.hash().0) - .collect(); - let mut pool = mempool.write().await; - pool.finalize(&executed_hashes); - } + let executed_hashes: Vec<[u8; 32]> = block + .transactions + .iter() + .take(executed_tx_count) + .map(|tx| tx.hash().0) + .collect(); - // Handle state changes / notify callback - if let Some(ref callback) = self.on_block_executed { + // Handle state changes / notify callback. The callback returns the committed + // root so external-consensus callers observe the same root that gets indexed. 
+ let updated_state_root = if let Some(ref callback) = self.on_block_executed { let info = BlockExecutedInfo { height: req.block_height, timestamp, state_changes: changes, block_result: result, - state_root: updated_state_root, + state_root: projected_state_root, transactions: block.transactions, }; - callback(info); + callback(info).await.map_err(Status::from)? } else { self.handle_state_changes(changes).await; + projected_state_root + }; + + // Finalize the block proposal only after state persistence succeeds. + if let Some(ref mempool) = self.mempool { + let mut pool = mempool.write().await; + pool.finalize(&executed_hashes); } // Update state @@ -615,8 +676,10 @@ mod tests { use evolve_stf::results::TxResult; use evolve_stf_traits::Transaction; use k256::ecdsa::SigningKey; + use std::collections::VecDeque; use std::sync::atomic::AtomicUsize; use std::sync::Mutex; + use tokio::sync::Notify; use tonic::Code; use crate::proto::evnode::v1::executor_service_server::ExecutorService; @@ -864,7 +927,7 @@ mod tests { key: b"k".to_vec(), value: b"v".to_vec(), }]; - let expected_root = compute_state_root(&changes); + let expected_root = compute_state_root(B256::ZERO, &changes); let service = mk_service(true); let req = InitChainRequest { genesis_time: Some(Timestamp { @@ -1015,7 +1078,7 @@ mod tests { key: b"gone".to_vec(), }, ]; - let expected_root = compute_state_root(&expected_changes); + let expected_root = compute_state_root(B256::ZERO, &expected_changes); let service = mk_service_with_execute_changes(expected_changes); init_test_chain(&service).await; @@ -1065,6 +1128,44 @@ mod tests { assert_eq!(observed.tx_hashes, vec![valid_tx_hash]); } + #[tokio::test] + async fn execute_txs_rejects_invalid_prev_state_root_length() { + let service = mk_service_with_execute_changes(Vec::new()); + init_test_chain(&service).await; + + let err = service + .execute_txs(Request::new(ExecuteTxsRequest { + block_height: 2, + timestamp: None, + prev_state_root: vec![0xaa; 31], + txs: 
vec![sample_legacy_tx_bytes()], + })) + .await + .expect_err("execute_txs should reject malformed prev_state_root"); + + assert_eq!(err.code(), Code::InvalidArgument); + assert!(err.message().contains("prev_state_root must be 32 bytes")); + } + + #[tokio::test] + async fn execute_txs_rejects_mismatched_prev_state_root() { + let service = mk_service_with_execute_changes(Vec::new()); + init_test_chain(&service).await; + + let err = service + .execute_txs(Request::new(ExecuteTxsRequest { + block_height: 2, + timestamp: None, + prev_state_root: vec![0x11; 32], + txs: vec![sample_legacy_tx_bytes()], + })) + .await + .expect_err("execute_txs should reject mismatched prev_state_root"); + + assert_eq!(err.code(), Code::InvalidArgument); + assert!(err.message().contains("prev_state_root mismatch")); + } + #[tokio::test] async fn execute_txs_prefers_block_callback_over_state_change_callback() { let execute_changes = vec![StateChange::Set { @@ -1083,8 +1184,9 @@ mod tests { }) .with_on_block_executed({ let calls = Arc::clone(&block_callback_calls); - Arc::new(move |_| { + Arc::new(move |info| { calls.fetch_add(1, Ordering::SeqCst); + Box::pin(async move { Ok(info.state_root) }) }) }); init_test_chain(&service).await; @@ -1119,7 +1221,7 @@ mod tests { key: b"payload".to_vec(), value: b"ok".to_vec(), }]; - let expected_root = compute_state_root(&execute_changes); + let expected_root = compute_state_root(B256::ZERO, &execute_changes); let callback_snapshot = Arc::new(Mutex::new(None::<(u64, u64, usize, usize, B256)>)); let service = mk_service_with_execute_changes(execute_changes).with_on_block_executed({ let snapshot = Arc::clone(&callback_snapshot); @@ -1133,6 +1235,8 @@ mod tests { info.block_result.tx_results.len(), info.state_root, )); + let projected_root = info.state_root; + Box::pin(async move { Ok(projected_root) }) }) }); init_test_chain(&service).await; @@ -1161,6 +1265,155 @@ mod tests { assert_eq!(snapshot.4, expected_root); } + #[tokio::test] + async fn 
execute_txs_returns_committed_root_from_block_callback() { + let execute_changes = vec![StateChange::Set { + key: b"payload".to_vec(), + value: b"ok".to_vec(), + }]; + let committed_roots = Arc::new(Mutex::new(VecDeque::from([ + B256::repeat_byte(0x11), + B256::repeat_byte(0x22), + ]))); + let service = mk_service_with_execute_changes(execute_changes).with_on_block_executed({ + let committed_roots = Arc::clone(&committed_roots); + Arc::new(move |_info| { + let committed_roots = Arc::clone(&committed_roots); + Box::pin(async move { + committed_roots + .lock() + .expect("committed roots lock should not be poisoned") + .pop_front() + .ok_or_else(|| { + EvnodeError::Internal( + "missing committed root for block callback test".to_string(), + ) + }) + }) + }) + }); + init_test_chain(&service).await; + + let first_response = service + .execute_txs(Request::new(ExecuteTxsRequest { + block_height: 2, + timestamp: None, + prev_state_root: vec![], + txs: vec![sample_legacy_tx_bytes()], + })) + .await + .expect("first execute_txs should succeed") + .into_inner(); + + assert_eq!( + first_response.updated_state_root, + B256::repeat_byte(0x11).to_vec() + ); + assert_eq!( + *service.state.last_state_root.read().await, + B256::repeat_byte(0x11) + ); + + let second_response = service + .execute_txs(Request::new(ExecuteTxsRequest { + block_height: 3, + timestamp: None, + prev_state_root: first_response.updated_state_root.clone(), + txs: vec![sample_legacy_tx_bytes()], + })) + .await + .expect("second execute_txs should accept the committed prev_state_root") + .into_inner(); + + assert_eq!( + second_response.updated_state_root, + B256::repeat_byte(0x22).to_vec() + ); + assert_eq!( + *service.state.last_state_root.read().await, + B256::repeat_byte(0x22) + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn execute_txs_serializes_prev_state_root_validation() { + let execute_changes = vec![StateChange::Set { + key: b"payload".to_vec(), + value: 
b"ok".to_vec(), + }]; + let first_callback_entered = Arc::new(Notify::new()); + let release_first_callback = Arc::new(Notify::new()); + let callback_calls = Arc::new(AtomicUsize::new(0)); + let committed_root = B256::repeat_byte(0x33); + + let service = Arc::new( + mk_service_with_execute_changes(execute_changes).with_on_block_executed({ + let first_callback_entered = Arc::clone(&first_callback_entered); + let release_first_callback = Arc::clone(&release_first_callback); + let callback_calls = Arc::clone(&callback_calls); + Arc::new(move |info| { + let first_callback_entered = Arc::clone(&first_callback_entered); + let release_first_callback = Arc::clone(&release_first_callback); + let callback_calls = Arc::clone(&callback_calls); + Box::pin(async move { + if callback_calls.fetch_add(1, Ordering::SeqCst) == 0 { + first_callback_entered.notify_one(); + release_first_callback.notified().await; + } + Ok::<_, EvnodeError>(if callback_calls.load(Ordering::SeqCst) == 1 { + committed_root + } else { + info.state_root + }) + }) + }) + }), + ); + init_test_chain(service.as_ref()).await; + + let first_service = Arc::clone(&service); + let first_request = tokio::spawn(async move { + first_service + .execute_txs(Request::new(ExecuteTxsRequest { + block_height: 2, + timestamp: None, + prev_state_root: vec![], + txs: vec![sample_legacy_tx_bytes()], + })) + .await + }); + + first_callback_entered.notified().await; + + let second_service = Arc::clone(&service); + let second_request = tokio::spawn(async move { + second_service + .execute_txs(Request::new(ExecuteTxsRequest { + block_height: 3, + timestamp: None, + prev_state_root: B256::ZERO.to_vec(), + txs: vec![sample_legacy_tx_bytes()], + })) + .await + }); + + release_first_callback.notify_one(); + + let first_response = first_request + .await + .expect("first task should complete") + .expect("first execute_txs should succeed") + .into_inner(); + assert_eq!(first_response.updated_state_root, committed_root.to_vec()); + + let err 
= second_request + .await + .expect("second task should complete") + .expect_err("second execute_txs should reject the stale prev_state_root"); + assert_eq!(err.code(), Code::InvalidArgument); + assert!(err.message().contains("prev_state_root mismatch")); + } + #[tokio::test] async fn get_txs_filter_limits_and_finalize_interact_consistently() { let tx_bytes = sample_legacy_tx_bytes(); @@ -1291,4 +1544,49 @@ mod tests { "only the unexecuted tail should be re-proposed" ); } + + #[tokio::test] + async fn execute_txs_chains_projected_state_root_from_previous_root() { + let execute_changes = vec![StateChange::Set { + key: b"next".to_vec(), + value: b"value".to_vec(), + }]; + let genesis_root = compute_state_root( + B256::ZERO, + &[StateChange::Set { + key: b"k".to_vec(), + value: b"v".to_vec(), + }], + ); + let expected_root = compute_state_root(genesis_root, &execute_changes); + + let service = ExecutorServiceImpl::new( + MockStf::new(true, execute_changes), + MockStorage, + MockCodes, + ExecutorServiceConfig::default(), + ); + service + .init_chain(Request::new(InitChainRequest { + genesis_time: None, + initial_height: 1, + chain_id: "chain-root".to_string(), + })) + .await + .expect("init should succeed"); + + let response = service + .execute_txs(Request::new(ExecuteTxsRequest { + block_height: 2, + timestamp: None, + prev_state_root: genesis_root.to_vec(), + txs: vec![sample_legacy_tx_bytes()], + })) + .await + .expect("execute_txs should succeed") + .into_inner(); + + assert_eq!(response.updated_state_root, expected_root.to_vec()); + assert_eq!(*service.state.last_state_root.read().await, expected_root); + } } diff --git a/crates/rpc/evnode/src/testapp_impl.rs b/crates/rpc/evnode/src/testapp_impl.rs index e42dddd..d3f63c6 100644 --- a/crates/rpc/evnode/src/testapp_impl.rs +++ b/crates/rpc/evnode/src/testapp_impl.rs @@ -45,7 +45,7 @@ where })?; // Compute a simple state root from changes - let state_root = compute_state_root(&changes); + let state_root = 
compute_state_root(B256::ZERO, &changes); tracing::info!( "Genesis completed: scheduler={:?}, alice={:?}, bob={:?}", diff --git a/crates/storage/src/block_store.rs b/crates/storage/src/block_store.rs index caa5bfa..9b29f99 100644 --- a/crates/storage/src/block_store.rs +++ b/crates/storage/src/block_store.rs @@ -32,7 +32,9 @@ use crate::types::{BlockHash, BlockStorageConfig}; use commonware_codec::RangeCfg; -use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage}; +use commonware_runtime::{ + buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Storage as RStorage, +}; use commonware_storage::{ archive::{ prunable::{Archive, Config as ArchiveConfig}, @@ -74,14 +76,14 @@ const KEY_JOURNAL_CACHE_PAGES: usize = 64; /// should be wrapped in a lock (`tokio::sync::Mutex`) when shared across tasks. pub struct BlockStorage where - C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, + C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { archive: Archive, } impl BlockStorage where - C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, + C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { /// Initialize block storage using the provided runtime context and config. /// @@ -108,12 +110,12 @@ where // Buffer pool for the key journal. let page_size = std::num::NonZeroU16::new(KEY_JOURNAL_PAGE_SIZE).unwrap(); let cache_pages = std::num::NonZeroUsize::new(KEY_JOURNAL_CACHE_PAGES).unwrap(); - let key_buffer_pool = PoolRef::new(page_size, cache_pages); + let key_page_cache = CacheRef::from_pooler(&context, page_size, cache_pages); let cfg = ArchiveConfig { translator: EightCap, key_partition: format!("{}-block-index", config.partition_prefix), - key_buffer_pool, + key_page_cache, value_partition: format!("{}-block-data", config.partition_prefix), // No compression by default. Blocks are often already compressed (gzip/zstd // at the application layer), so double-compression wastes CPU. 
diff --git a/crates/storage/src/qmdb_impl.rs b/crates/storage/src/qmdb_impl.rs index 4ca2201..48922eb 100644 --- a/crates/storage/src/qmdb_impl.rs +++ b/crates/storage/src/qmdb_impl.rs @@ -1,21 +1,12 @@ -//! QmdbStorage implementation using commonware's QMDB +//! QmdbStorage implementation using commonware's Current QMDB. //! //! QMDB (Quick Merkle Database) provides: //! - Historical proofs for any value ever associated with a key //! - Efficient Merkle root computation //! - Pruning support for storage management //! -//! ## State Machine -//! -//! QMDB uses a state machine pattern: -//! - Clean (Merkleized, Durable) - ready for proofs, has root hash -//! - Mutable (Unmerkleized, NonDurable) - can update/delete keys -//! -//! Transitions: -//! - init() → Clean -//! - into_mutable() → Mutable -//! - commit(metadata) → (Durable, Range) - returns tuple -//! - into_merkleized() → Clean +//! Mutations are expressed as speculative +//! batches that are merkleized before they are applied to the live database. // Instant is used for performance metrics, not consensus-affecting logic. 
#![allow(clippy::disallowed_types)] @@ -28,13 +19,14 @@ use crate::types::{ }; use async_trait::async_trait; use commonware_cryptography::sha256::Sha256; -use commonware_runtime::{utils::buffer::pool::PoolRef, Clock, Metrics, Storage as RStorage}; -use commonware_storage::qmdb::{ - current::{unordered::fixed::Db, FixedConfig}, - store::{Durable, NonDurable}, - Merkleized, Unmerkleized, +use commonware_runtime::{ + buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Storage as RStorage, }; use commonware_storage::translator::EightCap; +use commonware_storage::{ + qmdb::current::{unordered::fixed::Db, FixedConfig}, + Persistable, +}; use evolve_core::{ErrorCode, ReadonlyKV}; use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; use std::sync::Arc; @@ -43,27 +35,17 @@ use thiserror::Error; use tokio::runtime::RuntimeFlavor; use tokio::sync::RwLock; -/// Type alias for QMDB in Clean state (Merkleized, Durable) -/// N = 64 because SHA256 digest is 32 bytes, and N must be 2 * digest_size -type QmdbClean = - Db, Durable>; - -/// Type alias for QMDB in Mutable state (Unmerkleized, NonDurable) -type QmdbMutable = - Db; - -/// Type alias for QMDB in Durable/Unmerkleized state (after commit, before merkleize) -type QmdbDurable = - Db; - -/// Internal state enum to track QMDB state machine transitions -enum QmdbState { - /// Clean state - ready for proofs, durable - Clean(QmdbClean), - /// Mutable state - can perform updates/deletes - Mutable(QmdbMutable), - /// Transitional state for ownership management - Transitioning, +/// Type alias for QMDB in Current state. +/// `N = 64` because SHA256 digests are 32 bytes and QMDB expects `2 * digest_size`. 
+type QmdbCurrent = Db; + +#[derive(Debug)] +struct PreparedBatch { + updates: Vec<(StorageKey, Option)>, + keys_to_invalidate: Vec>, + ops_count: usize, + sets: usize, + deletes: usize, } /// Error types for QmdbStorage @@ -94,38 +76,44 @@ impl From for StorageError { } } -/// Maps QMDB errors to evolve error codes -fn map_qmdb_error(err: impl std::fmt::Display) -> ErrorCode { - tracing::error!("QMDB error: {err}"); - crate::types::ERR_ADB_ERROR +fn map_qmdb_error(err: impl std::fmt::Display) -> StorageError { + StorageError::Qmdb(err.to_string()) +} + +fn map_storage_error(err: StorageError) -> ErrorCode { + match err { + StorageError::Key(code) => code, + StorageError::ValueTooLarge { .. } => crate::types::ERR_VALUE_TOO_LARGE, + StorageError::InvalidState(_) => crate::types::ERR_CONCURRENCY_ERROR, + StorageError::Qmdb(err) => { + tracing::error!("QMDB error: {err}"); + crate::types::ERR_ADB_ERROR + } + StorageError::Io(err) => { + tracing::error!("Storage IO error: {err}"); + crate::types::ERR_STORAGE_IO + } + StorageError::InvalidConfig(err) => { + tracing::error!("Invalid storage config: {err}"); + crate::types::ERR_STORAGE_IO + } + } } -/// Maps concurrency errors to evolve error codes -fn map_concurrency_error(err: impl std::fmt::Display) -> ErrorCode { - tracing::error!("Concurrency error: {err}"); - crate::types::ERR_CONCURRENCY_ERROR +fn root_to_commit_hash(root: impl AsRef<[u8]>) -> Result { + let bytes: [u8; 32] = root + .as_ref() + .try_into() + .map_err(|_| StorageError::Qmdb("invalid root hash size".to_string()))?; + Ok(crate::types::CommitHash::new(bytes)) } -/// QmdbStorage implements evolve's storage traits using commonware's QMDB -/// -/// Provides: -/// - Synchronous read via `ReadonlyKV::get()` (uses block_on internally) -/// - Async batch operations via `Storage::batch()` -/// - Async commit with Merkle root via `Storage::commit()` -/// - In-memory read cache (ShardedDbCache) for reduced lock contention -/// -/// ## State Machine Management 
-/// -/// The wrapper automatically manages QMDB state transitions: -/// - `batch()` ensures the DB is in Mutable state before applying operations -/// - `commit()` transitions through: Mutable → Durable → Clean (with Merkle root) +/// QmdbStorage implements evolve's storage traits using commonware's QMDB. pub struct QmdbStorage where - C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, + C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { - #[allow(dead_code)] - context: Arc, - state: Arc>>, + db: Arc>>, /// Read cache for fast synchronous lookups cache: Arc, /// Optional metrics for monitoring storage performance @@ -134,12 +122,11 @@ where impl Clone for QmdbStorage where - C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, + C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { fn clone(&self) -> Self { Self { - context: self.context.clone(), - state: self.state.clone(), + db: self.db.clone(), cache: self.cache.clone(), metrics: self.metrics.clone(), } @@ -148,7 +135,7 @@ where impl QmdbStorage where - C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, + C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { /// Create a new QmdbStorage instance pub async fn new( @@ -193,20 +180,21 @@ where mmr_items_per_blob: NonZeroU64::new(1000).unwrap(), mmr_write_buffer: write_buffer_size, mmr_metadata_partition: format!("{}_mmr-metadata", config.partition_prefix), - bitmap_metadata_partition: format!("{}_bitmap-metadata", config.partition_prefix), + grafted_mmr_metadata_partition: format!( + "{}_grafted-mmr-metadata", + config.partition_prefix + ), translator: EightCap, thread_pool: None, - buffer_pool: PoolRef::new(page_size, capacity), + page_cache: CacheRef::from_pooler(&context, page_size, capacity), }; - // Initialize QMDB - starts in Clean state (Merkleized, Durable) - let db: QmdbClean = Db::init(context.clone(), qmdb_config) + let db = 
Db::init(context, qmdb_config) .await - .map_err(|e| StorageError::Qmdb(e.to_string()))?; + .map_err(map_qmdb_error)?; Ok(Self { - context: Arc::new(context), - state: Arc::new(RwLock::new(QmdbState::Clean(db))), + db: Arc::new(RwLock::new(db)), cache: Arc::new(ShardedDbCache::with_defaults()), metrics, }) @@ -234,92 +222,88 @@ where self.get_async_uncached(key).await } - /// Commit the current state and generate a commit hash - /// - /// State transition: Mutable → Durable → Clean (Merkleized) - pub async fn commit_state(&self) -> Result { - let start = Instant::now(); - let mut state_guard = self.state.write().await; + /// Async batch get that resolves multiple keys while taking the QMDB read lock once. + pub async fn get_many_async( + &self, + keys: &[Vec], + ) -> Result>>, ErrorCode> { + if keys.len() > crate::types::MAX_BATCH_SIZE { + return Err(crate::types::ERR_BATCH_TOO_LARGE); + } - // Take ownership to perform state transitions - let current_state = std::mem::replace(&mut *state_guard, QmdbState::Transitioning); + if keys.is_empty() { + return Ok(Vec::new()); + } - let clean_db = match current_state { - QmdbState::Clean(db) => { - // Already clean, just return current root - db - } - QmdbState::Mutable(db) => { - // Mutable → commit() → (Durable, Range) - let (durable_db, _range): (QmdbDurable, _) = db - .commit(None) - .await - .map_err(|e| StorageError::Qmdb(e.to_string()))?; - - // Durable → into_merkleized() → Clean - let clean: QmdbClean = durable_db - .into_merkleized() - .await - .map_err(|e| StorageError::Qmdb(e.to_string()))?; - - clean - } - QmdbState::Transitioning => { - return Err(StorageError::InvalidState( - "Storage in transitioning state".to_string(), - )); + let mut results = vec![None; keys.len()]; + let mut misses = Vec::with_capacity(keys.len()); + + for (idx, key) in keys.iter().enumerate() { + if let Some(cached) = self.cache.get(key) { + self.metrics.record_cache_hit(); + match cached { + CachedValue::Present(data) => results[idx] 
= Some(data), + CachedValue::Absent => {} + } + continue; } - }; - // Get the root hash - let root = clean_db.root(); - let hash = match root.as_ref().try_into() { - Ok(bytes) => crate::types::CommitHash::new(bytes), - Err(_) => { - *state_guard = QmdbState::Clean(clean_db); - return Err(StorageError::Qmdb("Invalid root hash size".to_string())); + self.metrics.record_cache_miss(); + misses.push((idx, key.clone(), create_storage_key(key)?)); + } + + if misses.is_empty() { + return Ok(results); + } + + let start = Instant::now(); + let miss_values = self.get_many_async_uncached(&misses).await?; + self.metrics + .record_read_latency(start.elapsed().as_secs_f64()); + + for (miss, value) in misses.into_iter().zip(miss_values) { + let (idx, key, _) = miss; + match &value { + Some(data) => self.cache.insert_present(key, data.clone()), + None => self.cache.insert_absent(key), } - }; + results[idx] = value; + } - // Store clean state back - *state_guard = QmdbState::Clean(clean_db); + Ok(results) + } - // Record commit latency - self.metrics.record_commit(start.elapsed().as_secs_f64()); + /// Synchronous wrapper for batched reads. 
+ pub fn get_many(&self, keys: &[Vec]) -> Result>>, ErrorCode> { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + return match handle.runtime_flavor() { + RuntimeFlavor::MultiThread => { + tokio::task::block_in_place(|| handle.block_on(self.get_many_async(keys))) + } + RuntimeFlavor::CurrentThread => Err(crate::types::ERR_RUNTIME_ERROR), + _ => tokio::task::block_in_place(|| handle.block_on(self.get_many_async(keys))), + }; + } - Ok(hash) + futures::executor::block_on(self.get_many_async(keys)) } - /// Apply a batch of operations - pub async fn apply_batch( - &self, + fn prepare_batch( operations: Vec, - ) -> Result<(), StorageError> { - // Validate batch size against MAX_BATCH_SIZE + ) -> Result { if operations.len() > crate::types::MAX_BATCH_SIZE { return Err(StorageError::Key(crate::types::ERR_BATCH_TOO_LARGE)); } - if operations.is_empty() { - return Ok(()); - } - - let start = Instant::now(); let ops_count = operations.len(); let mut sets = 0usize; let mut deletes = 0usize; - - // Collect keys for cache invalidation - let mut keys_to_invalidate = Vec::with_capacity(operations.len()); - - // Pre-compute all storage keys and values before acquiring the lock - let mut prepared_updates: Vec<(StorageKey, Option)> = - Vec::with_capacity(operations.len()); + let mut keys_to_invalidate = Vec::with_capacity(ops_count); + let mut updates = Vec::with_capacity(ops_count); for op in operations { match op { crate::types::Operation::Set { key, value } => { - // Validate value size before processing (must fit with length prefix) if value.len() > MAX_VALUE_DATA_SIZE { return Err(StorageError::ValueTooLarge { size: value.len(), @@ -331,63 +315,92 @@ where let storage_key = create_storage_key(&key)?; let storage_value = create_storage_value_chunk(&value)?; keys_to_invalidate.push(key); - prepared_updates.push((storage_key, Some(storage_value))); + updates.push((storage_key, Some(storage_value))); } crate::types::Operation::Remove { key } => { deletes += 1; let 
storage_key = create_storage_key(&key)?; keys_to_invalidate.push(key); - prepared_updates.push((storage_key, None)); + updates.push((storage_key, None)); } } } - let mut state_guard = self.state.write().await; + Ok(PreparedBatch { + updates, + keys_to_invalidate, + ops_count, + sets, + deletes, + }) + } + + /// Commit the current state and generate a commit hash. + pub async fn commit_state(&self) -> Result { + let start = Instant::now(); + let db = self.db.read().await; + db.commit().await.map_err(map_qmdb_error)?; + let hash = root_to_commit_hash(db.root())?; + self.metrics.record_commit(start.elapsed().as_secs_f64()); + Ok(hash) + } - // Take ownership to perform state transition if needed - let current_state = std::mem::replace(&mut *state_guard, QmdbState::Transitioning); + /// Preview the state root produced by applying a batch without mutating the database. + pub async fn preview_batch_root( + &self, + operations: &[crate::types::Operation], + ) -> Result { + let prepared = Self::prepare_batch(operations.to_vec())?; + let db = self.db.read().await; - let mut mutable_db: QmdbMutable = match current_state { - QmdbState::Clean(db) => { - // Clean → into_mutable() → Mutable (sync method) - db.into_mutable() - } - QmdbState::Mutable(db) => db, - QmdbState::Transitioning => { - return Err(StorageError::InvalidState( - "Storage in transitioning state".to_string(), - )); - } - }; + if prepared.updates.is_empty() { + return root_to_commit_hash(db.root()); + } - // Apply all updates - for (storage_key, maybe_value) in prepared_updates { - match maybe_value { - Some(storage_value) => { - if let Err(e) = mutable_db.update(storage_key, storage_value).await { - *state_guard = QmdbState::Mutable(mutable_db); - return Err(StorageError::Qmdb(e.to_string())); - } - } - None => { - // Delete the key - if let Err(e) = mutable_db.delete(storage_key).await { - *state_guard = QmdbState::Mutable(mutable_db); - return Err(StorageError::Qmdb(e.to_string())); - } - } - } + let mut 
batch = db.new_batch(); + for (storage_key, maybe_value) in prepared.updates { + batch.write(storage_key, maybe_value); } + let merkleized = batch.merkleize(None).await.map_err(map_qmdb_error)?; + root_to_commit_hash(merkleized.root()) + } - // Store mutable state back - *state_guard = QmdbState::Mutable(mutable_db); + /// Apply a batch of operations. + pub async fn apply_batch( + &self, + operations: Vec, + ) -> Result<(), StorageError> { + let prepared = Self::prepare_batch(operations)?; + if prepared.updates.is_empty() { + return Ok(()); + } + + let start = Instant::now(); + let PreparedBatch { + updates, + keys_to_invalidate, + ops_count, + sets, + deletes, + } = prepared; + + { + let mut db = self.db.write().await; + let changeset = { + let mut batch = db.new_batch(); + for (storage_key, maybe_value) in updates { + batch.write(storage_key, maybe_value); + } + let merkleized = batch.merkleize(None).await.map_err(map_qmdb_error)?; + merkleized.finalize() + }; + db.apply_batch(changeset).await.map_err(map_qmdb_error)?; + } - // Invalidate cache entries for modified keys for key in keys_to_invalidate { self.cache.invalidate(&key); } - // Record batch metrics self.metrics .record_batch(start.elapsed().as_secs_f64(), ops_count, sets, deletes); @@ -397,50 +410,44 @@ where /// Internal async get implementation - bypasses cache, hits QMDB directly. 
async fn get_async_uncached(&self, key: &[u8]) -> Result>, ErrorCode> { let storage_key = create_storage_key(key)?; - let state = self.state.clone(); - - let result: Result, _> = { - let state_guard = state.read().await; - let state_name = match &*state_guard { - QmdbState::Clean(_) => "Clean", - QmdbState::Mutable(_) => "Mutable", - QmdbState::Transitioning => "Transitioning", - }; - tracing::debug!( - "get_async_uncached: state={}, key_len={}", - state_name, - key.len() - ); + let db = self.db.read().await; + Self::decode_storage_value(db.get(&storage_key).await) + } - match &*state_guard { - QmdbState::Clean(db) => db.get(&storage_key).await, - QmdbState::Mutable(db) => db.get(&storage_key).await, - QmdbState::Transitioning => { - return Err(crate::types::ERR_CONCURRENCY_ERROR); - } - } - }; + async fn get_many_async_uncached( + &self, + misses: &[(usize, Vec, StorageKey)], + ) -> Result>>, ErrorCode> { + let db = self.db.read().await; + let mut values = Vec::with_capacity(misses.len()); + for (_, _, storage_key) in misses { + values.push(Self::decode_storage_value(db.get(storage_key).await)?); + } + + Ok(values) + } + + fn decode_storage_value( + result: Result, impl std::fmt::Display>, + ) -> Result>, ErrorCode> { match result { - Ok(Some(value_chunk)) => { - // Extract value using length prefix - match extract_value_from_chunk(&value_chunk) { - Some(data) if data.is_empty() => Ok(None), // Empty value treated as absent - Some(data) => Ok(Some(data)), - None => { - // Invalid length prefix - treat as corrupted/absent - tracing::warn!("Invalid value chunk format, treating as absent"); - Ok(None) - } + Ok(Some(value_chunk)) => match extract_value_from_chunk(&value_chunk) { + Some(data) if data.is_empty() => Ok(None), + Some(data) => Ok(Some(data)), + None => { + tracing::warn!("Invalid value chunk format, treating as absent"); + Ok(None) } - } + }, Ok(None) => Ok(None), Err(e) => { let err_str = e.to_string(); if err_str.contains("not found") { Ok(None) } else { 
- Err(map_qmdb_error(err_str)) + tracing::error!("QMDB read error: {err_str}"); + Err(crate::types::ERR_ADB_ERROR) } } } @@ -450,7 +457,7 @@ where // Implement ReadonlyKV for QmdbStorage impl ReadonlyKV for QmdbStorage where - C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, + C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { fn get(&self, key: &[u8]) -> Result>, ErrorCode> { // Fast path: check cache first (synchronous, no async overhead) @@ -498,18 +505,16 @@ where #[async_trait(?Send)] impl crate::Storage for QmdbStorage where - C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static, + C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static, { async fn commit(&self) -> Result { - self.commit_state() - .await - .map_err(|_| crate::types::ERR_STORAGE_IO) + self.commit_state().await.map_err(map_storage_error) } async fn batch(&self, operations: Vec) -> Result<(), ErrorCode> { self.apply_batch(operations) .await - .map_err(map_concurrency_error) + .map_err(map_storage_error) } } @@ -1319,4 +1324,133 @@ mod tests { assert_eq!(retrieved, value, "Value must be identical after commit"); }) } + + #[test] + fn test_preview_batch_root_matches_eventual_commit_hash() { + let temp_dir = TempDir::new().unwrap(); + let config = crate::types::StorageConfig { + path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + + runner.start(|context| async move { + let storage = QmdbStorage::new(context, config).await.unwrap(); + storage + .apply_batch(vec![crate::types::Operation::Set { + key: b"base".to_vec(), + value: b"value1".to_vec(), + }]) + .await + .unwrap(); + let committed_base_hash = storage.commit_state().await.unwrap(); + + let preview_operations = vec![ + crate::types::Operation::Remove { + key: b"base".to_vec(), + }, + 
crate::types::Operation::Set { + key: b"next".to_vec(), + value: b"value2".to_vec(), + }, + ]; + let preview_hash = storage + .preview_batch_root(&preview_operations) + .await + .unwrap(); + + assert_ne!(preview_hash, committed_base_hash); + assert_eq!(storage.get(b"base").unwrap(), Some(b"value1".to_vec())); + assert_eq!(storage.get(b"next").unwrap(), None); + + storage.apply_batch(preview_operations).await.unwrap(); + let committed_hash = storage.commit_state().await.unwrap(); + + assert_eq!(preview_hash, committed_hash); + assert_eq!(storage.get(b"base").unwrap(), None); + assert_eq!(storage.get(b"next").unwrap(), Some(b"value2".to_vec())); + }) + } + + #[test] + fn test_get_many_returns_results_in_order() { + let temp_dir = TempDir::new().unwrap(); + let config = crate::types::StorageConfig { + path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + + runner.start(|context| async move { + let storage = QmdbStorage::new(context, config).await.unwrap(); + storage + .apply_batch(vec![ + crate::types::Operation::Set { + key: b"key1".to_vec(), + value: b"value1".to_vec(), + }, + crate::types::Operation::Set { + key: b"key2".to_vec(), + value: b"value2".to_vec(), + }, + ]) + .await + .unwrap(); + + let values = storage + .get_many(&[b"missing".to_vec(), b"key2".to_vec(), b"key1".to_vec()]) + .unwrap(); + + assert_eq!( + values, + vec![None, Some(b"value2".to_vec()), Some(b"value1".to_vec())] + ); + }) + } + + #[test] + fn test_get_many_uses_cache_for_present_and_absent_values() { + let temp_dir = TempDir::new().unwrap(); + let config = crate::types::StorageConfig { + path: temp_dir.path().to_path_buf(), + ..Default::default() + }; + + let runtime_config = TokioConfig::default() + .with_storage_directory(temp_dir.path()) + .with_worker_threads(2); + + let runner = Runner::new(runtime_config); + + 
runner.start(|context| async move { + let storage = QmdbStorage::new(context, config).await.unwrap(); + storage + .apply_batch(vec![crate::types::Operation::Set { + key: b"key".to_vec(), + value: b"value".to_vec(), + }]) + .await + .unwrap(); + + let first = storage + .get_many(&[b"key".to_vec(), b"missing".to_vec()]) + .unwrap(); + let second = storage + .get_many(&[b"key".to_vec(), b"missing".to_vec()]) + .unwrap(); + + assert_eq!(first, second); + assert_eq!(first, vec![Some(b"value".to_vec()), None]); + }) + } }