From 716be9510081336a3e16c0ff7952a6a196bad571 Mon Sep 17 00:00:00 2001 From: Jordan Epstein Date: Sat, 6 Jun 2026 08:38:25 -0500 Subject: [PATCH 1/2] [flink] Fix RemoteTableQuery key serializer to use trimmed primary keys (#8145) RemoteTableQuery built its lookup-key serializer from the table's full primary keys (table.primaryKeys()), but the lookup contract (PrimaryKeyPartialLookupTable#get) passes lookup() the trimmed primary key (primary keys minus partition keys -- the native LSM key within a (partition, bucket)). For a partitioned primary-key table these differ, so the serializer reads the wrong fields: it interprets field 0 of the trimmed key as the first full-PK field. When the partition key and trimmed key happen to share a type this silently produces a usable key (the extra trailing field is ignored on the server side), which is why it went unnoticed for unpartitioned tables and same-type keys. When the types differ -- e.g. PRIMARY KEY (pt INT, k STRING) PARTITIONED BY (pt), where the trimmed key is (k STRING) -- it throws ClassCastException in InternalRowSerializer.toBinaryRow. The local executor (LocalTableQuery) consumes the trimmed key natively, so only the remote path is affected. Build keySerializer from schema().trimmedPrimaryKeys() instead, matching the key the caller actually provides. Unpartitioned tables are unaffected, since trimmed primary keys equal the full primary keys there. Add RemoteLookupJoinITCase#testLookupPartitionedRemoteTable covering a partitioned table with a differently-typed trimmed key; it fails with the ClassCastException before this change and passes after. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../paimon/flink/query/RemoteTableQuery.java | 4 +- .../paimon/flink/RemoteLookupJoinITCase.java | 38 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java index 34d993eb3db2..fca626de1879 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java @@ -54,7 +54,9 @@ public RemoteTableQuery(Table table) { ServiceManager manager = this.table.store().newServiceManager(); this.client = new KvQueryClient(new QueryLocationImpl(manager), 1); this.keySerializer = - InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys())); + InternalSerializers.create( + TypeUtils.project( + this.table.rowType(), this.table.schema().trimmedPrimaryKeys())); } public static boolean isRemoteServiceAvailable(FileStoreTable table) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java index df9a7ed59be1..ceadb743b607 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; @@ -165,6 +166,43 @@ public void testServiceFileCleaned() throws Exception { assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse(); } + @Test + public void testLookupPartitionedRemoteTable() throws Throwable { + // Partitioned primary-key table whose partition key and trimmed primary key have different + // types: primaryKeys = (pt INT, k STRING), partitionKeys = (pt), so the trimmed primary key + // handed to lookup is (k STRING). A key serializer built over the full primary key reads + // field 0 as INT and breaks on the STRING key. + sql( + "CREATE TABLE DIMP (pt INT, k STRING, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) " + + "PARTITIONED BY (pt) WITH ('bucket' = '1')"); + ServiceProxy proxy = launchQueryServer("DIMP"); + + proxy.write(GenericRow.of(1, BinaryString.fromString("a"), 100)); + proxy.write(GenericRow.of(1, BinaryString.fromString("b"), 200)); + proxy.write(GenericRow.of(2, BinaryString.fromString("a"), 300)); + + RemoteTableQuery query = new RemoteTableQuery(paimonTable("DIMP")); + + // lookup is called with (partition, bucket, trimmedKey); trimmedKey = (k STRING) + assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("a")))) + .isNotNull() + .extracting(r -> r.getInt(2)) + .isEqualTo(100); + assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("b")))) + .isNotNull() + .extracting(r -> r.getInt(2)) + .isEqualTo(200); + // same trimmed key ("a") in a different partition must resolve through the partition arg + assertThat(query.lookup(row(2), 0, GenericRow.of(BinaryString.fromString("a")))) + .isNotNull() + .extracting(r -> r.getInt(2)) + .isEqualTo(300); + assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("z")))).isNull(); + + query.close(); + proxy.close(); + } + private JobClient queryService(FileStoreTable table) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); QueryService.build(env, table, 2); From fd412104271aaa780daf5f91fbd14d2ec27070cb Mon Sep 17 00:00:00 2001 From: Jordan Epstein Date: Sat, 6 Jun 2026 08:38:25 -0500 Subject: [PATCH 2/2] [flink] Fix RemoteTableQuery key serializer to use trimmed primary keys (#8145) RemoteTableQuery built its lookup-key serializer from the table's full primary keys (table.primaryKeys()), but the lookup contract (PrimaryKeyPartialLookupTable#get) passes lookup() the trimmed primary key (primary keys minus partition keys -- the native LSM key within a (partition, bucket)). For a partitioned primary-key table these differ, so the serializer reads the wrong fields: it interprets field 0 of the trimmed key as the first full-PK field. When the partition key and trimmed key happen to share a type this silently produces a usable key (the extra trailing field is ignored on the server side), which is why it went unnoticed for unpartitioned tables and same-type keys. When the types differ -- e.g. PRIMARY KEY (pt INT, k STRING) PARTITIONED BY (pt), where the trimmed key is (k STRING) -- it throws ClassCastException in InternalRowSerializer.toBinaryRow. The local executor (LocalTableQuery) consumes the trimmed key natively, so only the remote path is affected. Build keySerializer from schema().trimmedPrimaryKeys() instead, matching the key the caller actually provides. Unpartitioned tables are unaffected, since trimmed primary keys equal the full primary keys there. Add RemoteLookupJoinITCase#testLookupPartitionedRemoteTable covering a partitioned table with a differently-typed trimmed key; it fails with the ClassCastException before this change and passes after. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../java/org/apache/paimon/flink/RemoteLookupJoinITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java index ceadb743b607..fca350da190e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java @@ -192,7 +192,7 @@ public void testLookupPartitionedRemoteTable() throws Throwable { .isNotNull() .extracting(r -> r.getInt(2)) .isEqualTo(200); - // same trimmed key ("a") in a different partition must resolve through the partition arg + // same trimmed key ("a") in a different partition resolves through the partition arg assertThat(query.lookup(row(2), 0, GenericRow.of(BinaryString.fromString("a")))) .isNotNull() .extracting(r -> r.getInt(2))