From 2d6aec1ade941d38ca98224381ab7556004c8342 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 09:09:26 -0700 Subject: [PATCH 1/4] kafka.coordinator -> new FindCoordinatorRequest/Response --- kafka/coordinator/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index f5fd1ad83..c946603e6 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -11,7 +11,7 @@ from kafka.future import Future from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.find_coordinator import FindCoordinatorRequest +from kafka.protocol.new.metadata import FindCoordinatorRequest from kafka.protocol.group import ( HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, GroupMember, From c4a1d777db39c16e50a752675a56eab23ddc21f7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 19 Mar 2026 11:43:54 -0700 Subject: [PATCH 2/4] kafka.coordinator -> new group apis --- kafka/coordinator/base.py | 13 ++++--------- kafka/coordinator/consumer.py | 2 +- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index c946603e6..f87e31618 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -12,9 +12,9 @@ from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.protocol.new.metadata import FindCoordinatorRequest -from kafka.protocol.group import ( +from kafka.protocol.new.consumer import ( HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest, - DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, GroupMember, + DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, ) from kafka.util import Timer @@ -215,7 +215,7 @@ def _perform_assignment(self, leader_id, protocol, members): Arguments: leader_id (str): The id of the leader (which is this member) protocol (str): the chosen group protocol (assignment strategy) - members (list): [GroupMember] from JoinGroupResponse. + members (list): [JoinGroupResponseMember] from JoinGroupResponse. metadata_bytes are associated with the chosen group protocol, and the Coordinator subclass is responsible for decoding metadata_bytes based on that protocol. @@ -697,14 +697,9 @@ def _on_join_leader(self, response): Future: resolves to member assignment encoded-bytes """ try: - members = [GroupMember( - member_id=member[0], - group_instance_id=member[1] if response.API_VERSION >= 5 else None, - metadata=member[2] if response.API_VERSION >= 5 else member[1]) - for member in response.members] group_assignment = self._perform_assignment(response.leader, response.protocol_name, - members) + response.members) except Exception as e: return Future().failure(e) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index a9335a1a5..d87f26c7e 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -14,7 +14,7 @@ from kafka.future import Future from kafka.metrics import AnonMeasurable from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest +from kafka.protocol.new.consumer import OffsetCommitRequest, OffsetFetchRequest from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import Timer, WeakMethod From 42c8091fc84451846a92d1023c69b6f1940ab365 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 21 Mar 2026 12:03:37 -0700 Subject: [PATCH 3/4] use new Offset and Metadata protocol defs in test_coordinator --- test/test_coordinator.py | 48 ++++++++++++++++++++-------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 3af922696..4422a9249 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -4,8 +4,7 @@ import pytest from kafka.client_async import KafkaClient -from kafka.consumer.subscription_state import ( - SubscriptionState, ConsumerRebalanceListener) +from kafka.consumer.subscription_state import SubscriptionState, ConsumerRebalanceListener from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor @@ -17,11 +16,12 @@ import kafka.errors as Errors from kafka.future import Future from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS -from kafka.protocol.commit import ( +from kafka.protocol.new.consumer import ( OffsetCommitRequest, OffsetCommitResponse, - OffsetFetchRequest, OffsetFetchResponse) + OffsetFetchRequest, OffsetFetchResponse, +) from kafka.protocol.group import GroupMember -from kafka.protocol.metadata import MetadataResponse +from kafka.protocol.new.metadata import MetadataResponse from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod @@ -444,23 +444,23 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets): assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) -@pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetCommitRequest[0]), - ((0, 8, 2), OffsetCommitRequest[1]), - ((0, 9), OffsetCommitRequest[2]), - ((0, 11), OffsetCommitRequest[3]), - ((2, 0), OffsetCommitRequest[4]), - ((2, 1), OffsetCommitRequest[6]), +@pytest.mark.parametrize('api_version,version', [ + ((0, 8, 1), 0), + ((0, 8, 2), 1), + ((0, 9), 2), + ((0, 11), 3), + ((2, 0), 4), + ((2, 1), 6), ]) def test_send_offset_commit_request_versions(patched_coord, offsets, - api_version, req_type): + api_version, version): expect_node = 0 patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version] patched_coord._send_offset_commit_request(offsets) (node, request), _ = patched_coord._client.send.call_args assert node == expect_node, 'Unexpected coordinator node' - assert isinstance(request, req_type) + assert request.API_VERSION == version def test_send_offset_commit_request_failure(patched_coord, offsets): @@ -557,17 +557,17 @@ def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions): assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError) -@pytest.mark.parametrize('api_version,req_type', [ - ((0, 8, 1), OffsetFetchRequest[0]), - ((0, 8, 2), OffsetFetchRequest[1]), - ((0, 9), OffsetFetchRequest[1]), - ((0, 10, 2), OffsetFetchRequest[2]), - ((0, 11), OffsetFetchRequest[3]), - ((2, 0), OffsetFetchRequest[4]), - ((2, 1), OffsetFetchRequest[5]), +@pytest.mark.parametrize('api_version,version', [ + ((0, 8, 1), 0), + ((0, 8, 2), 1), + ((0, 9), 1), + ((0, 10, 2), 2), + ((0, 11), 3), + ((2, 0), 4), + ((2, 1), 5), ]) def test_send_offset_fetch_request_versions(patched_coord, partitions, - api_version, req_type): + api_version, version): # assuming fixture sets coordinator=0, least_loaded_node=1 expect_node = 0 patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version] @@ -575,7 +575,7 @@ def test_send_offset_fetch_request_versions(patched_coord, partitions, patched_coord._send_offset_fetch_request(partitions) (node, request), _ = patched_coord._client.send.call_args assert node == expect_node, 'Unexpected coordinator node' - assert isinstance(request, req_type) + assert request.API_VERSION == version def test_send_offset_fetch_request_failure(patched_coord, partitions): From a2a4cd0eb6d241ed5ed00cf5aaf645c91384f16e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 21:53:19 -0700 Subject: [PATCH 4/4] test_coordinator --- test/test_coordinator.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 4422a9249..4d56c1e8b 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -19,8 +19,8 @@ from kafka.protocol.new.consumer import ( OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, + JoinGroupResponse, ) -from kafka.protocol.group import GroupMember from kafka.protocol.new.metadata import MetadataResponse from kafka.structs import OffsetAndMetadata, TopicPartition from kafka.util import WeakMethod @@ -207,8 +207,10 @@ def test_perform_assignment(mocker, coordinator): ret = coordinator._perform_assignment( 'member-foo', 'roundrobin', - [GroupMember(member, None, subscription.encode()) - for member, subscription in group_subscriptions.items()]) + [JoinGroupResponse.JoinGroupResponseMember( + member_id=member_id, + metadata=subscription.encode(), + ) for member_id, subscription in group_subscriptions.items()]) assert RoundRobinPartitionAssignor.assign.call_count == 1 RoundRobinPartitionAssignor.assign.assert_called_with( @@ -302,7 +304,7 @@ def test_close(mocker, coordinator): assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1 coordinator._handle_leave_group_response.assert_called_with('foobar') - assert coordinator.generation() is None + assert coordinator.generation_if_stable() is None assert coordinator._generation == Generation.NO_GENERATION assert coordinator.state is MemberState.UNJOINED assert coordinator.rejoin_needed is True