Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 132 additions & 19 deletions crates/integrations/datafusion/src/sql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ use datafusion::sql::sqlparser::ast::{
use datafusion::sql::sqlparser::dialect::GenericDialect;
use datafusion::sql::sqlparser::parser::Parser;
use paimon::catalog::{Catalog, Identifier};
use paimon::spec::SchemaChange;
use paimon::spec::{
ArrayType as PaimonArrayType, BlobType, DataField as PaimonDataField,
DataType as PaimonDataType, MapType as PaimonMapType, RowType as PaimonRowType, SchemaChange,
};

use crate::error::to_datafusion_error;
use paimon::arrow::arrow_to_paimon_type;
Expand Down Expand Up @@ -136,15 +139,7 @@ impl PaimonSqlHandler {

// Columns
for col in &ct.columns {
let arrow_type = sql_data_type_to_arrow(&col.data_type)?;
let nullable = !col.options.iter().any(|opt| {
matches!(
opt.option,
datafusion::sql::sqlparser::ast::ColumnOption::NotNull
)
});
let paimon_type =
arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)?;
let paimon_type = column_def_to_paimon_type(col)?;
builder = builder.column(col.name.value.clone(), paimon_type);
}

Expand Down Expand Up @@ -324,20 +319,87 @@ impl PaimonSqlHandler {

/// Convert a sqlparser [`ColumnDef`] to a Paimon [`SchemaChange::AddColumn`].
fn column_def_to_add_column(col: &ColumnDef) -> DFResult<SchemaChange> {
    // Translate the SQL column type directly to a Paimon type. This must not
    // round-trip through Arrow, which would lose logical distinctions such as
    // BLOB vs VARBINARY.
    let paimon_type = column_def_to_paimon_type(col)?;
    Ok(SchemaChange::add_column(
        col.name.value.clone(),
        paimon_type,
    ))
}

/// Translate a sqlparser [`ColumnDef`] into the corresponding Paimon data
/// type, deriving nullability from the column's `NOT NULL` option.
fn column_def_to_paimon_type(col: &ColumnDef) -> DFResult<PaimonDataType> {
    let nullable = column_def_nullable(col);
    sql_data_type_to_paimon_type(&col.data_type, nullable)
}

/// A column is nullable unless one of its options declares `NOT NULL`.
fn column_def_nullable(col: &ColumnDef) -> bool {
    use datafusion::sql::sqlparser::ast::ColumnOption;
    col.options
        .iter()
        .all(|opt| !matches!(opt.option, ColumnOption::NotNull))
}

/// Convert a sqlparser SQL data type to a Paimon data type.
///
/// DDL schema translation must use this function instead of going through Arrow,
/// because Arrow cannot preserve logical distinctions such as `BLOB` vs `VARBINARY`.
fn sql_data_type_to_paimon_type(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have this, we should remove sql_data_type_to_arrow.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JingsongLi Thanks for the review. sql_data_type_to_arrow has been removed, and DDL type translation now goes directly through sql_data_type_to_paimon_type.

sql_type: &datafusion::sql::sqlparser::ast::DataType,
nullable: bool,
) -> DFResult<PaimonDataType> {
use datafusion::sql::sqlparser::ast::{ArrayElemTypeDef, DataType as SqlType};

match sql_type {
SqlType::Blob(_) => Ok(PaimonDataType::Blob(BlobType::with_nullable(nullable))),
SqlType::Array(elem_def) => {
let element_type = match elem_def {
ArrayElemTypeDef::AngleBracket(t)
| ArrayElemTypeDef::SquareBracket(t, _)
| ArrayElemTypeDef::Parenthesis(t) => sql_data_type_to_paimon_type(t, true)?,
ArrayElemTypeDef::None => {
return Err(DataFusionError::Plan(
"ARRAY type requires an element type".to_string(),
));
}
};
Ok(PaimonDataType::Array(PaimonArrayType::with_nullable(
nullable,
element_type,
)))
}
SqlType::Map(key_type, value_type) => {
let key = sql_data_type_to_paimon_type(key_type, false)?;
let value = sql_data_type_to_paimon_type(value_type, true)?;
Ok(PaimonDataType::Map(PaimonMapType::with_nullable(
nullable, key, value,
)))
}
SqlType::Struct(fields, _) => {
let paimon_fields = fields
.iter()
.enumerate()
.map(|(idx, field)| {
let name = field
.field_name
.as_ref()
.map(|n| n.value.clone())
.unwrap_or_default();
let data_type = sql_data_type_to_paimon_type(&field.field_type, true)?;
Ok(PaimonDataField::new(idx as i32, name, data_type))
})
.collect::<DFResult<Vec<_>>>()?;
Ok(PaimonDataType::Row(PaimonRowType::with_nullable(
nullable,
paimon_fields,
)))
}
_ => {
let arrow_type = sql_data_type_to_arrow(sql_type)?;
arrow_to_paimon_type(&arrow_type, nullable).map_err(to_datafusion_error)
}
}
}

/// Convert a sqlparser SQL data type to an Arrow data type.
fn sql_data_type_to_arrow(
sql_type: &datafusion::sql::sqlparser::ast::DataType,
Expand Down Expand Up @@ -496,7 +558,7 @@ mod tests {
use async_trait::async_trait;
use datafusion::arrow::datatypes::TimeUnit;
use paimon::catalog::Database;
use paimon::spec::Schema as PaimonSchema;
use paimon::spec::{DataType as PaimonDataType, Schema as PaimonSchema};
use paimon::table::Table;

// ==================== Mock Catalog ====================
Expand Down Expand Up @@ -1040,6 +1102,30 @@ mod tests {
}
}

/// CREATE TABLE with a `BLOB NOT NULL` column must register the field as a
/// Paimon Blob type (not a binary type) and honor the NOT NULL constraint.
#[tokio::test]
async fn test_create_table_blob_type_preserved() {
    let catalog = Arc::new(MockCatalog::new());
    let handler = make_handler(catalog.clone());

    handler
        .sql("CREATE TABLE mydb.t1 (payload BLOB NOT NULL)")
        .await
        .unwrap();

    let calls = catalog.take_calls();
    assert_eq!(calls.len(), 1);
    match &calls[0] {
        CatalogCall::CreateTable { schema, .. } => {
            let fields = schema.fields();
            assert_eq!(fields.len(), 1);
            let data_type = fields[0].data_type();
            assert!(matches!(data_type, PaimonDataType::Blob(_)));
            assert!(!data_type.is_nullable());
        }
        _ => panic!("expected CreateTable call"),
    }
}

#[tokio::test]
async fn test_alter_table_add_column() {
let catalog = Arc::new(MockCatalog::new());
Expand Down Expand Up @@ -1069,6 +1155,33 @@ mod tests {
}
}

/// ALTER TABLE ADD COLUMN with BLOB must produce an `AddColumn` schema
/// change carrying the Paimon Blob type.
#[tokio::test]
async fn test_alter_table_add_blob_column() {
    let catalog = Arc::new(MockCatalog::new());
    let handler = make_handler(catalog.clone());

    handler
        .sql("ALTER TABLE mydb.t1 ADD COLUMN payload BLOB")
        .await
        .unwrap();

    let calls = catalog.take_calls();
    assert_eq!(calls.len(), 1);
    match &calls[0] {
        CatalogCall::AlterTable { changes, .. } => {
            assert_eq!(changes.len(), 1);
            let change = &changes[0];
            assert!(matches!(
                change,
                SchemaChange::AddColumn {
                    field_name,
                    data_type,
                    ..
                } if field_name == "payload" && matches!(data_type, PaimonDataType::Blob(_))
            ));
        }
        _ => panic!("expected AlterTable call"),
    }
}

#[tokio::test]
async fn test_alter_table_drop_column() {
let catalog = Arc::new(MockCatalog::new());
Expand Down
36 changes: 35 additions & 1 deletion crates/integrations/datafusion/tests/sql_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use datafusion::catalog::CatalogProvider;
use datafusion::prelude::SessionContext;
use paimon::catalog::Identifier;
use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType};
use paimon::spec::{ArrayType, BlobType, DataType, IntType, MapType, VarCharType};
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonSqlHandler};
use tempfile::TempDir;
Expand Down Expand Up @@ -147,6 +147,40 @@ async fn test_create_table() {
assert_eq!(schema.primary_keys(), &["id"]);
}

/// End-to-end DDL check against a real FileSystemCatalog: a BLOB column in
/// CREATE TABLE must survive schema translation as a Paimon Blob type.
#[tokio::test]
async fn test_create_table_with_blob_type() {
    let (_tmp, catalog) = create_test_env();
    let handler = create_handler(catalog.clone());

    // The target database must exist before CREATE TABLE can reference it.
    catalog
        .create_database("mydb", false, Default::default())
        .await
        .unwrap();

    handler
        .sql(
            "CREATE TABLE paimon.mydb.assets (
                id INT NOT NULL,
                payload BLOB,
                PRIMARY KEY (id)
            )",
        )
        .await
        .expect("CREATE TABLE with BLOB should succeed");

    // Re-read the table through the catalog and verify the persisted schema.
    let table = catalog
        .get_table(&Identifier::new("mydb", "assets"))
        .await
        .unwrap();
    let schema = table.schema();
    assert_eq!(schema.fields().len(), 2);
    assert_eq!(schema.primary_keys(), &["id"]);
    // `payload` had no NOT NULL option, so the default (nullable) BlobType
    // is expected here.
    assert_eq!(
        *schema.fields()[1].data_type(),
        DataType::Blob(BlobType::new())
    );
}

#[tokio::test]
async fn test_create_table_with_partition() {
let (_tmp, catalog) = create_test_env();
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/arrow/format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ fn literal_scalar_for_parquet_filter(
DataType::Time(_)
| DataType::Timestamp(_)
| DataType::LocalZonedTimestamp(_)
| DataType::Blob(_)
| DataType::Array(_)
| DataType::Map(_)
| DataType::Multiset(_)
Expand Down
15 changes: 14 additions & 1 deletion crates/paimon/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ pub fn paimon_type_to_arrow(dt: &PaimonDataType) -> crate::Result<ArrowDataType>
PaimonDataType::Float(_) => ArrowDataType::Float32,
PaimonDataType::Double(_) => ArrowDataType::Float64,
PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => ArrowDataType::Utf8,
PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => ArrowDataType::Binary,
PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) | PaimonDataType::Blob(_) => {
ArrowDataType::Binary
}
PaimonDataType::Date(_) => ArrowDataType::Date32,
PaimonDataType::Time(_) => ArrowDataType::Time32(TimeUnit::Millisecond),
PaimonDataType::Timestamp(t) => {
Expand Down Expand Up @@ -341,6 +343,17 @@ mod tests {
}
}

/// Blob lowers to Arrow `Binary`, but the reverse mapping recovers the
/// widest VARBINARY — the Blob <-> Arrow conversion is intentionally one-way.
#[test]
fn test_blob_type_maps_one_way_to_arrow_binary() {
    let paimon_blob = PaimonDataType::Blob(BlobType::new());
    let widest_varbinary = PaimonDataType::VarBinary(
        VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(),
    );

    // Paimon -> Arrow: Blob becomes plain Binary.
    assert_paimon_to_arrow(&paimon_blob, &ArrowDataType::Binary);
    // Arrow -> Paimon: Binary comes back as VARBINARY, not Blob.
    assert_arrow_to_paimon(&ArrowDataType::Binary, true, &widest_varbinary);
}

#[test]
fn test_timestamp_roundtrip() {
// millisecond precision
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/spec/partition_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ fn format_partition_value(
| DataType::Double(_)
| DataType::Binary(_)
| DataType::VarBinary(_)
| DataType::Blob(_)
| DataType::Array(_)
| DataType::Map(_)
| DataType::Multiset(_)
Expand Down
Loading
Loading