diff --git a/kafka/codec.py b/kafka/codec.py index 7177a646f..e523f4229 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -319,7 +319,5 @@ def zstd_encode(payload): def zstd_decode(payload): if not zstd: raise NotImplementedError("Zstd codec is not available") - try: - return zstd.ZstdDecompressor().decompress(payload) - except zstd.ZstdError: - return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE) + with zstd.ZstdDecompressor().stream_reader(io.BytesIO(payload), read_across_frames=True) as reader: + return reader.read() diff --git a/test/test_codec.py b/test/test_codec.py index 90c53a3fb..f04c1dfb8 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -121,3 +121,14 @@ def test_zstd(): b1 = random_string(100).encode('utf-8') b2 = zstd_decode(zstd_encode(b1)) assert b1 == b2 + + +@pytest.mark.skipif(not has_zstd(), reason="Zstd not available") +def test_zstd_multi_frame(): + """Test that zstd_decode handles multiple concatenated zstd frames.""" + frame1_data = b'some payload data ' * 100 + frame2_data = b'another frame of data ' * 100 + # Concatenate two independently compressed zstd frames + multi_frame_payload = zstd_encode(frame1_data) + zstd_encode(frame2_data) + result = zstd_decode(multi_frame_payload) + assert result == frame1_data + frame2_data