diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index d9dfe2822067bf..729f8514203e28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -578,14 +578,16 @@ private List> getFilteredDatabaseNames() { * @param invalidCache if {@code true}, the catalog cache will be invalidated * and reloaded during the refresh process. */ - public synchronized void resetToUninitialized(boolean invalidCache) { - this.objectCreated = false; - this.initialized = false; - synchronized (this.confLock) { - this.cachedConf = null; + public void resetToUninitialized(boolean invalidCache) { + synchronized (this) { + this.objectCreated = false; + this.initialized = false; + synchronized (this.confLock) { + this.cachedConf = null; + } + this.lowerCaseToDatabaseName.clear(); + onClose(); } - this.lowerCaseToDatabaseName.clear(); - onClose(); onRefreshCache(invalidCache); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index c849d779217e7a..a841626bcadf2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -77,7 +77,7 @@ public class JdbcExternalCatalog extends ExternalCatalog { // Must add "transient" for Gson to ignore this field, // or Gson will throw exception with HikariCP private transient JdbcClient jdbcClient; - private IdentifierMapping identifierMapping; + private volatile IdentifierMapping identifierMapping; private ExternalFunctionRules functionRules; public JdbcExternalCatalog(long catalogId, String name, String resource, Map props, @@ -134,7 +134,7 @@ public void setDefaultPropsIfMissing(boolean isReplay) { } @Override - public synchronized void resetToUninitialized(boolean invalidCache) { + public void resetToUninitialized(boolean invalidCache) { super.resetToUninitialized(invalidCache); this.identifierMapping = new JdbcIdentifierMapping( (Env.isTableNamesCaseInsensitive() || Env.isStoredTableNamesLowerCase()), diff --git a/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalCatalogDeadlockTest.java b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalCatalogDeadlockTest.java new file mode 100644 index 00000000000000..8a8172f2125853 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/datasource/ExternalCatalogDeadlockTest.java @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource; + +import org.apache.doris.datasource.InitCatalogLog.Type; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadMXBean; +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class ExternalCatalogDeadlockTest { + + @Test + public void testResetToUninitializedShouldNotDeadlockWithCacheLoader() throws Exception { + DeadlockCatalog catalog = new DeadlockCatalog(); + CountDownLatch loaderEntered = new CountDownLatch(1); + CountDownLatch allowLoaderToTouchCatalog = new CountDownLatch(1); + AtomicReference backgroundFailure = new AtomicReference<>(); + + // The loader holds Caffeine's per-key lock before it calls back into the catalog. + LoadingCache cache = Caffeine.newBuilder().build(key -> { + loaderEntered.countDown(); + awaitLatch(allowLoaderToTouchCatalog); + catalog.makeSureInitialized(); + return key; + }); + + Thread queryThread = new Thread( + () -> runQuietly(backgroundFailure, () -> cache.get("deadlock-key")), + "deadlock-cache-loader"); + queryThread.setDaemon(true); + queryThread.start(); + Assertions.assertTrue(loaderEntered.await(5, TimeUnit.SECONDS)); + + Thread refreshThread = new Thread( + () -> runQuietly(backgroundFailure, () -> { + // resetToUninitialized() grabs the catalog monitor before invalidating the cache. + catalog.setInvalidator(() -> { + allowLoaderToTouchCatalog.countDown(); + cache.invalidate("deadlock-key"); + }); + catalog.resetToUninitialized(true); + }), + "deadlock-catalog-refresh"); + refreshThread.setDaemon(true); + refreshThread.start(); + + assertNoDeadlock(queryThread, refreshThread, backgroundFailure); + } + + private static void assertNoDeadlock(Thread queryThread, Thread refreshThread, + AtomicReference backgroundFailure) throws Exception { + long[] deadlockedThreads = waitForDeadlock(queryThread, refreshThread); + queryThread.join(TimeUnit.SECONDS.toMillis(5)); + refreshThread.join(TimeUnit.SECONDS.toMillis(5)); + Assertions.assertNull(backgroundFailure.get(), "unexpected background failure: " + backgroundFailure.get()); + Assertions.assertNull(deadlockedThreads, + String.format("detected deadlock between threads %s and %s", + queryThread.getName(), refreshThread.getName())); + Assertions.assertFalse(queryThread.isAlive(), queryThread.getName() + " is still running"); + Assertions.assertFalse(refreshThread.isAlive(), refreshThread.getName() + " is still running"); + } + + private static void awaitLatch(CountDownLatch latch) throws InterruptedException { + Assertions.assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + + private static void runQuietly(AtomicReference failure, ThrowingRunnable task) { + try { + task.run(); + } catch (Throwable t) { + failure.compareAndSet(null, t); + } + } + + private static long[] waitForDeadlock(Thread queryThread, Thread refreshThread) throws InterruptedException { + ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); + for (int i = 0; i < 100; i++) { + long[] deadlockedThreads = threadMxBean.findDeadlockedThreads(); + if (deadlockedThreads != null + && contains(deadlockedThreads, queryThread.getId()) + && contains(deadlockedThreads, refreshThread.getId())) { + return deadlockedThreads; + } + Thread.sleep(50); + } + return null; + } + + private static boolean contains(long[] ids, long targetId) { + return Arrays.stream(ids).anyMatch(id -> id == targetId); + } + + private static class DeadlockCatalog extends ExternalCatalog { + private Runnable invalidator = () -> { + }; + + DeadlockCatalog() { + super(1L, "deadlock-catalog", Type.TEST, ""); + initialized = true; + } + + void setInvalidator(Runnable invalidator) { + this.invalidator = invalidator; + } + + @Override + protected void initLocalObjectsImpl() { + } + + @Override + public void onClose() { + } + + @Override + public void onRefreshCache(boolean invalidCache) { + // Keep the harness catalog usable after refresh so the test only checks lock ordering. + initialized = true; + if (invalidCache) { + invalidator.run(); + } + } + + @Override + protected java.util.List listTableNamesFromRemote(SessionContext ctx, String dbName) { + return java.util.Collections.emptyList(); + } + + @Override + public boolean tableExist(SessionContext ctx, String dbName, String tblName) { + return false; + } + } + + @FunctionalInterface + private interface ThrowingRunnable { + void run() throws Exception; + } +}