Skip to content

Commit 473c632

Browse files
committed
Add batched SegmentZKMetadata iteration API and migrate controller usages
1 parent 657e7f0 commit 473c632

11 files changed

Lines changed: 390 additions & 133 deletions

File tree

pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.fasterxml.jackson.core.JsonProcessingException;
2222
import com.google.common.annotations.VisibleForTesting;
23+
import com.google.common.base.Preconditions;
2324
import java.io.IOException;
2425
import java.util.ArrayList;
2526
import java.util.Collections;
@@ -28,6 +29,7 @@
2829
import java.util.Map;
2930
import java.util.Objects;
3031
import java.util.Optional;
32+
import java.util.function.Consumer;
3133
import java.util.stream.Collectors;
3234
import javax.annotation.Nullable;
3335
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -69,6 +71,8 @@ public class ZKMetadataProvider {
6971
private ZKMetadataProvider() {
7072
}
7173

74+
public static final int DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE = 1000;
75+
7276
private static final Logger LOGGER = LoggerFactory.getLogger(ZKMetadataProvider.class);
7377
private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = "tenantIsolationEnabled";
7478
private static final String CLUSTER_APPLICATION_QUOTAS = "applicationQuotas";
@@ -725,6 +729,57 @@ public static List<SegmentZKMetadata> getSegmentsZKMetadata(ZkHelixPropertyStore
725729
}
726730
}
727731

