Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ public RemoteTableQuery(Table table) {
ServiceManager manager = this.table.store().newServiceManager();
this.client = new KvQueryClient(new QueryLocationImpl(manager), 1);
this.keySerializer =
InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys()));
InternalSerializers.create(
TypeUtils.project(
this.table.rowType(), this.table.schema().trimmedPrimaryKeys()));
}

public static boolean isRemoteServiceAvailable(FileStoreTable table) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.paimon.flink;

import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
Expand Down Expand Up @@ -165,6 +166,43 @@ public void testServiceFileCleaned() throws Exception {
assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse();
}

@Test
public void testLookupPartitionedRemoteTable() throws Throwable {
// Partitioned primary-key table whose partition key and trimmed primary key have different
// types: primaryKeys = (pt INT, k STRING), partitionKeys = (pt), so the trimmed primary key
// handed to lookup is (k STRING). A key serializer built over the full primary key reads
// field 0 as INT and breaks on the STRING key.
sql(
"CREATE TABLE DIMP (pt INT, k STRING, v INT, PRIMARY KEY (pt, k) NOT ENFORCED) "
+ "PARTITIONED BY (pt) WITH ('bucket' = '1')");
ServiceProxy proxy = launchQueryServer("DIMP");

proxy.write(GenericRow.of(1, BinaryString.fromString("a"), 100));
proxy.write(GenericRow.of(1, BinaryString.fromString("b"), 200));
proxy.write(GenericRow.of(2, BinaryString.fromString("a"), 300));

RemoteTableQuery query = new RemoteTableQuery(paimonTable("DIMP"));

// lookup is called with (partition, bucket, trimmedKey); trimmedKey = (k STRING)
assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("a"))))
.isNotNull()
.extracting(r -> r.getInt(2))
.isEqualTo(100);
assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("b"))))
.isNotNull()
.extracting(r -> r.getInt(2))
.isEqualTo(200);
// same trimmed key ("a") in a different partition resolves through the partition arg
assertThat(query.lookup(row(2), 0, GenericRow.of(BinaryString.fromString("a"))))
.isNotNull()
.extracting(r -> r.getInt(2))
.isEqualTo(300);
assertThat(query.lookup(row(1), 0, GenericRow.of(BinaryString.fromString("z")))).isNull();

query.close();
proxy.close();
}

private JobClient queryService(FileStoreTable table) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
QueryService.build(env, table, 2);
Expand Down