From 1183159f81aa0dc8446a2118837ce6326deaa323 Mon Sep 17 00:00:00 2001 From: yaoxiao Date: Sat, 23 May 2026 16:01:35 +0800 Subject: [PATCH] [fix](fe) avoid concurrent tablet stat iteration failures (#63298) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? Issue Number: #59138 Problem Summary: TabletStatMgr.runAfterCatalogReady() is a periodic master-FE daemon that iterates every tablet/replica to pull statistics. When DDL runs concurrently with this daemon, two races fire: Iteration race (CME). MaterializedIndex.tablets and LocalTablet.replicas were plain ArrayLists whose getters returned the internal list. A concurrent addTablet / addReplica / deleteReplica (clone, repair, schema change, restore, report handler) during iteration triggered the fail-fast iterator and threw ConcurrentModificationException. TOCTOU race. In updateTabletStat, a getTabletMeta(id) != null check is followed by getReplica(id, beId). If the tablet is removed in between, getReplica hits Preconditions.checkState(...) and throws IllegalStateException. When the daemon throws, the current cycle leaves stale tablet/partition/table sizes and skewed MetricRepo metrics until the next cycle. Solution: Close the CME race for good with copy-on-write via a volatile snapshot. A first attempt returned a defensive copy (Lists.newArrayList(...)), but the copy itself iterates the source list and can still CME mid-copy — the window shrank but did not close. This PR instead: Makes LocalTablet.replicas and MaterializedIndex.tablets volatile. Writers (addReplica / deleteReplica / deleteReplicaByBackendId / addTablet / clearTabletsForRestore) are synchronized, build a new list, and atomically swap the volatile reference — they never mutate a list in place. Readers (getReplicas() / getTablets()) do a single volatile read and return an immutable snapshot (Collections.unmodifiableList). Iteration is lock-free and can never CME, and the hot read path no longer copies elements. synchronized on writers is required (not just volatile) because some write paths do not hold the OlapTable write lock — verified by tracing call sites: InternalCatalog.createPartitionWithIndices and RestoreJob.resetPartitionForRestore call addReplica/addTablet without the table write lock, so concurrent writers are real and a plain volatile field would allow lost updates. Writers are infrequent (DDL / repair / restore), so the lock cost is negligible; reads stay lock-free. TOCTOU race is handled by catching IllegalStateException around getReplica (kept from the original fix) and counting the skip via a new TabletStatMgr.staleTabletStatSkipped counter, which makes the race observable (>0 proves the window was actually hit) instead of relying solely on log scraping. Cloud path: CloudTabletStatMgr.updateStatInfo iterates tablet.getReplicas() and is covered by the same snapshot fix; its updateTabletStat uses getReplicasByTabletId (locked, returns empty list, no checkState) and is already safe. ### Release note None ### Check List (For Author) - Test - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --------- Co-authored-by: morningman Co-authored-by: Claude Opus 4.7 (1M context) --- .../doris/alter/MaterializedViewHandler.java | 17 +++- .../doris/alter/SchemaChangeHandler.java | 19 +++- .../org/apache/doris/backup/RestoreJob.java | 9 +- .../doris/catalog/CloudTabletStatMgr.java | 1 + .../org/apache/doris/catalog/LocalTablet.java | 96 +++++++++++-------- .../doris/catalog/MaterializedIndex.java | 89 +++++++++++++---- .../org/apache/doris/catalog/OlapTable.java | 7 +- .../apache/doris/catalog/TabletStatMgr.java | 18 +++- .../doris/cloud/backup/CloudRestoreJob.java | 8 +- .../datasource/CloudInternalCatalog.java | 12 ++- .../doris/datasource/InternalCatalog.java | 14 ++- .../doris/catalog/MaterializedIndexTest.java | 65 +++++++++++++ .../org/apache/doris/catalog/TabletTest.java | 79 +++++++++++++++ 13 files changed, 360 insertions(+), 74 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 37ec3255f1bc85..5e3e64bf8c828f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -437,14 +437,22 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, IndexState.SHADOW); MaterializedIndex baseIndex = partition.getIndex(baseIndexId); short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum(); + // All MV tablets of the same (partition, mv index) share the same TabletMeta; + // build it once and bulk-publish to MaterializedIndex.tablets after the per-tablet + // loop to keep copy-on-write O(n). TabletInvertedIndex registration stays + // per-iteration because Tablet.addReplica(...) below needs the tablet present + // in the inverted index. + TabletMeta mvTabletMeta = new TabletMeta( + dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium); + List mvTabletsForPartition = Lists.newArrayListWithCapacity(baseIndex.getTablets().size()); + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Tablet baseTablet : baseIndex.getTablets()) { - TabletMeta mvTabletMeta = new TabletMeta( - dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium); long baseTabletId = baseTablet.getId(); long mvTabletId = idGeneratorBuffer.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(mvTabletId); - mvIndex.addTablet(newTablet, mvTabletMeta); + invertedIndex.addTablet(mvTabletId, mvTabletMeta); + mvTabletsForPartition.add(newTablet); addedTablets.add(newTablet); mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId); @@ -499,6 +507,9 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri } } // end for baseTablets + // Bulk-publish all MV tablets for this partition in one copy-on-write. + mvIndex.appendTablets(mvTabletsForPartition); + mvJob.addMVIndex(partitionId, mvIndex); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 42e25ea8f0622d..e791c65088c7d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -66,6 +66,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.qe.ComputeGroupException; import org.apache.doris.common.AnalysisException; @@ -1680,14 +1681,23 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map shadowTabletsForPartition = Lists.newArrayListWithCapacity( + originIndex.getTablets().size()); + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Tablet originTablet : originIndex.getTablets()) { - TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId, - newSchemaHash, medium); long originTabletId = originTablet.getId(); long shadowTabletId = idGeneratorBuffer.getNextId(); Tablet shadowTablet = EnvFactory.getInstance().createTablet(shadowTabletId); - shadowIndex.addTablet(shadowTablet, shadowTabletMeta); + invertedIndex.addTablet(shadowTabletId, shadowTabletMeta); + shadowTabletsForPartition.add(shadowTablet); addedTablets.add(shadowTablet); schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId); @@ -1745,6 +1755,9 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map newTablets = new ArrayList<>(remotetabletSize); for (int i = 0; i < remotetabletSize; i++) { // generate new tablet id long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - // add tablet to index, but not add to TabletInvertedIndex - remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas try { Pair>, TStorageMedium> beIdsAndMedium = Env.getCurrentSystemInfo() @@ -1549,6 +1551,9 @@ protected Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTb return null; } } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + remoteIdx.appendTablets(newTablets); } return remotePart; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java index b28cc369d4676b..70ab983c231892 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java @@ -370,6 +370,7 @@ private void updateStatInfo(List dbIds) { long tabletIndexSize = 0L; long tabletSegmentSize = 0L; + // getReplicas() returns an immutable volatile snapshot; CME-safe under concurrent DDL. for (Replica replica : tablet.getReplicas()) { if (replica.getDataSize() > tabletDataSize) { tabletDataSize = replica.getDataSize(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java index 14d9171f3ff0e4..5db0a1286b5d2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java @@ -31,8 +31,8 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.stream.LongStream; @@ -40,7 +40,7 @@ public class LocalTablet extends Tablet { private static final Logger LOG = LogManager.getLogger(LocalTablet.class); @SerializedName(value = "rs", alternate = {"replicas"}) - private List replicas; + private volatile List replicas; @SerializedName(value = "lastCheckTime") private long lastCheckTime; @@ -109,13 +109,14 @@ public long getRemoteDataSize() { if (cooldownReplicaId <= 0) { return 0; } - for (Replica r : replicas) { + List snapshot = replicas; // single volatile read; reuse below + for (Replica r : snapshot) { if (r.getId() == cooldownReplicaId) { return r.getRemoteDataSize(); } } // return replica with max remoteDataSize - return replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize(); + return snapshot.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize(); } @Override @@ -223,29 +224,32 @@ public void setLastCheckTime(long lastCheckTime) { this.lastCheckTime = lastCheckTime; } - private boolean isLatestReplicaAndDeleteOld(Replica newReplica) { - boolean delete = false; + // Writers are synchronized on this tablet to prevent concurrent lost-update: + // some callers (e.g. InternalCatalog.createTablets, RestoreJob) do NOT hold + // the OlapTable write lock when modifying replicas. + // Readers capture the volatile reference once and iterate freely — no lock needed. + + @Override + public synchronized void addReplica(Replica replica, boolean isRestore) { + long version = replica.getVersion(); + long backendId = replica.getBackendIdWithoutException(); boolean hasBackend = false; - long version = newReplica.getVersion(); - Iterator iterator = replicas.iterator(); - while (iterator.hasNext()) { - Replica replica = iterator.next(); - if (replica.getBackendIdWithoutException() == newReplica.getBackendIdWithoutException()) { + boolean deletedOld = false; + List current = replicas; + List next = new ArrayList<>(current.size() + 1); + for (Replica r : current) { + if (r.getBackendIdWithoutException() == backendId) { hasBackend = true; - if (replica.getVersion() <= version) { - iterator.remove(); - delete = true; + if (r.getVersion() <= version) { + deletedOld = true; + continue; // drop stale replica } } + next.add(r); } - - return delete || !hasBackend; - } - - @Override - public void addReplica(Replica replica, boolean isRestore) { - if (isLatestReplicaAndDeleteOld(replica)) { - replicas.add(replica); + if (deletedOld || !hasBackend) { + next.add(replica); + replicas = next; // volatile write; readers see the new immutable snapshot if (!isRestore) { Env.getCurrentInvertedIndex().addReplica(id, replica); } @@ -254,12 +258,13 @@ public void addReplica(Replica replica, boolean isRestore) { @Override public List getReplicas() { - return this.replicas; + // Volatile read: returns the current immutable snapshot; callers iterate without locking. + return Collections.unmodifiableList(replicas); } @Override public Replica getReplicaByBackendId(long backendId) { - for (Replica replica : replicas) { + for (Replica replica : replicas) { // single volatile read if (replica.getBackendIdWithoutException() == backendId) { return replica; } @@ -268,9 +273,12 @@ public Replica getReplicaByBackendId(long backendId) { } @Override - public boolean deleteReplica(Replica replica) { - if (replicas.contains(replica)) { - replicas.remove(replica); + public synchronized boolean deleteReplica(Replica replica) { + List current = replicas; + if (current.contains(replica)) { + List next = new ArrayList<>(current); + next.remove(replica); + replicas = next; // volatile write Env.getCurrentInvertedIndex().deleteReplica(id, replica.getBackendIdWithoutException()); return true; } @@ -278,16 +286,22 @@ public boolean deleteReplica(Replica replica) { } @Override - public boolean deleteReplicaByBackendId(long backendId) { - Iterator iterator = replicas.iterator(); - while (iterator.hasNext()) { - Replica replica = iterator.next(); + public synchronized boolean deleteReplicaByBackendId(long backendId) { + List current = replicas; + List next = new ArrayList<>(current.size()); + Replica found = null; + for (Replica replica : current) { if (replica.getBackendIdWithoutException() == backendId) { - iterator.remove(); - Env.getCurrentInvertedIndex().deleteReplica(id, backendId); - return true; + found = replica; + } else { + next.add(replica); } } + if (found != null) { + replicas = next; // volatile write + Env.getCurrentInvertedIndex().deleteReplica(id, backendId); + return true; + } return false; } @@ -302,13 +316,17 @@ public boolean equals(Object obj) { LocalTablet tablet = (LocalTablet) obj; - if (replicas != tablet.replicas) { - if (replicas.size() != tablet.replicas.size()) { + // Capture one snapshot per side so a concurrent writer cannot publish + // a different list between size/contains/get calls below. + List thisReplicas = replicas; + List otherReplicas = tablet.replicas; + if (thisReplicas != otherReplicas) { + if (thisReplicas.size() != otherReplicas.size()) { return false; } - int size = replicas.size(); + int size = thisReplicas.size(); for (int i = 0; i < size; i++) { - if (!tablet.replicas.contains(replicas.get(i))) { + if (!otherReplicas.contains(thisReplicas.get(i))) { return false; } } @@ -334,7 +352,7 @@ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.P } boolean allBeAliveOrDecommissioned = true; - for (Replica replica : replicas) { + for (Replica replica : replicas) { // single volatile read; iteration on the snapshot Backend backend = infoService.getBackend(replica.getBackendIdWithoutException()); if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) { allBeAliveOrDecommissioned = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index b1c843613137dc..81d9e1f7dd0fff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -19,10 +19,11 @@ import org.apache.doris.persist.gson.GsonPostProcessable; -import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,10 +58,16 @@ public enum IndexExtState { @SerializedName(value = "rowCount") private long rowCount; - private Map idToTablets; + // Published as a volatile immutable snapshot in lockstep with `tablets`. + // Writers (synchronized) build a fresh HashMap and assign the field; readers + // capture the reference once and call get/containsKey on the snapshot. + // Invariant: `tablets ⊆ idToTablets` — any tablet visible in the list is also + // present in the map. This is preserved by publishing the map BEFORE the list + // on add and the list BEFORE the map on clear. + private volatile Map idToTablets; @SerializedName(value = "tablets") // this is for keeping tablet order - private List tablets; + private volatile List tablets; // for push after rollup index finished @SerializedName(value = "rollupIndexId") @@ -94,38 +101,78 @@ public MaterializedIndex(long id, IndexState state) { } public List getTablets() { - return tablets; + // Volatile read: returns the current immutable snapshot; callers iterate without locking. + return Collections.unmodifiableList(tablets); } public List getTabletIdsInOrder() { - List tabletIds = Lists.newArrayListWithCapacity(tablets.size()); - for (Tablet tablet : tablets) { + List snapshot = tablets; // single volatile read + List tabletIds = new ArrayList<>(snapshot.size()); + for (Tablet tablet : snapshot) { tabletIds.add(tablet.getId()); } return tabletIds; } public Tablet getTablet(long tabletId) { + // Single volatile read of the immutable map snapshot. return idToTablets.get(tabletId); } - public void clearTabletsForRestore() { - idToTablets.clear(); - tablets.clear(); + public synchronized void clearTabletsForRestore() { + // Drop the list first so iteration stops seeing tablets before + // lookup-by-id drops them. Maintains tablets ⊆ idToTablets. + tablets = new ArrayList<>(); + idToTablets = new HashMap<>(); } - public void addTablet(Tablet tablet, TabletMeta tabletMeta) { + public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta) { addTablet(tablet, tabletMeta, false); } - public void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) { - idToTablets.put(tablet.getId(), tablet); - tablets.add(tablet); + // Writers are synchronized on this index to prevent concurrent lost-update: + // some callers (e.g. InternalCatalog.createTablets) do NOT hold the OlapTable + // write lock when adding tablets. + // Copy-on-write keeps readers CME-safe without locking; for bulk creation use + // appendTablets(...) so the per-index tablets list is copied once per batch + // instead of once per tablet. + public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) { + appendTabletsInternal(Collections.singletonList(tablet)); if (!isRestore) { Env.getCurrentInvertedIndex().addTablet(tablet.getId(), tabletMeta); } } + // Bulk-publish: append the given tablets to this index's tablets list in a + // single copy-on-write (O(existing + batch) instead of O(n^2) over n + // single-tablet adds inside a synchronized block). + // + // Does NOT touch TabletInvertedIndex. Bulk-creation callers register tablets + // in TabletInvertedIndex eagerly inside their per-tablet loop because + // Tablet.addReplica(...) (non-restore) requires the tablet to already be + // present in the inverted index; only the per-index list copy is expensive + // enough to be worth batching. + public synchronized void appendTablets(Collection newTablets) { + appendTabletsInternal(newTablets); + } + + private void appendTabletsInternal(Collection newTablets) { + if (newTablets.isEmpty()) { + return; + } + Map nextMap = new HashMap<>(idToTablets); + List nextList = new ArrayList<>(tablets.size() + newTablets.size()); + nextList.addAll(tablets); + for (Tablet tablet : newTablets) { + nextMap.put(tablet.getId(), tablet); + nextList.add(tablet); + } + // Publish the map first, then the list — so any id that appears in the + // visible `tablets` snapshot is already present in `idToTablets`. + idToTablets = nextMap; + tablets = nextList; + } + public void setIdForRestore(long idxId) { this.id = idxId; } @@ -233,8 +280,9 @@ public long getRemoteSegmentSize() { } public int getTabletOrderIdx(long tabletId) { + List snapshot = tablets; // single volatile read int idx = 0; - for (Tablet tablet : tablets) { + for (Tablet tablet : snapshot) { if (tablet.getId() == tabletId) { return idx; } @@ -271,15 +319,16 @@ public boolean equals(Object obj) { @Override public String toString() { + List snapshot = tablets; // single volatile read StringBuilder buffer = new StringBuilder(); buffer.append("index id: ").append(id).append("; "); buffer.append("index state: ").append(state.name()).append("; "); buffer.append("row count: ").append(rowCount).append("; "); - buffer.append("tablets size: ").append(tablets.size()).append("; "); + buffer.append("tablets size: ").append(snapshot.size()).append("; "); // buffer.append("tablets: ["); - for (Tablet tablet : tablets) { + for (Tablet tablet : snapshot) { buffer.append("tablet: ").append(tablet.toString()).append(", "); } buffer.append("]; "); @@ -292,9 +341,13 @@ public String toString() { @Override public void gsonPostProcess() { - // build "idToTablets" from "tablets" + // Build a fresh "idToTablets" snapshot from the deserialized "tablets" list. + // Runs single-threaded during gson deserialization, before any concurrent + // reader can observe this object. + Map map = new HashMap<>(tablets.size()); for (Tablet tablet : tablets) { - idToTablets.put(tablet.getId(), tablet); + map.put(tablet.getId(), tablet); } + idToTablets = map; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 5555339ed63142..b506b14179bdc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -945,10 +945,12 @@ && getTableProperty().getDynamicPartitionProperty().getBuckets() // generate new tablets in origin tablet order int tabletNum = idx.getTablets().size(); idx.clearTabletsForRestore(); + // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index. + List newTablets = new ArrayList<>(tabletNum); for (int i = 0; i < tabletNum; i++) { long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - idx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas if (Config.isCloudMode()) { long newReplicaId = Env.getCurrentEnv().getNextId(); @@ -988,6 +990,9 @@ && getTableProperty().getDynamicPartitionProperty().getBuckets() return new Status(ErrCode.COMMON_ERROR, e.getMessage()); } } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + idx.appendTablets(newTablets); } if (createNewColocateGroup) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index a493aecc4ac5a3..7cdb42315048e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -369,7 +369,14 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { if (result.isSetTabletStatList()) { for (TTabletStat stat : result.getTabletStatList()) { if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) { - Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId); + Replica replica; + try { + replica = invertedIndex.getReplica(stat.getTabletId(), beId); + } catch (IllegalStateException e) { + LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}", + stat.getTabletId(), beId, e.getMessage()); + continue; + } if (replica != null) { replica.setDataSize(stat.getDataSize()); replica.setRemoteDataSize(stat.getRemoteDataSize()); @@ -393,7 +400,14 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { // the replica is obsolete, ignore it. continue; } - Replica replica = invertedIndex.getReplica(entry.getKey(), beId); + Replica replica; + try { + replica = invertedIndex.getReplica(entry.getKey(), beId); + } catch (IllegalStateException e) { + LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}", + entry.getKey(), beId, e.getMessage()); + continue; + } if (replica == null) { // replica may be deleted from catalog, ignore it. continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java index b1c111ee4a1f94..4b724199a13bce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java @@ -333,18 +333,22 @@ public Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTbl, int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId()); int remotetabletSize = remoteIdx.getTablets().size(); remoteIdx.clearTabletsForRestore(); + // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index. + List newTablets = new ArrayList<>(remotetabletSize); for (int i = 0; i < remotetabletSize; i++) { // generate new tablet id long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - // add tablet to index, but not add to TabletInvertedIndex - remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas long newReplicaId = Env.getCurrentEnv().getNextId(); Replica replica = new CloudReplica(newReplicaId, null, Replica.ReplicaState.NORMAL, visibleVersion, schemaHash, dbId, localTbl.getId(), partitionId, remoteIdx.getId(), i); newTablet.addReplica(replica, true /* is restore */); } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + remoteIdx.appendTablets(newTablets); } return remotePart; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 0c7891c50b270c..0b2bd821459f96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -39,6 +39,7 @@ import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.cloud.catalog.CloudPartition; @@ -80,6 +81,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -432,11 +434,16 @@ private OlapFile.RowsetMetaCloudPB.Builder createInitialRowset(Tablet tablet, lo private void createCloudTablets(MaterializedIndex index, ReplicaState replicaState, DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, Set tabletIdSet) throws DdlException { + // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's + // tablets list in a single copy-on-write after the loop (see + // InternalCatalog.createTablets for rationale). + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List bucketTablets = new ArrayList<>(distributionInfo.getBucketNum()); for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { Tablet tablet = EnvFactory.getInstance().createTablet(Env.getCurrentEnv().getNextId()); - // add tablet to inverted index first - index.addTablet(tablet, tabletMeta); + invertedIndex.addTablet(tablet.getId(), tabletMeta); + bucketTablets.add(tablet); tabletIdSet.add(tablet.getId()); long replicaId = Env.getCurrentEnv().getNextId(); @@ -445,6 +452,7 @@ private void createCloudTablets(MaterializedIndex index, ReplicaState replicaSta tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i); tablet.addReplica(replica); } + index.appendTablets(bucketTablets); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 96230b04665460..b5ca1a571f5b8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3494,12 +3494,19 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic } } + // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's + // tablets list in a single copy-on-write after the loop (O(bucketNum) instead + // of O(bucketNum^2)). TabletInvertedIndex registration stays per-iteration + // because Tablet.addReplica(...) below needs the tablet present in the + // inverted index. + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List bucketTablets = new ArrayList<>(distributionInfo.getBucketNum()); for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { // create a new tablet with random chosen backends Tablet tablet = EnvFactory.getInstance().createTablet(idGeneratorBuffer.getNextId()); - // add tablet to inverted index first - index.addTablet(tablet, tabletMeta); + invertedIndex.addTablet(tablet.getId(), tabletMeta); + bucketTablets.add(tablet); tabletIdSet.add(tablet.getId()); // get BackendIds @@ -3539,6 +3546,9 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum()); } + // Publish all bucket tablets to the materialized index in one batch. + index.appendTablets(bucketTablets); + if (groupId != null && chooseBackendsArbitrary) { colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq); ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java index 25b94bc59e828d..bf10b2556766e6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TStorageMedium; import mockit.Mocked; import org.junit.Assert; @@ -34,6 +35,8 @@ import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class MaterializedIndexTest { @@ -66,6 +69,68 @@ public void getMethodTest() { Assert.assertEquals(indexId, index.getId()); } + @Test + public void testGetTabletsReturnsImmutableSnapshot() { + TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD); + index.addTablet(new LocalTablet(1L), tabletMeta, true); + + List snapshot = index.getTablets(); + Assert.assertEquals(1, snapshot.size()); + + // A write after the snapshot was taken must not be visible in it (copy-on-write). + index.addTablet(new LocalTablet(2L), tabletMeta, true); + Assert.assertEquals(1, snapshot.size()); + Assert.assertEquals(2, index.getTablets().size()); + + // The returned snapshot is read-only. + Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(new LocalTablet(3L))); + } + + @Test + public void testConcurrentGetTabletsNeverThrows() throws InterruptedException { + // A reader repeatedly snapshots and iterates getTablets() while a writer keeps + // adding tablets. Copy-on-write guarantees the reader never observes a partially + // built list or throws ConcurrentModificationException. + TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean stop = new AtomicBoolean(false); + + Thread writer = new Thread(() -> { + long id = 1000L; + while (!stop.get()) { + index.addTablet(new LocalTablet(id++), tabletMeta, true); + // Keep the list bounded (and exercise the clear path) so the test stays fast. + if (index.getTablets().size() > 64) { + index.clearTabletsForRestore(); + } + } + }); + + Thread reader = new Thread(() -> { + try { + for (int i = 0; i < 50000 && error.get() == null; i++) { + for (Tablet tablet : index.getTablets()) { + tablet.getId(); + } + } + } catch (Throwable t) { + error.set(t); + } finally { + stop.set(true); + } + }); + + writer.start(); + reader.start(); + reader.join(); + stop.set(true); + writer.join(); + + if (error.get() != null) { + Assert.fail("getTablets() iteration threw under concurrent mutation: " + error.get()); + } + } + @Test public void testSerialization() throws Exception { // 1. Write objects to file diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java index 27fd954e3315a3..144e61e823637e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java @@ -37,6 +37,9 @@ import java.io.DataOutputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class TabletTest { @@ -124,6 +127,82 @@ public void deleteReplicaTest() { Assert.assertEquals(1, tablet.getReplicas().size()); } + @Test + public void testGetReplicasReturnsImmutableSnapshot() { + List snapshot = tablet.getReplicas(); + Assert.assertEquals(3, snapshot.size()); + + // A write after the snapshot was taken must not be visible in it (copy-on-write). + Replica replica4 = new LocalReplica(4L, 4L, 100L, 0, 200000L, 0, 3000L, ReplicaState.NORMAL, 0, 0); + tablet.addReplica(replica4); + Assert.assertEquals(3, snapshot.size()); + Assert.assertEquals(4, tablet.getReplicas().size()); + + // The returned snapshot is read-only. + Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(replica4)); + } + + @Test + public void testIterateReplicasWhileMutatingDoesNotThrow() { + // Iterating the snapshot returned by getReplicas() must not throw + // ConcurrentModificationException even when the tablet is structurally modified + // during iteration. + int seen = 0; + for (Replica r : tablet.getReplicas()) { + Assert.assertNotNull(r); + tablet.addReplica(new LocalReplica(100L + seen, 100L + seen, 100L, 0, 200000L, 0, 3000L, + ReplicaState.NORMAL, 0, 0)); + tablet.deleteReplicaByBackendId(2L); + seen++; + } + Assert.assertEquals(3, seen); + } + + @Test + public void testConcurrentGetReplicasNeverThrows() throws InterruptedException { + // A reader repeatedly snapshots and iterates getReplicas() while a writer keeps + // mutating the replica list. Copy-on-write guarantees the reader never observes a + // partially built list or throws ConcurrentModificationException. + AtomicReference error = new AtomicReference<>(); + AtomicBoolean stop = new AtomicBoolean(false); + + Thread writer = new Thread(() -> { + long id = 1000L; + while (!stop.get()) { + // Reuse a small set of backend ids so the list stays bounded while still + // exercising the add/replace path. + long beId = id % 8; + tablet.addReplica(new LocalReplica(id, beId, 100L, 0, 200000L, 0, 3000L, + ReplicaState.NORMAL, 0, 0), true); + id++; + } + }); + + Thread reader = new Thread(() -> { + try { + for (int i = 0; i < 50000 && error.get() == null; i++) { + for (Replica r : tablet.getReplicas()) { + r.getId(); + } + } + } catch (Throwable t) { + error.set(t); + } finally { + stop.set(true); + } + }); + + writer.start(); + reader.start(); + reader.join(); + stop.set(true); + writer.join(); + + if (error.get() != null) { + Assert.fail("getReplicas() iteration threw under concurrent mutation: " + error.get()); + } + } + @Test public void testSerialization() throws Exception { final Path path = Files.createTempFile("olapTabletTest", "tmp");