From 3b7462f02c86571c2c2a99f6bbfddb62ab6d5500 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 24 Jun 2026 14:04:43 +0000 Subject: [PATCH] Reserve buffer capacity before every encode_into write The optimized encode path writes primitives directly into a growable EncodeBuffer (initial 64KB). Only Bytes reserved space via out.ensure(); every other writer assumed room existed. When a payload forces ensure() to grow the buffer to exactly fit (zero trailing slack, i.e. when pos + payload > 2 * capacity), the next write ran off the end: - UnsignedVarInt32 / tagged-field count -> IndexError (the reported bug) - fixed fields (e.g. the next partition's int32 index) -> struct.error - arrays of fixed primitives ([]int32) overflow at 64KB with no Bytes or tagged fields involved at all - codegen kept a stale `buf` local after a delegated tagged-field encode reallocated out.buf (silent data loss / crash); adding ensure() to the varint made this fire routinely Make capacity reservation uniform: - runtime codecs (types.py, tagged_fields.py) call out.ensure(N) before writing N bytes (fixed/uuid/bitfield/varint, String prefix+payload, Bytes length prefix, tagged size byte) - codegen gains CodegenContext.emit_reserve(): an inline `if pos + N > _cap` check that only syncs/grows/re-reads on overflow, keeping buf/pos/_cap in sync - ArrayField bulk-reserves fixed-element arrays once, then writes the run with a tight pack_into loop (keeps the hot path fast) - struct.py / struct_array.py re-bind buf/_cap after every delegated encode_into that can reallocate the buffer Document the EncodeBuffer capacity contract and the reallocation hazard (callers must re-read out.buf after a grow). Adds test/protocol/schemas/test_encode_large.py: boundary unit tests for every codec plus byte-for-byte parity and round-trip for large records (Produce v9/v3, single and multi partition), large []int32 arrays, the 64KB/128KB grow boundaries, and pooled-buffer reuse. 22 of these fail on the prior tree. Co-Authored-By: Claude Opus 4.8 (1M context) --- kafka/protocol/schemas/fields/array.py | 23 ++- .../schemas/fields/codecs/encode_buffer.py | 41 +++- .../schemas/fields/codecs/tagged_fields.py | 1 + kafka/protocol/schemas/fields/codecs/types.py | 43 +++-- kafka/protocol/schemas/fields/codegen.py | 31 +++ kafka/protocol/schemas/fields/struct.py | 10 + kafka/protocol/schemas/fields/struct_array.py | 7 +- test/protocol/schemas/test_encode_large.py | 181 ++++++++++++++++++ 8 files changed, 319 insertions(+), 18 deletions(-) create mode 100644 test/protocol/schemas/test_encode_large.py diff --git a/kafka/protocol/schemas/fields/array.py b/kafka/protocol/schemas/fields/array.py index 2b9cad9e1..9f18bfb3c 100644 --- a/kafka/protocol/schemas/fields/array.py +++ b/kafka/protocol/schemas/fields/array.py @@ -72,9 +72,10 @@ def emit_encode_into(self, ctx, val_expr, indent, version=None, compact=False, t if compact: an = ctx.next_var('an') ctx.emit(indent, '%s = len(%s) + 1 if %s is not None else 0' % (an, val_expr, val_expr)) - UnsignedVarInt32.emit_encode_into(ctx, an, indent) + UnsignedVarInt32.emit_encode_into(ctx, an, indent) # reserves the length varint ctx.emit(indent, 'if %s is not None:' % val_expr) else: + ctx.emit_reserve(indent, 4) ctx.emit(indent, 'if %s is None:' % val_expr) ctx.emit(indent, " pack_into('>i', buf, pos, -1)") ctx.emit(indent, ' pos += 4') @@ -83,9 +84,23 @@ def emit_encode_into(self, ctx, val_expr, indent, version=None, compact=False, t ctx.emit(indent, ' pos += 4') guard = indent + ' ' item_var = ctx.next_var('ai') - ctx.emit(guard, 'for %s in %s:' % (item_var, val_expr)) - self.array_of.emit_encode_into(ctx, item_var, guard + ' ', - version=version, compact=compact, tagged=tagged) + elem = self.array_of + if isinstance(elem, SimpleField) and elem.is_batchable(): + # Fast path: array of fixed-size primitives (e.g. []int32). Reserve + # the whole run with a single ensure, then write each element inline + # without a per-element capacity check. The reserve sits inside the + # `is not None` branch (compact) / `else` branch (non-compact), so + # len() is always safe here. + be_fmt = elem._type._be_fmt + size = elem._type.size + ctx.emit_reserve(guard, '%d * len(%s)' % (size, val_expr)) + ctx.emit(guard, 'for %s in %s:' % (item_var, val_expr)) + ctx.emit(guard, " pack_into('%s', buf, pos, %s)" % (be_fmt, item_var)) + ctx.emit(guard, ' pos += %d' % size) + else: + ctx.emit(guard, 'for %s in %s:' % (item_var, val_expr)) + elem.emit_encode_into(ctx, item_var, guard + ' ', + version=version, compact=compact, tagged=tagged) def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, tagged=False): n = ctx.next_var('n') diff --git a/kafka/protocol/schemas/fields/codecs/encode_buffer.py b/kafka/protocol/schemas/fields/codecs/encode_buffer.py index 3230c4025..77f5943b0 100644 --- a/kafka/protocol/schemas/fields/codecs/encode_buffer.py +++ b/kafka/protocol/schemas/fields/codecs/encode_buffer.py @@ -2,7 +2,35 @@ class EncodeBuffer: - """Growable buffer for encode_into operations.""" + """Growable byte buffer for the ``encode_into`` fast path. + + The encoders write primitives directly into ``buf`` at offset ``pos`` + rather than building and joining intermediate ``bytes`` objects. The + buffer starts at a fixed size and grows on demand via :meth:`ensure`. + + Capacity contract + ----------------- + Writing past the end of ``buf`` does not grow it automatically: + + * single-byte index writes (``buf[pos] = x``) raise ``IndexError``, + * ``pack_into`` raises ``struct.error``, + * slice assignment (``buf[pos:pos+n] = data``) silently *resizes* the + bytearray, defeating the preallocation. + + Therefore **every writer must call** ``ensure(n)`` to reserve ``n`` bytes + before writing ``n`` bytes at ``pos`` (where ``n`` is the maximum the write + can consume - e.g. ``5`` for a varint32, a fixed field's ``size``, or + ``len(payload)`` for variable data). See the codecs in ``types.py`` for the + pattern, and ``CodegenContext.emit_reserve`` for the compiled equivalent. + + Reallocation note + ----------------- + ``ensure`` may replace ``buf`` with a larger bytearray. Any caller (or + generated code) that caches ``buf`` in a local **must re-read** ``self.buf`` + after a call that can grow it - including indirect growth through a nested + ``encode_into`` / ``ensure``. Forgetting to re-read leaves writes targeting + the old, discarded buffer (silent data loss) or raises out of range. + """ __slots__ = ('buf', 'pos') def __init__(self, size=65536): @@ -14,6 +42,16 @@ def reset(self): self.pos = 0 def ensure(self, needed): + """Guarantee at least ``needed`` writable bytes remain at ``pos``. + + Call this *before* writing ``needed`` bytes at ``self.pos``. If the + current buffer cannot hold them it is grown (at least doubled) and the + existing ``[:pos]`` content is preserved. + + WARNING: this may rebind ``self.buf`` to a new bytearray, so re-read + ``self.buf`` afterwards if you hold a local reference to it (see the + class docstring). + """ if self.pos + needed > len(self.buf): new_size = max(len(self.buf) * 2, self.pos + needed) new_buf = bytearray(new_size) @@ -21,6 +59,7 @@ def ensure(self, needed): self.buf = new_buf def result(self): + """Return the encoded bytes written so far (``buf[:pos]``).""" # Return a bytearray slice (one copy) rather than bytes(self.buf[:pos]) # (two copies - the slice creates a bytearray, then bytes() copies # again). Downstream consumers (protocol codec slice assignment, diff --git a/kafka/protocol/schemas/fields/codecs/tagged_fields.py b/kafka/protocol/schemas/fields/codecs/tagged_fields.py index f069da88b..dac1d010d 100644 --- a/kafka/protocol/schemas/fields/codecs/tagged_fields.py +++ b/kafka/protocol/schemas/fields/codecs/tagged_fields.py @@ -60,6 +60,7 @@ def encode_into(self, item, out, version=None): # Encode value into buffer after reserving space for size prefix. # Strategy: assume size fits in 1 byte (< 128), encode value, # then fix up if the size varint is larger. + out.ensure(1) # reserve the assumed 1-byte size prefix size_pos = out.pos out.pos += 1 # reserve 1 byte for size val_start = out.pos diff --git a/kafka/protocol/schemas/fields/codecs/types.py b/kafka/protocol/schemas/fields/codecs/types.py index e423cd02a..88fc72f2b 100644 --- a/kafka/protocol/schemas/fields/codecs/types.py +++ b/kafka/protocol/schemas/fields/codecs/types.py @@ -26,6 +26,7 @@ def encode(cls, value, compact=False): @classmethod def encode_into(cls, out, value, compact=False): + out.ensure(cls.size) pack_into(cls._be_fmt, out.buf, out.pos, value) out.pos += cls.size @@ -40,6 +41,7 @@ def decode_from(cls, data, pos): @classmethod def emit_encode_into(cls, ctx, val_expr, indent, compact=False): + ctx.emit_reserve(indent, cls.size) ctx.emit(indent, "pack_into('%s', buf, pos, %s)" % (cls._be_fmt, val_expr)) ctx.emit(indent, 'pos += %d' % cls.size) @@ -101,6 +103,7 @@ def encode_into(cls, out, value, compact=False): b = value.bytes else: b = uuid.UUID(value).bytes + out.ensure(16) pos = out.pos out.buf[pos:pos+16] = b out.pos = pos + 16 @@ -111,6 +114,7 @@ def emit_encode_into(cls, ctx, val_expr, indent, compact=False): v = ctx.next_var('uv') ctx.emit(indent, '%s = %s' % (v, val_expr)) ctx.emit(indent, 'if %s is None: %s = _ZERO_UUID' % (v, v)) + ctx.emit_reserve(indent, 16) ctx.emit(indent, 'buf[pos:pos+16] = %s.bytes if hasattr(%s, "bytes") else __import__("uuid").UUID(%s).bytes' % (v, v, v)) ctx.emit(indent, 'pos += 16') @@ -151,43 +155,52 @@ def encode(self, value, compact=False): def encode_into(self, out, value, compact=False): if compact: if value is None: - UnsignedVarInt32.encode_into(out,0) + UnsignedVarInt32.encode_into(out, 0) # ensures internally return value = str(value).encode(self.encoding) - UnsignedVarInt32.encode_into(out,len(value) + 1) + UnsignedVarInt32.encode_into(out, len(value) + 1) # ensures internally else: if value is None: + out.ensure(2) pack_into('>h', out.buf, out.pos, -1) out.pos += 2 return value = str(value).encode(self.encoding) + out.ensure(2) pack_into('>h', out.buf, out.pos, len(value)) out.pos += 2 n = len(value) + out.ensure(n) pos = out.pos out.buf[pos:pos+n] = value out.pos = pos + n def emit_encode_into(self, ctx, val_expr, indent, compact=False): sv = ctx.next_var('sv') + body = indent + ' ' ctx.emit(indent, 'if %s is None:' % val_expr) if compact: + ctx.emit_reserve(body, 1) ctx.emit(indent, ' buf[pos] = 0') ctx.emit(indent, ' pos += 1') ctx.emit(indent, 'else:') sn = ctx.next_var('sn') ctx.emit(indent, ' %s = str(%s).encode("utf-8")' % (sv, val_expr)) ctx.emit(indent, ' %s = len(%s) + 1' % (sn, sv)) - UnsignedVarInt32.emit_encode_into(ctx, sn, indent + ' ') + UnsignedVarInt32.emit_encode_into(ctx, sn, body) # reserves the length varint + ctx.emit_reserve(body, 'len(%s)' % sv) ctx.emit(indent, ' buf[pos:pos+len(%s)] = %s' % (sv, sv)) ctx.emit(indent, ' pos += len(%s)' % sv) else: + ctx.emit_reserve(body, 2) ctx.emit(indent, " pack_into('>h', buf, pos, -1)") ctx.emit(indent, ' pos += 2') ctx.emit(indent, 'else:') ctx.emit(indent, ' %s = str(%s).encode("utf-8")' % (sv, val_expr)) + ctx.emit_reserve(body, 2) ctx.emit(indent, " pack_into('>h', buf, pos, len(%s))" % sv) ctx.emit(indent, ' pos += 2') + ctx.emit_reserve(body, 'len(%s)' % sv) ctx.emit(indent, ' buf[pos:pos+len(%s)] = %s' % (sv, sv)) ctx.emit(indent, ' pos += len(%s)' % sv) @@ -240,14 +253,16 @@ def encode_into(cls, out, value, compact=False): value = value.encode() if compact: if value is None: - UnsignedVarInt32.encode_into(out, 0) + UnsignedVarInt32.encode_into(out, 0) # ensures internally return - UnsignedVarInt32.encode_into(out, len(value) + 1) + UnsignedVarInt32.encode_into(out, len(value) + 1) # ensures internally else: if value is None: + out.ensure(4) pack_into('>i', out.buf, out.pos, -1) out.pos += 4 return + out.ensure(4) pack_into('>i', out.buf, out.pos, len(value)) out.pos += 4 n = len(value) @@ -260,35 +275,35 @@ def encode_into(cls, out, value, compact=False): def emit_encode_into(cls, ctx, val_expr, indent, compact=False): bv = ctx.next_var('bv') bn = ctx.next_var('bn') + body = indent + ' ' if compact: ctx.emit(indent, '%s = %s' % (bv, val_expr)) ctx.emit(indent, 'if %s is not None and not isinstance(%s, (bytes, bytearray, memoryview)): %s = %s.encode()' % (bv, bv, bv, bv)) ctx.emit(indent, 'if %s is None:' % bv) + ctx.emit_reserve(body, 1) ctx.emit(indent, ' buf[pos] = 0') ctx.emit(indent, ' pos += 1') ctx.emit(indent, 'else:') ctx.emit(indent, ' %s = len(%s)' % (bn, bv)) sn = ctx.next_var('sn') ctx.emit(indent, ' %s = %s + 1' % (sn, bn)) - UnsignedVarInt32.emit_encode_into(ctx, sn, indent + ' ') - ctx.emit(indent, ' out.pos = pos') - ctx.emit(indent, ' out.ensure(%s)' % bn) - ctx.emit(indent, ' buf = out.buf') + UnsignedVarInt32.emit_encode_into(ctx, sn, body) # reserves the length varint + ctx.emit_reserve(body, bn) ctx.emit(indent, ' buf[pos:pos+%s] = %s' % (bn, bv)) ctx.emit(indent, ' pos += %s' % bn) else: ctx.emit(indent, '%s = %s' % (bv, val_expr)) ctx.emit(indent, 'if %s is not None and not isinstance(%s, (bytes, bytearray, memoryview)): %s = %s.encode()' % (bv, bv, bv, bv)) ctx.emit(indent, 'if %s is None:' % bv) + ctx.emit_reserve(body, 4) ctx.emit(indent, " pack_into('>i', buf, pos, -1)") ctx.emit(indent, ' pos += 4') ctx.emit(indent, 'else:') ctx.emit(indent, ' %s = len(%s)' % (bn, bv)) + ctx.emit_reserve(body, 4) ctx.emit(indent, " pack_into('>i', buf, pos, %s)" % bn) ctx.emit(indent, ' pos += 4') - ctx.emit(indent, ' out.pos = pos') - ctx.emit(indent, ' out.ensure(%s)' % bn) - ctx.emit(indent, ' buf = out.buf') + ctx.emit_reserve(body, bn) ctx.emit(indent, ' buf[pos:pos+%s] = %s' % (bn, bv)) ctx.emit(indent, ' pos += %s' % bn) @@ -341,6 +356,7 @@ def encode(cls, value, compact=False): @classmethod def encode_into(cls, out, value): + out.ensure(5) # a varint32 is at most 5 bytes buf = out.buf pos = out.pos while (value & 0xffffff80) != 0: @@ -352,6 +368,7 @@ def encode_into(cls, out, value): @classmethod def emit_encode_into(cls, ctx, val_expr, indent, compact=False): + ctx.emit_reserve(indent, 5) # a varint32 is at most 5 bytes ctx.emit(indent, 'while (%s & 0xffffff80) != 0:' % val_expr) ctx.emit(indent, ' buf[pos] = (%s & 0x7f) | 0x80' % val_expr) ctx.emit(indent, ' %s >>= 7' % val_expr) @@ -462,6 +479,7 @@ def encode(cls, vals, compact=False): def encode_into(cls, out, vals, compact=False): if vals is None: vals = {31} + out.ensure(4) pack_into('>I', out.buf, out.pos, cls.to_32_bit_field(vals)) out.pos += 4 @@ -473,6 +491,7 @@ def emit_encode_into(cls, ctx, val_expr, indent, compact=False): ctx.emit(indent, 'if %s is None: %s = {31}' % (bf, bf)) ctx.emit(indent, '%s = 0' % bfi) ctx.emit(indent, 'for _b in %s: %s |= 1 << _b' % (bf, bfi)) + ctx.emit_reserve(indent, 4) ctx.emit(indent, "pack_into('>I', buf, pos, %s)" % bfi) ctx.emit(indent, 'pos += 4') diff --git a/kafka/protocol/schemas/fields/codegen.py b/kafka/protocol/schemas/fields/codegen.py index 911869e0c..cbb6f5b8f 100644 --- a/kafka/protocol/schemas/fields/codegen.py +++ b/kafka/protocol/schemas/fields/codegen.py @@ -28,6 +28,37 @@ def next_var(self, prefix='v'): def emit(self, indent, line): self.lines.append(f'{indent}{line}') + def emit_reserve(self, indent, nbytes): + """Emit an inline capacity check before a write of up to ``nbytes`` bytes. + + Generated encode functions keep three locals in sync: + + * ``buf`` - the destination bytearray (``out.buf``), + * ``pos`` - the current write offset, + * ``_cap`` - ``len(buf)``, the cached capacity. + + These are not set up per fragment: they are declared once by the + ``def _encode(item, out):`` preamble in + ``StructField.encode_into__optimized_context`` (the sole generator of + the compiled encode function), and every emitted fragment - including + this one - is spliced into that function body. Emit fragments are + therefore only valid inside that body; they cannot stand alone. + + ``nbytes`` is the MAXIMUM number of bytes the following write can + consume (an ``int`` for fixed/varint fields, or a string expression + such as ``'len(_bv1)'`` for a variable payload). The fast path is a + single comparison; only on overflow do we sync ``out.pos``, grow via + :meth:`EncodeBuffer.ensure`, and re-read the (possibly reallocated) + buffer back into ``buf``/``_cap``. + + Because :meth:`EncodeBuffer.ensure` may rebind ``out.buf``, code that + instead delegates to a runtime ``encode_into`` (e.g. tagged fields) + must re-bind ``buf``/``_cap`` itself afterwards - ``emit_reserve`` only + covers writes emitted inline. + """ + self.emit(indent, 'if pos + %s > _cap:' % nbytes) + self.emit(indent, ' out.pos = pos; out.ensure(%s); buf = out.buf; _cap = len(buf)' % nbytes) + def source(self): return '\n'.join(self.lines) diff --git a/kafka/protocol/schemas/fields/struct.py b/kafka/protocol/schemas/fields/struct.py index bd3d3d5ef..d931721e4 100644 --- a/kafka/protocol/schemas/fields/struct.py +++ b/kafka/protocol/schemas/fields/struct.py @@ -116,6 +116,7 @@ def emit_encode_into(self, ctx, item_expr, indent, version=None, compact=False, and not tuple_access ) if inline_nullable: + ctx.emit_reserve(indent, 1) ctx.emit(indent, 'if %s is None:' % item_expr) ctx.emit(indent, ' buf[pos] = 0xff') ctx.emit(indent, ' pos += 1') @@ -136,8 +137,13 @@ def emit_encode_into(self, ctx, item_expr, indent, version=None, compact=False, ctx.globs[tf_var] = self.tagged_fields(version) ctx.emit(indent, 'out.pos = pos') ctx.emit(indent, '%s.encode_into(%s, out, version=%d)' % (tf_var, item_expr, version)) + # TaggedFields.encode_into may grow (reallocate) out.buf, so re-bind + # the cached buf/_cap locals before any further inline writes. ctx.emit(indent, 'pos = out.pos') + ctx.emit(indent, 'buf = out.buf') + ctx.emit(indent, '_cap = len(buf)') elif tagged is None: + ctx.emit_reserve(indent, 1) ctx.emit(indent, 'buf[pos] = 0') ctx.emit(indent, 'pos += 1') @@ -170,9 +176,13 @@ def encode_into(self, item, out, version=None, compact=False, tagged=False): def encode_into__optimized_context(self, version, compact=False, tagged=False): ctx = CodegenContext() indent = ' ' + # Preamble: declare the buf/pos/_cap locals that every emitted fragment + # relies on (see CodegenContext.emit_reserve). Fragments are spliced + # into this function body and are not valid outside it. ctx.lines.append('def _encode(item, out):') ctx.emit(indent, 'buf = out.buf') ctx.emit(indent, 'pos = out.pos') + ctx.emit(indent, '_cap = len(buf) # cached len(buf); kept in sync by emit_reserve') self.emit_encode_into(ctx, 'item', indent, version=version, compact=compact, tagged=tagged) ctx.emit(indent, 'out.pos = pos') return ctx diff --git a/kafka/protocol/schemas/fields/struct_array.py b/kafka/protocol/schemas/fields/struct_array.py index 376041b16..b77451e6c 100644 --- a/kafka/protocol/schemas/fields/struct_array.py +++ b/kafka/protocol/schemas/fields/struct_array.py @@ -92,9 +92,10 @@ def emit_encode_into(self, ctx, val_expr, indent, version=None, compact=False, t if compact: an = ctx.next_var('an') ctx.emit(indent, '%s = len(%s) + 1 if %s is not None else 0' % (an, val_expr, val_expr)) - UnsignedVarInt32.emit_encode_into(ctx, an, indent) + UnsignedVarInt32.emit_encode_into(ctx, an, indent) # reserves the length varint ctx.emit(indent, 'if %s is not None:' % val_expr) else: + ctx.emit_reserve(indent, 4) ctx.emit(indent, 'if %s is None:' % val_expr) ctx.emit(indent, " pack_into('>i', buf, pos, -1)") ctx.emit(indent, ' pos += 4') @@ -118,8 +119,12 @@ def emit_encode_into(self, ctx, val_expr, indent, version=None, compact=False, t ctx.emit(scalar_indent, 'out.pos = pos') ctx.emit(scalar_indent, '# tagged fields for single-field struct') ctx.emit(scalar_indent, '%s.encode_into(%s, out, version=%d)' % (tf_var, item_var, version)) + # encode_into may reallocate out.buf - re-bind cached locals. ctx.emit(scalar_indent, 'pos = out.pos') + ctx.emit(scalar_indent, 'buf = out.buf') + ctx.emit(scalar_indent, '_cap = len(buf)') elif tagged is None: + ctx.emit_reserve(scalar_indent, 1) ctx.emit(scalar_indent, 'buf[pos] = 0') ctx.emit(scalar_indent, 'pos += 1') else: diff --git a/test/protocol/schemas/test_encode_large.py b/test/protocol/schemas/test_encode_large.py new file mode 100644 index 000000000..0e0e850bb --- /dev/null +++ b/test/protocol/schemas/test_encode_large.py @@ -0,0 +1,181 @@ +"""Regression tests for encoding records/messages that grow the EncodeBuffer. + +These cover the buffer-overflow family of bugs in the optimized (codegen) +encode path. The EncodeBuffer starts at 64KB and grows on demand; when a +payload forces a grow that lands exactly at the buffer end (zero trailing +slack), the *next* write used to run off the end: + + * UnsignedVarInt32 / tagged-field count -> IndexError (the reported bug) + * fixed fields (int32 partition index, ...) -> struct.error + * arrays of fixed primitives ([]int32) -> struct.error (no Bytes needed) + * stale `buf` local after a tagged-field encode reallocated the buffer + +Every writer must now reserve via EncodeBuffer.ensure() (runtime) or +CodegenContext.emit_reserve() (codegen) before writing. +""" +import io + +import pytest + +from kafka.protocol.schemas.fields.codecs.encode_buffer import EncodeBuffer +from kafka.protocol.schemas.fields.codecs import types as T +from kafka.protocol.producer.produce import ProduceRequest +from kafka.protocol.consumer.group import OffsetFetchRequest + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _reference_encode(m): + flexible = m.flexible_version_q(m.API_VERSION) + return bytes(m._struct.encode( + m, version=m.API_VERSION, compact=flexible, tagged=flexible)) + + +def _codegen_encode(m, buf=None): + flexible = m.flexible_version_q(m.API_VERSION) + fn = m._struct.compiled_encode_into( + m.API_VERSION, compact=flexible, tagged=flexible) + out = buf if buf is not None else EncodeBuffer() + fn(m, out) + return bytes(out.result()) + + +def _full_buffer(size=16): + """An EncodeBuffer whose pos sits exactly at the end (zero slack).""" + b = EncodeBuffer(size=size) + b.pos = size + return b + + +# --------------------------------------------------------------------------- +# Unit: writing into a completely-full buffer must grow, not crash, and the +# bytes written must match the reference encode(). +# --------------------------------------------------------------------------- + +@pytest.mark.parametrize('codec,value', [ + (T.Int8, 5), + (T.Int16, 1234), + (T.Int32, 1234567), + (T.Int64, 2 ** 40), + (T.UnsignedInt16, 65000), + (T.Float64, 3.14159), + (T.Boolean, True), + (T.BitField, {1, 5, 30}), + (T.UUID, None), +]) +def test_fixed_codec_encode_into_grows_full_buffer(codec, value): + out = _full_buffer() + start = out.pos + codec.encode_into(out, value) + assert bytes(out.buf[start:out.pos]) == bytes(codec.encode(value)) + + +def test_unsigned_varint32_encode_into_grows_full_buffer(): + # The originally-reported crash: buf[pos] = ... on a full buffer. + for value in (0, 1, 127, 128, 16384, 2 ** 28): + out = _full_buffer() + start = out.pos + T.UnsignedVarInt32.encode_into(out, value) + assert out.pos > start # wrote at least one byte without IndexError + + +@pytest.mark.parametrize('compact', [False, True]) +def test_bytes_encode_into_grows_full_buffer(compact): + out = _full_buffer() + start = out.pos + payload = b'z' * 5000 + T.Bytes.encode_into(out, payload, compact=compact) + assert bytes(out.buf[start:out.pos]) == bytes(T.Bytes.encode(payload, compact=compact)) + + +@pytest.mark.parametrize('compact', [False, True]) +def test_string_encode_into_grows_full_buffer(compact): + out = _full_buffer() + start = out.pos + s = T.String('utf-8') + value = 'hello' * 1000 # < int16 length limit for the non-compact case + s.encode_into(out, value, compact=compact) + assert bytes(out.buf[start:out.pos]) == bytes(s.encode(value, compact=compact)) + + +def test_ensure_reallocates_and_preserves_content(): + out = EncodeBuffer(size=8) + out.buf[:4] = b'\x01\x02\x03\x04' + out.pos = 4 + out.ensure(100) # forces a grow + assert len(out.buf) >= 104 + assert bytes(out.buf[:4]) == b'\x01\x02\x03\x04' + + +# --------------------------------------------------------------------------- +# End-to-end: compiled encode of large messages must be byte-for-byte equal to +# the reference encode and must decode back. These are the scenarios that +# crashed before the fix. +# --------------------------------------------------------------------------- + +def _produce_v9(partitions): + return ProduceRequest[9]( + transactional_id=None, acks=1, timeout_ms=1000, + topic_data=[('t', partitions)]) + + +def _produce_v3(partitions): + return ProduceRequest[3]( + transactional_id=None, acks=1, timeout_ms=1000, + topic_data=[('t', partitions)]) + + +@pytest.mark.parametrize('size', [ + 65530, 65532, 65536, 65537, # around the initial 64KB buffer + 131068, 131072, 131074, # around the first doubling (zero-slack) + 200000, +]) +def test_produce_v9_large_records_parity(size): + """Flexible Produce: large records followed by a tagged-field count. + + This is the originally-reported IndexError path. + """ + m = _produce_v9([(0, b'X' * size)]) + assert _codegen_encode(m) == _reference_encode(m) + + +def test_produce_v9_multi_partition_large_records(): + """Two large partitions: the second partition's int32 index is written + + right after the first partition's tagged-field encode reallocated the + buffer (the stale-`buf` regression). Must round-trip. + """ + m = _produce_v9([(0, b'R' * 200_000), (1, b'S' * 200_000)]) + data = _codegen_encode(m) + assert data == _reference_encode(m) + decoded = ProduceRequest[9].decode(io.BytesIO(data), version=9) + assert len(decoded.topic_data[0].partition_data) == 2 + + +def test_produce_v3_multi_partition_large_records(): + """Non-flexible Produce (no tagged fields): the fixed-field-after-Bytes path.""" + m = _produce_v3([(0, b'R' * 200_000), (1, b'S' * 10)]) + assert _codegen_encode(m) == _reference_encode(m) + + +@pytest.mark.parametrize('count', [16000, 40000, 100000]) +def test_offset_fetch_large_int32_array_parity(count): + """A []int32 large enough to overflow 64KB - no Bytes, no tagged fields. + + Exercises the array fixed-element fast path's bulk reservation. + """ + m = OffsetFetchRequest[1](group_id='g', topics=[('t', list(range(count)))]) + assert _codegen_encode(m) == _reference_encode(m) + + +def test_buffer_reuse_across_messages(): + """A pooled buffer that grew for a big message must still encode correctly + when reused (reset) for subsequent messages of varying size.""" + out = EncodeBuffer() + big = _produce_v9([(0, b'R' * 200_000)]) + small = _produce_v9([(0, b'hi'), (1, None)]) + for m in (big, small, big, small): + out.reset() + assert _codegen_encode(m, buf=out) == _reference_encode(m)