diff --git a/kafka/coordinator/assignors/sticky/StickyAssignorUserData.json b/kafka/coordinator/assignors/sticky/StickyAssignorUserData.json new file mode 100644 index 000000000..9635590c1 --- /dev/null +++ b/kafka/coordinator/assignors/sticky/StickyAssignorUserData.json @@ -0,0 +1,37 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "StickyAssignorUserData", + // StickyAssignor currently always encodes with version 1. + // To decode, versions are attempted in reverse order until one succeeds. + // If no decoding is possible, the assignor ignores the previous user data. + + // Version 1 added the "generation" field + "validVersions": "0-1", + "flexibleVersions": "none", + "fields": [ + { "name": "PreviousAssignment", "type": "[]TopicPartition", "versions": "0+", "fields": [ + { "name": "Topic", "type": "string", "mapKey": true, "versions": "0+", "entityType": "topicName", + "about": "The topic name."}, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The partition ids."} + ] + }, + { "name": "Generation", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, + "about": "The generation id of the previous assignment."} + ] +} diff --git a/kafka/coordinator/assignors/sticky/user_data.py b/kafka/coordinator/assignors/sticky/user_data.py new file mode 100644 index 000000000..e9a98a32c --- /dev/null +++ b/kafka/coordinator/assignors/sticky/user_data.py @@ -0,0 +1,8 @@ +from kafka.protocol.new.api_data import ApiData + + +class StickyAssignorUserData(ApiData, load_json=__package__): + def __init__(self, *args, **kw): + if 'version' not in kw: + kw['version'] = 1 + super().__init__(*args, **kw) diff --git a/kafka/protocol/new/api_data.py b/kafka/protocol/new/api_data.py index 7ecbe0a28..404bc7c14 100644 --- a/kafka/protocol/new/api_data.py +++ b/kafka/protocol/new/api_data.py @@ -11,7 +11,7 @@ class JsonSchemaData(SlotsBuilder): def __new__(metacls, name, bases, attrs, **kw): if kw.get('init', True): - json = load_json(name) + json = load_json(name, package=kw.get('load_json')) if 'json_patch' in attrs: json = attrs['json_patch'].__func__(metacls, json) attrs['_json'] = json diff --git a/kafka/protocol/new/schemas/load_json.py b/kafka/protocol/new/schemas/load_json.py index a2ee99eb8..25c9f32be 100644 --- a/kafka/protocol/new/schemas/load_json.py +++ b/kafka/protocol/new/schemas/load_json.py @@ -1,12 +1,17 @@ import importlib.resources +import inspect import json import re -def load_json(msg_type): +def load_json(msg_type, package=None): + if package is None: + package = __package__ + '.resources' + elif inspect.ismodule(package): + package = package.__package__ COMMENTS_REGEX = r"(?m)((?:^\s*//.*\n?)+)" # Raises FileNotFoundError if not found - msg_json = importlib.resources.read_text(__package__ + '.resources', msg_type + '.json') + msg_json = importlib.resources.read_text(package, msg_type + '.json') data = json.loads(re.sub(COMMENTS_REGEX, '', msg_json)) comments = re.findall(COMMENTS_REGEX, msg_json) if comments: