Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 84 additions & 79 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,68 +383,43 @@ def send_requests(self, requests_and_node_ids, response_fn=lambda x: x):
self._wait_for_futures(futures)
return [response_fn(future.value) for future in futures]

def _send_request_to_controller(self, request):
def _send_request_to_controller(self, request, get_errors_fn=lambda r: (), ignore_errors=(), raise_errors=True, tries=2):
"""Send a Kafka protocol message to the cluster controller.

Will block until the message result is received.

Arguments:
request: The message to send.

Keyword Arguments:
get_errors_fn (func): Function to process response and return an iterable of Error types.
ignore_errors (tuple): Any non-zero error codes that should be ignored. Not used if raise_errors=False.
raise_errors (bool): Whether to raise unhandled errors (True, default) or return response with errors (False).
            tries (int): Number of times to refresh controller id and retry on NotControllerError.

Returns:
The Kafka protocol response for the message.
"""
tries = 2 # in case our cached self._controller_id is outdated
# retry in case our controller_id is out of date
while tries:
tries -= 1
response = self.send_request(request, node_id=self._controller_id)
# In Java, the error field name is inconsistent:
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
# - DeleteTopicsResponse uses topic_error_codes
# So this is a little brittle in that it assumes all responses have
# one of these attributes and that they always unpack into
# (topic, error_code) tuples.
topic_error_tuples = getattr(response, 'topic_errors', getattr(response, 'topic_error_codes', None))
if topic_error_tuples is not None:
success = self._parse_topic_request_response(topic_error_tuples, request, response, tries)
else:
# Leader Election request has a two layer error response (topic and partition)
success = self._parse_topic_partition_request_response(request, response, tries)

if success:
return response
raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered")

def _parse_topic_request_response(self, topic_error_tuples, request, response, tries):
for topic, error_code, *_ in topic_error_tuples:
error_type = Errors.for_code(error_code)
if tries and error_type is Errors.NotControllerError:
# No need to inspect the rest of the errors for
# non-retriable errors because NotControllerError should
# either be thrown for all errors or no errors.
self._refresh_controller_id()
return False
elif error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
return True

def _parse_topic_partition_request_response(self, request, response, tries):
for topic, partition_results in response.replication_election_results:
for partition_id, error_code, *_ in partition_results:
error_type = Errors.for_code(error_code)
for error_type in get_errors_fn(response):
if tries and error_type is Errors.NotControllerError:
# No need to inspect the rest of the errors for
# non-retriable errors because NotControllerError should
# either be thrown for all errors or no errors.
self._refresh_controller_id()
return False
elif error_type not in (Errors.NoError, Errors.ElectionNotNeededError):
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
return True
break
elif raise_errors:
if error_type is not Errors.NoError and error_type not in ignore_errors:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
else:
# No controller refresh needed
return response
raise RuntimeError("Failed to find active controller id!")

@staticmethod
def _convert_new_topic_request(new_topic):
Expand Down Expand Up @@ -472,7 +447,7 @@ def _convert_new_topic_request(new_topic):
]
)

def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
def create_topics(self, new_topics, timeout_ms=None, validate_only=False, raise_errors=True):
"""Create new topics in the cluster.

Arguments:
Expand All @@ -483,6 +458,7 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
before the broker returns.
validate_only (bool, optional): If True, don't actually create new topics.
Not supported by all versions. Default: False
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.

Returns:
Appropriate version of CreateTopicResponse class.
Expand All @@ -504,11 +480,15 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
timeout=timeout_ms,
validate_only=validate_only
)
else:
raise RuntimeError('Version check error: %s' % version)
# TODO convert structs to a more pythonic interface
# TODO raise exceptions if errors
return self._send_request_to_controller(request) # pylint: disable=E0606
def get_response_errors(r):
for topic in r.topics:
yield Errors.for_code(topic[1])
return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)

def delete_topics(self, topics, timeout_ms=None):
def delete_topics(self, topics, timeout_ms=None, raise_errors=True):
"""Delete topics from the cluster.

