diff --git a/kafka/conn.py b/kafka/conn.py index 451bd4d1a..4ffb8e850 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -11,16 +11,12 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest from kafka.protocol.api_versions import ApiVersionsRequest -from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS -from kafka.protocol.commit import OffsetFetchRequest -from kafka.protocol.fetch import FetchRequest -from kafka.protocol.find_coordinator import FindCoordinatorRequest -from kafka.protocol.list_offsets import ListOffsetsRequest -from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.broker_api_versions import ( + BROKER_API_VERSIONS, VERSION_CHECKS, + infer_broker_version_from_api_versions, +) from kafka.protocol.parser import KafkaProtocol -from kafka.protocol.produce import ProduceRequest from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest from kafka.protocol.sasl_handshake import SaslHandshakeRequest from kafka.protocol.types import Int32 @@ -216,12 +212,6 @@ class BrokerConnection(object): 'socks5_proxy': None, } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - VERSION_CHECKS = ( - ((0, 9), ListGroupsRequest[0]()), - ((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')), - ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])), - ((0, 8, 0), MetadataRequest[0]([])), - ) def __init__(self, host, port, afi, **configs): self.host = host @@ -556,8 +546,8 @@ def _try_api_versions_check(self): self._api_versions_future = future self.state = ConnectionStates.API_VERSIONS_RECV self.config['state_change_callback'](self.node_id, self._sock, self) - elif self._check_version_idx < len(self.VERSION_CHECKS): - version, request = self.VERSION_CHECKS[self._check_version_idx] + elif self._check_version_idx < len(VERSION_CHECKS): + version, request = VERSION_CHECKS[self._check_version_idx] future = Future() self._api_versions_check_timeout /= 2 response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout) @@ -605,7 +595,7 @@ def _handle_api_versions_response(self, future, response): (api_version_data[0], (api_version_data[1], api_version_data[2])) for api_version_data in response.api_versions ]) - self._api_version = self._infer_broker_version_from_api_versions(self._api_versions) + self._api_version = infer_broker_version_from_api_versions(self._api_versions) log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version))) future.success(self._api_version) self.connect() @@ -1215,52 +1205,6 @@ def get_api_versions(self): self.check_version() return self._api_versions - def _infer_broker_version_from_api_versions(self, api_versions): - # The logic here is to check the list of supported request versions - # in reverse order. As soon as we find one that works, return it - test_cases = [ - # format (, ) - # Make sure to update consumer_integration test check when adding newer versions. - # ((3, 9), FetchRequest[17]), - # ((3, 8), ProduceRequest[11]), - # ((3, 7), FetchRequest[16]), - # ((3, 6), AddPartitionsToTxnRequest[4]), - # ((3, 5), FetchRequest[15]), - # ((3, 4), StopReplicaRequest[3]), # broker-internal api... - # ((3, 3), DescribeAclsRequest[3]), - # ((3, 2), JoinGroupRequest[9]), - # ((3, 1), FetchRequest[13]), - # ((3, 0), ListOffsetsRequest[7]), - # ((2, 8), ProduceRequest[9]), - # ((2, 7), FetchRequest[12]), - # ((2, 6), ListGroupsRequest[4]), - # ((2, 5), JoinGroupRequest[7]), - ((2, 6), DescribeClientQuotasRequest[0]), - ((2, 5), DescribeAclsRequest[2]), - ((2, 4), ProduceRequest[8]), - ((2, 3), FetchRequest[11]), - ((2, 2), ListOffsetsRequest[5]), - ((2, 1), FetchRequest[10]), - ((2, 0), FetchRequest[8]), - ((1, 1), FetchRequest[7]), - ((1, 0), MetadataRequest[5]), - ((0, 11), MetadataRequest[4]), - ((0, 10, 2), OffsetFetchRequest[2]), - ((0, 10, 1), MetadataRequest[2]), - ] - - # Get the best match of test cases - for broker_version, proto_struct in sorted(test_cases, reverse=True): - if proto_struct.API_KEY not in api_versions: - continue - min_version, max_version = api_versions[proto_struct.API_KEY] - if min_version <= proto_struct.API_VERSION <= max_version: - return broker_version - - # We know that ApiVersionsResponse is only supported in 0.10+ - # so if all else fails, choose that - return (0, 10, 0) - def check_version(self, timeout=2, **kwargs): """Attempt to guess the broker version. diff --git a/kafka/protocol/broker_api_versions.py b/kafka/protocol/broker_api_versions.py index af142d07c..a8db8e453 100644 --- a/kafka/protocol/broker_api_versions.py +++ b/kafka/protocol/broker_api_versions.py @@ -1,3 +1,68 @@ +from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest +from kafka.protocol.commit import OffsetFetchRequest +from kafka.protocol.fetch import FetchRequest +from kafka.protocol.find_coordinator import FindCoordinatorRequest +from kafka.protocol.list_offsets import ListOffsetsRequest +from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.produce import ProduceRequest + + +def infer_broker_version_from_api_versions(api_versions): + # The logic here is to check the list of supported request versions + # in reverse order. As soon as we find one that works, return it + test_cases = [ + # format (, ) + # Make sure to update consumer_integration test check when adding newer versions. + # ((3, 9), FetchRequest[17]), + # ((3, 8), ProduceRequest[11]), + # ((3, 7), FetchRequest[16]), + # ((3, 6), AddPartitionsToTxnRequest[4]), + # ((3, 5), FetchRequest[15]), + # ((3, 4), StopReplicaRequest[3]), # broker-internal api... + # ((3, 3), DescribeAclsRequest[3]), + # ((3, 2), JoinGroupRequest[9]), + # ((3, 1), FetchRequest[13]), + # ((3, 0), ListOffsetsRequest[7]), + # ((2, 8), ProduceRequest[9]), + # ((2, 7), FetchRequest[12]), + # ((2, 6), ListGroupsRequest[4]), + # ((2, 5), JoinGroupRequest[7]), + ((2, 6), DescribeClientQuotasRequest[0]), + ((2, 5), DescribeAclsRequest[2]), + ((2, 4), ProduceRequest[8]), + ((2, 3), FetchRequest[11]), + ((2, 2), ListOffsetsRequest[5]), + ((2, 1), FetchRequest[10]), + ((2, 0), FetchRequest[8]), + ((1, 1), FetchRequest[7]), + ((1, 0), MetadataRequest[5]), + ((0, 11), MetadataRequest[4]), + ((0, 10, 2), OffsetFetchRequest[2]), + ((0, 10, 1), MetadataRequest[2]), + ] + + # Get the best match of test cases + for broker_version, proto_struct in sorted(test_cases, reverse=True): + if proto_struct.API_KEY not in api_versions: + continue + min_version, max_version = api_versions[proto_struct.API_KEY] + if min_version <= proto_struct.API_VERSION <= max_version: + return broker_version + + # We know that ApiVersionsResponse is only supported in 0.10+ + # so if all else fails, choose that + return (0, 10, 0) + + +# Fallback version checks for brokers that do not support ApiVersionsCheck +VERSION_CHECKS = ( + ((0, 9), ListGroupsRequest[0]()), + ((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')), + ((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])), + ((0, 8, 0), MetadataRequest[0]([])), +) + + BROKER_API_VERSIONS = { # api_versions responses prior to (0, 10) are synthesized for compatibility (0, 8, 0): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0)}, diff --git a/test/test_conn.py b/test/test_conn.py index 1f55e1af8..696be53d5 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -8,7 +8,7 @@ from kafka.conn import BrokerConnection, ConnectionStates from kafka.future import Future -from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError +from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, VERSION_CHECKS from kafka.metrics.metrics import Metrics from kafka.metrics.stats.sensor import Sensor from kafka.protocol.api import RequestHeader @@ -91,7 +91,7 @@ def test_api_versions_check(_socket, mocker): assert conn._try_api_versions_check() is False assert conn.connecting() is True - conn._check_version_idx = len(conn.VERSION_CHECKS) + conn._check_version_idx = len(VERSION_CHECKS) conn._api_versions_future = None assert conn._try_api_versions_check() is False assert conn.connecting() is False