diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java index 37ec3255f1bc85..5e3e64bf8c828f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/MaterializedViewHandler.java @@ -437,14 +437,22 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, IndexState.SHADOW); MaterializedIndex baseIndex = partition.getIndex(baseIndexId); short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum(); + // All MV tablets of the same (partition, mv index) share the same TabletMeta; + // build it once and bulk-publish to MaterializedIndex.tablets after the per-tablet + // loop to keep copy-on-write O(n). TabletInvertedIndex registration stays + // per-iteration because Tablet.addReplica(...) below needs the tablet present + // in the inverted index. + TabletMeta mvTabletMeta = new TabletMeta( + dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium); + List mvTabletsForPartition = Lists.newArrayListWithCapacity(baseIndex.getTablets().size()); + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Tablet baseTablet : baseIndex.getTablets()) { - TabletMeta mvTabletMeta = new TabletMeta( - dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium); long baseTabletId = baseTablet.getId(); long mvTabletId = idGeneratorBuffer.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(mvTabletId); - mvIndex.addTablet(newTablet, mvTabletMeta); + invertedIndex.addTablet(mvTabletId, mvTabletMeta); + mvTabletsForPartition.add(newTablet); addedTablets.add(newTablet); mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId); @@ -499,6 +507,9 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri } } // end for baseTablets + // Bulk-publish all MV tablets for this partition in one copy-on-write. + mvIndex.appendTablets(mvTabletsForPartition); + mvJob.addMVIndex(partitionId, mvIndex); if (LOG.isDebugEnabled()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 42e25ea8f0622d..e791c65088c7d4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -66,6 +66,7 @@ import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.qe.ComputeGroupException; import org.apache.doris.common.AnalysisException; @@ -1680,14 +1681,23 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map shadowTabletsForPartition = Lists.newArrayListWithCapacity( + originIndex.getTablets().size()); + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); for (Tablet originTablet : originIndex.getTablets()) { - TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId, - newSchemaHash, medium); long originTabletId = originTablet.getId(); long shadowTabletId = idGeneratorBuffer.getNextId(); Tablet shadowTablet = EnvFactory.getInstance().createTablet(shadowTabletId); - shadowIndex.addTablet(shadowTablet, shadowTabletMeta); + invertedIndex.addTablet(shadowTabletId, shadowTabletMeta); + shadowTabletsForPartition.add(shadowTablet); addedTablets.add(shadowTablet); schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId); @@ -1745,6 +1755,9 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map newTablets = new ArrayList<>(remotetabletSize); for (int i = 0; i < remotetabletSize; i++) { // generate new tablet id long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - // add tablet to index, but not add to TabletInvertedIndex - remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas try { Pair>, TStorageMedium> beIdsAndMedium = Env.getCurrentSystemInfo() @@ -1549,6 +1551,9 @@ protected Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTb return null; } } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + remoteIdx.appendTablets(newTablets); } return remotePart; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java index b28cc369d4676b..70ab983c231892 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/CloudTabletStatMgr.java @@ -370,6 +370,7 @@ private void updateStatInfo(List dbIds) { long tabletIndexSize = 0L; long tabletSegmentSize = 0L; + // getReplicas() returns an immutable volatile snapshot; CME-safe under concurrent DDL. for (Replica replica : tablet.getReplicas()) { if (replica.getDataSize() > tabletDataSize) { tabletDataSize = replica.getDataSize(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java index 14d9171f3ff0e4..5db0a1286b5d2f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/LocalTablet.java @@ -31,8 +31,8 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; -import java.util.Iterator; import java.util.List; import java.util.stream.LongStream; @@ -40,7 +40,7 @@ public class LocalTablet extends Tablet { private static final Logger LOG = LogManager.getLogger(LocalTablet.class); @SerializedName(value = "rs", alternate = {"replicas"}) - private List replicas; + private volatile List replicas; @SerializedName(value = "lastCheckTime") private long lastCheckTime; @@ -109,13 +109,14 @@ public long getRemoteDataSize() { if (cooldownReplicaId <= 0) { return 0; } - for (Replica r : replicas) { + List snapshot = replicas; // single volatile read; reuse below + for (Replica r : snapshot) { if (r.getId() == cooldownReplicaId) { return r.getRemoteDataSize(); } } // return replica with max remoteDataSize - return replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize(); + return snapshot.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize(); } @Override @@ -223,29 +224,32 @@ public void setLastCheckTime(long lastCheckTime) { this.lastCheckTime = lastCheckTime; } - private boolean isLatestReplicaAndDeleteOld(Replica newReplica) { - boolean delete = false; + // Writers are synchronized on this tablet to prevent concurrent lost-update: + // some callers (e.g. InternalCatalog.createTablets, RestoreJob) do NOT hold + // the OlapTable write lock when modifying replicas. + // Readers capture the volatile reference once and iterate freely — no lock needed. + + @Override + public synchronized void addReplica(Replica replica, boolean isRestore) { + long version = replica.getVersion(); + long backendId = replica.getBackendIdWithoutException(); boolean hasBackend = false; - long version = newReplica.getVersion(); - Iterator iterator = replicas.iterator(); - while (iterator.hasNext()) { - Replica replica = iterator.next(); - if (replica.getBackendIdWithoutException() == newReplica.getBackendIdWithoutException()) { + boolean deletedOld = false; + List current = replicas; + List next = new ArrayList<>(current.size() + 1); + for (Replica r : current) { + if (r.getBackendIdWithoutException() == backendId) { hasBackend = true; - if (replica.getVersion() <= version) { - iterator.remove(); - delete = true; + if (r.getVersion() <= version) { + deletedOld = true; + continue; // drop stale replica } } + next.add(r); } - - return delete || !hasBackend; - } - - @Override - public void addReplica(Replica replica, boolean isRestore) { - if (isLatestReplicaAndDeleteOld(replica)) { - replicas.add(replica); + if (deletedOld || !hasBackend) { + next.add(replica); + replicas = next; // volatile write; readers see the new immutable snapshot if (!isRestore) { Env.getCurrentInvertedIndex().addReplica(id, replica); } @@ -254,12 +258,13 @@ public void addReplica(Replica replica, boolean isRestore) { @Override public List getReplicas() { - return this.replicas; + // Volatile read: returns the current immutable snapshot; callers iterate without locking. + return Collections.unmodifiableList(replicas); } @Override public Replica getReplicaByBackendId(long backendId) { - for (Replica replica : replicas) { + for (Replica replica : replicas) { // single volatile read if (replica.getBackendIdWithoutException() == backendId) { return replica; } @@ -268,9 +273,12 @@ public Replica getReplicaByBackendId(long backendId) { } @Override - public boolean deleteReplica(Replica replica) { - if (replicas.contains(replica)) { - replicas.remove(replica); + public synchronized boolean deleteReplica(Replica replica) { + List current = replicas; + if (current.contains(replica)) { + List next = new ArrayList<>(current); + next.remove(replica); + replicas = next; // volatile write Env.getCurrentInvertedIndex().deleteReplica(id, replica.getBackendIdWithoutException()); return true; } @@ -278,16 +286,22 @@ public boolean deleteReplica(Replica replica) { } @Override - public boolean deleteReplicaByBackendId(long backendId) { - Iterator iterator = replicas.iterator(); - while (iterator.hasNext()) { - Replica replica = iterator.next(); + public synchronized boolean deleteReplicaByBackendId(long backendId) { + List current = replicas; + List next = new ArrayList<>(current.size()); + Replica found = null; + for (Replica replica : current) { if (replica.getBackendIdWithoutException() == backendId) { - iterator.remove(); - Env.getCurrentInvertedIndex().deleteReplica(id, backendId); - return true; + found = replica; + } else { + next.add(replica); } } + if (found != null) { + replicas = next; // volatile write + Env.getCurrentInvertedIndex().deleteReplica(id, backendId); + return true; + } return false; } @@ -302,13 +316,17 @@ public boolean equals(Object obj) { LocalTablet tablet = (LocalTablet) obj; - if (replicas != tablet.replicas) { - if (replicas.size() != tablet.replicas.size()) { + // Capture one snapshot per side so a concurrent writer cannot publish + // a different list between size/contains/get calls below. + List thisReplicas = replicas; + List otherReplicas = tablet.replicas; + if (thisReplicas != otherReplicas) { + if (thisReplicas.size() != otherReplicas.size()) { return false; } - int size = replicas.size(); + int size = thisReplicas.size(); for (int i = 0; i < size; i++) { - if (!tablet.replicas.contains(replicas.get(i))) { + if (!otherReplicas.contains(thisReplicas.get(i))) { return false; } } @@ -334,7 +352,7 @@ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.P } boolean allBeAliveOrDecommissioned = true; - for (Replica replica : replicas) { + for (Replica replica : replicas) { // single volatile read; iteration on the snapshot Backend backend = infoService.getBackend(replica.getBackendIdWithoutException()); if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) { allBeAliveOrDecommissioned = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java index b1c843613137dc..81d9e1f7dd0fff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MaterializedIndex.java @@ -19,10 +19,11 @@ import org.apache.doris.persist.gson.GsonPostProcessable; -import com.google.common.collect.Lists; import com.google.gson.annotations.SerializedName; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,10 +58,16 @@ public enum IndexExtState { @SerializedName(value = "rowCount") private long rowCount; - private Map idToTablets; + // Published as a volatile immutable snapshot in lockstep with `tablets`. + // Writers (synchronized) build a fresh HashMap and assign the field; readers + // capture the reference once and call get/containsKey on the snapshot. + // Invariant: `tablets ⊆ idToTablets` — any tablet visible in the list is also + // present in the map. This is preserved by publishing the map BEFORE the list + // on add and the list BEFORE the map on clear. + private volatile Map idToTablets; @SerializedName(value = "tablets") // this is for keeping tablet order - private List tablets; + private volatile List tablets; // for push after rollup index finished @SerializedName(value = "rollupIndexId") @@ -94,38 +101,78 @@ public MaterializedIndex(long id, IndexState state) { } public List getTablets() { - return tablets; + // Volatile read: returns the current immutable snapshot; callers iterate without locking. + return Collections.unmodifiableList(tablets); } public List getTabletIdsInOrder() { - List tabletIds = Lists.newArrayListWithCapacity(tablets.size()); - for (Tablet tablet : tablets) { + List snapshot = tablets; // single volatile read + List tabletIds = new ArrayList<>(snapshot.size()); + for (Tablet tablet : snapshot) { tabletIds.add(tablet.getId()); } return tabletIds; } public Tablet getTablet(long tabletId) { + // Single volatile read of the immutable map snapshot. return idToTablets.get(tabletId); } - public void clearTabletsForRestore() { - idToTablets.clear(); - tablets.clear(); + public synchronized void clearTabletsForRestore() { + // Drop the list first so iteration stops seeing tablets before + // lookup-by-id drops them. Maintains tablets ⊆ idToTablets. + tablets = new ArrayList<>(); + idToTablets = new HashMap<>(); } - public void addTablet(Tablet tablet, TabletMeta tabletMeta) { + public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta) { addTablet(tablet, tabletMeta, false); } - public void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) { - idToTablets.put(tablet.getId(), tablet); - tablets.add(tablet); + // Writers are synchronized on this index to prevent concurrent lost-update: + // some callers (e.g. InternalCatalog.createTablets) do NOT hold the OlapTable + // write lock when adding tablets. + // Copy-on-write keeps readers CME-safe without locking; for bulk creation use + // appendTablets(...) so the per-index tablets list is copied once per batch + // instead of once per tablet. + public synchronized void addTablet(Tablet tablet, TabletMeta tabletMeta, boolean isRestore) { + appendTabletsInternal(Collections.singletonList(tablet)); if (!isRestore) { Env.getCurrentInvertedIndex().addTablet(tablet.getId(), tabletMeta); } } + // Bulk-publish: append the given tablets to this index's tablets list in a + // single copy-on-write (O(existing + batch) instead of O(n^2) over n + // single-tablet adds inside a synchronized block). + // + // Does NOT touch TabletInvertedIndex. Bulk-creation callers register tablets + // in TabletInvertedIndex eagerly inside their per-tablet loop because + // Tablet.addReplica(...) (non-restore) requires the tablet to already be + // present in the inverted index; only the per-index list copy is expensive + // enough to be worth batching. + public synchronized void appendTablets(Collection newTablets) { + appendTabletsInternal(newTablets); + } + + private void appendTabletsInternal(Collection newTablets) { + if (newTablets.isEmpty()) { + return; + } + Map nextMap = new HashMap<>(idToTablets); + List nextList = new ArrayList<>(tablets.size() + newTablets.size()); + nextList.addAll(tablets); + for (Tablet tablet : newTablets) { + nextMap.put(tablet.getId(), tablet); + nextList.add(tablet); + } + // Publish the map first, then the list — so any id that appears in the + // visible `tablets` snapshot is already present in `idToTablets`. + idToTablets = nextMap; + tablets = nextList; + } + public void setIdForRestore(long idxId) { this.id = idxId; } @@ -233,8 +280,9 @@ public long getRemoteSegmentSize() { } public int getTabletOrderIdx(long tabletId) { + List snapshot = tablets; // single volatile read int idx = 0; - for (Tablet tablet : tablets) { + for (Tablet tablet : snapshot) { if (tablet.getId() == tabletId) { return idx; } @@ -271,15 +319,16 @@ public boolean equals(Object obj) { @Override public String toString() { + List snapshot = tablets; // single volatile read StringBuilder buffer = new StringBuilder(); buffer.append("index id: ").append(id).append("; "); buffer.append("index state: ").append(state.name()).append("; "); buffer.append("row count: ").append(rowCount).append("; "); - buffer.append("tablets size: ").append(tablets.size()).append("; "); + buffer.append("tablets size: ").append(snapshot.size()).append("; "); // buffer.append("tablets: ["); - for (Tablet tablet : tablets) { + for (Tablet tablet : snapshot) { buffer.append("tablet: ").append(tablet.toString()).append(", "); } buffer.append("]; "); @@ -292,9 +341,13 @@ public String toString() { @Override public void gsonPostProcess() { - // build "idToTablets" from "tablets" + // Build a fresh "idToTablets" snapshot from the deserialized "tablets" list. + // Runs single-threaded during gson deserialization, before any concurrent + // reader can observe this object. + Map map = new HashMap<>(tablets.size()); for (Tablet tablet : tablets) { - idToTablets.put(tablet.getId(), tablet); + map.put(tablet.getId(), tablet); } + idToTablets = map; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 5555339ed63142..b506b14179bdc3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -945,10 +945,12 @@ && getTableProperty().getDynamicPartitionProperty().getBuckets() // generate new tablets in origin tablet order int tabletNum = idx.getTablets().size(); idx.clearTabletsForRestore(); + // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index. + List newTablets = new ArrayList<>(tabletNum); for (int i = 0; i < tabletNum; i++) { long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - idx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas if (Config.isCloudMode()) { long newReplicaId = Env.getCurrentEnv().getNextId(); @@ -988,6 +990,9 @@ && getTableProperty().getDynamicPartitionProperty().getBuckets() return new Status(ErrCode.COMMON_ERROR, e.getMessage()); } } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + idx.appendTablets(newTablets); } if (createNewColocateGroup) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java index a493aecc4ac5a3..7cdb42315048e2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletStatMgr.java @@ -369,7 +369,14 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { if (result.isSetTabletStatList()) { for (TTabletStat stat : result.getTabletStatList()) { if (invertedIndex.getTabletMeta(stat.getTabletId()) != null) { - Replica replica = invertedIndex.getReplica(stat.getTabletId(), beId); + Replica replica; + try { + replica = invertedIndex.getReplica(stat.getTabletId(), beId); + } catch (IllegalStateException e) { + LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}", + stat.getTabletId(), beId, e.getMessage()); + continue; + } if (replica != null) { replica.setDataSize(stat.getDataSize()); replica.setRemoteDataSize(stat.getRemoteDataSize()); @@ -393,7 +400,14 @@ private void updateTabletStat(Long beId, TTabletStatResult result) { // the replica is obsolete, ignore it. continue; } - Replica replica = invertedIndex.getReplica(entry.getKey(), beId); + Replica replica; + try { + replica = invertedIndex.getReplica(entry.getKey(), beId); + } catch (IllegalStateException e) { + LOG.debug("skip stale tablet stat update for tablet {} on backend {}: {}", + entry.getKey(), beId, e.getMessage()); + continue; + } if (replica == null) { // replica may be deleted from catalog, ignore it. continue; diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java index b1c111ee4a1f94..4b724199a13bce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/backup/CloudRestoreJob.java @@ -333,18 +333,22 @@ public Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTbl, int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId()); int remotetabletSize = remoteIdx.getTablets().size(); remoteIdx.clearTabletsForRestore(); + // Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index. + List newTablets = new ArrayList<>(remotetabletSize); for (int i = 0; i < remotetabletSize; i++) { // generate new tablet id long newTabletId = env.getNextId(); Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId); - // add tablet to index, but not add to TabletInvertedIndex - remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */); + newTablets.add(newTablet); // replicas long newReplicaId = Env.getCurrentEnv().getNextId(); Replica replica = new CloudReplica(newReplicaId, null, Replica.ReplicaState.NORMAL, visibleVersion, schemaHash, dbId, localTbl.getId(), partitionId, remoteIdx.getId(), i); newTablet.addReplica(replica, true /* is restore */); } + // add tablets to index in one batch; TabletInvertedIndex registration + // is intentionally skipped on the restore path (rebuilt separately). + remoteIdx.appendTablets(newTablets); } return remotePart; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java index 0c7891c50b270c..0b2bd821459f96 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/datasource/CloudInternalCatalog.java @@ -39,6 +39,7 @@ import org.apache.doris.catalog.ReplicaAllocation; import org.apache.doris.catalog.Table; import org.apache.doris.catalog.Tablet; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.catalog.TabletMeta; import org.apache.doris.cloud.catalog.CloudEnv; import org.apache.doris.cloud.catalog.CloudPartition; @@ -80,6 +81,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -432,11 +434,16 @@ private OlapFile.RowsetMetaCloudPB.Builder createInitialRowset(Tablet tablet, lo private void createCloudTablets(MaterializedIndex index, ReplicaState replicaState, DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta, Set tabletIdSet) throws DdlException { + // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's + // tablets list in a single copy-on-write after the loop (see + // InternalCatalog.createTablets for rationale). + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List bucketTablets = new ArrayList<>(distributionInfo.getBucketNum()); for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { Tablet tablet = EnvFactory.getInstance().createTablet(Env.getCurrentEnv().getNextId()); - // add tablet to inverted index first - index.addTablet(tablet, tabletMeta); + invertedIndex.addTablet(tablet.getId(), tabletMeta); + bucketTablets.add(tablet); tabletIdSet.add(tablet.getId()); long replicaId = Env.getCurrentEnv().getNextId(); @@ -445,6 +452,7 @@ private void createCloudTablets(MaterializedIndex index, ReplicaState replicaSta tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i); tablet.addReplica(replica); } + index.appendTablets(bucketTablets); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 96230b04665460..b5ca1a571f5b8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3494,12 +3494,19 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic } } + // Collect bucket tablets locally and bulk-publish to the MaterializedIndex's + // tablets list in a single copy-on-write after the loop (O(bucketNum) instead + // of O(bucketNum^2)). TabletInvertedIndex registration stays per-iteration + // because Tablet.addReplica(...) below needs the tablet present in the + // inverted index. + TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex(); + List bucketTablets = new ArrayList<>(distributionInfo.getBucketNum()); for (int i = 0; i < distributionInfo.getBucketNum(); ++i) { // create a new tablet with random chosen backends Tablet tablet = EnvFactory.getInstance().createTablet(idGeneratorBuffer.getNextId()); - // add tablet to inverted index first - index.addTablet(tablet, tabletMeta); + invertedIndex.addTablet(tablet.getId(), tabletMeta); + bucketTablets.add(tablet); tabletIdSet.add(tablet.getId()); // get BackendIds @@ -3539,6 +3546,9 @@ public TStorageMedium createTablets(MaterializedIndex index, ReplicaState replic totalReplicaNum + " vs. " + replicaAlloc.getTotalReplicaNum()); } + // Publish all bucket tablets to the materialized index in one batch. + index.appendTablets(bucketTablets); + if (groupId != null && chooseBackendsArbitrary) { colocateIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq); ColocatePersistInfo info = ColocatePersistInfo.createForBackendsPerBucketSeq(groupId, backendsPerBucketSeq); diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java index 25b94bc59e828d..bf10b2556766e6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/MaterializedIndexTest.java @@ -21,6 +21,7 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.io.Text; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.thrift.TStorageMedium; import mockit.Mocked; import org.junit.Assert; @@ -34,6 +35,8 @@ import java.nio.file.Paths; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class MaterializedIndexTest { @@ -66,6 +69,68 @@ public void getMethodTest() { Assert.assertEquals(indexId, index.getId()); } + @Test + public void testGetTabletsReturnsImmutableSnapshot() { + TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD); + index.addTablet(new LocalTablet(1L), tabletMeta, true); + + List snapshot = index.getTablets(); + Assert.assertEquals(1, snapshot.size()); + + // A write after the snapshot was taken must not be visible in it (copy-on-write). + index.addTablet(new LocalTablet(2L), tabletMeta, true); + Assert.assertEquals(1, snapshot.size()); + Assert.assertEquals(2, index.getTablets().size()); + + // The returned snapshot is read-only. + Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(new LocalTablet(3L))); + } + + @Test + public void testConcurrentGetTabletsNeverThrows() throws InterruptedException { + // A reader repeatedly snapshots and iterates getTablets() while a writer keeps + // adding tablets. Copy-on-write guarantees the reader never observes a partially + // built list or throws ConcurrentModificationException. + TabletMeta tabletMeta = new TabletMeta(10, 20, 30, 40, 1, TStorageMedium.HDD); + AtomicReference error = new AtomicReference<>(); + AtomicBoolean stop = new AtomicBoolean(false); + + Thread writer = new Thread(() -> { + long id = 1000L; + while (!stop.get()) { + index.addTablet(new LocalTablet(id++), tabletMeta, true); + // Keep the list bounded (and exercise the clear path) so the test stays fast. + if (index.getTablets().size() > 64) { + index.clearTabletsForRestore(); + } + } + }); + + Thread reader = new Thread(() -> { + try { + for (int i = 0; i < 50000 && error.get() == null; i++) { + for (Tablet tablet : index.getTablets()) { + tablet.getId(); + } + } + } catch (Throwable t) { + error.set(t); + } finally { + stop.set(true); + } + }); + + writer.start(); + reader.start(); + reader.join(); + stop.set(true); + writer.join(); + + if (error.get() != null) { + Assert.fail("getTablets() iteration threw under concurrent mutation: " + error.get()); + } + } + @Test public void testSerialization() throws Exception { // 1. Write objects to file diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java index 27fd954e3315a3..144e61e823637e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/TabletTest.java @@ -37,6 +37,9 @@ import java.io.DataOutputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class TabletTest { @@ -124,6 +127,82 @@ public void deleteReplicaTest() { Assert.assertEquals(1, tablet.getReplicas().size()); } + @Test + public void testGetReplicasReturnsImmutableSnapshot() { + List snapshot = tablet.getReplicas(); + Assert.assertEquals(3, snapshot.size()); + + // A write after the snapshot was taken must not be visible in it (copy-on-write). + Replica replica4 = new LocalReplica(4L, 4L, 100L, 0, 200000L, 0, 3000L, ReplicaState.NORMAL, 0, 0); + tablet.addReplica(replica4); + Assert.assertEquals(3, snapshot.size()); + Assert.assertEquals(4, tablet.getReplicas().size()); + + // The returned snapshot is read-only. + Assert.assertThrows(UnsupportedOperationException.class, () -> snapshot.add(replica4)); + } + + @Test + public void testIterateReplicasWhileMutatingDoesNotThrow() { + // Iterating the snapshot returned by getReplicas() must not throw + // ConcurrentModificationException even when the tablet is structurally modified + // during iteration. + int seen = 0; + for (Replica r : tablet.getReplicas()) { + Assert.assertNotNull(r); + tablet.addReplica(new LocalReplica(100L + seen, 100L + seen, 100L, 0, 200000L, 0, 3000L, + ReplicaState.NORMAL, 0, 0)); + tablet.deleteReplicaByBackendId(2L); + seen++; + } + Assert.assertEquals(3, seen); + } + + @Test + public void testConcurrentGetReplicasNeverThrows() throws InterruptedException { + // A reader repeatedly snapshots and iterates getReplicas() while a writer keeps + // mutating the replica list. Copy-on-write guarantees the reader never observes a + // partially built list or throws ConcurrentModificationException. + AtomicReference error = new AtomicReference<>(); + AtomicBoolean stop = new AtomicBoolean(false); + + Thread writer = new Thread(() -> { + long id = 1000L; + while (!stop.get()) { + // Reuse a small set of backend ids so the list stays bounded while still + // exercising the add/replace path. + long beId = id % 8; + tablet.addReplica(new LocalReplica(id, beId, 100L, 0, 200000L, 0, 3000L, + ReplicaState.NORMAL, 0, 0), true); + id++; + } + }); + + Thread reader = new Thread(() -> { + try { + for (int i = 0; i < 50000 && error.get() == null; i++) { + for (Replica r : tablet.getReplicas()) { + r.getId(); + } + } + } catch (Throwable t) { + error.set(t); + } finally { + stop.set(true); + } + }); + + writer.start(); + reader.start(); + reader.join(); + stop.set(true); + writer.join(); + + if (error.get() != null) { + Assert.fail("getReplicas() iteration threw under concurrent mutation: " + error.get()); + } + } + @Test public void testSerialization() throws Exception { final Path path = Files.createTempFile("olapTabletTest", "tmp");