Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 144 additions & 36 deletions crates/storage/src/qmdb_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,36 @@

use crate::cache::{CachedValue, ShardedDbCache};
use crate::metrics::OptionalMetrics;
use crate::types::{
create_storage_key, create_storage_value_chunk, extract_value_from_chunk, StorageKey,
StorageValueChunk, MAX_VALUE_DATA_SIZE,
};
use crate::types::{create_storage_key, StorageKey, MAX_VALUE_DATA_SIZE};
use async_trait::async_trait;
use commonware_codec::RangeCfg;
use commonware_cryptography::sha256::Sha256;
use commonware_runtime::{
buffer::paged::CacheRef, BufferPooler, Clock, Metrics, Storage as RStorage,
};
use commonware_storage::qmdb::current::{unordered::variable::Db, VariableConfig};
use commonware_storage::translator::EightCap;
use commonware_storage::{
qmdb::current::{unordered::fixed::Db, FixedConfig},
Persistable,
};
use evolve_core::{ErrorCode, ReadonlyKV};
use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
use std::sync::Arc;
use std::time::Instant;
use std::time::{Duration, Instant};
use thiserror::Error;
use tokio::runtime::RuntimeFlavor;
use tokio::sync::RwLock;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
RwLock,
};

/// Type alias for QMDB in Current state.
/// `N = 64` because SHA256 digests are 32 bytes and QMDB expects `2 * digest_size`.
type QmdbCurrent<C> = Db<C, StorageKey, StorageValueChunk, Sha256, EightCap, 64>;
type QmdbCurrent<C> = Db<C, StorageKey, Vec<u8>, Sha256, EightCap, 64>;

const PRUNE_SCHEDULE_DELAY: Duration = Duration::from_millis(50);
const PRUNE_RETRY_DELAY: Duration = Duration::from_millis(25);

