Skip to content

Commit 49cc7b8

Browse files
committed
Add support for custom Interval(MonthDayNano) encoding under avro_custom_types feature flag. Updates schema handling, encoders, and readers to leverage Arrow-native fixed(16) representation for custom logical type, preserving full range and signed values. Adds unit tests for round-trip serialization/deserialization.
1 parent 104368d commit 49cc7b8

5 files changed

Lines changed: 320 additions & 12 deletions

File tree

arrow-avro/src/codec.rs

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,10 @@ impl AvroDataType {
399399
AvroLiteral::Bytes(parse_bytes_default(default_json, Some(4))?)
400400
}
401401
#[cfg(feature = "avro_custom_types")]
402+
Codec::IntervalMonthDayNano => {
403+
AvroLiteral::Bytes(parse_bytes_default(default_json, Some(16))?)
404+
}
405+
#[cfg(feature = "avro_custom_types")]
402406
Codec::IntervalDayTime => {
403407
AvroLiteral::Bytes(parse_bytes_default(default_json, Some(8))?)
404408
}
@@ -781,6 +785,9 @@ pub(crate) enum Codec {
781785
/// Arrow Interval(YearMonth) custom logical type (arrow.interval-year-month)
782786
#[cfg(feature = "avro_custom_types")]
783787
IntervalYearMonth,
788+
/// Arrow Interval(MonthDayNano) custom logical type (arrow.interval-month-day-nano)
789+
#[cfg(feature = "avro_custom_types")]
790+
IntervalMonthDayNano,
784791
/// Arrow Interval(DayTime) custom logical type (arrow.interval-day-time)
785792
#[cfg(feature = "avro_custom_types")]
786793
IntervalDayTime,
@@ -907,6 +914,8 @@ impl Codec {
907914
#[cfg(feature = "avro_custom_types")]
908915
Self::IntervalYearMonth => DataType::Interval(IntervalUnit::YearMonth),
909916
#[cfg(feature = "avro_custom_types")]
917+
Self::IntervalMonthDayNano => DataType::Interval(IntervalUnit::MonthDayNano),
918+
#[cfg(feature = "avro_custom_types")]
910919
Self::IntervalDayTime => DataType::Interval(IntervalUnit::DayTime),
911920
}
912921
}
@@ -1098,9 +1107,11 @@ impl From<&Codec> for UnionFieldKind {
10981107
#[cfg(feature = "avro_custom_types")]
10991108
Codec::Time32Secs => Self::TimeMillis, // Closest standard type
11001109
#[cfg(feature = "avro_custom_types")]
1101-
Codec::UInt64 | Codec::Float16 | Codec::IntervalYearMonth | Codec::IntervalDayTime => {
1102-
Self::Fixed
1103-
}
1110+
Codec::UInt64
1111+
| Codec::Float16
1112+
| Codec::IntervalYearMonth
1113+
| Codec::IntervalMonthDayNano
1114+
| Codec::IntervalDayTime => Self::Fixed,
11041115
}
11051116
}
11061117
}
@@ -1492,6 +1503,13 @@ impl<'a> Maker<'a> {
14921503
resolution: None,
14931504
},
14941505
#[cfg(feature = "avro_custom_types")]
1506+
Some("arrow.interval-month-day-nano") if size == 16 => AvroDataType {
1507+
nullability: None,
1508+
metadata,
1509+
codec: Codec::IntervalMonthDayNano,
1510+
resolution: None,
1511+
},
1512+
#[cfg(feature = "avro_custom_types")]
14951513
Some("arrow.interval-day-time") if size == 8 => AvroDataType {
14961514
nullability: None,
14971515
metadata,
@@ -1646,6 +1664,10 @@ impl<'a> Maker<'a> {
16461664
*c = Codec::IntervalYearMonth
16471665
}
16481666
#[cfg(feature = "avro_custom_types")]
1667+
(Some("arrow.interval-month-day-nano"), c @ Codec::Fixed(16)) => {
1668+
*c = Codec::IntervalMonthDayNano
1669+
}
1670+
#[cfg(feature = "avro_custom_types")]
16491671
(Some("arrow.interval-day-time"), c @ Codec::Fixed(8)) => {
16501672
*c = Codec::IntervalDayTime
16511673
}
@@ -3228,6 +3250,28 @@ mod tests {
32283250
assert!(matches!(dt.codec(), Codec::Fixed(16)));
32293251
}
32303252

3253+
#[cfg(feature = "avro_custom_types")]
3254+
#[test]
3255+
fn test_interval_month_day_nano_custom_logical_type_fixed16() {
3256+
let schema = Schema::Complex(ComplexType::Fixed(Fixed {
3257+
name: "ArrowIntervalMDN",
3258+
namespace: None,
3259+
aliases: vec![],
3260+
size: 16,
3261+
attributes: Attributes {
3262+
logical_type: Some("arrow.interval-month-day-nano"),
3263+
additional: Default::default(),
3264+
},
3265+
}));
3266+
let mut maker = Maker::new(false, false);
3267+
let dt = maker.make_data_type(&schema, None, None).unwrap();
3268+
assert!(matches!(dt.codec(), Codec::IntervalMonthDayNano));
3269+
assert_eq!(
3270+
dt.codec.data_type(),
3271+
DataType::Interval(IntervalUnit::MonthDayNano)
3272+
);
3273+
}
3274+
32313275
#[test]
32323276
fn test_resolve_records_mapping_default_fields_and_skip_fields() {
32333277
let writer = Schema::Complex(ComplexType::Record(Record {

arrow-avro/src/reader/record.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,8 @@ enum Decoder {
224224
#[cfg(feature = "avro_custom_types")]
225225
IntervalYearMonth(Vec<i32>),
226226
#[cfg(feature = "avro_custom_types")]
227+
IntervalMonthDayNano(Vec<IntervalMonthDayNano>),
228+
#[cfg(feature = "avro_custom_types")]
227229
IntervalDayTime(Vec<IntervalDayTime>),
228230
Float32(Vec<f32>),
229231
Float64(Vec<f64>),
@@ -399,6 +401,10 @@ impl Decoder {
399401
Self::IntervalYearMonth(Vec::with_capacity(DEFAULT_CAPACITY))
400402
}
401403
#[cfg(feature = "avro_custom_types")]
404+
(Codec::IntervalMonthDayNano, _) => {
405+
Self::IntervalMonthDayNano(Vec::with_capacity(DEFAULT_CAPACITY))
406+
}
407+
#[cfg(feature = "avro_custom_types")]
402408
(Codec::IntervalDayTime, _) => {
403409
Self::IntervalDayTime(Vec::with_capacity(DEFAULT_CAPACITY))
404410
}
@@ -618,6 +624,8 @@ impl Decoder {
618624
#[cfg(feature = "avro_custom_types")]
619625
Self::IntervalDayTime(v) => v.push(IntervalDayTime::new(0, 0)),
620626
#[cfg(feature = "avro_custom_types")]
627+
Self::IntervalMonthDayNano(v) => v.push(IntervalMonthDayNano::new(0, 0, 0)),
628+
#[cfg(feature = "avro_custom_types")]
621629
Self::Time32Secs(v) | Self::IntervalYearMonth(v) => v.push(0),
622630
Self::Float32(v) | Self::Int32ToFloat32(v) | Self::Int64ToFloat32(v) => v.push(0.),
623631
Self::Float64(v)
@@ -841,6 +849,25 @@ impl Decoder {
841849
)),
842850
},
843851
#[cfg(feature = "avro_custom_types")]
852+
Self::IntervalMonthDayNano(v) => match lit {
853+
AvroLiteral::Bytes(b) => {
854+
if b.len() != 16 {
855+
return Err(AvroError::InvalidArgument(format!(
856+
"interval-month-day-nano default must be exactly 16 bytes, got {}",
857+
b.len()
858+
)));
859+
}
860+
let months = i32::from_le_bytes(b[0..4].try_into().unwrap());
861+
let days = i32::from_le_bytes(b[4..8].try_into().unwrap());
862+
let nanos = i64::from_le_bytes(b[8..16].try_into().unwrap());
863+
v.push(IntervalMonthDayNano::new(months, days, nanos));
864+
Ok(())
865+
}
866+
_ => Err(AvroError::InvalidArgument(
867+
"Default for interval-month-day-nano must be bytes (16-byte LE)".to_string(),
868+
)),
869+
},
870+
#[cfg(feature = "avro_custom_types")]
844871
Self::IntervalDayTime(v) => match lit {
845872
AvroLiteral::Bytes(b) => {
846873
if b.len() != 8 {
@@ -1113,6 +1140,14 @@ impl Decoder {
11131140
values.push(i32::from_le_bytes(b.try_into().unwrap()));
11141141
}
11151142
#[cfg(feature = "avro_custom_types")]
1143+
Self::IntervalMonthDayNano(values) => {
1144+
let b = buf.get_fixed(16)?;
1145+
let months = i32::from_le_bytes(b[0..4].try_into().unwrap());
1146+
let days = i32::from_le_bytes(b[4..8].try_into().unwrap());
1147+
let nanos = i64::from_le_bytes(b[8..16].try_into().unwrap());
1148+
values.push(IntervalMonthDayNano::new(months, days, nanos));
1149+
}
1150+
#[cfg(feature = "avro_custom_types")]
11161151
Self::IntervalDayTime(values) => {
11171152
let b = buf.get_fixed(8)?;
11181153
// Read as two i32s: days (4 bytes) and milliseconds (4 bytes)
@@ -1392,6 +1427,10 @@ impl Decoder {
13921427
Arc::new(flush_primitive::<IntervalYearMonthType>(values, nulls))
13931428
}
13941429
#[cfg(feature = "avro_custom_types")]
1430+
Self::IntervalMonthDayNano(values) => {
1431+
Arc::new(flush_primitive::<IntervalMonthDayNanoType>(values, nulls))
1432+
}
1433+
#[cfg(feature = "avro_custom_types")]
13951434
Self::IntervalDayTime(values) => {
13961435
Arc::new(flush_primitive::<IntervalDayTimeType>(values, nulls))
13971436
}
@@ -2314,6 +2353,8 @@ impl Skipper {
23142353
#[cfg(feature = "avro_custom_types")]
23152354
Codec::IntervalYearMonth => Self::Fixed(4),
23162355
#[cfg(feature = "avro_custom_types")]
2356+
Codec::IntervalMonthDayNano => Self::Fixed(16),
2357+
#[cfg(feature = "avro_custom_types")]
23172358
Codec::IntervalDayTime => Self::Fixed(8),
23182359
Codec::Float32 => Self::Float32,
23192360
Codec::Float64 => Self::Float64,
@@ -3599,6 +3640,46 @@ mod tests {
35993640
assert_eq!(interval_array, &expected);
36003641
}
36013642

3643+
#[cfg(feature = "avro_custom_types")]
3644+
#[test]
3645+
fn test_interval_month_day_nano_custom_decoding_with_nulls() {
3646+
let avro_type = AvroDataType::new(
3647+
Codec::IntervalMonthDayNano,
3648+
Default::default(),
3649+
Some(Nullability::NullFirst),
3650+
);
3651+
let mut decoder = Decoder::try_new(&avro_type).unwrap();
3652+
let mut data = Vec::new();
3653+
// First value: months=1, days=-2, nanos=3
3654+
data.extend_from_slice(&encode_avro_long(1));
3655+
data.extend_from_slice(&1i32.to_le_bytes());
3656+
data.extend_from_slice(&(-2i32).to_le_bytes());
3657+
data.extend_from_slice(&3i64.to_le_bytes());
3658+
// Second value: null
3659+
data.extend_from_slice(&encode_avro_long(0));
3660+
// Third value: months=-4, days=5, nanos=-6
3661+
data.extend_from_slice(&encode_avro_long(1));
3662+
data.extend_from_slice(&(-4i32).to_le_bytes());
3663+
data.extend_from_slice(&5i32.to_le_bytes());
3664+
data.extend_from_slice(&(-6i64).to_le_bytes());
3665+
let mut cursor = AvroCursor::new(&data);
3666+
decoder.decode(&mut cursor).unwrap();
3667+
decoder.decode(&mut cursor).unwrap();
3668+
decoder.decode(&mut cursor).unwrap();
3669+
let array = decoder.flush(None).unwrap();
3670+
let interval_array = array
3671+
.as_any()
3672+
.downcast_ref::<IntervalMonthDayNanoArray>()
3673+
.unwrap();
3674+
assert_eq!(interval_array.len(), 3);
3675+
let expected = IntervalMonthDayNanoArray::from(vec![
3676+
Some(IntervalMonthDayNano::new(1, -2, 3)),
3677+
None,
3678+
Some(IntervalMonthDayNano::new(-4, 5, -6)),
3679+
]);
3680+
assert_eq!(interval_array, &expected);
3681+
}
3682+
36023683
#[test]
36033684
fn test_duration_decoding_empty() {
36043685
let duration_codec = Codec::Interval;

arrow-avro/src/schema.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1547,6 +1547,7 @@ fn datatype_to_avro(
15471547
};
15481548
json!({ "type": "long", "logicalType": logical_type })
15491549
}
1550+
#[cfg(not(feature = "avro_custom_types"))]
15501551
DataType::Interval(IntervalUnit::MonthDayNano) => {
15511552
// Avro duration logical type: fixed(12) with months/days/millis per spec.
15521553
let chosen_name = metadata
@@ -1564,6 +1565,25 @@ fn datatype_to_avro(
15641565
}
15651566
json!(obj)
15661567
}
1568+
#[cfg(feature = "avro_custom_types")]
1569+
DataType::Interval(IntervalUnit::MonthDayNano) => {
1570+
// Arrow MonthDayNano interval: i32 months + i32 days + i64 nanos (16 bytes).
1571+
// We preserve the Arrow native representation via a custom logical type.
1572+
let chosen_name = metadata
1573+
.get(AVRO_NAME_METADATA_KEY)
1574+
.map(|s| sanitise_avro_name(s))
1575+
.unwrap_or_else(|| name_gen.make_unique(field_name));
1576+
let mut obj = JsonMap::from_iter([
1577+
("type".into(), json!("fixed")),
1578+
("name".into(), json!(chosen_name)),
1579+
("size".into(), json!(16)),
1580+
("logicalType".into(), json!("arrow.interval-month-day-nano")),
1581+
]);
1582+
if let Some(ns) = metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
1583+
obj.insert("namespace".into(), json!(ns));
1584+
}
1585+
json!(obj)
1586+
}
15671587
#[cfg(not(feature = "avro_custom_types"))]
15681588
DataType::Interval(IntervalUnit::YearMonth) => {
15691589
// Encode as Avro `duration` (fixed(12)) like MonthDayNano
@@ -2751,9 +2771,9 @@ mod tests {
27512771
assert_json_contains(&avro_uuid.json_string, "\"logicalType\":\"uuid\"");
27522772
}
27532773

2754-
#[cfg(feature = "avro_custom_types")]
2774+
#[cfg(not(feature = "avro_custom_types"))]
27552775
#[test]
2756-
fn test_interval_duration() {
2776+
fn test_interval_month_day_nano_duration_schema() {
27572777
let interval_field = ArrowField::new(
27582778
"span",
27592779
DataType::Interval(IntervalUnit::MonthDayNano),
@@ -2763,6 +2783,28 @@ mod tests {
27632783
let avro = AvroSchema::try_from(&s).unwrap();
27642784
assert_json_contains(&avro.json_string, "\"logicalType\":\"duration\"");
27652785
assert_json_contains(&avro.json_string, "\"size\":12");
2786+
}
2787+
2788+
#[cfg(feature = "avro_custom_types")]
2789+
#[test]
2790+
fn test_interval_month_day_nano_custom_schema() {
2791+
let interval_field = ArrowField::new(
2792+
"span",
2793+
DataType::Interval(IntervalUnit::MonthDayNano),
2794+
false,
2795+
);
2796+
let s = single_field_schema(interval_field);
2797+
let avro = AvroSchema::try_from(&s).unwrap();
2798+
assert_json_contains(
2799+
&avro.json_string,
2800+
"\"logicalType\":\"arrow.interval-month-day-nano\"",
2801+
);
2802+
assert_json_contains(&avro.json_string, "\"size\":16");
2803+
}
2804+
2805+
#[cfg(feature = "avro_custom_types")]
2806+
#[test]
2807+
fn test_duration_custom_logical_type() {
27662808
let dur_field = ArrowField::new("latency", DataType::Duration(TimeUnit::Nanosecond), false);
27672809
let s2 = single_field_schema(dur_field);
27682810
let avro2 = AvroSchema::try_from(&s2).unwrap();

0 commit comments

Comments
 (0)