Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions crates/integration_tests/tests/read_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,20 @@ async fn test_read_schema_evolution_type_promotion() {
);
}

/// Asserts that the data files referenced by `plan` use exactly the expected formats.
///
/// The format of a data file is taken to be the text after the last `.` in its
/// file name; files whose name contains no `.` contribute nothing. The distinct
/// extensions across every split are compared, as a set, against
/// `expected_formats`. `table_name` only appears in the failure message.
fn assert_plan_file_formats(plan: &Plan, expected_formats: &[&str], table_name: &str) {
    let expected: HashSet<&str> = expected_formats.iter().copied().collect();

    // Gather the distinct extensions of all data files in all splits.
    let mut actual: HashSet<&str> = HashSet::new();
    for split in plan.splits().iter() {
        for file in split.data_files() {
            if let Some((_, extension)) = file.file_name.rsplit_once('.') {
                actual.insert(extension);
            }
        }
    }

    assert_eq!(
        actual, expected,
        "{table_name} should scan the expected data file formats"
    );
}

/// Stats pruning should treat a newly added column as all-NULL for old files.
#[tokio::test]
async fn test_stats_pruning_schema_evolution_added_column_eq_prunes_old_files() {
Expand Down Expand Up @@ -1379,6 +1393,109 @@ async fn test_read_schema_evolution_drop_column() {
);
}

/// Verifies reads of a mixed-format table after ALTER COLUMN ... FIRST/AFTER.
/// Old files keep the original physical column order; new files use moved columns.
///
/// Two scans are checked: a full read, which must expose the columns in the
/// current table schema order, and a projected read of `id` and `right_value`,
/// which must follow the order requested by the caller.
#[tokio::test]
async fn test_read_mixed_format_schema_evolution_reorder_move_column() {
    let table = "mixed_format_schema_evolution_reorder_move_column";

    // ---- Full read (no projection) ----
    let (plan, batches) = scan_and_read_with_fs_catalog(table, None).await;

    assert_plan_file_formats(&plan, &["avro", "orc", "parquet"], table);

    // Every batch must present the columns in the current schema order.
    for batch in &batches {
        let schema = batch.schema();
        let names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(
            names,
            vec!["right_value", "left_value", "id"],
            "Full read should expose the current table schema order"
        );
    }

    // Flatten all batches into (id, left_value, right_value) tuples.
    let mut rows: Vec<(i32, String, String)> = Vec::new();
    for batch in &batches {
        let right_col = batch
            .column_by_name("right_value")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
            .expect("right_value");
        let left_col = batch
            .column_by_name("left_value")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
            .expect("left_value");
        let id_col = batch
            .column_by_name("id")
            .and_then(|col| col.as_any().downcast_ref::<Int32Array>())
            .expect("id");
        rows.extend((0..batch.num_rows()).map(|row| {
            (
                id_col.value(row),
                left_col.value(row).to_string(),
                right_col.value(row).to_string(),
            )
        }));
    }
    // Batch order is not asserted; compare rows sorted by id.
    rows.sort_by_key(|row| row.0);

    let expected_rows: Vec<(i32, String, String)> = [
        (1, "parquet-left-1", "parquet-right-1"),
        (2, "parquet-left-2", "parquet-right-2"),
        (3, "orc-left-3", "orc-right-3"),
        (4, "orc-left-4", "orc-right-4"),
        (5, "avro-left-5", "avro-right-5"),
        (6, "avro-left-6", "avro-right-6"),
    ]
    .iter()
    .map(|&(id, left, right)| (id, left.to_string(), right.to_string()))
    .collect();
    assert_eq!(
        rows, expected_rows,
        "Mixed-format REORDER/MOVE COLUMN should read values by field id, not physical position"
    );

    // ---- Projected read: caller asks for (id, right_value) ----
    let (_, projected_batches) =
        scan_and_read_with_fs_catalog(table, Some(&["id", "right_value"])).await;

    let mut projected_rows: Vec<(i32, String)> = Vec::new();
    for batch in &projected_batches {
        let schema = batch.schema();
        let names: Vec<&str> = schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect();
        assert_eq!(
            names,
            vec!["id", "right_value"],
            "Projection should follow caller-specified order"
        );

        let id_col = batch
            .column_by_name("id")
            .and_then(|col| col.as_any().downcast_ref::<Int32Array>())
            .expect("projected id");
        let right_col = batch
            .column_by_name("right_value")
            .and_then(|col| col.as_any().downcast_ref::<StringArray>())
            .expect("projected right_value");
        projected_rows.extend(
            (0..batch.num_rows()).map(|row| (id_col.value(row), right_col.value(row).to_string())),
        );
    }
    projected_rows.sort_by_key(|row| row.0);

    let expected_projected: Vec<(i32, String)> = [
        (1, "parquet-right-1"),
        (2, "parquet-right-2"),
        (3, "orc-right-3"),
        (4, "orc-right-4"),
        (5, "avro-right-5"),
        (6, "avro-right-6"),
    ]
    .iter()
    .map(|&(id, right)| (id, right.to_string()))
    .collect();
    assert_eq!(
        projected_rows, expected_projected,
        "Projection should still map reordered old and new files by field id"
    );
}

