Skip to content

Commit b7a13a8

Browse files
authored
Assignors -> new protocol (#2772)
1 parent f8e9a71 commit b7a13a8

7 files changed

Lines changed: 89 additions & 91 deletions

File tree

kafka/coordinator/assignors/abstract.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import abc
2-
import logging
32

4-
log = logging.getLogger(__name__)
3+
from kafka.protocol.new.consumer.metadata import (
4+
ConsumerProtocolSubscription, ConsumerProtocolAssignment,
5+
)
56

67

78
class AbstractPartitionAssignor(object):
@@ -26,7 +27,7 @@ def assign(self, cluster, members):
2627
when available.
2728
2829
Returns:
29-
dict: {member_id: MemberAssignment}
30+
dict: {member_id: ConsumerProtocolAssignment}
3031
"""
3132
pass
3233

@@ -38,7 +39,7 @@ def metadata(self, topics):
3839
topics (set): a member's subscribed topics
3940
4041
Returns:
41-
MemberMetadata struct
42+
ConsumerProtocolSubscription
4243
"""
4344
pass
4445

@@ -50,7 +51,7 @@ def on_assignment(self, assignment, generation):
5051
partition assignor.
5152
5253
Arguments:
53-
assignment (MemberAssignment): the member's assignment
54+
assignment (ConsumerProtocolAssignment): the member's assignment
5455
generation (int): generation id of assignment
5556
"""
5657
pass

kafka/coordinator/assignors/range.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
import itertools
33
import logging
44

5-
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
6-
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
5+
from kafka.coordinator.assignors.abstract import (
6+
AbstractPartitionAssignor,
7+
ConsumerProtocolSubscription,
8+
ConsumerProtocolAssignment,
9+
)
710

811
log = logging.getLogger(__name__)
912

@@ -63,15 +66,15 @@ def assign(cls, cluster, group_subscriptions):
6366

6467
protocol_assignment = {}
6568
for member_id in group_subscriptions:
66-
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
69+
protocol_assignment[member_id] = ConsumerProtocolAssignment(
6770
cls.version,
6871
sorted(assignment[member_id].items()),
6972
b'')
7073
return protocol_assignment
7174

7275
@classmethod
7376
def metadata(cls, topics):
74-
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')
77+
return ConsumerProtocolSubscription(cls.version, list(topics), b'')
7578

7679
@classmethod
7780
def on_assignment(cls, assignment, generation):

kafka/coordinator/assignors/roundrobin.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@
22
import itertools
33
import logging
44

5-
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
6-
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
5+
from kafka.coordinator.assignors.abstract import (
6+
AbstractPartitionAssignor,
7+
ConsumerProtocolSubscription,
8+
ConsumerProtocolAssignment,
9+
)
710
from kafka.structs import TopicPartition
811

912
log = logging.getLogger(__name__)
@@ -82,15 +85,15 @@ def assign(cls, cluster, group_subscriptions):
8285

8386
protocol_assignment = {}
8487
for member_id in group_subscriptions:
85-
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
88+
protocol_assignment[member_id] = ConsumerProtocolAssignment(
8689
cls.version,
8790
sorted(assignment[member_id].items()),
8891
b'')
8992
return protocol_assignment
9093

9194
@classmethod
9295
def metadata(cls, topics):
93-
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')
96+
return ConsumerProtocolSubscription(cls.version, list(topics), b'')
9497

9598
@classmethod
9699
def on_assignment(cls, assignment, generation):

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 12 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
from collections import defaultdict, namedtuple
33
from copy import deepcopy
44

5-
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
6-
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
7-
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
8-
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
9-
from kafka.protocol.struct import Struct
10-
from kafka.protocol.types import Array, Int32, Schema, String
5+
from ..abstract import AbstractPartitionAssignor
6+
from .partition_movements import PartitionMovements
7+
from .sorted_set import SortedSet
8+
from .user_data import StickyAssignorUserData
9+
from kafka.protocol.new.consumer.metadata import (
10+
ConsumerProtocolSubscription,
11+
ConsumerProtocolAssignment,
12+
)
1113
from kafka.structs import TopicPartition
1214

1315
log = logging.getLogger(__name__)
@@ -51,20 +53,6 @@ def remove_if_present(collection, element):
5153
["subscription", "partitions", "generation"])
5254

5355

54-
class StickyAssignorUserDataV1(Struct):
55-
"""
56-
Used for preserving consumer's previously assigned partitions
57-
list and sending it as user data to the leader during a rebalance
58-
"""
59-
60-
SCHEMA = Schema(
61-
("previous_assignment", Array(
62-
("topic", String("utf-8")),
63-
("partitions", Array(Int32)))),
64-
("generation", Int32)
65-
)
66-
67-
6856
class StickyAssignmentExecutor:
6957
def __init__(self, cluster, members):
7058
# a mapping of member_id => StickyAssignorMemberMetadataV1
@@ -605,7 +593,7 @@ def assign(cls, cluster, members):
605593

606594
assignment = {}
607595
for member_id in members:
608-
assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
596+
assignment[member_id] = ConsumerProtocolAssignment(
609597
cls.version, sorted(executor.get_final_assignment(member_id)), b''
610598
)
611599
return assignment
@@ -631,7 +619,7 @@ def parse_member_metadata(cls, metadata):
631619
)
632620

633621
try:
634-
decoded_user_data = StickyAssignorUserDataV1.decode(user_data)
622+
decoded_user_data = StickyAssignorUserData.decode(user_data)
635623
except Exception:
636624
# ignore the consumer's previous assignment if it cannot be parsed
637625
log.exception("Could not parse member data")
@@ -661,9 +649,9 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
661649
partitions_by_topic = defaultdict(list)
662650
for topic_partition in member_assignment_partitions:
663651
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
664-
data = StickyAssignorUserDataV1(list(partitions_by_topic.items()), generation)
652+
data = StickyAssignorUserData(list(partitions_by_topic.items()), generation)
665653
user_data = data.encode()
666-
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data)
654+
return ConsumerProtocolSubscription(cls.version, list(topics), user_data)
667655

668656
@classmethod
669657
def on_assignment(cls, assignment, generation):

kafka/coordinator/consumer.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88
from kafka.coordinator.assignors.range import RangePartitionAssignor
99
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
1010
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
11-
from kafka.coordinator.protocol import ConsumerProtocol
11+
from kafka.protocol.new.consumer.metadata import (
12+
ConsumerProtocolType, ConsumerProtocolSubscription, ConsumerProtocolAssignment,
13+
)
1214
from kafka.coordinator.subscription import Subscription
1315
import kafka.errors as Errors
1416
from kafka.future import Future
@@ -147,7 +149,7 @@ def __del__(self):
147149
super(ConsumerCoordinator, self).__del__()
148150

149151
def protocol_type(self):
150-
return ConsumerProtocol[0].PROTOCOL_TYPE
152+
return ConsumerProtocolType
151153

152154
def group_protocols(self):
153155
"""Returns list of preferred (protocols, metadata)"""
@@ -235,7 +237,7 @@ def _on_join_complete(self, generation, member_id, protocol,
235237
assignor = self._lookup_assignor(protocol)
236238
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)
237239

238-
assignment = ConsumerProtocol[0].ASSIGNMENT.decode(member_assignment_bytes)
240+
assignment = ConsumerProtocolAssignment.decode(member_assignment_bytes)
239241

240242
try:
241243
self._subscription.assign_from_subscribed(assignment.partitions())
@@ -332,7 +334,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
332334
all_subscribed_topics = set()
333335
for member in members:
334336
subscription = Subscription(
335-
ConsumerProtocol[0].METADATA.decode(member.metadata),
337+
ConsumerProtocolSubscription.decode(member.metadata),
336338
member.group_instance_id
337339
)
338340
member_subscriptions[member.member_id] = subscription

0 commit comments

Comments
 (0)