From 20f363566ae0adce7d37bfe4129f31d9e84e8b32 Mon Sep 17 00:00:00 2001
From: Sergei Starostin
Date: Thu, 14 May 2026 01:03:01 +0300
Subject: [PATCH 1/2] feat(worker): add taskworker.task.failed log for failed tasks

---
 .../taskbroker_client/worker/workerchild.py |  58 ++++--
 clients/python/tests/worker/test_worker.py  | 187 ++++++++++++++++--
 2 files changed, 217 insertions(+), 28 deletions(-)

diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py
index c5be6f1c..9b07b18c 100644
--- a/clients/python/src/taskbroker_client/worker/workerchild.py
+++ b/clients/python/src/taskbroker_client/worker/workerchild.py
@@ -95,6 +95,31 @@ def status_name(status: TaskActivationStatus.ValueType) -> str:
     return f"unknown-{status}"
 
 
+def _log_task_failed(
+    activation: TaskActivation,
+    err: BaseException,
+    processing_pool_name: str,
+) -> None:
+    """
+    Emit a structured failure log via logger.exception. When the embedding
+    application has the Sentry SDK's LoggingIntegration enabled (default),
+    this also produces the Sentry event, so the same call covers both the
+    structured stdout log and the Sentry capture. Call this from inside an
+    existing isolation_scope block so fingerprints/tags are preserved.
+    """
+    logger.exception(
+        "taskworker.task.failed",
+        extra={
+            "task_id": activation.id,
+            "taskname": activation.taskname,
+            "namespace": activation.namespace,
+            "processing_pool": processing_pool_name,
+            "exception_type": type(err).__name__,
+            "exception_message": str(err),
+        },
+    )
+
+
 def child_process(
     app_module: str,
     child_tasks: queue.Queue[InflightTaskActivation],
@@ -249,7 +274,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
                         inflight.activation.taskname,
                     ]
                     scope.set_transaction_name(inflight.activation.taskname)
