Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,22 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri
MaterializedIndex mvIndex = new MaterializedIndex(mvIndexId, IndexState.SHADOW);
MaterializedIndex baseIndex = partition.getIndex(baseIndexId);
short replicationNum = olapTable.getPartitionInfo().getReplicaAllocation(partitionId).getTotalReplicaNum();
// All MV tablets of the same (partition, mv index) share the same TabletMeta;
// build it once and bulk-publish to MaterializedIndex.tablets after the per-tablet
// loop to keep copy-on-write O(n). TabletInvertedIndex registration stays
// per-iteration because Tablet.addReplica(...) below needs the tablet present
// in the inverted index.
TabletMeta mvTabletMeta = new TabletMeta(
dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium);
List<Tablet> mvTabletsForPartition = Lists.newArrayListWithCapacity(baseIndex.getTablets().size());
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Tablet baseTablet : baseIndex.getTablets()) {
TabletMeta mvTabletMeta = new TabletMeta(
dbId, tableId, partitionId, mvIndexId, mvSchemaHash, medium);
long baseTabletId = baseTablet.getId();
long mvTabletId = idGeneratorBuffer.getNextId();

Tablet newTablet = EnvFactory.getInstance().createTablet(mvTabletId);
mvIndex.addTablet(newTablet, mvTabletMeta);
invertedIndex.addTablet(mvTabletId, mvTabletMeta);
mvTabletsForPartition.add(newTablet);
addedTablets.add(newTablet);

mvJob.addTabletIdMap(partitionId, mvTabletId, baseTabletId);
Expand Down Expand Up @@ -499,6 +507,9 @@ private RollupJobV2 createMaterializedViewJob(String rawSql, String mvName, Stri
}
} // end for baseTablets

// Bulk-publish all MV tablets for this partition in one copy-on-write.
mvIndex.appendTablets(mvTabletsForPartition);

mvJob.addMVIndex(partitionId, mvIndex);

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.common.AnalysisException;
Expand Down Expand Up @@ -1680,14 +1681,23 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long,
MaterializedIndex originIndex = partition.getIndex(originIndexId);
ReplicaAllocation replicaAlloc = olapTable.getPartitionInfo().getReplicaAllocation(partitionId);
Short totalReplicaNum = replicaAlloc.getTotalReplicaNum();
// All shadow tablets of the same (partition, shadow index) share the same TabletMeta;
// build it once and bulk-publish to MaterializedIndex.tablets after the per-tablet
// loop to keep copy-on-write O(n). TabletInvertedIndex registration stays
// per-iteration because Tablet.addReplica(...) below needs the tablet present
// in the inverted index.
TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
newSchemaHash, medium);
List<Tablet> shadowTabletsForPartition = Lists.newArrayListWithCapacity(
originIndex.getTablets().size());
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (Tablet originTablet : originIndex.getTablets()) {
TabletMeta shadowTabletMeta = new TabletMeta(dbId, tableId, partitionId, shadowIndexId,
newSchemaHash, medium);
long originTabletId = originTablet.getId();
long shadowTabletId = idGeneratorBuffer.getNextId();

Tablet shadowTablet = EnvFactory.getInstance().createTablet(shadowTabletId);
shadowIndex.addTablet(shadowTablet, shadowTabletMeta);
invertedIndex.addTablet(shadowTabletId, shadowTabletMeta);
shadowTabletsForPartition.add(shadowTablet);
addedTablets.add(shadowTablet);

schemaChangeJob.addTabletIdMap(partitionId, shadowIndexId, shadowTabletId, originTabletId);
Expand Down Expand Up @@ -1745,6 +1755,9 @@ private void createJob(String rawSql, long dbId, OlapTable olapTable, Map<Long,
}
}

// Bulk-publish all shadow tablets for this partition in one copy-on-write.
shadowIndex.appendTablets(shadowTabletsForPartition);

