Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
"""
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
timeout_ms = self._validate_timeout(timeout_ms)
request = DeleteTopicsRequest[version](topics=topics, timeout_ms=timeout_ms)
request = DeleteTopicsRequest[version](topic_names=topics, timeout_ms=timeout_ms)
def get_response_errors(r):
for response in r.responses:
yield Errors.for_code(response[1])
Expand Down Expand Up @@ -554,7 +554,7 @@ def list_topics(self):
A list of topic name strings.
"""
metadata = self._get_cluster_metadata(topics=None)
return [t['topic'] for t in metadata['topics']]
return [t['name'] for t in metadata['topics']]

def describe_topics(self, topics=None):
"""Fetch metadata for the specified topics or all topics if None.
Expand Down Expand Up @@ -651,12 +651,11 @@ def describe_acls(self, acl_filter):
request = DescribeAclsRequest[version](
resource_type_filter=acl_filter.resource_pattern.resource_type,
resource_name_filter=acl_filter.resource_pattern.resource_name,
resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type,
pattern_type_filter=acl_filter.resource_pattern.pattern_type,
principal_filter=acl_filter.principal,
host=acl_filter.host,
host_filter=acl_filter.host,
operation=acl_filter.operation,
permission_type=acl_filter.permission_type

)
response = self.send_request(request) # pylint: disable=E0606
error_type = Errors.for_code(response.error_code)
Expand Down Expand Up @@ -1089,9 +1088,9 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
valid_partitions = set()
for topic in metadata.get("topics", ()):
for partition in topic.get("partitions", ()):
t2p = TopicPartition(topic=topic["topic"], partition=partition["partition"])
t2p = TopicPartition(topic=topic["name"], partition=partition["partition_index"])
if t2p in partitions:
leader2partitions[partition["leader"]].append(t2p)
leader2partitions[partition["leader_id"]].append(t2p)
valid_partitions.add(t2p)

if len(partitions) != len(valid_partitions):
Expand Down
70 changes: 35 additions & 35 deletions kafka/protocol/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,13 @@ class MetadataResponse_v0(Response):
('port', Int32))),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('name', String('utf-8')),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32))))))
('partition_index', Int32),
('leader_id', Int32),
('replica_nodes', Array(Int32)),
('isr_nodes', Array(Int32))))))
)


Expand All @@ -34,14 +34,14 @@ class MetadataResponse_v1(Response):
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('name', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32))))))
('partition_index', Int32),
('leader_id', Int32),
('replica_nodes', Array(Int32)),
('isr_nodes', Array(Int32))))))
)


Expand All @@ -58,14 +58,14 @@ class MetadataResponse_v2(Response):
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('name', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32))))))
('partition_index', Int32),
('leader_id', Int32),
('replica_nodes', Array(Int32)),
('isr_nodes', Array(Int32))))))
)


Expand All @@ -83,14 +83,14 @@ class MetadataResponse_v3(Response):
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('name', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32))))))
('partition_index', Int32),
('leader_id', Int32),
('replica_nodes', Array(Int32)),
('isr_nodes', Array(Int32))))))
)


Expand All @@ -114,14 +114,14 @@ class MetadataResponse_v5(Response):
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('name', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32)),
('partition_index', Int32),
('leader_id', Int32),
('replica_nodes', Array(Int32)),
('isr_nodes', Array(Int32)),
('offline_replicas', Array(Int32))))))
)

Expand Down Expand Up @@ -149,15 +149,15 @@ class MetadataResponse_v7(Response):
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('name', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('partition_index', Int32),
('leader_id', Int32),
('leader_epoch', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32)),
('replica_nodes', Array(Int32)),
('isr_nodes', Array(Int32)),
('offline_replicas', Array(Int32))))))
)

Expand All @@ -177,15 +177,15 @@ class MetadataResponse_v8(Response):
('controller_id', Int32),
('topics', Array(
('error_code', Int16),
('topic', String('utf-8')),
('name', String('utf-8')),
('is_internal', Boolean),
('partitions', Array(
('error_code', Int16),
('partition', Int32),
('leader', Int32),
('partition_index', Int32),
('leader_id', Int32),
('leader_epoch', Int32),
('replicas', Array(Int32)),
('isr', Array(Int32)),
('replica_nodes', Array(Int32)),
('isr_nodes', Array(Int32)),
('offline_replicas', Array(Int32)))),
('authorized_operations', BitField))),
('authorized_operations', BitField)
Expand Down
6 changes: 3 additions & 3 deletions test/integration/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def consumer_thread(i, group_id):
assert(len(consumer_group.members) == 1)
for member in consumer_group.members:
assert(member.member_metadata.topics[0] == topic)
assert(member.member_assignment.assignment[0][0] == topic)
assert(member.member_assignment.assigned_partitions[0][0] == topic)
consumer_groups.add(consumer_group.group)
assert(sorted(list(consumer_groups)) == group_id_list)
finally:
Expand Down Expand Up @@ -406,8 +406,8 @@ def test_create_delete_topics(kafka_admin_client):
@pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2")
def test_perform_leader_election(kafka_admin_client, topic):
topic_metadata = kafka_admin_client.describe_topics([topic])[0]
assert topic_metadata['topic'] == topic
partitions = list(map(lambda p: p['partition'], topic_metadata['partitions']))
assert topic_metadata['name'] == topic
partitions = list(map(lambda p: p['partition_index'], topic_metadata['partitions']))
election_type = 0 # Preferred
topic_partitions = {topic: partitions}
# When Leader Election is not needed (cluster is stable), error 84 is returned
Expand Down
37 changes: 17 additions & 20 deletions test/test_object_conversion.py → test/protocol/test_to_object.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from kafka.protocol.admin import Request
from kafka.protocol.admin import Response
from kafka.protocol.types import Schema
from kafka.protocol.types import Array
from kafka.protocol.types import Int16
from kafka.protocol.types import String
from kafka.protocol.api import Request
from kafka.protocol.api import Response
from kafka.protocol.types import Schema, Array, Int16, String

import pytest

Expand Down Expand Up @@ -197,31 +194,31 @@ def test_with_metadata_response():

assert len(obj['topics']) == 2
assert obj['topics'][0]['error_code'] == 0
assert obj['topics'][0]['topic'] == 'testtopic1'
assert obj['topics'][0]['name'] == 'testtopic1'
assert obj['topics'][0]['is_internal'] is False
assert len(obj['topics'][0]['partitions']) == 2
assert obj['topics'][0]['partitions'][0]['error_code'] == 0
assert obj['topics'][0]['partitions'][0]['partition'] == 0
assert obj['topics'][0]['partitions'][0]['leader'] == 0
assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['partition_index'] == 0
assert obj['topics'][0]['partitions'][0]['leader_id'] == 0
assert obj['topics'][0]['partitions'][0]['replica_nodes'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['isr_nodes'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['offline_replicas'] == []
assert obj['topics'][0]['partitions'][1]['error_code'] == 0
assert obj['topics'][0]['partitions'][1]['partition'] == 1
assert obj['topics'][0]['partitions'][1]['leader'] == 1
assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['partition_index'] == 1
assert obj['topics'][0]['partitions'][1]['leader_id'] == 1
assert obj['topics'][0]['partitions'][1]['replica_nodes'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['isr_nodes'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['offline_replicas'] == []

assert obj['topics'][1]['error_code'] == 0
assert obj['topics'][1]['topic'] == 'other-test-topic'
assert obj['topics'][1]['name'] == 'other-test-topic'
assert obj['topics'][1]['is_internal'] is True
assert len(obj['topics'][1]['partitions']) == 1
assert obj['topics'][1]['partitions'][0]['error_code'] == 0
assert obj['topics'][1]['partitions'][0]['partition'] == 0
assert obj['topics'][1]['partitions'][0]['leader'] == 0
assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['partition_index'] == 0
assert obj['topics'][1]['partitions'][0]['leader_id'] == 0
assert obj['topics'][1]['partitions'][0]['replica_nodes'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['isr_nodes'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['offline_replicas'] == []

tc.encode()
36 changes: 18 additions & 18 deletions test/test_assignors.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def test_sticky_add_remove_consumer_one_topic(mocker):

assignment = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance(subscriptions, assignment)
assert len(assignment['C2'].assignment[0][1]) == 3
assert len(assignment['C2'].assigned_partitions[0][1]) == 3


def test_sticky_add_remove_topic_two_consumers(mocker):
Expand Down Expand Up @@ -693,9 +693,9 @@ def test_assignment_with_multiple_generations1(mocker):

assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
assert len(assignment1['C1'].assignment[0][1]) == 2
assert len(assignment1['C2'].assignment[0][1]) == 2
assert len(assignment1['C3'].assignment[0][1]) == 2
assert len(assignment1['C1'].assigned_partitions[0][1]) == 2
assert len(assignment1['C2'].assigned_partitions[0][1]) == 2
assert len(assignment1['C3'].assigned_partitions[0][1]) == 2

member_metadata = {
'C1': StickyPartitionAssignor._metadata({'t'}, assignment1['C1'].partitions()),
Expand All @@ -704,10 +704,10 @@ def test_assignment_with_multiple_generations1(mocker):

assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}}, assignment2)
assert len(assignment2['C1'].assignment[0][1]) == 3
assert len(assignment2['C2'].assignment[0][1]) == 3
assert all([partition in assignment2['C1'].assignment[0][1] for partition in assignment1['C1'].assignment[0][1]])
assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
assert len(assignment2['C1'].assigned_partitions[0][1]) == 3
assert len(assignment2['C2'].assigned_partitions[0][1]) == 3
assert all([partition in assignment2['C1'].assigned_partitions[0][1] for partition in assignment1['C1'].assigned_partitions[0][1]])
assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]])
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

member_metadata = {
Expand All @@ -717,8 +717,8 @@ def test_assignment_with_multiple_generations1(mocker):

assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C2': {'t'}, 'C3': {'t'}}, assignment3)
assert len(assignment3['C2'].assignment[0][1]) == 3
assert len(assignment3['C3'].assignment[0][1]) == 3
assert len(assignment3['C2'].assigned_partitions[0][1]) == 3
assert len(assignment3['C3'].assigned_partitions[0][1]) == 3
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()


Expand All @@ -733,18 +733,18 @@ def test_assignment_with_multiple_generations2(mocker):

assignment1 = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment1)
assert len(assignment1['C1'].assignment[0][1]) == 2
assert len(assignment1['C2'].assignment[0][1]) == 2
assert len(assignment1['C3'].assignment[0][1]) == 2
assert len(assignment1['C1'].assigned_partitions[0][1]) == 2
assert len(assignment1['C2'].assigned_partitions[0][1]) == 2
assert len(assignment1['C3'].assigned_partitions[0][1]) == 2

member_metadata = {
'C2': StickyPartitionAssignor._metadata({'t'}, assignment1['C2'].partitions(), 1),
}

assignment2 = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C2': {'t'}}, assignment2)
assert len(assignment2['C2'].assignment[0][1]) == 6
assert all([partition in assignment2['C2'].assignment[0][1] for partition in assignment1['C2'].assignment[0][1]])
assert len(assignment2['C2'].assigned_partitions[0][1]) == 6
assert all([partition in assignment2['C2'].assigned_partitions[0][1] for partition in assignment1['C2'].assigned_partitions[0][1]])
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()

member_metadata = {
Expand All @@ -756,9 +756,9 @@ def test_assignment_with_multiple_generations2(mocker):
assignment3 = StickyPartitionAssignor.assign(cluster, member_metadata)
verify_validity_and_balance({'C1': {'t'}, 'C2': {'t'}, 'C3': {'t'}}, assignment3)
assert StickyPartitionAssignor._latest_partition_movements.are_sticky()
assert set(assignment3['C1'].assignment[0][1]) == set(assignment1['C1'].assignment[0][1])
assert set(assignment3['C2'].assignment[0][1]) == set(assignment1['C2'].assignment[0][1])
assert set(assignment3['C3'].assignment[0][1]) == set(assignment1['C3'].assignment[0][1])
assert set(assignment3['C1'].assigned_partitions[0][1]) == set(assignment1['C1'].assigned_partitions[0][1])
assert set(assignment3['C2'].assigned_partitions[0][1]) == set(assignment1['C2'].assigned_partitions[0][1])
assert set(assignment3['C3'].assigned_partitions[0][1]) == set(assignment1['C3'].assigned_partitions[0][1])


@pytest.mark.parametrize('execution_number', range(50))
Expand Down
Loading