Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@
@NotThreadSafe
class PrefixKeyLookuper extends AbstractLookuper implements Lookuper {

/** Extract bucket key from prefix lookup key row. */
/** Encode bucket key from prefix lookup key row. */
private final KeyEncoder bucketKeyEncoder;

/** Encode prefix key for prefix lookup (follows primary key encoding rules). */
private final KeyEncoder prefixKeyEncoder;

private final BucketingFunction bucketingFunction;
private final int numBuckets;

Expand All @@ -73,7 +76,24 @@ public PrefixKeyLookuper(
RowType lookupRowType = tableInfo.getRowType().project(lookupColumnNames);
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);

this.bucketKeyEncoder = KeyEncoder.of(lookupRowType, tableInfo.getBucketKeys(), lakeFormat);
// Use ofPrimaryKeyEncoder which follows primary key encoding rules
this.prefixKeyEncoder =
KeyEncoder.ofPrimaryKeyEncoder(
lookupRowType,
// bucket keys are prefix keys
tableInfo.getBucketKeys(),
tableInfo.getTableConfig(),
tableInfo.isDefaultBucketKey());

this.bucketKeyEncoder =
lakeFormat == null
// if lake format is null, don't need need to use lake format bucket
// encoder,
// we can just use the prefixKeyEncoder
? this.prefixKeyEncoder
: KeyEncoder.ofBucketKeyEncoder(
lookupRowType, tableInfo.getBucketKeys(), lakeFormat);

this.bucketingFunction = BucketingFunction.of(lakeFormat);
this.partitionGetter =
tableInfo.isPartitioned()
Expand Down Expand Up @@ -127,11 +147,24 @@ private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumn
+ "because the lookup columns %s must contain all bucket keys %s in order.",
tableInfo.getTablePath(), lookupColumns, bucketKeys));
}

if (bucketKeys.equals(physicalPrimaryKeys)) {
throw new IllegalArgumentException(
String.format(
"Can not perform prefix lookup on table '%s', "
+ "because the lookup columns %s equals the physical primary keys %s. "
+ "Please use primary key lookup (Lookuper without lookupBy) instead.",
tableInfo.getTablePath(), lookupColumns, physicalPrimaryKeys));
}
}

@Override
public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
byte[] bucketKeyBytes = bucketKeyEncoder.encodeKey(prefixKey);
byte[] prefixKeyBytes = prefixKeyEncoder.encodeKey(prefixKey);
byte[] bucketKeyBytes =
prefixKeyEncoder == bucketKeyEncoder
? prefixKeyBytes
: bucketKeyEncoder.encodeKey(prefixKey);
int bucketId = bucketingFunction.bucketing(bucketKeyBytes, numBuckets);

