Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 31 additions & 26 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,21 @@
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType, valid_acl_operations
from kafka.client_async import KafkaClient, selectors
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0, ConsumerProtocol_v0
from kafka.protocol.new.consumer.metadata import (
ConsumerProtocolSubscription, ConsumerProtocolAssignment, ConsumerProtocolType,
)
import kafka.errors as Errors
from kafka.errors import (
IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError,
UnrecognizedBrokerVersion, IllegalArgumentError)
from kafka.future import Future
from kafka.metrics import MetricConfig, Metrics
from kafka.protocol.admin import (
from kafka.protocol.new.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest, DeleteRecordsRequest, DescribeLogDirsRequest, ElectLeadersRequest, ElectionType)
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.find_coordinator import FindCoordinatorRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.new.metadata import MetadataRequest, FindCoordinatorRequest
from kafka.protocol.types import Array
from kafka.structs import TopicPartition, OffsetAndMetadata, MemberInformation, GroupInformation
from kafka.version import __version__
Expand Down Expand Up @@ -510,13 +511,9 @@ def get_response_errors(r):
yield Errors.for_code(response[1])
return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)

def _process_metadata_response(self, metadata_response):
obj = metadata_response.to_object()
def _process_acl_operations(self, obj):
if obj.get('authorized_operations', None) is not None:
obj['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(obj['authorized_operations'])))
for t in obj['topics']:
if t.get('authorized_operations', None) is not None:
t['authorized_operations'] = list(map(lambda acl: acl.name, valid_acl_operations(t['authorized_operations'])))
return obj

def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
Expand Down Expand Up @@ -545,7 +542,11 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
include_topic_authorized_operations=True,
)

return self._process_metadata_response(self.send_request(request))
metadata = self.send_request(request).to_dict()
self._process_acl_operations(metadata)
for topic in metadata['topics']:
self._process_acl_operations(topic)
return metadata

def list_topics(self):
"""Retrieve a list of all topic names in the cluster.
Expand Down Expand Up @@ -1149,7 +1150,7 @@ def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id
timeout_ms=timeout_ms
)
response = self.send_request(request, node_id=leader)
responses.append(response.to_object())
responses.append(response.to_dict())

partition2result = {}
partition2error = {}
Expand Down Expand Up @@ -1223,29 +1224,33 @@ def _describe_consumer_groups_process_response(self, response):
.format(response.API_VERSION))

assert len(response.groups) == 1
for response_field, response_name in zip(response.SCHEMA.fields, response.SCHEMA.names):
if isinstance(response_field, Array):
described_groups_field_schema = response_field.array_of
for response_name, response_field in response.fields.items():
if response_name == 'groups':
described_group = getattr(response, response_name)[0]
described_group_information_list = []
protocol_type_is_consumer = False
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
for group_information_name, group_information_field in response_field.fields.items():
if not group_information_field.for_version_q(response.API_VERSION):
continue
described_group_information = getattr(described_group, group_information_name)
if group_information_name == 'protocol_type':
protocol_type = described_group_information
protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type)
if isinstance(group_information_field, Array):
protocol_type_is_consumer = (protocol_type == ConsumerProtocolType or not protocol_type)
if group_information_name == 'members':
member_information_list = []
member_schema = group_information_field.array_of
for members in described_group_information:
for member in described_group_information:
member_information = []
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
for attr_name, attr_field in group_information_field.fields.items():
if not attr_field.for_version_q(response.API_VERSION):
continue
attr_val = getattr(member, attr_name)
if protocol_type_is_consumer:
if member_name == 'member_metadata' and member:
member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member))
elif member_name == 'member_assignment' and member:
member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member))
if attr_name == 'member_metadata' and attr_val:
member_information.append(ConsumerProtocolSubscription.decode(attr_val))
elif attr_name == 'member_assignment' and attr_val:
member_information.append(ConsumerProtocolAssignment.decode(attr_val))
else:
member_information.append(member)
member_information.append(attr_val)
member_info_tuple = MemberInformation._make(member_information)
member_information_list.append(member_info_tuple)
described_group_information_list.append(member_information_list)
Expand Down Expand Up @@ -1323,7 +1328,7 @@ def _list_consumer_groups_process_response(self, response):
raise NotImplementedError(
"Support for ListGroupsResponse_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION))
return response.groups
return [(group.group_id, group.protocol_type) for group in response.groups]

def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.
Expand Down
Loading