Skip to content

Commit dcc646e

Browse files
committed
kafka.admin: use new protocol attrs
1 parent 32b1dd1 commit dcc646e

3 files changed

Lines changed: 29 additions & 29 deletions

File tree

kafka/admin/client.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -471,13 +471,13 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_
471471
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
472472
.format(self.config['api_version']))
473473
request = CreateTopicsRequest[version](
474-
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
475-
timeout=timeout_ms
474+
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
475+
timeout_ms=timeout_ms
476476
)
477477
elif version <= 3:
478478
request = CreateTopicsRequest[version](
479-
create_topic_requests=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
480-
timeout=timeout_ms,
479+
topics=[self._convert_new_topic_request(new_topic) for new_topic in new_topics],
480+
timeout_ms=timeout_ms,
481481
validate_only=validate_only
482482
)
483483
else:
@@ -504,7 +504,7 @@ def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
504504
"""
505505
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
506506
timeout_ms = self._validate_timeout(timeout_ms)
507-
request = DeleteTopicsRequest[version](topics=topics, timeout=timeout_ms)
507+
request = DeleteTopicsRequest[version](topics=topics, timeout_ms=timeout_ms)
508508
def get_response_errors(r):
509509
for response in r.responses:
510510
yield Errors.for_code(response[1])
@@ -640,19 +640,19 @@ def describe_acls(self, acl_filter):
640640
version = self._client.api_version(DescribeAclsRequest, max_version=1)
641641
if version == 0:
642642
request = DescribeAclsRequest[version](
643-
resource_type=acl_filter.resource_pattern.resource_type,
644-
resource_name=acl_filter.resource_pattern.resource_name,
645-
principal=acl_filter.principal,
646-
host=acl_filter.host,
643+
resource_type_filter=acl_filter.resource_pattern.resource_type,
644+
resource_name_filter=acl_filter.resource_pattern.resource_name,
645+
principal_filter=acl_filter.principal,
646+
host_filter=acl_filter.host,
647647
operation=acl_filter.operation,
648648
permission_type=acl_filter.permission_type
649649
)
650650
elif version <= 1:
651651
request = DescribeAclsRequest[version](
652-
resource_type=acl_filter.resource_pattern.resource_type,
653-
resource_name=acl_filter.resource_pattern.resource_name,
652+
resource_type_filter=acl_filter.resource_pattern.resource_type,
653+
resource_name_filter=acl_filter.resource_pattern.resource_name,
654654
resource_pattern_type_filter=acl_filter.resource_pattern.pattern_type,
655-
principal=acl_filter.principal,
655+
principal_filter=acl_filter.principal,
656656
host=acl_filter.host,
657657
operation=acl_filter.operation,
658658
permission_type=acl_filter.permission_type
@@ -727,7 +727,7 @@ def _convert_create_acls_response_to_acls(acls, create_response):
727727

728728
creations_error = []
729729
creations_success = []
730-
for i, creations in enumerate(create_response.creation_responses):
730+
for i, creations in enumerate(create_response.results):
731731
if version <= 1:
732732
error_code, error_message = creations
733733
acl = acls[i]
@@ -827,7 +827,7 @@ def _convert_delete_acls_response_to_matching_acls(acl_filters, delete_response)
827827
"""
828828
version = delete_response.API_VERSION
829829
filter_result_list = []
830-
for i, filter_responses in enumerate(delete_response.filter_responses):
830+
for i, filter_responses in enumerate(delete_response.filter_results):
831831
filter_error_code, filter_error_message, matching_acls = filter_responses
832832
filter_error = Errors.for_code(filter_error_code)
833833
acl_result_list = []
@@ -1054,8 +1054,8 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
10541054
version = self._client.api_version(CreatePartitionsRequest, max_version=1)
10551055
timeout_ms = self._validate_timeout(timeout_ms)
10561056
request = CreatePartitionsRequest[version](
1057-
topic_partitions=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
1058-
timeout=timeout_ms,
1057+
topics=[self._convert_create_partitions_request(topic_name, new_partitions) for topic_name, new_partitions in topic_partitions.items()],
1058+
timeout_ms=timeout_ms,
10591059
validate_only=validate_only
10601060
)
10611061
def get_response_errors(r):
@@ -1594,7 +1594,7 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_
15941594
request = ElectLeadersRequest[version](
15951595
election_type=ElectionType(election_type),
15961596
topic_partitions=self._get_topic_partitions(topic_partitions),
1597-
timeout=timeout_ms,
1597+
timeout_ms=timeout_ms,
15981598
)
15991599
# TODO convert structs to a more pythonic interface
16001600
def get_response_errors(r):

test/integration/fixtures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,7 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_fa
614614
request = CreateTopicsRequest[0]([(topic_name, num_partitions,
615615
replication_factor, [], [])], timeout_ms)
616616
response = self._send_request(request, timeout=timeout_ms)
617-
for topic_result in response.topic_errors:
617+
for topic_result in response.topics:
618618
error_code = topic_result[1]
619619
if error_code != 0:
620620
raise errors.for_code(error_code)

test/integration/test_admin_integration.py

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ def test_describe_configs_broker_resource_returns_configs(kafka_admin_client):
103103
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
104104

105105
assert len(configs) == 1
106-
assert configs[0].resources[0][2] == ConfigResourceType.BROKER
107-
assert configs[0].resources[0][3] == str(broker_id)
108-
assert len(configs[0].resources[0][4]) > 1
106+
assert configs[0].results[0][2] == ConfigResourceType.BROKER
107+
assert configs[0].results[0][3] == str(broker_id)
108+
assert len(configs[0].results[0][4]) > 1
109109

110110

111111
@pytest.mark.xfail(condition=True,
@@ -118,9 +118,9 @@ def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_clie
118118
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)])
119119

120120
assert len(configs) == 1
121-
assert configs[0].resources[0][2] == ConfigResourceType.TOPIC
122-
assert configs[0].resources[0][3] == topic
123-
assert len(configs[0].resources[0][4]) > 1
121+
assert configs[0].results[0][2] == ConfigResourceType.TOPIC
122+
assert configs[0].results[0][3] == topic
123+
assert len(configs[0].results[0][4]) > 1
124124

125125

126126
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
@@ -135,11 +135,11 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_cli
135135
assert len(configs) == 2
136136

137137
for config in configs:
138-
assert (config.resources[0][2] == ConfigResourceType.TOPIC
139-
and config.resources[0][3] == topic) or \
140-
(config.resources[0][2] == ConfigResourceType.BROKER
141-
and config.resources[0][3] == str(broker_id))
142-
assert len(config.resources[0][4]) > 1
138+
assert (config.results[0][2] == ConfigResourceType.TOPIC
139+
and config.results[0][3] == topic) or \
140+
(config.results[0][2] == ConfigResourceType.BROKER
141+
and config.results[0][3] == str(broker_id))
142+
assert len(config.results[0][4]) > 1
143143

144144

145145
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")

0 commit comments

Comments
 (0)