Skip to content

Commit b170000

Browse files
committed
Fix triggerTableAdded idempotency and stale runtime recovery
1 parent d46f256 commit b170000

2 files changed

Lines changed: 118 additions & 26 deletions

File tree

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -781,31 +781,27 @@ private void syncTable(ExternalCatalog externalCatalog, TableIdentity tableIdent
781781

782782
private boolean triggerTableAdded(
783783
ServerCatalog catalog, ServerTableIdentifier serverTableIdentifier) {
784-
// Clean up any pre-existing runtime to prevent metric registration conflicts.
785-
// This can happen when a table is deleted from DB (e.g., by dispose after filter change)
786-
// but its runtime and metrics remain in memory. We search by table name rather than by ID
787-
// because re-syncing a table creates a new DB row with a different ID.
788-
tableRuntimeMap
789-
.values()
790-
.removeIf(
791-
existing -> {
792-
ServerTableIdentifier existingId = existing.getTableIdentifier();
793-
if (existingId.getCatalog().equals(serverTableIdentifier.getCatalog())
794-
&& existingId.getDatabase().equals(serverTableIdentifier.getDatabase())
795-
&& existingId.getTableName().equals(serverTableIdentifier.getTableName())) {
796-
LOG.warn(
797-
"Found existing table runtime for {}, disposing before re-adding.",
798-
serverTableIdentifier);
799-
try {
800-
existing.dispose();
801-
} catch (Exception e) {
802-
LOG.warn(
803-
"Error disposing existing table runtime for {}", serverTableIdentifier, e);
804-
}
805-
return true;
806-
}
807-
return false;
808-
});
784+
cleanupStaleRuntimeWithSameName(serverTableIdentifier);
785+
786+
TableRuntime existedRuntime = tableRuntimeMap.get(serverTableIdentifier.getId());
787+
if (existedRuntime != null) {
788+
TableRuntimeMeta existingRuntimeMeta =
789+
getAs(
790+
TableRuntimeMapper.class,
791+
mapper -> mapper.selectRuntime(serverTableIdentifier.getId()));
792+
if (existingRuntimeMeta != null) {
793+
LOG.info(
794+
"Table runtime already exists for {}, skip duplicate add trigger.",
795+
serverTableIdentifier);
796+
return true;
797+
}
798+
799+
LOG.warn(
800+
"Found stale in-memory runtime for {}, runtime metadata is missing, re-creating.",
801+
serverTableIdentifier);
802+
disposeRuntimeSafely(existedRuntime, serverTableIdentifier, "while repairing stale runtime");
803+
tableRuntimeMap.remove(serverTableIdentifier.getId(), existedRuntime);
804+
}
809805

810806
AmoroTable<?> table =
811807
catalog.loadTable(
@@ -901,6 +897,39 @@ private boolean triggerTableAdded(
901897
return true;
902898
}
903899

900+
private void cleanupStaleRuntimeWithSameName(ServerTableIdentifier serverTableIdentifier) {
901+
tableRuntimeMap
902+
.values()
903+
.removeIf(
904+
existing -> {
905+
ServerTableIdentifier existingId = existing.getTableIdentifier();
906+
if (Objects.equal(existingId.getId(), serverTableIdentifier.getId())) {
907+
return false;
908+
}
909+
910+
if (existingId.getCatalog().equals(serverTableIdentifier.getCatalog())
911+
&& existingId.getDatabase().equals(serverTableIdentifier.getDatabase())
912+
&& existingId.getTableName().equals(serverTableIdentifier.getTableName())) {
913+
LOG.warn(
914+
"Found stale runtime {} for table {}, disposing before re-adding.",
915+
existingId,
916+
serverTableIdentifier);
917+
disposeRuntimeSafely(existing, existingId, "while cleaning stale runtime");
918+
return true;
919+
}
920+
return false;
921+
});
922+
}
923+
924+
private void disposeRuntimeSafely(
925+
TableRuntime runtime, ServerTableIdentifier tableIdentifier, String operation) {
926+
try {
927+
runtime.dispose();
928+
} catch (Exception e) {
929+
LOG.warn("Error disposing runtime for {} {}", tableIdentifier, operation, e);
930+
}
931+
}
932+
904933
private Optional<TableRuntime> createTableRuntime(
905934
ServerTableIdentifier identifier,
906935
TableRuntimeMeta runtimeMeta,

amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.mockito.MockitoAnnotations;
6363

6464
import java.io.IOException;
65+
import java.lang.reflect.Method;
6566
import java.util.List;
6667
import java.util.Map;
6768

@@ -500,7 +501,6 @@ public void testTriggerTableAddedIdempotent() {
500501
ServerTableIdentifier tableIdentifier = serverTableIdentifier();
501502
MetricRegistry globalRegistry = MetricManager.getInstance().getGlobalRegistry();
502503

503-
// Verify initial state: 1 runtime, metrics registered
504504
Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
505505
int metricCountBefore = globalRegistry.getMetrics().size();
506506
Assert.assertTrue(metricCountBefore > 0);
@@ -533,6 +533,56 @@ public void testTriggerTableAddedIdempotent() {
533533
dropDatabase();
534534
}
535535

536+
@Test
537+
public void testTriggerTableAddedDuplicateSameTableIdIsIdempotent() {
538+
ExternalCatalog externalCatalog = initExternalCatalog();
539+
createTable();
540+
ServerTableIdentifier tableIdentifier = serverTableIdentifier();
541+
MetricRegistry globalRegistry = MetricManager.getInstance().getGlobalRegistry();
542+
543+
Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
544+
int metricCountBefore = globalRegistry.getMetrics().size();
545+
Assert.assertTrue(metricCountBefore > 0);
546+
547+
boolean added = invokeTriggerTableAdded(externalCatalog, tableIdentifier);
548+
549+
Assert.assertTrue(added);
550+
Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
551+
Assert.assertTrue(tableService().contains(tableIdentifier.getId()));
552+
Assert.assertEquals(metricCountBefore, globalRegistry.getMetrics().size());
553+
554+
dropTable();
555+
dropDatabase();
556+
}
557+
558+
@Test
559+
public void testTriggerTableAddedRepairStaleRuntimeSameTableId() {
560+
ExternalCatalog externalCatalog = initExternalCatalog();
561+
createTable();
562+
ServerTableIdentifier tableIdentifier = serverTableIdentifier();
563+
MetricRegistry globalRegistry = MetricManager.getInstance().getGlobalRegistry();
564+
565+
int metricCountBefore = globalRegistry.getMetrics().size();
566+
Assert.assertTrue(metricCountBefore > 0);
567+
568+
persistency.deleteTableRuntime(tableIdentifier.getId());
569+
Assert.assertEquals(0, persistency.getTableRuntimeMetas().size());
570+
Assert.assertTrue(tableService().contains(tableIdentifier.getId()));
571+
572+
boolean added = invokeTriggerTableAdded(externalCatalog, tableIdentifier);
573+
574+
Assert.assertTrue(added);
575+
Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
576+
Assert.assertTrue(tableService().contains(tableIdentifier.getId()));
577+
Assert.assertEquals(metricCountBefore, globalRegistry.getMetrics().size());
578+
579+
tableService().exploreTableRuntimes();
580+
Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
581+
582+
dropTable();
583+
dropDatabase();
584+
}
585+
536586
@Mock private DefaultTableRuntime tableRuntimeWithException;
537587
@Mock private ServerTableIdentifier tableIdentifierWithException;
538588

@@ -659,6 +709,19 @@ private void disposeNewCatalogTable(
659709
catalogManager().dropCatalog(catalogName);
660710
}
661711

712+
private boolean invokeTriggerTableAdded(
713+
ServerCatalog catalog, ServerTableIdentifier tableIdentifier) {
714+
try {
715+
Method triggerTableAddedMethod =
716+
DefaultTableService.class.getDeclaredMethod(
717+
"triggerTableAdded", ServerCatalog.class, ServerTableIdentifier.class);
718+
triggerTableAddedMethod.setAccessible(true);
719+
return (boolean) triggerTableAddedMethod.invoke(tableService(), catalog, tableIdentifier);
720+
} catch (Exception e) {
721+
throw new RuntimeException(e);
722+
}
723+
}
724+
662725
private static class Persistency extends PersistentBase {
663726
public void addTableIdentifier(
664727
String catalog, String database, String tableName, TableFormat format) {

0 commit comments

Comments
 (0)