Skip to content

Commit 32a77d3

Browse files
committed
Add batched SegmentZKMetadata iteration API and migrate controller usages
1 parent 657e7f0 commit 32a77d3

12 files changed

Lines changed: 294 additions & 143 deletions

File tree

pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.fasterxml.jackson.core.JsonProcessingException;
2222
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.base.Preconditions;
2324
import java.io.IOException;
2425
import java.util.ArrayList;
2526
import java.util.Collections;
@@ -28,6 +29,7 @@
2829
import java.util.Map;
2930
import java.util.Objects;
3031
import java.util.Optional;
32+
import java.util.function.Consumer;
3133
import java.util.stream.Collectors;
3234
import javax.annotation.Nullable;
3335
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -69,6 +71,8 @@ public class ZKMetadataProvider {
6971
private ZKMetadataProvider() {
7072
}
7173

74+
public static final int DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE = 1000;
75+
7276
private static final Logger LOGGER = LoggerFactory.getLogger(ZKMetadataProvider.class);
7377
private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = "tenantIsolationEnabled";
7478
private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas";
@@ -725,6 +729,54 @@ public static List<SegmentZKMetadata> getSegmentsZKMetadata(ZkHelixPropertyStore
725729
}
726730
}
727731

732+
/**
733+
* Applies the given consumer on segment ZK metadata for the given table in batches.
734+
*
735+
* @param batchSize batch size for ZK get calls
736+
* @param consumer function invoked for each non-null segment metadata
737+
*/
738+
public static void forEachSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
739+
int batchSize, Consumer<SegmentZKMetadata> consumer) {
740+
Preconditions.checkArgument(batchSize > 0, "Segment metadata batchSize must be greater than 0: %s", batchSize);
741+
742+
String segmentsPath = constructPropertyStorePathForResource(tableNameWithType);
743+
List<String> segmentNames = getSegments(propertyStore, tableNameWithType);
744+
if (segmentNames == null || segmentNames.isEmpty()) {
745+
LOGGER.debug("No segments found under path: {}", segmentsPath);
746+
return;
747+
}
748+
749+
for (int startIndex = 0; startIndex < segmentNames.size(); startIndex += batchSize) {
750+
int endIndex = Math.min(startIndex + batchSize, segmentNames.size());
751+
List<String> segmentNameBatch = segmentNames.subList(startIndex, endIndex);
752+
753+
List<String> segmentPathBatch = new ArrayList<>(segmentNameBatch.size());
754+
for (String segmentName : segmentNameBatch) {
755+
segmentPathBatch.add(constructPropertyStorePathForSegment(tableNameWithType, segmentName));
756+
}
757+
758+
List<ZNRecord> znRecords = propertyStore.get(segmentPathBatch, null, AccessOption.PERSISTENT);
759+
int numNullRecords = 0;
760+
if (znRecords != null) {
761+
for (int i = 0; i < segmentNameBatch.size(); i++) {
762+
ZNRecord znRecord = i < znRecords.size() ? znRecords.get(i) : null;
763+
if (znRecord == null) {
764+
numNullRecords++;
765+
} else {
766+
consumer.accept(new SegmentZKMetadata(znRecord));
767+
}
768+
}
769+
} else {
770+
numNullRecords = segmentNameBatch.size();
771+
}
772+
773+
if (numNullRecords > 0) {
774+
LOGGER.warn("Failed to read {}/{} segment ZK metadata under path: {} for table: {}",
775+
numNullRecords, segmentNameBatch.size(), segmentsPath, tableNameWithType);
776+
}
777+
}
778+
}
779+
728780
/**
729781
* Returns the segments for the given table.
730782
*

pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ public static class ControllerPeriodicTasksConf {
273273
public static final String AGED_SEGMENTS_DELETION_BATCH_SIZE =
274274
"controller.retentionManager.agedSegmentsDeletionBatchSize";
275275
public static final int DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE = 1000;
276+
public static final String SEGMENTS_ZK_METADATA_BATCH_SIZE =
277+
"controller.retentionManager.segmentsZkMetadataBatchSize";
278+
public static final int DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE = 1000;
276279
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
277280
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
278281
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 60 * 60; // 1 Hour.
@@ -1226,6 +1229,15 @@ public void setAgedSegmentsDeletionBatchSize(int agedSegmentsDeletionBatchSize)
12261229
setProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE, agedSegmentsDeletionBatchSize);
12271230
}
12281231

1232+
public int getSegmentsZKMetadataBatchSize() {
1233+
return getProperty(ControllerPeriodicTasksConf.SEGMENTS_ZK_METADATA_BATCH_SIZE,
1234+
ControllerPeriodicTasksConf.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE);
1235+
}
1236+
1237+
public void setSegmentsZKMetadataBatchSize(int segmentZKMetadataBatchSize) {
1238+
setProperty(ControllerPeriodicTasksConf.SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadataBatchSize);
1239+
}
1240+
12291241
public long getPinotTaskManagerInitialDelaySeconds() {
12301242
return getPeriodicTaskInitialDelayInSeconds();
12311243
}

pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -770,12 +770,9 @@ public Map<String, Map<String, String>> getZookeeperMetadata(
770770
String tableNameWithType =
771771
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
772772
Map<String, Map<String, String>> segmentToMetadataMap = new HashMap<>();
773-
List<SegmentZKMetadata> segmentZKMetadataList =
774-
_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType);
775-
776-
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
773+
_pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, segmentZKMetadata -> {
777774
segmentToMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata.toMap());
778-
}
775+
});
779776
return segmentToMetadataMap;
780777
}
781778

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -902,20 +902,17 @@ public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExclu
902902
selectedSegments = new ArrayList<>(segmentSet);
903903
} else {
904904
selectedSegments = new ArrayList<>();
905-
List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
906905
ArrayList<String> filteredSegments = new ArrayList<>();
907-
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
906+
forEachSegmentsZKMetadata(tableNameWithType, ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE,
907+
segmentZKMetadata -> {
908908
String segmentName = segmentZKMetadata.getSegmentName();
909909
// Compute the intersection of segmentZK metadata and idealstate for valid segments
910910
if (!segmentSet.contains(segmentName)) {
911911
filteredSegments.add(segmentName);
912-
continue;
913-
}
914-
// Filter by time if the time range is specified
915-
if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
912+
} else if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
916913
selectedSegments.add(segmentName);
917914
}
918-
}
915+
});
919916
LOGGER.info(
920917
"Successfully computed the segments for table : {}. # of filtered segments: {}, the filtered segment list: "
921918
+ "{}. Only showing up to 100 filtered segments.", tableNameWithType, filteredSegments.size(),
@@ -995,13 +992,23 @@ public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String s
995992
return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
996993
}
997994

995+
public void forEachSegmentsZKMetadata(String tableNameWithType, int batchSize,
996+
Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
997+
ZKMetadataProvider.forEachSegmentZKMetadata(_propertyStore, tableNameWithType, batchSize, segmentMetadataConsumer);
998+
}
999+
1000+
public void forEachSegmentsZKMetadata(String tableNameWithType, Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
1001+
ZKMetadataProvider.forEachSegmentZKMetadata(_propertyStore, tableNameWithType,
1002+
ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentMetadataConsumer);
1003+
}
1004+
9981005
public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
9991006
return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
10001007
}
10011008

10021009
public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) {
10031010
Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new HashMap<>();
1004-
for (SegmentZKMetadata zkMetadata : getSegmentsZKMetadata(tableNameWithType)) {
1011+
forEachSegmentsZKMetadata(tableNameWithType, zkMetadata -> {
10051012
if (zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
10061013
LLCSegmentName llcName = LLCSegmentName.of(zkMetadata.getSegmentName());
10071014
int partitionGroupId = llcName.getPartitionGroupId();
@@ -1012,7 +1019,7 @@ public Collection<String> getLastLLCCompletedSegments(String tableNameWithType)
10121019
partitionIdToLastLLCCompletedSegmentMap.put(partitionGroupId, zkMetadata.getSegmentName());
10131020
}
10141021
}
1015-
}
1022+
});
10161023
return partitionIdToLastLLCCompletedSegmentMap.values();
10171024
}
10181025

@@ -1510,13 +1517,12 @@ public void addSchema(Schema schema, boolean override, boolean force)
15101517

15111518
public void updateSegmentsZKTimeInterval(String tableNameWithType, DateTimeFieldSpec timeColumnFieldSpec) {
15121519
LOGGER.info("Updating segment time interval in ZK metadata for table: {}", tableNameWithType);
1513-
1514-
List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
1515-
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
1516-
int version = segmentZKMetadata.toZNRecord().getVersion();
1517-
updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
1518-
updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
1519-
}
1520+
forEachSegmentsZKMetadata(tableNameWithType,
1521+
ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadata -> {
1522+
int version = segmentZKMetadata.toZNRecord().getVersion();
1523+
updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
1524+
updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
1525+
});
15201526
}
15211527

15221528
public void updateSchema(Schema schema, boolean reload, boolean forceTableSchemaUpdate)

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.concurrent.Executor;
25+
import java.util.function.Consumer;
2526
import javax.annotation.Nullable;
2627
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
2728
import org.apache.helix.HelixDataAccessor;
@@ -115,6 +116,15 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
115116
return ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
116117
}
117118

119+
public void forEachSegmentsZKMetadata(String tableNameWithType, int batchSize,
120+
Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
121+
_pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, batchSize, segmentMetadataConsumer);
122+
}
123+
124+
public void forEachSegmentsZKMetadata(String tableNameWithType, Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
125+
_pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, segmentMetadataConsumer);
126+
}
127+
118128
public IdealState getIdealState(String tableNameWithType) {
119129
return _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
120130
}

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/BaseTaskGenerator.java

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.List;
2424
import java.util.Map;
2525
import java.util.Set;
26+
import java.util.function.Predicate;
2627
import org.apache.commons.lang3.StringUtils;
2728
import org.apache.helix.model.IdealState;
2829
import org.apache.helix.task.JobConfig;
@@ -161,32 +162,54 @@ public int getMaxAttemptsPerTask(String minionTag) {
161162
* @return the list of segment zk metadata for available segments in the table.
162163
*/
163164
public List<SegmentZKMetadata> getSegmentsZKMetadataForTable(String tableNameWithType) {
165+
return getSegmentsZKMetadataInIdealState(tableNameWithType, null);
166+
}
167+
168+
public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
164169
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
170+
return getSegmentsZKMetadataInIdealState(tableNameWithType, segmentZKMetadata -> {
171+
String segmentName = segmentZKMetadata.getSegmentName();
172+
return segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
173+
&& !idealState.getInstanceStateMap(segmentName).containsValue(SegmentStateModel.CONSUMING);
174+
// The last check is for an edge case where
175+
// 1. SegmentZKMetadata was updated to DONE in segment commit protocol, but
176+
// 2. IdealState for the segment was not updated to ONLINE due to some issue in the controller.
177+
// We avoid picking up such segments to allow RealtimeSegmentValidationManager to fix them.
178+
});
179+
}
180+
181+
private List<SegmentZKMetadata> getSegmentsZKMetadataInIdealState(String tableNameWithType,
182+
Predicate<SegmentZKMetadata> segmentMetadataFilter) {
183+
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
184+
if (idealState == null) {
185+
return new ArrayList<>();
186+
}
165187
Set<String> segmentsForTable = idealState.getPartitionSet();
166-
List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
188+
if (segmentsForTable == null || segmentsForTable.isEmpty()) {
189+
return new ArrayList<>();
190+
}
191+
167192
List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
168-
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
169-
if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())) {
193+
boolean[] callbackInvoked = new boolean[1];
194+
_clusterInfoAccessor.forEachSegmentsZKMetadata(tableNameWithType, segmentZKMetadata -> {
195+
callbackInvoked[0] = true;
196+
if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())
197+
&& (segmentMetadataFilter == null || segmentMetadataFilter.test(segmentZKMetadata))) {
170198
selectedSegmentZKMetadataList.add(segmentZKMetadata);
171199
}
200+
});
201+
if (callbackInvoked[0]) {
202+
return selectedSegmentZKMetadataList;
172203
}
173-
return selectedSegmentZKMetadataList;
174-
}
175204

