From c8d16d57eeb824a6f21aaa9934d10214e19d5bc0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 21 Mar 2026 12:17:42 -0700 Subject: [PATCH 1/3] Use new ConsumerProtocol classes in AdminClient --- kafka/admin/client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 7ac809e06..11ef4c527 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -10,7 +10,9 @@ from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ ACLResourcePatternType, valid_acl_operations from kafka.client_async import KafkaClient, selectors -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0, ConsumerProtocol_v0 +from kafka.protocol.new.consumer.metadata import ( + ConsumerProtocolSubscription, ConsumerProtocolAssignment, ConsumerProtocolType, +) import kafka.errors as Errors from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError, @@ -1232,7 +1234,7 @@ def _describe_consumer_groups_process_response(self, response): for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields): if group_information_name == 'protocol_type': protocol_type = described_group_information - protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type) + protocol_type_is_consumer = (protocol_type == ConsumerProtocolType or not protocol_type) if isinstance(group_information_field, Array): member_information_list = [] member_schema = group_information_field.array_of @@ -1241,9 +1243,9 @@ def _describe_consumer_groups_process_response(self, response): for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names): if protocol_type_is_consumer: if member_name == 'member_metadata' and member: - member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member)) + member_information.append(ConsumerProtocolSubscription.decode(member)) elif member_name == 'member_assignment' and member: - member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member)) + member_information.append(ConsumerProtocolAssignment.decode(member)) else: member_information.append(member) member_info_tuple = MemberInformation._make(member_information) From 481ae5d4c18bfc5fdbde6da47921b013d7d16aad Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 08:46:51 -0700 Subject: [PATCH 2/3] Admin: new MetadataRequest/FindCoordinatorRequest --- kafka/admin/client.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 11ef4c527..db0cf9b11 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -24,8 +24,7 @@ ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType) from kafka.protocol.commit import OffsetFetchRequest -from kafka.protocol.find_coordinator import FindCoordinatorRequest -from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.new.metadata import MetadataRequest, FindCoordinatorRequest from kafka.protocol.types import Array from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation from kafka.version import __version__ @@ -512,13 +511,9 @@ def get_response_errors(r): yield Errors.for_code(response[1]) return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) - def _process_metadata_response(self, metadata_response): - obj = metadata_response.to_object() + def _process_acl_operations(self, obj): if obj.get('authorized_operations', None) is not None: obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations']))) - for t in obj['topics']: - if t.get('authorized_operations', None) is not None: - t['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(t['authorized_operations']))) return obj def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): @@ -547,7 +542,11 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False): include_topic_authorized_operations=True, ) - return self._process_metadata_response(self.send_request(request)) + metadata = self.send_request(request).to_dict() + self._process_acl_operations(metadata) + for topic in metadata['topics']: + self._process_acl_operations(topic) + return metadata def list_topics(self): """Retrieve a list of all topic names in the cluster. From 04026b4dc9bbc69c3dc832056c70d7c32435fae4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 08:48:14 -0700 Subject: [PATCH 3/3] Admin: use new protocol apis --- kafka/admin/client.py | 36 ++++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index db0cf9b11..f325f6239 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -19,7 +19,7 @@ UnrecognizedBrokerVersion, IllegalArgumentError) from kafka.future import Future from kafka.metrics import MetricConfig, Metrics -from kafka.protocol.admin import ( +from kafka.protocol.new.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType) @@ -1150,7 +1150,7 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id timeout_ms=timeout_ms ) response = self.send_request(request, node_id=leader) - responses.append(response.to_object()) + responses.append(response.to_dict()) partition2result = {} partition2error = {} @@ -1224,29 +1224,33 @@ def _describe_consumer_groups_process_response(self, response): .format(response.API_VERSION)) assert len(response.groups) == 1 - for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names): - if isinstance(response_field, Array): - described_groups_field_schema = response_field.array_of + for response_name, response_field in response.fields.items(): + if response_name == 'groups': described_group = getattr(response, response_name)[0] described_group_information_list = [] protocol_type_is_consumer = False - for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields): + for group_information_name, group_information_field in response_field.fields.items(): + if not group_information_field.for_version_q(response.API_VERSION): + continue + described_group_information = getattr(described_group, group_information_name) if group_information_name == 'protocol_type': protocol_type = described_group_information protocol_type_is_consumer = (protocol_type == ConsumerProtocolType or not protocol_type) - if isinstance(group_information_field, Array): + if group_information_name == 'members': member_information_list = [] - member_schema = group_information_field.array_of - for members in described_group_information: + for member in described_group_information: member_information = [] - for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names): + for attr_name, attr_field in group_information_field.fields.items(): + if not attr_field.for_version_q(response.API_VERSION): + continue + attr_val = getattr(member, attr_name) if protocol_type_is_consumer: - if member_name == 'member_metadata' and member: - member_information.append(ConsumerProtocolSubscription.decode(member)) - elif member_name == 'member_assignment' and member: - member_information.append(ConsumerProtocolAssignment.decode(member)) + if attr_name == 'member_metadata' and attr_val: + member_information.append(ConsumerProtocolSubscription.decode(attr_val)) + elif attr_name == 'member_assignment' and attr_val: + member_information.append(ConsumerProtocolAssignment.decode(attr_val)) else: - member_information.append(member) + member_information.append(attr_val) member_info_tuple = MemberInformation._make(member_information) member_information_list.append(member_info_tuple) described_group_information_list.append(member_information_list) @@ -1324,7 +1328,7 @@ def _list_consumer_groups_process_response(self, response): raise NotImplementedError( "Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient." .format(response.API_VERSION)) - return response.groups + return [(group.group_id, group.protocol_type) for group in response.groups] def list_consumer_groups(self, broker_ids=None): """List all consumer groups known to the cluster.