From b7116a20da57c3f7a5b82915f607452b8df7e843 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 6 May 2026 14:54:51 +0100 Subject: [PATCH 1/4] Exectuion and optimization tracing harness for testing Signed-off-by: Adam Gutglick --- vortex-array/public-api.lock | 6 - .../src/arrays/scalar_fn/vtable/mod.rs | 1 - vortex-array/src/executor.rs | 248 ++-- vortex-array/src/kernel.rs | 30 +- vortex-array/src/lib.rs | 5 + vortex-array/src/optimizer/mod.rs | 81 +- vortex-array/src/optimizer/rules.rs | 30 + vortex-array/src/test_harness.rs | 3 + vortex-array/src/test_harness/trace.rs | 1319 +++++++++++++++++ vortex-array/src/test_harness/trace_arrays.rs | 332 +++++ vortex-array/src/trace_macros.rs | 63 + 11 files changed, 1985 insertions(+), 133 deletions(-) create mode 100644 vortex-array/src/test_harness/trace.rs create mode 100644 vortex-array/src/test_harness/trace_arrays.rs create mode 100644 vortex-array/src/trace_macros.rs diff --git a/vortex-array/public-api.lock b/vortex-array/public-api.lock index e65e2d59bec..e6c2d904def 100644 --- a/vortex-array/public-api.lock +++ b/vortex-array/public-api.lock @@ -23264,8 +23264,6 @@ impl vortex_array::ExecutionCtx pub fn vortex_array::ExecutionCtx::allocator(&self) -> vortex_array::memory::HostAllocatorRef -pub fn vortex_array::ExecutionCtx::log(&mut self, core::fmt::Arguments<'_>) - pub fn vortex_array::ExecutionCtx::new(vortex_session::VortexSession) -> Self pub fn vortex_array::ExecutionCtx::session(&self) -> &vortex_session::VortexSession @@ -23282,10 +23280,6 @@ impl core::fmt::Display for vortex_array::ExecutionCtx pub fn vortex_array::ExecutionCtx::fmt(&self, &mut core::fmt::Formatter<'_>) -> core::fmt::Result -impl core::ops::drop::Drop for vortex_array::ExecutionCtx - -pub fn vortex_array::ExecutionCtx::drop(&mut self) - pub struct vortex_array::ExecutionResult impl vortex_array::ExecutionResult diff --git a/vortex-array/src/arrays/scalar_fn/vtable/mod.rs b/vortex-array/src/arrays/scalar_fn/vtable/mod.rs index c0249ac4824..4349e4f68c7 100644 --- a/vortex-array/src/arrays/scalar_fn/vtable/mod.rs +++ b/vortex-array/src/arrays/scalar_fn/vtable/mod.rs @@ -145,7 +145,6 @@ impl VTable for ScalarFn { } fn execute(array: Array, ctx: &mut ExecutionCtx) -> VortexResult { - ctx.log(format_args!("scalar_fn({}): executing", array.scalar_fn())); let args = VecExecutionArgs::new(array.children(), array.len()); array .scalar_fn() diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index f6a2496335d..54eb46d3bc2 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -160,23 +160,56 @@ impl ArrayRef { let mut stack: Vec = Vec::new(); let max_iterations = max_iterations(); - for _ in 0..max_iterations { + crate::trace_array!(record_execute_until_start::(¤t_array)); + + for iteration in 0..max_iterations { + crate::trace_array_use!(iteration); + crate::trace_array!(record_execute_until_iteration( + iteration + 1, + ¤t_array, + stack + .last() + .map(|frame| (&frame.parent_array, frame.slot_idx)), + current_builder.is_some(), + )); + let is_done = stack .last() .map_or(M::matches as DonePredicate, |frame| frame.done); - if is_done(¤t_array) || AnyCanonical::matches(¤t_array) { + let done_target = is_done(¤t_array); + let done_canonical = AnyCanonical::matches(¤t_array); + crate::trace_array!(record_execute_until_done_check(done_target, done_canonical)); + + if done_target || done_canonical { match stack.pop() { None => { debug_assert!( current_builder.is_none(), "root activation should not retain a builder" ); - ctx.log(format_args!("-> {}", current_array)); + crate::trace_array!(record_execute_until_return(¤t_array)); return Ok(current_array); } Some(frame) => { + let trace_pop_frame = crate::trace_array_value!( + Some(( + frame.parent_array.clone(), + current_array.clone(), + frame.slot_idx + )), + None::<(ArrayRef, ArrayRef, usize)> + ); (current_array, current_builder) = pop_frame(frame, current_array)?; + if let Some((parent_before, child_before, slot_idx)) = trace_pop_frame { + crate::trace_array_use!(parent_before, child_before, slot_idx,); + crate::trace_array!(record_execute_until_pop_frame( + &parent_before, + slot_idx, + &child_before, + ¤t_array, + )); + } continue; } } @@ -194,45 +227,58 @@ impl ArrayRef { // would be lost when we restore frame.parent_builder. if current_builder.is_none() && let Some(frame) = stack.last() - && let Some(result) = - current_array.execute_parent(&frame.parent_array, frame.slot_idx, ctx)? + && let Some(result) = execute_parent_for_child( + "stack_execute_parent", + &frame.parent_array, + ¤t_array, + frame.slot_idx, + None, + ctx, + )? { - ctx.log(format_args!( - "execute_parent (stack) rewrote {} -> {}", - current_array, result - )); let frame = stack.pop().vortex_expect("just peeked"); - current_array = result.optimize_ctx(ctx.session())?; + let optimized = result.optimize_ctx(ctx.session())?; + crate::trace_array!(record_execute_optimized(&result, &optimized)); + current_array = optimized; current_builder = frame.parent_builder; continue; } + if current_builder.is_none() && stack.last().is_some() { + crate::trace_array!(record_execute_parent_none( + "stack_execute_parent", + ¤t_array, + )); + } // Step 2b: execute_parent against current_array's own children. if current_builder.is_none() && let Some(rewritten) = try_execute_parent(¤t_array, ctx)? { - ctx.log(format_args!( - "execute_parent rewrote {} -> {}", - current_array, rewritten - )); - current_array = rewritten.optimize_ctx(ctx.session())?; + let optimized = rewritten.optimize_ctx(ctx.session())?; + crate::trace_array!(record_execute_optimized(&rewritten, &optimized)); + current_array = optimized; continue; } + if current_builder.is_none() { + crate::trace_array!(record_execute_parent_none( + "child_execute_parent", + ¤t_array, + )); + } // execute step let expected_len = current_array.len(); let expected_dtype = current_array.dtype().clone(); let stats = current_array.statistics().to_array_stats(); let encoding_id = current_array.encoding_id(); + crate::trace_array!(record_execute_encoding(¤t_array)); let result = current_array.execute_encoding_unchecked(ctx)?; let (array, step) = result.into_parts(); match step { ExecutionStep::ExecuteSlot(i, done) => { let (parent, child) = unsafe { array.take_slot_unchecked(i) }?; - ctx.log(format_args!( - "ExecuteSlot({i}): pushing {}, focusing on {}", - parent, child - )); + + crate::trace_array!(record_execute_slot(i, &parent, &child)); stack.push(StackFrame { parent_array: parent, parent_builder: current_builder.take(), @@ -246,6 +292,7 @@ impl ArrayRef { } ExecutionStep::AppendChild(i) => { if current_builder.is_none() { + crate::trace_array!(record_builder_start(&array)); current_builder = Some(builder_with_capacity_in( ctx.allocator(), array.dtype(), @@ -253,10 +300,10 @@ impl ArrayRef { )); } let (parent, child) = unsafe { array.take_slot_unchecked(i) }?; - ctx.log(format_args!( - "AppendChild({i}): appending {} into builder", - child - )); + + crate::trace_array!(record_append_child(i, &parent, &child)); + crate::trace_array!(record_builder_append(&child)); + // TODO(joe)[7674]: replace with a builder kernel registry so we don't // need to go through the VTable append_to_builder indirection. child.append_to_builder( @@ -268,7 +315,8 @@ impl ArrayRef { current_array = parent; } ExecutionStep::Done => { - ctx.log(format_args!("Done: {}", array)); + let had_builder = current_builder.is_some(); + crate::trace_array!(record_execute_done(&array)); (current_array, current_builder) = finalize_done( array, current_builder, @@ -277,6 +325,9 @@ impl ArrayRef { stats, encoding_id, )?; + if had_builder { + crate::trace_array!(record_builder_finish(¤t_array)); + } } } } @@ -303,8 +354,6 @@ pub struct ExecutionCtx { session: VortexSession, #[cfg(debug_assertions)] id: usize, - #[cfg(debug_assertions)] - ops: Vec, } impl ExecutionCtx { @@ -318,8 +367,6 @@ impl ExecutionCtx { std::sync::atomic::AtomicUsize::new(0); EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed) }, - #[cfg(debug_assertions)] - ops: Vec::new(), } } @@ -332,22 +379,6 @@ impl ExecutionCtx { pub fn allocator(&self) -> HostAllocatorRef { self.session.allocator() } - - /// Log an execution step at the current depth. - /// - /// Steps are accumulated and dumped as a single trace on Drop at DEBUG level. - /// Individual steps are also logged at TRACE level for real-time following. - /// - /// Use the [`format_args!`] macro to create the `msg` argument. - pub fn log(&mut self, msg: fmt::Arguments<'_>) { - #[cfg(debug_assertions)] - if tracing::enabled!(tracing::Level::DEBUG) { - let formatted = format!(" - {msg}"); - tracing::trace!("exec[{}]: {formatted}", self.id); - self.ops.push(formatted); - } - let _ = msg; - } } impl Display for ExecutionCtx { @@ -359,28 +390,6 @@ impl Display for ExecutionCtx { } } -#[cfg(debug_assertions)] -impl Drop for ExecutionCtx { - fn drop(&mut self) { - if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) { - // Unlike itertools `.format()` (panics in 0.14 on second format) - struct FmtOps<'a>(&'a [String]); - impl Display for FmtOps<'_> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for (i, op) in self.0.iter().enumerate() { - if i > 0 { - f.write_str("\n")?; - } - f.write_str(op)?; - } - Ok(()) - } - } - tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops)); - } - } -} - /// Single-step execution: takes one step toward canonical form. /// /// Steps through reduce, reduce_parent, execute_parent, then execute. For `ExecuteSlot`, @@ -395,60 +404,68 @@ impl Drop for ExecutionCtx { /// `AppendChild` is returned. impl Executable for ArrayRef { fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult { + crate::trace_array!(record_single_step_start(&array)); + if let Some(canonical) = array.as_opt::() { - ctx.log(format_args!("-> canonical {}", array)); - return Ok(Canonical::from(canonical).into_array()); + let output = Canonical::from(canonical).into_array(); + crate::trace_array!(record_single_step_applied("canonical", &array, &output)); + return Ok(output); } + crate::trace_array!(record_single_step_phase_none("canonical", &array)); if let Some(reduced) = array.reduce()? { - ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced)); reduced.statistics().inherit_from(array.statistics()); + crate::trace_array!(record_single_step_applied("reduce", &array, &reduced)); return Ok(reduced); } + crate::trace_array!(record_single_step_phase_none("reduce", &array)); for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; if let Some(reduced_parent) = child.reduce_parent(&array, slot_idx)? { - ctx.log(format_args!( - "reduce_parent: slot[{}]({}) rewrote {} -> {}", - slot_idx, - child.encoding_id(), - array, - reduced_parent - )); reduced_parent.statistics().inherit_from(array.statistics()); + crate::trace_array!(record_single_step_applied( + "reduce_parent", + &array, + &reduced_parent, + )); return Ok(reduced_parent); } } + crate::trace_array!(record_single_step_phase_none("reduce_parent", &array)); let tmp_session = ctx.session().clone(); let kernels = tmp_session.get_opt::(); for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; - if let Some(executed_parent) = - execute_parent_for_child(&array, child, slot_idx, kernels.as_ref(), ctx)? - { - ctx.log(format_args!( - "execute_parent: slot[{}]({}) rewrote {} -> {}", - slot_idx, - child.encoding_id(), - array, - executed_parent - )); + if let Some(executed_parent) = execute_parent_for_child( + "single_step_execute_parent", + &array, + child, + slot_idx, + kernels.as_ref(), + ctx, + )? { executed_parent .statistics() .inherit_from(array.statistics()); + crate::trace_array!(record_single_step_applied( + "execute_parent", + &array, + &executed_parent, + )); return Ok(executed_parent); } } + crate::trace_array!(record_single_step_phase_none("execute_parent", &array)); + crate::trace_array!(record_execute_encoding(&array)); - ctx.log(format_args!("executing {}", array)); let result = array.execute_encoding(ctx)?; let (array, step) = result.into_parts(); match step { ExecutionStep::Done => { - ctx.log(format_args!("-> {}", array)); + crate::trace_array!(record_execute_done(&array)); Ok(array) } ExecutionStep::ExecuteSlot(i, _) => { @@ -458,9 +475,12 @@ impl Executable for ArrayRef { } ExecutionStep::AppendChild(_) => { // Single-step: build the entire parent via the builder path. + crate::trace_array!(record_builder_start(&array)); let builder = builder_with_capacity_in(ctx.allocator(), array.dtype(), array.len()); let mut builder = execute_into_builder(array, builder, ctx)?; - Ok(builder.finish()) + let output = builder.finish(); + crate::trace_array!(record_builder_finish(&output)); + Ok(output) } } } @@ -536,6 +556,7 @@ fn finalize_done( } fn execute_parent_for_child( + phase: &'static str, parent: &ArrayRef, child: &ArrayRef, slot_idx: usize, @@ -546,14 +567,33 @@ fn execute_parent_for_child( && let Some(plugins) = kernels.find_execute_parent(parent.encoding_id(), child.encoding_id()) { - for plugin in plugins.as_ref() { + for (plugin_idx, plugin) in plugins.as_ref().iter().enumerate() { + crate::trace_array_use!(plugin_idx); if let Some(result) = plugin(child, parent, slot_idx, ctx)? { + crate::trace_array!(record_execute_parent_applied( + phase, + parent, + child, + slot_idx, + crate::test_harness::trace::TraceSource::Session(plugin_idx), + "execute_parent_fn", + &result, + )); return Ok(Some(result)); } + crate::trace_array!(record_execute_parent_attempt( + phase, + parent, + child, + slot_idx, + crate::test_harness::trace::TraceSource::Session(plugin_idx), + "execute_parent_fn", + crate::test_harness::trace::AttemptOutcome::Declined, + )); } } - child.execute_parent(parent, slot_idx, ctx) + crate::trace_array_scope!(phase, || child.execute_parent(parent, slot_idx, ctx)) } /// Try execute_parent on each occupied slot of the array. @@ -563,16 +603,14 @@ fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult< for (slot_idx, slot) in array.slots().iter().enumerate() { let Some(child) = slot else { continue }; - if let Some(executed_parent) = - execute_parent_for_child(array, child, slot_idx, kernels.as_ref(), ctx)? - { - ctx.log(format_args!( - "execute_parent: slot[{}]({}) rewrote {} -> {}", - slot_idx, - child.encoding_id(), - array, - executed_parent - )); + if let Some(executed_parent) = execute_parent_for_child( + "child_execute_parent", + array, + child, + slot_idx, + kernels.as_ref(), + ctx, + )? { executed_parent .statistics() .inherit_from(array.statistics()); @@ -674,8 +712,10 @@ impl ExecutionResult { /// /// The provided array is the (possibly modified) parent that still needs its slot executed. pub fn execute_slot(array: impl IntoArray, slot_idx: usize) -> Self { + let array = array.into_array(); + crate::trace_array!(record_execute_step_request::(&array, slot_idx)); Self { - array: array.into_array(), + array, step: ExecutionStep::ExecuteSlot(slot_idx, M::matches), } } @@ -684,8 +724,10 @@ impl ExecutionResult { /// activation's canonical builder, and leave the returned parent as the next /// `current_array`. pub fn append_child(array: impl IntoArray, slot_idx: usize) -> Self { + let array = array.into_array(); + crate::trace_array!(record_append_child_request(&array, slot_idx)); Self { - array: array.into_array(), + array, step: ExecutionStep::AppendChild(slot_idx), } } diff --git a/vortex-array/src/kernel.rs b/vortex-array/src/kernel.rs index f5b75471437..7bd4e81600d 100644 --- a/vortex-array/src/kernel.rs +++ b/vortex-array/src/kernel.rs @@ -64,13 +64,41 @@ impl ParentKernelSet { child_idx: usize, ctx: &mut ExecutionCtx, ) -> VortexResult> { - for kernel in self.kernels.iter() { + for (kernel_idx, kernel) in self.kernels.iter().enumerate() { + crate::trace_array_use!(kernel_idx); if !kernel.matches(parent) { + crate::trace_array!(record_execute_parent_attempt( + crate::test_harness::trace::current_execute_parent_phase(), + parent, + child.array(), + child_idx, + crate::test_harness::trace::TraceSource::Static, + format!("kernel[{kernel_idx}]"), + crate::test_harness::trace::AttemptOutcome::NoMatch, + )); continue; } if let Some(reduced) = kernel.execute_parent(child, parent, child_idx, ctx)? { + crate::trace_array!(record_execute_parent_applied( + crate::test_harness::trace::current_execute_parent_phase(), + parent, + child.array(), + child_idx, + crate::test_harness::trace::TraceSource::Static, + format!("kernel[{kernel_idx}]"), + &reduced, + )); return Ok(Some(reduced)); } + crate::trace_array!(record_execute_parent_attempt( + crate::test_harness::trace::current_execute_parent_phase(), + parent, + child.array(), + child_idx, + crate::test_harness::trace::TraceSource::Static, + format!("kernel[{kernel_idx}]"), + crate::test_harness::trace::AttemptOutcome::Declined, + )); } Ok(None) } diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index 0a9c5969ecc..a8e497c5f20 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -33,6 +33,11 @@ pub mod accessor; pub mod aggregate_fn; #[doc(hidden)] pub mod aliases; +mod trace_macros; +pub(crate) use trace_macros::trace_array; +pub(crate) use trace_macros::trace_array_scope; +pub(crate) use trace_macros::trace_array_use; +pub(crate) use trace_macros::trace_array_value; mod array; pub mod arrays; pub mod arrow; diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index d6e93ca0561..3f07d5a0460 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -71,22 +71,24 @@ fn try_optimize( let mut any_optimizations = false; let array_ref = session.and_then(|s| s.get_opt::()); + crate::trace_array!(record_optimize_start(array, session.is_some())); + // Apply reduction rules to the current array until no more rules apply. - let mut loop_counter = 0; - 'outer: loop { - if loop_counter > 100 { - vortex_bail!("Exceeded maximum optimization iterations (possible infinite loop)"); - } - loop_counter += 1; + for _ in 0..=100 { + crate::trace_array!(record_optimize_loop_start(¤t_array)); if let Some(new_array) = current_array.reduce()? { current_array = new_array; any_optimizations = true; + crate::trace_array!(record_optimize_loop_end()); continue; } + crate::trace_array!(record_optimize_reduce_none(¤t_array)); + // Apply parent reduction rules to each slot in the context of the current array. // Its important to take all slots here, as `current_array` can change inside the loop. + let mut parent_reduced = None; for (slot_idx, slot) in current_array.slots().iter().enumerate() { let Some(child) = slot else { continue }; @@ -95,34 +97,62 @@ fn try_optimize( && let Some(plugins) = array_ref.find_reduce_parent(current_array.encoding_id(), child.encoding_id()) { - for plugin in plugins.as_ref() { + for (plugin_idx, plugin) in plugins.as_ref().iter().enumerate() { + crate::trace_array_use!(plugin_idx); if let Some(new_array) = plugin(child, ¤t_array, slot_idx)? { - current_array = new_array; - any_optimizations = true; - continue 'outer; + crate::trace_array!(record_parent_reduce_applied( + ¤t_array, + child, + slot_idx, + crate::test_harness::trace::TraceSource::Session(plugin_idx), + "reduce_parent_fn", + &new_array, + )); + parent_reduced = Some(new_array); + break; } + crate::trace_array!(record_parent_reduce_attempt( + ¤t_array, + child, + slot_idx, + crate::test_harness::trace::TraceSource::Session(plugin_idx), + "reduce_parent_fn", + crate::test_harness::trace::AttemptOutcome::Declined, + )); + } + if parent_reduced.is_some() { + break; } } if let Some(new_array) = child.reduce_parent(¤t_array, slot_idx)? { - // If the parent was replaced, then we attempt to reduce it again. - current_array = new_array; - any_optimizations = true; - - // Continue to the start of the outer loop - continue 'outer; + parent_reduced = Some(new_array); + break; } } + if let Some(new_array) = parent_reduced { + // If the parent was replaced, then we attempt to reduce it again. + current_array = new_array; + any_optimizations = true; + crate::trace_array!(record_optimize_loop_end()); + continue; + } + + crate::trace_array!(record_optimize_parent_reduce_none(¤t_array)); + crate::trace_array!(record_optimize_loop_end()); + // No more optimizations can be applied - break; - } + crate::trace_array!(record_optimize_done(¤t_array, any_optimizations)); - if any_optimizations { - Ok(Some(current_array)) - } else { - Ok(None) + if any_optimizations { + return Ok(Some(current_array)); + } else { + return Ok(None); + } } + + vortex_bail!("Exceeded maximum optimization iterations (possible infinite loop)"); } fn try_optimize_recursive( @@ -132,6 +162,8 @@ fn try_optimize_recursive( let mut current_array = array.clone(); let mut any_optimizations = false; + crate::trace_array!(record_optimize_recursive_start(array)); + if let Some(new_array) = try_optimize(¤t_array, Some(session))? { current_array = new_array; any_optimizations = true; @@ -143,6 +175,11 @@ fn try_optimize_recursive( match slot { Some(child) => { if let Some(new_child) = try_optimize_recursive(child, session)? { + crate::trace_array!(record_optimize_recursive_slot( + new_slots.len(), + child, + &new_child, + )); new_slots.push(Some(new_child)); any_slot_optimized = true; } else { diff --git a/vortex-array/src/optimizer/rules.rs b/vortex-array/src/optimizer/rules.rs index e505b21a199..5ccd48e10c3 100644 --- a/vortex-array/src/optimizer/rules.rs +++ b/vortex-array/src/optimizer/rules.rs @@ -133,8 +133,14 @@ impl ReduceRuleSet { pub fn evaluate(&self, array: ArrayView<'_, V>) -> VortexResult> { for rule in self.rules.iter() { if let Some(reduced) = rule.reduce(array)? { + crate::trace_array!(record_reduce_applied(array.array(), *rule, &reduced)); return Ok(Some(reduced)); } + crate::trace_array!(record_reduce_attempt( + array.array(), + *rule, + crate::test_harness::trace::AttemptOutcome::Declined, + )); } Ok(None) } @@ -176,6 +182,14 @@ impl ParentRuleSet { ) -> VortexResult> { for rule in self.rules.iter() { if !rule.matches(parent) { + crate::trace_array!(record_parent_reduce_attempt( + parent, + child.array(), + child_idx, + crate::test_harness::trace::TraceSource::Static, + crate::test_harness::trace::compact_label(*rule), + crate::test_harness::trace::AttemptOutcome::NoMatch, + )); continue; } if let Some(reduced) = rule.reduce_parent(child, parent, child_idx)? { @@ -198,8 +212,24 @@ impl ParentRuleSet { ); } + crate::trace_array!(record_parent_reduce_applied( + parent, + child.array(), + child_idx, + crate::test_harness::trace::TraceSource::Static, + crate::test_harness::trace::compact_label(*rule), + &reduced, + )); return Ok(Some(reduced)); } + crate::trace_array!(record_parent_reduce_attempt( + parent, + child.array(), + child_idx, + crate::test_harness::trace::TraceSource::Static, + crate::test_harness::trace::compact_label(*rule), + crate::test_harness::trace::AttemptOutcome::Declined, + )); } Ok(None) } diff --git a/vortex-array/src/test_harness.rs b/vortex-array/src/test_harness.rs index dabeb8bdde4..2783c50d3f8 100644 --- a/vortex-array/src/test_harness.rs +++ b/vortex-array/src/test_harness.rs @@ -13,6 +13,9 @@ use crate::VortexSessionExecute; use crate::arrays::BoolArray; use crate::arrays::bool::BoolArrayExt; +pub mod trace; +pub mod trace_arrays; + /// Check that a named metadata matches its previous versioning. /// /// Goldenfile takes care of checking for equality against a checked-in file. diff --git a/vortex-array/src/test_harness/trace.rs b/vortex-array/src/test_harness/trace.rs new file mode 100644 index 00000000000..45b5250e694 --- /dev/null +++ b/vortex-array/src/test_harness/trace.rs @@ -0,0 +1,1319 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::cell::Cell; +use std::cell::RefCell; +use std::fmt; +use std::fmt::Debug; +use std::fmt::Display; + +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_err; + +use crate::ArrayRef; + +/// Controls how much rule and kernel resolution detail is captured. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub enum TraceResolution { + /// Record only the operations that actually executed. + #[default] + ExecutedOnly, + /// Also record rule and kernel attempts that matched but declined, or did not match. + Attempts, +} + +/// Options for [`trace_array_with`]. +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +pub struct TraceOptions { + /// The amount of rule and kernel resolution detail to include. + pub resolution: TraceResolution, +} + +/// The result of a traced operation. +#[derive(Clone, Debug)] +pub struct Traced { + /// The value returned by the traced closure. + pub output: T, + /// A stable, snapshot-friendly rendering of optimizer and execution activity. + pub trace: TraceDisplay, +} + +/// A stable, snapshot-friendly trace. +#[derive(Clone, Debug, Default)] +pub struct TraceDisplay { + options: TraceOptions, + events: Vec, +} + +impl Display for TraceDisplay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let hidden_events = self.hidden_events(); + let mut optimize_depth = 0usize; + let mut wrote_event = false; + + for (idx, event) in self.events.iter().enumerate() { + if hidden_events[idx] { + continue; + } + + if event.closes_before(self.options.resolution) { + optimize_depth = optimize_depth.saturating_sub(1); + } + + if event.is_hidden(self.options.resolution) { + continue; + } + + if wrote_event { + writeln!(f)?; + } else { + wrote_event = true; + } + + write_indent( + f, + optimize_depth + event.relative_indent(self.options.resolution, optimize_depth > 0), + )?; + event.fmt_line(f, self.options.resolution)?; + + if event.opens_after(self.options.resolution) { + optimize_depth += 1; + } + if event.closes_after(self.options.resolution) { + optimize_depth = optimize_depth.saturating_sub(1); + } + } + Ok(()) + } +} + +impl TraceDisplay { + fn hidden_events(&self) -> Vec { + let mut hidden = vec![false; self.events.len()]; + if self.options.resolution != TraceResolution::ExecutedOnly { + return hidden; + } + + let mut optimize_stack = Vec::new(); + for (idx, event) in self.events.iter().enumerate() { + match event { + TraceEvent::OptimizeStart { .. } => optimize_stack.push(idx), + TraceEvent::OptimizeDone { changed, .. } => { + let Some(start) = optimize_stack.pop() else { + continue; + }; + if !changed { + hidden[start..=idx].fill(true); + } + } + _ => {} + } + } + hidden + } +} + +fn write_indent(f: &mut fmt::Formatter<'_>, depth: usize) -> fmt::Result { + for _ in 0..depth { + f.write_str(" ")?; + } + Ok(()) +} + +/// Run `f` while capturing default trace output. +/// +/// The default resolution records the rule rewrites, parent kernels, execution steps, and builder +/// activity that actually executed. Use [`trace_array_with`] and [`TraceResolution::Attempts`] +/// when a test needs to assert on every declined rule or kernel attempt. +pub fn trace_array(f: impl FnOnce() -> VortexResult) -> VortexResult> { + trace_array_with(TraceOptions::default(), f) +} + +/// Run `f` while capturing trace output using `options`. +/// +/// Trace capture is thread-local and intentionally does not propagate to worker threads. Nested +/// trace captures return an error so tests do not accidentally merge unrelated traces. +pub fn trace_array_with( + options: TraceOptions, + f: impl FnOnce() -> VortexResult, +) -> VortexResult> { + ACTIVE_TRACE.with(|active| { + let mut active = active.borrow_mut(); + if active.is_some() { + return Err(vortex_err!("trace_array captures cannot be nested")); + } + *active = Some(TraceRecorder::new(options)); + Ok(()) + })?; + TRACE_INTEREST.with(|interest| interest.set(TraceInterest::from(options.resolution))); + + let guard = ActiveTraceGuard; + let output = f(); + let recorder = ACTIVE_TRACE.with(|active| { + active + .borrow_mut() + .take() + .vortex_expect("trace recorder must be installed") + }); + drop(guard); + + output.map(|output| Traced { + output, + trace: recorder.finish(), + }) +} + +/// Returns true when the current thread has an active trace recorder. +#[inline] +pub(crate) fn is_active() -> bool { + TRACE_INTEREST.with(|interest| interest.get().is_active()) +} + +#[inline] +pub(crate) fn if_active(enabled: impl FnOnce() -> R, disabled: impl FnOnce() -> R) -> R { + if is_active() { enabled() } else { disabled() } +} + +#[inline] +fn attempts_enabled() -> bool { + TRACE_INTEREST.with(|interest| interest.get() == TraceInterest::Attempts) +} + +#[derive(Clone, Copy, Debug)] +pub(crate) enum TraceSource { + Static, + Session(usize), +} + +impl Display for TraceSource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TraceSource::Static => f.write_str("static"), + TraceSource::Session(idx) => write!(f, "session[{idx}]"), + } + } +} + +#[derive(Clone, Copy, Debug)] +pub(crate) enum AttemptOutcome { + Declined, + NoMatch, +} + +impl Display for AttemptOutcome { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + AttemptOutcome::Declined => f.write_str("declined"), + AttemptOutcome::NoMatch => f.write_str("no-match"), + } + } +} + +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] +enum TraceInterest { + #[default] + Off, + ExecutedOnly, + Attempts, +} + +impl TraceInterest { + #[inline] + fn is_active(self) -> bool { + self != Self::Off + } +} + +impl From for TraceInterest { + fn from(resolution: TraceResolution) -> Self { + match resolution { + TraceResolution::ExecutedOnly => Self::ExecutedOnly, + TraceResolution::Attempts => Self::Attempts, + } + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub(crate) struct ArraySummary { + encoding: String, + len: usize, + dtype: String, +} + +impl ArraySummary { + pub(crate) fn new(array: &ArrayRef) -> Self { + Self { + encoding: array.encoding_id().to_string(), + len: array.len(), + dtype: array.dtype().to_string(), + } + } +} + +impl Display for ArraySummary { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} len={} dtype={}", self.encoding, self.len, self.dtype) + } +} + +pub(crate) fn record_optimize_start(root: &ArrayRef, session: bool) { + record(TraceEvent::OptimizeStart { + root: ArraySummary::new(root), + session, + }); +} + +pub(crate) fn record_optimize_loop_start(array: &ArrayRef) { + if !attempts_enabled() { + return; + } + record(TraceEvent::OptimizeLoopStart { + array: ArraySummary::new(array), + }); +} + +pub(crate) fn record_optimize_loop_end() { + if !attempts_enabled() { + return; + } + record(TraceEvent::OptimizeLoopEnd); +} + +pub(crate) fn record_optimize_reduce_none(array: &ArrayRef) { + if !attempts_enabled() { + return; + } + record(TraceEvent::PhaseNone { + indent: 0, + phase: "reduce", + subject: "array", + array: ArraySummary::new(array), + }); +} + +pub(crate) fn record_optimize_parent_reduce_none(array: &ArrayRef) { + if !attempts_enabled() { + return; + } + record(TraceEvent::PhaseNone { + indent: 0, + phase: "reduce_parent", + subject: "array", + array: ArraySummary::new(array), + }); +} + +pub(crate) fn record_optimize_done(output: &ArrayRef, changed: bool) { + record(TraceEvent::OptimizeDone { + output: ArraySummary::new(output), + changed, + }); +} + +pub(crate) fn record_optimize_recursive_start(root: &ArrayRef) { + record(TraceEvent::OptimizeRecursiveStart { + root: ArraySummary::new(root), + }); +} + +pub(crate) fn record_optimize_recursive_slot(slot_idx: usize, input: &ArrayRef, output: &ArrayRef) { + record(TraceEvent::OptimizeRecursiveSlot { + slot_idx, + input: ArraySummary::new(input), + output: ArraySummary::new(output), + }); +} + +pub(crate) fn record_reduce_attempt(array: &ArrayRef, rule: &dyn Debug, outcome: AttemptOutcome) { + if !attempts_enabled() { + return; + } + record_attempt(TraceEvent::ReduceAttempt { + array: ArraySummary::new(array), + rule: compact_label(rule), + outcome, + }); +} + +pub(crate) fn record_reduce_applied(array: &ArrayRef, rule: &dyn Debug, output: &ArrayRef) { + record(TraceEvent::ReduceApplied { + array: ArraySummary::new(array), + rule: compact_label(rule), + output: ArraySummary::new(output), + }); +} + +pub(crate) fn record_parent_reduce_attempt( + parent: &ArrayRef, + child: &ArrayRef, + slot_idx: usize, + source: TraceSource, + rule: impl Into, + outcome: AttemptOutcome, +) { + if !attempts_enabled() { + return; + } + record_attempt(TraceEvent::ParentReduceAttempt { + parent: ArraySummary::new(parent), + child: ArraySummary::new(child), + slot_idx, + source, + rule: rule.into(), + outcome, + }); +} + +pub(crate) fn record_parent_reduce_applied( + parent: &ArrayRef, + child: &ArrayRef, + slot_idx: usize, + source: TraceSource, + rule: impl Into, + output: &ArrayRef, +) { + record(TraceEvent::ParentReduceApplied { + parent: ArraySummary::new(parent), + child: ArraySummary::new(child), + slot_idx, + source, + rule: rule.into(), + output: ArraySummary::new(output), + }); +} + +pub(crate) fn record_execute_until_start(root: &ArrayRef) { + record(TraceEvent::ExecuteUntilStart { + target: short_type_name::(), + root: ArraySummary::new(root), + }); +} + +pub(crate) fn record_execute_until_iteration( + iteration: usize, + current: &ArrayRef, + stack_parent: Option<(&ArrayRef, usize)>, + builder_active: bool, +) { + record(TraceEvent::ExecuteUntilIteration { + iteration, + current: ArraySummary::new(current), + stack_parent: stack_parent.map(|(array, slot_idx)| (ArraySummary::new(array), slot_idx)), + builder_active, + }); +} + +pub(crate) fn record_execute_until_done_check(target: bool, canonical: bool) { + if !attempts_enabled() { + return; + } + record(TraceEvent::ExecuteUntilDoneCheck { target, canonical }); +} + +pub(crate) fn record_execute_until_return(output: &ArrayRef) { + record(TraceEvent::ExecuteUntilReturn { + output: ArraySummary::new(output), + }); +} + +pub(crate) fn record_execute_until_pop_frame( + parent: &ArrayRef, + slot_idx: usize, + child: &ArrayRef, + output: &ArrayRef, +) { + record(TraceEvent::ExecuteUntilPopFrame { + parent: ArraySummary::new(parent), + slot_idx, + child: ArraySummary::new(child), + output: ArraySummary::new(output), + }); +} + +pub(crate) fn record_execute_parent_attempt( + phase: &'static str, + parent: &ArrayRef, + child: &ArrayRef, + slot_idx: usize, + source: TraceSource, + kernel: impl Into, + outcome: AttemptOutcome, +) { + if !attempts_enabled() { + return; + } + record_attempt(TraceEvent::ExecuteParentAttempt { + phase, + parent: ArraySummary::new(parent), + child: ArraySummary::new(child), + slot_idx, + source, + kernel: kernel.into(), + outcome, + }); +} + +pub(crate) fn record_execute_parent_applied( + phase: &'static str, + parent: &ArrayRef, + child: &ArrayRef, + slot_idx: usize, + source: TraceSource, + kernel: impl Into, + output: &ArrayRef, +) { + record(TraceEvent::ExecuteParentApplied { + phase, + parent: ArraySummary::new(parent), + child: ArraySummary::new(child), + slot_idx, + source, + kernel: kernel.into(), + output: ArraySummary::new(output), + }); +} + +pub(crate) fn record_execute_parent_none(phase: &'static str, current: &ArrayRef) { + if !attempts_enabled() { + return; + } + record(TraceEvent::PhaseNone { + indent: 2, + phase, + subject: "current", + array: ArraySummary::new(current), + }); +} + +pub(crate) fn record_execute_optimized(input: &ArrayRef, output: &ArrayRef) { + let changed = !ArrayRef::ptr_eq(input, output); + if !changed && !attempts_enabled() { + return; + } + record(TraceEvent::ExecuteOptimized { + input: ArraySummary::new(input), + output: ArraySummary::new(output), + changed, + }); +} + +pub(crate) fn record_execute_encoding(array: &ArrayRef) { + if !attempts_enabled() { + return; + } + record(TraceEvent::ExecuteEncoding { + array: ArraySummary::new(array), + }); +} + +pub(crate) fn record_execute_step_request(array: &ArrayRef, slot_idx: usize) { + if !attempts_enabled() { + return; + } + record(TraceEvent::ExecutionRequest { + step: "ExecuteSlot", + parent: ArraySummary::new(array), + slot_idx, + target: Some(short_type_name::()), + }); +} + +pub(crate) fn record_append_child_request(array: &ArrayRef, slot_idx: usize) { + if !attempts_enabled() { + return; + } + record(TraceEvent::ExecutionRequest { + step: "AppendChild", + parent: ArraySummary::new(array), + slot_idx, + target: None, + }); +} + +pub(crate) fn record_execute_slot(slot_idx: usize, parent: &ArrayRef, child: &ArrayRef) { + record(TraceEvent::SlotTransition { + step: "ExecuteSlot", + slot_idx, + parent: ArraySummary::new(parent), + child: ArraySummary::new(child), + }); +} + +pub(crate) fn record_builder_start(array: &ArrayRef) { + record(TraceEvent::BuilderEvent { + action: "start", + subject: "array", + array: ArraySummary::new(array), + }); +} + +pub(crate) fn record_append_child(slot_idx: usize, parent: &ArrayRef, child: &ArrayRef) { + record(TraceEvent::SlotTransition { + step: "AppendChild", + slot_idx, + parent: ArraySummary::new(parent), + child: ArraySummary::new(child), + }); +} + +pub(crate) fn record_builder_append(child: &ArrayRef) { + record(TraceEvent::BuilderEvent { + action: "append", + subject: "child", + array: ArraySummary::new(child), + }); +} + +pub(crate) fn record_execute_done(array: &ArrayRef) { + record(TraceEvent::ExecuteDone { + array: ArraySummary::new(array), + }); +} + +pub(crate) fn record_builder_finish(output: &ArrayRef) { + record(TraceEvent::BuilderEvent { + action: "finish", + subject: "output", + array: ArraySummary::new(output), + }); +} + +pub(crate) fn record_single_step_start(array: &ArrayRef) { + record(TraceEvent::SingleStepStart { + array: ArraySummary::new(array), + }); +} + +pub(crate) fn record_single_step_phase_none(phase: &'static str, array: &ArrayRef) { + if !attempts_enabled() { + return; + } + record(TraceEvent::PhaseNone { + indent: 1, + phase, + subject: "array", + array: ArraySummary::new(array), + }); +} + +pub(crate) fn record_single_step_applied(phase: &'static str, input: &ArrayRef, output: &ArrayRef) { + record(TraceEvent::SingleStepApplied { + phase, + input: ArraySummary::new(input), + output: ArraySummary::new(output), + }); +} + +pub(crate) fn with_execute_parent_phase(phase: &'static str, f: impl FnOnce() -> R) -> R { + EXECUTE_PARENT_PHASE.with(|active| { + let previous = active.replace(phase); + let result = f(); + active.set(previous); + result + }) +} + +pub(crate) fn with_execute_parent_phase_if_active( + phase: &'static str, + f: impl FnOnce() -> R, +) -> R { + if is_active() { + with_execute_parent_phase(phase, f) + } else { + f() + } +} + +pub(crate) fn current_execute_parent_phase() -> &'static str { + EXECUTE_PARENT_PHASE.with(Cell::get) +} + +fn record(event: TraceEvent) { + ACTIVE_TRACE.with(|active| { + if let Some(recorder) = active.borrow_mut().as_mut() { + recorder.events.push(event); + } + }); +} + +fn record_attempt(event: TraceEvent) { + ACTIVE_TRACE.with(|active| { + if let Some(recorder) = active.borrow_mut().as_mut() + && recorder.options.resolution == TraceResolution::Attempts + { + recorder.events.push(event); + } + }); +} + +pub(crate) fn compact_label(value: &dyn Debug) -> String { + let label = format!("{value:?}"); + if let Some(label) = adapter_field(&label, "rule") { + return label.to_string(); + } + if let Some(label) = adapter_field(&label, "kernel") { + return label.to_string(); + } + label +} + +fn adapter_field<'a>(label: &'a str, field: &str) -> Option<&'a str> { + let marker = format!("{field}: "); + let start = label.find(&marker)? + marker.len(); + let rest = &label[start..]; + let end = rest.rfind(" }")?; + Some(&rest[..end]) +} + +fn short_type_name() -> String { + std::any::type_name::() + .rsplit("::") + .next() + .vortex_expect("type names are never empty") + .to_string() +} + +thread_local! { + static TRACE_INTEREST: Cell = const { Cell::new(TraceInterest::Off) }; + static ACTIVE_TRACE: RefCell> = const { RefCell::new(None) }; + static EXECUTE_PARENT_PHASE: Cell<&'static str> = const { Cell::new("execute_parent") }; +} + +struct ActiveTraceGuard; + +impl Drop for ActiveTraceGuard { + fn drop(&mut self) { + TRACE_INTEREST.with(|interest| interest.set(TraceInterest::Off)); + ACTIVE_TRACE.with(|active| { + active.borrow_mut().take(); + }); + } +} + +#[derive(Debug)] +struct TraceRecorder { + options: TraceOptions, + events: Vec, +} + +impl TraceRecorder { + fn new(options: TraceOptions) -> Self { + Self { + options, + events: Vec::new(), + } + } + + fn finish(self) -> TraceDisplay { + TraceDisplay { + options: self.options, + events: self.events, + } + } +} + +#[derive(Clone, Debug)] +enum TraceEvent { + OptimizeStart { + root: ArraySummary, + session: bool, + }, + OptimizeLoopStart { + array: ArraySummary, + }, + OptimizeLoopEnd, + OptimizeDone { + output: ArraySummary, + changed: bool, + }, + OptimizeRecursiveStart { + root: ArraySummary, + }, + OptimizeRecursiveSlot { + slot_idx: usize, + input: ArraySummary, + output: ArraySummary, + }, + ReduceAttempt { + array: ArraySummary, + rule: String, + outcome: AttemptOutcome, + }, + ReduceApplied { + array: ArraySummary, + rule: String, + output: ArraySummary, + }, + ParentReduceAttempt { + parent: ArraySummary, + child: ArraySummary, + slot_idx: usize, + source: TraceSource, + rule: String, + outcome: AttemptOutcome, + }, + ParentReduceApplied { + parent: ArraySummary, + child: ArraySummary, + slot_idx: usize, + source: TraceSource, + rule: String, + output: ArraySummary, + }, + ExecuteUntilStart { + target: String, + root: ArraySummary, + }, + ExecuteUntilIteration { + iteration: usize, + current: ArraySummary, + stack_parent: Option<(ArraySummary, usize)>, + builder_active: bool, + }, + ExecuteUntilDoneCheck { + target: bool, + canonical: bool, + }, + ExecuteUntilReturn { + output: ArraySummary, + }, + ExecuteUntilPopFrame { + parent: ArraySummary, + slot_idx: usize, + child: ArraySummary, + output: ArraySummary, + }, + ExecuteParentAttempt { + phase: &'static str, + parent: ArraySummary, + child: ArraySummary, + slot_idx: usize, + source: TraceSource, + kernel: String, + outcome: AttemptOutcome, + }, + ExecuteParentApplied { + phase: &'static str, + parent: ArraySummary, + child: ArraySummary, + slot_idx: usize, + source: TraceSource, + kernel: String, + output: ArraySummary, + }, + PhaseNone { + indent: usize, + phase: &'static str, + subject: &'static str, + array: ArraySummary, + }, + ExecuteOptimized { + input: ArraySummary, + output: ArraySummary, + changed: bool, + }, + ExecuteEncoding { + array: ArraySummary, + }, + ExecutionRequest { + step: &'static str, + parent: ArraySummary, + slot_idx: usize, + target: Option, + }, + SlotTransition { + step: &'static str, + slot_idx: usize, + parent: ArraySummary, + child: ArraySummary, + }, + BuilderEvent { + action: &'static str, + subject: &'static str, + array: ArraySummary, + }, + ExecuteDone { + array: ArraySummary, + }, + SingleStepStart { + array: ArraySummary, + }, + SingleStepApplied { + phase: &'static str, + input: ArraySummary, + output: ArraySummary, + }, +} + +impl TraceEvent { + fn is_hidden(&self, resolution: TraceResolution) -> bool { + match resolution { + TraceResolution::Attempts => matches!(self, TraceEvent::OptimizeLoopEnd), + TraceResolution::ExecutedOnly => matches!( + self, + TraceEvent::OptimizeLoopStart { .. } + | TraceEvent::OptimizeLoopEnd + | TraceEvent::PhaseNone { .. } + | TraceEvent::ExecuteUntilDoneCheck { .. } + | TraceEvent::ExecuteEncoding { .. } + | TraceEvent::ExecutionRequest { .. } + | TraceEvent::ExecuteOptimized { changed: false, .. } + | TraceEvent::ExecuteParentAttempt { .. } + | TraceEvent::ReduceAttempt { .. } + | TraceEvent::ParentReduceAttempt { .. } + ), + } + } + + fn opens_after(&self, resolution: TraceResolution) -> bool { + match resolution { + TraceResolution::Attempts => matches!( + self, + TraceEvent::OptimizeStart { .. } | TraceEvent::OptimizeLoopStart { .. } + ), + TraceResolution::ExecutedOnly => matches!(self, TraceEvent::OptimizeStart { .. }), + } + } + + fn closes_before(&self, resolution: TraceResolution) -> bool { + match resolution { + TraceResolution::Attempts => matches!(self, TraceEvent::OptimizeLoopEnd), + TraceResolution::ExecutedOnly => false, + } + } + + fn closes_after(&self, _resolution: TraceResolution) -> bool { + matches!(self, TraceEvent::OptimizeDone { .. }) + } + + fn relative_indent(&self, _resolution: TraceResolution, in_optimize_scope: bool) -> usize { + match self { + TraceEvent::OptimizeStart { .. } + | TraceEvent::OptimizeLoopStart { .. } + | TraceEvent::OptimizeDone { .. } => 0, + TraceEvent::ReduceAttempt { .. } + | TraceEvent::ReduceApplied { .. } + | TraceEvent::ParentReduceAttempt { .. } + | TraceEvent::ParentReduceApplied { .. } + if in_optimize_scope => + { + 0 + } + TraceEvent::PhaseNone { indent, .. } => *indent, + TraceEvent::ReduceAttempt { .. } + | TraceEvent::ReduceApplied { .. } + | TraceEvent::ParentReduceAttempt { .. } + | TraceEvent::ParentReduceApplied { .. } + | TraceEvent::ExecuteUntilDoneCheck { .. } + | TraceEvent::ExecuteUntilPopFrame { .. } + | TraceEvent::ExecuteParentAttempt { .. } + | TraceEvent::ExecuteParentApplied { .. } + | TraceEvent::ExecuteOptimized { .. } + | TraceEvent::ExecuteEncoding { .. } + | TraceEvent::ExecutionRequest { .. } + | TraceEvent::SlotTransition { .. } + | TraceEvent::BuilderEvent { .. } + | TraceEvent::ExecuteDone { .. } => 2, + TraceEvent::OptimizeRecursiveSlot { .. } + | TraceEvent::ExecuteUntilIteration { .. } + | TraceEvent::ExecuteUntilReturn { .. } + | TraceEvent::SingleStepApplied { .. } => 1, + TraceEvent::OptimizeLoopEnd + | TraceEvent::OptimizeRecursiveStart { .. } + | TraceEvent::ExecuteUntilStart { .. } + | TraceEvent::SingleStepStart { .. } => 0, + } + } + + fn fmt_line(&self, f: &mut fmt::Formatter<'_>, resolution: TraceResolution) -> fmt::Result { + match self { + TraceEvent::OptimizeStart { root, session } => { + write!(f, "optimize root={root} session={session}") + } + TraceEvent::OptimizeLoopStart { array } => { + write!(f, "loop input={array}") + } + TraceEvent::OptimizeLoopEnd => Ok(()), + TraceEvent::OptimizeDone { output, changed } => match resolution { + TraceResolution::Attempts => write!(f, "done output={output} changed={changed}"), + TraceResolution::ExecutedOnly => write!(f, "done output={output}"), + }, + TraceEvent::OptimizeRecursiveStart { root } => { + write!(f, "optimize_recursive root={root}") + } + TraceEvent::OptimizeRecursiveSlot { + slot_idx, + input, + output, + } => write!(f, "recursive slot={slot_idx} input={input} output={output}"), + TraceEvent::ReduceAttempt { + array, + rule, + outcome, + } => write!( + f, + "reduce attempt array={array} source=static rule={rule} outcome={outcome}" + ), + TraceEvent::ReduceApplied { + array, + rule, + output, + } => match resolution { + TraceResolution::Attempts => write!( + f, + "reduce applied array={array} source=static rule={rule} output={output}" + ), + TraceResolution::ExecutedOnly => { + write!(f, "reduce {rule}: {array} -> {output}") + } + }, + TraceEvent::ParentReduceAttempt { + parent, + child, + slot_idx, + source, + rule, + outcome, + } => write!( + f, + "reduce_parent attempt slot={slot_idx} parent={parent} child={child} source={source} rule={rule} outcome={outcome}" + ), + TraceEvent::ParentReduceApplied { + parent, + child, + slot_idx, + source, + rule, + output, + } => match resolution { + TraceResolution::Attempts => write!( + f, + "reduce_parent applied slot={slot_idx} parent={parent} child={child} source={source} rule={rule} output={output}" + ), + TraceResolution::ExecutedOnly => write!( + f, + "reduce_parent {source}:{rule} slot={slot_idx} parent={parent} child={child} -> {output}" + ), + }, + TraceEvent::ExecuteUntilStart { target, root } => { + write!(f, "execute_until target={target} root={root}") + } + TraceEvent::ExecuteUntilIteration { + iteration, + current, + stack_parent, + builder_active, + } => { + write!(f, "iter {iteration} current={current}")?; + if let Some((parent, slot_idx)) = stack_parent { + write!(f, " stack_parent={parent} slot={slot_idx}")?; + } + write!(f, " builder_active={builder_active}") + } + TraceEvent::ExecuteUntilDoneCheck { target, canonical } => { + write!(f, "done_check target={target} canonical={canonical}") + } + TraceEvent::ExecuteUntilReturn { output } => { + write!(f, "return output={output}") + } + TraceEvent::ExecuteUntilPopFrame { + parent, + slot_idx, + child, + output, + } => write!( + f, + "pop_frame slot={slot_idx} parent={parent} child={child} output={output}" + ), + TraceEvent::ExecuteParentAttempt { + phase, + parent, + child, + slot_idx, + source, + kernel, + outcome, + } => write!( + f, + "{phase} attempt slot={slot_idx} parent={parent} child={child} source={source} kernel={kernel} outcome={outcome}" + ), + TraceEvent::ExecuteParentApplied { + phase, + parent, + child, + slot_idx, + source, + kernel, + output, + } => match resolution { + TraceResolution::Attempts => write!( + f, + "{phase} applied slot={slot_idx} parent={parent} child={child} source={source} kernel={kernel} output={output}" + ), + TraceResolution::ExecutedOnly => write!( + f, + "{phase} {source}:{kernel} slot={slot_idx} parent={parent} child={child} -> {output}" + ), + }, + TraceEvent::PhaseNone { + phase, + subject, + array, + .. + } => { + write!(f, "{phase} none {subject}={array}") + } + TraceEvent::ExecuteOptimized { + input, + output, + changed, + } => match resolution { + TraceResolution::Attempts => write!( + f, + "optimize_ctx input={input} output={output} changed={changed}" + ), + TraceResolution::ExecutedOnly => write!(f, "optimize_ctx {input} -> {output}"), + }, + TraceEvent::ExecuteEncoding { array } => { + write!(f, "execute encoding={array}") + } + TraceEvent::ExecutionRequest { + step, + parent, + slot_idx, + target, + } => { + write!(f, "request {step} slot={slot_idx}")?; + if let Some(target) = target { + write!(f, " target={target}")?; + } + write!(f, " parent={parent}") + } + TraceEvent::SlotTransition { + step, + slot_idx, + parent, + child, + } => write!(f, "{step} slot={slot_idx} parent={parent} child={child}"), + TraceEvent::ExecuteDone { array } => { + write!(f, "Done array={array}") + } + TraceEvent::BuilderEvent { + action, + subject, + array, + } => { + write!(f, "builder {action} {subject}={array}") + } + TraceEvent::SingleStepStart { array } => { + write!(f, "execute_step input={array}") + } + TraceEvent::SingleStepApplied { + phase, + input, + output, + } => match resolution { + TraceResolution::Attempts => { + write!(f, "{phase} applied input={input} output={output}") + } + TraceResolution::ExecutedOnly => write!(f, "{phase} {input} -> {output}"), + }, + } + } +} + +#[cfg(test)] +mod tests { + use vortex_error::VortexResult; + use vortex_mask::Mask; + use vortex_session::VortexSession; + + use crate::Canonical; + use crate::ExecutionCtx; + use crate::IntoArray; + use crate::arrays::ChunkedArray; + use crate::arrays::Filter; + use crate::arrays::FilterArray; + use crate::arrays::Primitive; + use crate::arrays::PrimitiveArray; + use crate::arrays::filter::FilterArrayExt; + use crate::assert_arrays_eq; + use crate::optimizer::ArrayOptimizer; + use crate::session::ArraySession; + use crate::test_harness::trace::TraceOptions; + use crate::test_harness::trace::TraceResolution; + use crate::test_harness::trace::trace_array; + use crate::test_harness::trace::trace_array_with; + use crate::test_harness::trace_arrays::stack_parent_fixture; + + #[test] + fn trace_optimize_reduce_fixpoint() -> VortexResult<()> { + let values = PrimitiveArray::from_iter([0i32, 1, 2, 3]).into_array(); + let filter = + FilterArray::try_new(values.clone(), Mask::new_true(values.len()))?.into_array(); + + let traced = trace_array(|| filter.optimize())?; + + assert!(traced.output.is::()); + assert_arrays_eq!(traced.output, values); + insta::assert_snapshot!(traced.trace.to_string(), @r" +optimize root=vortex.filter len=4 dtype=i32 session=false + reduce TrivialFilterRule: vortex.filter len=4 dtype=i32 -> vortex.primitive len=4 dtype=i32 + done output=vortex.primitive len=4 dtype=i32 +"); + + Ok(()) + } + + #[test] + fn trace_optimize_parent_reduce_fixpoint_attempts() -> VortexResult<()> { + let values = PrimitiveArray::from_iter([0i32, 1, 2, 3, 4, 5]).into_array(); + let inner = FilterArray::try_new( + values, + Mask::from_iter([true, false, true, true, false, true]), + )? + .into_array(); + let outer = + FilterArray::try_new(inner, Mask::from_iter([false, true, true, false]))?.into_array(); + + let traced = trace_array_with( + TraceOptions { + resolution: TraceResolution::ExecutedOnly, + }, + || outer.optimize(), + )?; + + let optimized_filter = traced.output.as_::(); + assert!(optimized_filter.child().is::()); + assert_arrays_eq!(traced.output, PrimitiveArray::from_iter([2i32, 3])); + insta::assert_snapshot!(traced.trace.to_string(), @r" +optimize root=vortex.filter len=2 dtype=i32 session=false + reduce_parent static:FilterFilterRule slot=0 parent=vortex.filter len=2 dtype=i32 child=vortex.filter len=4 dtype=i32 -> vortex.filter len=2 dtype=i32 + done output=vortex.filter len=2 dtype=i32 +"); + + let mut ctx = ExecutionCtx::new(VortexSession::empty().with::()); + let traced = trace_array_with( + TraceOptions { + resolution: TraceResolution::ExecutedOnly, + }, + || outer.execute::(&mut ctx), + )?; + + insta::assert_snapshot!(traced.trace.to_string(), @r" +execute_until target=AnyCanonical root=vortex.filter len=2 dtype=i32 + iter 1 current=vortex.filter len=2 dtype=i32 builder_active=false + ExecuteSlot slot=0 parent=vortex.filter len=2 dtype=i32 child=vortex.filter len=4 dtype=i32 + iter 2 current=vortex.filter len=4 dtype=i32 stack_parent=vortex.filter len=2 dtype=i32 slot=0 builder_active=false + Done array=vortex.primitive len=4 dtype=i32 + iter 3 current=vortex.primitive len=4 dtype=i32 stack_parent=vortex.filter len=2 dtype=i32 slot=0 builder_active=false + pop_frame slot=0 parent=vortex.filter len=2 dtype=i32 child=vortex.primitive len=4 dtype=i32 output=vortex.filter len=2 dtype=i32 + iter 4 current=vortex.filter len=2 dtype=i32 builder_active=false + Done array=vortex.primitive len=2 dtype=i32 + iter 5 current=vortex.primitive len=2 dtype=i32 builder_active=false + return output=vortex.primitive len=2 dtype=i32 +"); + + Ok(()) + } + + #[test] + fn trace_execution_stack_parent_kernel_attempts() -> VortexResult<()> { + let mut ctx = ExecutionCtx::new(VortexSession::empty().with::()); + let parent = stack_parent_fixture()?; + + let traced = trace_array_with( + TraceOptions { + resolution: TraceResolution::Attempts, + }, + || parent.execute::(&mut ctx), + )?; + + assert_arrays_eq!(traced.output, PrimitiveArray::from_iter([1i32, 2, 3])); + insta::assert_snapshot!(traced.trace.to_string(), @r" +execute_until target=AnyCanonical root=vortex.test.stack-parent len=3 dtype=i32 + iter 1 current=vortex.test.stack-parent len=3 dtype=i32 builder_active=false + done_check target=false canonical=false + child_execute_parent attempt slot=0 parent=vortex.test.stack-parent len=3 dtype=i32 child=vortex.test.stack-child len=3 dtype=i32 source=static kernel=kernel[0] outcome=declined + child_execute_parent attempt slot=0 parent=vortex.test.stack-parent len=3 dtype=i32 child=vortex.test.stack-child len=3 dtype=i32 source=static kernel=kernel[1] outcome=declined + child_execute_parent none current=vortex.test.stack-parent len=3 dtype=i32 + execute encoding=vortex.test.stack-parent len=3 dtype=i32 + request ExecuteSlot slot=0 target=Primitive parent=vortex.test.stack-parent len=3 dtype=i32 + ExecuteSlot slot=0 parent=vortex.test.stack-parent len=3 dtype=i32 child=vortex.test.stack-child len=3 dtype=i32 + iter 2 current=vortex.test.stack-child len=3 dtype=i32 stack_parent=vortex.test.stack-parent len=3 dtype=i32 slot=0 builder_active=false + done_check target=false canonical=false + stack_execute_parent attempt slot=0 parent=vortex.test.stack-parent len=3 dtype=i32 child=vortex.test.stack-child len=3 dtype=i32 source=static kernel=kernel[0] outcome=declined + stack_execute_parent applied slot=0 parent=vortex.test.stack-parent len=3 dtype=i32 child=vortex.test.stack-child len=3 dtype=i32 source=static kernel=kernel[1] output=vortex.primitive len=3 dtype=i32 +optimize root=vortex.primitive len=3 dtype=i32 session=true + loop input=vortex.primitive len=3 dtype=i32 + reduce none array=vortex.primitive len=3 dtype=i32 + reduce_parent none array=vortex.primitive len=3 dtype=i32 + done output=vortex.primitive len=3 dtype=i32 changed=false + optimize_ctx input=vortex.primitive len=3 dtype=i32 output=vortex.primitive len=3 dtype=i32 changed=false + iter 3 current=vortex.primitive len=3 dtype=i32 builder_active=false + done_check target=true canonical=true + return output=vortex.primitive len=3 dtype=i32 +"); + + Ok(()) + } + + #[test] + fn trace_execution_chunked_append_child_flow() -> VortexResult<()> { + let chunks = vec![ + PrimitiveArray::from_iter([1i32, 2]).into_array(), + PrimitiveArray::from_iter([3i32]).into_array(), + PrimitiveArray::from_iter([4i32, 5]).into_array(), + ]; + let dtype = chunks[0].dtype().clone(); + let chunked = ChunkedArray::try_new(chunks, dtype)?.into_array(); + let mut ctx = ExecutionCtx::new(VortexSession::empty().with::()); + + let traced = trace_array(|| { + chunked + .execute::(&mut ctx) + .map(IntoArray::into_array) + })?; + + assert_arrays_eq!(traced.output, PrimitiveArray::from_iter([1i32, 2, 3, 4, 5])); + insta::assert_snapshot!(traced.trace.to_string(), @r" +execute_until target=AnyCanonical root=vortex.chunked len=5 dtype=i32 + iter 1 current=vortex.chunked len=5 dtype=i32 builder_active=false + builder start array=vortex.chunked len=5 dtype=i32 + AppendChild slot=1 parent=vortex.chunked len=5 dtype=i32 child=vortex.primitive len=2 dtype=i32 + builder append child=vortex.primitive len=2 dtype=i32 +execute_until target=AnyCanonical root=vortex.primitive len=2 dtype=i32 + iter 1 current=vortex.primitive len=2 dtype=i32 builder_active=false + return output=vortex.primitive len=2 dtype=i32 +execute_until target=AnyCanonical root=vortex.primitive len=2 dtype=i32 + iter 1 current=vortex.primitive len=2 dtype=i32 builder_active=false + return output=vortex.primitive len=2 dtype=i32 + iter 2 current=vortex.chunked len=5 dtype=i32 builder_active=true + AppendChild slot=2 parent=vortex.chunked len=5 dtype=i32 child=vortex.primitive len=1 dtype=i32 + builder append child=vortex.primitive len=1 dtype=i32 +execute_until target=AnyCanonical root=vortex.primitive len=1 dtype=i32 + iter 1 current=vortex.primitive len=1 dtype=i32 builder_active=false + return output=vortex.primitive len=1 dtype=i32 +execute_until target=AnyCanonical root=vortex.primitive len=1 dtype=i32 + iter 1 current=vortex.primitive len=1 dtype=i32 builder_active=false + return output=vortex.primitive len=1 dtype=i32 + iter 3 current=vortex.chunked len=5 dtype=i32 builder_active=true + AppendChild slot=3 parent=vortex.chunked len=5 dtype=i32 child=vortex.primitive len=2 dtype=i32 + builder append child=vortex.primitive len=2 dtype=i32 +execute_until target=AnyCanonical root=vortex.primitive len=2 dtype=i32 + iter 1 current=vortex.primitive len=2 dtype=i32 builder_active=false + return output=vortex.primitive len=2 dtype=i32 +execute_until target=AnyCanonical root=vortex.primitive len=2 dtype=i32 + iter 1 current=vortex.primitive len=2 dtype=i32 builder_active=false + return output=vortex.primitive len=2 dtype=i32 + iter 4 current=vortex.chunked len=5 dtype=i32 builder_active=true + Done array=vortex.primitive len=0 dtype=i32 + builder finish output=vortex.primitive len=5 dtype=i32 + iter 5 current=vortex.primitive len=5 dtype=i32 builder_active=false + return output=vortex.primitive len=5 dtype=i32 +"); + + Ok(()) + } +} diff --git a/vortex-array/src/test_harness/trace_arrays.rs b/vortex-array/src/test_harness/trace_arrays.rs new file mode 100644 index 00000000000..ff795cd95ab --- /dev/null +++ b/vortex-array/src/test_harness/trace_arrays.rs @@ -0,0 +1,332 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Small encodings used by trace snapshot tests. + +use std::fmt::Display; +use std::fmt::Formatter; +use std::hash::Hasher; + +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_panic; +use vortex_session::VortexSession; +use vortex_session::registry::CachedId; + +use crate::ArrayEq; +use crate::ArrayHash; +use crate::ArrayRef; +use crate::ExecutionCtx; +use crate::ExecutionResult; +use crate::IntoArray; +use crate::Precision; +use crate::VTable; +use crate::array::Array; +use crate::array::ArrayId; +use crate::array::ArrayParts; +use crate::array::ArrayView; +use crate::array::vtable::NotSupported; +use crate::array::vtable::ValidityVTable; +use crate::arrays::Primitive; +use crate::arrays::PrimitiveArray; +use crate::buffer::BufferHandle; +use crate::dtype::DType; +use crate::dtype::Nullability; +use crate::dtype::PType; +use crate::kernel::ExecuteParentKernel; +use crate::kernel::ParentKernelSet; +use crate::matcher::Matcher; +use crate::serde::ArrayChildren; +use crate::validity::Validity; + +/// Create a `StackParent(StackChild)` fixture. +/// +/// `StackParent` requests `ExecuteSlot` until its child is `Primitive`. `StackChild` has one +/// declined parent kernel followed by one successful parent kernel, so strict trace snapshots can +/// assert that stack parent kernels run before the child decodes itself. +pub fn stack_parent_fixture() -> VortexResult { + stack_parent(stack_child()?) +} + +/// Create the child encoding used by [`stack_parent_fixture`]. +pub fn stack_child() -> VortexResult { + Ok( + Array::try_from_parts(ArrayParts::new(StackChild, test_dtype(), 3, StackChildData))? + .into_array(), + ) +} + +/// Wrap `child` in the parent encoding used by [`stack_parent_fixture`]. +pub fn stack_parent(child: ArrayRef) -> VortexResult { + Ok(Array::try_from_parts( + ArrayParts::new( + StackParent, + child.dtype().clone(), + child.len(), + StackParentData, + ) + .with_slots(vec![Some(child)]), + )? + .into_array()) +} + +fn test_dtype() -> DType { + DType::Primitive(PType::I32, Nullability::NonNullable) +} + +#[derive(Clone, Debug)] +struct StackParent; + +#[derive(Clone, Debug)] +struct StackParentData; + +impl Display for StackParentData { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("stack-parent") + } +} + +impl ArrayHash for StackParentData { + fn array_hash(&self, _state: &mut H, _precision: Precision) {} +} + +impl ArrayEq for StackParentData { + fn array_eq(&self, _other: &Self, _precision: Precision) -> bool { + true + } +} + +impl ValidityVTable for StackParent { + fn validity(_array: ArrayView<'_, StackParent>) -> VortexResult { + Ok(Validity::NonNullable) + } +} + +impl VTable for StackParent { + type TypedArrayData = StackParentData; + type OperationsVTable = NotSupported; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.test.stack-parent"); + *ID + } + + fn validate( + &self, + _data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option], + ) -> VortexResult<()> { + vortex_ensure!(dtype == &test_dtype(), "unexpected stack parent dtype"); + vortex_ensure!(len == 3, "unexpected stack parent length"); + vortex_ensure!(slots.len() == 1, "stack parent must have one child slot"); + let Some(child) = &slots[0] else { + vortex_bail!("stack parent child slot is missing"); + }; + vortex_ensure!(child.dtype() == dtype, "stack parent child dtype mismatch"); + vortex_ensure!(child.len() == len, "stack parent child length mismatch"); + Ok(()) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 0 + } + + fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + vortex_panic!("StackParent buffer index {idx} out of bounds") + } + + fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option { + None + } + + fn serialize( + _array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult>> { + Ok(None) + } + + fn deserialize( + &self, + _dtype: &DType, + _len: usize, + _metadata: &[u8], + _buffers: &[BufferHandle], + _children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult> { + vortex_bail!("StackParent cannot be deserialized") + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + match idx { + 0 => "child".to_string(), + _ => vortex_panic!("StackParent slot index {idx} out of bounds"), + } + } + + fn execute(array: Array, _ctx: &mut ExecutionCtx) -> VortexResult { + let Some(child) = array.slots()[0].as_ref() else { + vortex_bail!("stack parent child slot is missing"); + }; + if !child.is::() { + return Ok(ExecutionResult::execute_slot::(array, 0)); + } + + Ok(ExecutionResult::done(child.clone())) + } +} + +#[derive(Clone, Debug)] +struct StackChild; + +#[derive(Clone, Debug)] +struct StackChildData; + +impl Display for StackChildData { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str("stack-child") + } +} + +impl ArrayHash for StackChildData { + fn array_hash(&self, _state: &mut H, _precision: Precision) {} +} + +impl ArrayEq for StackChildData { + fn array_eq(&self, _other: &Self, _precision: Precision) -> bool { + true + } +} + +impl ValidityVTable for StackChild { + fn validity(_array: ArrayView<'_, StackChild>) -> VortexResult { + Ok(Validity::NonNullable) + } +} + +impl VTable for StackChild { + type TypedArrayData = StackChildData; + type OperationsVTable = NotSupported; + type ValidityVTable = Self; + + fn id(&self) -> ArrayId { + static ID: CachedId = CachedId::new("vortex.test.stack-child"); + *ID + } + + fn validate( + &self, + _data: &Self::TypedArrayData, + dtype: &DType, + len: usize, + slots: &[Option], + ) -> VortexResult<()> { + vortex_ensure!(dtype == &test_dtype(), "unexpected stack child dtype"); + vortex_ensure!(len == 3, "unexpected stack child length"); + vortex_ensure!(slots.is_empty(), "stack child must not have slots"); + Ok(()) + } + + fn nbuffers(_array: ArrayView<'_, Self>) -> usize { + 0 + } + + fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle { + vortex_panic!("StackChild buffer index {idx} out of bounds") + } + + fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option { + None + } + + fn serialize( + _array: ArrayView<'_, Self>, + _session: &VortexSession, + ) -> VortexResult>> { + Ok(None) + } + + fn deserialize( + &self, + _dtype: &DType, + _len: usize, + _metadata: &[u8], + _buffers: &[BufferHandle], + _children: &dyn ArrayChildren, + _session: &VortexSession, + ) -> VortexResult> { + vortex_bail!("StackChild cannot be deserialized") + } + + fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String { + vortex_panic!("StackChild slot index {idx} out of bounds") + } + + fn execute(array: Array, _ctx: &mut ExecutionCtx) -> VortexResult { + debug_assert!(array.slots().is_empty()); + Ok(ExecutionResult::done(PrimitiveArray::from_iter([ + 99i32, 99, 99, + ]))) + } + + fn execute_parent( + array: ArrayView<'_, Self>, + parent: &ArrayRef, + child_idx: usize, + ctx: &mut ExecutionCtx, + ) -> VortexResult> { + STACK_CHILD_PARENT_KERNELS.execute(array, parent, child_idx, ctx) + } +} + +const STACK_CHILD_PARENT_KERNELS: ParentKernelSet = ParentKernelSet::new(&[ + ParentKernelSet::lift(&StackDeclineKernel), + ParentKernelSet::lift(&StackParentKernel), +]); + +#[derive(Debug)] +struct StackDeclineKernel; + +impl ExecuteParentKernel for StackDeclineKernel { + type Parent = StackParent; + + fn execute_parent( + &self, + _array: ArrayView<'_, StackChild>, + _parent: ::Match<'_>, + _child_idx: usize, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + Ok(None) + } +} + +#[derive(Debug)] +struct StackParentKernel; + +impl ExecuteParentKernel for StackParentKernel { + type Parent = StackParent; + + fn execute_parent( + &self, + _array: ArrayView<'_, StackChild>, + parent: ::Match<'_>, + child_idx: usize, + _ctx: &mut ExecutionCtx, + ) -> VortexResult> { + if parent + .slots() + .get(child_idx) + .is_some_and(|slot| slot.is_none()) + { + return Ok(Some(PrimitiveArray::from_iter([1i32, 2, 3]).into_array())); + } + + Ok(None) + } +} diff --git a/vortex-array/src/trace_macros.rs b/vortex-array/src/trace_macros.rs new file mode 100644 index 00000000000..6b39ac99b1a --- /dev/null +++ b/vortex-array/src/trace_macros.rs @@ -0,0 +1,63 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#[cfg(any(test, feature = "_test-harness"))] +macro_rules! trace_array { + ($($event:tt)*) => { + $crate::test_harness::trace::if_active( + || $crate::test_harness::trace::$($event)*, + || {}, + ) + }; +} + +#[cfg(not(any(test, feature = "_test-harness")))] +macro_rules! trace_array { + ($($event:tt)*) => {{}}; +} + +#[cfg(any(test, feature = "_test-harness"))] +macro_rules! trace_array_value { + ($enabled:expr, $disabled:expr) => { + $crate::test_harness::trace::if_active(|| $enabled, || $disabled) + }; +} + +#[cfg(not(any(test, feature = "_test-harness")))] +macro_rules! trace_array_value { + ($enabled:expr, $disabled:expr) => { + $disabled + }; +} + +#[cfg(any(test, feature = "_test-harness"))] +macro_rules! trace_array_scope { + ($phase:expr, || $body:expr) => { + $crate::test_harness::trace::with_execute_parent_phase_if_active($phase, || $body) + }; +} + +#[cfg(not(any(test, feature = "_test-harness")))] +macro_rules! trace_array_scope { + ($phase:expr, || $body:expr) => {{ + let _ = $phase; + $body + }}; +} + +#[cfg(any(test, feature = "_test-harness"))] +macro_rules! trace_array_use { + ($($value:expr),* $(,)?) => {{}}; +} + +#[cfg(not(any(test, feature = "_test-harness")))] +macro_rules! trace_array_use { + ($($value:expr),* $(,)?) => { + let _ = ($(&$value),*); + }; +} + +pub(crate) use trace_array; +pub(crate) use trace_array_scope; +pub(crate) use trace_array_use; +pub(crate) use trace_array_value; From b21cd0f17b2dd6fe3882dba73244c3e9ffb1cb47 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Wed, 6 May 2026 21:43:49 +0100 Subject: [PATCH 2/4] Maybe better perf? Signed-off-by: Adam Gutglick --- vortex-array/src/test_harness/trace.rs | 54 ++++++++++++++++++-------- vortex-array/src/trace_macros.rs | 13 ++++--- 2 files changed, 45 insertions(+), 22 deletions(-) diff --git a/vortex-array/src/test_harness/trace.rs b/vortex-array/src/test_harness/trace.rs index 45b5250e694..c6be23ba176 100644 --- a/vortex-array/src/test_harness/trace.rs +++ b/vortex-array/src/test_harness/trace.rs @@ -6,12 +6,16 @@ use std::cell::RefCell; use std::fmt; use std::fmt::Debug; use std::fmt::Display; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; +use crate::ArrayId; use crate::ArrayRef; +use crate::dtype::DType; /// Controls how much rule and kernel resolution detail is captured. #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] @@ -138,6 +142,7 @@ pub fn trace_array_with( options: TraceOptions, f: impl FnOnce() -> VortexResult, ) -> VortexResult> { + let interest = TraceInterest::from(options.resolution); ACTIVE_TRACE.with(|active| { let mut active = active.borrow_mut(); if active.is_some() { @@ -146,9 +151,13 @@ pub fn trace_array_with( *active = Some(TraceRecorder::new(options)); Ok(()) })?; - TRACE_INTEREST.with(|interest| interest.set(TraceInterest::from(options.resolution))); + TRACE_INTEREST.with(|trace_interest| trace_interest.set(interest)); + ACTIVE_TRACE_COUNT.fetch_add(1, Ordering::Relaxed); + if interest == TraceInterest::Attempts { + ATTEMPTS_TRACE_COUNT.fetch_add(1, Ordering::Relaxed); + } - let guard = ActiveTraceGuard; + let guard = ActiveTraceGuard { interest }; let output = f(); let recorder = ACTIVE_TRACE.with(|active| { active @@ -165,18 +174,19 @@ pub fn trace_array_with( } /// Returns true when the current thread has an active trace recorder. -#[inline] +#[inline(always)] pub(crate) fn is_active() -> bool { + if ACTIVE_TRACE_COUNT.load(Ordering::Relaxed) == 0 { + return false; + } TRACE_INTEREST.with(|interest| interest.get().is_active()) } -#[inline] -pub(crate) fn if_active(enabled: impl FnOnce() -> R, disabled: impl FnOnce() -> R) -> R { - if is_active() { enabled() } else { disabled() } -} - -#[inline] +#[inline(always)] fn attempts_enabled() -> bool { + if ATTEMPTS_TRACE_COUNT.load(Ordering::Relaxed) == 0 { + return false; + } TRACE_INTEREST.with(|interest| interest.get() == TraceInterest::Attempts) } @@ -236,17 +246,17 @@ impl From for TraceInterest { #[derive(Clone, Debug, Eq, PartialEq)] pub(crate) struct ArraySummary { - encoding: String, + encoding: ArrayId, len: usize, - dtype: String, + dtype: DType, } impl ArraySummary { pub(crate) fn new(array: &ArrayRef) -> Self { Self { - encoding: array.encoding_id().to_string(), + encoding: array.encoding_id(), len: array.len(), - dtype: array.dtype().to_string(), + dtype: array.dtype().clone(), } } } @@ -638,10 +648,11 @@ fn record(event: TraceEvent) { } fn record_attempt(event: TraceEvent) { + if !attempts_enabled() { + return; + } ACTIVE_TRACE.with(|active| { - if let Some(recorder) = active.borrow_mut().as_mut() - && recorder.options.resolution == TraceResolution::Attempts - { + if let Some(recorder) = active.borrow_mut().as_mut() { recorder.events.push(event); } }); @@ -680,10 +691,19 @@ thread_local! { static EXECUTE_PARENT_PHASE: Cell<&'static str> = const { Cell::new("execute_parent") }; } -struct ActiveTraceGuard; +static ACTIVE_TRACE_COUNT: AtomicUsize = AtomicUsize::new(0); +static ATTEMPTS_TRACE_COUNT: AtomicUsize = AtomicUsize::new(0); + +struct ActiveTraceGuard { + interest: TraceInterest, +} impl Drop for ActiveTraceGuard { fn drop(&mut self) { + if self.interest == TraceInterest::Attempts { + ATTEMPTS_TRACE_COUNT.fetch_sub(1, Ordering::Relaxed); + } + ACTIVE_TRACE_COUNT.fetch_sub(1, Ordering::Relaxed); TRACE_INTEREST.with(|interest| interest.set(TraceInterest::Off)); ACTIVE_TRACE.with(|active| { active.borrow_mut().take(); diff --git a/vortex-array/src/trace_macros.rs b/vortex-array/src/trace_macros.rs index 6b39ac99b1a..c626834e107 100644 --- a/vortex-array/src/trace_macros.rs +++ b/vortex-array/src/trace_macros.rs @@ -4,10 +4,9 @@ #[cfg(any(test, feature = "_test-harness"))] macro_rules! trace_array { ($($event:tt)*) => { - $crate::test_harness::trace::if_active( - || $crate::test_harness::trace::$($event)*, - || {}, - ) + if $crate::test_harness::trace::is_active() { + $crate::test_harness::trace::$($event)* + } }; } @@ -19,7 +18,11 @@ macro_rules! trace_array { #[cfg(any(test, feature = "_test-harness"))] macro_rules! trace_array_value { ($enabled:expr, $disabled:expr) => { - $crate::test_harness::trace::if_active(|| $enabled, || $disabled) + if $crate::test_harness::trace::is_active() { + $enabled + } else { + $disabled + } }; } From 8a96458710abb8e440aa19e3292657a0bb4a7daa Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 7 May 2026 12:00:45 +0100 Subject: [PATCH 3/4] No codspeed Signed-off-by: Adam Gutglick --- vortex-array/src/test_harness.rs | 1 + vortex-array/src/trace_macros.rs | 16 ++++++++-------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/vortex-array/src/test_harness.rs b/vortex-array/src/test_harness.rs index 2783c50d3f8..2395007650b 100644 --- a/vortex-array/src/test_harness.rs +++ b/vortex-array/src/test_harness.rs @@ -13,6 +13,7 @@ use crate::VortexSessionExecute; use crate::arrays::BoolArray; use crate::arrays::bool::BoolArrayExt; +#[cfg(not(codspeed))] pub mod trace; pub mod trace_arrays; diff --git a/vortex-array/src/trace_macros.rs b/vortex-array/src/trace_macros.rs index c626834e107..ac5c57d1c13 100644 --- a/vortex-array/src/trace_macros.rs +++ b/vortex-array/src/trace_macros.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -#[cfg(any(test, feature = "_test-harness"))] +#[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] macro_rules! trace_array { ($($event:tt)*) => { if $crate::test_harness::trace::is_active() { @@ -10,12 +10,12 @@ macro_rules! trace_array { }; } -#[cfg(not(any(test, feature = "_test-harness")))] +#[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] macro_rules! trace_array { ($($event:tt)*) => {{}}; } -#[cfg(any(test, feature = "_test-harness"))] +#[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] macro_rules! trace_array_value { ($enabled:expr, $disabled:expr) => { if $crate::test_harness::trace::is_active() { @@ -26,21 +26,21 @@ macro_rules! trace_array_value { }; } -#[cfg(not(any(test, feature = "_test-harness")))] +#[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] macro_rules! trace_array_value { ($enabled:expr, $disabled:expr) => { $disabled }; } -#[cfg(any(test, feature = "_test-harness"))] +#[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] macro_rules! trace_array_scope { ($phase:expr, || $body:expr) => { $crate::test_harness::trace::with_execute_parent_phase_if_active($phase, || $body) }; } -#[cfg(not(any(test, feature = "_test-harness")))] +#[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] macro_rules! trace_array_scope { ($phase:expr, || $body:expr) => {{ let _ = $phase; @@ -48,12 +48,12 @@ macro_rules! trace_array_scope { }}; } -#[cfg(any(test, feature = "_test-harness"))] +#[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] macro_rules! trace_array_use { ($($value:expr),* $(,)?) => {{}}; } -#[cfg(not(any(test, feature = "_test-harness")))] +#[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] macro_rules! trace_array_use { ($($value:expr),* $(,)?) => { let _ = ($(&$value),*); From 4f2f3bc26f8a7e18aa1a183fae47b8bda211b305 Mon Sep 17 00:00:00 2001 From: Adam Gutglick Date: Thu, 7 May 2026 18:38:38 +0100 Subject: [PATCH 4/4] less stuff Signed-off-by: Adam Gutglick --- vortex-array/src/executor.rs | 13 ++--- vortex-array/src/kernel.rs | 2 +- vortex-array/src/lib.rs | 3 -- vortex-array/src/optimizer/mod.rs | 2 +- vortex-array/src/trace_macros.rs | 83 ++++++++++++++----------------- 5 files changed, 45 insertions(+), 58 deletions(-) diff --git a/vortex-array/src/executor.rs b/vortex-array/src/executor.rs index 54eb46d3bc2..4abec168d3e 100644 --- a/vortex-array/src/executor.rs +++ b/vortex-array/src/executor.rs @@ -154,6 +154,7 @@ impl ArrayRef { /// partially consumes `current_array`: some slots already live in the builder, so a /// parent rewrite would observe inconsistent state and could discard accumulated builder /// data. + #[expect(clippy::cognitive_complexity)] pub fn execute_until(self, ctx: &mut ExecutionCtx) -> VortexResult { let mut current_array = self; let mut current_builder: Option> = None; @@ -163,7 +164,7 @@ impl ArrayRef { crate::trace_array!(record_execute_until_start::(¤t_array)); for iteration in 0..max_iterations { - crate::trace_array_use!(iteration); + crate::trace_array!(use(iteration)); crate::trace_array!(record_execute_until_iteration( iteration + 1, ¤t_array, @@ -192,17 +193,17 @@ impl ArrayRef { return Ok(current_array); } Some(frame) => { - let trace_pop_frame = crate::trace_array_value!( + let trace_pop_frame = crate::trace_array!(value( Some(( frame.parent_array.clone(), current_array.clone(), frame.slot_idx )), None::<(ArrayRef, ArrayRef, usize)> - ); + )); (current_array, current_builder) = pop_frame(frame, current_array)?; if let Some((parent_before, child_before, slot_idx)) = trace_pop_frame { - crate::trace_array_use!(parent_before, child_before, slot_idx,); + crate::trace_array!(use(parent_before, child_before, slot_idx,)); crate::trace_array!(record_execute_until_pop_frame( &parent_before, slot_idx, @@ -568,7 +569,7 @@ fn execute_parent_for_child( kernels.find_execute_parent(parent.encoding_id(), child.encoding_id()) { for (plugin_idx, plugin) in plugins.as_ref().iter().enumerate() { - crate::trace_array_use!(plugin_idx); + crate::trace_array!(use(plugin_idx)); if let Some(result) = plugin(child, parent, slot_idx, ctx)? { crate::trace_array!(record_execute_parent_applied( phase, @@ -593,7 +594,7 @@ fn execute_parent_for_child( } } - crate::trace_array_scope!(phase, || child.execute_parent(parent, slot_idx, ctx)) + crate::trace_array!(scope(phase, || child.execute_parent(parent, slot_idx, ctx))) } /// Try execute_parent on each occupied slot of the array. diff --git a/vortex-array/src/kernel.rs b/vortex-array/src/kernel.rs index 7bd4e81600d..2471fa6c161 100644 --- a/vortex-array/src/kernel.rs +++ b/vortex-array/src/kernel.rs @@ -65,7 +65,7 @@ impl ParentKernelSet { ctx: &mut ExecutionCtx, ) -> VortexResult> { for (kernel_idx, kernel) in self.kernels.iter().enumerate() { - crate::trace_array_use!(kernel_idx); + crate::trace_array!(use(kernel_idx)); if !kernel.matches(parent) { crate::trace_array!(record_execute_parent_attempt( crate::test_harness::trace::current_execute_parent_phase(), diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index a8e497c5f20..cce2592e9ab 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -35,9 +35,6 @@ pub mod aggregate_fn; pub mod aliases; mod trace_macros; pub(crate) use trace_macros::trace_array; -pub(crate) use trace_macros::trace_array_scope; -pub(crate) use trace_macros::trace_array_use; -pub(crate) use trace_macros::trace_array_value; mod array; pub mod arrays; pub mod arrow; diff --git a/vortex-array/src/optimizer/mod.rs b/vortex-array/src/optimizer/mod.rs index 3f07d5a0460..252923361eb 100644 --- a/vortex-array/src/optimizer/mod.rs +++ b/vortex-array/src/optimizer/mod.rs @@ -98,7 +98,7 @@ fn try_optimize( array_ref.find_reduce_parent(current_array.encoding_id(), child.encoding_id()) { for (plugin_idx, plugin) in plugins.as_ref().iter().enumerate() { - crate::trace_array_use!(plugin_idx); + crate::trace_array!(use(plugin_idx)); if let Some(new_array) = plugin(child, ¤t_array, slot_idx)? { crate::trace_array!(record_parent_reduce_applied( ¤t_array, diff --git a/vortex-array/src/trace_macros.rs b/vortex-array/src/trace_macros.rs index ac5c57d1c13..ea8f16ebc28 100644 --- a/vortex-array/src/trace_macros.rs +++ b/vortex-array/src/trace_macros.rs @@ -1,66 +1,55 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -#[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] macro_rules! trace_array { - ($($event:tt)*) => { - if $crate::test_harness::trace::is_active() { - $crate::test_harness::trace::$($event)* + (@when_enabled { $($enabled:tt)* } else { $($disabled:tt)* }) => {{ + #[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] + { + $($enabled)* } - }; -} -#[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] -macro_rules! trace_array { - ($($event:tt)*) => {{}}; -} + #[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] + { + $($disabled)* + } + }}; -#[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] -macro_rules! trace_array_value { - ($enabled:expr, $disabled:expr) => { - if $crate::test_harness::trace::is_active() { - $enabled + (@if_active { $($enabled:tt)* } else { $($disabled:tt)* }) => { + $crate::trace_array!(@when_enabled { + if $crate::test_harness::trace::is_active() { + $($enabled)* + } else { + $($disabled)* + } } else { - $disabled - } + $($disabled)* + }) }; -} -#[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] -macro_rules! trace_array_value { - ($enabled:expr, $disabled:expr) => { - $disabled + (use($($value:expr),* $(,)?)) => { + $crate::trace_array!(@when_enabled {} else { + let _ = ($(&$value),*); + }) }; -} -#[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] -macro_rules! trace_array_scope { - ($phase:expr, || $body:expr) => { - $crate::test_harness::trace::with_execute_parent_phase_if_active($phase, || $body) + (value($enabled:expr, $disabled:expr)) => { + $crate::trace_array!(@if_active { $enabled } else { $disabled }) }; -} - -#[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] -macro_rules! trace_array_scope { - ($phase:expr, || $body:expr) => {{ - let _ = $phase; - $body - }}; -} -#[cfg(all(any(test, feature = "_test-harness"), not(codspeed)))] -macro_rules! trace_array_use { - ($($value:expr),* $(,)?) => {{}}; -} + (scope($phase:expr, || $body:expr)) => { + $crate::trace_array!(@when_enabled { + $crate::test_harness::trace::with_execute_parent_phase_if_active($phase, || $body) + } else {{ + let _ = $phase; + $body + }}) + }; -#[cfg(any(not(any(test, feature = "_test-harness")), codspeed))] -macro_rules! trace_array_use { - ($($value:expr),* $(,)?) => { - let _ = ($(&$value),*); + ($($event:tt)*) => { + $crate::trace_array!(@if_active { + $crate::test_harness::trace::$($event)* + } else {}) }; } pub(crate) use trace_array; -pub(crate) use trace_array_scope; -pub(crate) use trace_array_use; -pub(crate) use trace_array_value;