diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index f8efe677..87c4de30 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -351,6 +351,7 @@ pub fn resolve_row_types( Datum::Time(t) => Datum::Time(*t), Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts), Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts), + Datum::Row(r) => Datum::Row(Box::new(resolve_row_types(r, None)?)), }; out.set_field(idx, resolved); } diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs index af2765c4..2ab3ce8c 100644 --- a/crates/fluss/src/row/binary/binary_writer.rs +++ b/crates/fluss/src/row/binary/binary_writer.rs @@ -17,7 +17,7 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::DataType; +use crate::metadata::{DataType, RowType}; use crate::row::Datum; use crate::row::binary::BinaryRowFormat; @@ -136,7 +136,7 @@ pub enum InnerValueWriter { Time(u32), // precision (not used in wire format, but kept for consistency) TimestampNtz(u32), // precision TimestampLtz(u32), // precision - // TODO Array, Row + Row(RowType), } /// Accessor for writing the fields/elements of a binary writer during runtime, the @@ -175,6 +175,7 @@ impl InnerValueWriter { // Validation is done at TimestampLTzType construction time Ok(InnerValueWriter::TimestampLtz(t.precision())) } + DataType::Row(row_type) => Ok(InnerValueWriter::Row(row_type.clone())), _ => unimplemented!( "ValueWriter for DataType {:?} is currently not implemented", data_type @@ -237,6 +238,26 @@ impl InnerValueWriter { (InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => { writer.write_timestamp_ltz(ts, *p); } + (InnerValueWriter::Row(row_type), Datum::Row(inner_row)) => { + use crate::row::compacted::CompactedRowWriter; + let field_count = row_type.fields().len(); + let mut nested = CompactedRowWriter::new(field_count); + for (i, field) in row_type.fields().iter().enumerate() { + let datum = &inner_row.values[i]; + if datum.is_null() { + if field.data_type.is_nullable() { + nested.set_null_at(i); + } + } else { + let vw = + InnerValueWriter::create_inner_value_writer(&field.data_type, None) + .expect("create_inner_value_writer failed for nested row field"); + vw.write_value(&mut nested, i, datum) + .expect("write_value failed for nested row field"); + } + } + writer.write_bytes(nested.buffer()); + } _ => { return Err(IllegalArgument { message: format!("{self:?} used to write value {value:?}"), diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index c07fe97c..df0b9e6c 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -17,8 +17,8 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::row::InternalRow; -use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; +use crate::row::{GenericRow, InternalRow}; +use crate::row::datum::{Date, Datum, Time, TimestampLtz, TimestampNtz}; use arrow::array::{Array, AsArray, BinaryArray, RecordBatch, StringArray}; use arrow::datatypes::{ DataType as ArrowDataType, Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, @@ -32,25 +32,33 @@ use std::sync::Arc; pub struct ColumnarRow { record_batch: Arc, row_id: usize, + nested_rows: Vec>>, } impl ColumnarRow { pub fn new(batch: Arc) -> Self { + let num_cols = batch.num_columns(); ColumnarRow { record_batch: batch, row_id: 0, + nested_rows: (0..num_cols).map(|_| std::sync::OnceLock::new()).collect(), } } pub fn new_with_row_id(bach: Arc, row_id: usize) -> Self { + let num_cols = bach.num_columns(); ColumnarRow { record_batch: bach, row_id, + nested_rows: (0..num_cols).map(|_| std::sync::OnceLock::new()).collect(), } } pub fn set_row_id(&mut self, row_id: usize) { - self.row_id = row_id + self.row_id = row_id; + for lock in &mut self.nested_rows { + *lock = std::sync::OnceLock::new(); + } } pub fn get_row_id(&self) -> usize { @@ -209,6 +217,168 @@ impl ColumnarRow { }), } } + + /// Extract a `GenericRow<'static>` from a column in the RecordBatch at the given row_id. + fn extract_struct_at( + batch: &RecordBatch, + pos: usize, + row_id: usize, + ) -> Result> { + let col = batch.column(pos); + Self::extract_struct_from_array(col.as_ref(), row_id) + } + + /// Recursively extract a `GenericRow<'static>` from a `StructArray` at row_id. + fn extract_struct_from_array(array: &dyn Array, row_id: usize) -> Result> { + use arrow::array::StructArray; + let sa = array + .as_any() + .downcast_ref::() + .ok_or_else(|| IllegalArgument { + message: format!("expected StructArray, got {:?}", array.data_type()), + })?; + let mut values = Vec::with_capacity(sa.num_columns()); + for i in 0..sa.num_columns() { + let child = sa.column(i); + values.push(Self::arrow_value_to_datum(child.as_ref(), row_id)?); + } + Ok(GenericRow { values }) + } + + /// Convert a single element at `row_id` in an Arrow array to a `Datum<'static>`. + fn arrow_value_to_datum(array: &dyn Array, row_id: usize) -> Result> { + use arrow::array::{ + BooleanArray, Decimal128Array, Float32Array, Float64Array, Int8Array, Int16Array, + Int32Array, Int64Array, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, + }; + use crate::row::Decimal; + + if array.is_null(row_id) { + return Ok(Datum::Null); + } + + match array.data_type() { + ArrowDataType::Boolean => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Bool(a.value(row_id))) + } + ArrowDataType::Int8 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Int8(a.value(row_id))) + } + ArrowDataType::Int16 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Int16(a.value(row_id))) + } + ArrowDataType::Int32 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Int32(a.value(row_id))) + } + ArrowDataType::Int64 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Int64(a.value(row_id))) + } + ArrowDataType::Float32 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Float32(a.value(row_id).into())) + } + ArrowDataType::Float64 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Float64(a.value(row_id).into())) + } + ArrowDataType::Utf8 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::String(std::borrow::Cow::Owned(a.value(row_id).to_owned()))) + } + ArrowDataType::Binary => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Blob(std::borrow::Cow::Owned(a.value(row_id).to_vec()))) + } + ArrowDataType::Decimal128(p, s) => { + let (p, s) = (*p, *s); + let a = array.as_any().downcast_ref::().unwrap(); + let i128_val = a.value(row_id); + Ok(Datum::Decimal(Decimal::from_arrow_decimal128( + i128_val, + s as i64, + p as u32, + s as u32, + )?)) + } + ArrowDataType::Date32 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Date(Date::new(a.value(row_id)))) + } + ArrowDataType::Time32(TimeUnit::Second) => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Time(Time::new(a.value(row_id) * 1000))) + } + ArrowDataType::Time32(TimeUnit::Millisecond) => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Time(Time::new(a.value(row_id)))) + } + ArrowDataType::Time64(TimeUnit::Microsecond) => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Time(Time::new((a.value(row_id) / 1000) as i32))) + } + ArrowDataType::Time64(TimeUnit::Nanosecond) => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Time(Time::new((a.value(row_id) / 1_000_000) as i32))) + } + ArrowDataType::Timestamp(time_unit, tz) => { + let value: i64 = match time_unit { + TimeUnit::Second => { + array.as_any().downcast_ref::().unwrap().value(row_id) + } + TimeUnit::Millisecond => { + array.as_any().downcast_ref::().unwrap().value(row_id) + } + TimeUnit::Microsecond => { + array.as_any().downcast_ref::().unwrap().value(row_id) + } + TimeUnit::Nanosecond => { + array.as_any().downcast_ref::().unwrap().value(row_id) + } + }; + let (millis, nanos) = match time_unit { + TimeUnit::Second => (value * 1000, 0i32), + TimeUnit::Millisecond => (value, 0i32), + TimeUnit::Microsecond => { + let millis = value.div_euclid(1000); + let nanos = (value.rem_euclid(1000) * 1000) as i32; + (millis, nanos) + } + TimeUnit::Nanosecond => { + let millis = value.div_euclid(1_000_000); + let nanos = value.rem_euclid(1_000_000) as i32; + (millis, nanos) + } + }; + if tz.is_some() { + if nanos == 0 { + Ok(Datum::TimestampLtz(TimestampLtz::new(millis))) + } else { + Ok(Datum::TimestampLtz(TimestampLtz::from_millis_nanos(millis, nanos)?)) + } + } else if nanos == 0 { + Ok(Datum::TimestampNtz(TimestampNtz::new(millis))) + } else { + Ok(Datum::TimestampNtz(TimestampNtz::from_millis_nanos(millis, nanos)?)) + } + } + ArrowDataType::Struct(_) => { + let nested = Self::extract_struct_from_array(array, row_id)?; + Ok(Datum::Row(Box::new(nested))) + } + other => Err(IllegalArgument { + message: format!( + "unsupported Arrow data type for nested row extraction: {other:?}" + ), + }), + } + } } impl InternalRow for ColumnarRow { @@ -407,6 +577,18 @@ impl InternalRow for ColumnarRow { })? .value(self.row_id)) } + + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { + let lock = self.nested_rows.get(pos).ok_or_else(|| IllegalArgument { + message: format!("column index {pos} out of bounds for get_row"), + })?; + let batch = Arc::clone(&self.record_batch); + let row_id = self.row_id; + Ok(lock.get_or_init(|| { + Self::extract_struct_at(&batch, pos, row_id) + .expect("failed to extract nested row from StructArray") + })) + } } #[cfg(test)] @@ -414,9 +596,9 @@ mod tests { use super::*; use arrow::array::{ BinaryArray, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int8Array, - Int16Array, Int32Array, Int64Array, StringArray, + Int16Array, Int32Array, Int64Array, StringArray, StructArray, }; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::datatypes::{DataType, Field, Fields, Schema}; #[test] fn columnar_row_reads_values() { @@ -533,4 +715,112 @@ mod tests { .unwrap() ); } + + fn make_struct_batch( + field_name: &str, + child_fields: Fields, + child_arrays: Vec>, + _num_rows: usize, + ) -> Arc { + let struct_array = StructArray::new(child_fields.clone(), child_arrays, None); + let schema = Arc::new(Schema::new(vec![Field::new( + field_name, + DataType::Struct(child_fields), + false, + )])); + Arc::new( + RecordBatch::try_new(schema, vec![Arc::new(struct_array)]) + .expect("record batch"), + ) + } + + #[test] + fn columnar_row_reads_nested_row() { + // Build a RecordBatch with a Struct column: {i32, string} + let child_fields = Fields::from(vec![ + Field::new("x", DataType::Int32, false), + Field::new("s", DataType::Utf8, false), + ]); + let child_arrays: Vec> = vec![ + Arc::new(Int32Array::from(vec![42, 99])), + Arc::new(StringArray::from(vec!["hello", "world"])), + ]; + let batch = make_struct_batch("nested", child_fields, child_arrays, 2); + + let mut row = ColumnarRow::new(batch); + + // row_id = 0 + let nested = row.get_row(0).unwrap(); + assert_eq!(nested.get_field_count(), 2); + assert_eq!(nested.get_int(0).unwrap(), 42); + assert_eq!(nested.get_string(1).unwrap(), "hello"); + + // row_id = 1 + row.set_row_id(1); + let nested = row.get_row(0).unwrap(); + assert_eq!(nested.get_int(0).unwrap(), 99); + assert_eq!(nested.get_string(1).unwrap(), "world"); + } + + #[test] + fn columnar_row_reads_deeply_nested_row() { + // Build: outer struct { i32, inner struct { string } } + let inner_fields = Fields::from(vec![Field::new("s", DataType::Utf8, false)]); + let inner_array = Arc::new(StructArray::new( + inner_fields.clone(), + vec![Arc::new(StringArray::from(vec!["deep", "deeper"])) as Arc], + None, + )); + + let outer_fields = Fields::from(vec![ + Field::new("n", DataType::Int32, false), + Field::new("inner", DataType::Struct(inner_fields), false), + ]); + let outer_array = Arc::new(StructArray::new( + outer_fields.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])) as Arc, + inner_array as Arc, + ], + None, + )); + + let schema = Arc::new(Schema::new(vec![Field::new( + "outer", + DataType::Struct(outer_fields), + false, + )])); + let batch = Arc::new( + RecordBatch::try_new(schema, vec![outer_array]).expect("record batch"), + ); + + let row = ColumnarRow::new(batch); + + // Access outer struct at column 0, row 0 + let outer = row.get_row(0).unwrap(); + assert_eq!(outer.get_int(0).unwrap(), 1); + + // Access inner struct (column 1 of outer) + let inner = outer.get_row(1).unwrap(); + assert_eq!(inner.get_string(0).unwrap(), "deep"); + } + + #[test] + fn columnar_row_get_row_cache_invalidated_on_set_row_id() { + let child_fields = Fields::from(vec![Field::new("x", DataType::Int32, false)]); + let child_arrays: Vec> = + vec![Arc::new(Int32Array::from(vec![10, 20]))]; + let batch = make_struct_batch("s", child_fields, child_arrays, 2); + + let mut row = ColumnarRow::new(batch); + + // row_id = 0: nested x = 10 + let nested_0 = row.get_row(0).unwrap(); + assert_eq!(nested_0.get_int(0).unwrap(), 10); + + // After set_row_id(1), cache is cleared → nested x = 20 + row.set_row_id(1); + let nested_1 = row.get_row(0).unwrap(); + assert_eq!(nested_1.get_int(0).unwrap(), 20); + } } diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index 918ebdfd..5481ed4e 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -160,6 +160,10 @@ impl<'a> InternalRow for CompactedRow<'a> { self.decoded_row().get_bytes(pos) } + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { + self.decoded_row().get_row(pos) + } + fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> { match write_format { WriteFormat::CompactedKv => Some(self.as_bytes()), diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index 00e53aa1..a8b1638d 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -160,6 +160,18 @@ impl<'a> CompactedRowDeserializer<'a> { (Datum::TimestampLtz(timestamp_ltz), next) } } + DataType::Row(row_type) => { + let (nested_bytes, next) = reader.read_bytes(cursor); + let nested_reader = CompactedRowReader::new( + row_type.fields().len(), + nested_bytes, + 0, + nested_bytes.len(), + ); + let nested_deser = CompactedRowDeserializer::new_from_owned(row_type.clone()); + let nested_row = nested_deser.deserialize(&nested_reader); + (Datum::Row(Box::new(nested_row)), next) + } _ => { panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}"); } @@ -286,3 +298,142 @@ impl<'a> CompactedRowReader<'a> { (s, next_pos) } } + +#[cfg(test)] +mod row_type_tests { + use crate::metadata::{DataType, DataTypes, RowType}; + use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; + use crate::row::compacted::compacted_row_writer::CompactedRowWriter; + use crate::row::binary::ValueWriter; + use crate::row::field_getter::FieldGetter; + use crate::row::{Datum, GenericRow, InternalRow}; + + fn round_trip(outer_row_type: &RowType, outer_row: &GenericRow, verify: F) + where + F: FnOnce(&GenericRow), + { + // Write + let field_getters = FieldGetter::create_field_getters(outer_row_type); + let value_writers: Vec = outer_row_type + .fields() + .iter() + .map(|f| ValueWriter::create_value_writer(f.data_type(), None).unwrap()) + .collect(); + let mut writer = CompactedRowWriter::new(outer_row_type.fields().len()); + for (i, (getter, vw)) in field_getters.iter().zip(value_writers.iter()).enumerate() { + let datum = getter.get_field(outer_row as &dyn InternalRow).unwrap(); + vw.write_value(&mut writer, i, &datum).unwrap(); + } + let bytes = writer.to_bytes(); + + // Read + let deser = CompactedRowDeserializer::new(outer_row_type); + let reader = CompactedRowReader::new( + outer_row_type.fields().len(), + bytes.as_ref(), + 0, + bytes.len(), + ); + let result = deser.deserialize(&reader); + verify(&result); + } + + #[test] + fn test_row_simple_nesting() { + // ROW nested inside an outer row + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["x", "label"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], + vec!["id", "nested"], + ); + + let mut inner = GenericRow::new(2); + inner.set_field(0, 42_i32); + inner.set_field(1, "hello"); + + let mut outer = GenericRow::new(2); + outer.set_field(0, 1_i32); + outer.set_field(1, Datum::Row(Box::new(inner))); + + round_trip(&outer_row_type, &outer, |result| { + assert_eq!(result.get_int(0).unwrap(), 1); + let nested = result.get_row(1).unwrap(); + assert_eq!(nested.get_int(0).unwrap(), 42); + assert_eq!(nested.get_string(1).unwrap(), "hello"); + }); + } + + #[test] + fn test_row_deep_nesting() { + // ROW> — two levels of nesting + let inner_inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int()], + vec!["n"], + ); + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataType::Row(inner_inner_row_type.clone())], + vec!["inner"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataType::Row(inner_row_type.clone())], + vec!["outer"], + ); + + let mut innermost = GenericRow::new(1); + innermost.set_field(0, 99_i32); + + let mut middle = GenericRow::new(1); + middle.set_field(0, Datum::Row(Box::new(innermost))); + + let mut outer = GenericRow::new(1); + outer.set_field(0, Datum::Row(Box::new(middle))); + + round_trip(&outer_row_type, &outer, |result| { + let mid = result.get_row(0).unwrap(); + let inner = mid.get_row(0).unwrap(); + assert_eq!(inner.get_int(0).unwrap(), 99); + }); + } + + #[test] + fn test_row_with_nullable_fields() { + // Outer nullable ROW column; nested row with a nullable STRING field set to null + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "optional_name"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], + vec!["k", "nested"], + ); + + // Case 1: non-null nested row with a null field inside + let mut inner = GenericRow::new(2); + inner.set_field(0, 7_i32); + inner.set_field(1, Datum::Null); + + let mut outer = GenericRow::new(2); + outer.set_field(0, 10_i32); + outer.set_field(1, Datum::Row(Box::new(inner))); + + round_trip(&outer_row_type, &outer, |result| { + assert_eq!(result.get_int(0).unwrap(), 10); + let nested = result.get_row(1).unwrap(); + assert_eq!(nested.get_int(0).unwrap(), 7); + assert!(nested.is_null_at(1).unwrap()); + }); + + // Case 2: outer ROW column is null + let mut outer_null = GenericRow::new(2); + outer_null.set_field(0, 20_i32); + outer_null.set_field(1, Datum::Null); + + round_trip(&outer_row_type, &outer_null, |result2| { + assert_eq!(result2.get_int(0).unwrap(), 20); + assert!(result2.is_null_at(1).unwrap()); + }); + } +} diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 9b2e80a6..7fe3d110 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -18,6 +18,7 @@ use crate::error::Error::RowConvertError; use crate::error::Result; use crate::row::Decimal; +use crate::row::GenericRow; use arrow::array::{ ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, @@ -68,6 +69,8 @@ pub enum Datum<'a> { TimestampNtz(TimestampNtz), #[display("{0}")] TimestampLtz(TimestampLtz), + #[display("{0:?}")] + Row(Box>), } impl Datum<'_> { @@ -123,6 +126,13 @@ impl Datum<'_> { _ => panic!("not a timestamp ltz: {self:?}"), } } + + pub fn as_row(&self) -> &GenericRow<'_> { + match self { + Self::Row(r) => r.as_ref(), + _ => panic!("not a row: {self:?}"), + } + } } // ----------- implement from @@ -742,6 +752,11 @@ impl Datum<'_> { message: "Builder type mismatch for TimestampLtz".to_string(), }); } + Datum::Row(_) => { + return Err(RowConvertError { + message: "append_to is not supported for Row type".to_string(), + }); + } } Err(RowConvertError { diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs b/crates/fluss/src/row/encode/compacted_key_encoder.rs index d201450b..238def39 100644 --- a/crates/fluss/src/row/encode/compacted_key_encoder.rs +++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs @@ -104,7 +104,7 @@ impl KeyEncoder for CompactedKeyEncoder { #[cfg(test)] mod tests { use super::*; - use crate::metadata::DataTypes; + use crate::metadata::{DataType, DataTypes}; use crate::row::{Datum, GenericRow}; pub fn for_test_row_type(row_type: &RowType) -> CompactedKeyEncoder { @@ -355,4 +355,50 @@ mod tests { encoded.iter().as_slice() ); } + + #[test] + fn test_row_as_primary_key() { + // ROW as a primary key column + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["x", "label"], + ); + let row_type = RowType::with_data_types_and_field_names( + vec![ + DataTypes::int(), + DataType::Row(inner_row_type.clone()), + ], + vec!["id", "nested"], + ); + + let mut inner = GenericRow::new(2); + inner.set_field(0, 42_i32); + inner.set_field(1, "hello"); + + let mut row = GenericRow::new(2); + row.set_field(0, 1_i32); + row.set_field(1, Datum::Row(Box::new(inner))); + + let mut encoder = for_test_row_type(&row_type); + let encoded = encoder.encode_key(&row).unwrap(); + + // Verify it encodes without error and produces non-empty bytes + assert!(!encoded.is_empty()); + + // Encode the same row again to verify determinism + let encoded2 = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded, encoded2); + + // Encode a different nested row and verify different output + let mut inner2 = GenericRow::new(2); + inner2.set_field(0, 99_i32); + inner2.set_field(1, "world"); + + let mut row2 = GenericRow::new(2); + row2.set_field(0, 1_i32); + row2.set_field(1, Datum::Row(Box::new(inner2))); + + let encoded3 = encoder.encode_key(&row2).unwrap(); + assert_ne!(encoded, encoded3); + } } diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs index d6b9fc94..63404478 100644 --- a/crates/fluss/src/row/field_getter.rs +++ b/crates/fluss/src/row/field_getter.rs @@ -82,6 +82,7 @@ impl FieldGetter { pos, precision: t.precision(), }, + DataType::Row(_) => InnerFieldGetter::Row { pos }, _ => unimplemented!("DataType {:?} is currently unimplemented", data_type), }; @@ -149,6 +150,9 @@ pub enum InnerFieldGetter { pos: usize, precision: u32, }, + Row { + pos: usize, + }, } impl InnerFieldGetter { @@ -177,7 +181,8 @@ impl InnerFieldGetter { } InnerFieldGetter::TimestampLtz { pos, precision } => { Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?) - } //TODO Array, Map, Row + } + InnerFieldGetter::Row { pos } => Datum::Row(Box::new(row.get_row(*pos)?.clone())), }) } @@ -198,7 +203,8 @@ impl InnerFieldGetter { | Self::Date { pos } | Self::Time { pos } | Self::Timestamp { pos, .. } - | Self::TimestampLtz { pos, .. } => *pos, + | Self::TimestampLtz { pos, .. } + | Self::Row { pos } => *pos, } } } diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index ef99ba29..39048df1 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -29,6 +29,7 @@ mod row_decoder; use crate::client::WriteFormat; use bytes::Bytes; +use serde::Serialize; pub use column::*; pub use compacted::CompactedRow; pub use datum::*; @@ -119,13 +120,20 @@ pub trait InternalRow: Send + Sync { /// Returns the binary value at the given position fn get_bytes(&self, pos: usize) -> Result<&[u8]>; + /// Returns the nested row value at the given position + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { + Err(crate::error::Error::IllegalArgument { + message: format!("get_row not supported at position {pos}"), + }) + } + /// Returns encoded bytes if already encoded fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> { None } } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] pub struct GenericRow<'a> { pub values: Vec>, } @@ -274,6 +282,15 @@ impl<'a> InternalRow for GenericRow<'a> { }), } } + + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { + match self.get_value(pos)? { + Datum::Row(r) => Ok(r.as_ref()), + other => Err(IllegalArgument { + message: format!("type mismatch at position {pos}: expected Row, got {other:?}"), + }), + } + } } impl<'a> GenericRow<'a> {