Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -628,10 +628,10 @@ public void deleteTopicConfig(final String topic) {

public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);
DataVersion dataVersionCopy = new DataVersion();
dataVersionCopy.assignNewOne(this.dataVersion);
topicConfigSerializeWrapper.setDataVersion(dataVersionCopy);
topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(this.topicConfigTable));
return topicConfigSerializeWrapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -39,6 +40,7 @@
import org.apache.rocketmq.common.utils.QueueTypeUtils;
import org.apache.rocketmq.remoting.protocol.DataVersion;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.Assert;
Expand Down Expand Up @@ -403,7 +405,7 @@ public void testSubTopicConfigTable() {
}

@Test
public void testBuildSerializeWrapperUpdatesDataVersionWhenSplitRegistrationEnabled() {
public void testSplitRegistrationBumpsVersion() {
brokerController.getBrokerConfig().setEnableSplitRegistration(true);
long counterBefore = topicConfigManager.getDataVersion().getCounter().get();

Expand All @@ -414,4 +416,36 @@ public void testBuildSerializeWrapperUpdatesDataVersionWhenSplitRegistrationEnab
Assert.assertEquals(counterBefore + 1, counterAfter);
Assert.assertEquals(counterAfter, wrapper.getDataVersion().getCounter().get());
}

@Test
public void testSerializeWrapperIsSnapshot() {
topicConfigManager.getTopicConfigTable().clear();

for (int i = 0; i < 10; i++) {
TopicConfig tc = new TopicConfig("BaseTopic-" + i, 4, 4,
PermName.PERM_READ | PermName.PERM_WRITE);
topicConfigManager.getTopicConfigTable().put(tc.getTopicName(), tc);
}

TopicConfigSerializeWrapper wrapper = topicConfigManager.buildTopicConfigSerializeWrapper();
ConcurrentMap<String, TopicConfig> wrapperTable = wrapper.getTopicConfigTable();
int sizeAtBuild = wrapperTable.size();
long versionAtBuild = wrapper.getDataVersion().getCounter().get();

// Wrapper should hold an independent snapshot, not the live reference
Assert.assertNotSame(topicConfigManager.getTopicConfigTable(), wrapperTable);

// Simulate a concurrent topic update after the wrapper was built
TopicConfig newTopic = new TopicConfig("NewTopic-AfterBuild", 8, 8,
PermName.PERM_READ | PermName.PERM_WRITE);
topicConfigManager.getTopicConfigTable().put(newTopic.getTopicName(), newTopic);
topicConfigManager.updateDataVersion();

// The wrapper's table should NOT see the new topic
Assert.assertFalse(wrapperTable.containsKey("NewTopic-AfterBuild"));
Assert.assertEquals(sizeAtBuild, wrapperTable.size());

// The wrapper's version should reflect a consistent point-in-time snapshot
Assert.assertEquals(versionAtBuild, wrapper.getDataVersion().getCounter().get());
}
}
Loading