Skip to content

Commit 24baf98

Browse files
committed
claude plan
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 948cd04 commit 24baf98

3 files changed

Lines changed: 39 additions & 12 deletions

File tree

vortex-scan/src/fetch_plan.rs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::collections::BTreeSet;
45
use std::ops::Range;
56

67
use itertools::Itertools;
78
use vortex_array::dtype::DType;
89
use vortex_array::dtype::Field;
910
use vortex_array::dtype::FieldMask;
11+
use vortex_array::dtype::FieldName;
1012
use vortex_array::dtype::FieldNames;
1113
use vortex_array::dtype::FieldPath;
1214
use vortex_array::expr::Expression;
@@ -52,6 +54,7 @@ impl MaterializationPlan {
5254
dtype: &DType,
5355
filter_present: bool,
5456
projection_field_mask: &[FieldMask],
57+
filter_field_names: &BTreeSet<FieldName>,
5558
) -> Self {
5659
let projected_row_bytes = estimate_field_mask_row_bytes(dtype, projection_field_mask);
5760
if !filter_present {
@@ -103,6 +106,14 @@ impl MaterializationPlan {
103106
};
104107

105108
let carry_cost_bytes_per_row = estimate_dtype_row_bytes(&field_dtype);
109+
// Fields shared with the filter are already fetched during filter evaluation,
110+
// so keep them immediate to avoid double IO.
111+
if filter_field_names.contains(name) {
112+
immediate_carry_cost =
113+
immediate_carry_cost.saturating_add(carry_cost_bytes_per_row);
114+
immediate.push(name.clone());
115+
continue;
116+
}
106117
if should_defer_field(&field_dtype, carry_cost_bytes_per_row) {
107118
deferred_carry_cost = deferred_carry_cost.saturating_add(carry_cost_bytes_per_row);
108119
deferred_groups.push(DeferredFieldGroup {
@@ -283,6 +294,8 @@ fn estimate_dtype_row_bytes(dtype: &DType) -> usize {
283294

284295
#[cfg(test)]
285296
mod tests {
297+
use std::collections::BTreeSet;
298+
286299
use vortex_array::dtype::DType;
287300
use vortex_array::dtype::Field;
288301
use vortex_array::dtype::FieldMask;
@@ -323,7 +336,7 @@ mod tests {
323336
fn deferred_plan_activates_for_narrow_filtered_projection() {
324337
let projection = select(["id", "payload"], root());
325338
let mask = vec![FieldMask::Prefix(FieldPath::from(Field::Name("id".into())))];
326-
let plan = MaterializationPlan::from_projection(&projection, &scan_dtype(), true, &mask);
339+
let plan = MaterializationPlan::from_projection(&projection, &scan_dtype(), true, &mask, &BTreeSet::new());
327340
let deferred = plan.deferred().expect("deferred plan");
328341
assert_eq!(
329342
deferred.final_fields(),
@@ -337,22 +350,22 @@ mod tests {
337350
fn deferred_plan_stays_off_for_unfiltered_projection() {
338351
let projection = select(["id", "payload"], root());
339352
let mask = vec![FieldMask::Prefix(FieldPath::from(Field::Name("id".into())))];
340-
let plan = MaterializationPlan::from_projection(&projection, &scan_dtype(), false, &mask);
353+
let plan = MaterializationPlan::from_projection(&projection, &scan_dtype(), false, &mask, &BTreeSet::new());
341354
assert!(plan.deferred().is_none());
342355
}
343356

344357
#[test]
345358
fn deferred_plan_stays_off_for_root_projection() {
346359
let mask = vec![FieldMask::Prefix(FieldPath::from(Field::Name("id".into())))];
347-
let plan = MaterializationPlan::from_projection(&root(), &scan_dtype(), true, &mask);
360+
let plan = MaterializationPlan::from_projection(&root(), &scan_dtype(), true, &mask, &BTreeSet::new());
348361
assert!(plan.deferred().is_none());
349362
}
350363

351364
#[test]
352365
fn deferred_plan_stays_off_for_wide_projection() {
353366
let projection = select(["id", "score", "payload", "nested"], root());
354367
let mask = vec![FieldMask::Prefix(FieldPath::from(Field::Name("id".into())))];
355-
let plan = MaterializationPlan::from_projection(&projection, &scan_dtype(), true, &mask);
368+
let plan = MaterializationPlan::from_projection(&projection, &scan_dtype(), true, &mask, &BTreeSet::new());
356369
assert!(plan.deferred().is_none());
357370
}
358371

vortex-scan/src/repeated_scan.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -562,24 +562,24 @@ impl<A: 'static + Send> StagedSplitStream<A> {
562562

563563
// Phase 1: Collect a window of ready splits
564564
let mut window: Vec<(usize, FilteredSplit)> = Vec::new();
565+
let mut window_bytes: usize = 0;
565566
while self.available_projection_slots() > window.len()
566567
&& self.should_start_projection()
567568
{
568569
let Some((idx, filtered)) = self.filter.take_ready() else {
569570
break;
570571
};
571-
if !window.is_empty()
572-
&& self.projection.in_flight_projection_bytes > 0
573-
&& self
574-
.projection
575-
.in_flight_projection_bytes
576-
.saturating_add(filtered.estimated_projection_bytes)
577-
> DEFERRED_IN_FLIGHT_BUDGET_BYTES
578-
{
572+
let candidate_total = self
573+
.projection
574+
.in_flight_projection_bytes
575+
.saturating_add(window_bytes)
576+
.saturating_add(filtered.estimated_projection_bytes);
577+
if !window.is_empty() && candidate_total > DEFERRED_IN_FLIGHT_BUDGET_BYTES {
579578
self.filter.push_ready(idx, filtered);
580579
break;
581580
}
582581

582+
window_bytes = window_bytes.saturating_add(filtered.estimated_projection_bytes);
583583
window.push((idx, filtered));
584584
}
585585

vortex-scan/src/scan_builder.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: Copyright the Vortex contributors
33

4+
use std::collections::BTreeSet;
45
use std::ops::Range;
56
use std::pin::Pin;
67
use std::sync::Arc;
@@ -281,11 +282,24 @@ impl<A: 'static + Send> ScanBuilder<A> {
281282
// producing work before we've traversed every touched layout.
282283
let (filter_mask, projection_mask, projection_only_mask) =
283284
filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
285+
let filter_field_names: BTreeSet<FieldName> = filter_mask
286+
.iter()
287+
.filter_map(|m| match m {
288+
FieldMask::Prefix(path) | FieldMask::Exact(path) => {
289+
path.parts().first().and_then(|f| match f {
290+
Field::Name(n) => Some(n.clone()),
291+
_ => None,
292+
})
293+
}
294+
FieldMask::All => None,
295+
})
296+
.collect();
284297
let materialization_plan = MaterializationPlan::from_projection(
285298
&projection,
286299
&dtype,
287300
filter.is_some(),
288301
&projection_mask,
302+
&filter_field_names,
289303
);
290304
let scan_metrics = self
291305
.metrics_registry

0 commit comments

Comments
 (0)