Skip to content
36 changes: 18 additions & 18 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ def _find_coordinator_id_process_response(self, response):
raise error_type(
"FindCoordinatorRequest failed with response '{}'."
.format(response))
return response.coordinator_id
return response.node_id

def _find_coordinator_ids(self, group_ids):
"""Find the broker node_ids of the coordinators of the given groups.
Expand Down Expand Up @@ -471,13 +471,13 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = CreateTopicsRequest[version](
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout_ms=timeout_ms
)
elif version <= 3:
request = CreateTopicsRequest[version](
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout=timeout_ms,
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
timeout_ms=timeout_ms,
validate_only=validate_only
)
else:
Expand All @@ -504,7 +504,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
"""
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
timeout_ms = self._validate_timeout(timeout_ms)
request = DeleteTopicsRequest[version](topics=topics, timeout=timeout_ms)
request = DeleteTopicsRequest[version](topics=topics, timeout_ms=timeout_ms)
def get_response_errors(r):
for response in r.responses:
yield Errors.for_code(response[1])
Expand Down Expand Up @@ -640,19 +640,19 @@ def describe_acls(self, acl_filter):
version = self._client.api_version(DescribeAclsRequest, max_version=1)
if version == 0:
request = DescribeAclsRequest[version](
resource_type=acl_filter.resource_pattern.resource_type,
resource_name=acl_filter.resource_pattern.resource_name,
principal=acl_filter.principal,
host=acl_filter.host,
resource_type_filter=acl_filter.resource_pattern.resource_type,
resource_name_filter=acl_filter.resource_pattern.resource_name,
principal_filter=acl_filter.principal,
host_filter=acl_filter.host,
operation=acl_filter.operation,
permission_type=acl_filter.permission_type
)
elif version <= 1:
request = DescribeAclsRequest[version](
resource_type=acl_filter.resource_pattern.resource_type,
resource_name=acl_filter.resource_pattern.resource_name,
resource_type_filter=acl_filter.resource_pattern.resource_type,
resource_name_filter=acl_filter.resource_pattern.resource_name,
resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type,
principal=acl_filter.principal,
principal_filter=acl_filter.principal,
host=acl_filter.host,
operation=acl_filter.operation,
permission_type=acl_filter.permission_type
Expand Down Expand Up @@ -727,7 +727,7 @@ def _convert_create_acls_response_to_acls(acls, create_response):

creations_error = []
creations_success = []
for i, creations in enumerate(create_response.creation_responses):
for i, creations in enumerate(create_response.results):
if version <= 1:
error_code, error_message = creations
acl = acls[i]
Expand Down Expand Up @@ -827,7 +827,7 @@ def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response)
"""
version = delete_response.API_VERSION
filter_result_list = []
for i, filter_responses in enumerate(delete_response.filter_responses):
for i, filter_responses in enumerate(delete_response.filter_results):
filter_error_code, filter_error_message, matching_acls = filter_responses
filter_error = Errors.for_code(filter_error_code)
acl_result_list = []
Expand Down Expand Up @@ -1054,8 +1054,8 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
version = self._client.api_version(CreatePartitionsRequest, max_version=1)
timeout_ms = self._validate_timeout(timeout_ms)
request = CreatePartitionsRequest[version](
topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
timeout=timeout_ms,
topics=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
timeout_ms=timeout_ms,
validate_only=validate_only
)
def get_response_errors(r):
Expand Down Expand Up @@ -1594,7 +1594,7 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_
request = ElectLeadersRequest[version](
election_type=ElectionType(election_type),
topic_partitions=self._get_topic_partitions(topic_partitions),
timeout=timeout_ms,
timeout_ms=timeout_ms,
)
# TODO convert structs to a more pythonic interface
def get_response_errors(r):
Expand Down
18 changes: 9 additions & 9 deletions kafka/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, **configs):
self._brokers = {} # node_id -> BrokerMetadata
self._partitions = {} # topic -> partition -> PartitionMetadata
self._broker_partitions = collections.defaultdict(set) # node_id -> {TopicPartition...}
self._coordinators = {} # (coord_type, coord_key) -> node_id
self._coordinators = {} # (key_type, key) -> node_id
self._last_refresh_ms = 0
self._last_successful_refresh_ms = 0
self._need_update = True
Expand Down Expand Up @@ -369,36 +369,36 @@ def remove_listener(self, listener):
"""Remove a previously added listener callback"""
self._listeners.remove(listener)

def add_coordinator(self, response, coord_type, coord_key):
def add_coordinator(self, response, key_type, key):
"""Update with metadata for a group or txn coordinator

