Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 83 additions & 30 deletions crates/storage/src/qmdb_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,15 @@

use crate::cache::{CachedValue, ShardedDbCache};
use crate::metrics::OptionalMetrics;
use crate::types::{
create_storage_key, create_storage_value_chunk, extract_value_from_chunk, StorageKey,
StorageValueChunk, MAX_VALUE_DATA_SIZE,
};
use crate::types::{create_storage_key, StorageKey, MAX_VALUE_DATA_SIZE};
use async_trait::async_trait;
use commonware_codec::RangeCfg;
use commonware_cryptography::sha256::Sha256;
use commonware_runtime::{
buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Storage as RStorage,
};
use commonware_storage::qmdb::current::{unordered::variable::Db, VariableConfig};
use commonware_storage::translator::EightCap;
use commonware_storage::{
qmdb::current::{unordered::fixed::Db, FixedConfig},
Persistable,
};
use evolve_core::{ErrorCode, ReadonlyKV};
use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
use std::sync::Arc;
Expand All @@ -37,11 +32,11 @@ use tokio::sync::RwLock;

/// Type alias for QMDB in Current state.
/// `N = 64` because SHA256 digests are 32 bytes and QMDB expects `2 * digest_size`.
type QmdbCurrent<C> = Db<C, StorageKey, StorageValueChunk, Sha256, EightCap, 64>;
type QmdbCurrent<C> = Db<C, StorageKey, Vec<u8>, Sha256, EightCap, 64>;

#[derive(Debug)]
struct PreparedBatch {
updates: Vec<(StorageKey, Option<StorageValueChunk>)>,
updates: Vec<(StorageKey, Option<Vec<u8>>)>,
keys_to_invalidate: Vec<Vec<u8>>,
ops_count: usize,
sets: usize,
Expand All @@ -57,7 +52,7 @@ pub enum StorageError {
#[error("Key error")]
Key(ErrorCode),

#[error("Value too large for single chunk: {size} bytes (max: {max})")]
#[error("Value too large: {size} bytes (max: {max})")]
ValueTooLarge { size: usize, max: usize },

#[error("IO error: {0}")]
Expand Down Expand Up @@ -172,10 +167,12 @@ where
StorageError::InvalidConfig("write_buffer_size must be non-zero".to_string())
})?;

