diff --git a/Cargo.toml b/Cargo.toml index 739079fd..afc88752 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,9 +27,9 @@ lockfree = { version = "0.5.1" } fastrand = "2.3.0" futures = "0.3.30" uuid = { version = "1.10.0", features = ["v4", "v7"] } -data_bucket = "=0.3.9" +# data_bucket = "=0.3.9" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } -# data_bucket = { path = "../DataBucket", version = "0.3.8" } +data_bucket = { path = "../DataBucket", version = "0.3.10" } performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true } performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true } indexset = { version = "=0.14.0", features = ["concurrent", "cdc", "multimap"] } diff --git a/codegen/src/persist_index/generator.rs b/codegen/src/persist_index/generator.rs index 940cd430..6e928d67 100644 --- a/codegen/src/persist_index/generator.rs +++ b/codegen/src/persist_index/generator.rs @@ -364,12 +364,29 @@ impl Generator { if is_unsized(&ty.to_string()) { let node = if is_unique { quote! { - let node = UnsizedNode::from_inner(page.inner.get_node(), #const_name); + let node = page + .inner + .get_node() + .into_iter() + .map(|p| IndexPair { + key: p.key, + value: p.value.into(), + }) + .collect(); + let node = UnsizedNode::from_inner(node, #const_name); #i.attach_node(node); } } else { quote! { - let inner = page.inner.get_node(); + let inner: Vec<_> = page + .inner + .get_node() + .into_iter() + .map(|p| IndexPair { + key: p.key, + value: OffsetEqLink(p.value), + }) + .collect(); let mut last_key = inner.first().expect("Node should be not empty").key.clone(); let mut discriminator = 0; let mut inner = inner.into_iter().map(move |p| { @@ -390,7 +407,7 @@ impl Generator { } }; quote! { - let #i: #t<_, Link, UnsizedNode<_>> = #t::with_maximum_node_size(#const_name); + let #i: #t<_, OffsetEqLink, UnsizedNode<_>> = #t::with_maximum_node_size(#const_name); for page in persisted.#i.1 { #node } @@ -398,12 +415,28 @@ impl Generator { } else { let node = if is_unique { quote! { - let node = page.inner.get_node(); + let node = page + .inner + .get_node() + .into_iter() + .map(|p| IndexPair { + key: p.key, + value: p.value.into(), + }) + .collect(); #i.attach_node(node); } } else { quote! { - let inner = page.inner.get_node(); + let inner: Vec<_> = page + .inner + .get_node() + .into_iter() + .map(|p| IndexPair { + key: p.key, + value: OffsetEqLink(p.value), + }) + .collect(); let mut last_key = inner.first().expect("Node should be not empty").key.clone(); let mut discriminator = 0; let mut inner = inner.into_iter().map(move |p| { @@ -424,7 +457,7 @@ impl Generator { }; quote! { let size = get_index_page_size_from_data_length::<#ty>(#const_name); - let #i: #t<_, Link> = #t::with_maximum_node_size(size); + let #i: #t<_, OffsetEqLink> = #t::with_maximum_node_size(size); for page in persisted.#i.1 { #node } diff --git a/codegen/src/persist_table/generator/space_file/mod.rs b/codegen/src/persist_table/generator/space_file/mod.rs index 07a8619a..1760015e 100644 --- a/codegen/src/persist_table/generator/space_file/mod.rs +++ b/codegen/src/persist_table/generator/space_file/mod.rs @@ -156,23 +156,53 @@ impl Generator { let const_name = name_generator.get_page_inner_size_const_ident(); let pk_type = name_generator.get_primary_key_type_ident(); - let pk_map = if self.attributes.pk_unsized { + let primary_index_init = if self.attributes.pk_unsized { let pk_ident = &self.pk_ident; quote! { - let pk_map = IndexMap::<#pk_ident, Link, UnsizedNode<_>>::with_maximum_node_size(#const_name); + let pk_map = IndexMap::<#pk_ident, OffsetEqLink<#const_name>, UnsizedNode<_>>::with_maximum_node_size(#const_name); for page in self.primary_index.1 { - let node = page.inner.get_node(); + let node = page + .inner + .get_node() + .into_iter() + .map(|p| IndexPair { + key: p.key, + value: p.value.into(), + }) + .collect(); pk_map.attach_node(UnsizedNode::from_inner(node, #const_name)); } + // Reconstruct reverse_pk_map by iterating over pk_map + let mut reverse_pk_map = IndexMap::, #pk_ident>::new(); + for entry in pk_map.iter() { + let (pk, link) = entry; + reverse_pk_map.insert(*link, pk.clone()); + } + let primary_index = PrimaryIndex { pk_map, reverse_pk_map }; } } else { quote! { let size = get_index_page_size_from_data_length::<#pk_type>(#const_name); - let pk_map = IndexMap::with_maximum_node_size(size); + let pk_map = IndexMap::<_, OffsetEqLink<#const_name>>::with_maximum_node_size(size); for page in self.primary_index.1 { - let node = page.inner.get_node(); + let node = page + .inner + .get_node() + .into_iter() + .map(|p| IndexPair { + key: p.key, + value: p.value.into(), + }) + .collect(); pk_map.attach_node(node); } + // Reconstruct reverse_pk_map by iterating over pk_map + let mut reverse_pk_map = IndexMap::, #pk_type>::new(); + for entry in pk_map.iter() { + let (pk, link) = entry; + reverse_pk_map.insert(*link, pk.clone()); + } + let primary_index = PrimaryIndex { pk_map, reverse_pk_map }; } }; @@ -191,12 +221,12 @@ impl Generator { .with_empty_links(self.data_info.inner.empty_links_list); let indexes = #index_ident::from_persisted(self.indexes); - #pk_map + #primary_index_init let table = WorkTable { - data, - pk_map, - indexes, + data: std::sync::Arc::new(data), + primary_index: std::sync::Arc::new(primary_index), + indexes: std::sync::Arc::new(indexes), pk_gen: PrimaryKeyGeneratorState::from_state(self.data_info.inner.pk_gen_state), lock_map: LockMap::default(), update_state: IndexMap::default(), diff --git a/codegen/src/persist_table/generator/space_file/worktable_impls.rs b/codegen/src/persist_table/generator/space_file/worktable_impls.rs index bd5efd80..9fe0a1fb 100644 --- a/codegen/src/persist_table/generator/space_file/worktable_impls.rs +++ b/codegen/src/persist_table/generator/space_file/worktable_impls.rs @@ -105,7 +105,7 @@ impl Generator { quote! { pub fn get_peristed_primary_key_with_toc(&self) -> (Vec>>, Vec>>) { let mut pages = vec![]; - for node in self.0.pk_map.iter_nodes() { + for node in self.0.primary_index.pk_map.iter_nodes() { let page = UnsizedIndexPage::from_node(node.lock_arc().as_ref()); pages.push(page); } @@ -118,7 +118,7 @@ impl Generator { pub fn get_peristed_primary_key_with_toc(&self) -> (Vec>>, Vec>>) { let size = get_index_page_size_from_data_length::<#pk_type>(#const_name); let mut pages = vec![]; - for node in self.0.pk_map.iter_nodes() { + for node in self.0.primary_index.pk_map.iter_nodes() { let page = IndexPage::from_node(node.lock_arc().as_ref(), size); pages.push(page); } diff --git a/codegen/src/worktable/generator/index/cdc.rs b/codegen/src/worktable/generator/index/cdc.rs index 9a10a0dd..20edfff8 100644 --- a/codegen/src/worktable/generator/index/cdc.rs +++ b/codegen/src/worktable/generator/index/cdc.rs @@ -104,7 +104,7 @@ impl Generator { let remove = if idx.is_unique { quote! { if row_new.#i == row_old.#i { - let events = self.#index_field_name.insert_cdc(row_new.#i.clone(), link_new).1; + let events = TableIndexCdc::insert_cdc(&self.#index_field_name, row_new.#i.clone(), link_new).1; #index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::>()); } else { let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, row_old.#i.clone(), link_old); @@ -113,7 +113,7 @@ impl Generator { } } else { quote! { - let events = self.#index_field_name.insert_cdc(row_new.#i.clone(), link_new).1; + let events = TableIndexCdc::insert_cdc(&self.#index_field_name, row_new.#i.clone(), link_new).1; #index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::>()); let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, row_old.#i.clone(), link_old); #index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::>()); diff --git a/codegen/src/worktable/generator/index/mod.rs b/codegen/src/worktable/generator/index/mod.rs index a258df2b..af30d1aa 100644 --- a/codegen/src/worktable/generator/index/mod.rs +++ b/codegen/src/worktable/generator/index/mod.rs @@ -60,16 +60,16 @@ impl Generator { let res = if idx.is_unique { if is_unsized(&t.to_string()) { quote! { - #i: IndexMap<#t, Link, UnsizedNode>> + #i: IndexMap<#t, OffsetEqLink, UnsizedNode>> } } else { - quote! {#i: IndexMap<#t, Link>} + quote! {#i: IndexMap<#t, OffsetEqLink>} } } else { if is_unsized(&t.to_string()) { - quote! {#i: IndexMultiMap<#t, Link, UnsizedNode>>} + quote! {#i: IndexMultiMap<#t, OffsetEqLink, UnsizedNode>>} } else { - quote! {#i: IndexMultiMap<#t, Link>} + quote! {#i: IndexMultiMap<#t, OffsetEqLink>} } }; Ok::<_, syn::Error>(res) diff --git a/codegen/src/worktable/generator/index/usual.rs b/codegen/src/worktable/generator/index/usual.rs index 1bc508cf..6989b311 100644 --- a/codegen/src/worktable/generator/index/usual.rs +++ b/codegen/src/worktable/generator/index/usual.rs @@ -125,15 +125,15 @@ impl Generator { let remove = if idx.is_unique { quote! { if val_new == val_old { - self.#index_field_name.insert(val_new.clone(), link_new); + TableIndex::insert(&self.#index_field_name, val_new.clone(), link_new); } else { - TableIndex::remove(&self.#index_field_name, val_old, link_old); + TableIndex::remove(&self.#index_field_name, &val_old, link_old); } } } else { quote! { - self.#index_field_name.insert(val_new.clone(), link_new); - TableIndex::remove(&self.#index_field_name, val_old, link_old); + TableIndex::insert(&self.#index_field_name, val_new.clone(), link_new); + TableIndex::remove(&self.#index_field_name, &val_old, link_old); } }; let insert = if idx.is_unique { @@ -212,14 +212,8 @@ impl Generator { row.#i } }; - if idx.is_unique { - quote! { - self.#index_field_name.remove(&#row); - } - } else { - quote! { - self.#index_field_name.remove(&#row, &link); - } + quote! { + TableIndex::remove(&self.#index_field_name, &#row, link); } }) .collect::>(); @@ -259,7 +253,7 @@ impl Generator { if let Some(diff) = difference.get(#diff_key) { if let #avt_type_ident::#variant_ident(old) = &diff.old { let key_old = #old_value_expr; - TableIndex::remove(&self.#index_field_name, key_old, link); + TableIndex::remove(&self.#index_field_name, &key_old, link); } } } @@ -372,19 +366,10 @@ impl Generator { row.#i } }; - let delete = if idx.is_unique { - quote! { - self.#index_field_name.remove(&#row); - } - } else { - quote! { - self.#index_field_name.remove(&#row, &link); - } - }; quote! { #avt_index_ident::#index_variant => { - #delete + TableIndex::remove(&self.#index_field_name, &#row, link); }, } }) diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index 5b9b13d8..3f764315 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -76,7 +76,7 @@ impl Generator { let process = if self.is_persist { quote! { let secondary_keys_events = self.0.indexes.delete_row_cdc(row, link)?; - let (_, primary_key_events) = TableIndexCdc::remove_cdc(&self.0.pk_map, pk.clone(), link); + let (_, primary_key_events) = self.0.primary_index.remove_cdc(pk.clone(), link); self.0.data.delete(link).map_err(WorkTableError::PagesError)?; let mut op: Operation< <<#pk_ident as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State, @@ -93,16 +93,17 @@ impl Generator { } else { quote! { self.0.indexes.delete_row(row, link)?; - self.0.pk_map.remove(&pk); + self.0.primary_index.remove(&pk, link); self.0.data.delete(link).map_err(WorkTableError::PagesError)?; } }; if is_locked { quote! { let link = match self.0 + .primary_index .pk_map .get(&pk) - .map(|v| v.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound) { Ok(l) => l, Err(e) => { @@ -117,9 +118,10 @@ impl Generator { } else { quote! { let link = self.0 + .primary_index .pk_map .get(&pk) - .map(|v| v.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound)?; let row = self.select(pk.clone()).unwrap(); #process @@ -197,7 +199,7 @@ impl Generator { pub async fn #name(&self, by: #type_) -> core::result::Result<(), WorkTableError> { let rows_to_update = self.0.indexes.#index.get(#by).map(|kv| kv.1).collect::>(); for link in rows_to_update { - let row = self.0.data.select_non_ghosted(*link).map_err(WorkTableError::PagesError)?; + let row = self.0.data.select_non_ghosted(link.0).map_err(WorkTableError::PagesError)?; self.delete(row.get_primary_key()).await?; } core::result::Result::Ok(()) @@ -217,7 +219,7 @@ impl Generator { }; quote! { pub async fn #name(&self, by: #type_) -> core::result::Result<(), WorkTableError> { - let row_to_update = self.0.indexes.#index.get(#by).map(|v| v.get().value); + let row_to_update = self.0.indexes.#index.get(#by).map(|v| v.get().value.into()); if let Some(link) = row_to_update { let row = self.0.data.select_non_ghosted(link).map_err(WorkTableError::PagesError)?; self.delete(row.get_primary_key()).await?; diff --git a/codegen/src/worktable/generator/queries/in_place.rs b/codegen/src/worktable/generator/queries/in_place.rs index 922eb372..6ed10718 100644 --- a/codegen/src/worktable/generator/queries/in_place.rs +++ b/codegen/src/worktable/generator/queries/in_place.rs @@ -114,9 +114,9 @@ impl Generator { }; let link = self .0 - .pk_map + .primary_index.pk_map .get(&pk) - .map(|v| v.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound)?; unsafe { self.0 diff --git a/codegen/src/worktable/generator/queries/select.rs b/codegen/src/worktable/generator/queries/select.rs index 1676746f..91873e9d 100644 --- a/codegen/src/worktable/generator/queries/select.rs +++ b/codegen/src/worktable/generator/queries/select.rs @@ -29,9 +29,9 @@ impl Generator { #column_range_type, #row_fields_ident> { - let iter = self.0.pk_map + let iter = self.0.primary_index.pk_map .iter() - .filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok()); + .filter_map(|(_, link)| self.0.data.select_non_ghosted(link.0).ok()); SelectQueryBuilder::new(iter) } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index dae5fb45..26955206 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -93,9 +93,9 @@ impl Generator { }; let link = match self.0 - .pk_map + .primary_index.pk_map .get(&pk) - .map(|v| v.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound) { Ok(l) => l, Err(e) => { @@ -478,9 +478,9 @@ impl Generator { }; let link = match self.0 - .pk_map + .primary_index.pk_map .get(&pk) - .map(|v| v.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound) { Ok(l) => l, Err(e) => { @@ -609,7 +609,7 @@ impl Generator { quote! { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { - let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| *l).collect(); + let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| l.0).collect(); let mut locks = std::collections::HashMap::new(); for link in links.iter() { @@ -620,7 +620,7 @@ impl Generator { locks.insert(pk, op_lock); } - let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| *l).collect(); + let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| l.0).collect(); let mut pk_to_unlock: std::collections::HashMap<_, std::sync::Arc> = std::collections::HashMap::new(); let op_id = OperationId::Multi(uuid::Uuid::now_v7()); for link in links.into_iter() { @@ -710,7 +710,7 @@ impl Generator { let link = self.0.indexes.#index .get(#by) - .map(|kv| kv.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound)?; let pk = self.0.data.select_non_ghosted(link)?.get_primary_key().clone(); @@ -720,7 +720,7 @@ impl Generator { let link = match self.0.indexes.#index .get(#by) - .map(|kv| kv.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound) { Ok(l) => l, Err(e) => { diff --git a/codegen/src/worktable/generator/table/impls.rs b/codegen/src/worktable/generator/table/impls.rs index 34b5974c..493e50ee 100644 --- a/codegen/src/worktable/generator/table/impls.rs +++ b/codegen/src/worktable/generator/table/impls.rs @@ -62,21 +62,27 @@ impl Generator { }) .collect::>(); let pk_types_unsized = is_unsized_vec(pk_types); - let index_size = if pk_types_unsized { + let index_setup = if pk_types_unsized { quote! { - let size = #const_name; + inner.primary_index = std::sync::Arc::new(PrimaryIndex { + pk_map: IndexMap::<#pk_type, OffsetEqLink<#const_name>, UnsizedNode<_>>::with_maximum_node_size(#const_name), + reverse_pk_map: IndexMap::new(), + }); } } else { quote! { let size = get_index_page_size_from_data_length::<#pk_type>(#const_name); + inner.primary_index = std::sync::Arc::new(PrimaryIndex { + pk_map: IndexMap::<_, OffsetEqLink<#const_name>>::with_maximum_node_size(size), + reverse_pk_map: IndexMap::new(), + }); } }; quote! { pub async fn new(config: PersistenceConfig) -> eyre::Result { let mut inner = WorkTable::default(); inner.table_name = #table_name; - #index_size - inner.pk_map = IndexMap::with_maximum_node_size(size); + #index_setup let table_files_path = format!("{}/{}", config.tables_path, #dir_name); let engine: #engine = PersistenceEngine::from_table_files_path(table_files_path).await?; core::result::Result::Ok(Self( @@ -168,7 +174,7 @@ impl Generator { pub async fn upsert(&self, row: #row_type) -> core::result::Result<(), WorkTableError> { let pk = row.get_primary_key(); let need_to_update = { - if let Some(_) = self.0.pk_map.get(&pk) + if let Some(_) = self.0.primary_index.pk_map.get(&pk) { true } else { @@ -238,7 +244,7 @@ impl Generator { fn gen_table_iter_inner(&self, func: TokenStream) -> TokenStream { quote! { - let first = self.0.pk_map.iter().next().map(|(k, v)| (k.clone(), *v)); + let first = self.0.primary_index.pk_map.iter().next().map(|(k, v)| (k.clone(), v.0)); let Some((mut k, link)) = first else { return Ok(()) }; @@ -249,12 +255,12 @@ impl Generator { let mut ind = false; while !ind { let next = { - let mut iter = self.0.pk_map.range(k.clone()..); - let next = iter.next().map(|(k, v)| (k.clone(), *v)).filter(|(key, _)| key != &k); + let mut iter = self.0.primary_index.pk_map.range(k.clone()..); + let next = iter.next().map(|(k, v)| (k.clone(), v.0)).filter(|(key, _)| key != &k); if next.is_some() { next } else { - iter.next().map(|(k, v)| (k.clone(), *v)) + iter.next().map(|(k, v)| (k.clone(), v.0)) } }; if let Some((key, link)) = next { @@ -273,7 +279,7 @@ impl Generator { fn gen_table_count_fn(&self) -> TokenStream { quote! { pub fn count(&self) -> usize { - let count = self.0.pk_map.len(); + let count = self.0.primary_index.pk_map.len(); count } } diff --git a/codegen/src/worktable/generator/table/index_fns.rs b/codegen/src/worktable/generator/table/index_fns.rs index 8471879e..52b88f36 100644 --- a/codegen/src/worktable/generator/table/index_fns.rs +++ b/codegen/src/worktable/generator/table/index_fns.rs @@ -65,7 +65,7 @@ impl Generator { Ok(quote! { pub fn #fn_name(&self, by: #type_) -> Option<#row_ident> { - let link = self.0.indexes.#field_ident.get(#by).map(|kv| kv.get().value)?; + let link = self.0.indexes.#field_ident.get(#by).map(|kv| kv.get().value.into())?; self.0.data.select_non_ghosted(link).ok() } }) @@ -104,7 +104,7 @@ impl Generator { let rows = self.0.indexes.#field_ident .get(#by) .into_iter() - .filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok()) + .filter_map(|(_, link)| self.0.data.select_non_ghosted(link.0).ok()) .filter(move |r| &r.#row_field_ident == &by); SelectQueryBuilder::new(rows) diff --git a/codegen/src/worktable/generator/table/mod.rs b/codegen/src/worktable/generator/table/mod.rs index fb096078..6f0562ba 100644 --- a/codegen/src/worktable/generator/table/mod.rs +++ b/codegen/src/worktable/generator/table/mod.rs @@ -101,11 +101,11 @@ impl Generator { }; let node_type = if pk_types_unsized { quote! { - UnsizedNode> + UnsizedNode>> } } else { quote! { - Vec> + Vec>> } }; @@ -121,8 +121,8 @@ impl Generator { #index_type, #lock_ident, <#primary_key_type as TablePrimaryKey>::Generator, - #node_type, - #inner_const_name + #inner_const_name, + #node_type > #persist_type_part ); @@ -139,7 +139,8 @@ impl Generator { #index_type, #lock_ident, <#primary_key_type as TablePrimaryKey>::Generator, - #node_type, + { INNER_PAGE_SIZE }, + #node_type > #persist_type_part ); diff --git a/src/in_memory/data.rs b/src/in_memory/data.rs index ac7103b6..50c6edcf 100644 --- a/src/in_memory/data.rs +++ b/src/in_memory/data.rs @@ -257,10 +257,74 @@ impl Data { Ok(bytes.to_vec()) } + /// Moves data within the page from one location to another. + /// Used for defragmentation - shifts data left to fill gaps. + /// + /// # Safety + /// Caller must ensure: + /// - Both `from` and `to` links are valid and point to the same page + /// - `from.length` equals `to.length` + /// - No other references exist during this operation + pub unsafe fn move_from_to(&self, from: Link, to: Link) -> Result<(), ExecutionError> { + if from.length != to.length { + return Err(ExecutionError::InvalidLink); + } + + let inner_data = unsafe { &mut *self.inner_data.get() }; + let src_offset = from.offset as usize; + let dst_offset = to.offset as usize; + let length = from.length as usize; + + // Use ptr::copy for overlapping memory regions (safe for shifting left) + // When moving left (dst_offset < src_offset), this works correctly + unsafe { + std::ptr::copy( + inner_data.as_ptr().add(src_offset), + inner_data.as_mut_ptr().add(dst_offset), + length, + ); + } + + Ok(()) + } + + /// Saves raw serialized bytes to the end of the page. + /// Used for moving already-serialized data without re-serialization. + pub fn save_raw_row(&self, data: &[u8]) -> Result { + let length = data.len(); + if length > DATA_LENGTH { + return Err(ExecutionError::PageTooSmall { + need: length, + allowed: DATA_LENGTH, + }); + } + let length = length as u32; + let offset = self.free_offset.fetch_add(length, Ordering::AcqRel); + if offset > DATA_LENGTH as u32 - length { + return Err(ExecutionError::PageIsFull { + need: length, + left: DATA_LENGTH as i64 - offset as i64, + }); + } + + let inner_data = unsafe { &mut *self.inner_data.get() }; + inner_data[offset as usize..][..length as usize].copy_from_slice(data); + + Ok(Link { + page_id: self.id, + offset, + length, + }) + } + pub fn get_bytes(&self) -> [u8; DATA_LENGTH] { let data = unsafe { &*self.inner_data.get() }; data.0 } + + pub fn free_space(&self) -> usize { + DATA_LENGTH.saturating_sub(self.free_offset.load(Ordering::Acquire) as usize) + } } /// Error that can appear on [`Data`] page operations. @@ -293,6 +357,7 @@ mod tests { use rkyv::{Archive, Deserialize, Serialize}; use crate::in_memory::data::{Data, ExecutionError, INNER_PAGE_SIZE}; + use crate::prelude::Link; #[derive( Archive, Copy, Clone, Deserialize, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, @@ -316,12 +381,16 @@ mod tests { let page = Data::::new(1.into()); let row = TestRow { a: 10, b: 20 }; + let initial_free = page.free_space(); + assert!(initial_free > 0); + let link = page.save_row(&row).unwrap(); assert_eq!(link.page_id, page.id); assert_eq!(link.length, 16); assert_eq!(link.offset, 0); assert_eq!(page.free_offset.load(Ordering::Relaxed), link.length); + assert_eq!(page.free_space(), initial_free - link.length as usize); let inner_data = unsafe { &mut *page.inner_data.get() }; let bytes = &inner_data[link.offset as usize..link.length as usize]; @@ -408,6 +477,9 @@ mod tests { fn data_page_save_many_rows() { let page = Data::::new(1.into()); + let initial_free = page.free_space(); + let mut total_used = 0; + let mut rows = Vec::new(); let mut links = Vec::new(); for i in 1..10 { @@ -418,9 +490,12 @@ mod tests { rows.push(row); let link = page.save_row(&row); + total_used += link.as_ref().unwrap().length as usize; links.push(link) } + assert_eq!(page.free_space(), initial_free - total_used); + let inner_data = unsafe { &mut *page.inner_data.get() }; for (i, link) in links.into_iter().enumerate() { @@ -499,4 +574,125 @@ mod tests { let _ = shared.get_row(link).unwrap(); } } + + #[test] + fn move_from_to() { + let page = Data::::new(1.into()); + + let row1 = TestRow { a: 100, b: 200 }; + let link1 = page.save_row(&row1).unwrap(); + assert_eq!(link1.offset, 0); + + let row2 = TestRow { a: 300, b: 400 }; + let link2 = page.save_row(&row2).unwrap(); + assert_eq!(link2.offset, 16); + + let new_link = Link { + page_id: link2.page_id, + offset: 0, + length: link2.length, + }; + + unsafe { page.move_from_to(link2, new_link).unwrap() }; + + let moved_row = page.get_row(new_link).unwrap(); + assert_eq!(moved_row, row2); + } + + #[test] + fn move_from_to_different_lengths() { + let page = Data::::new(1.into()); + + let from = Link { + page_id: 1.into(), + offset: 0, + length: 16, + }; + let to = Link { + page_id: 1.into(), + offset: 32, + length: 8, + }; + + let result = unsafe { page.move_from_to(from, to) }; + assert!(matches!(result, Err(ExecutionError::InvalidLink))); + } + + #[test] + fn save_raw_row_appends_to_page() { + let page = Data::::new(1.into()); + let row = TestRow { a: 42, b: 99 }; + + let link = page.save_row(&row).unwrap(); + let raw_data = page.get_raw_row(link).unwrap(); + + let new_link = page.save_raw_row(&raw_data).unwrap(); + + assert_eq!(new_link.page_id, page.id); + assert_eq!(new_link.length, link.length); + assert_eq!(new_link.offset, link.length); + + let retrieved = page.get_row(new_link).unwrap(); + assert_eq!(retrieved, row); + } + + #[test] + fn save_raw_row_page_too_small() { + let page = Data::::new(1.into()); + let data = vec![0u8; 32]; + + let result = page.save_raw_row(&data); + assert!(matches!(result, Err(ExecutionError::PageTooSmall { .. }))); + } + + #[test] + fn save_raw_row_page_full() { + let page = Data::::new(1.into()); + let row = TestRow { a: 1, b: 2 }; + let _ = page.save_row(&row).unwrap(); + + let data = vec![0u8; 16]; + let result = page.save_raw_row(&data); + assert!(matches!(result, Err(ExecutionError::PageIsFull { .. }))); + } + + #[test] + fn save_raw_row_move_between_pages() { + let page1 = Data::::new(1.into()); + let page2 = Data::::new(2.into()); + + let original = TestRow { a: 123, b: 456 }; + let link1 = page1.save_row(&original).unwrap(); + + let raw = page1.get_raw_row(link1).unwrap(); + let link2 = page2.save_raw_row(&raw).unwrap(); + + let retrieved = page2.get_row(link2).unwrap(); + assert_eq!(retrieved, original); + } + + #[test] + fn save_raw_row_multiple_entries() { + let page = Data::::new(1.into()); + let row = TestRow { a: 77, b: 88 }; + + let link = page.save_row(&row).unwrap(); + let raw_data = page.get_raw_row(link).unwrap(); + let row_size = link.length as usize; + + let initial_free = page.free_space(); + + let mut links = vec![link]; + for i in 0..5 { + let new_link = page.save_raw_row(&raw_data).unwrap(); + links.push(new_link); + let expected_free = initial_free - ((i + 1) as usize * row_size); + assert_eq!(page.free_space(), expected_free); + } + + for link in links { + let retrieved = page.get_row(link).unwrap(); + assert_eq!(retrieved, row); + } + } } diff --git a/src/in_memory/empty_link_registry.rs b/src/in_memory/empty_link_registry.rs index 35d5e06c..a4a7e264 100644 --- a/src/in_memory/empty_link_registry.rs +++ b/src/in_memory/empty_link_registry.rs @@ -1,11 +1,14 @@ use crate::in_memory::DATA_INNER_LENGTH; use data_bucket::Link; +use data_bucket::page::PageId; +use derive_more::Into; use indexset::concurrent::multimap::BTreeMultiMap; use indexset::concurrent::set::BTreeSet; use parking_lot::FairMutex; +use std::sync::atomic::{AtomicU32, Ordering}; /// A link wrapper that implements `Ord` based on absolute index calculation. -#[derive(Copy, Clone, Debug, Eq, PartialEq)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Into)] pub struct IndexOrdLink(pub Link); impl IndexOrdLink { @@ -73,6 +76,11 @@ impl Ord for IndexOrdLink { pub struct EmptyLinkRegistry { index_ord_links: BTreeSet>, length_ord_links: BTreeMultiMap, + + pub page_links_map: BTreeMultiMap, + + sum_links_len: AtomicU32, + op_lock: FairMutex<()>, } @@ -81,12 +89,32 @@ impl Default for EmptyLinkRegistry { Self { index_ord_links: BTreeSet::new(), length_ord_links: BTreeMultiMap::new(), + page_links_map: BTreeMultiMap::new(), + sum_links_len: Default::default(), op_lock: Default::default(), } } } impl EmptyLinkRegistry { + pub fn remove_link>(&self, link: L) { + let link = link.into(); + self.index_ord_links.remove(&IndexOrdLink(link)); + self.length_ord_links.remove(&link.length, &link); + self.page_links_map.remove(&link.page_id, &link); + + self.sum_links_len.fetch_sub(link.length, Ordering::AcqRel); + } + + fn insert_link>(&self, link: L) { + let link = link.into(); + self.index_ord_links.insert(IndexOrdLink(link)); + self.length_ord_links.insert(link.length, link); + self.page_links_map.insert(link.page_id, link); + + self.sum_links_len.fetch_add(link.length, Ordering::AcqRel); + } + pub fn push(&self, link: Link) { let mut index_ord_link = IndexOrdLink(link); let _g = self.op_lock.lock(); @@ -101,9 +129,7 @@ impl EmptyLinkRegistry { drop(iter); // Remove left neighbor - self.index_ord_links.remove(&possible_left_neighbor); - self.length_ord_links - .remove(&possible_left_neighbor.0.length, &possible_left_neighbor.0); + self.remove_link(possible_left_neighbor); index_ord_link = united_link; } @@ -120,39 +146,35 @@ impl EmptyLinkRegistry { drop(iter); // Remove right neighbor - self.index_ord_links.remove(&possible_right_neighbor); - self.length_ord_links.remove( - &possible_right_neighbor.0.length, - &possible_right_neighbor.0, - ); + self.remove_link(possible_right_neighbor); index_ord_link = united_link; } } } - self.index_ord_links.insert(index_ord_link); - self.length_ord_links - .insert(index_ord_link.0.length, index_ord_link.0); + self.insert_link(index_ord_link); } pub fn pop_max(&self) -> Option { let _g = self.op_lock.lock(); let mut iter = self.length_ord_links.iter().rev(); - let (len, max_length_link) = iter.next()?; - let index_ord_link = IndexOrdLink(*max_length_link); + let (_, max_length_link) = iter.next()?; drop(iter); - self.length_ord_links.remove(len, max_length_link); - self.index_ord_links.remove(&index_ord_link); + self.remove_link(*max_length_link); - Some(index_ord_link.0) + Some(*max_length_link) } pub fn iter(&self) -> impl Iterator + '_ { self.index_ord_links.iter().map(|l| l.0) } + + pub fn get_empty_links_size_bytes(&self) -> u32 { + self.sum_links_len.load(Ordering::Acquire) + } } #[cfg(test)] @@ -160,7 +182,142 @@ mod tests { use super::*; #[test] - fn test_empty_link_registry_insert_and_pop() { + fn test_unite_with_right_neighbor() { + let left = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 0, + length: 100, + }); + + let right = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 100, + length: 50, + }); + + let united = left.unite_with_right_neighbor(&right).unwrap(); + assert_eq!(united.0.page_id, 1.into()); + assert_eq!(united.0.offset, 0); + assert_eq!(united.0.length, 150); + } + + #[test] + fn test_unite_with_left_neighbor() { + let left = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 0, + length: 100, + }); + + let right = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 100, + length: 50, + }); + + let united = right.unite_with_left_neighbor(&left).unwrap(); + assert_eq!(united.0.page_id, 1.into()); + assert_eq!(united.0.offset, 0); + assert_eq!(united.0.length, 150); + } + + #[test] + fn test_unite_fails_on_gap() { + let link1 = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 0, + length: 100, + }); + + let link2 = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 200, + length: 50, + }); + + assert!(link1.unite_with_right_neighbor(&link2).is_none()); + assert!(link2.unite_with_left_neighbor(&link1).is_none()); + } + + #[test] + fn test_unite_fails_on_different_pages() { + let link1 = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 0, + length: 100, + }); + + let link2 = IndexOrdLink::(Link { + page_id: 2.into(), + offset: 100, + length: 50, + }); + + assert!(link1.unite_with_right_neighbor(&link2).is_none()); + assert!(link2.unite_with_left_neighbor(&link1).is_none()); + } + + #[test] + fn test_index_ord_link_ordering() { + const TEST_DATA_LENGTH: usize = 1000; + + let link1 = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 0, + length: 100, + }); + + let link2 = IndexOrdLink::(Link { + page_id: 1.into(), + offset: 100, + length: 50, + }); + + let link3 = IndexOrdLink::(Link { + page_id: 2.into(), + offset: 0, + length: 200, + }); + + assert!(link1 < link2); + assert!(link2 < link3); + assert!(link1 < link3); + } + + #[test] + fn test_push_merges_both_sides() { + let registry = EmptyLinkRegistry::::default(); + + let left = Link { + page_id: 1.into(), + offset: 0, + length: 100, + }; + + let middle = Link { + page_id: 1.into(), + offset: 100, + length: 50, + }; + + let right = Link { + page_id: 1.into(), + offset: 150, + length: 75, + }; + + registry.push(left); + registry.push(right); + registry.push(middle); + + let result = registry.pop_max().unwrap(); + assert_eq!(result.page_id, 1.into()); + assert_eq!(result.offset, 0); + assert_eq!(result.length, 225); + } + + #[test] + fn test_push_non_adjacent_no_merge() { let registry = EmptyLinkRegistry::::default(); let link1 = Link { @@ -170,13 +327,69 @@ mod tests { }; let link2 = Link { + page_id: 1.into(), + offset: 200, + length: 50, + }; + + registry.push(link1); + registry.push(link2); + + let pop1 = registry.pop_max().unwrap(); + let pop2 = registry.pop_max().unwrap(); + + assert_eq!(pop1.length, 100); + assert_eq!(pop2.length, 50); + } + + #[test] + fn test_pop_max_returns_largest() { + let registry = EmptyLinkRegistry::::default(); + + let small = Link { + page_id: 1.into(), + offset: 0, + length: 50, + }; + + let large = Link { page_id: 1.into(), offset: 100, + length: 200, + }; + + let medium = Link { + page_id: 1.into(), + offset: 300, + length: 100, + }; + + registry.push(small); + registry.push(large); + registry.push(medium); + + assert_eq!(registry.pop_max().unwrap().length, 300); // two links were united + assert_eq!(registry.pop_max().unwrap().length, 50); + } + + #[test] + fn test_iter_returns_all_links() { + let registry = EmptyLinkRegistry::::default(); + + let link1 = Link { + page_id: 1.into(), + offset: 0, + length: 100, + }; + + let link2 = Link { + page_id: 2.into(), + offset: 0, length: 150, }; let link3 = Link { - page_id: 2.into(), + page_id: 3.into(), offset: 0, length: 200, }; @@ -185,15 +398,41 @@ mod tests { registry.push(link2); registry.push(link3); - // After inserting link1 and link2, they should be united - let united_link = Link { + let links: Vec = registry.iter().collect(); + assert_eq!(links.len(), 3); + } + + #[test] + fn test_empty_registry() { + let registry = EmptyLinkRegistry::::default(); + + assert_eq!(registry.pop_max(), None); + assert_eq!(registry.iter().count(), 0); + } + + #[test] + fn test_sum_links_counter() { + let registry = EmptyLinkRegistry::::default(); + + let link1 = Link { page_id: 1.into(), offset: 0, - length: 250, + length: 100, }; - assert_eq!(registry.pop_max(), Some(united_link)); - assert_eq!(registry.pop_max(), Some(link3)); - assert_eq!(registry.pop_max(), None); + let link2 = Link { + page_id: 1.into(), + offset: 100, + length: 150, + }; + + registry.push(link1); + assert_eq!(registry.sum_links_len.load(Ordering::Acquire), 100); + + registry.push(link2); + assert_eq!(registry.sum_links_len.load(Ordering::Acquire), 250); + + registry.pop_max(); + assert_eq!(registry.sum_links_len.load(Ordering::Acquire), 0); } } diff --git a/src/in_memory/mod.rs b/src/in_memory/mod.rs index addfaae0..c032ea1e 100644 --- a/src/in_memory/mod.rs +++ b/src/in_memory/mod.rs @@ -4,5 +4,6 @@ mod pages; mod row; pub use data::{DATA_INNER_LENGTH, Data, ExecutionError as DataExecutionError}; +pub use empty_link_registry::EmptyLinkRegistry; pub use pages::{DataPages, ExecutionError as PagesExecutionError}; pub use row::{GhostWrapper, Query, RowWrapper, StorableRow}; diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 94329795..081ce9d5 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -1,11 +1,6 @@ -use std::{ - fmt::Debug, - sync::atomic::{AtomicU32, AtomicU64, Ordering}, - sync::{Arc, RwLock}, -}; - use data_bucket::page::PageId; use derive_more::{Display, Error, From}; +use parking_lot::RwLock; #[cfg(feature = "perf_measurements")] use performance_measurement_codegen::performance_measurement; use rkyv::{ @@ -15,6 +10,12 @@ use rkyv::{ ser::{Serializer, allocator::ArenaHandle, sharing::Share}, util::AlignedVec, }; +use std::collections::VecDeque; +use std::{ + fmt::Debug, + sync::Arc, + sync::atomic::{AtomicU32, AtomicU64, Ordering}, +}; use crate::in_memory::empty_link_registry::EmptyLinkRegistry; use crate::{ @@ -39,6 +40,8 @@ where empty_links: EmptyLinkRegistry, + empty_pages: Arc>>, + /// Count of saved rows. row_count: AtomicU64, @@ -67,6 +70,7 @@ where // We are starting ID's from `1` because `0`'s page in file is info page. pages: RwLock::new(vec![Arc::new(Data::new(1.into()))]), empty_links: EmptyLinkRegistry::::default(), + empty_pages: Default::default(), row_count: AtomicU64::new(0), last_page_id: AtomicU32::new(1), current_page_id: AtomicU32::new(1), @@ -82,6 +86,7 @@ where Self { pages: RwLock::new(vec), empty_links: EmptyLinkRegistry::default(), + empty_pages: Default::default(), row_count: AtomicU64::new(0), last_page_id: AtomicU32::new(last_page_id as u32), current_page_id: AtomicU32::new(last_page_id as u32), @@ -102,28 +107,22 @@ where { let general_row = ::WrappedRow::from_inner(row); - //println!("Popping empty link"); if let Some(link) = self.empty_links.pop_max() { - //println!("Empty link len {}", self.empty_links.len()); - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let current_page: usize = page_id_mapper(link.page_id.into()); let page = &pages[current_page]; match unsafe { page.try_save_row_by_link(&general_row, link) } { Ok((link, left_link)) => { if let Some(l) = left_link { - //println!("Pushing empty link"); self.empty_links.push(l); - //println!("Pushed empty link"); } return Ok(link); } // Ok(l) => return Ok(l), Err(e) => match e { DataExecutionError::InvalidLink => { - //println!("Pushing empty link"); self.empty_links.push(link); - //println!("Pushed empty link"); } DataExecutionError::PageIsFull { .. } | DataExecutionError::PageTooSmall { .. } @@ -135,7 +134,7 @@ where loop { let (link, tried_page) = { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let current_page = page_id_mapper(self.current_page_id.load(Ordering::Acquire) as usize); let page = &pages[current_page]; @@ -152,7 +151,15 @@ where if tried_page == page_id_mapper(self.current_page_id.load(Ordering::Relaxed) as usize) { - self.add_next_page(tried_page); + let mut g = self.empty_pages.write(); + if let Some(page_id) = g.pop_front() { + let _pages = self.pages.write(); + self.current_page_id + .store(page_id.into(), Ordering::Release); + } else { + drop(g); + self.add_next_page(tried_page); + } } } DataExecutionError::PageTooSmall { .. } @@ -184,12 +191,12 @@ where } fn add_next_page(&self, tried_page: usize) { - let mut pages = self.pages.write().expect("lock should be not poisoned"); + let mut pages = self.pages.write(); if tried_page == page_id_mapper(self.current_page_id.load(Ordering::Acquire) as usize) { let index = self.last_page_id.fetch_add(1, Ordering::AcqRel) + 1; pages.push(Arc::new(Data::new(index.into()))); - self.current_page_id.fetch_add(1, Ordering::AcqRel); + self.current_page_id.store(index, Ordering::Release); } } @@ -206,7 +213,7 @@ where <::WrappedRow as Archive>::Archived: Portable + Deserialize<::WrappedRow, HighDeserializer>, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages // - 1 is used because page ids are starting from 1. .get(page_id_mapper(link.page_id.into())) @@ -224,7 +231,7 @@ where <::WrappedRow as Archive>::Archived: Portable + Deserialize<::WrappedRow, HighDeserializer>, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages // - 1 is used because page ids are starting from 1. .get(page_id_mapper(link.page_id.into())) @@ -248,7 +255,7 @@ where >, Op: Fn(&<::WrappedRow as Archive>::Archived) -> Res, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages .get::(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; @@ -277,7 +284,7 @@ where <::WrappedRow as Archive>::Archived: Portable, Op: FnMut(&mut <::WrappedRow as Archive>::Archived) -> Res, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages .get(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; @@ -308,7 +315,7 @@ where Strategy, Share>, rkyv::rancor::Error>, >, { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages .get(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; @@ -327,7 +334,7 @@ where } pub fn select_raw(&self, link: Link) -> Result, ExecutionError> { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); let page = pages .get(page_id_mapper(link.page_id.into())) .ok_or(ExecutionError::PageNotFound(link.page_id))?; @@ -335,8 +342,27 @@ where .map_err(ExecutionError::DataPageError) } + pub fn mark_page_empty(&self, page_id: PageId) { + let mut g = self.empty_pages.write(); + g.push_back(page_id); + } + + pub fn get_empty_pages(&self) -> Vec { + let g = self.empty_pages.read(); + g.iter().map(|p| *p).collect() + } + + pub fn get_page( + &self, + page_id: PageId, + ) -> Option::WrappedRow, DATA_LENGTH>>> { + let pages = self.pages.read(); + let page = pages.get(page_id_mapper(page_id.into()))?; + Some(page.clone()) + } + pub fn get_bytes(&self) -> Vec<([u8; DATA_LENGTH], u32)> { - let pages = self.pages.read().unwrap(); + let pages = self.pages.read(); pages .iter() .map(|p| (p.get_bytes(), p.free_offset.load(Ordering::Relaxed))) @@ -344,13 +370,17 @@ where } pub fn get_page_count(&self) -> usize { - self.pages.read().unwrap().len() + self.pages.read().len() } pub fn get_empty_links(&self) -> Vec { self.empty_links.iter().collect() } + pub fn empty_links_registry(&self) -> &EmptyLinkRegistry { + &self.empty_links + } + pub fn with_empty_links(mut self, links: Vec) -> Self { let registry = EmptyLinkRegistry::default(); for l in links { @@ -376,11 +406,12 @@ pub enum ExecutionError { #[cfg(test)] mod tests { use std::collections::HashSet; + use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::{Arc, RwLock}; use std::thread; use std::time::Instant; + use parking_lot::RwLock; use rkyv::with::{AtomicLoad, Relaxed}; use rkyv::{Archive, Deserialize, Serialize}; @@ -567,7 +598,7 @@ mod tests { for i in 0..1000 { let row = TestRow { a: i, b: j * i + 1 }; - let mut pages = pages_shared.write().unwrap(); + let mut pages = pages_shared.write(); pages.insert(row); } }); @@ -598,7 +629,7 @@ mod tests { for i in 0..1000 { let row = TestRow { a: i, b: j * i + 1 }; - let mut pages = pages_shared.write().unwrap(); + let mut pages = pages_shared.write(); pages.push(row); } }); diff --git a/src/index/mod.rs b/src/index/mod.rs index a164ecb8..caafbf95 100644 --- a/src/index/mod.rs +++ b/src/index/mod.rs @@ -1,5 +1,6 @@ mod available_index; mod multipair; +mod primary_index; mod table_index; mod table_secondary_index; mod unsized_node; @@ -8,7 +9,8 @@ pub use available_index::AvailableIndex; pub use indexset::concurrent::map::BTreeMap as IndexMap; pub use indexset::concurrent::multimap::BTreeMultiMap as IndexMultiMap; pub use multipair::MultiPairRecreate; -pub use table_index::{TableIndex, TableIndexCdc}; +pub use primary_index::PrimaryIndex; +pub use table_index::{TableIndex, TableIndexCdc, convert_change_events}; pub use table_secondary_index::{ IndexError, TableSecondaryIndex, TableSecondaryIndexCdc, TableSecondaryIndexEventsOps, TableSecondaryIndexInfo, diff --git a/src/index/multipair.rs b/src/index/multipair.rs index 670dd6b6..ae2aca20 100644 --- a/src/index/multipair.rs +++ b/src/index/multipair.rs @@ -1,13 +1,12 @@ -use data_bucket::Link; use indexset::core::multipair::MultiPair; use indexset::core::pair::Pair; -pub trait MultiPairRecreate { - fn with_last_discriminator(self, discriminator: u64) -> MultiPair; +pub trait MultiPairRecreate { + fn with_last_discriminator(self, discriminator: u64) -> MultiPair; } -impl MultiPairRecreate for Pair { - fn with_last_discriminator(self, discriminator: u64) -> MultiPair { +impl MultiPairRecreate for Pair { + fn with_last_discriminator(self, discriminator: u64) -> MultiPair { MultiPair { key: self.key, value: self.value, diff --git a/src/index/primary_index.rs b/src/index/primary_index.rs new file mode 100644 index 00000000..c49a4c03 --- /dev/null +++ b/src/index/primary_index.rs @@ -0,0 +1,463 @@ +//! Combined storage for primary and reverse indexes. +//! +//! [`PrimaryIndex`] keeps both the primary key index (PK → [`OffsetEqLink`]) +//! and the reverse index ([`OffsetEqLink`] → PK) in sync. + +use std::fmt::Debug; +use std::hash::Hash; + +use data_bucket::Link; +use indexset::cdc::change::ChangeEvent; +use indexset::core::node::NodeLike; +use indexset::core::pair::Pair; + +use crate::util::OffsetEqLink; +use crate::{IndexMap, TableIndex, TableIndexCdc, convert_change_events}; + +/// Combined storage for primary and reverse indexes. +/// +/// Maintains bidirectional mapping between primary keys and their data locations: +/// - **Forward index**: `PrimaryKey` → [`OffsetEqLink`] (primary lookups) +/// - **Reverse index**: [`OffsetEqLink`] → `PrimaryKey` (vacuum, position queries) +#[derive(Debug)] +pub struct PrimaryIndex< + PrimaryKey, + const DATA_LENGTH: usize, + PkNodeType = Vec>>, +> where + PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, + PkNodeType: NodeLike>> + Send + 'static, +{ + pub pk_map: IndexMap, PkNodeType>, + pub reverse_pk_map: IndexMap, PrimaryKey>, +} + +impl Default + for PrimaryIndex +where + PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, + PkNodeType: NodeLike>> + Send + 'static, +{ + fn default() -> Self { + Self { + pk_map: IndexMap::default(), + reverse_pk_map: IndexMap::default(), + } + } +} + +impl TableIndex + for PrimaryIndex +where + PrimaryKey: Debug + Eq + Hash + Clone + Send + Ord, + PkNodeType: NodeLike>> + Send + 'static, +{ + fn insert(&self, value: PrimaryKey, link: Link) -> Option { + let offset_link = OffsetEqLink(link); + let old = self.pk_map.insert(value.clone(), offset_link); + if let Some(old_link) = old { + // Update reverse index + self.reverse_pk_map.remove(&old_link); + } + self.reverse_pk_map.insert(offset_link, value); + old.map(|l| l.0) + } + + fn insert_checked(&self, value: PrimaryKey, link: Link) -> Option<()> { + let offset_link = OffsetEqLink(link); + self.pk_map.checked_insert(value.clone(), offset_link)?; + self.reverse_pk_map.checked_insert(offset_link, value)?; + Some(()) + } + + fn remove(&self, value: &PrimaryKey, _: Link) -> Option<(PrimaryKey, Link)> { + let (_, old_link) = self.pk_map.remove(value)?; + self.reverse_pk_map.remove(&old_link); + Some((value.clone(), old_link.0)) + } +} + +impl TableIndexCdc + for PrimaryIndex +where + PrimaryKey: Debug + Eq + Hash + Clone + Send + Ord, + PkNodeType: NodeLike>> + Send + 'static, +{ + fn insert_cdc( + &self, + value: PrimaryKey, + link: Link, + ) -> (Option, Vec>>) { + let offset_link = OffsetEqLink(link); + let (res, evs) = self.pk_map.insert_cdc(value.clone(), offset_link); + let res_link = res.map(|l| l.0); + if let Some(res) = res { + self.reverse_pk_map.remove(&res); + } + self.reverse_pk_map.insert(offset_link, value); + + (res_link, convert_change_events(evs)) + } + + fn insert_checked_cdc( + &self, + value: PrimaryKey, + link: Link, + ) -> Option>>> { + let offset_link = OffsetEqLink(link); + let res = self.pk_map.checked_insert_cdc(value.clone(), offset_link); + + if let Some(evs) = res { + self.reverse_pk_map.insert(offset_link, value); + Some(convert_change_events(evs)) + } else { + None + } + } + + fn remove_cdc( + &self, + value: PrimaryKey, + _: Link, + ) -> ( + Option<(PrimaryKey, Link)>, + Vec>>, + ) { + let (res, evs) = self.pk_map.remove_cdc(&value); + + if let Some((pk, old_link)) = res { + let offset_link = OffsetEqLink(old_link.0); + self.reverse_pk_map.remove(&offset_link); + (Some((pk, old_link.0)), convert_change_events(evs)) + } else { + (None, convert_change_events(evs)) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use data_bucket::page::PageId; + + const TEST_DATA_LENGTH: usize = 4096; + + type TestPrimaryIndex = + PrimaryIndex>>>; + + #[test] + fn test_default_creates_empty_indexes() { + let index = TestPrimaryIndex::default(); + assert_eq!(index.pk_map.len(), 0); + assert_eq!(index.reverse_pk_map.len(), 0); + } + + #[test] + fn test_insert_creates_bidirectional_mapping() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + index.insert(42, link); + + assert_eq!(index.pk_map.get(&42).map(|v| v.get().value.0), Some(link)); + assert_eq!( + index + .reverse_pk_map + .get(&OffsetEqLink(link)) + .map(|v| v.get().value), + Some(42) + ); + } + + #[test] + fn test_insert_returns_old_link_on_duplicate() { + let index = TestPrimaryIndex::default(); + let link1 = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + let link2 = Link { + page_id: PageId::from(2), + offset: 200, + length: 50, + }; + + index.insert(42, link1); + let old = index.insert(42, link2); + + assert_eq!(old, Some(link1)); + assert_eq!(index.pk_map.get(&42).map(|v| v.get().value.0), Some(link2)); + assert_eq!( + index + .reverse_pk_map + .get(&OffsetEqLink(link2)) + .map(|v| v.get().value), + Some(42) + ); + assert!(index.reverse_pk_map.get(&OffsetEqLink(link1)).is_none()); + } + + #[test] + fn test_insert_checked_succeeds_on_new_key() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + let result = index.insert_checked(42, link); + assert_eq!(result, Some(())); + + assert_eq!(index.pk_map.get(&42).map(|v| v.get().value.0), Some(link)); + assert_eq!( + index + .reverse_pk_map + .get(&OffsetEqLink(link)) + .map(|v| v.get().value), + Some(42) + ); + } + + #[test] + fn test_insert_checked_fails_on_duplicate() { + let index = TestPrimaryIndex::default(); + let link1 = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + let link2 = Link { + page_id: PageId::from(2), + offset: 200, + length: 50, + }; + + index.insert_checked(42, link1).unwrap(); + let result = index.insert_checked(42, link2); + + assert_eq!(result, None); + assert_eq!(index.pk_map.get(&42).map(|v| v.get().value.0), Some(link1)); + } + + #[test] + fn test_removing_existing_key() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + index.insert(42, link); + let removed = index.remove(&42, link); + + assert_eq!(removed, Some((42, link))); + assert!(index.pk_map.get(&42).is_none()); + assert!(index.reverse_pk_map.get(&OffsetEqLink(link)).is_none()); + } + + #[test] + fn test_removing_nonexistent_key_returns_none() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + let removed = index.remove(&42, link); + assert_eq!(removed, None); + } + + #[test] + fn test_insert_cdc_new_key() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + let (old_link, _events) = index.insert_cdc(42, link); + + assert_eq!(old_link, None); + assert_eq!(index.pk_map.get(&42).map(|v| v.get().value.0), Some(link)); + assert_eq!( + index + .reverse_pk_map + .get(&OffsetEqLink(link)) + .map(|v| v.get().value), + Some(42) + ); + } + + #[test] + fn test_insert_cdc_existing_key() { + let index = TestPrimaryIndex::default(); + let link1 = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + let link2 = Link { + page_id: PageId::from(2), + offset: 200, + length: 50, + }; + + index.insert_cdc(42, link1); + let (old_link, _events) = index.insert_cdc(42, link2); + + assert_eq!(old_link, Some(link1)); + assert_eq!(index.pk_map.get(&42).map(|v| v.get().value.0), Some(link2)); + assert!(index.reverse_pk_map.get(&OffsetEqLink(link1)).is_none()); + assert_eq!( + index + .reverse_pk_map + .get(&OffsetEqLink(link2)) + .map(|v| v.get().value), + Some(42) + ); + } + + #[test] + fn test_insert_checked_cdc_new_key() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + let events = index.insert_checked_cdc(42, link); + + assert!(events.is_some()); + assert_eq!(index.pk_map.get(&42).map(|v| v.get().value.0), Some(link)); + } + + #[test] + fn test_insert_checked_cdc_existing_key() { + let index = TestPrimaryIndex::default(); + let link1 = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + let link2 = Link { + page_id: PageId::from(2), + offset: 200, + length: 50, + }; + + index.insert_checked_cdc(42, link1).unwrap(); + let events = index.insert_checked_cdc(42, link2); + + assert!(events.is_none()); + assert_eq!(index.pk_map.get(&42).map(|v| v.get().value.0), Some(link1)); + } + + #[test] + fn test_remove_cdc_existing_key() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + index.insert_cdc(42, link); + let (removed, _events) = index.remove_cdc(42, link); + + assert_eq!(removed, Some((42, link))); + assert!(index.pk_map.get(&42).is_none()); + assert!(index.reverse_pk_map.get(&OffsetEqLink(link)).is_none()); + } + + #[test] + fn test_remove_cdc_nonexistent_key() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + let (removed, _events) = index.remove_cdc(42, link); + + assert_eq!(removed, None); + } + + #[test] + fn test_multiple_keys_maintain_separate_mappings() { + let index = TestPrimaryIndex::default(); + let link1 = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + let link2 = Link { + page_id: PageId::from(2), + offset: 200, + length: 50, + }; + let link3 = Link { + page_id: PageId::from(3), + offset: 300, + length: 50, + }; + + index.insert(1, link1); + index.insert(2, link2); + index.insert(3, link3); + + assert_eq!(index.pk_map.get(&1).map(|v| v.get().value.0), Some(link1)); + assert_eq!(index.pk_map.get(&2).map(|v| v.get().value.0), Some(link2)); + assert_eq!(index.pk_map.get(&3).map(|v| v.get().value.0), Some(link3)); + + assert_eq!( + index + .reverse_pk_map + .get(&OffsetEqLink(link1)) + .map(|v| v.get().value), + Some(1) + ); + assert_eq!( + index + .reverse_pk_map + .get(&OffsetEqLink(link2)) + .map(|v| v.get().value), + Some(2) + ); + assert_eq!( + index + .reverse_pk_map + .get(&OffsetEqLink(link3)) + .map(|v| v.get().value), + Some(3) + ); + } + + #[test] + fn test_reverse_lookup_by_link() { + let index = TestPrimaryIndex::default(); + let link = Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }; + + index.insert(42, link); + + let pk = index + .reverse_pk_map + .get(&OffsetEqLink(link)) + .map(|v| v.get().value); + assert_eq!(pk, Some(42)); + } +} diff --git a/src/index/table_index/cdc.rs b/src/index/table_index/cdc.rs index eef930a2..be7241ff 100644 --- a/src/index/table_index/cdc.rs +++ b/src/index/table_index/cdc.rs @@ -7,6 +7,8 @@ use indexset::core::multipair::MultiPair; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; +use crate::index::table_index::util::convert_change_events; +use crate::util::OffsetEqLink; use crate::{IndexMap, IndexMultiMap}; pub trait TableIndexCdc { @@ -20,23 +22,25 @@ pub trait TableIndexCdc { ) -> (Option<(T, Link)>, Vec>>); } -impl TableIndexCdc for IndexMultiMap +impl TableIndexCdc for IndexMultiMap, Node> where T: Debug + Eq + Hash + Clone + Send + Ord, - Node: NodeLike> + Send + 'static, + Node: NodeLike>> + Send + 'static, { fn insert_cdc(&self, value: T, link: Link) -> (Option, Vec>>) { - let (res, evs) = self.insert_cdc(value, link); - (res, evs.into_iter().map(Into::into).collect()) + let (res, evs) = self.insert_cdc(value, OffsetEqLink(link)); + let pair_evs = evs.into_iter().map(Into::into).collect(); + let res_link = res.map(|l| l.0); + (res_link, convert_change_events(pair_evs)) } - // TODO: refactor this to be more straightforward fn insert_checked_cdc(&self, value: T, link: Link) -> Option>>> { - let (res, evs) = self.insert_cdc(value, link); + let (res, evs) = self.insert_cdc(value, OffsetEqLink(link)); + let pair_evs = evs.into_iter().map(Into::into).collect(); if res.is_some() { None } else { - Some(evs.into_iter().map(Into::into).collect()) + Some(convert_change_events(pair_evs)) } } @@ -45,22 +49,27 @@ where value: T, link: Link, ) -> (Option<(T, Link)>, Vec>>) { - let (res, evs) = self.remove_cdc(&value, &link); - (res, evs.into_iter().map(Into::into).collect()) + let (res, evs) = self.remove_cdc(&value, &OffsetEqLink(link)); + let pair_evs = evs.into_iter().map(Into::into).collect(); + let res_pair = res.map(|(k, v)| (k, v.into())); + (res_pair, convert_change_events(pair_evs)) } } -impl TableIndexCdc for IndexMap +impl TableIndexCdc for IndexMap, Node> where T: Debug + Eq + Hash + Clone + Send + Ord, - Node: NodeLike> + Send + 'static, + Node: NodeLike>> + Send + 'static, { fn insert_cdc(&self, value: T, link: Link) -> (Option, Vec>>) { - self.insert_cdc(value, link) + let (res, evs) = self.insert_cdc(value, OffsetEqLink(link)); + let res_link = res.map(|l| l.0); + (res_link, convert_change_events(evs)) } fn insert_checked_cdc(&self, value: T, link: Link) -> Option>>> { - self.checked_insert_cdc(value, link) + let res = self.checked_insert_cdc(value, OffsetEqLink(link)); + res.map(|evs| convert_change_events(evs)) } fn remove_cdc( @@ -68,6 +77,8 @@ where value: T, _: Link, ) -> (Option<(T, Link)>, Vec>>) { - self.remove_cdc(&value) + let (res, evs) = self.remove_cdc(&value); + let res_pair = res.map(|(k, v)| (k, v.0)); + (res_pair, convert_change_events(evs)) } } diff --git a/src/index/table_index/mod.rs b/src/index/table_index/mod.rs index 6272cbb8..88cf043e 100644 --- a/src/index/table_index/mod.rs +++ b/src/index/table_index/mod.rs @@ -6,54 +6,58 @@ use indexset::core::multipair::MultiPair; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; +use crate::util::OffsetEqLink; use crate::{IndexMap, IndexMultiMap}; mod cdc; +pub mod util; pub use cdc::TableIndexCdc; +pub use util::convert_change_events; pub trait TableIndex { fn insert(&self, value: T, link: Link) -> Option; fn insert_checked(&self, value: T, link: Link) -> Option<()>; - fn remove(&self, value: T, link: Link) -> Option<(T, Link)>; + fn remove(&self, value: &T, link: Link) -> Option<(T, Link)>; } -impl TableIndex for IndexMultiMap +impl TableIndex for IndexMultiMap where T: Debug + Eq + Hash + Clone + Send + Ord, - Node: NodeLike> + Send + 'static, + Node: NodeLike> + Send + 'static, { fn insert(&self, value: T, link: Link) -> Option { - self.insert(value, link) + self.insert(value, OffsetEqLink(link)).map(|l| l.0) } fn insert_checked(&self, value: T, link: Link) -> Option<()> { - if self.insert(value, link).is_some() { + if self.insert(value, OffsetEqLink(link)).is_some() { None } else { Some(()) } } - fn remove(&self, value: T, link: Link) -> Option<(T, Link)> { - self.remove(&value, &link) + fn remove(&self, value: &T, link: Link) -> Option<(T, Link)> { + self.remove(value, &OffsetEqLink(link)) + .map(|(v, l)| (v, l.0)) } } -impl TableIndex for IndexMap +impl TableIndex for IndexMap where T: Debug + Eq + Hash + Clone + Send + Ord, - Node: NodeLike> + Send + 'static, + Node: NodeLike> + Send + 'static, { fn insert(&self, value: T, link: Link) -> Option { - self.insert(value, link) + self.insert(value, OffsetEqLink(link)).map(|l| l.0) } fn insert_checked(&self, value: T, link: Link) -> Option<()> { - self.checked_insert(value, link) + self.checked_insert(value, OffsetEqLink(link)) } - fn remove(&self, value: T, _: Link) -> Option<(T, Link)> { - self.remove(&value) + fn remove(&self, value: &T, _: Link) -> Option<(T, Link)> { + self.remove(value).map(|(v, l)| (v, l.0)) } } diff --git a/src/index/table_index/util.rs b/src/index/table_index/util.rs new file mode 100644 index 00000000..5f308a13 --- /dev/null +++ b/src/index/table_index/util.rs @@ -0,0 +1,85 @@ +use indexset::cdc::change::ChangeEvent; +use indexset::core::pair::Pair; + +pub fn convert_change_event(ev: ChangeEvent>) -> ChangeEvent> +where + L1: Into, +{ + match ev { + ChangeEvent::InsertAt { + event_id, + max_value, + value, + index, + } => ChangeEvent::InsertAt { + event_id, + max_value: Pair { + key: max_value.key, + value: max_value.value.into(), + }, + value: Pair { + key: value.key, + value: value.value.into(), + }, + index, + }, + ChangeEvent::RemoveAt { + event_id, + max_value, + value, + index, + } => ChangeEvent::RemoveAt { + event_id, + max_value: Pair { + key: max_value.key, + value: max_value.value.into(), + }, + value: Pair { + key: value.key, + value: value.value.into(), + }, + index, + }, + ChangeEvent::CreateNode { + event_id, + max_value, + } => ChangeEvent::CreateNode { + event_id, + max_value: Pair { + key: max_value.key, + value: max_value.value.into(), + }, + }, + ChangeEvent::RemoveNode { + event_id, + max_value, + } => ChangeEvent::RemoveNode { + event_id, + max_value: Pair { + key: max_value.key, + value: max_value.value.into(), + }, + }, + ChangeEvent::SplitNode { + event_id, + max_value, + split_index, + } => ChangeEvent::SplitNode { + event_id, + max_value: Pair { + key: max_value.key, + value: max_value.value.into(), + }, + split_index, + }, + } +} + +pub fn convert_change_events( + evs: Vec>>, +) -> Vec>> +where + L1: Into, +{ + evs.into_iter().map(convert_change_event).collect() +} diff --git a/src/index/table_secondary_index/cdc.rs b/src/index/table_secondary_index/cdc.rs index 8bea76b1..88f7e7c5 100644 --- a/src/index/table_secondary_index/cdc.rs +++ b/src/index/table_secondary_index/cdc.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use data_bucket::Link; -use crate::{Difference, IndexError}; +use crate::{Difference, IndexError, TableSecondaryIndex}; pub trait TableSecondaryIndexCdc { fn save_row_cdc( @@ -33,3 +33,43 @@ pub trait TableSecondaryIndexCdc>, ) -> Result>; } + +impl + TableSecondaryIndexCdc for T +where + T: TableSecondaryIndex, +{ + fn save_row_cdc(&self, row: Row, link: Link) -> Result<(), IndexError> { + self.save_row(row, link) + } + + fn reinsert_row_cdc( + &self, + row_old: Row, + link_old: Link, + row_new: Row, + link_new: Link, + ) -> Result<(), IndexError> { + self.reinsert_row(row_old, link_old, row_new, link_new) + } + + fn delete_row_cdc(&self, row: Row, link: Link) -> Result<(), IndexError> { + self.delete_row(row, link) + } + + fn process_difference_insert_cdc( + &self, + link: Link, + differences: HashMap<&str, Difference>, + ) -> Result<(), IndexError> { + self.process_difference_insert(link, differences) + } + + fn process_difference_remove_cdc( + &self, + link: Link, + differences: HashMap<&str, Difference>, + ) -> Result<(), IndexError> { + self.process_difference_remove(link, differences) + } +} diff --git a/src/lib.rs b/src/lib.rs index d6699cb3..fb2b300d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,12 +35,12 @@ pub mod prelude { pub use crate::primary_key::{PrimaryKeyGenerator, PrimaryKeyGeneratorState, TablePrimaryKey}; pub use crate::table::select::{Order, QueryParams, SelectQueryBuilder, SelectQueryExecutor}; pub use crate::table::system_info::{IndexInfo, IndexKind, SystemInfo}; - pub use crate::util::{OrderedF32Def, OrderedF64Def}; + pub use crate::util::{OffsetEqLink, OrderedF32Def, OrderedF64Def}; pub use crate::{ AvailableIndex, Difference, IndexError, IndexMap, IndexMultiMap, MultiPairRecreate, - TableIndex, TableIndexCdc, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, - TableSecondaryIndexEventsOps, TableSecondaryIndexInfo, UnsizedNode, WorkTable, - WorkTableError, + PrimaryIndex, TableIndex, TableIndexCdc, TableRow, TableSecondaryIndex, + TableSecondaryIndexCdc, TableSecondaryIndexEventsOps, TableSecondaryIndexInfo, UnsizedNode, + WorkTable, WorkTableError, }; pub use data_bucket::{ DATA_VERSION, DataPage, GENERAL_HEADER_SIZE, GeneralHeader, GeneralPage, INNER_PAGE_SIZE, @@ -62,7 +62,3 @@ pub mod prelude { pub const WT_INDEX_EXTENSION: &str = ".wt.idx"; pub const WT_DATA_EXTENSION: &str = ".wt.data"; } - -// TODO: -// 1. add checked inserts to indexset to not insert/remove but just insert with violation error -// 2. Add pre-update state storage to avoid ghost reads of updated data if it will be rolled back diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 4c5a230c..959b7a62 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -12,7 +12,7 @@ use futures::task::AtomicWaker; use parking_lot::Mutex; pub use map::LockMap; -pub use row_lock::RowLock; +pub use row_lock::{FullRowLock, RowLock}; #[derive(Debug)] pub struct Lock { diff --git a/src/lock/row_lock.rs b/src/lock/row_lock.rs index 5720c3b7..73356850 100644 --- a/src/lock/row_lock.rs +++ b/src/lock/row_lock.rs @@ -18,3 +18,49 @@ pub trait RowLock { where Self: Sized; } + +/// Full row lock represented by a single lock. +/// Unlike generated per-column lock types, this uses one lock for the entire +/// row. +#[derive(Debug)] +pub struct FullRowLock { + l: Arc, +} + +impl FullRowLock { + pub fn unlock(&self) { + self.l.unlock(); + } +} + +impl RowLock for FullRowLock { + fn is_locked(&self) -> bool { + self.l.is_locked() + } + + fn with_lock(id: u16) -> (Self, Arc) + where + Self: Sized, + { + let l = Arc::new(Lock::new(id)); + (FullRowLock { l: l.clone() }, l) + } + + fn lock(&mut self, id: u16) -> (HashSet>, Arc) { + let mut set = HashSet::new(); + let l = Arc::new(Lock::new(id)); + set.insert(self.l.clone()); + self.l = l.clone(); + + (set, l) + } + + fn merge(&mut self, other: &mut Self) -> HashSet> + where + Self: Sized, + { + let set = HashSet::from_iter([self.l.clone()]); + self.l = other.l.clone(); + set + } +} diff --git a/src/mem_stat/mod.rs b/src/mem_stat/mod.rs index 6a4c7572..d6d1bbac 100644 --- a/src/mem_stat/mod.rs +++ b/src/mem_stat/mod.rs @@ -16,6 +16,7 @@ use uuid::Uuid; use crate::IndexMultiMap; use crate::persistence::OperationType; use crate::prelude::OperationId; +use crate::util::OffsetEqLink; use crate::{IndexMap, impl_memstat_zero}; pub trait MemStat { @@ -179,3 +180,14 @@ where } impl_memstat_zero!(Link, PageId, Uuid, OperationId, OperationType); + +// OffsetEqLink has zero heap size (just wraps Link) +impl MemStat for OffsetEqLink { + fn heap_size(&self) -> usize { + 0 + } + + fn used_size(&self) -> usize { + 0 + } +} diff --git a/src/table/mod.rs b/src/table/mod.rs index 19f46685..82731bd9 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -1,19 +1,18 @@ pub mod select; pub mod system_info; - -use std::fmt::Debug; -use std::marker::PhantomData; +pub mod vacuum; use crate::in_memory::{DataPages, GhostWrapper, RowWrapper, StorableRow}; use crate::lock::LockMap; use crate::persistence::{InsertOperation, Operation}; use crate::prelude::{OperationId, PrimaryKeyGeneratorState}; use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey}; +use crate::util::OffsetEqLink; use crate::{ - AvailableIndex, IndexError, IndexMap, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, - in_memory, + AvailableIndex, IndexError, IndexMap, PrimaryIndex, TableIndex, TableIndexCdc, TableRow, + TableSecondaryIndex, TableSecondaryIndexCdc, convert_change_events, in_memory, }; -use data_bucket::{INNER_PAGE_SIZE, Link}; +use data_bucket::INNER_PAGE_SIZE; use derive_more::{Display, Error, From}; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; @@ -26,6 +25,9 @@ use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::Arc; use uuid::Uuid; #[derive(Debug)] @@ -37,18 +39,18 @@ pub struct WorkTable< SecondaryIndexes = (), LockType = (), PkGen = ::Generator, - PkNodeType = Vec>, const DATA_LENGTH: usize = INNER_PAGE_SIZE, + PkNodeType = Vec>>, > where PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, Row: StorableRow + Send + Clone + 'static, - PkNodeType: NodeLike> + Send + 'static, + PkNodeType: NodeLike>> + Send + 'static, { - pub data: DataPages, + pub data: Arc>, - pub pk_map: IndexMap, + pub primary_index: Arc>, - pub indexes: SecondaryIndexes, + pub indexes: Arc, pub pk_gen: PkGen, @@ -70,8 +72,8 @@ impl< SecondaryIndexes, LockType, PkGen, - PkNodeType, const DATA_LENGTH: usize, + PkNodeType, > Default for WorkTable< Row, @@ -81,22 +83,22 @@ impl< SecondaryIndexes, LockType, PkGen, - PkNodeType, DATA_LENGTH, + PkNodeType, > where PrimaryKey: Debug + Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, SecondaryIndexes: Default, PkGen: Default, - PkNodeType: NodeLike> + Send + 'static, + PkNodeType: NodeLike>> + Send + 'static, Row: StorableRow + Send + Clone + 'static, ::WrappedRow: RowWrapper, { fn default() -> Self { Self { - data: DataPages::new(), - pk_map: IndexMap::default(), - indexes: SecondaryIndexes::default(), + data: Arc::new(DataPages::new()), + primary_index: Arc::new(PrimaryIndex::default()), + indexes: Arc::new(SecondaryIndexes::default()), pk_gen: Default::default(), lock_map: LockMap::default(), update_state: IndexMap::default(), @@ -114,8 +116,8 @@ impl< SecondaryIndexes, LockType, PkGen, - PkNodeType, const DATA_LENGTH: usize, + PkNodeType, > WorkTable< Row, @@ -125,13 +127,13 @@ impl< SecondaryIndexes, LockType, PkGen, - PkNodeType, DATA_LENGTH, + PkNodeType, > where Row: TableRow, PrimaryKey: Debug + Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, - PkNodeType: NodeLike> + Send + 'static, + PkNodeType: NodeLike>> + Send + 'static, Row: StorableRow + Send + Clone + 'static, ::WrappedRow: RowWrapper, { @@ -157,7 +159,11 @@ where <::WrappedRow as Archive>::Archived: Deserialize<::WrappedRow, HighDeserializer>, { - let link = self.pk_map.get(&pk).map(|v| v.get().value); + let link = self + .primary_index + .pk_map + .get(&pk) + .map(|v| v.get().value.into()); if let Some(link) = link { self.data.select(link).ok() } else { @@ -192,7 +198,11 @@ where .data .insert(row.clone()) .map_err(WorkTableError::PagesError)?; - if self.pk_map.checked_insert(pk.clone(), link).is_none() { + if self + .primary_index + .insert_checked(pk.clone(), link) + .is_none() + { self.data.delete(link).map_err(WorkTableError::PagesError)?; return Err(WorkTableError::AlreadyExists("Primary".to_string())); }; @@ -203,7 +213,7 @@ where inserted_already, } => { self.data.delete(link).map_err(WorkTableError::PagesError)?; - self.pk_map.remove(&pk); + self.primary_index.remove(&pk, link); self.indexes .delete_from_indexes(row, link, inserted_already)?; @@ -254,11 +264,12 @@ where .data .insert_cdc(row.clone()) .map_err(WorkTableError::PagesError)?; - let primary_key_events = self.pk_map.checked_insert_cdc(pk.clone(), link); - if primary_key_events.is_none() { + let primary_key_events = self.primary_index.insert_checked_cdc(pk.clone(), link); + let Some(primary_key_events) = primary_key_events else { self.data.delete(link).map_err(WorkTableError::PagesError)?; return Err(WorkTableError::AlreadyExists("Primary".to_string())); - } + }; + let primary_key_events = convert_change_events(primary_key_events); let indexes_res = self.indexes.save_row_cdc(row.clone(), link); if let Err(e) = indexes_res { return match e { @@ -267,7 +278,7 @@ where inserted_already, } => { self.data.delete(link).map_err(WorkTableError::PagesError)?; - self.pk_map.remove(&pk); + self.primary_index.remove(&pk, link); self.indexes .delete_from_indexes(row, link, inserted_already)?; @@ -289,7 +300,7 @@ where let op = Operation::Insert(InsertOperation { id: OperationId::Single(Uuid::now_v7()), pk_gen_state: self.pk_gen.get_state(), - primary_key_events: primary_key_events.expect("should be checked before for existence"), + primary_key_events, secondary_keys_events: indexes_res.expect("was checked before"), bytes, link, @@ -306,6 +317,8 @@ where /// part is for new row. Goal is to make `PrimaryKey` of the row always /// acceptable. As for reinsert `PrimaryKey` will be same for both old and /// new [`Link`]'s, goal will be achieved. + /// + /// [`Link`]: data_bucket::Link pub fn reinsert(&self, row_old: Row, row_new: Row) -> Result where Row: Archive @@ -329,9 +342,10 @@ where return Err(WorkTableError::PrimaryUpdateTry); } let old_link = self + .primary_index .pk_map .get(&pk) - .map(|v| v.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound)?; let new_link = self .data @@ -342,7 +356,7 @@ where .with_mut_ref(new_link, |r| r.unghost()) .map_err(WorkTableError::PagesError)? } - self.pk_map.insert(pk.clone(), new_link); + self.primary_index.insert(pk.clone(), new_link); let indexes_res = self .indexes @@ -353,7 +367,7 @@ where at, inserted_already, } => { - self.pk_map.insert(pk.clone(), old_link); + self.primary_index.insert(pk.clone(), old_link); self.indexes .delete_from_indexes(row_new, new_link, inserted_already)?; self.data @@ -405,9 +419,10 @@ where return Err(WorkTableError::PrimaryUpdateTry); } let old_link = self + .primary_index .pk_map .get(&pk) - .map(|v| v.get().value) + .map(|v| v.get().value.into()) .ok_or(WorkTableError::NotFound)?; let (new_link, _) = self .data @@ -418,7 +433,8 @@ where .with_mut_ref(new_link, |r| r.unghost()) .map_err(WorkTableError::PagesError)? } - let (_, primary_key_events) = self.pk_map.insert_cdc(pk.clone(), new_link); + let (_, primary_key_events) = self.primary_index.insert_cdc(pk.clone(), new_link); + let primary_key_events = convert_change_events(primary_key_events); let indexes_res = self.indexes .reinsert_row_cdc(row_old, old_link, row_new.clone(), new_link); @@ -428,7 +444,7 @@ where at, inserted_already, } => { - self.pk_map.insert(pk.clone(), old_link); + self.primary_index.insert(pk.clone(), old_link); self.indexes .delete_from_indexes(row_new, new_link, inserted_already)?; self.data diff --git a/src/table/system_info.rs b/src/table/system_info.rs index 6575195b..a87d5621 100644 --- a/src/table/system_info.rs +++ b/src/table/system_info.rs @@ -1,4 +1,3 @@ -use data_bucket::Link; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; use prettytable::{Table, format::consts::FORMAT_NO_BORDER_LINE_SEPARATOR, row}; @@ -6,6 +5,7 @@ use std::fmt::{self, Debug, Display, Formatter}; use crate::in_memory::{RowWrapper, StorableRow}; use crate::mem_stat::MemStat; +use crate::util::OffsetEqLink; use crate::{TableSecondaryIndexInfo, WorkTable}; #[derive(Debug)] @@ -53,8 +53,8 @@ impl< SecondaryIndexes, LockType, PkGen, - NodeType, const DATA_LENGTH: usize, + NodeType, > WorkTable< Row, @@ -64,19 +64,19 @@ impl< SecondaryIndexes, LockType, PkGen, - NodeType, DATA_LENGTH, + NodeType, > where PrimaryKey: Debug + Clone + Ord + Send + 'static + std::hash::Hash, Row: StorableRow + Send + Clone + 'static, ::WrappedRow: RowWrapper, - NodeType: NodeLike> + Send + 'static, + NodeType: NodeLike>> + Send + 'static, SecondaryIndexes: MemStat + TableSecondaryIndexInfo, { pub fn system_info(&self) -> SystemInfo { let page_count = self.data.get_page_count(); - let row_count = self.pk_map.len(); + let row_count = self.primary_index.pk_map.len(); let empty_links = self.data.get_empty_links().len(); diff --git a/src/table/vacuum/fragmentation_info.rs b/src/table/vacuum/fragmentation_info.rs new file mode 100644 index 00000000..f52106a2 --- /dev/null +++ b/src/table/vacuum/fragmentation_info.rs @@ -0,0 +1,57 @@ +use std::collections::HashMap; + +use data_bucket::Link; +use data_bucket::page::PageId; + +use crate::in_memory::EmptyLinkRegistry; + +/// Fragmentation info for a single data [`Page`]. +/// +/// [`Page`]: crate::in_memory::Data +#[derive(Debug, Copy, Clone)] +pub struct PageFragmentationInfo { + pub page_id: PageId, + pub empty_bytes: u32, + /// Ratio of filled bytes to empty bytes. Higher means more utilized. + pub filled_empty_ratio: f64, +} + +impl EmptyLinkRegistry { + pub fn get_page_empty_links(&self, page_id: PageId) -> Vec { + self.page_links_map + .get(&page_id) + .map(|(_, link)| *link) + .collect() + } + + pub fn get_per_page_info(&self) -> Vec> { + let mut page_empty_bytes: HashMap = HashMap::new(); + + for (page_id, link) in self.page_links_map.iter() { + let entry = page_empty_bytes.entry(*page_id).or_insert(0); + *entry += link.length; + } + + let mut per_page_data: Vec> = page_empty_bytes + .into_iter() + .map(|(page_id, empty_bytes)| { + let filled_empty_ratio = if empty_bytes > 0 { + let filled_bytes = DATA_LENGTH.saturating_sub(empty_bytes as usize); + filled_bytes as f64 / empty_bytes as f64 + } else { + 0.0 + }; + + PageFragmentationInfo { + page_id, + empty_bytes, + filled_empty_ratio, + } + }) + .collect(); + + per_page_data.sort_by_key(|info| info.page_id); + + per_page_data + } +} diff --git a/src/table/vacuum/lock.rs b/src/table/vacuum/lock.rs new file mode 100644 index 00000000..0f8dd74b --- /dev/null +++ b/src/table/vacuum/lock.rs @@ -0,0 +1,196 @@ +use std::sync::Arc; + +use data_bucket::Link; +use data_bucket::page::PageId; + +use crate::lock::{FullRowLock, LockMap, RowLock}; + +/// Lock manager for vacuum operations. +/// Supports locking at both page and link granularity. +#[derive(Debug, Default)] +pub struct VacuumLock { + per_link_lock: Arc>, + per_page_lock: Arc>, +} + +impl VacuumLock { + /// Locks a page, returning the [`FullRowLock`]. + pub fn lock_page(&self, page_id: PageId) -> Arc> { + if let Some(lock) = self.per_page_lock.get(&page_id) { + return lock; + } + + let (row_lock, _) = FullRowLock::with_lock(self.per_page_lock.next_id()); + let lock = Arc::new(tokio::sync::RwLock::new(row_lock)); + self.per_page_lock.insert(page_id, lock.clone()); + lock + } + + /// Locks a [`Link`], returning the [`FullRowLock`]. + pub fn lock_link(&self, link: Link) -> Arc> { + if let Some(lock) = self.per_link_lock.get(&link) { + return lock; + } + + let (row_lock, _lock) = FullRowLock::with_lock(self.per_link_lock.next_id()); + let lock = Arc::new(tokio::sync::RwLock::new(row_lock)); + self.per_link_lock.insert(link, lock.clone()); + lock + } + + /// Checks if a [`Link`] is locked. + /// [`Link`] is locked if it was locked OR its page is locked. + pub fn is_link_locked(&self, link: &Link) -> bool { + if let Some(page_lock) = self.per_page_lock.get(&link.page_id) { + match page_lock.try_read() { + Ok(guard) => { + if guard.is_locked() { + return true; + } + } + Err(_) => return true, // write lock held + } + } + + if let Some(link_lock) = self.per_link_lock.get(link) { + match link_lock.try_read() { + Ok(guard) => { + if guard.is_locked() { + return true; + } + } + Err(_) => return true, // write lock held + } + } + + false + } + + /// Checks if a page is locked. + pub fn is_page_locked(&self, page_id: &PageId) -> bool { + if let Some(page_lock) = self.per_page_lock.get(page_id) { + match page_lock.try_read() { + Ok(guard) => guard.is_locked(), + Err(_) => true, // write lock held + } + } else { + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_is_page_locked_not_locked() { + let vacuum_lock = VacuumLock::default(); + let page_id = PageId::from(1); + + assert!(!vacuum_lock.is_page_locked(&page_id)); + } + + #[tokio::test] + async fn test_is_page_locked_after_lock() { + let vacuum_lock = VacuumLock::default(); + let page_id = PageId::from(1); + + let _lock = vacuum_lock.lock_page(page_id); + + assert!(vacuum_lock.is_page_locked(&page_id)); + } + + #[tokio::test] + async fn test_is_page_locked_with_write_lock() { + let vacuum_lock = VacuumLock::default(); + let page_id = PageId::from(1); + + let lock = vacuum_lock.lock_page(page_id); + let _write_guard = lock.write().await; + + assert!(vacuum_lock.is_page_locked(&page_id)); + } + + #[test] + fn test_is_link_locked_not_locked() { + let vacuum_lock = VacuumLock::default(); + let link = Link { + page_id: PageId::from(1), + offset: 0, + length: 100, + }; + + assert!(!vacuum_lock.is_link_locked(&link)); + } + + #[tokio::test] + async fn test_is_link_locked_by_link() { + let vacuum_lock = VacuumLock::default(); + let link = Link { + page_id: PageId::from(1), + offset: 0, + length: 100, + }; + + let _lock = vacuum_lock.lock_link(link); + + assert!(vacuum_lock.is_link_locked(&link)); + } + + #[tokio::test] + async fn test_is_link_locked_by_page() { + let vacuum_lock = VacuumLock::default(); + let link = Link { + page_id: PageId::from(1), + offset: 0, + length: 100, + }; + + let _lock = vacuum_lock.lock_page(link.page_id); + + assert!(vacuum_lock.is_link_locked(&link)); + } + + #[tokio::test] + async fn test_is_link_locked_with_link_write_lock() { + let vacuum_lock = VacuumLock::default(); + let link = Link { + page_id: PageId::from(1), + offset: 0, + length: 100, + }; + + let lock = vacuum_lock.lock_link(link); + let _write_guard = lock.write().await; + + assert!(vacuum_lock.is_link_locked(&link)); + } + + #[tokio::test] + async fn test_lock_page_returns_same_lock() { + let vacuum_lock = VacuumLock::default(); + let page_id = PageId::from(1); + + let lock1 = vacuum_lock.lock_page(page_id); + let lock2 = vacuum_lock.lock_page(page_id); + + // Same pointer = same lock instance + assert!(Arc::ptr_eq(&lock1, &lock2)); + } + + #[tokio::test] + async fn test_lock_link_returns_same_lock() { + let vacuum_lock = VacuumLock::default(); + let link = Link { + page_id: PageId::from(1), + offset: 0, + length: 100, + }; + + let lock1 = vacuum_lock.lock_link(link); + let lock2 = vacuum_lock.lock_link(link); + + assert!(Arc::ptr_eq(&lock1, &lock2)); + } +} diff --git a/src/table/vacuum/mod.rs b/src/table/vacuum/mod.rs new file mode 100644 index 00000000..e2250c4b --- /dev/null +++ b/src/table/vacuum/mod.rs @@ -0,0 +1,832 @@ +mod fragmentation_info; +mod lock; +mod page; + +use std::collections::VecDeque; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::Arc; +use std::sync::atomic::Ordering; + +use data_bucket::Link; +use data_bucket::page::PageId; +use indexset::core::node::NodeLike; +use indexset::core::pair::Pair; +use rkyv::rancor::Strategy; +use rkyv::ser::Serializer; +use rkyv::ser::allocator::ArenaHandle; +use rkyv::ser::sharing::Share; +use rkyv::util::AlignedVec; +use rkyv::{Archive, Serialize}; + +use crate::in_memory::{DataPages, GhostWrapper, RowWrapper, StorableRow}; +use crate::prelude::{OffsetEqLink, TablePrimaryKey}; +use crate::vacuum::fragmentation_info::PageFragmentationInfo; +use crate::vacuum::lock::VacuumLock; +use crate::{AvailableIndex, PrimaryIndex, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc}; + +#[derive(Debug)] +pub struct EmptyDataVacuum< + Row, + PrimaryKey, + PkNodeType, + SecondaryIndexes, + AvailableTypes, + AvailableIndexes, + const DATA_LENGTH: usize, + SecondaryEvents = (), +> where + PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, + Row: StorableRow + Send + Clone + 'static, + PkNodeType: NodeLike>> + Send + 'static, +{ + data_pages: Arc>, + vacuum_lock: Arc, + + primary_index: Arc>, + secondary_indexes: Arc, + + phantom_data: PhantomData<(SecondaryEvents, AvailableTypes, AvailableIndexes)>, +} + +impl< + Row, + PrimaryKey, + PkNodeType, + SecondaryIndexes, + AvailableTypes, + AvailableIndexes, + const DATA_LENGTH: usize, + SecondaryEvents, +> + EmptyDataVacuum< + Row, + PrimaryKey, + PkNodeType, + SecondaryIndexes, + AvailableTypes, + AvailableIndexes, + DATA_LENGTH, + SecondaryEvents, + > +where + Row: TableRow + StorableRow + Send + Clone + 'static, + PrimaryKey: Debug + Clone + Ord + Send + TablePrimaryKey + std::hash::Hash, + PkNodeType: NodeLike>> + Send + 'static, + ::WrappedRow: RowWrapper, + Row: Archive + + Clone + + for<'a> Serialize< + Strategy, Share>, rkyv::rancor::Error>, + >, + ::WrappedRow: Archive + + for<'a> Serialize< + Strategy, Share>, rkyv::rancor::Error>, + >, + <::WrappedRow as Archive>::Archived: GhostWrapper, + SecondaryIndexes: TableSecondaryIndex + + TableSecondaryIndexCdc, + AvailableIndexes: Debug + AvailableIndex, +{ + async fn defragment(&self) { + let per_page_info = self.data_pages.empty_links_registry().get_per_page_info(); + let mut in_migration_pages = VecDeque::new(); + let mut free_pages = vec![]; + let mut defragmented_pages = VecDeque::new(); + + let mut info_iter = per_page_info.into_iter(); + while let Some(info) = info_iter.next() { + let page_id = info.page_id; + if let Some(id) = defragmented_pages.pop_front() { + match self.move_data_from(page_id, id).await { + (true, true) => { + // from moved fully and on to no more space + free_pages.push(page_id); + } + (true, false) => { + // from moved fully but to has space + free_pages.push(id); + defragmented_pages.push_back(id); + } + (false, true) => { + // from was not moved but to have NO space + in_migration_pages.push_back(page_id); + } + (false, false) => unreachable!( + "at least one of two situations should appear to break from while cycle" + ), + } + } else { + let page_id = info.page_id; + self.defragment_page(info).await; + if let Some(id) = in_migration_pages.pop_front() { + match self.move_data_from(id, page_id).await { + (true, true) => { + // from moved fully and on to no more space + free_pages.push(id); + } + (true, false) => { + // from moved fully but to has space + free_pages.push(id); + defragmented_pages.push_back(page_id); + } + (false, true) => { + // from was not moved but to have NO space + in_migration_pages.push_back(id); + } + (false, false) => unreachable!( + "at least one of two situations should appear to break from while cycle" + ), + } + } else { + defragmented_pages.push_back(page_id); + } + } + } + + for in_migration_pages in in_migration_pages { + let page_start = Link { + page_id: in_migration_pages, + offset: 0, + length: 0, + }; + self.shift_data_in_range(page_start, None); + } + + for id in free_pages { + self.data_pages.mark_page_empty(id) + } + } + + async fn move_data_from(&self, from: PageId, to: PageId) -> (bool, bool) { + let from_lock = self.vacuum_lock.lock_page(from); + let to_lock = self.vacuum_lock.lock_page(to); + + let to_page = self + .data_pages + .get_page(to) + .expect("should exist as link exists"); + let from_page = self + .data_pages + .get_page(from) + .expect("should exist as link exists"); + let to_free_space = to_page.free_space(); + + let page_start = OffsetEqLink::<_>(Link { + page_id: from, + offset: 0, + length: 0, + }); + + let page_end = OffsetEqLink::<_>(Link { + page_id: from.next(), + offset: 0, + length: 0, + }); + + let mut range = self + .primary_index + .reverse_pk_map + .range(page_start..page_end); + let mut sum_links_len = 0; + let mut links = vec![]; + let mut from_page_will_be_moved = false; + let mut to_page_will_be_filled = false; + + loop { + let Some((next, pk)) = range.next() else { + from_page_will_be_moved = true; + break; + }; + + if sum_links_len + next.length > to_free_space as u32 { + to_page_will_be_filled = true; + if range.next().is_none() { + from_page_will_be_moved = true; + } + break; + } + sum_links_len += next.length; + links.push((*next, pk.clone())); + } + + drop(range); + + for (from_link, pk) in links { + let raw_data = from_page + .get_raw_row(from_link.0) + .expect("link is not bigger than free offset"); + let new_link = to_page + .save_raw_row(&raw_data) + .expect("page is not full as checked on links collection"); + self.update_index_after_move(pk, from_link.0, new_link); + } + + { + let g = from_lock.read().await; + g.unlock() + } + { + let g = to_lock.read().await; + g.unlock() + } + + (from_page_will_be_moved, to_page_will_be_filled) + } + + async fn defragment_page(&self, info: PageFragmentationInfo) { + let registry = self.data_pages.empty_links_registry(); + let mut page_empty_links = registry + .page_links_map + .get(&info.page_id) + .map(|(_, l)| *l) + .collect::>(); + page_empty_links.sort_by(|l1, l2| l1.offset.cmp(&l2.offset)); + + let lock = self.vacuum_lock.lock_page(info.page_id); + let mut empty_links_iter = page_empty_links.into_iter(); + + let Some(mut current_empty) = empty_links_iter.next() else { + return; + }; + registry.remove_link(current_empty); + + let Some(mut next_empty) = empty_links_iter.next() else { + self.shift_data_in_range(current_empty, None); + return; + }; + registry.remove_link(next_empty); + + loop { + let offset = self.shift_data_in_range(current_empty, Some(next_empty.offset)); + + let new_next = empty_links_iter.next(); + match new_next { + Some(link) => { + registry.remove_link(link); + current_empty = Link { + page_id: next_empty.page_id, + offset, + length: next_empty.length + (next_empty.offset - offset), + }; + next_empty = link; + } + None => { + let from = Link { + page_id: next_empty.page_id, + offset, + length: next_empty.length + (next_empty.offset - offset), + }; + self.shift_data_in_range(from, None); + break; + } + } + } + + let l = lock.read().await; + l.unlock(); + } + + fn shift_data_in_range(&self, start_link: Link, end_offset: Option) -> u32 { + let page_id = start_link.page_id; + let page = self + .data_pages + .get_page(page_id) + .expect("should exist as link exists"); + let start_link = OffsetEqLink::<_>(start_link); + let range = if let Some(offset) = end_offset { + let end = OffsetEqLink::<_>(Link { + page_id, + offset, + length: 0, + }); + self.primary_index.reverse_pk_map.range(start_link..end) + } else { + let end = OffsetEqLink::<_>(Link { + page_id: page_id.next(), + offset: 0, + length: 0, + }); + self.primary_index.reverse_pk_map.range(start_link..end) + } + .map(|(l, pk)| (*l, pk.clone())) + .collect::>(); + let mut range_iter = range.into_iter(); + + let mut entry_offset = start_link.0.offset; + while let Some((link, pk)) = range_iter.next() { + let link_value = link.0; + + if let Some(end) = end_offset { + if entry_offset + link_value.length >= end { + return entry_offset; + } + } + + let new_link = Link { + page_id, + offset: entry_offset, + length: link_value.length, + }; + + // TODO: Safety comment + unsafe { + page.move_from_to(link_value, new_link) + .expect("should use valid links") + } + entry_offset += link_value.length; + self.update_index_after_move(pk.clone(), link_value, new_link); + } + + if end_offset.is_none() { + // Is safe as page is locked now and we can get here only if end_offset + // is not set so we are shifting till page end. + page.free_offset.store(entry_offset, Ordering::Release); + } + + entry_offset + } + + fn update_index_after_move(&self, pk: PrimaryKey, old_link: Link, new_link: Link) { + let old_offset_link = OffsetEqLink(old_link); + let new_offset_link = OffsetEqLink(new_link); + + self.primary_index + .pk_map + .insert(pk.clone(), new_offset_link); + self.primary_index.reverse_pk_map.remove(&old_offset_link); + self.primary_index + .reverse_pk_map + .insert(new_offset_link, pk); + // TODO: update secondary indexes + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::marker::PhantomData; + use std::sync::Arc; + + use indexset::core::pair::Pair; + use worktable_codegen::{MemStat, worktable}; + + use crate::in_memory::{GhostWrapper, RowWrapper, StorableRow}; + use crate::prelude::*; + use crate::vacuum::EmptyDataVacuum; + use crate::vacuum::lock::VacuumLock; + + worktable!( + name: Test, + columns: { + id: u64 primary_key autoincrement, + test: i64, + another: u64, + exchange: String + }, + indexes: { + test_idx: test unique, + exchnage_idx: exchange, + another_idx: another, + } + ); + + /// Creates an EmptyDataVacuum instance from a WorkTable + fn create_vacuum( + table: &TestWorkTable, + ) -> EmptyDataVacuum< + TestRow, + TestPrimaryKey, + Vec>>, + TestIndex, + TestAvaiableTypes, + TestAvailableIndexes, + TEST_INNER_SIZE, + > { + EmptyDataVacuum { + data_pages: Arc::clone(&table.0.data), + vacuum_lock: Arc::new(VacuumLock::default()), + primary_index: Arc::clone(&table.0.primary_index), + secondary_indexes: Arc::clone(&table.0.indexes), + phantom_data: PhantomData, + } + } + + #[tokio::test] + async fn test_vacuum_shift_data_in_range_single_gap() { + let table = TestWorkTable::default(); + + let mut ids = Vec::new(); + for i in 0..10 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.push((id, row)); + } + + let first_two_ids = ids.iter().take(2).map(|(i, _)| *i).collect::>(); + + table.delete(first_two_ids[0].into()).await.unwrap(); + table.delete(first_two_ids[1].into()).await.unwrap(); + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids.into_iter().skip(2) { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_shift_data_middle_gap() { + let table = TestWorkTable::default(); + + let mut ids = HashMap::new(); + for i in 0..20 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i * 10, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.insert(id, row); + } + + let ids_to_delete = ids.keys().skip(5).take(2).cloned().collect::>(); + + table.delete(ids_to_delete[0].into()).await.unwrap(); + table.delete(ids_to_delete[1].into()).await.unwrap(); + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids + .into_iter() + .filter(|(i, _)| *i != ids_to_delete[0] && *i != ids_to_delete[1]) + { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_shift_data_last_records() { + let table = TestWorkTable::default(); + + let mut ids = HashMap::new(); + for i in 0..10 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.insert(id, row); + } + + let last_two_ids = ids.keys().skip(8).take(2).cloned().collect::>(); + + table.delete(last_two_ids[1].into()).await.unwrap(); + table.delete(last_two_ids[0].into()).await.unwrap(); + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids + .into_iter() + .filter(|(i, _)| *i != last_two_ids[0] && *i != last_two_ids[1]) + { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_shift_data_multiple_gaps() { + let table = TestWorkTable::default(); + + let mut ids = HashMap::new(); + for i in 0..15 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.insert(id, row); + } + + let ids_to_delete = [1, 3, 5, 7].map(|idx| ids.keys().cloned().nth(idx).unwrap()); + + for id in &ids_to_delete { + table.delete((*id).into()).await.unwrap(); + } + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids.into_iter().filter(|(i, _)| !ids_to_delete.contains(i)) { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_shift_data_single_record_left() { + let table = TestWorkTable::default(); + + let mut ids = Vec::new(); + for i in 0..5 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.push((id, row)); + } + + let remaining_id = ids[0].0; + + for (id, _) in ids.iter().skip(1) { + table.delete((*id).into()).await.unwrap(); + } + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + let row = table.select(remaining_id); + assert_eq!(row, Some(ids[0].1.clone())); + } + + #[tokio::test] + async fn test_vacuum_defragment_on_delete_last() { + let table = TestWorkTable::default(); + + let mut ids = Vec::new(); + for i in 0..5 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.push((id, row)); + } + + table.delete(ids.last().unwrap().0.into()).await.unwrap(); + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids.into_iter().take(4) { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_shift_data_variable_string_lengths() { + let table = TestWorkTable::default(); + + let mut ids = HashMap::new(); + let strings = vec![ + "a", + "bbbb", + "cccccc", + "dddddddd", + "eeeeeeeeee", + "ffffffffffff", + "gggggggggggggg", + ]; + + for (i, s) in strings.iter().enumerate() { + let row = TestRow { + id: table.get_next_pk().into(), + test: i as i64, + another: i as u64, + exchange: s.to_string(), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.insert(id, row); + } + + let ids_to_delete = ids.keys().take(3).cloned().collect::>(); + + for id in &ids_to_delete { + table.delete((*id).into()).await.unwrap(); + } + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids.into_iter().filter(|(i, _)| !ids_to_delete.contains(i)) { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_insert_after_free_offset_update() { + let table = TestWorkTable::default(); + + let mut original_ids = HashMap::new(); + for i in 0..8 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("original{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + original_ids.insert(id, row); + } + + let ids_to_delete = original_ids.keys().take(3).cloned().collect::>(); + for id in &ids_to_delete { + table.delete((*id).into()).await.unwrap(); + } + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + let mut new_ids = HashMap::new(); + for i in 0..3 { + let row = TestRow { + id: table.get_next_pk().into(), + test: 100 + i, + another: (100 + i) as u64, + exchange: format!("new{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + new_ids.insert(id, row); + } + + for (id, expected) in original_ids + .into_iter() + .filter(|(i, _)| !ids_to_delete.contains(i)) + { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + + for (id, expected) in new_ids { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_multi_page_data_migration() { + let table = TestWorkTable::default(); + + let mut ids = Vec::new(); + // row is ~40 bytes so ~409 rows per page + for i in 0..500 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.push((id, row)); + } + + let ids_to_delete: Vec<_> = ids.iter().map(|(i, _)| *i).take(20).collect(); + for id in &ids_to_delete { + table.delete((*id).into()).await.unwrap(); + } + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids.into_iter().filter(|(i, _)| !ids_to_delete.contains(i)) { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_multi_page_alternating_deletes() { + let table = TestWorkTable::default(); + + let mut ids = Vec::new(); + // row is ~40 bytes so ~409 rows per page + for i in 0..500 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.push((id, row)); + } + + let ids_to_delete: Vec<_> = ids.iter().step_by(20).map(|(id, _)| *id).collect(); + for id in &ids_to_delete { + table.delete((*id).into()).await.unwrap(); + } + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids + .into_iter() + .filter(|(id, _)| !ids_to_delete.contains(id)) + { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_multi_page_last() { + let table = TestWorkTable::default(); + + let mut ids = Vec::new(); + // row is ~40 bytes so ~409 rows per page + for i in 0..500 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.push((id, row)); + } + + table.delete(ids.last().unwrap().0.into()).await.unwrap(); + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + for (id, expected) in ids.into_iter().take(499) { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } + + #[tokio::test] + async fn test_vacuum_multi_page_free_page() { + let table = TestWorkTable::default(); + + let mut ids = Vec::new(); + // row is ~40 bytes so ~409 rows per page + for i in 0..1000 { + let row = TestRow { + id: table.get_next_pk().into(), + test: i, + another: i as u64, + exchange: format!("test{}", i), + }; + let id = row.id; + table.insert(row.clone()).unwrap(); + ids.push((id, row)); + } + + let mut ids_to_delete: Vec<_> = ids.iter().skip(300).take(400).map(|(id, _)| *id).collect(); + // remove last too to trigger vacuum for last page too. + ids_to_delete.push(ids.last().unwrap().0); + for id in &ids_to_delete { + table.delete((*id).into()).await.unwrap(); + } + + let vacuum = create_vacuum(&table); + vacuum.defragment().await; + + assert!(table.0.data.get_empty_pages().len() > 0); + + for (id, expected) in ids + .into_iter() + .filter(|(id, _)| !ids_to_delete.contains(id)) + { + let row = table.select(id); + assert_eq!(row, Some(expected)); + } + } +} diff --git a/src/table/vacuum/page.rs b/src/table/vacuum/page.rs new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/src/table/vacuum/page.rs @@ -0,0 +1 @@ + diff --git a/src/util/mod.rs b/src/util/mod.rs index 7e68e97a..cfb87fa6 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,5 +1,7 @@ +mod offset_eq_link; mod optimized_vec; mod ordered_float; +pub use offset_eq_link::OffsetEqLink; pub use optimized_vec::OptimizedVec; pub use ordered_float::{OrderedF32Def, OrderedF64Def}; diff --git a/src/util/offset_eq_link.rs b/src/util/offset_eq_link.rs new file mode 100644 index 00000000..d93e98a9 --- /dev/null +++ b/src/util/offset_eq_link.rs @@ -0,0 +1,217 @@ +//! A link wrapper with equality based on absolute position. +//! +//! [`OffsetEqLink`] wraps a [`Link`] and implements `Eq`, `Ord`, and `Hash` +//! based on its absolute index within the data pages, rather than on the +//! raw `Link` fields. + +use data_bucket::{Link, SizeMeasurable}; +use derive_more::From; + +use crate::in_memory::DATA_INNER_LENGTH; +use crate::prelude::Into; + +/// A link wrapper that implements `Eq` based on absolute index. +#[derive(Copy, Clone, Debug, Default, Into, From)] +pub struct OffsetEqLink(pub Link); + +impl OffsetEqLink { + /// Calculates the absolute index of the link. + pub fn absolute_index(&self) -> u64 { + let page_id: u32 = self.0.page_id.into(); + (page_id as u64 * DATA_LENGTH as u64) + self.0.offset as u64 + } +} + +impl std::hash::Hash for OffsetEqLink { + fn hash(&self, state: &mut H) { + self.absolute_index().hash(state); + } +} + +impl PartialOrd for OffsetEqLink { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for OffsetEqLink { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.absolute_index().cmp(&other.absolute_index()) + } +} + +impl PartialEq for OffsetEqLink { + fn eq(&self, other: &Self) -> bool { + self.absolute_index().eq(&other.absolute_index()) + } +} + +impl Eq for OffsetEqLink {} + +impl std::ops::Deref for OffsetEqLink { + type Target = Link; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl AsRef for OffsetEqLink { + fn as_ref(&self) -> &Link { + &self.0 + } +} + +impl PartialEq for OffsetEqLink { + fn eq(&self, other: &Link) -> bool { + self.0.eq(other) + } +} + +impl SizeMeasurable for OffsetEqLink { + fn aligned_size(&self) -> usize { + self.0.aligned_size() + } + + fn align() -> Option { + Link::align() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use data_bucket::page::PageId; + use std::collections::HashSet; + + const TEST_DATA_LENGTH: usize = 4096; + + #[test] + fn test_same_position_different_length_are_equal() { + let link1 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }); + let link2 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 100, + length: 100, + }); + + assert_eq!(link1, link2); + } + + #[test] + fn test_different_page_not_equal() { + let link1 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }); + let link2 = OffsetEqLink::(Link { + page_id: PageId::from(2), + offset: 100, + length: 50, + }); + + assert_ne!(link1, link2); + } + + #[test] + fn test_different_offset_not_equal() { + let link1 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }); + let link2 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 200, + length: 50, + }); + + assert_ne!(link1, link2); + } + + #[test] + fn test_ordering_same_page() { + let link1 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }); + let link2 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 200, + length: 50, + }); + + assert!(link1 < link2); + } + + #[test] + fn test_ordering_different_pages() { + let link1 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 4000, + length: 50, + }); + let link2 = OffsetEqLink::(Link { + page_id: PageId::from(2), + offset: 100, + length: 50, + }); + + assert!(link1 < link2); // page 1 end < page 2 start + } + + #[test] + fn test_ordering_within_page_boundaries() { + let link1 = OffsetEqLink::(Link { + page_id: PageId::from(0), + offset: 0, + length: 10, + }); + let link2 = OffsetEqLink::(Link { + page_id: PageId::from(0), + offset: TEST_DATA_LENGTH as u32 - 1, + length: 10, + }); + let link3 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 0, + length: 10, + }); + + assert!(link1 < link2); + assert!(link2 < link3); + } + + #[test] + fn test_hash_consistent_with_equality() { + let link1 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 100, + length: 50, + }); + let link2 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 100, + length: 100, + }); + let link3 = OffsetEqLink::(Link { + page_id: PageId::from(1), + offset: 200, + length: 50, + }); + + let mut set = HashSet::new(); + set.insert(link1); + set.insert(link2); + set.insert(link3); + + // link1 and link2 are equal, so only 2 elements in set + assert_eq!(set.len(), 2); + } +} diff --git a/tests/persistence/read.rs b/tests/persistence/read.rs index 7b658052..2cc4749d 100644 --- a/tests/persistence/read.rs +++ b/tests/persistence/read.rs @@ -28,7 +28,7 @@ async fn test_info_parse() { assert_eq!(info.inner.page_count, 1); assert_eq!(info.inner.name, "TestPersist"); assert_eq!(info.inner.pk_gen_state, 0); - assert_eq!(info.inner.empty_links_list, vec![]); + assert_eq!(info.inner.empty_links_list, Vec::::new()); } #[tokio::test] diff --git a/tests/worktable/base.rs b/tests/worktable/base.rs index dacc0148..ef3675c1 100644 --- a/tests/worktable/base.rs +++ b/tests/worktable/base.rs @@ -188,7 +188,7 @@ async fn update_string() { exchange: "test".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let updated = TestRow { id: pk.clone().into(), test: 2, @@ -271,7 +271,13 @@ async fn delete() { exchange: "test".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let link = table.0.pk_map.get(&pk).map(|kv| kv.get().value).unwrap(); + let link = table + .0 + .primary_index + .pk_map + .get(&pk) + .map(|kv| kv.get().value) + .unwrap(); table.delete(pk.clone()).await.unwrap(); let selected_row = table.select(pk); assert!(selected_row.is_none()); @@ -287,7 +293,13 @@ async fn delete() { exchange: "test".to_string(), }; let pk = table.insert(updated.clone()).unwrap(); - let new_link = table.0.pk_map.get(&pk).map(|kv| kv.get().value).unwrap(); + let new_link = table + .0 + .primary_index + .pk_map + .get(&pk) + .map(|kv| kv.get().value) + .unwrap(); assert_eq!(link, new_link) } @@ -372,7 +384,13 @@ async fn delete_and_insert_less() { exchange: "test1234567890".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let link = table.0.pk_map.get(&pk).map(|kv| kv.get().value).unwrap(); + let link = table + .0 + .primary_index + .pk_map + .get(&pk) + .map(|kv| kv.get().value) + .unwrap(); table.delete(pk.clone()).await.unwrap(); let selected_row = table.select(pk); assert!(selected_row.is_none()); @@ -384,9 +402,15 @@ async fn delete_and_insert_less() { exchange: "test1".to_string(), }; let pk = table.insert(updated.clone()).unwrap(); - let new_link = table.0.pk_map.get(&pk).map(|kv| kv.get().value).unwrap(); + let new_link = table + .0 + .primary_index + .pk_map + .get(&pk) + .map(|kv| kv.get().value) + .unwrap(); - assert_ne!(link, new_link) + assert_ne!(link.0, new_link.0) } #[tokio::test] @@ -406,7 +430,13 @@ async fn delete_and_replace() { exchange: "test".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let link = table.0.pk_map.get(&pk).map(|kv| kv.get().value).unwrap(); + let link = table + .0 + .primary_index + .pk_map + .get(&pk) + .map(|kv| kv.get().value) + .unwrap(); table.delete(pk.clone()).await.unwrap(); let selected_row = table.select(pk); assert!(selected_row.is_none()); @@ -418,7 +448,13 @@ async fn delete_and_replace() { exchange: "test".to_string(), }; let pk = table.insert(updated.clone()).unwrap(); - let new_link = table.0.pk_map.get(&pk).map(|kv| kv.get().value).unwrap(); + let new_link = table + .0 + .primary_index + .pk_map + .get(&pk) + .map(|kv| kv.get().value) + .unwrap(); assert_eq!(link, new_link) } diff --git a/tests/worktable/index/insert.rs b/tests/worktable/index/insert.rs index 4dea2a0f..69e557b3 100644 --- a/tests/worktable/index/insert.rs +++ b/tests/worktable/index/insert.rs @@ -131,12 +131,13 @@ fn insert_when_secondary_unique_exists() { .indexes .attr2_idx .get(&row.attr2) - .map(|r| r.get().value), + .map(|r| r.get().value.0), table .0 + .primary_index .pk_map .get(&TestPrimaryKey(row.id)) - .map(|r| r.get().value) + .map(|r| r.get().value.0) ); } @@ -188,12 +189,13 @@ fn insert_when_secondary_unique_string_exists() { .indexes .attr4_idx .get(&row.attr4) - .map(|r| r.get().value), + .map(|r| r.get().value.0), table .0 + .primary_index .pk_map .get(&TestPrimaryKey(row.id)) - .map(|r| r.get().value) + .map(|r| r.get().value.0) ); } diff --git a/tests/worktable/unsized_.rs b/tests/worktable/unsized_.rs index b6dc787c..c47ea617 100644 --- a/tests/worktable/unsized_.rs +++ b/tests/worktable/unsized_.rs @@ -38,7 +38,7 @@ async fn test_update_string_full_row() { exchange: "test".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; table .update(TestRow { @@ -74,7 +74,7 @@ async fn test_update_string_by_unique() { exchange: "test".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row = ExchangeByTestQuery { exchange: "bigger test to test string update".to_string(), @@ -105,7 +105,7 @@ async fn test_update_string_by_pk() { exchange: "test".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row = ExchangeByIdQuery { exchange: "bigger test to test string update".to_string(), @@ -136,7 +136,7 @@ async fn test_update_string_by_non_unique() { exchange: "test".to_string(), }; let pk = table.insert(row1.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row2 = TestRow { id: table.get_next_pk().into(), test: 2, @@ -144,7 +144,7 @@ async fn test_update_string_by_non_unique() { exchange: "test".to_string(), }; let pk = table.insert(row2.clone()).unwrap(); - let second_link = table.0.pk_map.get(&pk).unwrap().get().value; + let second_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row = ExchangeByAbotherQuery { exchange: "bigger test to test string update".to_string(), @@ -329,7 +329,7 @@ async fn test_update_many_strings_by_unique() { other_srting: "other".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row = ExchangeAndSomeByTestQuery { exchange: "bigger test to test string update".to_string(), @@ -368,7 +368,7 @@ async fn test_update_many_strings_by_pk() { other_srting: "other".to_string(), }; let pk = table.insert(row.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row = ExchangeAndSomeByIdQuery { exchange: "bigger test to test string update".to_string(), @@ -404,7 +404,7 @@ async fn test_update_many_strings_by_non_unique() { other_srting: "other".to_string(), }; let pk = table.insert(row1.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row2 = TestMoreStringsRow { id: table.get_next_pk().into(), test: 2, @@ -414,7 +414,7 @@ async fn test_update_many_strings_by_non_unique() { other_srting: "other".to_string(), }; let pk = table.insert(row2.clone()).unwrap(); - let second_link = table.0.pk_map.get(&pk).unwrap().get().value; + let second_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row = ExchangeAndSomeByAnotherQuery { exchange: "bigger test to test string update".to_string(), @@ -472,7 +472,7 @@ async fn test_update_many_strings_by_string() { other_srting: "other er".to_string(), }; let pk = table.insert(row1.clone()).unwrap(); - let first_link = table.0.pk_map.get(&pk).unwrap().get().value; + let first_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row2 = TestMoreStringsRow { id: table.get_next_pk().into(), test: 2, @@ -482,7 +482,7 @@ async fn test_update_many_strings_by_string() { other_srting: "other".to_string(), }; let pk = table.insert(row2.clone()).unwrap(); - let second_link = table.0.pk_map.get(&pk).unwrap().get().value; + let second_link = table.0.primary_index.pk_map.get(&pk).unwrap().get().value; let row = SomeOtherByExchangeQuery { other_srting: "bigger test to test string update".to_string(),