Skip to content

Commit f5b434a

Browse files
authored
cleanup unused datasets from hotblocks storage (#46)
1 parent a19c000 commit f5b434a

4 files changed

Lines changed: 73 additions & 0 deletions

File tree

crates/hotblocks/src/data_service.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use futures::FutureExt;
77
use futures::{StreamExt, TryStreamExt};
88
use sqd_data_client::reqwest::ReqwestDataClient;
99
use sqd_storage::db::DatasetId;
10+
use tracing::{error, info};
1011
use std::collections::{BTreeMap, HashMap};
1112
use std::sync::Arc;
1213

@@ -21,6 +22,16 @@ pub struct DataService {
2122

2223
impl DataService {
2324
pub async fn start(db: DBRef, datasets: BTreeMap<DatasetId, DatasetConfig>) -> anyhow::Result<Self> {
25+
let all_datasets = db.get_all_datasets()?;
26+
for dataset in all_datasets {
27+
if !datasets.contains_key(&dataset.id) {
28+
info!("deleting unconfigured dataset {}", dataset.id);
29+
if let Err(err) = db.delete_dataset(dataset.id) {
30+
error!("failed to delete dataset {}: {}", dataset.id, err);
31+
}
32+
}
33+
}
34+
2435
let mut controllers = futures::stream::iter(datasets.into_iter())
2536
.map(|(dataset_id, cfg)| {
2637
let db = db.clone();

crates/storage/src/db/db.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,28 @@ impl Database {
257257
perform_dataset_compaction(&self.db, dataset_id, max_chunk_size, write_amplification_limit, compaction_len_limit)
258258
}
259259

260+
/// Removes a dataset from storage.
///
/// Inside a single [`Tx`] run this looks up the dataset label; when a label
/// exists, every chunk listed for the dataset (starting from block 0, with no
/// upper bound) is deleted and then the label itself is removed. When no
/// label exists the transaction body is a no-op.
///
/// After the transaction, [`Self::cleanup`] is invoked unconditionally and
/// its returned count is discarded.
///
/// # Errors
///
/// Propagates any error raised by the transaction body or by `cleanup`.
pub fn delete_dataset(&self, dataset_id: DatasetId) -> anyhow::Result<()> {
    Tx::new(&self.db).run(|tx| {
        // Unknown dataset: nothing to remove inside the transaction.
        if tx.find_label_for_update(dataset_id)?.is_none() {
            return Ok(());
        }

        for entry in tx.list_chunks(dataset_id, 0, None) {
            tx.delete_chunk(dataset_id, &entry?)?;
        }

        tx.delete_label(dataset_id)?;
        Ok(())
    })?;

    // Runs even when the label was missing, because the early return above
    // only leaves the transaction closure.
    self.cleanup().map(|_removed| ())
}
281+
260282
pub fn cleanup(&self) -> anyhow::Result<usize> {
261283
deleted_deleted_tables(&self.db)
262284
}

crates/storage/src/db/write/tx.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ impl <'a> Tx<'a> {
110110
Ok(())
111111
}
112112

113+
/// Deletes the entry keyed by `dataset_id` from the `CF_DATASETS` column
/// family as part of this transaction.
///
/// # Errors
///
/// Returns the error reported by the underlying `delete_cf` call.
pub fn delete_label(&self, dataset_id: DatasetId) -> anyhow::Result<()> {
    let datasets_cf = self.cf_handle(CF_DATASETS);
    self.transaction.delete_cf(datasets_cf, dataset_id)?;
    Ok(())
}
117+
113118
pub fn write_chunk(&self, dataset_id: DatasetId, chunk: &Chunk) -> anyhow::Result<()> {
114119
self.transaction.put_cf(
115120
self.cf_handle(CF_CHUNKS),

crates/storage/tests/database_ops.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -397,3 +397,38 @@ fn labels() {
397397
assert_eq!(label.version(), 3); // 2 successful updates, 1 set finalized head
398398
assert_eq!(label.finalized_head(), Some(&finalized_head));
399399
}
400+
401+
/// Deleting a dataset must remove it from the dataset listing and leave no
/// chunks behind for its id.
#[test]
fn delete_dataset() {
    let (db, dataset_id) = setup_db();

    // Two contiguous chunks: the second one chains onto the first via its
    // parent hash.
    let first_chunk = Chunk::V0 {
        first_block: 0,
        last_block: 100,
        last_block_hash: "last_1".to_owned(),
        parent_block_hash: "base".to_owned(),
        tables: Default::default(),
    };
    let second_chunk = Chunk::V0 {
        first_block: 101,
        last_block: 200,
        last_block_hash: "last_2".to_owned(),
        parent_block_hash: "last_1".to_owned(),
        tables: Default::default(),
    };

    for chunk in [&first_chunk, &second_chunk] {
        assert!(db.insert_chunk(dataset_id, chunk).is_ok());
    }

    // Precondition: exactly one dataset, holding both chunks.
    let before = db.get_all_datasets().unwrap();
    assert_eq!(before.len(), 1);
    assert_eq!(before[0].id, dataset_id);
    validate_chunks(&db, dataset_id, vec![&first_chunk, &second_chunk]);

    assert!(db.delete_dataset(dataset_id).is_ok());

    // Postcondition: the dataset is gone and so are its chunks.
    let after = db.get_all_datasets().unwrap();
    assert!(after.is_empty());

    validate_chunks(&db, dataset_id, Vec::new());
}

0 commit comments

Comments
 (0)