Skip to content
Draft
37 changes: 33 additions & 4 deletions backend/api_v2/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,50 @@
from notification_v2.helper import NotificationHelper
from notification_v2.models import Notification
from pipeline_v2.dto import PipelineStatusPayload
from workflow_manager.workflow_v2.enums import ExecutionStatus
from workflow_manager.workflow_v2.models.execution import WorkflowExecution

from api_v2.models import APIDeployment

logger = logging.getLogger(__name__)


_FAILURE_STATUSES = {ExecutionStatus.ERROR.value, ExecutionStatus.STOPPED.value}


class APINotification:
def __init__(self, api: APIDeployment, workflow_execution: WorkflowExecution) -> None:
self.notifications = Notification.objects.filter(api=api, is_active=True)
self.api = api
self.workflow_execution = workflow_execution

def send(self):
if not self.notifications.count():
logger.info(f"No notifications found for api {self.api}")
def send(self) -> None:
# Failure if the run hit a non-success terminal state OR any file errored.
# Partial-success runs land as status=COMPLETED with failed_files>0, so the
# status check alone misses them — see callback aggregation rules.
failed_files = self.workflow_execution.failed_files or 0
is_failure = (
self.workflow_execution.status in _FAILURE_STATUSES or failed_files > 0
)
if not is_failure:
# Success path: skip rows that opted into failure-only alerts.
self.notifications = self.notifications.filter(notify_on_failures=False)

if not self.notifications.exists():
logger.info(
"No notifications to dispatch for api %s (status=%s, failed_files=%s)",
self.api,
self.workflow_execution.status,
failed_files,
)
return
logger.info(f"Sending api status notification for api {self.api}")
logger.info(
"Sending api status notification for api %s (status=%s, successful=%s, failed=%s)",
self.api,
self.workflow_execution.status,
self.workflow_execution.successful_files or 0,
failed_files,
)

payload_dto = PipelineStatusPayload(
type="API",
Expand All @@ -29,6 +55,9 @@ def send(self):
status=self.workflow_execution.status,
execution_id=self.workflow_execution.id,
error_message=self.workflow_execution.error_message,
total_files=self.workflow_execution.total_files,
successful_files=self.workflow_execution.successful_files,
failed_files=failed_files,
)

NotificationHelper.send_notification(
Expand Down
195 changes: 108 additions & 87 deletions backend/notification_v2/internal_api_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,18 @@
"""

import logging
from typing import Any, cast

from api_v2.models import APIDeployment
from django.http import JsonResponse
from django.db.models import QuerySet
from django.http import HttpRequest, JsonResponse
from django.shortcuts import get_object_or_404
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from pipeline_v2.models import Pipeline
from utils.organization_utils import filter_queryset_by_organization
from workflow_manager.workflow_v2.enums import ExecutionStatus
from workflow_manager.workflow_v2.models.execution import WorkflowExecution

from notification_v2.models import Notification

Expand All @@ -26,10 +30,71 @@
# Constants for error messages
INTERNAL_SERVER_ERROR_MSG = "Internal server error"

_FAILURE_STATUSES = {ExecutionStatus.ERROR.value, ExecutionStatus.STOPPED.value}


def _load_execution(execution_id: str | None) -> WorkflowExecution | None:
"""Best-effort lookup; returns None on missing id or unknown row."""
if not execution_id:
return None
try:
return cast(WorkflowExecution, WorkflowExecution.objects.get(id=execution_id))
except WorkflowExecution.DoesNotExist:
logger.warning("WorkflowExecution %s not found", execution_id)
return None


def _apply_failure_filter(
notifications_qs: QuerySet[Notification],
execution: WorkflowExecution | None,
) -> QuerySet[Notification]:
"""Drop notify_on_failures=True rows on success runs.

Mirrors the dispatch-side rule in backend/api_v2/notification.py and
backend/pipeline_v2/notification.py so both code paths agree on what
counts as a failure (status ∈ {ERROR, STOPPED} OR any file errored).

No execution → no filter, preserving legacy "return every active row"
behavior for callers that don't pass execution_id.
"""
if execution is None:
return notifications_qs
failed_files = execution.failed_files or 0
is_failure = execution.status in _FAILURE_STATUSES or failed_files > 0
if not is_failure:
notifications_qs = notifications_qs.filter(notify_on_failures=False)
return notifications_qs


def _execution_counts(execution: WorkflowExecution | None) -> dict[str, int]:
"""File counts surfaced into webhook payloads. Empty dict on no execution."""
if execution is None:
return {}
return {
"total_files": execution.total_files or 0,
"successful_files": execution.successful_files or 0,
"failed_files": execution.failed_files or 0,
}


def _serialize_notification(n: Notification) -> dict[str, Any]:
return {
"id": str(n.id),
"notification_type": n.notification_type,
"platform": n.platform,
"url": n.url,
"authorization_type": n.authorization_type,
"authorization_key": n.authorization_key,
"authorization_header": n.authorization_header,
"max_retries": n.max_retries,
"is_active": n.is_active,
"notify_on_failures": n.notify_on_failures,
}


@csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only
@require_http_methods(["GET"])
def get_pipeline_notifications(request, pipeline_id):
def get_pipeline_notifications(request: HttpRequest, pipeline_id: str) -> JsonResponse:
"""Get active notifications for a pipeline or API deployment.

Used by callback worker to fetch notification configuration.
Expand All @@ -41,83 +106,53 @@ def get_pipeline_notifications(request, pipeline_id):
pipeline_queryset, request, "organization"
)

