diff --git a/vortex-layout/src/layouts/row_idx/mod.rs b/vortex-layout/src/layouts/row_idx/mod.rs index fd7513aad96..8ce4a2f5f4b 100644 --- a/vortex-layout/src/layouts/row_idx/mod.rs +++ b/vortex-layout/src/layouts/row_idx/mod.rs @@ -9,6 +9,7 @@ use std::fmt::Formatter; use std::ops::BitAnd; use std::ops::Range; use std::sync::Arc; +use std::sync::OnceLock; use Nullability::NonNullable; pub use expr::*; @@ -47,7 +48,7 @@ pub struct RowIdxLayoutReader { name: Arc, row_offset: u64, child: Arc, - partition_cache: DashMap, + partition_cache: DashMap>>, session: VortexSession, } @@ -66,45 +67,52 @@ impl RowIdxLayoutReader { let key = ExactExpr(expr.clone()); // Check cache first with read-only lock. - if let Some(partitioning) = self.partition_cache.get(&key) { + if let Some(entry) = self.partition_cache.get(&key) + && let Some(partitioning) = entry.value().get() + { return partitioning.clone(); } - self.partition_cache + let cell = self + .partition_cache .entry(key) - .or_insert_with(|| { - // Partition the expression into row idx and child expressions. - let mut partitioned = partition(expr.clone(), self.dtype(), |expr| { - if expr.is::() { - vec![Partition::RowIdx] - } else if is_root(expr) { - vec![Partition::Child] - } else { - vec![] - } - }) - .vortex_expect("We should not fail to partition expression over struct fields"); + .or_insert_with(|| Arc::new(OnceLock::new())) + .clone(); - // If there's only a single partition, we can directly return the expression. - if partitioned.partitions.len() == 1 { - return match &partitioned.partition_annotations[0] { - Partition::RowIdx => { - Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root())) - } - Partition::Child => Partitioning::Child(expr.clone()), - }; + cell.get_or_init(|| self.compute_partitioning(expr)).clone() + } + + fn compute_partitioning(&self, expr: &Expression) -> Partitioning { + // Partition the expression into row idx and child expressions. + let mut partitioned = partition(expr.clone(), self.dtype(), |expr| { + if expr.is::() { + vec![Partition::RowIdx] + } else if is_root(expr) { + vec![Partition::Child] + } else { + vec![] + } + }) + .vortex_expect("We should not fail to partition expression over struct fields"); + + // If there's only a single partition, we can directly return the expression. + if partitioned.partitions.len() == 1 { + return match &partitioned.partition_annotations[0] { + Partition::RowIdx => { + Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root())) } + Partition::Child => Partitioning::Child(expr.clone()), + }; + } - // Replace the row_idx expression with the root expression in the row_idx partition. - partitioned.partitions = partitioned - .partitions - .into_iter() - .map(|p| replace(p, &row_idx(), root())) - .collect(); + // Replace the row_idx expression with the root expression in the row_idx partition. + partitioned.partitions = partitioned + .partitions + .into_iter() + .map(|p| replace(p, &row_idx(), root())) + .collect(); - Partitioning::Partitioned(Arc::new(partitioned)) - }) - .clone() + Partitioning::Partitioned(Arc::new(partitioned)) } } diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index fc665731bdd..bb069df9723 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -4,6 +4,7 @@ use std::collections::BTreeSet; use std::ops::Range; use std::sync::Arc; +use std::sync::OnceLock; use futures::try_join; use itertools::Itertools; @@ -57,7 +58,7 @@ pub struct StructReader { expanded_root_expr: Expression, field_lookup: Option>, - partitioned_expr_cache: DashMap, + partitioned_expr_cache: DashMap>>, } impl StructReader { @@ -152,51 +153,65 @@ impl StructReader { /// Utility for partitioning an expression over the fields of a struct. fn partition_expr(&self, expr: Expression) -> Partitioned { - self.partitioned_expr_cache - .entry(ExactExpr(expr.clone())) - .or_insert_with(|| { - // First, we expand the root scope into the fields of the struct to ensure - // that partitioning works correctly. - let expr = replace(expr.clone(), &root(), self.expanded_root_expr.clone()); - let expr = expr - .optimize_recursive(self.dtype()) - .vortex_expect("We should not fail to simplify expression over struct fields"); - - // Partition the expression into expressions that can be evaluated over individual fields - let mut partitioned = partition( - expr.clone(), - self.dtype(), - make_free_field_annotator( - self.dtype() - .as_struct_fields_opt() - .vortex_expect("We know it's a struct DType"), - ), - ) - .vortex_expect("We should not fail to partition expression over struct fields"); - - if partitioned.partitions.len() == 1 { - // If there's only one partition, we step into the field scope of the original - // expression by replacing any `$.a` with `$`. - return Partitioned::Single( - partitioned.partition_names[0].clone(), - replace(expr, &col(partitioned.partition_names[0].clone()), root()), - ); - } + let key = ExactExpr(expr.clone()); + + if let Some(entry) = self.partitioned_expr_cache.get(&key) + && let Some(partitioning) = entry.value().get() + { + return partitioning.clone(); + } - // We now need to process the partitioned expressions to rewrite the root scope - // to be that of the field, rather than the struct. In other words, "stepping in" - // to the field scope. - partitioned.partitions = partitioned - .partitions - .iter() - .zip_eq(partitioned.partition_names.iter()) - .map(|(e, name)| replace(e.clone(), &col(name.clone()), root())) - .collect(); - - Partitioned::Multi(Arc::new(partitioned)) - }) + let cell = self + .partitioned_expr_cache + .entry(key) + .or_insert_with(|| Arc::new(OnceLock::new())) + .clone(); + + cell.get_or_init(|| self.compute_partitioned_expr(expr)) .clone() } + + fn compute_partitioned_expr(&self, expr: Expression) -> Partitioned { + // First, we expand the root scope into the fields of the struct to ensure + // that partitioning works correctly. + let expr = replace(expr, &root(), self.expanded_root_expr.clone()); + let expr = expr + .optimize_recursive(self.dtype()) + .vortex_expect("We should not fail to simplify expression over struct fields"); + + // Partition the expression into expressions that can be evaluated over individual fields + let mut partitioned = partition( + expr.clone(), + self.dtype(), + make_free_field_annotator( + self.dtype() + .as_struct_fields_opt() + .vortex_expect("We know it's a struct DType"), + ), + ) + .vortex_expect("We should not fail to partition expression over struct fields"); + + if partitioned.partitions.len() == 1 { + // If there's only one partition, we step into the field scope of the original + // expression by replacing any `$.a` with `$`. + return Partitioned::Single( + partitioned.partition_names[0].clone(), + replace(expr, &col(partitioned.partition_names[0].clone()), root()), + ); + } + + // We now need to process the partitioned expressions to rewrite the root scope + // to be that of the field, rather than the struct. In other words, "stepping in" + // to the field scope. + partitioned.partitions = partitioned + .partitions + .iter() + .zip_eq(partitioned.partition_names.iter()) + .map(|(e, name)| replace(e.clone(), &col(name.clone()), root())) + .collect(); + + Partitioned::Multi(Arc::new(partitioned)) + } } /// When partitioning an expression, in the case it only has a single partition we can avoid