From fb6a6a24ff77819755e09e992fdbc325844ae77c Mon Sep 17 00:00:00 2001 From: "943379410@qq.com" <943379410@qq.com> Date: Sun, 7 Jun 2026 20:48:17 +0800 Subject: [PATCH] [ISSUE #10438] Fix buildTopicConfigSerializeWrapper exposing live topicConfigTable reference --- .../broker/topic/TopicConfigManager.java | 2 +- .../broker/topic/TopicConfigManagerTest.java | 36 ++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index b481242b121..488cedef799 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -628,10 +628,10 @@ public void deleteTopicConfig(final String topic) { public TopicConfigSerializeWrapper buildTopicConfigSerializeWrapper() { TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper(); - topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable); DataVersion dataVersionCopy = new DataVersion(); dataVersionCopy.assignNewOne(this.dataVersion); topicConfigSerializeWrapper.setDataVersion(dataVersionCopy); + topicConfigSerializeWrapper.setTopicConfigTable(new ConcurrentHashMap<>(this.topicConfigTable)); return topicConfigSerializeWrapper; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java index af1066a4d0c..085c3f9cfb2 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; @@ -39,6 +40,7 @@ import org.apache.rocketmq.common.utils.QueueTypeUtils; import org.apache.rocketmq.remoting.protocol.DataVersion; import org.apache.rocketmq.remoting.protocol.body.TopicConfigAndMappingSerializeWrapper; +import org.apache.rocketmq.remoting.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.Assert; @@ -403,7 +405,7 @@ public void testSubTopicConfigTable() { } @Test - public void testBuildSerializeWrapperUpdatesDataVersionWhenSplitRegistrationEnabled() { + public void testSplitRegistrationBumpsVersion() { brokerController.getBrokerConfig().setEnableSplitRegistration(true); long counterBefore = topicConfigManager.getDataVersion().getCounter().get(); @@ -414,4 +416,36 @@ public void testBuildSerializeWrapperUpdatesDataVersionWhenSplitRegistrationEnab Assert.assertEquals(counterBefore + 1, counterAfter); Assert.assertEquals(counterAfter, wrapper.getDataVersion().getCounter().get()); } + + @Test + public void testSerializeWrapperIsSnapshot() { + topicConfigManager.getTopicConfigTable().clear(); + + for (int i = 0; i < 10; i++) { + TopicConfig tc = new TopicConfig("BaseTopic-" + i, 4, 4, + PermName.PERM_READ | PermName.PERM_WRITE); + topicConfigManager.getTopicConfigTable().put(tc.getTopicName(), tc); + } + + TopicConfigSerializeWrapper wrapper = topicConfigManager.buildTopicConfigSerializeWrapper(); + ConcurrentMap wrapperTable = wrapper.getTopicConfigTable(); + int sizeAtBuild = wrapperTable.size(); + long versionAtBuild = wrapper.getDataVersion().getCounter().get(); + + // Wrapper should hold an independent snapshot, not the live reference + Assert.assertNotSame(topicConfigManager.getTopicConfigTable(), wrapperTable); + + // Simulate a concurrent topic update after the wrapper was built + TopicConfig newTopic = new TopicConfig("NewTopic-AfterBuild", 8, 8, + PermName.PERM_READ | PermName.PERM_WRITE); + topicConfigManager.getTopicConfigTable().put(newTopic.getTopicName(), newTopic); + topicConfigManager.updateDataVersion(); + + // The wrapper's table should NOT see the new topic + Assert.assertFalse(wrapperTable.containsKey("NewTopic-AfterBuild")); + Assert.assertEquals(sizeAtBuild, wrapperTable.size()); + + // The wrapper's version should reflect a consistent point-in-time snapshot + Assert.assertEquals(versionAtBuild, wrapper.getDataVersion().getCounter().get()); + } }