Arguments:
Expand All @@ -517,18 +497,18 @@ def delete_topics(self, topics, timeout_ms=None):
Keyword Arguments:
timeout_ms (numeric, optional): Milliseconds to wait for topics to be deleted
before the broker returns.
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.

Returns:
Appropriate version of DeleteTopicsResponse class.
"""
version = self._client.api_version(DeleteTopicsRequest, max_version=3)
timeout_ms = self._validate_timeout(timeout_ms)
return self._send_request_to_controller(
DeleteTopicsRequest[version](
topics=topics,
timeout=timeout_ms
)
)
request = DeleteTopicsRequest[version](topics=topics, timeout=timeout_ms)
def get_response_errors(r):
for response in r.responses:
yield Errors.for_code(response[1])
return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)

def _process_metadata_response(self, metadata_response):
obj = metadata_response.to_object()
Expand Down Expand Up @@ -590,11 +570,9 @@ def describe_topics(self, topics=None):
return metadata['topics']

def describe_cluster(self):
"""
Fetch cluster-wide metadata such as the list of brokers, the controller ID,
"""Fetch cluster-wide metadata such as the list of brokers, the controller ID,
and the cluster ID.


Returns:
A dict with cluster-wide metadata, excluding topic details.
"""
Expand Down Expand Up @@ -1057,7 +1035,7 @@ def _convert_create_partitions_request(topic_name, new_partitions):
)
)

def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False):
def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=False, raise_errors=True):
"""Create additional partitions for an existing topic.

Arguments:
Expand All @@ -1068,6 +1046,7 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
created before the broker returns.
validate_only (bool, optional): If True, don't actually create new partitions.
Default: False
raise_errors (bool, optional): Whether to raise errors as exceptions. Default True.

Returns:
Appropriate version of CreatePartitionsResponse class.
Expand All @@ -1079,18 +1058,25 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Fal
timeout=timeout_ms,
validate_only=validate_only
)
return self._send_request_to_controller(request)
def get_response_errors(r):
for result in r.results:
yield Errors.for_code(result[1])
return self._send_request_to_controller(request, get_errors_fn=get_response_errors, raise_errors=raise_errors)

def _get_leader_for_partitions(self, partitions, timeout_ms=None):
"""Finds ID of the leader node for every given topic partition.

Will raise UnknownTopicOrPartitionError if for some partition no leader can be found.

:param partitions: ``[TopicPartition]``: partitions for which to find leaders.
:param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
config.
Arguments:
partitions ([TopicPartition]): partitions for which to find leaders.

Keyword Arguments:
timeout_ms (numeric, optional): Timeout in milliseconds, if None (default), will be read from
config.

:return: Dictionary with ``{leader_id -> {partitions}}``
Returns:
dict with ``{leader_id -> {partitions}}``
"""
timeout_ms = self._validate_timeout(timeout_ms)

Expand Down Expand Up @@ -1120,15 +1106,19 @@ def _get_leader_for_partitions(self, partitions, timeout_ms=None):
def delete_records(self, records_to_delete, timeout_ms=None, partition_leader_id=None):
"""Delete records whose offset is smaller than the given offset of the corresponding partition.

:param records_to_delete: ``{TopicPartition: int}``: The earliest available offsets for the
given partitions.
:param timeout_ms: ``float``: Timeout in milliseconds, if None (default), will be read from
config.
:param partition_leader_id: ``str``: If specified, all deletion requests will be sent to
this node. No check is performed verifying that this is indeed the leader for all
listed partitions: use with caution.
Arguments:
records_to_delete ({TopicPartition: int}): The earliest available offsets for the
given partitions.

Keyword Arguments:
timeout_ms (numeric, optional): Timeout in milliseconds, if None (default), will be read from
config.
partition_leader_id (node_id / int, optional): If specified, all deletion requests will be sent to
this node. No check is performed verifying that this is indeed the leader for all
listed partitions: use with caution.

