Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
494 changes: 286 additions & 208 deletions crates/integrations/datafusion/src/sql_handler.rs

Large diffs are not rendered by default.

36 changes: 35 additions & 1 deletion crates/integrations/datafusion/tests/sql_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use datafusion::catalog::CatalogProvider;
use datafusion::prelude::SessionContext;
use paimon::catalog::Identifier;
use paimon::spec::{ArrayType, DataType, IntType, MapType, VarCharType};
use paimon::spec::{ArrayType, BlobType, DataType, IntType, MapType, VarCharType};
use paimon::{Catalog, CatalogOptions, FileSystemCatalog, Options};
use paimon_datafusion::{PaimonCatalogProvider, PaimonRelationPlanner, PaimonSqlHandler};
use tempfile::TempDir;
Expand Down Expand Up @@ -147,6 +147,40 @@ async fn test_create_table() {
assert_eq!(schema.primary_keys(), &["id"]);
}

/// BLOB columns must survive the SQL CREATE TABLE path: the handler
/// parses the `BLOB` keyword and the persisted schema records the
/// column as `DataType::Blob`.
#[tokio::test]
async fn test_create_table_with_blob_type() {
    let (_tmp, catalog) = create_test_env();
    let handler = create_handler(catalog.clone());

    catalog
        .create_database("mydb", false, Default::default())
        .await
        .unwrap();

    let ddl = "CREATE TABLE paimon.mydb.assets (
                id INT NOT NULL,
                payload BLOB,
                PRIMARY KEY (id)
            )";
    handler
        .sql(ddl)
        .await
        .expect("CREATE TABLE with BLOB should succeed");

    let ident = Identifier::new("mydb", "assets");
    let table = catalog.get_table(&ident).await.unwrap();
    let schema = table.schema();

    // Two columns total; `payload` (index 1) must have kept the BLOB type.
    assert_eq!(schema.fields().len(), 2);
    assert_eq!(schema.primary_keys(), &["id"]);
    let payload_type = schema.fields()[1].data_type();
    assert_eq!(*payload_type, DataType::Blob(BlobType::new()));
}