732+
/**
733+
* Iterates over the segment ZK metadata for the table and applies the provided consumer to each non-null segment
734+
* metadata.
735+
*
736+
* @param propertyStore Helix property store from which segment metadata is read.
737+
* @param tableNameWithType Table name with type suffix (e.g. {@code myTable_OFFLINE}).
738+
* @param batchSize Batch size for ZK get calls.
739+
* @param consumer Consumer invoked for each non-null segment metadata.
740+
*/
741+
public static void forEachSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
742+
int batchSize, Consumer<SegmentZKMetadata> consumer) {
743+
Preconditions.checkArgument(batchSize > 0, "Segment metadata batchSize must be greater than 0: %s", batchSize);
744+
745+
String segmentsPath = constructPropertyStorePathForResource(tableNameWithType);
746+
List<String> segmentNames = getSegments(propertyStore, tableNameWithType);
747+
if (segmentNames == null || segmentNames.isEmpty()) {
748+
LOGGER.debug("No segments found under path: {}", segmentsPath);
749+
return;
750+
}
751+
752+
for (int startIndex = 0; startIndex < segmentNames.size(); startIndex += batchSize) {
753+
int endIndex = Math.min(startIndex + batchSize, segmentNames.size());
754+
List<String> segmentNameBatch = segmentNames.subList(startIndex, endIndex);
755+
756+
List<String> segmentPathBatch = new ArrayList<>(segmentNameBatch.size());
757+
for (String segmentName : segmentNameBatch) {
758+
segmentPathBatch.add(constructPropertyStorePathForSegment(tableNameWithType, segmentName));
759+
}
760+
761+
List<ZNRecord> znRecords = propertyStore.get(segmentPathBatch, null, AccessOption.PERSISTENT);
762+
int numNullRecords = 0;
763+
if (znRecords != null) {
764+
for (int i = 0; i < segmentNameBatch.size(); i++) {
765+
ZNRecord znRecord = i < znRecords.size() ? znRecords.get(i) : null;
766+
if (znRecord == null) {
767+
numNullRecords++;
768+
} else {
769+
consumer.accept(new SegmentZKMetadata(znRecord));
770+
}
771+
}
772+
} else {
773+
numNullRecords = segmentNameBatch.size();
774+
}
775+
776+
if (numNullRecords > 0) {
777+
LOGGER.warn("Failed to read {}/{} segment ZK metadata under path: {} for table: {}",
778+
numNullRecords, segmentNameBatch.size(), segmentsPath, tableNameWithType);
779+
}
780+
}
781+
}
782+
728783
/**
729784
* Returns the segments for the given table.
730785
*
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.common.metadata;
20+
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.Collections;
24+
import java.util.List;
25+
import org.apache.helix.AccessOption;
26+
import org.apache.helix.store.zk.ZkHelixPropertyStore;
27+
import org.apache.helix.zookeeper.datamodel.ZNRecord;
28+
import org.apache.zookeeper.data.Stat;
29+
import org.mockito.ArgumentMatchers;
30+
import org.mockito.Mockito;
31+
import org.testng.Assert;
32+
import org.testng.annotations.Test;
33+
34+
/**
35+
* Tests for {@link ZKMetadataProvider}.
36+
*/
37+
public class ZKMetadataProviderTest {
38+
private static final String TABLE_NAME_WITH_TYPE = "testTable_OFFLINE";
39+
40+
@Test
41+
public void testForEachSegmentZKMetadataEmptySegments() {
42+
ZkHelixPropertyStore<ZNRecord> mockPropertyStore = Mockito.mock(ZkHelixPropertyStore.class);
43+
String segmentsPath = ZKMetadataProvider.constructPropertyStorePathForResource(TABLE_NAME_WITH_TYPE);
44+
Mockito.when(mockPropertyStore.exists(segmentsPath, AccessOption.PERSISTENT)).thenReturn(true);
45+
Mockito.when(
46+
mockPropertyStore.getChildNames(segmentsPath, AccessOption.PERSISTENT)).thenReturn(Collections.emptyList());
47+
48+
List<String> segmentNames = new ArrayList<>();
49+
ZKMetadataProvider.forEachSegmentZKMetadata(mockPropertyStore, TABLE_NAME_WITH_TYPE, 2,
50+
segmentZKMetadata -> segmentNames.add(segmentZKMetadata.getSegmentName()));
51+
52+
Assert.assertTrue(segmentNames.isEmpty());
53+
Mockito.verify(mockPropertyStore, Mockito.never())
54+
.get(ArgumentMatchers.anyList(), Mockito.<List<Stat>>anyList(), ArgumentMatchers.eq(AccessOption.PERSISTENT));
55+
}
56+
57+
@Test
58+
public void testForEachSegmentZKMetadataBatchesAndNullRecords() {
59+
ZkHelixPropertyStore<ZNRecord> mockPropertyStore = Mockito.mock(ZkHelixPropertyStore.class);
60+
String segmentsPath = ZKMetadataProvider.constructPropertyStorePathForResource(TABLE_NAME_WITH_TYPE);
61+
List<String> segmentNames = Arrays.asList("segment-1", "segment-2", "segment-3", "segment-4");
62+
Mockito.when(mockPropertyStore.exists(segmentsPath, AccessOption.PERSISTENT)).thenReturn(true);
63+
Mockito.when(mockPropertyStore.getChildNames(segmentsPath, AccessOption.PERSISTENT)).thenReturn(segmentNames);
64+
65+
List<List<String>> requestedBatches = new ArrayList<>();
66+
Mockito.when(mockPropertyStore.get(ArgumentMatchers.anyList(), Mockito.<List<Stat>>anyList(),
67+
ArgumentMatchers.eq(AccessOption.PERSISTENT))).thenAnswer(invocation -> {
68+
List<String> requestedSegments = invocation.getArgument(0);
69+
requestedBatches.add(new ArrayList<>(requestedSegments));
70+
if (requestedSegments.equals(Arrays.asList(constructSegmentMetadataPath("segment-1"),
71+
constructSegmentMetadataPath("segment-2")))) {
72+
return Arrays.asList(createSegmentMetadata("segment-1"), null);
73+
}
74+
if (requestedSegments.equals(Arrays.asList(constructSegmentMetadataPath("segment-3"),
75+
constructSegmentMetadataPath("segment-4")))) {
76+
return Collections.singletonList(createSegmentMetadata("segment-3"));
77+
}
78+
return Collections.emptyList();
79+
});
80+
81+
List<String> consumedSegments = new ArrayList<>();
82+
ZKMetadataProvider.forEachSegmentZKMetadata(mockPropertyStore, TABLE_NAME_WITH_TYPE, 2,
83+
segmentZKMetadata -> consumedSegments.add(segmentZKMetadata.getSegmentName()));
84+
85+
Assert.assertEquals(consumedSegments, Arrays.asList("segment-1", "segment-3"));
86+
Assert.assertEquals(requestedBatches, Arrays.asList(
87+
Arrays.asList(constructSegmentMetadataPath("segment-1"), constructSegmentMetadataPath("segment-2")),
88+
Arrays.asList(constructSegmentMetadataPath("segment-3"), constructSegmentMetadataPath("segment-4"))));
89+
}
90+
91+
@Test
92+
public void testForEachSegmentZKMetadataRequiresPositiveBatchSize() {
93+
ZkHelixPropertyStore<ZNRecord> mockPropertyStore = Mockito.mock(ZkHelixPropertyStore.class);
94+
Assert.assertThrows(IllegalArgumentException.class,
95+
() -> ZKMetadataProvider.forEachSegmentZKMetadata(mockPropertyStore, TABLE_NAME_WITH_TYPE, 0,
96+
segmentZKMetadata -> {
97+
}));
98+
}
99+
100+
private ZNRecord createSegmentMetadata(String segmentName) {
101+
return new ZNRecord(segmentName);
102+
}
103+
104+
private String constructSegmentMetadataPath(String segmentName) {
105+
return ZKMetadataProvider.constructPropertyStorePathForSegment(TABLE_NAME_WITH_TYPE, segmentName);
106+
}
107+
}

pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,9 @@ public static class ControllerPeriodicTasksConf {
273273
public static final String AGED_SEGMENTS_DELETION_BATCH_SIZE =
274274
"controller.retentionManager.agedSegmentsDeletionBatchSize";
275275
public static final int DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE = 1000;
276+
public static final String SEGMENTS_ZK_METADATA_BATCH_SIZE =
277+
"controller.retentionManager.segmentsZkMetadataBatchSize";
278+
public static final int DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE = 1000;
276279
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
277280
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
278281
public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 60 * 60; // 1 Hour.
@@ -1226,6 +1229,15 @@ public void setAgedSegmentsDeletionBatchSize(int agedSegmentsDeletionBatchSize)
12261229
setProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE, agedSegmentsDeletionBatchSize);
12271230
}
12281231

1232+
public int getSegmentsZKMetadataBatchSize() {
1233+
return getProperty(ControllerPeriodicTasksConf.SEGMENTS_ZK_METADATA_BATCH_SIZE,
1234+
ControllerPeriodicTasksConf.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE);
1235+
}
1236+
1237+
public void setSegmentsZKMetadataBatchSize(int segmentZKMetadataBatchSize) {
1238+
setProperty(ControllerPeriodicTasksConf.SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadataBatchSize);
1239+
}
1240+
12291241
public long getPinotTaskManagerInitialDelaySeconds() {
12301242
return getPeriodicTaskInitialDelayInSeconds();
12311243
}

pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -772,7 +772,6 @@ public Map<String, Map<String, String>> getZookeeperMetadata(
772772
Map<String, Map<String, String>> segmentToMetadataMap = new HashMap<>();
773773
List<SegmentZKMetadata> segmentZKMetadataList =
774774
_pinotHelixResourceManager.getSegmentsZKMetadata(tableNameWithType);
775-
776775
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
777776
segmentToMetadataMap.put(segmentZKMetadata.getSegmentName(), segmentZKMetadata.toMap());
778777
}

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -902,20 +902,17 @@ public List<String> getSegmentsFor(String tableNameWithType, boolean shouldExclu
902902
selectedSegments = new ArrayList<>(segmentSet);
903903
} else {
904904
selectedSegments = new ArrayList<>();
905-
List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
906905
ArrayList<String> filteredSegments = new ArrayList<>();
907-
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
906+
forEachSegmentsZKMetadata(tableNameWithType, ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE,
907+
segmentZKMetadata -> {
908908
String segmentName = segmentZKMetadata.getSegmentName();
909909
// Compute the intersection of segmentZK metadata and idealstate for valid segments
910910
if (!segmentSet.contains(segmentName)) {
911911
filteredSegments.add(segmentName);
912-
continue;
913-
}
914-
// Filter by time if the time range is specified
915-
if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
912+
} else if (isSegmentWithinTimeStamps(segmentZKMetadata, startTimestamp, endTimestamp, excludeOverlapping)) {
916913
selectedSegments.add(segmentName);
917914
}
918-
}
915+
});
919916
LOGGER.info(
920917
"Successfully computed the segments for table : {}. # of filtered segments: {}, the filtered segment list: "
921918
+ "{}. Only showing up to 100 filtered segments.", tableNameWithType, filteredSegments.size(),
@@ -995,13 +992,23 @@ public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String s
995992
return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
996993
}
997994

