Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 106 additions & 1 deletion crates/paimon/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ impl Schema {
let primary_keys = Self::normalize_primary_keys(&primary_keys, &mut options)?;
let partition_keys = Self::normalize_partition_keys(&partition_keys, &mut options)?;
let fields = Self::normalize_fields(&fields, &partition_keys, &primary_keys)?;
Self::validate_blob_fields(&fields, &partition_keys, &options)?;

Ok(Self {
fields,
Expand Down Expand Up @@ -391,6 +392,54 @@ impl Schema {
Ok(())
}

fn validate_blob_fields(
fields: &[DataField],
partition_keys: &[String],
options: &HashMap<String, String>,
) -> crate::Result<()> {
let blob_field_names = Self::top_level_blob_field_names(fields);
if blob_field_names.is_empty() {
return Ok(());
}

let core_options = CoreOptions::new(options);
if !core_options.data_evolution_enabled() {
return Err(crate::Error::ConfigInvalid {
message: "Data evolution config must enabled for table with BLOB type column."
.to_string(),
});
}

if fields.len() == blob_field_names.len() {
return Err(crate::Error::ConfigInvalid {
message: "Table with BLOB type column must have other normal columns.".to_string(),
});
}

let partition_key_set: HashSet<&str> = partition_keys.iter().map(String::as_str).collect();
if blob_field_names
.iter()
.any(|name| partition_key_set.contains(name))
{
return Err(crate::Error::ConfigInvalid {
message: "The BLOB type column can not be part of partition keys.".to_string(),
});
}

Ok(())
}

/// Lists, in declaration order, the names of the fields whose own data type is
/// `DataType::Blob`.
///
/// Only each field's direct type is examined; the types are not traversed any
/// deeper. Used by the create-time Blob contract checks.
fn top_level_blob_field_names(fields: &[DataField]) -> Vec<&str> {
    fields
        .iter()
        .filter(|field| matches!(field.data_type(), DataType::Blob(_)))
        .map(|field| field.name())
        .collect()
}

/// Returns the set of names that appear more than once.
pub fn duplicate_fields(names: &[String]) -> HashSet<String> {
let mut seen = HashMap::new();
Expand Down Expand Up @@ -572,7 +621,7 @@ impl Default for SchemaBuilder {

#[cfg(test)]
mod tests {
use crate::spec::IntType;
use crate::spec::{BlobType, IntType};

use super::*;

Expand Down Expand Up @@ -745,6 +794,62 @@ mod tests {
assert_eq!(schema.primary_keys(), &["a", "b"]);
}

/// Building a schema that has a Blob column but does not set
/// `data-evolution.enabled` must fail with a `ConfigInvalid` error.
#[test]
fn test_blob_schema_validation_requires_data_evolution() {
    let build_result = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("payload", DataType::Blob(BlobType::new()))
        .build();

    match build_result.unwrap_err() {
        crate::Error::ConfigInvalid { message }
            if message.contains("Data evolution config must enabled") => {}
        _ => panic!("blob columns should require data-evolution.enabled"),
    }
}

/// A schema whose only column is a Blob must be rejected even when data
/// evolution is enabled.
#[test]
fn test_blob_schema_validation_rejects_all_blob_columns() {
    let build_result = Schema::builder()
        .column("payload", DataType::Blob(BlobType::new()))
        .option("data-evolution.enabled", "true")
        .build();

    match build_result.unwrap_err() {
        crate::Error::ConfigInvalid { message }
            if message.contains("must have other normal columns") => {}
        _ => panic!("blob-only tables should be rejected"),
    }
}

/// Declaring a Blob column as a partition key must be rejected during schema
/// validation.
#[test]
fn test_blob_schema_validation_rejects_blob_partition_keys() {
    let build_result = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("payload", DataType::Blob(BlobType::new()))
        .partition_keys(["payload"])
        .option("data-evolution.enabled", "true")
        .build();

    match build_result.unwrap_err() {
        crate::Error::ConfigInvalid { message }
            if message.contains("can not be part of partition keys") => {}
        _ => panic!("blob columns should be rejected as partition keys during schema validation"),
    }
}

/// A schema with one regular column, one Blob column and data evolution
/// enabled builds successfully and keeps both fields.
#[test]
fn test_blob_schema_validation_accepts_valid_blob_table() {
    let build_result = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("payload", DataType::Blob(BlobType::new()))
        .option("data-evolution.enabled", "true")
        .build();

    let schema = build_result.unwrap();
    let field_count = schema.fields().len();
    assert_eq!(field_count, 2);
}

#[test]
fn test_schema_builder_column_row_type() {
let row_type = RowType::new(vec![DataField::new(
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/table/table_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,7 @@ mod tests {
let schema = Schema::builder()
.column("id", DataType::Int(IntType::new()))
.column("payload", DataType::Blob(BlobType::new()))
.option("data-evolution.enabled", "true")
.build()
.unwrap();
TableSchema::new(0, &schema)
Expand Down
Loading