:return: Dictionary {topicPartition -> metadata}, where metadata is returned by the broker.
Returns:
dict {topicPartition -> metadata}, where metadata is returned by the broker.
See DeleteRecordsResponse for possible fields. error_code for all partitions is
guaranteed to be zero, otherwise an exception is raised.
"""
Expand Down Expand Up @@ -1583,16 +1573,21 @@ def _get_topic_partitions(self, topic_partitions):
return self._get_all_topic_partitions()
return self._convert_topic_partitions(topic_partitions)

def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None):
def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None, raise_errors=True):
"""Perform leader election on the topic partitions.

        :param election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean
:param topic_partitions: A map of topic name strings to partition ids list.
By default, will run on all topic partitions
:param timeout_ms: Milliseconds to wait for the leader election process to complete
before the broker returns.
Arguments:
election_type: Type of election to attempt. 0 for Preferred, 1 for Unclean

Keyword Arguments:
topic_partitions (dict): A map of topic name strings to partition ids list.
By default, will run on all topic partitions
            timeout_ms (numeric, optional): Milliseconds to wait for the leader election process to complete
before the broker returns.
raise_errors (bool, optional): True/False whether to raise errors as exceptions. Default True.

:return: Appropriate version of ElectLeadersResponse class.
Returns:
Appropriate version of ElectLeadersResponse class.
"""
version = self._client.api_version(ElectLeadersRequest, max_version=1)
timeout_ms = self._validate_timeout(timeout_ms)
Expand All @@ -1602,7 +1597,17 @@ def perform_leader_election(self, election_type, topic_partitions=None, timeout_
timeout=timeout_ms,
)
# TODO convert structs to a more pythonic interface
return self._send_request_to_controller(request)
def get_response_errors(r):
if r.API_VERSION >= 1:
yield Errors.for_code(r.error_code)
for result in r.replica_election_results:
for partition in result[1]:
yield Errors.for_code(partition[1])
ignore_errors = (Errors.ElectionNotNeededError,)
return self._send_request_to_controller(request,
get_errors_fn=get_response_errors,
ignore_errors=ignore_errors,
raise_errors=raise_errors)

def describe_log_dirs(self):
"""Send a DescribeLogDirsRequest request to a broker.
Expand Down
3 changes: 3 additions & 0 deletions servers/0.10.1.1/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,6 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
zookeeper.connection.timeout.ms=30000
# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly
zookeeper.session.timeout.ms=500

# Enable topic deletion for admin integration tests (defaults to false <= 0.11)
delete.topic.enable=true
3 changes: 3 additions & 0 deletions servers/0.10.2.1/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,6 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
zookeeper.connection.timeout.ms=30000
# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly
zookeeper.session.timeout.ms=500

# Enable topic deletion for admin integration tests (defaults to false <= 0.11)
delete.topic.enable=true
3 changes: 3 additions & 0 deletions servers/0.10.2.2/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -142,3 +142,6 @@ zookeeper.connect={zk_host}:{zk_port}/{zk_chroot}
zookeeper.connection.timeout.ms=30000
# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly
zookeeper.session.timeout.ms=500

# Enable topic deletion for admin integration tests (defaults to false <= 0.11)
delete.topic.enable=true
3 changes: 3 additions & 0 deletions servers/0.11.0.0/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,6 @@ zookeeper.session.timeout.ms=500
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# Enable topic deletion for admin integration tests (defaults to false <= 0.11)
delete.topic.enable=true
3 changes: 3 additions & 0 deletions servers/0.11.0.1/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,6 @@ zookeeper.session.timeout.ms=500
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# Enable topic deletion for admin integration tests (defaults to false <= 0.11)
delete.topic.enable=true
3 changes: 3 additions & 0 deletions servers/0.11.0.2/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,6 @@ zookeeper.session.timeout.ms=500
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# Enable topic deletion for admin integration tests (defaults to false <= 0.11)
delete.topic.enable=true
3 changes: 3 additions & 0 deletions servers/0.11.0.3/resources/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,6 @@ zookeeper.session.timeout.ms=500
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

# Enable topic deletion for admin integration tests (defaults to false <= 0.11)
delete.topic.enable=true
Loading
Loading