Skip to content

Commit 985a79b

Browse files
johannbothaclaude
andauthored
Fix DSM queue names with Kafka Connect IBM MQ connectors (#10318)
* Fix DSM queue names with Kafka Connect IBM MQ connectors When using Kafka Connect with IBM MQ connectors, DSM was reporting incorrect queue names with schema-derived suffixes like _messagebody_0. This occurred because Kafka Connect schema converters add index suffixes to field names for union/optional types. This fix sanitizes queue/topic names to remove these suffixes: - _messagebody_N - _text_N - _bytes_N - _map_N - _value_N Fixes Zendesk ticket #2429181 Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Refactor: Replace regex Pattern with string-based approach for performance Address review feedback to avoid Pattern/Matcher usage for performance reasons. Use lastIndexOf() and regionMatches() instead of regex to strip Kafka Connect schema-derived suffixes from queue names. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> * Reorder suffix checks for better readability Group len==5 cases together (bytes, value) before len==3 case (map). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 8760758 commit 985a79b

2 files changed

Lines changed: 159 additions & 1 deletion

File tree

dd-java-agent/instrumentation/jms/javax-jms-1.1/src/main/java/datadog/trace/instrumentation/jms/JMSDecorator.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,61 @@ public void onTimeInQueue(AgentSpan span, CharSequence resourceName, String serv
194194

195195
private static final String TIBCO_TMP_PREFIX = "$TMP$";
196196

197+
/**
198+
* Sanitizes destination names to remove Kafka Connect schema-derived suffixes. When Kafka
199+
* Connect's IBM MQ connectors are used with schema converters (Protobuf/JSON Schema), union or
200+
* optional fields may get index suffixes like _messagebody_0 appended to the queue name.
201+
*/
202+
private static String sanitizeDestinationName(String name) {
203+
if (name == null || name.isEmpty()) {
204+
return name;
205+
}
206+
207+
int len = name.length();
208+
209+
// Check if name ends with digits (the schema index suffix)
210+
if (!Character.isDigit(name.charAt(len - 1))) {
211+
return name;
212+
}
213+
214+
// Find the underscore before the trailing digits
215+
int underscoreBeforeDigits = name.lastIndexOf('_');
216+
if (underscoreBeforeDigits <= 0) {
217+
return name;
218+
}
219+
220+
// Verify all characters after the underscore are digits
221+
for (int i = underscoreBeforeDigits + 1; i < len; i++) {
222+
if (!Character.isDigit(name.charAt(i))) {
223+
return name;
224+
}
225+
}
226+
227+
// Find the underscore before the suffix word
228+
int underscoreBeforeSuffix = name.lastIndexOf('_', underscoreBeforeDigits - 1);
229+
if (underscoreBeforeSuffix < 0) {
230+
return name;
231+
}
232+
233+
// Check if the suffix word is one of our known Kafka Connect schema suffixes (case insensitive)
234+
int suffixStart = underscoreBeforeSuffix + 1;
235+
int suffixLen = underscoreBeforeDigits - suffixStart;
236+
237+
if (isKnownKafkaConnectSuffix(name, suffixStart, suffixLen)) {
238+
return name.substring(0, underscoreBeforeSuffix);
239+
}
240+
241+
return name;
242+
}
243+
244+
private static boolean isKnownKafkaConnectSuffix(String name, int start, int len) {
245+
return (len == 11 && name.regionMatches(true, start, "messagebody", 0, 11))
246+
|| (len == 4 && name.regionMatches(true, start, "text", 0, 4))
247+
|| (len == 5 && name.regionMatches(true, start, "bytes", 0, 5))
248+
|| (len == 5 && name.regionMatches(true, start, "value", 0, 5))
249+
|| (len == 3 && name.regionMatches(true, start, "map", 0, 3));
250+
}
251+
197252
public CharSequence toResourceName(String destinationName, boolean isQueue) {
198253
if (null == destinationName) {
199254
return isQueue ? queueTempResourceName : topicTempResourceName;
@@ -229,7 +284,11 @@ public String getDestinationName(Destination destination) {
229284
} catch (Exception e) {
230285
log.debug("Unable to get jms destination name", e);
231286
}
232-
return null != name && !name.startsWith(TIBCO_TMP_PREFIX) ? name : null;
287+
if (null != name && !name.startsWith(TIBCO_TMP_PREFIX)) {
288+
// Sanitize Kafka Connect schema-derived suffixes from queue/topic names
289+
return sanitizeDestinationName(name);
290+
}
291+
return null;
233292
}
234293

235294
public boolean isQueue(Destination destination) {
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
import datadog.trace.instrumentation.jms.JMSDecorator
2+
import spock.lang.Specification
3+
4+
import javax.jms.Queue
5+
import javax.jms.Topic
6+
7+
class JMSDecoratorTest extends Specification {
8+
9+
def "test getDestinationName sanitizes Kafka Connect schema suffixes"() {
10+
given:
11+
def decorator = JMSDecorator.CONSUMER_DECORATE
12+
13+
when:
14+
def queue = Mock(Queue) {
15+
getQueueName() >> rawQueueName
16+
}
17+
def result = decorator.getDestinationName(queue)
18+
19+
then:
20+
result == expectedName
21+
22+
where:
23+
rawQueueName | expectedName
24+
// Customer reported issue: queue name with _messagebody_0 suffix from Kafka Connect IBM MQ connector
25+
// See Zendesk ticket #2429181
26+
"trainmgt.dispatch.trnsheet.p30.v1.pub_messagebody_0" | "trainmgt.dispatch.trnsheet.p30.v1.pub"
27+
28+
// Normal queue names should pass through unchanged (like customer's working pure Java apps)
29+
"ee.wo.aei.delmove.cs" | "ee.wo.aei.delmove.cs"
30+
"myqueue" | "myqueue"
31+
"my.queue.name" | "my.queue.name"
32+
33+
// Other Kafka Connect schema-derived suffixes should also be stripped
34+
"myqueue_messagebody_0" | "myqueue"
35+
"myqueue_text_0" | "myqueue"
36+
"myqueue_bytes_0" | "myqueue"
37+
"myqueue_map_0" | "myqueue"
38+
"myqueue_value_0" | "myqueue"
39+
"myqueue_MESSAGEBODY_0" | "myqueue" // case insensitive
40+
"myqueue_MessageBody_0" | "myqueue" // case insensitive
41+
42+
// Multiple digit indices
43+
"myqueue_messagebody_10" | "myqueue"
44+
"myqueue_messagebody_123" | "myqueue"
45+
46+
// Names that look similar but shouldn't be stripped
47+
"myqueue_messagebody" | "myqueue_messagebody" // no index
48+
"messagebody_0_queue" | "messagebody_0_queue" // not at end
49+
"myqueue_othersuffix_0" | "myqueue_othersuffix_0" // unknown suffix
50+
}
51+
52+
def "test getDestinationName with topic sanitizes Kafka Connect schema suffixes"() {
53+
given:
54+
def decorator = JMSDecorator.CONSUMER_DECORATE
55+
56+
when:
57+
def topic = Mock(Topic) {
58+
getTopicName() >> rawTopicName
59+
}
60+
def result = decorator.getDestinationName(topic)
61+
62+
then:
63+
result == expectedName
64+
65+
where:
66+
rawTopicName | expectedName
67+
"mytopic" | "mytopic"
68+
"mytopic_messagebody_0" | "mytopic"
69+
"mytopic_text_0" | "mytopic"
70+
}
71+
72+
def "test getDestinationName returns null for null queue name"() {
73+
given:
74+
def decorator = JMSDecorator.CONSUMER_DECORATE
75+
76+
when:
77+
def queue = Mock(Queue) {
78+
getQueueName() >> null
79+
}
80+
def result = decorator.getDestinationName(queue)
81+
82+
then:
83+
result == null
84+
}
85+
86+
def "test getDestinationName returns null for TIBCO temp prefix"() {
87+
given:
88+
def decorator = JMSDecorator.CONSUMER_DECORATE
89+
90+
when:
91+
def queue = Mock(Queue) {
92+
getQueueName() >> '$TMP$myqueue'
93+
}
94+
def result = decorator.getDestinationName(queue)
95+
96+
then:
97+
result == null
98+
}
99+
}

0 commit comments

Comments
 (0)