Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 65 additions & 13 deletions kafka/protocol/api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import abc
from io import BytesIO

from kafka.protocol.struct import Struct
from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
Expand All @@ -12,11 +13,6 @@ class RequestHeader(Struct):
('client_id', String('utf-8'))
)

def __init__(self, request, correlation_id=0, client_id='kafka-python'):
super(RequestHeader, self).__init__(
request.API_KEY, request.API_VERSION, correlation_id, client_id
)


class RequestHeaderV2(Struct):
# Flexible response / request headers end in field buffer
Expand All @@ -28,11 +24,6 @@ class RequestHeaderV2(Struct):
('tags', TaggedFields),
)

def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None):
super(RequestHeaderV2, self).__init__(
request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {}
)


class ResponseHeader(Struct):
SCHEMA = Schema(
Expand Down Expand Up @@ -72,10 +63,41 @@ def expect_response(self):
def to_object(self):
return _to_object(self.SCHEMA, self)

def build_header(self, correlation_id, client_id):
def build_header(self, correlation_id=0, client_id='kafka-python'):
if self.FLEXIBLE_VERSION:
return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id)
return RequestHeader(self, correlation_id=correlation_id, client_id=client_id)
return RequestHeaderV2(self.API_KEY, self.API_VERSION, correlation_id, client_id, {})
return RequestHeader(self.API_KEY, self.API_VERSION, correlation_id, client_id)

@classmethod
def parse_header(cls, read_buffer):
if cls.FLEXIBLE_VERSION:
return RequestHeaderV2.decode(read_buffer)
return RequestHeader.decode(read_buffer)

def encode(self, header=False, framed=False, correlation_id=None, client_id=None, **kwargs):
data = super().encode()
if not framed and not header:
return data
bits = [data]
if header:
bits.insert(0, self.build_header(correlation_id, client_id).encode())
if framed:
bits.insert(0, Int32.encode(sum(map(len, bits))))
return b''.join(bits)

@classmethod
def decode(cls, data, header=False, framed=False):
if not framed and not header:
return super().decode(data)
if isinstance(data, bytes):
data = BytesIO(data)
ret = []
if framed:
ret.append(Int32.decode(data))
if header:
ret.append(cls.parse_header(data))
ret.append(super().decode(data))
return tuple(ret)


class Response(Struct, metaclass=abc.ABCMeta):
Expand All @@ -94,12 +116,42 @@ def API_VERSION(self):
def to_object(self):
return _to_object(self.SCHEMA, self)

def build_header(self, correlation_id=0):
if self.FLEXIBLE_VERSION:
return ResponseHeaderV2(correlation_id=correlation_id, tags=None)
return ResponseHeader(correlation_id=correlation_id)

@classmethod
def parse_header(cls, read_buffer):
if cls.FLEXIBLE_VERSION:
return ResponseHeaderV2.decode(read_buffer)
return ResponseHeader.decode(read_buffer)

def encode(self, header=False, framed=False, correlation_id=None, **kwargs):
data = super().encode()
if not framed and not header:
return data
bits = [data]
if header:
bits.insert(0, self.build_header(correlation_id).encode())
if framed:
bits.insert(0, Int32.encode(sum(map(len, bits))))
return b''.join(bits)

@classmethod
def decode(cls, data, header=False, framed=False):
if not framed and not header:
return super().decode(data)
if isinstance(data, bytes):
data = BytesIO(data)
ret = []
if framed:
ret.append(Int32.decode(data))
if header:
ret.append(cls.parse_header(data))
ret.append(super().decode(data))
return tuple(ret)


def _to_object(schema, data):
obj = {}
Expand Down
10 changes: 7 additions & 3 deletions kafka/protocol/api_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ class BaseApiVersionsResponse(Response):
)

@classmethod
def decode(cls, data):
def decode(cls, data, header=False, framed=False):
if isinstance(data, bytes):
data = BytesIO(data)
# Check error_code, decode as v0 if any error
curr = data.tell()
if framed:
nbytes = Int32.decode(data)
if header:
cls.parse_header(data)
err = Int16.decode(data)
data.seek(curr)
if err != 0:
return ApiVersionsResponse_v0.decode(data)
return super(BaseApiVersionsResponse, cls).decode(data)
return ApiVersionsResponse_v0.decode(data, header=header, framed=framed)
return super(BaseApiVersionsResponse, cls).decode(data, header=header, framed=framed)


