Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 7 additions & 63 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,12 @@
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
from kafka.protocol.api_versions import ApiVersionsRequest
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.find_coordinator import FindCoordinatorRequest
from kafka.protocol.list_offsets import ListOffsetsRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.broker_api_versions import (
BROKER_API_VERSIONS, VERSION_CHECKS,
infer_broker_version_from_api_versions,
)
from kafka.protocol.parser import KafkaProtocol
from kafka.protocol.produce import ProduceRequest
from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest
from kafka.protocol.sasl_handshake import SaslHandshakeRequest
from kafka.protocol.types import Int32
Expand Down Expand Up @@ -216,12 +212,6 @@ class BrokerConnection(object):
'socks5_proxy': None,
}
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
VERSION_CHECKS = (
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')),
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
((0, 8, 0), MetadataRequest[0]([])),
)

def __init__(self, host, port, afi, **configs):
self.host = host
Expand Down Expand Up @@ -556,8 +546,8 @@ def _try_api_versions_check(self):
self._api_versions_future = future
self.state = ConnectionStates.API_VERSIONS_RECV
self.config['state_change_callback'](self.node_id, self._sock, self)
elif self._check_version_idx < len(self.VERSION_CHECKS):
version, request = self.VERSION_CHECKS[self._check_version_idx]
elif self._check_version_idx < len(VERSION_CHECKS):
version, request = VERSION_CHECKS[self._check_version_idx]
future = Future()
self._api_versions_check_timeout /= 2
response = self._send(request, blocking=True, request_timeout_ms=self._api_versions_check_timeout)
Expand Down Expand Up @@ -605,7 +595,7 @@ def _handle_api_versions_response(self, future, response):
(api_version_data[0], (api_version_data[1], api_version_data[2]))
for api_version_data in response.api_versions
])
self._api_version = self._infer_broker_version_from_api_versions(self._api_versions)
self._api_version = infer_broker_version_from_api_versions(self._api_versions)
log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version)))
future.success(self._api_version)
self.connect()
Expand Down Expand Up @@ -1215,52 +1205,6 @@ def get_api_versions(self):
self.check_version()
return self._api_versions

def _infer_broker_version_from_api_versions(self, api_versions):
# The logic here is to check the list of supported request versions
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
# Make sure to update consumer_integration test check when adding newer versions.
# ((3, 9), FetchRequest[17]),
# ((3, 8), ProduceRequest[11]),
# ((3, 7), FetchRequest[16]),
# ((3, 6), AddPartitionsToTxnRequest[4]),
# ((3, 5), FetchRequest[15]),
# ((3, 4), StopReplicaRequest[3]), # broker-internal api...
# ((3, 3), DescribeAclsRequest[3]),
# ((3, 2), JoinGroupRequest[9]),
# ((3, 1), FetchRequest[13]),
# ((3, 0), ListOffsetsRequest[7]),
# ((2, 8), ProduceRequest[9]),
# ((2, 7), FetchRequest[12]),
# ((2, 6), ListGroupsRequest[4]),
# ((2, 5), JoinGroupRequest[7]),
((2, 6), DescribeClientQuotasRequest[0]),
((2, 5), DescribeAclsRequest[2]),
((2, 4), ProduceRequest[8]),
((2, 3), FetchRequest[11]),
((2, 2), ListOffsetsRequest[5]),
((2, 1), FetchRequest[10]),
((2, 0), FetchRequest[8]),
((1, 1), FetchRequest[7]),
((1, 0), MetadataRequest[5]),
((0, 11), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
]

# Get the best match of test cases
for broker_version, proto_struct in sorted(test_cases, reverse=True):
if proto_struct.API_KEY not in api_versions:
continue
min_version, max_version = api_versions[proto_struct.API_KEY]
if min_version <= proto_struct.API_VERSION <= max_version:
return broker_version

# We know that ApiVersionsResponse is only supported in 0.10+
# so if all else fails, choose that
return (0, 10, 0)