Arguments:
response (FindCoordinatorResponse): broker response
coord_type (str): 'group' or 'transaction'
coord_key (str): consumer_group or transactional_id
key_type (str): 'group' or 'transaction'
key (str): consumer_group or transactional_id

Returns:
string: coordinator node_id if metadata is updated, None on error
"""
log.debug("Updating coordinator for %s/%s: %s", coord_type, coord_key, response)
log.debug("Updating coordinator for %s/%s: %s", key_type, key, response)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
log.error("FindCoordinatorResponse error: %s", error_type)
self._coordinators[(coord_type, coord_key)] = -1
self._coordinators[(key_type, key)] = -1
return

# Use a coordinator-specific node id so that requests
# get a dedicated connection
node_id = 'coordinator-{}'.format(response.coordinator_id)
node_id = 'coordinator-{}'.format(response.node_id)
coordinator = BrokerMetadata(
node_id,
response.host,
response.port,
None)

log.info("Coordinator for %s/%s is %s", coord_type, coord_key, coordinator)
log.info("Coordinator for %s/%s is %s", key_type, key, coordinator)
self._coordinator_brokers[node_id] = coordinator
self._coordinators[(coord_type, coord_key)] = node_id
self._coordinators[(key_type, key)] = node_id
return node_id

def with_partitions(self, partitions_to_add):
Expand Down
8 changes: 4 additions & 4 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ def _handle_api_versions_response(self, future, response):
future.failure(error_type())
if error_type is Errors.UnsupportedVersionError:
self._api_versions_idx -= 1
for api_version_data in response.api_versions:
for api_version_data in response.api_keys:
api_key, min_version, max_version = api_version_data[:3]
# If broker provides a lower max_version, skip to that
if api_key == response.API_KEY:
Expand All @@ -593,7 +593,7 @@ def _handle_api_versions_response(self, future, response):
return
self._api_versions = dict([
(api_version_data[0], (api_version_data[1], api_version_data[2]))
for api_version_data in response.api_versions
for api_version_data in response.api_keys
])
self._api_version = infer_broker_version_from_api_versions(self._api_versions)
log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version)))
Expand Down Expand Up @@ -666,11 +666,11 @@ def _handle_sasl_handshake_response(self, future, response):
self.close(error=error)
return future.failure(error_type(self))

if self.config['sasl_mechanism'] not in response.enabled_mechanisms:
if self.config['sasl_mechanism'] not in response.mechanisms:
future.failure(
Errors.UnsupportedSaslMechanismError(
'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s'
% (self.config['sasl_mechanism'], response.enabled_mechanisms)))
% (self.config['sasl_mechanism'], response.mechanisms)))
else:
self._sasl_authenticate(future)

Expand Down
8 changes: 4 additions & 4 deletions kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -757,14 +757,14 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
return

partitions = set([TopicPartition(topic, partition_data[0])
for topic, partitions in response.topics
for topic, partitions in response.responses
for partition_data in partitions])
if self._sensors:
metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions)
else:
metric_aggregator = None

for topic, partitions in response.topics:
for topic, partitions in response.responses:
for partition_data in partitions:
tp = TopicPartition(topic, partition_data[0])
fetch_offset = fetch_offsets[tp]
Expand Down Expand Up @@ -922,7 +922,7 @@ def __init__(self, fetch_offset, tp, records,
self.aborted_producer_ids = set()
self.aborted_transactions = collections.deque(
sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [],
key=lambda txn: txn.first_offset)
key=lambda txn: txn.first_offset)
)
self.metric_aggregator = metric_aggregator
self.check_crcs = check_crcs
Expand Down Expand Up @@ -1206,7 +1206,7 @@ def handle_error(self, _exception):

def _response_partitions(self, response):
return {TopicPartition(topic, partition_data[0])
for topic, partitions in response.topics
for topic, partitions in response.responses
for partition_data in partitions}


Expand Down
21 changes: 12 additions & 9 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,10 @@ def _handle_join_group_response(self, future, send_time, response):
else:
self._generation = Generation(response.generation_id,
response.member_id,
response.group_protocol)
response.protocol_name)

log.info("Successfully joined group %s %s", self.group_id, self._generation)
if response.leader_id == response.member_id:
if response.leader == response.member_id:
log.info("Elected group leader -- performing partition"
" assignments using %s", self._generation.protocol)
self._on_join_leader(response).chain(future)
Expand Down Expand Up @@ -697,10 +697,13 @@ def _on_join_leader(self, response):
Future: resolves to member assignment encoded-bytes
"""
try:
members = [GroupMember(*member) if response.API_VERSION >= 5 else GroupMember(member[0], None, member[1])
members = [GroupMember(
member_id=member[0],
group_instance_id=member[1] if response.API_VERSION >= 5 else None,
metadata=member[2] if response.API_VERSION >= 5 else member[1])
for member in response.members]
group_assignment = self._perform_assignment(response.leader_id,
response.group_protocol,
group_assignment = self._perform_assignment(response.leader,
response.protocol_name,
members)
except Exception as e:
return Future().failure(e)
Expand Down Expand Up @@ -747,7 +750,7 @@ def _handle_sync_group_response(self, future, send_time, response):
if error_type is Errors.NoError:
if self._sensors:
self._sensors.sync_latency.record((time.monotonic() - send_time) * 1000)
future.success(response.member_assignment)
future.success(response.assignment)
return

# Always rejoin on error
Expand Down Expand Up @@ -802,12 +805,12 @@ def _send_group_coordinator_request(self):
self.group_id, node_id, request)
future = Future()
_f = self._client.send(node_id, request)
_f.add_callback(self._handle_group_coordinator_response, future)
_f.add_callback(self._handle_find_coordinator_response, future)
_f.add_errback(self._failed_request, node_id, request, future)
return future

def _handle_group_coordinator_response(self, future, response):
log.debug("Received group coordinator response %s", response)
def _handle_find_coordinator_response(self, future, response):
log.debug("Received find coordinator response %s", response)

error_type = Errors.for_code(response.error_code)
if error_type is Errors.NoError:
Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
all_subscribed_topics = set()
for member in members:
subscription = Subscription(
ConsumerProtocol[0].METADATA.decode(member.metadata_bytes),
ConsumerProtocol[0].METADATA.decode(member.metadata),
member.group_instance_id
)
member_subscriptions[member.member_id] = subscription
Expand Down
14 changes: 7 additions & 7 deletions kafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
batches_by_partition = dict([(batch.topic_partition, batch)
for batch in batches])

for topic, partitions in response.topics:
for topic, partitions in response.responses:
for partition_info in partitions:
log_append_time = -1
log_start_offset = -1
Expand Down Expand Up @@ -609,17 +609,17 @@ def _produce_request(self, node_id, acks, timeout, batches):
if version >= 3:
return ProduceRequest[version](
transactional_id=transactional_id,
required_acks=acks,
timeout=timeout,
topics=topic_partition_data,
acks=acks,
timeout_ms=timeout,
topic_data=topic_partition_data,
)
else:
if transactional_id is not None:
log.warning('%s: Broker does not support ProduceRequest v3+, required for transactional_id', str(self))
return ProduceRequest[version](
required_acks=acks,
timeout=timeout,
topics=topic_partition_data,
acks=acks,
timeout_ms=timeout,
topic_data=topic_partition_data,
)

def wakeup(self):
Expand Down
10 changes: 5 additions & 5 deletions kafka/producer/transaction_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -658,10 +658,10 @@ def __init__(self, transaction_manager, topic_partitions):
for tp in topic_partitions:
topic_data[tp.topic].append(tp.partition)
self.request = AddPartitionsToTxnRequest[version](
transactional_id=self.transactional_id,
producer_id=self.producer_id,
producer_epoch=self.producer_epoch,
topics=list(topic_data.items()))
v3_and_below_transactional_id=self.transactional_id,
v3_and_below_producer_id=self.producer_id,
v3_and_below_producer_epoch=self.producer_epoch,
v3_and_below_topics=list(topic_data.items()))