class ApiVersionsResponse_v0(Response):
Expand Down
6 changes: 2 additions & 4 deletions kafka/protocol/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@ def send_request(self, request, correlation_id=None):
correlation_id = self._next_correlation_id()

log.debug('%s Sending request %d %s', self._ident, correlation_id, request)
header = request.build_header(correlation_id=correlation_id, client_id=self._client_id)
message = b''.join([header.encode(), request.encode()])
size = Int32.encode(len(message))
data = size + message
data = request.encode(correlation_id=correlation_id, client_id=self._client_id,
framed=True, header=True)
self.bytes_to_send.append(data)
if request.expect_response():
ifr = (correlation_id, request)
Expand Down
2 changes: 1 addition & 1 deletion test/protocol/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test_encode_message_header():
])

req = FindCoordinatorRequest[0]('foo')
header = RequestHeader(req, correlation_id=4, client_id='client3')
header = RequestHeader(api_key=req.API_KEY, api_version=req.API_VERSION, correlation_id=4, client_id='client3')
assert header.encode() == expect


Expand Down
69 changes: 69 additions & 0 deletions test/protocol/test_api_versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import pytest

from kafka.protocol.api import Request
from kafka.protocol.api_versions import ApiVersionsRequest, ApiVersionsResponse
from kafka.protocol.types import Int32
from kafka.version import __version__


