Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kafka/protocol/api_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def decode(cls, data, header=False, framed=False):
if framed:
nbytes = Int32.decode(data)
if header:
cls.parse_header(data)
cls.parse_header(data) # Note: non-flexible header
err = Int16.decode(data)
data.seek(curr)
if err != 0:
Expand Down
19 changes: 17 additions & 2 deletions test/protocol/test_api_versions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from io import BytesIO

import pytest

from kafka.protocol.api import Request
from kafka.protocol.api import Request, ResponseHeader
from kafka.protocol.api_versions import ApiVersionsRequest, ApiVersionsResponse
from kafka.protocol.types import Int32
from kafka.version import __version__
Expand Down Expand Up @@ -70,4 +72,17 @@ def test_parse(msg, encoded):
else:
msg.with_header(correlation_id=1)
assert msg.encode(header=True, framed=True) == encoded
assert msg.decode(encoded, header=True, framed=True) == msg
data = BytesIO(encoded)
assert msg.decode(data, header=True, framed=True) == msg
assert data.read() == b''


@pytest.mark.parametrize('version', [0, 1, 2, 3, 4])
def test_parse_flexible_error(version):
# An unsupported request version returns v0 response. Make sure this works with all versions!
msg = ApiVersionsResponse[0](error_code=35, api_versions=[(18, 0, 3)])
msg.with_header(correlation_id=1)
encoded = b'\x00\x00\x00\x10\x00\x00\x00\x01\x00#\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03'
data = BytesIO(encoded)
assert ApiVersionsResponse[version].decode(data, header=True, framed=True) == msg
assert data.read() == b''
Loading