From 40ae6b264bedee84134eb98f057b1381935ac523 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 28 May 2026 23:56:06 +0530 Subject: [PATCH 1/4] [FLINK-39789][formats-avro] convertToDataType honors local-timestamp logical types --- .../formats/avro/typeutils/AvroSchemaConverter.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java index 8bf5cbe2c07f7..8fe03cced9d14 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java @@ -333,8 +333,13 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping) } return DataTypes.INT().notNull(); case LONG: + // Local timestamps are unambiguous per the Avro spec; honor in both mappings. + if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) { + return DataTypes.TIMESTAMP(3).notNull(); + } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { + return DataTypes.TIMESTAMP(6).notNull(); + } if (legacyMapping) { - // Avro logical timestamp types to Flink SQL timestamp types if (schema.getLogicalType() == LogicalTypes.timestampMillis()) { return DataTypes.TIMESTAMP(3).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { @@ -345,7 +350,6 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping) return DataTypes.TIME(6).notNull(); } } else { - // Avro logical timestamp types to Flink SQL timestamp types if (schema.getLogicalType() == LogicalTypes.timestampMillis()) { return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) { @@ -354,10 +358,6 @@ private static DataType convertToDataType(Schema schema, boolean legacyMapping) return DataTypes.TIME(3).notNull(); } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) { return DataTypes.TIME(6).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) { - return DataTypes.TIMESTAMP(3).notNull(); - } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { - return DataTypes.TIMESTAMP(6).notNull(); } } From 3353306e5ccde5459af27f29679c0db4085ef40f Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 28 May 2026 23:56:35 +0530 Subject: [PATCH 2/4] [FLINK-39789][formats-avro] convertToTypeInfo honors local-timestamp logical types --- .../formats/avro/typeutils/AvroSchemaConverter.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java index 8fe03cced9d14..30f6eb2aa39eb 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java @@ -192,6 +192,11 @@ private static TypeInformation convertToTypeInfo( } return Types.INT; case LONG: + // Local timestamps are unambiguous per the Avro spec; honor in both mappings. + if (schema.getLogicalType() == LogicalTypes.localTimestampMillis() + || schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { + return Types.LOCAL_DATE_TIME; + } if (legacyTimestampMapping) { if (schema.getLogicalType() == LogicalTypes.timestampMillis() || schema.getLogicalType() == LogicalTypes.timestampMicros()) { @@ -201,13 +206,9 @@ private static TypeInformation convertToTypeInfo( return Types.SQL_TIME; } } else { - // Avro logical timestamp types to Flink DataStream timestamp types if (schema.getLogicalType() == LogicalTypes.timestampMillis() || schema.getLogicalType() == LogicalTypes.timestampMicros()) { return Types.INSTANT; - } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis() - || schema.getLogicalType() == LogicalTypes.localTimestampMicros()) { - return Types.LOCAL_DATE_TIME; } else if (schema.getLogicalType() == LogicalTypes.timeMicros() || schema.getLogicalType() == LogicalTypes.timeMillis()) { return Types.SQL_TIME; From c640bca2999814f1e00f3682a8a9883c35623e5c Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 28 May 2026 23:58:56 +0530 Subject: [PATCH 3/4] [FLINK-39789][formats-avro][test] Cover local-timestamp via single-arg public API --- ...maConverterLocalTimestampBehaviorTest.java | 159 ++++++++++++++++++ 1 file changed, 159 insertions(+) create mode 100644 flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterLocalTimestampBehaviorTest.java diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterLocalTimestampBehaviorTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterLocalTimestampBehaviorTest.java new file mode 100644 index 0000000000000..d485d6b3c8ca5 --- /dev/null +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterLocalTimestampBehaviorTest.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.formats.avro.typeutils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; +import org.junit.jupiter.api.Test; + +class AvroSchemaConverterLocalTimestampBehaviorTest { + + private static String record(String fieldName, String logicalType) { + return "{ \"type\": \"record\", \"name\": \"R\", \"fields\": [" + + "{ \"name\": \"" + + fieldName + + "\", \"type\": { \"type\": \"long\", \"logicalType\": \"" + + logicalType + + "\" } } ] }"; + } + + private static DataType firstFieldType(DataType row) { + RowType rowType = (RowType) row.getLogicalType(); + return DataTypes.of(rowType.getTypeAt(0)).notNull(); + } + + @Test + void readTimestampMillisSingleArg() { + DataType dt = AvroSchemaConverter.convertToDataType(record("ts", "timestamp-millis")); + assertThat(firstFieldType(dt)).isEqualTo(DataTypes.TIMESTAMP(3).notNull()); + } + + @Test + void readTimestampMicrosSingleArg() { + DataType dt = AvroSchemaConverter.convertToDataType(record("ts", "timestamp-micros")); + assertThat(firstFieldType(dt)).isEqualTo(DataTypes.TIMESTAMP(6).notNull()); + } + + @Test + void readLocalTimestampMillisSingleArg() { + DataType dt = + AvroSchemaConverter.convertToDataType(record("ts", "local-timestamp-millis")); + assertThat(firstFieldType(dt)).isEqualTo(DataTypes.TIMESTAMP(3).notNull()); + } + + @Test + void readLocalTimestampMicrosSingleArg() { + DataType dt = + AvroSchemaConverter.convertToDataType(record("ts", "local-timestamp-micros")); + assertThat(firstFieldType(dt)).isEqualTo(DataTypes.TIMESTAMP(6).notNull()); + } + + @Test + void readTimestampMillisTwoArgLegacyFalse() { + DataType dt = + AvroSchemaConverter.convertToDataType(record("ts", "timestamp-millis"), false); + assertThat(firstFieldType(dt)) + .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull()); + } + + @Test + void readLocalTimestampMillisTwoArgLegacyFalse() { + DataType dt = + AvroSchemaConverter.convertToDataType( + record("ts", "local-timestamp-millis"), false); + assertThat(firstFieldType(dt)).isEqualTo(DataTypes.TIMESTAMP(3).notNull()); + } + + @Test + void readLocalTimestampMicrosTwoArgLegacyFalse() { + DataType dt = + AvroSchemaConverter.convertToDataType( + record("ts", "local-timestamp-micros"), false); + assertThat(firstFieldType(dt)).isEqualTo(DataTypes.TIMESTAMP(6).notNull()); + } + + @Test + void readLocalTimestampMillisTwoArgLegacyTrue() { + DataType dt = + AvroSchemaConverter.convertToDataType(record("ts", "local-timestamp-millis"), true); + assertThat(firstFieldType(dt)).isEqualTo(DataTypes.TIMESTAMP(3).notNull()); + } + + @Test + void typeInfoLocalTimestampMicrosSingleArg() { + TypeInformation ti = + AvroSchemaConverter.convertToTypeInfo(record("ts", "local-timestamp-micros")); + RowTypeInfoLike.assertFirstField(ti, Types.LOCAL_DATE_TIME); + } + + @Test + void typeInfoLocalTimestampMicrosTwoArgLegacyFalse() { + TypeInformation ti = + AvroSchemaConverter.convertToTypeInfo( + record("ts", "local-timestamp-micros"), false); + RowTypeInfoLike.assertFirstField(ti, Types.LOCAL_DATE_TIME); + } + + @Test + void typeInfoLocalTimestampMicrosTwoArgLegacyTrue() { + TypeInformation ti = + AvroSchemaConverter.convertToTypeInfo( + record("ts", "local-timestamp-micros"), true); + RowTypeInfoLike.assertFirstField(ti, Types.LOCAL_DATE_TIME); + } + + @Test + void fieldGetterFromConvertedRowTypeAcceptsTimestampData() { + DataType dt = + AvroSchemaConverter.convertToDataType(record("ts", "local-timestamp-micros")); + RowType row = (RowType) dt.getLogicalType(); + RowData.FieldGetter getter = RowData.createFieldGetter(row.getTypeAt(0), 0); + GenericRowData data = new GenericRowData(1); + data.setField(0, TimestampData.fromEpochMillis(0L)); + assertThatCode(() -> getter.getFieldOrNull(data)).doesNotThrowAnyException(); + } + + @Test + void roundTripTimestampMicrosTwoArgLegacyFalse() { + org.apache.avro.Schema schema = + AvroSchemaConverter.convertToSchema( + DataTypes.ROW(DataTypes.FIELD("ts", DataTypes.TIMESTAMP(6).notNull())) + .getLogicalType(), + false); + DataType dt = AvroSchemaConverter.convertToDataType(schema.toString(), false); + assertThat(firstFieldType(dt)).isEqualTo(DataTypes.TIMESTAMP(6).notNull()); + } + + private static final class RowTypeInfoLike { + static void assertFirstField(TypeInformation rowTypeInfo, TypeInformation expected) { + assertThat(rowTypeInfo).isInstanceOf(org.apache.flink.api.java.typeutils.RowTypeInfo.class); + org.apache.flink.api.java.typeutils.RowTypeInfo r = + (org.apache.flink.api.java.typeutils.RowTypeInfo) rowTypeInfo; + assertThat(r.getTypeAt(0)).isEqualTo(expected); + } + } +} From dc5e6deed3fe2572d88e1f2832f4f4a2d468cff7 Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Fri, 29 May 2026 00:06:33 +0530 Subject: [PATCH 4/4] [FLINK-39789][formats-avro][test] Update legacy-mapping expectations for local-timestamp --- .../formats/avro/typeutils/AvroSchemaConverterTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java index 0cf0033341c3b..251d302f7cf96 100644 --- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java +++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverterTest.java @@ -671,8 +671,8 @@ private void validateLegacyTimestampsSchema(TypeInformation actual) { }, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, - Types.LONG, - Types.LONG); + Types.LOCAL_DATE_TIME, + Types.LOCAL_DATE_TIME); final RowTypeInfo timestampsRowTypeInfo = (RowTypeInfo) timestamps; assertThat(timestampsRowTypeInfo.schemaEquals(actual)).isTrue(); } @@ -686,10 +686,10 @@ private void validateLegacyTimestampsSchema(DataType actual) { "type_timestamp_micros", DataTypes.TIMESTAMP(6).notNull()), DataTypes.FIELD( "type_local_timestamp_millis", - DataTypes.BIGINT().notNull()), + DataTypes.TIMESTAMP(3).notNull()), DataTypes.FIELD( "type_local_timestamp_micros", - DataTypes.BIGINT().notNull())) + DataTypes.TIMESTAMP(6).notNull())) .notNull(); assertThat(actual).isEqualTo(timestamps);