diff --git a/kafka/protocol/new/schemas/fields/codecs/__init__.py b/kafka/protocol/new/schemas/fields/codecs/__init__.py index 0da4164e4..c98d1570a 100644 --- a/kafka/protocol/new/schemas/fields/codecs/__init__.py +++ b/kafka/protocol/new/schemas/fields/codecs/__init__.py @@ -4,3 +4,12 @@ Int8, Int16, Int32, Int64, UnsignedVarInt32, Float64, Bytes, CompactBytes, String, CompactString, ) +from .tagged_fields import TaggedFields + +__all__ = [ + 'Array', 'CompactArray', + 'BitField', 'Boolean', 'UUID', + 'Int8', 'Int16', 'Int32', 'Int64', 'UnsignedVarInt32', 'Float64', + 'Bytes', 'CompactBytes', 'String', 'CompactString', + 'TaggedFields', +] diff --git a/kafka/protocol/new/schemas/fields/codecs/tagged_fields.py b/kafka/protocol/new/schemas/fields/codecs/tagged_fields.py index 7ec916182..b15f24c6a 100644 --- a/kafka/protocol/new/schemas/fields/codecs/tagged_fields.py +++ b/kafka/protocol/new/schemas/fields/codecs/tagged_fields.py @@ -23,8 +23,10 @@ def encode(self, item, version=None, compact=True, tagged=False): for tag, val in tags: ret.append(UnsignedVarInt32.encode(tag)) # Tags that are structs never include nested tagged fields - ret.append(self._tags[tag].encode(val, version=version, - compact=True, tagged=False)) + encoded_val = self._tags[tag].encode(val, version=version, + compact=True, tagged=False) + ret.append(UnsignedVarInt32.encode(len(encoded_val))) + ret.append(encoded_val) return b''.join(ret) def decode(self, data, version=None, compact=True, tagged=False): diff --git a/test/protocol/new/schemas/test_new_tagged_fields.py b/test/protocol/new/schemas/test_new_tagged_fields.py new file mode 100644 index 000000000..1de678af4 --- /dev/null +++ b/test/protocol/new/schemas/test_new_tagged_fields.py @@ -0,0 +1,22 @@ +import io + +import pytest + +from kafka.protocol.new.schemas.fields import SimpleField +from kafka.protocol.new.schemas.fields.codecs import TaggedFields, UnsignedVarInt32 + + +def test_tagged_fields(): + tags = TaggedFields([ + SimpleField({'name': 'foo', 'tag': 0, 'type': 'int16', 'versions': "0+"}), + SimpleField({'name': 'bar', 'tag': 1, 'type': 'string', 'versions': "0+"}), + ]) + val = {'foo': 2, 'bar': 'foobar'} + encoded = tags.encode(val, version=0) + # length(2), tag(0), size(2), b'\x00\x02', tag(1), size(7), len(7), 'foobar' + expected = (UnsignedVarInt32.encode(2) + + UnsignedVarInt32.encode(0) + UnsignedVarInt32.encode(2) + b'\x00\x02' + + UnsignedVarInt32.encode(1) + UnsignedVarInt32.encode(7) + UnsignedVarInt32.encode(7) + b'foobar') + assert encoded == expected + decoded = tags.decode(io.BytesIO(encoded), version=0) + assert decoded == val