Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/sentry/spans/consumers/process/flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ def _wait_for_process_to_become_healthy(self, process_index: int):
if self.process_healthy_since[process_index].value != 0:
break

process = self.processes[process_index]
if not process.is_alive():
shards = self.process_to_shards_map[process_index]
exitcode = getattr(process, "exitcode", None)
raise RuntimeError(
f"process {process_index} (shards {shards}) exited during startup (exitcode={exitcode})"
)

if time.time() - start_time > max_unhealthy_seconds:
shards = self.process_to_shards_map[process_index]
raise RuntimeError(
Expand Down
35 changes: 31 additions & 4 deletions tests/sentry/spans/consumers/process/test_flusher.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,14 @@ def test_multi_producer_sliced_integration_with_arroyo_local_producer() -> None:
manager.close()


def test_flusher_waits_for_processes_to_start() -> None:
def test_flusher_waits_for_exited_processes_during_startup() -> None:
"""Test that the flusher waits for all processes to become healthy during initialization."""
buffer = SpansBuffer(assigned_shards=[0])

# Patch SpanFlusher.main to never set healthy_since, simulating a process that fails to start
# exit without setting healthy_since, simulating a process that fails early
def never_healthy_main(
buffer, shards, stopped, current_drift, backpressure_since, healthy_since, produce_to_pipe
):
# Don't set healthy_since.value, simulating a process that never becomes healthy
return

with (
Expand All @@ -183,7 +182,35 @@ def never_healthy_main(
"spans.buffer.flusher.max-unhealthy-seconds": 0.5,
"spans.buffer.flusher.use-stuck-detector": False,
}
), # Should raise RuntimeError because the process never reports as healthy
),
pytest.raises(RuntimeError, match="process 0 \\(shards \\[0\\]\\) exited during startup"),
):
SpanFlusher(
buffer,
next_step=Noop(),
produce_to_pipe=lambda _: None,
)


def test_flusher_timeout_waiting_for_processes_startup() -> None:
"""Test that the flusher times out when a process stays alive but never becomes healthy."""
buffer = SpansBuffer(assigned_shards=[0])

# block without setting healthy_since, simulating a process that hangs during startup
def hang_main(
buffer, shards, stopped, current_drift, backpressure_since, healthy_since, produce_to_pipe
):
while not stopped.value:
sleep(0.05)

with (
mock.patch.object(SpanFlusher, "main", hang_main),
override_options(
{
"spans.buffer.flusher.max-unhealthy-seconds": 0.5,
"spans.buffer.flusher.use-stuck-detector": False,
}
),
pytest.raises(RuntimeError, match="process 0 \\(shards \\[0\\]\\) didn't start up"),
):
SpanFlusher(
Expand Down
Loading