Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions kafka/coordinator/assignors/abstract.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import abc
import logging

log = logging.getLogger(__name__)
from kafka.protocol.new.consumer.metadata import (
ConsumerProtocolSubscription, ConsumerProtocolAssignment,
)


class AbstractPartitionAssignor(object):
Expand All @@ -26,7 +27,7 @@ def assign(self, cluster, members):
when available.

Returns:
dict: {member_id: MemberAssignment}
dict: {member_id: ConsumerProtocolAssignment}
"""
pass

Expand All @@ -38,7 +39,7 @@ def metadata(self, topics):
topics (set): a member's subscribed topics

Returns:
MemberMetadata struct
ConsumerProtocolSubscription
"""
pass

Expand All @@ -50,7 +51,7 @@ def on_assignment(self, assignment, generation):
partition assignor.

Arguments:
assignment (MemberAssignment): the member's assignment
assignment (ConsumerProtocolAssignment): the member's assignment
generation (int): generation id of assignment
"""
pass
11 changes: 7 additions & 4 deletions kafka/coordinator/assignors/range.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
import itertools
import logging

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
from kafka.coordinator.assignors.abstract import (
AbstractPartitionAssignor,
ConsumerProtocolSubscription,
ConsumerProtocolAssignment,
)

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,15 +66,15 @@ def assign(cls, cluster, group_subscriptions):

protocol_assignment = {}
for member_id in group_subscriptions:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
protocol_assignment[member_id] = ConsumerProtocolAssignment(
cls.version,
sorted(assignment[member_id].items()),
b'')
return protocol_assignment

@classmethod
def metadata(cls, topics):
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')
return ConsumerProtocolSubscription(cls.version, list(topics), b'')

@classmethod
def on_assignment(cls, assignment, generation):
Expand Down
11 changes: 7 additions & 4 deletions kafka/coordinator/assignors/roundrobin.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@
import itertools
import logging

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
from kafka.coordinator.assignors.abstract import (
AbstractPartitionAssignor,
ConsumerProtocolSubscription,
ConsumerProtocolAssignment,
)
from kafka.structs import TopicPartition

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,15 +85,15 @@ def assign(cls, cluster, group_subscriptions):

protocol_assignment = {}
for member_id in group_subscriptions:
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
protocol_assignment[member_id] = ConsumerProtocolAssignment(
cls.version,
sorted(assignment[member_id].items()),
b'')
return protocol_assignment

@classmethod
def metadata(cls, topics):
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')
return ConsumerProtocolSubscription(cls.version, list(topics), b'')

@classmethod
def on_assignment(cls, assignment, generation):
Expand Down
36 changes: 12 additions & 24 deletions kafka/coordinator/assignors/sticky/sticky_assignor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
from collections import defaultdict, namedtuple
from copy import deepcopy

from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
from kafka.protocol.struct import Struct
from kafka.protocol.types import Array, Int32, Schema, String
from ..abstract import AbstractPartitionAssignor
from .partition_movements import PartitionMovements
from .sorted_set import SortedSet
from .user_data import StickyAssignorUserData
from kafka.protocol.new.consumer.metadata import (
ConsumerProtocolSubscription,
ConsumerProtocolAssignment,
)
from kafka.structs import TopicPartition

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -51,20 +53,6 @@ def remove_if_present(collection, element):
["subscription", "partitions", "generation"])


class StickyAssignorUserDataV1(Struct):
"""
Used for preserving consumer's previously assigned partitions
list and sending it as user data to the leader during a rebalance
"""

SCHEMA = Schema(
("previous_assignment", Array(
("topic", String("utf-8")),
("partitions", Array(Int32)))),
("generation", Int32)
)


class StickyAssignmentExecutor:
def __init__(self, cluster, members):
# a mapping of member_id => StickyAssignorMemberMetadataV1
Expand Down Expand Up @@ -605,7 +593,7 @@ def assign(cls, cluster, members):

assignment = {}
for member_id in members:
assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
assignment[member_id] = ConsumerProtocolAssignment(
cls.version, sorted(executor.get_final_assignment(member_id)), b''
)
return assignment
Expand All @@ -631,7 +619,7 @@ def parse_member_metadata(cls, metadata):
)

try:
decoded_user_data = StickyAssignorUserDataV1.decode(user_data)
decoded_user_data = StickyAssignorUserData.decode(user_data)
except Exception:
# ignore the consumer's previous assignment if it cannot be parsed
log.exception("Could not parse member data")
Expand Down Expand Up @@ -661,9 +649,9 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
partitions_by_topic = defaultdict(list)
for topic_partition in member_assignment_partitions:
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
data = StickyAssignorUserDataV1(list(partitions_by_topic.items()), generation)
data = StickyAssignorUserData(list(partitions_by_topic.items()), generation)
user_data = data.encode()
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data)
return ConsumerProtocolSubscription(cls.version, list(topics), user_data)

@classmethod
def on_assignment(cls, assignment, generation):
Expand Down
10 changes: 6 additions & 4 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor
from kafka.coordinator.protocol import ConsumerProtocol
from kafka.protocol.new.consumer.metadata import (
ConsumerProtocolType, ConsumerProtocolSubscription, ConsumerProtocolAssignment,
)
from kafka.coordinator.subscription import Subscription
import kafka.errors as Errors
from kafka.future import Future
Expand Down Expand Up @@ -147,7 +149,7 @@ def __del__(self):
super(ConsumerCoordinator, self).__del__()

def protocol_type(self):
return ConsumerProtocol[0].PROTOCOL_TYPE
return ConsumerProtocolType

def group_protocols(self):
"""Returns list of preferred (protocols, metadata)"""
Expand Down Expand Up @@ -235,7 +237,7 @@ def _on_join_complete(self, generation, member_id, protocol,
assignor = self._lookup_assignor(protocol)
assert assignor, 'Coordinator selected invalid assignment protocol: %s' % (protocol,)

assignment = ConsumerProtocol[0].ASSIGNMENT.decode(member_assignment_bytes)
assignment = ConsumerProtocolAssignment.decode(member_assignment_bytes)

try:
self._subscription.assign_from_subscribed(assignment.partitions())
Expand Down Expand Up @@ -332,7 +334,7 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
all_subscribed_topics = set()
for member in members:
subscription = Subscription(
ConsumerProtocol[0].METADATA.decode(member.metadata),
ConsumerProtocolSubscription.decode(member.metadata),
member.group_instance_id
)
member_subscriptions[member.member_id] = subscription
Expand Down
Loading
Loading