Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 5 additions & 10 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
from kafka.future import Future
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.find_coordinator import FindCoordinatorRequest
from kafka.protocol.group import (
from kafka.protocol.new.metadata import FindCoordinatorRequest
from kafka.protocol.new.consumer import (
HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, SyncGroupRequest,
DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, GroupMember,
DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID,
)
from kafka.util import Timer

Expand Down Expand Up @@ -215,7 +215,7 @@ def _perform_assignment(self, leader_id, protocol, members):
Arguments:
leader_id (str): The id of the leader (which is this member)
protocol (str): the chosen group protocol (assignment strategy)
members (list): [GroupMember] from JoinGroupResponse.
members (list): [JoinGroupResponseMember] from JoinGroupResponse.
metadata_bytes are associated with the chosen group protocol,
and the Coordinator subclass is responsible for decoding
metadata_bytes based on that protocol.
Expand Down Expand Up @@ -697,14 +697,9 @@ def _on_join_leader(self, response):
Future: resolves to member assignment encoded-bytes
"""
try:
members = [GroupMember(
member_id=member[0],
group_instance_id=member[1] if response.API_VERSION >= 5 else None,
metadata=member[2] if response.API_VERSION >= 5 else member[1])
for member in response.members]
group_assignment = self._perform_assignment(response.leader,
response.protocol_name,
members)
response.members)
except Exception as e:
return Future().failure(e)

Expand Down
2 changes: 1 addition & 1 deletion kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from kafka.future import Future
from kafka.metrics import AnonMeasurable
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.commit import OffsetCommitRequest, OffsetFetchRequest
from kafka.protocol.new.consumer import OffsetCommitRequest, OffsetFetchRequest
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import Timer, WeakMethod

Expand Down
58 changes: 30 additions & 28 deletions test/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import pytest

from kafka.client_async import KafkaClient
from kafka.consumer.subscription_state import (
SubscriptionState, ConsumerRebalanceListener)
from kafka.consumer.subscription_state import SubscriptionState, ConsumerRebalanceListener
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
Expand All @@ -17,11 +16,12 @@
import kafka.errors as Errors
from kafka.future import Future
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
from kafka.protocol.commit import (
from kafka.protocol.new.consumer import (
OffsetCommitRequest, OffsetCommitResponse,
OffsetFetchRequest, OffsetFetchResponse)
from kafka.protocol.group import GroupMember
from kafka.protocol.metadata import MetadataResponse
OffsetFetchRequest, OffsetFetchResponse,
JoinGroupResponse,
)
from kafka.protocol.new.metadata import MetadataResponse
from kafka.structs import OffsetAndMetadata, TopicPartition
from kafka.util import WeakMethod

Expand Down Expand Up @@ -207,8 +207,10 @@ def test_perform_assignment(mocker, coordinator):

ret = coordinator._perform_assignment(
'member-foo', 'roundrobin',
[GroupMember(member, None, subscription.encode())
for member, subscription in group_subscriptions.items()])
[JoinGroupResponse.JoinGroupResponseMember(
member_id=member_id,
metadata=subscription.encode(),
) for member_id, subscription in group_subscriptions.items()])

assert RoundRobinPartitionAssignor.assign.call_count == 1
RoundRobinPartitionAssignor.assign.assert_called_with(
Expand Down Expand Up @@ -302,7 +304,7 @@ def test_close(mocker, coordinator):
assert coordinator._maybe_auto_commit_offsets_sync.call_count == 1
coordinator._handle_leave_group_response.assert_called_with('foobar')

assert coordinator.generation() is None
assert coordinator.generation_if_stable() is None
assert coordinator._generation == Generation.NO_GENERATION
assert coordinator.state is MemberState.UNJOINED
assert coordinator.rejoin_needed is True
Expand Down Expand Up @@ -444,23 +446,23 @@ def test_send_offset_commit_request_fail(mocker, patched_coord, offsets):
assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError)


@pytest.mark.parametrize('api_version,req_type', [
((0, 8, 1), OffsetCommitRequest[0]),
((0, 8, 2), OffsetCommitRequest[1]),
((0, 9), OffsetCommitRequest[2]),
((0, 11), OffsetCommitRequest[3]),
((2, 0), OffsetCommitRequest[4]),
((2, 1), OffsetCommitRequest[6]),
@pytest.mark.parametrize('api_version,version', [
((0, 8, 1), 0),
((0, 8, 2), 1),
((0, 9), 2),
((0, 11), 3),
((2, 0), 4),
((2, 1), 6),
])
def test_send_offset_commit_request_versions(patched_coord, offsets,
api_version, req_type):
api_version, version):
expect_node = 0
patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version]

patched_coord._send_offset_commit_request(offsets)
(node, request), _ = patched_coord._client.send.call_args
assert node == expect_node, 'Unexpected coordinator node'
assert isinstance(request, req_type)
assert request.API_VERSION == version


def test_send_offset_commit_request_failure(patched_coord, offsets):
Expand Down Expand Up @@ -557,25 +559,25 @@ def test_send_offset_fetch_request_fail(mocker, patched_coord, partitions):
assert isinstance(ret.exception, Errors.CoordinatorNotAvailableError)


@pytest.mark.parametrize('api_version,req_type', [
((0, 8, 1), OffsetFetchRequest[0]),
((0, 8, 2), OffsetFetchRequest[1]),
((0, 9), OffsetFetchRequest[1]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 11), OffsetFetchRequest[3]),
((2, 0), OffsetFetchRequest[4]),
((2, 1), OffsetFetchRequest[5]),
@pytest.mark.parametrize('api_version,version', [
((0, 8, 1), 0),
((0, 8, 2), 1),
((0, 9), 1),
((0, 10, 2), 2),
((0, 11), 3),
((2, 0), 4),
((2, 1), 5),
])
def test_send_offset_fetch_request_versions(patched_coord, partitions,
api_version, req_type):
api_version, version):
# assuming fixture sets coordinator=0, least_loaded_node=1
expect_node = 0
patched_coord._client._api_versions = BROKER_API_VERSIONS[api_version]

patched_coord._send_offset_fetch_request(partitions)
(node, request), _ = patched_coord._client.send.call_args
assert node == expect_node, 'Unexpected coordinator node'
assert isinstance(request, req_type)
assert request.API_VERSION == version


def test_send_offset_fetch_request_failure(patched_coord, partitions):
Expand Down
Loading