Long partitionId = null;
Expand All @@ -151,7 +184,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
lookupClient
.prefixLookup(tableInfo.getTablePath(), tableBucket, bucketKeyBytes)
.prefixLookup(tableInfo.getTablePath(), tableBucket, prefixKeyBytes)
.whenComplete(
(result, error) -> {
if (error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,18 @@ public PrimaryKeyLookuper(
// the row type of the input lookup row
RowType lookupRowType = tableInfo.getRowType().project(tableInfo.getPrimaryKeys());
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);

// the encoded primary key is the physical primary key
this.primaryKeyEncoder =
KeyEncoder.of(lookupRowType, tableInfo.getPhysicalPrimaryKeys(), lakeFormat);
KeyEncoder.ofPrimaryKeyEncoder(
lookupRowType,
tableInfo.getPhysicalPrimaryKeys(),
tableInfo.getTableConfig(),
tableInfo.isDefaultBucketKey());
this.bucketKeyEncoder =
tableInfo.isDefaultBucketKey()
? primaryKeyEncoder
: KeyEncoder.of(lookupRowType, tableInfo.getBucketKeys(), lakeFormat);
: KeyEncoder.ofBucketKeyEncoder(
lookupRowType, tableInfo.getBucketKeys(), lakeFormat);

this.bucketingFunction = BucketingFunction.of(lakeFormat);

this.partitionGetter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.fluss.client.write.WriteRecord;
import org.apache.fluss.client.write.WriterClient;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableInfo;
Expand All @@ -32,7 +31,6 @@
import org.apache.fluss.row.encode.KeyEncoder;
import org.apache.fluss.row.indexed.IndexedRow;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;

import javax.annotation.Nullable;

Expand All @@ -57,9 +55,11 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
if (bucketKeys.isEmpty()) {
this.bucketKeyEncoder = null;
} else {
RowType rowType = tableInfo.getSchema().getRowType();
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
this.bucketKeyEncoder = KeyEncoder.of(rowType, bucketKeys, lakeFormat);
this.bucketKeyEncoder =
KeyEncoder.ofBucketKeyEncoder(
tableInfo.getRowType(),
tableInfo.getBucketKeys(),
tableInfo.getTableConfig().getDataLakeFormat().orElse(null));
}

DataType[] fieldDataTypes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.fluss.client.write.WriteFormat;
import org.apache.fluss.client.write.WriteRecord;
import org.apache.fluss.client.write.WriterClient;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
Expand Down Expand Up @@ -70,14 +69,20 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
partialUpdateColumns);

this.targetColumns = partialUpdateColumns;
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
// encode primary key using physical primary key
this.primaryKeyEncoder =
KeyEncoder.of(rowType, tableInfo.getPhysicalPrimaryKeys(), lakeFormat);
KeyEncoder.ofPrimaryKeyEncoder(
tableInfo.getRowType(),
tableInfo.getPhysicalPrimaryKeys(),
tableInfo.getTableConfig(),
tableInfo.isDefaultBucketKey());
this.bucketKeyEncoder =
tableInfo.isDefaultBucketKey()
? primaryKeyEncoder
: KeyEncoder.of(rowType, tableInfo.getBucketKeys(), lakeFormat);
: KeyEncoder.ofBucketKeyEncoder(
tableInfo.getRowType(),
tableInfo.getBucketKeys(),
tableInfo.getTableConfig().getDataLakeFormat().orElse(null));

this.kvFormat = tableInfo.getTableConfig().getKvFormat();
this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.fluss.config.ConfigOptions.CURRENT_KV_FORMAT_VERSION;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
Expand Down Expand Up @@ -183,11 +184,14 @@ void testGetTableInfoAndSchema() throws Exception {
long timestampAfterCreate = System.currentTimeMillis();
TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
TableDescriptor tableDescriptor =
DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
Map<String, String> options = new HashMap<>(tableDescriptor.getProperties());
options.put(
ConfigOptions.TABLE_KV_FORMAT_VERSION.key(),
String.valueOf(CURRENT_KV_FORMAT_VERSION));
assertThat(tableInfo.toTableDescriptor())
.isEqualTo(
DEFAULT_TABLE_DESCRIPTOR
.withReplicationFactor(3)
.withDataLakeFormat(PAIMON));
.isEqualTo(tableDescriptor.withProperties(options));
assertThat(schemaInfo2).isEqualTo(schemaInfo);
assertThat(tableInfo.getCreatedTime()).isEqualTo(tableInfo.getModifiedTime());
assertThat(tableInfo.getCreatedTime()).isLessThan(timestampAfterCreate);
Expand All @@ -208,11 +212,14 @@ void testGetTableInfoAndSchema() throws Exception {
tableInfo = admin.getTableInfo(tablePath).get();
timestampAfterCreate = System.currentTimeMillis();
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
assertThat(tableInfo.toTableDescriptor())
.isEqualTo(
DEFAULT_TABLE_DESCRIPTOR
.withReplicationFactor(3)
.withDataLakeFormat(PAIMON));

TableDescriptor expected =
DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
options = new HashMap<>(expected.getProperties());
options.put(
ConfigOptions.TABLE_KV_FORMAT_VERSION.key(),
String.valueOf(CURRENT_KV_FORMAT_VERSION));
assertThat(tableInfo.toTableDescriptor()).isEqualTo(expected.withProperties(options));
assertThat(schemaInfo2).isEqualTo(schemaInfo);
// assert created time
assertThat(tableInfo.getCreatedTime())
Expand Down Expand Up @@ -790,11 +797,13 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
try (Connection conn = ConnectionFactory.createConnection(clientConf);
Admin admin = conn.getAdmin()) {
TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
assertThat(tableInfo.toTableDescriptor())
.isEqualTo(
DEFAULT_TABLE_DESCRIPTOR
.withReplicationFactor(3)
.withDataLakeFormat(PAIMON));
TableDescriptor expected =
DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
Map<String, String> options = new HashMap<>(expected.getProperties());
options.put(
ConfigOptions.TABLE_KV_FORMAT_VERSION.key(),
String.valueOf(CURRENT_KV_FORMAT_VERSION));
assertThat(tableInfo.toTableDescriptor()).isEqualTo(expected.withProperties(options));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ void testPartitionedTablePrefixLookup(boolean isDataLakeEnabled) throws Exceptio
.column("b", DataTypes.STRING())
.column("c", DataTypes.BIGINT())
.column("d", DataTypes.STRING())
.primaryKey("a", "b", "c")
.column("e", DataTypes.STRING())
.primaryKey("a", "b", "c", "d")
.build();
TableDescriptor descriptor =
TableDescriptor.builder()
Expand All @@ -139,8 +140,8 @@ void testPartitionedTablePrefixLookup(boolean isDataLakeEnabled) throws Exceptio

Table table = conn.getTable(tablePath);
for (String partition : partitionIdByNames.keySet()) {
verifyPutAndLookup(table, new Object[] {1, partition, 1L, "value1"});
verifyPutAndLookup(table, new Object[] {1, partition, 2L, "value2"});
verifyPutAndLookup(table, new Object[] {1, partition, 1L, "value1", "another_value1"});
verifyPutAndLookup(table, new Object[] {1, partition, 2L, "value2", "another_value2"});
}

for (int i = 0; i < 3; i++) {
Expand All @@ -164,7 +165,9 @@ void testPartitionedTablePrefixLookup(boolean isDataLakeEnabled) throws Exceptio
List<InternalRow> rowList = prefixLookupResult.getRowList();
assertThat(rowList.size()).isEqualTo(1);
assertRowValueEquals(
rowType, rowList.get(0), new Object[] {1, partition, 1L, "value1"});
rowType,
rowList.get(0),
new Object[] {1, partition, 1L, "value1", "another_value1"});
}
}
}
Expand Down
Loading