Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,9 @@ pub fn resolve_row_types(
Datum::Time(t) => Datum::Time(*t),
Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
// TODO: C++ bindings need proper CXX wrapper types for FlussArray
// before C++ users can construct or inspect array values through FFI.
Datum::Array(a) => Datum::Array(a.clone()),
Comment thread
charlesdong1991 marked this conversation as resolved.
};
out.set_field(idx, resolved);
}
Expand Down Expand Up @@ -408,6 +411,9 @@ pub fn compacted_row_to_owned(
fcore::metadata::DataType::Binary(dt) => {
Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec()))
}
// TODO: C++ bindings need proper CXX wrapper types for FlussArray
// before C++ users can construct or inspect array values through FFI.
fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?),
other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")),
};

Expand Down
80 changes: 76 additions & 4 deletions crates/fluss/src/record/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ use crate::row::{ColumnarRow, InternalRow};
use arrow::array::{
ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder,
FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder,
Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, Time32SecondBuilder,
Time64MicrosecondBuilder, Time64NanosecondBuilder, TimestampMicrosecondBuilder,
TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, UInt8Builder,
UInt16Builder, UInt32Builder, UInt64Builder,
Int32Builder, Int64Builder, ListBuilder, StringBuilder, Time32MillisecondBuilder,
Time32SecondBuilder, Time64MicrosecondBuilder, Time64NanosecondBuilder,
TimestampMicrosecondBuilder, TimestampMillisecondBuilder, TimestampNanosecondBuilder,
TimestampSecondBuilder, UInt8Builder, UInt16Builder, UInt32Builder, UInt64Builder,
};
use arrow::{
array::RecordBatch,
Expand Down Expand Up @@ -330,6 +330,13 @@ impl RowAppendRecordBatchBuilder {
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
}
arrow_schema::DataType::List(field) => {
let inner_builder = Self::create_builder(field.data_type(), capacity)?;
Ok(Box::new(ListBuilder::with_capacity(
inner_builder,
capacity,
)))
}
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Ok(
Box::new(TimestampMillisecondBuilder::with_capacity(capacity)),
),
Expand Down Expand Up @@ -1184,6 +1191,71 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result<ArrowDataType> {
})
}

/// Converts an Arrow data type back to a Fluss `DataType`.
/// Used for reading array elements from Arrow ListArray back into Fluss types.
pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result<DataType> {
use crate::metadata::DataTypes;

Ok(match arrow_type {
ArrowDataType::Boolean => DataTypes::boolean(),
ArrowDataType::Int8 => DataTypes::tinyint(),
ArrowDataType::Int16 => DataTypes::smallint(),
ArrowDataType::Int32 => DataTypes::int(),
ArrowDataType::Int64 => DataTypes::bigint(),
ArrowDataType::Float32 => DataTypes::float(),
ArrowDataType::Float64 => DataTypes::double(),
ArrowDataType::Utf8 => DataTypes::string(),
ArrowDataType::Binary => DataTypes::bytes(),
ArrowDataType::Date32 => DataTypes::date(),
ArrowDataType::FixedSizeBinary(len) => {
if *len < 0 {
return Err(Error::IllegalArgument {
message: format!("FixedSizeBinary length must be >= 0, got {len}"),
});
}
DataTypes::binary(*len as usize)
}
ArrowDataType::Decimal128(p, s) => {
if *s < 0 {
return Err(Error::IllegalArgument {
message: format!("Decimal scale must be >= 0, got {s}"),
});
}
DataTypes::decimal(*p as u32, *s as u32)
}
ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => DataTypes::time_with_precision(0),
ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
DataTypes::time_with_precision(3)
}
ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
DataTypes::time_with_precision(6)
}
ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
DataTypes::time_with_precision(9)
}
ArrowDataType::Timestamp(unit, tz) => {
let precision = match unit {
arrow_schema::TimeUnit::Second => 0,
arrow_schema::TimeUnit::Millisecond => 3,
arrow_schema::TimeUnit::Microsecond => 6,
arrow_schema::TimeUnit::Nanosecond => 9,
};

if tz.is_some() {
DataTypes::timestamp_ltz_with_precision(precision)
} else {
DataTypes::timestamp_with_precision(precision)
}
}
ArrowDataType::List(field) => DataTypes::array(from_arrow_type(field.data_type())?),
other => {
return Err(Error::IllegalArgument {
message: format!("Cannot convert Arrow type to Fluss type: {other:?}"),
});
}
})
}

#[derive(Clone)]
pub struct ReadContext {
target_schema: SchemaRef,
Expand Down
10 changes: 7 additions & 3 deletions crates/fluss/src/row/binary/binary_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ pub trait BinaryWriter {

fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32);

// TODO InternalArray, ArraySerializer
// fn write_array(&mut self, pos: i32, value: i64);
fn write_array(&mut self, value: &[u8]);

// TODO Row serializer
// fn write_row(&mut self, pos: i32, value: &InternalRow);
Expand Down Expand Up @@ -136,7 +135,8 @@ pub enum InnerValueWriter {
Time(u32), // precision (not used in wire format, but kept for consistency)
TimestampNtz(u32), // precision
TimestampLtz(u32), // precision
// TODO Array, Row
Array,
// TODO Row
}

/// Accessor for writing the fields/elements of a binary writer during runtime, the
Expand Down Expand Up @@ -175,6 +175,7 @@ impl InnerValueWriter {
// Validation is done at TimestampLTzType construction time
Ok(InnerValueWriter::TimestampLtz(t.precision()))
}
DataType::Array(_) => Ok(InnerValueWriter::Array),
_ => unimplemented!(
"ValueWriter for DataType {:?} is currently not implemented",
data_type
Expand Down Expand Up @@ -237,6 +238,9 @@ impl InnerValueWriter {
(InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => {
writer.write_timestamp_ltz(ts, *p);
}
(InnerValueWriter::Array, Datum::Array(arr)) => {
writer.write_array(arr.as_bytes());
}
_ => {
return Err(IllegalArgument {
message: format!("{self:?} used to write value {value:?}"),
Expand Down
Loading
Loading