Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 173 additions & 2 deletions quickwit/quickwit-parquet-engine/src/storage/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,15 +298,69 @@ impl ParquetWriter {
Ok(sorted_batch)
}

/// Validate, sort, and build WriterProperties for a batch.
/// Reorder columns for optimal physical layout in the Parquet file.
///
/// Sort schema columns are placed first (in their configured sort order),
/// followed by all remaining data columns in alphabetical order. This
/// layout enables a two-GET streaming merge during compaction: the first
/// GET reads the footer, the second streams from the start of the row
/// group — sort columns arrive first, allowing the compactor to compute
/// the global merge order before data columns arrive.
fn reorder_columns(&self, batch: &RecordBatch) -> RecordBatch {
let schema = batch.schema();
let mut ordered_indices: Vec<usize> = Vec::with_capacity(schema.fields().len());
let mut used = vec![false; schema.fields().len()];

// Phase 1: sort schema columns in their configured order.
for sf in &self.resolved_sort_fields {
if let Ok(idx) = schema.index_of(sf.name.as_str()) {
if !used[idx] {
ordered_indices.push(idx);
used[idx] = true;
}
}
}

// Phase 2: remaining columns, alphabetically by name.
let mut remaining: Vec<(usize, &str)> = schema
.fields()
.iter()
.enumerate()
.filter(|(i, _)| !used[*i])
.map(|(i, f)| (i, f.name().as_str()))
.collect();
remaining.sort_by_key(|(_, name)| *name);
for (idx, _) in remaining {
ordered_indices.push(idx);
}

// Build reordered schema and columns.
let new_fields: Vec<Arc<arrow::datatypes::Field>> = ordered_indices
.iter()
.map(|&i| Arc::new(schema.field(i).clone()))
.collect();
let new_columns: Vec<Arc<dyn arrow::array::Array>> = ordered_indices
.iter()
.map(|&i| Arc::clone(batch.column(i)))
.collect();
let new_schema = Arc::new(arrow::datatypes::Schema::new_with_metadata(
new_fields,
schema.metadata().clone(),
));

RecordBatch::try_new(new_schema, new_columns)
.expect("reorder_columns: schema and columns must be consistent")
}

/// Validate, sort, reorder columns, and build WriterProperties for a batch.
fn prepare_write(
&self,
batch: &RecordBatch,
split_metadata: Option<&MetricsSplitMetadata>,
) -> Result<(RecordBatch, WriterProperties), ParquetWriteError> {
validate_required_fields(&batch.schema())
.map_err(|e| ParquetWriteError::SchemaValidation(e.to_string()))?;
let sorted_batch = self.sort_batch(batch)?;
let sorted_batch = self.reorder_columns(&self.sort_batch(batch)?);

let kv_metadata = split_metadata.map(build_compaction_key_value_metadata);

Expand Down Expand Up @@ -907,4 +961,121 @@ mod tests {
json_str
);
}

#[test]
fn test_column_ordering_sort_columns_first_then_alphabetical() {
    // Default metrics sort fields: metric_name|service|env|datacenter|region|host|
    // timeseries_id|timestamp_secs
    let writer = ParquetWriter::new(ParquetWriterConfig::default(), &TableConfig::default());

    // Deliberately scrambled input: four tag columns (service, env, region,
    // host) plus two extra data columns (zzz_extra, aaa_extra) that are NOT
    // part of the sort schema.
    let scrambled = ["host", "zzz_extra", "env", "region", "service", "aaa_extra"];
    let batch = create_test_batch_with_tags(3, &scrambled);

    let input_schema = batch.schema();
    let input_names: Vec<&str> = input_schema
        .fields()
        .iter()
        .map(|f| f.name().as_str())
        .collect();
    // Sanity: the input really is scrambled — host precedes service, which is
    // neither sort-schema order nor alphabetical order.
    let pos_of = |wanted: &str| input_names.iter().position(|n| *n == wanted).unwrap();
    assert!(
        pos_of("host") < pos_of("service"),
        "input should have host before service (scrambled), got: {:?}",
        input_names
    );

    let reordered = writer.reorder_columns(&batch);
    let out_schema = reordered.schema();
    let names: Vec<String> = out_schema.fields().iter().map(|f| f.name().clone()).collect();

    // Sort schema columns present in the batch must lead, in sort order.
    // From the default: metric_name, service, env, region, host, timestamp_secs
    // (datacenter and timeseries_id are not in the batch).
    // metric_type and value are required fields but NOT sort columns.
    let expected_prefix = [
        "metric_name",
        "service",
        "env",
        "region",
        "host",
        "timestamp_secs",
    ];
    let mut prefix_len = 0;
    while prefix_len < names.len() && expected_prefix.contains(&names[prefix_len].as_str()) {
        prefix_len += 1;
    }
    let sort_prefix: Vec<&str> = names[..prefix_len].iter().map(|s| s.as_str()).collect();
    assert_eq!(
        sort_prefix, expected_prefix,
        "sort schema columns should appear first in configured order, got: {:?}",
        names
    );

    // Everything after the sort prefix must be alphabetical.
    let remaining: Vec<&str> = names[prefix_len..].iter().map(|s| s.as_str()).collect();
    let mut alphabetical = remaining.clone();
    alphabetical.sort();
    assert_eq!(
        remaining, alphabetical,
        "non-sort columns should be in alphabetical order, got: {:?}",
        remaining
    );

    // Reordering must neither drop columns nor rows.
    assert_eq!(reordered.num_columns(), batch.num_columns());
    assert_eq!(reordered.num_rows(), batch.num_rows());
}

#[test]
fn test_column_ordering_preserved_in_parquet_file() {
    use std::fs::File;

    use parquet::file::reader::{FileReader, SerializedFileReader};

    let config = ParquetWriterConfig::default();
    let writer = ParquetWriter::new(config, &TableConfig::default());

    let batch = create_test_batch_with_tags(3, &["host", "zzz_extra", "env", "service"]);

    // Suffix the scratch file with the process id: a fixed name in the
    // shared temp dir would let concurrent test invocations (e.g. parallel
    // CI jobs) clobber each other's file.
    let path = std::env::temp_dir().join(format!(
        "test_column_ordering_{}.parquet",
        std::process::id()
    ));
    writer
        .write_to_file_with_metadata(&batch, &path, None)
        .unwrap();

    // Read back and verify physical column order from the Parquet schema.
    let file = File::open(&path).unwrap();
    let reader = SerializedFileReader::new(file).unwrap();
    let parquet_schema = reader.metadata().file_metadata().schema_descr();
    let col_names: Vec<String> = (0..parquet_schema.num_columns())
        .map(|i| parquet_schema.column(i).name().to_string())
        .collect();

    // Clean up before asserting so an assertion failure does not leak the
    // scratch file (previously the removal only ran on success).
    std::fs::remove_file(&path).ok();

    // Sort columns first: metric_name, service, env, host, timestamp_secs
    // Then remaining alphabetically: metric_type, value, zzz_extra
    assert_eq!(col_names[0], "metric_name");
    assert_eq!(col_names[1], "service");
    assert_eq!(col_names[2], "env");
    assert_eq!(col_names[3], "host");
    assert_eq!(col_names[4], "timestamp_secs");

    let remaining = &col_names[5..];
    let mut sorted = remaining.to_vec();
    sorted.sort();
    assert_eq!(remaining, &sorted, "data columns should be alphabetical");
}
}
Loading