Skip to content

Commit 643890a

Browse files
authored
Fix new protocol tagged fields encoding (#2745)
1 parent a908951 commit 643890a

3 files changed

Lines changed: 35 additions & 2 deletions

File tree

kafka/protocol/new/schemas/fields/codecs/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,12 @@
44
Int8, Int16, Int32, Int64, UnsignedVarInt32, Float64,
55
Bytes, CompactBytes, String, CompactString,
66
)
7+
from .tagged_fields import TaggedFields
8+
9+
__all__ = [
10+
'Array', 'CompactArray',
11+
'BitField', 'Boolean', 'UUID',
12+
'Int8', 'Int16', 'Int32', 'Int64', 'UnsignedVarInt32', 'Float64',
13+
'Bytes', 'CompactBytes', 'String', 'CompactString',
14+
'TaggedFields',
15+
]

kafka/protocol/new/schemas/fields/codecs/tagged_fields.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ def encode(self, item, version=None, compact=True, tagged=False):
2323
for tag, val in tags:
2424
ret.append(UnsignedVarInt32.encode(tag))
2525
# Tags that are structs never include nested tagged fields
26-
ret.append(self._tags[tag].encode(val, version=version,
27-
compact=True, tagged=False))
26+
encoded_val = self._tags[tag].encode(val, version=version,
27+
compact=True, tagged=False)
28+
ret.append(UnsignedVarInt32.encode(len(encoded_val)))
29+
ret.append(encoded_val)
2830
return b''.join(ret)
2931

3032
def decode(self, data, version=None, compact=True, tagged=False):
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import io
2+
3+
import pytest
4+
5+
from kafka.protocol.new.schemas.fields import SimpleField
6+
from kafka.protocol.new.schemas.fields.codecs import TaggedFields, UnsignedVarInt32
7+
8+
9+
def test_tagged_fields():
10+
tags = TaggedFields([
11+
SimpleField({'name': 'foo', 'tag': 0, 'type': 'int16', 'versions': "0+"}),
12+
SimpleField({'name': 'bar', 'tag': 1, 'type': 'string', 'versions': "0+"}),
13+
])
14+
val = {'foo': 2, 'bar': 'foobar'}
15+
encoded = tags.encode(val, version=0)
16+
# length(2), tag(0), size(2), b'\x00\x02', tag(1), size(7), len(7), 'foobar'
17+
expected = (UnsignedVarInt32.encode(2) +
18+
UnsignedVarInt32.encode(0) + UnsignedVarInt32.encode(2) + b'\x00\x02' +
19+
UnsignedVarInt32.encode(1) + UnsignedVarInt32.encode(7) + UnsignedVarInt32.encode(7) + b'foobar')
20+
assert encoded == expected
21+
decoded = tags.decode(io.BytesIO(encoded), version=0)
22+
assert decoded == val

0 commit comments

Comments
 (0)