Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions kafka/protocol/schemas/fields/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,10 @@ def emit_encode_into(self, ctx, val_expr, indent, version=None, compact=False, t
if compact:
an = ctx.next_var('an')
ctx.emit(indent, '%s = len(%s) + 1 if %s is not None else 0' % (an, val_expr, val_expr))
UnsignedVarInt32.emit_encode_into(ctx, an, indent)
UnsignedVarInt32.emit_encode_into(ctx, an, indent) # reserves the length varint
ctx.emit(indent, 'if %s is not None:' % val_expr)
else:
ctx.emit_reserve(indent, 4)
ctx.emit(indent, 'if %s is None:' % val_expr)
ctx.emit(indent, " pack_into('>i', buf, pos, -1)")
ctx.emit(indent, ' pos += 4')
Expand All @@ -83,9 +84,23 @@ def emit_encode_into(self, ctx, val_expr, indent, version=None, compact=False, t
ctx.emit(indent, ' pos += 4')
guard = indent + ' '
item_var = ctx.next_var('ai')
ctx.emit(guard, 'for %s in %s:' % (item_var, val_expr))
self.array_of.emit_encode_into(ctx, item_var, guard + ' ',
version=version, compact=compact, tagged=tagged)
elem = self.array_of
if isinstance(elem, SimpleField) and elem.is_batchable():
# Fast path: array of fixed-size primitives (e.g. []int32). Reserve
# the whole run with a single ensure, then write each element inline
# without a per-element capacity check. The reserve sits inside the
# `is not None` branch (compact) / `else` branch (non-compact), so
# len() is always safe here.
be_fmt = elem._type._be_fmt
size = elem._type.size
ctx.emit_reserve(guard, '%d * len(%s)' % (size, val_expr))
ctx.emit(guard, 'for %s in %s:' % (item_var, val_expr))
ctx.emit(guard, " pack_into('%s', buf, pos, %s)" % (be_fmt, item_var))
ctx.emit(guard, ' pos += %d' % size)
else:
ctx.emit(guard, 'for %s in %s:' % (item_var, val_expr))
elem.emit_encode_into(ctx, item_var, guard + ' ',
version=version, compact=compact, tagged=tagged)

def emit_decode_from(self, ctx, var_name, indent, version=None, compact=False, tagged=False):
n = ctx.next_var('n')
Expand Down
41 changes: 40 additions & 1 deletion kafka/protocol/schemas/fields/codecs/encode_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,35 @@


class EncodeBuffer:
"""Growable buffer for encode_into operations."""
"""Growable byte buffer for the ``encode_into`` fast path.

The encoders write primitives directly into ``buf`` at offset ``pos``
rather than building and joining intermediate ``bytes`` objects. The
buffer starts at a fixed size and grows on demand via :meth:`ensure`.

Capacity contract
-----------------
Writing past the end of ``buf`` does not grow it automatically:

* single-byte index writes (``buf[pos] = x``) raise ``IndexError``,
* ``pack_into`` raises ``struct.error``,
* slice assignment (``buf[pos:pos+n] = data``) silently *resizes* the
bytearray, defeating the preallocation.

Therefore **every writer must call** ``ensure(n)`` to reserve ``n`` bytes
before writing ``n`` bytes at ``pos`` (where ``n`` is the maximum the write
can consume - e.g. ``5`` for a varint32, a fixed field's ``size``, or
``len(payload)`` for variable data). See the codecs in ``types.py`` for the
pattern, and ``CodegenContext.emit_reserve`` for the compiled equivalent.

Reallocation note
-----------------
``ensure`` may replace ``buf`` with a larger bytearray. Any caller (or
generated code) that caches ``buf`` in a local **must re-read** ``self.buf``
after a call that can grow it - including indirect growth through a nested
``encode_into`` / ``ensure``. Forgetting to re-read leaves writes targeting
the old, discarded buffer (silent data loss) or raises out of range.
"""
__slots__ = ('buf', 'pos')

def __init__(self, size=65536):
Expand All @@ -14,13 +42,24 @@ def reset(self):
self.pos = 0

def ensure(self, needed):
    """Make sure ``needed`` more bytes can be written starting at ``pos``.

    Must be called *before* the write. When the current bytearray is too
    small, a new one is allocated - at least twice the old length, and
    never smaller than ``pos + needed`` - and the bytes already written
    (``buf[:pos]``) are copied across.

    WARNING: growth rebinds ``self.buf`` to a different bytearray. Any
    local alias of the old buffer is stale afterwards and must be
    refreshed from ``self.buf``.
    """
    required = self.pos + needed
    capacity = len(self.buf)
    if required <= capacity:
        # Fast path: the pending write already fits.
        return
    grown = bytearray(max(capacity * 2, required))
    grown[:self.pos] = self.buf[:self.pos]
    self.buf = grown

def result(self):
"""Return the encoded bytes written so far (``buf[:pos]``)."""
# Return a bytearray slice (one copy) rather than bytes(self.buf[:pos])
# (two copies - the slice creates a bytearray, then bytes() copies
# again). Downstream consumers (protocol codec slice assignment,
Expand Down
1 change: 1 addition & 0 deletions kafka/protocol/schemas/fields/codecs/tagged_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def encode_into(self, item, out, version=None):
# Encode value into buffer after reserving space for size prefix.
# Strategy: assume size fits in 1 byte (< 128), encode value,
# then fix up if the size varint is larger.
out.ensure(1) # reserve the assumed 1-byte size prefix
size_pos = out.pos
out.pos += 1 # reserve 1 byte for size
val_start = out.pos
Expand Down
43 changes: 31 additions & 12 deletions kafka/protocol/schemas/fields/codecs/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def encode(cls, value, compact=False):

@classmethod
def encode_into(cls, out, value, compact=False):
    """Write ``value`` as a fixed-width field at ``out.pos`` and advance it.

    Reserves ``cls.size`` bytes via ``out.ensure`` first, then packs the
    value with the ``cls._be_fmt`` struct format. ``compact`` is accepted
    for signature compatibility and is not used here.
    """
    width = cls.size
    out.ensure(width)
    # Read buf/pos only after ensure(): growth may have replaced out.buf.
    start = out.pos
    pack_into(cls._be_fmt, out.buf, start, value)
    out.pos = start + width

Expand All @@ -40,6 +41,7 @@ def decode_from(cls, data, pos):

@classmethod
def emit_encode_into(cls, ctx, val_expr, indent, compact=False):
    """Emit inline code that writes ``val_expr`` as a fixed-width field.

    The generated fragment reserves ``cls.size`` bytes, packs the value
    with ``cls._be_fmt`` into ``buf`` at ``pos``, and advances ``pos``.
    ``compact`` is accepted for signature compatibility and is not used.
    """
    width = cls.size
    ctx.emit_reserve(indent, width)
    statements = (
        "pack_into('%s', buf, pos, %s)" % (cls._be_fmt, val_expr),
        'pos += %d' % width,
    )
    for stmt in statements:
        ctx.emit(indent, stmt)

Expand Down Expand Up @@ -101,6 +103,7 @@ def encode_into(cls, out, value, compact=False):
b = value.bytes
else:
b = uuid.UUID(value).bytes
out.ensure(16)
pos = out.pos
out.buf[pos:pos+16] = b
out.pos = pos + 16
Expand All @@ -111,6 +114,7 @@ def emit_encode_into(cls, ctx, val_expr, indent, compact=False):
v = ctx.next_var('uv')
ctx.emit(indent, '%s = %s' % (v, val_expr))
ctx.emit(indent, 'if %s is None: %s = _ZERO_UUID' % (v, v))
ctx.emit_reserve(indent, 16)
ctx.emit(indent, 'buf[pos:pos+16] = %s.bytes if hasattr(%s, "bytes") else __import__("uuid").UUID(%s).bytes' % (v, v, v))
ctx.emit(indent, 'pos += 16')

Expand Down Expand Up @@ -151,43 +155,52 @@ def encode(self, value, compact=False):
def encode_into(self, out, value, compact=False):
    """Encode a nullable string into ``out`` at ``out.pos``.

    Wire layout:

    * compact: an unsigned varint holding ``len + 1`` (``0`` for ``None``),
      followed by the encoded bytes;
    * non-compact: a big-endian int16 length (``-1`` for ``None``),
      followed by the encoded bytes.

    ``value`` is passed through ``str()`` and encoded with
    ``self.encoding``. The length prefix is written exactly once per call;
    every write is preceded by the matching capacity reservation.
    """
    if value is None:
        if compact:
            UnsignedVarInt32.encode_into(out, 0)  # ensures internally
        else:
            out.ensure(2)
            pack_into('>h', out.buf, out.pos, -1)
            out.pos += 2
        return

    value = str(value).encode(self.encoding)
    n = len(value)
    if compact:
        UnsignedVarInt32.encode_into(out, n + 1)  # ensures internally
    else:
        out.ensure(2)
        pack_into('>h', out.buf, out.pos, n)
        out.pos += 2

    # Reserve before the slice assignment: writing past the end would
    # silently resize the bytearray instead of failing.
    out.ensure(n)
    pos = out.pos
    out.buf[pos:pos+n] = value
    out.pos = pos + n

def emit_encode_into(self, ctx, val_expr, indent, compact=False):
    """Emit inline code that encodes the nullable string ``val_expr``.

    The generated fragment mirrors the runtime ``encode_into``:

    * compact: ``None`` becomes a single ``0`` byte; otherwise an unsigned
      varint of ``len + 1`` followed by the encoded bytes;
    * non-compact: a big-endian int16 length (``-1`` for ``None``)
      followed by the encoded bytes.

    The value is encoded with ``self.encoding`` so the compiled path agrees
    with the runtime path. The length prefix is emitted once, and every
    emitted write is preceded by an ``emit_reserve`` for its size.
    """
    sv = ctx.next_var('sv')
    body = indent + ' '
    encode_stmt = '%s = str(%s).encode(%r)' % (sv, val_expr, self.encoding)

    ctx.emit(indent, 'if %s is None:' % val_expr)
    if compact:
        ctx.emit_reserve(body, 1)
        ctx.emit(body, 'buf[pos] = 0')
        ctx.emit(body, 'pos += 1')
        ctx.emit(indent, 'else:')
        sn = ctx.next_var('sn')
        ctx.emit(body, encode_stmt)
        ctx.emit(body, '%s = len(%s) + 1' % (sn, sv))
        UnsignedVarInt32.emit_encode_into(ctx, sn, body)  # reserves the length varint
    else:
        ctx.emit_reserve(body, 2)
        ctx.emit(body, "pack_into('>h', buf, pos, -1)")
        ctx.emit(body, 'pos += 2')
        ctx.emit(indent, 'else:')
        ctx.emit(body, encode_stmt)
        ctx.emit_reserve(body, 2)
        ctx.emit(body, "pack_into('>h', buf, pos, len(%s))" % sv)
        ctx.emit(body, 'pos += 2')

    # Payload copy is identical for both layouts.
    ctx.emit_reserve(body, 'len(%s)' % sv)
    ctx.emit(body, 'buf[pos:pos+len(%s)] = %s' % (sv, sv))
    ctx.emit(body, 'pos += len(%s)' % sv)

Expand Down Expand Up @@ -240,14 +253,16 @@ def encode_into(cls, out, value, compact=False):
value = value.encode()
if compact:
if value is None:
UnsignedVarInt32.encode_into(out, 0)
UnsignedVarInt32.encode_into(out, 0) # ensures internally
return
UnsignedVarInt32.encode_into(out, len(value) + 1)
UnsignedVarInt32.encode_into(out, len(value) + 1) # ensures internally
else:
if value is None:
out.ensure(4)
pack_into('>i', out.buf, out.pos, -1)
out.pos += 4
return
out.ensure(4)
pack_into('>i', out.buf, out.pos, len(value))
out.pos += 4
n = len(value)
Expand All @@ -260,35 +275,35 @@ def encode_into(cls, out, value, compact=False):
def emit_encode_into(cls, ctx, val_expr, indent, compact=False):
    """Emit inline code that encodes the nullable bytes value ``val_expr``.

    The generated fragment first copies the value into a temporary and, if
    it is not ``None`` and not ``bytes``/``bytearray``/``memoryview``,
    calls ``.encode()`` on it. It then writes:

    * compact: a single ``0`` byte for ``None``; otherwise an unsigned
      varint of ``len + 1`` followed by the payload;
    * non-compact: a big-endian int32 length (``-1`` for ``None``)
      followed by the payload.

    All capacity checks go through ``ctx.emit_reserve`` so the cached
    ``buf``/``_cap`` locals of the generated function stay in sync; no
    hand-written ``out.ensure`` sequence is emitted, and the length prefix
    is emitted once.
    """
    bv = ctx.next_var('bv')
    bn = ctx.next_var('bn')
    body = indent + ' '

    # Shared prologue: bind the value once and coerce str-like input.
    ctx.emit(indent, '%s = %s' % (bv, val_expr))
    ctx.emit(indent, 'if %s is not None and not isinstance(%s, (bytes, bytearray, memoryview)): %s = %s.encode()' % (bv, bv, bv, bv))
    ctx.emit(indent, 'if %s is None:' % bv)

    if compact:
        ctx.emit_reserve(body, 1)
        ctx.emit(body, 'buf[pos] = 0')
        ctx.emit(body, 'pos += 1')
        ctx.emit(indent, 'else:')
        ctx.emit(body, '%s = len(%s)' % (bn, bv))
        sn = ctx.next_var('sn')
        ctx.emit(body, '%s = %s + 1' % (sn, bn))
        UnsignedVarInt32.emit_encode_into(ctx, sn, body)  # reserves the length varint
    else:
        ctx.emit_reserve(body, 4)
        ctx.emit(body, "pack_into('>i', buf, pos, -1)")
        ctx.emit(body, 'pos += 4')
        ctx.emit(indent, 'else:')
        ctx.emit(body, '%s = len(%s)' % (bn, bv))
        ctx.emit_reserve(body, 4)
        ctx.emit(body, "pack_into('>i', buf, pos, %s)" % bn)
        ctx.emit(body, 'pos += 4')

    # Payload copy is identical for both layouts.
    ctx.emit_reserve(body, bn)
    ctx.emit(body, 'buf[pos:pos+%s] = %s' % (bn, bv))
    ctx.emit(body, 'pos += %s' % bn)

Expand Down Expand Up @@ -341,6 +356,7 @@ def encode(cls, value, compact=False):

@classmethod
def encode_into(cls, out, value):
out.ensure(5) # a varint32 is at most 5 bytes
buf = out.buf
pos = out.pos
while (value & 0xffffff80) != 0:
Expand All @@ -352,6 +368,7 @@ def encode_into(cls, out, value):

@classmethod
def emit_encode_into(cls, ctx, val_expr, indent, compact=False):
ctx.emit_reserve(indent, 5) # a varint32 is at most 5 bytes
ctx.emit(indent, 'while (%s & 0xffffff80) != 0:' % val_expr)
ctx.emit(indent, ' buf[pos] = (%s & 0x7f) | 0x80' % val_expr)
ctx.emit(indent, ' %s >>= 7' % val_expr)
Expand Down Expand Up @@ -462,6 +479,7 @@ def encode(cls, vals, compact=False):
def encode_into(cls, out, vals, compact=False):
    """Write ``vals`` as a 4-byte big-endian unsigned bit field.

    ``None`` is replaced by the set ``{31}`` before conversion. The set is
    turned into an integer by ``cls.to_32_bit_field`` and packed at
    ``out.pos`` after reserving 4 bytes. ``compact`` is not used.
    """
    bits = {31} if vals is None else vals
    out.ensure(4)
    # Read pos only after ensure(); buf may have been replaced by growth.
    start = out.pos
    pack_into('>I', out.buf, start, cls.to_32_bit_field(bits))
    out.pos = start + 4

Expand All @@ -473,6 +491,7 @@ def emit_encode_into(cls, ctx, val_expr, indent, compact=False):
ctx.emit(indent, 'if %s is None: %s = {31}' % (bf, bf))
ctx.emit(indent, '%s = 0' % bfi)
ctx.emit(indent, 'for _b in %s: %s |= 1 << _b' % (bf, bfi))
ctx.emit_reserve(indent, 4)
ctx.emit(indent, "pack_into('>I', buf, pos, %s)" % bfi)
ctx.emit(indent, 'pos += 4')

Expand Down
31 changes: 31 additions & 0 deletions kafka/protocol/schemas/fields/codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,37 @@ def next_var(self, prefix='v'):
def emit(self, indent, line):
    """Append one generated source line, prefixed with ``indent``."""
    rendered = '{}{}'.format(indent, line)
    self.lines.append(rendered)

def emit_reserve(self, indent, nbytes):
    """Emit an inline capacity guard ahead of a write of up to ``nbytes``.

    ``nbytes`` is interpolated verbatim into the generated code, so it can
    be an ``int`` or a string holding an expression (for example
    ``'len(_bv1)'``). It should be the maximum number of bytes the
    following write may consume.

    Two lines are emitted at ``indent``:

    * a comparison of ``pos + nbytes`` against the cached capacity
      ``_cap`` - the only cost when the buffer is already large enough;
    * the slow path, which publishes ``pos`` to ``out.pos``, calls
      ``out.ensure(nbytes)``, and then reloads ``buf`` and ``_cap`` from
      ``out.buf`` because growing may have replaced the bytearray.

    The generated lines reference the locals ``buf``, ``pos``, ``_cap`` and
    ``out``; they are only meaningful when spliced into a function that
    defines them, such as the ``_encode`` function assembled by
    ``encode_into__optimized_context``. Code that grows the buffer through
    a runtime ``encode_into`` call instead of this guard has to reload
    ``buf``/``_cap`` on its own.
    """
    guard = 'if pos + %s > _cap:' % nbytes
    grow = ' out.pos = pos; out.ensure(%s); buf = out.buf; _cap = len(buf)' % nbytes
    for stmt in (guard, grow):
        self.emit(indent, stmt)

def source(self):
    """Return the generated code: every emitted line joined by newlines."""
    separator = '\n'
    return separator.join(self.lines)

Expand Down
10 changes: 10 additions & 0 deletions kafka/protocol/schemas/fields/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ def emit_encode_into(self, ctx, item_expr, indent, version=None, compact=False,
and not tuple_access
)
if inline_nullable:
ctx.emit_reserve(indent, 1)
ctx.emit(indent, 'if %s is None:' % item_expr)
ctx.emit(indent, ' buf[pos] = 0xff')
ctx.emit(indent, ' pos += 1')
Expand All @@ -136,8 +137,13 @@ def emit_encode_into(self, ctx, item_expr, indent, version=None, compact=False,
ctx.globs[tf_var] = self.tagged_fields(version)
ctx.emit(indent, 'out.pos = pos')
ctx.emit(indent, '%s.encode_into(%s, out, version=%d)' % (tf_var, item_expr, version))
# TaggedFields.encode_into may grow (reallocate) out.buf, so re-bind
# the cached buf/_cap locals before any further inline writes.
ctx.emit(indent, 'pos = out.pos')
ctx.emit(indent, 'buf = out.buf')
ctx.emit(indent, '_cap = len(buf)')
elif tagged is None:
ctx.emit_reserve(indent, 1)
ctx.emit(indent, 'buf[pos] = 0')
ctx.emit(indent, 'pos += 1')

Expand Down Expand Up @@ -170,9 +176,13 @@ def encode_into(self, item, out, version=None, compact=False, tagged=False):
def encode_into__optimized_context(self, version, compact=False, tagged=False):
    """Build the codegen context holding the source of ``_encode(item, out)``.

    The generated function starts with a preamble that binds the locals
    ``buf``, ``pos`` and ``_cap`` from ``out``; the fragments produced by
    ``self.emit_encode_into`` are then appended to the same function body
    and rely on those locals. The final emitted statement writes ``pos``
    back to ``out.pos``.

    Returns the populated ``CodegenContext``.
    """
    ctx = CodegenContext()
    indent = ' '
    ctx.lines.append('def _encode(item, out):')
    preamble = (
        'buf = out.buf',
        'pos = out.pos',
        '_cap = len(buf) # cached len(buf); kept in sync by emit_reserve',
    )
    for stmt in preamble:
        ctx.emit(indent, stmt)
    # Body: every field's inline encoder, all sharing the locals above.
    self.emit_encode_into(ctx, 'item', indent, version=version, compact=compact, tagged=tagged)
    # Epilogue: publish the final write offset back to the buffer object.
    ctx.emit(indent, 'out.pos = pos')
    return ctx
Expand Down
7 changes: 6 additions & 1 deletion kafka/protocol/schemas/fields/struct_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ def emit_encode_into(self, ctx, val_expr, indent, version=None, compact=False, t
if compact:
an = ctx.next_var('an')
ctx.emit(indent, '%s = len(%s) + 1 if %s is not None else 0' % (an, val_expr, val_expr))
UnsignedVarInt32.emit_encode_into(ctx, an, indent)
UnsignedVarInt32.emit_encode_into(ctx, an, indent) # reserves the length varint
ctx.emit(indent, 'if %s is not None:' % val_expr)
else:
ctx.emit_reserve(indent, 4)
ctx.emit(indent, 'if %s is None:' % val_expr)
ctx.emit(indent, " pack_into('>i', buf, pos, -1)")
ctx.emit(indent, ' pos += 4')
Expand All @@ -118,8 +119,12 @@ def emit_encode_into(self, ctx, val_expr, indent, version=None, compact=False, t
ctx.emit(scalar_indent, 'out.pos = pos')
ctx.emit(scalar_indent, '# tagged fields for single-field struct')
ctx.emit(scalar_indent, '%s.encode_into(%s, out, version=%d)' % (tf_var, item_var, version))
# encode_into may reallocate out.buf - re-bind cached locals.
ctx.emit(scalar_indent, 'pos = out.pos')
ctx.emit(scalar_indent, 'buf = out.buf')
ctx.emit(scalar_indent, '_cap = len(buf)')
elif tagged is None:
ctx.emit_reserve(scalar_indent, 1)
ctx.emit(scalar_indent, 'buf[pos] = 0')
ctx.emit(scalar_indent, 'pos += 1')
else:
Expand Down
Loading