diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java index 34d993eb3db2..fca626de1879 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java @@ -54,7 +54,9 @@ public RemoteTableQuery(Table table) { ServiceManager manager = this.table.store().newServiceManager(); this.client = new KvQueryClient(new QueryLocationImpl(manager), 1); this.keySerializer = - InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys())); + InternalSerializers.create( + TypeUtils.project( + this.table.rowType(), this.table.schema().trimmedPrimaryKeys())); } public static boolean isRemoteServiceAvailable(FileStoreTable table) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java index df9a7ed59be1..fca350da190e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; @@ -165,6 +166,43 @@ public void testServiceFileCleaned() throws Exception { assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse(); } + @Test + public void testLookupPartitionedRemoteTable() throws Throwable { + // Partitioned primary-key table whose partition key and trimmed primary key have different + // types: primaryKeys = (pt INT, k STRING), partitionKeys = (pt), so the trimmed primary key + // handed to lookup is (k STRING). A key serializer built over the full primary key reads + // field 0 as INT and breaks on the STRING key. + sql( + "CREATE TABLE DIMP (pt INT, k STRING, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) " + + "PARTITIONED BY (pt) WITH ('bucket' = '1')"); + ServiceProxy proxy = launchQueryServer("DIMP"); + + proxy.write(GenericRow.of(1, BinaryString.fromString("a"), 100)); + proxy.write(GenericRow.of(1, BinaryString.fromString("b"), 200)); + proxy.write(GenericRow.of(2, BinaryString.fromString("a"), 300)); + + RemoteTableQuery query = new RemoteTableQuery(paimonTable("DIMP")); + + // lookup is called with (partition, bucket, trimmedKey); trimmedKey = (k STRING) + assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("a")))) + .isNotNull() + .extracting(r -> r.getInt(2)) + .isEqualTo(100); + assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("b")))) + .isNotNull() + .extracting(r -> r.getInt(2)) + .isEqualTo(200); + // same trimmed key ("a") in a different partition resolves through the partition arg + assertThat(query.lookup(row(2), 0, GenericRow.of(BinaryString.fromString("a")))) + .isNotNull() + .extracting(r -> r.getInt(2)) + .isEqualTo(300); + assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("z")))).isNull(); + + query.close(); + proxy.close(); + } + private JobClient queryService(FileStoreTable table) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); QueryService.build(env, table, 2);