From 58afbf6ef2f7c1f147561e6998807ffe88e707a6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 9 Mar 2026 14:04:21 -0700 Subject: [PATCH] Expanded ApiVersions test from api-message branch. --- kafka/protocol/api_versions.py | 2 +- test/protocol/test_api_versions.py | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/kafka/protocol/api_versions.py b/kafka/protocol/api_versions.py index c6ef407cb..b7571e1cd 100644 --- a/kafka/protocol/api_versions.py +++ b/kafka/protocol/api_versions.py @@ -24,7 +24,7 @@ def decode(cls, data, header=False, framed=False): if framed: nbytes = Int32.decode(data) if header: - cls.parse_header(data) + cls.parse_header(data) # Note: non-flexible header err = Int16.decode(data) data.seek(curr) if err != 0: diff --git a/test/protocol/test_api_versions.py b/test/protocol/test_api_versions.py index ece60035e..c1cf1353a 100644 --- a/test/protocol/test_api_versions.py +++ b/test/protocol/test_api_versions.py @@ -1,6 +1,8 @@ +from io import BytesIO + import pytest -from kafka.protocol.api import Request +from kafka.protocol.api import Request, ResponseHeader from kafka.protocol.api_versions import ApiVersionsRequest, ApiVersionsResponse from kafka.protocol.types import Int32 from kafka.version import __version__ @@ -70,4 +72,17 @@ def test_parse(msg, encoded): else: msg.with_header(correlation_id=1) assert msg.encode(header=True, framed=True) == encoded - assert msg.decode(encoded, header=True, framed=True) == msg + data = BytesIO(encoded) + assert msg.decode(data, header=True, framed=True) == msg + assert data.read() == b'' + + +@pytest.mark.parametrize('version', [0, 1, 2, 3, 4]) +def test_parse_flexible_error(version): + # An unsupported request version returns v0 response. Make sure this works with all versions! + msg = ApiVersionsResponse[0](error_code=35, api_versions=[(18, 0, 3)]) + msg.with_header(correlation_id=1) + encoded = b'\x00\x00\x00\x10\x00\x00\x00\x01\x00#\x00\x00\x00\x01\x00\x12\x00\x00\x00\x03' + data = BytesIO(encoded) + assert ApiVersionsResponse[version].decode(data, header=True, framed=True) == msg + assert data.read() == b''