diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 038ece72..dc04b5b4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -90,6 +90,12 @@ jobs: RUST_LOG: DEBUG RUST_BACKTRACE: full + - name: Test paimon-rest-server + run: cargo test -p paimon-rest-server --all-targets + env: + RUST_LOG: DEBUG + RUST_BACKTRACE: full + integration: runs-on: ubuntu-latest steps: diff --git a/Cargo.toml b/Cargo.toml index cd45c340..ea2d82b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ [workspace] resolver = "2" -members = ["crates/paimon", "crates/integration_tests", "bindings/c", "bindings/python", "crates/integrations/datafusion"] +members = ["crates/paimon", "crates/paimon-rest-server", "crates/integration_tests", "bindings/c", "bindings/python", "crates/integrations/datafusion"] [workspace.package] version = "0.3.0" diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index 62e2b7b3..3f7ff189 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -3147,7 +3147,7 @@ mod tests { assert_eq!(identifier.object(), "t1"); assert_eq!(changes.len(), 1); assert!( - matches!(&changes[0], SchemaChange::AddColumn { field_name, .. } if field_name == "age") + matches!(&changes[0], SchemaChange::AddColumn { field_names, .. } if field_names.first().map(String::as_str) == Some("age")) ); } else { panic!("expected AlterTable call"); @@ -3171,10 +3171,10 @@ mod tests { assert!(matches!( &changes[0], SchemaChange::AddColumn { - field_name, + field_names, data_type, .. - } if field_name == "payload" && matches!(data_type, PaimonDataType::Blob(_)) + } if field_names.first().map(String::as_str) == Some("payload") && matches!(data_type, PaimonDataType::Blob(_)) )); } else { panic!("expected AlterTable call"); @@ -3196,7 +3196,7 @@ mod tests { if let CatalogCall::AlterTable { changes, .. } = &calls[0] { assert_eq!(changes.len(), 1); assert!( - matches!(&changes[0], SchemaChange::DropColumn { field_name } if field_name == "age") + matches!(&changes[0], SchemaChange::DropColumn { field_names } if field_names.first().map(String::as_str) == Some("age")) ); } else { panic!("expected AlterTable call"); @@ -3219,8 +3219,8 @@ mod tests { assert_eq!(changes.len(), 1); assert!(matches!( &changes[0], - SchemaChange::RenameColumn { field_name, new_name } - if field_name == "old_name" && new_name == "new_name" + SchemaChange::RenameColumn { field_names, new_name } + if field_names.first().map(String::as_str) == Some("old_name") && new_name == "new_name" )); } else { panic!("expected AlterTable call"); diff --git a/crates/integrations/datafusion/tests/sql_context_tests.rs b/crates/integrations/datafusion/tests/sql_context_tests.rs index 7deed175..d4f0c1da 100644 --- a/crates/integrations/datafusion/tests/sql_context_tests.rs +++ b/crates/integrations/datafusion/tests/sql_context_tests.rs @@ -480,23 +480,18 @@ async fn test_alter_table_add_column() { .await .unwrap(); - // ALTER TABLE is not yet implemented in FileSystemCatalog, so we expect an error - let result = sql_context + sql_context .sql("ALTER TABLE paimon.mydb.alter_test ADD COLUMN age INT") - .await; + .await + .expect("ALTER TABLE ADD COLUMN should succeed"); - // FileSystemCatalog does not support AddColumn schema change yet - assert!( - result.is_err(), - "ALTER TABLE ADD COLUMN should fail because AddColumn is not yet supported" - ); - let err_msg = result.unwrap_err().to_string(); - assert!( - err_msg.contains("not yet implemented") - || err_msg.contains("Unsupported") - || err_msg.contains("not yet supported"), - "Error should indicate alter_table is not implemented, got: {err_msg}" - ); + // The new column is appended to the table schema. + let table = catalog + .get_table(&Identifier::new("mydb", "alter_test")) + .await + .unwrap(); + let names: Vec<&str> = table.schema().fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "name", "age"]); } #[tokio::test] diff --git a/crates/paimon-rest-server/Cargo.toml b/crates/paimon-rest-server/Cargo.toml new file mode 100644 index 00000000..238eacc1 --- /dev/null +++ b/crates/paimon-rest-server/Cargo.toml @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "paimon-rest-server" +version.workspace = true +edition.workspace = true +homepage.workspace = true +repository.workspace = true +license.workspace = true +rust-version.workspace = true +description = "A FileSystemCatalog-backed Paimon REST catalog server for local end-to-end testing." +# Testing/dev tool: not published to crates.io. +publish = false + +[lib] +name = "paimon_rest_server" +path = "src/lib.rs" + +[[bin]] +name = "paimon-rest-server" +path = "src/main.rs" + +[dependencies] +paimon = { workspace = true } +axum = { version = "0.7", features = ["macros", "tokio", "http1", "http2"] } +tokio = { version = "1.39.2", features = ["rt-multi-thread", "macros", "net", "signal", "time"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1.0.120" +async-trait = "0.1.81" + +[dev-dependencies] +tempfile = "3" +arrow-array = { workspace = true } +arrow-schema = { workspace = true } +futures = "0.3" diff --git a/crates/paimon-rest-server/README.md b/crates/paimon-rest-server/README.md new file mode 100644 index 00000000..2f914740 --- /dev/null +++ b/crates/paimon-rest-server/README.md @@ -0,0 +1,80 @@ + + +# paimon-rest-server + +A Paimon REST catalog server backed by a real [`FileSystemCatalog`], for +**local end-to-end testing** of the REST catalog client. It is a dev/testing +tool and is **not published** to crates.io. + +Unlike the in-memory mock used in `paimon`'s own unit tests, this server maps +the Paimon REST protocol onto a real `FileSystemCatalog`, so the client-side +`RESTCatalog` can be exercised against actual on-disk metadata: + +- config + database/table metadata CRUD; +- append write + commit (the commit endpoint persists the posted snapshot via + `SnapshotManager`) + read back; +- column-level `alter table`. + +Because both the server and the client point at the **same** local warehouse, +the client writes data files directly while the server persists the snapshot +metadata it receives on the commit endpoint. The wire format mirrors Java +Paimon, so the same warehouse can be round-tripped with a Java reader/writer. + +## Run the standalone server + +```bash +REST_WAREHOUSE=/tmp/paimon-warehouse REST_HOST=127.0.0.1 REST_PORT=8080 \ + cargo run -p paimon-rest-server +``` + +Environment variables: `REST_WAREHOUSE` (default `/tmp/paimon-warehouse`), +`REST_HOST` (default `127.0.0.1`), `REST_PORT` (default `8080`), +`REST_PREFIX` (default empty). + +## Use as a test fixture + +```rust,no_run +# async fn run() -> Result<(), Box> { +let server = paimon_rest_server::FsRestCatalogServer::start("/tmp/paimon-wh", "").await?; +// Point a RESTCatalog client at `server.url()`. +# Ok(()) } +``` + +See `tests/e2e.rs` for the full metadata / write-commit-read / alter-table +round trips. + +## Endpoints + +Served under the configured prefix (`/v1/...` by default): + +| Method | Path | Operation | +| --- | --- | --- | +| GET | `/v1/config` | server config | +| GET / POST | `/databases` | list / create database | +| GET / POST / DELETE | `/databases/{db}` | get / alter (no-op) / drop database | +| GET / POST | `/databases/{db}/tables` | list / create table | +| GET / POST / DELETE | `/databases/{db}/tables/{table}` | get / alter / drop table | +| POST | `/tables/rename` | rename table | +| POST | `/databases/{db}/tables/{table}/commit` | commit a snapshot | + +The data-token endpoint returns `501`; it is never called when +`data-token.enabled=false` (the default). + +[`FileSystemCatalog`]: https://docs.rs/paimon diff --git a/crates/paimon-rest-server/src/lib.rs b/crates/paimon-rest-server/src/lib.rs new file mode 100644 index 00000000..a867543d --- /dev/null +++ b/crates/paimon-rest-server/src/lib.rs @@ -0,0 +1,493 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! A real Paimon REST catalog server backed by [`FileSystemCatalog`]. +//! +//! Unlike the in-memory mock used by `paimon`'s own tests, this server maps the +//! Paimon REST protocol onto a real [`FileSystemCatalog`], so the client-side +//! [`RESTCatalog`](paimon::catalog::RESTCatalog) can be exercised end to end: +//! metadata CRUD, plus append write + commit + read back (the commit endpoint +//! persists the posted snapshot via [`SnapshotManager`]). +//! +//! Because both the server and the client point at the *same* local warehouse, +//! the client writes data files directly while the server only persists the +//! snapshot metadata it receives on the commit endpoint. +//! +//! # Example +//! ```no_run +//! # async fn run() -> Result<(), Box> { +//! let server = paimon_rest_server::FsRestCatalogServer::start("/tmp/paimon-wh", "").await?; +//! println!("listening on {}", server.url()); +//! # Ok(()) } +//! ``` + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; + +use axum::{ + extract::{Extension, Json, Path, Query}, + http::StatusCode, + response::{IntoResponse, Response}, + routing::{get, post}, + serve, Router, +}; +use serde::Deserialize; +use serde_json::json; + +use paimon::api::{ + AlterDatabaseRequest, AlterTableRequest, AuditRESTResponse, ConfigResponse, CreateTableRequest, + ErrorResponse, GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, + ListTablesResponse, RenameTableRequest, ResourcePaths, +}; +use paimon::catalog::{Catalog, Identifier}; +use paimon::common::{CatalogOptions, Options}; +use paimon::spec::{Schema, Snapshot}; +use paimon::table::SnapshotManager; +use paimon::{Error, FileSystemCatalog}; + +/// Convenience boxed error type for server construction (covers both +/// [`paimon::Error`] from catalog setup and [`std::io::Error`] from binding). +pub type BoxError = Box; + +/// Shared server state handed to every request handler. +struct AppState { + catalog: FileSystemCatalog, + config: ConfigResponse, +} + +/// A running FileSystemCatalog-backed REST catalog server. +/// +/// The server runs on a background Tokio task and is aborted on drop, so a +/// test can simply let it go out of scope when finished. +pub struct FsRestCatalogServer { + addr: SocketAddr, + handle: tokio::task::JoinHandle<()>, +} + +impl FsRestCatalogServer { + /// Start a server on an OS-assigned port (`127.0.0.1:0`). + /// + /// `warehouse` is the local warehouse root; `prefix` is the REST resource + /// prefix (pass `""` for none, mirroring `/v1/...`). + pub async fn start(warehouse: impl Into, prefix: &str) -> Result { + Self::start_on(warehouse, prefix, "127.0.0.1", 0).await + } + + /// Start a server bound to an explicit host and port. + pub async fn start_on( + warehouse: impl Into, + prefix: &str, + host: &str, + port: u16, + ) -> Result { + let warehouse = warehouse.into(); + + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse.clone()); + let catalog = FileSystemCatalog::new(options)?; + + // Advertise the prefix (and warehouse) to clients via GET /v1/config. + let mut defaults = HashMap::new(); + if !prefix.is_empty() { + defaults.insert(CatalogOptions::PREFIX.to_string(), prefix.to_string()); + } + defaults.insert(CatalogOptions::WAREHOUSE.to_string(), warehouse); + let config = ConfigResponse::new(defaults); + + let state = Arc::new(AppState { catalog, config }); + let app = build_router(prefix, state); + + let listener = tokio::net::TcpListener::bind((host, port)).await?; + let addr = listener.local_addr()?; + let handle = tokio::spawn(async move { + if let Err(e) = serve(listener, app.into_make_service()).await { + eprintln!("paimon-rest-server error: {e}"); + } + }); + + Ok(Self { addr, handle }) + } + + /// The bound socket address. + pub fn addr(&self) -> SocketAddr { + self.addr + } + + /// The base URL clients should connect to (e.g. `http://127.0.0.1:54321`). + pub fn url(&self) -> String { + format!("http://{}", self.addr) + } +} + +impl Drop for FsRestCatalogServer { + fn drop(&mut self) { + self.handle.abort(); + } +} + +/// Build the axum router, wiring every endpoint under the given prefix. +fn build_router(prefix: &str, state: Arc) -> Router { + let paths = ResourcePaths::new(prefix); + let base = paths.base_path(); + + Router::new() + // Config endpoint is always at the fixed /v1/config path. + .route("/v1/config", get(get_config)) + .route( + &format!("{base}/databases"), + get(list_databases).post(create_database), + ) + .route( + &format!("{base}/databases/:db"), + get(get_database).post(alter_database).delete(drop_database), + ) + .route( + &format!("{base}/databases/:db/tables"), + get(list_tables).post(create_table), + ) + .route( + &format!("{base}/databases/:db/tables/:table"), + get(get_table).post(alter_table).delete(drop_table), + ) + .route(&format!("{base}/tables/rename"), post(rename_table)) + .route( + &format!("{base}/databases/:db/tables/:table/commit"), + post(commit), + ) + // Token endpoint is never hit when `data-token.enabled=false` (default), + // but we serve a stub so a misconfigured client gets a clear 501. + .route( + &format!("{base}/databases/:db/tables/:table/token"), + get(table_token_stub), + ) + .layer(Extension(state)) +} + +// ============================================================================ +// Error mapping: paimon::Error -> (HTTP status, ErrorResponse) +// +// The client reconstructs the original error solely from the HTTP status code +// (see `crates/paimon/src/api/rest_error.rs`), so the code below MUST line the +// status codes up with `RestError::from_error_response`. +// ============================================================================ + +fn error_response(e: Error) -> Response { + let (status, resource_type, resource_name) = match &e { + Error::DatabaseNotExist { database } => ( + StatusCode::NOT_FOUND, + Some("database".to_string()), + Some(database.clone()), + ), + Error::TableNotExist { full_name } => ( + StatusCode::NOT_FOUND, + Some("table".to_string()), + Some(full_name.clone()), + ), + Error::DatabaseAlreadyExist { database } => ( + StatusCode::CONFLICT, + Some("database".to_string()), + Some(database.clone()), + ), + Error::TableAlreadyExist { full_name } => ( + StatusCode::CONFLICT, + Some("table".to_string()), + Some(full_name.clone()), + ), + Error::DatabaseNotEmpty { database } => ( + StatusCode::CONFLICT, + Some("database".to_string()), + Some(database.clone()), + ), + Error::ColumnNotExist { full_name, column } => ( + StatusCode::BAD_REQUEST, + Some(format!("column:{full_name}")), + Some(column.clone()), + ), + Error::ColumnAlreadyExist { full_name, column } => ( + StatusCode::BAD_REQUEST, + Some(format!("column:{full_name}")), + Some(column.clone()), + ), + Error::IdentifierInvalid { .. } | Error::ConfigInvalid { .. } => { + (StatusCode::BAD_REQUEST, None, None) + } + Error::Unsupported { .. } => (StatusCode::NOT_IMPLEMENTED, None, None), + _ => (StatusCode::INTERNAL_SERVER_ERROR, None, None), + }; + + let body = ErrorResponse::new( + resource_type, + resource_name, + Some(e.to_string()), + Some(status.as_u16() as i32), + ); + (status, Json(body)).into_response() +} + +/// A 2xx response with an empty JSON body, for operations whose REST contract +/// returns nothing meaningful (the client deserializes these as `Value`). +fn ok_empty() -> Response { + (StatusCode::OK, Json(json!({}))).into_response() +} + +// ============================================================================ +// Handlers +// ============================================================================ + +async fn get_config( + Query(_params): Query>, + Extension(state): Extension>, +) -> Response { + (StatusCode::OK, Json(state.config.clone())).into_response() +} + +async fn list_databases(Extension(state): Extension>) -> Response { + match state.catalog.list_databases().await { + Ok(mut dbs) => { + dbs.sort(); + (StatusCode::OK, Json(ListDatabasesResponse::new(dbs, None))).into_response() + } + Err(e) => error_response(e), + } +} + +async fn create_database( + Extension(state): Extension>, + Json(payload): Json, +) -> Response { + match state + .catalog + .create_database(&payload.name, false, payload.options) + .await + { + Ok(()) => ok_empty(), + Err(e) => error_response(e), + } +} + +async fn get_database( + Path(db): Path, + Extension(state): Extension>, +) -> Response { + match state.catalog.get_database(&db).await { + Ok(database) => { + let response = GetDatabaseResponse::new( + Some(database.name.clone()), + Some(database.name), + None, + database.options, + empty_audit(), + ); + (StatusCode::OK, Json(response)).into_response() + } + Err(e) => error_response(e), + } +} + +/// Alter database: `FileSystemCatalog` does not persist database properties, +/// and the `Catalog` trait has no `alter_database`, so no client path reaches +/// this. We only validate that the database exists and return OK; the request +/// is intentionally a no-op. +async fn alter_database( + Path(db): Path, + Extension(state): Extension>, + Json(_request): Json, +) -> Response { + match state.catalog.get_database(&db).await { + Ok(_) => ok_empty(), + Err(e) => error_response(e), + } +} + +async fn drop_database( + Path(db): Path, + Extension(state): Extension>, +) -> Response { + // The client (`RESTCatalog::drop_database`) already enforces the non-cascade + // "database must be empty" check before issuing the DELETE, so the server + // force-drops with cascade=true. + match state.catalog.drop_database(&db, false, true).await { + Ok(()) => ok_empty(), + Err(e) => error_response(e), + } +} + +async fn list_tables( + Path(db): Path, + Extension(state): Extension>, +) -> Response { + match state.catalog.list_tables(&db).await { + Ok(mut tables) => { + tables.sort(); + ( + StatusCode::OK, + Json(ListTablesResponse::new(Some(tables), None)), + ) + .into_response() + } + Err(e) => error_response(e), + } +} + +async fn create_table( + Path(db): Path, + Extension(state): Extension>, + Json(request): Json, +) -> Response { + // Trust the path's database; take the table name from the request body. + let identifier = Identifier::new(db, request.identifier.object().to_string()); + match state + .catalog + .create_table(&identifier, request.schema, false) + .await + { + Ok(()) => ok_empty(), + Err(e) => error_response(e), + } +} + +async fn get_table( + Path((db, table)): Path<(String, String)>, + Extension(state): Extension>, +) -> Response { + let identifier = Identifier::new(db, table.clone()); + let resolved = match state.catalog.get_table(&identifier).await { + Ok(t) => t, + Err(e) => return error_response(e), + }; + + let table_schema = resolved.schema(); + // Convert the stored `TableSchema` into the DDL `Schema` the response + // carries. `Schema` is a field subset of `TableSchema` (both camelCase), + // and serde ignores the extra keys, preserving field ids exactly. + let schema: Schema = + match serde_json::to_value(table_schema).and_then(serde_json::from_value::) { + Ok(s) => s, + Err(e) => { + return error_response(Error::DataInvalid { + message: format!("Failed to convert table schema: {e}"), + source: Some(Box::new(e)), + }) + } + }; + + let response = GetTableResponse::new( + // FileSystemCatalog has no UUID concept; the full name is a stable id + // that satisfies the client's RESTEnv requirement. + Some(identifier.full_name()), + Some(table), + Some(resolved.location().to_string()), + Some(false), + Some(table_schema.id()), + Some(schema), + empty_audit(), + ); + (StatusCode::OK, Json(response)).into_response() +} + +async fn drop_table( + Path((db, table)): Path<(String, String)>, + Extension(state): Extension>, +) -> Response { + let identifier = Identifier::new(db, table); + match state.catalog.drop_table(&identifier, false).await { + Ok(()) => ok_empty(), + Err(e) => error_response(e), + } +} + +async fn alter_table( + Path((db, table)): Path<(String, String)>, + Extension(state): Extension>, + Json(request): Json, +) -> Response { + let identifier = Identifier::new(db, table); + match state + .catalog + .alter_table(&identifier, request.changes, false) + .await + { + Ok(()) => ok_empty(), + Err(e) => error_response(e), + } +} + +async fn rename_table( + Extension(state): Extension>, + Json(request): Json, +) -> Response { + match state + .catalog + .rename_table(&request.source, &request.destination, false) + .await + { + Ok(()) => ok_empty(), + Err(e) => error_response(e), + } +} + +/// Request body posted by the client's `RESTSnapshotCommit` (see +/// `crates/paimon/src/api/rest_api.rs::commit_snapshot`). +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct CommitRequest { + #[serde(default)] + table_uuid: Option, + snapshot: Snapshot, + #[serde(default)] + statistics: serde_json::Value, +} + +async fn commit( + Path((db, table)): Path<(String, String)>, + Extension(state): Extension>, + Json(request): Json, +) -> Response { + let _ = (request.table_uuid, request.statistics); + let identifier = Identifier::new(db, table); + + // Resolve the table's FileIO and on-disk location, then persist the posted + // snapshot exactly like the filesystem catalog's own commit path does. + let resolved = match state.catalog.get_table(&identifier).await { + Ok(t) => t, + Err(e) => return error_response(e), + }; + let manager = SnapshotManager::new(resolved.file_io().clone(), resolved.location().to_string()); + match manager.commit_snapshot(&request.snapshot).await { + Ok(success) => (StatusCode::OK, Json(json!({ "success": success }))).into_response(), + Err(e) => error_response(e), + } +} + +async fn table_token_stub(Path((_db, _table)): Path<(String, String)>) -> Response { + let body = ErrorResponse::new( + None, + None, + Some( + "Data token is not supported by paimon-rest-server; \ + set data-token.enabled=false (the default)." + .to_string(), + ), + Some(StatusCode::NOT_IMPLEMENTED.as_u16() as i32), + ); + (StatusCode::NOT_IMPLEMENTED, Json(body)).into_response() +} + +fn empty_audit() -> AuditRESTResponse { + AuditRESTResponse::new(None, None, None, None, None) +} diff --git a/crates/paimon-rest-server/src/main.rs b/crates/paimon-rest-server/src/main.rs new file mode 100644 index 00000000..8a4cbfeb --- /dev/null +++ b/crates/paimon-rest-server/src/main.rs @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Standalone Paimon REST catalog server backed by a local filesystem warehouse. +//! +//! # Usage +//! ```bash +//! REST_WAREHOUSE=/tmp/paimon-warehouse REST_HOST=127.0.0.1 REST_PORT=8080 \ +//! cargo run -p paimon-rest-server +//! ``` +//! +//! Then point a client (e.g. the `rest_local_smoke` example, or Java) at it: +//! ```bash +//! REST_URI=http://localhost:8080 REST_WAREHOUSE=/tmp/paimon-warehouse \ +//! cargo run -p paimon --example rest_local_smoke +//! ``` + +use paimon_rest_server::{BoxError, FsRestCatalogServer}; + +fn env_or(key: &str, default: &str) -> String { + std::env::var(key).unwrap_or_else(|_| default.to_string()) +} + +#[tokio::main] +async fn main() -> Result<(), BoxError> { + let warehouse = env_or("REST_WAREHOUSE", "/tmp/paimon-warehouse"); + let host = env_or("REST_HOST", "127.0.0.1"); + let port: u16 = env_or("REST_PORT", "8080") + .parse() + .map_err(|e| format!("invalid REST_PORT: {e}"))?; + let prefix = env_or("REST_PREFIX", ""); + + // Ensure the warehouse directory exists so FileIO can list it. + std::fs::create_dir_all(&warehouse)?; + + let server = FsRestCatalogServer::start_on(warehouse.clone(), &prefix, &host, port).await?; + + println!("Paimon REST catalog server (filesystem-backed)"); + println!(" warehouse : {warehouse}"); + println!(" listening : {}", server.url()); + if !prefix.is_empty() { + println!(" prefix : {prefix}"); + } + println!("Press Ctrl-C to stop."); + + tokio::signal::ctrl_c().await?; + println!("\nShutting down."); + Ok(()) +} diff --git a/crates/paimon-rest-server/tests/e2e.rs b/crates/paimon-rest-server/tests/e2e.rs new file mode 100644 index 00000000..51e74ed7 --- /dev/null +++ b/crates/paimon-rest-server/tests/e2e.rs @@ -0,0 +1,301 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end tests for the FileSystemCatalog-backed REST catalog server. +//! +//! Each test spins up a real [`FsRestCatalogServer`] over a temporary warehouse +//! and drives it through the client-side [`RESTCatalog`], covering metadata CRUD +//! and a full append write + commit + read-back round trip. + +use std::collections::HashMap; +use std::sync::Arc; + +use arrow_array::{Int32Array, RecordBatch, StringArray}; +use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema}; +use futures::TryStreamExt; +use tempfile::TempDir; + +use paimon::catalog::{Catalog, Identifier, RESTCatalog}; +use paimon::common::{CatalogOptions, Options}; +use paimon::spec::{BigIntType, DataType, IntType, Schema, SchemaChange, VarCharType}; + +use paimon_rest_server::FsRestCatalogServer; + +/// Holds the temp warehouse and the running server so they outlive the catalog. +struct TestContext { + _warehouse: TempDir, + _server: FsRestCatalogServer, + catalog: RESTCatalog, +} + +async fn setup() -> TestContext { + let warehouse = TempDir::new().expect("create temp warehouse"); + let warehouse_path = warehouse.path().to_str().unwrap().to_string(); + + let server = FsRestCatalogServer::start(warehouse_path.clone(), "") + .await + .expect("start server"); + + let mut options = Options::new(); + options.set(CatalogOptions::METASTORE, "rest"); + options.set(CatalogOptions::URI, server.url()); + options.set(CatalogOptions::WAREHOUSE, &warehouse_path); + options.set(CatalogOptions::TOKEN_PROVIDER, "bear"); + options.set(CatalogOptions::TOKEN, "dummy-token"); + + let catalog = RESTCatalog::new(options, true) + .await + .expect("create RESTCatalog"); + + TestContext { + _warehouse: warehouse, + _server: server, + catalog, + } +} + +fn append_only_schema() -> Schema { + Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::new(255).unwrap())) + .option("bucket", "1") + .option("bucket-key", "id") + .build() + .expect("build schema") +} + +fn sample_batch() -> RecordBatch { + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("id", ArrowDataType::Int32, true), + ArrowField::new("name", ArrowDataType::Utf8, true), + ])); + RecordBatch::try_new( + arrow_schema, + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["alice", "bob", "carol"])), + ], + ) + .expect("build batch") +} + +// ==================== Database metadata ==================== + +#[tokio::test] +async fn test_database_crud() { + let ctx = setup().await; + let cat = &ctx.catalog; + + // Initially empty. + assert!(cat.list_databases().await.unwrap().is_empty()); + + // Create + list + get. + cat.create_database("db1", false, HashMap::new()) + .await + .unwrap(); + let dbs = cat.list_databases().await.unwrap(); + assert_eq!(dbs, vec!["db1".to_string()]); + let db = cat.get_database("db1").await.unwrap(); + assert_eq!(db.name, "db1"); + + // Duplicate without ignore_if_exists -> error; with ignore -> ok. + assert!(cat + .create_database("db1", false, HashMap::new()) + .await + .is_err()); + cat.create_database("db1", true, HashMap::new()) + .await + .unwrap(); + + // Get missing -> error. + assert!(cat.get_database("nope").await.is_err()); + + // Drop + verify; ignore_if_not_exists semantics. + cat.drop_database("db1", false, false).await.unwrap(); + assert!(cat.list_databases().await.unwrap().is_empty()); + assert!(cat.drop_database("db1", false, false).await.is_err()); + cat.drop_database("db1", true, false).await.unwrap(); +} + +// ==================== Table metadata ==================== + +#[tokio::test] +async fn test_table_crud_and_rename() { + let ctx = setup().await; + let cat = &ctx.catalog; + cat.create_database("db", false, HashMap::new()) + .await + .unwrap(); + + let users = Identifier::new("db", "users"); + cat.create_table(&users, append_only_schema(), false) + .await + .unwrap(); + assert_eq!( + cat.list_tables("db").await.unwrap(), + vec!["users".to_string()] + ); + + // Duplicate create errors unless ignored. + assert!(cat + .create_table(&users, append_only_schema(), false) + .await + .is_err()); + cat.create_table(&users, append_only_schema(), true) + .await + .unwrap(); + + // get_table returns a usable table with the right schema/location. + let table = cat.get_table(&users).await.unwrap(); + assert_eq!(table.schema().fields().len(), 2); + assert!(table.location().ends_with("db.db/users")); + + // Rename round trip. + let renamed = Identifier::new("db", "users_renamed"); + cat.rename_table(&users, &renamed, false).await.unwrap(); + assert_eq!( + cat.list_tables("db").await.unwrap(), + vec!["users_renamed".to_string()] + ); + assert!(cat.get_table(&users).await.is_err()); + cat.rename_table(&renamed, &users, false).await.unwrap(); + + // Drop + missing semantics. + cat.drop_table(&users, false).await.unwrap(); + assert!(cat.list_tables("db").await.unwrap().is_empty()); + assert!(cat.drop_table(&users, false).await.is_err()); + cat.drop_table(&users, true).await.unwrap(); +} + +#[tokio::test] +async fn test_get_table_missing() { + let ctx = setup().await; + let cat = &ctx.catalog; + cat.create_database("db", false, HashMap::new()) + .await + .unwrap(); + assert!(cat + .get_table(&Identifier::new("db", "ghost")) + .await + .is_err()); +} + +// ==================== Full write + commit + read ==================== + +#[tokio::test] +async fn test_write_commit_read_roundtrip() { + let ctx = setup().await; + let cat = &ctx.catalog; + + cat.create_database("smoke_db", false, HashMap::new()) + .await + .unwrap(); + let ident = Identifier::new("smoke_db", "users"); + cat.create_table(&ident, append_only_schema(), false) + .await + .unwrap(); + + // Append write + commit through RESTSnapshotCommit (-> server commit endpoint). + let table = cat.get_table(&ident).await.unwrap(); + let write_builder = table.new_write_builder(); + let mut writer = write_builder.new_write().unwrap(); + writer.write_arrow_batch(&sample_batch()).await.unwrap(); + let messages = writer.prepare_commit().await.unwrap(); + assert!(!messages.is_empty(), "expected at least one commit message"); + write_builder.new_commit().commit(messages).await.unwrap(); + + // Read back. + let table = cat.get_table(&ident).await.unwrap(); + let read_builder = table.new_read_builder(); + let plan = read_builder.new_scan().plan().await.unwrap(); + let read = read_builder.new_read().unwrap(); + let mut stream = read.to_arrow(plan.splits()).unwrap(); + + let mut total = 0usize; + while let Some(batch) = stream.try_next().await.unwrap() { + total += batch.num_rows(); + } + assert_eq!(total, 3, "expected 3 rows read back, got {total}"); +} + +// ==================== alter table over REST ==================== + +#[tokio::test] +async fn test_alter_table_columns() { + let ctx = setup().await; + let cat = &ctx.catalog; + + cat.create_database("db", false, HashMap::new()) + .await + .unwrap(); + let ident = Identifier::new("db", "events"); + cat.create_table(&ident, append_only_schema(), false) + .await + .unwrap(); + + // Apply a batch of column changes through the REST alter_table path. + cat.alter_table( + &ident, + vec![ + SchemaChange::add_column("age".to_string(), DataType::Int(IntType::new())), + SchemaChange::rename_column("name".to_string(), "full_name".to_string()), + SchemaChange::update_column_comment("id".to_string(), "the id".to_string()), + SchemaChange::update_column_type( + "age".to_string(), + DataType::BigInt(BigIntType::new()), + ), + ], + false, + ) + .await + .unwrap(); + + // The server persisted a new schema version; get_table reflects it. + let table = cat.get_table(&ident).await.unwrap(); + let schema = table.schema(); + let names: Vec<&str> = schema.fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "full_name", "age"]); + + let id_field = schema.fields().iter().find(|f| f.name() == "id").unwrap(); + assert_eq!(id_field.description(), Some("the id")); + let age_field = schema.fields().iter().find(|f| f.name() == "age").unwrap(); + assert!(matches!(age_field.data_type(), DataType::BigInt(_))); + + // alter on a missing table: ignored vs. error. + let missing = Identifier::new("db", "nope"); + cat.alter_table( + &missing, + vec![SchemaChange::update_column_comment( + "id".to_string(), + "x".to_string(), + )], + true, + ) + .await + .unwrap(); + assert!(cat + .alter_table( + &missing, + vec![SchemaChange::update_column_comment( + "id".to_string(), + "x".to_string(), + )], + false, + ) + .await + .is_err()); +} diff --git a/crates/paimon/src/api/api_request.rs b/crates/paimon/src/api/api_request.rs index 33a52fd7..28e4bacf 100644 --- a/crates/paimon/src/api/api_request.rs +++ b/crates/paimon/src/api/api_request.rs @@ -22,7 +22,10 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use crate::{catalog::Identifier, spec::Schema}; +use crate::{ + catalog::Identifier, + spec::{Schema, SchemaChange}, +}; /// Request to create a new database. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -95,6 +98,23 @@ impl CreateTableRequest { } } +/// Request to alter a table's schema. +/// +/// Wire-compatible with Java Paimon's `AlterTableRequest` (`{"changes": [...]}`). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AlterTableRequest { + /// The ordered list of schema changes to apply. + pub changes: Vec, +} + +impl AlterTableRequest { + /// Create a new AlterTableRequest. + pub fn new(changes: Vec) -> Self { + Self { changes } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/paimon/src/api/mod.rs b/crates/paimon/src/api/mod.rs index 8584cb08..7672b9a7 100644 --- a/crates/paimon/src/api/mod.rs +++ b/crates/paimon/src/api/mod.rs @@ -31,7 +31,8 @@ mod api_response; // Re-export request types pub use api_request::{ - AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest, + AlterDatabaseRequest, AlterTableRequest, CreateDatabaseRequest, CreateTableRequest, + RenameTableRequest, }; // Re-export response types diff --git a/crates/paimon/src/api/rest_api.rs b/crates/paimon/src/api/rest_api.rs index 5333fcae..8721f48c 100644 --- a/crates/paimon/src/api/rest_api.rs +++ b/crates/paimon/src/api/rest_api.rs @@ -25,11 +25,12 @@ use std::collections::HashMap; use crate::api::rest_client::HttpClient; use crate::catalog::Identifier; use crate::common::{CatalogOptions, Options}; -use crate::spec::{Partition, PartitionStatistics, Schema, Snapshot}; +use crate::spec::{Partition, PartitionStatistics, Schema, SchemaChange, Snapshot}; use crate::Result; use super::api_request::{ - AlterDatabaseRequest, CreateDatabaseRequest, CreateTableRequest, RenameTableRequest, + AlterDatabaseRequest, AlterTableRequest, CreateDatabaseRequest, CreateTableRequest, + RenameTableRequest, }; use super::api_response::{ ConfigResponse, GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, @@ -343,6 +344,21 @@ impl RESTApi { Ok(()) } + /// Alter a table's schema by applying a list of schema changes. + pub async fn alter_table( + &self, + identifier: &Identifier, + changes: Vec, + ) -> Result<()> { + let database = identifier.database(); + let table = identifier.object(); + validate_non_empty_multi(&[(database, "database name"), (table, "table name")])?; + let path = self.resource_paths.table(database, table); + let request = AlterTableRequest::new(changes); + let _resp: serde_json::Value = self.client.post(&path, &request).await?; + Ok(()) + } + /// Get table information. pub async fn get_table(&self, identifier: &Identifier) -> Result { let database = identifier.database(); diff --git a/crates/paimon/src/catalog/filesystem.rs b/crates/paimon/src/catalog/filesystem.rs index 7f18f952..7a1e8dc3 100644 --- a/crates/paimon/src/catalog/filesystem.rs +++ b/crates/paimon/src/catalog/filesystem.rs @@ -439,11 +439,29 @@ impl Catalog for FileSystemCatalog { full_name: identifier.full_name(), })?; - let new_schema = current.apply_changes(changes)?; + let new_schema = current + .apply_changes(changes) + .map_err(|e| fill_table_name(e, identifier))?; self.save_table_schema(&table_path, &new_schema).await } } +/// `TableSchema::apply_changes` returns column errors without a table name; +/// fill in the identifier's full name so the message identifies the table. +fn fill_table_name(err: Error, identifier: &Identifier) -> Error { + match err { + Error::ColumnNotExist { column, .. } => Error::ColumnNotExist { + full_name: identifier.full_name(), + column, + }, + Error::ColumnAlreadyExist { column, .. } => Error::ColumnAlreadyExist { + full_name: identifier.full_name(), + column, + }, + other => other, + } +} + #[cfg(test)] #[cfg(not(windows))] // Skip on Windows due to path compatibility issues mod tests { @@ -728,4 +746,201 @@ mod tests { ); } } + + use crate::spec::{ColumnMove, DataType, IntType, SchemaChange, VarCharType}; + + /// Two-column table (id INT, name VARCHAR) used by the alter-table tests. + fn two_column_schema() -> Schema { + Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .build() + .unwrap() + } + + async fn setup_table(catalog: &FileSystemCatalog, schema: Schema) -> Identifier { + catalog + .create_database("db", false, HashMap::new()) + .await + .unwrap(); + let id = Identifier::new("db", "t"); + catalog.create_table(&id, schema, false).await.unwrap(); + id + } + + #[tokio::test] + async fn test_alter_table_column_changes() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Add a column at the end; it must take highest_field_id + 1. + catalog + .alter_table( + &id, + vec![SchemaChange::add_column( + "age".to_string(), + DataType::Int(IntType::new()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + let names: Vec<&str> = ts.fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "name", "age"]); + let age = ts.fields().iter().find(|f| f.name() == "age").unwrap(); + assert_eq!(age.id(), 2, "new column gets highest_field_id + 1"); + assert_eq!(ts.id(), 1, "schema version bumped"); + + // Add a column moved to the front. + catalog + .alter_table( + &id, + vec![SchemaChange::add_column_with_description_and_column_move( + "rowkey".to_string(), + DataType::Int(IntType::new()), + "primary".to_string(), + ColumnMove::move_first("rowkey".to_string()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + assert_eq!(ts.fields()[0].name(), "rowkey"); + assert_eq!(ts.fields()[0].description(), Some("primary")); + + // Rename, update comment, update type, update nullability, drop. + catalog + .alter_table( + &id, + vec![ + SchemaChange::rename_column("name".to_string(), "full_name".to_string()), + SchemaChange::update_column_comment("id".to_string(), "the id".to_string()), + SchemaChange::update_column_type( + "age".to_string(), + DataType::BigInt(crate::spec::BigIntType::new()), + ), + SchemaChange::update_column_nullability("id".to_string(), false), + SchemaChange::drop_column("rowkey".to_string()), + ], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let ts = ts.schema(); + let names: Vec<&str> = ts.fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["id", "full_name", "age"]); + let id_field = ts.fields().iter().find(|f| f.name() == "id").unwrap(); + assert_eq!(id_field.description(), Some("the id")); + assert!(!id_field.data_type().is_nullable()); + let age_field = ts.fields().iter().find(|f| f.name() == "age").unwrap(); + assert!(matches!(age_field.data_type(), DataType::BigInt(_))); + } + + #[tokio::test] + async fn test_alter_table_reposition_column() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Move `name` before `id`. + catalog + .alter_table( + &id, + vec![SchemaChange::update_column_position( + ColumnMove::move_first("name".to_string()), + )], + false, + ) + .await + .unwrap(); + let ts = catalog.get_table(&id).await.unwrap(); + let names: Vec<&str> = ts.schema().fields().iter().map(|f| f.name()).collect(); + assert_eq!(names, vec!["name", "id"]); + } + + #[tokio::test] + async fn test_alter_table_errors() { + let (_tmp, catalog) = create_test_catalog(); + let id = setup_table(&catalog, two_column_schema()).await; + + // Add a duplicate column -> ColumnAlreadyExist. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::add_column( + "name".to_string(), + DataType::Int(IntType::new()), + )], + false, + ) + .await + .unwrap_err(); + assert!( + matches!(err, Error::ColumnAlreadyExist { .. }), + "got {err:?}" + ); + + // Drop a missing column -> ColumnNotExist. + let err = catalog + .alter_table( + &id, + vec![SchemaChange::drop_column("ghost".to_string())], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::ColumnNotExist { .. }), "got {err:?}"); + + // Altering a missing table: ignored vs error. + let missing = Identifier::new("db", "nope"); + catalog + .alter_table( + &missing, + vec![SchemaChange::update_column_comment( + "id".to_string(), + "x".to_string(), + )], + true, + ) + .await + .unwrap(); + let err = catalog + .alter_table( + &missing, + vec![SchemaChange::update_column_comment( + "id".to_string(), + "x".to_string(), + )], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::TableNotExist { .. }), "got {err:?}"); + } + + #[tokio::test] + async fn test_alter_table_drop_primary_key_column_rejected() { + let (_tmp, catalog) = create_test_catalog(); + let schema = Schema::builder() + .column("id", DataType::Int(IntType::new())) + .column("name", DataType::VarChar(VarCharType::string_type())) + .primary_key(["id"]) + .build() + .unwrap(); + let id = setup_table(&catalog, schema).await; + + let err = catalog + .alter_table( + &id, + vec![SchemaChange::drop_column("id".to_string())], + false, + ) + .await + .unwrap_err(); + assert!(matches!(err, Error::Unsupported { .. }), "got {err:?}"); + } } diff --git a/crates/paimon/src/catalog/rest/rest_catalog.rs b/crates/paimon/src/catalog/rest/rest_catalog.rs index 09b93547..5ff5d6b0 100644 --- a/crates/paimon/src/catalog/rest/rest_catalog.rs +++ b/crates/paimon/src/catalog/rest/rest_catalog.rs @@ -330,13 +330,17 @@ impl Catalog for RESTCatalog { async fn alter_table( &self, - _identifier: &Identifier, - _changes: Vec, - _ignore_if_not_exists: bool, + identifier: &Identifier, + changes: Vec, + ignore_if_not_exists: bool, ) -> Result<()> { - // TODO: Implement alter_table when RESTApi supports it - Err(Error::Unsupported { - message: "Alter table is not yet implemented for REST catalog".to_string(), + let result = self + .api + .alter_table(identifier, changes) + .await + .map_err(|e| map_rest_error_for_table(e, identifier)); + ignore_error_if(result, |e| { + ignore_if_not_exists && matches!(e, Error::TableNotExist { .. }) }) } diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs index 76d09b4c..4b51b061 100644 --- a/crates/paimon/src/spec/schema.rs +++ b/crates/paimon/src/spec/schema.rs @@ -17,7 +17,7 @@ use crate::spec::core_options::{first_row_supports_changelog_producer, CoreOptions}; use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType}; -use crate::spec::PartialUpdateConfig; +use crate::spec::{ColumnMove, ColumnMoveType, PartialUpdateConfig}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use std::collections::{HashMap, HashSet}; @@ -127,27 +127,153 @@ impl TableSchema { } /// Apply a list of schema changes and return a new schema with incremented ID. + /// + /// Column-level changes operate on **top-level** columns only: a + /// `field_names` path with more than one element (a nested struct field) is + /// rejected with [`crate::Error::Unsupported`]. + /// + /// Column errors ([`crate::Error::ColumnNotExist`] / + /// [`crate::Error::ColumnAlreadyExist`]) are returned with an empty table + /// name; the calling catalog fills in the table's full name. pub fn apply_changes(&self, changes: Vec) -> crate::Result { + use crate::spec::SchemaChange; + + // Column errors carry no table name here; the catalog layer fills it in. + let full_name = ""; + let mut new_schema = self.clone(); new_schema.id += 1; new_schema.time_millis = chrono::Utc::now().timestamp_millis(); + // Operate on an owned field list, then write it back. + let mut fields = std::mem::take(&mut new_schema.fields); + let mut highest_field_id = new_schema.highest_field_id; + for change in changes { match change { - crate::spec::SchemaChange::SetOption { key, value } => { + SchemaChange::SetOption { key, value } => { new_schema.options.insert(key, value); } - crate::spec::SchemaChange::RemoveOption { key } => { + SchemaChange::RemoveOption { key } => { new_schema.options.remove(&key); } - other => { - return Err(crate::Error::Unsupported { - message: format!("Schema change not yet supported: {other:?}"), - }); + SchemaChange::UpdateComment { comment } => { + new_schema.comment = comment; + } + SchemaChange::AddColumn { + field_names, + data_type, + comment, + column_move, + } => { + let name = top_level_field(&field_names)?; + if field_index(&fields, name).is_some() { + return Err(crate::Error::ColumnAlreadyExist { + full_name: full_name.to_string(), + column: name.to_string(), + }); + } + highest_field_id += 1; + let field = DataField::new(highest_field_id, name.to_string(), data_type) + .with_description(comment); + insert_field_with_move(&mut fields, field, column_move.as_ref(), full_name)?; + } + SchemaChange::RenameColumn { + field_names, + new_name, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + if new_name != name && field_index(&fields, &new_name).is_some() { + return Err(crate::Error::ColumnAlreadyExist { + full_name: full_name.to_string(), + column: new_name, + }); + } + fields[idx] = fields[idx].clone().with_name(new_name.clone()); + rename_in_keys(&mut new_schema.partition_keys, name, &new_name); + rename_in_keys(&mut new_schema.primary_keys, name, &new_name); + } + SchemaChange::DropColumn { field_names } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + if new_schema.partition_keys.iter().any(|k| k == name) + || new_schema.primary_keys.iter().any(|k| k == name) + { + return Err(crate::Error::Unsupported { + message: format!( + "Cannot drop partition or primary key column '{name}' of table {full_name}" + ), + }); + } + fields.remove(idx); + } + SchemaChange::UpdateColumnType { + field_names, + new_data_type, + keep_nullability, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + let old = &fields[idx]; + // Lenient: replace the type without cast-compatibility checks. + let target = if keep_nullability { + new_data_type.copy_with_nullable(old.data_type().is_nullable())? + } else { + new_data_type + }; + fields[idx] = DataField::new(old.id(), old.name().to_string(), target) + .with_description(old.description().map(|s| s.to_string())); + } + SchemaChange::UpdateColumnNullability { + field_names, + new_nullability, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + let old = &fields[idx]; + let nt = old.data_type().copy_with_nullable(new_nullability)?; + fields[idx] = DataField::new(old.id(), old.name().to_string(), nt) + .with_description(old.description().map(|s| s.to_string())); + } + SchemaChange::UpdateColumnComment { + field_names, + new_comment, + } => { + let name = top_level_field(&field_names)?; + let idx = + field_index(&fields, name).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: name.to_string(), + })?; + fields[idx] = fields[idx].clone().with_description(Some(new_comment)); + } + SchemaChange::UpdateColumnPosition { column_move } => { + apply_move(&mut fields, &column_move, full_name)?; } } } + new_schema.fields = fields; + new_schema.highest_field_id = + highest_field_id.max(Self::current_highest_field_id(&new_schema.fields)); + Schema::validate_first_row_changelog_producer(&new_schema.options)?; Ok(new_schema) } @@ -180,6 +306,107 @@ impl TableSchema { } } +/// Extract the single top-level column name from a `field_names` path. +/// +/// Nested struct field paths (length > 1) are not yet supported. +fn top_level_field(field_names: &[String]) -> crate::Result<&str> { + match field_names { + [name] => Ok(name.as_str()), + [] => Err(crate::Error::ConfigInvalid { + message: "Schema change has empty fieldNames".to_string(), + }), + _ => Err(crate::Error::Unsupported { + message: format!("Altering nested struct fields is not supported yet: {field_names:?}"), + }), + } +} + +/// Index of the field with the given name, if any. +fn field_index(fields: &[DataField], name: &str) -> Option { + fields.iter().position(|f| f.name() == name) +} + +/// Rename a key in a partition/primary key list, if present. +fn rename_in_keys(keys: &mut [String], old: &str, new: &str) { + for key in keys.iter_mut() { + if key == old { + *key = new.to_string(); + } + } +} + +/// Insert a brand-new field according to an optional move (used by `AddColumn`). +fn insert_field_with_move( + fields: &mut Vec, + field: DataField, + column_move: Option<&ColumnMove>, + full_name: &str, +) -> crate::Result<()> { + let Some(mv) = column_move else { + fields.push(field); + return Ok(()); + }; + match mv.move_type() { + ColumnMoveType::FIRST => fields.insert(0, field), + ColumnMoveType::LAST => fields.push(field), + ColumnMoveType::AFTER | ColumnMoveType::BEFORE => { + let reference = move_reference(mv)?; + let ref_idx = + field_index(fields, reference).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: reference.to_string(), + })?; + let at = match mv.move_type() { + ColumnMoveType::AFTER => ref_idx + 1, + _ => ref_idx, + }; + fields.insert(at, field); + } + } + Ok(()) +} + +/// Move an existing field to a new position (used by `UpdateColumnPosition`). +/// +/// Mirrors Java `SchemaManager.applyMove`: remove the field first, then resolve +/// the reference index in the reduced list so the offset is already adjusted. +fn apply_move(fields: &mut Vec, mv: &ColumnMove, full_name: &str) -> crate::Result<()> { + let idx = field_index(fields, mv.field_name()).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: mv.field_name().to_string(), + })?; + let field = fields.remove(idx); + match mv.move_type() { + ColumnMoveType::FIRST => fields.insert(0, field), + ColumnMoveType::LAST => fields.push(field), + ColumnMoveType::AFTER | ColumnMoveType::BEFORE => { + let reference = move_reference(mv)?; + let ref_idx = + field_index(fields, reference).ok_or_else(|| crate::Error::ColumnNotExist { + full_name: full_name.to_string(), + column: reference.to_string(), + })?; + let at = match mv.move_type() { + ColumnMoveType::AFTER => ref_idx + 1, + _ => ref_idx, + }; + fields.insert(at, field); + } + } + Ok(()) +} + +/// The reference (anchor) field name required by `AFTER`/`BEFORE` moves. +fn move_reference(mv: &ColumnMove) -> crate::Result<&str> { + mv.reference_field_name() + .ok_or_else(|| crate::Error::ConfigInvalid { + message: format!( + "Move of type {:?} requires a reference field name", + mv.move_type() + ), + }) +} + pub const ROW_ID_FIELD_NAME: &str = "_ROW_ID"; pub const ROW_ID_FIELD_ID: i32 = i32::MAX - 5; diff --git a/crates/paimon/src/spec/schema_change.rs b/crates/paimon/src/spec/schema_change.rs index 6a11c0a4..9f69400b 100644 --- a/crates/paimon/src/spec/schema_change.rs +++ b/crates/paimon/src/spec/schema_change.rs @@ -20,77 +20,71 @@ use serde::{Deserialize, Serialize}; /// Schema change to table. /// -/// Reference: -#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] +/// The JSON wire format is kept compatible with Java Paimon's `SchemaChange`, +/// which is an internally-tagged polymorphic type (`@JsonTypeInfo` with an +/// `"action"` discriminator). Each variant therefore serializes as +/// `{"action": "", ...fields}` with `fieldNames` arrays (a column path; +/// only top-level single-element paths are currently applied — see +/// `TableSchema::apply_changes`). +/// +/// Reference: +// +// Note: `dropPrimaryKey` and `updateColumnDefaultValue` from Java are not yet +// modeled here; they are out of scope for the current alter-table support. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(tag = "action", rename_all = "camelCase")] pub enum SchemaChange { /// A SchemaChange to set a table option. - /// - /// Reference: SetOption { key: String, value: String }, /// A SchemaChange to remove a table option. - /// - /// Reference: RemoveOption { key: String }, /// A SchemaChange to update a table comment. - /// - /// Reference: UpdateComment { comment: Option }, /// A SchemaChange to add a new field. - /// - /// Reference: #[serde(rename_all = "camelCase")] AddColumn { - field_name: String, + field_names: Vec, data_type: DataType, - description: Option, + comment: Option, #[serde(rename = "move")] column_move: Option, }, /// A SchemaChange to rename a field. - /// - /// Reference: #[serde(rename_all = "camelCase")] RenameColumn { - field_name: String, + field_names: Vec, new_name: String, }, /// A SchemaChange to drop a field. - /// - /// Reference: #[serde(rename_all = "camelCase")] - DropColumn { field_name: String }, + DropColumn { field_names: Vec }, /// A SchemaChange to update the field's type. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnType { - field_name: String, - data_type: DataType, - }, - /// A SchemaChange to update the field's position. - /// - /// Reference: - #[serde(rename_all = "camelCase")] - UpdateColumnPosition { - #[serde(rename = "move")] - column_move: ColumnMove, + field_names: Vec, + new_data_type: DataType, + /// When true, keep the existing column's nullability instead of taking + /// it from `new_data_type`. + #[serde(default)] + keep_nullability: bool, }, /// A SchemaChange to update the field's nullability. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnNullability { - field_name: Vec, - nullable: bool, + field_names: Vec, + new_nullability: bool, }, /// A SchemaChange to update the (nested) field's comment. - /// - /// Reference: #[serde(rename_all = "camelCase")] UpdateColumnComment { field_names: Vec, - new_description: String, + new_comment: String, + }, + /// A SchemaChange to update the field's position. + #[serde(rename_all = "camelCase")] + UpdateColumnPosition { + #[serde(rename = "move")] + column_move: ColumnMove, }, } @@ -113,9 +107,9 @@ impl SchemaChange { /// impl the `add_column`. pub fn add_column(field_name: String, data_type: DataType) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: None, + comment: None, column_move: None, } } @@ -127,9 +121,9 @@ impl SchemaChange { description: String, ) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: Some(description), + comment: Some(description), column_move: None, } } @@ -142,9 +136,9 @@ impl SchemaChange { column_move: ColumnMove, ) -> Self { SchemaChange::AddColumn { - field_name, + field_names: vec![field_name], data_type, - description: Some(description), + comment: Some(description), column_move: Some(column_move), } } @@ -152,21 +146,24 @@ impl SchemaChange { /// impl the `rename_column`. pub fn rename_column(field_name: String, new_name: String) -> Self { SchemaChange::RenameColumn { - field_name, + field_names: vec![field_name], new_name, } } /// impl the `drop_column`. pub fn drop_column(field_name: String) -> Self { - SchemaChange::DropColumn { field_name } + SchemaChange::DropColumn { + field_names: vec![field_name], + } } /// impl the `update_column_type`. pub fn update_column_type(field_name: String, new_data_type: DataType) -> Self { SchemaChange::UpdateColumnType { - field_name, - data_type: new_data_type, + field_names: vec![field_name], + new_data_type, + keep_nullability: false, } } @@ -175,19 +172,19 @@ impl SchemaChange { SchemaChange::UpdateColumnPosition { column_move } } - /// impl the `update_column_position`. + /// impl the `update_column_nullability`. pub fn update_column_nullability(field_name: String, new_nullability: bool) -> Self { SchemaChange::UpdateColumnNullability { - field_name: vec![field_name], - nullable: new_nullability, + field_names: vec![field_name], + new_nullability, } } /// impl the `update_columns_nullability`. pub fn update_columns_nullability(field_names: Vec, new_nullability: bool) -> Self { SchemaChange::UpdateColumnNullability { - field_name: field_names, - nullable: new_nullability, + field_names, + new_nullability, } } @@ -195,7 +192,7 @@ impl SchemaChange { pub fn update_column_comment(field_name: String, comment: String) -> Self { SchemaChange::UpdateColumnComment { field_names: vec![field_name], - new_description: comment, + new_comment: comment, } } @@ -203,28 +200,32 @@ impl SchemaChange { pub fn update_columns_comment(field_names: Vec, comment: String) -> Self { SchemaChange::UpdateColumnComment { field_names, - new_description: comment, + new_comment: comment, } } } /// The type of move. /// -/// Reference: +/// Reference: #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] pub enum ColumnMoveType { FIRST, AFTER, + BEFORE, + LAST, } /// Represents a requested column move in a struct. /// -/// Reference: +/// Reference: #[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] pub struct ColumnMove { pub field_name: String, - pub referenced_field_name: Option, + /// The anchor column for `AFTER`/`BEFORE` moves (`None` for `FIRST`/`LAST`). + /// Named `referenceFieldName` on the wire to match Java Paimon. + pub reference_field_name: Option, #[serde(rename = "type")] pub move_type: ColumnMoveType, } @@ -235,9 +236,9 @@ impl ColumnMove { &self.field_name } - /// Get the referenced field name. - pub fn referenced_field_name(&self) -> Option<&str> { - self.referenced_field_name.as_deref() + /// Get the reference field name. + pub fn reference_field_name(&self) -> Option<&str> { + self.reference_field_name.as_deref() } /// Get the move type. @@ -249,19 +250,37 @@ impl ColumnMove { pub fn move_first(field_name: String) -> Self { ColumnMove { field_name, - referenced_field_name: None, + reference_field_name: None, move_type: ColumnMoveType::FIRST, } } + /// Create a new `Move` with `LAST` move type. + pub fn move_last(field_name: String) -> Self { + ColumnMove { + field_name, + reference_field_name: None, + move_type: ColumnMoveType::LAST, + } + } + /// Create a new `Move` with `AFTER` move type. - pub fn move_after(field_name: String, referenced_field_name: String) -> Self { + pub fn move_after(field_name: String, reference_field_name: String) -> Self { ColumnMove { field_name, - referenced_field_name: Some(referenced_field_name), + reference_field_name: Some(reference_field_name), move_type: ColumnMoveType::AFTER, } } + + /// Create a new `Move` with `BEFORE` move type. + pub fn move_before(field_name: String, reference_field_name: String) -> Self { + ColumnMove { + field_name, + reference_field_name: Some(reference_field_name), + move_type: ColumnMoveType::BEFORE, + } + } } #[cfg(test)] @@ -271,79 +290,66 @@ mod tests { #[test] fn test_schema_change_serialize_deserialize() { + // Java-compatible wire format: internally tagged by "action", with + // `fieldNames` arrays and `referenceFieldName` move anchors. let json_data = r#" [ { - "setOption": { - "key": "snapshot.time-retained", - "value": "2h" - } + "action": "setOption", + "key": "snapshot.time-retained", + "value": "2h" }, { - "removeOption": { - "key": "compaction.max.file-num" - } + "action": "removeOption", + "key": "compaction.max.file-num" }, { - "updateComment": { - "comment": "table.comment" - } + "action": "updateComment", + "comment": "table.comment" }, { - "addColumn": { + "action": "addColumn", + "fieldNames": ["col1"], + "dataType": "INT", + "comment": "col1_description", + "move": { "fieldName": "col1", - "dataType": "INT", - "description": "col1_description", - "move": { - "fieldName": "col1_first", - "referencedFieldName": null, - "type": "FIRST" - } + "referenceFieldName": null, + "type": "FIRST" } }, { - "renameColumn": { - "fieldName": "col3", - "newName": "col3_new_name" - } + "action": "renameColumn", + "fieldNames": ["col3"], + "newName": "col3_new_name" }, { - "dropColumn": { - "fieldName": "col1" - } + "action": "dropColumn", + "fieldNames": ["col1"] }, { - "updateColumnType": { - "fieldName": "col14", - "dataType": "DOUBLE" - } + "action": "updateColumnType", + "fieldNames": ["col14"], + "newDataType": "DOUBLE", + "keepNullability": false }, { - "updateColumnPosition": { - "move": { - "fieldName": "col4_first", - "referencedFieldName": null, - "type": "FIRST" - } + "action": "updateColumnPosition", + "move": { + "fieldName": "col4", + "referenceFieldName": "col3", + "type": "AFTER" } }, { - "updateColumnNullability": { - "fieldName": [ - "col5", - "f2" - ], - "nullable": false - } + "action": "updateColumnNullability", + "fieldNames": ["col5", "f2"], + "newNullability": false }, { - "updateColumnComment": { - "fieldNames": [ - "col5", - "f1" - ], - "newDescription": "col5 f1 field" - } + "action": "updateColumnComment", + "fieldNames": ["col5", "f1"], + "newComment": "col5 f1 field" } ]"#; @@ -364,57 +370,63 @@ mod tests { comment: Some("table.comment".to_string()), }, SchemaChange::AddColumn { - field_name: "col1".to_string(), + field_names: vec!["col1".to_string()], data_type: DataType::Int(IntType::new()), - description: Some("col1_description".to_string()), - column_move: Some(ColumnMove { - field_name: "col1_first".to_string(), - referenced_field_name: None, - move_type: ColumnMoveType::FIRST, - }), + comment: Some("col1_description".to_string()), + column_move: Some(ColumnMove::move_first("col1".to_string())), }, SchemaChange::RenameColumn { - field_name: "col3".to_string(), + field_names: vec!["col3".to_string()], new_name: "col3_new_name".to_string(), }, SchemaChange::DropColumn { - field_name: "col1".to_string(), + field_names: vec!["col1".to_string()], }, SchemaChange::UpdateColumnType { - field_name: "col14".to_string(), - data_type: DataType::Double(DoubleType::new()), + field_names: vec!["col14".to_string()], + new_data_type: DataType::Double(DoubleType::new()), + keep_nullability: false, }, SchemaChange::UpdateColumnPosition { - column_move: ColumnMove { - field_name: "col4_first".to_string(), - referenced_field_name: None, - move_type: ColumnMoveType::FIRST, - }, + column_move: ColumnMove::move_after("col4".to_string(), "col3".to_string()), }, SchemaChange::UpdateColumnNullability { - field_name: vec!["col5".to_string(), "f2".to_string()], - nullable: false, + field_names: vec!["col5".to_string(), "f2".to_string()], + new_nullability: false, }, SchemaChange::UpdateColumnComment { field_names: vec!["col5".to_string(), "f1".to_string()], - new_description: "col5 f1 field".to_string(), + new_comment: "col5 f1 field".to_string(), }, ] ); } + #[test] + fn test_schema_change_serialize_shape() { + // Verify the serialized JSON carries the Java "action" discriminator. + let change = SchemaChange::add_column("c".to_string(), DataType::Int(IntType::new())); + let value = serde_json::to_value(&change).unwrap(); + assert_eq!(value["action"], "addColumn"); + assert_eq!(value["fieldNames"][0], "c"); + + // Round-trip through JSON. + let round: SchemaChange = serde_json::from_value(value).unwrap(); + assert_eq!(round, change); + } + #[test] fn test_column_move_serialize_deserialize() { let json_data = r#" [ { "fieldName": "col1", - "referencedFieldName": null, + "referenceFieldName": null, "type": "FIRST" }, { "fieldName": "col2_after", - "referencedFieldName": "col2", + "referenceFieldName": "col2", "type": "AFTER" } ]"#; diff --git a/crates/paimon/tests/mock_server.rs b/crates/paimon/tests/mock_server.rs index 69abf233..6a9f38d0 100644 --- a/crates/paimon/tests/mock_server.rs +++ b/crates/paimon/tests/mock_server.rs @@ -34,8 +34,9 @@ use std::sync::{Arc, Mutex}; use tokio::task::JoinHandle; use paimon::api::{ - AlterDatabaseRequest, AuditRESTResponse, ConfigResponse, ErrorResponse, GetDatabaseResponse, - GetTableResponse, ListDatabasesResponse, ListTablesResponse, RenameTableRequest, ResourcePaths, + AlterDatabaseRequest, AlterTableRequest, AuditRESTResponse, ConfigResponse, ErrorResponse, + GetDatabaseResponse, GetTableResponse, ListDatabasesResponse, ListTablesResponse, + RenameTableRequest, ResourcePaths, }; #[derive(Clone, Debug, Default)] @@ -451,6 +452,40 @@ impl RESTServer { } } + /// Handle POST /databases/:db/tables/:table - alter a table. + /// + /// The mock does not mutate the stored schema; it only validates that the + /// table exists, which is enough to exercise the client's alter-table path + /// (request serialization + 2xx handling). + pub async fn alter_table( + Path((db, table)): Path<(String, String)>, + Extension(state): Extension>, + Json(_request): Json, + ) -> impl IntoResponse { + let s = state.inner.lock().unwrap(); + let key = format!("{db}.{table}"); + if s.no_permission_tables.contains(&key) { + let err = ErrorResponse::new( + Some("table".to_string()), + Some(table), + Some("No Permission".to_string()), + Some(403), + ); + return (StatusCode::FORBIDDEN, Json(err)).into_response(); + } + if s.tables.contains_key(&key) { + (StatusCode::OK, Json(serde_json::json!(""))).into_response() + } else { + let err = ErrorResponse::new( + Some("table".to_string()), + Some(table), + Some("Not Found".to_string()), + Some(404), + ); + (StatusCode::NOT_FOUND, Json(err)).into_response() + } + } + /// Handle POST /rename-table - rename a table. pub async fn rename_table( Extension(state): Extension>, @@ -722,7 +757,9 @@ pub async fn start_mock_server( ) .route( &format!("{prefix}/databases/:db/tables/:table"), - get(RESTServer::get_table).delete(RESTServer::drop_table), + get(RESTServer::get_table) + .post(RESTServer::alter_table) + .delete(RESTServer::drop_table), ) .route( &format!("{prefix}/tables/rename"), diff --git a/crates/paimon/tests/rest_catalog_test.rs b/crates/paimon/tests/rest_catalog_test.rs index 2166b44d..70a86c5c 100644 --- a/crates/paimon/tests/rest_catalog_test.rs +++ b/crates/paimon/tests/rest_catalog_test.rs @@ -25,7 +25,7 @@ use std::collections::HashMap; use paimon::api::ConfigResponse; use paimon::catalog::{Catalog, Identifier, RESTCatalog}; use paimon::common::Options; -use paimon::spec::{BigIntType, DataType, Schema, VarCharType}; +use paimon::spec::{BigIntType, DataType, Schema, SchemaChange, VarCharType}; mod mock_server; use mock_server::{start_mock_server, RESTServer}; @@ -465,17 +465,35 @@ async fn test_catalog_rename_table_ignore_if_not_exists() { // ==================== Alter Table Tests ==================== #[tokio::test] -async fn test_catalog_alter_table_unsupported() { +async fn test_catalog_alter_table() { let ctx = setup_catalog(vec!["default"]).await; let identifier = Identifier::new("default", "some_table"); - - // alter_table should return Unsupported error - let result = ctx.catalog.alter_table(&identifier, vec![], false).await; - assert!( - result.is_err(), - "alter_table should return Unsupported error" - ); + ctx.catalog + .create_table(&identifier, test_schema(), false) + .await + .unwrap(); + + // alter_table on an existing table succeeds (client builds the request and + // POSTs it; the mock accepts it). + let changes = vec![SchemaChange::update_column_comment( + "id".to_string(), + "the id".to_string(), + )]; + let result = ctx.catalog.alter_table(&identifier, changes, false).await; + assert!(result.is_ok(), "alter_table should succeed: {result:?}"); + + // alter_table on a missing table: error, unless ignore_if_not_exists. + let missing = Identifier::new("default", "ghost"); + assert!(ctx + .catalog + .alter_table(&missing, vec![], false) + .await + .is_err()); + ctx.catalog + .alter_table(&missing, vec![], true) + .await + .unwrap(); } // ==================== Multiple Databases Tests ====================