Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions vortex-array/src/expr/transform/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use crate::expr::traversal::NodeExt;
use crate::expr::traversal::NodeRewriter;
use crate::expr::traversal::Transformed;
use crate::expr::traversal::TraversalOrder;
use crate::scalar_fn::fns::merge::Merge;
use crate::scalar_fn::fns::pack::Pack;

/// Partition an expression into sub-expressions that are uniquely associated with an annotation.
/// A root expression is also returned that can be used to recombine the results of the partitions
Expand Down Expand Up @@ -183,8 +185,11 @@ where

fn visit_down(&mut self, node: Self::NodeTy) -> VortexResult<Transformed<Self::NodeTy>> {
match self.annotations.get(&node) {
// If this expression only accesses a single field, then we can skip the children
Some(annotations) if annotations.len() == 1 => {
// Single-field expressions can skip their children, except reconstruction nodes
// (`pack`/`merge`): collapsing those applies nested-struct validity at the wrong level.
Some(annotations)
if annotations.len() == 1 && !node.is::<Pack>() && !node.is::<Merge>() =>
{
let annotation = annotations
.iter()
.next()
Expand Down Expand Up @@ -336,11 +341,13 @@ mod tests {
let expr = merge([col("a"), pack([("b", col("b"))], NonNullable)]);

let partitioned = partition(expr, &dtype, make_free_field_annotator(fields)).unwrap();
// The `pack` operand stays in the root (reconstruction nodes are not collapsed), so `b` is
// referenced as `$.b.b_0` over a bare `col("b")` partition, equivalent to the old `$.b.b_0.b`.
let expected = pack(
[
("x", get_item("x", get_item("a_0", col("a")))),
("y", get_item("y", get_item("a_0", col("a")))),
("b", get_item("b", get_item("b_0", col("b")))),
("b", get_item("b_0", col("b"))),
],
NonNullable,
);
Expand All @@ -357,7 +364,7 @@ mod tests {
assert_eq!(part_a, &expected_a, "{part_a} {expected_a}");

let part_b = partitioned.find_partition(&"b".into()).unwrap();
let expected_b = pack([("b_0", pack([("b", col("b"))], NonNullable))], NonNullable);
let expected_b = pack([("b_0", col("b"))], NonNullable);
assert_eq!(part_b, &expected_b, "{part_b} {expected_b}");
}
}
56 changes: 56 additions & 0 deletions vortex-file/tests/test_write_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,3 +209,59 @@ async fn test_write_empty_nullable_struct_column() {
}
assert_eq!(rows, 0);
}

/// Regression test for nested struct nullability on a file round-trip.
///
/// A nullable struct field nested directly inside another single-field struct used to lose its
/// nullability when read back: `{c0={a={b=i32}?}?}` came back as `{c0={a={b=i32}}?}`, silently
/// dropping the inner `?`.
///
/// The test writes such an array to an in-memory buffer, scans it back, and asserts that the
/// dtype and the contents match the input and that exactly one chunk is produced.
#[tokio::test]
async fn test_nested_nullable_struct_roundtrip() {
    let mut ctx = SESSION.create_execution_ctx();

    // Shape under test: {c0 = {a = {b = i32}?}?}, two rows.
    let row_count = 2;

    // Innermost primitive field `b`.
    let leaf = PrimitiveArray::from_iter([0i32, 0]).into_array();

    // `a` wraps `b` and is null at row 0.
    let inner_validity = Validity::Array(BoolArray::from_iter([false, true]).into_array());
    let inner =
        StructArray::new(FieldNames::from(["b"]), vec![leaf], row_count, inner_validity)
            .into_array();

    // `c0` wraps `a` and is null at row 1.
    let outer_validity = Validity::Array(BoolArray::from_iter([true, false]).into_array());
    let outer =
        StructArray::new(FieldNames::from(["a"]), vec![inner], row_count, outer_validity)
            .into_array();

    // Non-nullable top-level struct holding the single column `c0`.
    let data = StructArray::new(
        FieldNames::from(["c0"]),
        vec![outer],
        row_count,
        Validity::NonNullable,
    )
    .into_array();

    // Write the array into an in-memory buffer.
    let mut encoded = Vec::new();
    let write_options = SESSION.write_options();
    write_options
        .write(&mut encoded, data.to_array_stream())
        .await
        .expect("write");

    // Re-open the written bytes and scan them back as a stream of chunks.
    let file = SESSION
        .open_options()
        .open_buffer(ByteBuffer::from(encoded))
        .expect("open");
    let scan = file.scan().expect("scan");
    let chunks = scan.into_stream().expect("into_stream");
    pin_mut!(chunks);

    let first = chunks.next().await;
    let chunk = first.unwrap().expect("read back should succeed");

    // Check the dtype first so a nullability regression fails with a targeted message.
    assert_eq!(
        chunk.dtype(),
        data.dtype(),
        "nested struct field nullability must round-trip"
    );
    vortex_array::assert_arrays_eq!(data, chunk, &mut ctx);

    let trailing = chunks.next().await;
    assert!(trailing.is_none(), "expected a single chunk");
}
16 changes: 10 additions & 6 deletions vortex-layout/src/layouts/struct_/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,16 @@ impl StructReader {
)?;

if partitioned.partitions.len() == 1 {
// If there's only one partition, we step into the field scope of the original
// expression by replacing any `$.a` with `$`.
return Ok(Partitioned::Single(
partitioned.partition_names[0].clone(),
replace(expr, &col(partitioned.partition_names[0].clone()), root()),
));
// A single partition can delegate to the child verbatim (`$.a` -> `$`), but only when
// nothing is reconstructed here: `pack`/`merge` accumulate wrappers across nested
// structs and misplace validity, so reconstruction falls through to the multi path.
let single = replace(expr, &col(partitioned.partition_names[0].clone()), root());
if !(single.is::<Pack>() || single.is::<Merge>()) {
return Ok(Partitioned::Single(
partitioned.partition_names[0].clone(),
single,
));
}
}

// We now need to process the partitioned expressions to rewrite the root scope
Expand Down
Loading