diff --git a/kafka/coordinator/protocol.py b/kafka/coordinator/protocol.py index 51957a468..378bc978c 100644 --- a/kafka/coordinator/protocol.py +++ b/kafka/coordinator/protocol.py @@ -13,14 +13,17 @@ class ConsumerProtocolMemberMetadata_v0(Struct): class ConsumerProtocolMemberAssignment_v0(Struct): SCHEMA = Schema( ('version', Int16), - ('assignment', Array( + ('assigned_partitions', Array( ('topic', String('utf-8')), ('partitions', Array(Int32)))), ('user_data', Bytes)) + ALIASES = { + 'assignment': 'assigned_partitions', + } def partitions(self): return [TopicPartition(topic, partition) - for topic, partitions in self.assignment # pylint: disable-msg=no-member + for topic, partitions in self.assigned_partitions for partition in partitions] diff --git a/kafka/protocol/new/consumer/__init__.py b/kafka/protocol/new/consumer/__init__.py index 235088794..1566e406d 100644 --- a/kafka/protocol/new/consumer/__init__.py +++ b/kafka/protocol/new/consumer/__init__.py @@ -1,5 +1,6 @@ from .fetch import * from .group import * +from .metadata import * from .offsets import * @@ -13,4 +14,5 @@ 'HeartbeatRequest', 'HeartbeatResponse', 'OffsetFetchRequest', 'OffsetFetchResponse', 'OffsetCommitRequest', 'OffsetCommitResponse', + 'ConsumerProtocolType', 'ConsumerProtocolSubscription', 'ConsumerProtocolAssignment', ] diff --git a/kafka/protocol/new/consumer/metadata.py b/kafka/protocol/new/consumer/metadata.py new file mode 100644 index 000000000..8e04b0ca3 --- /dev/null +++ b/kafka/protocol/new/consumer/metadata.py @@ -0,0 +1,28 @@ +from ..api_data import ApiData +from kafka.structs import TopicPartition + + +ConsumerProtocolType = 'consumer' + + +class ConsumerProtocolSubscription(ApiData): pass +class ConsumerProtocolAssignment(ApiData): + + # Compatibility with old manual protocol definition + @property + def assignment(self): + return self.assigned_partitions + + @assignment.setter + def assignment(self, value): + self.assigned_partitions = value + + def partitions(self): + return [TopicPartition(topic, partition) + for topic, partitions in self.assigned_partitions + for partition in partitions] + + +__all__ = [ + 'ConsumerProtocolSubscription', 'ConsumerProtocolAssignment', 'ConsumerProtocolType', +] diff --git a/kafka/protocol/new/schemas/resources/ConsumerProtocolAssignment.json b/kafka/protocol/new/schemas/resources/ConsumerProtocolAssignment.json new file mode 100644 index 000000000..afa02a1bb --- /dev/null +++ b/kafka/protocol/new/schemas/resources/ConsumerProtocolAssignment.json @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "ConsumerProtocolAssignment", + // Assignment part of the Consumer Protocol. + // + // The current implementation assumes that future versions will not break compatibility. When + // it encounters a newer version, it parses it using the current format. This basically means + // that new versions cannot remove or reorder any of the existing fields. + // + // Version 2 is to support a new field "GenerationId" in ConsumerProtocolSubscription. + // Version 3 adds rack id to ConsumerProtocolSubscription. + "validVersions": "0-3", + "flexibleVersions": "none", + "fields": [ + { "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": "0+", + "about": "The list of topics and partitions assigned to this consumer.", "fields": [ + { "name": "Topic", "type": "string", "mapKey": true, "versions": "0+", "entityType": "topicName", + "about": "The topic name."}, + { "name": "Partitions", "type": "[]int32", "versions": "0+", + "about": "The list of partitions assigned to this consumer."} + ] + }, + { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", + "default": "null", "zeroCopy": true, + "about": "User data."} + ] +} diff --git a/kafka/protocol/new/schemas/resources/ConsumerProtocolSubscription.json b/kafka/protocol/new/schemas/resources/ConsumerProtocolSubscription.json new file mode 100644 index 000000000..ae7aa2b2b --- /dev/null +++ b/kafka/protocol/new/schemas/resources/ConsumerProtocolSubscription.json @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "ConsumerProtocolSubscription", + // Subscription part of the Consumer Protocol. + // + // The current implementation assumes that future versions will not break compatibility. When + // it encounters a newer version, it parses it using the current format. This basically means + // that new versions cannot remove or reorder any of the existing fields. + + // Version 1 added the "OwnedPartitions" field to allow assigner know what partitions each member owned + // Version 2 added a new field "GenerationId" to indicate if the member has out-of-date ownedPartitions. + // Version 3 adds rack id to enable rack-aware assignment. + "validVersions": "0-3", + "flexibleVersions": "none", + "fields": [ + { "name": "Topics", "type": "[]string", "versions": "0+", + "about": "The topics that the member wants to consume."}, + { "name": "UserData", "type": "bytes", "versions": "0+", "nullableVersions": "0+", + "default": "null", "zeroCopy": true, + "about": "User data that will be passed back to the consumer."}, + { "name": "OwnedPartitions", "type": "[]TopicPartition", "versions": "1+", "ignorable": true, + "about": "The partitions that the member owns.", "fields": [ + { "name": "Topic", "type": "string", "mapKey": true, "versions": "1+", "entityType": "topicName", + "about": "The topic name."}, + { "name": "Partitions", "type": "[]int32", "versions": "1+", + "about": "The partition ids."} + ] + }, + { "name": "GenerationId", "type": "int32", "versions": "2+", "default": "-1", "ignorable": true, + "about": "The generation id of the member."}, + { "name": "RackId", "type": "string", "versions": "3+", "nullableVersions": "3+", "default": "null", "ignorable": true, + "about": "The rack id of the member."} + ] +} diff --git a/test/protocol/new/consumer/test_metadata.py b/test/protocol/new/consumer/test_metadata.py new file mode 100644 index 000000000..4b7e7d919 --- /dev/null +++ b/test/protocol/new/consumer/test_metadata.py @@ -0,0 +1,43 @@ +import pytest + +from kafka.protocol.new.consumer.metadata import * + + +@pytest.mark.parametrize('version', range(ConsumerProtocolAssignment.min_version, ConsumerProtocolAssignment.max_version + 1)) +def test_consumer_protocol_assignment(version): + assignment = ConsumerProtocolAssignment( + assigned_partitions=[('t0', [0, 2]), ('t1', [1])], + user_data=b'foo\x12', + version=version, + ) + encoded = assignment.encode() + decoded = ConsumerProtocolAssignment.decode(encoded) + assert decoded == assignment + assert decoded.version == version + assert len(decoded.assigned_partitions) == 2 + assert decoded.assigned_partitions[0].topic == 't0' + assert decoded.assigned_partitions[0].partitions == [0, 2] + assert decoded.assigned_partitions[1].topic == 't1' + assert decoded.assigned_partitions[1].partitions == [1] + assert decoded.user_data == b'foo\x12' + + +@pytest.mark.parametrize('version', range(ConsumerProtocolSubscription.min_version, ConsumerProtocolSubscription.max_version + 1)) +def test_consumer_protocol_subscription(version): + topics = ['t0', 't1'] + user_data = b'foo\x12' + subscription = ConsumerProtocolSubscription( + topics=topics, + user_data=user_data, + version=version, + ) + encoded = subscription.encode() + decoded = ConsumerProtocolSubscription.decode(encoded) + assert decoded == subscription + assert decoded.version == version + assert decoded.topics == topics + assert decoded.user_data == user_data + + +def test_consumer_protocol_type(): + assert ConsumerProtocolType == 'consumer'