|
| 1 | +import io |
| 2 | +import uuid |
| 3 | +import struct |
| 4 | + |
| 5 | +import pytest |
| 6 | + |
| 7 | +from kafka.protocol.types import ( |
| 8 | + Int8, Int16, Int32, Int64, Float64, Boolean, UUID, |
| 9 | + String, Bytes, Array |
| 10 | +) |
| 11 | + |
| 12 | + |
| 13 | +@pytest.mark.parametrize("cls, value, expected", [ |
| 14 | + (Int8, 0, b'\x00'), |
| 15 | + (Int8, 127, b'\x7f'), |
| 16 | + (Int8, -128, b'\x80'), |
| 17 | + (Int16, 0, b'\x00\x00'), |
| 18 | + (Int16, 32767, b'\x7f\xff'), |
| 19 | + (Int16, -32768, b'\x80\x00'), |
| 20 | + (Int32, 0, b'\x00\x00\x00\x00'), |
| 21 | + (Int32, 2147483647, b'\x7f\xff\xff\xff'), |
| 22 | + (Int32, -2147483648, b'\x80\x00\x00\x00'), |
| 23 | + (Int64, 0, b'\x00\x00\x00\x00\x00\x00\x00\x00'), |
| 24 | + (Int64, 9223372036854775807, b'\x7f\xff\xff\xff\xff\xff\xff\xff'), |
| 25 | + (Int64, -9223372036854775808, b'\x80\x00\x00\x00\x00\x00\x00\x00'), |
| 26 | + (Float64, 0.0, b'\x00\x00\x00\x00\x00\x00\x00\x00'), |
| 27 | + (Float64, 1.0, b'\x3f\xf0\x00\x00\x00\x00\x00\x00'), |
| 28 | + (Boolean, True, b'\x01'), |
| 29 | + (Boolean, False, b'\x00'), |
| 30 | +]) |
| 31 | +def test_primitive_types(cls, value, expected): |
| 32 | + encoded = cls.encode(value) |
| 33 | + assert encoded == expected |
| 34 | + decoded = cls.decode(io.BytesIO(encoded)) |
| 35 | + assert decoded == value |
| 36 | + |
| 37 | + |
| 38 | +def test_uuid_type(): |
| 39 | + val = uuid.uuid4() |
| 40 | + encoded = UUID.encode(val) |
| 41 | + assert len(encoded) == 16 |
| 42 | + assert encoded == val.bytes |
| 43 | + decoded = UUID.decode(io.BytesIO(encoded)) |
| 44 | + assert decoded == val |
| 45 | + |
| 46 | + # Test with string |
| 47 | + val_str = str(val) |
| 48 | + encoded = UUID.encode(val_str) |
| 49 | + assert encoded == val.bytes |
| 50 | + |
| 51 | + |
| 52 | +def test_string_type(): |
| 53 | + s = String() |
| 54 | + assert s.encode(None) == b'\xff\xff' |
| 55 | + assert s.decode(io.BytesIO(b'\xff\xff')) is None |
| 56 | + |
| 57 | + val = "foo" |
| 58 | + encoded = s.encode(val) |
| 59 | + assert encoded == b'\x00\x03foo' |
| 60 | + assert s.decode(io.BytesIO(encoded)) == val |
| 61 | + |
| 62 | + # Test custom encoding |
| 63 | + s_utf16 = String('utf-16') |
| 64 | + val = "好" |
| 65 | + encoded = s_utf16.encode(val) |
| 66 | + assert len(encoded) == 6 |
| 67 | + decoded = s_utf16.decode(io.BytesIO(encoded)) |
| 68 | + assert decoded == val |
| 69 | + |
| 70 | + |
| 71 | +def test_bytes_type(): |
| 72 | + assert Bytes.encode(None) == b'\xff\xff\xff\xff' |
| 73 | + assert Bytes.decode(io.BytesIO(b'\xff\xff\xff\xff')) is None |
| 74 | + |
| 75 | + val = b"foo" |
| 76 | + encoded = Bytes.encode(val) |
| 77 | + assert encoded == b'\x00\x00\x00\x03foo' |
| 78 | + assert Bytes.decode(io.BytesIO(encoded)) == val |
| 79 | + |
| 80 | + |
| 81 | +def test_array_type(): |
| 82 | + arr = Array(Int32) |
| 83 | + assert arr.encode(None) == b'\xff\xff\xff\xff' |
| 84 | + assert arr.decode(io.BytesIO(b'\xff\xff\xff\xff')) is None |
| 85 | + |
| 86 | + val = [1, 2, 3] |
| 87 | + encoded = arr.encode(val) |
| 88 | + assert encoded == b'\x00\x00\x00\x03\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03' |
| 89 | + assert arr.decode(io.BytesIO(encoded)) == val |
| 90 | + |
| 91 | + # Array of Schema |
| 92 | + arr_schema = Array(('f1', Int32), ('f2', Int32)) |
| 93 | + val = [(1, 10), (2, 20)] |
| 94 | + encoded = arr_schema.encode(val) |
| 95 | + assert arr_schema.decode(io.BytesIO(encoded)) == val |
| 96 | + |
| 97 | + |
| 98 | +def test_error_handling(): |
| 99 | + with pytest.raises(ValueError, match="Error encountered when attempting to convert value: 1000 to struct format: '>b'"): |
| 100 | + Int8.encode(1000) # Too large |
| 101 | + |
| 102 | + with pytest.raises(ValueError, match="Error encountered when attempting to convert value: None to struct format: '>h'"): |
| 103 | + Int16.encode(None) |
| 104 | + |
| 105 | + with pytest.raises(ValueError, match="Error encountered when attempting to convert value: b'' to struct format: '>b'"): |
| 106 | + Int8.decode(io.BytesIO(b'')) # Too short |
| 107 | + |
| 108 | + s = String() |
| 109 | + with pytest.raises(ValueError): |
| 110 | + s.decode(io.BytesIO(b'\x00\x05foo')) # length 5 but only 3 bytes |
0 commit comments