From 4890e211b2140b75a45ebdaf1825ce1a68325487 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 16 Apr 2026 14:50:03 +0800 Subject: [PATCH 1/2] feat: add postpone bucket (bucket=-2) write support for primary-key tables Postpone bucket mode writes data in KV format without sorting or deduplication, deferring bucket assignment to background compaction. Files are written to `bucket-postpone` directory and are invisible to normal reads until compacted. --- .../datafusion/tests/pk_tables.rs | 121 ++++++ crates/paimon/src/spec/core_options.rs | 15 + crates/paimon/src/table/data_file_writer.rs | 10 +- crates/paimon/src/table/kv_file_writer.rs | 57 +-- crates/paimon/src/table/mod.rs | 1 + .../paimon/src/table/postpone_file_writer.rs | 290 ++++++++++++++ crates/paimon/src/table/table_scan.rs | 13 +- crates/paimon/src/table/table_write.rs | 374 ++++++++++++++++-- crates/paimon/src/table/write_builder.rs | 2 +- 9 files changed, 814 insertions(+), 69 deletions(-) create mode 100644 crates/paimon/src/table/postpone_file_writer.rs diff --git a/crates/integrations/datafusion/tests/pk_tables.rs b/crates/integrations/datafusion/tests/pk_tables.rs index 753963d1..b2fae224 100644 --- a/crates/integrations/datafusion/tests/pk_tables.rs +++ b/crates/integrations/datafusion/tests/pk_tables.rs @@ -1259,3 +1259,124 @@ async fn test_pk_first_row_insert_overwrite() { "After second OVERWRITE: still 2 files (no stale level-0 files accumulated)" ); } + +// ======================= Postpone Bucket (bucket = -2) ======================= + +/// Postpone bucket files are invisible to normal SELECT but visible via scan_all_files. 
+#[tokio::test] +async fn test_postpone_write_invisible_to_select() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + handler + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA failed"); + + handler + .sql( + "CREATE TABLE paimon.test_db.t_postpone ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '-2')", + ) + .await + .unwrap(); + + // Write data + handler + .sql("INSERT INTO paimon.test_db.t_postpone VALUES (1, 10), (2, 20), (3, 30)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + // scan_all_files should find the postpone file + let table = catalog + .get_table(&Identifier::new("test_db", "t_postpone")) + .await + .unwrap(); + let plan = table + .new_read_builder() + .new_scan() + .with_scan_all_files() + .plan() + .await + .unwrap(); + let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum(); + assert_eq!(file_count, 1, "scan_all_files should find 1 postpone file"); + + // Normal SELECT should return 0 rows (postpone files are invisible) + let count = row_count(&handler, "SELECT * FROM paimon.test_db.t_postpone").await; + assert_eq!(count, 0, "SELECT should return 0 rows for postpone table"); +} + +/// INSERT OVERWRITE on a postpone table should replace old files with new ones. 
+#[tokio::test] +async fn test_postpone_insert_overwrite() { + let (_tmp, catalog) = create_test_env(); + let handler = create_handler(catalog.clone()); + handler + .sql("CREATE SCHEMA paimon.test_db") + .await + .expect("CREATE SCHEMA failed"); + + handler + .sql( + "CREATE TABLE paimon.test_db.t_postpone_ow ( + id INT NOT NULL, value INT, + PRIMARY KEY (id) + ) WITH ('bucket' = '-2')", + ) + .await + .unwrap(); + + // First commit + handler + .sql("INSERT INTO paimon.test_db.t_postpone_ow VALUES (1, 10), (2, 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_postpone_ow")) + .await + .unwrap(); + let plan = table + .new_read_builder() + .new_scan() + .with_scan_all_files() + .plan() + .await + .unwrap(); + let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum(); + assert_eq!(file_count, 1, "After INSERT: 1 postpone file"); + + // INSERT OVERWRITE should replace old file + handler + .sql("INSERT OVERWRITE paimon.test_db.t_postpone_ow VALUES (3, 30)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let table = catalog + .get_table(&Identifier::new("test_db", "t_postpone_ow")) + .await + .unwrap(); + let plan = table + .new_read_builder() + .new_scan() + .with_scan_all_files() + .plan() + .await + .unwrap(); + let file_count: usize = plan.splits().iter().map(|s| s.data_files().len()).sum(); + assert_eq!( + file_count, 1, + "After OVERWRITE: only 1 new file (old file deleted)" + ); +} diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs index ae9d7cc2..85aa8013 100644 --- a/crates/paimon/src/spec/core_options.rs +++ b/crates/paimon/src/spec/core_options.rs @@ -28,6 +28,11 @@ const BUCKET_KEY_OPTION: &str = "bucket-key"; const BUCKET_FUNCTION_TYPE_OPTION: &str = "bucket-function.type"; const BUCKET_OPTION: &str = "bucket"; const DEFAULT_BUCKET: i32 = -1; +/// Postpone bucket mode: data is written to 
`bucket-postpone` directory +/// and is invisible to readers until compaction assigns real bucket numbers. +pub const POSTPONE_BUCKET: i32 = -2; +/// Directory name for postpone bucket files. +pub const POSTPONE_BUCKET_DIR: &str = "bucket-postpone"; const COMMIT_MAX_RETRIES_OPTION: &str = "commit.max-retries"; const COMMIT_TIMEOUT_OPTION: &str = "commit.timeout"; const COMMIT_MIN_RETRY_WAIT_OPTION: &str = "commit.min-retry-wait"; @@ -63,6 +68,16 @@ pub enum MergeEngine { FirstRow, } +/// Format the bucket directory name for a given bucket number. +/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise `"bucket-{N}"`. +pub fn bucket_dir_name(bucket: i32) -> String { + if bucket == POSTPONE_BUCKET { + POSTPONE_BUCKET_DIR.to_string() + } else { + format!("bucket-{bucket}") + } +} + /// Typed accessors for common table options. /// /// This mirrors pypaimon's `CoreOptions` pattern while staying lightweight. diff --git a/crates/paimon/src/table/data_file_writer.rs b/crates/paimon/src/table/data_file_writer.rs index cfbc55a5..e58e0e18 100644 --- a/crates/paimon/src/table/data_file_writer.rs +++ b/crates/paimon/src/table/data_file_writer.rs @@ -25,7 +25,7 @@ use crate::arrow::format::{create_format_writer, FormatFileWriter}; use crate::io::FileIO; use crate::spec::stats::BinaryTableStats; -use crate::spec::{DataFileMeta, EMPTY_SERIALIZED_ROW}; +use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW}; use crate::Result; use arrow_array::RecordBatch; use chrono::Utc; @@ -133,11 +133,13 @@ impl DataFileWriter { ); let bucket_dir = if self.partition_path.is_empty() { - format!("{}/bucket-{}", self.table_location, self.bucket) + format!("{}/{}", self.table_location, bucket_dir_name(self.bucket)) } else { format!( - "{}/{}/bucket-{}", - self.table_location, self.partition_path, self.bucket + "{}/{}/{}", + self.table_location, + self.partition_path, + bucket_dir_name(self.bucket) ) }; self.file_io.mkdirs(&format!("{bucket_dir}/")).await?; diff --git 
a/crates/paimon/src/table/kv_file_writer.rs b/crates/paimon/src/table/kv_file_writer.rs index 3309820b..ef740157 100644 --- a/crates/paimon/src/table/kv_file_writer.rs +++ b/crates/paimon/src/table/kv_file_writer.rs @@ -74,9 +74,6 @@ pub(crate) struct KeyValueWriteConfig { pub sequence_field_indices: Vec, /// Merge engine for deduplication. pub merge_engine: MergeEngine, - /// Column index in user schema that provides the row kind value. - /// Resolved from: `rowkind.field` option > `_VALUE_KIND` column > None (all INSERT). - pub value_kind_col_index: Option, } impl KeyValueFileWriter { @@ -200,23 +197,8 @@ impl KeyValueFileWriter { let min_key = self.extract_key_binary_row(&combined, first_row)?; let max_key = self.extract_key_binary_row(&combined, last_row)?; - // Build physical schema (thin-mode): [_SEQUENCE_NUMBER, _VALUE_KIND, all_user_cols...] - let user_fields = user_schema.fields(); - let mut physical_fields: Vec> = Vec::new(); - physical_fields.push(Arc::new(ArrowField::new( - SEQUENCE_NUMBER_FIELD_NAME, - ArrowDataType::Int64, - false, - ))); - physical_fields.push(Arc::new(ArrowField::new( - VALUE_KIND_FIELD_NAME, - ArrowDataType::Int8, - false, - ))); - for field in user_fields.iter() { - physical_fields.push(field.clone()); - } - let physical_schema = Arc::new(ArrowSchema::new(physical_fields)); + // Build physical schema and open writer. + let physical_schema = build_physical_schema(&user_schema); // Open parquet writer. let file_name = format!( @@ -262,8 +244,13 @@ impl KeyValueFileWriter { }, )?, ); - // Value kind column. - match self.config.value_kind_col_index { + // Value kind column — resolve from batch schema. 
+ let vk_idx = combined + .schema() + .fields() + .iter() + .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME); + match vk_idx { Some(vk_idx) => { physical_columns.push( arrow_select::take::take( @@ -282,8 +269,11 @@ impl KeyValueFileWriter { physical_columns.push(Arc::new(Int8Array::from(vec![0i8; chunk_len]))); } } - // All user columns. + // All user columns (skip _VALUE_KIND if present — already handled above). for idx in 0..combined.num_columns() { + if Some(idx) == vk_idx { + continue; + } physical_columns.push( arrow_select::take::take(combined.column(idx).as_ref(), &chunk_indices, None) .map_err(|e| crate::Error::DataInvalid { @@ -459,3 +449,24 @@ impl KeyValueFileWriter { Ok(builder.build_serialized()) } } + +/// Build the physical schema: [_SEQUENCE_NUMBER, _VALUE_KIND, user_cols (excluding _VALUE_KIND)...] +pub(crate) fn build_physical_schema(user_schema: &ArrowSchema) -> Arc { + let mut physical_fields: Vec> = Vec::new(); + physical_fields.push(Arc::new(ArrowField::new( + SEQUENCE_NUMBER_FIELD_NAME, + ArrowDataType::Int64, + false, + ))); + physical_fields.push(Arc::new(ArrowField::new( + VALUE_KIND_FIELD_NAME, + ArrowDataType::Int8, + false, + ))); + for field in user_schema.fields().iter() { + if field.name() != VALUE_KIND_FIELD_NAME { + physical_fields.push(field.clone()); + } + } + Arc::new(ArrowSchema::new(physical_fields)) +} diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index 8fb8ef33..e63beaf5 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -29,6 +29,7 @@ mod full_text_search_builder; pub(crate) mod global_index_scanner; mod kv_file_reader; mod kv_file_writer; +mod postpone_file_writer; mod read_builder; pub(crate) mod rest_env; pub(crate) mod row_id_predicate; diff --git a/crates/paimon/src/table/postpone_file_writer.rs b/crates/paimon/src/table/postpone_file_writer.rs new file mode 100644 index 00000000..89daa347 --- /dev/null +++ 
b/crates/paimon/src/table/postpone_file_writer.rs @@ -0,0 +1,290 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Postpone bucket file writer for primary-key tables with `bucket = -2`. +//! +//! Writes data in KV format (`_SEQUENCE_NUMBER`, `_VALUE_KIND` + user columns) +//! but without sorting or deduplication — compaction assigns real buckets later. +//! +//! Uses a special file naming prefix: `data-u-{commitUser}-s-0-w-`. +//! +//! Reference: [PostponeBucketWriter](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/sink/PostponeBucketWriter.java) + +use crate::arrow::format::{create_format_writer, FormatFileWriter}; +use crate::io::FileIO; +use crate::spec::stats::BinaryTableStats; +use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW, VALUE_KIND_FIELD_NAME}; +use crate::table::kv_file_writer::build_physical_schema; +use crate::Result; +use arrow_array::{Int64Array, Int8Array, RecordBatch}; +use chrono::Utc; +use std::sync::Arc; +use tokio::task::JoinSet; + +/// Configuration for [`PostponeFileWriter`]. 
+pub(crate) struct PostponeWriteConfig { + pub table_location: String, + pub partition_path: String, + pub bucket: i32, + pub schema_id: i64, + pub target_file_size: i64, + pub file_compression: String, + pub file_compression_zstd_level: i32, + pub write_buffer_size: i64, + /// Data file name prefix: `"data-u-{commitUser}-s-0-w-"`. + pub data_file_prefix: String, +} + +/// Writer for postpone bucket mode (`bucket = -2`). +/// +/// Streams data directly to a FormatFileWriter in arrival order (no sort/dedup), +/// prepending `_SEQUENCE_NUMBER` and `_VALUE_KIND` columns to each batch. +/// Rolls to a new file when `target_file_size` is reached. +pub(crate) struct PostponeFileWriter { + file_io: FileIO, + config: PostponeWriteConfig, + next_sequence_number: i64, + current_writer: Option>, + current_file_name: Option, + current_row_count: i64, + /// Sequence number at which the current file started. + current_file_start_seq: i64, + written_files: Vec, + /// Background file close tasks spawned during rolling. + in_flight_closes: JoinSet>, +} + +impl PostponeFileWriter { + pub(crate) fn new(file_io: FileIO, config: PostponeWriteConfig) -> Self { + Self { + file_io, + config, + next_sequence_number: 0, + current_writer: None, + current_file_name: None, + current_row_count: 0, + current_file_start_seq: 0, + written_files: Vec::new(), + in_flight_closes: JoinSet::new(), + } + } + + pub(crate) async fn write(&mut self, batch: &RecordBatch) -> Result<()> { + if batch.num_rows() == 0 { + return Ok(()); + } + + if self.current_writer.is_none() { + self.open_new_file(batch.schema()).await?; + } + + let num_rows = batch.num_rows(); + let start_seq = self.next_sequence_number; + let end_seq = start_seq + num_rows as i64 - 1; + self.next_sequence_number = end_seq + 1; + + // Build physical batch: [_SEQUENCE_NUMBER, _VALUE_KIND, all_user_cols...] 
+ let mut physical_columns: Vec> = Vec::new(); + physical_columns.push(Arc::new(Int64Array::from( + (start_seq..=end_seq).collect::>(), + ))); + let vk_idx = batch + .schema() + .fields() + .iter() + .position(|f| f.name() == VALUE_KIND_FIELD_NAME); + match vk_idx { + Some(vk_idx) => physical_columns.push(batch.column(vk_idx).clone()), + None => physical_columns.push(Arc::new(Int8Array::from(vec![0i8; num_rows]))), + } + // All user columns (skip _VALUE_KIND if present — already handled above). + for (i, col) in batch.columns().iter().enumerate() { + if Some(i) == vk_idx { + continue; + } + physical_columns.push(col.clone()); + } + + let physical_schema = build_physical_schema(&batch.schema()); + let physical_batch = + RecordBatch::try_new(physical_schema, physical_columns).map_err(|e| { + crate::Error::DataInvalid { + message: format!("Failed to create physical batch: {e}"), + source: None, + } + })?; + + self.current_row_count += num_rows as i64; + self.current_writer + .as_mut() + .unwrap() + .write(&physical_batch) + .await?; + + // Roll to a new file if target size is reached — close in background + if self.current_writer.as_ref().unwrap().num_bytes() as i64 >= self.config.target_file_size + { + self.roll_file(); + } + + // Flush row group if in-progress buffer exceeds write_buffer_size + if let Some(w) = self.current_writer.as_mut() { + if w.in_progress_size() as i64 >= self.config.write_buffer_size { + w.flush().await?; + } + } + + Ok(()) + } + + pub(crate) async fn prepare_commit(&mut self) -> Result> { + self.close_current_file().await?; + while let Some(result) = self.in_flight_closes.join_next().await { + let meta = result.map_err(|e| crate::Error::DataInvalid { + message: format!("Background file close task panicked: {e}"), + source: None, + })??; + self.written_files.push(meta); + } + Ok(std::mem::take(&mut self.written_files)) + } + + /// Spawn the current writer's close in the background for non-blocking rolling. 
+ fn roll_file(&mut self) { + let writer = match self.current_writer.take() { + Some(w) => w, + None => return, + }; + let file_name = self.current_file_name.take().unwrap(); + let row_count = self.current_row_count; + let min_seq = self.current_file_start_seq; + let max_seq = self.next_sequence_number - 1; + self.current_row_count = 0; + let schema_id = self.config.schema_id; + + self.in_flight_closes.spawn(async move { + let file_size = writer.close().await? as i64; + Ok(build_meta( + file_name, file_size, row_count, min_seq, max_seq, schema_id, + )) + }); + } + + async fn open_new_file(&mut self, user_schema: arrow_schema::SchemaRef) -> Result<()> { + let file_name = format!( + "{}{}-{}.parquet", + self.config.data_file_prefix, + uuid::Uuid::new_v4(), + self.written_files.len() + ); + let bucket_dir = if self.config.partition_path.is_empty() { + format!( + "{}/{}", + self.config.table_location, + bucket_dir_name(self.config.bucket) + ) + } else { + format!( + "{}/{}/{}", + self.config.table_location, + self.config.partition_path, + bucket_dir_name(self.config.bucket) + ) + }; + self.file_io.mkdirs(&format!("{bucket_dir}/")).await?; + let physical_schema = build_physical_schema(&user_schema); + let file_path = format!("{bucket_dir}/{file_name}"); + let output = self.file_io.new_output(&file_path)?; + let writer = create_format_writer( + &output, + physical_schema, + &self.config.file_compression, + self.config.file_compression_zstd_level, + ) + .await?; + self.current_writer = Some(writer); + self.current_file_name = Some(file_name); + self.current_row_count = 0; + self.current_file_start_seq = self.next_sequence_number; + Ok(()) + } + + async fn close_current_file(&mut self) -> Result<()> { + let writer = match self.current_writer.take() { + Some(w) => w, + None => return Ok(()), + }; + let file_name = self.current_file_name.take().unwrap(); + let row_count = self.current_row_count; + self.current_row_count = 0; + let file_size = writer.close().await? 
as i64; + + let min_seq = self.current_file_start_seq; + let max_seq = self.next_sequence_number - 1; + + let meta = build_meta( + file_name, + file_size, + row_count, + min_seq, + max_seq, + self.config.schema_id, + ); + self.written_files.push(meta); + Ok(()) + } +} + +fn build_meta( + file_name: String, + file_size: i64, + row_count: i64, + min_seq: i64, + max_seq: i64, + schema_id: i64, +) -> DataFileMeta { + DataFileMeta { + file_name, + file_size, + row_count, + min_key: EMPTY_SERIALIZED_ROW.clone(), + max_key: EMPTY_SERIALIZED_ROW.clone(), + key_stats: BinaryTableStats::new( + EMPTY_SERIALIZED_ROW.clone(), + EMPTY_SERIALIZED_ROW.clone(), + vec![], + ), + value_stats: BinaryTableStats::new( + EMPTY_SERIALIZED_ROW.clone(), + EMPTY_SERIALIZED_ROW.clone(), + vec![], + ), + min_sequence_number: min_seq, + max_sequence_number: max_seq, + schema_id, + level: 0, + extra_files: vec![], + creation_time: Some(Utc::now()), + delete_row_count: Some(0), + embedded_index: None, + file_source: Some(0), // FileSource.APPEND + value_stats_cols: Some(vec![]), + external_path: None, + first_row_id: None, + write_cols: None, + } +} diff --git a/crates/paimon/src/table/table_scan.rs b/crates/paimon/src/table/table_scan.rs index 9b2a7e50..e71aae07 100644 --- a/crates/paimon/src/table/table_scan.rs +++ b/crates/paimon/src/table/table_scan.rs @@ -30,8 +30,9 @@ use super::Table; use crate::io::FileIO; use crate::predicate_stats::data_leaf_may_match; use crate::spec::{ - eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind, IndexManifest, - ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot, TimeTravelSelector, + bucket_dir_name, eval_row, BinaryRow, CoreOptions, DataField, DataFileMeta, FileKind, + IndexManifest, ManifestEntry, ManifestFileMeta, PartitionComputer, Predicate, Snapshot, + TimeTravelSelector, }; use crate::table::bin_pack::split_for_batch; use crate::table::source::{ @@ -135,6 +136,7 @@ async fn read_all_manifest_entries( table_path: 
&str, snapshot: &Snapshot, skip_level_zero: bool, + scan_all_files: bool, has_primary_keys: bool, partition_predicate: Option<&Predicate>, partition_fields: &[DataField], @@ -173,7 +175,7 @@ async fn read_all_manifest_entries( if skip_level_zero && has_primary_keys && entry.file().level == 0 { return false; } - if has_primary_keys && entry.bucket() < 0 { + if has_primary_keys && !scan_all_files && entry.bucket() < 0 { return false; } if let Some(pred) = bucket_predicate { @@ -537,6 +539,7 @@ impl<'a> TableScan<'a> { table_path, snapshot, skip_level_zero, + self.scan_all_files, has_primary_keys, self.partition_predicate.as_ref(), &partition_fields, @@ -673,9 +676,9 @@ impl<'a> TableScan<'a> { let bucket_path = if let Some(ref computer) = partition_computer { let partition_path = computer.generate_partition_path(&partition_row)?; - format!("{base_path}/{partition_path}bucket-{bucket}") + format!("{base_path}/{partition_path}{}", bucket_dir_name(bucket)) } else { - format!("{base_path}/bucket-{bucket}") + format!("{base_path}/{}", bucket_dir_name(bucket)) }; // Original `partition` Vec consumed by PartitionBucket for DV map lookup. 
diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs index 77b8e97d..5084733c 100644 --- a/crates/paimon/src/table/table_write.rs +++ b/crates/paimon/src/table/table_write.rs @@ -24,11 +24,12 @@ use crate::spec::DataFileMeta; use crate::spec::PartitionComputer; use crate::spec::{ extract_datum_from_arrow, BinaryRow, BinaryRowBuilder, CoreOptions, DataField, DataType, Datum, - MergeEngine, Predicate, PredicateBuilder, EMPTY_SERIALIZED_ROW, + MergeEngine, Predicate, PredicateBuilder, EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET, }; use crate::table::commit_message::CommitMessage; use crate::table::data_file_writer::DataFileWriter; use crate::table::kv_file_writer::{KeyValueFileWriter, KeyValueWriteConfig}; +use crate::table::postpone_file_writer::{PostponeFileWriter, PostponeWriteConfig}; use crate::table::{SnapshotManager, Table, TableScan}; use crate::Result; use arrow_array::RecordBatch; @@ -43,10 +44,11 @@ fn schema_contains_blob_type(fields: &[DataField]) -> bool { .any(|field| field.data_type().contains_blob_type()) } -/// Enum to hold either an append-only writer or a key-value writer. +/// Enum to hold either an append-only writer, a key-value writer, or a postpone writer. enum FileWriter { Append(DataFileWriter), KeyValue(KeyValueFileWriter), + Postpone(PostponeFileWriter), } impl FileWriter { @@ -54,6 +56,7 @@ impl FileWriter { match self { FileWriter::Append(w) => w.write(batch).await, FileWriter::KeyValue(w) => w.write(batch).await, + FileWriter::Postpone(w) => w.write(batch).await, } } @@ -61,6 +64,7 @@ impl FileWriter { match self { FileWriter::Append(ref mut w) => w.prepare_commit().await, FileWriter::KeyValue(ref mut w) => w.prepare_commit().await, + FileWriter::Postpone(ref mut w) => w.prepare_commit().await, } } } @@ -95,14 +99,14 @@ pub struct TableWrite { sequence_field_indices: Vec, /// Merge engine for primary-key tables. 
merge_engine: MergeEngine, - /// Column index in user schema for row kind (resolved from rowkind.field or _VALUE_KIND). - value_kind_col_index: Option, /// Cache of per-partition bucket→max_sequence_number, lazily populated on first write. partition_seq_cache: HashMap, HashMap>, + /// Commit user identifier, used for postpone file naming. + commit_user: String, } impl TableWrite { - pub(crate) fn new(table: &Table) -> crate::Result { + pub(crate) fn new(table: &Table, commit_user: String) -> crate::Result { let schema = table.schema(); let core_options = CoreOptions::new(schema.options()); @@ -124,16 +128,17 @@ impl TableWrite { let has_primary_keys = !schema.primary_keys().is_empty(); if has_primary_keys { - if total_buckets < 1 { + if total_buckets < 1 && total_buckets != POSTPONE_BUCKET { return Err(crate::Error::Unsupported { message: format!( - "KeyValueFileWriter does not support bucket={total_buckets}, only fixed bucket (>= 1) is supported" + "KeyValueFileWriter does not support bucket={total_buckets}, only fixed bucket (>= 1) or postpone bucket (= -2) is supported" ), }); } - if core_options - .changelog_producer() - .eq_ignore_ascii_case("input") + if total_buckets != POSTPONE_BUCKET + && core_options + .changelog_producer() + .eq_ignore_ascii_case("input") { return Err(crate::Error::Unsupported { message: "KeyValueFileWriter does not support changelog-producer=input" @@ -200,11 +205,6 @@ impl TableWrite { }); } - // Resolve value_kind column from _VALUE_KIND in user schema, if present. 
- let value_kind_col_index = fields - .iter() - .position(|f| f.name() == crate::spec::VALUE_KIND_FIELD_NAME); - Ok(Self { table: table.clone(), partition_writers: HashMap::new(), @@ -222,8 +222,8 @@ impl TableWrite { primary_key_types, sequence_field_indices, merge_engine, - value_kind_col_index, partition_seq_cache: HashMap::new(), + commit_user, }) } @@ -297,7 +297,15 @@ impl TableWrite { ) -> Result> { // Fast path: no partitions and single bucket — skip per-row routing if self.partition_field_indices.is_empty() && self.total_buckets <= 1 { - return Ok(vec![((EMPTY_SERIALIZED_ROW.clone(), 0), batch.clone())]); + let bucket = if self.total_buckets == POSTPONE_BUCKET { + POSTPONE_BUCKET + } else { + 0 + }; + return Ok(vec![( + (EMPTY_SERIALIZED_ROW.clone(), bucket), + batch.clone(), + )]); } let fields = self.table.schema().fields(); @@ -412,7 +420,9 @@ impl TableWrite { }; // Compute bucket - let bucket = if self.total_buckets <= 1 || self.bucket_key_indices.is_empty() { + let bucket = if self.total_buckets == POSTPONE_BUCKET { + POSTPONE_BUCKET + } else if self.total_buckets <= 1 || self.bucket_key_indices.is_empty() { 0 } else { let mut datums: Vec<(Option, DataType)> = Vec::new(); @@ -438,6 +448,7 @@ impl TableWrite { }; let writer = if self.primary_key_indices.is_empty() { + // Append-only writer for non-PK tables. FileWriter::Append(DataFileWriter::new( self.table.file_io().clone(), self.table.location().to_string(), @@ -452,6 +463,23 @@ impl TableWrite { None, // first_row_id: assigned by commit None, // write_cols: full-row write )) + } else if bucket == POSTPONE_BUCKET { + // Postpone bucket: KV format but no sorting/dedup, special file naming. 
+ let data_file_prefix = format!("data-u-{}-s-0-w-", self.commit_user); + FileWriter::Postpone(PostponeFileWriter::new( + self.table.file_io().clone(), + PostponeWriteConfig { + table_location: self.table.location().to_string(), + partition_path, + bucket, + schema_id: self.schema_id, + target_file_size: self.target_file_size, + file_compression: self.file_compression.clone(), + file_compression_zstd_level: self.file_compression_zstd_level, + write_buffer_size: self.write_buffer_size, + data_file_prefix, + }, + )) } else { // Lazily scan partition sequence numbers on first writer creation per partition. if !self.partition_seq_cache.contains_key(&partition_bytes) { @@ -481,7 +509,6 @@ impl TableWrite { primary_key_types: self.primary_key_types.clone(), sequence_field_indices: self.sequence_field_indices.clone(), merge_engine: self.merge_engine, - value_kind_col_index: self.value_kind_col_index, }, next_seq, )) @@ -504,6 +531,7 @@ mod tests { }; use crate::table::{SnapshotManager, TableCommit}; use arrow_array::Int32Array; + use arrow_array::RecordBatchReader as _; use arrow_schema::{ DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, TimeUnit, }; @@ -624,7 +652,7 @@ mod tests { setup_dirs(&file_io, table_path).await; let table = test_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); let batch = make_batch(vec![1, 2, 3], vec![10, 20, 30]); table_write.write_arrow_batch(&batch).await.unwrap(); @@ -655,7 +683,9 @@ mod tests { None, ); - let err = TableWrite::new(&table).err().unwrap(); + let err = TableWrite::new(&table, "test-user".to_string()) + .err() + .unwrap(); assert!( matches!(err, crate::Error::Unsupported { message } if message.contains("BlobType")) ); @@ -671,7 +701,9 @@ mod tests { None, ); - let err = TableWrite::new(&table).err().unwrap(); + let err = TableWrite::new(&table, "test-user".to_string()) + .err() + 
.unwrap(); assert!( matches!(err, crate::Error::Unsupported { message } if message.contains("BlobType")) ); @@ -684,7 +716,7 @@ mod tests { setup_dirs(&file_io, table_path).await; let table = test_partitioned_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); let batch = make_partitioned_batch(vec!["a", "b", "a"], vec![1, 2, 3]); table_write.write_arrow_batch(&batch).await.unwrap(); @@ -715,7 +747,7 @@ mod tests { let file_io = test_file_io(); let table_path = "memory:/test_table_write_empty"; let table = test_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); let batch = make_batch(vec![], vec![]); table_write.write_arrow_batch(&batch).await.unwrap(); @@ -731,7 +763,7 @@ mod tests { setup_dirs(&file_io, table_path).await; let table = test_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); // First write + prepare_commit table_write @@ -763,7 +795,7 @@ mod tests { setup_dirs(&file_io, table_path).await; let table = test_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); table_write .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20])) @@ -827,7 +859,7 @@ mod tests { setup_dirs(&file_io, table_path).await; let table = test_bucketed_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); // Row with NULL bucket key should not panic let batch = make_nullable_id_batch(vec![None, Some(1), None], vec![10, 20, 30]); @@ -849,7 +881,7 @@ mod tests { setup_dirs(&file_io, table_path).await; 
let table = test_bucketed_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); // Two NULLs should land in the same bucket let batch = make_nullable_id_batch(vec![None, None], vec![10, 20]); @@ -877,7 +909,7 @@ mod tests { // Compute bucket for NULL key let fields = table.schema().fields().to_vec(); - let tw = TableWrite::new(&table).unwrap(); + let tw = TableWrite::new(&table, "test-user".to_string()).unwrap(); let batch_null = make_nullable_id_batch(vec![None], vec![10]); let (_, bucket_null) = tw @@ -942,7 +974,7 @@ mod tests { None, ); - let tw = TableWrite::new(&table).unwrap(); + let tw = TableWrite::new(&table, "test-user".to_string()).unwrap(); let fields = table.schema().fields().to_vec(); // Build a batch: d=NULL, ltz=NULL, ntz=NULL, k=1 @@ -1016,7 +1048,7 @@ mod tests { None, ); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); // Write multiple batches — each should roll to a new file table_write @@ -1069,7 +1101,7 @@ mod tests { setup_dirs(&file_io, table_path).await; let table = test_pk_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]); table_write.write_arrow_batch(&batch).await.unwrap(); @@ -1104,7 +1136,7 @@ mod tests { setup_dirs(&file_io, table_path).await; let table = test_pk_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); // Write unsorted data let batch = make_batch(vec![5, 2, 4, 1, 3], vec![50, 20, 40, 10, 30]); @@ -1162,7 +1194,7 @@ mod tests { let table = test_pk_table(&file_io, table_path); // First commit: id=1,2,3 - let mut tw1 = 
TableWrite::new(&table).unwrap(); + let mut tw1 = TableWrite::new(&table, "test-user".to_string()).unwrap(); tw1.write_arrow_batch(&make_batch(vec![1, 2, 3], vec![10, 20, 30])) .await .unwrap(); @@ -1171,7 +1203,7 @@ mod tests { commit.commit(msgs1).await.unwrap(); // Second commit: id=2,3,4 with updated values, higher sequence numbers - let mut tw2 = TableWrite::new(&table).unwrap(); + let mut tw2 = TableWrite::new(&table, "test-user".to_string()).unwrap(); tw2.write_arrow_batch(&make_batch(vec![2, 3, 4], vec![200, 300, 400])) .await .unwrap(); @@ -1224,7 +1256,7 @@ mod tests { setup_dirs(&file_io, table_path).await; let table = test_pk_table(&file_io, table_path); - let mut table_write = TableWrite::new(&table).unwrap(); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); let batch = make_batch(vec![1, 2], vec![10, 20]); table_write.write_arrow_batch(&batch).await.unwrap(); @@ -1235,4 +1267,274 @@ mod tests { assert_eq!(file.min_sequence_number, 0); assert_eq!(file.max_sequence_number, 1); } + + // ----------------------------------------------------------------------- + // Postpone bucket (bucket = -2) write tests + // ----------------------------------------------------------------------- + + fn test_postpone_pk_schema() -> TableSchema { + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("value", DataType::Int(IntType::new())) + .primary_key(["id"]) + .option("bucket", "-2") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_postpone_pk_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_postpone_table"), + table_path.to_string(), + test_postpone_pk_schema(), + None, + ) + } + + fn test_postpone_partitioned_schema() -> TableSchema { + let schema = Schema::builder() + .column("pt", DataType::VarChar(VarCharType::string_type())) + .column("id", DataType::Int(IntType::new())) + .column("value", 
DataType::Int(IntType::new())) + .primary_key(["pt", "id"]) + .partition_keys(["pt"]) + .option("bucket", "-2") + .build() + .unwrap(); + TableSchema::new(0, &schema) + } + + fn test_postpone_partitioned_table(file_io: &FileIO, table_path: &str) -> Table { + Table::new( + file_io.clone(), + Identifier::new("default", "test_postpone_table"), + table_path.to_string(), + test_postpone_partitioned_schema(), + None, + ) + } + + fn make_partitioned_batch_3col(pts: Vec<&str>, ids: Vec<i32>, values: Vec<i32>) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("pt", ArrowDataType::Utf8, false), + ArrowField::new("id", ArrowDataType::Int32, false), + ArrowField::new("value", ArrowDataType::Int32, false), + ])); + RecordBatch::try_new( + schema, + vec![ + Arc::new(arrow_array::StringArray::from(pts)), + Arc::new(Int32Array::from(ids)), + Arc::new(Int32Array::from(values)), + ], + ) + .unwrap() + } + + #[tokio::test] + async fn test_postpone_write_and_commit() { + let file_io = test_file_io(); + let table_path = "memory:/test_postpone_write"; + setup_dirs(&file_io, table_path).await; + + let table = test_postpone_pk_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + + let batch = make_batch(vec![3, 1, 2], vec![30, 10, 20]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].bucket, POSTPONE_BUCKET); + assert_eq!(messages[0].new_files.len(), 1); + assert_eq!(messages[0].new_files[0].row_count, 3); + + // Commit and verify snapshot + let commit = TableCommit::new(table, "test-user".to_string()); + commit.commit(messages).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.total_record_count(), 
Some(3)); + } + + #[tokio::test] + async fn test_postpone_write_empty_batch() { + let file_io = test_file_io(); + let table_path = "memory:/test_postpone_empty"; + let table = test_postpone_pk_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + + let batch = make_batch(vec![], vec![]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert!(messages.is_empty()); + } + + #[tokio::test] + async fn test_postpone_write_multiple_batches() { + let file_io = test_file_io(); + let table_path = "memory:/test_postpone_multi"; + setup_dirs(&file_io, table_path).await; + + let table = test_postpone_pk_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + + table_write + .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20])) + .await + .unwrap(); + table_write + .write_arrow_batch(&make_batch(vec![3, 4], vec![30, 40])) + .await + .unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages.len(), 1); + assert_eq!(messages[0].bucket, POSTPONE_BUCKET); + + let total_rows: i64 = messages[0].new_files.iter().map(|f| f.row_count).sum(); + assert_eq!(total_rows, 4); + } + + #[tokio::test] + async fn test_postpone_write_partitioned() { + let file_io = test_file_io(); + let table_path = "memory:/test_postpone_partitioned"; + setup_dirs(&file_io, table_path).await; + + let table = test_postpone_partitioned_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + + let batch = + make_partitioned_batch_3col(vec!["a", "b", "a"], vec![1, 2, 3], vec![10, 20, 30]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + // 2 partitions + assert_eq!(messages.len(), 2); + // All messages should use POSTPONE_BUCKET + for msg in &messages { + 
assert_eq!(msg.bucket, POSTPONE_BUCKET); + } + + let total_rows: i64 = messages + .iter() + .flat_map(|m| &m.new_files) + .map(|f| f.row_count) + .sum(); + assert_eq!(total_rows, 3); + + // Commit and verify + let commit = TableCommit::new(table, "test-user".to_string()); + commit.commit(messages).await.unwrap(); + + let snap_manager = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let snapshot = snap_manager.get_latest_snapshot().await.unwrap().unwrap(); + assert_eq!(snapshot.id(), 1); + assert_eq!(snapshot.total_record_count(), Some(3)); + } + + #[tokio::test] + async fn test_postpone_write_reusable() { + let file_io = test_file_io(); + let table_path = "memory:/test_postpone_reuse"; + setup_dirs(&file_io, table_path).await; + + let table = test_postpone_pk_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table, "test-user".to_string()).unwrap(); + + // First write + prepare_commit + table_write + .write_arrow_batch(&make_batch(vec![1, 2], vec![10, 20])) + .await + .unwrap(); + let messages1 = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages1.len(), 1); + assert_eq!(messages1[0].new_files[0].row_count, 2); + + // Second write + prepare_commit (reuse) + table_write + .write_arrow_batch(&make_batch(vec![3, 4, 5], vec![30, 40, 50])) + .await + .unwrap(); + let messages2 = table_write.prepare_commit().await.unwrap(); + assert_eq!(messages2.len(), 1); + assert_eq!(messages2[0].new_files[0].row_count, 3); + + // Empty prepare_commit + let messages3 = table_write.prepare_commit().await.unwrap(); + assert!(messages3.is_empty()); + } + + #[tokio::test] + async fn test_postpone_write_file_naming_and_kv_format() { + let file_io = test_file_io(); + let table_path = "memory:/test_postpone_kv"; + setup_dirs(&file_io, table_path).await; + + let table = test_postpone_pk_table(&file_io, table_path); + let mut table_write = TableWrite::new(&table, "my-commit-user".to_string()).unwrap(); + + let batch = make_batch(vec![3, 1, 
2], vec![30, 10, 20]); + table_write.write_arrow_batch(&batch).await.unwrap(); + + let messages = table_write.prepare_commit().await.unwrap(); + let file = &messages[0].new_files[0]; + + // Verify postpone file naming: data-u-{commitUser}-s-{writeId}-w-{uuid}-{index}.parquet + assert!( + file.file_name.starts_with("data-u-my-commit-user-s-"), + "Expected postpone file prefix, got: {}", + file.file_name + ); + assert!( + file.file_name.contains("-w-"), + "Expected -w- in file name, got: {}", + file.file_name + ); + + // Verify KV format: read the parquet file and check physical columns + let bucket_dir = format!("{table_path}/bucket-postpone"); + let file_path = format!("{bucket_dir}/{}", file.file_name); + let input = file_io.new_input(&file_path).unwrap(); + let data = input.read().await.unwrap(); + let reader = + parquet::arrow::arrow_reader::ParquetRecordBatchReader::try_new(data, 1024).unwrap(); + let schema = reader.schema(); + // Physical schema: [_SEQUENCE_NUMBER, _VALUE_KIND, id, value] + assert_eq!(schema.fields().len(), 4); + assert_eq!(schema.field(0).name(), "_SEQUENCE_NUMBER"); + assert_eq!(schema.field(1).name(), "_VALUE_KIND"); + assert_eq!(schema.field(2).name(), "id"); + assert_eq!(schema.field(3).name(), "value"); + + // Data should be in arrival order (not sorted by PK): 3, 1, 2 + let batches: Vec<RecordBatch> = reader.into_iter().map(|r| r.unwrap()).collect(); + let ids: Vec<i32> = batches + .iter() + .flat_map(|b| { + b.column(2) + .as_any() + .downcast_ref::<Int32Array>() + .unwrap() + .values() + .iter() + .copied() + }) + .collect(); + assert_eq!( + ids, + vec![3, 1, 2], + "Postpone mode should preserve arrival order" + ); + + // Empty key stats for postpone mode + assert_eq!(file.min_key, EMPTY_SERIALIZED_ROW.clone()); + assert_eq!(file.max_key, EMPTY_SERIALIZED_ROW.clone()); + } } diff --git a/crates/paimon/src/table/write_builder.rs b/crates/paimon/src/table/write_builder.rs index 57a4820c..9f238d65 100644 --- a/crates/paimon/src/table/write_builder.rs +++ 
b/crates/paimon/src/table/write_builder.rs @@ -49,6 +49,6 @@ impl<'a> WriteBuilder<'a> { /// For primary-key tables, sequence numbers are lazily scanned per partition /// when the first writer for that partition is created. pub fn new_write(&self) -> crate::Result<TableWrite> { - TableWrite::new(self.table) + TableWrite::new(self.table, self.commit_user.clone()) } } From b393c46e096e35429507563f784da197c779e534 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Thu, 16 Apr 2026 15:51:40 +0800 Subject: [PATCH 2/2] Fix comment --- .../paimon/src/table/postpone_file_writer.rs | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/crates/paimon/src/table/postpone_file_writer.rs b/crates/paimon/src/table/postpone_file_writer.rs index 89daa347..af7cb064 100644 --- a/crates/paimon/src/table/postpone_file_writer.rs +++ b/crates/paimon/src/table/postpone_file_writer.rs @@ -31,7 +31,7 @@ use crate::spec::{bucket_dir_name, DataFileMeta, EMPTY_SERIALIZED_ROW, VALUE_KIN use crate::table::kv_file_writer::build_physical_schema; use crate::Result; use arrow_array::{Int64Array, Int8Array, RecordBatch}; -use chrono::Utc; +use chrono::{DateTime, Utc}; use std::sync::Arc; use tokio::task::JoinSet; @@ -63,6 +63,8 @@ pub(crate) struct PostponeFileWriter { current_row_count: i64, /// Sequence number at which the current file started. current_file_start_seq: i64, + /// Timestamp captured when the current file was opened (used for deterministic replay order). + current_file_creation_time: DateTime<Utc>, written_files: Vec<DataFileMeta>, /// Background file close tasks spawned during rolling. 
in_flight_closes: JoinSet<Result<DataFileMeta>>, @@ -78,6 +80,7 @@ impl PostponeFileWriter { current_file_name: None, current_row_count: 0, current_file_start_seq: 0, + current_file_creation_time: Utc::now(), written_files: Vec::new(), in_flight_closes: JoinSet::new(), } @@ -175,11 +178,20 @@ impl PostponeFileWriter { let max_seq = self.next_sequence_number - 1; self.current_row_count = 0; let schema_id = self.config.schema_id; + // Capture creation_time from when the file was opened, not when the async close finishes. + // Java's postpone compaction sorts by creationTime for replay order. + let creation_time = self.current_file_creation_time; self.in_flight_closes.spawn(async move { let file_size = writer.close().await? as i64; Ok(build_meta( - file_name, file_size, row_count, min_seq, max_seq, schema_id, + file_name, + file_size, + row_count, + min_seq, + max_seq, + schema_id, + creation_time, )) }); } @@ -220,6 +232,7 @@ impl PostponeFileWriter { self.current_file_name = Some(file_name); self.current_row_count = 0; self.current_file_start_seq = self.next_sequence_number; + self.current_file_creation_time = Utc::now(); Ok(()) } @@ -243,6 +256,7 @@ impl PostponeFileWriter { min_seq, max_seq, self.config.schema_id, + self.current_file_creation_time, ); self.written_files.push(meta); Ok(()) } @@ -256,6 +270,7 @@ fn build_meta( min_seq: i64, max_seq: i64, schema_id: i64, + creation_time: DateTime<Utc>, ) -> DataFileMeta { DataFileMeta { file_name, @@ -278,7 +293,7 @@ schema_id, level: 0, extra_files: vec![], - creation_time: Some(Utc::now()), + creation_time: Some(creation_time), delete_row_count: Some(0), embedded_index: None, file_source: Some(0), // FileSource.APPEND