diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index c5be6f1c..2dffdf18 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -95,6 +95,31 @@ def status_name(status: TaskActivationStatus.ValueType) -> str: return f"unknown-{status}" +def _log_task_failed( + activation: TaskActivation, + err: BaseException, + processing_pool_name: str, +) -> None: + """ + Emit a structured failure log via logger.exception. When the embedding + application has the Sentry SDK's LoggingIntegration enabled (default), + this also produces the Sentry event, so the same call covers both the + structured stdout log and the Sentry capture. Call this from inside an + existing isolation_scope block so fingerprints/tags are preserved. + """ + logger.exception( + "taskworker.task.failed", + extra={ + "task_id": activation.id, + "taskname": activation.taskname, + "namespace": activation.namespace, + "processing_pool": processing_pool_name, + "exception_type": type(err).__name__, + "exception_message": str(err), + }, + ) + + def child_process( app_module: str, child_tasks: queue.Queue[InflightTaskActivation], @@ -249,7 +274,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: inflight.activation.taskname, ] scope.set_transaction_name(inflight.activation.taskname) - sentry_sdk.capture_exception(err) + _log_task_failed(inflight.activation, err, processing_pool_name) metrics.incr( "taskworker.worker.processing_deadline_exceeded", tags={ @@ -292,16 +317,20 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: ] scope.set_transaction_name(inflight.activation.taskname) sentry_sdk.capture_exception(retry_error) - # In this branch, all exceptions should be either - # captured or silenced. - captured_error = True + # Also emit structured stdout log for retry-exhausted failures. 
+ # Uses the original err, not the synthetic retry_error, so the log + # carries the actual exception that exhausted retries. + _log_task_failed(inflight.activation, err, processing_pool_name) + # In this branch, all exceptions should be either + # captured or silenced. + captured_error = True if ( should_capture_error and not captured_error and next_state != TASK_ACTIVATION_STATUS_RETRY ): - sentry_sdk.capture_exception(err) + _log_task_failed(inflight.activation, err, processing_pool_name) clear_current_task() processed_task_count += 1 diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index f74ba490..989694fc 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -36,7 +36,7 @@ TaskWorkerProcessingPool, WorkerServicer, ) -from taskbroker_client.worker.workerchild import ProcessingDeadlineExceeded, child_process +from taskbroker_client.worker.workerchild import child_process SIMPLE_TASK = InflightTaskActivation( host="localhost:50051", @@ -655,8 +655,11 @@ def test_child_process_retry_task() -> None: assert result.status == TASK_ACTIVATION_STATUS_RETRY +@mock.patch("taskbroker_client.worker.workerchild.logger") @mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") -def test_child_process_retry_task_max_attempts(mock_capture: mock.Mock) -> None: +def test_child_process_retry_task_max_attempts( + mock_capture: mock.Mock, mock_logger: mock.Mock +) -> None: # Create an activation that is on its final attempt and # will raise an error again. activation = InflightTaskActivation( @@ -700,6 +703,18 @@ def test_child_process_retry_task_max_attempts(mock_capture: mock.Mock) -> None: assert isinstance(capture_call[0], NoRetriesRemainingError) assert isinstance(capture_call[0].__cause__, RuntimeError) + # Structured worker log fires for retry-exhausted failures, with the + # original exception (not the synthetic NoRetriesRemainingError). 
+ failed_calls = [ + c + for c in mock_logger.exception.call_args_list + if c.args and c.args[0] == "taskworker.task.failed" + ] + assert len(failed_calls) == 1 + extra = failed_calls[0].kwargs["extra"] + assert extra["exception_type"] == "RuntimeError" + assert extra["taskname"] == "examples.will_retry" + def test_child_process_failure_task() -> None: todo: queue.Queue[InflightTaskActivation] = queue.Queue() @@ -858,8 +873,8 @@ def test_child_process_pass_headers() -> None: redis.delete("task-headers-value", "task-headers-count", "task-headers-custom") -@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") -def test_child_process_terminate_task(mock_capture: mock.Mock) -> None: +@mock.patch("taskbroker_client.worker.workerchild.logger") +def test_child_process_terminate_task(mock_logger: mock.Mock) -> None: todo: queue.Queue[InflightTaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue() shutdown = Event() @@ -891,8 +906,16 @@ def test_child_process_terminate_task(mock_capture: mock.Mock) -> None: result = processed.get(block=False) assert result.task_id == sleepy.activation.id assert result.status == TASK_ACTIVATION_STATUS_FAILURE - assert mock_capture.call_count == 1 - assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded + assert mock_logger.exception.call_count == 1 + args, kwargs = mock_logger.exception.call_args + assert args[0] == "taskworker.task.failed" + extra = kwargs["extra"] + assert extra["exception_type"] == "ProcessingDeadlineExceeded" + assert extra["taskname"] == "examples.timed" + assert extra["namespace"] == "examples" + assert extra["task_id"] == "111" + assert extra["processing_pool"] == "test" + assert "execution deadline" in extra["exception_message"] @mock.patch("taskbroker_client.worker.workerchild.capture_checkin") @@ -1023,8 +1046,8 @@ def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManag app.context_hooks.remove(hook) 
-@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") -def test_child_process_silenced_timeout(mock_capture: mock.Mock) -> None: +@mock.patch("taskbroker_client.worker.workerchild.logger") +def test_child_process_silenced_timeout(mock_logger: mock.Mock) -> None: todo: queue.Queue[InflightTaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue() shutdown = Event() @@ -1044,7 +1067,12 @@ def test_child_process_silenced_timeout(mock_capture: mock.Mock) -> None: result = processed.get() assert result.task_id == RETRY_TASK_WITH_SILENCED_TIMEOUT.activation.id assert result.status == TASK_ACTIVATION_STATUS_FAILURE - assert mock_capture.call_count == 0 + failed_calls = [ + c + for c in mock_logger.exception.call_args_list + if c.args and c.args[0] == "taskworker.task.failed" + ] + assert failed_calls == [] @mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") @@ -1099,8 +1127,8 @@ def test_child_process_expected_ignored_exception_max_attempts(mock_capture: moc assert mock_capture.call_count == 0 -@mock.patch("taskbroker_client.worker.workerchild.sentry_sdk.capture_exception") -def test_child_process_retry_on_deadline_exceeded(mock_capture: mock.Mock) -> None: +@mock.patch("taskbroker_client.worker.workerchild.logger") +def test_child_process_retry_on_deadline_exceeded(mock_logger: mock.Mock) -> None: todo: queue.Queue[InflightTaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue() shutdown = Event() @@ -1122,5 +1150,76 @@ def test_child_process_retry_on_deadline_exceeded(mock_capture: mock.Mock) -> No result = processed.get() assert result.task_id == RETRY_TASK_ON_DEADLINE_EXCEEDED.activation.id assert result.status == TASK_ACTIVATION_STATUS_RETRY - assert mock_capture.call_count == 1 - assert type(mock_capture.call_args.args[0]) is ProcessingDeadlineExceeded + # The timeout was reported (report_timeout_errors=True) even though + # the task will retry. 
+ # taskworker.task.failed fires once per attempt. + assert mock_logger.exception.call_count == 1 + args, kwargs = mock_logger.exception.call_args + assert args[0] == "taskworker.task.failed" + assert kwargs["extra"]["exception_type"] == "ProcessingDeadlineExceeded" + + +@mock.patch("taskbroker_client.worker.workerchild.logger") +def test_child_process_general_exception_logs_task_failed(mock_logger: mock.Mock) -> None: + """A non-retriable Exception emits taskworker.task.failed with all fields.""" + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + # examples.fail_task has no retry policy → raises ValueError once, + # task fails terminally on first attempt. + todo.put(FAIL_TASK) + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + result = processed.get() + assert result.status == TASK_ACTIVATION_STATUS_FAILURE + assert mock_logger.exception.call_count == 1 + args, kwargs = mock_logger.exception.call_args + assert args[0] == "taskworker.task.failed" + extra = kwargs["extra"] + assert extra["task_id"] == "333" + assert extra["taskname"] == "examples.fail_task" + assert extra["namespace"] == "examples" + assert extra["processing_pool"] == "test" + assert extra["exception_type"] == "ValueError" + assert "exception_message" in extra + + +@mock.patch("taskbroker_client.worker.workerchild.logger") +def test_child_process_silenced_exception_does_not_log_task_failed( + mock_logger: mock.Mock, +) -> None: + """When err is in silenced_exceptions, taskworker.task.failed is NOT logged. 
+ Preserves the silencing semantics added in #608.""" + todo: queue.Queue[InflightTaskActivation] = queue.Queue() + processed: queue.Queue[ProcessingResult] = queue.Queue() + shutdown = Event() + + todo.put(RETRY_TASK_WITH_SILENCED_UNHANDLED_EXCEPTION) + child_process( + "examples.app:app", + todo, + processed, + shutdown, + max_task_count=1, + processing_pool_name="test", + process_type="fork", + ) + + result = processed.get() + assert result.status == TASK_ACTIVATION_STATUS_FAILURE + # Other log calls (e.g., taskworker.task.retry) may have fired, but + # taskworker.task.failed must not have. + failed_calls = [ + c + for c in mock_logger.exception.call_args_list + if c.args and c.args[0] == "taskworker.task.failed" + ] + assert failed_calls == []