|
| 1 | +from io import BytesIO |
| 2 | + |
1 | 3 | import pytest |
2 | 4 |
|
3 | | -from kafka.protocol.api import Request |
| 5 | +from kafka.protocol.api import Request, ResponseHeader |
4 | 6 | from kafka.protocol.api_versions import ApiVersionsRequest, ApiVersionsResponse |
5 | 7 | from kafka.protocol.types import Int32 |
6 | 8 | from kafka.version import __version__ |
@@ -70,4 +72,17 @@ def test_parse(msg, encoded): |
70 | 72 | else: |
71 | 73 | msg.with_header(correlation_id=1) |
72 | 74 | assert msg.encode(header=True, framed=True) == encoded |
73 | | - assert msg.decode(encoded, header=True, framed=True) == msg |
| 75 | + data = BytesIO(encoded) |
| 76 | + assert msg.decode(data, header=True, framed=True) == msg |
| 77 | + assert data.read() == b'' |
| 78 | + |
| 79 | + |
| 80 | +@pytest.mark.parametrize('version', [0, 1, 2, 3, 4]) |
| 81 | +def test_parse_flexible_error(version): |
| 82 | + # An unsupported request version returns v0 response. Make sure this works with all versions! |
| 83 | + msg = ApiVersionsResponse[0](error_code=35, api_versions=[(18, 0, 3)]) |
| 84 | + msg.with_header(correlation_id=1) |
| 85 | + encoded = b'\x00\x00\x00\x10\x00\x00\x00\x01\x00#\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03' |
| 86 | + data = BytesIO(encoded) |
| 87 | + assert ApiVersionsResponse[version].decode(data, header=True, framed=True) == msg |
| 88 | + assert data.read() == b'' |
0 commit comments