Skip to content

Commit d4979d2

Browse files
committed
More sophisticated monolithic path
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent 5227ea7 commit d4979d2

4 files changed

Lines changed: 86 additions & 32 deletions

File tree

vortex-scan/src/fetch_plan.rs

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ impl MaterializationPlan {
5757
filter_field_names: &BTreeSet<FieldName>,
5858
) -> Self {
5959
let projected_row_bytes = estimate_field_mask_row_bytes(dtype, projection_field_mask);
60+
let projection_aligned_splits =
61+
filter_present && projection_masks_include_wide_fields(dtype, projection_field_mask);
6062
if !filter_present {
6163
return Self::Monolithic {
6264
projected_row_bytes,
@@ -67,20 +69,20 @@ impl MaterializationPlan {
6769
let Some(final_fields) = simple_root_projection_fields(projection, dtype) else {
6870
return Self::Monolithic {
6971
projected_row_bytes,
70-
projection_aligned_splits: false,
72+
projection_aligned_splits,
7173
};
7274
};
7375
if final_fields.is_empty() || !final_fields.iter().all_unique() {
7476
return Self::Monolithic {
7577
projected_row_bytes,
76-
projection_aligned_splits: false,
78+
projection_aligned_splits,
7779
};
7880
}
7981

8082
let Some(struct_fields) = dtype.as_struct_fields_opt() else {
8183
return Self::Monolithic {
8284
projected_row_bytes,
83-
projection_aligned_splits: false,
85+
projection_aligned_splits,
8486
};
8587
};
8688
if final_fields.len() == struct_fields.nfields()
@@ -101,7 +103,7 @@ impl MaterializationPlan {
101103
let Some(field_dtype) = struct_fields.field(name) else {
102104
return Self::Monolithic {
103105
projected_row_bytes,
104-
projection_aligned_splits: false,
106+
projection_aligned_splits,
105107
};
106108
};
107109

@@ -133,15 +135,15 @@ impl MaterializationPlan {
133135
if deferred_groups.is_empty() {
134136
return Self::Monolithic {
135137
projected_row_bytes,
136-
projection_aligned_splits: false,
138+
projection_aligned_splits,
137139
};
138140
}
139141

140142
let total_carry_cost = immediate_carry_cost.saturating_add(deferred_carry_cost);
141143
if total_carry_cost == 0 || deferred_carry_cost.saturating_mul(2) < total_carry_cost {
142144
return Self::Monolithic {
143145
projected_row_bytes,
144-
projection_aligned_splits: false,
146+
projection_aligned_splits,
145147
};
146148
}
147149

@@ -220,6 +222,38 @@ impl DeferredFieldGroup {
220222
}
221223
}
222224

225+
fn projection_masks_include_wide_fields(dtype: &DType, field_masks: &[FieldMask]) -> bool {
226+
field_masks
227+
.iter()
228+
.any(|mask| mask_targets_wide_field(dtype, mask))
229+
}
230+
231+
fn mask_targets_wide_field(dtype: &DType, field_mask: &FieldMask) -> bool {
232+
match field_mask {
233+
FieldMask::All => true,
234+
FieldMask::Prefix(path) | FieldMask::Exact(path) => {
235+
if path.is_root() {
236+
return true;
237+
}
238+
239+
path.resolve(dtype.clone())
240+
.map(|target| is_wide_projection_dtype(&target))
241+
.unwrap_or_else(|| is_wide_projection_dtype(dtype))
242+
}
243+
}
244+
}
245+
246+
fn is_wide_projection_dtype(dtype: &DType) -> bool {
247+
matches!(
248+
dtype,
249+
DType::Utf8(_)
250+
| DType::Binary(_)
251+
| DType::List(..)
252+
| DType::FixedSizeList(..)
253+
| DType::Struct(..)
254+
)
255+
}
256+
223257
fn simple_root_projection_fields(projection: &Expression, dtype: &DType) -> Option<FieldNames> {
224258
let struct_fields = dtype.as_struct_fields_opt()?;
225259
if projection.is::<Root>() {
@@ -237,14 +271,7 @@ fn simple_root_projection_fields(projection: &Expression, dtype: &DType) -> Opti
237271
}
238272

239273
fn should_defer_field(dtype: &DType, row_cost_bytes: usize) -> bool {
240-
matches!(
241-
dtype,
242-
DType::Utf8(_)
243-
| DType::Binary(_)
244-
| DType::List(..)
245-
| DType::FixedSizeList(..)
246-
| DType::Struct(..)
247-
) || row_cost_bytes > IMMEDIATE_FIELD_ROW_BYTES_THRESHOLD
274+
is_wide_projection_dtype(dtype) || row_cost_bytes > IMMEDIATE_FIELD_ROW_BYTES_THRESHOLD
248275
}
249276

250277
pub(crate) fn estimate_field_mask_row_bytes(dtype: &DType, field_masks: &[FieldMask]) -> usize {

vortex-scan/src/repeated_scan.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -278,15 +278,12 @@ impl<A: 'static + Send> RepeatedScan<A> {
278278
) -> VortexResult<BoxStream<'static, VortexResult<A>>> {
279279
let ctx = self.task_context();
280280
let concurrency = concurrency.max(1);
281-
if matches!(self.materialization_plan, MaterializationPlan::Monolithic { .. }) {
281+
if matches!(
282+
self.materialization_plan,
283+
MaterializationPlan::Monolithic { .. }
284+
) {
282285
let split_ranges = self.split_ranges(row_range)?.collect::<Vec<_>>();
283-
return self.legacy_stream_from_ranges(
284-
ctx,
285-
split_ranges,
286-
concurrency,
287-
ordered,
288-
handle,
289-
);
286+
return self.legacy_stream_from_ranges(ctx, split_ranges, concurrency, ordered, handle);
290287
}
291288

292289
let filter_ahead = filter_ahead_for(concurrency, self.filter.is_some());

vortex-scan/src/scan_builder.rs

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -989,6 +989,7 @@ mod test {
989989
("b", DType::Primitive(PType::I32, Nullability::NonNullable)),
990990
("c", DType::Primitive(PType::I32, Nullability::NonNullable)),
991991
("d", DType::Primitive(PType::I32, Nullability::NonNullable)),
992+
("payload", DType::Utf8(Nullability::Nullable)),
992993
]),
993994
Nullability::NonNullable,
994995
),
@@ -1053,7 +1054,8 @@ mod test {
10531054
_expr: &Expression,
10541055
_mask: MaskFuture,
10551056
) -> VortexResult<ArrayFuture> {
1056-
unimplemented!("not needed for this test");
1057+
let array = PrimitiveArray::from_iter([1i32]).into_array();
1058+
Ok(Box::pin(async move { Ok(array) }))
10571059
}
10581060
}
10591061

@@ -1070,7 +1072,7 @@ mod test {
10701072
drop(scan.execute(None)?);
10711073

10721074
let seen_masks = seen_masks.lock();
1073-
assert_eq!(seen_masks.len(), 1);
1075+
assert!(!seen_masks.is_empty());
10741076
assert!(
10751077
seen_masks[0].contains(&FieldMask::Prefix(FieldPath::from_name("a"))),
10761078
"expected split discovery to include filter field"
@@ -1092,7 +1094,7 @@ mod test {
10921094
drop(scan.execute(None)?);
10931095

10941096
let seen_masks = seen_masks.lock();
1095-
assert_eq!(seen_masks.len(), 1);
1097+
assert!(!seen_masks.is_empty());
10961098
assert_eq!(
10971099
seen_masks[0],
10981100
vec![FieldMask::Prefix(FieldPath::from_name("b"))]
@@ -1114,7 +1116,7 @@ mod test {
11141116
drop(scan.execute(None)?);
11151117

11161118
let seen_masks = seen_masks.lock();
1117-
assert_eq!(seen_masks.len(), 1);
1119+
assert!(!seen_masks.is_empty());
11181120
assert_eq!(seen_masks[0].len(), 4);
11191121
assert!(seen_masks[0].contains(&FieldMask::Prefix(FieldPath::from_name("a"))));
11201122
assert!(seen_masks[0].contains(&FieldMask::Prefix(FieldPath::from_name("b"))));
@@ -1123,4 +1125,25 @@ mod test {
11231125

11241126
Ok(())
11251127
}
1128+
1129+
#[test]
1130+
fn filtered_wide_single_column_split_discovery_includes_projection_field() -> VortexResult<()> {
1131+
let seen_masks = Arc::new(Mutex::new(Vec::new()));
1132+
let reader = Arc::new(RecordingSplitMaskReader::new(seen_masks.clone()));
1133+
let session = crate::test::SCAN_SESSION.clone();
1134+
1135+
let scan = ScanBuilder::new(session, reader)
1136+
.with_filter(eq(col("a"), lit(1i32)))
1137+
.with_projection(col("payload"))
1138+
.prepare()?;
1139+
drop(scan.execute(None)?);
1140+
1141+
let seen_masks = seen_masks.lock();
1142+
assert!(!seen_masks.is_empty());
1143+
assert_eq!(seen_masks[0].len(), 2);
1144+
assert!(seen_masks[0].contains(&FieldMask::Prefix(FieldPath::from_name("a"))));
1145+
assert!(seen_masks[0].contains(&FieldMask::Prefix(FieldPath::from_name("payload"))));
1146+
1147+
Ok(())
1148+
}
11261149
}

vortex-scan/src/tasks.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,10 @@ pub(super) fn split_exec<A: 'static + Send>(
244244
split: Range<u64>,
245245
limit: Option<&mut u64>,
246246
) -> VortexResult<TaskFuture<Option<A>>> {
247-
if matches!(ctx.materialization_plan, MaterializationPlan::Monolithic { .. }) {
247+
if matches!(
248+
ctx.materialization_plan,
249+
MaterializationPlan::Monolithic { .. }
250+
) {
248251
return split_exec_monolithic(ctx, split, limit);
249252
}
250253

@@ -346,9 +349,11 @@ fn split_exec_monolithic<A: 'static + Send>(
346349
}
347350
};
348351

349-
let projection_fetch_hints =
350-
ctx.materialization_plan
351-
.fetch_hints(ctx.reader.as_ref(), &ctx.projection_field_mask, &row_range)?;
352+
let projection_fetch_hints = ctx.materialization_plan.fetch_hints(
353+
ctx.reader.as_ref(),
354+
&ctx.projection_field_mask,
355+
&row_range,
356+
)?;
352357
let projection_field_count = projection_field_count(&ctx.materialization_plan, &ctx);
353358
let (projection_future, segment_request_count) = with_request_count_scope(|| {
354359
ctx.reader
@@ -372,7 +377,9 @@ fn split_exec_monolithic<A: 'static + Send>(
372377
metrics
373378
.projection_fetch_hints
374379
.update(projection_fetch_hints.len() as f64);
375-
metrics.projection_fields.update(projection_field_count as f64);
380+
metrics
381+
.projection_fields
382+
.update(projection_field_count as f64);
376383
}
377384

378385
let array = projection_future.await?;
@@ -757,7 +764,7 @@ mod tests {
757764
segment_source: None,
758765
});
759766

760-
let _future = filter_split(ctx, 0..4, None).unwrap();
767+
let _future = filter_split(ctx.clone(), 0..4, None).unwrap();
761768
assert_eq!(projection_calls.load(Ordering::Relaxed), 0);
762769

763770
let result = block_on(split_exec(ctx, 0..4, None).unwrap()).unwrap();

0 commit comments

Comments
 (0)