diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 69ac0c4b1..025ec11ce 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -383,7 +383,7 @@ def send_requests(self, requests_and_node_ids, response_fn=lambda x: x): self._wait_for_futures(futures) return [response_fn(future.value) for future in futures] - def _send_request_to_controller(self, request): + def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), ignore_errors=(), raise_errors=True, tries=2): """Send a Kafka protocol message to the cluster controller. Will block until the message result is received. @@ -391,60 +391,35 @@ def _send_request_to_controller(self, request): Arguments: request: The message to send. + Keyword Arguments: + get_errors_fn (func): Function to process response and return an iterable of Error types. + ignore_errors (tuple): Any non-zero error codes that should be ignored. Not used if raise_errors=False. + raise_errors (bool): Whether to raise unhandled errors (True, default) or return response with errors (False). + tries (int): Number of times to refresh controller id and retry on NotControllerError. + Returns: The Kafka protocol response for the message. """ - tries = 2 # in case our cached self._controller_id is outdated + # retry in case our controller_id is out of date while tries: tries -= 1 response = self.send_request(request, node_id=self._controller_id) - # In Java, the error field name is inconsistent: - # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors - # - DeleteTopicsResponse uses topic_error_codes - # So this is a little brittle in that it assumes all responses have - # one of these attributes and that they always unpack into - # (topic, error_code) tuples. 
- topic_error_tuples = getattr(response, 'topic_errors', getattr(response, 'topic_error_codes', None)) - if topic_error_tuples is not None: - success = self._parse_topic_request_response(topic_error_tuples, request, response, tries) - else: - # Leader Election request has a two layer error response (topic and partition) - success = self._parse_topic_partition_request_response(request, response, tries) - - if success: - return response - raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered") - - def _parse_topic_request_response(self, topic_error_tuples, request, response, tries): - for topic, error_code, *_ in topic_error_tuples: - error_type = Errors.for_code(error_code) - if tries and error_type is Errors.NotControllerError: - # No need to inspect the rest of the errors for - # non-retriable errors because NotControllerError should - # either be thrown for all errors or no errors. - self._refresh_controller_id() - return False - elif error_type is not Errors.NoError: - raise error_type( - "Request '{}' failed with response '{}'." - .format(request, response)) - return True - - def _parse_topic_partition_request_response(self, request, response, tries): - for topic, partition_results in response.replication_election_results: - for partition_id, error_code, *_ in partition_results: - error_type = Errors.for_code(error_code) + for error_type in get_errors_fn(response): if tries and error_type is Errors.NotControllerError: # No need to inspect the rest of the errors for # non-retriable errors because NotControllerError should # either be thrown for all errors or no errors. self._refresh_controller_id() - return False - elif error_type not in (Errors.NoError, Errors.ElectionNotNeededError): - raise error_type( - "Request '{}' failed with response '{}'." 
- .format(request, response)) - return True + break + elif raise_errors: + if error_type is not Errors.NoError and error_type not in ignore_errors: + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + else: + # No controller refresh needed + return response + raise RuntimeError("Failed to find active controller id!") @staticmethod def _convert_new_topic_request(new_topic): @@ -472,7 +447,7 @@ def _convert_new_topic_request(new_topic): ] ) - def create_topics(self, new_topics, timeout_ms=None, validate_only=False): + def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True): """Create new topics in the cluster. Arguments: @@ -483,6 +458,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False): before the broker returns. validate_only (bool, optional): If True, don't actually create new topics. Not supported by all versions. Default: False + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. Returns: Appropriate version of CreateTopicResponse class. @@ -504,11 +480,15 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False): timeout=timeout_ms, validate_only=validate_only ) + else: + raise RuntimeError('Version check error: %s' % version) # TODO convert structs to a more pythonic interface - # TODO raise exceptions if errors - return self._send_request_to_controller(request) # pylint: disable=E0606 + def get_response_errors(r): + for topic in r.topics: + yield Errors.for_code(topic[1]) + return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) - def delete_topics(self, topics, timeout_ms=None): + def delete_topics(self, topics, timeout_ms=None, raise_errors=True): """Delete topics from the cluster. 
Arguments: @@ -517,18 +497,18 @@ def delete_topics(self, topics, timeout_ms=None): Keyword Arguments: timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted before the broker returns. + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. Returns: Appropriate version of DeleteTopicsResponse class. """ version = self._client.api_version(DeleteTopicsRequest, max_version=3) timeout_ms = self._validate_timeout(timeout_ms) - return self._send_request_to_controller( - DeleteTopicsRequest[version]( - topics=topics, - timeout=timeout_ms - ) - ) + request = DeleteTopicsRequest[version](topics=topics, timeout=timeout_ms) + def get_response_errors(r): + for response in r.responses: + yield Errors.for_code(response[1]) + return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) def _process_metadata_response(self, metadata_response): obj = metadata_response.to_object() @@ -590,11 +570,9 @@ def describe_topics(self, topics=None): return metadata['topics'] def describe_cluster(self): - """ - Fetch cluster-wide metadata such as the list of brokers, the controller ID, + """Fetch cluster-wide metadata such as the list of brokers, the controller ID, and the cluster ID. - Returns: A dict with cluster-wide metadata, excluding topic details. """ @@ -1057,7 +1035,7 @@ def _convert_create_partitions_request(topic_name, new_partitions): ) ) - def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False): + def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True): """Create additional partitions for an existing topic. Arguments: @@ -1068,6 +1046,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal created before the broker returns. validate_only (bool, optional): If True, don't actually create new partitions. 
Default: False + raise_errors (bool, optional): Whether to raise errors as exceptions. Default True. Returns: Appropriate version of CreatePartitionsResponse class. @@ -1079,18 +1058,25 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal timeout=timeout_ms, validate_only=validate_only ) - return self._send_request_to_controller(request) + def get_response_errors(r): + for result in r.results: + yield Errors.for_code(result[1]) + return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors) def _get_leader_for_partitions(self, partitions, timeout_ms=None): """Finds ID of the leader node for every given topic partition. Will raise UnknownTopicOrPartitionError if for some partition no leader can be found. - :param partitions: ``[TopicPartition]``: partitions for which to find leaders. - :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from - config. + Arguments: + partitions ([TopicPartition]): partitions for which to find leaders. + + Keyword Arguments: + timeout_ms (numeric, optional): Timeout in milliseconds, if None (default), will be read from + config. - :return: Dictionary with ``{leader_id -> {partitions}}`` + Returns: + dict with ``{leader_id -> {partitions}}`` """ timeout_ms = self._validate_timeout(timeout_ms) @@ -1120,15 +1106,19 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None): def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None): """Delete records whose offset is smaller than the given offset of the corresponding partition. - :param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the - given partitions. - :param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from - config. - :param partition_leader_id: ``str``: If specified, all deletion requests will be sent to - this node. 
No check is performed verifying that this is indeed the leader for all - listed partitions: use with caution. + Arguments: + records_to_delete ({TopicPartition: int}): The earliest available offsets for the + given partitions. + + Keyword Arguments: + timeout_ms (numeric, optional): Timeout in milliseconds, if None (default), will be read from + config. + partition_leader_id (node_id / int, optional): If specified, all deletion requests will be sent to + this node. No check is performed verifying that this is indeed the leader for all + listed partitions: use with caution. - :return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker. + Returns: + dict {topicPartition -> metadata}, where metadata is returned by the broker. See DeleteRecordsResponse for possible fields. error_code for all partitions is guaranteed to be zero, otherwise an exception is raised. """ @@ -1583,16 +1573,21 @@ def _get_topic_partitions(self, topic_partitions): return self._get_all_topic_partitions() return self._convert_topic_partitions(topic_partitions) - def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None): + def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None, raise_errors=True): """Perform leader election on the topic partitions. - :param election_type: Type of election to attempt. 0 for Perferred, 1 for Unclean - :param topic_partitions: A map of topic name strings to partition ids list. - By default, will run on all topic partitions - :param timeout_ms: Milliseconds to wait for the leader election process to complete - before the broker returns. + Arguments: + election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean + + Keyword Arguments: + topic_partitions (dict): A map of topic name strings to partition ids list. 
+ By default, will run on all topic partitions + timeout_ms (num, optional): Milliseconds to wait for the leader election process to complete + before the broker returns. + raise_errors (bool, optional): True/False whether to raise errors as exceptions. Default True. - :return: Appropriate version of ElectLeadersResponse class. + Returns: + Appropriate version of ElectLeadersResponse class. """ version = self._client.api_version(ElectLeadersRequest, max_version=1) timeout_ms = self._validate_timeout(timeout_ms) @@ -1602,7 +1597,17 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_ timeout=timeout_ms, ) # TODO convert structs to a more pythonic interface - return self._send_request_to_controller(request) + def get_response_errors(r): + if r.API_VERSION >= 1: + yield Errors.for_code(r.error_code) + for result in r.replica_election_results: + for partition in result[1]: + yield Errors.for_code(partition[1]) + ignore_errors = (Errors.ElectionNotNeededError,) + return self._send_request_to_controller(request, + get_errors_fn=get_response_errors, + ignore_errors=ignore_errors, + raise_errors=raise_errors) def describe_log_dirs(self): """Send a DescribeLogDirsRequest request to a broker. 
diff --git a/servers/0.10.1.1/resources/kafka.properties b/servers/0.10.1.1/resources/kafka.properties index daab312b0..32b9c0a17 100644 --- a/servers/0.10.1.1/resources/kafka.properties +++ b/servers/0.10.1.1/resources/kafka.properties @@ -142,3 +142,6 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} zookeeper.connection.timeout.ms=30000 # We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly zookeeper.session.timeout.ms=500 + +# Enable topic deletion for admin integration tests (defaults to false <= 0.11) +delete.topic.enable=true diff --git a/servers/0.10.2.1/resources/kafka.properties b/servers/0.10.2.1/resources/kafka.properties index daab312b0..32b9c0a17 100644 --- a/servers/0.10.2.1/resources/kafka.properties +++ b/servers/0.10.2.1/resources/kafka.properties @@ -142,3 +142,6 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} zookeeper.connection.timeout.ms=30000 # We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly zookeeper.session.timeout.ms=500 + +# Enable topic deletion for admin integration tests (defaults to false <= 0.11) +delete.topic.enable=true diff --git a/servers/0.10.2.2/resources/kafka.properties b/servers/0.10.2.2/resources/kafka.properties index daab312b0..32b9c0a17 100644 --- a/servers/0.10.2.2/resources/kafka.properties +++ b/servers/0.10.2.2/resources/kafka.properties @@ -142,3 +142,6 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} zookeeper.connection.timeout.ms=30000 # We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly zookeeper.session.timeout.ms=500 + +# Enable topic deletion for admin integration tests (defaults to false <= 0.11) +delete.topic.enable=true diff --git a/servers/0.11.0.0/resources/kafka.properties b/servers/0.11.0.0/resources/kafka.properties index 219023551..e025798dd 100644 --- a/servers/0.11.0.0/resources/kafka.properties +++ b/servers/0.11.0.0/resources/kafka.properties @@ -169,3 +169,6 
@@ zookeeper.session.timeout.ms=500 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 + +# Enable topic deletion for admin integration tests (defaults to false <= 0.11) +delete.topic.enable=true diff --git a/servers/0.11.0.1/resources/kafka.properties b/servers/0.11.0.1/resources/kafka.properties index 219023551..e025798dd 100644 --- a/servers/0.11.0.1/resources/kafka.properties +++ b/servers/0.11.0.1/resources/kafka.properties @@ -169,3 +169,6 @@ zookeeper.session.timeout.ms=500 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 + +# Enable topic deletion for admin integration tests (defaults to false <= 0.11) +delete.topic.enable=true diff --git a/servers/0.11.0.2/resources/kafka.properties b/servers/0.11.0.2/resources/kafka.properties index 219023551..e025798dd 100644 --- a/servers/0.11.0.2/resources/kafka.properties +++ b/servers/0.11.0.2/resources/kafka.properties @@ -169,3 +169,6 @@ zookeeper.session.timeout.ms=500 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. 
group.initial.rebalance.delay.ms=0 + +# Enable topic deletion for admin integration tests (defaults to false <= 0.11) +delete.topic.enable=true diff --git a/servers/0.11.0.3/resources/kafka.properties b/servers/0.11.0.3/resources/kafka.properties index 219023551..e025798dd 100644 --- a/servers/0.11.0.3/resources/kafka.properties +++ b/servers/0.11.0.3/resources/kafka.properties @@ -169,3 +169,6 @@ zookeeper.session.timeout.ms=500 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0 + +# Enable topic deletion for admin integration tests (defaults to false <= 0.11) +delete.topic.enable=true diff --git a/test/integration/test_admin_integration.py b/test/integration/test_admin_integration.py index 93382c65c..d47663465 100644 --- a/test/integration/test_admin_integration.py +++ b/test/integration/test_admin_integration.py @@ -1,16 +1,22 @@ -from kafka.structs import TopicPartition -import pytest - from logging import info -from test.testutil import env_kafka_version, random_string from threading import Event, Thread from time import time, sleep +import pytest + from kafka.admin import ( - ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) + ACLFilter, ACLOperation, ACLPermissionType, + ResourcePattern, ResourceType, ACL, + ConfigResource, ConfigResourceType, + NewTopic, +) from kafka.errors import ( - BrokerResponseError, NoError, CoordinatorNotAvailableError, NonEmptyGroupError, - GroupIdNotFoundError, OffsetOutOfRangeError, UnknownTopicOrPartitionError) + BrokerResponseError, NoError, CoordinatorNotAvailableError, + NonEmptyGroupError, GroupIdNotFoundError, OffsetOutOfRangeError, + UnknownTopicOrPartitionError, 
ElectionNotNeededError, +) +from kafka.structs import TopicPartition +from test.testutil import env_kafka_version, random_string @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") @@ -383,3 +389,35 @@ def test_delete_records_with_errors(kafka_admin_client, topic, send_messages): kafka_admin_client.delete_records({p0: 1000}) with pytest.raises(BrokerResponseError): kafka_admin_client.delete_records({p0: 1000, p1: 1000}) + + +@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Create topics requires broker >=0.10.1") +def test_create_delete_topics(kafka_admin_client): + topic_name = random_string(4) + response = kafka_admin_client.create_topics([NewTopic(topic_name, 1, 1)]) + assert response.topics[0][0] == topic_name + assert response.topics[0][1] == 0 # NoError + + response = kafka_admin_client.delete_topics([topic_name]) + assert response.responses[0][0] == topic_name + assert response.responses[0][1] == 0 # NoError + + +@pytest.mark.skipif(env_kafka_version() < (2, 2), reason="Leader Election requires broker >=2.2") +def test_perform_leader_election(kafka_admin_client, topic): + topic_metadata = kafka_admin_client.describe_topics([topic])[0] + assert topic_metadata['topic'] == topic + partitions = list(map(lambda p: p['partition'], topic_metadata['partitions'])) + election_type = 0 # Preferred + topic_partitions = {topic: partitions} + # When Leader Election is not needed (cluster is stable), error 84 is returned + response = kafka_admin_client.perform_leader_election(election_type, topic_partitions) + assert len(response.replica_election_results) == 1 + result = response.replica_election_results[0] + assert result[0] == topic + partition_set = set(partitions) + for partition in result[1]: + assert partition[0] in partition_set + partition_set.remove(partition[0]) + assert partition[1] == ElectionNotNeededError.errno + assert partition_set == set()