Skip to content

Commit 2f4c086

Browse files
authored
Avoid Inconsistencies among replicas during Upsert Compaction Tasks (#17696)
1 parent 562854b commit 2f4c086

6 files changed

Lines changed: 405 additions & 93 deletions

File tree

pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ private MinionConstants() {
9292
*/
9393
public static final String SEGMENT_DOWNLOAD_PARALLELISM = "segmentDownloadParallelism";
9494

95+
/** Valid doc ids consensus mode (executor-only). Kept internal; executors pass config string. */
96+
public enum ValidDocIdsConsensusMode {
97+
UNSAFE, EQUAL, MOST_VALID_DOCS
98+
}
99+
95100
// Purges rows inside segment that match chosen criteria
96101
public static class PurgeTask {
97102
public static final String TASK_TYPE = "PurgeTask";
@@ -257,6 +262,16 @@ public static class UpsertCompactionTask {
257262
* number of segments to query in one batch to fetch valid doc id metadata, by default 500
258263
*/
259264
public static final String NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = "numSegmentsBatchPerServerRequest";
265+
266+
/**
267+
* Valid doc ids consensus mode used by the executor only (generator unchanged). Values: UNSAFE, EQUAL,
268+
* MOST_VALID_DOCS. UNSAFE = use first server with matching CRC and READY; EQUAL = require all replicas
269+
* to have the same valid doc set (default); MOST_VALID_DOCS = use replica with most valid docs.
270+
*/
271+
public static final String VALID_DOC_IDS_CONSENSUS_MODE_KEY = "validDocIdsConsensusMode";
272+
273+
/** Default: equal valid doc set consensus across replicas. */
274+
public static final String DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE = "EQUAL";
260275
}
261276

262277
public static class UpsertCompactMergeTask {

pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java

Lines changed: 83 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import org.apache.pinot.common.auth.AuthProviderUtils;
3737
import org.apache.pinot.common.auth.NullAuthProvider;
3838
import org.apache.pinot.common.restlet.resources.ValidDocIdsBitmapResponse;
39-
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
4039
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
4140
import org.apache.pinot.common.utils.RoaringBitmapUtils;
4241
import org.apache.pinot.common.utils.ServiceStatus;
@@ -67,6 +66,14 @@
6766
public class MinionTaskUtils {
6867
private static final Logger LOGGER = LoggerFactory.getLogger(MinionTaskUtils.class);
6968

69+
/** Package-private for testing: parses the validDocIdsConsensusMode config string; null or blank yields EQUAL. */
70+
static MinionConstants.ValidDocIdsConsensusMode parseValidDocIdsConsensusMode(String value) {
71+
if (value == null || value.isBlank()) {
72+
return MinionConstants.ValidDocIdsConsensusMode.EQUAL;
73+
}
74+
return MinionConstants.ValidDocIdsConsensusMode.valueOf(value.toUpperCase().trim());
75+
}
76+
7077
private static final String DEFAULT_DIR_PATH_TERMINATOR = "/";
7178

7279
public static final String DATETIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
@@ -281,66 +288,114 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf
281288
}
282289

283290
/**
284-
* Returns the validDocID bitmap from the server whose local segment crc matches both crc of ZK metadata and
285-
* deepstore copy (expectedCrc).
291+
* Returns the validDocIds bitmap from server(s). {@code comparisonModeStr} is the task config value: UNSAFE,
292+
* EQUAL (default), or MOST_VALID_DOCS.
286293
*/
287294
@Nullable
288295
public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName,
289-
String validDocIdsType, MinionContext minionContext, String expectedCrc) {
296+
String validDocIdsType, MinionContext minionContext, String expectedCrc, String comparisonModeStr) {
297+
MinionConstants.ValidDocIdsConsensusMode consensusMode = parseValidDocIdsConsensusMode(comparisonModeStr);
290298
String clusterName = minionContext.getHelixManager().getClusterName();
291299
HelixAdmin helixAdmin = minionContext.getHelixManager().getClusterManagmentTool();
292-
RoaringBitmap validDocIds = null;
293300
List<String> servers = getServers(segmentName, tableNameWithType, helixAdmin, clusterName);
301+
List<RoaringBitmap> matchingBitmaps = new ArrayList<>();
302+
294303
for (String server : servers) {
295304
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, server);
296305
String endpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig);
297306

298-
// We only need aggregated table size and the total number of docs/rows. Skipping column related stats, by
299-
// passing an empty list.
300307
ServerSegmentMetadataReader serverSegmentMetadataReader = new ServerSegmentMetadataReader();
301308
ValidDocIdsBitmapResponse validDocIdsBitmapResponse;
302309
try {
303310
validDocIdsBitmapResponse =
304311
serverSegmentMetadataReader.getValidDocIdsBitmapFromServer(tableNameWithType, segmentName, endpoint,
305312
validDocIdsType, 60_000);
306313
} catch (Exception e) {
307-
LOGGER.warn("Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: "
308-
+ endpoint, e);
309-
continue;
314+
if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
315+
LOGGER.warn(
316+
"Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: " + endpoint, e);
317+
continue;
318+
} else {
319+
throw new IllegalStateException(
320+
"Unable to retrieve validDocIds bitmap for segment: " + segmentName + " from endpoint: " + endpoint, e);
321+
}
310322
}
311323

324+
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
312325
// Check crc from the downloaded segment against the crc returned from the server along with the valid doc id
313326
// bitmap. If this doesn't match, this means that we are hitting the race condition where the segment has been
314327
// uploaded successfully while the server is still reloading the segment. Reloading can take a while when the
315328
// offheap upsert is used because we will need to delete & add all primary keys.
316329
// `BaseSingleSegmentConversionExecutor.executeTask()` already checks for the crc from the task generator
317330
// against the crc from the current segment zk metadata, so we don't need to check that here.
318-
String crcFromValidDocIdsBitmap = validDocIdsBitmapResponse.getSegmentCrc();
319331
if (!expectedCrc.equals(crcFromValidDocIdsBitmap)) {
320-
// In this scenario, we are hitting the other replica of the segment which did not commit to ZK or deepstore.
321-
// We will skip processing this bitmap to query other server to confirm if there is a valid matching CRC.
322-
String message = "CRC mismatch for segment: " + segmentName + ", expected value based on task generator: "
323-
+ expectedCrc + ", actual crc from validDocIdsBitmapResponse from endpoint " + endpoint + ": "
324-
+ crcFromValidDocIdsBitmap;
325-
LOGGER.warn(message);
326-
continue;
332+
if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
333+
LOGGER.warn("CRC mismatch for segment: {} from endpoint {}, skipping", segmentName, endpoint);
334+
continue;
335+
} else {
336+
throw new IllegalStateException(
337+
"CRC mismatch for segment: " + segmentName + ", expected: " + expectedCrc + ", actual from endpoint "
338+
+ endpoint + ": " + crcFromValidDocIdsBitmap);
339+
}
327340
}
328341

329-
// skipping servers which are not in READY state. The bitmaps would be inconsistent when
330-
// server is NOT READY as UPDATING segments might be updating the ONLINE segments
331342
if (validDocIdsBitmapResponse.getServerStatus() != null && !validDocIdsBitmapResponse.getServerStatus()
332343
.equals(ServiceStatus.Status.GOOD)) {
333-
String message = "Server " + validDocIdsBitmapResponse.getInstanceId() + " is in "
334-
+ validDocIdsBitmapResponse.getServerStatus() + " state, skipping it for execution for segment: "
335-
+ validDocIdsBitmapResponse.getSegmentName() + ". Will try other servers.";
336-
LOGGER.warn(message);
337-
continue;
344+
if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
345+
LOGGER.warn("Server {} not READY for segment {}, skipping", validDocIdsBitmapResponse.getInstanceId(),
346+
segmentName);
347+
continue;
348+
} else {
349+
throw new IllegalStateException("Server " + validDocIdsBitmapResponse.getInstanceId() + " is in "
350+
+ validDocIdsBitmapResponse.getServerStatus() + " state for segment: " + segmentName
351+
+ ". Failing task to avoid inconsistency among replicas.");
352+
}
353+
}
354+
355+
RoaringBitmap bitmap = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
356+
int cardinality = bitmap.getCardinality();
357+
358+
if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.UNSAFE) {
359+
LOGGER.info("Using server {} with {} valid docs for segment {} (mode=UNSAFE)", server, cardinality,
360+
segmentName);
361+
return bitmap;
338362
}
339363