@property
def priority(self):
Expand All @@ -673,7 +673,7 @@ def handle_response(self, response):
self.retry_backoff_ms = self.transaction_manager.retry_backoff_ms

results = {TopicPartition(topic, partition): Errors.for_code(error_code)
for topic, partition_data in response.results
for topic, partition_data in response.results_by_topic_v3_and_below
for partition, error_code in partition_data}

for tp, error in results.items():
Expand Down
2 changes: 1 addition & 1 deletion kafka/protocol/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
DEFAULT_GENERATION_ID = -1
UNKNOWN_MEMBER_ID = ''

GroupMember = collections.namedtuple("GroupMember", ["member_id", "group_instance_id", "metadata_bytes"])
GroupMember = collections.namedtuple("GroupMember", ["member_id", "group_instance_id", "metadata"])
GroupMember.__new__.__defaults__ = (None,) * len(GroupMember._fields)


Expand Down
2 changes: 1 addition & 1 deletion test/integration/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_fa
request = CreateTopicsRequest[0]([(topic_name, num_partitions,
replication_factor, [], [])], timeout_ms)
response = self._send_request(request, timeout=timeout_ms)
for topic_result in response.topic_errors:
for topic_result in response.topics:
error_code = topic_result[1]
if error_code != 0:
raise errors.for_code(error_code)
Expand Down
Loading
Loading