Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/integrations/datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ chrono = "0.4"
datafusion = { workspace = true }
paimon = { path = "../../paimon" }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { workspace = true, features = ["rt", "time", "fs"] }

[dev-dependencies]
arrow-array = { workspace = true }
arrow-schema = { workspace = true }
flate2 = "1"
parquet = { workspace = true }
serde = "1"
serde_json = "1"
tar = "0.4"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
6 changes: 5 additions & 1 deletion crates/integrations/datafusion/src/system_tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ use paimon::table::Table;
use crate::error::to_datafusion_error;

mod options;
mod schemas;

type Builder = fn(Table) -> DFResult<Arc<dyn TableProvider>>;

const TABLES: &[(&str, Builder)] = &[("options", options::build)];
const TABLES: &[(&str, Builder)] = &[("options", options::build), ("schemas", schemas::build)];

/// Parse a Paimon object name into `(base_table, optional system_table_name)`.
///
Expand Down Expand Up @@ -117,6 +118,9 @@ mod tests {
assert!(is_registered("options"));
assert!(is_registered("Options"));
assert!(is_registered("OPTIONS"));
assert!(is_registered("schemas"));
assert!(is_registered("Schemas"));
assert!(is_registered("SCHEMAS"));
assert!(!is_registered("nonsense"));
}

Expand Down
138 changes: 138 additions & 0 deletions crates/integrations/datafusion/src/system_tables/schemas.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Mirrors Java [SchemasTable](https://github.com/apache/paimon/blob/release-1.3/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java).

use std::any::Any;
use std::sync::{Arc, OnceLock};

use async_trait::async_trait;
use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray, TimestampMillisecondArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
use datafusion::catalog::Session;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result as DFResult};
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;
use paimon::table::Table;
use serde::Serialize;

use crate::error::to_datafusion_error;

/// Builder entry point registered in `super::TABLES` for the `$schemas`
/// system table: wraps `table` in a [`SchemasTable`] provider.
pub(super) fn build(table: Table) -> DFResult<Arc<dyn TableProvider>> {
    let provider = SchemasTable { table };
    Ok(Arc::new(provider))
}

/// Arrow schema of the `$schemas` system table, built once and cached in a
/// process-wide `OnceLock`.
///
/// Column layout mirrors the Java `SchemasTable`: a numeric `schema_id`,
/// JSON-encoded `fields` / `partition_keys` / `primary_keys` / `options`,
/// a nullable `comment`, and a millisecond-precision `update_time`.
fn schemas_schema() -> SchemaRef {
    static SCHEMA: OnceLock<SchemaRef> = OnceLock::new();
    Arc::clone(SCHEMA.get_or_init(|| {
        let update_time = Field::new(
            "update_time",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        );
        let columns = vec![
            Field::new("schema_id", DataType::Int64, false),
            Field::new("fields", DataType::Utf8, false),
            Field::new("partition_keys", DataType::Utf8, false),
            Field::new("primary_keys", DataType::Utf8, false),
            Field::new("options", DataType::Utf8, false),
            // `comment` is the only nullable column.
            Field::new("comment", DataType::Utf8, true),
            update_time,
        ];
        Arc::new(Schema::new(columns))
    }))
}

/// Provider for the `$schemas` system table: each row describes one
/// historical schema version of the underlying Paimon table, as listed by
/// its schema manager (see `scan`).
#[derive(Debug)]
struct SchemasTable {
    // Base Paimon table whose schema history is exposed.
    table: Table,
}

#[async_trait]
impl TableProvider for SchemasTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        schemas_schema()
    }

    fn table_type(&self) -> TableType {
        TableType::View
    }

    /// Materializes one row per schema version returned by
    /// `SchemaManager::list_all`, in that same order, then serves the
    /// resulting batch from an in-memory source.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let versions = self
            .table
            .schema_manager()
            .list_all()
            .await
            .map_err(to_datafusion_error)?;

        // Build the seven columns in parallel vectors, one entry per version.
        let rows = versions.len();
        let mut ids: Vec<i64> = Vec::with_capacity(rows);
        let mut fields_col: Vec<String> = Vec::with_capacity(rows);
        let mut partition_col: Vec<String> = Vec::with_capacity(rows);
        let mut primary_col: Vec<String> = Vec::with_capacity(rows);
        let mut options_col: Vec<String> = Vec::with_capacity(rows);
        let mut comment_col: Vec<Option<String>> = Vec::with_capacity(rows);
        let mut time_col: Vec<i64> = Vec::with_capacity(rows);

        for version in &versions {
            ids.push(version.id());
            // Complex values are stored as JSON text, matching the Java
            // SchemasTable representation.
            fields_col.push(to_json(version.fields())?);
            partition_col.push(to_json(version.partition_keys())?);
            primary_col.push(to_json(version.primary_keys())?);
            options_col.push(to_json(version.options())?);
            comment_col.push(version.comment().map(str::to_string));
            time_col.push(version.time_millis());
        }

        let arrow_schema = schemas_schema();
        let batch = RecordBatch::try_new(
            arrow_schema.clone(),
            vec![
                Arc::new(Int64Array::from(ids)),
                Arc::new(StringArray::from(fields_col)),
                Arc::new(StringArray::from(partition_col)),
                Arc::new(StringArray::from(primary_col)),
                Arc::new(StringArray::from(options_col)),
                Arc::new(StringArray::from(comment_col)),
                Arc::new(TimestampMillisecondArray::from(time_col)),
            ],
        )?;

        Ok(MemorySourceConfig::try_new_exec(
            &[vec![batch]],
            arrow_schema,
            projection.cloned(),
        )?)
    }
}

