Skip to content
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
946decc
ref: Remove flag storage from StreamedSpan
sentrivana Mar 5, 2026
f3ee55c
ref: Tweak StreamedSpan interface
sentrivana Mar 5, 2026
47ed910
Add missing logger
sentrivana Mar 5, 2026
5023c76
fixes
sentrivana Mar 5, 2026
6445447
ref: Add active to StreamedSpan
sentrivana Mar 5, 2026
47e6211
Add property
sentrivana Mar 5, 2026
1e7b694
ref: Add no-op streaming span class
sentrivana Mar 5, 2026
80bfe5a
Remove redundant stuff
sentrivana Mar 5, 2026
1f0ffc1
Merge branch 'master' into ivana/span-first-4-add-noop-span
sentrivana Mar 5, 2026
d773428
ref: Add experimental streaming API
sentrivana Mar 5, 2026
647fa79
reformat
sentrivana Mar 5, 2026
49bdbe6
Add a __repr__
sentrivana Mar 5, 2026
cdd8bd6
Merge branch 'master' into ivana/span-first-5-add-start-span-api
sentrivana Mar 5, 2026
54f81af
ref: Add new_trace, continue_trace to span first
sentrivana Mar 5, 2026
941863e
ref: Add streaming trace decorator
sentrivana Mar 5, 2026
4b14e8d
Remove redundant code
sentrivana Mar 5, 2026
474f8e6
simplify
sentrivana Mar 5, 2026
9996e29
Merge branch 'ivana/span-first-5-add-start-span-api' into ivana/span-…
sentrivana Mar 5, 2026
e20d4fd
Merge branch 'ivana/span-first-6-add-continue-and-new-trace' into iva…
sentrivana Mar 5, 2026
f2738ff
reorder imports
sentrivana Mar 5, 2026
7874a54
ref: Per-bucket limits, fix envelope chunking
sentrivana Mar 5, 2026
63a9396
.
sentrivana Mar 5, 2026
c974d3e
add dummy __enter__, __exit__
sentrivana Mar 5, 2026
5d8c238
Merge branch 'ivana/span-first-7-add-trace-decorator' into ivana/span…
sentrivana Mar 5, 2026
831adae
type hint
sentrivana Mar 5, 2026
656ef2e
Merge branch 'ivana/span-first-7-add-trace-decorator' into ivana/span…
sentrivana Mar 5, 2026
1dcf176
remove unused import
sentrivana Mar 5, 2026
0a7eae8
ref: Allow to start and finish StreamedSpans
sentrivana Mar 5, 2026
6888c56
Add end, finish to noop spans
sentrivana Mar 6, 2026
09e5cce
fixes
sentrivana Mar 6, 2026
ae2fd52
.
sentrivana Mar 6, 2026
f223574
Correctly detect user-set parent_span=None
sentrivana Mar 6, 2026
05a4157
Merge branch 'master' into ivana/span-first-5-add-start-span-api
sentrivana Mar 6, 2026
9e8e60e
mypy
sentrivana Mar 6, 2026
777a246
Merge branch 'ivana/span-first-5-add-start-span-api' into ivana/span-…
sentrivana Mar 6, 2026
9b1e2f3
Merge branch 'ivana/span-first-6-add-continue-and-new-trace' into iva…
sentrivana Mar 6, 2026
e589c53
Merge branch 'ivana/span-first-7-add-trace-decorator' into ivana/span…
sentrivana Mar 6, 2026
1487ea8
Merge branch 'ivana/span-first-8-bucket-based-limits-in-batcher' into…
sentrivana Mar 6, 2026
1006e7b
remove unused imports
sentrivana Mar 6, 2026
6c16dbf
Merge branch 'ivana/span-first-7-add-trace-decorator' into ivana/span…
sentrivana Mar 6, 2026
cb37a07
Merge branch 'ivana/span-first-8-bucket-based-limits-in-batcher' into…
sentrivana Mar 6, 2026
ad6e7cc
move where finished is set
sentrivana Mar 6, 2026
ba29f0c
remove finished
sentrivana Mar 6, 2026
d6a42b2
end_timestamp improvements
sentrivana Mar 6, 2026
5e20ad3
.
sentrivana Mar 6, 2026
c70fae4
fix
sentrivana Mar 6, 2026
b995770
simplify
sentrivana Mar 6, 2026
0235053
Merge branch 'master' into ivana/span-first-9-start-end
sentrivana Mar 9, 2026
b673a09
Merge branch 'master' into ivana/span-first-9-start-end
sentrivana Mar 9, 2026
d6fa965
.
sentrivana Mar 9, 2026
8614d52
Merge branch 'master' into ivana/span-first-9-start-end
sentrivana Mar 9, 2026
dab1970
add a guard
sentrivana Mar 9, 2026
2f0dc01
.
sentrivana Mar 9, 2026
bc9f765
Merge branch 'master' into ivana/span-first-9-start-end
sentrivana Mar 9, 2026
09b88f0
dont redefine slots
sentrivana Mar 9, 2026
68a5b96
Merge branch 'master' into ivana/span-first-9-start-end
sentrivana Mar 10, 2026
1d203bf
review
sentrivana Mar 10, 2026
86396ba
Merge branch 'master' into ivana/span-first-9-start-end
sentrivana Mar 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,15 @@


class SpanBatcher(Batcher["StreamedSpan"]):
# TODO[span-first]: size-based flushes
# TODO[span-first]: adjust flush/drop defaults
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
# a bit of a buffer for spans that appear between setting the flush event
# and actually flushing the buffer.
#
# The max limits are all per trace.
MAX_ENVELOPE_SIZE = 1000 # spans
MAX_BEFORE_FLUSH = 1000
MAX_BEFORE_DROP = 5000
MAX_BEFORE_DROP = 2000
MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB
FLUSH_WAIT_TIME = 5.0