// ---------------------------------------------------------------------------
// Complex type integration tests
// ---------------------------------------------------------------------------
Expand Down
50 changes: 50 additions & 0 deletions dev/spark/provision.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,56 @@ def main():
"""
)

# ===== Mixed-format Schema Evolution: Reorder/Move Column =====
# Old Parquet files use the original order (id, left_value, right_value).
# ORC and Avro files are written after moving columns; readers should expose
# the current table schema order and map old/new files by field id.
#
# Each INSERT below lists its values positionally, in the column order the
# table has at that point:
#   1. parquet: (id, left_value, right_value)
#   2. orc:     (right_value, id, left_value)   after right_value is moved FIRST
#   3. avro:    (right_value, left_value, id)   after left_value is moved AFTER right_value

# Step 1: create the table in its original column order with Parquet as the file format.
spark.sql(
"""
CREATE TABLE IF NOT EXISTS mixed_format_schema_evolution_reorder_move_column (
id INT,
left_value STRING,
right_value STRING
) USING paimon
TBLPROPERTIES (
'file.format' = 'parquet'
)
"""
)
# Rows 1-2 are inserted while the table still has the original column order.
spark.sql(
"""
INSERT INTO mixed_format_schema_evolution_reorder_move_column VALUES
(1, 'parquet-left-1', 'parquet-right-1'),
(2, 'parquet-left-2', 'parquet-right-2')
"""
)
# Step 2: move right_value to the front -> (right_value, id, left_value).
spark.sql(
"ALTER TABLE mixed_format_schema_evolution_reorder_move_column ALTER COLUMN right_value FIRST"
)
# Switch the table's file.format property to ORC before the next insert.
spark.sql(
"ALTER TABLE mixed_format_schema_evolution_reorder_move_column SET TBLPROPERTIES ('file.format' = 'orc')"
)
# Rows 3-4: values are given in the (right_value, id, left_value) order.
spark.sql(
"""
INSERT INTO mixed_format_schema_evolution_reorder_move_column VALUES
('orc-right-3', 3, 'orc-left-3'),
('orc-right-4', 4, 'orc-left-4')
"""
)
# Step 3: move left_value right after right_value -> (right_value, left_value, id).
# This is the final schema order that readers are expected to expose.
spark.sql(
"ALTER TABLE mixed_format_schema_evolution_reorder_move_column ALTER COLUMN left_value AFTER right_value"
)
# Switch the table's file.format property to Avro before the last insert.
spark.sql(
"ALTER TABLE mixed_format_schema_evolution_reorder_move_column SET TBLPROPERTIES ('file.format' = 'avro')"
)
# Rows 5-6: values are given in the final (right_value, left_value, id) order.
spark.sql(
"""
INSERT INTO mixed_format_schema_evolution_reorder_move_column VALUES
('avro-right-5', 'avro-left-5', 5),
('avro-right-6', 'avro-left-6', 6)
"""
)

# ===== Complex Types table: ARRAY, MAP, STRUCT =====
spark.sql(
"""
Expand Down
Loading