Skip to content

Commit 908f9dc

Browse files
author
hankunming
committed
make the json format of ConsumeRunningInfo and HeartBeatData compatible with RocketMQ Java main project
1 parent 6523dcc commit 908f9dc

File tree

2 files changed

+50
-6
lines changed

2 files changed

+50
-6
lines changed

src/protocol/ConsumerRunningInfo.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,18 @@ string ConsumerRunningInfo::encode() {
104104
Json::FastWriter fastwrite;
105105
string finals = fastwrite.write(root);
106106
string key = "\"mqTable\":";
107-
key.append("{");
107+
key.append("[");
108108
for (map<MessageQueue, ProcessQueueInfo>::iterator it = mqTable.begin(); it != mqTable.end(); ++it) {
109+
key.append("[");
109110
key.append((it->first).toJson().toStyledString());
110111
key.erase(key.end() - 1);
111-
key.append(":");
112+
key.append(",");
112113
key.append((it->second).toJson().toStyledString());
114+
key.append("]");
113115
key.append(",");
114116
}
115117
key.erase(key.end() - 1);
116-
key.append("}");
118+
key.append("]");
117119

118120
// insert mqTable to final string
119121
key.append(",");

src/protocol/HeartbeatData.h

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,51 @@ class ConsumerData {
5050
Json::Value toJson() const {
5151
Json::Value outJson;
5252
outJson["groupName"] = groupName;
53-
outJson["consumeFromWhere"] = consumeFromWhere;
54-
outJson["consumeType"] = consumeType;
55-
outJson["messageModel"] = messageModel;
53+
54+
switch (consumeFromWhere) {
55+
case CONSUME_FROM_FIRST_OFFSET:
56+
outJson["consumeFromWhere"] = "CONSUME_FROM_FIRST_OFFSET";
57+
break;
58+
case CONSUME_FROM_TIMESTAMP:
59+
outJson["consumeFromWhere"] = "CONSUME_FROM_TIMESTAMP";
60+
break;
61+
case CONSUME_FROM_LAST_OFFSET:
62+
outJson["consumeFromWhere"] = "CONSUME_FROM_LAST_OFFSET";
63+
break;
64+
case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
65+
outJson["consumeFromWhere"] = "CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST";
66+
break;
67+
case CONSUME_FROM_MAX_OFFSET:
68+
outJson["consumeFromWhere"] = "CONSUME_FROM_MAX_OFFSET";
69+
break;
70+
case CONSUME_FROM_MIN_OFFSET:
71+
outJson["consumeFromWhere"] = "CONSUME_FROM_MIN_OFFSET";
72+
break;
73+
default:
74+
break;
75+
}
76+
77+
switch (consumeType) {
78+
case CONSUME_ACTIVELY:
79+
outJson["consumeType"] = "CONSUME_ACTIVELY";
80+
break;
81+
case CONSUME_PASSIVELY:
82+
outJson["consumeType"] = "CONSUME_PASSIVELY";
83+
break;
84+
default:
85+
break;
86+
}
87+
88+
switch (messageModel) {
89+
case CLUSTERING:
90+
outJson["messageModel"] = "CLUSTERING";
91+
break;
92+
case BROADCASTING:
93+
outJson["messageModel"] = "BROADCASTING";
94+
break;
95+
default:
96+
break;
97+
}
5698

5799
vector<SubscriptionData>::const_iterator it = subscriptionDataSet.begin();
58100
for (; it != subscriptionDataSet.end(); it++) {

0 commit comments

Comments
 (0)