#[tokio::test]
async fn test_create_table_with_partition() {
let (_tmp, catalog) = create_test_env();
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/arrow/format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ fn literal_scalar_for_parquet_filter(
DataType::Time(_)
| DataType::Timestamp(_)
| DataType::LocalZonedTimestamp(_)
| DataType::Blob(_)
| DataType::Array(_)
| DataType::Map(_)
| DataType::Multiset(_)
Expand Down
15 changes: 14 additions & 1 deletion crates/paimon/src/arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ pub fn paimon_type_to_arrow(dt: &PaimonDataType) -> crate::Result<ArrowDataType>
PaimonDataType::Float(_) => ArrowDataType::Float32,
PaimonDataType::Double(_) => ArrowDataType::Float64,
PaimonDataType::VarChar(_) | PaimonDataType::Char(_) => ArrowDataType::Utf8,
PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) => ArrowDataType::Binary,
PaimonDataType::Binary(_) | PaimonDataType::VarBinary(_) | PaimonDataType::Blob(_) => {
ArrowDataType::Binary
}
PaimonDataType::Date(_) => ArrowDataType::Date32,
PaimonDataType::Time(_) => ArrowDataType::Time32(TimeUnit::Millisecond),
PaimonDataType::Timestamp(t) => {
Expand Down Expand Up @@ -341,6 +343,17 @@ mod tests {
}
}

/// Paimon BLOB lowers to Arrow `Binary`, but the reverse mapping does
/// not reconstruct it: Arrow `Binary` converts back to a nullable
/// max-length VARBINARY. Check both directions of that one-way mapping.
#[test]
fn test_blob_type_maps_one_way_to_arrow_binary() {
    // Forward: BLOB -> Arrow Binary.
    assert_paimon_to_arrow(
        &PaimonDataType::Blob(BlobType::new()),
        &ArrowDataType::Binary,
    );

    // Backward: Arrow Binary -> VARBINARY(MAX), never BLOB.
    let expected = PaimonDataType::VarBinary(
        VarBinaryType::try_new(true, VarBinaryType::MAX_LENGTH).unwrap(),
    );
    assert_arrow_to_paimon(&ArrowDataType::Binary, true, &expected);
}

#[test]
fn test_timestamp_roundtrip() {
// millisecond precision
Expand Down
1 change: 1 addition & 0 deletions crates/paimon/src/spec/partition_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ fn format_partition_value(
| DataType::Double(_)
| DataType::Binary(_)
| DataType::VarBinary(_)
| DataType::Blob(_)
| DataType::Array(_)
| DataType::Map(_)
| DataType::Multiset(_)
Expand Down
66 changes: 65 additions & 1 deletion crates/paimon/src/spec/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub enum DataType {
Binary(BinaryType),
/// Data type of a variable-length binary string (=a sequence of bytes).
VarBinary(VarBinaryType),
/// Data type of binary large object.
Blob(BlobType),
/// Data type of a fixed-length character string.
Char(CharType),
/// Data type of a variable-length character string.
Expand Down Expand Up @@ -117,6 +119,20 @@ impl DataType {
}
}

/// Returns whether this type is, or (recursively) contains, a [`BlobType`].
///
/// Container types — arrays, multisets, maps and rows — are descended
/// into; every other type answers `false` unless it is a BLOB itself.
pub fn contains_blob_type(&self) -> bool {
    match self {
        // A BLOB at the top level.
        DataType::Blob(_) => true,
        // Containers: recurse into each component type.
        DataType::Array(arr) => arr.element_type.contains_blob_type(),
        DataType::Multiset(set) => set.element_type.contains_blob_type(),
        DataType::Map(map) => {
            map.key_type.contains_blob_type() || map.value_type.contains_blob_type()
        }
        DataType::Row(row) => row
            .fields
            .iter()
            .any(|field| field.data_type().contains_blob_type()),
        // Scalar/leaf types cannot nest a BLOB.
        _ => false,
    }
}

/// Returns whether this type is nullable.
pub fn is_nullable(&self) -> bool {
match self {
Expand All @@ -130,6 +146,7 @@ impl DataType {
DataType::Float(v) => v.nullable,
DataType::Binary(v) => v.nullable,
DataType::VarBinary(v) => v.nullable,
DataType::Blob(v) => v.nullable,
DataType::Char(v) => v.nullable,
DataType::VarChar(v) => v.nullable,
DataType::Date(v) => v.nullable,
Expand Down Expand Up @@ -165,6 +182,7 @@ impl DataType {
DataType::VarBinary(v) => {
DataType::VarBinary(VarBinaryType::try_new(nullable, v.length())?)
}
DataType::Blob(_) => DataType::Blob(BlobType::with_nullable(nullable)),
DataType::Char(v) => DataType::Char(CharType::with_nullable(nullable, v.length())?),
DataType::VarChar(v) => {
DataType::VarChar(VarCharType::with_nullable(nullable, v.length())?)
Expand Down Expand Up @@ -386,6 +404,39 @@ impl BooleanType {
}
}

/// BlobType for paimon.
///
/// Data type of binary large object.
///
/// Impl Reference: <https://github.com/apache/paimon/blob/master/paimon-api/src/main/java/org/apache/paimon/types/BlobType.java>.
#[serde_as]
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
// `transparent`: serialized as the inner representation produced by the
// NullableType adapter, not as a struct wrapping a `nullable` field.
#[serde(transparent)]
pub struct BlobType {
    // Whether NULL values are permitted. (De)serialized through the shared
    // serde_utils::NullableType helper keyed on the "BLOB" type name —
    // presumably the textual form used by the other types' fixtures; confirm
    // against the blob_type / blob_type_nullable fixtures.
    #[serde_as(as = "FromInto<serde_utils::NullableType<serde_utils::BLOB>>")]
    nullable: bool,
}

impl Default for BlobType {
fn default() -> Self {
Self::new()
}
}

impl BlobType {
pub fn new() -> Self {
Self::with_nullable(true)
}

pub fn with_nullable(nullable: bool) -> Self {
Self { nullable }
}

pub fn family(&self) -> DataTypeFamily {
DataTypeFamily::PREDEFINED
}
}

/// CharType for paimon.
///
/// Data type of a fixed-length character string.
Expand Down Expand Up @@ -1470,6 +1521,11 @@ mod serde_utils {
const NAME: &'static str = "BOOLEAN";
}

// Zero-sized marker carrying the textual name of the BLOB type for the
// NullableType serde adapter.
pub struct BLOB;
impl DataTypeName for BLOB {
    const NAME: &'static str = "BLOB";
}

pub struct BINARY;
impl DataTypeName for BINARY {
const NAME: &'static str = "BINARY";
Expand Down Expand Up @@ -1655,7 +1711,10 @@ mod tests {

let content = std::fs::read(&path)
.unwrap_or_else(|err| panic!("fixtures {path:?} load failed: {err}"));
String::from_utf8(content).expect("fixtures content must be valid utf8")
String::from_utf8(content)
.expect("fixtures content must be valid utf8")
.trim_end_matches(['\n', '\r'])
.to_string()
}

fn test_cases() -> Vec<(&'static str, DataType)> {
Expand Down Expand Up @@ -1696,6 +1755,11 @@ mod tests {
length: 22,
}),
),
("blob_type", DataType::Blob(BlobType { nullable: false })),
(
"blob_type_nullable",
DataType::Blob(BlobType { nullable: true }),
),
(
"boolean_type",
DataType::Boolean(BooleanType { nullable: false }),
Expand Down
80 changes: 79 additions & 1 deletion crates/paimon/src/table/data_evolution_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ pub struct DataEvolutionWriter {
matched_batches: Vec<RecordBatch>,
}

/// Returns true when any column of the table's schema is, or nests,
/// a BLOB type.
fn schema_contains_blob_type(table: &Table) -> bool {
    for field in table.schema().fields().iter() {
        if field.data_type().contains_blob_type() {
            return true;
        }
    }
    false
}

impl DataEvolutionWriter {
/// Create a new writer for the given table and update columns.
///
Expand All @@ -73,6 +81,14 @@ impl DataEvolutionWriter {
let schema = table.schema();
let core_options = CoreOptions::new(schema.options());

if schema_contains_blob_type(table) {
return Err(crate::Error::Unsupported {
message:
"MERGE INTO does not support BlobType yet; blob write path is out of scope"
.to_string(),
});
}

if !core_options.data_evolution_enabled() {
return Err(crate::Error::Unsupported {
message:
Expand Down Expand Up @@ -470,6 +486,12 @@ impl DataEvolutionPartialWriter {
let schema = table.schema();
let core_options = CoreOptions::new(schema.options());

if schema_contains_blob_type(table) {
return Err(crate::Error::Unsupported {
message: "DataEvolutionPartialWriter does not support BlobType yet".to_string(),
});
}

if !core_options.data_evolution_enabled() {
return Err(crate::Error::Unsupported {
message: "DataEvolutionPartialWriter requires data-evolution.enabled = true"
Expand Down Expand Up @@ -586,7 +608,9 @@ mod tests {
use super::*;
use crate::catalog::Identifier;
use crate::io::FileIOBuilder;
use crate::spec::{DataType, IntType, Schema, TableSchema, VarCharType};
use crate::spec::{
BlobType, DataField, DataType, IntType, RowType, Schema, TableSchema, VarCharType,
};
use arrow_array::StringArray;
use arrow_schema::{DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema};
use std::sync::Arc;
Expand Down Expand Up @@ -640,6 +664,24 @@ mod tests {
TableSchema::new(0, &schema)
}

/// Builds a data-evolution-enabled schema whose `payload` column nests a
/// BLOB inside a ROW, so the blob is only discoverable recursively.
fn test_blob_data_evolution_schema() -> TableSchema {
    let blob_field = DataField::new(1, "blob".into(), DataType::Blob(BlobType::new()));
    let payload = DataType::Row(RowType::new(vec![blob_field]));

    let schema = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("payload", payload)
        .option("data-evolution.enabled", "true")
        .option("row-tracking.enabled", "true")
        .build()
        .unwrap();

    TableSchema::new(0, &schema)
}

fn test_table(file_io: &FileIO, table_path: &str) -> Table {
Table::new(
file_io.clone(),
Expand All @@ -650,6 +692,16 @@ mod tests {
)
}

/// Constructs a test table backed by the blob-bearing data-evolution
/// schema at the given path.
fn test_blob_table(file_io: &FileIO, table_path: &str) -> Table {
    let ident = Identifier::new("default", "test_de_blob_table");
    let schema = test_blob_data_evolution_schema();
    Table::new(file_io.clone(), ident, table_path.to_string(), schema, None)
}

async fn setup_dirs(file_io: &FileIO, table_path: &str) {
file_io
.mkdirs(&format!("{table_path}/snapshot/"))
Expand Down Expand Up @@ -859,4 +911,30 @@ mod tests {
let result = DataEvolutionPartialWriter::new(&table, vec!["id".to_string()]);
assert!(result.is_err());
}

/// A schema containing a BLOB (even nested in a ROW) must be rejected
/// up front by `DataEvolutionWriter::new` with an `Unsupported` error.
#[test]
fn test_rejects_blob_data_evolution_writer() {
    let file_io = test_file_io();
    let table = test_blob_table(&file_io, "memory:/test_blob_de_writer");

    let result = DataEvolutionWriter::new(&table, vec!["id".to_string()]);
    match result {
        Err(crate::Error::Unsupported { message }) => {
            assert!(message.contains("BlobType"), "unexpected message: {message}")
        }
        _ => panic!("expected Unsupported error for BLOB schema"),
    }
}

/// `DataEvolutionPartialWriter::new` mirrors the full writer: tables
/// whose schema contains a BLOB are refused with an `Unsupported` error.
#[test]
fn test_rejects_blob_partial_writer() {
    let file_io = test_file_io();
    let table = test_blob_table(&file_io, "memory:/test_blob_partial_writer");

    let result = DataEvolutionPartialWriter::new(&table, vec!["id".to_string()]);
    match result {
        Err(crate::Error::Unsupported { message }) => {
            assert!(message.contains("BlobType"), "unexpected message: {message}")
        }
        _ => panic!("expected Unsupported error for BLOB schema"),
    }
}
}
Loading
Loading