Skip to content

Commit ad90f31

Browse files
committed
address review comments
1 parent d27a207 commit ad90f31

5 files changed

Lines changed: 30 additions & 44 deletions

File tree

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import com.linkedin.venice.client.store.ClientConfig;
4646
import com.linkedin.venice.client.store.ClientFactory;
4747
import com.linkedin.venice.common.VeniceSystemStoreType;
48+
import com.linkedin.venice.common.VeniceSystemStoreUtils;
4849
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
4950
import com.linkedin.venice.exceptions.VeniceException;
5051
import com.linkedin.venice.exceptions.VeniceNoStoreException;
@@ -564,12 +565,11 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
564565

565566
/**
566567
* Resolves and returns the store object for the given store name.
567-
* For user system stores, this returns the SystemStoreAttributes from the parent user store.
568-
* For regular stores and shared system stores, this returns the Store object directly.
568+
* For user system stores, it finds the RT name from the SystemStoreAttributes of the parent user store.
569+
* For regular user stores and top level system stores, it finds the RT name from the Store object directly.
569570
*/
570571
private String getRealTimeTopicName(String storeName) {
571-
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
572-
if (systemStoreType != null) {
572+
if (VeniceSystemStoreUtils.isUserSystemStore(storeName)) {
573573
// it is a user system store
574574
String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName);
575575
Store userStore = storeRepository.getStore(userStoreName);

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ public KafkaStoreIngestionService(
394394
pubSubTopicRepository,
395395
serverConfig.getMetaStoreWriterCloseTimeoutInMS(),
396396
serverConfig.getMetaStoreWriterCloseConcurrency(),
397-
storeName -> Utils.getRealTimeTopicNameForSystemStore(metadataRepo.getStore(storeName)));
397+
storeName -> Utils.getRealTimeTopicName(metadataRepo.getStore(storeName)));
398398
metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() {
399399
@Override
400400
public void handleStoreDeleted(Store store) {

internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import com.fasterxml.jackson.core.type.TypeReference;
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
9-
import com.linkedin.venice.common.VeniceSystemStoreType;
9+
import com.linkedin.venice.common.VeniceSystemStoreUtils;
1010
import com.linkedin.venice.controllerapi.ControllerResponse;
1111
import com.linkedin.venice.exceptions.ConfigurationException;
1212
import com.linkedin.venice.exceptions.ErrorType;
@@ -633,6 +633,21 @@ public static String composeRealTimeTopic(String storeName, int versionNumber) {
633633
}
634634
}
635635

636+
public static int largestUsedRTVersionNumber(Store store) {
637+
if (store.isSystemStore() && VeniceSystemStoreUtils.isUserSystemStore(store.getName())) {
638+
// store is a user level system store
639+
Store userStore = store instanceof SystemStore ? ((SystemStore) store).getVeniceStore() : store;
640+
return userStore.getLargestUsedRTVersionNumber();
641+
} else {
642+
// store is a regular user store or a top level system store
643+
return store.getLargestUsedRTVersionNumber();
644+
}
645+
}
646+
647+
public static String getRealTimeTopicName(Store store) {
648+
return getRealTimeTopicName(store, largestUsedRTVersionNumber(store));
649+
}
650+
636651
/**
637652
* It follows the following order to search for real time topic name,
638653
* i) current store-version config, ii) store config, iii) other store-version configs, iv) default name
@@ -646,13 +661,6 @@ public static String getRealTimeTopicName(Store store, int rtVersionNumber) {
646661
rtVersionNumber);
647662
}
648663

649-
public static String getRealTimeTopicName(Store store) {
650-
if (store instanceof SystemStore) {
651-
return getRealTimeTopicName(store, ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber());
652-
}
653-
return getRealTimeTopicName(store, DEFAULT_RT_VERSION_NUMBER);
654-
}
655-
656664
public static String getRealTimeTopicName(StoreInfo storeInfo) {
657665
return getRealTimeTopicName(
658666
storeInfo.getName(),
@@ -749,19 +757,6 @@ public static String getSeparateRealTimeTopicName(StoreInfo storeInfo) {
749757
return getSeparateRealTimeTopicName(Utils.getRealTimeTopicName(storeInfo));
750758
}
751759

752-
public static String getRealTimeTopicNameForSystemStore(Store systemStore) {
753-
int largestUsedRTVersionNumber;
754-
VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(systemStore.getName());
755-
if (type != null && systemStore.isSystemStore()) {
756-
// systemStore is a user system store
757-
largestUsedRTVersionNumber = ((SystemStore) systemStore).getVeniceStore().getLargestUsedRTVersionNumber();
758-
} else {
759-
// systemStore is a zkShared system store
760-
largestUsedRTVersionNumber = systemStore.getLargestUsedRTVersionNumber();
761-
}
762-
return Utils.getRealTimeTopicName(systemStore, largestUsedRTVersionNumber);
763-
}
764-
765760
public static int calculateTopicHashCode(PubSubTopic topic) {
766761
if (topic.isSeparateRealTimeTopic()) {
767762
String realTimeTopicName = Utils.getRealTimeTopicNameFromSeparateRealTimeTopic(topic.getName());

internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ public void testGetOrCreateMetaStoreWriterWithSystemStore() {
9292
pubSubTopicRepository,
9393
5000L,
9494
2,
95-
storeName -> Utils.getRealTimeTopicNameForSystemStore(systemStore));
95+
storeName -> Utils.getRealTimeTopicName(systemStore));
9696

9797
Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(systemStoreName));
9898
verify(systemStore, times(1)).getVeniceStore();
@@ -135,7 +135,7 @@ public void testGetOrCreateMetaStoreWriterWithRegularStore() {
135135
pubSubTopicRepository,
136136
5000L,
137137
2,
138-
storeName1 -> Utils.getRealTimeTopicNameForSystemStore(regularStore));
138+
storeName1 -> Utils.getRealTimeTopicName(regularStore));
139139

140140
Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName));
141141
}
@@ -177,7 +177,7 @@ public void testGetOrCreateMetaStoreWriterWithNonSystemStoreType() {
177177
pubSubTopicRepository,
178178
5000L,
179179
2,
180-
storeName1 -> Utils.getRealTimeTopicNameForSystemStore(store));
180+
storeName1 -> Utils.getRealTimeTopicName(store));
181181

182182
Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName));
183183
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@
169169
import com.linkedin.venice.meta.StoreInfo;
170170
import com.linkedin.venice.meta.StoreName;
171171
import com.linkedin.venice.meta.StoreVersionInfo;
172-
import com.linkedin.venice.meta.SystemStore;
173172
import com.linkedin.venice.meta.SystemStoreAttributes;
174173
import com.linkedin.venice.meta.VeniceETLStrategy;
175174
import com.linkedin.venice.meta.Version;
@@ -1253,14 +1252,13 @@ private void configureNewStore(
12531252
/* If this store existed previously, we do not want to use the same RT topic name that was used by the previous
12541253
store. To ensure this, increase largestUsedRTVersionNumber and new RT name will be different */
12551254
if (config.isRealTimeTopicVersioningEnabled()) {
1256-
VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(newStore.getName());
12571255
int newNumber;
1258-
if (type == null && newStore.isSystemStore()) {
1259-
// Top-level shared ZK store
1260-
newNumber = largestUsedRTVersionNumber;
1261-
} else {
1262-
// User-level system store OR regular store
1256+
if (!newStore.isSystemStore() || VeniceSystemStoreUtils.isUserSystemStore(newStore.getName())) {
1257+
// user level system store OR regular store
12631258
newNumber = largestUsedRTVersionNumber + 1;
1259+
} else {
1260+
// top level system store
1261+
newNumber = largestUsedRTVersionNumber;
12641262
}
12651263

12661264
newStore.setLargestUsedRTVersionNumber(newNumber);
@@ -3008,14 +3006,7 @@ private Pair<Boolean, Version> addVersion(
30083006
boolean versionSwapDeferred,
30093007
int repushSourceVersion) {
30103008
Store store = getStore(clusterName, storeName);
3011-
int largestUsedRTVersionNumber;
3012-
VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName());
3013-
if (type != null && store.isSystemStore()) {
3014-
largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber();
3015-
} else {
3016-
largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber();
3017-
}
3018-
3009+
int largestUsedRTVersionNumber = Utils.largestUsedRTVersionNumber(store);
30193010
return addVersion(
30203011
clusterName,
30213012
storeName,

0 commit comments

Comments
 (0)