340-
validDocIds = RoaringBitmapUtils.deserialize(validDocIdsBitmapResponse.getBitmap());
341-
break;
364+
matchingBitmaps.add(bitmap);
342365
}
343-
return validDocIds;
366+
367+
if (matchingBitmaps.isEmpty()) {
368+
return null;
369+
}
370+
371+
if (consensusMode == MinionConstants.ValidDocIdsConsensusMode.EQUAL) {
372+
RoaringBitmap consensusBitMap = matchingBitmaps.get(0);
373+
for (RoaringBitmap b : matchingBitmaps) {
374+
if (!b.equals(consensusBitMap)) {
375+
throw new IllegalStateException("No consensus on validDocs across replicas for segment: " + segmentName
376+
+ ". Failing task to avoid replica inconsistency.");
377+
}
378+
}
379+
LOGGER.info("All {} servers have {} valid docs for segment {}", servers.size(), consensusBitMap.getCardinality(),
380+
segmentName);
381+
return consensusBitMap;
382+
}
383+
384+
// MOST_VALID_DOCS: explicitly pick the bitmap with the maximum valid doc count
385+
RoaringBitmap maxCardinalityMap = null;
386+
int maxCard = -1;
387+
for (RoaringBitmap b : matchingBitmaps) {
388+
int card = b.getCardinality();
389+
if (card > maxCard) {
390+
maxCard = card;
391+
maxCardinalityMap = b;
392+
}
393+
}
394+
if (maxCardinalityMap != null) {
395+
LOGGER.info("Selected server with {} valid docs for segment {} (mode=MOST_VALID_DOCS, checked {} servers)",
396+
maxCard, segmentName, servers.size());
397+
}
398+
return maxCardinalityMap;
344399
}
345400