execution = _load_execution(request.GET.get("execution_id"))
counts = _execution_counts(execution)

if pipeline_queryset.exists():
pipeline = pipeline_queryset.first()

# Get active notifications for this pipeline
notifications = Notification.objects.filter(pipeline=pipeline, is_active=True)

notifications_data = []
for notification in notifications:
notifications_data.append(
{
"id": str(notification.id),
"notification_type": notification.notification_type,
"platform": notification.platform,
"url": notification.url,
"authorization_type": notification.authorization_type,
"authorization_key": notification.authorization_key,
"authorization_header": notification.authorization_header,
"max_retries": notification.max_retries,
"is_active": notification.is_active,
}
)

notifications = _apply_failure_filter(notifications, execution)
serialized = [_serialize_notification(n) for n in notifications]
return JsonResponse(
{
"status": "success",
"pipeline_id": str(pipeline.id),
"pipeline_name": pipeline.pipeline_name,
"pipeline_type": pipeline.pipeline_type,
"notifications": notifications_data,
"notifications": serialized,
"execution_counts": counts,
}
)
else:
# If not found in Pipeline, try APIDeployment model
api_queryset = APIDeployment.objects.filter(id=pipeline_id)
api_queryset = filter_queryset_by_organization(
api_queryset, request, "organization"

# If not found in Pipeline, try APIDeployment model
api_queryset = APIDeployment.objects.filter(id=pipeline_id)
api_queryset = filter_queryset_by_organization(
api_queryset, request, "organization"
)
if api_queryset.exists():
api = api_queryset.first()
notifications = Notification.objects.filter(api=api, is_active=True)
notifications = _apply_failure_filter(notifications, execution)
serialized = [_serialize_notification(n) for n in notifications]
return JsonResponse(
{
"status": "success",
"pipeline_id": str(api.id),
"pipeline_name": api.api_name,
"pipeline_type": "API",
"notifications": serialized,
"execution_counts": counts,
}
)

if api_queryset.exists():
api = api_queryset.first()

# Get active notifications for this API deployment
notifications = Notification.objects.filter(api=api, is_active=True)

notifications_data = []
for notification in notifications:
notifications_data.append(
{
"id": str(notification.id),
"notification_type": notification.notification_type,
"platform": notification.platform,
"url": notification.url,
"authorization_type": notification.authorization_type,
"authorization_key": notification.authorization_key,
"authorization_header": notification.authorization_header,
"max_retries": notification.max_retries,
"is_active": notification.is_active,
}
)

return JsonResponse(
{
"status": "success",
"pipeline_id": str(api.id),
"pipeline_name": api.api_name,
"pipeline_type": "API",
"notifications": notifications_data,
}
)
else:
return JsonResponse(
{
"status": "error",
"message": "Pipeline or API deployment not found",
},
status=404,
)
return JsonResponse(
{
"status": "error",
"message": "Pipeline or API deployment not found",
},
status=404,
)
except Exception as e:
logger.error(f"Error getting pipeline notifications for {pipeline_id}: {e}")
return JsonResponse(
Expand All @@ -127,7 +162,7 @@ def get_pipeline_notifications(request, pipeline_id):

@csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only
@require_http_methods(["GET"])
def get_api_notifications(request, api_id):
def get_api_notifications(request: HttpRequest, api_id: str) -> JsonResponse:
"""Get active notifications for an API deployment.

Used by callback worker to fetch notification configuration.
Expand All @@ -140,32 +175,18 @@ def get_api_notifications(request, api_id):
)
api = get_object_or_404(api_queryset)

# Get active notifications for this API
execution = _load_execution(request.GET.get("execution_id"))
notifications = Notification.objects.filter(api=api, is_active=True)

notifications_data = []
for notification in notifications:
notifications_data.append(
{
"id": str(notification.id),
"notification_type": notification.notification_type,
"platform": notification.platform,
"url": notification.url,
"authorization_type": notification.authorization_type,
"authorization_key": notification.authorization_key,
"authorization_header": notification.authorization_header,
"max_retries": notification.max_retries,
"is_active": notification.is_active,
}
)
notifications = _apply_failure_filter(notifications, execution)

return JsonResponse(
{
"status": "success",
"api_id": str(api.id),
"api_name": api.api_name,
"display_name": api.display_name,
"notifications": notifications_data,
"notifications": [_serialize_notification(n) for n in notifications],
"execution_counts": _execution_counts(execution),
}
)

Expand All @@ -182,7 +203,7 @@ def get_api_notifications(request, api_id):

@csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only
@require_http_methods(["GET"])
def get_pipeline_data(request, pipeline_id):
def get_pipeline_data(request: HttpRequest, pipeline_id: str) -> JsonResponse:
"""Get basic pipeline data for notification purposes.

Used by callback worker to determine pipeline type and name.
Expand Down Expand Up @@ -218,7 +239,7 @@ def get_pipeline_data(request, pipeline_id):

@csrf_exempt # Safe: Internal API with Bearer token auth, service-to-service only
@require_http_methods(["GET"])
def get_api_data(request, api_id):
def get_api_data(request: HttpRequest, api_id: str) -> JsonResponse:
"""Get basic API deployment data for notification purposes.

Used by callback worker to determine API name and details.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("notification_v2", "0001_initial"),
]

operations = [
migrations.AddField(
model_name="notification",
name="notify_on_failures",
field=models.BooleanField(
default=False,
db_comment=(
"When True, fire only on failed runs — terminal status "
"ERROR/STOPPED or any file in the run errored (partial "
"failure). When False (default), fire on every terminal "
"completion."
),
),
),
]
8 changes: 8 additions & 0 deletions backend/notification_v2/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ class Notification(BaseModel):
default=True,
db_comment="Flag indicating whether the notification is active or not.",
)
notify_on_failures = models.BooleanField(
default=False,
db_comment=(
"When True, fire only on failed runs — terminal status ERROR/STOPPED "
"or any file in the run errored (partial failure). When False "
"(default), fire on every terminal completion."
),
)
# Foreign keys to specific models
pipeline = models.ForeignKey(
Pipeline,
Expand Down
1 change: 1 addition & 0 deletions backend/notification_v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class NotificationSerializer(serializers.ModelSerializer):
max_retries = serializers.IntegerField(
max_value=4, min_value=0, default=0, required=False
)
notify_on_failures = serializers.BooleanField(default=False, required=False)

class Meta:
model = Notification
Expand Down
Empty file.
Loading