Skip to content

Commit ffefb2d

Browse files
authored
Support Request/Response encode/decode with frame and header kwargs (#2720)
1 parent 6fe54b9 commit ffefb2d

7 files changed

Lines changed: 385 additions & 26 deletions

File tree

kafka/protocol/api.py

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import abc
2+
from io import BytesIO
23

34
from kafka.protocol.struct import Struct
45
from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
@@ -12,11 +13,6 @@ class RequestHeader(Struct):
1213
('client_id', String('utf-8'))
1314
)
1415

15-
def __init__(self, request, correlation_id=0, client_id='kafka-python'):
16-
super(RequestHeader, self).__init__(
17-
request.API_KEY, request.API_VERSION, correlation_id, client_id
18-
)
19-
2016

2117
class RequestHeaderV2(Struct):
2218
# Flexible response / request headers end in field buffer
@@ -28,11 +24,6 @@ class RequestHeaderV2(Struct):
2824
('tags', TaggedFields),
2925
)
3026

31-
def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None):
32-
super(RequestHeaderV2, self).__init__(
33-
request.API_KEY, request.API_VERSION, correlation_id, client_id, tags or {}
34-
)
35-
3627

3728
class ResponseHeader(Struct):
3829
SCHEMA = Schema(
@@ -72,10 +63,41 @@ def expect_response(self):
7263
def to_object(self):
7364
return _to_object(self.SCHEMA, self)
7465

75-
def build_header(self, correlation_id, client_id):
66+
def build_header(self, correlation_id=0, client_id='kafka-python'):
7667
if self.FLEXIBLE_VERSION:
77-
return RequestHeaderV2(self, correlation_id=correlation_id, client_id=client_id)
78-
return RequestHeader(self, correlation_id=correlation_id, client_id=client_id)
68+
return RequestHeaderV2(self.API_KEY, self.API_VERSION, correlation_id, client_id, {})
69+
return RequestHeader(self.API_KEY, self.API_VERSION, correlation_id, client_id)
70+
71+
@classmethod
72+
def parse_header(cls, read_buffer):
73+
if cls.FLEXIBLE_VERSION:
74+
return RequestHeaderV2.decode(read_buffer)
75+
return RequestHeader.decode(read_buffer)
76+
77+
def encode(self, header=False, framed=False, correlation_id=None, client_id=None, **kwargs):
78+
data = super().encode()
79+
if not framed and not header:
80+
return data
81+
bits = [data]
82+
if header:
83+
bits.insert(0, self.build_header(correlation_id, client_id).encode())
84+
if framed:
85+
bits.insert(0, Int32.encode(sum(map(len, bits))))
86+
return b''.join(bits)
87+
88+
@classmethod
89+
def decode(cls, data, header=False, framed=False):
90+
if not framed and not header:
91+
return super().decode(data)
92+
if isinstance(data, bytes):
93+
data = BytesIO(data)
94+
ret = []
95+
if framed:
96+
ret.append(Int32.decode(data))
97+
if header:
98+
ret.append(cls.parse_header(data))
99+
ret.append(super().decode(data))
100+
return tuple(ret)
79101

80102

81103
class Response(Struct, metaclass=abc.ABCMeta):
@@ -94,12 +116,42 @@ def API_VERSION(self):
94116
def to_object(self):
95117
return _to_object(self.SCHEMA, self)
96118

119+
def build_header(self, correlation_id=0):
120+
if self.FLEXIBLE_VERSION:
121+
return ResponseHeaderV2(correlation_id=correlation_id, tags=None)
122+
return ResponseHeader(correlation_id=correlation_id)
123+
97124
@classmethod
98125
def parse_header(cls, read_buffer):
99126
if cls.FLEXIBLE_VERSION:
100127
return ResponseHeaderV2.decode(read_buffer)
101128
return ResponseHeader.decode(read_buffer)
102129

130+
def encode(self, header=False, framed=False, correlation_id=None, **kwargs):
131+
data = super().encode()
132+
if not framed and not header:
133+
return data
134+
bits = [data]
135+
if header:
136+
bits.insert(0, self.build_header(correlation_id).encode())
137+
if framed:
138+
bits.insert(0, Int32.encode(sum(map(len, bits))))
139+
return b''.join(bits)
140+
141+
@classmethod
142+
def decode(cls, data, header=False, framed=False):
143+
if not framed and not header:
144+
return super().decode(data)
145+
if isinstance(data, bytes):
146+
data = BytesIO(data)
147+
ret = []
148+
if framed:
149+
ret.append(Int32.decode(data))
150+
if header:
151+
ret.append(cls.parse_header(data))
152+
ret.append(super().decode(data))
153+
return tuple(ret)
154+
103155

104156
def _to_object(schema, data):
105157
obj = {}

kafka/protocol/api_versions.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,20 @@ class BaseApiVersionsResponse(Response):
1616
)
1717