#[derive(Debug)]
struct PreparedBatch {
updates: Vec<(StorageKey, Option<StorageValueChunk>)>,
updates: Vec<(StorageKey, Option<Vec<u8>>)>,
keys_to_invalidate: Vec<Vec<u8>>,
ops_count: usize,
sets: usize,
Expand All @@ -57,7 +58,7 @@ pub enum StorageError {
#[error("Key error")]
Key(ErrorCode),

#[error("Value too large for single chunk: {size} bytes (max: {max})")]
#[error("Value too large: {size} bytes (max: {max})")]
ValueTooLarge { size: usize, max: usize },

#[error("IO error: {0}")]
Expand Down Expand Up @@ -114,6 +115,7 @@ where
C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static,
{
db: Arc<RwLock<QmdbCurrent<C>>>,
prune_tx: UnboundedSender<()>,
/// Read cache for fast synchronous lookups
cache: Arc<ShardedDbCache>,
/// Optional metrics for monitoring storage performance
Expand All @@ -127,6 +129,7 @@ where
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
prune_tx: self.prune_tx.clone(),
cache: self.cache.clone(),
metrics: self.metrics.clone(),
}
Expand All @@ -137,6 +140,42 @@ impl<C> QmdbStorage<C>
where
C: RStorage + BufferPooler + Clock + Metrics + Clone + Send + Sync + 'static,
{
/// Spawns a detached background task that prunes inactive QMDB history
/// whenever a signal arrives on the returned channel.
///
/// Callers (e.g. `commit_state`) send `()` to request a prune. Signals are
/// coalesced so a burst of commits results in a single prune pass. The task
/// exits when every sender is dropped and the channel closes.
fn spawn_prune_worker(db: Arc<RwLock<QmdbCurrent<C>>>) -> UnboundedSender<()> {
    let (prune_tx, mut prune_rx) = unbounded_channel::<()>();

    tokio::spawn(async move {
        // Outer loop: wake once per batch of prune requests.
        while prune_rx.recv().await.is_some() {
            // Debounce: wait a short grace period, then drain any signals
            // that accumulated so one pass covers them all.
            tokio::time::sleep(PRUNE_SCHEDULE_DELAY).await;
            while prune_rx.try_recv().is_ok() {}

            loop {
                // Use try_write so pruning never blocks foreground work;
                // if the lock is contended, back off briefly and retry.
                let mut db = match db.try_write() {
                    Ok(db) => db,
                    Err(_) => {
                        // Requests arriving while we wait are covered by the
                        // pass we are about to run, so drain them too.
                        while prune_rx.try_recv().is_ok() {}
                        tokio::time::sleep(PRUNE_RETRY_DELAY).await;
                        continue;
                    }
                };

                // Prune everything below the inactivity floor; a failure is
                // logged and abandoned — the next signal retries from scratch.
                let prune_loc = db.inactivity_floor_loc();
                if let Err(err) = db.prune(prune_loc).await {
                    tracing::error!("background prune failed: {err}");
                    break;
                }

                // Persist the pruned state; failure is logged but non-fatal,
                // since a later prune pass will sync again.
                if let Err(err) = db.sync().await {
                    tracing::error!("background prune sync failed: {err}");
                }

                break;
            }
        }
    });

    prune_tx
}

/// Create a new QmdbStorage instance
pub async fn new(
context: C,
Expand Down Expand Up @@ -172,10 +211,12 @@ where
StorageError::InvalidConfig("write_buffer_size must be non-zero".to_string())
})?;

let qmdb_config = FixedConfig {
log_journal_partition: format!("{}_log-journal", config.partition_prefix),
let qmdb_config = VariableConfig {
log_partition: format!("{}_log-journal", config.partition_prefix),
log_items_per_blob: NonZeroU64::new(1000).unwrap(),
log_write_buffer: write_buffer_size,
log_compression: None,
log_codec_config: ((), (RangeCfg::from(0..=MAX_VALUE_DATA_SIZE), ())),
mmr_journal_partition: format!("{}_mmr-journal", config.partition_prefix),
mmr_items_per_blob: NonZeroU64::new(1000).unwrap(),
mmr_write_buffer: write_buffer_size,
Expand All @@ -189,12 +230,16 @@ where
page_cache: CacheRef::from_pooler(&context, page_size, capacity),
};

let db = Db::init(context, qmdb_config)
.await
.map_err(map_qmdb_error)?;
let db = Arc::new(RwLock::new(
Db::init(context, qmdb_config)
.await
.map_err(map_qmdb_error)?,
));
let prune_tx = Self::spawn_prune_worker(db.clone());

Ok(Self {
db: Arc::new(RwLock::new(db)),
db,
prune_tx,
cache: Arc::new(ShardedDbCache::with_defaults()),
metrics,
})
Expand Down Expand Up @@ -313,9 +358,8 @@ where

sets += 1;
let storage_key = create_storage_key(&key)?;
let storage_value = create_storage_value_chunk(&value)?;
keys_to_invalidate.push(key);
updates.push((storage_key, Some(storage_value)));
updates.push((storage_key, Some(value)));
}
crate::types::Operation::Remove { key } => {
deletes += 1;
Expand All @@ -338,9 +382,11 @@ where
/// Commit the current state and generate a commit hash.
pub async fn commit_state(&self) -> Result<crate::types::CommitHash, StorageError> {
let start = Instant::now();
let db = self.db.read().await;
db.commit().await.map_err(map_qmdb_error)?;
let db = self.db.write().await;
db.sync().await.map_err(map_qmdb_error)?;
let hash = root_to_commit_hash(db.root())?;
drop(db);
let _ = self.prune_tx.send(());
self.metrics.record_commit(start.elapsed().as_secs_f64());
Ok(hash)
}
Expand Down Expand Up @@ -429,17 +475,10 @@ where
}

fn decode_storage_value(
result: Result<Option<StorageValueChunk>, impl std::fmt::Display>,
result: Result<Option<Vec<u8>>, impl std::fmt::Display>,
) -> Result<Option<Vec<u8>>, ErrorCode> {
match result {
Ok(Some(value_chunk)) => match extract_value_from_chunk(&value_chunk) {
Some(data) if data.is_empty() => Ok(None),
Some(data) => Ok(Some(data)),
None => {
tracing::warn!("Invalid value chunk format, treating as absent");
Ok(None)
}
},
Ok(Some(value)) => Ok(Some(value)),
Ok(None) => Ok(None),
Err(e) => {
let err_str = e.to_string();
Expand Down Expand Up @@ -1043,8 +1082,6 @@ mod tests {
runner.start(|context| async move {
let storage = QmdbStorage::new(context, config).await.unwrap();

// Empty values are treated as removed (since we use all-zeros to signal removal)
// This is a known limitation
storage
.apply_batch(vec![crate::types::Operation::Set {
key: b"empty".to_vec(),
Expand All @@ -1053,9 +1090,8 @@ mod tests {
.await
.unwrap();

// Empty value should be treated as None (due to removal semantics)
let result = storage.get(b"empty").unwrap();
assert_eq!(result, None);
assert_eq!(result, Some(Vec::new()));
})
}

Expand Down Expand Up @@ -1325,6 +1361,78 @@ mod tests {
})
}

/// After two generations of writes and commits, the background prune worker
/// must advance the oldest retained operation location, while live values
/// stay readable and deleted keys stay absent.
#[test]
fn test_commit_prunes_inactive_history() {
    let temp_dir = TempDir::new().unwrap();
    let config = crate::types::StorageConfig {
        path: temp_dir.path().to_path_buf(),
        ..Default::default()
    };

    let runtime_config = TokioConfig::default()
        .with_storage_directory(temp_dir.path())
        .with_worker_threads(2);

    let runner = Runner::new(runtime_config);

    runner.start(|context| async move {
        use commonware_storage::qmdb::store::LogStore;

        const KEYS: usize = 1_100;
        let storage = QmdbStorage::new(context, config).await.unwrap();

        // Generation 1: populate every key, then commit.
        let mut initial_ops = Vec::with_capacity(KEYS);
        for i in 0..KEYS {
            initial_ops.push(crate::types::Operation::Set {
                key: format!("key-{i}").into_bytes(),
                value: format!("value-{i}-v1").into_bytes(),
            });
        }
        storage.apply_batch(initial_ops).await.unwrap();
        storage.commit_state().await.unwrap();

        // Snapshot the prune boundary before more history lands.
        let start_before = {
            let db = storage.db.read().await;
            *db.bounds().await.start
        };

        // Generation 2: overwrite most keys, delete every fifth one.
        let mut second_ops = Vec::with_capacity(KEYS);
        for i in 0..KEYS {
            let op = if i % 5 == 0 {
                crate::types::Operation::Remove {
                    key: format!("key-{i}").into_bytes(),
                }
            } else {
                crate::types::Operation::Set {
                    key: format!("key-{i}").into_bytes(),
                    value: format!("value-{i}-v2").into_bytes(),
                }
            };
            second_ops.push(op);
        }
        storage.apply_batch(second_ops).await.unwrap();
        storage.commit_state().await.unwrap();

        // Pruning runs asynchronously; poll until the boundary advances
        // or ~1s elapses.
        let mut start_after = start_before;
        for _ in 0..50 {
            start_after = {
                let db = storage.db.read().await;
                *db.bounds().await.start
            };
            if start_after > start_before {
                break;
            }
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        assert!(
            start_after > start_before,
            "prune boundary did not advance: before={start_before}, after={start_after}"
        );
        // Live data survives pruning; removed keys remain gone.
        assert_eq!(storage.get(b"key-1").unwrap(), Some(b"value-1-v2".to_vec()));
        assert_eq!(storage.get(b"key-0").unwrap(), None);
    })
}

#[test]
fn test_preview_batch_root_matches_eventual_commit_hash() {
let temp_dir = TempDir::new().unwrap();
Expand Down
54 changes: 7 additions & 47 deletions crates/storage/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,9 @@ pub const MAX_KEY_SIZE: usize = evolve_core::MAX_STORAGE_KEY_SIZE;
const _: () = assert!(MAX_KEY_SIZE + KEY_LENGTH_PREFIX_SIZE == STORAGE_KEY_SIZE);
pub const MAX_BATCH_SIZE: usize = 10_000; // 10k operations

// For commonware integration, we use fixed-size types
// Keys are 256 bytes (padded with zeros if needed)
// For commonware integration, keys use a fixed-size envelope while values stay variable-sized.
// Keys are 256 bytes (padded with zeros if needed).
pub type StorageKey = FixedBytes<STORAGE_KEY_SIZE>;
// Values are stored in 4KB chunks
pub const STORAGE_VALUE_SIZE: usize = 4096;
pub type StorageValueChunk = FixedBytes<STORAGE_VALUE_SIZE>;

/// Helper functions for creating storage keys
pub fn create_storage_key(key: &[u8]) -> Result<StorageKey, ErrorCode> {
Expand All @@ -164,47 +161,10 @@ pub fn create_storage_key(key: &[u8]) -> Result<StorageKey, ErrorCode> {
Ok(StorageKey::new(data))
}

/// Length prefix size for value storage (4 bytes for u32 length)
pub const VALUE_LENGTH_PREFIX_SIZE: usize = 4;

/// Maximum actual value size (chunk size minus length prefix)
pub const MAX_VALUE_DATA_SIZE: usize = STORAGE_VALUE_SIZE - VALUE_LENGTH_PREFIX_SIZE;
/// Maximum value size accepted by the storage layer.
pub const MAX_VALUE_SIZE: usize = MAX_VALUE_DATA_SIZE;

/// Helper function for creating storage value chunks
///
/// Stores value with a 4-byte length prefix to preserve exact data semantics.
/// Format: [len_u32_le][data][padding]
pub fn create_storage_value_chunk(value: &[u8]) -> Result<StorageValueChunk, ErrorCode> {
if value.len() > MAX_VALUE_DATA_SIZE {
return Err(ERR_VALUE_TOO_LARGE);
}

let mut data = [0u8; STORAGE_VALUE_SIZE];
// Store length as 4-byte little-endian prefix
let len_bytes = (value.len() as u32).to_le_bytes();
data[..VALUE_LENGTH_PREFIX_SIZE].copy_from_slice(&len_bytes);
// Store actual value after length prefix
data[VALUE_LENGTH_PREFIX_SIZE..VALUE_LENGTH_PREFIX_SIZE + value.len()].copy_from_slice(value);

Ok(StorageValueChunk::new(data))
}

/// Extract value from storage chunk by reading length prefix
///
/// Returns the exact bytes that were stored, preserving trailing zeros.
pub fn extract_value_from_chunk(chunk: &StorageValueChunk) -> Option<Vec<u8>> {
let data = chunk.as_ref();
// Read length from 4-byte little-endian prefix
let len_bytes: [u8; 4] = data[..VALUE_LENGTH_PREFIX_SIZE].try_into().ok()?;
let len = u32::from_le_bytes(len_bytes) as usize;

// Validate length
if len > MAX_VALUE_DATA_SIZE {
return None;
}

// Extract exactly 'len' bytes of actual data
Some(data[VALUE_LENGTH_PREFIX_SIZE..VALUE_LENGTH_PREFIX_SIZE + len].to_vec())
}
/// Maximum value size (in bytes) accepted by the storage layer.
///
/// We keep the existing 4092-byte limit to preserve bounded memory usage even though the
/// underlying QMDB encoding is now variable-sized.
pub const MAX_VALUE_SIZE: usize = 4092;
/// Backwards-compatible alias used by existing tests and callers.
/// (Historically derived as the 4096-byte chunk size minus the 4-byte length prefix.)
pub const MAX_VALUE_DATA_SIZE: usize = MAX_VALUE_SIZE;
Loading