346401
public static String toUTCString(long epochMillis) {
@@ -396,47 +451,4 @@ public static ValidDocIdsType getValidDocIdsType(UpsertConfig upsertConfig, Map<
396451
}
397452
return validDocIdsType;
398453
}
399-
400-
/**
401-
* Checks if all replicas have consensus on validDoc counts for a segment.
402-
* SAFETY LOGIC:
403-
* 1. Only proceed with operations when ALL replicas agree on totalValidDocs count
404-
* 2. Skip operations if ANY server hosting the segment is not in READY state
405-
* 3. Include all replicas (even those with CRC mismatches) in consensus for safety
406-
*
407-
* @param segmentName the name of the segment being checked
408-
* @param replicaMetadataList list of metadata from all replicas of the segment
409-
* @return true if all replicas have consensus on validDoc counts, false otherwise
410-
*/
411-
public static boolean hasValidDocConsensus(String segmentName,
412-
List<ValidDocIdsMetadataInfo> replicaMetadataList) {
413-
414-
if (replicaMetadataList == null || replicaMetadataList.isEmpty()) {
415-
LOGGER.warn("No replica metadata available for segment: {}", segmentName);
416-
return false;
417-
}
418-
419-
// Check server readiness and validDoc consensus
420-
Long consensusValidDocs = null;
421-
for (ValidDocIdsMetadataInfo metadata : replicaMetadataList) {
422-
// Check server readiness - skip if ANY server is not ready
423-
if (metadata.getServerStatus() != null && !metadata.getServerStatus().equals(ServiceStatus.Status.GOOD)) {
424-
LOGGER.warn("Server {} is in {} state for segment: {}, skipping consensus check",
425-
metadata.getInstanceId(), metadata.getServerStatus(), segmentName);
426-
return false;
427-
}
428-
429-
// Check if all replicas have the same totalValidDocs count
430-
long validDocs = metadata.getTotalValidDocs();
431-
if (consensusValidDocs == null) {
432-
// First iteration, we record the value to compare against
433-
consensusValidDocs = validDocs;
434-
} else if (!consensusValidDocs.equals(validDocs)) {
435-
LOGGER.warn("Inconsistent validDoc counts across replicas for segment: {}. Expected: {}, but found: {}",
436-
segmentName, consensusValidDocs, validDocs);
437-
return false;
438-
}
439-
}
440-
return true;
441-
}
442454
}

pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskExecutor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,17 @@ protected SegmentConversionResult convert(PinotTaskConfig pinotTaskConfig, File
7575
LOGGER.error(message);
7676
throw new IllegalStateException(message);
7777
}
78+
79+
// Executor-only: read consensus mode string from task config (no auth resolution or URL hits).
80+
Map<String, String> taskConfigs =
81+
tableConfig.getTaskConfig() != null ? tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null;
82+
String consensusMode =
83+
taskConfigs != null ? taskConfigs.getOrDefault(UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
84+
UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
85+
: UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
7886
RoaringBitmap validDocIds =
7987
MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, segmentName, validDocIdsTypeStr,
80-
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator);
88+
MINION_CONTEXT, originalSegmentCrcFromTaskGenerator, consensusMode);
8189
if (validDocIds == null) {
8290
// no valid crc match found or no validDocIds obtained from all servers
8391
// error out the task instead of silently failing so that we can track it via task-error metrics

pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskExecutor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,15 +109,23 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
109109
List.of(configs.get(MinionConstants.ORIGINAL_SEGMENT_CRC_KEY).split(","));
110110
validateCRCForInputSegments(segmentMetadataList, originalSegmentCrcFromTaskGenerator);
111111

112-
// Fetch validDocID snapshot from server and get record-reader for compacted reader.
112+
// Executor-only: read consensus mode string from task config (no auth resolution or URL hits).
113+
Map<String, String> taskConfigs =
114+
tableConfig.getTaskConfig() != null ? tableConfig.getTaskConfig().getConfigsForTaskType(taskType) : null;
115+
String consensusMode = taskConfigs != null ? taskConfigs.getOrDefault(
116+
MinionConstants.UpsertCompactionTask.VALID_DOC_IDS_CONSENSUS_MODE_KEY,
117+
MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE)
118+
: MinionConstants.UpsertCompactionTask.DEFAULT_VALID_DOC_IDS_CONSENSUS_MODE;
119+
113120
List<RecordReader> recordReaders = segmentMetadataList.stream().map(x -> {
114121
RoaringBitmap validDocIds = MinionTaskUtils.getValidDocIdFromServerMatchingCrc(tableNameWithType, x.getName(),
115-
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc());
122+
ValidDocIdsType.SNAPSHOT.name(), MINION_CONTEXT, x.getCrc(), consensusMode);
116123
if (validDocIds == null) {
117124
// no valid crc match found or no validDocIds obtained from all servers
118125
// error out the task instead of silently failing so that we can track it via task-error metrics
119-
String message = String.format("No validDocIds found from all servers. They either failed to download "
120-
+ "or did not match crc from segment copy obtained from deepstore / servers. " + "Expected crc: %s", "");
126+
String message = "No validDocIds found from all servers for segment: " + x.getName()
127+
+ ". They either failed to download or did not match crc from segment copy obtained from "
128+
+ "deepstore/servers. Expected crc: " + x.getCrc();
121129
LOGGER.error(message);
122130
throw new IllegalStateException(message);
123131
}

pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.helix.task.TaskState;
3535
import org.apache.pinot.common.exception.InvalidConfigException;
3636
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
37-
import org.apache.pinot.common.metrics.ControllerMeter;
3837
import org.apache.pinot.common.metrics.ControllerMetrics;
3938
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
4039
import org.apache.pinot.common.restlet.resources.ValidDocIdsType;
@@ -45,7 +44,6 @@
4544
import org.apache.pinot.controller.util.ServerSegmentMetadataReader;
4645
import org.apache.pinot.core.common.MinionConstants;
4746
import org.apache.pinot.core.minion.PinotTaskConfig;
48-
import org.apache.pinot.plugin.minion.tasks.MinionTaskUtils;
4947
import org.apache.pinot.spi.annotations.minion.TaskGenerator;
5048
import org.apache.pinot.spi.config.table.TableConfig;
5149
import org.apache.pinot.spi.config.table.TableType;
@@ -295,23 +293,9 @@ public static SegmentSelectionResult processValidDocIdsMetadata(String tableName
295293
continue;
296294
}
297295
SegmentZKMetadata segment = candidateSegmentsMap.get(segmentName);
298-
List<ValidDocIdsMetadataInfo> replicaMetadataList = validDocIdsMetadataInfoMap.get(segmentName);
299-
300-
// Check consensus across all replicas before proceeding with any operations
301-
if (!MinionTaskUtils.hasValidDocConsensus(segmentName, replicaMetadataList)) {
302-
LOGGER.info("Skipping segment {} for table {} - no consensus on validDoc counts across replicas",
303-
segmentName, tableNameWithType);
304-
305-
// Emit metric to track segments skipped due to consensus failure
306-
if (controllerMetrics != null) {
307-
controllerMetrics.addMeteredTableValue(tableNameWithType,
308-
ControllerMeter.UPSERT_COMPACT_MERGE_SEGMENT_SKIPPED_CONSENSUS_FAILURE, 1L);
309-
}
310-
continue;
311-
}
312296

313297
// Process with existing logic using the first replica with matching CRC (replica consensus is checked by the executor)
314-
for (ValidDocIdsMetadataInfo validDocIdsMetadata : replicaMetadataList) {
298+
for (ValidDocIdsMetadataInfo validDocIdsMetadata : validDocIdsMetadataInfoMap.get(segmentName)) {
315299
long totalInvalidDocs = validDocIdsMetadata.getTotalInvalidDocs();
316300
long totalValidDocs = validDocIdsMetadata.getTotalValidDocs();
317301
long segmentSizeInBytes = validDocIdsMetadata.getSegmentSizeInBytes();

0 commit comments

Comments
 (0)