From 7874a549fdf01633b4227eccb7eb9299dad68095 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 5 Mar 2026 15:06:12 +0100 Subject: [PATCH 1/2] ref: Per-bucket limits, fix envelope chunking --- sentry_sdk/_span_batcher.py | 84 +++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 31 deletions(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index 947eca3806..ef27da1e05 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -15,10 +15,15 @@ class SpanBatcher(Batcher["StreamedSpan"]): - # TODO[span-first]: size-based flushes - # TODO[span-first]: adjust flush/drop defaults + # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is + # a bit of a buffer for spans that appear between setting the flush event + # and actually flushing the buffer. + # + # The max limits are all per trace. + MAX_ENVELOPE_SIZE = 1000 # spans MAX_BEFORE_FLUSH = 1000 - MAX_BEFORE_DROP = 5000 + MAX_BEFORE_DROP = 2000 + MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB FLUSH_WAIT_TIME = 5.0 TYPE = "span" @@ -35,6 +40,7 @@ def __init__( # envelope. # trace_id -> span buffer self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list) + self._running_size: dict[str, int] = defaultdict(lambda: 0) self._capture_func = capture_func self._record_lost_func = record_lost_func self._running = True @@ -45,16 +51,12 @@ def __init__( self._flusher: "Optional[threading.Thread]" = None self._flusher_pid: "Optional[int]" = None - def get_size(self) -> int: - # caller is responsible for locking before checking this - return sum(len(buffer) for buffer in self._span_buffer.values()) - def add(self, span: "StreamedSpan") -> None: if not self._ensure_thread() or self._flusher is None: return None with self._lock: - size = self.get_size() + size = len(self._span_buffer[span.trace_id]) if size >= self.MAX_BEFORE_DROP: self._record_lost_func( reason="queue_overflow", @@ -64,8 +66,22 @@ def add(self, span: "StreamedSpan") -> None: return None self._span_buffer[span.trace_id].append(span) + self._running_size[span.trace_id] += self._estimate_size(span) + if size + 1 >= self.MAX_BEFORE_FLUSH: self._flush_event.set() + return + + if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH: + self._flush_event.set() + return + + @staticmethod + def _estimate_size(item: "StreamedSpan") -> int: + # Rough estimate of serialized span size that's quick to compute. + # 210 is the rough size of the payload without attributes, and we + # estimate additional 70 bytes on top of that per attribute. + return 210 + 70 * len(item._attributes) @staticmethod def _to_transport_format(item: "StreamedSpan") -> "Any": @@ -95,34 +111,40 @@ def _flush(self) -> None: # dsc = spans[0].dynamic_sampling_context() dsc = None - envelope = Envelope( - headers={ - "sent_at": format_timestamp(datetime.now(timezone.utc)), - "trace": dsc, - } - ) - - envelope.add_item( - Item( - type="span", - content_type="application/vnd.sentry.items.span.v2+json", + # Max per envelope is 1000, so if we happen to have more than + # 1000 spans in one bucket, we'll need to separate them. + for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE): + end = min(start + self.MAX_ENVELOPE_SIZE, len(spans)) + + envelope = Envelope( headers={ - "item_count": len(spans), - }, - payload=PayloadRef( - json={ - "items": [ - self._to_transport_format(span) - for span in spans - ] - } - ), + "sent_at": format_timestamp(datetime.now(timezone.utc)), + "trace": dsc, + } + ) + + envelope.add_item( + Item( + type=self.TYPE, + content_type=self.CONTENT_TYPE, + headers={ + "item_count": end - start, + }, + payload=PayloadRef( + json={ + "items": [ + self._to_transport_format(spans[j]) + for j in range(start, end) + ] + } + ), + ) ) - ) - envelopes.append(envelope) + envelopes.append(envelope) self._span_buffer.clear() + self._running_size.clear() for envelope in envelopes: self._capture_func(envelope) From 63a9396a89ddefbc75025a9a2e84c491213c7fdb Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Thu, 5 Mar 2026 15:09:11 +0100 Subject: [PATCH 2/2] . --- sentry_sdk/_span_batcher.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py index ef27da1e05..baa67fbd9d 100644 --- a/sentry_sdk/_span_batcher.py +++ b/sentry_sdk/_span_batcher.py @@ -87,11 +87,15 @@ def _estimate_size(item: "StreamedSpan") -> int: def _to_transport_format(item: "StreamedSpan") -> "Any": # TODO[span-first] res: "dict[str, Any]" = { + "trace_id": item.trace_id, "span_id": item.span_id, "name": item._name, "status": item._status, } + if item._parent_span_id: + res["parent_span_id"] = item._parent_span_id + if item._attributes: res["attributes"] = { k: serialize_attribute(v) for (k, v) in item._attributes.items() @@ -102,7 +106,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any": def _flush(self) -> None: with self._lock: if len(self._span_buffer) == 0: - return None + return envelopes = [] for trace_id, spans in self._span_buffer.items():