995+
public void forEachSegmentsZKMetadata(String tableNameWithType, int batchSize,
996+
Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
997+
ZKMetadataProvider.forEachSegmentZKMetadata(_propertyStore, tableNameWithType, batchSize, segmentMetadataConsumer);
998+
}
999+
1000+
public void forEachSegmentsZKMetadata(String tableNameWithType, Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
1001+
ZKMetadataProvider.forEachSegmentZKMetadata(_propertyStore, tableNameWithType,
1002+
ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentMetadataConsumer);
1003+
}
1004+
9981005
public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
9991006
return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType);
10001007
}
10011008

10021009
public Collection<String> getLastLLCCompletedSegments(String tableNameWithType) {
10031010
Map<Integer, String> partitionIdToLastLLCCompletedSegmentMap = new HashMap<>();
1004-
for (SegmentZKMetadata zkMetadata : getSegmentsZKMetadata(tableNameWithType)) {
1011+
forEachSegmentsZKMetadata(tableNameWithType, zkMetadata -> {
10051012
if (zkMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.DONE) {
10061013
LLCSegmentName llcName = LLCSegmentName.of(zkMetadata.getSegmentName());
10071014
int partitionGroupId = llcName.getPartitionGroupId();
@@ -1012,7 +1019,7 @@ public Collection<String> getLastLLCCompletedSegments(String tableNameWithType)
10121019
partitionIdToLastLLCCompletedSegmentMap.put(partitionGroupId, zkMetadata.getSegmentName());
10131020
}
10141021
}
1015-
}
1022+
});
10161023
return partitionIdToLastLLCCompletedSegmentMap.values();
10171024
}
10181025

@@ -1510,13 +1517,12 @@ public void addSchema(Schema schema, boolean override, boolean force)
15101517

15111518
public void updateSegmentsZKTimeInterval(String tableNameWithType, DateTimeFieldSpec timeColumnFieldSpec) {
15121519
LOGGER.info("Updating segment time interval in ZK metadata for table: {}", tableNameWithType);
1513-
1514-
List<SegmentZKMetadata> segmentZKMetadataList = getSegmentsZKMetadata(tableNameWithType);
1515-
for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
1516-
int version = segmentZKMetadata.toZNRecord().getVersion();
1517-
updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
1518-
updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
1519-
}
1520+
forEachSegmentsZKMetadata(tableNameWithType,
1521+
ZKMetadataProvider.DEFAULT_SEGMENTS_ZK_METADATA_BATCH_SIZE, segmentZKMetadata -> {
1522+
int version = segmentZKMetadata.toZNRecord().getVersion();
1523+
updateZkTimeInterval(segmentZKMetadata, timeColumnFieldSpec);
1524+
updateZkMetadata(tableNameWithType, segmentZKMetadata, version);
1525+
});
15201526
}
15211527

15221528
public void updateSchema(Schema schema, boolean reload, boolean forceTableSchemaUpdate)

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/ClusterInfoAccessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.Map;
2424
import java.util.concurrent.Executor;
25+
import java.util.function.Consumer;
2526
import javax.annotation.Nullable;
2627
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
2728
import org.apache.helix.HelixDataAccessor;
@@ -115,6 +116,15 @@ public List<SegmentZKMetadata> getSegmentsZKMetadata(String tableNameWithType) {
115116
return ZKMetadataProvider.getSegmentsZKMetadata(_pinotHelixResourceManager.getPropertyStore(), tableNameWithType);
116117
}
117118

119+
public void forEachSegmentsZKMetadata(String tableNameWithType, int batchSize,
120+
Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
121+
_pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, batchSize, segmentMetadataConsumer);
122+
}
123+
124+
public void forEachSegmentsZKMetadata(String tableNameWithType, Consumer<SegmentZKMetadata> segmentMetadataConsumer) {
125+
_pinotHelixResourceManager.forEachSegmentsZKMetadata(tableNameWithType, segmentMetadataConsumer);
126+
}
127+
118128
public IdealState getIdealState(String tableNameWithType) {
119129
return _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
120130
}

0 commit comments

Comments
 (0)