1818
@classmethod
19-
def decode(cls, data):
19+
def decode(cls, data, header=False, framed=False):
2020
if isinstance(data, bytes):
2121
data = BytesIO(data)
2222
# Check error_code, decode as v0 if any error
2323
curr = data.tell()
24+
if framed:
25+
nbytes = Int32.decode(data)
26+
if header:
27+
cls.parse_header(data)
2428
err = Int16.decode(data)
2529
data.seek(curr)
2630
if err != 0:
27-
return ApiVersionsResponse_v0.decode(data)
28-
return super(BaseApiVersionsResponse, cls).decode(data)
31+
return ApiVersionsResponse_v0.decode(data, header=header, framed=framed)
32+
return super(BaseApiVersionsResponse, cls).decode(data, header=header, framed=framed)
2933

3034

3135
class ApiVersionsResponse_v0(Response):

kafka/protocol/parser.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,8 @@ def send_request(self, request, correlation_id=None):
5858
correlation_id = self._next_correlation_id()
5959

6060
log.debug('%s Sending request %d %s', self._ident, correlation_id, request)
61-
header = request.build_header(correlation_id=correlation_id, client_id=self._client_id)
62-
message = b''.join([header.encode(), request.encode()])
63-
size = Int32.encode(len(message))
64-
data = size + message
61+
data = request.encode(correlation_id=correlation_id, client_id=self._client_id,
62+
framed=True, header=True)
6563
self.bytes_to_send.append(data)
6664
if request.expect_response():
6765
ifr = (correlation_id, request)

test/protocol/test_api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ def test_encode_message_header():
1818
])
1919

2020
req = FindCoordinatorRequest[0]('foo')
21-
header = RequestHeader(req, correlation_id=4, client_id='client3')
21+
header = RequestHeader(api_key=req.API_KEY, api_version=req.API_VERSION, correlation_id=4, client_id='client3')
2222
assert header.encode() == expect
2323

2424

