diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index d6d931c86d..b94a92df35 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -100,6 +100,10 @@ regex = { workspace = true }
 tempfile = { workspace = true }
 minijinja = { workspace = true }
 
+[[example]]
+name = "compaction_benchmark"
+path = "examples/compaction_benchmark.rs"
+
 [package.metadata.cargo-machete]
 # These dependencies are added to ensure minimal dependency version
 ignored = ["tap"]
diff --git a/crates/iceberg/examples/compaction_benchmark.rs b/crates/iceberg/examples/compaction_benchmark.rs
new file mode 100644
index 0000000000..973b131e39
--- /dev/null
+++ b/crates/iceberg/examples/compaction_benchmark.rs
@@ -0,0 +1,324 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Native compaction benchmark using iceberg-rust + DataFusion.
+//!
+//! Demonstrates the full native compaction pipeline:
+//! 1. Generate N small Parquet files (simulate micro-batch fragmentation)
+//! 2. Read all files via iceberg-rust scan (Arrow RecordBatch stream)
+//! 3. Write a single compacted Parquet file via iceberg-rust ParquetWriter
+//! 4. Commit replacement via ReplaceDataFilesAction
+//! 5. Verify the compacted table via scan
+//!
+//! Compare these timings against Spark's SparkBinPackFileRewriteRunner.
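+//!
+//! To try this locally, run the example in release mode so the timings are
+//! meaningful; the exact invocation may differ in your setup, but something
+//! like the following works from the `crates/iceberg` directory:
+//!
+//! ```text
+//! cargo run --release --example compaction_benchmark
+//! ```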
+
+use std::collections::HashMap;
+use std::sync::Arc;
+use std::time::Instant;
+
+use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use futures::TryStreamExt;
+use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::memory::{MemoryCatalogBuilder, MEMORY_CATALOG_WAREHOUSE};
+use iceberg::spec::{
+    NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type,
+};
+use iceberg::transaction::{ApplyTransactionAction, Transaction};
+use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
+use iceberg::writer::file_writer::location_generator::{
+    DefaultFileNameGenerator, DefaultLocationGenerator,
+};
+use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
+use iceberg::writer::file_writer::ParquetWriterBuilder;
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
+use iceberg::{Catalog, CatalogBuilder, TableCreation};
+use parquet::file::properties::WriterProperties;
+
+fn create_schema() -> Schema {
+    Schema::builder()
+        .with_schema_id(0)
+        .with_fields(vec![
+            NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
+            NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(3, "value", Type::Primitive(PrimitiveType::Long)).into(),
+            NestedField::required(4, "category", Type::Primitive(PrimitiveType::String)).into(),
+            NestedField::required(5, "ts", Type::Primitive(PrimitiveType::Long)).into(),
+        ])
+        .build()
+        .unwrap()
+}
+
+fn generate_batch(arrow_schema: &ArrowSchemaRef, start_id: i64, num_rows: usize) -> RecordBatch {
+    let ids: Vec<i64> = (start_id..start_id + num_rows as i64).collect();
+    let names: Vec<String> = ids.iter().map(|i| format!("name_{}", i)).collect();
+    let values: Vec<i64> = ids.iter().map(|i| i * 100).collect();
+    let categories: Vec<String> = ids.iter().map(|i| format!("cat_{}", i % 10)).collect();
+    let timestamps: Vec<i64> = ids.iter().map(|i| 1700000000 + i).collect();
+
+    let columns: Vec<ArrayRef> = vec![
+        Arc::new(Int64Array::from(ids)),
+        Arc::new(StringArray::from(names)),
+        Arc::new(Int64Array::from(values)),
+        Arc::new(StringArray::from(categories)),
+        Arc::new(Int64Array::from(timestamps)),
+    ];
+
+    RecordBatch::try_new(arrow_schema.clone(), columns).unwrap()
+}
+
+async fn run_benchmark(num_files: usize, rows_per_file: usize) {
+    let total_rows = num_files * rows_per_file;
+    println!("==========================================================================");
+    println!(
+        "Native Compaction Benchmark: {} files x {} rows = {} total rows",
+        num_files, rows_per_file, total_rows
+    );
+    println!("==========================================================================");
+
+    // Setup catalog with temp directory
+    let temp_dir = tempfile::TempDir::new().unwrap();
+    let warehouse_path = temp_dir.path().to_str().unwrap().to_string();
+
+    let catalog = MemoryCatalogBuilder::default()
+        .load(
+            "bench_catalog",
+            HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path)]),
+        )
+        .await
+        .unwrap();
+
+    let ns = iceberg::NamespaceIdent::new("bench_ns".to_string());
+    catalog
+        .create_namespace(&ns, HashMap::new())
+        .await
+        .unwrap();
+
+    let schema = create_schema();
+    let table_creation = TableCreation::builder()
+        .name("fragmented_table".to_string())
+        .schema(schema.clone())
+        .partition_spec(PartitionSpec::unpartition_spec())
+        .sort_order(SortOrder::unsorted_order())
+        .build();
+
+    let mut table = catalog.create_table(&ns, table_creation).await.unwrap();
+
+    // Derive Arrow schema from Iceberg schema (includes field ID metadata)
+    let arrow_schema: ArrowSchemaRef = Arc::new(
+        schema_to_arrow_schema(table.metadata().current_schema()).unwrap(),
+    );
+
+    // Phase 1: Write N small files (simulating micro-batch ingestion)
+    let write_start = Instant::now();
+    let mut all_data_files = Vec::new();
+
+    for file_idx in 0..num_files {
+        let start_id = (file_idx * rows_per_file) as i64;
+        let batch = generate_batch(&arrow_schema, start_id, rows_per_file);
+
+        let location_gen =
+            DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+        let file_name_gen = DefaultFileNameGenerator::new(
+            format!("frag_{:04}", file_idx),
+            None,
+            iceberg::spec::DataFileFormat::Parquet,
+        );
+        let pw_builder = ParquetWriterBuilder::new(
+            WriterProperties::default(),
+            table.metadata().current_schema().clone(),
+        );
+        let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
+            pw_builder,
+            table.file_io().clone(),
+            location_gen,
+            file_name_gen,
+        );
+        let dfw_builder = DataFileWriterBuilder::new(rolling_builder);
+        let mut writer = dfw_builder.build(None).await.unwrap();
+        writer.write(batch).await.unwrap();
+        let data_files = writer.close().await.unwrap();
+        all_data_files.extend(data_files);
+    }
+
+    // Commit all small files in a single transaction
+    let tx = Transaction::new(&table);
+    let action = tx.fast_append().add_data_files(all_data_files.clone());
+    let tx = action.apply(tx).unwrap();
+    table = tx.commit(&catalog).await.unwrap();
+    let write_elapsed = write_start.elapsed();
+    println!(
+        "Phase 1 - Write {} small files: {:>8.1} ms",
+        num_files,
+        write_elapsed.as_secs_f64() * 1000.0
+    );
+    println!(
+        " Files committed: {}, total rows: {}",
+        all_data_files.len(),
+        total_rows
+    );
+
+    // Phase 2: Read all files (scan) -- this is the compaction READ path
+    let scan_start = Instant::now();
+    let scan = table.scan().select_all().build().unwrap();
+    let stream = scan.to_arrow().await.unwrap();
+    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+    let scan_elapsed = scan_start.elapsed();
+
+    let scanned_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+    println!(
+        "Phase 2 - Scan all files (read path): {:>8.1} ms ({} rows, {} batches)",
+        scan_elapsed.as_secs_f64() * 1000.0,
+        scanned_rows,
+        batches.len()
+    );
+
+    // Phase 3: Write compacted file -- this is the compaction WRITE path
+    let compact_write_start = Instant::now();
+    let location_gen =
+        DefaultLocationGenerator::new(table.metadata().clone()).unwrap();
+    let file_name_gen = DefaultFileNameGenerator::new(
+        "compacted".to_string(),
+        None,
+        iceberg::spec::DataFileFormat::Parquet,
+    );
+    let pw_builder = ParquetWriterBuilder::new(
+        WriterProperties::default(),
+        table.metadata().current_schema().clone(),
+    );
+    let rolling_builder = RollingFileWriterBuilder::new_with_default_file_size(
+        pw_builder,
+        table.file_io().clone(),
+        location_gen,
+        file_name_gen,
+    );
+    let dfw_builder = DataFileWriterBuilder::new(rolling_builder);
+    let mut compact_writer = dfw_builder.build(None).await.unwrap();
+
+    for batch in &batches {
+        compact_writer.write(batch.clone()).await.unwrap();
+    }
+    let compacted_data_files = compact_writer.close().await.unwrap();
+    let compact_write_elapsed = compact_write_start.elapsed();
+    println!(
+        "Phase 3 - Write compacted file: {:>8.1} ms ({} output files)",
+        compact_write_elapsed.as_secs_f64() * 1000.0,
+        compacted_data_files.len()
+    );
+
+    // Phase 4: Commit replacement via ReplaceDataFilesAction
+    let commit_start = Instant::now();
+    let snapshot_id = table
+        .metadata()
+        .current_snapshot()
+        .unwrap()
+        .snapshot_id();
+
+    let tx = Transaction::new(&table);
+    let action = tx
+        .replace_data_files()
+        .validate_from_snapshot(snapshot_id)
+        .delete_files(all_data_files)
+        .add_files(compacted_data_files);
+    let tx = action.apply(tx).unwrap();
+    table = tx.commit(&catalog).await.unwrap();
+    let commit_elapsed = commit_start.elapsed();
+    println!(
+        "Phase 4 - Commit replacement: {:>8.1} ms",
+        commit_elapsed.as_secs_f64() * 1000.0
+    );
+
+    // Phase 5: Verify by scanning compacted table
+    let verify_start = Instant::now();
+    let scan = table.scan().select_all().build().unwrap();
+    let stream = scan.to_arrow().await.unwrap();
+    let verify_batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+    let verify_rows: usize = verify_batches.iter().map(|b| b.num_rows()).sum();
+    let verify_elapsed = verify_start.elapsed();
+    println!(
+        "Phase 5 - Verify (scan compacted): {:>8.1} ms ({} rows)",
+        verify_elapsed.as_secs_f64() * 1000.0,
+        verify_rows
+    );
+
+    assert_eq!(
+        verify_rows, total_rows,
+        "Row count mismatch after compaction"
+    );
+
+    let total_compaction = scan_elapsed + compact_write_elapsed + commit_elapsed;
+    println!("--------------------------------------------------------------------------");
+    println!(
+        "Total compaction time (read+write+commit): {:>8.1} ms",
+        total_compaction.as_secs_f64() * 1000.0
+    );
+    println!(
+        " Read: {:>6.1} ms ({:.0}%)",
+        scan_elapsed.as_secs_f64() * 1000.0,
+        scan_elapsed.as_secs_f64() / total_compaction.as_secs_f64() * 100.0
+    );
+    println!(
+        " Write: {:>6.1} ms ({:.0}%)",
+        compact_write_elapsed.as_secs_f64() * 1000.0,
+        compact_write_elapsed.as_secs_f64() / total_compaction.as_secs_f64() * 100.0
+    );
+    println!(
+        " Commit: {:>6.1} ms ({:.0}%)",
+        commit_elapsed.as_secs_f64() * 1000.0,
+        commit_elapsed.as_secs_f64() / total_compaction.as_secs_f64() * 100.0
+    );
+    println!();
+
+    // Snapshot verification
+    let snapshots: Vec<_> = table.metadata().snapshots().collect();
+    println!(
+        "Snapshots: {} (append + replace)",
+        snapshots.len()
+    );
+    let current = table.metadata().current_snapshot().unwrap();
+    println!(
+        "Current snapshot operation: {:?}",
+        current.summary().operation
+    );
+    println!();
+}
+
+#[tokio::main]
+async fn main() {
+    println!();
+    println!("========================================================================");
+    println!(" Native Iceberg Compaction Benchmark (iceberg-rust + Arrow)");
+    println!(" No JVM, no Spark -- pure Rust pipeline");
+    println!("========================================================================");
+    println!();
+
+    // Small: 20 files x 1K rows = 20K rows
+    run_benchmark(20, 1_000).await;
+
+    // Medium: 50 files x 10K rows = 500K rows
+    run_benchmark(50, 10_000).await;
+
+    // Large: 100 files x 50K rows = 5M rows
+    run_benchmark(100, 50_000).await;
+
+    // XL: 200 files x 50K rows = 10M rows
+    run_benchmark(200, 50_000).await;
+
+    println!("========================================================================");
+    println!(" Benchmark complete.");
+    println!("========================================================================");
+}
diff --git a/crates/iceberg/src/spec/snapshot_summary.rs b/crates/iceberg/src/spec/snapshot_summary.rs
index c67ee37d3e..c4767f31cf 100644
--- a/crates/iceberg/src/spec/snapshot_summary.rs
+++ b/crates/iceberg/src/spec/snapshot_summary.rs
@@ -339,6 +339,7 @@ pub(crate) fn update_snapshot_summaries(
     if summary.operation != Operation::Append
         && summary.operation != Operation::Overwrite
         && summary.operation != Operation::Delete
+        && summary.operation != Operation::Replace
     {
         return Err(Error::new(
             ErrorKind::DataInvalid,
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index 074c7fefe4..5c7658c382 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -54,6 +54,7 @@ mod action;
 pub use action::*;
 
 mod append;
+mod replace_data_files;
 mod snapshot;
 mod sort_order;
 mod update_location;
@@ -71,6 +72,7 @@ use crate::spec::TableProperties;
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
 use crate::transaction::append::FastAppendAction;
+use crate::transaction::replace_data_files::ReplaceDataFilesAction;
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
@@ -156,6 +158,11 @@ impl Transaction {
         UpdateStatisticsAction::new()
     }
 
+    /// Creates a replace data files action (for compaction).
+    pub fn replace_data_files(&self) -> ReplaceDataFilesAction {
+        ReplaceDataFilesAction::new()
+    }
+
     /// Commit transaction.
     pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
         if self.actions.is_empty() {
diff --git a/crates/iceberg/src/transaction/replace_data_files.rs b/crates/iceberg/src/transaction/replace_data_files.rs
new file mode 100644
index 0000000000..9961cfd4a7
--- /dev/null
+++ b/crates/iceberg/src/transaction/replace_data_files.rs
@@ -0,0 +1,577 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use uuid::Uuid;
+
+use crate::error::Result;
+use crate::spec::{DataFile, ManifestEntry, ManifestFile, Operation};
+use crate::table::Table;
+use crate::transaction::snapshot::{
+    DefaultManifestProcess, SnapshotProduceOperation, SnapshotProducer,
+};
+use crate::transaction::{ActionCommit, TransactionAction};
+use crate::{Error, ErrorKind};
+
+/// Action to replace data files in a table (for compaction/rewrite operations).
+pub struct ReplaceDataFilesAction {
+    commit_uuid: Option<Uuid>,
+    key_metadata: Option<Vec<u8>>,
+    snapshot_properties: HashMap<String, String>,
+    files_to_delete: Vec<DataFile>,
+    files_to_add: Vec<DataFile>,
+    validate_from_snapshot_id: Option<i64>,
+    data_sequence_number: Option<i64>,
+}
+
+impl ReplaceDataFilesAction {
+    pub(crate) fn new() -> Self {
+        Self {
+            commit_uuid: None,
+            key_metadata: None,
+            snapshot_properties: HashMap::default(),
+            files_to_delete: vec![],
+            files_to_add: vec![],
+            validate_from_snapshot_id: None,
+            data_sequence_number: None,
+        }
+    }
+
+    /// Add files to delete (old files being replaced).
+    pub fn delete_files(mut self, files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.files_to_delete.extend(files);
+        self
+    }
+
+    /// Add files to add (new files replacing old ones).
+    pub fn add_files(mut self, files: impl IntoIterator<Item = DataFile>) -> Self {
+        self.files_to_add.extend(files);
+        self
+    }
+
+    /// Set commit UUID.
+    pub fn set_commit_uuid(mut self, commit_uuid: Uuid) -> Self {
+        self.commit_uuid = Some(commit_uuid);
+        self
+    }
+
+    /// Set key metadata for manifest files.
+    pub fn set_key_metadata(mut self, key_metadata: Vec<u8>) -> Self {
+        self.key_metadata = Some(key_metadata);
+        self
+    }
+
+    /// Set snapshot summary properties.
+    pub fn set_snapshot_properties(mut self, props: HashMap<String, String>) -> Self {
+        self.snapshot_properties = props;
+        self
+    }
+
+    /// Validate that the files to delete exist in the given snapshot.
+    pub fn validate_from_snapshot(mut self, snapshot_id: i64) -> Self {
+        self.validate_from_snapshot_id = Some(snapshot_id);
+        self
+    }
+
+    /// Set data sequence number for new files (for handling concurrent equality deletes).
+    pub fn data_sequence_number(mut self, seq_num: i64) -> Self {
+        self.data_sequence_number = Some(seq_num);
+        self
+    }
+}
+
+#[async_trait]
+impl TransactionAction for ReplaceDataFilesAction {
+    async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
+        if self.files_to_delete.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Replace operation requires files to delete",
+            ));
+        }
+
+        if self.files_to_add.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Replace operation requires files to add",
+            ));
+        }
+
+        // Validate that the files exist in the specified snapshot
+        if let Some(snapshot_id) = self.validate_from_snapshot_id {
+            Self::validate_files_exist(table, snapshot_id, &self.files_to_delete).await?;
+        }
+
+        let mut snapshot_producer = SnapshotProducer::new(
+            table,
+            self.commit_uuid.unwrap_or_else(Uuid::now_v7),
+            self.key_metadata.clone(),
+            self.snapshot_properties.clone(),
+            self.files_to_add.clone(),
+        );
+
+        if let Some(seq_num) = self.data_sequence_number {
+            snapshot_producer = snapshot_producer.with_data_sequence_number(seq_num);
+        }
+
+        snapshot_producer.validate_added_data_files()?;
+
+        let replace_op = ReplaceOperation {
+            files_to_delete: self.files_to_delete.clone(),
+        };
+
+        snapshot_producer
+            .commit(replace_op, DefaultManifestProcess)
+            .await
+    }
+}
+
+impl ReplaceDataFilesAction {
+    async fn validate_files_exist(
+        table: &Table,
+        snapshot_id: i64,
+        files_to_delete: &[DataFile],
+    ) -> Result<()> {
+        let snapshot = table
+            .metadata()
+            .snapshot_by_id(snapshot_id)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Snapshot {snapshot_id} not found"),
+                )
+            })?;
+
+        let files_to_find: std::collections::HashSet<&str> = files_to_delete
+            .iter()
+            .map(|f| f.file_path.as_str())
+            .collect();
+
+        let manifest_list = snapshot
+            .load_manifest_list(table.file_io(), &table.metadata_ref())
+            .await?;
+
+        let mut found_files = std::collections::HashSet::new();
+        for entry in manifest_list.entries() {
+            let manifest = entry.load_manifest(table.file_io()).await?;
+            for e in manifest.entries() {
+                if e.is_alive() && files_to_find.contains(e.file_path()) {
+                    found_files.insert(e.file_path().to_string());
+                }
+            }
+        }
+
+        let missing: Vec<_> = files_to_delete
+            .iter()
+            .filter(|f| !found_files.contains(&f.file_path))
+            .map(|f| f.file_path.as_str())
+            .collect();
+
+        if !missing.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Files not found in snapshot {}: {}",
+                    snapshot_id,
+                    missing.join(", ")
+                ),
+            ));
+        }
+
+        Ok(())
+    }
+}
+
+struct ReplaceOperation {
+    files_to_delete: Vec<DataFile>,
+}
+
+impl SnapshotProduceOperation for ReplaceOperation {
+    fn operation(&self) -> Operation {
+        Operation::Replace
+    }
+
+    async fn delete_entries(
+        &self,
+        _snapshot_produce: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestEntry>> {
+        Ok(vec![])
+    }
+
+    async fn existing_manifest(
+        &self,
+        snapshot_produce: &SnapshotProducer<'_>,
+    ) -> Result<Vec<ManifestFile>> {
+        let Some(snapshot) = snapshot_produce.table.metadata().current_snapshot() else {
+            return Ok(vec![]);
+        };
+
+        let files_to_delete: std::collections::HashSet<&str> = self
+            .files_to_delete
+            .iter()
+            .map(|f| f.file_path.as_str())
+            .collect();
+
+        let manifest_list = snapshot
+            .load_manifest_list(
+                snapshot_produce.table.file_io(),
+                &snapshot_produce.table.metadata_ref(),
+            )
+            .await?;
+
+        // Include existing manifests that don't contain deleted files
+        let mut result = Vec::new();
+        for entry in manifest_list.entries() {
+            if !entry.has_added_files() && !entry.has_existing_files() {
+                continue;
+            }
+
+            let manifest = entry
+                .load_manifest(snapshot_produce.table.file_io())
+                .await?;
+            let has_deleted_file = manifest
+                .entries()
+                .iter()
+                .any(|e| e.is_alive() && files_to_delete.contains(e.file_path()));
+
+            if !has_deleted_file {
+                result.push(entry.clone());
+            }
+        }
+
+        Ok(result)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use crate::TableUpdate;
+    use crate::spec::{
+        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct,
+    };
+    use crate::transaction::tests::make_v2_minimal_table;
+    use crate::transaction::{Transaction, TransactionAction};
+
+    fn create_data_file(table: &crate::table::Table, path: &str, record_count: u64) -> DataFile {
+        DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path(path.to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(record_count)
+            .partition_spec_id(table.metadata().default_partition_spec_id())
+            .partition(Struct::from_iter([Some(Literal::long(300))]))
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_replace_data_files_basic() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let old_file = create_data_file(&table, "data/old.parquet", 100);
+        let new_file = create_data_file(&table, "data/new.parquet", 100);
+
+        let action = tx
+            .replace_data_files()
+            .delete_files(vec![old_file])
+            .add_files(vec![new_file]);
+
+        let result = Arc::new(action).commit(&table).await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_replace_data_files_empty_deletes_fails() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let new_file = create_data_file(&table, "data/new.parquet", 100);
+
+        let action = tx.replace_data_files().add_files(vec![new_file]);
+
+        let result = Arc::new(action).commit(&table).await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_replace_data_files_empty_adds_fails() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let old_file = create_data_file(&table, "data/old.parquet", 100);
+
+        let action = tx.replace_data_files().delete_files(vec![old_file]);
+
+        let result = Arc::new(action).commit(&table).await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_replace_uses_replace_operation() {
+        let table = make_v2_minimal_table();
+        let tx = Transaction::new(&table);
+
+        let old_file = create_data_file(&table, "data/old.parquet", 100);
+        let new_file = create_data_file(&table, "data/new.parquet", 100);
+
+        let action = tx
+            .replace_data_files()
+            .delete_files(vec![old_file])
+            .add_files(vec![new_file]);
+
+        let mut action_commit = Arc::new(action).commit(&table).await.unwrap();
+        let updates = action_commit.take_updates();
+
+        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = &updates[0] {
+            snapshot
+        } else {
+            panic!("Expected AddSnapshot");
+        };
+
+        assert_eq!(new_snapshot.summary().operation, Operation::Replace);
+    }
+}
+
+#[cfg(test)]
+mod integration_tests {
+    use crate::memory::tests::new_memory_catalog;
+    use crate::spec::{
+        DataContentType, DataFile, DataFileBuilder, DataFileFormat, Literal, Operation, Struct,
+    };
+    use crate::transaction::tests::make_v3_minimal_table_in_catalog;
+    use crate::transaction::{ApplyTransactionAction, Transaction};
+
+    fn create_file(path: &str, record_count: u64) -> DataFile {
+        DataFileBuilder::default()
+            .content(DataContentType::Data)
+            .file_path(path.to_string())
+            .file_format(DataFileFormat::Parquet)
+            .file_size_in_bytes(100)
+            .record_count(record_count)
+            .partition(Struct::from_iter([Some(Literal::long(0))]))
+            .partition_spec_id(0)
+            .build()
+            .unwrap()
+    }
+
+    #[tokio::test]
+    async fn test_replace_after_append() {
+        let catalog = new_memory_catalog().await;
+        let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+        // First append some files
+        let file1 = create_file("data/file1.parquet", 100);
+        let file2 = create_file("data/file2.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx
+            .fast_append()
+            .add_data_files(vec![file1.clone(), file2.clone()]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        assert_eq!(table.metadata().snapshots().count(), 1);
+
+        // Now replace file1 and file2 with a single compacted file
+        let compacted = create_file("data/compacted.parquet", 200);
+        let tx = Transaction::new(&table);
+        let action = tx
+            .replace_data_files()
+            .delete_files(vec![file1, file2])
+            .add_files(vec![compacted]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        assert_eq!(table.metadata().snapshots().count(), 2);
+        let snapshot = table.metadata().current_snapshot().unwrap();
+        assert_eq!(snapshot.summary().operation, Operation::Replace);
+
+        // Verify manifest has only the compacted file
+        let manifest_list = snapshot
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .unwrap();
+        assert_eq!(manifest_list.entries().len(), 1);
+
+        let manifest = manifest_list.entries()[0]
+            .load_manifest(table.file_io())
+            .await
+            .unwrap();
+        assert_eq!(manifest.entries().len(), 1);
+        assert_eq!(manifest.entries()[0].file_path(), "data/compacted.parquet");
+    }
+
+    #[tokio::test]
+    async fn test_replace_preserves_unrelated_files() {
+        let catalog = new_memory_catalog().await;
+        let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+        // Append file1
+        let file1 = create_file("data/file1.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(vec![file1.clone()]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        // Append file2 in separate transaction
+        let file2 = create_file("data/file2.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(vec![file2.clone()]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        // Replace only file1
+        let file1_compacted = create_file("data/file1_compacted.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx
+            .replace_data_files()
+            .delete_files(vec![file1])
+            .add_files(vec![file1_compacted]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        // Verify both file2 manifest and new compacted manifest exist
+        let snapshot = table.metadata().current_snapshot().unwrap();
+        let manifest_list = snapshot
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .unwrap();
+
+        // Should have 2 manifests: one for file2 (preserved), one for compacted
+        assert_eq!(manifest_list.entries().len(), 2);
+
+        // Collect all file paths
+        let mut all_files = Vec::new();
+        for entry in manifest_list.entries() {
+            let manifest = entry.load_manifest(table.file_io()).await.unwrap();
+            for e in manifest.entries() {
+                if e.is_alive() {
+                    all_files.push(e.file_path().to_string());
+                }
+            }
+        }
+        all_files.sort();
+        assert_eq!(all_files, vec![
+            "data/file1_compacted.parquet",
+            "data/file2.parquet"
+        ]);
+    }
+
+    #[tokio::test]
+    async fn test_validate_from_snapshot_success() {
+        let catalog = new_memory_catalog().await;
+        let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+        let file1 = create_file("data/file1.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(vec![file1.clone()]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id();
+
+        // Replace with validation from correct snapshot
+        let compacted = create_file("data/compacted.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx
+            .replace_data_files()
+            .validate_from_snapshot(snapshot_id)
+            .delete_files(vec![file1])
+            .add_files(vec![compacted]);
+        let tx = action.apply(tx).unwrap();
+        let result = tx.commit(&catalog).await;
+        assert!(result.is_ok());
+    }
+
+    #[tokio::test]
+    async fn test_validate_from_snapshot_missing_file() {
+        let catalog = new_memory_catalog().await;
+        let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+        let file1 = create_file("data/file1.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(vec![file1.clone()]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        let snapshot_id = table.metadata().current_snapshot().unwrap().snapshot_id();
+
+        // Try to delete a file that doesn't exist
+        let nonexistent = create_file("data/nonexistent.parquet", 100);
+        let compacted = create_file("data/compacted.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx
+            .replace_data_files()
+            .validate_from_snapshot(snapshot_id)
+            .delete_files(vec![nonexistent])
+            .add_files(vec![compacted]);
+        let tx = action.apply(tx).unwrap();
+        let result = tx.commit(&catalog).await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn test_data_sequence_number() {
+        let catalog = new_memory_catalog().await;
+        let table = make_v3_minimal_table_in_catalog(&catalog).await;
+
+        let file1 = create_file("data/file1.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx.fast_append().add_data_files(vec![file1.clone()]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        let original_seq = table
+            .metadata()
+            .current_snapshot()
+            .unwrap()
+            .sequence_number();
+
+        // Replace with custom sequence number
+        let compacted = create_file("data/compacted.parquet", 100);
+        let tx = Transaction::new(&table);
+        let action = tx
+            .replace_data_files()
+            .data_sequence_number(original_seq)
+            .delete_files(vec![file1])
+            .add_files(vec![compacted]);
+        let tx = action.apply(tx).unwrap();
+        let table = tx.commit(&catalog).await.unwrap();
+
+        // Verify the new manifest entry has the custom sequence number
+        let snapshot = table.metadata().current_snapshot().unwrap();
+        let manifest_list = snapshot
+            .load_manifest_list(table.file_io(), table.metadata())
+            .await
+            .unwrap();
+
+        for entry in manifest_list.entries() {
+            let manifest = entry.load_manifest(table.file_io()).await.unwrap();
+            for e in manifest.entries() {
+                if e.file_path() == "data/compacted.parquet" {
+                    assert_eq!(e.sequence_number(), Some(original_seq));
+                }
+            }
+        }
+    }
+}
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index c8bf26a174..10578bbf59 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -118,6 +118,8 @@ pub(crate) struct SnapshotProducer<'a> {
     // It starts from 0 and increments for each new manifest file.
     // Note: This counter is limited to the range of (0..u64::MAX).
     manifest_counter: RangeFrom<u64>,
+    // Custom data sequence number for compaction operations.
+    data_sequence_number: Option<i64>,
 }
 
 impl<'a> SnapshotProducer<'a> {
@@ -136,9 +138,15 @@ impl<'a> SnapshotProducer<'a> {
             snapshot_properties,
             added_data_files,
             manifest_counter: (0..),
+            data_sequence_number: None,
         }
     }
 
+    pub(crate) fn with_data_sequence_number(mut self, seq_num: i64) -> Self {
+        self.data_sequence_number = Some(seq_num);
+        self
+    }
+
     pub(crate) fn validate_added_data_files(&self) -> Result<()> {
         for data_file in &self.added_data_files {
             if data_file.content_type() != crate::spec::DataContentType::Data {
@@ -300,18 +308,26 @@ impl<'a> SnapshotProducer<'a> {
         let snapshot_id = self.snapshot_id;
         let format_version = self.table.metadata().format_version();
 
-        let manifest_entries = added_data_files.into_iter().map(|data_file| {
-            let builder = ManifestEntry::builder()
-                .status(crate::spec::ManifestStatus::Added)
-                .data_file(data_file);
-            if format_version == FormatVersion::V1 {
-                builder.snapshot_id(snapshot_id).build()
-            } else {
-                // For format version > 1, we set the snapshot id at the inherited time to avoid rewrite the manifest file when
-                // commit failed.
-                builder.build()
-            }
-        });
+        let data_sequence_number = self.data_sequence_number;
+        let manifest_entries: Vec<_> = added_data_files
+            .into_iter()
+            .map(|data_file| match (format_version, data_sequence_number) {
+                (FormatVersion::V1, _) => ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .snapshot_id(snapshot_id)
+                    .data_file(data_file)
+                    .build(),
+                (_, Some(seq_num)) => ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .sequence_number(seq_num)
+                    .data_file(data_file)
+                    .build(),
+                (_, None) => ManifestEntry::builder()
+                    .status(crate::spec::ManifestStatus::Added)
+                    .data_file(data_file)
+                    .build(),
+            })
+            .collect();
         let mut writer = self.new_manifest_writer(ManifestContentType::Data)?;
         for entry in manifest_entries {
            writer.add_entry(entry)?;
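
Reviewer note: a minimal sketch of how the new action is intended to be driven from caller code, mirroring the integration tests above. The helper name `commit_compaction` and its signature are illustrative only (not part of this patch), and error handling is simplified:

use iceberg::spec::DataFile;
use iceberg::table::Table;
use iceberg::transaction::{ApplyTransactionAction, Transaction};
use iceberg::{Catalog, Result};

/// Illustrative helper: atomically swap `old_files` for `new_files`.
async fn commit_compaction(
    catalog: &dyn Catalog,
    table: &Table,
    old_files: Vec<DataFile>,
    new_files: Vec<DataFile>,
) -> Result<Table> {
    // Pin validation to the snapshot the compaction was planned against, so the
    // commit fails if any rewritten file has already been removed concurrently.
    let snapshot_id = table
        .metadata()
        .current_snapshot()
        .expect("table to compact must have a current snapshot")
        .snapshot_id();

    let tx = Transaction::new(table);
    let action = tx
        .replace_data_files()
        .validate_from_snapshot(snapshot_id)
        .delete_files(old_files)
        .add_files(new_files);
    let tx = action.apply(tx)?;

    // Produces a new snapshot whose summary operation is Operation::Replace.
    tx.commit(catalog).await
}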