TEST_CASES = [
(
ApiVersionsRequest[0](),
b'\x00\x00\x00\x1f\x00\x12\x00\x00\x00\x00\x00\x01\x00\x15_internal_client_kYVL',
),

(
ApiVersionsRequest[1](),
b'\x00\x00\x00\x1f\x00\x12\x00\x01\x00\x00\x00\x01\x00\x15_internal_client_kYVL',
),

(
ApiVersionsRequest[2](),
b'\x00\x00\x00\x1f\x00\x12\x00\x02\x00\x00\x00\x01\x00\x15_internal_client_kYVL',
),

(
ApiVersionsRequest[3](client_software_name='kafka-python', client_software_version=__version__, _tagged_fields={}),
b'\x00\x00\x004\x00\x12\x00\x03\x00\x00\x00\x01\x00\x15_internal_client_kYVL\x00\rkafka-python\x062.3.0\x00',
),

(
ApiVersionsRequest[4](client_software_name='kafka-python', client_software_version=__version__, _tagged_fields={}),
b'\x00\x00\x004\x00\x12\x00\x04\x00\x00\x00\x01\x00\x15_internal_client_kYVL\x00\rkafka-python\x062.3.0\x00',
),

(
ApiVersionsResponse[0](error_code=35, api_versions=[(18, 0, 3)]),
b'\x00\x00\x00\x10\x00\x00\x00\x01\x00#\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03',
),

(
ApiVersionsResponse[0](error_code=0, api_versions=[(0, 0, 8), (1, 0, 11), (2, 0, 5), (3, 0, 9), (4, 0, 4), (5, 0, 3), (6, 0, 6), (7, 0, 3), (8, 0, 8), (9, 0, 7), (10, 0, 3), (11, 0, 7), (12, 0, 4), (13, 0, 4), (14, 0, 5), (15, 0, 5), (16, 0, 4), (17, 0, 1), (18, 0, 3), (19, 0, 5), (20, 0, 4), (21, 0, 2), (22, 0, 3), (23, 0, 3), (24, 0, 1), (25, 0, 1), (26, 0, 1), (27, 0, 0), (28, 0, 3), (29, 0, 2), (30, 0, 2), (31, 0, 2), (32, 0, 3), (33, 0, 1), (34, 0, 1), (35, 0, 2), (36, 0, 2), (37, 0, 2), (38, 0, 2), (39, 0, 2), (40, 0, 2), (41, 0, 2), (42, 0, 2), (43, 0, 2), (44, 0, 1), (45, 0, 0), (46, 0, 0), (47, 0, 0), (48, 0, 0), (49, 0, 0)]),

b'\x00\x00\x016\x00\x00\x00\x01\x00\x00\x00\x00\x002\x00\x00\x00\x00\x00\x08\x00\x01\x00\x00\x00\x0b\x00\x02\x00\x00\x00\x05\x00\x03\x00\x00\x00\t\x00\x04\x00\x00\x00\x04\x00\x05\x00\x00\x00\x03\x00\x06\x00\x00\x00\x06\x00\x07\x00\x00\x00\x03\x00\x08\x00\x00\x00\x08\x00\t\x00\x00\x00\x07\x00\n\x00\x00\x00\x03\x00\x0b\x00\x00\x00\x07\x00\x0c\x00\x00\x00\x04\x00\r\x00\x00\x00\x04\x00\x0e\x00\x00\x00\x05\x00\x0f\x00\x00\x00\x05\x00\x10\x00\x00\x00\x04\x00\x11\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03\x00\x13\x00\x00\x00\x05\x00\x14\x00\x00\x00\x04\x00\x15\x00\x00\x00\x02\x00\x16\x00\x00\x00\x03\x00\x17\x00\x00\x00\x03\x00\x18\x00\x00\x00\x01\x00\x19\x00\x00\x00\x01\x00\x1a\x00\x00\x00\x01\x00\x1b\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x03\x00\x1d\x00\x00\x00\x02\x00\x1e\x00\x00\x00\x02\x00\x1f\x00\x00\x00\x02\x00 \x00\x00\x00\x03\x00!\x00\x00\x00\x01\x00"\x00\x00\x00\x01\x00#\x00\x00\x00\x02\x00$\x00\x00\x00\x02\x00%\x00\x00\x00\x02\x00&\x00\x00\x00\x02\x00\'\x00\x00\x00\x02\x00(\x00\x00\x00\x02\x00)\x00\x00\x00\x02\x00*\x00\x00\x00\x02\x00+\x00\x00\x00\x02\x00,\x00\x00\x00\x01\x00-\x00\x00\x00\x00\x00.\x00\x00\x00\x00\x00/\x00\x00\x00\x00\x000\x00\x00\x00\x00\x001\x00\x00\x00\x00',
),

(
ApiVersionsResponse[1](error_code=0, api_versions=[(0, 0, 8), (1, 0, 11), (2, 0, 5), (3, 0, 9), (4, 0, 4), (5, 0, 3), (6, 0, 6), (7, 0, 3), (8, 0, 8), (9, 0, 7), (10, 0, 3), (11, 0, 7), (12, 0, 4), (13, 0, 4), (14, 0, 5), (15, 0, 5), (16, 0, 4), (17, 0, 1), (18, 0, 3), (19, 0, 5), (20, 0, 4), (21, 0, 2), (22, 0, 3), (23, 0, 3), (24, 0, 1), (25, 0, 1), (26, 0, 1), (27, 0, 0), (28, 0, 3), (29, 0, 2), (30, 0, 2), (31, 0, 2), (32, 0, 3), (33, 0, 1), (34, 0, 1), (35, 0, 2), (36, 0, 2), (37, 0, 2), (38, 0, 2), (39, 0, 2), (40, 0, 2), (41, 0, 2), (42, 0, 2), (43, 0, 2), (44, 0, 1), (45, 0, 0), (46, 0, 0), (47, 0, 0), (48, 0, 0), (49, 0, 0)], throttle_time_ms=1),

b'\x00\x00\x01:\x00\x00\x00\x01\x00\x00\x00\x00\x002\x00\x00\x00\x00\x00\x08\x00\x01\x00\x00\x00\x0b\x00\x02\x00\x00\x00\x05\x00\x03\x00\x00\x00\t\x00\x04\x00\x00\x00\x04\x00\x05\x00\x00\x00\x03\x00\x06\x00\x00\x00\x06\x00\x07\x00\x00\x00\x03\x00\x08\x00\x00\x00\x08\x00\t\x00\x00\x00\x07\x00\n\x00\x00\x00\x03\x00\x0b\x00\x00\x00\x07\x00\x0c\x00\x00\x00\x04\x00\r\x00\x00\x00\x04\x00\x0e\x00\x00\x00\x05\x00\x0f\x00\x00\x00\x05\x00\x10\x00\x00\x00\x04\x00\x11\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03\x00\x13\x00\x00\x00\x05\x00\x14\x00\x00\x00\x04\x00\x15\x00\x00\x00\x02\x00\x16\x00\x00\x00\x03\x00\x17\x00\x00\x00\x03\x00\x18\x00\x00\x00\x01\x00\x19\x00\x00\x00\x01\x00\x1a\x00\x00\x00\x01\x00\x1b\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x03\x00\x1d\x00\x00\x00\x02\x00\x1e\x00\x00\x00\x02\x00\x1f\x00\x00\x00\x02\x00 \x00\x00\x00\x03\x00!\x00\x00\x00\x01\x00"\x00\x00\x00\x01\x00#\x00\x00\x00\x02\x00$\x00\x00\x00\x02\x00%\x00\x00\x00\x02\x00&\x00\x00\x00\x02\x00\'\x00\x00\x00\x02\x00(\x00\x00\x00\x02\x00)\x00\x00\x00\x02\x00*\x00\x00\x00\x02\x00+\x00\x00\x00\x02\x00,\x00\x00\x00\x01\x00-\x00\x00\x00\x00\x00.\x00\x00\x00\x00\x00/\x00\x00\x00\x00\x000\x00\x00\x00\x00\x001\x00\x00\x00\x00\x00\x00\x00\x01',
),

(
ApiVersionsResponse[2](error_code=0, api_versions=[(0, 0, 8), (1, 0, 11), (2, 0, 5), (3, 0, 9), (4, 0, 4), (5, 0, 3), (6, 0, 6), (7, 0, 3), (8, 0, 8), (9, 0, 7), (10, 0, 3), (11, 0, 7), (12, 0, 4), (13, 0, 4), (14, 0, 5), (15, 0, 5), (16, 0, 4), (17, 0, 1), (18, 0, 3), (19, 0, 5), (20, 0, 4), (21, 0, 2), (22, 0, 3), (23, 0, 3), (24, 0, 1), (25, 0, 1), (26, 0, 1), (27, 0, 0), (28, 0, 3), (29, 0, 2), (30, 0, 2), (31, 0, 2), (32, 0, 3), (33, 0, 1), (34, 0, 1), (35, 0, 2), (36, 0, 2), (37, 0, 2), (38, 0, 2), (39, 0, 2), (40, 0, 2), (41, 0, 2), (42, 0, 2), (43, 0, 2), (44, 0, 1), (45, 0, 0), (46, 0, 0), (47, 0, 0), (48, 0, 0), (49, 0, 0)], throttle_time_ms=1),

b'\x00\x00\x01:\x00\x00\x00\x01\x00\x00\x00\x00\x002\x00\x00\x00\x00\x00\x08\x00\x01\x00\x00\x00\x0b\x00\x02\x00\x00\x00\x05\x00\x03\x00\x00\x00\t\x00\x04\x00\x00\x00\x04\x00\x05\x00\x00\x00\x03\x00\x06\x00\x00\x00\x06\x00\x07\x00\x00\x00\x03\x00\x08\x00\x00\x00\x08\x00\t\x00\x00\x00\x07\x00\n\x00\x00\x00\x03\x00\x0b\x00\x00\x00\x07\x00\x0c\x00\x00\x00\x04\x00\r\x00\x00\x00\x04\x00\x0e\x00\x00\x00\x05\x00\x0f\x00\x00\x00\x05\x00\x10\x00\x00\x00\x04\x00\x11\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03\x00\x13\x00\x00\x00\x05\x00\x14\x00\x00\x00\x04\x00\x15\x00\x00\x00\x02\x00\x16\x00\x00\x00\x03\x00\x17\x00\x00\x00\x03\x00\x18\x00\x00\x00\x01\x00\x19\x00\x00\x00\x01\x00\x1a\x00\x00\x00\x01\x00\x1b\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x03\x00\x1d\x00\x00\x00\x02\x00\x1e\x00\x00\x00\x02\x00\x1f\x00\x00\x00\x02\x00 \x00\x00\x00\x03\x00!\x00\x00\x00\x01\x00"\x00\x00\x00\x01\x00#\x00\x00\x00\x02\x00$\x00\x00\x00\x02\x00%\x00\x00\x00\x02\x00&\x00\x00\x00\x02\x00\'\x00\x00\x00\x02\x00(\x00\x00\x00\x02\x00)\x00\x00\x00\x02\x00*\x00\x00\x00\x02\x00+\x00\x00\x00\x02\x00,\x00\x00\x00\x01\x00-\x00\x00\x00\x00\x00.\x00\x00\x00\x00\x00/\x00\x00\x00\x00\x000\x00\x00\x00\x00\x001\x00\x00\x00\x00\x00\x00\x00\x01',
),

(
ApiVersionsResponse[3](error_code=0, api_versions=[(0, 0, 8, {}), (1, 0, 11, {}), (2, 0, 5, {}), (3, 0, 9, {}), (4, 0, 4, {}), (5, 0, 3, {}), (6, 0, 6, {}), (7, 0, 3, {}), (8, 0, 8, {}), (9, 0, 7, {}), (10, 0, 3, {}), (11, 0, 7, {}), (12, 0, 4, {}), (13, 0, 4, {}), (14, 0, 5, {}), (15, 0, 5, {}), (16, 0, 4, {}), (17, 0, 1, {}), (18, 0, 3, {}), (19, 0, 5, {}), (20, 0, 4, {}), (21, 0, 2, {}), (22, 0, 3, {}), (23, 0, 3, {}), (24, 0, 1, {}), (25, 0, 1, {}), (26, 0, 1, {}), (27, 0, 0, {}), (28, 0, 3, {}), (29, 0, 2, {}), (30, 0, 2, {}), (31, 0, 2, {}), (32, 0, 3, {}), (33, 0, 1, {}), (34, 0, 1, {}), (35, 0, 2, {}), (36, 0, 2, {}), (37, 0, 2, {}), (38, 0, 2, {}), (39, 0, 2, {}), (40, 0, 2, {}), (41, 0, 2, {}), (42, 0, 2, {}), (43, 0, 2, {}), (44, 0, 1, {}), (45, 0, 0, {}), (46, 0, 0, {}), (47, 0, 0, {}), (48, 0, 0, {}), (49, 0, 0, {})], throttle_time_ms=0, _tagged_fields={}),

b'\x00\x00\x01j\x00\x00\x00\x01\x00\x003\x00\x00\x00\x00\x00\x08\x00\x00\x01\x00\x00\x00\x0b\x00\x00\x02\x00\x00\x00\x05\x00\x00\x03\x00\x00\x00\t\x00\x00\x04\x00\x00\x00\x04\x00\x00\x05\x00\x00\x00\x03\x00\x00\x06\x00\x00\x00\x06\x00\x00\x07\x00\x00\x00\x03\x00\x00\x08\x00\x00\x00\x08\x00\x00\t\x00\x00\x00\x07\x00\x00\n\x00\x00\x00\x03\x00\x00\x0b\x00\x00\x00\x07\x00\x00\x0c\x00\x00\x00\x04\x00\x00\r\x00\x00\x00\x04\x00\x00\x0e\x00\x00\x00\x05\x00\x00\x0f\x00\x00\x00\x05\x00\x00\x10\x00\x00\x00\x04\x00\x00\x11\x00\x00\x00\x01\x00\x00\x12\x00\x00\x00\x03\x00\x00\x13\x00\x00\x00\x05\x00\x00\x14\x00\x00\x00\x04\x00\x00\x15\x00\x00\x00\x02\x00\x00\x16\x00\x00\x00\x03\x00\x00\x17\x00\x00\x00\x03\x00\x00\x18\x00\x00\x00\x01\x00\x00\x19\x00\x00\x00\x01\x00\x00\x1a\x00\x00\x00\x01\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x03\x00\x00\x1d\x00\x00\x00\x02\x00\x00\x1e\x00\x00\x00\x02\x00\x00\x1f\x00\x00\x00\x02\x00\x00 \x00\x00\x00\x03\x00\x00!\x00\x00\x00\x01\x00\x00"\x00\x00\x00\x01\x00\x00#\x00\x00\x00\x02\x00\x00$\x00\x00\x00\x02\x00\x00%\x00\x00\x00\x02\x00\x00&\x00\x00\x00\x02\x00\x00\'\x00\x00\x00\x02\x00\x00(\x00\x00\x00\x02\x00\x00)\x00\x00\x00\x02\x00\x00*\x00\x00\x00\x02\x00\x00+\x00\x00\x00\x02\x00\x00,\x00\x00\x00\x01\x00\x00-\x00\x00\x00\x00\x00\x00.\x00\x00\x00\x00\x00\x00/\x00\x00\x00\x00\x00\x000\x00\x00\x00\x00\x00\x001\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
),
]


@pytest.mark.parametrize('msg, encoded', TEST_CASES)
def test_parse(msg, encoded):
assert msg.encode(correlation_id=1, client_id='_internal_client_kYVL', header=True, framed=True) == encoded
assert msg.decode(encoded, header=True, framed=True)[2] == msg
Loading
Loading