diff --git a/backend/api_v2/notification.py b/backend/api_v2/notification.py index 733084c637..57e810c0e9 100644 --- a/backend/api_v2/notification.py +++ b/backend/api_v2/notification.py @@ -3,6 +3,7 @@ from notification_v2.helper import NotificationHelper from notification_v2.models import Notification from pipeline_v2.dto import PipelineStatusPayload +from workflow_manager.workflow_v2.enums import ExecutionStatus from workflow_manager.workflow_v2.models.execution import WorkflowExecution from api_v2.models import APIDeployment @@ -10,17 +11,42 @@ logger = logging.getLogger(__name__) +_FAILURE_STATUSES = {ExecutionStatus.ERROR.value, ExecutionStatus.STOPPED.value} + + class APINotification: def __init__(self, api: APIDeployment, workflow_execution: WorkflowExecution) -> None: self.notifications = Notification.objects.filter(api=api, is_active=True) self.api = api self.workflow_execution = workflow_execution - def send(self): - if not self.notifications.count(): - logger.info(f"No notifications found for api {self.api}") + def send(self) -> None: + # Failure if the run hit a non-success terminal state OR any file errored. + # Partial-success runs land as status=COMPLETED with failed_files>0, so the + # status check alone misses them — see callback aggregation rules. + failed_files = self.workflow_execution.failed_files or 0 + is_failure = ( + self.workflow_execution.status in _FAILURE_STATUSES or failed_files > 0 + ) + if not is_failure: + # Success path: skip rows that opted into failure-only alerts. + self.notifications = self.notifications.filter(notify_on_failures=False) + + if not self.notifications.exists(): + logger.info( + "No notifications to dispatch for api %s (status=%s, failed_files=%s)", + self.api, + self.workflow_execution.status, + failed_files, + ) return - logger.info(f"Sending api status notification for api {self.api}") + logger.info( + "Sending api status notification for api %s (status=%s, successful=%s, failed=%s)", + self.api, + self.workflow_execution.status, + self.workflow_execution.successful_files or 0, + failed_files, + ) payload_dto = PipelineStatusPayload( type="API", @@ -29,6 +55,9 @@ def send(self): status=self.workflow_execution.status, execution_id=self.workflow_execution.id, error_message=self.workflow_execution.error_message, + total_files=self.workflow_execution.total_files, + successful_files=self.workflow_execution.successful_files, + failed_files=failed_files, ) NotificationHelper.send_notification( diff --git a/backend/notification_v2/internal_api_views.py b/backend/notification_v2/internal_api_views.py index 6843d5a50c..3e3f386b17 100644 --- a/backend/notification_v2/internal_api_views.py +++ b/backend/notification_v2/internal_api_views.py @@ -10,14 +10,18 @@ """ import logging +from typing import Any, cast from api_v2.models import APIDeployment -from django.http import JsonResponse +from django.db.models import QuerySet +from django.http import HttpRequest, JsonResponse from django.shortcuts import get_object_or_404 from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods from pipeline_v2.models import Pipeline from utils.organization_utils import filter_queryset_by_organization +from workflow_manager.workflow_v2.enums import ExecutionStatus +from workflow_manager.workflow_v2.models.execution import WorkflowExecution from notification_v2.models import Notification @@ -26,10 +30,71 @@ # Constants for error messages INTERNAL_SERVER_ERROR_MSG = "Internal server error" +_FAILURE_STATUSES = 
{ExecutionStatus.ERROR.value, ExecutionStatus.STOPPED.value} + + +def _load_execution(execution_id: str | None) -> WorkflowExecution | None: + """Best-effort lookup; returns None on missing id or unknown row.""" + if not execution_id: + return None + try: + return cast(WorkflowExecution, WorkflowExecution.objects.get(id=execution_id)) + except WorkflowExecution.DoesNotExist: + logger.warning("WorkflowExecution %s not found", execution_id) + return None + + +def _apply_failure_filter( + notifications_qs: QuerySet[Notification], + execution: WorkflowExecution | None, +) -> QuerySet[Notification]: + """Drop notify_on_failures=True rows on success runs. + + Mirrors the dispatch-side rule in backend/api_v2/notification.py and + backend/pipeline_v2/notification.py so both code paths agree on what + counts as a failure (status ∈ {ERROR, STOPPED} OR any file errored). + + No execution → no filter, preserving legacy "return every active row" + behavior for callers that don't pass execution_id. + """ + if execution is None: + return notifications_qs + failed_files = execution.failed_files or 0 + is_failure = execution.status in _FAILURE_STATUSES or failed_files > 0 + if not is_failure: + notifications_qs = notifications_qs.filter(notify_on_failures=False) + return notifications_qs + + +def _execution_counts(execution: WorkflowExecution | None) -> dict[str, int]: + """File counts surfaced into webhook payloads. Empty dict on no execution.""" + if execution is None: + return {} + return { + "total_files": execution.total_files or 0, + "successful_files": execution.successful_files or 0, + "failed_files": execution.failed_files or 0, + } + + +def _serialize_notification(n: Notification) -> dict[str, Any]: + return { + "id": str(n.id), + "notification_type": n.notification_type, + "platform": n.platform, + "url": n.url, + "authorization_type": n.authorization_type, + "authorization_key": n.authorization_key, + "authorization_header": n.authorization_header, + "max_retries": n.max_retries, + "is_active": n.is_active, + "notify_on_failures": n.notify_on_failures, + } + @csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only @require_http_methods(["GET"]) -def get_pipeline_notifications(request, pipeline_id): +def get_pipeline_notifications(request: HttpRequest, pipeline_id: str) -> JsonResponse: """Get active notifications for a pipeline or API deployment. Used by callback worker to fetch notification configuration. 
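For review context, the queryset filter above reduces to this per-row predicate (a minimal sketch, assuming only the fields `_apply_failure_filter` reads; `should_notify` and the literal status strings are illustrative, not part of the change):

```python
def should_notify(notify_on_failures: bool, execution) -> bool:
    # No execution context: legacy behavior, every active row fires.
    if execution is None:
        return True
    failed_files = execution.failed_files or 0
    is_failure = execution.status in {"ERROR", "STOPPED"} or failed_files > 0
    # Failure runs fire every row; success runs skip failure-only rows.
    return is_failure or not notify_on_failures
```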
@@ -41,83 +106,53 @@ def get_pipeline_notifications(request, pipeline_id): pipeline_queryset, request, "organization" ) + execution = _load_execution(request.GET.get("execution_id")) + counts = _execution_counts(execution) + if pipeline_queryset.exists(): pipeline = pipeline_queryset.first() - - # Get active notifications for this pipeline notifications = Notification.objects.filter(pipeline=pipeline, is_active=True) - - notifications_data = [] - for notification in notifications: - notifications_data.append( - { - "id": str(notification.id), - "notification_type": notification.notification_type, - "platform": notification.platform, - "url": notification.url, - "authorization_type": notification.authorization_type, - "authorization_key": notification.authorization_key, - "authorization_header": notification.authorization_header, - "max_retries": notification.max_retries, - "is_active": notification.is_active, - } - ) - + notifications = _apply_failure_filter(notifications, execution) + serialized = [_serialize_notification(n) for n in notifications] return JsonResponse( { "status": "success", "pipeline_id": str(pipeline.id), "pipeline_name": pipeline.pipeline_name, "pipeline_type": pipeline.pipeline_type, - "notifications": notifications_data, + "notifications": serialized, + "execution_counts": counts, } ) - else: - # If not found in Pipeline, try APIDeployment model - api_queryset = APIDeployment.objects.filter(id=pipeline_id) - api_queryset = filter_queryset_by_organization( - api_queryset, request, "organization" + + # If not found in Pipeline, try APIDeployment model + api_queryset = APIDeployment.objects.filter(id=pipeline_id) + api_queryset = filter_queryset_by_organization( + api_queryset, request, "organization" + ) + if api_queryset.exists(): + api = api_queryset.first() + notifications = Notification.objects.filter(api=api, is_active=True) + notifications = _apply_failure_filter(notifications, execution) + serialized = [_serialize_notification(n) for n in notifications] + return JsonResponse( + { + "status": "success", + "pipeline_id": str(api.id), + "pipeline_name": api.api_name, + "pipeline_type": "API", + "notifications": serialized, + "execution_counts": counts, + } ) - if api_queryset.exists(): - api = api_queryset.first() - - # Get active notifications for this API deployment - notifications = Notification.objects.filter(api=api, is_active=True) - - notifications_data = [] - for notification in notifications: - notifications_data.append( - { - "id": str(notification.id), - "notification_type": notification.notification_type, - "platform": notification.platform, - "url": notification.url, - "authorization_type": notification.authorization_type, - "authorization_key": notification.authorization_key, - "authorization_header": notification.authorization_header, - "max_retries": notification.max_retries, - "is_active": notification.is_active, - } - ) - - return JsonResponse( - { - "status": "success", - "pipeline_id": str(api.id), - "pipeline_name": api.api_name, - "pipeline_type": "API", - "notifications": notifications_data, - } - ) - else: - return JsonResponse( - { - "status": "error", - "message": "Pipeline or API deployment not found", - }, - status=404, - ) + return JsonResponse( + { + "status": "error", + "message": "Pipeline or API deployment not found", + }, + status=404, + ) except Exception as e: logger.error(f"Error getting pipeline notifications for {pipeline_id}: {e}") return JsonResponse( @@ -127,7 +162,7 @@ def get_pipeline_notifications(request, pipeline_id): 
@csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only @require_http_methods(["GET"]) -def get_api_notifications(request, api_id): +def get_api_notifications(request: HttpRequest, api_id: str) -> JsonResponse: """Get active notifications for an API deployment. Used by callback worker to fetch notification configuration. @@ -140,24 +175,9 @@ def get_api_notifications(request, api_id): ) api = get_object_or_404(api_queryset) - # Get active notifications for this API + execution = _load_execution(request.GET.get("execution_id")) notifications = Notification.objects.filter(api=api, is_active=True) - - notifications_data = [] - for notification in notifications: - notifications_data.append( - { - "id": str(notification.id), - "notification_type": notification.notification_type, - "platform": notification.platform, - "url": notification.url, - "authorization_type": notification.authorization_type, - "authorization_key": notification.authorization_key, - "authorization_header": notification.authorization_header, - "max_retries": notification.max_retries, - "is_active": notification.is_active, - } - ) + notifications = _apply_failure_filter(notifications, execution) return JsonResponse( { @@ -165,7 +185,8 @@ def get_api_notifications(request, api_id): "api_id": str(api.id), "api_name": api.api_name, "display_name": api.display_name, - "notifications": notifications_data, + "notifications": [_serialize_notification(n) for n in notifications], + "execution_counts": _execution_counts(execution), } ) @@ -182,7 +203,7 @@ def get_api_notifications(request, api_id): @csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only @require_http_methods(["GET"]) -def get_pipeline_data(request, pipeline_id): +def get_pipeline_data(request: HttpRequest, pipeline_id: str) -> JsonResponse: """Get basic pipeline data for notification purposes. Used by callback worker to determine pipeline type and name. @@ -218,7 +239,7 @@ def get_pipeline_data(request, pipeline_id): @csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only @require_http_methods(["GET"]) -def get_api_data(request, api_id): +def get_api_data(request: HttpRequest, api_id: str) -> JsonResponse: """Get basic API deployment data for notification purposes. Used by callback worker to determine API name and details. diff --git a/backend/notification_v2/migrations/0002_notification_notify_on_failures.py b/backend/notification_v2/migrations/0002_notification_notify_on_failures.py new file mode 100644 index 0000000000..ce0c3535b5 --- /dev/null +++ b/backend/notification_v2/migrations/0002_notification_notify_on_failures.py @@ -0,0 +1,23 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("notification_v2", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="notification", + name="notify_on_failures", + field=models.BooleanField( + default=False, + db_comment=( + "When True, fire only on failed runs — terminal status " + "ERROR/STOPPED or any file in the run errored (partial " + "failure). When False (default), fire on every terminal " + "completion." 
+ ), + ), + ), + ] diff --git a/backend/notification_v2/models.py b/backend/notification_v2/models.py index 489a8c827e..e5238ec176 100644 --- a/backend/notification_v2/models.py +++ b/backend/notification_v2/models.py @@ -47,6 +47,14 @@ class Notification(BaseModel): default=True, db_comment="Flag indicating whether the notification is active or not.", ) + notify_on_failures = models.BooleanField( + default=False, + db_comment=( + "When True, fire only on failed runs — terminal status ERROR/STOPPED " + "or any file in the run errored (partial failure). When False " + "(default), fire on every terminal completion." + ), + ) # Foreign keys to specific models pipeline = models.ForeignKey( Pipeline, diff --git a/backend/notification_v2/serializers.py b/backend/notification_v2/serializers.py index 115487c481..4cb4f3c4cb 100644 --- a/backend/notification_v2/serializers.py +++ b/backend/notification_v2/serializers.py @@ -12,6 +12,7 @@ class NotificationSerializer(serializers.ModelSerializer): max_retries = serializers.IntegerField( max_value=4, min_value=0, default=0, required=False ) + notify_on_failures = serializers.BooleanField(default=False, required=False) class Meta: model = Notification diff --git a/backend/notification_v2/tests/__init__.py b/backend/notification_v2/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/pipeline_v2/dto.py b/backend/pipeline_v2/dto.py index 5a87ba0825..d0f27e7943 100644 --- a/backend/pipeline_v2/dto.py +++ b/backend/pipeline_v2/dto.py @@ -10,6 +10,9 @@ def __init__( status: str, execution_id: str | None = None, error_message: str | None = None, + total_files: int | None = None, + successful_files: int | None = None, + failed_files: int | None = None, ): self.type = type self.pipeline_id = pipeline_id @@ -17,14 +20,26 @@ def __init__( self.status = status self.execution_id = execution_id self.error_message = error_message + self.total_files = total_files + self.successful_files = successful_files + self.failed_files = failed_files def to_dict(self) -> dict[str, Any]: - """Convert the payload DTO to a dictionary.""" - payload = { + """Convert the payload DTO to a dictionary. + + File counts are nested in `additional_data` to match the worker-path + payload shape (NotificationPayload.from_execution_status). 
+ """ + payload: dict[str, Any] = { "type": self.type, "pipeline_id": str(self.pipeline_id), "pipeline_name": self.pipeline_name, "status": self.status, + "additional_data": { + "total_files": self.total_files or 0, + "successful_files": self.successful_files or 0, + "failed_files": self.failed_files or 0, + }, } if self.execution_id: payload["execution_id"] = str(self.execution_id) diff --git a/backend/pipeline_v2/notification.py b/backend/pipeline_v2/notification.py index dbfc0dea52..ec82145054 100644 --- a/backend/pipeline_v2/notification.py +++ b/backend/pipeline_v2/notification.py @@ -2,6 +2,8 @@ from notification_v2.helper import NotificationHelper from notification_v2.models import Notification +from workflow_manager.workflow_v2.enums import ExecutionStatus +from workflow_manager.workflow_v2.models.execution import WorkflowExecution from pipeline_v2.dto import PipelineStatusPayload from pipeline_v2.models import Pipeline @@ -9,6 +11,9 @@ logger = logging.getLogger(__name__) +_FAILURE_STATUSES = {ExecutionStatus.ERROR.value, ExecutionStatus.STOPPED.value} + + class PipelineNotification: def __init__( self, @@ -23,11 +28,57 @@ def __init__( self.error_message = error_message self.execution_id = execution_id - def send(self): - if not self.notifications.count(): - logger.info(f"No notifications found for pipeline {self.pipeline}") + def _load_execution(self) -> WorkflowExecution | None: + """Load the WorkflowExecution row for this dispatch, if available. + + Falls back to None when no execution_id was supplied (e.g. legacy + callers); callers must handle the None case. + """ + if not self.execution_id: + return None + try: + return WorkflowExecution.objects.get(id=self.execution_id) + except WorkflowExecution.DoesNotExist: + logger.warning( + "WorkflowExecution %s not found for pipeline notification", + self.execution_id, + ) + return None + + def send(self) -> None: + execution = self._load_execution() + # Source of truth for partial-failure detection is the per-run aggregate + # written by the worker callback. Pipeline.last_run_status is a coarse + # collapse (ERROR/STOPPED → FAILURE) that hides per-file errors when + # at least one file succeeded. 
+ failed_files = (execution.failed_files or 0) if execution else 0 + execution_status = execution.status if execution else None + is_failure = ( + execution_status in _FAILURE_STATUSES + or failed_files > 0 + or self.pipeline.last_run_status == Pipeline.PipelineStatus.FAILURE + ) + if not is_failure: + self.notifications = self.notifications.filter(notify_on_failures=False) + + if not self.notifications.exists(): + logger.info( + "No notifications to dispatch for pipeline %s (status=%s, failed_files=%s)", + self.pipeline, + self.pipeline.last_run_status, + failed_files, + ) return - logger.info(f"Sending pipeline status notification for pipeline {self.pipeline}") + successful_files = (execution.successful_files or 0) if execution else 0 + total_files = execution.total_files if execution else None + logger.info( + "Sending pipeline status notification for pipeline %s " + "(status=%s, successful=%s, failed=%s)", + self.pipeline, + self.pipeline.last_run_status, + successful_files, + failed_files, + ) payload_dto = PipelineStatusPayload( type=self.pipeline.pipeline_type, pipeline_id=str(self.pipeline.id), @@ -35,6 +86,9 @@ def send(self): status=self.pipeline.last_run_status, execution_id=self.execution_id, error_message=self.error_message, + total_files=total_files, + successful_files=successful_files, + failed_files=failed_files, ) NotificationHelper.send_notification( diff --git a/backend/workflow_manager/internal_serializers.py b/backend/workflow_manager/internal_serializers.py index bed98c6853..cd221470cb 100644 --- a/backend/workflow_manager/internal_serializers.py +++ b/backend/workflow_manager/internal_serializers.py @@ -178,6 +178,8 @@ class WorkflowExecutionStatusUpdateSerializer(serializers.Serializer): total_files = serializers.IntegerField( required=False, min_value=0 ) # Allow 0 but backend will only update if > 0 + successful_files = serializers.IntegerField(required=False, min_value=0) + failed_files = serializers.IntegerField(required=False, min_value=0) attempts = serializers.IntegerField(required=False, min_value=0) execution_time = serializers.FloatField(required=False, min_value=0) diff --git a/backend/workflow_manager/internal_views.py b/backend/workflow_manager/internal_views.py index c822e5e7b5..52f100196a 100644 --- a/backend/workflow_manager/internal_views.py +++ b/backend/workflow_manager/internal_views.py @@ -513,10 +513,19 @@ def update_status(self, request, id=None): increment_attempt=increment_attempt, ) - # Update total_files separately (not handled by update_execution) + # Update total_files / per-file aggregates separately (not handled by update_execution) + update_fields: list[str] = [] if validated_data.get("total_files") is not None: execution.total_files = validated_data["total_files"] - execution.save() + update_fields.append("total_files") + if validated_data.get("successful_files") is not None: + execution.successful_files = validated_data["successful_files"] + update_fields.append("successful_files") + if validated_data.get("failed_files") is not None: + execution.failed_files = validated_data["failed_files"] + update_fields.append("failed_files") + if update_fields: + execution.save(update_fields=update_fields) logger.info( f"Updated workflow execution {id} status to {validated_data['status']}" diff --git a/backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_file_counts.py b/backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_file_counts.py new file mode 100644 index 0000000000..6dba25f5be --- /dev/null +++ 
b/backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_file_counts.py @@ -0,0 +1,36 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("workflow_v2", "0019_remove_filehistory_trigram_index"), + ] + + operations = [ + migrations.AddField( + model_name="workflowexecution", + name="successful_files", + field=models.PositiveIntegerField( + blank=True, + null=True, + db_comment=( + "Per-run aggregate of files that completed successfully. " + "Written by the worker callback at terminal state. Null on " + "rows created before this column was added." + ), + ), + ), + migrations.AddField( + model_name="workflowexecution", + name="failed_files", + field=models.PositiveIntegerField( + blank=True, + null=True, + db_comment=( + "Per-run aggregate of files that errored. Written by the " + "worker callback at terminal state. Null on rows created " + "before this column was added." + ), + ), + ), + ] diff --git a/backend/workflow_manager/workflow_v2/models/execution.py b/backend/workflow_manager/workflow_v2/models/execution.py index 45886bd64e..4e3656b68d 100644 --- a/backend/workflow_manager/workflow_v2/models/execution.py +++ b/backend/workflow_manager/workflow_v2/models/execution.py @@ -16,7 +16,6 @@ from workflow_manager.execution.dto import ExecutionCache from workflow_manager.execution.execution_cache_utils import ExecutionCacheUtils -from workflow_manager.file_execution.models import WorkflowFileExecution from workflow_manager.workflow_v2.enums import ExecutionStatus from workflow_manager.workflow_v2.models import Workflow @@ -174,6 +173,24 @@ class Type(models.TextChoices): total_files = models.PositiveIntegerField( default=0, verbose_name="Total files", db_comment="Number of files to process" ) + successful_files = models.PositiveIntegerField( + null=True, + blank=True, + db_comment=( + "Per-run aggregate of files that completed successfully. Written by " + "the worker callback at terminal state. Null on rows created before " + "this column was added." + ), + ) + failed_files = models.PositiveIntegerField( + null=True, + blank=True, + db_comment=( + "Per-run aggregate of files that errored. Written by the worker " + "callback at terminal state. Null on rows created before this " + "column was added." + ), + ) error_message = models.CharField( max_length=EXECUTION_ERROR_LENGTH, blank=True, @@ -420,16 +437,8 @@ def get_last_run_statuses(cls, pipeline_id: uuid.UUID, limit: int = 5) -> list[d result = [] for e in executions: - # TODO: Optimize by storing successful/failed counts directly in - # WorkflowExecution model. Current approach causes N+1 queries - # (2 queries per execution). Denormalized counts would eliminate - # these queries entirely. 
-            successful = WorkflowFileExecution.objects.filter(
-                workflow_execution_id=e.id, status="COMPLETED"
-            ).count()
-            failed = WorkflowFileExecution.objects.filter(
-                workflow_execution_id=e.id, status="ERROR"
-            ).count()
+            successful = e.successful_files or 0
+            failed = e.failed_files or 0
 
             # Compute display_status: PARTIAL_SUCCESS if completed with mixed results
             display_status = e.status
diff --git a/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx b/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx
index 9c207bd1a9..65460376d4 100644
--- a/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx
+++ b/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx
@@ -1,4 +1,4 @@
-import { Button, Form, Input, Select, Space } from "antd";
+import { Button, Checkbox, Form, Input, Select, Space } from "antd";
 import PropTypes from "prop-types";
 import { useEffect, useState } from "react";
 import { getBackendErrorDetail } from "../../../helpers/GetStaticData";
@@ -12,6 +12,7 @@ const DEFAULT_FORM_DETAILS = {
   authorization_key: "",
   is_active: false,
   max_retries: 0,
+  notify_on_failures: false,
   pipeline: "",
   api: "",
   url: "",
@@ -220,6 +221,13 @@ function CreateNotification({
           ),
         )}
+
+        <Form.Item
+          name="notify_on_failures"
+          valuePropName="checked"
+        >
+          <Checkbox>Notify on failures only</Checkbox>
+        </Form.Item>
diff --git a/unstract/core/src/unstract/core/data_models.py b/unstract/core/src/unstract/core/data_models.py
index 7e8e984a04..7642aa9ce1 100644
--- a/unstract/core/src/unstract/core/data_models.py
+++ b/unstract/core/src/unstract/core/data_models.py
@@ -512,6 +512,8 @@ class NotificationPayload:
     # Metadata
     timestamp: datetime = field(default_factory=lambda: datetime.now(UTC))
+    # Per-run file aggregates (total/successful/failed) are nested here
+    # so receivers see them grouped rather than as top-level keys.
     additional_data: dict[str, Any] = field(default_factory=dict)
 
     # Internal tracking (not sent to external webhooks)
@@ -565,6 +567,9 @@ def from_execution_status(
         error_message: str | None = None,
         organization_id: str | None = None,
         additional_data: dict[str, Any] | None = None,
+        total_files: int = 0,
+        successful_files: int = 0,
+        failed_files: int = 0,
     ) -> "NotificationPayload":
         """Create notification payload from execution status.
@@ -599,6 +604,17 @@ def from_execution_status(
                 f"Cannot create notification for non-final status: {execution_status}"
             )
+        # File counts are bundled inside additional_data so receivers see
+        # them grouped (e.g. Slack renders one "Additional Data" section).
+        # Caller-supplied additional_data takes precedence on key conflict.
+ merged_additional = { + "total_files": total_files, + "successful_files": successful_files, + "failed_files": failed_files, + } + if additional_data: + merged_additional.update(additional_data) + return cls( type=workflow_type, pipeline_id=pipeline_id, @@ -607,7 +623,7 @@ def from_execution_status( execution_id=execution_id, error_message=error_message, organization_id=organization_id, - additional_data=additional_data or {}, + additional_data=merged_additional, _source=source, ) diff --git a/workers/callback/tasks.py b/workers/callback/tasks.py index 42599f0659..eee5b97da3 100644 --- a/workers/callback/tasks.py +++ b/workers/callback/tasks.py @@ -381,12 +381,16 @@ def _update_execution_status_unified( try: # Consistent workflow execution status update across all callback types total_files = aggregated_results.get("total_files", 0) + successful_files = aggregated_results.get("successful_files", 0) + failed_files = aggregated_results.get("failed_files", 0) # Make the unified API call api_client.update_workflow_execution_status( execution_id=execution_id, status=final_status, total_files=total_files, + successful_files=successful_files, + failed_files=failed_files, organization_id=organization_id, error_message=error_message, ) diff --git a/workers/notification/providers/slack_webhook.py b/workers/notification/providers/slack_webhook.py index 89206646a5..04e3532fa3 100644 --- a/workers/notification/providers/slack_webhook.py +++ b/workers/notification/providers/slack_webhook.py @@ -199,11 +199,10 @@ def _format_value(self, value: Any) -> str: elif isinstance(value, (list, tuple)): return "\n• " + "\n• ".join(str(item) for item in value) elif isinstance(value, dict): - # Format nested dictionary - items = [] - for k, v in value.items(): - items.append(f" • {self._format_key(k)}: {v}") - return "\n" + "\n".join(items) + # Inline {Key: Value, Key: Value} so the receiver sees the + # whole dict on one line instead of a bulleted block. 
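+            # e.g. {"failed_files": 1, "total_files": 3} renders as
+            # "{Failed Files: 1, Total Files: 3}" (assuming _format_key
+            # title-cases keys).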
+ items = [f"{self._format_key(k)}: {v}" for k, v in value.items()] + return "{" + ", ".join(items) + "}" elif value is None: return "_Not specified_" else: diff --git a/workers/scheduler/tasks.py b/workers/scheduler/tasks.py index 2e22946a58..65e5bd7af1 100644 --- a/workers/scheduler/tasks.py +++ b/workers/scheduler/tasks.py @@ -81,6 +81,7 @@ def _send_pipeline_status_notification( pipeline_id=pipeline_id, pipeline_name=pipeline_name, notification_payload=notification, + execution_id=execution_id, ) logger.info(f"Notification sent successfully for {pipeline_type} {pipeline_id}") except Exception as notification_error: diff --git a/workers/shared/api/internal_client.py b/workers/shared/api/internal_client.py index def90bd86a..4964a2b377 100644 --- a/workers/shared/api/internal_client.py +++ b/workers/shared/api/internal_client.py @@ -603,6 +603,8 @@ def update_workflow_execution_status( status: str, error_message: str | None = None, total_files: int | None = None, + successful_files: int | None = None, + failed_files: int | None = None, attempts: int | None = None, execution_time: float | None = None, organization_id: str | None = None, @@ -613,6 +615,8 @@ def update_workflow_execution_status( status, error_message, total_files, + successful_files, + failed_files, attempts, execution_time, organization_id, diff --git a/workers/shared/clients/execution_client.py b/workers/shared/clients/execution_client.py index 80e9ca6568..e1373eb9f5 100644 --- a/workers/shared/clients/execution_client.py +++ b/workers/shared/clients/execution_client.py @@ -265,6 +265,8 @@ def update_workflow_execution_status( status: str | TaskStatus, error_message: str | None = None, total_files: int | None = None, + successful_files: int | None = None, + failed_files: int | None = None, attempts: int | None = None, execution_time: float | None = None, organization_id: str | None = None, @@ -276,6 +278,8 @@ def update_workflow_execution_status( status: New status (TaskStatus enum or string) error_message: Optional error message total_files: Optional total files count + successful_files: Optional count of files that completed successfully + failed_files: Optional count of files that errored attempts: Optional attempts count execution_time: Optional execution time organization_id: Optional organization ID override @@ -292,6 +296,10 @@ def update_workflow_execution_status( data["error_message"] = error_message if total_files is not None: data["total_files"] = total_files + if successful_files is not None: + data["successful_files"] = successful_files + if failed_files is not None: + data["failed_files"] = failed_files if attempts is not None: data["attempts"] = attempts if execution_time is not None: diff --git a/workers/shared/patterns/notification/helper.py b/workers/shared/patterns/notification/helper.py index 977f5a875f..19f78fbb99 100644 --- a/workers/shared/patterns/notification/helper.py +++ b/workers/shared/patterns/notification/helper.py @@ -5,6 +5,7 @@ """ import logging +from typing import Any from celery import current_app @@ -104,6 +105,7 @@ def trigger_notification( pipeline_id: str, pipeline_name: str, notification_payload: NotificationPayload, + execution_id: str | None = None, ) -> None: """Trigger notifications for pipeline status updates. @@ -111,10 +113,13 @@ def trigger_notification( Uses API client to fetch notification configuration. 
""" try: - # Fetch pipeline notifications via API + # Pass execution_id so the backend filter respects notify_on_failures + # (see trigger_pipeline_notifications for the rationale). + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( method="GET", endpoint=f"v1/webhook/pipeline/{pipeline_id}/notifications/", + params=params, timeout=10, ) @@ -176,10 +181,14 @@ def trigger_pipeline_notifications( return try: - # Fetch pipeline notifications via API + # Pass execution_id so the backend can drop notify_on_failures=True rows + # on success runs. Without it the endpoint is a no-op and we'd fire on + # every active row regardless of trigger preference. + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( method="GET", endpoint=f"v1/webhook/pipeline/{pipeline_id}/notifications/", + params=params, timeout=10, ) @@ -204,7 +213,9 @@ def trigger_pipeline_notifications( else: workflow_type = WorkflowType.ETL # Default fallback - # Create notification payload using dataclass + # File counts come from WorkflowExecution via the same endpoint so + # webhook receivers (Slack, raw API) see partial-success breakdowns. + counts = response_data.get("execution_counts") or {} payload = NotificationPayload.from_execution_status( pipeline_id=pipeline_id, pipeline_name=pipeline_name, @@ -213,6 +224,9 @@ def trigger_pipeline_notifications( source=NotificationSource.CALLBACK_WORKER, execution_id=execution_id, error_message=error_message, + total_files=counts.get("total_files", 0), + successful_files=counts.get("successful_files", 0), + failed_files=counts.get("failed_files", 0), ) logger.info( @@ -261,9 +275,14 @@ def trigger_api_notifications( return try: - # Fetch API notifications via API + # See trigger_pipeline_notifications: execution_id powers the backend + # filter that respects notify_on_failures. + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( - method="GET", endpoint=f"v1/webhook/api/{api_id}/notifications/", timeout=10 + method="GET", + endpoint=f"v1/webhook/api/{api_id}/notifications/", + params=params, + timeout=10, ) # _make_request already handles status codes and returns parsed data @@ -277,7 +296,7 @@ def trigger_api_notifications( logger.info(f"No active notifications found for API {api_id}") return - # Create notification payload using dataclass + counts = response_data.get("execution_counts") or {} payload = NotificationPayload.from_execution_status( pipeline_id=api_id, pipeline_name=api_name, @@ -286,6 +305,9 @@ def trigger_api_notifications( source=NotificationSource.CALLBACK_WORKER, execution_id=execution_id, error_message=error_message, + total_files=counts.get("total_files", 0), + successful_files=counts.get("successful_files", 0), + failed_files=counts.get("failed_files", 0), ) logger.info( @@ -314,7 +336,7 @@ def trigger_api_notifications( def handle_status_notifications( - api_client, + api_client: Any, pipeline_id: str, status: str, execution_id: str | None = None,