Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions vortex-array/public-api.lock
Original file line number Diff line number Diff line change
Expand Up @@ -23264,8 +23264,6 @@ impl vortex_array::ExecutionCtx

pub fn vortex_array::ExecutionCtx::allocator(&self) -> vortex_array::memory::HostAllocatorRef

pub fn vortex_array::ExecutionCtx::log(&mut self, core::fmt::Arguments<'_>)

pub fn vortex_array::ExecutionCtx::new(vortex_session::VortexSession) -> Self

pub fn vortex_array::ExecutionCtx::session(&self) -> &vortex_session::VortexSession
Expand All @@ -23282,10 +23280,6 @@ impl core::fmt::Display for vortex_array::ExecutionCtx

pub fn vortex_array::ExecutionCtx::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result

impl core::ops::drop::Drop for vortex_array::ExecutionCtx

pub fn vortex_array::ExecutionCtx::drop(&mut self)

pub struct vortex_array::ExecutionResult

impl vortex_array::ExecutionResult
Expand Down
1 change: 0 additions & 1 deletion vortex-array/src/arrays/scalar_fn/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ impl VTable for ScalarFn {
}

fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
ctx.log(format_args!("scalar_fn({}): executing", array.scalar_fn()));
let args = VecExecutionArgs::new(array.children(), array.len());
array
.scalar_fn()
Expand Down
249 changes: 146 additions & 103 deletions vortex-array/src/executor.rs

Large diffs are not rendered by default.

30 changes: 29 additions & 1 deletion vortex-array/src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,41 @@ impl<V: VTable> ParentKernelSet<V> {
child_idx: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
for kernel in self.kernels.iter() {
for (kernel_idx, kernel) in self.kernels.iter().enumerate() {
crate::trace_array!(use(kernel_idx));
if !kernel.matches(parent) {
crate::trace_array!(record_execute_parent_attempt(
crate::test_harness::trace::current_execute_parent_phase(),
parent,
child.array(),
child_idx,
crate::test_harness::trace::TraceSource::Static,
format!("kernel[{kernel_idx}]"),
crate::test_harness::trace::AttemptOutcome::NoMatch,
));
continue;
}
if let Some(reduced) = kernel.execute_parent(child, parent, child_idx, ctx)? {
crate::trace_array!(record_execute_parent_applied(
crate::test_harness::trace::current_execute_parent_phase(),
parent,
child.array(),
child_idx,
crate::test_harness::trace::TraceSource::Static,
format!("kernel[{kernel_idx}]"),
&reduced,
));
return Ok(Some(reduced));
}
crate::trace_array!(record_execute_parent_attempt(
crate::test_harness::trace::current_execute_parent_phase(),
parent,
child.array(),
child_idx,
crate::test_harness::trace::TraceSource::Static,
format!("kernel[{kernel_idx}]"),
crate::test_harness::trace::AttemptOutcome::Declined,
));
}
Ok(None)
}
Expand Down
2 changes: 2 additions & 0 deletions vortex-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub mod accessor;
pub mod aggregate_fn;
#[doc(hidden)]
pub mod aliases;
mod trace_macros;
pub(crate) use trace_macros::trace_array;
mod array;
pub mod arrays;
pub mod arrow;
Expand Down
81 changes: 59 additions & 22 deletions vortex-array/src/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,24 @@ fn try_optimize(
let mut any_optimizations = false;
let array_ref = session.and_then(|s| s.get_opt::<ArrayKernels>());

crate::trace_array!(record_optimize_start(array, session.is_some()));

// Apply reduction rules to the current array until no more rules apply.
let mut loop_counter = 0;
'outer: loop {
if loop_counter > 100 {
vortex_bail!("Exceeded maximum optimization iterations (possible infinite loop)");
}
loop_counter += 1;
for _ in 0..=100 {
crate::trace_array!(record_optimize_loop_start(&current_array));

if let Some(new_array) = current_array.reduce()? {
current_array = new_array;
any_optimizations = true;
crate::trace_array!(record_optimize_loop_end());
continue;
}

crate::trace_array!(record_optimize_reduce_none(&current_array));

// Apply parent reduction rules to each slot in the context of the current array.
// Its important to take all slots here, as `current_array` can change inside the loop.
let mut parent_reduced = None;
for (slot_idx, slot) in current_array.slots().iter().enumerate() {
let Some(child) = slot else { continue };

Expand All @@ -95,34 +97,62 @@ fn try_optimize(
&& let Some(plugins) =
array_ref.find_reduce_parent(current_array.encoding_id(), child.encoding_id())
{
for plugin in plugins.as_ref() {
for (plugin_idx, plugin) in plugins.as_ref().iter().enumerate() {
crate::trace_array!(use(plugin_idx));
if let Some(new_array) = plugin(child, &current_array, slot_idx)? {
current_array = new_array;
any_optimizations = true;
continue 'outer;
crate::trace_array!(record_parent_reduce_applied(
&current_array,
child,
slot_idx,
crate::test_harness::trace::TraceSource::Session(plugin_idx),
"reduce_parent_fn",
&new_array,
));
parent_reduced = Some(new_array);
break;
}
crate::trace_array!(record_parent_reduce_attempt(
&current_array,
child,
slot_idx,
crate::test_harness::trace::TraceSource::Session(plugin_idx),
"reduce_parent_fn",
crate::test_harness::trace::AttemptOutcome::Declined,
));
}
if parent_reduced.is_some() {
break;
}
}

if let Some(new_array) = child.reduce_parent(&current_array, slot_idx)? {
// If the parent was replaced, then we attempt to reduce it again.
current_array = new_array;
any_optimizations = true;

// Continue to the start of the outer loop
continue 'outer;
parent_reduced = Some(new_array);
break;
}
}

if let Some(new_array) = parent_reduced {
// If the parent was replaced, then we attempt to reduce it again.
current_array = new_array;
any_optimizations = true;
crate::trace_array!(record_optimize_loop_end());
continue;
}

crate::trace_array!(record_optimize_parent_reduce_none(&current_array));
crate::trace_array!(record_optimize_loop_end());

// No more optimizations can be applied
break;
}
crate::trace_array!(record_optimize_done(&current_array, any_optimizations));

if any_optimizations {
Ok(Some(current_array))
} else {
Ok(None)
if any_optimizations {
return Ok(Some(current_array));
} else {
return Ok(None);
}
}

vortex_bail!("Exceeded maximum optimization iterations (possible infinite loop)");
}

fn try_optimize_recursive(
Expand All @@ -132,6 +162,8 @@ fn try_optimize_recursive(
let mut current_array = array.clone();
let mut any_optimizations = false;

crate::trace_array!(record_optimize_recursive_start(array));

if let Some(new_array) = try_optimize(&current_array, Some(session))? {
current_array = new_array;
any_optimizations = true;
Expand All @@ -143,6 +175,11 @@ fn try_optimize_recursive(
match slot {
Some(child) => {
if let Some(new_child) = try_optimize_recursive(child, session)? {
crate::trace_array!(record_optimize_recursive_slot(
new_slots.len(),
child,
&new_child,
));
new_slots.push(Some(new_child));
any_slot_optimized = true;
} else {
Expand Down
30 changes: 30 additions & 0 deletions vortex-array/src/optimizer/rules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,14 @@ impl<V: VTable> ReduceRuleSet<V> {
pub fn evaluate(&self, array: ArrayView<'_, V>) -> VortexResult<Option<ArrayRef>> {
for rule in self.rules.iter() {
if let Some(reduced) = rule.reduce(array)? {
crate::trace_array!(record_reduce_applied(array.array(), *rule, &reduced));
return Ok(Some(reduced));
}
crate::trace_array!(record_reduce_attempt(
array.array(),
*rule,
crate::test_harness::trace::AttemptOutcome::Declined,
));
}
Ok(None)
}
Expand Down Expand Up @@ -176,6 +182,14 @@ impl<V: VTable> ParentRuleSet<V> {
) -> VortexResult<Option<ArrayRef>> {
for rule in self.rules.iter() {
if !rule.matches(parent) {
crate::trace_array!(record_parent_reduce_attempt(
parent,
child.array(),
child_idx,
crate::test_harness::trace::TraceSource::Static,
crate::test_harness::trace::compact_label(*rule),
crate::test_harness::trace::AttemptOutcome::NoMatch,
));
continue;
}
if let Some(reduced) = rule.reduce_parent(child, parent, child_idx)? {
Expand All @@ -198,8 +212,24 @@ impl<V: VTable> ParentRuleSet<V> {
);
}

crate::trace_array!(record_parent_reduce_applied(
parent,
child.array(),
child_idx,
crate::test_harness::trace::TraceSource::Static,
crate::test_harness::trace::compact_label(*rule),
&reduced,
));
return Ok(Some(reduced));
}
crate::trace_array!(record_parent_reduce_attempt(
parent,
child.array(),
child_idx,
crate::test_harness::trace::TraceSource::Static,
crate::test_harness::trace::compact_label(*rule),
crate::test_harness::trace::AttemptOutcome::Declined,
));
}
Ok(None)
}
Expand Down
4 changes: 4 additions & 0 deletions vortex-array/src/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ use crate::VortexSessionExecute;
use crate::arrays::BoolArray;
use crate::arrays::bool::BoolArrayExt;

#[cfg(not(codspeed))]
pub mod trace;
pub mod trace_arrays;

/// Check that a named metadata matches its previous versioning.
///
/// Goldenfile takes care of checking for equality against a checked-in file.
Expand Down
Loading
Loading