Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 40 additions & 32 deletions vortex-layout/src/layouts/row_idx/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::fmt::Formatter;
use std::ops::BitAnd;
use std::ops::Range;
use std::sync::Arc;
use std::sync::OnceLock;

use Nullability::NonNullable;
pub use expr::*;
Expand Down Expand Up @@ -47,7 +48,7 @@ pub struct RowIdxLayoutReader {
name: Arc<str>,
row_offset: u64,
child: Arc<dyn LayoutReader>,
partition_cache: DashMap<ExactExpr, Partitioning>,
partition_cache: DashMap<ExactExpr, Arc<OnceLock<Partitioning>>>,
session: VortexSession,
}

Expand All @@ -66,45 +67,52 @@ impl RowIdxLayoutReader {
let key = ExactExpr(expr.clone());

// Check cache first with read-only lock.
if let Some(partitioning) = self.partition_cache.get(&key) {
if let Some(entry) = self.partition_cache.get(&key)
&& let Some(partitioning) = entry.value().get()
{
return partitioning.clone();
}

self.partition_cache
let cell = self
.partition_cache
.entry(key)
.or_insert_with(|| {
// Partition the expression into row idx and child expressions.
let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
if expr.is::<RowIdx>() {
vec![Partition::RowIdx]
} else if is_root(expr) {
vec![Partition::Child]
} else {
vec![]
}
})
.vortex_expect("We should not fail to partition expression over struct fields");
.or_insert_with(|| Arc::new(OnceLock::new()))
.clone();

// If there's only a single partition, we can directly return the expression.
if partitioned.partitions.len() == 1 {
return match &partitioned.partition_annotations[0] {
Partition::RowIdx => {
Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
}
Partition::Child => Partitioning::Child(expr.clone()),
};
cell.get_or_init(|| self.compute_partitioning(expr)).clone()
}

/// Computes the [`Partitioning`] of `expr` from scratch; this function does not touch
/// `partition_cache` itself.
///
/// The expression is split with `partition`, using an annotator that tags `RowIdx` nodes as
/// [`Partition::RowIdx`], root nodes as [`Partition::Child`], and leaves every other node
/// un-annotated.
///
/// * If exactly one partition results, the whole expression is returned directly: as
///   [`Partitioning::RowIdx`] with `row_idx()` replaced by `root()`, or as
///   [`Partitioning::Child`] unchanged.
/// * Otherwise `row_idx()` is replaced by `root()` in every partition and the result is
///   returned as [`Partitioning::Partitioned`].
///
/// NOTE(review): `vortex_expect` presumably panics with the given message if partitioning
/// fails — confirm.
fn compute_partitioning(&self, expr: &Expression) -> Partitioning {
// Partition the expression into row idx and child expressions.
let mut partitioned = partition(expr.clone(), self.dtype(), |expr| {
if expr.is::<RowIdx>() {
vec![Partition::RowIdx]
} else if is_root(expr) {
vec![Partition::Child]
} else {
vec![]
}
})
.vortex_expect("We should not fail to partition expression over struct fields");

// If there's only a single partition, we can directly return the expression.
if partitioned.partitions.len() == 1 {
return match &partitioned.partition_annotations[0] {
Partition::RowIdx => {
Partitioning::RowIdx(replace(expr.clone(), &row_idx(), root()))
}
Partition::Child => Partitioning::Child(expr.clone()),
};
}

// Replace the row_idx expression with the root expression in the row_idx partition.
partitioned.partitions = partitioned
.partitions
.into_iter()
.map(|p| replace(p, &row_idx(), root()))
.collect();
// Replace the row_idx expression with the root expression in the row_idx partition.
partitioned.partitions = partitioned
.partitions
.into_iter()
.map(|p| replace(p, &row_idx(), root()))
.collect();

Partitioning::Partitioned(Arc::new(partitioned))
})
.clone()
Partitioning::Partitioned(Arc::new(partitioned))
}
}

Expand Down
101 changes: 58 additions & 43 deletions vortex-layout/src/layouts/struct_/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::BTreeSet;
use std::ops::Range;
use std::sync::Arc;
use std::sync::OnceLock;

use futures::try_join;
use itertools::Itertools;
Expand Down Expand Up @@ -57,7 +58,7 @@ pub struct StructReader {
expanded_root_expr: Expression,

field_lookup: Option<HashMap<FieldName, usize>>,
partitioned_expr_cache: DashMap<ExactExpr, Partitioned>,
partitioned_expr_cache: DashMap<ExactExpr, Arc<OnceLock<Partitioned>>>,
}

