diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs
index 37a7996f80..1b3317ad83 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -17,6 +17,7 @@
 use std::collections::HashMap;
 use std::fmt::Debug;
+use std::str::FromStr;
 
 use anyhow::anyhow;
 use async_trait::async_trait;
@@ -526,14 +527,14 @@ impl Catalog for GlueCatalog {
         let metadata = TableMetadataBuilder::from_table_creation(creation)?
             .build()?
             .metadata;
-        let metadata_location =
-            MetadataLocation::new_with_table_location(location.clone()).to_string();
+        let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
 
         metadata.write_to(&self.file_io, &metadata_location).await?;
+        let metadata_location_str = metadata_location.to_string();
 
         let glue_table = convert_to_glue_table(
             &table_name,
-            metadata_location.clone(),
+            metadata_location_str.clone(),
             &metadata,
             metadata.properties(),
             None,
@@ -551,7 +552,7 @@ impl Catalog for GlueCatalog {
 
         Table::builder()
             .file_io(self.file_io())
-            .metadata_location(metadata_location)
+            .metadata_location(metadata_location_str)
             .metadata(metadata)
             .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
             .build()
@@ -789,12 +790,13 @@ impl Catalog for GlueCatalog {
         let current_metadata_location = current_table.metadata_location_result()?.to_string();
 
         let staged_table = commit.apply(current_table)?;
-        let staged_metadata_location = staged_table.metadata_location_result()?;
+        let staged_metadata_location_str = staged_table.metadata_location_result()?;
+        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
 
         // Write new metadata
         staged_table
             .metadata()
-            .write_to(staged_table.file_io(), staged_metadata_location)
+            .write_to(staged_table.file_io(), &staged_metadata_location)
             .await?;
 
         // Persist staged table to Glue with optimistic locking
diff --git a/crates/catalog/glue/src/utils.rs b/crates/catalog/glue/src/utils.rs
index f3be58381a..906e6fcc18 100644
--- a/crates/catalog/glue/src/utils.rs
+++ b/crates/catalog/glue/src/utils.rs
@@ -306,8 +306,6 @@ mod tests {
     fn test_convert_to_glue_table() -> Result<()> {
         let table_name = "my_table".to_string();
         let location = "s3a://warehouse/hive".to_string();
-        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
-        let properties = HashMap::new();
         let schema = Schema::builder()
             .with_schema_id(1)
             .with_fields(vec![
@@ -316,6 +314,8 @@ mod tests {
             .build()?;
 
         let metadata = create_metadata(schema)?;
+        let metadata_location =
+            MetadataLocation::new_with_metadata(location, &metadata).to_string();
 
         let parameters = HashMap::from([
             (ICEBERG_FIELD_ID.to_string(), "1".to_string()),
@@ -336,8 +336,13 @@ mod tests {
             .location(metadata.location())
             .build();
 
-        let result =
-            convert_to_glue_table(&table_name, metadata_location, &metadata, &properties, None)?;
+        let result = convert_to_glue_table(
+            &table_name,
+            metadata_location,
+            &metadata,
+            metadata.properties(),
+            None,
+        )?;
 
         assert_eq!(result.name(), &table_name);
         assert_eq!(result.description(), None);
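The Glue changes above establish the create-table pattern that the HMS, S3Tables, SQL, and memory catalogs repeat below: build the `TableMetadata` first, derive the `MetadataLocation` from it so the `write.metadata.compression-codec` property can shape the file name, write the file, and only then render the string form for the catalog store. A minimal sketch (error handling and surrounding catalog code elided; `creation`, `location`, and `file_io` are assumed to be in scope):

```rust
// Metadata must exist before the location so `new_with_metadata` can read
// the compression property from it.
let metadata = TableMetadataBuilder::from_table_creation(creation)?
    .build()?
    .metadata;

// Yields `<location>/metadata/00000-<uuid>.metadata.json`, or
// `<location>/metadata/00000-<uuid>.gz.metadata.json` under gzip.
let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
metadata.write_to(&file_io, &metadata_location).await?;

// Catalog stores persist the rendered string form.
let metadata_location_str = metadata_location.to_string();
```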
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index b7d192210b..4a8b1e787a 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -442,17 +442,17 @@ impl Catalog for HmsCatalog {
             .build()?
             .metadata;
-        let metadata_location =
-            MetadataLocation::new_with_table_location(location.clone()).to_string();
+        let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
 
         metadata.write_to(&self.file_io, &metadata_location).await?;
+        let metadata_location_str = metadata_location.to_string();
 
         let hive_table = convert_to_hive_table(
             db_name.clone(),
             metadata.current_schema(),
             table_name.clone(),
             location,
-            metadata_location.clone(),
+            metadata_location_str.clone(),
             metadata.properties(),
         )?;
@@ -464,7 +464,7 @@ impl Catalog for HmsCatalog {
 
         Table::builder()
             .file_io(self.file_io())
-            .metadata_location(metadata_location)
+            .metadata_location(metadata_location_str)
             .metadata(metadata)
             .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
             .build()
diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs
index 096e792f61..cd9b557397 100644
--- a/crates/catalog/hms/src/utils.rs
+++ b/crates/catalog/hms/src/utils.rs
@@ -311,8 +311,8 @@ fn get_current_time() -> Result<i32> {
 
 #[cfg(test)]
 mod tests {
-    use iceberg::spec::{NestedField, PrimitiveType, Type};
-    use iceberg::{MetadataLocation, Namespace, NamespaceIdent};
+    use iceberg::spec::{NestedField, PrimitiveType, TableMetadataBuilder, Type};
+    use iceberg::{MetadataLocation, Namespace, NamespaceIdent, TableCreation};
 
     use super::*;
@@ -343,8 +343,6 @@ mod tests {
         let db_name = "my_db".to_string();
         let table_name = "my_table".to_string();
         let location = "s3a://warehouse/hms".to_string();
-        let metadata_location =
-            MetadataLocation::new_with_table_location(location.clone()).to_string();
         let properties = HashMap::new();
         let schema = Schema::builder()
             .with_schema_id(1)
@@ -354,6 +352,18 @@ mod tests {
             ])
             .build()?;
 
+        let table_creation = TableCreation::builder()
+            .name(table_name.clone())
+            .location(location.clone())
+            .schema(schema.clone())
+            .properties(properties.clone())
+            .build();
+        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
+            .build()?
+            .metadata;
+        let metadata_location =
+            MetadataLocation::new_with_metadata(location.clone(), &metadata).to_string();
+
         let result = convert_to_hive_table(
             db_name.clone(),
             &schema,
diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs
index a6ac60bc79..a1935e0036 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -17,6 +17,7 @@
 use std::collections::HashMap;
 use std::future::Future;
+use std::str::FromStr;
 
 use async_trait::async_trait;
 use aws_sdk_s3tables::operation::create_table::CreateTableOutput;
@@ -476,17 +477,18 @@ impl Catalog for S3TablesCatalog {
         let metadata = TableMetadataBuilder::from_table_creation(creation)?
             .build()?
             .metadata;
-        let metadata_location =
-            MetadataLocation::new_with_table_location(table_location).to_string();
+
+        let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
 
         metadata.write_to(&self.file_io, &metadata_location).await?;
 
         // update metadata location
+        let metadata_location_str = metadata_location.to_string();
         self.s3tables_client
             .update_table_metadata_location()
             .table_bucket_arn(self.config.table_bucket_arn.clone())
             .namespace(namespace.to_url_string())
             .name(table_ident.name())
-            .metadata_location(metadata_location.clone())
+            .metadata_location(metadata_location_str.clone())
             .version_token(create_resp.version_token())
             .send()
             .await
@@ -494,7 +496,7 @@ impl Catalog for S3TablesCatalog {
 
         let table = Table::builder()
             .identifier(table_ident)
-            .metadata_location(metadata_location)
+            .metadata_location(metadata_location_str)
             .metadata(metadata)
             .file_io(self.file_io.clone())
             .build()?;
@@ -605,11 +607,12 @@ impl Catalog for S3TablesCatalog {
             self.load_table_with_version_token(&table_ident).await?;
 
         let staged_table = commit.apply(current_table)?;
-        let staged_metadata_location = staged_table.metadata_location_result()?;
+        let staged_metadata_location_str = staged_table.metadata_location_result()?;
+        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
 
         staged_table
             .metadata()
-            .write_to(staged_table.file_io(), staged_metadata_location)
+            .write_to(staged_table.file_io(), &staged_metadata_location)
             .await?;
 
         let builder = self
@@ -619,7 +622,7 @@ impl Catalog for S3TablesCatalog {
             .namespace(table_namespace.to_url_string())
             .name(table_ident.name())
             .version_token(version_token)
-            .metadata_location(staged_metadata_location);
+            .metadata_location(staged_metadata_location_str);
 
         let _ = builder.send().await.map_err(|e| {
             let error = e.into_service_error();
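On the commit path, every catalog in this diff receives the staged location as a `&str` from `metadata_location_result()` and re-parses it into a typed `MetadataLocation`, because `TableMetadata::write_to` now takes `&MetadataLocation` and validates the codec embedded in the file name. A sketch of the shared shape (`staged_table` assumed in scope):

```rust
use std::str::FromStr;

// The staged table carries its new location as a string...
let staged_metadata_location_str = staged_table.metadata_location_result()?;
// ...which is re-parsed so `write_to` can check the codec in the file name
// against the table's `write.metadata.compression-codec` property.
let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;

staged_table
    .metadata()
    .write_to(staged_table.file_io(), &staged_metadata_location)
    .await?;
// The catalog then swaps its pointer to `staged_metadata_location_str`
// under optimistic locking.
```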
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index e3b8a17be3..90ef42c5db 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -829,21 +829,22 @@ impl Catalog for SqlCatalog {
             .build()?
             .metadata;
         let tbl_metadata_location =
-            MetadataLocation::new_with_table_location(location.clone()).to_string();
+            MetadataLocation::new_with_metadata(location.clone(), &tbl_metadata);
 
         tbl_metadata
             .write_to(&self.fileio, &tbl_metadata_location)
             .await?;
+        let tbl_metadata_location_str = tbl_metadata_location.to_string();
 
         self.execute(&format!(
             "INSERT INTO {CATALOG_TABLE_NAME}
             ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME},
             {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
             VALUES (?, ?, ?, ?, ?)
-            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
+            "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location_str), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
 
         Ok(Table::builder()
             .file_io(self.fileio.clone())
-            .metadata_location(tbl_metadata_location)
+            .metadata_location(tbl_metadata_location_str)
             .identifier(tbl_ident)
             .metadata(tbl_metadata)
             .build()?)
@@ -927,13 +928,15 @@ impl Catalog for SqlCatalog {
         let current_metadata_location = current_table.metadata_location_result()?.to_string();
 
         let staged_table = commit.apply(current_table)?;
-        let staged_metadata_location = staged_table.metadata_location_result()?;
+        let staged_metadata_location_str = staged_table.metadata_location_result()?;
+        let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
 
         staged_table
             .metadata()
             .write_to(staged_table.file_io(), &staged_metadata_location)
             .await?;
 
+        let staged_metadata_location_str = staged_metadata_location.to_string();
         let update_result = self
             .execute(
                 &format!(
@@ -949,7 +952,7 @@
                     AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
                 ),
                 vec![
-                    Some(staged_metadata_location),
+                    Some(&staged_metadata_location_str),
                     Some(current_metadata_location.as_str()),
                     Some(&self.name),
                     Some(table_ident.name()),
diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs
index df0299acb2..3976fbd90e 100644
--- a/crates/iceberg/src/catalog/memory/catalog.rs
+++ b/crates/iceberg/src/catalog/memory/catalog.rs
@@ -18,6 +18,7 @@
 //! This module contains memory catalog implementation.
 
 use std::collections::HashMap;
+use std::str::FromStr;
 
 use async_trait::async_trait;
 use futures::lock::{Mutex, MutexGuard};
@@ -279,15 +280,15 @@ impl Catalog for MemoryCatalog {
         let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
             .build()?
             .metadata;
-        let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
+        let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
 
         metadata.write_to(&self.file_io, &metadata_location).await?;
 
-        root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
+        root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
 
         Table::builder()
             .file_io(self.file_io.clone())
-            .metadata_location(metadata_location)
+            .metadata_location(metadata_location.to_string())
             .metadata(metadata)
             .identifier(table_ident)
             .build()
@@ -365,12 +366,11 @@ impl Catalog for MemoryCatalog {
         let staged_table = commit.apply(current_table)?;
 
         // Write table metadata to the new location
+        let metadata_location =
+            MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
         staged_table
             .metadata()
-            .write_to(
-                staged_table.file_io(),
-                staged_table.metadata_location_result()?,
-            )
+            .write_to(staged_table.file_io(), &metadata_location)
             .await?;
 
         // Flip the pointer to reference the new metadata file.
diff --git a/crates/iceberg/src/catalog/metadata_location.rs b/crates/iceberg/src/catalog/metadata_location.rs
index 3705ee42dc..ed28118879 100644
--- a/crates/iceberg/src/catalog/metadata_location.rs
+++ b/crates/iceberg/src/catalog/metadata_location.rs
@@ -15,41 +15,86 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::fmt::Display;
 use std::str::FromStr;
 
 use uuid::Uuid;
 
+use crate::compression::CompressionCodec;
+use crate::spec::{TableMetadata, parse_metadata_file_compression};
 use crate::{Error, ErrorKind, Result};
 
 /// Helper for parsing a location of the format: `<table_location>/metadata/<version>-<uuid>.metadata.json`
+/// or with compression: `<table_location>/metadata/<version>-<uuid>.gz.metadata.json`
 #[derive(Clone, Debug, PartialEq)]
 pub struct MetadataLocation {
     table_location: String,
     version: i32,
     id: Uuid,
+    compression_codec: CompressionCodec,
 }
 
 impl MetadataLocation {
+    /// Determines the compression codec from table properties.
+    /// Parse errors result in CompressionCodec::None.
+    fn compression_from_properties(properties: &HashMap<String, String>) -> CompressionCodec {
+        parse_metadata_file_compression(properties).unwrap_or(CompressionCodec::None)
+    }
+
     /// Creates a completely new metadata location starting at version 0.
     /// Only used for creating a new table. For updates, see `with_next_version`.
+    #[deprecated(
+        since = "0.8.0",
+        note = "Use new_with_metadata instead to properly handle compression settings"
+    )]
     pub fn new_with_table_location(table_location: impl ToString) -> Self {
         Self {
             table_location: table_location.to_string(),
             version: 0,
             id: Uuid::new_v4(),
+            compression_codec: CompressionCodec::None,
+        }
+    }
+
+    /// Creates a completely new metadata location starting at version 0,
+    /// with compression settings from the table metadata.
+    /// Only used for creating a new table. For updates, see `with_next_version`.
+    pub fn new_with_metadata(table_location: impl ToString, metadata: &TableMetadata) -> Self {
+        Self {
+            table_location: table_location.to_string(),
+            version: 0,
+            id: Uuid::new_v4(),
+            compression_codec: Self::compression_from_properties(metadata.properties()),
         }
     }
 
     /// Creates a new metadata location for an updated metadata file.
+    /// Increments the version number and generates a new UUID.
     pub fn with_next_version(&self) -> Self {
         Self {
             table_location: self.table_location.clone(),
             version: self.version + 1,
             id: Uuid::new_v4(),
+            compression_codec: self.compression_codec,
+        }
+    }
+
+    /// Updates the metadata location with compression settings from the new metadata.
+    pub fn with_new_metadata(&self, new_metadata: &TableMetadata) -> Self {
+        Self {
+            table_location: self.table_location.clone(),
+            version: self.version,
+            id: self.id,
+            compression_codec: Self::compression_from_properties(new_metadata.properties()),
         }
     }
 
+    /// Returns the compression codec used for this metadata location.
+    pub fn compression_codec(&self) -> CompressionCodec {
+        self.compression_codec
+    }
+
     fn parse_metadata_path_prefix(path: &str) -> Result<String> {
         let prefix = path.strip_suffix("/metadata").ok_or(Error::new(
             ErrorKind::Unexpected,
@@ -59,30 +104,43 @@ impl MetadataLocation {
         Ok(prefix.to_string())
     }
 
-    /// Parses a file name of the format `<version>-<uuid>.metadata.json`.
-    fn parse_file_name(file_name: &str) -> Result<(i32, Uuid)> {
-        let (version, id) = file_name
-            .strip_suffix(".metadata.json")
-            .ok_or(Error::new(
-                ErrorKind::Unexpected,
-                format!("Invalid metadata file ending: {file_name}"),
-            ))?
-            .split_once('-')
-            .ok_or(Error::new(
-                ErrorKind::Unexpected,
-                format!("Invalid metadata file name format: {file_name}"),
-            ))?;
-
-        Ok((version.parse::<i32>()?, Uuid::parse_str(id)?))
+    /// Parses a file name of the format `<version>-<uuid>.metadata.json`
+    /// or with compression: `<version>-<uuid>.gz.metadata.json`.
+    /// Parse errors for compression codec result in CompressionCodec::None.
+    fn parse_file_name(file_name: &str) -> Result<(i32, Uuid, CompressionCodec)> {
+        let stripped = file_name.strip_suffix(".metadata.json").ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("Invalid metadata file ending: {file_name}"),
+        ))?;
+
+        // Check for compression suffix (e.g., .gz)
+        let gzip_suffix = CompressionCodec::Gzip.suffix()?;
+        let (stripped, compression_codec) = if let Some(s) = stripped.strip_suffix(gzip_suffix) {
+            (s, CompressionCodec::Gzip)
+        } else {
+            (stripped, CompressionCodec::None)
+        };
+
+        let (version, id) = stripped.split_once('-').ok_or(Error::new(
+            ErrorKind::Unexpected,
+            format!("Invalid metadata file name format: {file_name}"),
+        ))?;
+
+        Ok((
+            version.parse::<i32>()?,
+            Uuid::parse_str(id)?,
+            compression_codec,
+        ))
     }
 }
 
 impl Display for MetadataLocation {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let suffix = self.compression_codec.suffix().unwrap_or("");
         write!(
             f,
-            "{}/metadata/{:0>5}-{}.metadata.json",
-            self.table_location, self.version, self.id
+            "{}/metadata/{:0>5}-{}{}.metadata.json",
+            self.table_location, self.version, self.id, suffix
         )
     }
 }
@@ -97,23 +155,41 @@
         ))?;
 
         let prefix = Self::parse_metadata_path_prefix(path)?;
-        let (version, id) = Self::parse_file_name(file_name)?;
+        let (version, id, compression_codec) = Self::parse_file_name(file_name)?;
 
         Ok(MetadataLocation {
             table_location: prefix,
             version,
             id,
+            compression_codec,
         })
     }
 }
 
 #[cfg(test)]
 mod test {
+    use std::collections::HashMap;
     use std::str::FromStr;
 
     use uuid::Uuid;
 
-    use crate::MetadataLocation;
+    use crate::compression::CompressionCodec;
+    use crate::spec::{Schema, TableMetadata, TableMetadataBuilder};
+    use crate::{MetadataLocation, TableCreation};
+
+    fn create_test_metadata(properties: HashMap<String, String>) -> TableMetadata {
+        let table_creation = TableCreation::builder()
+            .name("test_table".to_string())
+            .location("/test/table".to_string())
+            .schema(Schema::builder().build().unwrap())
+            .properties(properties)
+            .build();
+        TableMetadataBuilder::from_table_creation(table_creation)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata
+    }
 
     #[test]
     fn test_metadata_location_from_string() {
@@ -125,6 +201,7 @@ mod test {
                     table_location: "".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_codec: CompressionCodec::None,
                 }),
             ),
             // Some prefix
@@ -134,6 +211,7 @@ mod test {
                     table_location: "/abc".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_codec: CompressionCodec::None,
                 }),
             ),
             // Longer prefix
@@ -143,6 +221,7 @@ mod test {
                     table_location: "/abc/def".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_codec: CompressionCodec::None,
                 }),
             ),
             // Prefix with special characters
@@ -152,6 +231,7 @@ mod test {
                     table_location: "https://127.0.0.1".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_codec: CompressionCodec::None,
                 }),
             ),
             // Another id
@@ -161,6 +241,7 @@ mod test {
                     table_location: "/abc".to_string(),
                     version: 1234567,
                     id: Uuid::from_str("81056704-ce5b-41c4-bb83-eb6408081af6").unwrap(),
+                    compression_codec: CompressionCodec::None,
                 }),
             ),
             // Version 0
@@ -170,6 +251,17 @@ mod test {
                     table_location: "/abc".to_string(),
                     version: 0,
                     id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_codec: CompressionCodec::None,
+                }),
+            ),
+            // With gzip compression
+            (
+                "/abc/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.gz.metadata.json",
+                Ok(MetadataLocation {
+                    table_location: "/abc".to_string(),
+                    version: 1234567,
+                    id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(),
+                    compression_codec: CompressionCodec::Gzip,
                 }),
             ),
             // Negative version
@@ -216,8 +308,9 @@ mod test {
 
     #[test]
     fn test_metadata_location_with_next_version() {
+        let metadata = create_test_metadata(HashMap::new());
         let test_cases = vec![
-            MetadataLocation::new_with_table_location("/abc"),
+            MetadataLocation::new_with_metadata("/abc", &metadata),
             MetadataLocation::from_str(
                 "/abc/def/metadata/1234567-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json",
             )
@@ -233,4 +326,79 @@ mod test {
             assert_ne!(next.id, input.id);
         }
     }
+
+    #[test]
+    fn test_with_next_version_preserves_compression() {
+        // Start from a parsed location with no compression
+        let location_none = MetadataLocation::from_str(
+            "/test/table/metadata/00000-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json",
+        )
+        .unwrap();
+        assert_eq!(location_none.compression_codec, CompressionCodec::None);
+
+        let next_none = location_none.with_next_version();
+        assert_eq!(next_none.compression_codec, CompressionCodec::None);
+        assert_eq!(next_none.version, 1);
+
+        // Start from a parsed location with gzip compression
+        let location_gzip = MetadataLocation::from_str(
+            "/test/table/metadata/00005-81056704-ce5b-41c4-bb83-eb6408081af6.gz.metadata.json",
+        )
+        .unwrap();
+        assert_eq!(location_gzip.compression_codec, CompressionCodec::Gzip);
+
+        let next_gzip = location_gzip.with_next_version();
+        assert_eq!(next_gzip.compression_codec, CompressionCodec::Gzip);
+        assert_eq!(next_gzip.version, 6);
+    }
+
+    #[test]
+    fn test_with_new_metadata_updates_compression() {
+        // Start from a parsed location with no compression
+        let location = MetadataLocation::from_str(
+            "/test/table/metadata/00000-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json",
+        )
+        .unwrap();
+        assert_eq!(location.compression_codec, CompressionCodec::None);
+
+        // Update to gzip compression
+        let mut props_gzip = HashMap::new();
+        props_gzip.insert(
+            "write.metadata.compression-codec".to_string(),
+            "gzip".to_string(),
+        );
+        let metadata_gzip = create_test_metadata(props_gzip);
+        let updated_gzip = location.with_new_metadata(&metadata_gzip);
+        assert_eq!(updated_gzip.compression_codec, CompressionCodec::Gzip);
+        assert_eq!(updated_gzip.version, 0);
+        assert_eq!(
+            updated_gzip.to_string(),
+            "/test/table/metadata/00000-2cd22b57-5127-4198-92ba-e4e67c79821b.gz.metadata.json"
+        );
+
+        // Update back to no compression
+        let props_none = HashMap::new();
+        let metadata_none = create_test_metadata(props_none);
+        let updated_none = updated_gzip.with_new_metadata(&metadata_none);
+        assert_eq!(updated_none.compression_codec, CompressionCodec::None);
+        assert_eq!(updated_none.version, 0);
+        assert_eq!(
+            updated_none.to_string(),
+            "/test/table/metadata/00000-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json"
+        );
+
+        // Test explicit "none" codec
+        let mut props_explicit_none = HashMap::new();
+        props_explicit_none.insert(
+            "write.metadata.compression-codec".to_string(),
+            "none".to_string(),
+        );
+        let metadata_explicit_none = create_test_metadata(props_explicit_none);
+        let updated_explicit = updated_gzip.with_new_metadata(&metadata_explicit_none);
+        assert_eq!(updated_explicit.compression_codec, CompressionCodec::None);
+        assert_eq!(
+            updated_explicit.to_string(),
+            "/test/table/metadata/00000-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json"
+        );
+    }
 }
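The parser and the `Display` impl are now exact inverses over the extended scheme `<table_location>/metadata/<version>-<uuid>[.gz].metadata.json`. A small round-trip sketch in the style of the tests above:

```rust
use std::str::FromStr;

let loc = MetadataLocation::from_str(
    "/warehouse/t/metadata/00003-2cd22b57-5127-4198-92ba-e4e67c79821b.gz.metadata.json",
)
.unwrap();
assert_eq!(loc.compression_codec(), CompressionCodec::Gzip);

// `with_next_version` bumps the counter and keeps the codec.
let next = loc.with_next_version();
assert!(next.to_string().contains("/metadata/00004-"));
assert!(next.to_string().ends_with(".gz.metadata.json"));
```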
"/test/table/metadata/00000-2cd22b57-5127-4198-92ba-e4e67c79821b.metadata.json" + ); + } } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index f3a521379e..e8bc773421 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -353,13 +353,16 @@ impl TableCommit { metadata_builder = update.apply(metadata_builder)?; } - // Bump the version of metadata + // Build the new metadata + let new_metadata = metadata_builder.build()?.metadata; + let new_metadata_location = MetadataLocation::from_str(current_metadata_location)? .with_next_version() + .with_new_metadata(&new_metadata) .to_string(); Ok(table - .with_metadata(Arc::new(metadata_builder.build()?.metadata)) + .with_metadata(Arc::new(new_metadata)) .with_metadata_location(new_metadata_location)) } } diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index 1218d81df6..42f5298437 100644 --- a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -85,6 +85,23 @@ impl CompressionCodec { pub(crate) fn is_none(&self) -> bool { matches!(self, CompressionCodec::None) } + + /// Returns the file extension suffix for this compression codec. + /// Returns empty string for None, ".gz" for Gzip. + /// + /// # Errors + /// + /// Returns an error for Lz4 and Zstd as they are not fully supported. + pub fn suffix(&self) -> Result<&'static str> { + match self { + CompressionCodec::None => Ok(""), + CompressionCodec::Gzip => Ok(".gz"), + codec @ (CompressionCodec::Lz4 | CompressionCodec::Zstd) => Err(Error::new( + ErrorKind::FeatureUnsupported, + format!("suffix not defined for {codec:?}"), + )), + } + } } #[cfg(test)] @@ -133,4 +150,21 @@ mod tests { ); } } + + #[test] + fn test_suffix() { + // Test supported codecs + assert_eq!(CompressionCodec::None.suffix().unwrap(), ""); + assert_eq!(CompressionCodec::Gzip.suffix().unwrap(), ".gz"); + + // Test unsupported codecs return errors + assert!(CompressionCodec::Lz4.suffix().is_err()); + assert!(CompressionCodec::Zstd.suffix().is_err()); + + let lz4_err = CompressionCodec::Lz4.suffix().unwrap_err(); + assert!(lz4_err.to_string().contains("suffix not defined for Lz4")); + + let zstd_err = CompressionCodec::Zstd.suffix().unwrap_err(); + assert!(zstd_err.to_string().contains("suffix not defined for Zstd")); + } } diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index 707ebbb630..b23ca1eda0 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -50,6 +50,7 @@ pub use sort::*; pub use statistic_file::*; pub use table_metadata::*; pub(crate) use table_metadata_builder::FIRST_FIELD_ID; +pub(crate) use table_properties::parse_metadata_file_compression; pub use table_properties::*; pub use transform::*; pub(crate) use values::decimal_utils; diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 28f753e9f1..81d20cada8 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -35,8 +35,9 @@ pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataB use super::{ DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef, SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType, - TableProperties, + TableProperties, parse_metadata_file_compression, }; +use crate::catalog::MetadataLocation; use crate::compression::CompressionCodec; use crate::error::{Result, 
diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs
index 1218d81df6..42f5298437 100644
--- a/crates/iceberg/src/compression.rs
+++ b/crates/iceberg/src/compression.rs
@@ -85,6 +85,23 @@ impl CompressionCodec {
     pub(crate) fn is_none(&self) -> bool {
         matches!(self, CompressionCodec::None)
     }
+
+    /// Returns the file extension suffix for this compression codec.
+    /// Returns empty string for None, ".gz" for Gzip.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error for Lz4 and Zstd as they are not fully supported.
+    pub fn suffix(&self) -> Result<&'static str> {
+        match self {
+            CompressionCodec::None => Ok(""),
+            CompressionCodec::Gzip => Ok(".gz"),
+            codec @ (CompressionCodec::Lz4 | CompressionCodec::Zstd) => Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                format!("suffix not defined for {codec:?}"),
+            )),
+        }
+    }
 }
 
 #[cfg(test)]
@@ -133,4 +150,21 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn test_suffix() {
+        // Test supported codecs
+        assert_eq!(CompressionCodec::None.suffix().unwrap(), "");
+        assert_eq!(CompressionCodec::Gzip.suffix().unwrap(), ".gz");
+
+        // Test unsupported codecs return errors
+        assert!(CompressionCodec::Lz4.suffix().is_err());
+        assert!(CompressionCodec::Zstd.suffix().is_err());
+
+        let lz4_err = CompressionCodec::Lz4.suffix().unwrap_err();
+        assert!(lz4_err.to_string().contains("suffix not defined for Lz4"));
+
+        let zstd_err = CompressionCodec::Zstd.suffix().unwrap_err();
+        assert!(zstd_err.to_string().contains("suffix not defined for Zstd"));
+    }
 }
diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs
index 707ebbb630..b23ca1eda0 100644
--- a/crates/iceberg/src/spec/mod.rs
+++ b/crates/iceberg/src/spec/mod.rs
@@ -50,6 +50,7 @@ pub use sort::*;
 pub use statistic_file::*;
 pub use table_metadata::*;
 pub(crate) use table_metadata_builder::FIRST_FIELD_ID;
+pub(crate) use table_properties::parse_metadata_file_compression;
 pub use table_properties::*;
 pub use transform::*;
 pub(crate) use values::decimal_utils;
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index 28f753e9f1..81d20cada8 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -35,8 +35,9 @@ pub use super::table_metadata_builder::{TableMetadataBuildResult, TableMetadataBuilder};
 use super::{
     DEFAULT_PARTITION_SPEC_ID, PartitionSpecRef, PartitionStatisticsFile, SchemaId, SchemaRef,
     SnapshotRef, SnapshotRetention, SortOrder, SortOrderRef, StatisticsFile, StructType,
-    TableProperties,
+    TableProperties, parse_metadata_file_compression,
 };
+use crate::catalog::MetadataLocation;
 use crate::compression::CompressionCodec;
 use crate::error::{Result, timestamp_ms_to_utc};
 use crate::io::FileIO;
@@ -360,6 +361,18 @@ impl TableMetadata {
         &self.properties
     }
 
+    /// Returns the metadata compression codec from table properties.
+    ///
+    /// Returns `CompressionCodec::None` if compression is disabled or not configured.
+    /// Returns `CompressionCodec::Gzip` if gzip compression is enabled.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the compression codec property has an invalid value.
+    pub fn metadata_compression_codec(&self) -> Result<CompressionCodec> {
+        parse_metadata_file_compression(&self.properties)
+    }
+
     /// Returns typed table properties parsed from the raw properties map with defaults.
     pub fn table_properties(&self) -> Result<TableProperties> {
         TableProperties::try_from(&self.properties).map_err(|e| {
@@ -466,11 +479,39 @@ impl TableMetadata {
     pub async fn write_to(
         &self,
         file_io: &FileIO,
-        metadata_location: impl AsRef<str>,
+        metadata_location: &MetadataLocation,
     ) -> Result<()> {
+        let json_data = serde_json::to_vec(self)?;
+
+        // Check if compression codec from properties matches the one in metadata_location
+        let codec = parse_metadata_file_compression(&self.properties)?;
+
+        if codec != metadata_location.compression_codec() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Compression codec mismatch: metadata_location has {:?}, but table properties specify {:?}",
+                    metadata_location.compression_codec(),
+                    codec
+                ),
+            ));
+        }
+
+        // Apply compression based on codec
+        let data_to_write = match codec {
+            CompressionCodec::Gzip => codec.compress(json_data)?,
+            CompressionCodec::None => json_data,
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Unsupported metadata compression codec: {codec:?}"),
+                ));
+            }
+        };
+
         file_io
-            .new_output(metadata_location)?
-            .write(serde_json::to_vec(self)?.into())
+            .new_output(metadata_location.to_string())?
+            .write(data_to_write.into())
             .await
     }
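`write_to` is now self-checking: the codec parsed from the table properties must agree with the codec baked into the location's file name, which rules out writing a gzip stream to a path without the `.gz` marker (or the reverse). A usage sketch (`table_location`, `metadata`, and `file_io` assumed in scope):

```rust
// Deriving the location from the same metadata guarantees the codecs agree.
let location = MetadataLocation::new_with_metadata(table_location, &metadata);
metadata.write_to(&file_io, &location).await?;

// A location whose name disagrees with the properties (e.g. a `.gz` name for
// metadata whose properties say "none") fails with `ErrorKind::DataInvalid`
// and a "Compression codec mismatch" message instead of writing a
// mislabeled file.
```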
@@ -1567,6 +1608,7 @@ mod tests {
     use uuid::Uuid;
 
     use super::{FormatVersion, MetadataLog, SnapshotLog, TableMetadataBuilder};
+    use crate::catalog::MetadataLocation;
     use crate::compression::CompressionCodec;
     use crate::io::FileIOBuilder;
     use crate::spec::table_metadata::TableMetadata;
@@ -1574,7 +1616,7 @@ mod tests {
         BlobMetadata, EncryptedKey, INITIAL_ROW_ID, Literal, NestedField, NullOrder, Operation,
         PartitionSpec, PartitionStatisticsFile, PrimitiveLiteral, PrimitiveType, Schema, Snapshot,
         SnapshotReference, SnapshotRetention, SortDirection, SortField, SortOrder, StatisticsFile,
-        Summary, Transform, Type, UnboundPartitionField,
+        Summary, TableProperties, Transform, Type, UnboundPartitionField,
     };
     use crate::{ErrorKind, TableCreation};
@@ -3547,7 +3589,8 @@ mod tests {
         let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
 
         // Define the metadata location
-        let metadata_location = format!("{temp_path}/metadata.json");
+        let metadata_location = MetadataLocation::new_with_metadata(temp_path, &original_metadata);
+        let metadata_location_str = metadata_location.to_string();
 
         // Write the metadata
         original_metadata
@@ -3556,10 +3599,10 @@ mod tests {
             .unwrap();
 
         // Verify the file exists
-        assert!(fs::metadata(&metadata_location).is_ok());
+        assert!(fs::metadata(&metadata_location_str).is_ok());
 
         // Read the metadata back
-        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location)
+        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
             .await
             .unwrap();
@@ -3603,6 +3646,63 @@ mod tests {
         assert!(result.is_err());
     }
 
+    #[tokio::test]
+    async fn test_table_metadata_write_with_gzip_compression() {
+        let temp_dir = TempDir::new().unwrap();
+        let temp_path = temp_dir.path().to_str().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+        // Get a test metadata and add gzip compression property
+        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
+
+        // Modify properties to enable gzip compression (using mixed case to test case-insensitive matching)
+        let mut props = original_metadata.properties.clone();
+        props.insert(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "GziP".to_string(),
+        );
+        // Use builder to create new metadata with updated properties
+        let compressed_metadata =
+            TableMetadataBuilder::new_from_metadata(original_metadata.clone(), None)
+                .assign_uuid(original_metadata.table_uuid)
+                .set_properties(props.clone())
+                .unwrap()
+                .build()
+                .unwrap()
+                .metadata;
+
+        // Create MetadataLocation with compression codec from metadata
+        let metadata_location =
+            MetadataLocation::new_with_metadata(temp_path, &compressed_metadata);
+        let metadata_location_str = metadata_location.to_string();
+
+        // Verify the location has the .gz extension
+        assert!(metadata_location_str.contains(".gz.metadata.json"));
+
+        // Write the metadata with compression
+        compressed_metadata
+            .write_to(&file_io, &metadata_location)
+            .await
+            .unwrap();
+
+        // Verify the compressed file exists
+        assert!(std::path::Path::new(&metadata_location_str).exists());
+
+        // Read the raw file and check it's gzip compressed
+        let raw_content = std::fs::read(&metadata_location_str).unwrap();
+        assert!(raw_content.len() > 2);
+        assert_eq!(raw_content[0], 0x1F); // gzip magic number
+        assert_eq!(raw_content[1], 0x8B); // gzip magic number
+
+        // Read the metadata back using the compressed location
+        let read_metadata = TableMetadata::read_from(&file_io, &metadata_location_str)
+            .await
+            .unwrap();
+
+        // Verify the complete round-trip: read metadata should match what we wrote
+        assert_eq!(read_metadata, compressed_metadata);
+    }
+
     #[test]
     fn test_partition_name_exists() {
         let schema = Schema::builder()
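Callers that only need the effective codec can go through the new accessor instead of the raw property map; it applies the same defaulting and validation as the write path. Sketch (`metadata` is any `TableMetadata`):

```rust
match metadata.metadata_compression_codec()? {
    CompressionCodec::Gzip => println!("metadata JSON will be gzip-compressed"),
    CompressionCodec::None => println!("metadata JSON is written uncompressed"),
    // Lz4/Zstd are rejected by `parse_metadata_file_compression`, so this
    // arm should never be hit.
    other => unreachable!("unexpected codec {other:?}"),
}
```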
diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs
index 413604f51c..6e08318479 100644
--- a/crates/iceberg/src/spec/table_properties.rs
+++ b/crates/iceberg/src/spec/table_properties.rs
@@ -16,24 +16,85 @@
 // under the License.
 
 use std::collections::HashMap;
+use std::fmt::Display;
+use std::str::FromStr;
+
+use crate::compression::CompressionCodec;
+use crate::error::{Error, ErrorKind, Result};
 
 // Helper function to parse a property from a HashMap
 // If the property is not found, use the default value
-fn parse_property<T: std::str::FromStr>(
+fn parse_property<T: FromStr>(
     properties: &HashMap<String, String>,
     key: &str,
     default: T,
-) -> Result<T, anyhow::Error>
+) -> Result<T>
 where
-    <T as FromStr>::Err: std::fmt::Display,
+    <T as FromStr>::Err: Display,
 {
     properties.get(key).map_or(Ok(default), |value| {
-        value
-            .parse::<T>()
-            .map_err(|e| anyhow::anyhow!("Invalid value for {key}: {e}"))
+        value.parse::<T>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Invalid value for {key}: {e}"),
+            )
+        })
     })
 }
 
+/// Parse compression codec for metadata files from table properties.
+/// Retrieves the compression codec property, applies defaults, and parses the value.
+/// Only "none" (or empty string) and "gzip" are supported for metadata compression.
+///
+/// # Arguments
+///
+/// * `properties` - HashMap containing table properties
+///
+/// # Errors
+///
+/// Returns an error if the codec is not "none", "", or "gzip" (case-insensitive).
+/// Lz4 and Zstd are not supported for metadata file compression.
+pub(crate) fn parse_metadata_file_compression(
+    properties: &HashMap<String, String>,
+) -> Result<CompressionCodec> {
+    let value = properties
+        .get(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC)
+        .map(|s| s.as_str())
+        .unwrap_or(TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT);
+
+    // Handle empty string as None
+    if value.is_empty() {
+        return Ok(CompressionCodec::None);
+    }
+
+    // Lowercase the value for case-insensitive parsing
+    let lowercase_value = value.to_lowercase();
+
+    // Use serde to parse the codec (which has rename_all = "lowercase")
+    let codec: CompressionCodec = serde_json::from_value(serde_json::Value::String(
+        lowercase_value,
+    ))
+    .map_err(|_| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported."
+            ),
+        )
+    })?;
+
+    // Validate that only None and Gzip are used for metadata
+    match codec {
+        CompressionCodec::None | CompressionCodec::Gzip => Ok(codec),
+        CompressionCodec::Lz4 | CompressionCodec::Zstd => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!(
+                "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files."
+            ),
+        )),
+    }
+}
+
 /// TableProperties that contains the properties of a table.
 #[derive(Debug)]
 pub struct TableProperties {
@@ -49,6 +110,8 @@ pub struct TableProperties {
     pub write_format_default: String,
     /// The target file size for files.
     pub write_target_file_size_bytes: usize,
+    /// Compression codec for metadata files (JSON)
+    pub metadata_compression_codec: CompressionCodec,
     /// Whether to use `FanoutWriter` for partitioned tables.
     pub write_datafusion_fanout_enabled: bool,
 }
@@ -139,6 +202,11 @@ impl TableProperties {
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES: &str = "write.target-file-size-bytes";
     /// Default target file size
     pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB
+
+    /// Compression codec for metadata files (JSON)
+    pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec";
+    /// Default metadata compression codec - uncompressed
+    pub const PROPERTY_METADATA_COMPRESSION_CODEC_DEFAULT: &str = "none";
 
     /// Whether to use `FanoutWriter` for partitioned tables (handles unsorted data).
     /// If false, uses `ClusteredWriter` (requires sorted data, more memory efficient).
     pub const PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED: &str = "write.datafusion.fanout.enabled";
@@ -148,9 +216,9 @@ impl TableProperties {
 }
 
 impl TryFrom<&HashMap<String, String>> for TableProperties {
     // parse by entry key or use default value
-    type Error = anyhow::Error;
+    type Error = Error;
 
-    fn try_from(props: &HashMap<String, String>) -> Result<Self, Self::Error> {
+    fn try_from(props: &HashMap<String, String>) -> Result<Self> {
         Ok(TableProperties {
             commit_num_retries: parse_property(
                 props,
@@ -182,6 +250,7 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
                 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES,
                 TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
             )?,
+            metadata_compression_codec: parse_metadata_file_compression(props)?,
             write_datafusion_fanout_enabled: parse_property(
                 props,
                 TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED,
@@ -194,6 +263,7 @@ impl TryFrom<&HashMap<String, String>> for TableProperties {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::compression::CompressionCodec;
 
     #[test]
     fn test_table_properties_default() {
@@ -219,6 +289,73 @@ mod tests {
             table_properties.write_target_file_size_bytes,
             TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT
         );
+        // Test compression defaults (none means CompressionCodec::None)
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            CompressionCodec::None
+        );
+    }
+
+    #[test]
+    fn test_table_properties_compression() {
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "gzip".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props).unwrap();
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            CompressionCodec::Gzip
+        );
+    }
+
+    #[test]
+    fn test_table_properties_compression_none() {
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "none".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props).unwrap();
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            CompressionCodec::None
+        );
+    }
+
+    #[test]
+    fn test_table_properties_compression_case_insensitive() {
+        // Test uppercase
+        let props_upper = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "GZIP".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props_upper).unwrap();
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            CompressionCodec::Gzip
+        );
+
+        // Test mixed case
+        let props_mixed = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "GzIp".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props_mixed).unwrap();
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            CompressionCodec::Gzip
+        );
+
+        // Test "NONE" should also be case-insensitive
+        let props_none_upper = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "NONE".to_string(),
+        )]);
+        let table_properties = TableProperties::try_from(&props_none_upper).unwrap();
+        assert_eq!(
+            table_properties.metadata_compression_codec,
+            CompressionCodec::None
+        );
     }
 
     #[test]
@@ -293,4 +430,118 @@ mod tests {
             "Invalid value for write.target-file-size-bytes: invalid digit found in string"
         ));
     }
+
+    #[test]
+    fn test_table_properties_compression_invalid_rejected() {
+        let invalid_codecs = ["lz4", "zstd", "snappy"];
+
+        for codec in invalid_codecs {
+            let props = HashMap::from([(
+                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+                codec.to_string(),
+            )]);
+            let err = TableProperties::try_from(&props).unwrap_err();
+            let err_msg = err.to_string();
+            assert!(
+                err_msg.contains(&format!("Invalid metadata compression codec: {codec}")),
+                "Expected error message to contain codec '{codec}', got: {err_msg}"
+            );
+            assert!(
+                err_msg.contains("Only 'none' and 'gzip' are supported"),
+                "Expected error message to contain supported codecs, got: {err_msg}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_parse_metadata_file_compression_valid() {
+        // Test with "none"
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "none".to_string(),
+        )]);
+        assert_eq!(
+            parse_metadata_file_compression(&props).unwrap(),
+            CompressionCodec::None
+        );
+
+        // Test with empty string
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "".to_string(),
+        )]);
+        assert_eq!(
+            parse_metadata_file_compression(&props).unwrap(),
+            CompressionCodec::None
+        );
+
+        // Test with "gzip"
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "gzip".to_string(),
+        )]);
+        assert_eq!(
+            parse_metadata_file_compression(&props).unwrap(),
+            CompressionCodec::Gzip
+        );
+
+        // Test case insensitivity - "NONE"
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "NONE".to_string(),
+        )]);
+        assert_eq!(
+            parse_metadata_file_compression(&props).unwrap(),
+            CompressionCodec::None
+        );
+
+        // Test case insensitivity - "GZIP"
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "GZIP".to_string(),
+        )]);
+        assert_eq!(
+            parse_metadata_file_compression(&props).unwrap(),
+            CompressionCodec::Gzip
+        );
+
+        // Test case insensitivity - "GzIp"
+        let props = HashMap::from([(
+            TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+            "GzIp".to_string(),
+        )]);
+        assert_eq!(
+            parse_metadata_file_compression(&props).unwrap(),
+            CompressionCodec::Gzip
+        );
+
+        // Test default when property is missing
+        let props = HashMap::new();
+        assert_eq!(
+            parse_metadata_file_compression(&props).unwrap(),
+            CompressionCodec::None
+        );
+    }
+
+    #[test]
+    fn test_parse_metadata_file_compression_invalid() {
+        let invalid_codecs = ["lz4", "zstd", "snappy"];
+
+        for codec in invalid_codecs {
+            let props = HashMap::from([(
+                TableProperties::PROPERTY_METADATA_COMPRESSION_CODEC.to_string(),
+                codec.to_string(),
+            )]);
+            let err = parse_metadata_file_compression(&props).unwrap_err();
+            let err_msg = err.to_string();
+            assert!(
+                err_msg.contains("Invalid metadata compression codec"),
+                "Expected error message to contain 'Invalid metadata compression codec', got: {err_msg}"
+            );
+            assert!(
+                err_msg.contains("Only 'none' and 'gzip' are supported"),
+                "Expected error message to contain supported codecs, got: {err_msg}"
+            );
+        }
+    }
 }
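Taken together, property parsing is the single gate for the feature: `TableProperties::try_from` (and everything layered on it) accepts only "none", the empty string, or "gzip", case-insensitively, and rejects the codecs the metadata path cannot round-trip. A final usage sketch:

```rust
use std::collections::HashMap;

let props = HashMap::from([(
    "write.metadata.compression-codec".to_string(),
    "GZIP".to_string(),
)]);
let parsed = TableProperties::try_from(&props).unwrap();
assert_eq!(parsed.metadata_compression_codec, CompressionCodec::Gzip);

// Unsupported codecs fail fast at parse time rather than at write time.
let bad = HashMap::from([(
    "write.metadata.compression-codec".to_string(),
    "zstd".to_string(),
)]);
assert!(TableProperties::try_from(&bad).is_err());
```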