From 28db0d17431490ae65dcfca27cc1ce78c0372a0f Mon Sep 17 00:00:00 2001 From: vamossagar12 Date: Mon, 16 Mar 2026 15:47:43 +0530 Subject: [PATCH 1/3] fluss-2213: Adding snapshot expiration support for lake tables in Iceberg --- .../iceberg/tiering/IcebergLakeCommitter.java | 40 +++++- .../tiering/IcebergLakeTieringFactory.java | 2 +- .../iceberg/tiering/IcebergTieringTest.java | 129 +++++++++++++++++- 3 files changed, 164 insertions(+), 7 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java index 06b572760a..71d5f0ab47 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java @@ -17,7 +17,9 @@ package org.apache.fluss.lake.iceberg.tiering; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.lake.committer.CommittedLakeSnapshot; +import org.apache.fluss.lake.committer.CommitterInitContext; import org.apache.fluss.lake.committer.LakeCommitResult; import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult; @@ -28,6 +30,7 @@ import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Snapshot; @@ -61,12 +64,23 @@ public class IcebergLakeCommitter implements LakeCommitter currentCommitSnapshotId = new ThreadLocal<>(); - public IcebergLakeCommitter(IcebergCatalogProvider icebergCatalogProvider, TablePath tablePath) + public IcebergLakeCommitter( + IcebergCatalogProvider icebergCatalogProvider, + CommitterInitContext committerInitContext) throws IOException { this.icebergCatalog = icebergCatalogProvider.get(); - this.icebergTable = getTable(tablePath); + this.icebergTable = getTable(committerInitContext.tablePath()); + this.isAutoSnapshotExpiration = + committerInitContext + .tableInfo() + .getTableConfig() + .isDataLakeAutoExpireSnapshot() + || committerInitContext + .lakeTieringConfig() + .get(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT); // register iceberg listener Listeners.register(new IcebergSnapshotCreateListener(), CreateSnapshotEvent.class); } @@ -141,6 +155,12 @@ public LakeCommitResult commit( snapshotId = rewriteCommitSnapshotId; } } + + // Expire old snapshots if auto-expire-snapshot is enabled + if (isAutoSnapshotExpiration) { + expireSnapshots(); + } + // Iceberg does not provide cumulative table stats API yet; leave stats as -1 (unknown). return LakeCommitResult.committedIsReadable(snapshotId); } catch (Exception e) { @@ -262,6 +282,22 @@ public void close() throws Exception { } } + /** + * Expires old snapshots from the Iceberg table. Uses Iceberg's built-in snapshot expiration + * which respects table properties like {@code history.expire.max-snapshot-age-ms} and {@code + * history.expire.min-snapshots-to-keep}. + */ + private void expireSnapshots() { + try { + ExpireSnapshots expireSnapshots = + icebergTable.expireSnapshots().cleanExpiredFiles(true); + expireSnapshots.commit(); + LOG.debug("Successfully expired old snapshots for Iceberg table."); + } catch (Exception e) { + LOG.warn("Failed to expire snapshots for Iceberg table, will retry on next commit.", e); + } + } + private Table getTable(TablePath tablePath) throws IOException { try { TableIdentifier tableId = toIceberg(tablePath); diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java index 9f55104796..f8bafabdad 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java @@ -53,7 +53,7 @@ public SimpleVersionedSerializer getWriteResultSerializer() @Override public LakeCommitter createLakeCommitter( CommitterInitContext committerInitContext) throws IOException { - return new IcebergLakeCommitter(icebergCatalogProvider, committerInitContext.tablePath()); + return new IcebergLakeCommitter(icebergCatalogProvider, committerInitContext); } @Override diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java index eb24994cc4..2f5f627426 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -39,6 +39,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -110,6 +111,15 @@ private static Stream tieringWriteArgs() { Arguments.of(false, false)); } + private static Stream snapshotExpireArgs() { + return Stream.of( + // isTableAutoExpireSnapshot, isLakeTieringExpireSnapshot + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false)); + } + @ParameterizedTest @MethodSource("tieringWriteArgs") void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable) @@ -186,7 +196,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable // second, commit data try (LakeCommitter lakeCommitter = - createLakeCommitter(tablePath, tableInfo)) { + createLakeCommitter(tablePath, tableInfo, new Configuration())) { // serialize/deserialize committable IcebergCommittable icebergCommittable = lakeCommitter.toCommittable(icebergWriteResults); @@ -249,7 +259,8 @@ public TableInfo tableInfo() { } private LakeCommitter createLakeCommitter( - TablePath tablePath, TableInfo tableInfo) throws IOException { + TablePath tablePath, TableInfo tableInfo, Configuration lakeTieringConfig) + throws IOException { return icebergLakeTieringFactory.createLakeCommitter( new CommitterInitContext() { @Override @@ -264,7 +275,7 @@ public TableInfo tableInfo() { @Override public Configuration lakeTieringConfig() { - return new Configuration(); + return lakeTieringConfig; } @Override @@ -274,6 +285,108 @@ public Configuration flussClientConfig() { }); } + @ParameterizedTest + @MethodSource("snapshotExpireArgs") + void testSnapshotExpiration( + boolean isTableAutoExpireSnapshot, boolean isLakeTieringExpireSnapshot) + throws Exception { + int bucketNum = 3; + TablePath tablePath = + TablePath.of( + "iceberg", + String.format( + "test_snapshot_expire_%s_%s", + isTableAutoExpireSnapshot, isLakeTieringExpireSnapshot)); + + // Create Iceberg table with snapshot retention properties + Map tableProperties = new HashMap<>(); + tableProperties.put(TableProperties.MIN_SNAPSHOTS_TO_KEEP, "1"); + tableProperties.put(TableProperties.MAX_SNAPSHOT_AGE_MS, "1"); + createTable(tablePath, false, false, tableProperties); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema( + org.apache.fluss.metadata.Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("c3", DataTypes.STRING()) + .build()) + .distributedBy(bucketNum) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .property( + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT, + isTableAutoExpireSnapshot) + .build(); + TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L); + + Configuration lakeTieringConfig = new Configuration(); + lakeTieringConfig.set( + ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT, isLakeTieringExpireSnapshot); + + // Write data multiple times to generate snapshots + for (int round = 0; round < 5; round++) { + writeData(tablePath, tableInfo, lakeTieringConfig, bucketNum); + // Small delay to ensure snapshots are older than MAX_SNAPSHOT_AGE_MS=1ms + Thread.sleep(10); + } + + // Verify snapshot count + Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath)); + int snapshotCount = 0; + for (Snapshot ignored : icebergTable.snapshots()) { + snapshotCount++; + } + + if (isTableAutoExpireSnapshot || isLakeTieringExpireSnapshot) { + // if auto snapshot expiration is enabled, old snapshots should be expired + // With MIN_SNAPSHOTS_TO_KEEP=1, only 1 snapshot should be retained + assertThat(snapshotCount).isEqualTo(1); + } else { + // if auto snapshot expiration is disabled, all snapshots should be retained + assertThat(snapshotCount).isEqualTo(5); + } + } + + private void writeData( + TablePath tablePath, + TableInfo tableInfo, + Configuration lakeTieringConfig, + int bucketNum) + throws Exception { + List icebergWriteResults = new ArrayList<>(); + SimpleVersionedSerializer writeResultSerializer = + icebergLakeTieringFactory.getWriteResultSerializer(); + SimpleVersionedSerializer committableSerializer = + icebergLakeTieringFactory.getCommittableSerializer(); + + for (int bucket = 0; bucket < bucketNum; bucket++) { + try (LakeWriter writer = + createLakeWriter(tablePath, bucket, null, null, tableInfo)) { + Tuple2, List> writeAndExpectRecords = + genLogTableRecords(null, bucket, 3); + for (LogRecord record : writeAndExpectRecords.f0) { + writer.write(record); + } + IcebergWriteResult result = writer.complete(); + byte[] serialized = writeResultSerializer.serialize(result); + icebergWriteResults.add( + writeResultSerializer.deserialize( + writeResultSerializer.getVersion(), serialized)); + } + } + + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo, lakeTieringConfig)) { + IcebergCommittable committable = lakeCommitter.toCommittable(icebergWriteResults); + byte[] serialized = committableSerializer.serialize(committable); + committable = + committableSerializer.deserialize( + committableSerializer.getVersion(), serialized); + lakeCommitter.commit(committable, Collections.emptyMap()); + } + } + private Tuple2, List> genLogTableRecords( @Nullable String partition, int bucket, int numRecords) { List logRecords = new ArrayList<>(); @@ -359,6 +472,14 @@ private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeTyp private void createTable( TablePath tablePath, boolean isPrimaryTable, boolean isPartitionedTable) { + createTable(tablePath, isPrimaryTable, isPartitionedTable, Collections.emptyMap()); + } + + private void createTable( + TablePath tablePath, + boolean isPrimaryTable, + boolean isPartitionedTable, + Map tableProperties) { Namespace namespace = Namespace.of(tablePath.getDatabaseName()); if (icebergCatalog instanceof SupportsNamespaces) { SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog; @@ -404,7 +525,7 @@ private void createTable( TableIdentifier tableId = TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName()); - icebergCatalog.createTable(tableId, schema, partitionSpec); + icebergCatalog.createTable(tableId, schema, partitionSpec, tableProperties); } private CloseableIterator getIcebergRows( From 2852d44fa330d4fbdcaf0b5063818f75afba81ad Mon Sep 17 00:00:00 2001 From: vamossagar12 Date: Mon, 16 Mar 2026 15:55:24 +0530 Subject: [PATCH 2/3] Spotless fixes --- .../fluss/lake/iceberg/tiering/IcebergLakeCommitter.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java index 71d5f0ab47..04460857f8 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java @@ -74,10 +74,7 @@ public IcebergLakeCommitter( this.icebergCatalog = icebergCatalogProvider.get(); this.icebergTable = getTable(committerInitContext.tablePath()); this.isAutoSnapshotExpiration = - committerInitContext - .tableInfo() - .getTableConfig() - .isDataLakeAutoExpireSnapshot() + committerInitContext.tableInfo().getTableConfig().isDataLakeAutoExpireSnapshot() || committerInitContext .lakeTieringConfig() .get(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT); From 6c09b633a399d7497e9b0252b155c4225775aaca Mon Sep 17 00:00:00 2001 From: vamossagar12 Date: Mon, 16 Mar 2026 21:18:57 +0530 Subject: [PATCH 3/3] AI suggested fix- Making conditional expiration deterministic Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../fluss/lake/iceberg/tiering/IcebergTieringTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java index 2f5f627426..8550a13df7 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -301,7 +301,7 @@ void testSnapshotExpiration( // Create Iceberg table with snapshot retention properties Map tableProperties = new HashMap<>(); tableProperties.put(TableProperties.MIN_SNAPSHOTS_TO_KEEP, "1"); - tableProperties.put(TableProperties.MAX_SNAPSHOT_AGE_MS, "1"); + tableProperties.put(TableProperties.MAX_SNAPSHOT_AGE_MS, "1000"); createTable(tablePath, false, false, tableProperties); TableDescriptor descriptor = @@ -327,8 +327,8 @@ void testSnapshotExpiration( // Write data multiple times to generate snapshots for (int round = 0; round < 5; round++) { writeData(tablePath, tableInfo, lakeTieringConfig, bucketNum); - // Small delay to ensure snapshots are older than MAX_SNAPSHOT_AGE_MS=1ms - Thread.sleep(10); + // Delay to ensure snapshots are older than MAX_SNAPSHOT_AGE_MS=1000ms with margin + Thread.sleep(2000); } // Verify snapshot count