impl StructReader {
Expand Down Expand Up @@ -152,51 +153,65 @@ impl StructReader {

/// Utility for partitioning an expression over the fields of a struct.
///
/// Results are memoised in `partitioned_expr_cache`, keyed by `ExactExpr(expr)`. If the
/// entry for this key already holds an initialised value, a clone of it is returned
/// immediately. Otherwise the entry's shared `OnceLock` cell is fetched (or inserted empty),
/// cloned out of the map, and initialised through `compute_partitioned_expr`; the stored
/// value is then cloned for the caller.
///
/// NOTE(review): cloning the `Arc` out of the map presumably lets the map entry guard be
/// released before the computation runs — confirm against the `DashMap` entry API.
fn partition_expr(&self, expr: Expression) -> Partitioned {
self.partitioned_expr_cache
.entry(ExactExpr(expr.clone()))
.or_insert_with(|| {
// First, we expand the root scope into the fields of the struct to ensure
// that partitioning works correctly.
let expr = replace(expr.clone(), &root(), self.expanded_root_expr.clone());
let expr = expr
.optimize_recursive(self.dtype())
.vortex_expect("We should not fail to simplify expression over struct fields");

// Partition the expression into expressions that can be evaluated over individual fields
let mut partitioned = partition(
expr.clone(),
self.dtype(),
make_free_field_annotator(
self.dtype()
.as_struct_fields_opt()
.vortex_expect("We know it's a struct DType"),
),
)
.vortex_expect("We should not fail to partition expression over struct fields");

if partitioned.partitions.len() == 1 {
// If there's only one partition, we step into the field scope of the original
// expression by replacing any `$.a` with `$`.
return Partitioned::Single(
partitioned.partition_names[0].clone(),
replace(expr, &col(partitioned.partition_names[0].clone()), root()),
);
}
let key = ExactExpr(expr.clone());

if let Some(entry) = self.partitioned_expr_cache.get(&key)
&& let Some(partitioning) = entry.value().get()
{
return partitioning.clone();
}

// We now need to process the partitioned expressions to rewrite the root scope
// to be that of the field, rather than the struct. In other words, "stepping in"
// to the field scope.
partitioned.partitions = partitioned
.partitions
.iter()
.zip_eq(partitioned.partition_names.iter())
.map(|(e, name)| replace(e.clone(), &col(name.clone()), root()))
.collect();

Partitioned::Multi(Arc::new(partitioned))
})
let cell = self
.partitioned_expr_cache
.entry(key)
.or_insert_with(|| Arc::new(OnceLock::new()))
.clone();

cell.get_or_init(|| self.compute_partitioned_expr(expr))
.clone()
}

/// Builds the [`Partitioned`] form of `expr` over this struct's fields, without consulting
/// any cache.
///
/// Returns [`Partitioned::Single`] when the expression resolves to exactly one partition,
/// and [`Partitioned::Multi`] otherwise. In both cases each resulting expression is
/// re-rooted so that a reference to its own column (`$.a`) becomes the root (`$`).
fn compute_partitioned_expr(&self, expr: Expression) -> Partitioned {
    // Expand the root scope into the struct's individual fields and simplify the result,
    // so that partitioning sees per-field references.
    let expanded = replace(expr, &root(), self.expanded_root_expr.clone())
        .optimize_recursive(self.dtype())
        .vortex_expect("We should not fail to simplify expression over struct fields");

    // Split the expression into pieces that can each be evaluated over a single field.
    let mut split = partition(
        expanded.clone(),
        self.dtype(),
        make_free_field_annotator(
            self.dtype()
                .as_struct_fields_opt()
                .vortex_expect("We know it's a struct DType"),
        ),
    )
    .vortex_expect("We should not fail to partition expression over struct fields");

    if split.partitions.len() != 1 {
        // Step each partition into the scope of its own field: the root scope becomes
        // that of the field rather than the struct.
        split.partitions = split
            .partitions
            .iter()
            .zip_eq(split.partition_names.iter())
            .map(|(part, field)| replace(part.clone(), &col(field.clone()), root()))
            .collect();

        return Partitioned::Multi(Arc::new(split));
    }

    // Exactly one partition: step into the field scope of the full (expanded) expression
    // by replacing any `$.a` with `$`.
    let field = split.partition_names[0].clone();
    let stepped_in = replace(expanded, &col(field.clone()), root());
    Partitioned::Single(field, stepped_in)
}
}

/// When partitioning an expression, in the case it only has a single partition we can avoid
Expand Down
Loading