176-
public List<SegmentZKMetadata> getNonConsumingSegmentsZKMetadataForRealtimeTable(String tableNameWithType) {
177-
IdealState idealState = _clusterInfoAccessor.getIdealState(tableNameWithType);
178-
Set<String> idealStateSegments = idealState.getPartitionSet();
179-
List<SegmentZKMetadata> segmentZKMetadataList = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
180-
List<SegmentZKMetadata> selectedSegmentZKMetadataList = new ArrayList<>();
181-
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
182-
String segmentName = segmentZKMetadata.getSegmentName();
183-
if (idealStateSegments.contains(segmentName)
184-
&& segmentZKMetadata.getStatus().isCompleted() // skip consuming segments
185-
&& !idealState.getInstanceStateMap(segmentName).containsValue(SegmentStateModel.CONSUMING)) {
186-
// The last check is for an edge case where
187-
// 1. SegmentZKMetadata was updated to DONE in segment commit protocol, but
188-
// 2. IdealState for the segment was not updated to ONLINE due to some issue in the controller.
189-
// We avoid picking up such segments to allow RealtimeSegmentValidationManager to fix them.
205+
// Fallback for unit tests and custom mocks which stub getSegmentsZKMetadata() but not forEachSegmentsZKMetadata().
206+
List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(tableNameWithType);
207+
if (allSegments == null) {
208+
return selectedSegmentZKMetadataList;
209+
}
210+
for (SegmentZKMetadata segmentZKMetadata : allSegments) {
211+
if (segmentsForTable.contains(segmentZKMetadata.getSegmentName())
212+
&& (segmentMetadataFilter == null || segmentMetadataFilter.test(segmentZKMetadata))) {
190213
selectedSegmentZKMetadataList.add(segmentZKMetadata);
191214
}
192215
}

0 commit comments

Comments
 (0)