diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 22ac70f1c..7ac809e06 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -504,7 +504,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """ version = self._client.api_version(DeleteTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - request = DeleteTopicsRequest[version](topics=topics, timeout_ms=timeout_ms) + request = DeleteTopicsRequest[version](topic_names=topics, timeout_ms=timeout_ms) def get_response_errors(r): for response in r.responses: yield Errors.for_code(response[1]) @@ -554,7 +554,7 @@ def list_topics(self): A list of topic name strings. """ metadata = self._get_cluster_metadata(topics=None) - return [t['topic'] for t in metadata['topics']] + return [t['name'] for t in metadata['topics']] def describe_topics(self, topics=None): """Fetch metadata for the specified topics or all topics if None. @@ -651,12 +651,11 @@ def describe_acls(self, acl_filter): request = DescribeAclsRequest[version]( resource_type_filter=acl_filter.resource_pattern.resource_type, resource_name_filter=acl_filter.resource_pattern.resource_name, - resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type, + pattern_type_filter=acl_filter.resource_pattern.pattern_type, principal_filter=acl_filter.principal, - host=acl_filter.host, + host_filter=acl_filter.host, operation=acl_filter.operation, permission_type=acl_filter.permission_type - ) response = self.send_request(request) # pylint: disable=E0606 error_type = Errors.for_code(response.error_code) @@ -1089,9 +1088,9 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None): valid_partitions = set() for topic in metadata.get("topics", ()): for partition in topic.get("partitions", ()): - t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"]) + t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"]) if t2p in partitions: - leader2partitions[partition["leader"]].append(t2p) + leader2partitions[partition["leader_id"]].append(t2p) valid_partitions.add(t2p) if len(partitions) != len(valid_partitions): diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index f3d588133..0fa6939ca 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -12,13 +12,13 @@ class MetadataResponse_v0(Response): ('port', Int32))), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)))))) + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)))))) ) @@ -34,14 +34,14 @@ class MetadataResponse_v1(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)))))) + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)))))) ) @@ -58,14 +58,14 @@ class MetadataResponse_v2(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)))))) + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)))))) ) @@ -83,14 +83,14 @@ class MetadataResponse_v3(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)))))) + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)))))) ) @@ -114,14 +114,14 @@ class MetadataResponse_v5(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)), + ('partition_index', Int32), + ('leader_id', Int32), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)), ('offline_replicas', Array(Int32)))))) ) @@ -149,15 +149,15 @@ class MetadataResponse_v7(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), + ('partition_index', Int32), + ('leader_id', Int32), ('leader_epoch', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)), ('offline_replicas', Array(Int32)))))) ) @@ -177,15 +177,15 @@ class MetadataResponse_v8(Response): ('controller_id', Int32), ('topics', Array( ('error_code', Int16), - ('topic', String('utf-8')), + ('name', String('utf-8')), ('is_internal', Boolean), ('partitions', Array( ('error_code', Int16), - ('partition', Int32), - ('leader', Int32), + ('partition_index', Int32), + ('leader_id', Int32), ('leader_epoch', Int32), - ('replicas', Array(Int32)), - ('isr', Array(Int32)), + ('replica_nodes', Array(Int32)), + ('isr_nodes', Array(Int32)), ('offline_replicas', Array(Int32)))), ('authorized_operations', BitField))), ('authorized_operations', BitField) diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 5a3a83d2b..ca2d79f2e 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -238,7 +238,7 @@ def consumer_thread(i, group_id): assert(len(consumer_group.members) == 1) for member in consumer_group.members: assert(member.member_metadata.topics[0] == topic) - assert(member.member_assignment.assignment[0][0] == topic) + assert(member.member_assignment.assigned_partitions[0][0] == topic) consumer_groups.add(consumer_group.group) assert(sorted(list(consumer_groups)) == group_id_list) finally: @@ -406,8 +406,8 @@ def test_create_delete_topics(kafka_admin_client): @pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2") def test_perform_leader_election(kafka_admin_client, topic): topic_metadata = kafka_admin_client.describe_topics([topic])[0] - assert topic_metadata['topic'] == topic - partitions = list(map(lambda p: p['partition'], topic_metadata['partitions'])) + assert topic_metadata['name'] == topic + partitions = list(map(lambda p: p['partition_index'], topic_metadata['partitions'])) election_type = 0 # Preferred topic_partitions = {topic: partitions} # When Leader Election is not needed (cluster is stable), error 84 is returned diff --git a/test/test_object_conversion.py b/test/protocol/test_to_object.py similarity index 86% rename from test/test_object_conversion.py rename to test/protocol/test_to_object.py index bd2c51b47..602c014d6 100644 --- a/test/test_object_conversion.py +++ b/test/protocol/test_to_object.py @@ -1,9 +1,6 @@ -from kafka.protocol.admin import Request -from kafka.protocol.admin import Response -from kafka.protocol.types import Schema -from kafka.protocol.types import Array -from kafka.protocol.types import Int16 -from kafka.protocol.types import String +from kafka.protocol.api import Request +from kafka.protocol.api import Response +from kafka.protocol.types import Schema, Array, Int16, String import pytest @@ -197,31 +194,31 @@ def test_with_metadata_response(): assert len(obj['topics']) == 2 assert obj['topics'][0]['error_code'] == 0 - assert obj['topics'][0]['topic'] == 'testtopic1' + assert obj['topics'][0]['name'] == 'testtopic1' assert obj['topics'][0]['is_internal'] is False assert len(obj['topics'][0]['partitions']) == 2 assert obj['topics'][0]['partitions'][0]['error_code'] == 0 - assert obj['topics'][0]['partitions'][0]['partition'] == 0 - assert obj['topics'][0]['partitions'][0]['leader'] == 0 - assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1] - assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1] + assert obj['topics'][0]['partitions'][0]['partition_index'] == 0 + assert obj['topics'][0]['partitions'][0]['leader_id'] == 0 + assert obj['topics'][0]['partitions'][0]['replica_nodes'] == [0, 1] + assert obj['topics'][0]['partitions'][0]['isr_nodes'] == [0, 1] assert obj['topics'][0]['partitions'][0]['offline_replicas'] == [] assert obj['topics'][0]['partitions'][1]['error_code'] == 0 - assert obj['topics'][0]['partitions'][1]['partition'] == 1 - assert obj['topics'][0]['partitions'][1]['leader'] == 1 - assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0] - assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0] + assert obj['topics'][0]['partitions'][1]['partition_index'] == 1 + assert obj['topics'][0]['partitions'][1]['leader_id'] == 1 + assert obj['topics'][0]['partitions'][1]['replica_nodes'] == [1, 0] + assert obj['topics'][0]['partitions'][1]['isr_nodes'] == [1, 0] assert obj['topics'][0]['partitions'][1]['offline_replicas'] == [] assert obj['topics'][1]['error_code'] == 0 - assert obj['topics'][1]['topic'] == 'other-test-topic' + assert obj['topics'][1]['name'] == 'other-test-topic' assert obj['topics'][1]['is_internal'] is True assert len(obj['topics'][1]['partitions']) == 1 assert obj['topics'][1]['partitions'][0]['error_code'] == 0 - assert obj['topics'][1]['partitions'][0]['partition'] == 0 - assert obj['topics'][1]['partitions'][0]['leader'] == 0 - assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1] - assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1] + assert obj['topics'][1]['partitions'][0]['partition_index'] == 0 + assert obj['topics'][1]['partitions'][0]['leader_id'] == 0 + assert obj['topics'][1]['partitions'][0]['replica_nodes'] == [0, 1] + assert obj['topics'][1]['partitions'][0]['isr_nodes'] == [0, 1] assert obj['topics'][1]['partitions'][0]['offline_replicas'] == [] tc.encode() diff --git a/test/test_assignors.py b/test/test_assignors.py index 1056a7c1a..c04d6afb6 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -341,7 +341,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker): assignment = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance(subscriptions, assignment) - assert len(assignment['C2'].assignment[0][1]) == 3 + assert len(assignment['C2'].assigned_partitions[0][1]) == 3 def test_sticky_add_remove_topic_two_consumers(mocker): @@ -693,9 +693,9 @@ def test_assignment_with_multiple_generations1(mocker): assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) - assert len(assignment1['C1'].assignment[0][1]) == 2 - assert len(assignment1['C2'].assignment[0][1]) == 2 - assert len(assignment1['C3'].assignment[0][1]) == 2 + assert len(assignment1['C1'].assigned_partitions[0][1]) == 2 + assert len(assignment1['C2'].assigned_partitions[0][1]) == 2 + assert len(assignment1['C3'].assigned_partitions[0][1]) == 2 member_metadata = { 'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()), @@ -704,10 +704,10 @@ def test_assignment_with_multiple_generations1(mocker): assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2) - assert len(assignment2['C1'].assignment[0][1]) == 3 - assert len(assignment2['C2'].assignment[0][1]) == 3 - assert all([partition in assignment2['C1'].assignment[0][1] for partition in assignment1['C1'].assignment[0][1]]) - assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]]) + assert len(assignment2['C1'].assigned_partitions[0][1]) == 3 + assert len(assignment2['C2'].assigned_partitions[0][1]) == 3 + assert all([partition in assignment2['C1'].assigned_partitions[0][1] for partition in assignment1['C1'].assigned_partitions[0][1]]) + assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]]) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() member_metadata = { @@ -717,8 +717,8 @@ def test_assignment_with_multiple_generations1(mocker): assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3) - assert len(assignment3['C2'].assignment[0][1]) == 3 - assert len(assignment3['C3'].assignment[0][1]) == 3 + assert len(assignment3['C2'].assigned_partitions[0][1]) == 3 + assert len(assignment3['C3'].assigned_partitions[0][1]) == 3 assert StickyPartitionAssignor._latest_partition_movements.are_sticky() @@ -733,9 +733,9 @@ def test_assignment_with_multiple_generations2(mocker): assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1) - assert len(assignment1['C1'].assignment[0][1]) == 2 - assert len(assignment1['C2'].assignment[0][1]) == 2 - assert len(assignment1['C3'].assignment[0][1]) == 2 + assert len(assignment1['C1'].assigned_partitions[0][1]) == 2 + assert len(assignment1['C2'].assigned_partitions[0][1]) == 2 + assert len(assignment1['C3'].assigned_partitions[0][1]) == 2 member_metadata = { 'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1), @@ -743,8 +743,8 @@ def test_assignment_with_multiple_generations2(mocker): assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C2': {'t'}}, assignment2) - assert len(assignment2['C2'].assignment[0][1]) == 6 - assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]]) + assert len(assignment2['C2'].assigned_partitions[0][1]) == 6 + assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]]) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() member_metadata = { @@ -756,9 +756,9 @@ def test_assignment_with_multiple_generations2(mocker): assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata) verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3) assert StickyPartitionAssignor._latest_partition_movements.are_sticky() - assert set(assignment3['C1'].assignment[0][1]) == set(assignment1['C1'].assignment[0][1]) - assert set(assignment3['C2'].assignment[0][1]) == set(assignment1['C2'].assignment[0][1]) - assert set(assignment3['C3'].assignment[0][1]) == set(assignment1['C3'].assignment[0][1]) + assert set(assignment3['C1'].assigned_partitions[0][1]) == set(assignment1['C1'].assigned_partitions[0][1]) + assert set(assignment3['C2'].assigned_partitions[0][1]) == set(assignment1['C2'].assigned_partitions[0][1]) + assert set(assignment3['C3'].assigned_partitions[0][1]) == set(assignment1['C3'].assigned_partitions[0][1]) @pytest.mark.parametrize('execution_number', range(50))