From 0153569547af11c3f73b860638a4974c83916d0e Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 22 Mar 2026 10:31:22 -0700 Subject: [PATCH 1/3] Use assignor instances, not classes --- .../assignors/sticky/sticky_assignor.py | 33 +- kafka/coordinator/consumer.py | 21 +- test/test_assignors.py | 300 +++++++++--------- 3 files changed, 175 insertions(+), 179 deletions(-) diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index d270dcd94..97903ce93 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -565,13 +565,12 @@ class StickyPartitionAssignor(AbstractPartitionAssignor): name = "sticky" version = 0 - member_assignment = None - generation = DEFAULT_GENERATION_ID + def __init__(self): + self.member_assignment = None + self.generation = self.DEFAULT_GENERATION_ID + self._latest_partition_movements = None - _latest_partition_movements = None - - @classmethod - def assign(cls, cluster, members): + def assign(self, cluster, members): """Performs group assignment given cluster metadata and member subscriptions Arguments: @@ -582,23 +581,23 @@ def assign(cls, cluster, members): dict: {member_id: ConsumerProtocolAssignment} """ members_metadata = { - member.member_id: cls.parse_member_metadata(member.metadata) + member.member_id: self.parse_member_metadata(member.metadata) for member in members } executor = StickyAssignmentExecutor(cluster, members_metadata) executor.perform_initial_assignment() executor.balance() - cls._latest_partition_movements = executor.partition_movements + # store for tests + self._latest_partition_movements = executor.partition_movements assignment = { member.member_id: ConsumerProtocolAssignment( - cls.version, sorted(executor.get_final_assignment(member.member_id)), b'') + self.version, sorted(executor.get_final_assignment(member.member_id)), b'') for member in members } return assignment - @classmethod def parse_member_metadata(cls, metadata): """ Parses member metadata into a python object. @@ -635,9 +634,8 @@ def parse_member_metadata(cls, metadata): partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.topics ) - @classmethod - def metadata(cls, topics): - return cls._metadata(topics, cls.member_assignment, cls.generation) + def metadata(self, topics): + return self._metadata(topics, self.member_assignment, self.generation) @classmethod def _metadata(cls, topics, member_assignment_partitions, generation=-1): @@ -645,7 +643,7 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1): log.debug("No member assignment available") user_data = b'' else: - log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation)) + log.debug("Member assignment is available, generating the metadata: generation {}".format(generation)) partitions_by_topic = defaultdict(list) for topic_partition in member_assignment_partitions: partitions_by_topic[topic_partition.topic].append(topic_partition.partition) @@ -653,13 +651,12 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1): user_data = data.encode() return ConsumerProtocolSubscription(cls.version, list(topics), user_data) - @classmethod - def on_assignment(cls, assignment, generation): + def on_assignment(self, assignment, generation): """Callback that runs on each assignment. Updates assignor's state. Arguments: assignment: MemberAssignment """ log.debug(f"On assignment: assignment={assignment}, generation={generation}") - cls.member_assignment = assignment.partitions() - cls.generation = generation + self.member_assignment = assignment.partitions() + self.generation = generation diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 49cb97ede..c2cae49d9 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -136,6 +136,10 @@ def __init__(self, client, subscription, **configs): else: self._consumer_sensors = None + self._assignors = {} + for klass in self._config['assignors']: + assignor = klass() + self._assignors[assignor.name] = assignor self._cluster.request_update() self._cluster.add_listener(WeakMethod(self._handle_metadata_update)) @@ -166,9 +170,9 @@ def group_protocols(self): # best I've got for now. self._joined_subscription = set(self._subscription.subscription) metadata_list = [] - for assignor in self.config['assignors']: - metadata = assignor.metadata(self._joined_subscription) - group_protocol = (assignor.name, metadata) + for assignor in self._assignors: + metadata = self._assignors[assignor].metadata(self._joined_subscription) + group_protocol = (assignor, metadata) metadata_list.append(group_protocol) return metadata_list @@ -221,10 +225,7 @@ def _build_metadata_snapshot(self, subscription, cluster): return metadata_snapshot def _lookup_assignor(self, name): - for assignor in self.config['assignors']: - if assignor.name == name: - return assignor - return None + return self._assignors.get(name, None) def _on_join_complete(self, generation, member_id, protocol, member_assignment_bytes): @@ -326,9 +327,9 @@ def time_to_next_poll(self): return min(self.next_auto_commit_deadline - time.monotonic(), self.time_to_next_heartbeat()) - def _perform_assignment(self, leader_id, assignment_strategy, members): - assignor = self._lookup_assignor(assignment_strategy) - assert assignor, 'Invalid assignment protocol: %s' % (assignment_strategy,) + def _perform_assignment(self, leader_id, protocol_name, members): + assignor = self._lookup_assignor(protocol_name) + assert assignor, 'Invalid assignment protocol: %s' % (protocol_name,) all_subscribed_topics = set() for member in members: member.metadata = ConsumerProtocolSubscription.decode(member.metadata) diff --git a/test/test_assignors.py b/test/test_assignors.py index 808d69ff9..1bc920801 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -13,11 +13,9 @@ from kafka.protocol.new.consumer import JoinGroupResponse -@pytest.fixture(autouse=True) -def reset_sticky_assignor(): - yield - StickyPartitionAssignor.member_assignment = None - StickyPartitionAssignor.generation = -1 +@pytest.fixture() +def sticky_assignor(): + yield StickyPartitionAssignor() def create_cluster(mocker, topics, topics_partitions=None, topic_partitions_lambda=None): @@ -94,7 +92,7 @@ def make_member_metadata(subscriptions): ] -def test_sticky_assignor1(mocker): +def test_sticky_assignor1(mocker, sticky_assignor): """ Given: there are three consumers C0, C1, C2, four topics t0, t1, t2, t3, and each topic has 2 partitions, @@ -119,32 +117,32 @@ def test_sticky_assignor1(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C0': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''), - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t2', [1])], b''), + 'C0': ConsumerProtocolAssignment(sticky_assignor.version, [('t0', [0]), ('t1', [1]), ('t3', [0])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t0', [1]), ('t2', [0]), ('t3', [1])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [0]), ('t2', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) del subscriptions['C1'] member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, sticky_assignment[member].partitions()) - sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + sticky_assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { 'C0': ConsumerProtocolAssignment( - StickyPartitionAssignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b'' + sticky_assignor.version, [('t0', [0]), ('t1', [1]), ('t2', [0]), ('t3', [0])], b'' ), 'C2': ConsumerProtocolAssignment( - StickyPartitionAssignor.version, [('t0', [1]), ('t1', [0]), ('t2', [1]), ('t3', [1])], b'' + sticky_assignor.version, [('t0', [1]), ('t1', [0]), ('t2', [1]), ('t3', [1])], b'' ), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_assignor2(mocker): +def test_sticky_assignor2(mocker, sticky_assignor): """ Given: there are three consumers C0, C1, C2, and three topics t0, t1, t2, with 1, 2, and 3 partitions respectively. @@ -173,30 +171,30 @@ def test_sticky_assignor2(mocker): } member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) + member_metadata[member] = sticky_assignor._metadata(topics, []) - sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + sticky_assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { - 'C0': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [0])], b''), - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 1])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), + 'C0': ConsumerProtocolAssignment(sticky_assignor.version, [('t0', [0])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [0, 1])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) del subscriptions['C0'] member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, sticky_assignment[member].partitions()) - sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + sticky_assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t0', [0]), ('t1', [0, 1])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 1, 2])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t0', [0]), ('t1', [0, 1])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_one_consumer_no_topic(mocker): +def test_sticky_one_consumer_no_topic(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={}, topics_partitions={}) subscriptions = { @@ -204,14 +202,14 @@ def test_sticky_one_consumer_no_topic(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolAssignment(sticky_assignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_one_consumer_nonexisting_topic(mocker): +def test_sticky_one_consumer_nonexisting_topic(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={}, topics_partitions={}) subscriptions = { @@ -219,14 +217,14 @@ def test_sticky_one_consumer_nonexisting_topic(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolAssignment(sticky_assignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_one_consumer_one_topic(mocker): +def test_sticky_one_consumer_one_topic(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2}) subscriptions = { @@ -234,14 +232,14 @@ def test_sticky_one_consumer_one_topic(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolAssignment(sticky_assignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker): +def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t', 'other-t'}, topics_partitions={0, 1, 2}) subscriptions = { @@ -249,14 +247,14 @@ def test_sticky_should_only_assign_partitions_from_subscribed_topics(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolAssignment(sticky_assignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_one_consumer_multiple_topics(mocker): +def test_sticky_one_consumer_multiple_topics(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2}) subscriptions = { @@ -264,14 +262,14 @@ def test_sticky_one_consumer_multiple_topics(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''), + 'C': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [0, 1, 2]), ('t2', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_two_consumers_one_topic_one_partition(mocker): +def test_sticky_two_consumers_one_topic_one_partition(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0}) subscriptions = { @@ -280,15 +278,15 @@ def test_sticky_two_consumers_one_topic_one_partition(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t', [0])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_two_consumers_one_topic_two_partitions(mocker): +def test_sticky_two_consumers_one_topic_two_partitions(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1}) subscriptions = { @@ -297,15 +295,15 @@ def test_sticky_two_consumers_one_topic_two_partitions(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [1])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t', [0])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [('t', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker): +def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker, sticky_assignor): partitions = {'t1': {0, 1, 2}, 't2': {0, 1}} cluster = create_cluster(mocker, topics={'t1', 't2'}, topic_partitions_lambda=lambda t: partitions[t]) @@ -316,16 +314,16 @@ def test_sticky_multiple_consumers_mixed_topic_subscriptions(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 1])], b''), - 'C3': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [0, 2])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [('t2', [0, 1])], b''), + 'C3': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_add_remove_consumer_one_topic(mocker): +def test_sticky_add_remove_consumer_one_topic(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2}) subscriptions = { @@ -333,9 +331,9 @@ def test_sticky_add_remove_consumer_one_topic(mocker): } member_metadata = make_member_metadata(subscriptions) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(assignment, expected_assignment) @@ -345,11 +343,11 @@ def test_sticky_add_remove_consumer_one_topic(mocker): } member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata( + member_metadata[member] = sticky_assignor._metadata( topics, assignment[member].partitions() if member in assignment else [] ) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) subscriptions = { @@ -357,14 +355,14 @@ def test_sticky_add_remove_consumer_one_topic(mocker): } member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) assert len(assignment['C2'].assigned_partitions[0][1]) == 3 -def test_sticky_add_remove_topic_two_consumers(mocker): +def test_sticky_add_remove_topic_two_consumers(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t1', 't2'}, topics_partitions={0, 1, 2}) subscriptions = { @@ -373,10 +371,10 @@ def test_sticky_add_remove_topic_two_consumers(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 2])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [1])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [0, 2])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [1])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -386,12 +384,12 @@ def test_sticky_add_remove_topic_two_consumers(mocker): } member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, sticky_assignment[member].partitions()) - sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + sticky_assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0, 2]), ('t2', [1])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [1]), ('t2', [0, 2])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [0, 2]), ('t2', [1])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [1]), ('t2', [0, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -401,17 +399,17 @@ def test_sticky_add_remove_topic_two_consumers(mocker): } member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, sticky_assignment[member].partitions()) - sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + sticky_assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { - 'C1': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [1])], b''), - 'C2': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t2', [0, 2])], b''), + 'C1': ConsumerProtocolAssignment(sticky_assignor.version, [('t2', [1])], b''), + 'C2': ConsumerProtocolAssignment(sticky_assignor.version, [('t2', [0, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_sticky_reassignment_after_one_consumer_leaves(mocker): +def test_sticky_reassignment_after_one_consumer_leaves(mocker, sticky_assignor): partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 20)]) cluster = create_cluster( mocker, topics=set(['t{}'.format(i) for i in range(1, 20)]), topic_partitions_lambda=lambda t: partitions[t] @@ -426,20 +424,20 @@ def test_sticky_reassignment_after_one_consumer_leaves(mocker): member_metadata = make_member_metadata(subscriptions) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = sticky_assignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) del subscriptions['C10'] member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() -def test_sticky_reassignment_after_one_consumer_added(mocker): +def test_sticky_reassignment_after_one_consumer_added(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions=set(range(20))) subscriptions = defaultdict(set) @@ -448,21 +446,21 @@ def test_sticky_reassignment_after_one_consumer_added(mocker): member_metadata = make_member_metadata(subscriptions) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = sticky_assignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) subscriptions['C10'] = {'t'} member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata( + member_metadata[member] = sticky_assignor._metadata( topics, assignment[member].partitions() if member in assignment else [] ) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() -def test_sticky_same_subscriptions(mocker): +def test_sticky_same_subscriptions(mocker, sticky_assignor): partitions = dict([('t{}'.format(i), set(range(i))) for i in range(1, 15)]) cluster = create_cluster( mocker, topics=set(['t{}'.format(i) for i in range(1, 15)]), topic_partitions_lambda=lambda t: partitions[t] @@ -475,19 +473,19 @@ def test_sticky_same_subscriptions(mocker): member_metadata = make_member_metadata(subscriptions) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = sticky_assignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) del subscriptions['C5'] member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + member_metadata[member] = sticky_assignor._metadata(topics, assignment[member].partitions()) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() -def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): +def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker, sticky_assignor): n_topics = 40 n_consumers = 200 @@ -502,12 +500,12 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): member_metadata = make_member_metadata(subscriptions) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = sticky_assignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, assignment[member].partitions()) for i in range(50): member = 'C{}'.format(randint(1, n_consumers)) @@ -515,12 +513,12 @@ def test_sticky_large_assignment_with_multiple_consumers_leaving(mocker): del subscriptions[member] del member_metadata[member] - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() -def test_new_subscription(mocker): +def test_new_subscription(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4'}, topics_partitions={0}) subscriptions = defaultdict(set) @@ -530,20 +528,20 @@ def test_new_subscription(mocker): member_metadata = make_member_metadata(subscriptions) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = sticky_assignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) subscriptions['C0'].add('t1') member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, []) + member_metadata[member] = sticky_assignor._metadata(topics, []) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() -def test_move_existing_assignments(mocker): +def test_move_existing_assignments(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t1', 't2', 't3', 't4', 't5', 't6'}, topics_partitions={0}) subscriptions = { @@ -559,13 +557,13 @@ def test_move_existing_assignments(mocker): member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, member_assignments[member]) + member_metadata[member] = sticky_assignor._metadata(topics, member_assignments[member]) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) -def test_stickiness(mocker): +def test_stickiness(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2}) subscriptions = { 'C1': {'t'}, @@ -575,7 +573,7 @@ def test_stickiness(mocker): } member_metadata = make_member_metadata(subscriptions) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = sticky_assignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) partitions_assigned = {} for consumer, consumer_assignment in assignment.items(): @@ -589,11 +587,11 @@ def test_stickiness(mocker): del subscriptions['C1'] member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() for consumer, consumer_assignment in assignment.items(): assert ( @@ -604,7 +602,7 @@ def test_stickiness(mocker): ), 'Stickiness was not honored for consumer {}'.format(consumer) -def test_assignment_updated_for_deleted_topic(mocker): +def test_assignment_updated_for_deleted_topic(mocker, sticky_assignor): def topic_partitions(topic): if topic == 't1': return {0} @@ -618,14 +616,14 @@ def topic_partitions(topic): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t1', [0]), ('t3', list(range(100)))], b''), + 'C': ConsumerProtocolAssignment(sticky_assignor.version, [('t1', [0]), ('t3', list(range(100)))], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): +def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2}) subscriptions = { @@ -633,9 +631,9 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): } member_metadata = make_member_metadata(subscriptions) - sticky_assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + sticky_assignment = sticky_assignor.assign(cluster, member_metadata) expected_assignment = { - 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [('t', [0, 1, 2])], b''), + 'C': ConsumerProtocolAssignment(sticky_assignor.version, [('t', [0, 1, 2])], b''), } assert_assignment(sticky_assignment, expected_assignment) @@ -644,17 +642,17 @@ def test_no_exceptions_when_only_subscribed_topic_is_deleted(mocker): } member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, sticky_assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, sticky_assignment[member].partitions()) cluster = create_cluster(mocker, topics={}, topics_partitions={}) - sticky_assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + sticky_assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) expected_assignment = { - 'C': ConsumerProtocolAssignment(StickyPartitionAssignor.version, [], b''), + 'C': ConsumerProtocolAssignment(sticky_assignor.version, [], b''), } assert_assignment(sticky_assignment, expected_assignment) -def test_conflicting_previous_assignments(mocker): +def test_conflicting_previous_assignments(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1}) subscriptions = { @@ -664,16 +662,16 @@ def test_conflicting_previous_assignments(mocker): member_metadata = {} for member, topics in subscriptions.items(): # assume both C1 and C2 have partition 1 assigned to them in generation 1 - member_metadata[member] = StickyPartitionAssignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) + member_metadata[member] = sticky_assignor._metadata(topics, [TopicPartition('t', 0), TopicPartition('t', 0)], 1) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) @pytest.mark.parametrize( 'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in range(100)] ) -def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers): +def test_reassignment_with_random_subscriptions_and_changes(mocker, sticky_assignor, execution_number, n_topics, n_consumers): all_topics = sorted(['t{}'.format(i) for i in range(1, n_topics + 1)]) partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)]) cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t]) @@ -685,7 +683,7 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu member_metadata = make_member_metadata(subscriptions) - assignment = StickyPartitionAssignor.assign(cluster, member_metadata) + assignment = sticky_assignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) subscriptions = defaultdict(set) @@ -695,94 +693,94 @@ def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_nu member_metadata = {} for member, topics in subscriptions.items(): - member_metadata[member] = StickyPartitionAssignor._metadata(topics, assignment[member].partitions()) + member_metadata[member] = sticky_assignor._metadata(topics, assignment[member].partitions()) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance(subscriptions, assignment) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() -def test_assignment_with_multiple_generations1(mocker): +def test_assignment_with_multiple_generations1(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) member_metadata = { - 'C1': StickyPartitionAssignor._metadata({'t'}, []), - 'C2': StickyPartitionAssignor._metadata({'t'}, []), - 'C3': StickyPartitionAssignor._metadata({'t'}, []), + 'C1': sticky_assignor._metadata({'t'}, []), + 'C2': sticky_assignor._metadata({'t'}, []), + 'C3': sticky_assignor._metadata({'t'}, []), } - assignment1 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment1 = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) assert len(assignment1['C1'].assigned_partitions[0][1]) == 2 assert len(assignment1['C2'].assigned_partitions[0][1]) == 2 assert len(assignment1['C3'].assigned_partitions[0][1]) == 2 member_metadata = { - 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()), - 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions()), + 'C1': sticky_assignor._metadata({'t'}, assignment1['C1'].partitions()), + 'C2': sticky_assignor._metadata({'t'}, assignment1['C2'].partitions()), } - assignment2 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment2 = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2) assert len(assignment2['C1'].assigned_partitions[0][1]) == 3 assert len(assignment2['C2'].assigned_partitions[0][1]) == 3 assert all([partition in assignment2['C1'].assigned_partitions[0][1] for partition in assignment1['C1'].assigned_partitions[0][1]]) assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]]) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() member_metadata = { - 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2), - 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1), + 'C2': sticky_assignor._metadata({'t'}, assignment2['C2'].partitions(), 2), + 'C3': sticky_assignor._metadata({'t'}, assignment1['C3'].partitions(), 1), } - assignment3 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment3 = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3) assert len(assignment3['C2'].assigned_partitions[0][1]) == 3 assert len(assignment3['C3'].assigned_partitions[0][1]) == 3 - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() -def test_assignment_with_multiple_generations2(mocker): +def test_assignment_with_multiple_generations2(mocker, sticky_assignor): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) member_metadata = { - 'C1': StickyPartitionAssignor._metadata({'t'}, []), - 'C2': StickyPartitionAssignor._metadata({'t'}, []), - 'C3': StickyPartitionAssignor._metadata({'t'}, []), + 'C1': sticky_assignor._metadata({'t'}, []), + 'C2': sticky_assignor._metadata({'t'}, []), + 'C3': sticky_assignor._metadata({'t'}, []), } - assignment1 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment1 = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) assert len(assignment1['C1'].assigned_partitions[0][1]) == 2 assert len(assignment1['C2'].assigned_partitions[0][1]) == 2 assert len(assignment1['C3'].assigned_partitions[0][1]) == 2 member_metadata = { - 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1), + 'C2': sticky_assignor._metadata({'t'}, assignment1['C2'].partitions(), 1), } - assignment2 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment2 = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C2': {'t'}}, assignment2) assert len(assignment2['C2'].assigned_partitions[0][1]) == 6 assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]]) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() member_metadata = { - 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions(), 1), - 'C2': StickyPartitionAssignor._metadata({'t'}, assignment2['C2'].partitions(), 2), - 'C3': StickyPartitionAssignor._metadata({'t'}, assignment1['C3'].partitions(), 1), + 'C1': sticky_assignor._metadata({'t'}, assignment1['C1'].partitions(), 1), + 'C2': sticky_assignor._metadata({'t'}, assignment2['C2'].partitions(), 2), + 'C3': sticky_assignor._metadata({'t'}, assignment1['C3'].partitions(), 1), } - assignment3 = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment3 = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() assert set(assignment3['C1'].assigned_partitions[0][1]) == set(assignment1['C1'].assigned_partitions[0][1]) assert set(assignment3['C2'].assigned_partitions[0][1]) == set(assignment1['C2'].assigned_partitions[0][1]) assert set(assignment3['C3'].assigned_partitions[0][1]) == set(assignment1['C3'].assigned_partitions[0][1]) @pytest.mark.parametrize('execution_number', range(50)) -def test_assignment_with_conflicting_previous_generations(mocker, execution_number): +def test_assignment_with_conflicting_previous_generations(mocker, sticky_assignor, execution_number): cluster = create_cluster(mocker, topics={'t'}, topics_partitions={0, 1, 2, 3, 4, 5}) member_assignments = { @@ -797,11 +795,11 @@ def test_assignment_with_conflicting_previous_generations(mocker, execution_numb } member_metadata = {} for member in member_assignments: - member_metadata[member] = StickyPartitionAssignor._metadata({'t'}, member_assignments[member], member_generations[member]) + member_metadata[member] = sticky_assignor._metadata({'t'}, member_assignments[member], member_generations[member]) - assignment = StickyPartitionAssignor.assign(cluster, make_join_group_response_members(member_metadata)) + assignment = sticky_assignor.assign(cluster, make_join_group_response_members(member_metadata)) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment) - assert StickyPartitionAssignor._latest_partition_movements.are_sticky() + assert sticky_assignor._latest_partition_movements.are_sticky() def assert_assignment(result_assignment, expected_assignment): From a5cfc39fe06f7bd4abcc261bd7eb2a8b80a5fc68 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Mar 2026 10:39:40 -0700 Subject: [PATCH 2/3] fixes --- kafka/coordinator/assignors/sticky/sticky_assignor.py | 1 + kafka/coordinator/consumer.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index 97903ce93..e1726bc92 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -598,6 +598,7 @@ def assign(self, cluster, members): } return assignment + @classmethod def parse_member_metadata(cls, metadata): """ Parses member metadata into a python object. diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index c2cae49d9..a07c86b6a 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -137,7 +137,7 @@ def __init__(self, client, subscription, **configs): self._consumer_sensors = None self._assignors = {} - for klass in self._config['assignors']: + for klass in self.config['assignors']: assignor = klass() self._assignors[assignor.name] = assignor self._cluster.request_update() From 36f2fd36993fd50b2af7a87be285b7c04b081dc2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 23 Mar 2026 10:47:09 -0700 Subject: [PATCH 3/3] test_coordinator fixes --- test/test_coordinator.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index b325d1f3b..cdf6fed23 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -123,16 +123,16 @@ def test_pattern_subscription(conn, metrics, api_version): def test_lookup_assignor(coordinator): - assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor - assert coordinator._lookup_assignor('range') is RangePartitionAssignor - assert coordinator._lookup_assignor('sticky') is StickyPartitionAssignor + assert isinstance(coordinator._lookup_assignor('roundrobin'), RoundRobinPartitionAssignor) + assert isinstance(coordinator._lookup_assignor('range'), RangePartitionAssignor) + assert isinstance(coordinator._lookup_assignor('sticky'), StickyPartitionAssignor) assert coordinator._lookup_assignor('foobar') is None def test_join_complete(mocker, coordinator): coordinator._subscription.subscribe(topics=['foobar']) assignor = RoundRobinPartitionAssignor() - coordinator.config['assignors'] = (assignor,) + coordinator._assignors = {assignor.name: assignor} mocker.spy(assignor, 'on_assignment') assert assignor.on_assignment.call_count == 0 assignment = ConsumerProtocolAssignment(0, [('foobar', [0, 1])], b'') @@ -145,7 +145,7 @@ def test_join_complete(mocker, coordinator): def test_join_complete_with_sticky_assignor(mocker, coordinator): coordinator._subscription.subscribe(topics=['foobar']) assignor = StickyPartitionAssignor() - coordinator.config['assignors'] = (assignor,) + coordinator._assignors = {assignor.name: assignor} mocker.spy(assignor, 'on_assignment') assert assignor.on_assignment.call_count == 0 generation = 3