let qmdb_config = FixedConfig {
log_journal_partition: format!("{}_log-journal", config.partition_prefix),
let qmdb_config = VariableConfig {
log_partition: format!("{}_log-journal", config.partition_prefix),
log_items_per_blob: NonZeroU64::new(1000).unwrap(),
log_write_buffer: write_buffer_size,
log_compression: None,
log_codec_config: ((), (RangeCfg::from(0..=MAX_VALUE_DATA_SIZE), ())),
mmr_journal_partition: format!("{}_mmr-journal", config.partition_prefix),
mmr_items_per_blob: NonZeroU64::new(1000).unwrap(),
mmr_write_buffer: write_buffer_size,
Expand Down Expand Up @@ -313,9 +310,8 @@ where

sets += 1;
let storage_key = create_storage_key(&key)?;
let storage_value = create_storage_value_chunk(&value)?;
keys_to_invalidate.push(key);
updates.push((storage_key, Some(storage_value)));
updates.push((storage_key, Some(value)));
}
crate::types::Operation::Remove { key } => {
deletes += 1;
Expand All @@ -338,8 +334,10 @@ where
/// Commit the current state and generate a commit hash.
pub async fn commit_state(&self) -> Result<crate::types::CommitHash, StorageError> {
let start = Instant::now();
let db = self.db.read().await;
db.commit().await.map_err(map_qmdb_error)?;
let mut db = self.db.write().await;
let prune_loc = db.inactivity_floor_loc();
db.prune(prune_loc).await.map_err(map_qmdb_error)?;
db.sync().await.map_err(map_qmdb_error)?;
let hash = root_to_commit_hash(db.root())?;
self.metrics.record_commit(start.elapsed().as_secs_f64());
Ok(hash)
Expand Down Expand Up @@ -429,17 +427,10 @@ where
}

fn decode_storage_value(
result: Result<Option<StorageValueChunk>, impl std::fmt::Display>,
result: Result<Option<Vec<u8>>, impl std::fmt::Display>,
) -> Result<Option<Vec<u8>>, ErrorCode> {
match result {
Ok(Some(value_chunk)) => match extract_value_from_chunk(&value_chunk) {
Some(data) if data.is_empty() => Ok(None),
Some(data) => Ok(Some(data)),
None => {
tracing::warn!("Invalid value chunk format, treating as absent");
Ok(None)
}
},
Ok(Some(value)) => Ok(Some(value)),
Ok(None) => Ok(None),
Err(e) => {
let err_str = e.to_string();
Expand Down Expand Up @@ -1043,8 +1034,6 @@ mod tests {
runner.start(|context| async move {
let storage = QmdbStorage::new(context, config).await.unwrap();

// Empty values are treated as removed (since we use all-zeros to signal removal)
// This is a known limitation
storage
.apply_batch(vec![crate::types::Operation::Set {
key: b"empty".to_vec(),
Expand All @@ -1053,9 +1042,8 @@ mod tests {
.await
.unwrap();

// Empty value should be treated as None (due to removal semantics)
let result = storage.get(b"empty").unwrap();
assert_eq!(result, None);
assert_eq!(result, Some(Vec::new()));
})
}

Expand Down Expand Up @@ -1325,6 +1313,71 @@ mod tests {
})
}

#[test]
fn test_commit_prunes_inactive_history() {
    // Scratch directory backs both the storage config and the runtime.
    let temp_dir = TempDir::new().unwrap();
    let config = crate::types::StorageConfig {
        path: temp_dir.path().to_path_buf(),
        ..Default::default()
    };

    let runtime_config = TokioConfig::default()
        .with_storage_directory(temp_dir.path())
        .with_worker_threads(2);

    // Commit twice and verify the prune boundary advances while live data
    // remains readable and removed keys read back as absent.
    Runner::new(runtime_config).start(|context| async move {
        use commonware_storage::qmdb::store::LogStore;

        let storage = QmdbStorage::new(context, config).await.unwrap();
        const KEYS: usize = 1_100;

        // First pass: write a v1 value for every key, then commit.
        let mut first_batch = Vec::with_capacity(KEYS);
        for i in 0..KEYS {
            first_batch.push(crate::types::Operation::Set {
                key: format!("key-{i}").into_bytes(),
                value: format!("value-{i}-v1").into_bytes(),
            });
        }
        storage.apply_batch(first_batch).await.unwrap();
        storage.commit_state().await.unwrap();

        // Snapshot the log's lower bound before the second commit.
        let start_before = {
            let db = storage.db.read().await;
            *db.bounds().await.start
        };

        // Second pass: remove every fifth key, rewrite the rest at v2.
        let mut second_batch = Vec::with_capacity(KEYS);
        for i in 0..KEYS {
            let key = format!("key-{i}").into_bytes();
            second_batch.push(if i % 5 == 0 {
                crate::types::Operation::Remove { key }
            } else {
                crate::types::Operation::Set {
                    key,
                    value: format!("value-{i}-v2").into_bytes(),
                }
            });
        }
        storage.apply_batch(second_batch).await.unwrap();
        storage.commit_state().await.unwrap();

        // Lower bound after the second commit: pruning must have moved it.
        let start_after = {
            let db = storage.db.read().await;
            *db.bounds().await.start
        };

        assert!(
            start_after > start_before,
            "prune boundary did not advance: before={start_before}, after={start_after}"
        );
        // Surviving key keeps its latest value; removed key reads as None.
        assert_eq!(storage.get(b"key-1").unwrap(), Some(b"value-1-v2".to_vec()));
        assert_eq!(storage.get(b"key-0").unwrap(), None);
    })
}

#[test]
fn test_preview_batch_root_matches_eventual_commit_hash() {
let temp_dir = TempDir::new().unwrap();
Expand Down
54 changes: 7 additions & 47 deletions crates/storage/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,9 @@ pub const MAX_KEY_SIZE: usize = evolve_core::MAX_STORAGE_KEY_SIZE;
const _: () = assert!(MAX_KEY_SIZE + KEY_LENGTH_PREFIX_SIZE == STORAGE_KEY_SIZE);
pub const MAX_BATCH_SIZE: usize = 10_000; // 10k operations

// For commonware integration, we use fixed-size types
// Keys are 256 bytes (padded with zeros if needed)
// For commonware integration, keys use a fixed-size envelope while values stay variable-sized.
// Keys are 256 bytes (padded with zeros if needed).
pub type StorageKey = FixedBytes<STORAGE_KEY_SIZE>;
// Values are stored in 4KB chunks
pub const STORAGE_VALUE_SIZE: usize = 4096;
pub type StorageValueChunk = FixedBytes<STORAGE_VALUE_SIZE>;

/// Helper functions for creating storage keys
pub fn create_storage_key(key: &[u8]) -> Result<StorageKey, ErrorCode> {
Expand All @@ -164,47 +161,10 @@ pub fn create_storage_key(key: &[u8]) -> Result<StorageKey, ErrorCode> {
Ok(StorageKey::new(data))
}

/// Length prefix size for value storage (4 bytes for u32 length)
pub const VALUE_LENGTH_PREFIX_SIZE: usize = 4;

/// Maximum actual value size (chunk size minus length prefix)
pub const MAX_VALUE_DATA_SIZE: usize = STORAGE_VALUE_SIZE - VALUE_LENGTH_PREFIX_SIZE;
/// Maximum value size accepted by the storage layer.
pub const MAX_VALUE_SIZE: usize = MAX_VALUE_DATA_SIZE;

/// Helper function for creating storage value chunks
///
/// Stores value with a 4-byte length prefix to preserve exact data semantics.
/// Format: [len_u32_le][data][padding]
pub fn create_storage_value_chunk(value: &[u8]) -> Result<StorageValueChunk, ErrorCode> {
if value.len() > MAX_VALUE_DATA_SIZE {
return Err(ERR_VALUE_TOO_LARGE);
}

let mut data = [0u8; STORAGE_VALUE_SIZE];
// Store length as 4-byte little-endian prefix
let len_bytes = (value.len() as u32).to_le_bytes();
data[..VALUE_LENGTH_PREFIX_SIZE].copy_from_slice(&len_bytes);
// Store actual value after length prefix
data[VALUE_LENGTH_PREFIX_SIZE..VALUE_LENGTH_PREFIX_SIZE + value.len()].copy_from_slice(value);

Ok(StorageValueChunk::new(data))
}

/// Extract value from storage chunk by reading length prefix
///
/// Returns the exact bytes that were stored, preserving trailing zeros.
pub fn extract_value_from_chunk(chunk: &StorageValueChunk) -> Option<Vec<u8>> {
let data = chunk.as_ref();
// Read length from 4-byte little-endian prefix
let len_bytes: [u8; 4] = data[..VALUE_LENGTH_PREFIX_SIZE].try_into().ok()?;
let len = u32::from_le_bytes(len_bytes) as usize;

// Validate length
if len > MAX_VALUE_DATA_SIZE {
return None;
}

// Extract exactly 'len' bytes of actual data
Some(data[VALUE_LENGTH_PREFIX_SIZE..VALUE_LENGTH_PREFIX_SIZE + len].to_vec())
}
/// Maximum value size (in bytes) accepted by the storage layer.
///
/// We keep the existing 4092-byte limit to preserve bounded memory usage even though the
/// underlying QMDB encoding is now variable-sized.
// NOTE(review): 4092 matches the previous fixed-chunk layout — a 4096-byte chunk minus
// the 4-byte length prefix — so previously valid values remain valid under this limit.
pub const MAX_VALUE_SIZE: usize = 4092;
/// Backwards-compatible alias used by existing tests and callers.
// Prefer `MAX_VALUE_SIZE` in new code; this alias only preserves the old chunked-API name.
pub const MAX_VALUE_DATA_SIZE: usize = MAX_VALUE_SIZE;
Loading