diff --git a/quickwit/quickwit-parquet-engine/src/storage/writer.rs b/quickwit/quickwit-parquet-engine/src/storage/writer.rs
index 2fcf45b9699..4722cd89538 100644
--- a/quickwit/quickwit-parquet-engine/src/storage/writer.rs
+++ b/quickwit/quickwit-parquet-engine/src/storage/writer.rs
@@ -298,7 +298,61 @@ impl ParquetWriter {
         Ok(sorted_batch)
     }
 
-    /// Validate, sort, and build WriterProperties for a batch.
+    /// Reorder columns for optimal physical layout in the Parquet file.
+    ///
+    /// Sort schema columns are placed first (in their configured sort order),
+    /// followed by all remaining data columns in alphabetical order. This
+    /// layout enables a two-GET streaming merge during compaction: the first
+    /// GET reads the footer, the second streams from the start of the row
+    /// group. Sort columns arrive first, allowing the compactor to compute
+    /// the global merge order before data columns arrive.
+    fn reorder_columns(&self, batch: &RecordBatch) -> RecordBatch {
+        let schema = batch.schema();
+        let mut ordered_indices: Vec<usize> = Vec::with_capacity(schema.fields().len());
+        let mut used = vec![false; schema.fields().len()];
+
+        // Phase 1: sort schema columns in their configured order.
+        for sf in &self.resolved_sort_fields {
+            if let Ok(idx) = schema.index_of(sf.name.as_str()) {
+                if !used[idx] {
+                    ordered_indices.push(idx);
+                    used[idx] = true;
+                }
+            }
+        }
+
+        // Phase 2: remaining columns, alphabetically by name.
+        let mut remaining: Vec<(usize, &str)> = schema
+            .fields()
+            .iter()
+            .enumerate()
+            .filter(|(i, _)| !used[*i])
+            .map(|(i, f)| (i, f.name().as_str()))
+            .collect();
+        remaining.sort_by_key(|(_, name)| *name);
+        for (idx, _) in remaining {
+            ordered_indices.push(idx);
+        }
+
+        // Build reordered schema and columns.
+        let new_fields: Vec<Arc<Field>> = ordered_indices
+            .iter()
+            .map(|&i| Arc::new(schema.field(i).clone()))
+            .collect();
+        let new_columns: Vec<Arc<dyn Array>> = ordered_indices
+            .iter()
+            .map(|&i| Arc::clone(batch.column(i)))
+            .collect();
+        let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
+            new_fields,
+            schema.metadata().clone(),
+        ));
+
+        RecordBatch::try_new(new_schema, new_columns)
+            .expect("reorder_columns: schema and columns must be consistent")
+    }
+
+    /// Validate, sort, reorder columns, and build WriterProperties for a batch.
     fn prepare_write(
         &self,
         batch: &RecordBatch,
@@ -306,7 +360,7 @@ impl ParquetWriter {
     ) -> Result<(RecordBatch, WriterProperties), ParquetWriteError> {
         validate_required_fields(&batch.schema())
             .map_err(|e| ParquetWriteError::SchemaValidation(e.to_string()))?;
-        let sorted_batch = self.sort_batch(batch)?;
+        let sorted_batch = self.reorder_columns(&self.sort_batch(batch)?);
 
         let kv_metadata = split_metadata.map(build_compaction_key_value_metadata);
 
@@ -907,4 +961,121 @@ mod tests {
             json_str
         );
     }
+
+    #[test]
+    fn test_column_ordering_sort_columns_first_then_alphabetical() {
+        // Default metrics sort fields: metric_name|service|env|datacenter|region|host|
+        // timeseries_id|timestamp_secs
+        let config = ParquetWriterConfig::default();
+        let writer = ParquetWriter::new(config, &TableConfig::default());
+
+        // Create a batch with columns in a deliberately scrambled order. It
+        // has the tag columns (service, env, region, host) plus two extra
+        // data columns (zzz_extra, aaa_extra) that are NOT in the sort schema.
+        let batch = create_test_batch_with_tags(
+            3,
+            &["host", "zzz_extra", "env", "region", "service", "aaa_extra"],
+        );
+        let input_schema = batch.schema();
+        let input_names: Vec<&str> = input_schema
+            .fields()
+            .iter()
+            .map(|f| f.name().as_str())
+            .collect();
+        // Sanity: input has tag columns in the scrambled order we specified,
+        // not in sort-schema or alphabetical order.
+        let host_pos = input_names.iter().position(|n| *n == "host").unwrap();
+        let service_pos = input_names.iter().position(|n| *n == "service").unwrap();
+        assert!(
+            host_pos < service_pos,
+            "input should have host before service (scrambled), got: {:?}",
+            input_names
+        );
+
+        let reordered = writer.reorder_columns(&batch);
+        let schema = reordered.schema();
+        let names: Vec<String> = schema.fields().iter().map(|f| f.name().clone()).collect();
+
+        // Sort schema columns that are present should come first, in sort order.
+        // From the default: metric_name, service, env, region, host, timestamp_secs
+        // (datacenter and timeseries_id are not in the batch).
+        // metric_type and value are required fields but NOT sort columns.
+        let expected_prefix = [
+            "metric_name",
+            "service",
+            "env",
+            "region",
+            "host",
+            "timestamp_secs",
+        ];
+        let sort_prefix: Vec<&str> = names
+            .iter()
+            .map(|s| s.as_str())
+            .take_while(|n| expected_prefix.contains(n))
+            .collect();
+        assert_eq!(
+            sort_prefix, expected_prefix,
+            "sort schema columns should appear first in configured order, got: {:?}",
+            names
+        );
+
+        // Remaining columns should be alphabetical.
+        let remaining: Vec<&str> = names
+            .iter()
+            .skip(sort_prefix.len())
+            .map(|s| s.as_str())
+            .collect();
+        let mut sorted_remaining = remaining.clone();
+        sorted_remaining.sort();
+        assert_eq!(
+            remaining, sorted_remaining,
+            "non-sort columns should be in alphabetical order, got: {:?}",
+            remaining
+        );
+
+        // All original columns must be present (no data loss).
+        assert_eq!(reordered.num_columns(), batch.num_columns());
+        assert_eq!(reordered.num_rows(), batch.num_rows());
+    }
+
+    #[test]
+    fn test_column_ordering_preserved_in_parquet_file() {
+        use std::fs::File;
+
+        use parquet::file::reader::{FileReader, SerializedFileReader};
+
+        let config = ParquetWriterConfig::default();
+        let writer = ParquetWriter::new(config, &TableConfig::default());
+
+        let batch = create_test_batch_with_tags(3, &["host", "zzz_extra", "env", "service"]);
+
+        let temp_dir = std::env::temp_dir();
+        let path = temp_dir.join("test_column_ordering.parquet");
+        writer
+            .write_to_file_with_metadata(&batch, &path, None)
+            .unwrap();
+
+        // Read back and verify physical column order from the Parquet schema.
+        let file = File::open(&path).unwrap();
+        let reader = SerializedFileReader::new(file).unwrap();
+        let parquet_schema = reader.metadata().file_metadata().schema_descr();
+        let col_names: Vec<String> = (0..parquet_schema.num_columns())
+            .map(|i| parquet_schema.column(i).name().to_string())
+            .collect();
+
+        // Sort columns first: metric_name, service, env, host, timestamp_secs.
+        // Then remaining alphabetically: metric_type, value, zzz_extra.
+        assert_eq!(col_names[0], "metric_name");
+        assert_eq!(col_names[1], "service");
+        assert_eq!(col_names[2], "env");
+        assert_eq!(col_names[3], "host");
+        assert_eq!(col_names[4], "timestamp_secs");
+
+        let remaining = &col_names[5..];
+        let mut sorted = remaining.to_vec();
+        sorted.sort();
+        assert_eq!(remaining, &sorted, "data columns should be alphabetical");
+
+        std::fs::remove_file(&path).ok();
+    }
 }
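Reviewer note: to make the ordering rule easy to eyeball outside the writer, below is a minimal standalone sketch of the two-phase ordering that reorder_columns applies. It operates on plain column names rather than a RecordBatch; the function name, the sort-field list, and the column names here are hypothetical illustrations, not part of this patch or the crate's API.

// Hypothetical standalone sketch of the reorder_columns ordering rule:
// sort-schema columns first (in configured order), the rest alphabetical.
fn ordered_column_names<'a>(sort_fields: &[&str], columns: &[&'a str]) -> Vec<&'a str> {
    let mut out = Vec::with_capacity(columns.len());
    let mut used = vec![false; columns.len()];

    // Phase 1: sort schema columns that are present, in their configured order.
    for sf in sort_fields {
        if let Some(idx) = columns.iter().position(|c| c == sf) {
            if !used[idx] {
                out.push(columns[idx]);
                used[idx] = true;
            }
        }
    }

    // Phase 2: remaining columns, alphabetically by name.
    let mut rest: Vec<&str> = columns
        .iter()
        .enumerate()
        .filter(|(i, _)| !used[*i])
        .map(|(_, c)| *c)
        .collect();
    rest.sort();
    out.extend(rest);
    out
}

fn main() {
    // Hypothetical sort schema and a scrambled input: sort fields that are
    // present come first, then aaa_extra/value/zzz_extra alphabetically.
    let sort_fields = ["metric_name", "service", "env", "host", "timestamp_secs"];
    let scrambled = ["value", "host", "zzz_extra", "env", "service", "aaa_extra"];
    let ordered = ordered_column_names(&sort_fields, &scrambled);
    assert_eq!(ordered, ["service", "env", "host", "aaa_extra", "value", "zzz_extra"]);
    println!("{ordered:?}");
}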