Skip to content

Commit a83dbe7

Browse files
committed
lake fix
1 parent 86a283a commit a83dbe7

29 files changed

Lines changed: 625 additions & 102 deletions

File tree

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,12 @@
4848
@NotThreadSafe
4949
class PrefixKeyLookuper extends AbstractLookuper implements Lookuper {
5050

51-
/** Extract bucket key from prefix lookup key row. */
51+
/** Encode bucket key from prefix lookup key row. */
5252
private final KeyEncoder bucketKeyEncoder;
5353

54+
/** Encode prefix key for prefix lookup (follows primary key encoding rules). */
55+
private final KeyEncoder prefixKeyEncoder;
56+
5457
private final BucketingFunction bucketingFunction;
5558
private final int numBuckets;
5659

@@ -73,7 +76,23 @@ public PrefixKeyLookuper(
7376
RowType lookupRowType = tableInfo.getRowType().project(lookupColumnNames);
7477
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
7578

76-
this.bucketKeyEncoder = KeyEncoder.of(lookupRowType, tableInfo.getBucketKeys(), lakeFormat);
79+
// Use ofPrimaryKeyEncoder which follows primary key encoding rules
80+
this.prefixKeyEncoder =
81+
KeyEncoder.ofPrimaryKeyEncoder(
82+
lookupRowType,
83+
tableInfo.getBucketKeys(),
84+
tableInfo.getTableConfig(),
85+
tableInfo.isDefaultBucketKey());
86+
87+
this.bucketKeyEncoder =
88+
lakeFormat == null
89+
// if lake format is null, we don't need to use the lake format bucket
90+
// encoder,
91+
// we can just use the prefixKeyEncoder
92+
? this.prefixKeyEncoder
93+
: KeyEncoder.ofBucketKeyEncoder(
94+
lookupRowType, tableInfo.getBucketKeys(), lakeFormat);
95+
7796
this.bucketingFunction = BucketingFunction.of(lakeFormat);
7897
this.partitionGetter =
7998
tableInfo.isPartitioned()
@@ -127,11 +146,24 @@ private void validatePrefixLookup(TableInfo tableInfo, List<String> lookupColumn
127146
+ "because the lookup columns %s must contain all bucket keys %s in order.",
128147
tableInfo.getTablePath(), lookupColumns, bucketKeys));
129148
}
149+
150+
if (bucketKeys.equals(physicalPrimaryKeys)) {
151+
throw new IllegalArgumentException(
152+
String.format(
153+
"Can not perform prefix lookup on table '%s', "
154+
+ "because the lookup columns %s equals the physical primary keys %s. "
155+
+ "Please use primary key lookup (Lookuper without lookupBy) instead.",
156+
tableInfo.getTablePath(), lookupColumns, physicalPrimaryKeys));
157+
}
130158
}
131159

132160
@Override
133161
public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
134-
byte[] bucketKeyBytes = bucketKeyEncoder.encodeKey(prefixKey);
162+
byte[] prefixKeyBytes = prefixKeyEncoder.encodeKey(prefixKey);
163+
byte[] bucketKeyBytes =
164+
prefixKeyEncoder == bucketKeyEncoder
165+
? prefixKeyBytes
166+
: bucketKeyEncoder.encodeKey(prefixKey);
135167
int bucketId = bucketingFunction.bucketing(bucketKeyBytes, numBuckets);
136168

