diff --git a/kafka/coordinator/assignors/abstract.py b/kafka/coordinator/assignors/abstract.py index 04d84d285..a14f11ee4 100644 --- a/kafka/coordinator/assignors/abstract.py +++ b/kafka/coordinator/assignors/abstract.py @@ -1,7 +1,8 @@ import abc -import logging -log = logging.getLogger(__name__) +from kafka.protocol.new.consumer.metadata import ( + ConsumerProtocolSubscription, ConsumerProtocolAssignment, +) class AbstractPartitionAssignor(object): @@ -26,7 +27,7 @@ def assign(self, cluster, members): when available. Returns: - dict: {member_id: MemberAssignment} + dict: {member_id: ConsumerProtocolAssignment} """ pass @@ -38,7 +39,7 @@ def metadata(self, topics): topics (set): a member's subscribed topics Returns: - MemberMetadata struct + ConsumerProtocolSubscription """ pass @@ -50,7 +51,7 @@ def on_assignment(self, assignment, generation): partition assignor. Arguments: - assignment (MemberAssignment): the member's assignment + assignment (ConsumerProtocolAssignment): the member's assignment generation (int): generation id of assignment """ pass diff --git a/kafka/coordinator/assignors/range.py b/kafka/coordinator/assignors/range.py index dc45f8a9a..7b1cad21e 100644 --- a/kafka/coordinator/assignors/range.py +++ b/kafka/coordinator/assignors/range.py @@ -2,8 +2,11 @@ import itertools import logging -from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0 +from kafka.coordinator.assignors.abstract import ( + AbstractPartitionAssignor, + ConsumerProtocolSubscription, + ConsumerProtocolAssignment, +) log = logging.getLogger(__name__) @@ -63,7 +66,7 @@ def assign(cls, cluster, group_subscriptions): protocol_assignment = {} for member_id in group_subscriptions: - protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0( + protocol_assignment[member_id] = ConsumerProtocolAssignment( cls.version, sorted(assignment[member_id].items()), b'') @@ -71,7 +74,7 @@ def assign(cls, cluster, group_subscriptions): @classmethod def metadata(cls, topics): - return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'') + return ConsumerProtocolSubscription(cls.version, list(topics), b'') @classmethod def on_assignment(cls, assignment, generation): diff --git a/kafka/coordinator/assignors/roundrobin.py b/kafka/coordinator/assignors/roundrobin.py index 83d98ee58..279001cf2 100644 --- a/kafka/coordinator/assignors/roundrobin.py +++ b/kafka/coordinator/assignors/roundrobin.py @@ -2,8 +2,11 @@ import itertools import logging -from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0 +from kafka.coordinator.assignors.abstract import ( + AbstractPartitionAssignor, + ConsumerProtocolSubscription, + ConsumerProtocolAssignment, +) from kafka.structs import TopicPartition log = logging.getLogger(__name__) @@ -82,7 +85,7 @@ def assign(cls, cluster, group_subscriptions): protocol_assignment = {} for member_id in group_subscriptions: - protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0( + protocol_assignment[member_id] = ConsumerProtocolAssignment( cls.version, sorted(assignment[member_id].items()), b'') @@ -90,7 +93,7 @@ def assign(cls, cluster, group_subscriptions): @classmethod def metadata(cls, topics): - return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'') + return ConsumerProtocolSubscription(cls.version, list(topics), b'') @classmethod def on_assignment(cls, assignment, generation): diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 158a21da7..2fd2af964 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -2,12 +2,14 @@ from collections import defaultdict, namedtuple from copy import deepcopy -from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor -from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements -from kafka.coordinator.assignors.sticky.sorted_set import SortedSet -from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0 -from kafka.protocol.struct import Struct -from kafka.protocol.types import Array, Int32, Schema, String +from ..abstract import AbstractPartitionAssignor +from .partition_movements import PartitionMovements +from .sorted_set import SortedSet +from .user_data import StickyAssignorUserData +from kafka.protocol.new.consumer.metadata import ( + ConsumerProtocolSubscription, + ConsumerProtocolAssignment, +) from kafka.structs import TopicPartition log = logging.getLogger(__name__) @@ -51,20 +53,6 @@ def remove_if_present(collection, element): ["subscription", "partitions", "generation"]) -class StickyAssignorUserDataV1(Struct): - """ - Used for preserving consumer's previously assigned partitions - list and sending it as user data to the leader during a rebalance - """ - - SCHEMA = Schema( - ("previous_assignment", Array( - ("topic", String("utf-8")), - ("partitions", Array(Int32)))), - ("generation", Int32) - ) - - class StickyAssignmentExecutor: def __init__(self, cluster, members): # a mapping of member_id => StickyAssignorMemberMetadataV1 @@ -605,7 +593,7 @@ def assign(cls, cluster, members): assignment = {} for member_id in members: - assignment[member_id] = ConsumerProtocolMemberAssignment_v0( + assignment[member_id] = ConsumerProtocolAssignment( cls.version, sorted(executor.get_final_assignment(member_id)), b'' ) return assignment @@ -631,7 +619,7 @@ def parse_member_metadata(cls, metadata): ) try: - decoded_user_data = StickyAssignorUserDataV1.decode(user_data) + decoded_user_data = StickyAssignorUserData.decode(user_data) except Exception: # ignore the consumer's previous assignment if it cannot be parsed log.exception("Could not parse member data") @@ -661,9 +649,9 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1): partitions_by_topic = defaultdict(list) for topic_partition in member_assignment_partitions: partitions_by_topic[topic_partition.topic].append(topic_partition.partition) - data = StickyAssignorUserDataV1(list(partitions_by_topic.items()), generation) + data = StickyAssignorUserData(list(partitions_by_topic.items()), generation) user_data = data.encode() - return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data) + return ConsumerProtocolSubscription(cls.version, list(topics), user_data) @classmethod def on_assignment(cls, assignment, generation): diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index d87f26c7e..be3975d24 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -8,7 +8,9 @@ from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocol +from kafka.protocol.new.consumer.metadata import ( + ConsumerProtocolType, ConsumerProtocolSubscription, ConsumerProtocolAssignment, +) from kafka.coordinator.subscription import Subscription import kafka.errors as Errors from kafka.future import Future @@ -147,7 +149,7 @@ def __del__(self): super(ConsumerCoordinator, self).__del__() def protocol_type(self): - return ConsumerProtocol[0].PROTOCOL_TYPE + return ConsumerProtocolType def group_protocols(self): """Returns list of preferred (protocols, metadata)""" @@ -235,7 +237,7 @@ def _on_join_complete(self, generation, member_id, protocol, assignor = self._lookup_assignor(protocol) assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,) - assignment = ConsumerProtocol[0].ASSIGNMENT.decode(member_assignment_bytes) + assignment = ConsumerProtocolAssignment.decode(member_assignment_bytes) try: self._subscription.assign_from_subscribed(assignment.partitions()) @@ -332,7 +334,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members): all_subscribed_topics = set() for member in members: subscription = Subscription( - ConsumerProtocol[0].METADATA.decode(member.metadata), + ConsumerProtocolSubscription.decode(member.metadata), member.group_instance_id ) member_subscriptions[member.member_id] = subscription diff --git a/test/test_assignors.py b/test/test_assignors.py index c04d6afb6..189885a84 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -6,10 +6,10 @@ import pytest from kafka.structs import TopicPartition +from kafka.coordinator.assignors.abstract import ConsumerProtocolAssignment from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor -from kafka.coordinator.protocol import ConsumerProtocolMemberAssignment_v0 from kafka.coordinator.subscription import Subscription @@ -41,9 +41,9 @@ def test_assignor_roundrobin(mocker): cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2}) ret = assignor.assign(cluster, group_subscriptions) expected = { - 'C0': ConsumerProtocolMemberAssignment_v0( + 'C0': ConsumerProtocolAssignment( assignor.version, [('t0', [0, 2]), ('t1', [1])], b''), - 'C1': ConsumerProtocolMemberAssignment_v0( + 'C1': ConsumerProtocolAssignment( assignor.version, [('t0', [1]), ('t1', [0, 2])], b'') } assert ret == expected @@ -63,9 +63,9 @@ def test_assignor_range(mocker): cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2}) ret = assignor.assign(cluster, group_subscriptions) expected = { - 'C0': ConsumerProtocolMemberAssignment_v0( + 'C0': ConsumerProtocolAssignment( assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''), - 'C1': ConsumerProtocolMemberAssignment_v0( + 'C1': ConsumerProtocolAssignment( assignor.version, [('t0', [2]), ('t1', [2])], b'') } assert ret == expected @@ -101,9 +101,9 @@ def test_sticky_assignor1(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C0': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''), - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''), + 'C0': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -114,10 +114,10 @@ def test_sticky_assignor1(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C0': ConsumerProtocolMemberAssignment_v0( + 'C0': ConsumerProtocolAssignment( StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b'' ), - 'C2': ConsumerProtocolMemberAssignment_v0( + 'C2': ConsumerProtocolAssignment( StickyPartitionAssignor.version, [('t0', [1]), ('t1', [0]), ('t2', [1]), ('t3', [1])], b'' ), } @@ -157,9 +157,9 @@ def test_sticky_assignor2(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C0': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t0', [0])], b''), - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 1])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), + 'C0': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [0])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 1])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -170,8 +170,8 @@ def test_sticky_assignor2(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -186,7 +186,7 @@ def test_sticky_one_consumer_no_topic(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -201,7 +201,7 @@ def test_sticky_one_consumer_nonexisting_topic(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -216,7 +216,7 @@ def test_sticky_one_consumer_one_topic(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -231,7 +231,7 @@ def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -246,7 +246,7 @@ def test_sticky_one_consumer_multiple_topics(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''), + 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -262,8 +262,8 @@ def test_sticky_two_consumers_one_topic_one_partition(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -279,8 +279,8 @@ def test_sticky_two_consumers_one_topic_two_partitions(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [1])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -298,9 +298,9 @@ def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [0, 1])], b''), - 'C3': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [1])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 1])], b''), + 'C3': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -315,7 +315,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(assignment, expected_assignment) @@ -355,8 +355,8 @@ def test_sticky_add_remove_topic_two_consumers(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [1])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -370,8 +370,8 @@ def test_sticky_add_remove_topic_two_consumers(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -385,8 +385,8 @@ def test_sticky_add_remove_topic_two_consumers(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [1])], b''), - 'C2': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t2', [0, 2])], b''), + 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [1])], b''), + 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -600,7 +600,7 @@ def topic_partitions(topic): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''), + 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -615,7 +615,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -629,7 +629,7 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): cluster = create_cluster(mocker, topics={}, topics_partitions={}) sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolMemberAssignment_v0(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 4d56c1e8b..54d300c34 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -5,13 +5,14 @@ from kafka.client_async import KafkaClient from kafka.consumer.subscription_state import SubscriptionState, ConsumerRebalanceListener +from kafka.coordinator.assignors.abstract import ( + ConsumerProtocolSubscription, ConsumerProtocolAssignment, +) from kafka.coordinator.assignors.range import RangePartitionAssignor from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor from kafka.coordinator.base import Generation, MemberState, HeartbeatThread from kafka.coordinator.consumer import ConsumerCoordinator -from kafka.coordinator.protocol import ( - ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0) from kafka.coordinator.subscription import Subscription import kafka.errors as Errors from kafka.future import Future @@ -74,15 +75,15 @@ def test_group_protocols(coordinator): coordinator._subscription.subscribe(topics=['foobar']) assert coordinator.group_protocols() == [ - ('range', ConsumerProtocolMemberMetadata_v0( + ('range', ConsumerProtocolSubscription( RangePartitionAssignor.version, ['foobar'], b'')), - ('roundrobin', ConsumerProtocolMemberMetadata_v0( + ('roundrobin', ConsumerProtocolSubscription( RoundRobinPartitionAssignor.version, ['foobar'], b'')), - ('sticky', ConsumerProtocolMemberMetadata_v0( + ('sticky', ConsumerProtocolSubscription( StickyPartitionAssignor.version, ['foobar'], b'')), @@ -135,7 +136,7 @@ def test_join_complete(mocker, coordinator): coordinator.config['assignors'] = (assignor,) mocker.spy(assignor, 'on_assignment') assert assignor.on_assignment.call_count == 0 - assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'') + assignment = ConsumerProtocolAssignment(0, [('foobar', [0, 1])], b'') generation = 12 coordinator._on_join_complete(generation, 'member-foo', 'roundrobin', assignment.encode()) assert assignor.on_assignment.call_count == 1 @@ -149,7 +150,7 @@ def test_join_complete_with_sticky_assignor(mocker, coordinator): mocker.spy(assignor, 'on_assignment') assert assignor.on_assignment.call_count == 0 generation = 3 - assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'') + assignment = ConsumerProtocolAssignment(0, [('foobar', [0, 1])], b'') coordinator._on_join_complete(generation, 'member-foo', 'sticky', assignment.encode()) assert assignor.on_assignment.call_count == 1 assignor.on_assignment.assert_called_with(assignment, generation) @@ -165,7 +166,7 @@ def test_subscription_listener(mocker, coordinator): assert listener.on_partitions_revoked.call_count == 1 listener.on_partitions_revoked.assert_called_with(set([])) - assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'') + assignment = ConsumerProtocolAssignment(0, [('foobar', [0, 1])], b'') coordinator._on_join_complete( 0, 'member-foo', 'roundrobin', assignment.encode()) assert listener.on_partitions_assigned.call_count == 1 @@ -183,7 +184,7 @@ def test_subscription_listener_failure(mocker, coordinator): coordinator._on_join_prepare(0, 'member-foo') assert listener.on_partitions_revoked.call_count == 1 - assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'') + assignment = ConsumerProtocolAssignment(0, [('foobar', [0, 1])], b'') coordinator._on_join_complete( 0, 'member-foo', 'roundrobin', assignment.encode()) assert listener.on_partitions_assigned.call_count == 1 @@ -192,13 +193,13 @@ def test_subscription_listener_failure(mocker, coordinator): def test_perform_assignment(mocker, coordinator): coordinator._subscription.subscribe(topics=['foo1']) group_subscriptions = { - 'member-foo': Subscription(ConsumerProtocolMemberMetadata_v0(0, ['foo1'], b''), None), - 'member-bar': Subscription(ConsumerProtocolMemberMetadata_v0(0, ['foo1'], b''), None), + 'member-foo': Subscription(ConsumerProtocolSubscription(0, ['foo1'], b''), None), + 'member-bar': Subscription(ConsumerProtocolSubscription(0, ['foo1'], b''), None), } assignments = { - 'member-foo': ConsumerProtocolMemberAssignment_v0( + 'member-foo': ConsumerProtocolAssignment( 0, [('foo1', [0])], b''), - 'member-bar': ConsumerProtocolMemberAssignment_v0( + 'member-bar': ConsumerProtocolAssignment( 0, [('foo1', [1])], b'') }