From 60d48162da9e6736ceedd7e038f08fe72c021f18 Mon Sep 17 00:00:00 2001 From: kirtimanmishrazipstack Date: Fri, 24 Apr 2026 16:25:56 +0530 Subject: [PATCH 1/7] api deployment notification init --- backend/api_v2/notification.py | 31 +- backend/notification_v2/enums.py | 12 + .../migrations/0002_notification_notify_on.py | 30 ++ backend/notification_v2/models.py | 13 +- backend/notification_v2/serializers.py | 7 +- backend/notification_v2/tests/__init__.py | 0 .../tests/test_notification_filter.py | 321 ++++++++++++++++++ backend/pipeline_v2/notification.py | 31 +- .../notification-modal/CreateNotification.jsx | 13 + 9 files changed, 448 insertions(+), 10 deletions(-) create mode 100644 backend/notification_v2/migrations/0002_notification_notify_on.py create mode 100644 backend/notification_v2/tests/__init__.py create mode 100644 backend/notification_v2/tests/test_notification_filter.py diff --git a/backend/api_v2/notification.py b/backend/api_v2/notification.py index 733084c637..a41ebc3000 100644 --- a/backend/api_v2/notification.py +++ b/backend/api_v2/notification.py @@ -1,8 +1,10 @@ import logging +from notification_v2.enums import NotificationTrigger from notification_v2.helper import NotificationHelper from notification_v2.models import Notification from pipeline_v2.dto import PipelineStatusPayload +from workflow_manager.workflow_v2.enums import ExecutionStatus from workflow_manager.workflow_v2.models.execution import WorkflowExecution from api_v2.models import APIDeployment @@ -16,11 +18,32 @@ def __init__(self, api: APIDeployment, workflow_execution: WorkflowExecution) -> self.api = api self.workflow_execution = workflow_execution - def send(self): - if not self.notifications.count(): - logger.info(f"No notifications found for api {self.api}") + def send(self) -> None: + # Partition notifications by the run outcome so each row's notify_on + # preference is honored. STOPPED and any other non-terminal status + # fire only for ALL — explicit opt-ins to FAILURES/SUCCESS shouldn't. + status = self.workflow_execution.status + if status == ExecutionStatus.ERROR.value: + self.notifications = self.notifications.exclude( + notify_on=NotificationTrigger.SUCCESS_ONLY.value + ) + elif status == ExecutionStatus.COMPLETED.value: + self.notifications = self.notifications.exclude( + notify_on=NotificationTrigger.FAILURES_ONLY.value + ) + else: + self.notifications = self.notifications.filter( + notify_on=NotificationTrigger.ALL.value + ) + + if not self.notifications.exists(): + logger.info( + "No notifications to dispatch for api %s (status=%s)", + self.api, + status, + ) return - logger.info(f"Sending api status notification for api {self.api}") + logger.info("Sending api status notification for api %s", self.api) payload_dto = PipelineStatusPayload( type="API", diff --git a/backend/notification_v2/enums.py b/backend/notification_v2/enums.py index 991b08cac9..516b34074b 100644 --- a/backend/notification_v2/enums.py +++ b/backend/notification_v2/enums.py @@ -36,3 +36,15 @@ class PlatformType(Enum): @classmethod def choices(cls): return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls] + + +class NotificationTrigger(Enum): + """Controls which run outcomes fire a notification.""" + + ALL = "ALL" + FAILURES_ONLY = "FAILURES_ONLY" + SUCCESS_ONLY = "SUCCESS_ONLY" + + @classmethod + def choices(cls): + return [(e.value, e.name.replace("_", " ").capitalize()) for e in cls] diff --git a/backend/notification_v2/migrations/0002_notification_notify_on.py b/backend/notification_v2/migrations/0002_notification_notify_on.py new file mode 100644 index 0000000000..53c1180126 --- /dev/null +++ b/backend/notification_v2/migrations/0002_notification_notify_on.py @@ -0,0 +1,30 @@ +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("notification_v2", "0001_initial"), + ] + + operations = [ + migrations.AddField( + model_name="notification", + name="notify_on", + field=models.CharField( + max_length=50, + choices=[ + ("ALL", "All"), + ("FAILURES_ONLY", "Failures only"), + ("SUCCESS_ONLY", "Success only"), + ], + default="ALL", + db_comment=( + "Controls which run outcomes trigger this notification. ALL " + "(default) preserves the historical 'notify on every " + "completion' behavior; FAILURES_ONLY fires only on failed " + "runs (ERROR for API deployments, FAILURE for ETL " + "pipelines); SUCCESS_ONLY fires only on successful runs." + ), + ), + ), + ] diff --git a/backend/notification_v2/models.py b/backend/notification_v2/models.py index 489a8c827e..4fe21cd6c4 100644 --- a/backend/notification_v2/models.py +++ b/backend/notification_v2/models.py @@ -5,7 +5,7 @@ from pipeline_v2.models import Pipeline from utils.models.base_model import BaseModel -from .enums import AuthorizationType, NotificationType, PlatformType +from .enums import AuthorizationType, NotificationTrigger, NotificationType, PlatformType NOTIFICATION_NAME_MAX_LENGTH = 255 @@ -47,6 +47,17 @@ class Notification(BaseModel): default=True, db_comment="Flag indicating whether the notification is active or not.", ) + notify_on = models.CharField( + max_length=50, + choices=NotificationTrigger.choices(), + default=NotificationTrigger.ALL.value, + db_comment=( + "Controls which run outcomes trigger this notification. ALL (default) " + "preserves the historical 'notify on every completion' behavior; " + "FAILURES_ONLY fires only on failed runs (ERROR for API deployments, " + "FAILURE for ETL pipelines); SUCCESS_ONLY fires only on successful runs." + ), + ) # Foreign keys to specific models pipeline = models.ForeignKey( Pipeline, diff --git a/backend/notification_v2/serializers.py b/backend/notification_v2/serializers.py index 115487c481..784ec75413 100644 --- a/backend/notification_v2/serializers.py +++ b/backend/notification_v2/serializers.py @@ -1,7 +1,7 @@ from rest_framework import serializers from utils.input_sanitizer import validate_name_field -from .enums import AuthorizationType, NotificationType, PlatformType +from .enums import AuthorizationType, NotificationTrigger, NotificationType, PlatformType from .models import Notification @@ -12,6 +12,11 @@ class NotificationSerializer(serializers.ModelSerializer): max_retries = serializers.IntegerField( max_value=4, min_value=0, default=0, required=False ) + notify_on = serializers.ChoiceField( + choices=NotificationTrigger.choices(), + default=NotificationTrigger.ALL.value, + required=False, + ) class Meta: model = Notification diff --git a/backend/notification_v2/tests/__init__.py b/backend/notification_v2/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/backend/notification_v2/tests/test_notification_filter.py b/backend/notification_v2/tests/test_notification_filter.py new file mode 100644 index 0000000000..7f887be500 --- /dev/null +++ b/backend/notification_v2/tests/test_notification_filter.py @@ -0,0 +1,321 @@ +"""Unit tests for the ``notify_on`` dispatch partition on notifications. + +Covers both dispatch paths that fan out to ``NotificationHelper.send_notification``: + +* ``APINotification.send`` — keyed on ``ExecutionStatus`` (ERROR, COMPLETED, STOPPED) +* ``PipelineNotification.send`` — keyed on ``Pipeline.PipelineStatus`` + (FAILURE, SUCCESS, INPROGRESS) + +Follows the repo convention (see ``usage_v2/tests/test_helper.py``) of stubbing +Django-heavy modules at import time so the tests run without a live DB. +""" + +from __future__ import annotations + +import sys +import types +from unittest.mock import MagicMock, patch + + +# --------------------------------------------------------------------------- +# Module-level stubs — must be installed BEFORE importing the modules under +# test so Django's ORM imports resolve to our MagicMock-backed fakes. +# --------------------------------------------------------------------------- + + +def _ensure_mod(name: str) -> types.ModuleType: + """Force-install a fresh stub module in ``sys.modules``.""" + mod = types.ModuleType(name) + sys.modules[name] = mod + return mod + + +def _install_stubs() -> None: + # Only stub leaf modules that pull in Django ORM. Parent packages + # (api_v2, pipeline_v2, notification_v2, workflow_manager*) load normally. + + exec_enums = _ensure_mod("workflow_manager.workflow_v2.enums") + + class _ExecStatusNS: + class ERROR: + value = "ERROR" + + class COMPLETED: + value = "COMPLETED" + + class STOPPED: + value = "STOPPED" + + exec_enums.ExecutionStatus = _ExecStatusNS # type: ignore[attr-defined] + + exec_models = _ensure_mod("workflow_manager.workflow_v2.models.execution") + exec_models.WorkflowExecution = MagicMock(name="WorkflowExecution") # type: ignore[attr-defined] + + api_models = _ensure_mod("api_v2.models") + api_models.APIDeployment = MagicMock(name="APIDeployment") # type: ignore[attr-defined] + + # notification_v2.models.Notification with a patchable ``objects``. + notif_models = _ensure_mod("notification_v2.models") + + class _FakeNotification: + objects = MagicMock(name="Notification.objects") + + notif_models.Notification = _FakeNotification # type: ignore[attr-defined] + + # notification_v2.helper.NotificationHelper + notif_helper = _ensure_mod("notification_v2.helper") + + class _FakeHelper: + send_notification = MagicMock(name="NotificationHelper.send_notification") + + notif_helper.NotificationHelper = _FakeHelper # type: ignore[attr-defined] + + # pipeline_v2.dto.PipelineStatusPayload + pipeline_dto = _ensure_mod("pipeline_v2.dto") + pipeline_dto.PipelineStatusPayload = MagicMock(name="PipelineStatusPayload") # type: ignore[attr-defined] + + # pipeline_v2.models.Pipeline with a PipelineStatus text-choices surface. + pipeline_models = _ensure_mod("pipeline_v2.models") + + class _PipelineStatus: + SUCCESS = "SUCCESS" + FAILURE = "FAILURE" + INPROGRESS = "INPROGRESS" + + class _FakePipeline: + PipelineStatus = _PipelineStatus + + pipeline_models.Pipeline = _FakePipeline # type: ignore[attr-defined] + + +_install_stubs() + + +# Now safe to import the modules under test. +from api_v2 import notification as api_notification_mod # noqa: E402 +from notification_v2.enums import NotificationTrigger # noqa: E402 +from notification_v2.helper import NotificationHelper # noqa: E402 +from notification_v2.models import Notification # noqa: E402 +from pipeline_v2 import notification as pipeline_notification_mod # noqa: E402 +from pipeline_v2.models import Pipeline # noqa: E402 + + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + + +def _make_queryset(notifications: list[MagicMock]) -> MagicMock: + """Return a MagicMock that mimics the chained QuerySet surface we use. + + Supports: + qs.filter(notify_on=) -> qs with matching rows + qs.exclude(notify_on=) -> qs with non-matching rows + qs.exists() -> bool based on contents + iter(qs) -> notifications + """ + qs = MagicMock(name="qs") + qs.__iter__ = lambda self: iter(notifications) + + def _filter(**kwargs): + if "notify_on" in kwargs: + target = kwargs["notify_on"] + kept = [n for n in notifications if n.notify_on == target] + return _make_queryset(kept) + return _make_queryset(notifications) + + def _exclude(**kwargs): + if "notify_on" in kwargs: + target = kwargs["notify_on"] + kept = [n for n in notifications if n.notify_on != target] + return _make_queryset(kept) + return _make_queryset(notifications) + + qs.filter.side_effect = _filter + qs.exclude.side_effect = _exclude + qs.exists.return_value = bool(notifications) + qs.count.return_value = len(notifications) + return qs + + +def _make_notification(*, notify_on: str) -> MagicMock: + n = MagicMock(name="Notification") + n.notify_on = notify_on + return n + + +# --------------------------------------------------------------------------- +# APINotification — 3 modes × 3 statuses +# --------------------------------------------------------------------------- + + +class TestAPINotificationFilter: + def _setup(self, *, status: str, notifications: list[MagicMock]): + Notification.objects.filter.reset_mock() + Notification.objects.filter.side_effect = None + Notification.objects.filter.return_value = _make_queryset(notifications) + NotificationHelper.send_notification.reset_mock() + + api = MagicMock(name="APIDeployment") + api.api_name = "test-api" + api.id = "api-uuid" + + execution = MagicMock(name="WorkflowExecution") + execution.status = status + execution.id = "exec-uuid" + execution.error_message = "boom" if status == "ERROR" else None + + return api_notification_mod.APINotification(api=api, workflow_execution=execution) + + # --- ALL: fires on every status --- + def test_all_fires_on_completed(self): + n = _make_notification(notify_on=NotificationTrigger.ALL.value) + self._setup(status="COMPLETED", notifications=[n]).send() + assert NotificationHelper.send_notification.call_count == 1 + + def test_all_fires_on_error(self): + n = _make_notification(notify_on=NotificationTrigger.ALL.value) + self._setup(status="ERROR", notifications=[n]).send() + assert NotificationHelper.send_notification.call_count == 1 + + def test_all_fires_on_stopped(self): + n = _make_notification(notify_on=NotificationTrigger.ALL.value) + self._setup(status="STOPPED", notifications=[n]).send() + assert NotificationHelper.send_notification.call_count == 1 + + # --- FAILURES_ONLY: fires on ERROR only --- + def test_failures_only_suppressed_on_completed(self): + n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) + self._setup(status="COMPLETED", notifications=[n]).send() + NotificationHelper.send_notification.assert_not_called() + + def test_failures_only_fires_on_error(self): + n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) + self._setup(status="ERROR", notifications=[n]).send() + assert NotificationHelper.send_notification.call_count == 1 + + def test_failures_only_suppressed_on_stopped(self): + n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) + self._setup(status="STOPPED", notifications=[n]).send() + NotificationHelper.send_notification.assert_not_called() + + # --- SUCCESS_ONLY: fires on COMPLETED only --- + def test_success_only_fires_on_completed(self): + n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) + self._setup(status="COMPLETED", notifications=[n]).send() + assert NotificationHelper.send_notification.call_count == 1 + + def test_success_only_suppressed_on_error(self): + n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) + self._setup(status="ERROR", notifications=[n]).send() + NotificationHelper.send_notification.assert_not_called() + + def test_success_only_suppressed_on_stopped(self): + n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) + self._setup(status="STOPPED", notifications=[n]).send() + NotificationHelper.send_notification.assert_not_called() + + # --- Mixed partition on a COMPLETED run: ALL + SUCCESS_ONLY fire, FAILURES_ONLY doesn't --- + def test_mixed_partition_on_completed(self): + all_mode = _make_notification(notify_on=NotificationTrigger.ALL.value) + failures_only = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) + success_only = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) + notifier = self._setup( + status="COMPLETED", notifications=[all_mode, failures_only, success_only] + ) + with patch.object(api_notification_mod, "PipelineStatusPayload") as payload_cls: + payload_cls.return_value.to_dict.return_value = {} + notifier.send() + + assert NotificationHelper.send_notification.call_count == 1 + kwargs = NotificationHelper.send_notification.call_args.kwargs + dispatched = sorted(n.notify_on for n in kwargs["notifications"]) + assert dispatched == ["ALL", "SUCCESS_ONLY"] + + +# --------------------------------------------------------------------------- +# PipelineNotification — 3 modes × 3 statuses +# --------------------------------------------------------------------------- + + +class TestPipelineNotificationFilter: + def _setup(self, *, last_run_status: str, notifications: list[MagicMock]): + Notification.objects.filter.reset_mock() + Notification.objects.filter.side_effect = None + Notification.objects.filter.return_value = _make_queryset(notifications) + NotificationHelper.send_notification.reset_mock() + + pipeline = MagicMock(name="Pipeline") + pipeline.id = "pipeline-uuid" + pipeline.pipeline_name = "test-pipeline" + pipeline.pipeline_type = "ETL" + pipeline.last_run_status = last_run_status + + return pipeline_notification_mod.PipelineNotification( + pipeline=pipeline, execution_id="exec-uuid", error_message=None + ) + + # --- ALL --- + def test_all_fires_on_success(self): + n = _make_notification(notify_on=NotificationTrigger.ALL.value) + self._setup( + last_run_status=Pipeline.PipelineStatus.SUCCESS, notifications=[n] + ).send() + assert NotificationHelper.send_notification.call_count == 1 + + def test_all_fires_on_failure(self): + n = _make_notification(notify_on=NotificationTrigger.ALL.value) + self._setup( + last_run_status=Pipeline.PipelineStatus.FAILURE, notifications=[n] + ).send() + assert NotificationHelper.send_notification.call_count == 1 + + # --- FAILURES_ONLY --- + def test_failures_only_suppressed_on_success(self): + n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) + self._setup( + last_run_status=Pipeline.PipelineStatus.SUCCESS, notifications=[n] + ).send() + NotificationHelper.send_notification.assert_not_called() + + def test_failures_only_fires_on_failure(self): + n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) + self._setup( + last_run_status=Pipeline.PipelineStatus.FAILURE, notifications=[n] + ).send() + assert NotificationHelper.send_notification.call_count == 1 + + # --- SUCCESS_ONLY --- + def test_success_only_fires_on_success(self): + n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) + self._setup( + last_run_status=Pipeline.PipelineStatus.SUCCESS, notifications=[n] + ).send() + assert NotificationHelper.send_notification.call_count == 1 + + def test_success_only_suppressed_on_failure(self): + n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) + self._setup( + last_run_status=Pipeline.PipelineStatus.FAILURE, notifications=[n] + ).send() + NotificationHelper.send_notification.assert_not_called() + + # --- Mixed partition on a SUCCESS run --- + def test_mixed_partition_on_success(self): + all_mode = _make_notification(notify_on=NotificationTrigger.ALL.value) + failures_only = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) + success_only = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) + notifier = self._setup( + last_run_status=Pipeline.PipelineStatus.SUCCESS, + notifications=[all_mode, failures_only, success_only], + ) + with patch.object( + pipeline_notification_mod, "PipelineStatusPayload" + ) as payload_cls: + payload_cls.return_value.to_dict.return_value = {} + notifier.send() + + assert NotificationHelper.send_notification.call_count == 1 + kwargs = NotificationHelper.send_notification.call_args.kwargs + dispatched = sorted(n.notify_on for n in kwargs["notifications"]) + assert dispatched == ["ALL", "SUCCESS_ONLY"] diff --git a/backend/pipeline_v2/notification.py b/backend/pipeline_v2/notification.py index dbfc0dea52..64df6f1126 100644 --- a/backend/pipeline_v2/notification.py +++ b/backend/pipeline_v2/notification.py @@ -1,5 +1,6 @@ import logging +from notification_v2.enums import NotificationTrigger from notification_v2.helper import NotificationHelper from notification_v2.models import Notification @@ -23,11 +24,33 @@ def __init__( self.error_message = error_message self.execution_id = execution_id - def send(self): - if not self.notifications.count(): - logger.info(f"No notifications found for pipeline {self.pipeline}") + def send(self) -> None: + # Partition notifications by the run outcome so each row's notify_on + # preference is honored. PipelineUtils.update_pipeline_status collapses + # both ERROR and STOPPED execution statuses into PipelineStatus.FAILURE, + # so FAILURES_ONLY subscribers get alerts for both on the pipeline side. + status = self.pipeline.last_run_status + if status == Pipeline.PipelineStatus.FAILURE: + self.notifications = self.notifications.exclude( + notify_on=NotificationTrigger.SUCCESS_ONLY.value + ) + elif status == Pipeline.PipelineStatus.SUCCESS: + self.notifications = self.notifications.exclude( + notify_on=NotificationTrigger.FAILURES_ONLY.value + ) + else: + self.notifications = self.notifications.filter( + notify_on=NotificationTrigger.ALL.value + ) + + if not self.notifications.exists(): + logger.info( + "No notifications to dispatch for pipeline %s (status=%s)", + self.pipeline, + status, + ) return - logger.info(f"Sending pipeline status notification for pipeline {self.pipeline}") + logger.info("Sending pipeline status notification for pipeline %s", self.pipeline) payload_dto = PipelineStatusPayload( type=self.pipeline.pipeline_type, pipeline_id=str(self.pipeline.id), diff --git a/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx b/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx index 9c207bd1a9..9302f9f1dc 100644 --- a/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx +++ b/frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx @@ -12,6 +12,7 @@ const DEFAULT_FORM_DETAILS = { authorization_key: "", is_active: false, max_retries: 0, + notify_on: "ALL", pipeline: "", api: "", url: "", @@ -54,6 +55,12 @@ const AUTHORIZATION_TYPES = [ }, ]; +const NOTIFY_ON_OPTIONS = [ + { value: "ALL", label: "On every completion" }, + { value: "FAILURES_ONLY", label: "On failures only" }, + { value: "SUCCESS_ONLY", label: "On success only" }, +]; + function CreateNotification({ setIsForm, type, @@ -192,6 +199,12 @@ function CreateNotification({ tooltip: "Specify the maximum number of times the notification should be retried if it fails.", }, + { + label: "Notify on", + name: "notify_on", + component: , - tooltip: "Choose which run outcomes should trigger this webhook.", - }, ]; return ( @@ -233,6 +221,13 @@ function CreateNotification({ ), )} + + Notify on failures only + diff --git a/unstract/core/src/unstract/core/data_models.py b/unstract/core/src/unstract/core/data_models.py index 7e8e984a04..82da16a72e 100644 --- a/unstract/core/src/unstract/core/data_models.py +++ b/unstract/core/src/unstract/core/data_models.py @@ -510,6 +510,12 @@ class NotificationPayload: error_message: str | None = None organization_id: str | None = None + # Per-run file aggregates surfaced into webhook payloads. + # Default 0 lets receivers switch on a numeric value without None-checks. + total_files: int = 0 + successful_files: int = 0 + failed_files: int = 0 + # Metadata timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) additional_data: dict[str, Any] = field(default_factory=dict) @@ -565,6 +571,9 @@ def from_execution_status( error_message: str | None = None, organization_id: str | None = None, additional_data: dict[str, Any] | None = None, + total_files: int = 0, + successful_files: int = 0, + failed_files: int = 0, ) -> "NotificationPayload": """Create notification payload from execution status. @@ -607,6 +616,9 @@ def from_execution_status( execution_id=execution_id, error_message=error_message, organization_id=organization_id, + total_files=total_files, + successful_files=successful_files, + failed_files=failed_files, additional_data=additional_data or {}, _source=source, ) diff --git a/workers/callback/tasks.py b/workers/callback/tasks.py index 42599f0659..eee5b97da3 100644 --- a/workers/callback/tasks.py +++ b/workers/callback/tasks.py @@ -381,12 +381,16 @@ def _update_execution_status_unified( try: # Consistent workflow execution status update across all callback types total_files = aggregated_results.get("total_files", 0) + successful_files = aggregated_results.get("successful_files", 0) + failed_files = aggregated_results.get("failed_files", 0) # Make the unified API call api_client.update_workflow_execution_status( execution_id=execution_id, status=final_status, total_files=total_files, + successful_files=successful_files, + failed_files=failed_files, organization_id=organization_id, error_message=error_message, ) diff --git a/workers/scheduler/tasks.py b/workers/scheduler/tasks.py index 2e22946a58..65e5bd7af1 100644 --- a/workers/scheduler/tasks.py +++ b/workers/scheduler/tasks.py @@ -81,6 +81,7 @@ def _send_pipeline_status_notification( pipeline_id=pipeline_id, pipeline_name=pipeline_name, notification_payload=notification, + execution_id=execution_id, ) logger.info(f"Notification sent successfully for {pipeline_type} {pipeline_id}") except Exception as notification_error: diff --git a/workers/shared/api/internal_client.py b/workers/shared/api/internal_client.py index def90bd86a..4964a2b377 100644 --- a/workers/shared/api/internal_client.py +++ b/workers/shared/api/internal_client.py @@ -603,6 +603,8 @@ def update_workflow_execution_status( status: str, error_message: str | None = None, total_files: int | None = None, + successful_files: int | None = None, + failed_files: int | None = None, attempts: int | None = None, execution_time: float | None = None, organization_id: str | None = None, @@ -613,6 +615,8 @@ def update_workflow_execution_status( status, error_message, total_files, + successful_files, + failed_files, attempts, execution_time, organization_id, diff --git a/workers/shared/clients/execution_client.py b/workers/shared/clients/execution_client.py index 80e9ca6568..e1373eb9f5 100644 --- a/workers/shared/clients/execution_client.py +++ b/workers/shared/clients/execution_client.py @@ -265,6 +265,8 @@ def update_workflow_execution_status( status: str | TaskStatus, error_message: str | None = None, total_files: int | None = None, + successful_files: int | None = None, + failed_files: int | None = None, attempts: int | None = None, execution_time: float | None = None, organization_id: str | None = None, @@ -276,6 +278,8 @@ def update_workflow_execution_status( status: New status (TaskStatus enum or string) error_message: Optional error message total_files: Optional total files count + successful_files: Optional count of files that completed successfully + failed_files: Optional count of files that errored attempts: Optional attempts count execution_time: Optional execution time organization_id: Optional organization ID override @@ -292,6 +296,10 @@ def update_workflow_execution_status( data["error_message"] = error_message if total_files is not None: data["total_files"] = total_files + if successful_files is not None: + data["successful_files"] = successful_files + if failed_files is not None: + data["failed_files"] = failed_files if attempts is not None: data["attempts"] = attempts if execution_time is not None: diff --git a/workers/shared/patterns/notification/helper.py b/workers/shared/patterns/notification/helper.py index 977f5a875f..c4f4c5d518 100644 --- a/workers/shared/patterns/notification/helper.py +++ b/workers/shared/patterns/notification/helper.py @@ -104,6 +104,7 @@ def trigger_notification( pipeline_id: str, pipeline_name: str, notification_payload: NotificationPayload, + execution_id: str | None = None, ) -> None: """Trigger notifications for pipeline status updates. @@ -111,10 +112,13 @@ def trigger_notification( Uses API client to fetch notification configuration. """ try: - # Fetch pipeline notifications via API + # Pass execution_id so the backend filter respects notify_on_failures + # (see trigger_pipeline_notifications for the rationale). + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( method="GET", endpoint=f"v1/webhook/pipeline/{pipeline_id}/notifications/", + params=params, timeout=10, ) @@ -176,10 +180,14 @@ def trigger_pipeline_notifications( return try: - # Fetch pipeline notifications via API + # Pass execution_id so the backend can drop notify_on_failures=True rows + # on success runs. Without it the endpoint is a no-op and we'd fire on + # every active row regardless of trigger preference. + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( method="GET", endpoint=f"v1/webhook/pipeline/{pipeline_id}/notifications/", + params=params, timeout=10, ) @@ -204,7 +212,9 @@ def trigger_pipeline_notifications( else: workflow_type = WorkflowType.ETL # Default fallback - # Create notification payload using dataclass + # File counts come from WorkflowExecution via the same endpoint so + # webhook receivers (Slack, raw API) see partial-success breakdowns. + counts = response_data.get("execution_counts") or {} payload = NotificationPayload.from_execution_status( pipeline_id=pipeline_id, pipeline_name=pipeline_name, @@ -213,6 +223,9 @@ def trigger_pipeline_notifications( source=NotificationSource.CALLBACK_WORKER, execution_id=execution_id, error_message=error_message, + total_files=counts.get("total_files", 0), + successful_files=counts.get("successful_files", 0), + failed_files=counts.get("failed_files", 0), ) logger.info( @@ -261,9 +274,14 @@ def trigger_api_notifications( return try: - # Fetch API notifications via API + # See trigger_pipeline_notifications: execution_id powers the backend + # filter that respects notify_on_failures. + params = {"execution_id": execution_id} if execution_id else None response_data = api_client._make_request( - method="GET", endpoint=f"v1/webhook/api/{api_id}/notifications/", timeout=10 + method="GET", + endpoint=f"v1/webhook/api/{api_id}/notifications/", + params=params, + timeout=10, ) # _make_request already handles status codes and returns parsed data @@ -277,7 +295,7 @@ def trigger_api_notifications( logger.info(f"No active notifications found for API {api_id}") return - # Create notification payload using dataclass + counts = response_data.get("execution_counts") or {} payload = NotificationPayload.from_execution_status( pipeline_id=api_id, pipeline_name=api_name, @@ -286,6 +304,9 @@ def trigger_api_notifications( source=NotificationSource.CALLBACK_WORKER, execution_id=execution_id, error_message=error_message, + total_files=counts.get("total_files", 0), + successful_files=counts.get("successful_files", 0), + failed_files=counts.get("failed_files", 0), ) logger.info( From 9cd8eb1984891231e9cb1e673c0225be24534a55 Mon Sep 17 00:00:00 2001 From: kirtimanmishrazipstack Date: Tue, 5 May 2026 22:39:48 +0530 Subject: [PATCH 7/7] slack webhook payload --- .../tests/test_notification_filter.py | 321 ------------------ backend/pipeline_v2/dto.py | 14 +- .../core/src/unstract/core/data_models.py | 24 +- .../notification/providers/slack_webhook.py | 9 +- .../shared/patterns/notification/helper.py | 3 +- 5 files changed, 30 insertions(+), 341 deletions(-) delete mode 100644 backend/notification_v2/tests/test_notification_filter.py diff --git a/backend/notification_v2/tests/test_notification_filter.py b/backend/notification_v2/tests/test_notification_filter.py deleted file mode 100644 index 7f887be500..0000000000 --- a/backend/notification_v2/tests/test_notification_filter.py +++ /dev/null @@ -1,321 +0,0 @@ -"""Unit tests for the ``notify_on`` dispatch partition on notifications. - -Covers both dispatch paths that fan out to ``NotificationHelper.send_notification``: - -* ``APINotification.send`` — keyed on ``ExecutionStatus`` (ERROR, COMPLETED, STOPPED) -* ``PipelineNotification.send`` — keyed on ``Pipeline.PipelineStatus`` - (FAILURE, SUCCESS, INPROGRESS) - -Follows the repo convention (see ``usage_v2/tests/test_helper.py``) of stubbing -Django-heavy modules at import time so the tests run without a live DB. -""" - -from __future__ import annotations - -import sys -import types -from unittest.mock import MagicMock, patch - - -# --------------------------------------------------------------------------- -# Module-level stubs — must be installed BEFORE importing the modules under -# test so Django's ORM imports resolve to our MagicMock-backed fakes. -# --------------------------------------------------------------------------- - - -def _ensure_mod(name: str) -> types.ModuleType: - """Force-install a fresh stub module in ``sys.modules``.""" - mod = types.ModuleType(name) - sys.modules[name] = mod - return mod - - -def _install_stubs() -> None: - # Only stub leaf modules that pull in Django ORM. Parent packages - # (api_v2, pipeline_v2, notification_v2, workflow_manager*) load normally. - - exec_enums = _ensure_mod("workflow_manager.workflow_v2.enums") - - class _ExecStatusNS: - class ERROR: - value = "ERROR" - - class COMPLETED: - value = "COMPLETED" - - class STOPPED: - value = "STOPPED" - - exec_enums.ExecutionStatus = _ExecStatusNS # type: ignore[attr-defined] - - exec_models = _ensure_mod("workflow_manager.workflow_v2.models.execution") - exec_models.WorkflowExecution = MagicMock(name="WorkflowExecution") # type: ignore[attr-defined] - - api_models = _ensure_mod("api_v2.models") - api_models.APIDeployment = MagicMock(name="APIDeployment") # type: ignore[attr-defined] - - # notification_v2.models.Notification with a patchable ``objects``. - notif_models = _ensure_mod("notification_v2.models") - - class _FakeNotification: - objects = MagicMock(name="Notification.objects") - - notif_models.Notification = _FakeNotification # type: ignore[attr-defined] - - # notification_v2.helper.NotificationHelper - notif_helper = _ensure_mod("notification_v2.helper") - - class _FakeHelper: - send_notification = MagicMock(name="NotificationHelper.send_notification") - - notif_helper.NotificationHelper = _FakeHelper # type: ignore[attr-defined] - - # pipeline_v2.dto.PipelineStatusPayload - pipeline_dto = _ensure_mod("pipeline_v2.dto") - pipeline_dto.PipelineStatusPayload = MagicMock(name="PipelineStatusPayload") # type: ignore[attr-defined] - - # pipeline_v2.models.Pipeline with a PipelineStatus text-choices surface. - pipeline_models = _ensure_mod("pipeline_v2.models") - - class _PipelineStatus: - SUCCESS = "SUCCESS" - FAILURE = "FAILURE" - INPROGRESS = "INPROGRESS" - - class _FakePipeline: - PipelineStatus = _PipelineStatus - - pipeline_models.Pipeline = _FakePipeline # type: ignore[attr-defined] - - -_install_stubs() - - -# Now safe to import the modules under test. -from api_v2 import notification as api_notification_mod # noqa: E402 -from notification_v2.enums import NotificationTrigger # noqa: E402 -from notification_v2.helper import NotificationHelper # noqa: E402 -from notification_v2.models import Notification # noqa: E402 -from pipeline_v2 import notification as pipeline_notification_mod # noqa: E402 -from pipeline_v2.models import Pipeline # noqa: E402 - - -# --------------------------------------------------------------------------- -# Test helpers -# --------------------------------------------------------------------------- - - -def _make_queryset(notifications: list[MagicMock]) -> MagicMock: - """Return a MagicMock that mimics the chained QuerySet surface we use. - - Supports: - qs.filter(notify_on=) -> qs with matching rows - qs.exclude(notify_on=) -> qs with non-matching rows - qs.exists() -> bool based on contents - iter(qs) -> notifications - """ - qs = MagicMock(name="qs") - qs.__iter__ = lambda self: iter(notifications) - - def _filter(**kwargs): - if "notify_on" in kwargs: - target = kwargs["notify_on"] - kept = [n for n in notifications if n.notify_on == target] - return _make_queryset(kept) - return _make_queryset(notifications) - - def _exclude(**kwargs): - if "notify_on" in kwargs: - target = kwargs["notify_on"] - kept = [n for n in notifications if n.notify_on != target] - return _make_queryset(kept) - return _make_queryset(notifications) - - qs.filter.side_effect = _filter - qs.exclude.side_effect = _exclude - qs.exists.return_value = bool(notifications) - qs.count.return_value = len(notifications) - return qs - - -def _make_notification(*, notify_on: str) -> MagicMock: - n = MagicMock(name="Notification") - n.notify_on = notify_on - return n - - -# --------------------------------------------------------------------------- -# APINotification — 3 modes × 3 statuses -# --------------------------------------------------------------------------- - - -class TestAPINotificationFilter: - def _setup(self, *, status: str, notifications: list[MagicMock]): - Notification.objects.filter.reset_mock() - Notification.objects.filter.side_effect = None - Notification.objects.filter.return_value = _make_queryset(notifications) - NotificationHelper.send_notification.reset_mock() - - api = MagicMock(name="APIDeployment") - api.api_name = "test-api" - api.id = "api-uuid" - - execution = MagicMock(name="WorkflowExecution") - execution.status = status - execution.id = "exec-uuid" - execution.error_message = "boom" if status == "ERROR" else None - - return api_notification_mod.APINotification(api=api, workflow_execution=execution) - - # --- ALL: fires on every status --- - def test_all_fires_on_completed(self): - n = _make_notification(notify_on=NotificationTrigger.ALL.value) - self._setup(status="COMPLETED", notifications=[n]).send() - assert NotificationHelper.send_notification.call_count == 1 - - def test_all_fires_on_error(self): - n = _make_notification(notify_on=NotificationTrigger.ALL.value) - self._setup(status="ERROR", notifications=[n]).send() - assert NotificationHelper.send_notification.call_count == 1 - - def test_all_fires_on_stopped(self): - n = _make_notification(notify_on=NotificationTrigger.ALL.value) - self._setup(status="STOPPED", notifications=[n]).send() - assert NotificationHelper.send_notification.call_count == 1 - - # --- FAILURES_ONLY: fires on ERROR only --- - def test_failures_only_suppressed_on_completed(self): - n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) - self._setup(status="COMPLETED", notifications=[n]).send() - NotificationHelper.send_notification.assert_not_called() - - def test_failures_only_fires_on_error(self): - n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) - self._setup(status="ERROR", notifications=[n]).send() - assert NotificationHelper.send_notification.call_count == 1 - - def test_failures_only_suppressed_on_stopped(self): - n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) - self._setup(status="STOPPED", notifications=[n]).send() - NotificationHelper.send_notification.assert_not_called() - - # --- SUCCESS_ONLY: fires on COMPLETED only --- - def test_success_only_fires_on_completed(self): - n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) - self._setup(status="COMPLETED", notifications=[n]).send() - assert NotificationHelper.send_notification.call_count == 1 - - def test_success_only_suppressed_on_error(self): - n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) - self._setup(status="ERROR", notifications=[n]).send() - NotificationHelper.send_notification.assert_not_called() - - def test_success_only_suppressed_on_stopped(self): - n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) - self._setup(status="STOPPED", notifications=[n]).send() - NotificationHelper.send_notification.assert_not_called() - - # --- Mixed partition on a COMPLETED run: ALL + SUCCESS_ONLY fire, FAILURES_ONLY doesn't --- - def test_mixed_partition_on_completed(self): - all_mode = _make_notification(notify_on=NotificationTrigger.ALL.value) - failures_only = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) - success_only = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) - notifier = self._setup( - status="COMPLETED", notifications=[all_mode, failures_only, success_only] - ) - with patch.object(api_notification_mod, "PipelineStatusPayload") as payload_cls: - payload_cls.return_value.to_dict.return_value = {} - notifier.send() - - assert NotificationHelper.send_notification.call_count == 1 - kwargs = NotificationHelper.send_notification.call_args.kwargs - dispatched = sorted(n.notify_on for n in kwargs["notifications"]) - assert dispatched == ["ALL", "SUCCESS_ONLY"] - - -# --------------------------------------------------------------------------- -# PipelineNotification — 3 modes × 3 statuses -# --------------------------------------------------------------------------- - - -class TestPipelineNotificationFilter: - def _setup(self, *, last_run_status: str, notifications: list[MagicMock]): - Notification.objects.filter.reset_mock() - Notification.objects.filter.side_effect = None - Notification.objects.filter.return_value = _make_queryset(notifications) - NotificationHelper.send_notification.reset_mock() - - pipeline = MagicMock(name="Pipeline") - pipeline.id = "pipeline-uuid" - pipeline.pipeline_name = "test-pipeline" - pipeline.pipeline_type = "ETL" - pipeline.last_run_status = last_run_status - - return pipeline_notification_mod.PipelineNotification( - pipeline=pipeline, execution_id="exec-uuid", error_message=None - ) - - # --- ALL --- - def test_all_fires_on_success(self): - n = _make_notification(notify_on=NotificationTrigger.ALL.value) - self._setup( - last_run_status=Pipeline.PipelineStatus.SUCCESS, notifications=[n] - ).send() - assert NotificationHelper.send_notification.call_count == 1 - - def test_all_fires_on_failure(self): - n = _make_notification(notify_on=NotificationTrigger.ALL.value) - self._setup( - last_run_status=Pipeline.PipelineStatus.FAILURE, notifications=[n] - ).send() - assert NotificationHelper.send_notification.call_count == 1 - - # --- FAILURES_ONLY --- - def test_failures_only_suppressed_on_success(self): - n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) - self._setup( - last_run_status=Pipeline.PipelineStatus.SUCCESS, notifications=[n] - ).send() - NotificationHelper.send_notification.assert_not_called() - - def test_failures_only_fires_on_failure(self): - n = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) - self._setup( - last_run_status=Pipeline.PipelineStatus.FAILURE, notifications=[n] - ).send() - assert NotificationHelper.send_notification.call_count == 1 - - # --- SUCCESS_ONLY --- - def test_success_only_fires_on_success(self): - n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) - self._setup( - last_run_status=Pipeline.PipelineStatus.SUCCESS, notifications=[n] - ).send() - assert NotificationHelper.send_notification.call_count == 1 - - def test_success_only_suppressed_on_failure(self): - n = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) - self._setup( - last_run_status=Pipeline.PipelineStatus.FAILURE, notifications=[n] - ).send() - NotificationHelper.send_notification.assert_not_called() - - # --- Mixed partition on a SUCCESS run --- - def test_mixed_partition_on_success(self): - all_mode = _make_notification(notify_on=NotificationTrigger.ALL.value) - failures_only = _make_notification(notify_on=NotificationTrigger.FAILURES_ONLY.value) - success_only = _make_notification(notify_on=NotificationTrigger.SUCCESS_ONLY.value) - notifier = self._setup( - last_run_status=Pipeline.PipelineStatus.SUCCESS, - notifications=[all_mode, failures_only, success_only], - ) - with patch.object( - pipeline_notification_mod, "PipelineStatusPayload" - ) as payload_cls: - payload_cls.return_value.to_dict.return_value = {} - notifier.send() - - assert NotificationHelper.send_notification.call_count == 1 - kwargs = NotificationHelper.send_notification.call_args.kwargs - dispatched = sorted(n.notify_on for n in kwargs["notifications"]) - assert dispatched == ["ALL", "SUCCESS_ONLY"] diff --git a/backend/pipeline_v2/dto.py b/backend/pipeline_v2/dto.py index b8ad7c8707..d0f27e7943 100644 --- a/backend/pipeline_v2/dto.py +++ b/backend/pipeline_v2/dto.py @@ -25,15 +25,21 @@ def __init__( self.failed_files = failed_files def to_dict(self) -> dict[str, Any]: - """Convert the payload DTO to a dictionary.""" + """Convert the payload DTO to a dictionary. + + File counts are nested in `additional_data` to match the worker-path + payload shape (NotificationPayload.from_execution_status). + """ payload: dict[str, Any] = { "type": self.type, "pipeline_id": str(self.pipeline_id), "pipeline_name": self.pipeline_name, "status": self.status, - "total_files": self.total_files or 0, - "successful_files": self.successful_files or 0, - "failed_files": self.failed_files or 0, + "additional_data": { + "total_files": self.total_files or 0, + "successful_files": self.successful_files or 0, + "failed_files": self.failed_files or 0, + }, } if self.execution_id: payload["execution_id"] = str(self.execution_id) diff --git a/unstract/core/src/unstract/core/data_models.py b/unstract/core/src/unstract/core/data_models.py index 82da16a72e..7642aa9ce1 100644 --- a/unstract/core/src/unstract/core/data_models.py +++ b/unstract/core/src/unstract/core/data_models.py @@ -510,14 +510,10 @@ class NotificationPayload: error_message: str | None = None organization_id: str | None = None - # Per-run file aggregates surfaced into webhook payloads. - # Default 0 lets receivers switch on a numeric value without None-checks. - total_files: int = 0 - successful_files: int = 0 - failed_files: int = 0 - # Metadata timestamp: datetime = field(default_factory=lambda: datetime.now(UTC)) + # Per-run file aggregates (total/successful/failed) are nested here + # so receivers see them grouped rather than as top-level keys. additional_data: dict[str, Any] = field(default_factory=dict) # Internal tracking (not sent to external webhooks) @@ -608,6 +604,17 @@ def from_execution_status( f"Cannot create notification for non-final status: {execution_status}" ) + # File counts are bundled inside additional_data so receivers see + # them grouped (e.g. Slack renders one "Additional Data" section). + # Caller-supplied additional_data takes precedence on key conflict. + merged_additional = { + "total_files": total_files, + "successful_files": successful_files, + "failed_files": failed_files, + } + if additional_data: + merged_additional.update(additional_data) + return cls( type=workflow_type, pipeline_id=pipeline_id, @@ -616,10 +623,7 @@ def from_execution_status( execution_id=execution_id, error_message=error_message, organization_id=organization_id, - total_files=total_files, - successful_files=successful_files, - failed_files=failed_files, - additional_data=additional_data or {}, + additional_data=merged_additional, _source=source, ) diff --git a/workers/notification/providers/slack_webhook.py b/workers/notification/providers/slack_webhook.py index 89206646a5..04e3532fa3 100644 --- a/workers/notification/providers/slack_webhook.py +++ b/workers/notification/providers/slack_webhook.py @@ -199,11 +199,10 @@ def _format_value(self, value: Any) -> str: elif isinstance(value, (list, tuple)): return "\n• " + "\n• ".join(str(item) for item in value) elif isinstance(value, dict): - # Format nested dictionary - items = [] - for k, v in value.items(): - items.append(f" • {self._format_key(k)}: {v}") - return "\n" + "\n".join(items) + # Inline {Key: Value, Key: Value} so the receiver sees the + # whole dict on one line instead of a bulleted block. + items = [f"{self._format_key(k)}: {v}" for k, v in value.items()] + return "{" + ", ".join(items) + "}" elif value is None: return "_Not specified_" else: diff --git a/workers/shared/patterns/notification/helper.py b/workers/shared/patterns/notification/helper.py index c4f4c5d518..19f78fbb99 100644 --- a/workers/shared/patterns/notification/helper.py +++ b/workers/shared/patterns/notification/helper.py @@ -5,6 +5,7 @@ """ import logging +from typing import Any from celery import current_app @@ -335,7 +336,7 @@ def trigger_api_notifications( def handle_status_notifications( - api_client, + api_client: Any, pipeline_id: str, status: str, execution_id: str | None = None,