From 2105cdfd27425e40de87a01872c614f5abc5d666 Mon Sep 17 00:00:00 2001
From: Handy-caT <37216852+Handy-caT@users.noreply.github.com>
Date: Wed, 17 Dec 2025 20:40:19 +0300
Subject: [PATCH 1/3] Add EmptyLinkRegistry that coalesces neighboring empty
 links and reuses leftover space on insert

---
 Cargo.toml                           |    2 +-
 src/in_memory/data.rs                |   36 +
 src/in_memory/empty_link_registry.rs |  197 +++
 src/in_memory/mod.rs                 |    1 +
 src/in_memory/pages.rs               |   52 +-
 src/lock/mod.rs                      |    6 +
 tests/worktable/unsized_.rs          | 1852 +++++++++++++++++++++++++-
 7 files changed, 2090 insertions(+), 56 deletions(-)
 create mode 100644 src/in_memory/empty_link_registry.rs

diff --git a/Cargo.toml b/Cargo.toml
index 4a63c17..bab2d56 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -29,7 +29,7 @@ futures = "0.3.30"
 uuid = { version = "1.10.0", features = ["v4", "v7"] }
 data_bucket = "0.3.6"
 # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" }
-# data_bucket = { path = "../DataBucket", version = "0.3.4" }
+# data_bucket = { path = "../DataBucket", version = "0.3.6" }
 performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true }
 performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true }
 # indexset = { version = "0.12.3", features = ["concurrent", "cdc", "multimap"] }
diff --git a/src/in_memory/data.rs b/src/in_memory/data.rs
index 0e824cb..7fce256 100644
--- a/src/in_memory/data.rs
+++ b/src/in_memory/data.rs
@@ -158,6 +158,42 @@ impl Data {
         Ok(link)
     }
 
+
+    pub unsafe fn try_save_row_by_link(
+        &self,
+        row: &Row,
+        mut link: Link,
+    ) -> Result<(Link, Option<Link>), ExecutionError>
+    where
+        Row: Archive
+            + for<'a> Serialize<
+                Strategy<Serializer<AlignedVec, ArenaHandle<'a>, Share>, rkyv::rancor::Error>,
+            >,
+    {
+        let bytes = rkyv::to_bytes(row).map_err(|_| ExecutionError::SerializeError)?;
+        let length = bytes.len() as u32;
+        if length > link.length {
+            return Err(ExecutionError::InvalidLink);
+        }
+
+        let link_diff = link.length - length;
+        let link_left = if link_diff > 0 {
+            link.length -= link_diff;
+            Some(Link {
+                page_id: link.page_id,
+                offset: link.offset + link.length,
+                length: link_diff,
+            })
+        } else {
+            None
+        };
+
+        let inner_data = unsafe { &mut *self.inner_data.get() };
+        inner_data[link.offset as usize..][..link.length as usize]
+            .copy_from_slice(bytes.as_slice());
+
+        Ok((link, link_left))
+    }
     /// # Safety
     /// This function is `unsafe` because it returns a mutable reference to an archived row.
     /// The caller must ensure that there are no other references to the same data