def check_version(self, timeout=2, **kwargs):
"""Attempt to guess the broker version.

Expand Down
65 changes: 65 additions & 0 deletions kafka/protocol/broker_api_versions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,68 @@
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.find_coordinator import FindCoordinatorRequest
from kafka.protocol.list_offsets import ListOffsetsRequest
from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.produce import ProduceRequest


def infer_broker_version_from_api_versions(api_versions):
# The logic here is to check the list of supported request versions
# in reverse order. As soon as we find one that works, return it
test_cases = [
# format (<broker version>, <needed struct>)
# Make sure to update consumer_integration test check when adding newer versions.
# ((3, 9), FetchRequest[17]),
# ((3, 8), ProduceRequest[11]),
# ((3, 7), FetchRequest[16]),
# ((3, 6), AddPartitionsToTxnRequest[4]),
# ((3, 5), FetchRequest[15]),
# ((3, 4), StopReplicaRequest[3]), # broker-internal api...
# ((3, 3), DescribeAclsRequest[3]),
# ((3, 2), JoinGroupRequest[9]),
# ((3, 1), FetchRequest[13]),
# ((3, 0), ListOffsetsRequest[7]),
# ((2, 8), ProduceRequest[9]),
# ((2, 7), FetchRequest[12]),
# ((2, 6), ListGroupsRequest[4]),
# ((2, 5), JoinGroupRequest[7]),
((2, 6), DescribeClientQuotasRequest[0]),
((2, 5), DescribeAclsRequest[2]),
((2, 4), ProduceRequest[8]),
((2, 3), FetchRequest[11]),
((2, 2), ListOffsetsRequest[5]),
((2, 1), FetchRequest[10]),
((2, 0), FetchRequest[8]),
((1, 1), FetchRequest[7]),
((1, 0), MetadataRequest[5]),
((0, 11), MetadataRequest[4]),
((0, 10, 2), OffsetFetchRequest[2]),
((0, 10, 1), MetadataRequest[2]),
]

# Get the best match of test cases
for broker_version, proto_struct in sorted(test_cases, reverse=True):
if proto_struct.API_KEY not in api_versions:
continue
min_version, max_version = api_versions[proto_struct.API_KEY]
if min_version <= proto_struct.API_VERSION <= max_version:
return broker_version

# We know that ApiVersionsResponse is only supported in 0.10+
# so if all else fails, choose that
return (0, 10, 0)


# Fallback version checks for brokers that do not support ApiVersionsCheck
VERSION_CHECKS = (
((0, 9), ListGroupsRequest[0]()),
((0, 8, 2), FindCoordinatorRequest[0]('kafka-python-default-group')),
((0, 8, 1), OffsetFetchRequest[0]('kafka-python-default-group', [])),
((0, 8, 0), MetadataRequest[0]([])),
)


BROKER_API_VERSIONS = {
# api_versions responses prior to (0, 10) are synthesized for compatibility
(0, 8, 0): {0: (0, 0), 1: (0, 0), 2: (0, 0), 3: (0, 0)},
Expand Down
4 changes: 2 additions & 2 deletions test/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from kafka.conn import BrokerConnection, ConnectionStates
from kafka.future import Future
from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError
from kafka.conn import BrokerConnection, ConnectionStates, SSLWantWriteError, VERSION_CHECKS
from kafka.metrics.metrics import Metrics
from kafka.metrics.stats.sensor import Sensor
from kafka.protocol.api import RequestHeader
Expand Down Expand Up @@ -91,7 +91,7 @@ def test_api_versions_check(_socket, mocker):
assert conn._try_api_versions_check() is False
assert conn.connecting() is True

conn._check_version_idx = len(conn.VERSION_CHECKS)
conn._check_version_idx = len(VERSION_CHECKS)
conn._api_versions_future = None
assert conn._try_api_versions_check() is False
assert conn.connecting() is False
Expand Down
Loading