-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathwebhook_notifier.py
More file actions
101 lines (79 loc) · 3.48 KB
/
webhook_notifier.py
File metadata and controls
101 lines (79 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from datetime import datetime, timezone
from typing import Any, Dict, List
import logging
import requests
from socket_basics.core.notification.base import BaseNotifier
from socket_basics.core.config import get_webhook_url
logger = logging.getLogger(__name__)
def _compute_summary(findings: List[Dict[str, Any]]) -> Dict[str, int]:
"""Compute severity counts from a findings list."""
counts: Dict[str, int] = {'total': len(findings), 'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
for f in findings:
sev = str(f.get('severity', '')).lower()
if sev in counts:
counts[sev] += 1
return counts
class WebhookNotifier(BaseNotifier):
    """Webhook notifier: sends security findings to HTTP webhook endpoints.

    Simplified version that works with pre-formatted content from connectors.
    Each well-formed notification item is POSTed as its own JSON payload.
    HTTP error responses and exceptions raised while posting are logged
    rather than propagated.
    """

    name = "webhook"

    def __init__(self, params: Dict[str, Any] | None = None):
        """Initialise the notifier and resolve the target webhook URL.

        Args:
            params: Optional notifier parameters, forwarded to
                ``BaseNotifier.__init__`` (an empty dict when omitted).
        """
        super().__init__(params or {})
        # Prefer an explicit 'webhook_url' in self.config, otherwise fall back
        # to get_webhook_url().
        # NOTE(review): self.config is presumably populated by BaseNotifier
        # from params, and get_webhook_url() presumably reads the env
        # variable / app config - confirm against those modules.
        self.url = self.config.get('webhook_url') or get_webhook_url()

    def notify(self, facts: Dict[str, Any]) -> None:
        """Send one webhook per well-formed entry in ``facts['notifications']``.

        Args:
            facts: Scan facts. ``notifications`` is expected to be a list of
                dicts that each carry ``title`` and ``content``; any other
                entry is skipped with a warning. A missing or empty list is a
                no-op.
        """
        notifications = facts.get('notifications', []) or []
        if not isinstance(notifications, list):
            logger.error('WebhookNotifier: only supports new format - list of dicts with title/content')
            return
        if not notifications:
            logger.info('WebhookNotifier: no notifications present; skipping')
            return

        # Validate format
        valid_notifications = []
        for item in notifications:
            if isinstance(item, dict) and 'title' in item and 'content' in item:
                valid_notifications.append(item)
            else:
                logger.warning('WebhookNotifier: skipping invalid notification item: %s', type(item))
        if not valid_notifications:
            return

        # Check the URL once here so that a missing URL produces a single
        # warning instead of one per notification.
        if not self.url:
            logger.warning('WebhookNotifier: no webhook URL configured')
            return

        # Send each notification as a separate webhook
        for item in valid_notifications:
            self._send_webhook(facts, item)

    def _send_webhook(self, facts: Dict[str, Any], item: Dict[str, Any]) -> None:
        """Send a single webhook with structured payload.

        Args:
            facts: Scan facts; ``repository`` and ``branch`` are copied into
                the payload, defaulting to ``'Unknown'`` when absent.
            item: Notification dict. ``title`` and ``content`` are required;
                ``findings`` is optional and is treated as an empty list when
                missing, null, or not a list.
        """
        # Kept even though notify() checks too, so a direct call is still safe.
        if not self.url:
            logger.warning('WebhookNotifier: no webhook URL configured')
            return

        # Get repository and branch info from facts (populated by NotificationManager)
        repo = facts.get('repository', 'Unknown')
        branch = facts.get('branch', 'Unknown')
        title = item['title']
        content = item['content']

        # dict.get's default only applies when the key is absent, so an
        # explicit null would otherwise reach the summary as None and raise
        # before the request is even attempted.
        findings = item.get('findings') or []
        if not isinstance(findings, list):
            logger.warning('WebhookNotifier: ignoring non-list findings: %s', type(findings))
            findings = []

        payload = {
            'repository': repo,
            'branch': branch,
            'scanner': 'socket-security',
            # UTC timestamp with second precision and a literal 'Z' suffix.
            'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
            'scan_type': title,
            'summary': _compute_summary(findings),
            'findings': findings,
            'notification': {
                'title': title,
                'content': content,
            },
        }

        try:
            resp = requests.post(self.url, json=payload, timeout=10)
            if resp.status_code >= 400:
                # Truncate the body so a large error page cannot flood the log.
                logger.warning('WebhookNotifier: HTTP error %s: %s', resp.status_code, resp.text[:200])
            else:
                logger.info('WebhookNotifier: sent webhook for "%s"', title)
        except Exception as e:
            # Best-effort delivery: a failed webhook must not abort the caller.
            logger.error('WebhookNotifier: exception sending webhook: %s', e)