diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java index 06b572760a..04460857f8 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java @@ -17,7 +17,9 @@ package org.apache.fluss.lake.iceberg.tiering; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.lake.committer.CommittedLakeSnapshot; +import org.apache.fluss.lake.committer.CommitterInitContext; import org.apache.fluss.lake.committer.LakeCommitResult; import org.apache.fluss.lake.committer.LakeCommitter; import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult; @@ -28,6 +30,7 @@ import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Snapshot; @@ -61,12 +64,20 @@ public class IcebergLakeCommitter implements LakeCommitter currentCommitSnapshotId = new ThreadLocal<>(); - public IcebergLakeCommitter(IcebergCatalogProvider icebergCatalogProvider, TablePath tablePath) + public IcebergLakeCommitter( + IcebergCatalogProvider icebergCatalogProvider, + CommitterInitContext committerInitContext) throws IOException { this.icebergCatalog = icebergCatalogProvider.get(); - this.icebergTable = getTable(tablePath); + this.icebergTable = getTable(committerInitContext.tablePath()); + this.isAutoSnapshotExpiration = + committerInitContext.tableInfo().getTableConfig().isDataLakeAutoExpireSnapshot() + || committerInitContext + .lakeTieringConfig() + .get(ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT); // register iceberg listener Listeners.register(new IcebergSnapshotCreateListener(), CreateSnapshotEvent.class); } @@ -141,6 +152,12 @@ public LakeCommitResult commit( snapshotId = rewriteCommitSnapshotId; } } + + // Expire old snapshots if auto-expire-snapshot is enabled + if (isAutoSnapshotExpiration) { + expireSnapshots(); + } + // Iceberg does not provide cumulative table stats API yet; leave stats as -1 (unknown). return LakeCommitResult.committedIsReadable(snapshotId); } catch (Exception e) { @@ -262,6 +279,22 @@ public void close() throws Exception { } } + /** + * Expires old snapshots from the Iceberg table. Uses Iceberg's built-in snapshot expiration + * which respects table properties like {@code history.expire.max-snapshot-age-ms} and {@code + * history.expire.min-snapshots-to-keep}. + */ + private void expireSnapshots() { + try { + ExpireSnapshots expireSnapshots = + icebergTable.expireSnapshots().cleanExpiredFiles(true); + expireSnapshots.commit(); + LOG.debug("Successfully expired old snapshots for Iceberg table."); + } catch (Exception e) { + LOG.warn("Failed to expire snapshots for Iceberg table, will retry on next commit.", e); + } + } + private Table getTable(TablePath tablePath) throws IOException { try { TableIdentifier tableId = toIceberg(tablePath); diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java index 9f55104796..f8bafabdad 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeTieringFactory.java @@ -53,7 +53,7 @@ public SimpleVersionedSerializer getWriteResultSerializer() @Override public LakeCommitter createLakeCommitter( CommitterInitContext committerInitContext) throws IOException { - return new IcebergLakeCommitter(icebergCatalogProvider, committerInitContext.tablePath()); + return new IcebergLakeCommitter(icebergCatalogProvider, committerInitContext); } @Override diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java index eb24994cc4..8550a13df7 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java @@ -39,6 +39,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -110,6 +111,15 @@ private static Stream tieringWriteArgs() { Arguments.of(false, false)); } + private static Stream snapshotExpireArgs() { + return Stream.of( + // isTableAutoExpireSnapshot, isLakeTieringExpireSnapshot + Arguments.of(true, true), + Arguments.of(true, false), + Arguments.of(false, true), + Arguments.of(false, false)); + } + @ParameterizedTest @MethodSource("tieringWriteArgs") void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable) @@ -186,7 +196,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable // second, commit data try (LakeCommitter lakeCommitter = - createLakeCommitter(tablePath, tableInfo)) { + createLakeCommitter(tablePath, tableInfo, new Configuration())) { // serialize/deserialize committable IcebergCommittable icebergCommittable = lakeCommitter.toCommittable(icebergWriteResults); @@ -249,7 +259,8 @@ public TableInfo tableInfo() { } private LakeCommitter createLakeCommitter( - TablePath tablePath, TableInfo tableInfo) throws IOException { + TablePath tablePath, TableInfo tableInfo, Configuration lakeTieringConfig) + throws IOException { return icebergLakeTieringFactory.createLakeCommitter( new CommitterInitContext() { @Override @@ -264,7 +275,7 @@ public TableInfo tableInfo() { @Override public Configuration lakeTieringConfig() { - return new Configuration(); + return lakeTieringConfig; } @Override @@ -274,6 +285,108 @@ public Configuration flussClientConfig() { }); } + @ParameterizedTest + @MethodSource("snapshotExpireArgs") + void testSnapshotExpiration( + boolean isTableAutoExpireSnapshot, boolean isLakeTieringExpireSnapshot) + throws Exception { + int bucketNum = 3; + TablePath tablePath = + TablePath.of( + "iceberg", + String.format( + "test_snapshot_expire_%s_%s", + isTableAutoExpireSnapshot, isLakeTieringExpireSnapshot)); + + // Create Iceberg table with snapshot retention properties + Map tableProperties = new HashMap<>(); + tableProperties.put(TableProperties.MIN_SNAPSHOTS_TO_KEEP, "1"); + tableProperties.put(TableProperties.MAX_SNAPSHOT_AGE_MS, "1000"); + createTable(tablePath, false, false, tableProperties); + + TableDescriptor descriptor = + TableDescriptor.builder() + .schema( + org.apache.fluss.metadata.Schema.newBuilder() + .column("c1", DataTypes.INT()) + .column("c2", DataTypes.STRING()) + .column("c3", DataTypes.STRING()) + .build()) + .distributedBy(bucketNum) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .property( + ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT, + isTableAutoExpireSnapshot) + .build(); + TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L); + + Configuration lakeTieringConfig = new Configuration(); + lakeTieringConfig.set( + ConfigOptions.LAKE_TIERING_AUTO_EXPIRE_SNAPSHOT, isLakeTieringExpireSnapshot); + + // Write data multiple times to generate snapshots + for (int round = 0; round < 5; round++) { + writeData(tablePath, tableInfo, lakeTieringConfig, bucketNum); + // Delay to ensure snapshots are older than MAX_SNAPSHOT_AGE_MS=1000ms with margin + Thread.sleep(2000); + } + + // Verify snapshot count + Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath)); + int snapshotCount = 0; + for (Snapshot ignored : icebergTable.snapshots()) { + snapshotCount++; + } + + if (isTableAutoExpireSnapshot || isLakeTieringExpireSnapshot) { + // if auto snapshot expiration is enabled, old snapshots should be expired + // With MIN_SNAPSHOTS_TO_KEEP=1, only 1 snapshot should be retained + assertThat(snapshotCount).isEqualTo(1); + } else { + // if auto snapshot expiration is disabled, all snapshots should be retained + assertThat(snapshotCount).isEqualTo(5); + } + } + + private void writeData( + TablePath tablePath, + TableInfo tableInfo, + Configuration lakeTieringConfig, + int bucketNum) + throws Exception { + List icebergWriteResults = new ArrayList<>(); + SimpleVersionedSerializer writeResultSerializer = + icebergLakeTieringFactory.getWriteResultSerializer(); + SimpleVersionedSerializer committableSerializer = + icebergLakeTieringFactory.getCommittableSerializer(); + + for (int bucket = 0; bucket < bucketNum; bucket++) { + try (LakeWriter writer = + createLakeWriter(tablePath, bucket, null, null, tableInfo)) { + Tuple2, List> writeAndExpectRecords = + genLogTableRecords(null, bucket, 3); + for (LogRecord record : writeAndExpectRecords.f0) { + writer.write(record); + } + IcebergWriteResult result = writer.complete(); + byte[] serialized = writeResultSerializer.serialize(result); + icebergWriteResults.add( + writeResultSerializer.deserialize( + writeResultSerializer.getVersion(), serialized)); + } + } + + try (LakeCommitter lakeCommitter = + createLakeCommitter(tablePath, tableInfo, lakeTieringConfig)) { + IcebergCommittable committable = lakeCommitter.toCommittable(icebergWriteResults); + byte[] serialized = committableSerializer.serialize(committable); + committable = + committableSerializer.deserialize( + committableSerializer.getVersion(), serialized); + lakeCommitter.commit(committable, Collections.emptyMap()); + } + } + private Tuple2, List> genLogTableRecords( @Nullable String partition, int bucket, int numRecords) { List logRecords = new ArrayList<>(); @@ -359,6 +472,14 @@ private GenericRecord toRecord(long offset, GenericRow row, ChangeType changeTyp private void createTable( TablePath tablePath, boolean isPrimaryTable, boolean isPartitionedTable) { + createTable(tablePath, isPrimaryTable, isPartitionedTable, Collections.emptyMap()); + } + + private void createTable( + TablePath tablePath, + boolean isPrimaryTable, + boolean isPartitionedTable, + Map tableProperties) { Namespace namespace = Namespace.of(tablePath.getDatabaseName()); if (icebergCatalog instanceof SupportsNamespaces) { SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog; @@ -404,7 +525,7 @@ private void createTable( TableIdentifier tableId = TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName()); - icebergCatalog.createTable(tableId, schema, partitionSpec); + icebergCatalog.createTable(tableId, schema, partitionSpec, tableProperties); } private CloseableIterator getIcebergRows(