diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index ae2c698b025..b0a6d082363 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -148,7 +148,13 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati String fileSet = this.getFileSetId(); List copyEntities = Lists.newArrayList(); TableMetadata destTableMetadataBeforeSrcRead = getCurrentDestTableMetadata(); - GetFilePathsToFileStatusResult atomicGetPathsResult = getFilePathsToFileStatus(targetFs, copyConfig, this.shouldIncludeMetadataPath); + GetFilePathsToFileStatusResult atomicGetPathsResult; + try { + atomicGetPathsResult = getFilePathsToFileStatus(targetFs, copyConfig, this.shouldIncludeMetadataPath); + } catch (IcebergTable.NoSnapshotFoundException e) { + log.warn("~{}~ source table has no snapshots, nothing to copy", fileSet); + return Lists.newArrayList(); + } Map pathToFileStatus = atomicGetPathsResult.getPathsToFileStatus(); log.info("~{}~ found {} candidate source paths", fileSet, pathToFileStatus.size()); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index a899f83c8cc..95557e08cd1 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -21,6 +21,7 @@ import java.net.URI; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -90,6 +91,12 @@ public TableNotFoundException(TableIdentifier tableId) { } } + public static class NoSnapshotFoundException extends IOException { + public NoSnapshotFoundException(TableIdentifier tableId) { + super("No Snapshot found: '" + tableId + "'"); + } + } + @Getter private final TableIdentifier tableId; /** allow the {@link IcebergCatalog} creating this table to qualify its {@link DatasetDescriptor#getName()} used for lineage, etc. */ @@ -108,19 +115,28 @@ public TableNotFoundException(TableIdentifier tableId) { /** @return metadata info limited to the most recent (current) snapshot */ public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException { TableMetadata current = accessTableMetadata(); - return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current)); + Snapshot currentSnapshot = accessCurrentSnapshot(current); + return createSnapshotInfo(currentSnapshot, Optional.of(current.metadataFileLocation()), Optional.of(current)); } /** @return metadata info for most recent snapshot, wherein manifests and their child data files ARE NOT listed */ public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws IOException { TableMetadata current = accessTableMetadata(); - return createSnapshotInfo(current.currentSnapshot(), Optional.of(current.metadataFileLocation()), Optional.of(current), true); + Snapshot currentSnapshot = accessCurrentSnapshot(current); + return createSnapshotInfo(currentSnapshot, Optional.of(current.metadataFileLocation()), Optional.of(current), true); } /** @return metadata info for all known snapshots, ordered historically, with *most recent last* */ public Iterator getAllSnapshotInfosIterator() throws IOException { TableMetadata current = accessTableMetadata(); - long currentSnapshotId = current.currentSnapshot().snapshotId(); + Snapshot currentSnapshot; + try { + currentSnapshot = accessCurrentSnapshot(current); + } catch (NoSnapshotFoundException e) { + log.warn("~{}~ No snapshot found, returning empty snapshot info iterator", tableId); + return Collections.emptyIterator(); + } + long currentSnapshotId = currentSnapshot.snapshotId(); List snapshots = current.snapshots(); return Iterators.transform(snapshots.iterator(), snapshot -> { try { @@ -183,6 +199,12 @@ protected TableMetadata accessTableMetadata() throws TableNotFoundException { return Optional.ofNullable(current).orElseThrow(() -> new TableNotFoundException(this.tableId)); } + /** @throws {@link IcebergTable.NoSnapshotFoundException} when table is empty i.e. table has zero snapshot */ + protected Snapshot accessCurrentSnapshot(TableMetadata tableMetadata) throws NoSnapshotFoundException { + Snapshot currentSnapshot = tableMetadata.currentSnapshot(); + return Optional.ofNullable(currentSnapshot).orElseThrow(() -> new NoSnapshotFoundException(this.tableId)); + } + protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, Optional metadataFileLocation, Optional currentTableMetadata) throws IOException { return createSnapshotInfo(snapshot, metadataFileLocation, currentTableMetadata, false); @@ -260,9 +282,15 @@ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dst public List getPartitionSpecificDataFiles(Predicate icebergPartitionFilterPredicate) throws IOException { TableMetadata tableMetadata = accessTableMetadata(); - Snapshot currentSnapshot = tableMetadata.currentSnapshot(); - long currentSnapshotId = currentSnapshot.snapshotId(); List knownDataFiles = new ArrayList<>(); + Snapshot currentSnapshot; + try { + currentSnapshot = accessCurrentSnapshot(tableMetadata); + } catch (NoSnapshotFoundException e) { + log.warn("~{}~ No snapshot found, returning empty data files list", tableId); + return knownDataFiles; + } + long currentSnapshotId = currentSnapshot.snapshotId(); GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker(); //TODO: Add support for deleteManifests as well later // Currently supporting dataManifests only @@ -307,10 +335,10 @@ protected void overwritePartition(List dataFiles, String partitionColN return; } TableMetadata tableMetadata = accessTableMetadata(); - Optional currentSnapshot = Optional.ofNullable(tableMetadata.currentSnapshot()); - if (currentSnapshot.isPresent()) { - log.info("~{}~ SnapshotId before overwrite: {}", tableId, currentSnapshot.get().snapshotId()); - } else { + try { + Snapshot currentSnapshot = accessCurrentSnapshot(tableMetadata); + log.info("~{}~ SnapshotId before overwrite: {}", tableId, currentSnapshot.snapshotId()); + } catch (NoSnapshotFoundException e) { log.warn("~{}~ No current snapshot found before overwrite", tableId); } OverwriteFiles overwriteFiles = this.table.newOverwrite(); diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java index f083e7e7671..de2c9b499f3 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java @@ -350,6 +350,33 @@ public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException { Assert.fail("expected an exception when using table ID '" + bogusTableId + "'"); } + /** Verify failure when attempting to get current snapshot info for an empty table */ + @Test(expectedExceptions = IcebergTable.NoSnapshotFoundException.class) + public void testGetCurrentSnapshotInfoOnEmptyTable() throws IOException { + IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri, + catalog.loadTable(tableId)).getCurrentSnapshotInfo(); + Assert.fail("expected an exception when using table ID '" + tableId + "'"); + } + + /** Verify that getAllSnapshotInfosIterator returns empty iterator for an empty table (no snapshots) */ + @Test + public void testGetAllSnapshotInfosIteratorOnEmptyTable() throws IOException { + IcebergTable icebergTable = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri, + catalog.loadTable(tableId)); + List snapshotInfos = Lists.newArrayList(icebergTable.getAllSnapshotInfosIterator()); + Assert.assertTrue(snapshotInfos.isEmpty(), "expected empty snapshot infos for table with no snapshots"); + } + + /** Verify that getPartitionSpecificDataFiles returns empty list for an empty table (no snapshots) */ + @Test + public void testGetPartitionSpecificDataFilesOnEmptyTable() throws IOException { + IcebergTable icebergTable = new IcebergTable(tableId, catalog.newTableOps(tableId), catalogUri, + catalog.loadTable(tableId)); + Predicate alwaysTruePredicate = partition -> true; + List dataFiles = icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate); + Assert.assertTrue(dataFiles.isEmpty(), "expected empty data files for table with no snapshots"); + } + /** Verify info about all (full) snapshots */ @Test(dataProvider = "isPosDeleteProvider") public void testGetAllSnapshotInfosIterator(boolean isPosDelete) throws IOException {