Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ evolve_testapp = { path = "bin/testapp" }

# outside deps
linkme = { version = "0.3", default-features = false }
commonware-cryptography = "0.0.65"
commonware-runtime = "0.0.65"
commonware-storage = "0.0.65"
commonware-utils = "0.0.65"
commonware-codec = "0.0.65"
commonware-cryptography = "2026.3.0"
commonware-runtime = "2026.3.0"
commonware-storage = "2026.3.0"
commonware-utils = "2026.3.0"
commonware-codec = "2026.3.0"
borsh = { features = ["derive"], version = "1.5.5" }
clap = { version = "4.5.31", features = ["derive"] }
fixed = { version = "1.29", features = ["borsh", "serde"] }
Expand Down
16 changes: 12 additions & 4 deletions crates/rpc/evnode/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,28 @@ impl ExternalConsensusCommitSink {
.expect("failed to build commit sink runtime");

while let Ok(info) = receiver.recv() {
let state_root = info.state_root;
let operations = state_changes_to_operations(info.state_changes);
runtime.block_on(async {
let commit_hash = runtime.block_on(async {
storage
.batch(operations)
.await
.expect("storage batch failed");
storage.commit().await.expect("storage commit failed")
});
let committed_state_root = B256::from_slice(commit_hash.as_bytes());
if committed_state_root != info.state_root {
tracing::warn!(
height = info.height,
preview_state_root = %info.state_root,
committed_state_root = %committed_state_root,
"execution state root differed from committed storage root"
);
}
let block_hash = compute_block_hash(info.height, info.timestamp, parent_hash);
let metadata = BlockMetadata::new(
block_hash,
parent_hash,
state_root,
committed_state_root,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep indexed state_root aligned with executor responses

This now stores committed_state_root in block metadata, while execute_txs still returns and validates against the projected root (compute_state_root(...) in service.rs), so clients that take the previous block root from indexed/RPC block data and send it as prev_state_root can be rejected with a mismatch. The change creates two competing root definitions for the same block across APIs, which is likely to break external-consensus integrations that round-trip state roots through block queries.

Useful? React with 👍 / 👎.

info.timestamp,
config.max_gas,
Address::ZERO,
Expand All @@ -134,7 +142,7 @@ impl ExternalConsensusCommitSink {
"Indexed block {} (hash={}, state_root={})",
info.height,
block_hash,
state_root
committed_state_root
);
}
}
Expand Down
142 changes: 129 additions & 13 deletions crates/rpc/evnode/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@ use crate::proto::evnode::v1::{
SetFinalResponse,
};

/// Compute a state root from changes.
/// Compute a projected state root from a base root plus pending changes.
///
/// This is a simple hash of all changes for MVP.
/// In production, this would be a proper Merkle root.
pub fn compute_state_root(changes: &[StateChange]) -> B256 {
/// This is not the storage engine's Merkle root. It is a deterministic hash used
/// to track pending execution state between `execute_txs` calls until the storage
/// backend commits the changes.
pub fn compute_state_root(base_state_root: B256, changes: &[StateChange]) -> B256 {
use alloy_primitives::keccak256;

if changes.is_empty() {
return B256::ZERO;
return base_state_root;
}

let mut data = Vec::new();
let mut data = Vec::with_capacity(B256::len_bytes());
data.extend_from_slice(base_state_root.as_slice());
for change in changes {
match change {
StateChange::Set { key, value } => {
Expand Down Expand Up @@ -169,7 +171,7 @@ pub struct BlockExecutedInfo {
pub state_changes: Vec<StateChange>,
/// Full block execution result.
pub block_result: BlockResult,
/// State root computed from changes (keccak-based).
/// Projected state root computed from prior state plus pending changes.
pub state_root: B256,
/// Transactions that were included in the block.
pub transactions: Vec<TxContext>,
Expand Down Expand Up @@ -313,6 +315,32 @@ where
// Store pending changes
self.state.pending_changes.write().await.extend(changes);
}

async fn resolve_prev_state_root(&self, prev_state_root: &[u8]) -> Result<B256, EvnodeError> {
let last_state_root = *self.state.last_state_root.read().await;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Serialize prev-root validation in execute_txs

The new prev_state_root guard reads last_state_root early, but the corresponding write of the new root happens much later, so two concurrent execute_txs calls can both validate against the same old root and both be accepted. In a real gRPC server this race allows parallel proposals to execute from one base root, and the later write wins, breaking the linear state-root chain the new validation is meant to enforce. This should be guarded with a per-request critical section or an atomic compare-and-swap style update around validation+write.

Useful? React with 👍 / 👎.


if prev_state_root.is_empty() {
return Ok(last_state_root);
}

if prev_state_root.len() != B256::len_bytes() {
return Err(EvnodeError::InvalidArgument(format!(
"prev_state_root must be {} bytes, got {}",
B256::len_bytes(),
prev_state_root.len()
)));
}

let provided_prev_state_root = B256::from_slice(prev_state_root);
if provided_prev_state_root != last_state_root {
return Err(EvnodeError::InvalidArgument(format!(
"prev_state_root mismatch: expected {}, got {}",
last_state_root, provided_prev_state_root
)));
}

Ok(provided_prev_state_root)
}
}

#[async_trait]
Expand Down Expand Up @@ -349,7 +377,7 @@ where
.map_err(|e| Status::internal(format!("genesis failed: {}", e)))?;

// Compute state root
let state_root = compute_state_root(&changes);
let state_root = compute_state_root(B256::ZERO, &changes);

// Handle state changes (store pending, call callback)
self.handle_state_changes(changes).await;
Expand Down Expand Up @@ -458,8 +486,13 @@ where
.into_changes()
.map_err(|e| Status::internal(format!("failed to get state changes: {:?}", e)))?;

// Compute updated state root
let updated_state_root = compute_state_root(&changes);
let prev_state_root = self
.resolve_prev_state_root(&req.prev_state_root)
.await
.map_err(Status::from)?;

// Compute updated state root over the previous pending root.
let updated_state_root = compute_state_root(prev_state_root, &changes);

// Log execution results (before moving ownership)
let executed_tx_count = result.tx_results.len();
Expand Down Expand Up @@ -864,7 +897,7 @@ mod tests {
key: b"k".to_vec(),
value: b"v".to_vec(),
}];
let expected_root = compute_state_root(&changes);
let expected_root = compute_state_root(B256::ZERO, &changes);
let service = mk_service(true);
let req = InitChainRequest {
genesis_time: Some(Timestamp {
Expand Down Expand Up @@ -1015,7 +1048,7 @@ mod tests {
key: b"gone".to_vec(),
},
];
let expected_root = compute_state_root(&expected_changes);
let expected_root = compute_state_root(B256::ZERO, &expected_changes);
let service = mk_service_with_execute_changes(expected_changes);
init_test_chain(&service).await;

Expand Down Expand Up @@ -1065,6 +1098,44 @@ mod tests {
assert_eq!(observed.tx_hashes, vec![valid_tx_hash]);
}

#[tokio::test]
async fn execute_txs_rejects_invalid_prev_state_root_length() {
    let service = mk_service_with_execute_changes(Vec::new());
    init_test_chain(&service).await;

    // A 31-byte value is one byte short of the required 32-byte B256 width.
    let request = ExecuteTxsRequest {
        block_height: 2,
        timestamp: None,
        prev_state_root: vec![0xaa; 31],
        txs: vec![sample_legacy_tx_bytes()],
    };

    let err = service
        .execute_txs(Request::new(request))
        .await
        .expect_err("execute_txs should reject malformed prev_state_root");

    assert_eq!(err.code(), Code::InvalidArgument);
    assert!(err.message().contains("prev_state_root must be 32 bytes"));
}

#[tokio::test]
async fn execute_txs_rejects_mismatched_prev_state_root() {
    let service = mk_service_with_execute_changes(Vec::new());
    init_test_chain(&service).await;

    // Correct length (32 bytes) but a value that cannot match the stored root.
    let bogus_root = vec![0x11; 32];

    let err = service
        .execute_txs(Request::new(ExecuteTxsRequest {
            block_height: 2,
            timestamp: None,
            prev_state_root: bogus_root,
            txs: vec![sample_legacy_tx_bytes()],
        }))
        .await
        .expect_err("execute_txs should reject mismatched prev_state_root");

    assert_eq!(err.code(), Code::InvalidArgument);
    assert!(err.message().contains("prev_state_root mismatch"));
}

#[tokio::test]
async fn execute_txs_prefers_block_callback_over_state_change_callback() {
let execute_changes = vec![StateChange::Set {
Expand Down Expand Up @@ -1119,7 +1190,7 @@ mod tests {
key: b"payload".to_vec(),
value: b"ok".to_vec(),
}];
let expected_root = compute_state_root(&execute_changes);
let expected_root = compute_state_root(B256::ZERO, &execute_changes);
let callback_snapshot = Arc::new(Mutex::new(None::<(u64, u64, usize, usize, B256)>));
let service = mk_service_with_execute_changes(execute_changes).with_on_block_executed({
let snapshot = Arc::clone(&callback_snapshot);
Expand Down Expand Up @@ -1291,4 +1362,49 @@ mod tests {
"only the unexecuted tail should be re-proposed"
);
}

#[tokio::test]
async fn execute_txs_chains_projected_state_root_from_previous_root() {
    // Changes the mock STF will report for the executed block.
    let execute_changes = vec![StateChange::Set {
        key: b"next".to_vec(),
        value: b"value".to_vec(),
    }];

    // Root produced by init_chain: the genesis change applied over the zero root.
    let genesis_changes = [StateChange::Set {
        key: b"k".to_vec(),
        value: b"v".to_vec(),
    }];
    let genesis_root = compute_state_root(B256::ZERO, &genesis_changes);

    // The block root must chain off the genesis root, not restart from zero.
    let expected_root = compute_state_root(genesis_root, &execute_changes);

    let service = ExecutorServiceImpl::new(
        MockStf::new(true, execute_changes),
        MockStorage,
        MockCodes,
        ExecutorServiceConfig::default(),
    );

    service
        .init_chain(Request::new(InitChainRequest {
            genesis_time: None,
            initial_height: 1,
            chain_id: "chain-root".to_string(),
        }))
        .await
        .expect("init should succeed");

    let response = service
        .execute_txs(Request::new(ExecuteTxsRequest {
            block_height: 2,
            timestamp: None,
            prev_state_root: genesis_root.to_vec(),
            txs: vec![sample_legacy_tx_bytes()],
        }))
        .await
        .expect("execute_txs should succeed")
        .into_inner();

    assert_eq!(response.updated_state_root, expected_root.to_vec());
    assert_eq!(*service.state.last_state_root.read().await, expected_root);
}
}
2 changes: 1 addition & 1 deletion crates/rpc/evnode/src/testapp_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ where
})?;

// Compute a simple state root from changes
let state_root = compute_state_root(&changes);
let state_root = compute_state_root(B256::ZERO, &changes);

tracing::info!(
"Genesis completed: scheduler={:?}, alice={:?}, bob={:?}",
Expand Down
12 changes: 7 additions & 5 deletions crates/storage/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@

use crate::types::{BlockHash, BlockStorageConfig};
use commonware_codec::RangeCfg;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
use commonware_runtime::{
buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Storage as RStorage,
};
use commonware_storage::{
archive::{
prunable::{Archive, Config as ArchiveConfig},
Expand Down Expand Up @@ -74,14 +76,14 @@ const KEY_JOURNAL_CACHE_PAGES: usize = 64;
/// should be wrapped in a lock (`tokio::sync::Mutex`) when shared across tasks.
pub struct BlockStorage<C>
where
C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static,
C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static,
{
archive: Archive<EightCap, C, BlockHash, bytes::Bytes>,
}

impl<C> BlockStorage<C>
where
C: RStorage + Clock + Metrics + Clone + Send + Sync + 'static,
C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static,
{
/// Initialize block storage using the provided runtime context and config.
///
Expand All @@ -108,12 +110,12 @@ where
// Buffer pool for the key journal.
let page_size = std::num::NonZeroU16::new(KEY_JOURNAL_PAGE_SIZE).unwrap();
let cache_pages = std::num::NonZeroUsize::new(KEY_JOURNAL_CACHE_PAGES).unwrap();
let key_buffer_pool = PoolRef::new(page_size, cache_pages);
let key_page_cache = CacheRef::from_pooler(&context, page_size, cache_pages);

let cfg = ArchiveConfig {
translator: EightCap,
key_partition: format!("{}-block-index", config.partition_prefix),
key_buffer_pool,
key_page_cache,
value_partition: format!("{}-block-data", config.partition_prefix),
// No compression by default. Blocks are often already compressed (gzip/zstd
// at the application layer), so double-compression wastes CPU.
Expand Down
Loading
Loading