Skip to content

Commit 395d3f4

Browse files
committed
Add support for custom Interval(MonthDayNano) encoding under avro_custom_types feature flag. Updates schema handling, encoders, and readers to leverage Arrow-native fixed(16) representation for custom logical type, preserving full range and signed values. Adds unit tests for round-trip serialization/deserialization.
1 parent 104368d commit 395d3f4

5 files changed

Lines changed: 1077 additions & 35 deletions

File tree

arrow-avro/src/codec.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ impl AvroDataType {
399399
AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
400400
}
401401
#[cfg(feature = "avro_custom_types")]
402+
Codec::IntervalMonthDayNano => {
403+
AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
404+
}
405+
#[cfg(feature = "avro_custom_types")]
402406
Codec::IntervalDayTime => {
403407
AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
404408
}
@@ -781,6 +785,9 @@ pub(crate) enum Codec {
781785
/// Arrow Interval(YearMonth) custom logical type (arrow.interval-year-month)
782786
#[cfg(feature = "avro_custom_types")]
783787
IntervalYearMonth,
788+
/// Arrow Interval(MonthDayNano) custom logical type (arrow.interval-month-day-nano)
789+
#[cfg(feature = "avro_custom_types")]
790+
IntervalMonthDayNano,
784791
/// Arrow Interval(DayTime) custom logical type (arrow.interval-day-time)
785792
#[cfg(feature = "avro_custom_types")]
786793
IntervalDayTime,
@@ -907,6 +914,8 @@ impl Codec {
907914
#[cfg(feature = "avro_custom_types")]
908915
Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
909916
#[cfg(feature = "avro_custom_types")]
917+
Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
918+
#[cfg(feature = "avro_custom_types")]
910919
Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
911920
}
912921
}
@@ -1098,9 +1107,11 @@ impl From<&Codec> for UnionFieldKind {
10981107
#[cfg(feature = "avro_custom_types")]
10991108
Codec::Time32Secs => Self::TimeMillis, // Closest standard type
11001109
#[cfg(feature = "avro_custom_types")]
1101-
Codec::UInt64 | Codec::Float16 | Codec::IntervalYearMonth | Codec::IntervalDayTime => {
1102-
Self::Fixed
1103-
}
1110+
Codec::UInt64
1111+
| Codec::Float16
1112+
| Codec::IntervalYearMonth
1113+
| Codec::IntervalMonthDayNano
1114+
| Codec::IntervalDayTime => Self::Fixed,
11041115
}
11051116
}
11061117
}
@@ -1492,6 +1503,13 @@ impl<'a> Maker<'a> {
14921503
resolution: None,
14931504
},
14941505
#[cfg(feature = "avro_custom_types")]
1506+
Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
1507+
nullability: None,
1508+
metadata,
1509+
codec: Codec::IntervalMonthDayNano,
1510+
resolution: None,
1511+
},
1512+
#[cfg(feature = "avro_custom_types")]
14951513
Some("arrow.interval-day-time") if size == 8 => AvroDataType {
14961514
nullability: None,
14971515
metadata,
@@ -1646,6 +1664,10 @@ impl<'a> Maker<'a> {
16461664
*c = Codec::IntervalYearMonth
16471665
}
16481666
#[cfg(feature = "avro_custom_types")]
1667+
(Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
1668+
*c = Codec::IntervalMonthDayNano
1669+
}
1670+
#[cfg(feature = "avro_custom_types")]
16491671
(Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
16501672
*c = Codec::IntervalDayTime
16511673
}
@@ -3228,6 +3250,28 @@ mod tests {
32283250
assert!(matches!(dt.codec(), Codec::Fixed(16)));
32293251
}
32303252

3253+
#[cfg(feature = "avro_custom_types")]
3254+
#[test]
3255+
fn test_interval_month_day_nano_custom_logical_type_fixed16() {
3256+
let schema = Schema::Complex(ComplexType::Fixed(Fixed {
3257+
name: "ArrowIntervalMDN",
3258+
namespace: None,
3259+
aliases: vec![],
3260+
size: 16,
3261+
attributes: Attributes {
3262+
logical_type: Some("arrow.interval-month-day-nano"),
3263+
additional: Default::default(),
3264+
},
3265+
}));
3266+
let mut maker = Maker::new(false, false);
3267+
let dt = maker.make_data_type(&schema, None, None).unwrap();
3268+
assert!(matches!(dt.codec(), Codec::IntervalMonthDayNano));
3269+
assert_eq!(
3270+
dt.codec.data_type(),
3271+
DataType::Interval(IntervalUnit::MonthDayNano)
3272+
);
3273+
}
3274+
32313275
#[test]
32323276
fn test_resolve_records_mapping_default_fields_and_skip_fields() {
32333277
let writer = Schema::Complex(ComplexType::Record(Record {

arrow-avro/src/reader/record.rs

Lines changed: 102 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ enum Decoder {
224224
#[cfg(feature = "avro_custom_types")]
225225
IntervalYearMonth(Vec<i32>),
226226
#[cfg(feature = "avro_custom_types")]
227+
IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
228+
#[cfg(feature = "avro_custom_types")]
227229
IntervalDayTime(Vec<IntervalDayTime>),
228230
Float32(Vec<f32>),
229231
Float64(Vec<f64>),
@@ -399,6 +401,10 @@ impl Decoder {
399401
Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
400402
}
401403
#[cfg(feature = "avro_custom_types")]
404+
(Codec::IntervalMonthDayNano, _) => {
405+
Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
406+
}
407+
#[cfg(feature = "avro_custom_types")]
402408
(Codec::IntervalDayTime, _) => {
403409
Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
404410
}
@@ -618,6 +624,8 @@ impl Decoder {
618624
#[cfg(feature = "avro_custom_types")]
619625
Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
620626
#[cfg(feature = "avro_custom_types")]
627+
Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
628+
#[cfg(feature = "avro_custom_types")]
621629
Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
622630
Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
623631
Self::Float64(v)
@@ -781,7 +789,9 @@ impl Decoder {
781789
b.len()
782790
)));
783791
}
784-
v.push(u64::from_le_bytes(b.as_slice().try_into().unwrap()));
792+
v.push(u64::from_le_bytes([
793+
b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
794+
]));
785795
Ok(())
786796
}
787797
_ => Err(AvroError::InvalidArgument(
@@ -797,7 +807,7 @@ impl Decoder {
797807
b.len()
798808
)));
799809
}
800-
v.push(u16::from_le_bytes(b.as_slice().try_into().unwrap()));
810+
v.push(u16::from_le_bytes([b[0], b[1]]));
801811
Ok(())
802812
}
803813
_ => Err(AvroError::InvalidArgument(
@@ -833,14 +843,34 @@ impl Decoder {
833843
b.len()
834844
)));
835845
}
836-
v.push(i32::from_le_bytes(b.as_slice().try_into().unwrap()));
846+
v.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
837847
Ok(())
838848
}
839849
_ => Err(AvroError::InvalidArgument(
840850
"Default for interval-year-month must be bytes (4-byte LE)".to_string(),
841851
)),
842852
},
843853
#[cfg(feature = "avro_custom_types")]
854+
Self::IntervalMonthDayNano(v) => match lit {
855+
AvroLiteral::Bytes(b) => {
856+
if b.len() != 16 {
857+
return Err(AvroError::InvalidArgument(format!(
858+
"interval-month-day-nano default must be exactly 16 bytes, got {}",
859+
b.len()
860+
)));
861+
}
862+
let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
863+
let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
864+
let nanos =
865+
i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
866+
v.push(IntervalMonthDayNano::new(months, days, nanos));
867+
Ok(())
868+
}
869+
_ => Err(AvroError::InvalidArgument(
870+
"Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
871+
)),
872+
},
873+
#[cfg(feature = "avro_custom_types")]
844874
Self::IntervalDayTime(v) => match lit {
845875
AvroLiteral::Bytes(b) => {
846876
if b.len() != 8 {
@@ -849,8 +879,8 @@ impl Decoder {
849879
b.len()
850880
)));
851881
}
852-
let days = i32::from_le_bytes(b[0..4].try_into().unwrap());
853-
let milliseconds = i32::from_le_bytes(b[4..8].try_into().unwrap());
882+
let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
883+
let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
854884
v.push(IntervalDayTime::new(days, milliseconds));
855885
Ok(())
856886
}
@@ -1094,12 +1124,14 @@ impl Decoder {
10941124
#[cfg(feature = "avro_custom_types")]
10951125
Self::UInt64(values) => {
10961126
let b = buf.get_fixed(8)?;
1097-
values.push(u64::from_le_bytes(b.try_into().unwrap()));
1127+
values.push(u64::from_le_bytes([
1128+
b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7],
1129+
]));
10981130
}
10991131
#[cfg(feature = "avro_custom_types")]
11001132
Self::Float16(values) => {
11011133
let b = buf.get_fixed(2)?;
1102-
values.push(u16::from_le_bytes(b.try_into().unwrap()));
1134+
values.push(u16::from_le_bytes([b[0], b[1]]));
11031135
}
11041136
#[cfg(feature = "avro_custom_types")]
11051137
Self::Date64(values) | Self::TimeNanos(values) | Self::TimestampSecs(_, values) => {
@@ -1110,14 +1142,23 @@ impl Decoder {
11101142
#[cfg(feature = "avro_custom_types")]
11111143
Self::IntervalYearMonth(values) => {
11121144
let b = buf.get_fixed(4)?;
1113-
values.push(i32::from_le_bytes(b.try_into().unwrap()));
1145+
values.push(i32::from_le_bytes([b[0], b[1], b[2], b[3]]));
1146+
}
1147+
#[cfg(feature = "avro_custom_types")]
1148+
Self::IntervalMonthDayNano(values) => {
1149+
let b = buf.get_fixed(16)?;
1150+
let months = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1151+
let days = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1152+
let nanos =
1153+
i64::from_le_bytes([b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15]]);
1154+
values.push(IntervalMonthDayNano::new(months, days, nanos));
11141155
}
11151156
#[cfg(feature = "avro_custom_types")]
11161157
Self::IntervalDayTime(values) => {
11171158
let b = buf.get_fixed(8)?;
11181159
// Read as two i32s: days (4 bytes) and milliseconds (4 bytes)
1119-
let days = i32::from_le_bytes(b[0..4].try_into().unwrap());
1120-
let milliseconds = i32::from_le_bytes(b[4..8].try_into().unwrap());
1160+
let days = i32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1161+
let milliseconds = i32::from_le_bytes([b[4], b[5], b[6], b[7]]);
11211162
values.push(IntervalDayTime::new(days, milliseconds));
11221163
}
11231164
Self::Float32(values) => values.push(buf.get_float()?),
@@ -1205,9 +1246,9 @@ impl Decoder {
12051246
}
12061247
Self::Duration(builder) => {
12071248
let b = buf.get_fixed(12)?;
1208-
let months = u32::from_le_bytes(b[0..4].try_into().unwrap());
1209-
let days = u32::from_le_bytes(b[4..8].try_into().unwrap());
1210-
let millis = u32::from_le_bytes(b[8..12].try_into().unwrap());
1249+
let months = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
1250+
let days = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
1251+
let millis = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
12111252
let nanos = (millis as i64) * 1_000_000;
12121253
builder.append_value(IntervalMonthDayNano::new(months as i32, days as i32, nanos));
12131254
}
@@ -1366,10 +1407,9 @@ impl Decoder {
13661407
Self::Float16(values) => {
13671408
// Convert Vec<u16> to Float16Array by reinterpreting the raw bytes.
13681409
// This is safe because f16 and u16 have the same size and alignment.
1410+
let len = values.len();
13691411
let buf: Buffer = std::mem::take(values).into();
1370-
// SAFETY: u16 and f16 have the same size (2 bytes) and alignment.
1371-
let scalar_buf: ScalarBuffer<<Float16Type as ArrowPrimitiveType>::Native> =
1372-
unsafe { ScalarBuffer::new_unchecked(buf) };
1412+
let scalar_buf = ScalarBuffer::new(buf, 0, len);
13731413
Arc::new(Float16Array::new(scalar_buf, nulls))
13741414
}
13751415
#[cfg(feature = "avro_custom_types")]
@@ -1392,6 +1432,10 @@ impl Decoder {
13921432
Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
13931433
}
13941434
#[cfg(feature = "avro_custom_types")]
1435+
Self::IntervalMonthDayNano(values) => {
1436+
Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1437+
}
1438+
#[cfg(feature = "avro_custom_types")]
13951439
Self::IntervalDayTime(values) => {
13961440
Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
13971441
}
@@ -2314,6 +2358,8 @@ impl Skipper {
23142358
#[cfg(feature = "avro_custom_types")]
23152359
Codec::IntervalYearMonth => Self::Fixed(4),
23162360
#[cfg(feature = "avro_custom_types")]
2361+
Codec::IntervalMonthDayNano => Self::Fixed(16),
2362+
#[cfg(feature = "avro_custom_types")]
23172363
Codec::IntervalDayTime => Self::Fixed(8),
23182364
Codec::Float32 => Self::Float32,
23192365
Codec::Float64 => Self::Float64,
@@ -3599,6 +3645,46 @@ mod tests {
35993645
assert_eq!(interval_array, &expected);
36003646
}
36013647

3648+
#[cfg(feature = "avro_custom_types")]
3649+
#[test]
3650+
fn test_interval_month_day_nano_custom_decoding_with_nulls() {
3651+
let avro_type = AvroDataType::new(
3652+
Codec::IntervalMonthDayNano,
3653+
Default::default(),
3654+
Some(Nullability::NullFirst),
3655+
);
3656+
let mut decoder = Decoder::try_new(&avro_type).unwrap();
3657+
let mut data = Vec::new();
3658+
// First value: months=1, days=-2, nanos=3
3659+
data.extend_from_slice(&encode_avro_long(1));
3660+
data.extend_from_slice(&1i32.to_le_bytes());
3661+
data.extend_from_slice(&(-2i32).to_le_bytes());
3662+
data.extend_from_slice(&3i64.to_le_bytes());
3663+
// Second value: null
3664+
data.extend_from_slice(&encode_avro_long(0));
3665+
// Third value: months=-4, days=5, nanos=-6
3666+
data.extend_from_slice(&encode_avro_long(1));
3667+
data.extend_from_slice(&(-4i32).to_le_bytes());
3668+
data.extend_from_slice(&5i32.to_le_bytes());
3669+
data.extend_from_slice(&(-6i64).to_le_bytes());
3670+
let mut cursor = AvroCursor::new(&data);
3671+
decoder.decode(&mut cursor).unwrap();
3672+
decoder.decode(&mut cursor).unwrap();
3673+
decoder.decode(&mut cursor).unwrap();
3674+
let array = decoder.flush(None).unwrap();
3675+
let interval_array = array
3676+
.as_any()
3677+
.downcast_ref::<IntervalMonthDayNanoArray>()
3678+
.unwrap();
3679+
assert_eq!(interval_array.len(), 3);
3680+
let expected = IntervalMonthDayNanoArray::from(vec![
3681+
Some(IntervalMonthDayNano::new(1, -2, 3)),
3682+
None,
3683+
Some(IntervalMonthDayNano::new(-4, 5, -6)),
3684+
]);
3685+
assert_eq!(interval_array, &expected);
3686+
}
3687+
36023688
#[test]
36033689
fn test_duration_decoding_empty() {
36043690
let duration_codec = Codec::Interval;

0 commit comments

Comments
 (0)