from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
admin = KafkaAdminClient(
bootstrap_servers=[...], # MSK SASL/SCRAM bootstrap brokers
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username="...",
sasl_plain_password="...",
request_timeout_ms=3000000,
)
topics = admin.list_topics()
# Crashes here while ENCODING the DescribeConfigs v4 request:
admin.describe_configs(
[ConfigResource(ConfigResourceType.TOPIC, t) for t in topics]
)
Traceback (most recent call last):
File ".../describe_configs_caller.py", line N, in <module>
admin.describe_configs([ConfigResource(ConfigResourceType.TOPIC, t) for t in topics])
File "/usr/local/lib/python3.10/dist-packages/kafka/admin/_configs.py", line 189, in describe_configs
return self._manager.run(self._async_describe_configs, config_resources,
File "/usr/local/lib/python3.10/dist-packages/kafka/net/manager.py", line 468, in run
return self._net.run(coro, *args)
File "/usr/local/lib/python3.10/dist-packages/kafka/net/selector.py", line 332, in run
raise state['exception'] # pylint: disable=E0702
File "/usr/local/lib/python3.10/dist-packages/kafka/net/selector.py", line 316, in waiter
state['value'] = await self._invoke(coro, *args)
File "/usr/local/lib/python3.10/dist-packages/kafka/net/selector.py", line 402, in _invoke
result = await coro(*args)
File "/usr/local/lib/python3.10/dist-packages/kafka/admin/_configs.py", line 162, in _async_describe_configs
responses.append(await self._manager.send(request))
File "/usr/local/lib/python3.10/dist-packages/kafka/net/manager.py", line 319, in send
return conn.send_request(request, request_timeout_ms=request_timeout_ms)
File "/usr/local/lib/python3.10/dist-packages/kafka/net/connection.py", line 129, in send_request
self._send_request(request, future=future, timeout_at=timeout_at)
File "/usr/local/lib/python3.10/dist-packages/kafka/net/connection.py", line 149, in _send_request
correlation_id = self.parser.send_request(request)
File "/usr/local/lib/python3.10/dist-packages/kafka/protocol/parser.py", line 61, in send_request
data = request.encode(framed=True, header=True)
File "/usr/local/lib/python3.10/dist-packages/kafka/protocol/api_message.py", line 229, in encode
fast_encode(self, out)
File "<codegen:describe_configs_request_v4>", line 36, in _encode
IndexError: bytearray index out of range
Summary
KafkaAdminClient.describe_configs()raisesIndexError: bytearray index out of rangewhile encoding the
DescribeConfigsv4 request. The exception is thrown in the newcodegen/
fast_encodepath before anything is sent to the broker.This works on 2.3.1 and breaks on 3.0.4 with no application code change — the only
difference is the kafka-python version.
Environment
security_protocol="SASL_SSL",sasl_mechanism="SCRAM-SHA-512"Steps to reproduce
Traceback
Expected behavior
describe_configs()returns the topic configs (as it does on 2.3.1).