test/protocol/test_api_versions.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import pytest
2+
3+
from kafka.protocol.api import Request
4+
from kafka.protocol.api_versions import ApiVersionsRequest, ApiVersionsResponse
5+
from kafka.protocol.types import Int32
6+
from kafka.version import __version__
7+
8+
9+
TEST_CASES = [
10+
(
11+
ApiVersionsRequest[0](),
12+
b'\x00\x00\x00\x1f\x00\x12\x00\x00\x00\x00\x00\x01\x00\x15_internal_client_kYVL',
13+
),
14+
15+
(
16+
ApiVersionsRequest[1](),
17+
b'\x00\x00\x00\x1f\x00\x12\x00\x01\x00\x00\x00\x01\x00\x15_internal_client_kYVL',
18+
),
19+
20+
(
21+
ApiVersionsRequest[2](),
22+
b'\x00\x00\x00\x1f\x00\x12\x00\x02\x00\x00\x00\x01\x00\x15_internal_client_kYVL',
23+
),
24+
25+
(
26+
ApiVersionsRequest[3](client_software_name='kafka-python', client_software_version=__version__, _tagged_fields={}),
27+
b'\x00\x00\x004\x00\x12\x00\x03\x00\x00\x00\x01\x00\x15_internal_client_kYVL\x00\rkafka-python\x062.3.0\x00',
28+
),
29+
30+
(
31+
ApiVersionsRequest[4](client_software_name='kafka-python', client_software_version=__version__, _tagged_fields={}),
32+
b'\x00\x00\x004\x00\x12\x00\x04\x00\x00\x00\x01\x00\x15_internal_client_kYVL\x00\rkafka-python\x062.3.0\x00',
33+
),
34+
35+
(
36+
ApiVersionsResponse[0](error_code=35, api_versions=[(18, 0, 3)]),
37+
b'\x00\x00\x00\x10\x00\x00\x00\x01\x00#\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03',
38+
),
39+
40+
(
41+
ApiVersionsResponse[0](error_code=0, api_versions=[(0, 0, 8), (1, 0, 11), (2, 0, 5), (3, 0, 9), (4, 0, 4), (5, 0, 3), (6, 0, 6), (7, 0, 3), (8, 0, 8), (9, 0, 7), (10, 0, 3), (11, 0, 7), (12, 0, 4), (13, 0, 4), (14, 0, 5), (15, 0, 5), (16, 0, 4), (17, 0, 1), (18, 0, 3), (19, 0, 5), (20, 0, 4), (21, 0, 2), (22, 0, 3), (23, 0, 3), (24, 0, 1), (25, 0, 1), (26, 0, 1), (27, 0, 0), (28, 0, 3), (29, 0, 2), (30, 0, 2), (31, 0, 2), (32, 0, 3), (33, 0, 1), (34, 0, 1), (35, 0, 2), (36, 0, 2), (37, 0, 2), (38, 0, 2), (39, 0, 2), (40, 0, 2), (41, 0, 2), (42, 0, 2), (43, 0, 2), (44, 0, 1), (45, 0, 0), (46, 0, 0), (47, 0, 0), (48, 0, 0), (49, 0, 0)]),
42+
43+
b'\x00\x00\x016\x00\x00\x00\x01\x00\x00\x00\x00\x002\x00\x00\x00\x00\x00\x08\x00\x01\x00\x00\x00\x0b\x00\x02\x00\x00\x00\x05\x00\x03\x00\x00\x00\t\x00\x04\x00\x00\x00\x04\x00\x05\x00\x00\x00\x03\x00\x06\x00\x00\x00\x06\x00\x07\x00\x00\x00\x03\x00\x08\x00\x00\x00\x08\x00\t\x00\x00\x00\x07\x00\n\x00\x00\x00\x03\x00\x0b\x00\x00\x00\x07\x00\x0c\x00\x00\x00\x04\x00\r\x00\x00\x00\x04\x00\x0e\x00\x00\x00\x05\x00\x0f\x00\x00\x00\x05\x00\x10\x00\x00\x00\x04\x00\x11\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03\x00\x13\x00\x00\x00\x05\x00\x14\x00\x00\x00\x04\x00\x15\x00\x00\x00\x02\x00\x16\x00\x00\x00\x03\x00\x17\x00\x00\x00\x03\x00\x18\x00\x00\x00\x01\x00\x19\x00\x00\x00\x01\x00\x1a\x00\x00\x00\x01\x00\x1b\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x03\x00\x1d\x00\x00\x00\x02\x00\x1e\x00\x00\x00\x02\x00\x1f\x00\x00\x00\x02\x00 \x00\x00\x00\x03\x00!\x00\x00\x00\x01\x00"\x00\x00\x00\x01\x00#\x00\x00\x00\x02\x00$\x00\x00\x00\x02\x00%\x00\x00\x00\x02\x00&\x00\x00\x00\x02\x00\'\x00\x00\x00\x02\x00(\x00\x00\x00\x02\x00)\x00\x00\x00\x02\x00*\x00\x00\x00\x02\x00+\x00\x00\x00\x02\x00,\x00\x00\x00\x01\x00-\x00\x00\x00\x00\x00.\x00\x00\x00\x00\x00/\x00\x00\x00\x00\x000\x00\x00\x00\x00\x001\x00\x00\x00\x00',
44+
),
45+
46+
(
47+
ApiVersionsResponse[1](error_code=0, api_versions=[(0, 0, 8), (1, 0, 11), (2, 0, 5), (3, 0, 9), (4, 0, 4), (5, 0, 3), (6, 0, 6), (7, 0, 3), (8, 0, 8), (9, 0, 7), (10, 0, 3), (11, 0, 7), (12, 0, 4), (13, 0, 4), (14, 0, 5), (15, 0, 5), (16, 0, 4), (17, 0, 1), (18, 0, 3), (19, 0, 5), (20, 0, 4), (21, 0, 2), (22, 0, 3), (23, 0, 3), (24, 0, 1), (25, 0, 1), (26, 0, 1), (27, 0, 0), (28, 0, 3), (29, 0, 2), (30, 0, 2), (31, 0, 2), (32, 0, 3), (33, 0, 1), (34, 0, 1), (35, 0, 2), (36, 0, 2), (37, 0, 2), (38, 0, 2), (39, 0, 2), (40, 0, 2), (41, 0, 2), (42, 0, 2), (43, 0, 2), (44, 0, 1), (45, 0, 0), (46, 0, 0), (47, 0, 0), (48, 0, 0), (49, 0, 0)], throttle_time_ms=1),
48+
49+
b'\x00\x00\x01:\x00\x00\x00\x01\x00\x00\x00\x00\x002\x00\x00\x00\x00\x00\x08\x00\x01\x00\x00\x00\x0b\x00\x02\x00\x00\x00\x05\x00\x03\x00\x00\x00\t\x00\x04\x00\x00\x00\x04\x00\x05\x00\x00\x00\x03\x00\x06\x00\x00\x00\x06\x00\x07\x00\x00\x00\x03\x00\x08\x00\x00\x00\x08\x00\t\x00\x00\x00\x07\x00\n\x00\x00\x00\x03\x00\x0b\x00\x00\x00\x07\x00\x0c\x00\x00\x00\x04\x00\r\x00\x00\x00\x04\x00\x0e\x00\x00\x00\x05\x00\x0f\x00\x00\x00\x05\x00\x10\x00\x00\x00\x04\x00\x11\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03\x00\x13\x00\x00\x00\x05\x00\x14\x00\x00\x00\x04\x00\x15\x00\x00\x00\x02\x00\x16\x00\x00\x00\x03\x00\x17\x00\x00\x00\x03\x00\x18\x00\x00\x00\x01\x00\x19\x00\x00\x00\x01\x00\x1a\x00\x00\x00\x01\x00\x1b\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x03\x00\x1d\x00\x00\x00\x02\x00\x1e\x00\x00\x00\x02\x00\x1f\x00\x00\x00\x02\x00 \x00\x00\x00\x03\x00!\x00\x00\x00\x01\x00"\x00\x00\x00\x01\x00#\x00\x00\x00\x02\x00$\x00\x00\x00\x02\x00%\x00\x00\x00\x02\x00&\x00\x00\x00\x02\x00\'\x00\x00\x00\x02\x00(\x00\x00\x00\x02\x00)\x00\x00\x00\x02\x00*\x00\x00\x00\x02\x00+\x00\x00\x00\x02\x00,\x00\x00\x00\x01\x00-\x00\x00\x00\x00\x00.\x00\x00\x00\x00\x00/\x00\x00\x00\x00\x000\x00\x00\x00\x00\x001\x00\x00\x00\x00\x00\x00\x00\x01',
50+
),
51+
52+
(
53+
ApiVersionsResponse[2](error_code=0, api_versions=[(0, 0, 8), (1, 0, 11), (2, 0, 5), (3, 0, 9), (4, 0, 4), (5, 0, 3), (6, 0, 6), (7, 0, 3), (8, 0, 8), (9, 0, 7), (10, 0, 3), (11, 0, 7), (12, 0, 4), (13, 0, 4), (14, 0, 5), (15, 0, 5), (16, 0, 4), (17, 0, 1), (18, 0, 3), (19, 0, 5), (20, 0, 4), (21, 0, 2), (22, 0, 3), (23, 0, 3), (24, 0, 1), (25, 0, 1), (26, 0, 1), (27, 0, 0), (28, 0, 3), (29, 0, 2), (30, 0, 2), (31, 0, 2), (32, 0, 3), (33, 0, 1), (34, 0, 1), (35, 0, 2), (36, 0, 2), (37, 0, 2), (38, 0, 2), (39, 0, 2), (40, 0, 2), (41, 0, 2), (42, 0, 2), (43, 0, 2), (44, 0, 1), (45, 0, 0), (46, 0, 0), (47, 0, 0), (48, 0, 0), (49, 0, 0)], throttle_time_ms=1),
54+
55+
b'\x00\x00\x01:\x00\x00\x00\x01\x00\x00\x00\x00\x002\x00\x00\x00\x00\x00\x08\x00\x01\x00\x00\x00\x0b\x00\x02\x00\x00\x00\x05\x00\x03\x00\x00\x00\t\x00\x04\x00\x00\x00\x04\x00\x05\x00\x00\x00\x03\x00\x06\x00\x00\x00\x06\x00\x07\x00\x00\x00\x03\x00\x08\x00\x00\x00\x08\x00\t\x00\x00\x00\x07\x00\n\x00\x00\x00\x03\x00\x0b\x00\x00\x00\x07\x00\x0c\x00\x00\x00\x04\x00\r\x00\x00\x00\x04\x00\x0e\x00\x00\x00\x05\x00\x0f\x00\x00\x00\x05\x00\x10\x00\x00\x00\x04\x00\x11\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03\x00\x13\x00\x00\x00\x05\x00\x14\x00\x00\x00\x04\x00\x15\x00\x00\x00\x02\x00\x16\x00\x00\x00\x03\x00\x17\x00\x00\x00\x03\x00\x18\x00\x00\x00\x01\x00\x19\x00\x00\x00\x01\x00\x1a\x00\x00\x00\x01\x00\x1b\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x03\x00\x1d\x00\x00\x00\x02\x00\x1e\x00\x00\x00\x02\x00\x1f\x00\x00\x00\x02\x00 \x00\x00\x00\x03\x00!\x00\x00\x00\x01\x00"\x00\x00\x00\x01\x00#\x00\x00\x00\x02\x00$\x00\x00\x00\x02\x00%\x00\x00\x00\x02\x00&\x00\x00\x00\x02\x00\'\x00\x00\x00\x02\x00(\x00\x00\x00\x02\x00)\x00\x00\x00\x02\x00*\x00\x00\x00\x02\x00+\x00\x00\x00\x02\x00,\x00\x00\x00\x01\x00-\x00\x00\x00\x00\x00.\x00\x00\x00\x00\x00/\x00\x00\x00\x00\x000\x00\x00\x00\x00\x001\x00\x00\x00\x00\x00\x00\x00\x01',
56+
),
57+
58+
(
59+
ApiVersionsResponse[3](error_code=0, api_versions=[(0, 0, 8, {}), (1, 0, 11, {}), (2, 0, 5, {}), (3, 0, 9, {}), (4, 0, 4, {}), (5, 0, 3, {}), (6, 0, 6, {}), (7, 0, 3, {}), (8, 0, 8, {}), (9, 0, 7, {}), (10, 0, 3, {}), (11, 0, 7, {}), (12, 0, 4, {}), (13, 0, 4, {}), (14, 0, 5, {}), (15, 0, 5, {}), (16, 0, 4, {}), (17, 0, 1, {}), (18, 0, 3, {}), (19, 0, 5, {}), (20, 0, 4, {}), (21, 0, 2, {}), (22, 0, 3, {}), (23, 0, 3, {}), (24, 0, 1, {}), (25, 0, 1, {}), (26, 0, 1, {}), (27, 0, 0, {}), (28, 0, 3, {}), (29, 0, 2, {}), (30, 0, 2, {}), (31, 0, 2, {}), (32, 0, 3, {}), (33, 0, 1, {}), (34, 0, 1, {}), (35, 0, 2, {}), (36, 0, 2, {}), (37, 0, 2, {}), (38, 0, 2, {}), (39, 0, 2, {}), (40, 0, 2, {}), (41, 0, 2, {}), (42, 0, 2, {}), (43, 0, 2, {}), (44, 0, 1, {}), (45, 0, 0, {}), (46, 0, 0, {}), (47, 0, 0, {}), (48, 0, 0, {}), (49, 0, 0, {})], throttle_time_ms=0, _tagged_fields={}),
60+
61+
b'\x00\x00\x01j\x00\x00\x00\x01\x00\x003\x00\x00\x00\x00\x00\x08\x00\x00\x01\x00\x00\x00\x0b\x00\x00\x02\x00\x00\x00\x05\x00\x00\x03\x00\x00\x00\t\x00\x00\x04\x00\x00\x00\x04\x00\x00\x05\x00\x00\x00\x03\x00\x00\x06\x00\x00\x00\x06\x00\x00\x07\x00\x00\x00\x03\x00\x00\x08\x00\x00\x00\x08\x00\x00\t\x00\x00\x00\x07\x00\x00\n\x00\x00\x00\x03\x00\x00\x0b\x00\x00\x00\x07\x00\x00\x0c\x00\x00\x00\x04\x00\x00\r\x00\x00\x00\x04\x00\x00\x0e\x00\x00\x00\x05\x00\x00\x0f\x00\x00\x00\x05\x00\x00\x10\x00\x00\x00\x04\x00\x00\x11\x00\x00\x00\x01\x00\x00\x12\x00\x00\x00\x03\x00\x00\x13\x00\x00\x00\x05\x00\x00\x14\x00\x00\x00\x04\x00\x00\x15\x00\x00\x00\x02\x00\x00\x16\x00\x00\x00\x03\x00\x00\x17\x00\x00\x00\x03\x00\x00\x18\x00\x00\x00\x01\x00\x00\x19\x00\x00\x00\x01\x00\x00\x1a\x00\x00\x00\x01\x00\x00\x1b\x00\x00\x00\x00\x00\x00\x1c\x00\x00\x00\x03\x00\x00\x1d\x00\x00\x00\x02\x00\x00\x1e\x00\x00\x00\x02\x00\x00\x1f\x00\x00\x00\x02\x00\x00 \x00\x00\x00\x03\x00\x00!\x00\x00\x00\x01\x00\x00"\x00\x00\x00\x01\x00\x00#\x00\x00\x00\x02\x00\x00$\x00\x00\x00\x02\x00\x00%\x00\x00\x00\x02\x00\x00&\x00\x00\x00\x02\x00\x00\'\x00\x00\x00\x02\x00\x00(\x00\x00\x00\x02\x00\x00)\x00\x00\x00\x02\x00\x00*\x00\x00\x00\x02\x00\x00+\x00\x00\x00\x02\x00\x00,\x00\x00\x00\x01\x00\x00-\x00\x00\x00\x00\x00\x00.\x00\x00\x00\x00\x00\x00/\x00\x00\x00\x00\x00\x000\x00\x00\x00\x00\x00\x001\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00',
62+
),
63+
]
64+
65+
66+
@pytest.mark.parametrize('msg, encoded', TEST_CASES)
67+
def test_parse(msg, encoded):
68+
assert msg.encode(correlation_id=1, client_id='_internal_client_kYVL', header=True, framed=True) == encoded
69+
assert msg.decode(encoded, header=True, framed=True)[2] == msg

0 commit comments

Comments
 (0)