diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index f5fd1ad83..f87e31618 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -11,10 +11,10 @@ from kafka.future import Future from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.find_coordinator import FindCoordinatorRequest -from kafka.protocol.group import ( +from kafka.protocol.new.metadata import FindCoordinatorRequest +from kafka.protocol.new.consumer import ( HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, - DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, GroupMember, + DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, ) from kafka.util import Timer @@ -215,7 +215,7 @@ def _perform_assignment(self, leader_id, protocol, members): Arguments: leader_id (str): The id of the leader (which is this member) protocol (str): the chosen group protocol (assignment strategy) - members (list): [GroupMember] from JoinGroupResponse. + members (list): [JoinGroupResponseMember] from JoinGroupResponse. metadata_bytes are associated with the chosen group protocol, and the Coordinator subclass is responsible for decoding metadata_bytes based on that protocol. @@ -697,14 +697,9 @@ def _on_join_leader(self, response): Future: resolves to member assignment encoded-bytes """ try: - members = [GroupMember( - member_id=member[0], - group_instance_id=member[1] if response.API_VERSION >= 5 else None, - metadata=member[2] if response.API_VERSION >= 5 else member[1]) - for member in response.members] group_assignment = self._perform_assignment(response.leader, response.protocol_name, - members) + response.members) except Exception as e: return Future().failure(e) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a9335a1a5..d87f26c7e 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -14,7 +14,7 @@ from kafka.future import Future from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest +from kafka.protocol.new.consumer import OffsetCommitRequest, OffsetFetchRequest from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import Timer, WeakMethod diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 3af922696..4d56c1e8b 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -4,8 +4,7 @@ import pytest from kafka.client_async import KafkaClient -from kafka.consumer.subscription_state import ( - SubscriptionState, ConsumerRebalanceListener) +from kafka.consumer.subscription_state import SubscriptionState, ConsumerRebalanceListener from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor @@ -17,11 +16,12 @@ import kafka.errors as Errors from kafka.future import Future from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS -from kafka.protocol.commit import ( +from kafka.protocol.new.consumer import ( OffsetCommitRequest, OffsetCommitResponse, - OffsetFetchRequest, OffsetFetchResponse) -from kafka.protocol.group import GroupMember -from kafka.protocol.metadata import MetadataResponse + OffsetFetchRequest, OffsetFetchResponse, + JoinGroupResponse, +) +from kafka.protocol.new.metadata import MetadataResponse from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod @@ -207,8 +207,10 @@ def test_perform_assignment(mocker, coordinator): ret = coordinator._perform_assignment( 'member-foo', 'roundrobin', - [GroupMember(member, None, subscription.encode()) - for member, subscription in group_subscriptions.items()]) + [JoinGroupResponse.JoinGroupResponseMember( + member_id=member_id, + metadata=subscription.encode(), + ) for member_id, subscription in group_subscriptions.items()]) assert RoundRobinPartitionAssignor.assign.call_count == 1 RoundRobinPartitionAssignor.assign.assert_called_with( @@ -302,7 +304,7 @@ def test_close(mocker, coordinator): assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1 coordinator._handle_leave_group_response.assert_called_with('foobar') - assert coordinator.generation() is None + assert coordinator.generation_if_stable() is None assert coordinator._generation == Generation.NO_GENERATION assert coordinator.state is MemberState.UNJOINED assert coordinator.rejoin_needed is True @@ -444,23 +446,23 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) -@pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetCommitRequest[0]), - ((0, 8, 2), OffsetCommitRequest[1]), - ((0, 9), OffsetCommitRequest[2]), - ((0, 11), OffsetCommitRequest[3]), - ((2, 0), OffsetCommitRequest[4]), - ((2, 1), OffsetCommitRequest[6]), +@pytest.mark.parametrize('api_version,version', [ + ((0, 8, 1), 0), + ((0, 8, 2), 1), + ((0, 9), 2), + ((0, 11), 3), + ((2, 0), 4), + ((2, 1), 6), ]) def test_send_offset_commit_request_versions(patched_coord, offsets, - api_version, req_type): + api_version, version): expect_node = 0 patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version] patched_coord._send_offset_commit_request(offsets) (node, request), _ = patched_coord._client.send.call_args assert node == expect_node, 'Unexpected coordinator node' - assert isinstance(request, req_type) + assert request.API_VERSION == version def test_send_offset_commit_request_failure(patched_coord, offsets): @@ -557,17 +559,17 @@ def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions): assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) -@pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetFetchRequest[0]), - ((0, 8, 2), OffsetFetchRequest[1]), - ((0, 9), OffsetFetchRequest[1]), - ((0, 10, 2), OffsetFetchRequest[2]), - ((0, 11), OffsetFetchRequest[3]), - ((2, 0), OffsetFetchRequest[4]), - ((2, 1), OffsetFetchRequest[5]), +@pytest.mark.parametrize('api_version,version', [ + ((0, 8, 1), 0), + ((0, 8, 2), 1), + ((0, 9), 1), + ((0, 10, 2), 2), + ((0, 11), 3), + ((2, 0), 4), + ((2, 1), 5), ]) def test_send_offset_fetch_request_versions(patched_coord, partitions, - api_version, req_type): + api_version, version): # assuming fixture sets coordinator=0, least_loaded_node=1 expect_node = 0 patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version] @@ -575,7 +577,7 @@ def test_send_offset_fetch_request_versions(patched_coord, partitions, patched_coord._send_offset_fetch_request(partitions) (node, request), _ = patched_coord._client.send.call_args assert node == expect_node, 'Unexpected coordinator node' - assert isinstance(request, req_type) + assert request.API_VERSION == version def test_send_offset_fetch_request_failure(patched_coord, partitions):