Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion crates/app/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ type RuntimeContext = TokioContext;
/// subsystem — all produced blocks must be persisted.
async fn build_block_archive(context: TokioContext) -> OnBlockArchive {
let config = BlockStorageConfig::default();
let retention = config.retention_blocks;
let prune_interval = config.blocks_per_section;
let store = BlockStorage::new(context, config)
.await
.expect("failed to initialize block archive storage");
Expand All @@ -236,10 +238,18 @@ async fn build_block_archive(context: TokioContext) -> OnBlockArchive {
if let Err(e) = store.put_sync(block_number, block_hash, block_bytes).await {
tracing::warn!("Failed to archive block {}: {:?}", block_number, e);
}

// Prune old blocks at section boundaries to bound disk usage.
if retention > 0 && block_number > retention && block_number % prune_interval == 0 {
let min_block = block_number.saturating_sub(retention);
if let Err(e) = store.prune(min_block).await {
tracing::warn!(min_block, "Failed to prune block archive: {:?}", e);
}
}
Comment thread
tac0turtle marked this conversation as resolved.
}
});

tracing::info!("Block archive storage enabled");
tracing::info!(retention, "Block archive storage enabled");

Arc::new(move |block_number, block_hash, block_bytes| {
let hash_bytes = ArchiveBlockHash::new(block_hash.0);
Expand Down
53 changes: 32 additions & 21 deletions crates/rpc/chain-index/src/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,10 @@ impl PersistentChainIndex {
miner BLOB NOT NULL,
extra_data BLOB NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_blocks_hash ON blocks(hash);

CREATE TABLE IF NOT EXISTS transactions (
hash BLOB PRIMARY KEY,
block_number INTEGER NOT NULL,
block_hash BLOB NOT NULL,
transaction_index INTEGER NOT NULL,
from_addr BLOB NOT NULL,
to_addr BLOB,
Expand All @@ -200,7 +198,6 @@ impl PersistentChainIndex {
CREATE TABLE IF NOT EXISTS receipts (
transaction_hash BLOB PRIMARY KEY,
transaction_index INTEGER NOT NULL,
block_hash BLOB NOT NULL,
block_number INTEGER NOT NULL,
from_addr BLOB NOT NULL,
to_addr BLOB,
Expand All @@ -209,6 +206,7 @@ impl PersistentChainIndex {
contract_address BLOB,
status INTEGER NOT NULL,
tx_type INTEGER NOT NULL,
revert_reason TEXT,
Comment thread
tac0turtle marked this conversation as resolved.
FOREIGN KEY (block_number) REFERENCES blocks(number)
);
CREATE INDEX IF NOT EXISTS idx_receipts_block ON receipts(block_number);
Expand Down Expand Up @@ -280,6 +278,10 @@ impl PersistentChainIndex {
})
}

/// Parse a transaction row. Expected column order:
/// t.hash, t.block_number, b.hash (block_hash from JOIN), t.transaction_index,
/// t.from_addr, t.to_addr, t.value, t.gas, t.gas_price, t.input, t.nonce,
/// t.v, t.r, t.s, t.tx_type, t.chain_id, t.max_fee_per_gas, t.max_priority_fee_per_gas
fn row_to_stored_transaction(row: &rusqlite::Row<'_>) -> rusqlite::Result<StoredTransaction> {
use alloy_primitives::{Bytes, U256};

Expand Down Expand Up @@ -329,6 +331,10 @@ impl PersistentChainIndex {
})
}

/// Parse a receipt row. Expected column order:
/// r.transaction_hash, r.transaction_index, b.hash (block_hash from JOIN),
/// r.block_number, r.from_addr, r.to_addr, r.cumulative_gas_used, r.gas_used,
/// r.contract_address, r.status, r.tx_type, t.gas_price, r.revert_reason
fn row_to_stored_receipt(row: &rusqlite::Row<'_>) -> rusqlite::Result<StoredReceipt> {
let transaction_hash_bytes: Vec<u8> = row.get(0)?;
let transaction_index: i64 = row.get(1)?;
Expand All @@ -342,6 +348,7 @@ impl PersistentChainIndex {
let status: i64 = row.get(9)?;
let tx_type: i64 = row.get(10)?;
let effective_gas_price_bytes: Vec<u8> = row.get(11)?;
let revert_reason: Option<String> = row.get(12)?;

let to = to_bytes
.as_deref()
Expand All @@ -366,6 +373,7 @@ impl PersistentChainIndex {
logs: vec![], // logs are stored separately
status: status as u8,
tx_type: tx_type as u8,
revert_reason,
})
}
}
Expand Down Expand Up @@ -457,10 +465,12 @@ impl ChainIndex for PersistentChainIndex {

let conn = self.read_conn()?;
let result = conn.query_row(
"SELECT hash, block_number, block_hash, transaction_index, from_addr, to_addr,
value, gas, gas_price, input, nonce, v, r, s, tx_type, chain_id,
max_fee_per_gas, max_priority_fee_per_gas
FROM transactions WHERE hash = ?",
"SELECT t.hash, t.block_number, b.hash, t.transaction_index, t.from_addr, t.to_addr,
t.value, t.gas, t.gas_price, t.input, t.nonce, t.v, t.r, t.s, t.tx_type,
t.chain_id, t.max_fee_per_gas, t.max_priority_fee_per_gas
FROM transactions t
INNER JOIN blocks b ON b.number = t.block_number
WHERE t.hash = ?",
params![hash.as_slice()],
Self::row_to_stored_transaction,
);
Expand Down Expand Up @@ -500,14 +510,15 @@ impl ChainIndex for PersistentChainIndex {

let conn = self.read_conn()?;
let result = conn.query_row(
"SELECT receipts.transaction_hash, receipts.transaction_index, receipts.block_hash,
receipts.block_number, receipts.from_addr, receipts.to_addr,
receipts.cumulative_gas_used, receipts.gas_used,
receipts.contract_address, receipts.status, receipts.tx_type,
transactions.gas_price
FROM receipts
INNER JOIN transactions ON transactions.hash = receipts.transaction_hash
WHERE receipts.transaction_hash = ?",
"SELECT r.transaction_hash, r.transaction_index, b.hash,
r.block_number, r.from_addr, r.to_addr,
r.cumulative_gas_used, r.gas_used,
r.contract_address, r.status, r.tx_type,
t.gas_price, r.revert_reason
FROM receipts r
INNER JOIN transactions t ON t.hash = r.transaction_hash
INNER JOIN blocks b ON b.number = r.block_number
WHERE r.transaction_hash = ?",
params![hash.as_slice()],
Self::row_to_stored_receipt,
);
Expand Down Expand Up @@ -632,14 +643,13 @@ fn insert_transaction(
) -> ChainIndexResult<()> {
tx.execute(
"INSERT OR REPLACE INTO transactions
(hash, block_number, block_hash, transaction_index, from_addr, to_addr,
(hash, block_number, transaction_index, from_addr, to_addr,
value, gas, gas_price, input, nonce, v, r, s, tx_type, chain_id,
max_fee_per_gas, max_priority_fee_per_gas)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params![
transaction.hash.as_slice(),
transaction.block_number as i64,
transaction.block_hash.as_slice(),
array_index,
transaction.from.as_slice(),
transaction.to.as_ref().map(|a| a.as_slice()),
Expand Down Expand Up @@ -671,13 +681,12 @@ fn insert_receipt(
) -> ChainIndexResult<()> {
tx.execute(
"INSERT OR REPLACE INTO receipts
(transaction_hash, transaction_index, block_hash, block_number, from_addr, to_addr,
cumulative_gas_used, gas_used, contract_address, status, tx_type)
(transaction_hash, transaction_index, block_number, from_addr, to_addr,
cumulative_gas_used, gas_used, contract_address, status, tx_type, revert_reason)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
params![
receipt.transaction_hash.as_slice(),
array_index,
receipt.block_hash.as_slice(),
receipt.block_number as i64,
receipt.from.as_slice(),
receipt.to.as_ref().map(|a| a.as_slice()),
Expand All @@ -686,6 +695,7 @@ fn insert_receipt(
receipt.contract_address.as_ref().map(|a| a.as_slice()),
receipt.status as i64,
receipt.tx_type as i64,
receipt.revert_reason.as_deref(),
],
)?;
Ok(())
Expand Down Expand Up @@ -864,6 +874,7 @@ mod tests {
logs: vec![],
status: if success { 1 } else { 0 },
tx_type: 0,
revert_reason: None,
}
}

Expand Down
5 changes: 5 additions & 0 deletions crates/rpc/chain-index/src/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,10 @@ fn build_stored_receipt<Tx: Transaction>(
let to = resolve_recipient_address(tx);
let logs: Vec<StoredLog> = tx_result.events.iter().map(event_to_stored_log).collect();
let status = if tx_result.response.is_ok() { 1 } else { 0 };
let revert_reason = match &tx_result.response {
Err(err) => Some(format!("ErrorCode(id=0x{:04x}, arg={})", err.id, err.arg)),
Ok(_) => None,
};

StoredReceipt {
transaction_hash: tx_hash,
Expand All @@ -347,6 +351,7 @@ fn build_stored_receipt<Tx: Transaction>(
logs,
status,
tx_type: eth_fields.as_ref().map(|f| f.tx_type).unwrap_or(0),
revert_reason,
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/rpc/chain-index/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ mod tests {
logs: vec![],
status: 1,
tx_type: 0,
revert_reason: None,
}
}

Expand Down
4 changes: 4 additions & 0 deletions crates/rpc/chain-index/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ pub struct StoredReceipt {
pub status: u8,
/// Transaction type.
pub tx_type: u8,
/// Revert reason for failed transactions.
#[serde(skip_serializing_if = "Option::is_none")]
pub revert_reason: Option<String>,
}

impl StoredReceipt {
Expand Down Expand Up @@ -204,6 +207,7 @@ impl StoredReceipt {
logs_bloom: Bytes::new(),
tx_type: U64::from(self.tx_type as u64),
status: U64::from(self.status as u64),
revert_reason: self.revert_reason.clone(),
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/rpc/types/src/receipt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ pub struct RpcReceipt {
pub tx_type: U64,
/// Status (1 = success, 0 = failure)
pub status: U64,
/// Revert reason for failed transactions.
#[serde(skip_serializing_if = "Option::is_none")]
pub revert_reason: Option<String>,
}

impl RpcReceipt {
Expand Down Expand Up @@ -78,6 +81,7 @@ impl RpcReceipt {
logs_bloom: Bytes::new(),
tx_type: U64::ZERO,
status: U64::from(Self::STATUS_SUCCESS),
revert_reason: None,
}
}

Expand Down Expand Up @@ -108,6 +112,7 @@ impl RpcReceipt {
logs_bloom: Bytes::new(),
tx_type: U64::ZERO,
status: U64::from(Self::STATUS_FAILURE),
revert_reason: None,
}
}

Expand Down
11 changes: 8 additions & 3 deletions crates/storage/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,14 @@ where
key_partition: format!("{}-block-index", config.partition_prefix),
key_page_cache,
value_partition: format!("{}-block-data", config.partition_prefix),
// No compression by default. Blocks are often already compressed (gzip/zstd
// at the application layer), so double-compression wastes CPU.
compression: None,
// Zstd compression for block data. Borsh-encoded ArchivedBlocks contain
// repetitive structure (hashes, encoded txs) and compress well.
// Level 3 gives ~2x ratio with minimal CPU. 0 = disabled.
compression: if config.compression_level > 0 {
Some(config.compression_level)
} else {
None
},
// `bytes::Bytes` uses `RangeCfg<usize>` as its codec config.
// An unbounded range accepts blocks of any size.
codec_config: RangeCfg::from(..),
Expand Down
19 changes: 19 additions & 0 deletions crates/storage/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,23 @@ pub struct BlockStorageConfig {
///
/// Default: 4096 bytes
pub replay_buffer: usize,

/// Zstd compression level for block data (1-21).
///
/// Borsh-encoded blocks compress well due to repetitive hash structures.
/// Level 3 gives ~2x compression with minimal CPU overhead.
/// Set to 0 to disable compression.
/// Default: 3
pub compression_level: u8,

/// Number of recent blocks to retain before pruning.
///
/// Pruning happens at section granularity (`blocks_per_section`), so the
/// actual retention may be slightly higher. Set to 0 to disable pruning
/// (keep all blocks).
///
/// Default: 100_000 (~1 day at 1 block/sec)
pub retention_blocks: u64,
}

impl Default for BlockStorageConfig {
Expand All @@ -51,6 +68,8 @@ impl Default for BlockStorageConfig {
key_write_buffer: 1024 * 1024, // 1MB
value_write_buffer: 4 * 1024 * 1024, // 4MB
replay_buffer: 4096,
compression_level: 3,
retention_blocks: 100_000,
}
}
}
Expand Down
Loading