Skip to content

Commit 1a1ca61

Browse files
committed
[lake/paimon] Remove system columns from Paimon
1 parent 2fc820d commit 1a1ca61

24 files changed

Lines changed: 440 additions & 386 deletions

File tree

fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
124124
.comment("test table")
125125
.distributedBy(3, "id")
126126
.property(ConfigOptions.TABLE_LOG_TTL, Duration.ofDays(1))
127+
.property(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION, 2)
127128
.customProperty("connector", "fluss")
128129
.build();
129130

@@ -774,6 +775,14 @@ void testCreateTableWithInvalidReplicationFactor() throws Exception {
774775
try (Connection conn = ConnectionFactory.createConnection(clientConf);
775776
Admin admin = conn.getAdmin()) {
776777
TableInfo tableInfo = admin.getTableInfo(DEFAULT_TABLE_PATH).get();
778+
TableDescriptor expectedDescriptor =
779+
DEFAULT_TABLE_DESCRIPTOR.withReplicationFactor(3).withDataLakeFormat(PAIMON);
780+
Map<String, String> expectedProperties =
781+
new HashMap<>(expectedDescriptor.getProperties());
782+
expectedProperties.put(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(), "2");
783+
expectedDescriptor = expectedDescriptor.withProperties(expectedProperties);
784+
785+
assertThat(tableInfo.toTableDescriptor()).isEqualTo(expectedDescriptor);
777786
assertThat(tableInfo.toTableDescriptor())
778787
.isEqualTo(
779788
DEFAULT_TABLE_DESCRIPTOR

fluss-client/src/test/java/org/apache/fluss/client/table/FlussTableITCase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@
6767
import java.time.Duration;
6868
import java.util.ArrayList;
6969
import java.util.Collections;
70+
import java.util.HashMap;
7071
import java.util.Iterator;
7172
import java.util.List;
73+
import java.util.Map;
7274
import java.util.concurrent.CompletableFuture;
7375

7476
import static org.apache.fluss.client.table.scanner.batch.BatchScanUtils.collectRows;
@@ -103,6 +105,9 @@ void testGetDescriptor() throws Exception {
103105
DATA1_TABLE_DESCRIPTOR_PK
104106
.withReplicationFactor(3)
105107
.withDataLakeFormat(DataLakeFormat.PAIMON);
108+
Map<String, String> expectedProperties = new HashMap<>(expected.getProperties());
109+
expectedProperties.put(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key(), "2");
110+
expected = expected.withProperties(expectedProperties);
106111
assertThat(tableInfo.toTableDescriptor()).isEqualTo(expected);
107112
}
108113

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,14 @@ public class ConfigOptions {
14141414
.withDescription(
14151415
"If true, snapshot expiration will be triggered automatically when tiering service commits to the datalake. It is disabled by default.");
14161416

1417+
public static final ConfigOption<Integer> TABLE_DATALAKE_STORAGE_VERSION =
1418+
key("table.datalake.storage-version")
1419+
.intType()
1420+
.noDefaultValue()
1421+
.withDescription(
1422+
"The storage version of the datalake table. This option is automatically set by Fluss server "
1423+
+ "and cannot be set by clients. Version 2 indicates a clean schema without system columns in datalake.");
1424+
14171425
public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
14181426
key("table.merge-engine")
14191427
.enumType(MergeEngineType.class)

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ public boolean isDataLakeAutoExpireSnapshot() {
106106
return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT);
107107
}
108108

109+
/** Gets the data lake storage version of the table. Returns empty if not set (legacy table). */
110+
public Optional<Integer> getDataLakeStorageVersion() {
111+
return config.getOptional(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION);
112+
}
113+
109114
/** Gets the optional merge engine type of the table. */
110115
public Optional<MergeEngineType> getMergeEngineType() {
111116
return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);

fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
@PublicEvolving
3636
public interface LakeCatalog extends AutoCloseable {
3737

38+
Integer CURRENT_LAKE_STORAGE_VERSION = 2;
39+
3840
/**
3941
* Create a new table in lake.
4042
*

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -323,15 +323,23 @@ public boolean isBounded() {
323323
case TIMESTAMP:
324324
offsetsInitializer =
325325
OffsetsInitializer.timestamp(startupOptions.startupTimestampMs);
326-
if (hasPrimaryKey()) {
327-
// Currently, for primary key tables, we do not consider lake data
328-
// when reading from a given timestamp. This is because we will need
329-
// to read the change log of primary key table.
330-
// TODO: consider support it using paimon change log data?
331-
enableLakeSource = false;
332-
} else {
333-
if (enableLakeSource) {
334-
enableLakeSource = pushTimeStampFilterToLakeSource(lakeSource);
326+
if (lakeSource != null) {
327+
if (!tableOptions.containsKey(
328+
ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key())) {
329+
enableLakeSource = handleLegacyV1LakeTable();
330+
} else {
331+
int lakeStorageVersion =
332+
Integer.parseInt(
333+
tableOptions.get(
334+
ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION
335+
.key()));
336+
if (lakeStorageVersion < 2) {
337+
throw new IllegalArgumentException(
338+
"Unsupported lake storage version: " + lakeStorageVersion);
339+
} else {
340+
// todo: support map to partition in issue
341+
enableLakeSource = false;
342+
}
335343
}
336344
}
337345
break;
@@ -384,6 +392,18 @@ public boolean isBounded() {
384392
}
385393
}
386394

395+
private boolean handleLegacyV1LakeTable() {
396+
if (hasPrimaryKey()) {
397+
// Currently, for primary key tables, we do not consider lake data
398+
// when reading from a given timestamp. This is because we will need
399+
// to read the change log of primary key table.
400+
// TODO: consider support it using paimon change log data?
401+
return false;
402+
} else {
403+
return pushTimeStampFilterToLakeSource(lakeSource);
404+
}
405+
}
406+
387407
private boolean pushTimeStampFilterToLakeSource(LakeSource<?> lakeSource) {
388408
// will push timestamp to lake
389409
// we will have three additional system columns, __bucket, __offset, __timestamp

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -896,6 +896,7 @@ protected static void assertOptionsEqual(
896896
Map<String, String> actualOptions, Map<String, String> expectedOptions) {
897897
actualOptions.remove(ConfigOptions.BOOTSTRAP_SERVERS.key());
898898
actualOptions.remove(ConfigOptions.TABLE_REPLICATION_FACTOR.key());
899+
actualOptions.remove(ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION.key());
899900
assertThat(actualOptions).isEqualTo(expectedOptions);
900901
}
901902
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/CatalogTableTestUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import static org.apache.fluss.config.ConfigOptions.BOOTSTRAP_SERVERS;
2828
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
29+
import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_STORAGE_VERSION;
2930
import static org.apache.fluss.config.ConfigOptions.TABLE_REPLICATION_FACTOR;
3031
import static org.assertj.core.api.Assertions.assertThat;
3132

@@ -140,6 +141,7 @@ private static void assertOptionsEqual(
140141
actualOptions.remove(TABLE_REPLICATION_FACTOR.key());
141142
// Remove datalake format (auto-added when datalake is enabled in Fluss cluster)
142143
actualOptions.remove(TABLE_DATALAKE_FORMAT.key());
144+
actualOptions.remove(TABLE_DATALAKE_STORAGE_VERSION.key());
143145
assertThat(actualOptions).isEqualTo(expectedOptions);
144146
}
145147
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,9 @@ public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Co
112112
public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
113113
throws TableNotExistException {
114114
try {
115-
List<SchemaChange> paimonSchemaChanges = toPaimonSchemaChanges(tableChanges);
115+
Table paimonTable = getTable(tablePath);
116+
List<SchemaChange> paimonSchemaChanges =
117+
toPaimonSchemaChanges(paimonTable, tableChanges);
116118

117119
// Compare current Paimon table schema with expected target schema before altering
118120
if (shouldAlterTable(tablePath, tableChanges)) {
@@ -157,6 +159,14 @@ private boolean shouldAlterTable(TablePath tablePath, List<TableChange> tableCha
157159
}
158160
}
159161

162+
private Table getTable(TablePath tablePath) throws TableNotExistException {
163+
try {
164+
return paimonCatalog.getTable(toPaimon(tablePath));
165+
} catch (Catalog.TableNotExistException e) {
166+
throw new TableNotExistException("Table " + tablePath + " does not exist.");
167+
}
168+
}
169+
160170
private boolean isColumnAlreadyExists(Schema currentSchema, TableChange.AddColumn addColumn) {
161171
String columnName = addColumn.getName();
162172

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/source/PaimonRecordReader.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,12 @@ private ReadBuilder applyProject(
9494
int timestampFieldPos = paimonFullRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
9595

9696
int[] paimonProject =
97-
IntStream.concat(
98-
IntStream.of(projectIds),
99-
IntStream.of(offsetFieldPos, timestampFieldPos))
100-
.toArray();
97+
hasSystemColumn(paimonFullRowType)
98+
? IntStream.concat(
99+
IntStream.of(projectIds),
100+
IntStream.of(offsetFieldPos, timestampFieldPos))
101+
.toArray()
102+
: projectIds;
101103

102104
return readBuilder.withProjection(paimonProject);
103105
}
@@ -110,17 +112,15 @@ public static class PaimonRowAsFlussRecordIterator implements CloseableIterator<
110112
private final ProjectedRow projectedRow;
111113
private final PaimonRowAsFlussRow paimonRowAsFlussRow;
112114

113-
private final int logOffsetColIndex;
114-
private final int timestampColIndex;
115-
116115
public PaimonRowAsFlussRecordIterator(
117116
org.apache.paimon.utils.CloseableIterator<InternalRow> paimonRowIterator,
118117
RowType paimonRowType) {
119118
this.paimonRowIterator = paimonRowIterator;
120-
this.logOffsetColIndex = paimonRowType.getFieldIndex(OFFSET_COLUMN_NAME);
121-
this.timestampColIndex = paimonRowType.getFieldIndex(TIMESTAMP_COLUMN_NAME);
122119

123-
int[] project = IntStream.range(0, paimonRowType.getFieldCount() - 2).toArray();
120+
int[] project =
121+
hasSystemColumn(paimonRowType)
122+
? IntStream.range(0, paimonRowType.getFieldCount() - 2).toArray()
123+
: IntStream.range(0, paimonRowType.getFieldCount()).toArray();
124124
projectedRow = ProjectedRow.from(project);
125125
paimonRowAsFlussRow = new PaimonRowAsFlussRow();
126126
}
@@ -143,14 +143,21 @@ public boolean hasNext() {
143143
public LogRecord next() {
144144
InternalRow paimonRow = paimonRowIterator.next();
145145
ChangeType changeType = toChangeType(paimonRow.getRowKind());
146-
long offset = paimonRow.getLong(logOffsetColIndex);
147-
long timestamp = paimonRow.getTimestamp(timestampColIndex, 6).getMillisecond();
148146

149147
return new GenericRecord(
150-
offset,
151-
timestamp,
148+
-1L,
149+
-1L,
152150
changeType,
153151
projectedRow.replaceRow(paimonRowAsFlussRow.replaceRow(paimonRow)));
154152
}
155153
}
154+
155+
// for legacy tables, we will have system columns
156+
private static boolean hasSystemColumn(RowType paimonRowType) {
157+
return paimonRowType
158+
.getFields()
159+
.get(paimonRowType.getFieldCount() - 1)
160+
.name()
161+
.equals(TIMESTAMP_COLUMN_NAME);
162+
}
156163
}

0 commit comments

Comments (0)