Skip to content

Commit b8c68d2

Browse files
authored
Assignor on_assignment(assignment, generation) (#2769)
1 parent 15d4c03 commit b8c68d2

6 files changed

Lines changed: 13 additions & 25 deletions

File tree

kafka/coordinator/assignors/abstract.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,14 @@ def metadata(self, topics):
4343
pass
4444

4545
@abc.abstractmethod
46-
def on_assignment(self, assignment):
46+
def on_assignment(self, assignment, generation):
4747
"""Callback that runs on each assignment.
4848
4949
This method can be used to update internal state, if any, of the
5050
partition assignor.
5151
5252
Arguments:
5353
assignment (MemberAssignment): the member's assignment
54+
generation (int): generation id of assignment
5455
"""
5556
pass

kafka/coordinator/assignors/range.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,5 +74,5 @@ def metadata(cls, topics):
7474
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')
7575

7676
@classmethod
77-
def on_assignment(cls, assignment):
77+
def on_assignment(cls, assignment, generation):
7878
pass

kafka/coordinator/assignors/roundrobin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,5 @@ def metadata(cls, topics):
9393
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')
9494

9595
@classmethod
96-
def on_assignment(cls, assignment):
96+
def on_assignment(cls, assignment, generation):
9797
pass

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -666,21 +666,12 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
666666
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data)
667667

668668
@classmethod
669-
def on_assignment(cls, assignment):
669+
def on_assignment(cls, assignment, generation):
670670
"""Callback that runs on each assignment. Updates assignor's state.
671671
672672
Arguments:
673673
assignment: MemberAssignment
674674
"""
675-
log.debug("On assignment: assignment={}".format(assignment))
675+
log.debug(f"On assignment: assignment={assignment}, generation={generation}")
676676
cls.member_assignment = assignment.partitions()
677-
678-
@classmethod
679-
def on_generation_assignment(cls, generation):
680-
"""Callback that runs on each assignment. Updates assignor's generation id.
681-
682-
Arguments:
683-
generation: generation id
684-
"""
685-
log.debug("On generation assignment: generation={}".format(generation))
686677
cls.generation = generation

kafka/coordinator/consumer.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,7 @@ def _on_join_complete(self, generation, member_id, protocol,
245245

246246
# give the assignor a chance to update internal state
247247
# based on the received assignment
248-
assignor.on_assignment(assignment)
249-
if assignor.name == 'sticky':
250-
assignor.on_generation_assignment(generation)
248+
assignor.on_assignment(assignment, generation)
251249

252250
# reschedule the auto commit starting from now
253251
self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval

test/test_coordinator.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,25 +136,23 @@ def test_join_complete(mocker, coordinator):
136136
mocker.spy(assignor, 'on_assignment')
137137
assert assignor.on_assignment.call_count == 0
138138
assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'')
139-
coordinator._on_join_complete(0, 'member-foo', 'roundrobin', assignment.encode())
139+
generation = 12
140+
coordinator._on_join_complete(generation, 'member-foo', 'roundrobin', assignment.encode())
140141
assert assignor.on_assignment.call_count == 1
141-
assignor.on_assignment.assert_called_with(assignment)
142+
assignor.on_assignment.assert_called_with(assignment, generation)
142143

143144

144145
def test_join_complete_with_sticky_assignor(mocker, coordinator):
145146
coordinator._subscription.subscribe(topics=['foobar'])
146147
assignor = StickyPartitionAssignor()
147148
coordinator.config['assignors'] = (assignor,)
148149
mocker.spy(assignor, 'on_assignment')
149-
mocker.spy(assignor, 'on_generation_assignment')
150150
assert assignor.on_assignment.call_count == 0
151-
assert assignor.on_generation_assignment.call_count == 0
151+
generation = 3
152152
assignment = ConsumerProtocolMemberAssignment_v0(0, [('foobar', [0, 1])], b'')
153-
coordinator._on_join_complete(0, 'member-foo', 'sticky', assignment.encode())
153+
coordinator._on_join_complete(generation, 'member-foo', 'sticky', assignment.encode())
154154
assert assignor.on_assignment.call_count == 1
155-
assert assignor.on_generation_assignment.call_count == 1
156-
assignor.on_assignment.assert_called_with(assignment)
157-
assignor.on_generation_assignment.assert_called_with(0)
155+
assignor.on_assignment.assert_called_with(assignment, generation)
158156

159157

160158
def test_subscription_listener(mocker, coordinator):

0 commit comments

Comments
 (0)