From 42b12f957c1010e85455c3e10643d188a0d47bd6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 21 Mar 2026 21:33:16 -0700 Subject: [PATCH 1/6] Update old MetadataResponse attrs; update Admin client --- kafka/admin/client.py | 6 +- kafka/protocol/metadata.py | 70 +++++++++++----------- test/integration/test_admin_integration.py | 4 +- 3 files changed, 40 insertions(+), 40 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 22ac70f1c..678f8311a 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -554,7 +554,7 @@ def list_topics(self): A list of topic name strings. """ metadata = self._get_cluster_metadata(topics=None) - return [t['topic'] for t in metadata['topics']] + return [t['name'] for t in metadata['topics']] def describe_topics(self, topics=None): """Fetch metadata for the specified topics or all topics if None. @@ -1089,9 +1089,9 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None): valid_partitions = set() for topic in metadata.get("topics", ()): for partition in topic.get("partitions", ()): - t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"]) + t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"]) if t2p in partitions: - leader2partitions[partition["leader"]].append(t2p) + leader2partitions[partition["leader_id"]].append(t2p) valid_partitions.add(t2p) if len(partitions) != len(valid_partitions): diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index f3d588133..0fa6939ca 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -12,13 +12,13 @@ class MetadataResponse_v0(Response): ('port', Int32))), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)))))) + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)))))) ) @@ -34,14 +34,14 @@ class MetadataResponse_v1(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)))))) + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)))))) ) @@ -58,14 +58,14 @@ class MetadataResponse_v2(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)))))) + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)))))) ) @@ -83,14 +83,14 @@ class MetadataResponse_v3(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)))))) + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)))))) ) @@ -114,14 +114,14 @@ class MetadataResponse_v5(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)), + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)), ('offline_replicas', Array(Int32)))))) ) @@ -149,15 +149,15 @@ class MetadataResponse_v7(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), + ('partition_index', Int32), + ('leader_id', Int32), ('leader_epoch', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)), ('offline_replicas', Array(Int32)))))) ) @@ -177,15 +177,15 @@ class MetadataResponse_v8(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), + ('partition_index', Int32), + ('leader_id', Int32), ('leader_epoch', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)), ('offline_replicas', Array(Int32)))), ('authorized_operations', BitField))), ('authorized_operations', BitField) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 5a3a83d2b..c778b2426 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -406,8 +406,8 @@ def test_create_delete_topics(kafka_admin_client): @pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2") def test_perform_leader_election(kafka_admin_client, topic): topic_metadata = kafka_admin_client.describe_topics([topic])[0] - assert topic_metadata['topic'] == topic - partitions = list(map(lambda p: p['partition'], topic_metadata['partitions'])) + assert topic_metadata['name'] == topic + partitions = list(map(lambda p: p['partition_index'], topic_metadata['partitions'])) election_type = 0 # Preferred topic_partitions = {topic: partitions} # When Leader Election is not needed (cluster is stable), error 84 is returned From e2d3805b4881407a76c0b538ca887d0a5c29fff2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 21 Mar 2026 21:33:50 -0700 Subject: [PATCH 2/6] Admin: fixup DescribeAclRequest new attrs --- kafka/admin/client.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 678f8311a..697138a0e 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -651,12 +651,11 @@ def describe_acls(self, acl_filter): request = DescribeAclsRequest[version]( resource_type_filter=acl_filter.resource_pattern.resource_type, resource_name_filter=acl_filter.resource_pattern.resource_name, - resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type, + pattern_type_filter=acl_filter.resource_pattern.pattern_type, principal_filter=acl_filter.principal, - host=acl_filter.host, + host_filter=acl_filter.host, operation=acl_filter.operation, permission_type=acl_filter.permission_type - ) response = self.send_request(request) # pylint: disable=E0606 error_type = Errors.for_code(response.error_code) From 862d892b88c7536f17a57dd5329f0837b601c058 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 21 Mar 2026 20:33:40 -0700 Subject: [PATCH 3/6] Fixup DeleteGroupsRequest new attr --- kafka/admin/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 697138a0e..7ac809e06 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -504,7 +504,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """ version = self._client.api_version(DeleteTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - request = DeleteTopicsRequest[version](topics=topics, timeout_ms=timeout_ms) + request = DeleteTopicsRequest[version](topic_names=topics, timeout_ms=timeout_ms) def get_response_errors(r): for response in r.responses: yield Errors.for_code(response[1]) From 4b002965189bdfb0b3a0eab8f20878490608c2ff Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 09:48:36 -0700 Subject: [PATCH 4/6] Fixup test_object_conversion --- test/test_object_conversion.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/test/test_object_conversion.py b/test/test_object_conversion.py index bd2c51b47..0a04a098d 100644 --- a/test/test_object_conversion.py +++ b/test/test_object_conversion.py @@ -197,31 +197,31 @@ def test_with_metadata_response(): assert len(obj['topics']) == 2 assert obj['topics'][0]['error_code'] == 0 - assert obj['topics'][0]['topic'] == 'testtopic1' + assert obj['topics'][0]['name'] == 'testtopic1' assert obj['topics'][0]['is_internal'] is False assert len(obj['topics'][0]['partitions']) == 2 assert obj['topics'][0]['partitions'][0]['error_code'] == 0 - assert obj['topics'][0]['partitions'][0]['partition'] == 0 - assert obj['topics'][0]['partitions'][0]['leader'] == 0 - assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1] - assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1] + assert obj['topics'][0]['partitions'][0]['partition_index'] == 0 + assert obj['topics'][0]['partitions'][0]['leader_id'] == 0 + assert obj['topics'][0]['partitions'][0]['replica_nodes'] == [0, 1] + assert obj['topics'][0]['partitions'][0]['isr_nodes'] == [0, 1] assert obj['topics'][0]['partitions'][0]['offline_replicas'] == [] assert obj['topics'][0]['partitions'][1]['error_code'] == 0 - assert obj['topics'][0]['partitions'][1]['partition'] == 1 - assert obj['topics'][0]['partitions'][1]['leader'] == 1 - assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0] - assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0] + assert obj['topics'][0]['partitions'][1]['partition_index'] == 1 + assert obj['topics'][0]['partitions'][1]['leader_id'] == 1 + assert obj['topics'][0]['partitions'][1]['replica_nodes'] == [1, 0] + assert obj['topics'][0]['partitions'][1]['isr_nodes'] == [1, 0] assert obj['topics'][0]['partitions'][1]['offline_replicas'] == [] assert obj['topics'][1]['error_code'] == 0 - assert obj['topics'][1]['topic'] == 'other-test-topic' + assert obj['topics'][1]['name'] == 'other-test-topic' assert obj['topics'][1]['is_internal'] is True assert len(obj['topics'][1]['partitions']) == 1 assert obj['topics'][1]['partitions'][0]['error_code'] == 0 - assert obj['topics'][1]['partitions'][0]['partition'] == 0 - assert obj['topics'][1]['partitions'][0]['leader'] == 0 - assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1] - assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1] + assert obj['topics'][1]['partitions'][0]['partition_index'] == 0 + assert obj['topics'][1]['partitions'][0]['leader_id'] == 0 + assert obj['topics'][1]['partitions'][0]['replica_nodes'] == [0, 1] + assert obj['topics'][1]['partitions'][0]['isr_nodes'] == [0, 1] assert obj['topics'][1]['partitions'][0]['offline_replicas'] == [] tc.encode() From 61f8940feffc804e33c18a387b14163489a800e9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 09:50:36 -0700 Subject: [PATCH 5/6] Rename test_to_object; fixup imports --- .../test_to_object.py} | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) rename test/{test_object_conversion.py => protocol/test_to_object.py} (96%) diff --git a/test/test_object_conversion.py b/test/protocol/test_to_object.py similarity index 96% rename from test/test_object_conversion.py rename to test/protocol/test_to_object.py index 0a04a098d..602c014d6 100644 --- a/test/test_object_conversion.py +++ b/test/protocol/test_to_object.py @@ -1,9 +1,6 @@ -from kafka.protocol.admin import Request -from kafka.protocol.admin import Response -from kafka.protocol.types import Schema -from kafka.protocol.types import Array -from kafka.protocol.types import Int16 -from kafka.protocol.types import String +from kafka.protocol.api import Request +from kafka.protocol.api import Response +from kafka.protocol.types import Schema, Array, Int16, String import pytest From 2b1b66ca2f268e00a88508d5486026a2b1c0412c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 09:53:23 -0700 Subject: [PATCH 6/6] Fixup tests ConsumerProtocolAssignment assigned_partitions --- test/integration/test_admin_integration.py | 2 +- test/test_assignors.py | 36 +++++++++++----------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index c778b2426..ca2d79f2e 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -238,7 +238,7 @@ def consumer_thread(i, group_id): assert(len(consumer_group.members) == 1) for member in consumer_group.members: assert(member.member_metadata.topics[0] == topic) - assert(member.member_assignment.assignment[0][0] == topic) + assert(member.member_assignment.assigned_partitions[0][0] == topic) consumer_groups.add(consumer_group.group) assert(sorted(list(consumer_groups)) == group_id_list) finally: diff --git a/test/test_assignors.py b/test/test_assignors.py index 1056a7c1a..c04d6afb6 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -341,7 +341,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) - assert len(assignment['C2'].assignment[0][1]) == 3 + assert len(assignment['C2'].assigned_partitions[0][1]) == 3 def test_sticky_add_remove_topic_two_consumers(mocker): @@ -693,9 +693,9 @@ def test_assignment_with_multiple_generations1(mocker): assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) - assert len(assignment1['C1'].assignment[0][1]) == 2 - assert len(assignment1['C2'].assignment[0][1]) == 2 - assert len(assignment1['C3'].assignment[0][1]) == 2 + assert len(assignment1['C1'].assigned_partitions[0][1]) == 2 + assert len(assignment1['C2'].assigned_partitions[0][1]) == 2 + assert len(assignment1['C3'].assigned_partitions[0][1]) == 2 member_metadata = { 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()), @@ -704,10 +704,10 @@ def test_assignment_with_multiple_generations1(mocker): assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2) - assert len(assignment2['C1'].assignment[0][1]) == 3 - assert len(assignment2['C2'].assignment[0][1]) == 3 - assert all([partition in assignment2['C1'].assignment[0][1] for partition in assignment1['C1'].assignment[0][1]]) - assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]]) + assert len(assignment2['C1'].assigned_partitions[0][1]) == 3 + assert len(assignment2['C2'].assigned_partitions[0][1]) == 3 + assert all([partition in assignment2['C1'].assigned_partitions[0][1] for partition in assignment1['C1'].assigned_partitions[0][1]]) + assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]]) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() member_metadata = { @@ -717,8 +717,8 @@ def test_assignment_with_multiple_generations1(mocker): assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3) - assert len(assignment3['C2'].assignment[0][1]) == 3 - assert len(assignment3['C3'].assignment[0][1]) == 3 + assert len(assignment3['C2'].assigned_partitions[0][1]) == 3 + assert len(assignment3['C3'].assigned_partitions[0][1]) == 3 assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -733,9 +733,9 @@ def test_assignment_with_multiple_generations2(mocker): assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) - assert len(assignment1['C1'].assignment[0][1]) == 2 - assert len(assignment1['C2'].assignment[0][1]) == 2 - assert len(assignment1['C3'].assignment[0][1]) == 2 + assert len(assignment1['C1'].assigned_partitions[0][1]) == 2 + assert len(assignment1['C2'].assigned_partitions[0][1]) == 2 + assert len(assignment1['C3'].assigned_partitions[0][1]) == 2 member_metadata = { 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1), @@ -743,8 +743,8 @@ def test_assignment_with_multiple_generations2(mocker): assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C2': {'t'}}, assignment2) - assert len(assignment2['C2'].assignment[0][1]) == 6 - assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]]) + assert len(assignment2['C2'].assigned_partitions[0][1]) == 6 + assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]]) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() member_metadata = { @@ -756,9 +756,9 @@ def test_assignment_with_multiple_generations2(mocker): assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() - assert set(assignment3['C1'].assignment[0][1]) == set(assignment1['C1'].assignment[0][1]) - assert set(assignment3['C2'].assignment[0][1]) == set(assignment1['C2'].assignment[0][1]) - assert set(assignment3['C3'].assignment[0][1]) == set(assignment1['C3'].assignment[0][1]) + assert set(assignment3['C1'].assigned_partitions[0][1]) == set(assignment1['C1'].assigned_partitions[0][1]) + assert set(assignment3['C2'].assigned_partitions[0][1]) == set(assignment1['C2'].assigned_partitions[0][1]) + assert set(assignment3['C3'].assigned_partitions[0][1]) == set(assignment1['C3'].assigned_partitions[0][1]) @pytest.mark.parametrize('execution_number', range(50))