/// Serializes `value` to a JSON string, surfacing any serde failure as a
/// `DataFusionError::External` so it propagates through `scan` with `?`.
fn to_json<T>(value: &T) -> DFResult<String>
where
    T: Serialize + ?Sized,
{
    match serde_json::to_string(value) {
        Ok(json) => Ok(json),
        Err(e) => Err(DataFusionError::External(Box::new(e))),
    }
}
109 changes: 107 additions & 2 deletions crates/integrations/datafusion/tests/system_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
// specific language governing permissions and limitations
// under the License.

//! Paimon `$options` system table end-to-end via DataFusion SQL.
//! Paimon system tables end-to-end via DataFusion SQL.

use std::sync::Arc;

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::array::{Array, Int64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, TimeUnit};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::SessionContext;
use paimon::catalog::Identifier;
Expand Down Expand Up @@ -129,6 +130,110 @@ async fn test_unknown_system_table_name_returns_not_found() {
);
}

/// End-to-end check of the `$schemas` system table via DataFusion SQL:
/// verifies the column layout, the row count against `list_all()`, the
/// ascending `schema_id` order, and that the latest row's JSON columns
/// round-trip to the table's current schema.
#[tokio::test]
async fn test_schemas_system_table() {
    let (ctx, catalog, _tmp) = create_context().await;
    let sql = format!("SELECT * FROM paimon.default.{FIXTURE_TABLE}$schemas");
    let batches = run_sql(&ctx, &sql).await;

    assert!(!batches.is_empty(), "$schemas should return ≥1 batch");

    // 1. Column names and types must match the Java SchemasTable layout.
    let arrow_schema = batches[0].schema();
    let expected_columns = [
        ("schema_id", DataType::Int64),
        ("fields", DataType::Utf8),
        ("partition_keys", DataType::Utf8),
        ("primary_keys", DataType::Utf8),
        ("options", DataType::Utf8),
        ("comment", DataType::Utf8),
        (
            "update_time",
            DataType::Timestamp(TimeUnit::Millisecond, None),
        ),
    ];
    for (i, (name, dtype)) in expected_columns.iter().enumerate() {
        let field = arrow_schema.field(i);
        assert_eq!(field.name(), name, "column {i} name");
        assert_eq!(field.data_type(), dtype, "column {i} type");
    }

    // 2. Row count must equal the number of schema versions on disk.
    let identifier = Identifier::new("default".to_string(), FIXTURE_TABLE.to_string());
    let table = catalog
        .get_table(&identifier)
        .await
        .expect("fixture table should load");
    let all_schemas = table
        .schema_manager()
        .list_all()
        .await
        .expect("list_all should succeed");
    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
    assert_eq!(
        total_rows,
        all_schemas.len(),
        "$schemas rows should match list_all() length"
    );

    // 3. schema_id must come back in ascending order.
    let mut ids: Vec<i64> = Vec::new();
    for batch in &batches {
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("schema_id is Int64");
        for i in 0..batch.num_rows() {
            ids.push(col.value(i));
        }
    }
    let mut sorted = ids.clone();
    sorted.sort_unstable();
    assert_eq!(ids, sorted, "schema_id column should be ascending");

    // 4. The last row's JSON columns must round-trip to the current schema.
    let last_batch = batches.last().unwrap();
    // Guard before `- 1`: an empty trailing batch would otherwise panic
    // with an opaque usize-underflow instead of a test diagnostic.
    assert!(
        last_batch.num_rows() > 0,
        "last $schemas batch should contain at least one row"
    );
    let last_idx = last_batch.num_rows() - 1;
    let latest = table.schema();
    let json_columns: [(usize, &str, String); 4] = [
        (
            1,
            "fields",
            serde_json::to_string(latest.fields()).expect("serialize fields"),
        ),
        (
            2,
            "partition_keys",
            serde_json::to_string(latest.partition_keys()).expect("serialize partition_keys"),
        ),
        (
            3,
            "primary_keys",
            serde_json::to_string(latest.primary_keys()).expect("serialize primary_keys"),
        ),
        (
            4,
            "options",
            serde_json::to_string(latest.options()).expect("serialize options"),
        ),
    ];
    for (col_idx, col_name, expected) in &json_columns {
        let col = last_batch
            .column(*col_idx)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap_or_else(|| panic!("column {col_name} is not Utf8"));
        // Parse both sides before comparing: `options` is a HashMap whose
        // JSON key order is non-deterministic across `HashMap` instances.
        let actual: serde_json::Value = serde_json::from_str(col.value(last_idx))
            .unwrap_or_else(|e| panic!("parse actual `{col_name}`: {e}"));
        let expected: serde_json::Value = serde_json::from_str(expected)
            .unwrap_or_else(|e| panic!("parse expected `{col_name}`: {e}"));
        assert_eq!(
            actual, expected,
            "latest-row `{col_name}` JSON should round-trip"
        );
    }
}

#[tokio::test]
async fn test_missing_base_table_for_system_table_errors() {
let (ctx, _catalog, _tmp) = create_context().await;
Expand Down
Loading
Loading