Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,16 @@ protected void registerMetric(MetricRegistry registry, MetricDefine define, Metr

/**
 * Registers this table's metrics in {@code registry}. Does nothing when a registry has already
 * been recorded in {@code globalRegistry}.
 *
 * <p>{@code registerMetrics} is invoked exactly once, inside the guarded section. If it fails
 * partway through, every key collected in {@code registeredMetricKeys} is unregistered again and
 * the original failure is rethrown; {@code globalRegistry} is only assigned after a fully
 * successful registration.
 *
 * @param registry the registry to register metrics into
 */
public void register(MetricRegistry registry) {
  if (globalRegistry != null) {
    return;
  }
  try {
    registerMetrics(registry);
  } catch (Throwable t) {
    // Roll back any metrics that were partially registered to prevent orphaned metrics
    // in the registry. Without this, unregister() would be a no-op because globalRegistry
    // is still null, leaving the metrics permanently leaked.
    registeredMetricKeys.forEach(
        key -> {
          try {
            registry.unregister(key);
          } catch (RuntimeException rollbackFailure) {
            // Keep rolling back the remaining keys and never mask the original failure.
            t.addSuppressed(rollbackFailure);
          }
        });
    registeredMetricKeys.clear();
    throw t;
  }
  globalRegistry = registry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,28 @@ private void syncTable(ExternalCatalog externalCatalog, TableIdentity tableIdent

private boolean triggerTableAdded(
ServerCatalog catalog, ServerTableIdentifier serverTableIdentifier) {
cleanupStaleRuntimeWithSameName(serverTableIdentifier);

TableRuntime existedRuntime = tableRuntimeMap.get(serverTableIdentifier.getId());
if (existedRuntime != null) {
TableRuntimeMeta existingRuntimeMeta =
getAs(
TableRuntimeMapper.class,
mapper -> mapper.selectRuntime(serverTableIdentifier.getId()));
if (existingRuntimeMeta != null) {
LOG.info(
"Table runtime already exists for {}, skip duplicate add trigger.",
serverTableIdentifier);
return true;
}

LOG.warn(
"Found stale in-memory runtime for {}, runtime metadata is missing, re-creating.",
serverTableIdentifier);
disposeRuntimeSafely(existedRuntime, serverTableIdentifier, "while repairing stale runtime");
tableRuntimeMap.remove(serverTableIdentifier.getId(), existedRuntime);
}

AmoroTable<?> table =
catalog.loadTable(
serverTableIdentifier.getDatabase(), serverTableIdentifier.getTableName());
Expand Down Expand Up @@ -875,6 +897,39 @@ private boolean triggerTableAdded(
return true;
}

/**
 * Disposes and removes from {@code tableRuntimeMap} every runtime that has the same catalog,
 * database and table name as {@code serverTableIdentifier} but a different id.
 *
 * <p>A runtime whose id equals the given identifier's id is always kept.
 *
 * @param serverTableIdentifier identifier of the table that is about to be added
 */
private void cleanupStaleRuntimeWithSameName(ServerTableIdentifier serverTableIdentifier) {
  tableRuntimeMap
      .values()
      .removeIf(
          candidate -> {
            ServerTableIdentifier candidateId = candidate.getTableIdentifier();

            // Same id means this is the very table being added, not a stale leftover.
            boolean sameId = Objects.equal(candidateId.getId(), serverTableIdentifier.getId());
            boolean staleSameName =
                !sameId
                    && candidateId.getCatalog().equals(serverTableIdentifier.getCatalog())
                    && candidateId.getDatabase().equals(serverTableIdentifier.getDatabase())
                    && candidateId.getTableName().equals(serverTableIdentifier.getTableName());
            if (!staleSameName) {
              return false;
            }

            LOG.warn(
                "Found stale runtime {} for table {}, disposing before re-adding.",
                candidateId,
                serverTableIdentifier);
            disposeRuntimeSafely(candidate, candidateId, "while cleaning stale runtime");
            return true;
          });
}

/**
 * Disposes {@code runtime}, logging a warning instead of propagating any {@link Exception} that
 * {@code dispose()} throws.
 *
 * @param runtime the runtime to dispose
 * @param tableIdentifier identifier written to the warning log when disposal fails
 * @param operation short description of the calling context, appended to the warning log
 */
private void disposeRuntimeSafely(
    TableRuntime runtime, ServerTableIdentifier tableIdentifier, String operation) {
  try {
    runtime.dispose();
  } catch (Exception disposeFailure) {
    // Best-effort cleanup: the caller continues even when disposal fails.
    LOG.warn("Error disposing runtime for {} {}", tableIdentifier, operation, disposeFailure);
  }
}

private Optional<TableRuntime> createTableRuntime(
ServerTableIdentifier identifier,
TableRuntimeMeta runtimeMeta,
Expand All @@ -900,7 +955,14 @@ private void revertTableRuntimeAdded(
externalCatalog.getServerTableIdentifier(
tableIdentity.getDatabase(), tableIdentity.getTableName());
if (tableIdentifier != null) {
tableRuntimeMap.remove(tableIdentifier.getId());
TableRuntime runtime = tableRuntimeMap.remove(tableIdentifier.getId());
if (runtime != null) {
try {
runtime.dispose();
} catch (Exception e) {
LOG.warn("Error disposing runtime during revert for {}", tableIdentifier, e);
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.table;

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.metrics.Counter;
import org.apache.amoro.metrics.MetricDefine;
import org.apache.amoro.metrics.MetricRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests that {@code AbstractTableMetrics.register} rolls back partially registered metrics when
 * {@code registerMetrics} throws partway through, and that a regular register/unregister cycle
 * leaves the registry empty.
 */
public class TestAbstractTableMetricsPartialRegistration {

  /** Message of the exception thrown by {@link PartialFailureMetrics}. */
  private static final String SIMULATED_FAILURE_MESSAGE = "Simulated registration failure";

  private static final MetricDefine METRIC_A =
      MetricDefine.defineCounter("test_metric_a").withTags("catalog", "database", "table").build();

  private static final MetricDefine METRIC_B =
      MetricDefine.defineCounter("test_metric_b").withTags("catalog", "database", "table").build();

  private static final MetricDefine METRIC_C =
      MetricDefine.defineCounter("test_metric_c").withTags("catalog", "database", "table").build();

  /**
   * A test subclass of AbstractTableMetrics that registers 2 metrics successfully, then throws an
   * exception in place of registering the 3rd metric.
   */
  private static class PartialFailureMetrics extends AbstractTableMetrics {

    PartialFailureMetrics(ServerTableIdentifier identifier) {
      super(identifier);
    }

    @Override
    protected void registerMetrics(MetricRegistry registry) {
      registerMetric(registry, METRIC_A, new Counter());
      registerMetric(registry, METRIC_B, new Counter());
      // Simulate failure on the 3rd metric
      throw new RuntimeException(SIMULATED_FAILURE_MESSAGE);
    }
  }

  /** A normal test subclass that registers all 3 metrics successfully. */
  private static class NormalMetrics extends AbstractTableMetrics {

    NormalMetrics(ServerTableIdentifier identifier) {
      super(identifier);
    }

    @Override
    protected void registerMetrics(MetricRegistry registry) {
      registerMetric(registry, METRIC_A, new Counter());
      registerMetric(registry, METRIC_B, new Counter());
      registerMetric(registry, METRIC_C, new Counter());
    }
  }

  @Test
  public void testPartialRegistrationRollback() {
    MetricRegistry registry = new MetricRegistry();
    ServerTableIdentifier identifier =
        ServerTableIdentifier.of("test_catalog", "test_db", "test_table", TableFormat.ICEBERG);

    PartialFailureMetrics metrics = new PartialFailureMetrics(identifier);

    // register should throw, and the 2 partially registered metrics should be rolled back.
    // Checking the message ensures the simulated failure was hit rather than an unrelated
    // runtime exception (e.g. a NullPointerException), which assertThrows alone would accept.
    RuntimeException failure =
        Assertions.assertThrows(RuntimeException.class, () -> metrics.register(registry));
    Assertions.assertEquals(SIMULATED_FAILURE_MESSAGE, failure.getMessage());

    // Verify: no metrics remain in the registry (all rolled back)
    Assertions.assertEquals(0, registry.getMetrics().size());

    // register did not complete, so unregister() must be a safe no-op that leaves the
    // already cleaned-up registry untouched.
    metrics.unregister();
    Assertions.assertEquals(0, registry.getMetrics().size());

    // Verify: a new metrics instance can register the same metrics without conflict
    NormalMetrics newMetrics = new NormalMetrics(identifier);
    newMetrics.register(registry);
    Assertions.assertEquals(3, registry.getMetrics().size());

    // Clean up
    newMetrics.unregister();
    Assertions.assertEquals(0, registry.getMetrics().size());
  }

  @Test
  public void testSuccessfulRegistrationAndUnregistration() {
    MetricRegistry registry = new MetricRegistry();
    ServerTableIdentifier identifier =
        ServerTableIdentifier.of("test_catalog", "test_db", "test_table", TableFormat.ICEBERG);

    NormalMetrics metrics = new NormalMetrics(identifier);

    // Normal registration should succeed
    metrics.register(registry);
    Assertions.assertEquals(3, registry.getMetrics().size());

    // Unregister should clean up all metrics
    metrics.unregister();
    Assertions.assertEquals(0, registry.getMetrics().size());

    // Re-registration with a new instance should succeed
    NormalMetrics newMetrics = new NormalMetrics(identifier);
    newMetrics.register(registry);
    Assertions.assertEquals(3, registry.getMetrics().size());

    // Clean up, and verify the cleanup just like the first cycle
    newMetrics.unregister();
    Assertions.assertEquals(0, registry.getMetrics().size());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -455,6 +456,133 @@ private UnifiedCatalog createNewCatalogTable(String catalogName, String dbName,
return externalCatalog;
}

/**
 * Verifies that a table can be re-added after disposal without metric conflicts. This covers the
 * fix for the "Metric is already been registered" error that occurs when database-filter changes
 * cause tables to be disposed and then re-synced.
 */
@Test
public void testTableReAddAfterDispose_MetricsClean() {
  createTable();
  ServerTableIdentifier disposedTable = serverTableIdentifier();
  MetricRegistry metricRegistry = MetricManager.getInstance().getGlobalRegistry();

  // After the initial add: one persisted runtime and at least one registered metric.
  Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
  Assert.assertFalse(metricRegistry.getMetrics().isEmpty());

  // Disposing the table (simulates a filter change excluding all databases) removes the
  // persisted runtime.
  tableService().disposeTable(disposedTable);
  Assert.assertEquals(0, persistency.getTableRuntimeMetas().size());

  // Re-sync via exploreTableRuntimes (simulates the filter being removed and the table being
  // re-discovered). The external iceberg table still exists, so it is synced again, and this
  // must NOT throw "Metric is already been registered".
  tableService().exploreTableRuntimes();
  Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
  Assert.assertFalse(metricRegistry.getMetrics().isEmpty());

  dropTable();
  dropDatabase();
}

/**
 * Verifies that triggerTableAdded is idempotent: when a table runtime is still present in the
 * map (e.g., due to incomplete disposal), re-adding the table should dispose the old runtime
 * first and succeed without metric conflicts.
 */
@Test
public void testTriggerTableAddedIdempotent() {
  ExternalCatalog catalog = initExternalCatalog();
  createTable();
  ServerTableIdentifier staleIdentifier = serverTableIdentifier();
  MetricRegistry metricRegistry = MetricManager.getInstance().getGlobalRegistry();

  Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
  Assert.assertTrue(metricRegistry.getMetrics().size() > 0);

  // Build the inconsistent state: remove the table records from the DB while the runtime stays
  // in the map. This mimics disposeTable's DB operation failing and the table later being
  // removed from the DB by another path, leaving orphaned metrics in the registry.
  persistency.deleteTableRuntime(staleIdentifier.getId());
  persistency.deleteTableIdentifier(staleIdentifier.getIdentifier().buildTableIdentifier());

  // The DB is empty, yet the runtime (with its metrics) is still held in memory.
  Assert.assertEquals(0, persistency.getTableRuntimeMetas().size());
  Assert.assertEquals(0, tableManager().listManagedTables().size());
  Assert.assertTrue(tableService().contains(staleIdentifier.getId()));

  // exploreExternalCatalog should find the table in the external catalog but not in the DB and
  // go through syncTable -> triggerTableAdded. With the fix, the old runtime is disposed first
  // (matched by table name) and the new one is registered without conflict.
  tableService().exploreExternalCatalog(catalog);

  // The table is registered again under a new ID and metrics are present.
  Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
  Assert.assertFalse(metricRegistry.getMetrics().isEmpty());

  // The runtime keyed by the old ID must no longer be in the map.
  Assert.assertFalse(tableService().contains(staleIdentifier.getId()));

  dropTable();
  dropDatabase();
}

/**
 * Invokes triggerTableAdded a second time for a table that is already added and checks that the
 * call reports success while the persisted runtime count, the in-memory runtime and the number
 * of registered metrics all stay unchanged.
 */
@Test
public void testTriggerTableAddedDuplicateSameTableIdIsIdempotent() {
  ExternalCatalog catalog = initExternalCatalog();
  createTable();
  ServerTableIdentifier existingIdentifier = serverTableIdentifier();
  MetricRegistry metricRegistry = MetricManager.getInstance().getGlobalRegistry();

  Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
  int expectedMetricCount = metricRegistry.getMetrics().size();
  Assert.assertTrue(expectedMetricCount > 0);

  // Duplicate add trigger for the same table id.
  boolean duplicateAddResult = invokeTriggerTableAdded(catalog, existingIdentifier);

  Assert.assertTrue(duplicateAddResult);
  Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
  Assert.assertTrue(tableService().contains(existingIdentifier.getId()));
  Assert.assertEquals(expectedMetricCount, metricRegistry.getMetrics().size());

  dropTable();
  dropDatabase();
}

/**
 * Deletes the persisted table runtime while the in-memory runtime is still present, then invokes
 * triggerTableAdded for the same table id and checks that the runtime is persisted again, is
 * still contained in the table service, and that the metric count is unchanged.
 */
@Test
public void testTriggerTableAddedRepairStaleRuntimeSameTableId() {
  ExternalCatalog catalog = initExternalCatalog();
  createTable();
  ServerTableIdentifier repairedIdentifier = serverTableIdentifier();
  MetricRegistry metricRegistry = MetricManager.getInstance().getGlobalRegistry();

  int expectedMetricCount = metricRegistry.getMetrics().size();
  Assert.assertTrue(expectedMetricCount > 0);

  // Drop only the persisted runtime; the in-memory runtime is left behind.
  persistency.deleteTableRuntime(repairedIdentifier.getId());
  Assert.assertEquals(0, persistency.getTableRuntimeMetas().size());
  Assert.assertTrue(tableService().contains(repairedIdentifier.getId()));

  boolean repairResult = invokeTriggerTableAdded(catalog, repairedIdentifier);

  Assert.assertTrue(repairResult);
  Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());
  Assert.assertTrue(tableService().contains(repairedIdentifier.getId()));
  Assert.assertEquals(expectedMetricCount, metricRegistry.getMetrics().size());

  // A subsequent exploration must leave exactly one persisted runtime.
  tableService().exploreTableRuntimes();
  Assert.assertEquals(1, persistency.getTableRuntimeMetas().size());

  dropTable();
  dropDatabase();
}

@Mock private DefaultTableRuntime tableRuntimeWithException;
@Mock private ServerTableIdentifier tableIdentifierWithException;

Expand Down Expand Up @@ -581,6 +709,19 @@ private void disposeNewCatalogTable(
catalogManager().dropCatalog(catalogName);
}

/**
 * Reflectively invokes the non-public {@code DefaultTableService#triggerTableAdded} on the
 * service returned by {@code tableService()}.
 *
 * <p>An exception thrown by {@code triggerTableAdded} itself is unwrapped from the reflective
 * {@code InvocationTargetException}, so a failing test reports the real cause instead of a
 * double-wrapped one.
 *
 * @param catalog catalog argument passed through to {@code triggerTableAdded}
 * @param tableIdentifier table identifier argument passed through to {@code triggerTableAdded}
 * @return the boolean returned by {@code triggerTableAdded}
 * @throws RuntimeException if the method cannot be looked up or invoked, or if the invoked
 *     method throws (unchecked exceptions and errors are rethrown as-is)
 */
private boolean invokeTriggerTableAdded(
    ServerCatalog catalog, ServerTableIdentifier tableIdentifier) {
  try {
    Method triggerTableAddedMethod =
        DefaultTableService.class.getDeclaredMethod(
            "triggerTableAdded", ServerCatalog.class, ServerTableIdentifier.class);
    triggerTableAddedMethod.setAccessible(true);
    return (boolean) triggerTableAddedMethod.invoke(tableService(), catalog, tableIdentifier);
  } catch (java.lang.reflect.InvocationTargetException e) {
    // The invoked method itself failed: surface its exception rather than the wrapper.
    Throwable cause = e.getCause();
    if (cause instanceof RuntimeException) {
      throw (RuntimeException) cause;
    }
    if (cause instanceof Error) {
      throw (Error) cause;
    }
    throw new RuntimeException(cause);
  } catch (ReflectiveOperationException e) {
    throw new RuntimeException(
        "Failed to reflectively invoke DefaultTableService#triggerTableAdded", e);
  }
}

private static class Persistency extends PersistentBase {
public void addTableIdentifier(
String catalog, String database, String tableName, TableFormat format) {
Expand Down
Loading