schemaChangeJob.addPartitionShadowIndex(partitionId, shadowIndexId, shadowIndex);
} // end for partition
schemaChangeJob.addIndexSchema(shadowIndexId, originIndexId, newIndexName, newSchemaVersion, newSchemaHash,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@

import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1525,12 +1526,13 @@ protected Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTb
int schemaHash = remoteTbl.getSchemaHashByIndexId(remoteIdx.getId());
int remotetabletSize = remoteIdx.getTablets().size();
remoteIdx.clearTabletsForRestore();
// Collect locally and bulk-publish to keep copy-on-write O(n) for the whole index.
List<Tablet> newTablets = new ArrayList<>(remotetabletSize);
for (int i = 0; i < remotetabletSize; i++) {
// generate new tablet id
long newTabletId = env.getNextId();
Tablet newTablet = EnvFactory.getInstance().createTablet(newTabletId);
// add tablet to index, but not add to TabletInvertedIndex
remoteIdx.addTablet(newTablet, null /* tablet meta */, true /* is restore */);
newTablets.add(newTablet);
// replicas
try {
Pair<Map<Tag, List<Long>>, TStorageMedium> beIdsAndMedium = Env.getCurrentSystemInfo()
Expand All @@ -1549,6 +1551,9 @@ protected Partition resetTabletForRestore(OlapTable localTbl, OlapTable remoteTb
return null;
}
}
// add tablets to index in one batch; TabletInvertedIndex registration
// is intentionally skipped on the restore path (rebuilt separately).
remoteIdx.appendTablets(newTablets);
}
return remotePart;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ private void updateStatInfo(List<Long> dbIds) {
long tabletIndexSize = 0L;
long tabletSegmentSize = 0L;

// getReplicas() returns an unmodifiable snapshot read from a volatile field, so this loop cannot throw ConcurrentModificationException under concurrent DDL.
for (Replica replica : tablet.getReplicas()) {
if (replica.getDataSize() > tabletDataSize) {
tabletDataSize = replica.getDataSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.stream.LongStream;

public class LocalTablet extends Tablet {
private static final Logger LOG = LogManager.getLogger(LocalTablet.class);

@SerializedName(value = "rs", alternate = {"replicas"})
private List<Replica> replicas;
private volatile List<Replica> replicas;
@SerializedName(value = "lastCheckTime")
private long lastCheckTime;

Expand Down Expand Up @@ -109,13 +109,14 @@ public long getRemoteDataSize() {
if (cooldownReplicaId <= 0) {
return 0;
}
for (Replica r : replicas) {
List<Replica> snapshot = replicas; // single volatile read; reuse below
for (Replica r : snapshot) {
if (r.getId() == cooldownReplicaId) {
return r.getRemoteDataSize();
}
}
// return replica with max remoteDataSize
return replicas.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
return snapshot.stream().max(Comparator.comparing(Replica::getRemoteDataSize)).get().getRemoteDataSize();
}

@Override
Expand Down Expand Up @@ -223,29 +224,32 @@ public void setLastCheckTime(long lastCheckTime) {
this.lastCheckTime = lastCheckTime;
}

private boolean isLatestReplicaAndDeleteOld(Replica newReplica) {
boolean delete = false;
// Writers synchronize on this tablet so that concurrent modifications cannot lose updates:
// some callers (e.g. InternalCatalog.createTablets, RestoreJob) do NOT hold
// the OlapTable write lock when modifying replicas.
// Readers read the volatile reference once and iterate over that snapshot; no lock is needed.

@Override
public synchronized void addReplica(Replica replica, boolean isRestore) {
long version = replica.getVersion();
long backendId = replica.getBackendIdWithoutException();
boolean hasBackend = false;
long version = newReplica.getVersion();
Iterator<Replica> iterator = replicas.iterator();
while (iterator.hasNext()) {
Replica replica = iterator.next();
if (replica.getBackendIdWithoutException() == newReplica.getBackendIdWithoutException()) {
boolean deletedOld = false;
List<Replica> current = replicas;
List<Replica> next = new ArrayList<>(current.size() + 1);
for (Replica r : current) {
if (r.getBackendIdWithoutException() == backendId) {
hasBackend = true;
if (replica.getVersion() <= version) {
iterator.remove();
delete = true;
if (r.getVersion() <= version) {
deletedOld = true;
continue; // drop stale replica
}
}
next.add(r);
}

return delete || !hasBackend;
}

@Override
public void addReplica(Replica replica, boolean isRestore) {
if (isLatestReplicaAndDeleteOld(replica)) {
replicas.add(replica);
if (deletedOld || !hasBackend) {
next.add(replica);
replicas = next; // volatile write; readers see the new immutable snapshot
if (!isRestore) {
Env.getCurrentInvertedIndex().addReplica(id, replica);
}
Expand All @@ -254,12 +258,13 @@ public void addReplica(Replica replica, boolean isRestore) {

@Override
public List<Replica> getReplicas() {
return this.replicas;
// Volatile read: wraps the current snapshot in an unmodifiable view. Writers publish a new list instead of mutating this one, so callers iterate without locking.
return Collections.unmodifiableList(replicas);
}

@Override
public Replica getReplicaByBackendId(long backendId) {
for (Replica replica : replicas) {
for (Replica replica : replicas) { // single volatile read
if (replica.getBackendIdWithoutException() == backendId) {
return replica;
}
Expand All @@ -268,26 +273,35 @@ public Replica getReplicaByBackendId(long backendId) {
}

@Override
public boolean deleteReplica(Replica replica) {
if (replicas.contains(replica)) {
replicas.remove(replica);
public synchronized boolean deleteReplica(Replica replica) {
List<Replica> current = replicas;
if (current.contains(replica)) {
List<Replica> next = new ArrayList<>(current);
next.remove(replica);
replicas = next; // volatile write
Env.getCurrentInvertedIndex().deleteReplica(id, replica.getBackendIdWithoutException());
return true;
}
return false;
}

@Override
public boolean deleteReplicaByBackendId(long backendId) {
Iterator<Replica> iterator = replicas.iterator();
while (iterator.hasNext()) {
Replica replica = iterator.next();
public synchronized boolean deleteReplicaByBackendId(long backendId) {
List<Replica> current = replicas;
List<Replica> next = new ArrayList<>(current.size());
Replica found = null;
for (Replica replica : current) {
if (replica.getBackendIdWithoutException() == backendId) {
iterator.remove();
Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
return true;
found = replica;
} else {
next.add(replica);
}
}
if (found != null) {
replicas = next; // volatile write
Env.getCurrentInvertedIndex().deleteReplica(id, backendId);
return true;
}
return false;
}

Expand All @@ -302,13 +316,17 @@ public boolean equals(Object obj) {

LocalTablet tablet = (LocalTablet) obj;

if (replicas != tablet.replicas) {
if (replicas.size() != tablet.replicas.size()) {
// Capture one snapshot per side so a concurrent writer cannot publish
// a different list between size/contains/get calls below.
List<Replica> thisReplicas = replicas;
List<Replica> otherReplicas = tablet.replicas;
if (thisReplicas != otherReplicas) {
if (thisReplicas.size() != otherReplicas.size()) {
return false;
}
int size = replicas.size();
int size = thisReplicas.size();
for (int i = 0; i < size; i++) {
if (!tablet.replicas.contains(replicas.get(i))) {
if (!otherReplicas.contains(thisReplicas.get(i))) {
return false;
}
}
Expand All @@ -334,7 +352,7 @@ public boolean readyToBeRepaired(SystemInfoService infoService, TabletSchedCtx.P
}

boolean allBeAliveOrDecommissioned = true;
for (Replica replica : replicas) {
for (Replica replica : replicas) { // single volatile read; iteration on the snapshot
Backend backend = infoService.getBackend(replica.getBackendIdWithoutException());
if (backend == null || (!backend.isAlive() && !backend.isDecommissioned())) {
allBeAliveOrDecommissioned = false;
Expand Down
Loading
Loading