From 31ea4b98c54b199fba2dc033bc472b6377767d42 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 21 Mar 2026 09:05:04 -0700 Subject: [PATCH] Use implicit bytes encoding for StickyAssignorUserData --- .../assignors/sticky/sticky_assignor.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index e1726bc92..e9589221b 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -1,6 +1,7 @@ -import logging from collections import defaultdict, namedtuple from copy import deepcopy +import logging +import io from ..abstract import AbstractPartitionAssignor from .partition_movements import PartitionMovements @@ -617,15 +618,17 @@ def parse_member_metadata(cls, metadata): return StickyAssignorMemberMetadataV1( partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics ) - - try: - decoded_user_data = StickyAssignorUserData.decode(user_data) - except Exception: - # ignore the consumer's previous assignment if it cannot be parsed - log.exception("Could not parse member data") - return StickyAssignorMemberMetadataV1( - partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics - ) + elif isinstance(user_data, StickyAssignorUserData): + decoded_user_data = user_data + else: + try: + decoded_user_data = StickyAssignorUserData.decode(user_data) + except Exception: + # ignore the consumer's previous assignment if it cannot be parsed + log.exception("Could not parse member data") + return StickyAssignorMemberMetadataV1( + partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics + ) member_partitions = [] for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member @@ -648,8 +651,7 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1): partitions_by_topic = defaultdict(list) for topic_partition in member_assignment_partitions: partitions_by_topic[topic_partition.topic].append(topic_partition.partition) - data = StickyAssignorUserData(list(partitions_by_topic.items()), generation) - user_data = data.encode() + user_data = StickyAssignorUserData(list(partitions_by_topic.items()), generation) return ConsumerProtocolSubscription(cls.version, list(topics), user_data) def on_assignment(self, assignment, generation):