diff --git a/src/in_memory/empty_link_registry.rs b/src/in_memory/empty_link_registry.rs
new file mode 100644
index 0000000..b374c68
--- /dev/null
+++ b/src/in_memory/empty_link_registry.rs
@@ -0,0 +1,197 @@
+use crate::in_memory::DATA_INNER_LENGTH;
+use data_bucket::Link;
+use indexset::concurrent::multimap::BTreeMultiMap;
+use indexset::concurrent::set::BTreeSet;
+use parking_lot::FairMutex;
+
+/// A link wrapper that implements `Ord` based on absolute index calculation.
+#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord)]
+pub struct IndexOrdLink<const DATA_LENGTH: usize>(pub Link);
+
+impl<const DATA_LENGTH: usize> IndexOrdLink<DATA_LENGTH> {
+    /// Calculates the absolute index of the link.
+    fn absolute_index(&self) -> u64 {
+        let page_id: u32 = self.0.page_id.into();
+        (page_id as u64 * DATA_LENGTH as u64) + self.0.offset as u64
+    }
+
+    fn unite_with_right_neighbor(&self, other: &Self) -> Option<Self> {
+        let self_end = self.absolute_index() + self.0.length as u64;
+        let other_start = other.absolute_index();
+
+        if self.0.page_id != other.0.page_id {
+            return None;
+        }
+
+        if self_end == other_start {
+            let new_length = self.0.length + other.0.length;
+            Some(IndexOrdLink(Link {
+                page_id: self.0.page_id,
+                offset: self.0.offset,
+                length: new_length,
+            }))
+        } else {
+            None
+        }
+    }
+
+    fn unite_with_left_neighbor(&self, other: &Self) -> Option<Self> {
+        let other_end = other.absolute_index() + other.0.length as u64;
+        let self_start = self.absolute_index();
+
+        if self.0.page_id != other.0.page_id {
+            return None;
+        }
+
+        if other_end == self_start {
+            let new_offset = other.0.offset;
+            let new_length = self.0.length + other.0.length;
+            Some(IndexOrdLink(Link {
+                page_id: other.0.page_id,
+                offset: new_offset,
+                length: new_length,
+            }))
+        } else {
+            None
+        }
+    }
+}
+
+impl<const DATA_LENGTH: usize> PartialOrd for IndexOrdLink<DATA_LENGTH> {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.absolute_index().cmp(&other.absolute_index()))
+    }
+}
+
+#[derive(Debug)]
+pub struct EmptyLinkRegistry<const DATA_LENGTH: usize> {
+    index_ord_links: BTreeSet<IndexOrdLink<DATA_LENGTH>>,
+    length_ord_links: BTreeMultiMap<u32, Link>,
+    op_lock: FairMutex<()>,
+}
+
+impl<const DATA_LENGTH: usize> Default for EmptyLinkRegistry<DATA_LENGTH> {
+    fn default() -> Self {
+        Self {
+            index_ord_links: BTreeSet::new(),
+            length_ord_links: BTreeMultiMap::new(),
+            op_lock: Default::default(),
+        }
+    }
+}
+
+impl<const DATA_LENGTH: usize> EmptyLinkRegistry<DATA_LENGTH> {
+    pub fn push(&self, link: Link) {
+        let mut index_ord_link = IndexOrdLink(link.clone());
+        let _g = self.op_lock.lock();
+
+        {
+            let mut iter = self.index_ord_links.range(..index_ord_link).rev();
+            if let Some(possible_left_neighbor) = iter.next() {
+                let possible_left_neighbor = *possible_left_neighbor;
+                if let Some(united_link) =
+                    index_ord_link.unite_with_left_neighbor(&possible_left_neighbor)
+                {
+                    drop(iter);
+
+                    // Remove left neighbor
+                    self.index_ord_links.remove(&possible_left_neighbor);
+                    self.length_ord_links
+                        .remove(&possible_left_neighbor.0.length, &possible_left_neighbor.0);
+
+                    index_ord_link = united_link;
+                }
+            }
+        }
+
+        {
+            let mut iter = self.index_ord_links.range(index_ord_link..);
+            if let Some(possible_right_neighbor) = iter.next() {
+                let possible_right_neighbor = *possible_right_neighbor;
+                if let Some(united_link) =
+                    index_ord_link.unite_with_right_neighbor(&possible_right_neighbor)
+                {
+                    drop(iter);
+
+                    // Remove right neighbor
+                    self.index_ord_links.remove(&possible_right_neighbor);
+                    self.length_ord_links.remove(
+                        &possible_right_neighbor.0.length,
+                        &possible_right_neighbor.0,
+                    );
+
+                    index_ord_link = united_link;
+                }
+            }
+        }
+
+        self.index_ord_links.insert(index_ord_link);
+        self.length_ord_links
+            .insert(index_ord_link.0.length, index_ord_link.0);
+    }
+
+    pub fn pop_max(&self) -> Option<Link> {
+        let _g = self.op_lock.lock();
+
+        let mut iter = self.length_ord_links.iter().rev();
+        let (len, max_length_link) = iter.next()?;
+        let index_ord_link = IndexOrdLink(*max_length_link);
+        drop(iter);
+
+        self.length_ord_links.remove(len, max_length_link);
+        self.index_ord_links.remove(&index_ord_link);
+
+        Some(index_ord_link.0)
+    }
+
+    pub fn len(&self) -> usize {
+        self.index_ord_links.len()
+    }
+
+    pub fn iter(&self) -> impl Iterator<Item = Link> + '_ {
+        self.index_ord_links.iter().map(|l| l.0)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_empty_link_registry_insert_and_pop() {
+        let registry = EmptyLinkRegistry::<DATA_INNER_LENGTH>::default();
+
+        let link1 = Link {
+            page_id: 1.into(),
+            offset: 0,
+            length: 100,
+        };
+
+        let link2 = Link {
+            page_id: 1.into(),
+            offset: 100,
+            length: 150,
+        };
+
+        let link3 = Link {
+            page_id: 2.into(),
+            offset: 0,
+            length: 200,
+        };
+
+        registry.push(link1.clone());
+        registry.push(link2.clone());
+        registry.push(link3.clone());
+ // After inserting link1 and link2, they should be united + let united_link = Link { + page_id: 1.into(), + offset: 0, + length: 250, + }; + + assert_eq!(registry.pop_max(), Some(united_link)); + assert_eq!(registry.pop_max(), Some(link3)); + assert_eq!(registry.pop_max(), None); + } +} diff --git a/src/in_memory/mod.rs b/src/in_memory/mod.rs index 1a14334..addfaae 100644 --- a/src/in_memory/mod.rs +++ b/src/in_memory/mod.rs @@ -1,4 +1,5 @@ mod data; +mod empty_link_registry; mod pages; mod row; diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 65823b9..9432979 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -6,7 +6,6 @@ use std::{ use data_bucket::page::PageId; use derive_more::{Display, Error, From}; -use lockfree::stack::Stack; #[cfg(feature = "perf_measurements")] use performance_measurement_codegen::performance_measurement; use rkyv::{ @@ -17,6 +16,7 @@ use rkyv::{ util::AlignedVec, }; +use crate::in_memory::empty_link_registry::EmptyLinkRegistry; use crate::{ in_memory::{ DATA_INNER_LENGTH, Data, DataExecutionError, @@ -37,9 +37,7 @@ where /// Pages vector. Currently, not lock free. pages: RwLock::WrappedRow, DATA_LENGTH>>>>, - /// Stack with empty [`Link`]s. It stores [`Link`]s of rows that was deleted. - // TODO: Proper empty links registry + defragmentation - empty_links: Stack, + empty_links: EmptyLinkRegistry, /// Count of saved rows. row_count: AtomicU64, @@ -68,7 +66,7 @@ where Self { // We are starting ID's from `1` because `0`'s page in file is info page. 
pages: RwLock::new(vec![Arc::new(Data::new(1.into()))]), - empty_links: Stack::new(), + empty_links: EmptyLinkRegistry::::default(), row_count: AtomicU64::new(0), last_page_id: AtomicU32::new(1), current_page_id: AtomicU32::new(1), @@ -83,7 +81,7 @@ where let last_page_id = vec.len(); Self { pages: RwLock::new(vec), - empty_links: Stack::new(), + empty_links: EmptyLinkRegistry::default(), row_count: AtomicU64::new(0), last_page_id: AtomicU32::new(last_page_id as u32), current_page_id: AtomicU32::new(last_page_id as u32), @@ -104,24 +102,35 @@ where { let general_row = ::WrappedRow::from_inner(row); - if let Some(link) = self.empty_links.pop() { + //println!("Popping empty link"); + if let Some(link) = self.empty_links.pop_max() { + //println!("Empty link len {}", self.empty_links.len()); let pages = self.pages.read().unwrap(); let current_page: usize = page_id_mapper(link.page_id.into()); let page = &pages[current_page]; - if let Err(e) = unsafe { page.save_row_by_link(&general_row, link) } { - match e { + match unsafe { page.try_save_row_by_link(&general_row, link) } { + Ok((link, left_link)) => { + if let Some(l) = left_link { + //println!("Pushing empty link"); + self.empty_links.push(l); + //println!("Pushed empty link"); + } + return Ok(link); + } + // Ok(l) => return Ok(l), + Err(e) => match e { DataExecutionError::InvalidLink => { + //println!("Pushing empty link"); self.empty_links.push(link); + //println!("Pushed empty link"); } DataExecutionError::PageIsFull { .. } | DataExecutionError::PageTooSmall { .. 
} | DataExecutionError::SerializeError | DataExecutionError::DeserializeError => return Err(e.into()), - } - } else { - return Ok(link); - }; + }, + } } loop { @@ -311,7 +320,9 @@ where } pub fn delete(&self, link: Link) -> Result<(), ExecutionError> { + //println!("Pushing empty link"); self.empty_links.push(link); + //println!("Pushed empty link"); Ok(()) } @@ -337,20 +348,15 @@ where } pub fn get_empty_links(&self) -> Vec { - let mut res = vec![]; - for l in self.empty_links.pop_iter() { - res.push(l) - } - - res + self.empty_links.iter().collect() } pub fn with_empty_links(mut self, links: Vec) -> Self { - let stack = Stack::new(); + let registry = EmptyLinkRegistry::default(); for l in links { - stack.push(l) + registry.push(l) } - self.empty_links = stack; + self.empty_links = registry; self } @@ -496,7 +502,7 @@ mod tests { let link = pages.insert(row).unwrap(); pages.delete(link).unwrap(); - assert_eq!(pages.empty_links.pop(), Some(link)); + assert_eq!(pages.empty_links.pop_max(), Some(link)); pages.empty_links.push(link); let row = TestRow { a: 20, b: 20 }; diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 57a6e76..4c5a230 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -35,6 +35,12 @@ impl Hash for Lock { } } +impl Drop for Lock { + fn drop(&mut self) { + self.unlock() + } +} + impl Lock { pub fn new(id: u16) -> Self { Self { diff --git a/tests/worktable/unsized_.rs b/tests/worktable/unsized_.rs index 834b0de..ba11904 100644 --- a/tests/worktable/unsized_.rs +++ b/tests/worktable/unsized_.rs @@ -1,34 +1,1750 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; use parking_lot::Mutex; - use worktable::prelude::*; use worktable::worktable; -worktable! 
( - name: Test, - columns: { - id: u64 primary_key autoincrement, - test: i64, - another: u64, - exchange: String, - }, - indexes: { - test_idx: test unique, - exchnage_idx: exchange, - another_idx: another, +#[derive( + Clone, + rkyv::Archive, + Debug, + Default, + rkyv::Deserialize, + Hash, + rkyv::Serialize, + From, + Eq, + Into, + PartialEq, + PartialOrd, + Ord, + SizeMeasure, +)] +#[rkyv(derive(PartialEq, Eq, PartialOrd, Ord, Debug))] +pub struct TestPrimaryKey(u64); +impl TablePrimaryKey for TestPrimaryKey { + type Generator = std::sync::atomic::AtomicU64; +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq, MemStat)] +#[rkyv(derive(Debug))] +#[repr(C)] +pub struct TestRow { + pub id: u64, + pub test: i64, + pub another: u64, + pub exchange: String, +} +impl TableRow for TestRow { + fn get_primary_key(&self) -> TestPrimaryKey { + self.id.clone().into() } - queries: { - update: { - ExchangeByTest(exchange) by test, - ExchangeById(exchange) by id, - ExchangeByAbother(exchange) by another, +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq)] +#[rkyv(derive(Debug))] +#[repr(C)] +pub enum TestRowFields { + Another, + Id, + Exchange, + Test, +} +impl Query for TestRow { + fn merge(self, row: TestRow) -> TestRow { + self + } +} +#[derive(Clone, Debug, From, PartialEq)] +#[non_exhaustive] +pub enum TestAvaiableTypes { + #[from] + STRING(String), + #[from] + U64(u64), + #[from] + I64(i64), +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] +#[repr(C)] +pub struct TestWrapper { + inner: TestRow, + is_ghosted: bool, + is_deleted: bool, +} +impl RowWrapper for TestWrapper { + fn get_inner(self) -> TestRow { + self.inner + } + fn is_ghosted(&self) -> bool { + self.is_ghosted + } + fn from_inner(inner: TestRow) -> Self { + Self { + inner, + is_ghosted: true, + is_deleted: false, + } + } +} +impl StorableRow for TestRow { + type WrappedRow = TestWrapper; +} +impl GhostWrapper for 
ArchivedTestWrapper { + fn unghost(&mut self) { + self.is_ghosted = false; + } +} +#[derive(Debug, Clone)] +pub struct TestLock { + another_lock: Option>, + id_lock: Option>, + exchange_lock: Option>, + test_lock: Option>, +} +impl TestLock { + pub fn new() -> Self { + Self { + another_lock: None, + id_lock: None, + exchange_lock: None, + test_lock: None, + } + } +} +impl RowLock for TestLock { + fn is_locked(&self) -> bool { + self.another_lock + .as_ref() + .map(|l| l.is_locked()) + .unwrap_or(false) + || self + .id_lock + .as_ref() + .map(|l| l.is_locked()) + .unwrap_or(false) + || self + .exchange_lock + .as_ref() + .map(|l| l.is_locked()) + .unwrap_or(false) + || self + .test_lock + .as_ref() + .map(|l| l.is_locked()) + .unwrap_or(false) + } + #[allow(clippy::mutable_key_type)] + fn lock( + &mut self, + id: u16, + ) -> ( + std::collections::HashSet>, + std::sync::Arc, + ) { + let mut set = std::collections::HashSet::new(); + let lock = std::sync::Arc::new(Lock::new(id)); + if let Some(lock) = &self.another_lock { + set.insert(lock.clone()); + } + self.another_lock = Some(lock.clone()); + if let Some(lock) = &self.id_lock { + set.insert(lock.clone()); + } + self.id_lock = Some(lock.clone()); + if let Some(lock) = &self.exchange_lock { + set.insert(lock.clone()); + } + self.exchange_lock = Some(lock.clone()); + if let Some(lock) = &self.test_lock { + set.insert(lock.clone()); + } + self.test_lock = Some(lock.clone()); + (set, lock) + } + fn with_lock(id: u16) -> (Self, std::sync::Arc) { + let lock = std::sync::Arc::new(Lock::new(id)); + ( + Self { + another_lock: Some(lock.clone()), + id_lock: Some(lock.clone()), + exchange_lock: Some(lock.clone()), + test_lock: Some(lock.clone()), + }, + lock, + ) + } + #[allow(clippy::mutable_key_type)] + fn merge(&mut self, other: &mut Self) -> std::collections::HashSet> { + let mut set = std::collections::HashSet::new(); + if let Some(another_lock) = &other.another_lock { + if self.another_lock.is_none() { + 
self.another_lock = Some(another_lock.clone()); + } else { + set.insert(another_lock.clone()); + } + } + other.another_lock = self.another_lock.clone(); + if let Some(id_lock) = &other.id_lock { + if self.id_lock.is_none() { + self.id_lock = Some(id_lock.clone()); + } else { + set.insert(id_lock.clone()); + } + } + other.id_lock = self.id_lock.clone(); + if let Some(exchange_lock) = &other.exchange_lock { + if self.exchange_lock.is_none() { + self.exchange_lock = Some(exchange_lock.clone()); + } else { + set.insert(exchange_lock.clone()); + } + } + other.exchange_lock = self.exchange_lock.clone(); + if let Some(test_lock) = &other.test_lock { + if self.test_lock.is_none() { + self.test_lock = Some(test_lock.clone()); + } else { + set.insert(test_lock.clone()); + } + } + other.test_lock = self.test_lock.clone(); + set + } +} +#[derive(Debug, MemStat)] +pub struct TestIndex { + exchnage_idx: IndexMultiMap>>, + test_idx: IndexMap, + another_idx: IndexMultiMap, +} +impl TableSecondaryIndex for TestIndex { + fn save_row( + &self, + row: TestRow, + link: Link, + ) -> core::result::Result<(), IndexError> { + let mut inserted_indexes: Vec = vec![]; + if self + .exchnage_idx + .insert_checked(row.exchange.clone(), link) + .is_none() + { + return Err(IndexError::AlreadyExists { + at: TestAvailableIndexes::ExchnageIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(TestAvailableIndexes::ExchnageIdx); + if self + .test_idx + .insert_checked(row.test.clone(), link) + .is_none() + { + return Err(IndexError::AlreadyExists { + at: TestAvailableIndexes::TestIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(TestAvailableIndexes::TestIdx); + if self + .another_idx + .insert_checked(row.another.clone(), link) + .is_none() + { + return Err(IndexError::AlreadyExists { + at: TestAvailableIndexes::AnotherIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(TestAvailableIndexes::AnotherIdx); 
+ core::result::Result::Ok(()) + } + fn reinsert_row( + &self, + row_old: TestRow, + link_old: Link, + row_new: TestRow, + link_new: Link, + ) -> core::result::Result<(), IndexError> { + let mut inserted_indexes: Vec = vec![]; + let row = &row_new; + let val_new = row.test.clone(); + let row = &row_old; + let val_old = row.test.clone(); + if val_new != val_old { + if self + .test_idx + .insert_checked(val_new.clone(), link_new) + .is_none() + { + return Err(IndexError::AlreadyExists { + at: TestAvailableIndexes::TestIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(TestAvailableIndexes::TestIdx); + } + let row = &row_new; + let val_new = row.exchange.clone(); + let row = &row_old; + let val_old = row.exchange.clone(); + self.exchnage_idx.insert(val_new.clone(), link_new); + TableIndex::remove(&self.exchnage_idx, val_old, link_old); + let row = &row_new; + let val_new = row.test.clone(); + let row = &row_old; + let val_old = row.test.clone(); + if val_new == val_old { + self.test_idx.insert(val_new.clone(), link_new); + } else { + TableIndex::remove(&self.test_idx, val_old, link_old); + } + let row = &row_new; + let val_new = row.another.clone(); + let row = &row_old; + let val_old = row.another.clone(); + self.another_idx.insert(val_new.clone(), link_new); + TableIndex::remove(&self.another_idx, val_old, link_old); + core::result::Result::Ok(()) + } + fn delete_row( + &self, + row: TestRow, + link: Link, + ) -> core::result::Result<(), IndexError> { + self.exchnage_idx.remove(&row.exchange, &link); + self.test_idx.remove(&row.test); + self.another_idx.remove(&row.another, &link); + core::result::Result::Ok(()) + } + fn process_difference_insert( + &self, + link: Link, + difference: std::collections::HashMap<&str, Difference>, + ) -> core::result::Result<(), IndexError> { + let mut inserted_indexes: Vec = vec![]; + if let Some(diff) = difference.get("exchange") { + if let TestAvaiableTypes::STRING(new) = &diff.new { + let key_new = 
new.to_string(); + if TableIndex::insert_checked(&self.exchnage_idx, key_new, link).is_none() { + return Err(IndexError::AlreadyExists { + at: TestAvailableIndexes::ExchnageIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(TestAvailableIndexes::ExchnageIdx); + } + } + if let Some(diff) = difference.get("test") { + if let TestAvaiableTypes::I64(new) = &diff.new { + let key_new = *new; + if TableIndex::insert_checked(&self.test_idx, key_new, link).is_none() { + return Err(IndexError::AlreadyExists { + at: TestAvailableIndexes::TestIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(TestAvailableIndexes::TestIdx); + } + } + if let Some(diff) = difference.get("another") { + if let TestAvaiableTypes::U64(new) = &diff.new { + let key_new = *new; + if TableIndex::insert_checked(&self.another_idx, key_new, link).is_none() { + return Err(IndexError::AlreadyExists { + at: TestAvailableIndexes::AnotherIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(TestAvailableIndexes::AnotherIdx); + } + } + core::result::Result::Ok(()) + } + fn process_difference_remove( + &self, + link: Link, + difference: std::collections::HashMap<&str, Difference>, + ) -> core::result::Result<(), IndexError> { + if let Some(diff) = difference.get("exchange") { + if let TestAvaiableTypes::STRING(old) = &diff.old { + let key_old = old.to_string(); + TableIndex::remove(&self.exchnage_idx, key_old, link); + } + } + if let Some(diff) = difference.get("test") { + if let TestAvaiableTypes::I64(old) = &diff.old { + let key_old = *old; + TableIndex::remove(&self.test_idx, key_old, link); + } + } + if let Some(diff) = difference.get("another") { + if let TestAvaiableTypes::U64(old) = &diff.old { + let key_old = *old; + TableIndex::remove(&self.another_idx, key_old, link); + } + } + core::result::Result::Ok(()) + } + fn delete_from_indexes( + &self, + row: TestRow, + link: Link, + indexes: Vec, + ) -> 
core::result::Result<(), IndexError> { + for index in indexes { + match index { + TestAvailableIndexes::ExchnageIdx => { + self.exchnage_idx.remove(&row.exchange, &link); + } + TestAvailableIndexes::TestIdx => { + self.test_idx.remove(&row.test); + } + TestAvailableIndexes::AnotherIdx => { + self.another_idx.remove(&row.another, &link); + } + } + } + core::result::Result::Ok(()) + } +} +impl TableSecondaryIndexInfo for TestIndex { + fn index_info(&self) -> Vec { + let mut info = Vec::new(); + info.push(IndexInfo { + name: "exchnage_idx".to_string(), + index_type: IndexKind::NonUnique, + key_count: self.exchnage_idx.len(), + capacity: self.exchnage_idx.capacity(), + heap_size: self.exchnage_idx.heap_size(), + used_size: self.exchnage_idx.used_size(), + node_count: self.exchnage_idx.node_count(), + }); + info.push(IndexInfo { + name: "test_idx".to_string(), + index_type: IndexKind::Unique, + key_count: self.test_idx.len(), + capacity: self.test_idx.capacity(), + heap_size: self.test_idx.heap_size(), + used_size: self.test_idx.used_size(), + node_count: self.test_idx.node_count(), + }); + info.push(IndexInfo { + name: "another_idx".to_string(), + index_type: IndexKind::NonUnique, + key_count: self.another_idx.len(), + capacity: self.another_idx.capacity(), + heap_size: self.another_idx.heap_size(), + used_size: self.another_idx.used_size(), + node_count: self.another_idx.node_count(), + }); + info + } + fn is_empty(&self) -> bool { + self.exchnage_idx.len() == 0 && self.test_idx.len() == 0 && self.another_idx.len() == 0 + } +} +impl Default for TestIndex { + fn default() -> Self { + Self { + exchnage_idx: IndexMultiMap::with_maximum_node_size(TEST_INNER_SIZE), + test_idx: IndexMap::with_maximum_node_size( + get_index_page_size_from_data_length::(TEST_INNER_SIZE), + ), + another_idx: IndexMultiMap::with_maximum_node_size( + get_index_page_size_from_data_length::(TEST_INNER_SIZE), + ), } } +} +#[derive(Debug, Clone, Copy, MoreDisplay, PartialEq, PartialOrd, Ord, Hash, 
Eq)] +pub enum TestAvailableIndexes { + ExchnageIdx, + TestIdx, + AnotherIdx, +} +impl AvailableIndex for TestAvailableIndexes { + fn to_string_value(&self) -> String { + ToString::to_string(&self) + } +} +const TEST_PAGE_SIZE: usize = PAGE_SIZE; +const TEST_INNER_SIZE: usize = TEST_PAGE_SIZE - GENERAL_HEADER_SIZE; +#[derive(Debug)] +pub struct TestWorkTable( + WorkTable< + TestRow, + TestPrimaryKey, + TestAvaiableTypes, + TestAvailableIndexes, + TestIndex, + TestLock, + ::Generator, + Vec>, + >, ); +impl Default for TestWorkTable { + fn default() -> Self { + let mut inner = WorkTable::default(); + inner.table_name = "Test"; + Self(inner) + } +} +impl TestWorkTable { + pub fn name(&self) -> &'static str { + &self.0.table_name + } + pub fn select(&self, pk: Pk) -> Option + where + TestPrimaryKey: From, + { + self.0.select(pk.into()) + } + pub fn insert(&self, row: TestRow) -> core::result::Result { + self.0.insert(row) + } + pub fn reinsert( + &self, + row_old: TestRow, + row_new: TestRow, + ) -> core::result::Result { + self.0.reinsert(row_old, row_new) + } + pub async fn upsert(&self, row: TestRow) -> core::result::Result<(), WorkTableError> { + let pk = row.get_primary_key(); + let need_to_update = { + if let Some(_) = self.0.pk_map.get(&pk) { + true + } else { + false + } + }; + if need_to_update { + self.update(row).await?; + } else { + self.insert(row)?; + } + core::result::Result::Ok(()) + } + pub fn count(&self) -> usize { + let count = self.0.pk_map.len(); + count + } + pub fn get_next_pk(&self) -> TestPrimaryKey { + self.0.get_next_pk() + } + pub fn iter_with core::result::Result<(), WorkTableError>>( + &self, + f: F, + ) -> core::result::Result<(), WorkTableError> { + let first = self.0.pk_map.iter().next().map(|(k, v)| (k.clone(), *v)); + let Some((mut k, link)) = first else { + return Ok(()); + }; + let data = self + .0 + .data + .select_non_ghosted(link) + .map_err(WorkTableError::PagesError)?; + f(data)?; + let mut ind = false; + while !ind { + let 
next = { + let mut iter = self.0.pk_map.range(k.clone()..); + let next = iter + .next() + .map(|(k, v)| (k.clone(), *v)) + .filter(|(key, _)| key != &k); + if next.is_some() { + next + } else { + iter.next().map(|(k, v)| (k.clone(), *v)) + } + }; + if let Some((key, link)) = next { + let data = self + .0 + .data + .select_non_ghosted(link) + .map_err(WorkTableError::PagesError)?; + f(data)?; + k = key + } else { + ind = true; + }; + } + core::result::Result::Ok(()) + } + pub async fn iter_with_async< + F: Fn(TestRow) -> Fut, + Fut: std::future::Future>, + >( + &self, + f: F, + ) -> core::result::Result<(), WorkTableError> { + let first = self.0.pk_map.iter().next().map(|(k, v)| (k.clone(), *v)); + let Some((mut k, link)) = first else { + return Ok(()); + }; + let data = self + .0 + .data + .select_non_ghosted(link) + .map_err(WorkTableError::PagesError)?; + f(data).await?; + let mut ind = false; + while !ind { + let next = { + let mut iter = self.0.pk_map.range(k.clone()..); + let next = iter + .next() + .map(|(k, v)| (k.clone(), *v)) + .filter(|(key, _)| key != &k); + if next.is_some() { + next + } else { + iter.next().map(|(k, v)| (k.clone(), *v)) + } + }; + if let Some((key, link)) = next { + let data = self + .0 + .data + .select_non_ghosted(link) + .map_err(WorkTableError::PagesError)?; + f(data).await?; + k = key + } else { + ind = true; + }; + } + core::result::Result::Ok(()) + } + pub fn system_info(&self) -> SystemInfo { + self.0.system_info() + } +} +impl TestWorkTable { + pub fn select_by_exchange( + &self, + by: String, + ) -> SelectQueryBuilder< + TestRow, + impl DoubleEndedIterator + '_, + TestColumnRange, + TestRowFields, + > { + let rows = self + .0 + .indexes + .exchnage_idx + .get(&by) + .into_iter() + .filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok()) + .filter(move |r| &r.exchange == &by); + SelectQueryBuilder::new(rows) + } + pub fn select_by_test(&self, by: i64) -> Option { + let link = 
self.0.indexes.test_idx.get(&by).map(|kv| kv.get().value)?; + self.0.data.select_non_ghosted(link).ok() + } + pub fn select_by_another( + &self, + by: u64, + ) -> SelectQueryBuilder< + TestRow, + impl DoubleEndedIterator + '_, + TestColumnRange, + TestRowFields, + > { + let rows = self + .0 + .indexes + .another_idx + .get(&by) + .into_iter() + .filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok()) + .filter(move |r| &r.another == &by); + SelectQueryBuilder::new(rows) + } +} +impl SelectQueryExecutor + for SelectQueryBuilder +where + I: DoubleEndedIterator + Sized, +{ + fn where_by( + self, + predicate: F, + ) -> SelectQueryBuilder< + TestRow, + impl DoubleEndedIterator + Sized, + TestColumnRange, + TestRowFields, + > + where + F: FnMut(&TestRow) -> bool, + { + SelectQueryBuilder { + params: self.params, + iter: self.iter.filter(predicate), + } + } + fn execute(self) -> Result, WorkTableError> { + let mut iter: Box> = Box::new(self.iter); + if !self.params.range.is_empty() { + for (range, column) in &self.params.range { + iter = match (column, range.clone().into()) { + (TestRowFields::Another, TestColumnRange::U64(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.another))) + as Box> + } + (TestRowFields::Another, TestColumnRange::U64Inclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.another))) + as Box> + } + (TestRowFields::Another, TestColumnRange::U64From(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.another))) + as Box> + } + (TestRowFields::Another, TestColumnRange::U64To(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.another))) + as Box> + } + (TestRowFields::Another, TestColumnRange::U64ToInclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.another))) + as Box> + } + (TestRowFields::Id, TestColumnRange::U64(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (TestRowFields::Id, 
TestColumnRange::U64Inclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (TestRowFields::Id, TestColumnRange::U64From(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (TestRowFields::Id, TestColumnRange::U64To(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (TestRowFields::Id, TestColumnRange::U64ToInclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (TestRowFields::Test, TestColumnRange::I64(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.test))) + as Box> + } + (TestRowFields::Test, TestColumnRange::I64Inclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.test))) + as Box> + } + (TestRowFields::Test, TestColumnRange::I64From(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.test))) + as Box> + } + (TestRowFields::Test, TestColumnRange::I64To(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.test))) + as Box> + } + (TestRowFields::Test, TestColumnRange::I64ToInclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.test))) + as Box> + } + _ => unreachable!(), + }; + } + } + if !self.params.order.is_empty() { + let mut items: Vec = iter.collect(); + items.sort_by(|a, b| { + for (order, col) in &self.params.order { + match col { + TestRowFields::Another => { + let cmp = a + .another + .partial_cmp(&b.another) + .unwrap_or(std::cmp::Ordering::Equal); + if cmp != std::cmp::Ordering::Equal { + return match order { + Order::Asc => cmp, + Order::Desc => cmp.reverse(), + }; + } + } + TestRowFields::Id => { + let cmp = a.id.partial_cmp(&b.id).unwrap_or(std::cmp::Ordering::Equal); + if cmp != std::cmp::Ordering::Equal { + return match order { + Order::Asc => cmp, + Order::Desc => cmp.reverse(), + }; + } + } + TestRowFields::Exchange => { + let cmp = a + .exchange + .partial_cmp(&b.exchange) + 
.unwrap_or(std::cmp::Ordering::Equal); + if cmp != std::cmp::Ordering::Equal { + return match order { + Order::Asc => cmp, + Order::Desc => cmp.reverse(), + }; + } + } + TestRowFields::Test => { + let cmp = a + .test + .partial_cmp(&b.test) + .unwrap_or(std::cmp::Ordering::Equal); + if cmp != std::cmp::Ordering::Equal { + return match order { + Order::Asc => cmp, + Order::Desc => cmp.reverse(), + }; + } + } + _ => continue, + } + } + std::cmp::Ordering::Equal + }); + iter = Box::new(items.into_iter()); + } + let iter_result: Box> = + if let Some(offset) = self.params.offset { + Box::new(iter.skip(offset)) + } else { + Box::new(iter) + }; + let iter_result: Box> = if let Some(limit) = self.params.limit + { + Box::new(iter_result.take(limit)) + } else { + Box::new(iter_result) + }; + Ok(iter_result.collect()) + } +} +#[derive(Debug, Clone)] +pub enum TestColumnRange { + U64(std::ops::Range), + U64Inclusive(std::ops::RangeInclusive), + U64From(std::ops::RangeFrom), + U64To(std::ops::RangeTo), + U64ToInclusive(std::ops::RangeToInclusive), + I64(std::ops::Range), + I64Inclusive(std::ops::RangeInclusive), + I64From(std::ops::RangeFrom), + I64To(std::ops::RangeTo), + I64ToInclusive(std::ops::RangeToInclusive), +} +impl From> for TestColumnRange { + fn from(range: std::ops::Range) -> Self { + Self::U64(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::RangeInclusive) -> Self { + Self::U64Inclusive(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::RangeFrom) -> Self { + Self::U64From(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::RangeTo) -> Self { + Self::U64To(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::RangeToInclusive) -> Self { + Self::U64ToInclusive(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::Range) -> Self { + Self::I64(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::RangeInclusive) -> Self { + 
Self::I64Inclusive(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::RangeFrom) -> Self { + Self::I64From(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::RangeTo) -> Self { + Self::I64To(range) + } +} +impl From> for TestColumnRange { + fn from(range: std::ops::RangeToInclusive) -> Self { + Self::I64ToInclusive(range) + } +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize)] +#[repr(C)] +pub struct ExchangeByTestQuery { + pub exchange: String, +} +impl Query for ExchangeByTestQuery { + fn merge(self, mut row: TestRow) -> TestRow { + row.exchange = self.exchange; + row + } +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize)] +#[repr(C)] +pub struct ExchangeByIdQuery { + pub exchange: String, +} +impl Query for ExchangeByIdQuery { + fn merge(self, mut row: TestRow) -> TestRow { + row.exchange = self.exchange; + row + } +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize)] +#[repr(C)] +pub struct ExchangeByAbotherQuery { + pub exchange: String, +} +impl Query for ExchangeByAbotherQuery { + fn merge(self, mut row: TestRow) -> TestRow { + row.exchange = self.exchange; + row + } +} +pub type ExchangeByTestBy = i64; +pub type ExchangeByIdBy = u64; +pub type ExchangeByAbotherBy = u64; +impl TestLock { + #[allow(clippy::mutable_key_type)] + pub fn lock_update_exchange_by_test( + &mut self, + id: u16, + ) -> ( + std::collections::HashSet>, + std::sync::Arc, + ) { + let mut set = std::collections::HashSet::new(); + let new_lock = std::sync::Arc::new(Lock::new(id)); + if let Some(lock) = &self.exchange_lock { + set.insert(lock.clone()); + } + self.exchange_lock = Some(new_lock.clone()); + (set, new_lock) + } + #[allow(clippy::mutable_key_type)] + pub fn lock_update_exchange_by_id( + &mut self, + id: u16, + ) -> ( + std::collections::HashSet>, + std::sync::Arc, + ) { + let mut set = std::collections::HashSet::new(); + let new_lock = 
std::sync::Arc::new(Lock::new(id)); + if let Some(lock) = &self.exchange_lock { + set.insert(lock.clone()); + } + self.exchange_lock = Some(new_lock.clone()); + (set, new_lock) + } + #[allow(clippy::mutable_key_type)] + pub fn lock_update_exchange_by_abother( + &mut self, + id: u16, + ) -> ( + std::collections::HashSet>, + std::sync::Arc, + ) { + let mut set = std::collections::HashSet::new(); + let new_lock = std::sync::Arc::new(Lock::new(id)); + if let Some(lock) = &self.exchange_lock { + set.insert(lock.clone()); + } + self.exchange_lock = Some(new_lock.clone()); + (set, new_lock) + } +} +impl TestWorkTable { + pub fn select_all( + &self, + ) -> SelectQueryBuilder< + TestRow, + impl DoubleEndedIterator + '_ + Sized, + TestColumnRange, + TestRowFields, + > { + let iter = self + .0 + .pk_map + .iter() + .filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok()); + SelectQueryBuilder::new(iter) + } +} +impl TestWorkTable { + pub async fn update(&self, row: TestRow) -> core::result::Result<(), WorkTableError> { + let pk = row.get_primary_key(); + let lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = TestLock::with_lock(lock_id); + let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + } + op_lock + } + }; + let link = 
self + .0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound)?; + let row_old = self.0.data.select_non_ghosted(link)?; + self.0.update_state.insert(pk.clone(), row_old); + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + if true { + lock.unlock(); + let lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = TestLock::with_lock(lock_id); + let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all( + locks.iter().map(|l| l.wait()).collect::>(), + ) + .await; + } + op_lock + } + }; + let row_old = self.0.data.select_non_ghosted(link)?; + if let Err(e) = self.reinsert(row_old, row) { + self.0.update_state.remove(&pk); + lock.unlock(); + return Err(e); + } + self.0.update_state.remove(&pk); + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + return core::result::Result::Ok(()); + } + let mut archived_row = unsafe { + rkyv::access_unchecked_mut::<::Archived>(&mut bytes[..]) + .unseal_unchecked() + }; + let op_id = OperationId::Single(uuid::Uuid::now_v7()); + let row_old = self.0.data.select_non_ghosted(link)?; + let row_new = row.clone(); + let updated_bytes: Vec = vec![]; + let mut diffs: std::collections::HashMap<&str, Difference> = + std::collections::HashMap::new(); + let old = &row_old.exchange; + 
let new = &row_new.exchange; + if old != new { + let diff = Difference:: { + old: old.clone().into(), + new: new.clone().into(), + }; + diffs.insert("exchange", diff); + } + let old = &row_old.test; + let new = &row_new.test; + if old != new { + let diff = Difference:: { + old: old.clone().into(), + new: new.clone().into(), + }; + diffs.insert("test", diff); + } + let old = &row_old.another; + let new = &row_new.another; + if old != new { + let diff = Difference:: { + old: old.clone().into(), + new: new.clone().into(), + }; + diffs.insert("another", diff); + } + let indexes_res = self + .0 + .indexes + .process_difference_insert(link, diffs.clone()); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.0.indexes.delete_from_indexes( + row_new.merge(row_old.clone()), + link, + inserted_already, + )?; + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } + unsafe { + self.0 + .data + .with_mut_ref(link, move |archived| { + std::mem::swap(&mut archived.inner.another, &mut archived_row.another); + std::mem::swap(&mut archived.inner.id, &mut archived_row.id); + std::mem::swap(&mut archived.inner.exchange, &mut archived_row.exchange); + std::mem::swap(&mut archived.inner.test, &mut archived_row.test); + }) + .map_err(WorkTableError::PagesError)? 
+ }; + self.0.indexes.process_difference_remove(link, diffs)?; + self.0.update_state.remove(&pk); + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + core::result::Result::Ok(()) + } + pub async fn update_exchange_by_test( + &self, + row: ExchangeByTestQuery, + by: ExchangeByTestBy, + ) -> core::result::Result<(), WorkTableError> { + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + let mut archived_row = unsafe { + rkyv::access_unchecked_mut::<::Archived>( + &mut bytes[..], + ) + .unseal_unchecked() + }; + let link = self + .0 + .indexes + .test_idx + .get(&by) + .map(|kv| kv.get().value) + .ok_or(WorkTableError::NotFound)?; + let pk = self + .0 + .data + .select_non_ghosted(link)? + .get_primary_key() + .clone(); + let lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock_update_exchange_by_test(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; + op_lock + } else { + let mut lock = TestLock::new(); + #[allow(clippy::mutable_key_type)] + let (_, op_lock) = lock.lock_update_exchange_by_test(lock_id); + let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + } + op_lock + } + }; + let link = self + .0 + .indexes + .test_idx + .get(&by) + .map(|kv| kv.get().value) + .ok_or(WorkTableError::NotFound)?; + let op_id = OperationId::Single(uuid::Uuid::now_v7()); + let mut need_to_reinsert = true; + 
need_to_reinsert |= archived_row.get_exchange_size() != self.get_exchange_size(link)?; + if need_to_reinsert { + lock.unlock(); + let lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = TestLock::with_lock(lock_id); + let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all( + locks.iter().map(|l| l.wait()).collect::>(), + ) + .await; + } + op_lock + } + }; + let row_old = self + .0 + .select(pk.clone()) + .expect("should not be deleted by other thread"); + let mut row_new = row_old.clone(); + let pk = row_old.get_primary_key().clone(); + row_new.exchange = row.exchange; + if let Err(e) = self.reinsert(row_old, row_new) { + self.0.update_state.remove(&pk); + lock.unlock(); + return Err(e); + } + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + return core::result::Result::Ok(()); + } + let row_old = self.0.data.select_non_ghosted(link)?; + let row_new = row.clone(); + let updated_bytes: Vec = vec![]; + let mut diffs: std::collections::HashMap<&str, Difference> = + std::collections::HashMap::new(); + let old = &row_old.exchange; + let new = &row_new.exchange; + if old != new { + let diff = Difference:: { + old: old.clone().into(), + new: new.clone().into(), + }; + diffs.insert("exchange", diff); + } + let indexes_res = self + .0 + .indexes + 
.process_difference_insert(link, diffs.clone()); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.0.indexes.delete_from_indexes( + row_new.merge(row_old.clone()), + link, + inserted_already, + )?; + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } + unsafe { + self.0 + .data + .with_mut_ref(link, |archived| { + std::mem::swap(&mut archived.inner.exchange, &mut archived_row.exchange); + }) + .map_err(WorkTableError::PagesError)?; + } + self.0.indexes.process_difference_remove(link, diffs)?; + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + core::result::Result::Ok(()) + } + pub async fn update_exchange_by_id( + &self, + row: ExchangeByIdQuery, + pk: Pk, + ) -> core::result::Result<(), WorkTableError> + where + TestPrimaryKey: From, + { + let pk = pk.into(); + let lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock_update_exchange_by_id(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; + op_lock + } else { + let mut lock = TestLock::new(); + #[allow(clippy::mutable_key_type)] + let (_, op_lock) = lock.lock_update_exchange_by_id(lock_id); + let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + } + op_lock + } + }; + let link = self + .0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + 
.ok_or(WorkTableError::NotFound)?; + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + let mut archived_row = unsafe { + rkyv::access_unchecked_mut::<::Archived>( + &mut bytes[..], + ) + .unseal_unchecked() + }; + let op_id = OperationId::Single(uuid::Uuid::now_v7()); + let mut need_to_reinsert = true; + need_to_reinsert |= archived_row.get_exchange_size() != self.get_exchange_size(link)?; + if need_to_reinsert { + lock.unlock(); + let lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = TestLock::with_lock(lock_id); + let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all( + locks.iter().map(|l| l.wait()).collect::>(), + ) + .await; + } + op_lock + } + }; + let row_old = self + .0 + .select(pk.clone()) + .expect("should not be deleted by other thread"); + let mut row_new = row_old.clone(); + let pk = row_old.get_primary_key().clone(); + row_new.exchange = row.exchange; + if let Err(e) = self.reinsert(row_old, row_new) { + self.0.update_state.remove(&pk); + lock.unlock(); + return Err(e); + } + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + return core::result::Result::Ok(()); + } + let row_old = self.0.data.select_non_ghosted(link)?; + let row_new = row.clone(); + let updated_bytes: Vec = vec![]; + let mut diffs: 
std::collections::HashMap<&str, Difference> = + std::collections::HashMap::new(); + let old = &row_old.exchange; + let new = &row_new.exchange; + if old != new { + let diff = Difference:: { + old: old.clone().into(), + new: new.clone().into(), + }; + diffs.insert("exchange", diff); + } + let indexes_res = self + .0 + .indexes + .process_difference_insert(link, diffs.clone()); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.0.indexes.delete_from_indexes( + row_new.merge(row_old.clone()), + link, + inserted_already, + )?; + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } + unsafe { + self.0 + .data + .with_mut_ref(link, |archived| { + std::mem::swap(&mut archived.inner.exchange, &mut archived_row.exchange); + }) + .map_err(WorkTableError::PagesError)? + }; + self.0.indexes.process_difference_remove(link, diffs)?; + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + core::result::Result::Ok(()) + } + pub async fn update_exchange_by_abother( + &self, + row: ExchangeByAbotherQuery, + by: ExchangeByAbotherBy, + ) -> core::result::Result<(), WorkTableError> { + let links: Vec<_> = self + .0 + .indexes + .another_idx + .get(&by) + .map(|(_, l)| *l) + .collect(); + let mut locks = std::collections::HashMap::new(); + for link in links.iter() { + let pk = self + .0 + .data + .select_non_ghosted(*link)? 
+ .get_primary_key() + .clone(); + let op_lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock_update_exchange_by_abother(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + op_lock + } else { + let mut lock = TestLock::new(); + #[allow(clippy::mutable_key_type)] + let (_, op_lock) = lock.lock_update_exchange_by_abother(lock_id); + let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all( + locks.iter().map(|l| l.wait()).collect::>(), + ) + .await; + } + op_lock + } + }; + locks.insert(pk, op_lock); + } + let links: Vec<_> = self + .0 + .indexes + .another_idx + .get(&by) + .map(|(_, l)| *l) + .collect(); + let mut pk_to_unlock: std::collections::HashMap<_, std::sync::Arc> = + std::collections::HashMap::new(); + let op_id = OperationId::Multi(uuid::Uuid::now_v7()); + for link in links.into_iter() { + let pk = self + .0 + .data + .select_non_ghosted(link)? 
+ .get_primary_key() + .clone(); + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + let mut archived_row = unsafe { + rkyv::access_unchecked_mut::<::Archived>( + &mut bytes[..], + ) + .unseal_unchecked() + }; + let mut need_to_reinsert = true; + need_to_reinsert |= archived_row.get_exchange_size() != self.get_exchange_size(link)?; + if need_to_reinsert { + let op_lock = locks + .remove(&pk) + .expect("should not be deleted as links are unique"); + op_lock.unlock(); + let lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all( + locks.iter().map(|l| l.wait()).collect::>(), + ) + .await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = TestLock::with_lock(lock_id); + let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all( + locks.iter().map(|l| l.wait()).collect::>(), + ) + .await; + } + op_lock + } + }; + let row_old = self + .select(pk.clone()) + .expect("should not be deleted by other thread"); + let mut row_new = row_old.clone(); + row_new.exchange = row.exchange.clone(); + if let Err(e) = self.reinsert(row_old, row_new) { + self.0.update_state.remove(&pk); + lock.unlock(); + return Err(e); + } + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + continue; + } else { + pk_to_unlock.insert( + pk.clone(), + locks + .remove(&pk) + .expect("should not be deleted as links are unique"), + ); + } + let row_old = 
self.0.data.select_non_ghosted(link)?; + let row_new = row.clone(); + let updated_bytes: Vec = vec![]; + let mut diffs: std::collections::HashMap<&str, Difference> = + std::collections::HashMap::new(); + let old = &row_old.exchange; + let new = &row_new.exchange; + if old != new { + let diff = Difference:: { + old: old.clone().into(), + new: new.clone().into(), + }; + diffs.insert("exchange", diff); + } + let indexes_res = self + .0 + .indexes + .process_difference_insert(link, diffs.clone()); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.0.indexes.delete_from_indexes( + row_new.merge(row_old.clone()), + link, + inserted_already, + )?; + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } + unsafe { + self.0 + .data + .with_mut_ref(link, |archived| { + std::mem::swap(&mut archived.inner.exchange, &mut archived_row.exchange); + }) + .map_err(WorkTableError::PagesError)?; + } + self.0.indexes.process_difference_remove(link, diffs)?; + } + for (pk, lock) in pk_to_unlock { + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + } + core::result::Result::Ok(()) + } +} +impl TestWorkTable {} +impl TestWorkTable { + pub async fn delete(&self, pk: TestPrimaryKey) -> core::result::Result<(), WorkTableError> { + let lock = { + let lock_id = self.0.lock_map.next_id(); + if let Some(lock) = self.0.lock_map.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = TestLock::with_lock(lock_id); + let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = 
self.0.lock_map.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + } + op_lock + } + }; + let link = match self + .0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound) + { + Ok(l) => l, + Err(e) => { + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + return Err(e); + } + }; + + let row = self.select(pk.clone()).unwrap(); + self.0.indexes.delete_row(row, link)?; + self.0.pk_map.remove(&pk); + self.0 + .data + .delete(link) + .map_err(WorkTableError::PagesError)?; + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + core::result::Result::Ok(()) + } + pub fn delete_without_lock( + &self, + pk: TestPrimaryKey, + ) -> core::result::Result<(), WorkTableError> { + let link = self + .0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound)?; + let row = self.select(pk.clone()).unwrap(); + self.0.indexes.delete_row(row, link)?; + self.0.pk_map.remove(&pk); + self.0 + .data + .delete(link) + .map_err(WorkTableError::PagesError)?; + core::result::Result::Ok(()) + } +} +impl TestWorkTable { + fn get_exchange_size(&self, link: Link) -> core::result::Result { + self.0 + .data + .with_ref(link, |row_ref| { + row_ref.inner.exchange.as_str().to_string().aligned_size() + }) + .map_err(WorkTableError::PagesError) + } +} +impl ArchivedExchangeByTestQuery { + pub fn get_exchange_size(&self) -> usize { + self.exchange.as_str().to_string().aligned_size() + } +} +impl ArchivedExchangeByIdQuery { + pub fn get_exchange_size(&self) -> usize { + self.exchange.as_str().to_string().aligned_size() + } +} +impl ArchivedExchangeByAbotherQuery { + pub fn get_exchange_size(&self) -> usize { + self.exchange.as_str().to_string().aligned_size() + } +} 
#[tokio::test] async fn test_update_string_full_row() { let table = TestWorkTable::default(); @@ -174,9 +1890,13 @@ async fn test_update_string_by_non_unique() { } ); let empty_links = table.0.data.get_empty_links(); - assert_eq!(empty_links.len(), 2); - assert!(empty_links.contains(&first_link)); - assert!(empty_links.contains(&second_link)) + assert_eq!(empty_links.len(), 1); + let l = Link { + page_id: first_link.page_id, + offset: first_link.offset, + length: first_link.length + second_link.length, + }; + assert!(empty_links.contains(&l)) } #[tokio::test] @@ -448,9 +2168,13 @@ async fn test_update_many_strings_by_non_unique() { } ); let empty_links = table.0.data.get_empty_links(); - assert_eq!(empty_links.len(), 2); - assert!(empty_links.contains(&first_link)); - assert!(empty_links.contains(&second_link)) + assert_eq!(empty_links.len(), 1); + let l = Link { + page_id: first_link.page_id, + offset: first_link.offset, + length: first_link.length + second_link.length, + }; + assert!(empty_links.contains(&l)); } #[tokio::test] @@ -512,9 +2236,13 @@ async fn test_update_many_strings_by_string() { } ); let empty_links = table.0.data.get_empty_links(); - assert_eq!(empty_links.len(), 2); - assert!(empty_links.contains(&first_link)); - assert!(empty_links.contains(&second_link)) + assert_eq!(empty_links.len(), 1); + let l = Link { + page_id: first_link.page_id, + offset: first_link.offset, + length: first_link.length + second_link.length, + }; + assert!(empty_links.contains(&l)); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -770,6 +2498,70 @@ async fn update_parallel_more_strings_with_select_non_unique() { } } +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] +async fn delete_parallel() { + let table = Arc::new(TestMoreStringsWorkTable::default()); + let deleted_state = Arc::new(Mutex::new(HashSet::new())); + for i in 0..1000 { + let e_val = fastrand::u8(0..100); + let s_val = fastrand::u8(0..100); + let row = TestMoreStringsRow { + id: 
table.get_next_pk().into(), + test: i + 1, + another: 1, + exchange: format!("test_{e_val}"), + some_string: format!("some_{s_val}"), + other_srting: format!("other_{i}"), + }; + let _ = table.insert(row.clone()).unwrap(); + } + let shared = table.clone(); + let h1 = tokio::spawn(async move { + for i in 1_000..6_000 { + let e_val = fastrand::u8(0..100); + let s_val = fastrand::u8(0..100); + let row = TestMoreStringsRow { + id: shared.get_next_pk().into(), + test: i + 1, + another: 1, + exchange: format!("test_{e_val}"), + some_string: format!("some_{s_val}"), + other_srting: format!("other_{i}"), + }; + let _ = shared.insert(row.clone()).unwrap(); + } + }); + let shared = table.clone(); + let shared_deleted_state = deleted_state.clone(); + let h2 = tokio::spawn(async move { + for _ in 0..1_000 { + let id_to_update = fastrand::u64(0..1000); + let _ = shared.delete(id_to_update.into()).await; + { + let mut guard = shared_deleted_state.lock(); + guard.insert(id_to_update); + } + } + }); + for _ in 0..5_000 { + let val = fastrand::u8(0..100); + let vals = table + .select_by_exchange(format!("test_{val}")) + .execute() + .unwrap(); + for v in vals { + assert_eq!(v.exchange, format!("test_{val}")) + } + } + h1.await.unwrap(); + h2.await.unwrap(); + + for id in deleted_state.lock_arc().iter() { + let row = table.select(*id); + assert!(row.is_none()) + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn update_parallel_more_strings_with_select_unique() { let table = Arc::new(TestMoreStringsWorkTable::default()); @@ -843,8 +2635,4 @@ async fn update_parallel_more_strings_with_select_unique() { let row = table.select(*id).unwrap(); assert_eq!(&row.exchange, e) } - for (id, a) in a_state.lock_arc().iter() { - let row = table.select(*id).unwrap(); - assert_eq!(&row.another, a) - } } From 4fde351b926bf2a7d0e68a7fa9d70571e9f4b626 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 18 Dec 2025 15:20:03 +0300 
Subject: [PATCH 2/3] fix empty links registry related issues --- .../src/worktable/generator/queries/delete.rs | 42 +- .../src/worktable/generator/queries/update.rs | 36 +- tests/worktable/unsized_.rs | 1805 +---------------- 3 files changed, 131 insertions(+), 1752 deletions(-) diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index 9da2983..5b9b13d 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -36,7 +36,7 @@ impl Generator { fn gen_full_row_delete(&mut self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let pk_ident = name_generator.get_primary_key_type_ident(); - let delete_logic = self.gen_delete_logic(); + let delete_logic = self.gen_delete_logic(true); let full_row_lock = self.gen_full_lock_for_update(); quote! { @@ -58,7 +58,7 @@ impl Generator { fn gen_full_row_delete_without_lock(&mut self) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let pk_ident = name_generator.get_primary_key_type_ident(); - let delete_logic = self.gen_delete_logic(); + let delete_logic = self.gen_delete_logic(false); quote! { pub fn delete_without_lock(&self, pk: #pk_ident) -> core::result::Result<(), WorkTableError> { @@ -68,7 +68,7 @@ impl Generator { } } - fn gen_delete_logic(&self) -> TokenStream { + fn gen_delete_logic(&self, is_locked: bool) -> TokenStream { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let pk_ident = name_generator.get_primary_key_type_ident(); let secondary_events_ident = name_generator.get_space_secondary_index_events_ident(); @@ -97,15 +97,33 @@ impl Generator { self.0.data.delete(link).map_err(WorkTableError::PagesError)?; } }; - - quote! 
{ - let link = self.0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; - let row = self.select(pk.clone()).unwrap(); - #process + if is_locked { + quote! { + let link = match self.0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound) { + Ok(l) => l, + Err(e) => { + lock.unlock(); // Releases locks + self.0.lock_map.remove_with_lock_check(&pk).await; // Removes locks + return Err(e); + } + }; + let row = self.select(pk.clone()).unwrap(); + #process + } + } else { + quote! { + let link = self.0 + .pk_map + .get(&pk) + .map(|v| v.get().value) + .ok_or(WorkTableError::NotFound)?; + let row = self.select(pk.clone()).unwrap(); + #process + } } } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index c84638b..dae5fb4 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -92,11 +92,19 @@ impl Generator { #full_row_lock }; - let link = self.0 + let link = match self.0 .pk_map .get(&pk) .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; + .ok_or(WorkTableError::NotFound) { + Ok(l) => l, + Err(e) => { + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + + return Err(e); + } + }; let row_old = self.0.data.select_non_ghosted(link)?; self.0.update_state.insert(pk.clone(), row_old); @@ -469,11 +477,19 @@ impl Generator { #custom_lock }; - let link = self.0 + let link = match self.0 .pk_map .get(&pk) .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; + .ok_or(WorkTableError::NotFound) { + Ok(l) => l, + Err(e) => { + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + + return Err(e); + } + }; let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; @@ -702,10 
+718,18 @@ impl Generator { #custom_lock }; - let link = self.0.indexes.#index + let link = match self.0.indexes.#index .get(#by) .map(|kv| kv.get().value) - .ok_or(WorkTableError::NotFound)?; + .ok_or(WorkTableError::NotFound) { + Ok(l) => l, + Err(e) => { + lock.unlock(); + self.0.lock_map.remove_with_lock_check(&pk).await; + + return Err(e); + } + }; let op_id = OperationId::Single(uuid::Uuid::now_v7()); #size_check diff --git a/tests/worktable/unsized_.rs b/tests/worktable/unsized_.rs index ba11904..bca552c 100644 --- a/tests/worktable/unsized_.rs +++ b/tests/worktable/unsized_.rs @@ -6,1745 +6,28 @@ use parking_lot::Mutex; use worktable::prelude::*; use worktable::worktable; -#[derive( - Clone, - rkyv::Archive, - Debug, - Default, - rkyv::Deserialize, - Hash, - rkyv::Serialize, - From, - Eq, - Into, - PartialEq, - PartialOrd, - Ord, - SizeMeasure, -)] -#[rkyv(derive(PartialEq, Eq, PartialOrd, Ord, Debug))] -pub struct TestPrimaryKey(u64); -impl TablePrimaryKey for TestPrimaryKey { - type Generator = std::sync::atomic::AtomicU64; -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq, MemStat)] -#[rkyv(derive(Debug))] -#[repr(C)] -pub struct TestRow { - pub id: u64, - pub test: i64, - pub another: u64, - pub exchange: String, -} -impl TableRow for TestRow { - fn get_primary_key(&self) -> TestPrimaryKey { - self.id.clone().into() - } -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq)] -#[rkyv(derive(Debug))] -#[repr(C)] -pub enum TestRowFields { - Another, - Id, - Exchange, - Test, -} -impl Query for TestRow { - fn merge(self, row: TestRow) -> TestRow { - self - } -} -#[derive(Clone, Debug, From, PartialEq)] -#[non_exhaustive] -pub enum TestAvaiableTypes { - #[from] - STRING(String), - #[from] - U64(u64), - #[from] - I64(i64), -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] -#[repr(C)] -pub struct TestWrapper { - inner: TestRow, - is_ghosted: bool, - is_deleted: bool, 
-} -impl RowWrapper for TestWrapper { - fn get_inner(self) -> TestRow { - self.inner - } - fn is_ghosted(&self) -> bool { - self.is_ghosted - } - fn from_inner(inner: TestRow) -> Self { - Self { - inner, - is_ghosted: true, - is_deleted: false, - } - } -} -impl StorableRow for TestRow { - type WrappedRow = TestWrapper; -} -impl GhostWrapper for ArchivedTestWrapper { - fn unghost(&mut self) { - self.is_ghosted = false; - } -} -#[derive(Debug, Clone)] -pub struct TestLock { - another_lock: Option>, - id_lock: Option>, - exchange_lock: Option>, - test_lock: Option>, -} -impl TestLock { - pub fn new() -> Self { - Self { - another_lock: None, - id_lock: None, - exchange_lock: None, - test_lock: None, - } - } -} -impl RowLock for TestLock { - fn is_locked(&self) -> bool { - self.another_lock - .as_ref() - .map(|l| l.is_locked()) - .unwrap_or(false) - || self - .id_lock - .as_ref() - .map(|l| l.is_locked()) - .unwrap_or(false) - || self - .exchange_lock - .as_ref() - .map(|l| l.is_locked()) - .unwrap_or(false) - || self - .test_lock - .as_ref() - .map(|l| l.is_locked()) - .unwrap_or(false) - } - #[allow(clippy::mutable_key_type)] - fn lock( - &mut self, - id: u16, - ) -> ( - std::collections::HashSet>, - std::sync::Arc, - ) { - let mut set = std::collections::HashSet::new(); - let lock = std::sync::Arc::new(Lock::new(id)); - if let Some(lock) = &self.another_lock { - set.insert(lock.clone()); - } - self.another_lock = Some(lock.clone()); - if let Some(lock) = &self.id_lock { - set.insert(lock.clone()); - } - self.id_lock = Some(lock.clone()); - if let Some(lock) = &self.exchange_lock { - set.insert(lock.clone()); - } - self.exchange_lock = Some(lock.clone()); - if let Some(lock) = &self.test_lock { - set.insert(lock.clone()); - } - self.test_lock = Some(lock.clone()); - (set, lock) - } - fn with_lock(id: u16) -> (Self, std::sync::Arc) { - let lock = std::sync::Arc::new(Lock::new(id)); - ( - Self { - another_lock: Some(lock.clone()), - id_lock: Some(lock.clone()), - 
exchange_lock: Some(lock.clone()), - test_lock: Some(lock.clone()), - }, - lock, - ) - } - #[allow(clippy::mutable_key_type)] - fn merge(&mut self, other: &mut Self) -> std::collections::HashSet> { - let mut set = std::collections::HashSet::new(); - if let Some(another_lock) = &other.another_lock { - if self.another_lock.is_none() { - self.another_lock = Some(another_lock.clone()); - } else { - set.insert(another_lock.clone()); - } - } - other.another_lock = self.another_lock.clone(); - if let Some(id_lock) = &other.id_lock { - if self.id_lock.is_none() { - self.id_lock = Some(id_lock.clone()); - } else { - set.insert(id_lock.clone()); - } - } - other.id_lock = self.id_lock.clone(); - if let Some(exchange_lock) = &other.exchange_lock { - if self.exchange_lock.is_none() { - self.exchange_lock = Some(exchange_lock.clone()); - } else { - set.insert(exchange_lock.clone()); - } - } - other.exchange_lock = self.exchange_lock.clone(); - if let Some(test_lock) = &other.test_lock { - if self.test_lock.is_none() { - self.test_lock = Some(test_lock.clone()); - } else { - set.insert(test_lock.clone()); - } - } - other.test_lock = self.test_lock.clone(); - set - } -} -#[derive(Debug, MemStat)] -pub struct TestIndex { - exchnage_idx: IndexMultiMap>>, - test_idx: IndexMap, - another_idx: IndexMultiMap, -} -impl TableSecondaryIndex for TestIndex { - fn save_row( - &self, - row: TestRow, - link: Link, - ) -> core::result::Result<(), IndexError> { - let mut inserted_indexes: Vec = vec![]; - if self - .exchnage_idx - .insert_checked(row.exchange.clone(), link) - .is_none() - { - return Err(IndexError::AlreadyExists { - at: TestAvailableIndexes::ExchnageIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(TestAvailableIndexes::ExchnageIdx); - if self - .test_idx - .insert_checked(row.test.clone(), link) - .is_none() - { - return Err(IndexError::AlreadyExists { - at: TestAvailableIndexes::TestIdx, - inserted_already: inserted_indexes.clone(), - }); - } - 
inserted_indexes.push(TestAvailableIndexes::TestIdx); - if self - .another_idx - .insert_checked(row.another.clone(), link) - .is_none() - { - return Err(IndexError::AlreadyExists { - at: TestAvailableIndexes::AnotherIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(TestAvailableIndexes::AnotherIdx); - core::result::Result::Ok(()) - } - fn reinsert_row( - &self, - row_old: TestRow, - link_old: Link, - row_new: TestRow, - link_new: Link, - ) -> core::result::Result<(), IndexError> { - let mut inserted_indexes: Vec = vec![]; - let row = &row_new; - let val_new = row.test.clone(); - let row = &row_old; - let val_old = row.test.clone(); - if val_new != val_old { - if self - .test_idx - .insert_checked(val_new.clone(), link_new) - .is_none() - { - return Err(IndexError::AlreadyExists { - at: TestAvailableIndexes::TestIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(TestAvailableIndexes::TestIdx); - } - let row = &row_new; - let val_new = row.exchange.clone(); - let row = &row_old; - let val_old = row.exchange.clone(); - self.exchnage_idx.insert(val_new.clone(), link_new); - TableIndex::remove(&self.exchnage_idx, val_old, link_old); - let row = &row_new; - let val_new = row.test.clone(); - let row = &row_old; - let val_old = row.test.clone(); - if val_new == val_old { - self.test_idx.insert(val_new.clone(), link_new); - } else { - TableIndex::remove(&self.test_idx, val_old, link_old); - } - let row = &row_new; - let val_new = row.another.clone(); - let row = &row_old; - let val_old = row.another.clone(); - self.another_idx.insert(val_new.clone(), link_new); - TableIndex::remove(&self.another_idx, val_old, link_old); - core::result::Result::Ok(()) - } - fn delete_row( - &self, - row: TestRow, - link: Link, - ) -> core::result::Result<(), IndexError> { - self.exchnage_idx.remove(&row.exchange, &link); - self.test_idx.remove(&row.test); - self.another_idx.remove(&row.another, &link); - 
core::result::Result::Ok(()) - } - fn process_difference_insert( - &self, - link: Link, - difference: std::collections::HashMap<&str, Difference>, - ) -> core::result::Result<(), IndexError> { - let mut inserted_indexes: Vec = vec![]; - if let Some(diff) = difference.get("exchange") { - if let TestAvaiableTypes::STRING(new) = &diff.new { - let key_new = new.to_string(); - if TableIndex::insert_checked(&self.exchnage_idx, key_new, link).is_none() { - return Err(IndexError::AlreadyExists { - at: TestAvailableIndexes::ExchnageIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(TestAvailableIndexes::ExchnageIdx); - } - } - if let Some(diff) = difference.get("test") { - if let TestAvaiableTypes::I64(new) = &diff.new { - let key_new = *new; - if TableIndex::insert_checked(&self.test_idx, key_new, link).is_none() { - return Err(IndexError::AlreadyExists { - at: TestAvailableIndexes::TestIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(TestAvailableIndexes::TestIdx); - } - } - if let Some(diff) = difference.get("another") { - if let TestAvaiableTypes::U64(new) = &diff.new { - let key_new = *new; - if TableIndex::insert_checked(&self.another_idx, key_new, link).is_none() { - return Err(IndexError::AlreadyExists { - at: TestAvailableIndexes::AnotherIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(TestAvailableIndexes::AnotherIdx); - } - } - core::result::Result::Ok(()) - } - fn process_difference_remove( - &self, - link: Link, - difference: std::collections::HashMap<&str, Difference>, - ) -> core::result::Result<(), IndexError> { - if let Some(diff) = difference.get("exchange") { - if let TestAvaiableTypes::STRING(old) = &diff.old { - let key_old = old.to_string(); - TableIndex::remove(&self.exchnage_idx, key_old, link); - } - } - if let Some(diff) = difference.get("test") { - if let TestAvaiableTypes::I64(old) = &diff.old { - let key_old = *old; - 
TableIndex::remove(&self.test_idx, key_old, link); - } - } - if let Some(diff) = difference.get("another") { - if let TestAvaiableTypes::U64(old) = &diff.old { - let key_old = *old; - TableIndex::remove(&self.another_idx, key_old, link); - } - } - core::result::Result::Ok(()) - } - fn delete_from_indexes( - &self, - row: TestRow, - link: Link, - indexes: Vec, - ) -> core::result::Result<(), IndexError> { - for index in indexes { - match index { - TestAvailableIndexes::ExchnageIdx => { - self.exchnage_idx.remove(&row.exchange, &link); - } - TestAvailableIndexes::TestIdx => { - self.test_idx.remove(&row.test); - } - TestAvailableIndexes::AnotherIdx => { - self.another_idx.remove(&row.another, &link); - } - } - } - core::result::Result::Ok(()) - } -} -impl TableSecondaryIndexInfo for TestIndex { - fn index_info(&self) -> Vec { - let mut info = Vec::new(); - info.push(IndexInfo { - name: "exchnage_idx".to_string(), - index_type: IndexKind::NonUnique, - key_count: self.exchnage_idx.len(), - capacity: self.exchnage_idx.capacity(), - heap_size: self.exchnage_idx.heap_size(), - used_size: self.exchnage_idx.used_size(), - node_count: self.exchnage_idx.node_count(), - }); - info.push(IndexInfo { - name: "test_idx".to_string(), - index_type: IndexKind::Unique, - key_count: self.test_idx.len(), - capacity: self.test_idx.capacity(), - heap_size: self.test_idx.heap_size(), - used_size: self.test_idx.used_size(), - node_count: self.test_idx.node_count(), - }); - info.push(IndexInfo { - name: "another_idx".to_string(), - index_type: IndexKind::NonUnique, - key_count: self.another_idx.len(), - capacity: self.another_idx.capacity(), - heap_size: self.another_idx.heap_size(), - used_size: self.another_idx.used_size(), - node_count: self.another_idx.node_count(), - }); - info - } - fn is_empty(&self) -> bool { - self.exchnage_idx.len() == 0 && self.test_idx.len() == 0 && self.another_idx.len() == 0 +worktable! 
( + name: Test, + columns: { + id: u64 primary_key autoincrement, + test: i64, + another: u64, + exchange: String, + }, + indexes: { + test_idx: test unique, + exchnage_idx: exchange, + another_idx: another, } -} -impl Default for TestIndex { - fn default() -> Self { - Self { - exchnage_idx: IndexMultiMap::with_maximum_node_size(TEST_INNER_SIZE), - test_idx: IndexMap::with_maximum_node_size( - get_index_page_size_from_data_length::(TEST_INNER_SIZE), - ), - another_idx: IndexMultiMap::with_maximum_node_size( - get_index_page_size_from_data_length::(TEST_INNER_SIZE), - ), + queries: { + update: { + ExchangeByTest(exchange) by test, + ExchangeById(exchange) by id, + ExchangeByAbother(exchange) by another, } } -} -#[derive(Debug, Clone, Copy, MoreDisplay, PartialEq, PartialOrd, Ord, Hash, Eq)] -pub enum TestAvailableIndexes { - ExchnageIdx, - TestIdx, - AnotherIdx, -} -impl AvailableIndex for TestAvailableIndexes { - fn to_string_value(&self) -> String { - ToString::to_string(&self) - } -} -const TEST_PAGE_SIZE: usize = PAGE_SIZE; -const TEST_INNER_SIZE: usize = TEST_PAGE_SIZE - GENERAL_HEADER_SIZE; -#[derive(Debug)] -pub struct TestWorkTable( - WorkTable< - TestRow, - TestPrimaryKey, - TestAvaiableTypes, - TestAvailableIndexes, - TestIndex, - TestLock, - ::Generator, - Vec>, - >, ); -impl Default for TestWorkTable { - fn default() -> Self { - let mut inner = WorkTable::default(); - inner.table_name = "Test"; - Self(inner) - } -} -impl TestWorkTable { - pub fn name(&self) -> &'static str { - &self.0.table_name - } - pub fn select(&self, pk: Pk) -> Option - where - TestPrimaryKey: From, - { - self.0.select(pk.into()) - } - pub fn insert(&self, row: TestRow) -> core::result::Result { - self.0.insert(row) - } - pub fn reinsert( - &self, - row_old: TestRow, - row_new: TestRow, - ) -> core::result::Result { - self.0.reinsert(row_old, row_new) - } - pub async fn upsert(&self, row: TestRow) -> core::result::Result<(), WorkTableError> { - let pk = row.get_primary_key(); - let 
need_to_update = { - if let Some(_) = self.0.pk_map.get(&pk) { - true - } else { - false - } - }; - if need_to_update { - self.update(row).await?; - } else { - self.insert(row)?; - } - core::result::Result::Ok(()) - } - pub fn count(&self) -> usize { - let count = self.0.pk_map.len(); - count - } - pub fn get_next_pk(&self) -> TestPrimaryKey { - self.0.get_next_pk() - } - pub fn iter_with core::result::Result<(), WorkTableError>>( - &self, - f: F, - ) -> core::result::Result<(), WorkTableError> { - let first = self.0.pk_map.iter().next().map(|(k, v)| (k.clone(), *v)); - let Some((mut k, link)) = first else { - return Ok(()); - }; - let data = self - .0 - .data - .select_non_ghosted(link) - .map_err(WorkTableError::PagesError)?; - f(data)?; - let mut ind = false; - while !ind { - let next = { - let mut iter = self.0.pk_map.range(k.clone()..); - let next = iter - .next() - .map(|(k, v)| (k.clone(), *v)) - .filter(|(key, _)| key != &k); - if next.is_some() { - next - } else { - iter.next().map(|(k, v)| (k.clone(), *v)) - } - }; - if let Some((key, link)) = next { - let data = self - .0 - .data - .select_non_ghosted(link) - .map_err(WorkTableError::PagesError)?; - f(data)?; - k = key - } else { - ind = true; - }; - } - core::result::Result::Ok(()) - } - pub async fn iter_with_async< - F: Fn(TestRow) -> Fut, - Fut: std::future::Future>, - >( - &self, - f: F, - ) -> core::result::Result<(), WorkTableError> { - let first = self.0.pk_map.iter().next().map(|(k, v)| (k.clone(), *v)); - let Some((mut k, link)) = first else { - return Ok(()); - }; - let data = self - .0 - .data - .select_non_ghosted(link) - .map_err(WorkTableError::PagesError)?; - f(data).await?; - let mut ind = false; - while !ind { - let next = { - let mut iter = self.0.pk_map.range(k.clone()..); - let next = iter - .next() - .map(|(k, v)| (k.clone(), *v)) - .filter(|(key, _)| key != &k); - if next.is_some() { - next - } else { - iter.next().map(|(k, v)| (k.clone(), *v)) - } - }; - if let Some((key, link)) = 
next { - let data = self - .0 - .data - .select_non_ghosted(link) - .map_err(WorkTableError::PagesError)?; - f(data).await?; - k = key - } else { - ind = true; - }; - } - core::result::Result::Ok(()) - } - pub fn system_info(&self) -> SystemInfo { - self.0.system_info() - } -} -impl TestWorkTable { - pub fn select_by_exchange( - &self, - by: String, - ) -> SelectQueryBuilder< - TestRow, - impl DoubleEndedIterator + '_, - TestColumnRange, - TestRowFields, - > { - let rows = self - .0 - .indexes - .exchnage_idx - .get(&by) - .into_iter() - .filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok()) - .filter(move |r| &r.exchange == &by); - SelectQueryBuilder::new(rows) - } - pub fn select_by_test(&self, by: i64) -> Option { - let link = self.0.indexes.test_idx.get(&by).map(|kv| kv.get().value)?; - self.0.data.select_non_ghosted(link).ok() - } - pub fn select_by_another( - &self, - by: u64, - ) -> SelectQueryBuilder< - TestRow, - impl DoubleEndedIterator + '_, - TestColumnRange, - TestRowFields, - > { - let rows = self - .0 - .indexes - .another_idx - .get(&by) - .into_iter() - .filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok()) - .filter(move |r| &r.another == &by); - SelectQueryBuilder::new(rows) - } -} -impl SelectQueryExecutor - for SelectQueryBuilder -where - I: DoubleEndedIterator + Sized, -{ - fn where_by( - self, - predicate: F, - ) -> SelectQueryBuilder< - TestRow, - impl DoubleEndedIterator + Sized, - TestColumnRange, - TestRowFields, - > - where - F: FnMut(&TestRow) -> bool, - { - SelectQueryBuilder { - params: self.params, - iter: self.iter.filter(predicate), - } - } - fn execute(self) -> Result, WorkTableError> { - let mut iter: Box> = Box::new(self.iter); - if !self.params.range.is_empty() { - for (range, column) in &self.params.range { - iter = match (column, range.clone().into()) { - (TestRowFields::Another, TestColumnRange::U64(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.another))) - as Box> - } - 
(TestRowFields::Another, TestColumnRange::U64Inclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.another))) - as Box> - } - (TestRowFields::Another, TestColumnRange::U64From(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.another))) - as Box> - } - (TestRowFields::Another, TestColumnRange::U64To(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.another))) - as Box> - } - (TestRowFields::Another, TestColumnRange::U64ToInclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.another))) - as Box> - } - (TestRowFields::Id, TestColumnRange::U64(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (TestRowFields::Id, TestColumnRange::U64Inclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (TestRowFields::Id, TestColumnRange::U64From(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (TestRowFields::Id, TestColumnRange::U64To(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (TestRowFields::Id, TestColumnRange::U64ToInclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (TestRowFields::Test, TestColumnRange::I64(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.test))) - as Box> - } - (TestRowFields::Test, TestColumnRange::I64Inclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.test))) - as Box> - } - (TestRowFields::Test, TestColumnRange::I64From(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.test))) - as Box> - } - (TestRowFields::Test, TestColumnRange::I64To(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.test))) - as Box> - } - (TestRowFields::Test, TestColumnRange::I64ToInclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.test))) - as Box> - } - _ => unreachable!(), - }; - } - } - if 
!self.params.order.is_empty() { - let mut items: Vec = iter.collect(); - items.sort_by(|a, b| { - for (order, col) in &self.params.order { - match col { - TestRowFields::Another => { - let cmp = a - .another - .partial_cmp(&b.another) - .unwrap_or(std::cmp::Ordering::Equal); - if cmp != std::cmp::Ordering::Equal { - return match order { - Order::Asc => cmp, - Order::Desc => cmp.reverse(), - }; - } - } - TestRowFields::Id => { - let cmp = a.id.partial_cmp(&b.id).unwrap_or(std::cmp::Ordering::Equal); - if cmp != std::cmp::Ordering::Equal { - return match order { - Order::Asc => cmp, - Order::Desc => cmp.reverse(), - }; - } - } - TestRowFields::Exchange => { - let cmp = a - .exchange - .partial_cmp(&b.exchange) - .unwrap_or(std::cmp::Ordering::Equal); - if cmp != std::cmp::Ordering::Equal { - return match order { - Order::Asc => cmp, - Order::Desc => cmp.reverse(), - }; - } - } - TestRowFields::Test => { - let cmp = a - .test - .partial_cmp(&b.test) - .unwrap_or(std::cmp::Ordering::Equal); - if cmp != std::cmp::Ordering::Equal { - return match order { - Order::Asc => cmp, - Order::Desc => cmp.reverse(), - }; - } - } - _ => continue, - } - } - std::cmp::Ordering::Equal - }); - iter = Box::new(items.into_iter()); - } - let iter_result: Box> = - if let Some(offset) = self.params.offset { - Box::new(iter.skip(offset)) - } else { - Box::new(iter) - }; - let iter_result: Box> = if let Some(limit) = self.params.limit - { - Box::new(iter_result.take(limit)) - } else { - Box::new(iter_result) - }; - Ok(iter_result.collect()) - } -} -#[derive(Debug, Clone)] -pub enum TestColumnRange { - U64(std::ops::Range), - U64Inclusive(std::ops::RangeInclusive), - U64From(std::ops::RangeFrom), - U64To(std::ops::RangeTo), - U64ToInclusive(std::ops::RangeToInclusive), - I64(std::ops::Range), - I64Inclusive(std::ops::RangeInclusive), - I64From(std::ops::RangeFrom), - I64To(std::ops::RangeTo), - I64ToInclusive(std::ops::RangeToInclusive), -} -impl From> for TestColumnRange { - fn from(range: 
std::ops::Range) -> Self { - Self::U64(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::RangeInclusive) -> Self { - Self::U64Inclusive(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::RangeFrom) -> Self { - Self::U64From(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::RangeTo) -> Self { - Self::U64To(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::RangeToInclusive) -> Self { - Self::U64ToInclusive(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::Range) -> Self { - Self::I64(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::RangeInclusive) -> Self { - Self::I64Inclusive(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::RangeFrom) -> Self { - Self::I64From(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::RangeTo) -> Self { - Self::I64To(range) - } -} -impl From> for TestColumnRange { - fn from(range: std::ops::RangeToInclusive) -> Self { - Self::I64ToInclusive(range) - } -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize)] -#[repr(C)] -pub struct ExchangeByTestQuery { - pub exchange: String, -} -impl Query for ExchangeByTestQuery { - fn merge(self, mut row: TestRow) -> TestRow { - row.exchange = self.exchange; - row - } -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize)] -#[repr(C)] -pub struct ExchangeByIdQuery { - pub exchange: String, -} -impl Query for ExchangeByIdQuery { - fn merge(self, mut row: TestRow) -> TestRow { - row.exchange = self.exchange; - row - } -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize)] -#[repr(C)] -pub struct ExchangeByAbotherQuery { - pub exchange: String, -} -impl Query for ExchangeByAbotherQuery { - fn merge(self, mut row: TestRow) -> TestRow { - row.exchange = self.exchange; - row - } -} -pub type ExchangeByTestBy = i64; -pub type ExchangeByIdBy = 
u64; -pub type ExchangeByAbotherBy = u64; -impl TestLock { - #[allow(clippy::mutable_key_type)] - pub fn lock_update_exchange_by_test( - &mut self, - id: u16, - ) -> ( - std::collections::HashSet>, - std::sync::Arc, - ) { - let mut set = std::collections::HashSet::new(); - let new_lock = std::sync::Arc::new(Lock::new(id)); - if let Some(lock) = &self.exchange_lock { - set.insert(lock.clone()); - } - self.exchange_lock = Some(new_lock.clone()); - (set, new_lock) - } - #[allow(clippy::mutable_key_type)] - pub fn lock_update_exchange_by_id( - &mut self, - id: u16, - ) -> ( - std::collections::HashSet>, - std::sync::Arc, - ) { - let mut set = std::collections::HashSet::new(); - let new_lock = std::sync::Arc::new(Lock::new(id)); - if let Some(lock) = &self.exchange_lock { - set.insert(lock.clone()); - } - self.exchange_lock = Some(new_lock.clone()); - (set, new_lock) - } - #[allow(clippy::mutable_key_type)] - pub fn lock_update_exchange_by_abother( - &mut self, - id: u16, - ) -> ( - std::collections::HashSet>, - std::sync::Arc, - ) { - let mut set = std::collections::HashSet::new(); - let new_lock = std::sync::Arc::new(Lock::new(id)); - if let Some(lock) = &self.exchange_lock { - set.insert(lock.clone()); - } - self.exchange_lock = Some(new_lock.clone()); - (set, new_lock) - } -} -impl TestWorkTable { - pub fn select_all( - &self, - ) -> SelectQueryBuilder< - TestRow, - impl DoubleEndedIterator + '_ + Sized, - TestColumnRange, - TestRowFields, - > { - let iter = self - .0 - .pk_map - .iter() - .filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok()); - SelectQueryBuilder::new(iter) - } -} -impl TestWorkTable { - pub async fn update(&self, row: TestRow) -> core::result::Result<(), WorkTableError> { - let pk = row.get_primary_key(); - let lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = 
lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = TestLock::with_lock(lock_id); - let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - } - op_lock - } - }; - let link = self - .0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; - let row_old = self.0.data.select_non_ghosted(link)?; - self.0.update_state.insert(pk.clone(), row_old); - let mut bytes = rkyv::to_bytes::(&row) - .map_err(|_| WorkTableError::SerializeError)?; - if true { - lock.unlock(); - let lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = TestLock::with_lock(lock_id); - let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all( - locks.iter().map(|l| l.wait()).collect::>(), - ) - .await; - } - op_lock - } - }; - let row_old = 
self.0.data.select_non_ghosted(link)?; - if let Err(e) = self.reinsert(row_old, row) { - self.0.update_state.remove(&pk); - lock.unlock(); - return Err(e); - } - self.0.update_state.remove(&pk); - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - return core::result::Result::Ok(()); - } - let mut archived_row = unsafe { - rkyv::access_unchecked_mut::<::Archived>(&mut bytes[..]) - .unseal_unchecked() - }; - let op_id = OperationId::Single(uuid::Uuid::now_v7()); - let row_old = self.0.data.select_non_ghosted(link)?; - let row_new = row.clone(); - let updated_bytes: Vec = vec![]; - let mut diffs: std::collections::HashMap<&str, Difference> = - std::collections::HashMap::new(); - let old = &row_old.exchange; - let new = &row_new.exchange; - if old != new { - let diff = Difference:: { - old: old.clone().into(), - new: new.clone().into(), - }; - diffs.insert("exchange", diff); - } - let old = &row_old.test; - let new = &row_new.test; - if old != new { - let diff = Difference:: { - old: old.clone().into(), - new: new.clone().into(), - }; - diffs.insert("test", diff); - } - let old = &row_old.another; - let new = &row_new.another; - if old != new { - let diff = Difference:: { - old: old.clone().into(), - new: new.clone().into(), - }; - diffs.insert("another", diff); - } - let indexes_res = self - .0 - .indexes - .process_difference_insert(link, diffs.clone()); - if let Err(e) = indexes_res { - return match e { - IndexError::AlreadyExists { - at, - inserted_already, - } => { - self.0.indexes.delete_from_indexes( - row_new.merge(row_old.clone()), - link, - inserted_already, - )?; - Err(WorkTableError::AlreadyExists(at.to_string_value())) - } - IndexError::NotFound => Err(WorkTableError::NotFound), - }; - } - unsafe { - self.0 - .data - .with_mut_ref(link, move |archived| { - std::mem::swap(&mut archived.inner.another, &mut archived_row.another); - std::mem::swap(&mut archived.inner.id, &mut archived_row.id); - std::mem::swap(&mut archived.inner.exchange, 
&mut archived_row.exchange); - std::mem::swap(&mut archived.inner.test, &mut archived_row.test); - }) - .map_err(WorkTableError::PagesError)? - }; - self.0.indexes.process_difference_remove(link, diffs)?; - self.0.update_state.remove(&pk); - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - core::result::Result::Ok(()) - } - pub async fn update_exchange_by_test( - &self, - row: ExchangeByTestQuery, - by: ExchangeByTestBy, - ) -> core::result::Result<(), WorkTableError> { - let mut bytes = rkyv::to_bytes::(&row) - .map_err(|_| WorkTableError::SerializeError)?; - let mut archived_row = unsafe { - rkyv::access_unchecked_mut::<::Archived>( - &mut bytes[..], - ) - .unseal_unchecked() - }; - let link = self - .0 - .indexes - .test_idx - .get(&by) - .map(|kv| kv.get().value) - .ok_or(WorkTableError::NotFound)?; - let pk = self - .0 - .data - .select_non_ghosted(link)? - .get_primary_key() - .clone(); - let lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock_update_exchange_by_test(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; - op_lock - } else { - let mut lock = TestLock::new(); - #[allow(clippy::mutable_key_type)] - let (_, op_lock) = lock.lock_update_exchange_by_test(lock_id); - let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - } - op_lock - } - }; - let link = self - .0 - .indexes - .test_idx - .get(&by) - .map(|kv| kv.get().value) - 
.ok_or(WorkTableError::NotFound)?; - let op_id = OperationId::Single(uuid::Uuid::now_v7()); - let mut need_to_reinsert = true; - need_to_reinsert |= archived_row.get_exchange_size() != self.get_exchange_size(link)?; - if need_to_reinsert { - lock.unlock(); - let lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = TestLock::with_lock(lock_id); - let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all( - locks.iter().map(|l| l.wait()).collect::>(), - ) - .await; - } - op_lock - } - }; - let row_old = self - .0 - .select(pk.clone()) - .expect("should not be deleted by other thread"); - let mut row_new = row_old.clone(); - let pk = row_old.get_primary_key().clone(); - row_new.exchange = row.exchange; - if let Err(e) = self.reinsert(row_old, row_new) { - self.0.update_state.remove(&pk); - lock.unlock(); - return Err(e); - } - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - return core::result::Result::Ok(()); - } - let row_old = self.0.data.select_non_ghosted(link)?; - let row_new = row.clone(); - let updated_bytes: Vec = vec![]; - let mut diffs: std::collections::HashMap<&str, Difference> = - std::collections::HashMap::new(); - let old = &row_old.exchange; - let new = &row_new.exchange; - if old != new { - let diff = Difference:: { - old: old.clone().into(), - new: 
new.clone().into(), - }; - diffs.insert("exchange", diff); - } - let indexes_res = self - .0 - .indexes - .process_difference_insert(link, diffs.clone()); - if let Err(e) = indexes_res { - return match e { - IndexError::AlreadyExists { - at, - inserted_already, - } => { - self.0.indexes.delete_from_indexes( - row_new.merge(row_old.clone()), - link, - inserted_already, - )?; - Err(WorkTableError::AlreadyExists(at.to_string_value())) - } - IndexError::NotFound => Err(WorkTableError::NotFound), - }; - } - unsafe { - self.0 - .data - .with_mut_ref(link, |archived| { - std::mem::swap(&mut archived.inner.exchange, &mut archived_row.exchange); - }) - .map_err(WorkTableError::PagesError)?; - } - self.0.indexes.process_difference_remove(link, diffs)?; - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - core::result::Result::Ok(()) - } - pub async fn update_exchange_by_id( - &self, - row: ExchangeByIdQuery, - pk: Pk, - ) -> core::result::Result<(), WorkTableError> - where - TestPrimaryKey: From, - { - let pk = pk.into(); - let lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock_update_exchange_by_id(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; - op_lock - } else { - let mut lock = TestLock::new(); - #[allow(clippy::mutable_key_type)] - let (_, op_lock) = lock.lock_update_exchange_by_id(lock_id); - let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - 
.await; - } - op_lock - } - }; - let link = self - .0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; - let mut bytes = rkyv::to_bytes::(&row) - .map_err(|_| WorkTableError::SerializeError)?; - let mut archived_row = unsafe { - rkyv::access_unchecked_mut::<::Archived>( - &mut bytes[..], - ) - .unseal_unchecked() - }; - let op_id = OperationId::Single(uuid::Uuid::now_v7()); - let mut need_to_reinsert = true; - need_to_reinsert |= archived_row.get_exchange_size() != self.get_exchange_size(link)?; - if need_to_reinsert { - lock.unlock(); - let lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = TestLock::with_lock(lock_id); - let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all( - locks.iter().map(|l| l.wait()).collect::>(), - ) - .await; - } - op_lock - } - }; - let row_old = self - .0 - .select(pk.clone()) - .expect("should not be deleted by other thread"); - let mut row_new = row_old.clone(); - let pk = row_old.get_primary_key().clone(); - row_new.exchange = row.exchange; - if let Err(e) = self.reinsert(row_old, row_new) { - self.0.update_state.remove(&pk); - lock.unlock(); - return Err(e); - } - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - return core::result::Result::Ok(()); - } - let row_old = 
self.0.data.select_non_ghosted(link)?; - let row_new = row.clone(); - let updated_bytes: Vec = vec![]; - let mut diffs: std::collections::HashMap<&str, Difference> = - std::collections::HashMap::new(); - let old = &row_old.exchange; - let new = &row_new.exchange; - if old != new { - let diff = Difference:: { - old: old.clone().into(), - new: new.clone().into(), - }; - diffs.insert("exchange", diff); - } - let indexes_res = self - .0 - .indexes - .process_difference_insert(link, diffs.clone()); - if let Err(e) = indexes_res { - return match e { - IndexError::AlreadyExists { - at, - inserted_already, - } => { - self.0.indexes.delete_from_indexes( - row_new.merge(row_old.clone()), - link, - inserted_already, - )?; - Err(WorkTableError::AlreadyExists(at.to_string_value())) - } - IndexError::NotFound => Err(WorkTableError::NotFound), - }; - } - unsafe { - self.0 - .data - .with_mut_ref(link, |archived| { - std::mem::swap(&mut archived.inner.exchange, &mut archived_row.exchange); - }) - .map_err(WorkTableError::PagesError)? - }; - self.0.indexes.process_difference_remove(link, diffs)?; - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - core::result::Result::Ok(()) - } - pub async fn update_exchange_by_abother( - &self, - row: ExchangeByAbotherQuery, - by: ExchangeByAbotherBy, - ) -> core::result::Result<(), WorkTableError> { - let links: Vec<_> = self - .0 - .indexes - .another_idx - .get(&by) - .map(|(_, l)| *l) - .collect(); - let mut locks = std::collections::HashMap::new(); - for link in links.iter() { - let pk = self - .0 - .data - .select_non_ghosted(*link)? 
- .get_primary_key() - .clone(); - let op_lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock_update_exchange_by_abother(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - op_lock - } else { - let mut lock = TestLock::new(); - #[allow(clippy::mutable_key_type)] - let (_, op_lock) = lock.lock_update_exchange_by_abother(lock_id); - let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all( - locks.iter().map(|l| l.wait()).collect::>(), - ) - .await; - } - op_lock - } - }; - locks.insert(pk, op_lock); - } - let links: Vec<_> = self - .0 - .indexes - .another_idx - .get(&by) - .map(|(_, l)| *l) - .collect(); - let mut pk_to_unlock: std::collections::HashMap<_, std::sync::Arc> = - std::collections::HashMap::new(); - let op_id = OperationId::Multi(uuid::Uuid::now_v7()); - for link in links.into_iter() { - let pk = self - .0 - .data - .select_non_ghosted(link)? 
- .get_primary_key() - .clone(); - let mut bytes = rkyv::to_bytes::(&row) - .map_err(|_| WorkTableError::SerializeError)?; - let mut archived_row = unsafe { - rkyv::access_unchecked_mut::<::Archived>( - &mut bytes[..], - ) - .unseal_unchecked() - }; - let mut need_to_reinsert = true; - need_to_reinsert |= archived_row.get_exchange_size() != self.get_exchange_size(link)?; - if need_to_reinsert { - let op_lock = locks - .remove(&pk) - .expect("should not be deleted as links are unique"); - op_lock.unlock(); - let lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all( - locks.iter().map(|l| l.wait()).collect::>(), - ) - .await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = TestLock::with_lock(lock_id); - let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all( - locks.iter().map(|l| l.wait()).collect::>(), - ) - .await; - } - op_lock - } - }; - let row_old = self - .select(pk.clone()) - .expect("should not be deleted by other thread"); - let mut row_new = row_old.clone(); - row_new.exchange = row.exchange.clone(); - if let Err(e) = self.reinsert(row_old, row_new) { - self.0.update_state.remove(&pk); - lock.unlock(); - return Err(e); - } - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - continue; - } else { - pk_to_unlock.insert( - pk.clone(), - locks - .remove(&pk) - .expect("should not be deleted as links are unique"), - ); - } - let row_old = 
self.0.data.select_non_ghosted(link)?; - let row_new = row.clone(); - let updated_bytes: Vec = vec![]; - let mut diffs: std::collections::HashMap<&str, Difference> = - std::collections::HashMap::new(); - let old = &row_old.exchange; - let new = &row_new.exchange; - if old != new { - let diff = Difference:: { - old: old.clone().into(), - new: new.clone().into(), - }; - diffs.insert("exchange", diff); - } - let indexes_res = self - .0 - .indexes - .process_difference_insert(link, diffs.clone()); - if let Err(e) = indexes_res { - return match e { - IndexError::AlreadyExists { - at, - inserted_already, - } => { - self.0.indexes.delete_from_indexes( - row_new.merge(row_old.clone()), - link, - inserted_already, - )?; - Err(WorkTableError::AlreadyExists(at.to_string_value())) - } - IndexError::NotFound => Err(WorkTableError::NotFound), - }; - } - unsafe { - self.0 - .data - .with_mut_ref(link, |archived| { - std::mem::swap(&mut archived.inner.exchange, &mut archived_row.exchange); - }) - .map_err(WorkTableError::PagesError)?; - } - self.0.indexes.process_difference_remove(link, diffs)?; - } - for (pk, lock) in pk_to_unlock { - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - } - core::result::Result::Ok(()) - } -} -impl TestWorkTable {} -impl TestWorkTable { - pub async fn delete(&self, pk: TestPrimaryKey) -> core::result::Result<(), WorkTableError> { - let lock = { - let lock_id = self.0.lock_map.next_id(); - if let Some(lock) = self.0.lock_map.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = TestLock::with_lock(lock_id); - let mut lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = 
self.0.lock_map.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - } - op_lock - } - }; - let link = match self - .0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound) - { - Ok(l) => l, - Err(e) => { - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - return Err(e); - } - }; - - let row = self.select(pk.clone()).unwrap(); - self.0.indexes.delete_row(row, link)?; - self.0.pk_map.remove(&pk); - self.0 - .data - .delete(link) - .map_err(WorkTableError::PagesError)?; - lock.unlock(); - self.0.lock_map.remove_with_lock_check(&pk).await; - core::result::Result::Ok(()) - } - pub fn delete_without_lock( - &self, - pk: TestPrimaryKey, - ) -> core::result::Result<(), WorkTableError> { - let link = self - .0 - .pk_map - .get(&pk) - .map(|v| v.get().value) - .ok_or(WorkTableError::NotFound)?; - let row = self.select(pk.clone()).unwrap(); - self.0.indexes.delete_row(row, link)?; - self.0.pk_map.remove(&pk); - self.0 - .data - .delete(link) - .map_err(WorkTableError::PagesError)?; - core::result::Result::Ok(()) - } -} -impl TestWorkTable { - fn get_exchange_size(&self, link: Link) -> core::result::Result { - self.0 - .data - .with_ref(link, |row_ref| { - row_ref.inner.exchange.as_str().to_string().aligned_size() - }) - .map_err(WorkTableError::PagesError) - } -} -impl ArchivedExchangeByTestQuery { - pub fn get_exchange_size(&self) -> usize { - self.exchange.as_str().to_string().aligned_size() - } -} -impl ArchivedExchangeByIdQuery { - pub fn get_exchange_size(&self) -> usize { - self.exchange.as_str().to_string().aligned_size() - } -} -impl ArchivedExchangeByAbotherQuery { - pub fn get_exchange_size(&self) -> usize { - self.exchange.as_str().to_string().aligned_size() - } -} 
#[tokio::test] async fn test_update_string_full_row() { let table = TestWorkTable::default(); @@ -2636,3 +919,57 @@ async fn update_parallel_more_strings_with_select_unique() { assert_eq!(&row.exchange, e) } } + +#[tokio::test(flavor = "multi_thread", worker_threads = 3)] +async fn upsert_parallel() { + let table = Arc::new(TestMoreStringsWorkTable::default()); + let e_state = Arc::new(Mutex::new(HashMap::new())); + for i in 0..1000 { + let e_val = fastrand::u8(0..100); + let s_val = fastrand::u8(0..100); + let row = TestMoreStringsRow { + id: table.get_next_pk().into(), + test: i, + another: 1, + exchange: format!("test_{e_val}"), + some_string: format!("some_{s_val}"), + other_srting: format!("other_{i}"), + }; + let _ = table.insert(row.clone()).unwrap(); + } + let shared = table.clone(); + let shared_e_state = e_state.clone(); + let h1 = tokio::spawn(async move { + for _ in 0..4_000 { + let val = fastrand::u8(0..100); + let id_to_update = fastrand::u64(0..1000); + let row = TestMoreStringsRow { + id: id_to_update, + test: id_to_update as i64, + another: 1, + exchange: format!("test_{val}"), + some_string: format!("some_{val}"), + other_srting: format!("other_{id_to_update}"), + }; + let _ = shared.upsert(row.clone()).await.unwrap(); + { + let mut guard = shared_e_state.lock(); + guard + .entry(id_to_update) + .and_modify(|v| *v = format!("test_{val}")) + .or_insert(format!("test_{val}")); + } + } + }); + for _ in 0..20_000 { + let val = fastrand::i64(0..1000); + let res = table.select_by_test(val); + assert!(res.is_some()) + } + h1.await.unwrap(); + + for (id, e) in e_state.lock_arc().iter() { + let row = table.select(*id).unwrap(); + assert_eq!(&row.exchange, e) + } +} From 8325aca64ef8cea219fb712a375f817a69a5f42a Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 18 Dec 2025 16:48:30 +0300 Subject: [PATCH 3/3] bump --- Cargo.toml | 4 ++-- codegen/Cargo.toml | 2 +- src/in_memory/data.rs | 1 + 
src/in_memory/empty_link_registry.rs | 22 ++++++++++++---------- tests/worktable/unsized_.rs | 2 +- 5 files changed, 17 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bab2d56..a9ac4ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.8.13" +version = "0.8.15" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -16,7 +16,7 @@ perf_measurements = ["dep:performance_measurement", "dep:performance_measurement # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -worktable_codegen = { path = "codegen", version = "0.8.6" } +worktable_codegen = { path = "codegen", version = "0.8.14" } eyre = "0.6.12" derive_more = { version = "2.0.1", features = ["from", "error", "display", "into"] } diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 5cd8c12..643768b 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.8.6" +version = "0.8.14" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate" diff --git a/src/in_memory/data.rs b/src/in_memory/data.rs index 7fce256..ac7103b 100644 --- a/src/in_memory/data.rs +++ b/src/in_memory/data.rs @@ -158,6 +158,7 @@ impl Data { Ok(link) } + #[allow(clippy::missing_safety_doc)] pub unsafe fn try_save_row_by_link( &self, row: &Row, diff --git a/src/in_memory/empty_link_registry.rs b/src/in_memory/empty_link_registry.rs index b374c68..35d5e06 100644 --- a/src/in_memory/empty_link_registry.rs +++ b/src/in_memory/empty_link_registry.rs @@ -5,7 +5,7 @@ use indexset::concurrent::set::BTreeSet; use parking_lot::FairMutex; /// A link wrapper that implements `Ord` based on absolute index calculation. 
-#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord)] +#[derive(Copy, Clone, Debug, Eq, PartialEq)] pub struct IndexOrdLink(pub Link); impl IndexOrdLink { @@ -59,7 +59,13 @@ impl IndexOrdLink { impl PartialOrd for IndexOrdLink { fn partial_cmp(&self, other: &Self) -> Option { - Some(self.absolute_index().cmp(&other.absolute_index())) + Some(self.cmp(other)) + } +} + +impl Ord for IndexOrdLink { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.absolute_index().cmp(&other.absolute_index()) } } @@ -82,7 +88,7 @@ impl Default for EmptyLinkRegistry { impl EmptyLinkRegistry { pub fn push(&self, link: Link) { - let mut index_ord_link = IndexOrdLink(link.clone()); + let mut index_ord_link = IndexOrdLink(link); let _g = self.op_lock.lock(); { @@ -144,10 +150,6 @@ impl EmptyLinkRegistry { Some(index_ord_link.0) } - pub fn len(&self) -> usize { - self.index_ord_links.len() - } - pub fn iter(&self) -> impl Iterator + '_ { self.index_ord_links.iter().map(|l| l.0) } @@ -179,9 +181,9 @@ mod tests { length: 200, }; - registry.push(link1.clone()); - registry.push(link2.clone()); - registry.push(link3.clone()); + registry.push(link1); + registry.push(link2); + registry.push(link3); // After inserting link1 and link2, they should be united let united_link = Link { diff --git a/tests/worktable/unsized_.rs b/tests/worktable/unsized_.rs index bca552c..b6dc787 100644 --- a/tests/worktable/unsized_.rs +++ b/tests/worktable/unsized_.rs @@ -951,7 +951,7 @@ async fn upsert_parallel() { some_string: format!("some_{val}"), other_srting: format!("other_{id_to_update}"), }; - let _ = shared.upsert(row.clone()).await.unwrap(); + shared.upsert(row.clone()).await.unwrap(); { let mut guard = shared_e_state.lock(); guard