39 changes: 34 additions & 5 deletions clients/python/src/taskbroker_client/worker/workerchild.py
@@ -95,6 +95,31 @@ def status_name(status: TaskActivationStatus.ValueType) -> str:
return f"unknown-{status}"


def _log_task_failed(
activation: TaskActivation,
err: BaseException,
processing_pool_name: str,
) -> None:
"""
Emit a structured failure log via logger.exception. When the embedding
application has the Sentry SDK's LoggingIntegration enabled (the default),
this also produces a Sentry event, so the same call covers both the
structured stdout log and the Sentry capture. Call this from inside an
existing isolation_scope block so fingerprints/tags are preserved.
"""
logger.exception(
"taskworker.task.failed",
extra={
"task_id": activation.id,
"taskname": activation.taskname,
"namespace": activation.namespace,
"processing_pool": processing_pool_name,
"exception_type": type(err).__name__,
"exception_message": str(err),
},
)


def child_process(
app_module: str,
child_tasks: queue.Queue[InflightTaskActivation],
@@ -249,7 +274,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
inflight.activation.taskname,
]
scope.set_transaction_name(inflight.activation.taskname)
sentry_sdk.capture_exception(err)
_log_task_failed(inflight.activation, err, processing_pool_name)
metrics.incr(
"taskworker.worker.processing_deadline_exceeded",
tags={
@@ -292,16 +317,20 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
]
scope.set_transaction_name(inflight.activation.taskname)
sentry_sdk.capture_exception(retry_error)
# In this branch, all exceptions should be either
# captured or silenced.
captured_error = True
# Also emit structured stdout log for retry-exhausted failures.
# Uses the original err, not the synthetic retry_error, so the log
# carries the actual exception that exhausted retries.
_log_task_failed(inflight.activation, err, processing_pool_name)
Retry-exhausted log not gated by silenced_exceptions

Medium Severity

On the retry-exhausted path, _log_task_failed is called unconditionally without checking should_capture_error. The PR description explicitly states this path is "gated by task_func.silenced_exceptions," but the code doesn't implement that gating. When a silenced exception exhausts retries, the original exception will now appear in structured logs (and potentially as a Sentry event via LoggingIntegration), defeating the purpose of silenced_exceptions on this path.


Reviewed by Cursor Bugbot for commit 3782285.

Author
Bugbot is right that the PR description is stale, but I think the suggested code change is wrong. Retry-exhausted reporting is intentionally unconditional per #627, so I’ll update the PR description instead of gating _log_task_failed.
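
For illustration only, the gating Bugbot suggests would look roughly like the sketch below. It is not applied in this PR (per the reply above), and it assumes should_capture_error already reflects task_func.silenced_exceptions:

    # Hypothetical alternative (NOT applied): gate the structured log the same
    # way the later sentry_sdk.capture_exception call is gated, so silenced
    # exceptions stay out of the logs on the retry-exhausted path too.
    if should_capture_error:
        _log_task_failed(inflight.activation, err, processing_pool_name)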

# In this branch, all exceptions should be either
# captured or silenced.
captured_error = True

if (
should_capture_error
and not captured_error
and next_state != TASK_ACTIVATION_STATUS_RETRY
):
sentry_sdk.capture_exception(err)
_log_task_failed(inflight.activation, err, processing_pool_name)

clear_current_task()
processed_task_count += 1
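
Aside on the docstring above: _log_task_failed relies on the embedding application having the Sentry SDK's LoggingIntegration enabled, so that the same logger.exception call also reaches Sentry. A minimal sketch of that setup follows; the DSN is a placeholder and the level thresholds are assumptions, not part of this change:

    import logging

    import sentry_sdk
    from sentry_sdk.integrations.logging import LoggingIntegration

    sentry_sdk.init(
        dsn="https://examplePublicKey@o0.ingest.sentry.io/0",  # placeholder DSN
        integrations=[
            LoggingIntegration(
                level=logging.INFO,         # record INFO and above as breadcrumbs
                event_level=logging.ERROR,  # send ERROR and above as Sentry events
            )
        ],
    )

    logger = logging.getLogger("taskworker")
    try:
        raise ValueError("boom")
    except ValueError:
        # One call: a structured log record via the logging handlers, plus a
        # Sentry error event, mirroring what _log_task_failed relies on.
        logger.exception("taskworker.task.failed", extra={"taskname": "examples.demo"})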
125 changes: 112 additions & 13 deletions clients/python/tests/worker/test_worker.py
@@ -36,7 +36,7 @@
TaskWorkerProcessingPool,
WorkerServicer,
)
from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded, child_process
from taskbroker_client.worker.workerchild import child_process

SIMPLE_TASK = InflightTaskActivation(
host="localhost:50051",
@@ -655,8 +655,11 @@ def test_child_process_retry_task() -> None:
assert result.status == TASK_ACTIVATION_STATUS_RETRY


@mock.patch("taskbroker_client.worker.workerchild.logger")
@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
def test_child_process_retry_task_max_attempts(mock_capture: mock.Mock) -> None:
def test_child_process_retry_task_max_attempts(
mock_capture: mock.Mock, mock_logger: mock.Mock
) -> None:
# Create an activation that is on its final attempt and
# will raise an error again.
activation = InflightTaskActivation(
@@ -700,6 +703,18 @@ def test_child_process_retry_task_max_attempts(mock_capture: mock.Mock) -> None:
assert isinstance(capture_call[0], NoRetriesRemainingError)
assert isinstance(capture_call[0].__cause__, RuntimeError)

# Structured worker log fires for retry-exhausted failures, with the
# original exception (not the synthetic NoRetriesRemainingError).
failed_calls = [
c
for c in mock_logger.exception.call_args_list
if c.args and c.args[0] == "taskworker.task.failed"
]
assert len(failed_calls) == 1
extra = failed_calls[0].kwargs["extra"]
assert extra["exception_type"] == "RuntimeError"
assert extra["taskname"] == "examples.will_retry"


def test_child_process_failure_task() -> None:
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
@@ -858,8 +873,8 @@ def test_child_process_pass_headers() -> None:
redis.delete("task-headers-value", "task-headers-count", "task-headers-custom")


@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
def test_child_process_terminate_task(mock_capture: mock.Mock) -> None:
@mock.patch("taskbroker_client.worker.workerchild.logger")
def test_child_process_terminate_task(mock_logger: mock.Mock) -> None:
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
shutdown = Event()
@@ -891,8 +906,16 @@ def test_child_process_terminate_task(mock_capture: mock.Mock) -> None:
result = processed.get(block=False)
assert result.task_id == sleepy.activation.id
assert result.status == TASK_ACTIVATION_STATUS_FAILURE
assert mock_capture.call_count == 1
assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded
assert mock_logger.exception.call_count == 1
args, kwargs = mock_logger.exception.call_args
assert args[0] == "taskworker.task.failed"
extra = kwargs["extra"]
assert extra["exception_type"] == "ProcessingDeadlineExceeded"
assert extra["taskname"] == "examples.timed"
assert extra["namespace"] == "examples"
assert extra["task_id"] == "111"
assert extra["processing_pool"] == "test"
assert "execution deadline" in extra["exception_message"]


@mock.patch("taskbroker_client.worker.workerchild.capture_checkin")
@@ -1023,8 +1046,8 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag
app.context_hooks.remove(hook)


@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
def test_child_process_silenced_timeout(mock_capture: mock.Mock) -> None:
@mock.patch("taskbroker_client.worker.workerchild.logger")
def test_child_process_silenced_timeout(mock_logger: mock.Mock) -> None:
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
shutdown = Event()
@@ -1044,7 +1067,12 @@ def test_child_process_silenced_timeout(mock_capture: mock.Mock) -> None:
result = processed.get()
assert result.task_id == RETRY_TASK_WITH_SILENCED_TIMEOUT.activation.id
assert result.status == TASK_ACTIVATION_STATUS_FAILURE
assert mock_capture.call_count == 0
failed_calls = [
c
for c in mock_logger.exception.call_args_list
if c.args and c.args[0] == "taskworker.task.failed"
]
assert failed_calls == []


@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
@@ -1099,8 +1127,8 @@ def test_child_process_expected_ignored_exception_max_attempts(mock_capture: moc
assert mock_capture.call_count == 0


@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
def test_child_process_retry_on_deadline_exceeded(mock_capture: mock.Mock) -> None:
@mock.patch("taskbroker_client.worker.workerchild.logger")
def test_child_process_retry_on_deadline_exceeded(mock_logger: mock.Mock) -> None:
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
shutdown = Event()
@@ -1122,5 +1150,76 @@ def test_child_process_retry_on_deadline_exceeded(mock_capture: mock.Mock) -> No
result = processed.get()
assert result.task_id == RETRY_TASK_ON_DEADLINE_EXCEEDED.activation.id
assert result.status == TASK_ACTIVATION_STATUS_RETRY
assert mock_capture.call_count == 1
assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded
# The timeout was reported (report_timeout_errors=True) even though
# the task will retry. taskworker.task.failed fires once per attempt.
assert mock_logger.exception.call_count == 1
args, kwargs = mock_logger.exception.call_args
assert args[0] == "taskworker.task.failed"
assert kwargs["extra"]["exception_type"] == "ProcessingDeadlineExceeded"


@mock.patch("taskbroker_client.worker.workerchild.logger")
def test_child_process_general_exception_logs_task_failed(mock_logger: mock.Mock) -> None:
"""A non-retriable Exception emits taskworker.task.failed with all fields."""
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
shutdown = Event()

# examples.fail_task has no retry policy → raises ValueError once,
# task fails terminally on first attempt.
todo.put(FAIL_TASK)
child_process(
"examples.app:app",
todo,
processed,
shutdown,
max_task_count=1,
processing_pool_name="test",
process_type="fork",
)

result = processed.get()
assert result.status == TASK_ACTIVATION_STATUS_FAILURE
assert mock_logger.exception.call_count == 1
args, kwargs = mock_logger.exception.call_args
assert args[0] == "taskworker.task.failed"
extra = kwargs["extra"]
assert extra["task_id"] == "333"
assert extra["taskname"] == "examples.fail_task"
assert extra["namespace"] == "examples"
assert extra["processing_pool"] == "test"
assert extra["exception_type"] == "ValueError"
assert "exception_message" in extra


@mock.patch("taskbroker_client.worker.workerchild.logger")
def test_child_process_silenced_exception_does_not_log_task_failed(
mock_logger: mock.Mock,
) -> None:
"""When err is in silenced_exceptions, taskworker.task.failed is NOT logged.
Preserves the silencing semantics added in #608."""
todo: queue.Queue[InflightTaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
shutdown = Event()

todo.put(RETRY_TASK_WITH_SILENCED_UNHANDLED_EXCEPTION)
child_process(
"examples.app:app",
todo,
processed,
shutdown,
max_task_count=1,
processing_pool_name="test",
process_type="fork",
)

result = processed.get()
assert result.status == TASK_ACTIVATION_STATUS_FAILURE
# Other log calls (e.g., taskworker.task.retry) may have fired, but
# taskworker.task.failed must not have.
failed_calls = [
c
for c in mock_logger.exception.call_args_list
if c.args and c.args[0] == "taskworker.task.failed"
]
assert failed_calls == []