We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
1 parent e34faad commit bb62428Copy full SHA for bb62428
6 files changed
kafka/coordinator/assignors/abstract.py
@@ -32,8 +32,8 @@ def assign(self, cluster, members):
32
pass
33
34
@abc.abstractmethod
35
- def metadata(self, topics):
36
- """Generate ProtocolMetadata to be submitted via JoinGroupRequest.
+ def subscription(self, topics):
+ """Generate subscription data to be submitted via JoinGroupRequest.
37
38
Arguments:
39
topics (set): a member's subscribed topics
kafka/coordinator/assignors/range.py
@@ -73,7 +73,7 @@ def assign(cls, cluster, group_subscriptions):
73
return protocol_assignment
74
75
@classmethod
76
- def metadata(cls, topics):
+ def subscription(cls, topics):
77
return ConsumerProtocolSubscription(cls.version, list(topics), b'')
78
79
kafka/coordinator/assignors/roundrobin.py
@@ -92,7 +92,7 @@ def assign(cls, cluster, group_subscriptions):
92
93
94
95
96
97
98
kafka/coordinator/assignors/sticky/sticky_assignor.py
@@ -636,7 +636,7 @@ def parse_member_metadata(cls, metadata):
636
)
637
638
639
640
return cls._metadata(topics, cls.member_assignment, cls.generation)
641
642
@@ -645,7 +645,7 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
645
log.debug("No member assignment available")
646
user_data = b''
647
else:
648
- log.debug("Member assignment is available, generating the metadata: generation {}".format(cls.generation))
+ log.debug("Member assignment is available, generating subscription: generation {}".format(cls.generation))
649
partitions_by_topic = defaultdict(list)
650
for topic_partition in member_assignment_partitions:
651
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
kafka/coordinator/consumer.py
@@ -168,7 +168,7 @@ def group_protocols(self):
168
self._joined_subscription = set(self._subscription.subscription)
169
metadata_list = []
170
for assignor in self.config['assignors']:
171
- metadata = assignor.metadata(self._joined_subscription)
+ metadata = assignor.subscription(self._joined_subscription)
172
group_protocol = (assignor.name, metadata)
173
metadata_list.append(group_protocol)
174
return metadata_list
test/test_assignors.py
@@ -34,8 +34,8 @@ def test_assignor_roundrobin(mocker):
assignor = RoundRobinPartitionAssignor
group_subscriptions = {
- 'C0': Subscription(assignor.metadata({'t0', 't1'}), None),
- 'C1': Subscription(assignor.metadata({'t0', 't1'}), None),
+ 'C0': Subscription(assignor.subscription({'t0', 't1'}), None),
+ 'C1': Subscription(assignor.subscription({'t0', 't1'}), None),
}
40
41
cluster = create_cluster(mocker, {'t0', 't1'}, topics_partitions={0, 1, 2})
@@ -56,8 +56,8 @@ def test_assignor_range(mocker):
56
assignor = RangePartitionAssignor
57
58
59
60
61
62
63
0 commit comments