From 19e08c0a4359b92673aa5a2416a39440486052d4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 23:12:54 -0700 Subject: [PATCH 1/2] Fix new TaggedField encoding --- .../new/schemas/fields/codecs/__init__.py | 9 ++++++++ .../schemas/fields/codecs/tagged_fields.py | 6 +++-- .../new/schemas/test_new_tagged_fields.py | 22 +++++++++++++++++++ 3 files changed, 35 insertions(+), 2 deletions(-) create mode 100644 test/protocol/new/schemas/test_new_tagged_fields.py diff --git a/kafka/protocol/new/schemas/fields/codecs/__init__.py b/kafka/protocol/new/schemas/fields/codecs/__init__.py index 0da4164e4..c98d1570a 100644 --- a/kafka/protocol/new/schemas/fields/codecs/__init__.py +++ b/kafka/protocol/new/schemas/fields/codecs/__init__.py @@ -4,3 +4,12 @@ Int8, Int16, Int32, Int64, UnsignedVarInt32, Float64, Bytes, CompactBytes, String, CompactString, ) +from .tagged_fields import TaggedFields + +__all__ = [ + 'Array', 'CompactArray', + 'BitField', 'Boolean', 'UUID', + 'Int8', 'Int16', 'Int32', 'Int64', 'UnsignedVarInt32', 'Float64', + 'Bytes', 'CompactBytes', 'String', 'CompactString', + 'TaggedFields', +] diff --git a/kafka/protocol/new/schemas/fields/codecs/tagged_fields.py b/kafka/protocol/new/schemas/fields/codecs/tagged_fields.py index 7ec916182..b15f24c6a 100644 --- a/kafka/protocol/new/schemas/fields/codecs/tagged_fields.py +++ b/kafka/protocol/new/schemas/fields/codecs/tagged_fields.py @@ -23,8 +23,10 @@ def encode(self, item, version=None, compact=True, tagged=False): for tag, val in tags: ret.append(UnsignedVarInt32.encode(tag)) # Tags that are structs never include nested tagged fields - ret.append(self._tags[tag].encode(val, version=version, - compact=True, tagged=False)) + encoded_val = self._tags[tag].encode(val, version=version, + compact=True, tagged=False) + ret.append(UnsignedVarInt32.encode(len(encoded_val))) + ret.append(encoded_val) return b''.join(ret) def decode(self, data, version=None, compact=True, tagged=False): diff --git a/test/protocol/new/schemas/test_new_tagged_fields.py b/test/protocol/new/schemas/test_new_tagged_fields.py new file mode 100644 index 000000000..ea25643eb --- /dev/null +++ b/test/protocol/new/schemas/test_new_tagged_fields.py @@ -0,0 +1,22 @@ +import io + +import pytest + +from kafka.protocol.new.schemas.fields import SimpleField +from kafka.protocol.new.schemas.fields.codecs import TaggedFields, UnsignedVarInt32 + + +def test_tagged_fields(): + tags = TaggedFields([ + SimpleField({'name': 'foo', 'tag': 0, 'type': 'string', 'versions': "0+"}), + SimpleField({'name': 'bar', 'tag': 1, 'type': 'string', 'versions': "0+"}), + ]) + val = {'foo': 'fizz', 'bar': 'foobar'} + encoded = tags.encode(val, version=0) + # length(2), tag(0), size(5), len(5), 'fizz', tag(1), size(7), len(7), 'foobar' + expected = (UnsignedVarInt32.encode(2) + + UnsignedVarInt32.encode(0) + UnsignedVarInt32.encode(5) + UnsignedVarInt32.encode(5) + b'fizz' + + UnsignedVarInt32.encode(1) + UnsignedVarInt32.encode(7) + UnsignedVarInt32.encode(7) + b'foobar') + assert encoded == expected + decoded = tags.decode(io.BytesIO(encoded), version=0) + assert decoded == val From 8308fd37ae6790f6c7b992ed6bd87dd72405e047 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 18 Mar 2026 23:22:14 -0700 Subject: [PATCH 2/2] test tagged fields with int16 + string fields --- test/protocol/new/schemas/test_new_tagged_fields.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/protocol/new/schemas/test_new_tagged_fields.py b/test/protocol/new/schemas/test_new_tagged_fields.py index ea25643eb..1de678af4 100644 --- a/test/protocol/new/schemas/test_new_tagged_fields.py +++ b/test/protocol/new/schemas/test_new_tagged_fields.py @@ -8,14 +8,14 @@ def test_tagged_fields(): tags = TaggedFields([ - SimpleField({'name': 'foo', 'tag': 0, 'type': 'string', 'versions': "0+"}), + SimpleField({'name': 'foo', 'tag': 0, 'type': 'int16', 'versions': "0+"}), SimpleField({'name': 'bar', 'tag': 1, 'type': 'string', 'versions': "0+"}), ]) - val = {'foo': 'fizz', 'bar': 'foobar'} + val = {'foo': 2, 'bar': 'foobar'} encoded = tags.encode(val, version=0) - # length(2), tag(0), size(5), len(5), 'fizz', tag(1), size(7), len(7), 'foobar' + # length(2), tag(0), size(2), b'\x00\x02', tag(1), size(7), len(7), 'foobar' expected = (UnsignedVarInt32.encode(2) + - UnsignedVarInt32.encode(0) + UnsignedVarInt32.encode(5) + UnsignedVarInt32.encode(5) + b'fizz' + + UnsignedVarInt32.encode(0) + UnsignedVarInt32.encode(2) + b'\x00\x02' + UnsignedVarInt32.encode(1) + UnsignedVarInt32.encode(7) + UnsignedVarInt32.encode(7) + b'foobar') assert encoded == expected decoded = tags.decode(io.BytesIO(encoded), version=0)