-                    sentry_sdk.capture_exception(err)
+                    _log_task_failed(inflight.activation, err, processing_pool_name)
                     metrics.incr(
                         "taskworker.worker.processing_deadline_exceeded",
                         tags={
@@ -281,19 +306,22 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
                     next_state = TASK_ACTIVATION_STATUS_RETRY
                 elif retry.max_attempts_reached(inflight.activation.retry_state):
                     with sentry_sdk.isolation_scope() as scope:
-                        retry_error = NoRetriesRemainingError(
-                            f"{inflight.activation.taskname} has consumed all of its retries"
-                        )
-                        retry_error.__cause__ = err
-                        scope.fingerprint = [
-                            "taskworker.no_retries_remaining",
-                            inflight.activation.namespace,
-                            inflight.activation.taskname,
-                        ]
-                        scope.set_transaction_name(inflight.activation.taskname)
-                        sentry_sdk.capture_exception(retry_error)
-                        # In this branch, all exceptions should be either
-                        # captured or silenced.
+                        if should_capture_error:
+                            retry_error = NoRetriesRemainingError(
+                                f"{inflight.activation.taskname} has consumed all of its retries"
+                            )
+                            retry_error.__cause__ = err
+                            scope.fingerprint = [
+                                "taskworker.no_retries_remaining",
+                                inflight.activation.namespace,
+                                inflight.activation.taskname,
+                            ]
+                            scope.set_transaction_name(inflight.activation.taskname)
+                            sentry_sdk.capture_exception(retry_error)
+                            # Also emit structured stdout log for retry-exhausted failures.
+                            # Uses the original err, not the synthetic retry_error, so the log
+                            # carries the actual exception that exhausted retries.
+                            _log_task_failed(inflight.activation, err, processing_pool_name)
                         captured_error = True
 
                 if (
@@ -301,7 +329,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
                     and not captured_error
                     and next_state != TASK_ACTIVATION_STATUS_RETRY
                 ):
-                    sentry_sdk.capture_exception(err)
+                    _log_task_failed(inflight.activation, err, processing_pool_name)
 
                 clear_current_task()
                 processed_task_count += 1
diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py
index f74ba490..751c38b9 100644
--- a/clients/python/tests/worker/test_worker.py
+++ b/clients/python/tests/worker/test_worker.py
@@ -36,7 +36,7 @@
     TaskWorkerProcessingPool,
     WorkerServicer,
 )
-from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded, child_process
+from taskbroker_client.worker.workerchild import child_process
 
 SIMPLE_TASK = InflightTaskActivation(
     host="localhost:50051",
@@ -655,8 +655,11 @@ def test_child_process_retry_task() -> None:
     assert result.status == TASK_ACTIVATION_STATUS_RETRY
 
 
+@mock.patch("taskbroker_client.worker.workerchild.logger")
 @mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
-def test_child_process_retry_task_max_attempts(mock_capture: mock.Mock) -> None:
+def test_child_process_retry_task_max_attempts(
+    mock_capture: mock.Mock, mock_logger: mock.Mock
+) -> None:
     # Create an activation that is on its final attempt and
     # will raise an error again.
     activation = InflightTaskActivation(
@@ -700,6 +703,18 @@ def test_child_process_retry_task_max_attempts(mock_capture: mock.Mock) -> None:
     assert isinstance(capture_call[0], NoRetriesRemainingError)
     assert isinstance(capture_call[0].__cause__, RuntimeError)
+
+    # Structured worker log fires for retry-exhausted failures, with the
+    # original exception (not the synthetic NoRetriesRemainingError).
+    failed_calls = [
+        c
+        for c in mock_logger.exception.call_args_list
+        if c.args and c.args[0] == "taskworker.task.failed"
+    ]
+    assert len(failed_calls) == 1
+    extra = failed_calls[0].kwargs["extra"]
+    assert extra["exception_type"] == "RuntimeError"
+    assert extra["taskname"] == "examples.will_retry"
 
 
 def test_child_process_failure_task() -> None:
     todo: queue.Queue[InflightTaskActivation] = queue.Queue()
@@ -858,8 +873,8 @@ def test_child_process_pass_headers() -> None:
     redis.delete("task-headers-value", "task-headers-count", "task-headers-custom")
 
 
-@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
-def test_child_process_terminate_task(mock_capture: mock.Mock) -> None:
+@mock.patch("taskbroker_client.worker.workerchild.logger")
+def test_child_process_terminate_task(mock_logger: mock.Mock) -> None:
     todo: queue.Queue[InflightTaskActivation] = queue.Queue()
     processed: queue.Queue[ProcessingResult] = queue.Queue()
     shutdown = Event()
@@ -891,8 +906,16 @@ def test_child_process_terminate_task(mock_capture: mock.Mock) -> None:
     result = processed.get(block=False)
     assert result.task_id == sleepy.activation.id
     assert result.status == TASK_ACTIVATION_STATUS_FAILURE
-    assert mock_capture.call_count == 1
-    assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded
+    assert mock_logger.exception.call_count == 1
+    args, kwargs = mock_logger.exception.call_args
+    assert args[0] == "taskworker.task.failed"
+    extra = kwargs["extra"]
+    assert extra["exception_type"] == "ProcessingDeadlineExceeded"
+    assert extra["taskname"] == "examples.timed"
+    assert extra["namespace"] == "examples"
+    assert extra["task_id"] == "111"
+    assert extra["processing_pool"] == "test"
+    assert "execution deadline" in extra["exception_message"]
 
 
 @mock.patch("taskbroker_client.worker.workerchild.capture_checkin")
@@ -1023,8 +1046,8 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag
         app.context_hooks.remove(hook)
 
 
-@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
-def test_child_process_silenced_timeout(mock_capture: mock.Mock) -> None:
+@mock.patch("taskbroker_client.worker.workerchild.logger")
+def test_child_process_silenced_timeout(mock_logger: mock.Mock) -> None:
     todo: queue.Queue[InflightTaskActivation] = queue.Queue()
     processed: queue.Queue[ProcessingResult] = queue.Queue()
     shutdown = Event()
@@ -1044,7 +1067,12 @@ def test_child_process_silenced_timeout(mock_capture: mock.Mock) -> None:
     result = processed.get()
     assert result.task_id == RETRY_TASK_WITH_SILENCED_TIMEOUT.activation.id
     assert result.status == TASK_ACTIVATION_STATUS_FAILURE
-    assert mock_capture.call_count == 0
+    failed_calls = [
+        c
+        for c in mock_logger.exception.call_args_list
+        if c.args and c.args[0] == "taskworker.task.failed"
+    ]
+    assert failed_calls == []
 
 
 @mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
@@ -1099,8 +1127,8 @@ def test_child_process_expected_ignored_exception_max_attempts(mock_capture: moc
     assert mock_capture.call_count == 0
 
 
-@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
-def test_child_process_retry_on_deadline_exceeded(mock_capture: mock.Mock) -> None:
+@mock.patch("taskbroker_client.worker.workerchild.logger")
+def test_child_process_retry_on_deadline_exceeded(mock_logger: mock.Mock) -> None:
     todo: queue.Queue[InflightTaskActivation] = queue.Queue()
     processed: queue.Queue[ProcessingResult] = queue.Queue()
     shutdown = Event()
@@ -1122,5 +1150,138 @@ def test_child_process_retry_on_deadline_exceeded(mock_capture: mock.Mock) -> No
     result = processed.get()
     assert result.task_id == RETRY_TASK_ON_DEADLINE_EXCEEDED.activation.id
     assert result.status == TASK_ACTIVATION_STATUS_RETRY
-    assert mock_capture.call_count == 1
-    assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded
+    # The timeout was reported (report_timeout_errors=True) even though
+    # the task will retry. taskworker.task.failed fires once per attempt.
+    assert mock_logger.exception.call_count == 1
+    args, kwargs = mock_logger.exception.call_args
+    assert args[0] == "taskworker.task.failed"
+    assert kwargs["extra"]["exception_type"] == "ProcessingDeadlineExceeded"
+
+
+@mock.patch("taskbroker_client.worker.workerchild.logger")
+def test_child_process_general_exception_logs_task_failed(mock_logger: mock.Mock) -> None:
+    """A non-retriable Exception emits taskworker.task.failed with all fields."""
+    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
+    processed: queue.Queue[ProcessingResult] = queue.Queue()
+    shutdown = Event()
+
+    # examples.fail_task has no retry policy → raises ValueError once,
+    # task fails terminally on first attempt.
+    todo.put(FAIL_TASK)
+    child_process(
+        "examples.app:app",
+        todo,
+        processed,
+        shutdown,
+        max_task_count=1,
+        processing_pool_name="test",
+        process_type="fork",
+    )
+
+    result = processed.get()
+    assert result.status == TASK_ACTIVATION_STATUS_FAILURE
+    assert mock_logger.exception.call_count == 1
+    args, kwargs = mock_logger.exception.call_args
+    assert args[0] == "taskworker.task.failed"
+    extra = kwargs["extra"]
+    assert extra["task_id"] == "333"
+    assert extra["taskname"] == "examples.fail_task"
+    assert extra["namespace"] == "examples"
+    assert extra["processing_pool"] == "test"
+    assert extra["exception_type"] == "ValueError"
+    assert "exception_message" in extra
+
+
+@mock.patch("taskbroker_client.worker.workerchild.logger")
+def test_child_process_silenced_exception_does_not_log_task_failed(
+    mock_logger: mock.Mock,
+) -> None:
+    """When err is in silenced_exceptions, taskworker.task.failed is NOT logged.
+    Preserves the silencing semantics added in #608."""
+    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
+    processed: queue.Queue[ProcessingResult] = queue.Queue()
+    shutdown = Event()
+
+    todo.put(RETRY_TASK_WITH_SILENCED_UNHANDLED_EXCEPTION)
+    child_process(
+        "examples.app:app",
+        todo,
+        processed,
+        shutdown,
+        max_task_count=1,
+        processing_pool_name="test",
+        process_type="fork",
+    )
+
+    result = processed.get()
+    assert result.status == TASK_ACTIVATION_STATUS_FAILURE
+    # Other log calls (e.g., taskworker.task.retry) may have fired, but
+    # taskworker.task.failed must not have.
+    failed_calls = [
+        c
+        for c in mock_logger.exception.call_args_list
+        if c.args and c.args[0] == "taskworker.task.failed"
+    ]
+    assert failed_calls == []
+
+
+@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
+@mock.patch("taskbroker_client.worker.workerchild.logger")
+def test_child_process_silenced_retry_exhausted_does_not_log_task_failed(
+    mock_logger: mock.Mock,
+    mock_capture: mock.Mock,
+) -> None:
+    """
+    When err is silenced AND retries are exhausted, neither the synthetic
+    NoRetriesRemainingError capture nor the structured taskworker.task.failed
+    log fires. The silenced_exceptions opt-out covers both surfaces on the
+    retry-exhausted path.
+    """
+    activation = InflightTaskActivation(
+        host="localhost:50051",
+        receive_timestamp=0,
+        activation=TaskActivation(
+            id="silenced-exhausted",
+            taskname="examples.will_fail_with_silenced_ignored_exception",
+            namespace="examples",
+            parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True),
+            processing_deadline_duration=2,
+            retry_state=RetryState(
+                # On the final attempt: should_retry() returns False because
+                # no retries remain, max_attempts_reached() returns True.
+                attempts=1,
+                max_attempts=2,
+                on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD,
+            ),
+        ),
+    )
+
+    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
+    processed: queue.Queue[ProcessingResult] = queue.Queue()
+    shutdown = Event()
+
+    todo.put(activation)
+    child_process(
+        "examples.app:app",
+        todo,
+        processed,
+        shutdown,
+        max_task_count=1,
+        processing_pool_name="test",
+        process_type="fork",
+    )
+
+    result = processed.get()
+    assert result.task_id == activation.activation.id
+    assert result.status == TASK_ACTIVATION_STATUS_FAILURE
+
+    # Silenced: no synthetic NoRetriesRemainingError Sentry capture.
+    assert mock_capture.call_count == 0
+
+    # Silenced: no structured worker log either.
+    failed_calls = [
+        c
+        for c in mock_logger.exception.call_args_list
+        if c.args and c.args[0] == "taskworker.task.failed"
+    ]
+    assert failed_calls == []

From 378228587f67fd844c18b486dce4ee0358633479 Mon Sep 17 00:00:00 2001
From: Sergei Starostin
Date: Thu, 14 May 2026 23:17:41 +0300
Subject: [PATCH 2/2] fix(worker): restore unconditional NoRetriesRemaining

---
 .../taskbroker_client/worker/workerchild.py | 35 ++++++-----
 clients/python/tests/worker/test_worker.py  | 62 -------------------
 2 files changed, 18 insertions(+), 79 deletions(-)

diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py
index 9b07b18c..2dffdf18 100644
--- a/clients/python/src/taskbroker_client/worker/workerchild.py
+++ b/clients/python/src/taskbroker_client/worker/workerchild.py
@@ -306,23 +306,24 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
                     next_state = TASK_ACTIVATION_STATUS_RETRY
                 elif retry.max_attempts_reached(inflight.activation.retry_state):
                     with sentry_sdk.isolation_scope() as scope:
-                        if should_capture_error:
-                            retry_error = NoRetriesRemainingError(
-                                f"{inflight.activation.taskname} has consumed all of its retries"
-                            )
-                            retry_error.__cause__ = err
-                            scope.fingerprint = [
-                                "taskworker.no_retries_remaining",
-                                inflight.activation.namespace,
-                                inflight.activation.taskname,
-                            ]
-                            scope.set_transaction_name(inflight.activation.taskname)
-                            sentry_sdk.capture_exception(retry_error)
-                            # Also emit structured stdout log for retry-exhausted failures.
-                            # Uses the original err, not the synthetic retry_error, so the log
-                            # carries the actual exception that exhausted retries.
-                            _log_task_failed(inflight.activation, err, processing_pool_name)
-                        captured_error = True
+                        retry_error = NoRetriesRemainingError(
+                            f"{inflight.activation.taskname} has consumed all of its retries"
+                        )
+                        retry_error.__cause__ = err
+                        scope.fingerprint = [
+                            "taskworker.no_retries_remaining",
+                            inflight.activation.namespace,
+                            inflight.activation.taskname,
+                        ]
+                        scope.set_transaction_name(inflight.activation.taskname)
+                        sentry_sdk.capture_exception(retry_error)
+                        # Also emit structured stdout log for retry-exhausted failures.
+                        # Uses the original err, not the synthetic retry_error, so the log
+                        # carries the actual exception that exhausted retries.
+                        _log_task_failed(inflight.activation, err, processing_pool_name)
+                        # In this branch, all exceptions should be either
+                        # captured or silenced.
+                        captured_error = True
 
                 if (
                     should_capture_error
diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py
index 751c38b9..989694fc 100644
--- a/clients/python/tests/worker/test_worker.py
+++ b/clients/python/tests/worker/test_worker.py
@@ -1223,65 +1223,3 @@ def test_child_process_silenced_exception_does_not_log_task_failed(
         if c.args and c.args[0] == "taskworker.task.failed"
     ]
     assert failed_calls == []
-
-
-@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception")
-@mock.patch("taskbroker_client.worker.workerchild.logger")
-def test_child_process_silenced_retry_exhausted_does_not_log_task_failed(
-    mock_logger: mock.Mock,
-    mock_capture: mock.Mock,
-) -> None:
-    """
-    When err is silenced AND retries are exhausted, neither the synthetic
-    NoRetriesRemainingError capture nor the structured taskworker.task.failed
-    log fires. The silenced_exceptions opt-out covers both surfaces on the
-    retry-exhausted path.
-    """
-    activation = InflightTaskActivation(
-        host="localhost:50051",
-        receive_timestamp=0,
-        activation=TaskActivation(
-            id="silenced-exhausted",
-            taskname="examples.will_fail_with_silenced_ignored_exception",
-            namespace="examples",
-            parameters_bytes=msgpack.packb({"args": [], "kwargs": {}}, use_bin_type=True),
-            processing_deadline_duration=2,
-            retry_state=RetryState(
-                # On the final attempt: should_retry() returns False because
-                # no retries remain, max_attempts_reached() returns True.
-                attempts=1,
-                max_attempts=2,
-                on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD,
-            ),
-        ),
-    )
-
-    todo: queue.Queue[InflightTaskActivation] = queue.Queue()
-    processed: queue.Queue[ProcessingResult] = queue.Queue()
-    shutdown = Event()
-
-    todo.put(activation)
-    child_process(
-        "examples.app:app",
-        todo,
-        processed,
-        shutdown,
-        max_task_count=1,
-        processing_pool_name="test",
-        process_type="fork",
-    )
-
-    result = processed.get()
-    assert result.task_id == activation.activation.id
-    assert result.status == TASK_ACTIVATION_STATUS_FAILURE
-
-    # Silenced: no synthetic NoRetriesRemainingError Sentry capture.
-    assert mock_capture.call_count == 0
-
-    # Silenced: no structured worker log either.
-    failed_calls = [
-        c
-        for c in mock_logger.exception.call_args_list
-        if c.args and c.args[0] == "taskworker.task.failed"
-    ]
-    assert failed_calls == []