diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 025ec11ce..22ac70f1c 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -308,7 +308,7 @@ def _find_coordinator_id_process_response(self, response): raise error_type( "FindCoordinatorRequest failed with response '{}'." .format(response)) - return response.coordinator_id + return response.node_id def _find_coordinator_ids(self, group_ids): """Find the broker node_ids of the coordinators of the given groups. @@ -471,13 +471,13 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_ "validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}." .format(self.config['api_version'])) request = CreateTopicsRequest[version]( - create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], - timeout=timeout_ms + topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + timeout_ms=timeout_ms ) elif version <= 3: request = CreateTopicsRequest[version]( - create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], - timeout=timeout_ms, + topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics], + timeout_ms=timeout_ms, validate_only=validate_only ) else: @@ -504,7 +504,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """ version = self._client.api_version(DeleteTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - request = DeleteTopicsRequest[version](topics=topics, timeout=timeout_ms) + request = DeleteTopicsRequest[version](topics=topics, timeout_ms=timeout_ms) def get_response_errors(r): for response in r.responses: yield Errors.for_code(response[1]) @@ -640,19 +640,19 @@ def describe_acls(self, acl_filter): version = self._client.api_version(DescribeAclsRequest, max_version=1) if version == 0: request = DescribeAclsRequest[version]( - resource_type=acl_filter.resource_pattern.resource_type, - resource_name=acl_filter.resource_pattern.resource_name, - principal=acl_filter.principal, - host=acl_filter.host, + resource_type_filter=acl_filter.resource_pattern.resource_type, + resource_name_filter=acl_filter.resource_pattern.resource_name, + principal_filter=acl_filter.principal, + host_filter=acl_filter.host, operation=acl_filter.operation, permission_type=acl_filter.permission_type ) elif version <= 1: request = DescribeAclsRequest[version]( - resource_type=acl_filter.resource_pattern.resource_type, - resource_name=acl_filter.resource_pattern.resource_name, + resource_type_filter=acl_filter.resource_pattern.resource_type, + resource_name_filter=acl_filter.resource_pattern.resource_name, resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type, - principal=acl_filter.principal, + principal_filter=acl_filter.principal, host=acl_filter.host, operation=acl_filter.operation, permission_type=acl_filter.permission_type @@ -727,7 +727,7 @@ def _convert_create_acls_response_to_acls(acls, create_response): creations_error = [] creations_success = [] - for i, creations in enumerate(create_response.creation_responses): + for i, creations in enumerate(create_response.results): if version <= 1: error_code, error_message = creations acl = acls[i] @@ -827,7 +827,7 @@ def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response) """ version = delete_response.API_VERSION filter_result_list = [] - for i, filter_responses in enumerate(delete_response.filter_responses): + for i, filter_responses in enumerate(delete_response.filter_results): filter_error_code, filter_error_message, matching_acls = filter_responses filter_error = Errors.for_code(filter_error_code) acl_result_list = [] @@ -1054,8 +1054,8 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal version = self._client.api_version(CreatePartitionsRequest, max_version=1) timeout_ms = self._validate_timeout(timeout_ms) request = CreatePartitionsRequest[version]( - topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], - timeout=timeout_ms, + topics=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()], + timeout_ms=timeout_ms, validate_only=validate_only ) def get_response_errors(r): @@ -1594,7 +1594,7 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_ request = ElectLeadersRequest[version]( election_type=ElectionType(election_type), topic_partitions=self._get_topic_partitions(topic_partitions), - timeout=timeout_ms, + timeout_ms=timeout_ms, ) # TODO convert structs to a more pythonic interface def get_response_errors(r): diff --git a/kafka/cluster.py b/kafka/cluster.py index 98ddd23da..0f37d3936 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -45,7 +45,7 @@ def __init__(self, **configs): self._brokers = {} # node_id -> BrokerMetadata self._partitions = {} # topic -> partition -> PartitionMetadata self._broker_partitions = collections.defaultdict(set) # node_id -> {TopicPartition...} - self._coordinators = {} # (coord_type, coord_key) -> node_id + self._coordinators = {} # (key_type, key) -> node_id self._last_refresh_ms = 0 self._last_successful_refresh_ms = 0 self._need_update = True @@ -369,36 +369,36 @@ def remove_listener(self, listener): """Remove a previously added listener callback""" self._listeners.remove(listener) - def add_coordinator(self, response, coord_type, coord_key): + def add_coordinator(self, response, key_type, key): """Update with metadata for a group or txn coordinator Arguments: response (FindCoordinatorResponse): broker response - coord_type (str): 'group' or 'transaction' - coord_key (str): consumer_group or transactional_id + key_type (str): 'group' or 'transaction' + key (str): consumer_group or transactional_id Returns: string: coordinator node_id if metadata is updated, None on error """ - log.debug("Updating coordinator for %s/%s: %s", coord_type, coord_key, response) + log.debug("Updating coordinator for %s/%s: %s", key_type, key, response) error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: log.error("FindCoordinatorResponse error: %s", error_type) - self._coordinators[(coord_type, coord_key)] = -1 + self._coordinators[(key_type, key)] = -1 return # Use a coordinator-specific node id so that requests # get a dedicated connection - node_id = 'coordinator-{}'.format(response.coordinator_id) + node_id = 'coordinator-{}'.format(response.node_id) coordinator = BrokerMetadata( node_id, response.host, response.port, None) - log.info("Coordinator for %s/%s is %s", coord_type, coord_key, coordinator) + log.info("Coordinator for %s/%s is %s", key_type, key, coordinator) self._coordinator_brokers[node_id] = coordinator - self._coordinators[(coord_type, coord_key)] = node_id + self._coordinators[(key_type, key)] = node_id return node_id def with_partitions(self, partitions_to_add): diff --git a/kafka/conn.py b/kafka/conn.py index 4ffb8e850..ed11e7099 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -578,7 +578,7 @@ def _handle_api_versions_response(self, future, response): future.failure(error_type()) if error_type is Errors.UnsupportedVersionError: self._api_versions_idx -= 1 - for api_version_data in response.api_versions: + for api_version_data in response.api_keys: api_key, min_version, max_version = api_version_data[:3] # If broker provides a lower max_version, skip to that if api_key == response.API_KEY: @@ -593,7 +593,7 @@ def _handle_api_versions_response(self, future, response): return self._api_versions = dict([ (api_version_data[0], (api_version_data[1], api_version_data[2])) - for api_version_data in response.api_versions + for api_version_data in response.api_keys ]) self._api_version = infer_broker_version_from_api_versions(self._api_versions) log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version))) @@ -666,11 +666,11 @@ def _handle_sasl_handshake_response(self, future, response): self.close(error=error) return future.failure(error_type(self)) - if self.config['sasl_mechanism'] not in response.enabled_mechanisms: + if self.config['sasl_mechanism'] not in response.mechanisms: future.failure( Errors.UnsupportedSaslMechanismError( 'Kafka broker does not support %s sasl mechanism. Enabled mechanisms are: %s' - % (self.config['sasl_mechanism'], response.enabled_mechanisms))) + % (self.config['sasl_mechanism'], response.mechanisms))) else: self._sasl_authenticate(future) diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index e413ccd5d..250d20ea6 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -757,14 +757,14 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): return partitions = set([TopicPartition(topic, partition_data[0]) - for topic, partitions in response.topics + for topic, partitions in response.responses for partition_data in partitions]) if self._sensors: metric_aggregator = FetchResponseMetricAggregator(self._sensors, partitions) else: metric_aggregator = None - for topic, partitions in response.topics: + for topic, partitions in response.responses: for partition_data in partitions: tp = TopicPartition(topic, partition_data[0]) fetch_offset = fetch_offsets[tp] @@ -922,7 +922,7 @@ def __init__(self, fetch_offset, tp, records, self.aborted_producer_ids = set() self.aborted_transactions = collections.deque( sorted([AbortedTransaction(*data) for data in aborted_transactions] if aborted_transactions else [], - key=lambda txn: txn.first_offset) + key=lambda txn: txn.first_offset) ) self.metric_aggregator = metric_aggregator self.check_crcs = check_crcs @@ -1206,7 +1206,7 @@ def handle_error(self, _exception): def _response_partitions(self, response): return {TopicPartition(topic, partition_data[0]) - for topic, partitions in response.topics + for topic, partitions in response.responses for partition_data in partitions} diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index af8d12143..f5fd1ad83 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -604,10 +604,10 @@ def _handle_join_group_response(self, future, send_time, response): else: self._generation = Generation(response.generation_id, response.member_id, - response.group_protocol) + response.protocol_name) log.info("Successfully joined group %s %s", self.group_id, self._generation) - if response.leader_id == response.member_id: + if response.leader == response.member_id: log.info("Elected group leader -- performing partition" " assignments using %s", self._generation.protocol) self._on_join_leader(response).chain(future) @@ -697,10 +697,13 @@ def _on_join_leader(self, response): Future: resolves to member assignment encoded-bytes """ try: - members = [GroupMember(*member) if response.API_VERSION >= 5 else GroupMember(member[0], None, member[1]) + members = [GroupMember( + member_id=member[0], + group_instance_id=member[1] if response.API_VERSION >= 5 else None, + metadata=member[2] if response.API_VERSION >= 5 else member[1]) for member in response.members] - group_assignment = self._perform_assignment(response.leader_id, - response.group_protocol, + group_assignment = self._perform_assignment(response.leader, + response.protocol_name, members) except Exception as e: return Future().failure(e) @@ -747,7 +750,7 @@ def _handle_sync_group_response(self, future, send_time, response): if error_type is Errors.NoError: if self._sensors: self._sensors.sync_latency.record((time.monotonic() - send_time) * 1000) - future.success(response.member_assignment) + future.success(response.assignment) return # Always rejoin on error @@ -802,12 +805,12 @@ def _send_group_coordinator_request(self): self.group_id, node_id, request) future = Future() _f = self._client.send(node_id, request) - _f.add_callback(self._handle_group_coordinator_response, future) + _f.add_callback(self._handle_find_coordinator_response, future) _f.add_errback(self._failed_request, node_id, request, future) return future - def _handle_group_coordinator_response(self, future, response): - log.debug("Received group coordinator response %s", response) + def _handle_find_coordinator_response(self, future, response): + log.debug("Received find coordinator response %s", response) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 25da9fe1b..f0f289f26 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -334,7 +334,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): all_subscribed_topics = set() for member in members: subscription = Subscription( - ConsumerProtocol[0].METADATA.decode(member.metadata_bytes), + ConsumerProtocol[0].METADATA.decode(member.metadata), member.group_instance_id ) member_subscriptions[member.member_id] = subscription diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 1463ec8ba..f1484b0ce 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -405,7 +405,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response): batches_by_partition = dict([(batch.topic_partition, batch) for batch in batches]) - for topic, partitions in response.topics: + for topic, partitions in response.responses: for partition_info in partitions: log_append_time = -1 log_start_offset = -1 @@ -609,17 +609,17 @@ def _produce_request(self, node_id, acks, timeout, batches): if version >= 3: return ProduceRequest[version]( transactional_id=transactional_id, - required_acks=acks, - timeout=timeout, - topics=topic_partition_data, + acks=acks, + timeout_ms=timeout, + topic_data=topic_partition_data, ) else: if transactional_id is not None: log.warning('%s: Broker does not support ProduceRequest v3+, required for transactional_id', str(self)) return ProduceRequest[version]( - required_acks=acks, - timeout=timeout, - topics=topic_partition_data, + acks=acks, + timeout_ms=timeout, + topic_data=topic_partition_data, ) def wakeup(self): diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 6c7a5f5b4..6ebf748e9 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -658,10 +658,10 @@ def __init__(self, transaction_manager, topic_partitions): for tp in topic_partitions: topic_data[tp.topic].append(tp.partition) self.request = AddPartitionsToTxnRequest[version]( - transactional_id=self.transactional_id, - producer_id=self.producer_id, - producer_epoch=self.producer_epoch, - topics=list(topic_data.items())) + v3_and_below_transactional_id=self.transactional_id, + v3_and_below_producer_id=self.producer_id, + v3_and_below_producer_epoch=self.producer_epoch, + v3_and_below_topics=list(topic_data.items())) @property def priority(self): @@ -673,7 +673,7 @@ def handle_response(self, response): self.retry_backoff_ms = self.transaction_manager.retry_backoff_ms results = {TopicPartition(topic, partition): Errors.for_code(error_code) - for topic, partition_data in response.results + for topic, partition_data in response.results_by_topic_v3_and_below for partition, error_code in partition_data} for tp, error in results.items(): diff --git a/kafka/protocol/group.py b/kafka/protocol/group.py index 71da0b6fe..4e5905798 100644 --- a/kafka/protocol/group.py +++ b/kafka/protocol/group.py @@ -8,7 +8,7 @@ DEFAULT_GENERATION_ID = -1 UNKNOWN_MEMBER_ID = '' -GroupMember = collections.namedtuple("GroupMember", ["member_id", "group_instance_id", "metadata_bytes"]) +GroupMember = collections.namedtuple("GroupMember", ["member_id", "group_instance_id", "metadata"]) GroupMember.__new__.__defaults__ = (None,) * len(GroupMember._fields) diff --git a/test/integration/fixtures.py b/test/integration/fixtures.py index 1723a3bd5..245bdbfb8 100644 --- a/test/integration/fixtures.py +++ b/test/integration/fixtures.py @@ -614,7 +614,7 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_fa request = CreateTopicsRequest[0]([(topic_name, num_partitions, replication_factor, [], [])], timeout_ms) response = self._send_request(request, timeout=timeout_ms) - for topic_result in response.topic_errors: + for topic_result in response.topics: error_code = topic_result[1] if error_code != 0: raise errors.for_code(error_code) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index d47663465..5a3a83d2b 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -103,9 +103,9 @@ def test_describe_configs_broker_resource_returns_configs(kafka_admin_client): configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) assert len(configs) == 1 - assert configs[0].resources[0][2] == ConfigResourceType.BROKER - assert configs[0].resources[0][3] == str(broker_id) - assert len(configs[0].resources[0][4]) > 1 + assert configs[0].results[0][2] == ConfigResourceType.BROKER + assert configs[0].results[0][3] == str(broker_id) + assert len(configs[0].results[0][4]) > 1 @pytest.mark.xfail(condition=True, @@ -118,9 +118,9 @@ def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_clie configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)]) assert len(configs) == 1 - assert configs[0].resources[0][2] == ConfigResourceType.TOPIC - assert configs[0].resources[0][3] == topic - assert len(configs[0].resources[0][4]) > 1 + assert configs[0].results[0][2] == ConfigResourceType.TOPIC + assert configs[0].results[0][3] == topic + assert len(configs[0].results[0][4]) > 1 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") @@ -135,11 +135,11 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_cli assert len(configs) == 2 for config in configs: - assert (config.resources[0][2] == ConfigResourceType.TOPIC - and config.resources[0][3] == topic) or \ - (config.resources[0][2] == ConfigResourceType.BROKER - and config.resources[0][3] == str(broker_id)) - assert len(config.resources[0][4]) > 1 + assert (config.results[0][2] == ConfigResourceType.TOPIC + and config.results[0][3] == topic) or \ + (config.results[0][2] == ConfigResourceType.BROKER + and config.results[0][3] == str(broker_id)) + assert len(config.results[0][4]) > 1 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") diff --git a/test/protocol/new/test_api_compatibility.py b/test/protocol/new/test_api_compatibility.py index ddebdcb46..1c86d9da4 100644 --- a/test/protocol/new/test_api_compatibility.py +++ b/test/protocol/new/test_api_compatibility.py @@ -44,7 +44,7 @@ # ApiVersionsResponse_v3 (with no header for simplicity in this golden sample) # error_code = 0 -# api_versions = [ +# api_keys = [ # {'api_key': 0, 'min_version': 0, 'max_version': 9, 'tags': {}}, # Produce # {'api_key': 1, 'min_version': 0, 'max_version': 10, 'tags': {}}, # Fetch # ] @@ -52,7 +52,7 @@ # tags = {} # Response Body: # error_code (Int16) = 0 (0x0000) -# api_versions (CompactArray): +# api_keys (CompactArray): # len(2) -> 0x03 (VarInt) # Item 1: # api_key (Int16) = 0 (0x0000) @@ -133,7 +133,7 @@ def test_old_system_encode_decode_response(): # Old system encoding response = ApiVersionsResponse[3]( error_code=0, - api_versions=[ + api_keys=[ #{'api_key': 0, 'min_version': 0, 'max_version': 9}, (0, 0, 9), #{'api_key': 1, 'min_version': 0, 'max_version': 10} @@ -148,17 +148,17 @@ def test_old_system_encode_decode_response(): decoded_response = ApiVersionsResponse[3].decode(BytesIO(encoded_body)) assert decoded_response.error_code == 0 - assert len(decoded_response.api_versions) == 2 - assert decoded_response.api_versions[0][0] == 0 # api_key - assert decoded_response.api_versions[0][1] == 0 # min_version - assert decoded_response.api_versions[0][2] == 9 # max_version - #assert decoded_response.api_versions[0].tags == {} - assert decoded_response.api_versions[1][0] == 1 # api_key - assert decoded_response.api_versions[1][1] == 0 # min_version - assert decoded_response.api_versions[1][2] == 10 # max_version - #assert decoded_response.api_versions[1].tags == {} + assert len(decoded_response.api_keys) == 2 + assert decoded_response.api_keys[0][0] == 0 # api_key + assert decoded_response.api_keys[0][1] == 0 # min_version + assert decoded_response.api_keys[0][2] == 9 # max_version + #assert decoded_response.api_keys[0].tags == {} + assert decoded_response.api_keys[1][0] == 1 # api_key + assert decoded_response.api_keys[1][1] == 0 # min_version + assert decoded_response.api_keys[1][2] == 10 # max_version + #assert decoded_response.api_keys[1].tags == {} assert decoded_response.throttle_time_ms == 0 - #assert decoded_response.tags == {} + assert decoded_response.tags == {} def test_new_system_nested_field_access(): diff --git a/test/protocol/new/test_new_parser.py b/test/protocol/new/test_new_parser.py index f53a96177..4ac034ea9 100644 --- a/test/protocol/new/test_new_parser.py +++ b/test/protocol/new/test_new_parser.py @@ -102,7 +102,7 @@ 'messages': ( { 'request': ( - FindCoordinatorRequest[2](key='test-group-CYErjI'), + FindCoordinatorRequest[2]('test-group-CYErjI', 0), b'\x00\x00\x00/\x00\n\x00\x02\x00\x00\x00\x01\x00\x11consumer_thread-1\x00\x11test-group-CYErjI\x00', ), 'response': ( @@ -113,7 +113,7 @@ }, { 'request': ( - FindCoordinatorRequest[2](key='test-group-CYErjI'), + FindCoordinatorRequest[2]('test-group-CYErjI', 0), b'\x00\x00\x00/\x00\n\x00\x02\x00\x00\x00\x02\x00\x11consumer_thread-1\x00\x11test-group-CYErjI\x00', ), 'response': ( diff --git a/test/protocol/test_api.py b/test/protocol/test_api.py index 60a368d9d..15b10b606 100644 --- a/test/protocol/test_api.py +++ b/test/protocol/test_api.py @@ -4,7 +4,6 @@ from kafka.protocol.api import RequestHeader from kafka.protocol.fetch import FetchRequest -from kafka.protocol.find_coordinator import FindCoordinatorRequest from kafka.protocol.metadata import MetadataRequest @@ -17,8 +16,7 @@ def test_encode_message_header(): b'client3', # ClientId ]) - req = FindCoordinatorRequest[0]('foo') - header = RequestHeader(api_key=req.API_KEY, api_version=req.API_VERSION, correlation_id=4, client_id='client3') + header = RequestHeader(api_key=10, api_version=0, correlation_id=4, client_id='client3') assert header.encode() == expect diff --git a/test/protocol/test_fetch.py b/test/protocol/test_fetch.py index 993df9c89..847986da4 100644 --- a/test/protocol/test_fetch.py +++ b/test/protocol/test_fetch.py @@ -57,8 +57,8 @@ def test_decode_fetch_response_partial(): b'ar', # Value (truncated) ]) resp = FetchResponse[0].decode(io.BytesIO(encoded)) - assert len(resp.topics) == 1 - topic, partitions = resp.topics[0] + assert len(resp.responses) == 1 + topic, partitions = resp.responses[0] assert topic == 'foobar' assert len(partitions) == 2 diff --git a/test/protocol/test_parser.py b/test/protocol/test_parser.py index 7397c5301..cf4909374 100644 --- a/test/protocol/test_parser.py +++ b/test/protocol/test_parser.py @@ -123,37 +123,37 @@ 'messages': ( { 'request': ( - JoinGroupRequest[5](group='test-group-IrXFAX', session_timeout=10000, rebalance_timeout=300000, member_id='', group_instance_id=None, protocol_type='consumer', group_protocols=[('range', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('roundrobin', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00')]), + JoinGroupRequest[5](group_id='test-group-IrXFAX', session_timeout_ms=10000, rebalance_timeout_ms=300000, member_id='', group_instance_id=None, protocol_type='consumer', protocols=[('range', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('roundrobin', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00')]), b"\x00\x00\x00\xa5\x00\x0b\x00\x05\x00\x00\x00\x01\x00\x11consumer_thread-0\x00\x11test-group-IrXFAX\x00\x00'\x10\x00\x04\x93\xe0\x00\x00\xff\xff\x00\x08consumer\x00\x00\x00\x02\x00\x05range\x00\x00\x00!\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00\x00\nroundrobin\x00\x00\x00!\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00", ), 'response': ( b'\x00\x00\x00N\x00\x00\x00\x01\x00\x00\x00\x00\x00O\xff\xff\xff\xff\x00\x00\x00\x00\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\x00\x00\x00\x00', 1, - JoinGroupResponse[5](throttle_time_ms=0, error_code=79, generation_id=-1, group_protocol='', leader_id='', member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', members=[]), + JoinGroupResponse[5](throttle_time_ms=0, error_code=79, generation_id=-1, protocol_name='', leader='', member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', members=[]), ), }, { 'request': ( - JoinGroupRequest[5](group='test-group-IrXFAX', session_timeout=10000, rebalance_timeout=300000, member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', group_instance_id=None, protocol_type='consumer', group_protocols=[('range', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('roundrobin', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00')]), + JoinGroupRequest[5](group_id='test-group-IrXFAX', session_timeout_ms=10000, rebalance_timeout_ms=300000, member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', group_instance_id=None, protocol_type='consumer', protocols=[('range', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('roundrobin', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00')]), b"\x00\x00\x00\xdb\x00\x0b\x00\x05\x00\x00\x00\x02\x00\x11consumer_thread-0\x00\x11test-group-IrXFAX\x00\x00'\x10\x00\x04\x93\xe0\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\xff\xff\x00\x08consumer\x00\x00\x00\x02\x00\x05range\x00\x00\x00!\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00\x00\nroundrobin\x00\x00\x00!\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00", ), 'response': ( b'\x00\x00\x02\x05\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x05range\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\x00\x00\x00\x04\x006consumer_thread-1-a1e534ee-01a9-4c45-9dfd-778bbac550b7\xff\xff\x00\x00\x00!\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00\x006consumer_thread-3-5c966b06-f0da-4ba9-9d89-38e9da865b8c\xff\xff\x00\x00\x00!\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\xff\xff\x00\x00\x00!\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00\x006consumer_thread-2-01a02bdb-5425-4bfd-979f-ddff280cecd8\xff\xff\x00\x00\x00!\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00', 2, - JoinGroupResponse[5](throttle_time_ms=0, error_code=0, generation_id=1, group_protocol='range', leader_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', members=[('consumer_thread-1-a1e534ee-01a9-4c45-9dfd-778bbac550b7', None, b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('consumer_thread-3-5c966b06-f0da-4ba9-9d89-38e9da865b8c', None, b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', None, b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('consumer_thread-2-01a02bdb-5425-4bfd-979f-ddff280cecd8', None, b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00')]), + JoinGroupResponse[5](throttle_time_ms=0, error_code=0, generation_id=1, protocol_name='range', leader='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', members=[('consumer_thread-1-a1e534ee-01a9-4c45-9dfd-778bbac550b7', None, b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('consumer_thread-3-5c966b06-f0da-4ba9-9d89-38e9da865b8c', None, b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', None, b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00'), ('consumer_thread-2-01a02bdb-5425-4bfd-979f-ddff280cecd8', None, b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x00')]), ), }, { 'request': ( - SyncGroupRequest[3](group='test-group-IrXFAX', generation_id=1, member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', group_instance_id=None, group_assignment=[('consumer_thread-1-a1e534ee-01a9-4c45-9dfd-778bbac550b7', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00'), ('consumer_thread-3-5c966b06-f0da-4ba9-9d89-38e9da865b8c', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x00'), ('consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00'), ('consumer_thread-2-01a02bdb-5425-4bfd-979f-ddff280cecd8', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x00')]), + SyncGroupRequest[3](group_id='test-group-IrXFAX', generation_id=1, member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', group_instance_id=None, assignments=[('consumer_thread-1-a1e534ee-01a9-4c45-9dfd-778bbac550b7', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00'), ('consumer_thread-3-5c966b06-f0da-4ba9-9d89-38e9da865b8c', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x00'), ('consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00'), ('consumer_thread-2-01a02bdb-5425-4bfd-979f-ddff280cecd8', b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x00')]), b'\x00\x00\x02\x04\x00\x0e\x00\x03\x00\x00\x00\x03\x00\x11consumer_thread-0\x00\x11test-group-IrXFAX\x00\x00\x00\x01\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\xff\xff\x00\x00\x00\x04\x006consumer_thread-1-a1e534ee-01a9-4c45-9dfd-778bbac550b7\x00\x00\x00)\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x006consumer_thread-3-5c966b06-f0da-4ba9-9d89-38e9da865b8c\x00\x00\x00)\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x00\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\x00\x00\x00)\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x006consumer_thread-2-01a02bdb-5425-4bfd-979f-ddff280cecd8\x00\x00\x00)\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x00', ), 'response': ( b'\x00\x00\x007\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00)\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00', 3, - SyncGroupResponse[3](throttle_time_ms=0, error_code=0, member_assignment=b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00'), + SyncGroupResponse[3](throttle_time_ms=0, error_code=0, assignment=b'\x00\x00\x00\x00\x00\x01\x00\x15test_group_fNFDXXKzKt\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00'), ), }, @@ -171,7 +171,7 @@ { 'request': ( - HeartbeatRequest[3](group='test-group-IrXFAX', generation_id=1, member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', group_instance_id=None), + HeartbeatRequest[3](group_id='test-group-IrXFAX', generation_id=1, member_id='consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', group_instance_id=None), b'\x00\x00\x00l\x00\x0c\x00\x03\x00\x00\x00\x05\x00\x11consumer_thread-0\x00\x11test-group-IrXFAX\x00\x00\x00\x01\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\xff\xff', ), 'response': ( @@ -195,7 +195,7 @@ { 'request': ( - LeaveGroupRequest[3](group='test-group-IrXFAX', members=[('consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', None)]), + LeaveGroupRequest[3](group_id='test-group-IrXFAX', members=[('consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b', None)]), b'\x00\x00\x00l\x00\r\x00\x03\x00\x00\x00\x07\x00\x11consumer_thread-0\x00\x11test-group-IrXFAX\x00\x00\x00\x01\x006consumer_thread-0-b3c8845c-6071-48bc-9cf1-372f84c4409b\xff\xff', ), 'response': (