-
Notifications
You must be signed in to change notification settings - Fork 146
Expand file tree
/
Copy pathexecutor.rs
More file actions
401 lines (367 loc) · 15.4 KB
/
executor.rs
File metadata and controls
401 lines (367 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
use std::env::VarError;
use std::fmt;
use std::fmt::Display;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::atomic::AtomicUsize;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_panic;
use vortex_session::VortexSession;
use crate::AnyCanonical;
use crate::ArrayRef;
use crate::Canonical;
use crate::DynArray;
use crate::IntoArray;
use crate::matcher::Matcher;
use crate::optimizer::ArrayOptimizer;
/// Maximum number of iterations to attempt when executing an array before giving up and returning
/// an error.
pub(crate) static MAX_ITERATIONS: LazyLock<usize> =
LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
Ok(val) => val
.parse::<usize>()
.unwrap_or_else(|e| vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")),
Err(VarError::NotPresent) => 128,
Err(VarError::NotUnicode(_)) => {
vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
}
});
/// Marker trait for types that an [`ArrayRef`] can be executed into.
///
/// Implementors must provide an implementation of `execute` that takes
/// an [`ArrayRef`] and an [`ExecutionCtx`], and produces an instance of the
/// implementor type.
///
/// Users should use the `Array::execute` or `Array::execute_as` methods
pub trait Executable: Sized {
fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self>;
}
impl dyn DynArray + '_ {
/// Execute this array to produce an instance of `E`.
///
/// See the [`Executable`] implementation for details on how this execution is performed.
pub fn execute<E: Executable>(self: Arc<Self>, ctx: &mut ExecutionCtx) -> VortexResult<E> {
E::execute(self, ctx)
}
/// Execute this array, labeling the execution step with a name for tracing.
pub fn execute_as<E: Executable>(
self: Arc<Self>,
_name: &'static str,
ctx: &mut ExecutionCtx,
) -> VortexResult<E> {
E::execute(self, ctx)
}
/// Iteratively execute this array until the [`Matcher`] matches, using an explicit work
/// stack.
///
/// The scheduler repeatedly:
/// 1. Checks if the current array matches `M` — if so, pops the stack or returns.
/// 2. Runs `execute_parent` on each child for child-driven optimizations.
/// 3. Calls `execute` which returns an [`ExecutionStep`].
///
/// Note: the returned array may not match `M`. If execution converges to a canonical form
/// that does not match `M`, the canonical array is returned since no further execution
/// progress is possible.
///
/// For safety, we will error when the number of execution iterations reaches a configurable
/// maximum (default 128, override with `VORTEX_MAX_ITERATIONS`).
pub fn execute_until<M: Matcher>(
self: Arc<Self>,
ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
static MAX_ITERATIONS: LazyLock<usize> =
LazyLock::new(|| match std::env::var("VORTEX_MAX_ITERATIONS") {
Ok(val) => val.parse::<usize>().unwrap_or_else(|e| {
vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid usize: {e}")
}),
Err(VarError::NotPresent) => 128,
Err(VarError::NotUnicode(_)) => {
vortex_panic!("VORTEX_MAX_ITERATIONS is not a valid unicode string")
}
});
let mut current = self.optimize()?;
// Stack frames: (parent, child_idx, done_predicate_for_child)
let mut stack: Vec<(ArrayRef, usize, DonePredicate)> = Vec::new();
for _ in 0..*MAX_ITERATIONS {
// Check for termination: use the stack frame's done predicate, or the root matcher.
let is_done = stack
.last()
.map_or(M::matches as DonePredicate, |frame| frame.2);
if is_done(current.as_ref()) {
match stack.pop() {
None => {
ctx.log(format_args!("-> {}", current));
return Ok(current);
}
Some((parent, child_idx, _)) => {
current = parent.with_child(child_idx, current)?;
current = current.optimize()?;
continue;
}
}
}
// If we've reached canonical form, we can't execute any further regardless
// of whether the matcher matched.
if AnyCanonical::matches(current.as_ref()) {
match stack.pop() {
None => {
ctx.log(format_args!("-> canonical (unmatched) {}", current));
return Ok(current);
}
Some((parent, child_idx, _)) => {
current = parent.with_child(child_idx, current)?;
current = current.optimize()?;
continue;
}
}
}
// Try execute_parent (child-driven optimized execution)
if let Some(rewritten) = try_execute_parent(¤t, ctx)? {
ctx.log(format_args!(
"execute_parent rewrote {} -> {}",
current, rewritten
));
current = rewritten.optimize()?;
continue;
}
// Execute the array itself
match current.vtable().execute(¤t, ctx)? {
ExecutionStep::ExecuteChild(i, done) => {
let child = current
.nth_child(i)
.vortex_expect("ExecuteChild index in bounds");
ctx.log(format_args!(
"ExecuteChild({i}): pushing {}, focusing on {}",
current, child
));
stack.push((current, i, done));
current = child.optimize()?;
}
ExecutionStep::Done(result) => {
ctx.log(format_args!("Done: {} -> {}", current, result));
current = result;
}
}
}
vortex_bail!(
"Exceeded maximum execution iterations ({}) while executing array",
*MAX_ITERATIONS,
)
}
}
/// Execution context for batch CPU compute.
///
/// Accumulates a trace of execution steps. Individual steps are logged at TRACE level for
/// real-time following, and the full trace is dumped at DEBUG level when the context is dropped.
#[derive(Debug, Clone)]
pub struct ExecutionCtx {
id: usize,
session: VortexSession,
ops: Vec<String>,
}
impl ExecutionCtx {
/// Create a new execution context with the given session.
pub fn new(session: VortexSession) -> Self {
static EXEC_CTX_ID: AtomicUsize = AtomicUsize::new(0);
let id = EXEC_CTX_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Self {
id,
session,
ops: Vec::new(),
}
}
/// Get the session associated with this execution context.
pub fn session(&self) -> &VortexSession {
&self.session
}
/// Log an execution step at the current depth.
///
/// Steps are accumulated and dumped as a single trace on Drop at DEBUG level.
/// Individual steps are also logged at TRACE level for real-time following.
///
/// Use the [`format_args!`] macro to create the `msg` argument.
pub fn log(&mut self, msg: fmt::Arguments<'_>) {
if tracing::enabled!(tracing::Level::DEBUG) {
let formatted = format!(" - {msg}");
tracing::trace!("exec[{}]: {formatted}", self.id);
self.ops.push(formatted);
}
}
}
impl Display for ExecutionCtx {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "exec[{}]", self.id)
}
}
impl Drop for ExecutionCtx {
fn drop(&mut self) {
if !self.ops.is_empty() && tracing::enabled!(tracing::Level::DEBUG) {
// Unlike itertools `.format()` (panics in 0.14 on second format)
struct FmtOps<'a>(&'a [String]);
impl Display for FmtOps<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for (i, op) in self.0.iter().enumerate() {
if i > 0 {
f.write_str("\n")?;
}
f.write_str(op)?;
}
Ok(())
}
}
tracing::debug!("exec[{}] trace:\n{}", self.id, FmtOps(&self.ops));
}
}
}
/// Executing an [`ArrayRef`] into an [`ArrayRef`] is the atomic execution loop within Vortex.
///
/// It attempts to take the smallest possible step of execution such that the returned array
/// is incrementally more "executed" than the input array. In other words, it is closer to becoming
/// a canonical array.
///
/// The execution steps are as follows:
/// 0. Check for canonical.
/// 1. Attempt to `reduce` the array with metadata-only optimizations.
/// 2. Attempt to call `reduce_parent` on each child.
/// 3. Attempt to call `execute_parent` on each child.
/// 4. Call `execute` on the array itself (which returns an [`ExecutionStep`]).
///
/// Most users will not call this method directly, instead preferring to specify an executable
/// target such as [`crate::Columnar`], [`Canonical`], or any of the canonical array types (such as
/// [`crate::arrays::PrimitiveArray`]).
impl Executable for ArrayRef {
fn execute(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
// 0. Check for canonical
if let Some(canonical) = array.as_opt::<AnyCanonical>() {
ctx.log(format_args!("-> canonical {}", array));
return Ok(Canonical::from(canonical).into_array());
}
// 1. reduce (metadata-only rewrites)
if let Some(reduced) = array.vtable().reduce(&array)? {
ctx.log(format_args!("reduce: rewrote {} -> {}", array, reduced));
reduced.statistics().inherit_from(array.statistics());
return Ok(reduced);
}
// 2. reduce_parent (child-driven metadata-only rewrites)
for child_idx in 0..array.nchildren() {
let child = array.nth_child(child_idx).vortex_expect("checked length");
if let Some(reduced_parent) = child.vtable().reduce_parent(&child, &array, child_idx)? {
ctx.log(format_args!(
"reduce_parent: child[{}]({}) rewrote {} -> {}",
child_idx,
child.encoding_id(),
array,
reduced_parent
));
reduced_parent.statistics().inherit_from(array.statistics());
return Ok(reduced_parent);
}
}
// 3. execute_parent (child-driven optimized execution)
for child_idx in 0..array.nchildren() {
let child = array.nth_child(child_idx).vortex_expect("checked length");
if let Some(executed_parent) = child
.vtable()
.execute_parent(&child, &array, child_idx, ctx)?
{
ctx.log(format_args!(
"execute_parent: child[{}]({}) rewrote {} -> {}",
child_idx,
child.encoding_id(),
array,
executed_parent
));
executed_parent
.statistics()
.inherit_from(array.statistics());
return Ok(executed_parent);
}
}
// 4. execute (returns an ExecutionStep)
ctx.log(format_args!("executing {}", array));
match array.vtable().execute(&array, ctx)? {
ExecutionStep::Done(result) => {
ctx.log(format_args!("-> {}", result.as_ref()));
Ok(result)
}
ExecutionStep::ExecuteChild(i, _) => {
// For single-step execution, handle ExecuteChild by executing the child,
// replacing it, and returning the updated array.
let child = array.nth_child(i).vortex_expect("valid child index");
let executed_child = child.execute::<ArrayRef>(ctx)?;
array.with_child(i, executed_child)
}
}
}
}
/// Try execute_parent on each child of the array.
fn try_execute_parent(array: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Option<ArrayRef>> {
for child_idx in 0..array.nchildren() {
let child = array
.nth_child(child_idx)
.vortex_expect("checked nchildren");
if let Some(result) = child
.vtable()
.execute_parent(&child, array, child_idx, ctx)?
{
result.statistics().inherit_from(array.statistics());
return Ok(Some(result));
}
}
Ok(None)
}
/// A predicate that determines when an array has reached a desired form during execution.
pub type DonePredicate = fn(&dyn DynArray) -> bool;
/// The result of a single execution step on an array encoding.
///
/// Instead of recursively executing children, encodings return an `ExecutionStep` that tells the
/// scheduler what to do next. This enables the scheduler to manage execution iteratively using
/// an explicit work stack, run cross-step optimizations, and cache shared sub-expressions.
pub enum ExecutionStep {
/// Request that the scheduler execute child at the given index, using the provided
/// [`DonePredicate`] to determine when the child is "done", then replace the child in this
/// array and re-enter execution.
///
/// Between steps, the scheduler runs reduce/reduce_parent rules to fixpoint, enabling
/// cross-step optimization (e.g., pushing scalar functions through newly-decoded children).
///
/// Use [`ExecutionStep::execute_child`] instead of constructing this variant directly.
ExecuteChild(usize, DonePredicate),
/// Execution is complete. The result may be in any encoding — not necessarily canonical.
/// The scheduler will continue executing the result if it has not yet reached the target form.
Done(ArrayRef),
}
impl ExecutionStep {
/// Request execution of child at `child_idx` until it matches the given [`Matcher`].
pub fn execute_child<M: Matcher>(child_idx: usize) -> Self {
ExecutionStep::ExecuteChild(child_idx, M::matches)
}
/// Signal that execution is complete with the given result.
pub fn done(result: ArrayRef) -> Self {
ExecutionStep::Done(result)
}
}
impl fmt::Debug for ExecutionStep {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ExecutionStep::ExecuteChild(idx, _) => {
f.debug_tuple("ExecuteChild").field(idx).finish()
}
ExecutionStep::Done(result) => f.debug_tuple("Done").field(result).finish(),
}
}
}
/// Extension trait for creating an execution context from a session.
pub trait VortexSessionExecute {
/// Create a new execution context from this session.
fn create_execution_ctx(&self) -> ExecutionCtx;
}
impl VortexSessionExecute for VortexSession {
fn create_execution_ctx(&self) -> ExecutionCtx {
ExecutionCtx::new(self.clone())
}
}