137169
Long partitionId = null;
@@ -151,7 +183,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
151183
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
152184
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
153185
lookupClient
154-
.prefixLookup(tableInfo.getTablePath(), tableBucket, bucketKeyBytes)
186+
.prefixLookup(tableInfo.getTablePath(), tableBucket, prefixKeyBytes)
155187
.whenComplete(
156188
(result, error) -> {
157189
if (error != null) {

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,21 @@ public PrimaryKeyLookuper(
7070

7171
// the row type of the input lookup row
7272
RowType lookupRowType = tableInfo.getRowType().project(tableInfo.getPrimaryKeys());
73-
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
74-
75-
// the encoded primary key is the physical primary key
7673
this.primaryKeyEncoder =
77-
KeyEncoder.of(lookupRowType, tableInfo.getPhysicalPrimaryKeys(), lakeFormat);
74+
KeyEncoder.ofPrimaryKeyEncoder(
75+
lookupRowType,
76+
tableInfo.getPhysicalPrimaryKeys(),
77+
tableInfo.getTableConfig(),
78+
tableInfo.isDefaultBucketKey());
7879
this.bucketKeyEncoder =
7980
tableInfo.isDefaultBucketKey()
8081
? primaryKeyEncoder
81-
: KeyEncoder.of(lookupRowType, tableInfo.getBucketKeys(), lakeFormat);
82+
: KeyEncoder.ofBucketKeyEncoder(
83+
lookupRowType,
84+
tableInfo.getBucketKeys(),
85+
tableInfo.getTableConfig().getDataLakeFormat().orElse(null));
86+
87+
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
8288
this.bucketingFunction = BucketingFunction.of(lakeFormat);
8389

8490
this.partitionGetter =

fluss-client/src/main/java/org/apache/fluss/client/table/writer/AppendWriterImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.fluss.client.write.WriteRecord;
2121
import org.apache.fluss.client.write.WriterClient;
22-
import org.apache.fluss.metadata.DataLakeFormat;
2322
import org.apache.fluss.metadata.LogFormat;
2423
import org.apache.fluss.metadata.PhysicalTablePath;
2524
import org.apache.fluss.metadata.TableInfo;
@@ -32,7 +31,6 @@
3231
import org.apache.fluss.row.encode.KeyEncoder;
3332
import org.apache.fluss.row.indexed.IndexedRow;
3433
import org.apache.fluss.types.DataType;
35-
import org.apache.fluss.types.RowType;
3634

3735
import javax.annotation.Nullable;
3836

@@ -57,9 +55,11 @@ class AppendWriterImpl extends AbstractTableWriter implements AppendWriter {
5755
if (bucketKeys.isEmpty()) {
5856
this.bucketKeyEncoder = null;
5957
} else {
60-
RowType rowType = tableInfo.getSchema().getRowType();
61-
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
62-
this.bucketKeyEncoder = KeyEncoder.of(rowType, bucketKeys, lakeFormat);
58+
this.bucketKeyEncoder =
59+
KeyEncoder.ofBucketKeyEncoder(
60+
tableInfo.getRowType(),
61+
tableInfo.getBucketKeys(),
62+
tableInfo.getTableConfig().getDataLakeFormat().orElse(null));
6363
}
6464

6565
DataType[] fieldDataTypes =

fluss-client/src/main/java/org/apache/fluss/client/table/writer/UpsertWriterImpl.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.fluss.client.write.WriteFormat;
2121
import org.apache.fluss.client.write.WriteRecord;
2222
import org.apache.fluss.client.write.WriterClient;
23-
import org.apache.fluss.metadata.DataLakeFormat;
2423
import org.apache.fluss.metadata.KvFormat;
2524
import org.apache.fluss.metadata.TableInfo;
2625
import org.apache.fluss.metadata.TablePath;
@@ -70,14 +69,19 @@ class UpsertWriterImpl extends AbstractTableWriter implements UpsertWriter {
7069
partialUpdateColumns);
7170

7271
this.targetColumns = partialUpdateColumns;
73-
DataLakeFormat lakeFormat = tableInfo.getTableConfig().getDataLakeFormat().orElse(null);
74-
// encode primary key using physical primary key
7572
this.primaryKeyEncoder =
76-
KeyEncoder.of(rowType, tableInfo.getPhysicalPrimaryKeys(), lakeFormat);
73+
KeyEncoder.ofPrimaryKeyEncoder(
74+
tableInfo.getRowType(),
75+
tableInfo.getPhysicalPrimaryKeys(),
76+
tableInfo.getTableConfig(),
77+
tableInfo.isDefaultBucketKey());
7778
this.bucketKeyEncoder =
7879
tableInfo.isDefaultBucketKey()
7980
? primaryKeyEncoder
80-
: KeyEncoder.of(rowType, tableInfo.getBucketKeys(), lakeFormat);
81+
: KeyEncoder.ofBucketKeyEncoder(
82+
tableInfo.getRowType(),
83+
tableInfo.getBucketKeys(),
84+
tableInfo.getTableConfig().getDataLakeFormat().orElse(null));
8185

8286
this.kvFormat = tableInfo.getTableConfig().getKvFormat();
8387
this.writeFormat = WriteFormat.fromKvFormat(this.kvFormat);

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,10 @@
7373
import org.apache.fluss.server.zk.data.ServerTags;
7474
import org.apache.fluss.types.DataTypeChecks;
7575
import org.apache.fluss.types.DataTypes;
76-
7776
import org.junit.jupiter.api.BeforeEach;
7877
import org.junit.jupiter.api.Test;
7978

8079
import javax.annotation.Nullable;
81-
8280
import java.time.Duration;
8381
import java.time.LocalDate;
8482
import java.util.ArrayList;
@@ -183,11 +181,12 @@ void testGetTableInfoAndSchema() throws Exception {
183181
long timestampAfterCreate = System.currentTimeMillis();
184182
TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
185183
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
184+
TableDescriptor tableDescriptor =
185+
DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
186+
Map<String, String> options = new HashMap<>(tableDescriptor.getProperties());
187+
options.put(ConfigOptions.TABLE_KV_FORMAT_VERSION.key(), "2");
186188
assertThat(tableInfo.toTableDescriptor())
187-
.isEqualTo(
188-
DEFAULT_TABLE_DESCRIPTOR
189-
.withReplicationFactor(3)
190-
.withDataLakeFormat(PAIMON));
189+
.isEqualTo(tableDescriptor.withProperties(options));
191190
assertThat(schemaInfo2).isEqualTo(schemaInfo);
192191
assertThat(tableInfo.getCreatedTime()).isEqualTo(tableInfo.getModifiedTime());
193192
assertThat(tableInfo.getCreatedTime()).isLessThan(timestampAfterCreate);
@@ -208,11 +207,12 @@ void testGetTableInfoAndSchema() throws Exception {
208207
tableInfo = admin.getTableInfo(tablePath).get();
209208
timestampAfterCreate = System.currentTimeMillis();
210209
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
211-
assertThat(tableInfo.toTableDescriptor())
212-
.isEqualTo(
213-
DEFAULT_TABLE_DESCRIPTOR
214-
.withReplicationFactor(3)
215-
.withDataLakeFormat(PAIMON));
210+
211+
TableDescriptor expected =
212+
DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
213+
options = new HashMap<>(expected.getProperties());
214+
options.put(ConfigOptions.TABLE_KV_FORMAT_VERSION.key(), "2");
215+
assertThat(tableInfo.toTableDescriptor()).isEqualTo(expected.withProperties(options));
216216
assertThat(schemaInfo2).isEqualTo(schemaInfo);
217217
// assert created time
218218
assertThat(tableInfo.getCreatedTime())
@@ -774,11 +774,11 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
774774
try (Connection conn = ConnectionFactory.createConnection(clientConf);
775775
Admin admin = conn.getAdmin()) {
776776
TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
777-
assertThat(tableInfo.toTableDescriptor())
778-
.isEqualTo(
779-
DEFAULT_TABLE_DESCRIPTOR
780-
.withReplicationFactor(3)
781-
.withDataLakeFormat(PAIMON));
777+
TableDescriptor expected =
778+
DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
779+
Map<String, String> options = new HashMap<>(expected.getProperties());
780+
options.put(ConfigOptions.TABLE_KV_FORMAT_VERSION.key(), "2");
781+
assertThat(tableInfo.toTableDescriptor()).isEqualTo(expected.withProperties(options));
782782
}
783783
}
784784

fluss-client/src/test/java/org/apache/fluss/client/table/AutoPartitionedTableITCase.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ void testPartitionedTablePrefixLookup(boolean isDataLakeEnabled) throws Exceptio
118118
.column("b", DataTypes.STRING())
119119
.column("c", DataTypes.BIGINT())
120120
.column("d", DataTypes.STRING())
121-
.primaryKey("a", "b", "c")
121+
.column("e", DataTypes.STRING())
122+
.primaryKey("a", "b", "c", "d")
122123
.build();
123124
TableDescriptor descriptor =
124125
TableDescriptor.builder()
@@ -139,8 +140,8 @@ void testPartitionedTablePrefixLookup(boolean isDataLakeEnabled) throws Exceptio
139140

140141
Table table = conn.getTable(tablePath);
141142
for (String partition : partitionIdByNames.keySet()) {
142-
verifyPutAndLookup(table, new Object[] {1, partition, 1L, "value1"});
143-
verifyPutAndLookup(table, new Object[] {1, partition, 2L, "value2"});
143+
verifyPutAndLookup(table, new Object[] {1, partition, 1L, "value1", "another_value1"});
144+
verifyPutAndLookup(table, new Object[] {1, partition, 2L, "value2", "another_value2"});
144145
}
145146

146147
for (int i = 0; i < 3; i++) {
@@ -164,7 +165,9 @@ void testPartitionedTablePrefixLookup(boolean isDataLakeEnabled) throws Exceptio
164165
List<InternalRow> rowList = prefixLookupResult.getRowList();
165166
assertThat(rowList.size()).isEqualTo(1);
166167
assertRowValueEquals(
167-
rowType, rowList.get(0), new Object[] {1, partition, 1L, "value1"});
168+
rowType,
169+
rowList.get(0),
170+
new Object[] {1, partition, 1L, "value1", "another_value1"});
168171
}
169172
}
170173
}

0 commit comments

Comments
 (0)