4 changes: 2 additions & 2 deletions Cargo.toml
@@ -27,9 +27,9 @@ lockfree = { version = "0.5.1" }
fastrand = "2.3.0"
futures = "0.3.30"
uuid = { version = "1.10.0", features = ["v4", "v7"] }
data_bucket = "=0.3.9"
# data_bucket = "=0.3.9"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" }
# data_bucket = { path = "../DataBucket", version = "0.3.8" }
data_bucket = { path = "../DataBucket", version = "0.3.10" }
performance_measurement_codegen = { path = "performance_measurement/codegen", version = "0.1.0", optional = true }
performance_measurement = { path = "performance_measurement", version = "0.1.0", optional = true }
indexset = { version = "=0.14.0", features = ["concurrent", "cdc", "multimap"] }
45 changes: 39 additions & 6 deletions codegen/src/persist_index/generator.rs
@@ -364,12 +364,29 @@ impl Generator {
if is_unsized(&ty.to_string()) {
let node = if is_unique {
quote! {
let node = UnsizedNode::from_inner(page.inner.get_node(), #const_name);
let node = page
.inner
.get_node()
.into_iter()
.map(|p| IndexPair {
key: p.key,
value: p.value.into(),
})
.collect();
let node = UnsizedNode::from_inner(node, #const_name);
#i.attach_node(node);
}
} else {
quote! {
let inner = page.inner.get_node();
let inner: Vec<_> = page
.inner
.get_node()
.into_iter()
.map(|p| IndexPair {
key: p.key,
value: OffsetEqLink(p.value),
})
.collect();
let mut last_key = inner.first().expect("Node should be not empty").key.clone();
let mut discriminator = 0;
let mut inner = inner.into_iter().map(move |p| {
@@ -390,20 +407,36 @@ impl Generator {
}
};
quote! {
let #i: #t<_, Link, UnsizedNode<_>> = #t::with_maximum_node_size(#const_name);
let #i: #t<_, OffsetEqLink, UnsizedNode<_>> = #t::with_maximum_node_size(#const_name);
for page in persisted.#i.1 {
#node
}
}
} else {
let node = if is_unique {
quote! {
let node = page.inner.get_node();
let node = page
.inner
.get_node()
.into_iter()
.map(|p| IndexPair {
key: p.key,
value: p.value.into(),
})
.collect();
#i.attach_node(node);
}
} else {
quote! {
let inner = page.inner.get_node();
let inner: Vec<_> = page
.inner
.get_node()
.into_iter()
.map(|p| IndexPair {
key: p.key,
value: OffsetEqLink(p.value),
})
.collect();
let mut last_key = inner.first().expect("Node should be not empty").key.clone();
let mut discriminator = 0;
let mut inner = inner.into_iter().map(move |p| {
@@ -424,7 +457,7 @@
};
quote! {
let size = get_index_page_size_from_data_length::<#ty>(#const_name);
let #i: #t<_, Link> = #t::with_maximum_node_size(size);
let #i: #t<_, OffsetEqLink> = #t::with_maximum_node_size(size);
for page in persisted.#i.1 {
#node
}
48 changes: 39 additions & 9 deletions codegen/src/persist_table/generator/space_file/mod.rs
@@ -156,23 +156,53 @@ impl Generator {
let const_name = name_generator.get_page_inner_size_const_ident();
let pk_type = name_generator.get_primary_key_type_ident();

let pk_map = if self.attributes.pk_unsized {
let primary_index_init = if self.attributes.pk_unsized {
let pk_ident = &self.pk_ident;
quote! {
let pk_map = IndexMap::<#pk_ident, Link, UnsizedNode<_>>::with_maximum_node_size(#const_name);
let pk_map = IndexMap::<#pk_ident, OffsetEqLink<#const_name>, UnsizedNode<_>>::with_maximum_node_size(#const_name);
for page in self.primary_index.1 {
let node = page.inner.get_node();
let node = page
.inner
.get_node()
.into_iter()
.map(|p| IndexPair {
key: p.key,
value: p.value.into(),
})
.collect();
pk_map.attach_node(UnsizedNode::from_inner(node, #const_name));
}
// Reconstruct reverse_pk_map by iterating over pk_map
let mut reverse_pk_map = IndexMap::<OffsetEqLink<#const_name>, #pk_ident>::new();
for entry in pk_map.iter() {
let (pk, link) = entry;
reverse_pk_map.insert(*link, pk.clone());
}
let primary_index = PrimaryIndex { pk_map, reverse_pk_map };
}
} else {
quote! {
let size = get_index_page_size_from_data_length::<#pk_type>(#const_name);
let pk_map = IndexMap::with_maximum_node_size(size);
let pk_map = IndexMap::<_, OffsetEqLink<#const_name>>::with_maximum_node_size(size);
for page in self.primary_index.1 {
let node = page.inner.get_node();
let node = page
.inner
.get_node()
.into_iter()
.map(|p| IndexPair {
key: p.key,
value: p.value.into(),
})
.collect();
pk_map.attach_node(node);
}
// Reconstruct reverse_pk_map by iterating over pk_map
let mut reverse_pk_map = IndexMap::<OffsetEqLink<#const_name>, #pk_type>::new();
for entry in pk_map.iter() {
let (pk, link) = entry;
reverse_pk_map.insert(*link, pk.clone());
}
let primary_index = PrimaryIndex { pk_map, reverse_pk_map };
}
};

@@ -191,12 +221,12 @@ impl Generator {
.with_empty_links(self.data_info.inner.empty_links_list);
let indexes = #index_ident::from_persisted(self.indexes);

#pk_map
#primary_index_init

let table = WorkTable {
data,
pk_map,
indexes,
data: std::sync::Arc::new(data),
primary_index: std::sync::Arc::new(primary_index),
indexes: std::sync::Arc::new(indexes),
pk_gen: PrimaryKeyGeneratorState::from_state(self.data_info.inner.pk_gen_state),
lock_map: LockMap::default(),
update_state: IndexMap::default(),
@@ -105,7 +105,7 @@ impl Generator {
quote! {
pub fn get_peristed_primary_key_with_toc(&self) -> (Vec<GeneralPage<TableOfContentsPage<(#pk_type, Link)>>>, Vec<GeneralPage<UnsizedIndexPage<#pk_type, {#const_name as u32}>>>) {
let mut pages = vec![];
for node in self.0.pk_map.iter_nodes() {
for node in self.0.primary_index.pk_map.iter_nodes() {
let page = UnsizedIndexPage::from_node(node.lock_arc().as_ref());
pages.push(page);
}
@@ -118,7 +118,7 @@ impl Generator {
pub fn get_peristed_primary_key_with_toc(&self) -> (Vec<GeneralPage<TableOfContentsPage<(#pk_type, Link)>>>, Vec<GeneralPage<IndexPage<#pk_type>>>) {
let size = get_index_page_size_from_data_length::<#pk_type>(#const_name);
let mut pages = vec![];
for node in self.0.pk_map.iter_nodes() {
for node in self.0.primary_index.pk_map.iter_nodes() {
let page = IndexPage::from_node(node.lock_arc().as_ref(), size);
pages.push(page);
}
4 changes: 2 additions & 2 deletions codegen/src/worktable/generator/index/cdc.rs
@@ -104,7 +104,7 @@ impl Generator {
let remove = if idx.is_unique {
quote! {
if row_new.#i == row_old.#i {
let events = self.#index_field_name.insert_cdc(row_new.#i.clone(), link_new).1;
let events = TableIndexCdc::insert_cdc(&self.#index_field_name, row_new.#i.clone(), link_new).1;
#index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::<Vec<_>>());
} else {
let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, row_old.#i.clone(), link_old);
@@ -113,7 +113,7 @@ impl Generator {
}
} else {
quote! {
let events = self.#index_field_name.insert_cdc(row_new.#i.clone(), link_new).1;
let events = TableIndexCdc::insert_cdc(&self.#index_field_name, row_new.#i.clone(), link_new).1;
#index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::<Vec<_>>());
let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, row_old.#i.clone(), link_old);
#index_field_name.extend(events.into_iter().map(|ev| ev.into()).collect::<Vec<_>>());
8 changes: 4 additions & 4 deletions codegen/src/worktable/generator/index/mod.rs
@@ -60,16 +60,16 @@ impl Generator {
let res = if idx.is_unique {
if is_unsized(&t.to_string()) {
quote! {
#i: IndexMap<#t, Link, UnsizedNode<IndexPair<#t, Link>>>
#i: IndexMap<#t, OffsetEqLink, UnsizedNode<IndexPair<#t, OffsetEqLink>>>
}
} else {
quote! {#i: IndexMap<#t, Link>}
quote! {#i: IndexMap<#t, OffsetEqLink>}
}
} else {
if is_unsized(&t.to_string()) {
quote! {#i: IndexMultiMap<#t, Link, UnsizedNode<IndexMultiPair<#t, Link>>>}
quote! {#i: IndexMultiMap<#t, OffsetEqLink, UnsizedNode<IndexMultiPair<#t, OffsetEqLink>>>}
} else {
quote! {#i: IndexMultiMap<#t, Link>}
quote! {#i: IndexMultiMap<#t, OffsetEqLink>}
}
};
Ok::<_, syn::Error>(res)
31 changes: 8 additions & 23 deletions codegen/src/worktable/generator/index/usual.rs
@@ -125,15 +125,15 @@ impl Generator {
let remove = if idx.is_unique {
quote! {
if val_new == val_old {
self.#index_field_name.insert(val_new.clone(), link_new);
TableIndex::insert(&self.#index_field_name, val_new.clone(), link_new);
} else {
TableIndex::remove(&self.#index_field_name, val_old, link_old);
TableIndex::remove(&self.#index_field_name, &val_old, link_old);
}
}
} else {
quote! {
self.#index_field_name.insert(val_new.clone(), link_new);
TableIndex::remove(&self.#index_field_name, val_old, link_old);
TableIndex::insert(&self.#index_field_name, val_new.clone(), link_new);
TableIndex::remove(&self.#index_field_name, &val_old, link_old);
}
};
let insert = if idx.is_unique {
@@ -212,14 +212,8 @@ impl Generator {
row.#i
}
};
if idx.is_unique {
quote! {
self.#index_field_name.remove(&#row);
}
} else {
quote! {
self.#index_field_name.remove(&#row, &link);
}
quote! {
TableIndex::remove(&self.#index_field_name, &#row, link);
}
})
.collect::<Vec<_>>();
@@ -259,7 +253,7 @@ impl Generator {
if let Some(diff) = difference.get(#diff_key) {
if let #avt_type_ident::#variant_ident(old) = &diff.old {
let key_old = #old_value_expr;
TableIndex::remove(&self.#index_field_name, key_old, link);
TableIndex::remove(&self.#index_field_name, &key_old, link);
}
}
}
@@ -372,19 +366,10 @@ impl Generator {
row.#i
}
};
let delete = if idx.is_unique {
quote! {
self.#index_field_name.remove(&#row);
}
} else {
quote! {
self.#index_field_name.remove(&#row, &link);
}
};

quote! {
#avt_index_ident::#index_variant => {
#delete
TableIndex::remove(&self.#index_field_name, &#row, link);
},
}
})
14 changes: 8 additions & 6 deletions codegen/src/worktable/generator/queries/delete.rs
@@ -76,7 +76,7 @@ impl Generator {
let process = if self.is_persist {
quote! {
let secondary_keys_events = self.0.indexes.delete_row_cdc(row, link)?;
let (_, primary_key_events) = TableIndexCdc::remove_cdc(&self.0.pk_map, pk.clone(), link);
let (_, primary_key_events) = self.0.primary_index.remove_cdc(pk.clone(), link);
self.0.data.delete(link).map_err(WorkTableError::PagesError)?;
let mut op: Operation<
<<#pk_ident as TablePrimaryKey>::Generator as PrimaryKeyGeneratorState>::State,
@@ -93,16 +93,17 @@ impl Generator {
} else {
quote! {
self.0.indexes.delete_row(row, link)?;
self.0.pk_map.remove(&pk);
self.0.primary_index.remove(&pk, link);
self.0.data.delete(link).map_err(WorkTableError::PagesError)?;
}
};
if is_locked {
quote! {
let link = match self.0
.primary_index
.pk_map
.get(&pk)
.map(|v| v.get().value)
.map(|v| v.get().value.into())
.ok_or(WorkTableError::NotFound) {
Ok(l) => l,
Err(e) => {
@@ -117,9 +118,10 @@ impl Generator {
} else {
quote! {
let link = self.0
.primary_index
.pk_map
.get(&pk)
.map(|v| v.get().value)
.map(|v| v.get().value.into())
.ok_or(WorkTableError::NotFound)?;
let row = self.select(pk.clone()).unwrap();
#process
@@ -197,7 +199,7 @@ impl Generator {
pub async fn #name(&self, by: #type_) -> core::result::Result<(), WorkTableError> {
let rows_to_update = self.0.indexes.#index.get(#by).map(|kv| kv.1).collect::<Vec<_>>();
for link in rows_to_update {
let row = self.0.data.select_non_ghosted(*link).map_err(WorkTableError::PagesError)?;
let row = self.0.data.select_non_ghosted(link.0).map_err(WorkTableError::PagesError)?;
self.delete(row.get_primary_key()).await?;
}
core::result::Result::Ok(())
@@ -217,7 +219,7 @@ impl Generator {
};
quote! {
pub async fn #name(&self, by: #type_) -> core::result::Result<(), WorkTableError> {
let row_to_update = self.0.indexes.#index.get(#by).map(|v| v.get().value);
let row_to_update = self.0.indexes.#index.get(#by).map(|v| v.get().value.into());
if let Some(link) = row_to_update {
let row = self.0.data.select_non_ghosted(link).map_err(WorkTableError::PagesError)?;
self.delete(row.get_primary_key()).await?;
4 changes: 2 additions & 2 deletions codegen/src/worktable/generator/queries/in_place.rs
@@ -114,9 +114,9 @@ impl Generator {
};
let link = self
.0
.pk_map
.primary_index.pk_map
.get(&pk)
.map(|v| v.get().value)
.map(|v| v.get().value.into())
.ok_or(WorkTableError::NotFound)?;
unsafe {
self.0
4 changes: 2 additions & 2 deletions codegen/src/worktable/generator/queries/select.rs
@@ -29,9 +29,9 @@ impl Generator {
#column_range_type,
#row_fields_ident>
{
let iter = self.0.pk_map
let iter = self.0.primary_index.pk_map
.iter()
.filter_map(|(_, link)| self.0.data.select_non_ghosted(*link).ok());
.filter_map(|(_, link)| self.0.data.select_non_ghosted(link.0).ok());

SelectQueryBuilder::new(iter)
}