Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 7 additions & 21 deletions src/sentry/scripts/spans/add-buffer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,13 @@ table.insert(latency_table, {"sunionstore_args_step_latency_ms", sunionstore_arg

-- Merge spans into the parent span set.
-- Used outside the if statement
local spop_end_time_ms = -1
local arg_cleanup_end_time_ms = sunionstore_args_end_time_ms
if #sunionstore_args > 0 then
local dest_memory = redis.call("memory", "usage", set_key) or 0
local already_oversized = dest_memory > max_segment_bytes
local ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key)
local dest_bytes = tonumber(redis.call("get", ingested_byte_count_key) or 0)

local already_oversized = dest_bytes > max_segment_bytes
table.insert(metrics_table, {"parent_span_set_already_oversized", already_oversized and 1 or 0})

local use_zero_copy_dest = not already_oversized and zero_copy_dest_threshold > 0 and dest_memory > zero_copy_dest_threshold
Expand All @@ -142,7 +145,6 @@ if #sunionstore_args > 0 then
local output_size
if already_oversized then
-- Dest already exceeds max_segment_bytes, skip merge entirely.
-- The SPOP loop would just remove what we added.
output_size = start_output_size
elseif use_zero_copy_dest then
-- Zero-copy: read each source set and SADD its members into dest.
Expand Down Expand Up @@ -179,7 +181,6 @@ if #sunionstore_args > 0 then

-- Merge ingested count keys for merged spans
local ingested_count_key = string.format("span-buf:ic:%s", set_key)
local ingested_byte_count_key = string.format("span-buf:ibc:%s", set_key)
for i = 1, #sunionstore_args do
local merged_key = sunionstore_args[i]
local merged_ic_key = string.format("span-buf:ic:%s", merged_key)
Expand All @@ -196,18 +197,8 @@ if #sunionstore_args > 0 then
redis.call("del", merged_ibc_key)
end

local arg_cleanup_end_time_ms = get_time_ms()
arg_cleanup_end_time_ms = get_time_ms()
table.insert(latency_table, {"arg_cleanup_step_latency_ms", arg_cleanup_end_time_ms - unlink_end_time_ms})

local spopcalls = 0
while (redis.call("memory", "usage", set_key) or 0) > max_segment_bytes do
redis.call("spop", set_key)
spopcalls = spopcalls + 1
end

spop_end_time_ms = get_time_ms()
table.insert(latency_table, {"spop_step_latency_ms", spop_end_time_ms - arg_cleanup_end_time_ms})
table.insert(metrics_table, {"spopcalls", spopcalls})
end


Expand All @@ -222,12 +213,7 @@ redis.call("expire", ingested_byte_count_key, set_timeout)
redis.call("expire", set_key, set_timeout)

local ingested_count_end_time_ms = get_time_ms()
local ingested_count_step_latency_ms = 0
if spop_end_time_ms >= 0 then
ingested_count_step_latency_ms = ingested_count_end_time_ms - spop_end_time_ms
else
ingested_count_step_latency_ms = ingested_count_end_time_ms - sunionstore_args_end_time_ms
end
local ingested_count_step_latency_ms = ingested_count_end_time_ms - arg_cleanup_end_time_ms
table.insert(latency_table, {"ingested_count_step_latency_ms", ingested_count_step_latency_ms})

-- Capture end time and calculate latency in milliseconds
Expand Down
52 changes: 27 additions & 25 deletions tests/sentry/spans/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,15 @@ def test_observability_metrics_parent_span_already_oversized(
# Disable compression so payload size in Redis is predictable, then force a
# low max-segment-bytes threshold so the destination set is already too
# large before merge.
oversized_parent_payload = orjson.dumps({"span_id": "b" * 16, "blob": "x" * 2048})
spans = [
#
# Batch 1: Span A (large payload, child of B) and Span B (root) build an
# oversized segment keyed on B.
# Batch 2: Span C (child of A) arrives in a separate batch. Its redirect
# resolves to B's set, triggering a merge where dest_bytes > threshold.
oversized_payload = orjson.dumps({"span_id": "a" * 16, "blob": "x" * 2048})
spans: list[Span | _SplitBatch] = [
Span(
# payload=_payload("a" * 16),
payload=oversized_parent_payload,
payload=oversized_payload,
trace_id="a" * 32,
span_id="a" * 16,
parent_span_id="b" * 16,
Expand All @@ -322,8 +326,7 @@ def test_observability_metrics_parent_span_already_oversized(
end_timestamp=1700000000.0,
),
Span(
# payload=oversized_parent_payload,
payload=_payload("a" * 16),
payload=_payload("b" * 16),
trace_id="a" * 32,
span_id="b" * 16,
parent_span_id=None,
Expand All @@ -332,20 +335,27 @@ def test_observability_metrics_parent_span_already_oversized(
project_id=1,
end_timestamp=1700000000.0,
),
_SplitBatch(),
Span(
payload=_payload("c" * 16),
trace_id="a" * 32,
span_id="c" * 16,
parent_span_id="a" * 16,
segment_id=None,
project_id=1,
end_timestamp=1700000000.0,
),
]

with override_options(
{"spans.buffer.max-segment-bytes": 200, "spans.buffer.compression.level": -1}
):
with override_options({"spans.buffer.max-segment-bytes": 200}):
process_spans(spans, buffer, now=0)

emit_observability_metrics.assert_called_once()
args, _ = emit_observability_metrics.call_args
gauge_metrics = args[1]
assert emit_observability_metrics.call_count == 2

oversized_metric_values = [
value
for evalsha_metrics in gauge_metrics
for call in emit_observability_metrics.call_args_list
for evalsha_metrics in call[0][1]
for metric_name, value in evalsha_metrics
if metric_name == b"parent_span_set_already_oversized"
]
Expand Down Expand Up @@ -870,22 +880,14 @@ def test_max_segment_spans_limit(mock_project_model, buffer: SpansBuffer) -> Non
),
]

with override_options({"spans.buffer.max-segment-bytes": 200}):
with override_options({"spans.buffer.max-segment-bytes": 100}):
buffer.process_spans(batch1, now=0)
buffer.process_spans(batch2, now=0)
rv = buffer.flush_segments(now=11)

# The entire segment should be dropped because it exceeds max_segment_bytes.
segment = rv[_segment_id(1, "a" * 32, "a" * 16)]
retained_span_ids = {span.payload["span_id"] for span in segment.spans}

# Some spans should be evicted because the segment is too large.
all_span_ids = {"a" * 16, "b" * 16, "c" * 16, "d" * 16, "e" * 16}
assert len(retained_span_ids) < len(all_span_ids), "Some spans should have been evicted"
assert retained_span_ids.issubset(all_span_ids)

# NB: We currently accept that we leak redirect keys when we limit segments.
# buffer.done_flush_segments(rv)
# assert_clean(buffer.client)
assert segment.spans == []


@mock.patch("sentry.spans.buffer.Project")
Expand Down Expand Up @@ -977,7 +979,7 @@ def test_dropped_spans_emit_outcomes(
)

# Set a very small max-segment-bytes to force Redis to drop spans
with override_options({"spans.buffer.max-segment-bytes": 200}):
with override_options({"spans.buffer.max-segment-bytes": 100}):
buffer.process_spans(batch1, now=0)
buffer.process_spans(batch2, now=0)
buffer.flush_segments(now=11)
Expand Down
Loading