diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index 62e2b7b3..3f7ff189 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -3147,7 +3147,7 @@ mod tests { assert_eq!(identifier.object(), "t1"); assert_eq!(changes.len(), 1); assert!( - matches!(&changes[0], SchemaChange::AddColumn { field_name, .. } if field_name == "age") + matches!(&changes[0], SchemaChange::AddColumn { field_names, .. } if field_names.first().map(String::as_str) == Some("age")) ); } else { panic!("expected AlterTable call"); @@ -3171,10 +3171,10 @@ mod tests { assert!(matches!( &changes[0], SchemaChange::AddColumn { - field_name, + field_names, data_type, .. - } if field_name == "payload" && matches!(data_type, PaimonDataType::Blob(_)) + } if field_names.first().map(String::as_str) == Some("payload") && matches!(data_type, PaimonDataType::Blob(_)) )); } else { panic!("expected AlterTable call"); @@ -3196,7 +3196,7 @@ mod tests { if let CatalogCall::AlterTable { changes, .. } = &calls[0] { assert_eq!(changes.len(), 1); assert!( - matches!(&changes[0], SchemaChange::DropColumn { field_name } if field_name == "age") + matches!(&changes[0], SchemaChange::DropColumn { field_names } if field_names.first().map(String::as_str) == Some("age")) ); } else { panic!("expected AlterTable call"); @@ -3219,8 +3219,8 @@ mod tests { assert_eq!(changes.len(), 1); assert!(matches!( &changes[0], - SchemaChange::RenameColumn { field_name, new_name } - if field_name == "old_name" && new_name == "new_name" + SchemaChange::RenameColumn { field_names, new_name } + if field_names.first().map(String::as_str) == Some("old_name") && new_name == "new_name" )); } else { panic!("expected AlterTable call"); diff --git a/crates/integrations/datafusion/tests/sql_context_tests.rs b/crates/integrations/datafusion/tests/sql_context_tests.rs index 7deed175..d4f0c1da 100644 --- a/crates/integrations/datafusion/tests/sql_context_tests.rs +++ b/crates/integrations/datafusion/tests/sql_context_tests.rs @@ -480,23 +480,18 @@ async fn test_alter_table_add_column() { .await .unwrap(); - // ALTER TABLE is not yet implemented in FileSystemCatalog, so we expect an error - let result = sql_context + sql_context .sql("ALTER TABLE paimon.mydb.alter_test ADD COLUMN age INT") - .await; + .await + .expect("ALTER TABLE ADD COLUMN should succeed"); - // FileSystemCatalog does not support AddColumn schema change yet - assert!( - result.is_err(), - "ALTER TABLE ADD COLUMN should fail because AddColumn is not yet supported" - ); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("not yet implemented") - || err_msg.contains("Unsupported") - || err_msg.contains("not yet supported"), - "Error should indicate alter_table is not implemented, got: {err_msg}" - ); + // The new column is appended to the table schema. + let table = catalog + .get_table(&Identifier::new("mydb", "alter_test")) + .await + .unwrap(); + let names: Vec<&str> = table.schema().fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "name", "age"]); } #[tokio::test] diff --git a/crates/paimon/src/api/api_request.rs b/crates/paimon/src/api/api_request.rs index 33a52fd7..28e4bacf 100644 --- a/crates/paimon/src/api/api_request.rs +++ b/crates/paimon/src/api/api_request.rs @@ -22,7 +22,10 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use crate::{catalog::Identifier, spec::Schema}; +use crate::{ + catalog::Identifier, + spec::{Schema, SchemaChange}, +}; /// Request to create a new database. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -95,6 +98,23 @@ impl CreateTableRequest { } } +/// Request to alter a table's schema. +/// +/// Wire-compatible with Java Paimon's `AlterTableRequest` (`{"changes": [...]}`). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AlterTableRequest { + /// The ordered list of schema changes to apply. + pub changes: Vec, +} + +impl AlterTableRequest { + /// Create a new AlterTableRequest. + pub fn new(changes: Vec) -> Self { + Self { changes } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/paimon/src/api/mod.rs b/crates/paimon/src/api/mod.rs index 8584cb08..7672b9a7 100644 --- a/crates/paimon/src/api/mod.rs +++ b/crates/paimon/src/api/mod.rs @@ -31,7 +31,8 @@ mod api_response; // Re-export request types pub use api_request::{ - AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest, + AlterDatabaseRequest, AlterTableRequest, CreateDatabaseRequest, CreateTableRequest, + RenameTableRequest, }; // Re-export response types diff --git a/crates/paimon/src/api/rest_api.rs b/crates/paimon/src/api/rest_api.rs index 5333fcae..8721f48c 100644 --- a/crates/paimon/src/api/rest_api.rs +++ b/crates/paimon/src/api/rest_api.rs @@ -25,11 +25,12 @@ use std::collections::HashMap; use crate::api::rest_client::HttpClient; use crate::catalog::Identifier; use crate::common::{CatalogOptions, Options}; -use crate::spec::{Partition, PartitionStatistics, Schema, Snapshot}; +use crate::spec::{Partition, PartitionStatistics, Schema, SchemaChange, Snapshot}; use crate::Result; use super::api_request::{ - AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest, + AlterDatabaseRequest, AlterTableRequest, CreateDatabaseRequest, CreateTableRequest, + RenameTableRequest, }; use super::api_response::{ ConfigResponse, GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, @@ -343,6 +344,21 @@ impl RESTApi { Ok(()) } + /// Alter a table's schema by applying a list of schema changes. + pub async fn alter_table( + &self, + identifier: &Identifier, + changes: Vec, + ) -> Result<()> { + let database = identifier.database(); + let table = identifier.object(); + validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?; + let path = self.resource_paths.table(database, table); + let request = AlterTableRequest::new(changes); + let _resp: serde_json::Value = self.client.post(&path, &request).await?; + Ok(()) + } + /// Get table information. pub async fn get_table(&self, identifier: &Identifier) -> Result { let database = identifier.database(); diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index 7f18f952..7a1e8dc3 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -439,11 +439,29 @@ impl Catalog for FileSystemCatalog { full_name: identifier.full_name(), })?; - let new_schema = current.apply_changes(changes)?; + let new_schema = current + .apply_changes(changes) + .map_err(|e| fill_table_name(e, identifier))?; self.save_table_schema(&table_path, &new_schema).await } } +/// `TableSchema::apply_changes` returns column errors without a table name; +/// fill in the identifier's full name so the message identifies the table. +fn fill_table_name(err: Error, identifier: &Identifier) -> Error { + match err { + Error::ColumnNotExist { column, .. } => Error::ColumnNotExist { + full_name: identifier.full_name(), + column, + }, + Error::ColumnAlreadyExist { column, .. } => Error::ColumnAlreadyExist { + full_name: identifier.full_name(), + column, + }, + other => other, + } +} + #[cfg(test)] #[cfg(not(windows))] // Skip on Windows due to path compatibility issues mod tests { @@ -728,4 +746,201 @@ mod tests { ); } } + + use crate::spec::{ColumnMove, DataType, IntType, SchemaChange, VarCharType}; + + /// Two-column table (id INT, name VARCHAR) used by the alter-table tests. + fn two_column_schema() -> Schema { + Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .build() + .unwrap() + } + + async fn setup_table(catalog: &FileSystemCatalog, schema: Schema) -> Identifier { + catalog + .create_database("db", false, HashMap::new()) + .await + .unwrap(); + let id = Identifier::new("db", "t"); + catalog.create_table(&id, schema, false).await.unwrap(); + id + } + + #[tokio::test] + async fn test_alter_table_column_changes() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Add a column at the end; it must take highest_field_id + 1. + catalog + .alter_table( + &id, + vec![SchemaChange::add_column( + "age".to_string(), + DataType::Int(IntType::new()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + let names: Vec<&str> = ts.fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "name", "age"]); + let age = ts.fields().iter().find(|f| f.name() == "age").unwrap(); + assert_eq!(age.id(), 2, "new column gets highest_field_id + 1"); + assert_eq!(ts.id(), 1, "schema version bumped"); + + // Add a column moved to the front. + catalog + .alter_table( + &id, + vec![SchemaChange::add_column_with_description_and_column_move( + "rowkey".to_string(), + DataType::Int(IntType::new()), + "primary".to_string(), + ColumnMove::move_first("rowkey".to_string()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + assert_eq!(ts.fields()[0].name(), "rowkey"); + assert_eq!(ts.fields()[0].description(), Some("primary")); + + // Rename, update comment, update type, update nullability, drop. + catalog + .alter_table( + &id, + vec![ + SchemaChange::rename_column("name".to_string(), "full_name".to_string()), + SchemaChange::update_column_comment("id".to_string(), "the id".to_string()), + SchemaChange::update_column_type( + "age".to_string(), + DataType::BigInt(crate::spec::BigIntType::new()), + ), + SchemaChange::update_column_nullability("id".to_string(), false), + SchemaChange::drop_column("rowkey".to_string()), + ], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + let names: Vec<&str> = ts.fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "full_name", "age"]); + let id_field = ts.fields().iter().find(|f| f.name() == "id").unwrap(); + assert_eq!(id_field.description(), Some("the id")); + assert!(!id_field.data_type().is_nullable()); + let age_field = ts.fields().iter().find(|f| f.name() == "age").unwrap(); + assert!(matches!(age_field.data_type(), DataType::BigInt(_))); + } + + #[tokio::test] + async fn test_alter_table_reposition_column() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Move `name` before `id`. + catalog + .alter_table( + &id, + vec![SchemaChange::update_column_position( + ColumnMove::move_first("name".to_string()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let names: Vec<&str> = ts.schema().fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["name", "id"]); + } + + #[tokio::test] + async fn test_alter_table_errors() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Add a duplicate column -> ColumnAlreadyExist. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::add_column( + "name".to_string(), + DataType::Int(IntType::new()), + )], + false, + ) + .await + .unwrap_err(); + assert!( + matches!(err, Error::ColumnAlreadyExist { .. }), + "got {err:?}" + ); + + // Drop a missing column -> ColumnNotExist. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::drop_column("ghost".to_string())], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::ColumnNotExist { .. }), "got {err:?}"); + + // Altering a missing table: ignored vs error. + let missing = Identifier::new("db", "nope"); + catalog + .alter_table( + &missing, + vec![SchemaChange::update_column_comment( + "id".to_string(), + "x".to_string(), + )], + true, + ) + .await + .unwrap(); + let err = catalog + .alter_table( + &missing, + vec![SchemaChange::update_column_comment( + "id".to_string(), + "x".to_string(), + )], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::TableNotExist { .. }), "got {err:?}"); + } + + #[tokio::test] + async fn test_alter_table_drop_primary_key_column_rejected() { + let (_tmp, catalog) = create_test_catalog(); + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .primary_key(["id"]) + .build() + .unwrap(); + let id = setup_table(&catalog, schema).await; + + let err = catalog + .alter_table( + &id, + vec![SchemaChange::drop_column("id".to_string())], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::Unsupported { .. }), "got {err:?}"); + } } diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs b/crates/paimon/src/catalog/rest/rest_catalog.rs index 09b93547..5ff5d6b0 100644 --- a/crates/paimon/src/catalog/rest/rest_catalog.rs +++ b/crates/paimon/src/catalog/rest/rest_catalog.rs @@ -330,13 +330,17 @@ impl Catalog for RESTCatalog { async fn alter_table( &self, - _identifier: &Identifier, - _changes: Vec, - _ignore_if_not_exists: bool, + identifier: &Identifier, + changes: Vec, + ignore_if_not_exists: bool, ) -> Result<()> { - // TODO: Implement alter_table when RESTApi supports it - Err(Error::Unsupported { - message: "Alter table is not yet implemented for REST catalog".to_string(), + let result = self + .api + .alter_table(identifier, changes) + .await + .map_err(|e| map_rest_error_for_table(e, identifier)); + ignore_error_if(result, |e| { + ignore_if_not_exists && matches!(e, Error::TableNotExist { .. }) }) } diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index 76d09b4c..4b51b061 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -17,7 +17,7 @@ use crate::spec::core_options::{first_row_supports_changelog_producer, CoreOptions}; use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType}; -use crate::spec::PartialUpdateConfig; +use crate::spec::{ColumnMove, ColumnMoveType, PartialUpdateConfig}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::collections::{HashMap, HashSet}; @@ -127,27 +127,153 @@ impl TableSchema { } /// Apply a list of schema changes and return a new schema with incremented ID. + /// + /// Column-level changes operate on **top-level** columns only: a + /// `field_names` path with more than one element (a nested struct field) is + /// rejected with [`crate::Error::Unsupported`]. + /// + /// Column errors ([`crate::Error::ColumnNotExist`] / + /// [`crate::Error::ColumnAlreadyExist`]) are returned with an empty table + /// name; the calling catalog fills in the table's full name. pub fn apply_changes(&self, changes: Vec) -> crate::Result { + use crate::spec::SchemaChange; + + // Column errors carry no table name here; the catalog layer fills it in. + let full_name = ""; + let mut new_schema = self.clone(); new_schema.id += 1; new_schema.time_millis = chrono::Utc::now().timestamp_millis(); + // Operate on an owned field list, then write it back. + let mut fields = std::mem::take(&mut new_schema.fields); + let mut highest_field_id = new_schema.highest_field_id; + for change in changes { match change { - crate::spec::SchemaChange::SetOption { key, value } => { + SchemaChange::SetOption { key, value } => { new_schema.options.insert(key, value); } - crate::spec::SchemaChange::RemoveOption { key } => { + SchemaChange::RemoveOption { key } => { new_schema.options.remove(&key); } - other => { - return Err(crate::Error::Unsupported { - message: format!("Schema change not yet supported: {other:?}"), - }); + SchemaChange::UpdateComment { comment } => { + new_schema.comment = comment; + } + SchemaChange::AddColumn { + field_names, + data_type, + comment, + column_move, + } => { + let name = top_level_field(&field_names)?; + if field_index(&fields, name).is_some() { + return Err(crate::Error::ColumnAlreadyExist { + full_name: full_name.to_string(), + column: name.to_string(), + }); + } + highest_field_id += 1; + let field = DataField::new(highest_field_id, name.to_string(), data_type) + .with_description(comment); + insert_field_with_move(&mut fields, field, column_move.as_ref(), full_name)?; + } + SchemaChange::RenameColumn { + field_names, + new_name, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + if new_name != name && field_index(&fields, &new_name).is_some() { + return Err(crate::Error::ColumnAlreadyExist { + full_name: full_name.to_string(), + column: new_name, + }); + } + fields[idx] = fields[idx].clone().with_name(new_name.clone()); + rename_in_keys(&mut new_schema.partition_keys, name, &new_name); + rename_in_keys(&mut new_schema.primary_keys, name, &new_name); + } + SchemaChange::DropColumn { field_names } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + if new_schema.partition_keys.iter().any(|k| k == name) + || new_schema.primary_keys.iter().any(|k| k == name) + { + return Err(crate::Error::Unsupported { + message: format!( + "Cannot drop partition or primary key column '{name}' of table {full_name}" + ), + }); + } + fields.remove(idx); + } + SchemaChange::UpdateColumnType { + field_names, + new_data_type, + keep_nullability, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + let old = &fields[idx]; + // Lenient: replace the type without cast-compatibility checks. + let target = if keep_nullability { + new_data_type.copy_with_nullable(old.data_type().is_nullable())? + } else { + new_data_type + }; + fields[idx] = DataField::new(old.id(), old.name().to_string(), target) + .with_description(old.description().map(|s| s.to_string())); + } + SchemaChange::UpdateColumnNullability { + field_names, + new_nullability, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + let old = &fields[idx]; + let nt = old.data_type().copy_with_nullable(new_nullability)?; + fields[idx] = DataField::new(old.id(), old.name().to_string(), nt) + .with_description(old.description().map(|s| s.to_string())); + } + SchemaChange::UpdateColumnComment { + field_names, + new_comment, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + fields[idx] = fields[idx].clone().with_description(Some(new_comment)); + } + SchemaChange::UpdateColumnPosition { column_move } => { + apply_move(&mut fields, &column_move, full_name)?; } } } + new_schema.fields = fields; + new_schema.highest_field_id = + highest_field_id.max(Self::current_highest_field_id(&new_schema.fields)); + Schema::validate_first_row_changelog_producer(&new_schema.options)?; Ok(new_schema) } @@ -180,6 +306,107 @@ impl TableSchema { } } +/// Extract the single top-level column name from a `field_names` path. +/// +/// Nested struct field paths (length > 1) are not yet supported. +fn top_level_field(field_names: &[String]) -> crate::Result<&str> { + match field_names { + [name] => Ok(name.as_str()), + [] => Err(crate::Error::ConfigInvalid { + message: "Schema change has empty fieldNames".to_string(), + }), + _ => Err(crate::Error::Unsupported { + message: format!("Altering nested struct fields is not supported yet: {field_names:?}"), + }), + } +} + +/// Index of the field with the given name, if any. +fn field_index(fields: &[DataField], name: &str) -> Option { + fields.iter().position(|f| f.name() == name) +} + +/// Rename a key in a partition/primary key list, if present. +fn rename_in_keys(keys: &mut [String], old: &str, new: &str) { + for key in keys.iter_mut() { + if key == old { + *key = new.to_string(); + } + } +} + +/// Insert a brand-new field according to an optional move (used by `AddColumn`). +fn insert_field_with_move( + fields: &mut Vec, + field: DataField, + column_move: Option<&ColumnMove>, + full_name: &str, +) -> crate::Result<()> { + let Some(mv) = column_move else { + fields.push(field); + return Ok(()); + }; + match mv.move_type() { + ColumnMoveType::FIRST => fields.insert(0, field), + ColumnMoveType::LAST => fields.push(field), + ColumnMoveType::AFTER | ColumnMoveType::BEFORE => { + let reference = move_reference(mv)?; + let ref_idx = + field_index(fields, reference).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: reference.to_string(), + })?; + let at = match mv.move_type() { + ColumnMoveType::AFTER => ref_idx + 1, + _ => ref_idx, + }; + fields.insert(at, field); + } + } + Ok(()) +} + +/// Move an existing field to a new position (used by `UpdateColumnPosition`). +/// +/// Mirrors Java `SchemaManager.applyMove`: remove the field first, then resolve +/// the reference index in the reduced list so the offset is already adjusted. +fn apply_move(fields: &mut Vec, mv: &ColumnMove, full_name: &str) -> crate::Result<()> { + let idx = field_index(fields, mv.field_name()).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: mv.field_name().to_string(), + })?; + let field = fields.remove(idx); + match mv.move_type() { + ColumnMoveType::FIRST => fields.insert(0, field), + ColumnMoveType::LAST => fields.push(field), + ColumnMoveType::AFTER | ColumnMoveType::BEFORE => { + let reference = move_reference(mv)?; + let ref_idx = + field_index(fields, reference).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: reference.to_string(), + })?; + let at = match mv.move_type() { + ColumnMoveType::AFTER => ref_idx + 1, + _ => ref_idx, + }; + fields.insert(at, field); + } + } + Ok(()) +} + +/// The reference (anchor) field name required by `AFTER`/`BEFORE` moves. +fn move_reference(mv: &ColumnMove) -> crate::Result<&str> { + mv.reference_field_name() + .ok_or_else(|| crate::Error::ConfigInvalid { + message: format!( + "Move of type {:?} requires a reference field name", + mv.move_type() + ), + }) +} + pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID"; pub const ROW_ID_FIELD_ID: i32 = i32::MAX - 5; diff --git a/crates/paimon/src/spec/schema_change.rs b/crates/paimon/src/spec/schema_change.rs index 6a11c0a4..9f69400b 100644 --- a/crates/paimon/src/spec/schema_change.rs +++ b/crates/paimon/src/spec/schema_change.rs @@ -20,77 +20,71 @@ use serde::{Deserialize, Serialize}; /// Schema change to table. /// -/// Reference: -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] +/// The JSON wire format is kept compatible with Java Paimon's `SchemaChange`, +/// which is an internally-tagged polymorphic type (`@JsonTypeInfo` with an +/// `"action"` discriminator). Each variant therefore serializes as +/// `{"action": "", ...fields}` with `fieldNames` arrays (a column path; +/// only top-level single-element paths are currently applied — see +/// `TableSchema::apply_changes`). +/// +/// Reference: +// +// Note: `dropPrimaryKey` and `updateColumnDefaultValue` from Java are not yet +// modeled here; they are out of scope for the current alter-table support. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "action", rename_all = "camelCase")] pub enum SchemaChange { /// A SchemaChange to set a table option. - /// - /// Reference: SetOption { key: String, value: String }, /// A SchemaChange to remove a table option. - /// - /// Reference: RemoveOption { key: String }, /// A SchemaChange to update a table comment. - /// - /// Reference: UpdateComment { comment: Option }, /// A SchemaChange to add a new field. - /// - /// Reference: #[serde(rename_all = "camelCase")] AddColumn { - field_name: String, + field_names: Vec, data_type: DataType, - description: Option, + comment: Option, #[serde(rename = "move")] column_move: Option, }, /// A SchemaChange to rename a field. - /// - /// Reference: #[serde(rename_all = "camelCase")] RenameColumn { - field_name: String, + field_names: Vec, new_name: String, }, /// A SchemaChange to drop a field. - /// - /// Reference: #[serde(rename_all = "camelCase")] - DropColumn { field_name: String }, + DropColumn { field_names: Vec }, /// A SchemaChange to update the field's type. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnType { - field_name: String, - data_type: DataType, - }, - /// A SchemaChange to update the field's position. - /// - /// Reference: - #[serde(rename_all = "camelCase")] - UpdateColumnPosition { - #[serde(rename = "move")] - column_move: ColumnMove, + field_names: Vec, + new_data_type: DataType, + /// When true, keep the existing column's nullability instead of taking + /// it from `new_data_type`. + #[serde(default)] + keep_nullability: bool, }, /// A SchemaChange to update the field's nullability. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnNullability { - field_name: Vec, - nullable: bool, + field_names: Vec, + new_nullability: bool, }, /// A SchemaChange to update the (nested) field's comment. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnComment { field_names: Vec, - new_description: String, + new_comment: String, + }, + /// A SchemaChange to update the field's position. + #[serde(rename_all = "camelCase")] + UpdateColumnPosition { + #[serde(rename = "move")] + column_move: ColumnMove, }, } @@ -113,9 +107,9 @@ impl SchemaChange { /// impl the `add_column`. pub fn add_column(field_name: String, data_type: DataType) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: None, + comment: None, column_move: None, } } @@ -127,9 +121,9 @@ impl SchemaChange { description: String, ) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: Some(description), + comment: Some(description), column_move: None, } } @@ -142,9 +136,9 @@ impl SchemaChange { column_move: ColumnMove, ) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: Some(description), + comment: Some(description), column_move: Some(column_move), } } @@ -152,21 +146,24 @@ impl SchemaChange { /// impl the `rename_column`. pub fn rename_column(field_name: String, new_name: String) -> Self { SchemaChange::RenameColumn { - field_name, + field_names: vec![field_name], new_name, } } /// impl the `drop_column`. pub fn drop_column(field_name: String) -> Self { - SchemaChange::DropColumn { field_name } + SchemaChange::DropColumn { + field_names: vec![field_name], + } } /// impl the `update_column_type`. pub fn update_column_type(field_name: String, new_data_type: DataType) -> Self { SchemaChange::UpdateColumnType { - field_name, - data_type: new_data_type, + field_names: vec![field_name], + new_data_type, + keep_nullability: false, } } @@ -175,19 +172,19 @@ impl SchemaChange { SchemaChange::UpdateColumnPosition { column_move } } - /// impl the `update_column_position`. + /// impl the `update_column_nullability`. pub fn update_column_nullability(field_name: String, new_nullability: bool) -> Self { SchemaChange::UpdateColumnNullability { - field_name: vec![field_name], - nullable: new_nullability, + field_names: vec![field_name], + new_nullability, } } /// impl the `update_columns_nullability`. pub fn update_columns_nullability(field_names: Vec, new_nullability: bool) -> Self { SchemaChange::UpdateColumnNullability { - field_name: field_names, - nullable: new_nullability, + field_names, + new_nullability, } } @@ -195,7 +192,7 @@ impl SchemaChange { pub fn update_column_comment(field_name: String, comment: String) -> Self { SchemaChange::UpdateColumnComment { field_names: vec![field_name], - new_description: comment, + new_comment: comment, } } @@ -203,28 +200,32 @@ impl SchemaChange { pub fn update_columns_comment(field_names: Vec, comment: String) -> Self { SchemaChange::UpdateColumnComment { field_names, - new_description: comment, + new_comment: comment, } } } /// The type of move. /// -/// Reference: +/// Reference: #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub enum ColumnMoveType { FIRST, AFTER, + BEFORE, + LAST, } /// Represents a requested column move in a struct. /// -/// Reference: +/// Reference: #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct ColumnMove { pub field_name: String, - pub referenced_field_name: Option, + /// The anchor column for `AFTER`/`BEFORE` moves (`None` for `FIRST`/`LAST`). + /// Named `referenceFieldName` on the wire to match Java Paimon. + pub reference_field_name: Option, #[serde(rename = "type")] pub move_type: ColumnMoveType, } @@ -235,9 +236,9 @@ impl ColumnMove { &self.field_name } - /// Get the referenced field name. - pub fn referenced_field_name(&self) -> Option<&str> { - self.referenced_field_name.as_deref() + /// Get the reference field name. + pub fn reference_field_name(&self) -> Option<&str> { + self.reference_field_name.as_deref() } /// Get the move type. @@ -249,19 +250,37 @@ impl ColumnMove { pub fn move_first(field_name: String) -> Self { ColumnMove { field_name, - referenced_field_name: None, + reference_field_name: None, move_type: ColumnMoveType::FIRST, } } + /// Create a new `Move` with `LAST` move type. + pub fn move_last(field_name: String) -> Self { + ColumnMove { + field_name, + reference_field_name: None, + move_type: ColumnMoveType::LAST, + } + } + /// Create a new `Move` with `AFTER` move type. - pub fn move_after(field_name: String, referenced_field_name: String) -> Self { + pub fn move_after(field_name: String, reference_field_name: String) -> Self { ColumnMove { field_name, - referenced_field_name: Some(referenced_field_name), + reference_field_name: Some(reference_field_name), move_type: ColumnMoveType::AFTER, } } + + /// Create a new `Move` with `BEFORE` move type. + pub fn move_before(field_name: String, reference_field_name: String) -> Self { + ColumnMove { + field_name, + reference_field_name: Some(reference_field_name), + move_type: ColumnMoveType::BEFORE, + } + } } #[cfg(test)] @@ -271,79 +290,66 @@ mod tests { #[test] fn test_schema_change_serialize_deserialize() { + // Java-compatible wire format: internally tagged by "action", with + // `fieldNames` arrays and `referenceFieldName` move anchors. let json_data = r#" [ { - "setOption": { - "key": "snapshot.time-retained", - "value": "2h" - } + "action": "setOption", + "key": "snapshot.time-retained", + "value": "2h" }, { - "removeOption": { - "key": "compaction.max.file-num" - } + "action": "removeOption", + "key": "compaction.max.file-num" }, { - "updateComment": { - "comment": "table.comment" - } + "action": "updateComment", + "comment": "table.comment" }, { - "addColumn": { + "action": "addColumn", + "fieldNames": ["col1"], + "dataType": "INT", + "comment": "col1_description", + "move": { "fieldName": "col1", - "dataType": "INT", - "description": "col1_description", - "move": { - "fieldName": "col1_first", - "referencedFieldName": null, - "type": "FIRST" - } + "referenceFieldName": null, + "type": "FIRST" } }, { - "renameColumn": { - "fieldName": "col3", - "newName": "col3_new_name" - } + "action": "renameColumn", + "fieldNames": ["col3"], + "newName": "col3_new_name" }, { - "dropColumn": { - "fieldName": "col1" - } + "action": "dropColumn", + "fieldNames": ["col1"] }, { - "updateColumnType": { - "fieldName": "col14", - "dataType": "DOUBLE" - } + "action": "updateColumnType", + "fieldNames": ["col14"], + "newDataType": "DOUBLE", + "keepNullability": false }, { - "updateColumnPosition": { - "move": { - "fieldName": "col4_first", - "referencedFieldName": null, - "type": "FIRST" - } + "action": "updateColumnPosition", + "move": { + "fieldName": "col4", + "referenceFieldName": "col3", + "type": "AFTER" } }, { - "updateColumnNullability": { - "fieldName": [ - "col5", - "f2" - ], - "nullable": false - } + "action": "updateColumnNullability", + "fieldNames": ["col5", "f2"], + "newNullability": false }, { - "updateColumnComment": { - "fieldNames": [ - "col5", - "f1" - ], - "newDescription": "col5 f1 field" - } + "action": "updateColumnComment", + "fieldNames": ["col5", "f1"], + "newComment": "col5 f1 field" } ]"#; @@ -364,57 +370,63 @@ mod tests { comment: Some("table.comment".to_string()), }, SchemaChange::AddColumn { - field_name: "col1".to_string(), + field_names: vec!["col1".to_string()], data_type: DataType::Int(IntType::new()), - description: Some("col1_description".to_string()), - column_move: Some(ColumnMove { - field_name: "col1_first".to_string(), - referenced_field_name: None, - move_type: ColumnMoveType::FIRST, - }), + comment: Some("col1_description".to_string()), + column_move: Some(ColumnMove::move_first("col1".to_string())), }, SchemaChange::RenameColumn { - field_name: "col3".to_string(), + field_names: vec!["col3".to_string()], new_name: "col3_new_name".to_string(), }, SchemaChange::DropColumn { - field_name: "col1".to_string(), + field_names: vec!["col1".to_string()], }, SchemaChange::UpdateColumnType { - field_name: "col14".to_string(), - data_type: DataType::Double(DoubleType::new()), + field_names: vec!["col14".to_string()], + new_data_type: DataType::Double(DoubleType::new()), + keep_nullability: false, }, SchemaChange::UpdateColumnPosition { - column_move: ColumnMove { - field_name: "col4_first".to_string(), - referenced_field_name: None, - move_type: ColumnMoveType::FIRST, - }, + column_move: ColumnMove::move_after("col4".to_string(), "col3".to_string()), }, SchemaChange::UpdateColumnNullability { - field_name: vec!["col5".to_string(), "f2".to_string()], - nullable: false, + field_names: vec!["col5".to_string(), "f2".to_string()], + new_nullability: false, }, SchemaChange::UpdateColumnComment { field_names: vec!["col5".to_string(), "f1".to_string()], - new_description: "col5 f1 field".to_string(), + new_comment: "col5 f1 field".to_string(), }, ] ); } + #[test] + fn test_schema_change_serialize_shape() { + // Verify the serialized JSON carries the Java "action" discriminator. + let change = SchemaChange::add_column("c".to_string(), DataType::Int(IntType::new())); + let value = serde_json::to_value(&change).unwrap(); + assert_eq!(value["action"], "addColumn"); + assert_eq!(value["fieldNames"][0], "c"); + + // Round-trip through JSON. + let round: SchemaChange = serde_json::from_value(value).unwrap(); + assert_eq!(round, change); + } + #[test] fn test_column_move_serialize_deserialize() { let json_data = r#" [ { "fieldName": "col1", - "referencedFieldName": null, + "referenceFieldName": null, "type": "FIRST" }, { "fieldName": "col2_after", - "referencedFieldName": "col2", + "referenceFieldName": "col2", "type": "AFTER" } ]"#; diff --git a/crates/paimon/tests/mock_server.rs b/crates/paimon/tests/mock_server.rs index 69abf233..6a9f38d0 100644 --- a/crates/paimon/tests/mock_server.rs +++ b/crates/paimon/tests/mock_server.rs @@ -34,8 +34,9 @@ use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; use paimon::api::{ - AlterDatabaseRequest, AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse, - GetTableResponse, ListDatabasesResponse, ListTablesResponse, RenameTableRequest, ResourcePaths, + AlterDatabaseRequest, AlterTableRequest, AuditRESTResponse, ConfigResponse, ErrorResponse, + GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, ListTablesResponse, + RenameTableRequest, ResourcePaths, }; #[derive(Clone, Debug, Default)] @@ -451,6 +452,40 @@ impl RESTServer { } } + /// Handle POST /databases/:db/tables/:table - alter a table. + /// + /// The mock does not mutate the stored schema; it only validates that the + /// table exists, which is enough to exercise the client's alter-table path + /// (request serialization + 2xx handling). + pub async fn alter_table( + Path((db, table)): Path<(String, String)>, + Extension(state): Extension>, + Json(_request): Json, + ) -> impl IntoResponse { + let s = state.inner.lock().unwrap(); + let key = format!("{db}.{table}"); + if s.no_permission_tables.contains(&key) { + let err = ErrorResponse::new( + Some("table".to_string()), + Some(table), + Some("No Permission".to_string()), + Some(403), + ); + return (StatusCode::FORBIDDEN, Json(err)).into_response(); + } + if s.tables.contains_key(&key) { + (StatusCode::OK, Json(serde_json::json!(""))).into_response() + } else { + let err = ErrorResponse::new( + Some("table".to_string()), + Some(table), + Some("Not Found".to_string()), + Some(404), + ); + (StatusCode::NOT_FOUND, Json(err)).into_response() + } + } + /// Handle POST /rename-table - rename a table. pub async fn rename_table( Extension(state): Extension>, @@ -722,7 +757,9 @@ pub async fn start_mock_server( ) .route( &format!("{prefix}/databases/:db/tables/:table"), - get(RESTServer::get_table).delete(RESTServer::drop_table), + get(RESTServer::get_table) + .post(RESTServer::alter_table) + .delete(RESTServer::drop_table), ) .route( &format!("{prefix}/tables/rename"), diff --git a/crates/paimon/tests/rest_catalog_test.rs b/crates/paimon/tests/rest_catalog_test.rs index 2166b44d..70a86c5c 100644 --- a/crates/paimon/tests/rest_catalog_test.rs +++ b/crates/paimon/tests/rest_catalog_test.rs @@ -25,7 +25,7 @@ use std::collections::HashMap; use paimon::api::ConfigResponse; use paimon::catalog::{Catalog, Identifier, RESTCatalog}; use paimon::common::Options; -use paimon::spec::{BigIntType, DataType, Schema, VarCharType}; +use paimon::spec::{BigIntType, DataType, Schema, SchemaChange, VarCharType}; mod mock_server; use mock_server::{start_mock_server, RESTServer}; @@ -465,17 +465,35 @@ async fn test_catalog_rename_table_ignore_if_not_exists() { // ==================== Alter Table Tests ==================== #[tokio::test] -async fn test_catalog_alter_table_unsupported() { +async fn test_catalog_alter_table() { let ctx = setup_catalog(vec!["default"]).await; let identifier = Identifier::new("default", "some_table"); - - // alter_table should return Unsupported error - let result = ctx.catalog.alter_table(&identifier, vec![], false).await; - assert!( - result.is_err(), - "alter_table should return Unsupported error" - ); + ctx.catalog + .create_table(&identifier, test_schema(), false) + .await + .unwrap(); + + // alter_table on an existing table succeeds (client builds the request and + // POSTs it; the mock accepts it). + let changes = vec![SchemaChange::update_column_comment( + "id".to_string(), + "the id".to_string(), + )]; + let result = ctx.catalog.alter_table(&identifier, changes, false).await; + assert!(result.is_ok(), "alter_table should succeed: {result:?}"); + + // alter_table on a missing table: error, unless ignore_if_not_exists. + let missing = Identifier::new("default", "ghost"); + assert!(ctx + .catalog + .alter_table(&missing, vec![], false) + .await + .is_err()); + ctx.catalog + .alter_table(&missing, vec![], true) + .await + .unwrap(); } // ==================== Multiple Databases Tests ====================