Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ def metadata(self, topics):
pass

@abc.abstractmethod
def on_assignment(self, assignment):
def on_assignment(self, assignment, generation):
"""Callback that runs on each assignment.

This method can be used to update internal state, if any, of the
partition assignor.

Arguments:
assignment (MemberAssignment): the member's assignment
generation (int): generation id of assignment
"""
pass
2 changes: 1 addition & 1 deletion kafka/coordinator/assignors/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,5 @@ def metadata(cls, topics):
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')

@classmethod
def on_assignment(cls, assignment):
def on_assignment(cls, assignment, generation):
pass
2 changes: 1 addition & 1 deletion kafka/coordinator/assignors/roundrobin.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,5 @@ def metadata(cls, topics):
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')

@classmethod
def on_assignment(cls, assignment):
def on_assignment(cls, assignment, generation):
pass
13 changes: 2 additions & 11 deletions kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -666,21 +666,12 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data)

@classmethod
def on_assignment(cls, assignment):
def on_assignment(cls, assignment, generation):
"""Callback that runs on each assignment. Updates assignor's state.

Arguments:
assignment: MemberAssignment
"""
log.debug("On assignment: assignment={}".format(assignment))
log.debug(f"On assignment: assignment={assignment}, generation={generation}")
cls.member_assignment = assignment.partitions()

@classmethod
def on_generation_assignment(cls, generation):
"""Callback that runs on each assignment. Updates assignor's generation id.

Arguments:
generation: generation id
"""
log.debug("On generation assignment: generation={}".format(generation))
cls.generation = generation
4 changes: 1 addition & 3 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,7 @@ def _on_join_complete(self, generation, member_id, protocol,

# give the assignor a chance to update internal state
# based on the received assignment
assignor.on_assignment(assignment)
if assignor.name == 'sticky':
assignor.on_generation_assignment(generation)
assignor.on_assignment(assignment, generation)

# reschedule the auto commit starting from now
self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval
Expand Down
14 changes: 6 additions & 8 deletions test/test_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,25 +136,23 @@ def test_join_complete(mocker, coordinator):
mocker.spy(assignor, 'on_assignment')
assert assignor.on_assignment.call_count == 0
assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'')
coordinator._on_join_complete(0, 'member-foo', 'roundrobin', assignment.encode())
generation = 12
coordinator._on_join_complete(generation, 'member-foo', 'roundrobin', assignment.encode())
assert assignor.on_assignment.call_count == 1
assignor.on_assignment.assert_called_with(assignment)
assignor.on_assignment.assert_called_with(assignment, generation)


def test_join_complete_with_sticky_assignor(mocker, coordinator):
coordinator._subscription.subscribe(topics=['foobar'])
assignor = StickyPartitionAssignor()
coordinator.config['assignors'] = (assignor,)
mocker.spy(assignor, 'on_assignment')
mocker.spy(assignor, 'on_generation_assignment')
assert assignor.on_assignment.call_count == 0
assert assignor.on_generation_assignment.call_count == 0
generation = 3
assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'')
coordinator._on_join_complete(0, 'member-foo', 'sticky', assignment.encode())
coordinator._on_join_complete(generation, 'member-foo', 'sticky', assignment.encode())
assert assignor.on_assignment.call_count == 1
assert assignor.on_generation_assignment.call_count == 1
assignor.on_assignment.assert_called_with(assignment)
assignor.on_generation_assignment.assert_called_with(0)
assignor.on_assignment.assert_called_with(assignment, generation)


def test_subscription_listener(mocker, coordinator):
Expand Down
Loading