TYPE = "span"
Expand All @@ -35,6 +40,7 @@ def __init__(
# envelope.
# trace_id -> span buffer
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
self._running_size: dict[str, int] = defaultdict(lambda: 0)
self._capture_func = capture_func
self._record_lost_func = record_lost_func
self._running = True
Expand All @@ -45,16 +51,12 @@ def __init__(
self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None

def get_size(self) -> int:
# caller is responsible for locking before checking this
return sum(len(buffer) for buffer in self._span_buffer.values())

def add(self, span: "StreamedSpan") -> None:
if not self._ensure_thread() or self._flusher is None:
return None

with self._lock:
size = self.get_size()
size = len(self._span_buffer[span.trace_id])
if size >= self.MAX_BEFORE_DROP:
self._record_lost_func(
reason="queue_overflow",
Expand All @@ -64,18 +66,40 @@ def add(self, span: "StreamedSpan") -> None:
return None

self._span_buffer[span.trace_id].append(span)
self._running_size[span.trace_id] += self._estimate_size(span)

if size + 1 >= self.MAX_BEFORE_FLUSH:
self._flush_event.set()
return

if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
self._flush_event.set()
return

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
# Rough estimate of serialized span size that's quick to compute.
# 210 is the rough size of the payload without attributes, and we
# estimate additional 70 bytes on top of that per attribute.
return 210 + 70 * len(item._attributes)

@staticmethod
def _to_transport_format(item: "StreamedSpan") -> "Any":
# TODO[span-first]
res: "dict[str, Any]" = {
"trace_id": item.trace_id,
"span_id": item.span_id,
"name": item._name,
"status": item._status,
"start_timestamp": item._start_timestamp.timestamp(),
}

if item._timestamp:
res["end_timestamp"] = item._timestamp.timestamp()
Comment on lines +97 to +98
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no scenario where a span that's already in the span batcher wouldn't have an end _timestamp, but mypy doesn't know that. 🤡


if item._parent_span_id:
res["parent_span_id"] = item._parent_span_id

if item._attributes:
res["attributes"] = {
k: serialize_attribute(v) for (k, v) in item._attributes.items()
Expand All @@ -86,7 +110,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
def _flush(self) -> None:
with self._lock:
if len(self._span_buffer) == 0:
return None
return

envelopes = []
for trace_id, spans in self._span_buffer.items():
Expand All @@ -95,34 +119,40 @@ def _flush(self) -> None:
# dsc = spans[0].dynamic_sampling_context()
dsc = None

envelope = Envelope(
headers={
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
)

envelope.add_item(
Item(
type="span",
content_type="application/vnd.sentry.items.span.v2+json",
# Max per envelope is 1000, so if we happen to have more than
# 1000 spans in one bucket, we'll need to separate them.
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))

envelope = Envelope(
headers={
"item_count": len(spans),
},
payload=PayloadRef(
json={
"items": [
self._to_transport_format(span)
for span in spans
]
}
),
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
)

envelope.add_item(
Item(
type=self.TYPE,
content_type=self.CONTENT_TYPE,
headers={
"item_count": end - start,
},
payload=PayloadRef(
json={
"items": [
self._to_transport_format(spans[j])
for j in range(start, end)
]
}
),
)
)
)

envelopes.append(envelope)
envelopes.append(envelope)

self._span_buffer.clear()
self._running_size.clear()

for envelope in envelopes:
self._capture_func(envelope)
55 changes: 54 additions & 1 deletion sentry_sdk/scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
normalize_incoming_data,
PropagationContext,
)
from sentry_sdk.traces import StreamedSpan
from sentry_sdk.traces import _DEFAULT_PARENT_SPAN, StreamedSpan, NoOpStreamedSpan
from sentry_sdk.tracing import (
BAGGAGE_HEADER_NAME,
SENTRY_TRACE_HEADER_NAME,
Expand Down Expand Up @@ -1174,6 +1174,59 @@ def start_span(

return span

def start_streamed_span(
self,
name: str,
attributes: "Optional[Attributes]",
parent_span: "Optional[StreamedSpan]",
active: bool,
) -> "StreamedSpan":
# TODO: rename to start_span once we drop the old API
if isinstance(parent_span, NoOpStreamedSpan):
# parent_span is only set if the user explicitly set it
logger.debug(
"Ignored parent span provided. Span will be parented to the "
"currently active span instead."
)

if parent_span is _DEFAULT_PARENT_SPAN or isinstance(
parent_span, NoOpStreamedSpan
):
parent_span = self.span # type: ignore

# If no eligible parent_span was provided and there is no currently
# active span, this is a segment
if parent_span is None:
propagation_context = self.get_active_propagation_context()

return StreamedSpan(
name=name,
attributes=attributes,
active=active,
scope=self,
segment=None,
trace_id=propagation_context.trace_id,
parent_span_id=propagation_context.parent_span_id,
parent_sampled=propagation_context.parent_sampled,
baggage=propagation_context.baggage,
)

# This is a child span; take propagation context from the parent span
with new_scope():
if isinstance(parent_span, NoOpStreamedSpan):
return NoOpStreamedSpan()

return StreamedSpan(
name=name,
attributes=attributes,
active=active,
scope=self,
segment=parent_span._segment,
trace_id=parent_span.trace_id,
parent_span_id=parent_span.span_id,
parent_sampled=parent_span.sampled,
)

def continue_trace(
self,
environ_or_headers: "Dict[str, Any]",
Expand Down
Loading
Loading