-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathwebhook.py
More file actions
135 lines (114 loc) · 5.17 KB
/
webhook.py
File metadata and controls
135 lines (114 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
"""
Webhook notifier for Trivy results.
Formats results using the new grouped format for generic webhook consumption.
"""
from typing import Dict, Any, List
from collections import defaultdict
def format_notifications(mapping: Dict[str, Any], item_name: str = "Unknown", scan_type: str = "image") -> List[Dict[str, Any]]:
    """Format Trivy results for a generic webhook - grouped format.

    Alerts are grouped per rule (Dockerfile scans) or per package URL
    (image / vuln scans), then per severity, de-duplicated, and rendered as
    a pipe-delimited text table. Every alert is additionally reported
    one-by-one (not de-duplicated) in the ``findings`` list.

    Args:
        mapping: Component mapping with alerts. Each value is read as a dict
            with optional ``name``/``id``, ``version``, ``qualifiers`` and
            ``alerts`` entries.
        item_name: Name of the scanned item (used in the title).
        scan_type: Type of scan - 'vuln', 'image', or 'dockerfile'. Any value
            other than 'dockerfile' is processed as package vulnerabilities,
            and any value other than 'vuln'/'dockerfile' gets the image title.

    Returns:
        A single-element list containing a dict with the keys ``title``,
        ``content`` (the text table, or a "nothing found" message when there
        are no rows) and ``findings`` (one dict per alert).
    """
    severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}

    def severity_rank(severity: str) -> int:
        # Case-insensitive so 'HIGH' and 'high' rank alike; anything that is
        # not a known severity sorts after 'low'.
        return severity_order.get(severity.lower(), len(severity_order))

    is_dockerfile = scan_type == 'dockerfile'

    # group key -> severity -> set of entries. Sets drop duplicate alerts.
    # Dockerfile entries are (message, resolution) tuples rather than a
    # delimiter-joined string, so text containing '|' cannot be mis-split.
    # Package entries are CVE id strings.
    groups: Dict[str, Dict[str, set]] = defaultdict(lambda: defaultdict(set))
    findings: List[Dict[str, Any]] = []

    for comp in mapping.values():
        # 'alerts' may be missing or explicitly None; treat both as empty.
        alerts = comp.get('alerts') or []

        if is_dockerfile:
            for alert in alerts:
                props = alert.get('props', {}) or {}
                rule_id = str(props.get('ruleId', '') or alert.get('title', ''))
                severity = str(alert.get('severity', ''))
                message = str(alert.get('description', ''))
                resolution = str(props.get('resolution', ''))
                groups[rule_id][severity].add((message, resolution))
                findings.append({
                    'rule': rule_id,
                    'severity': severity,
                    'message': message,
                    'resolution': resolution,
                    'scanner': 'trivy',
                })
            continue

        # image or vuln: group by a purl-style package key.
        comp_name = str(comp.get('name') or comp.get('id') or '-')
        comp_version = str(comp.get('version', ''))
        # 'qualifiers' may be present but None; fall back to an empty dict.
        ecosystem = (comp.get('qualifiers') or {}).get('ecosystem', 'unknown')
        package_key = f"pkg:{ecosystem}/{comp_name}"
        if comp_version:
            package_key = f"{package_key}@{comp_version}"

        for alert in alerts:
            props = alert.get('props', {}) or {}
            cve_id = str(props.get('vulnerabilityId', '') or alert.get('title', ''))
            severity = str(alert.get('severity', ''))
            groups[package_key][severity].add(cve_id)
            findings.append({
                'package': comp_name,
                'version': comp_version,
                'ecosystem': ecosystem,
                'purl': package_key,
                'cves': [cve_id],
                'severity': severity,
                'scanner': 'trivy',
            })

    # Build table rows. Groups keep first-seen order; severities are ordered
    # by criticality; entries are sorted so the output is deterministic
    # (plain set iteration order varies between interpreter runs).
    rows: List[List[str]] = []
    for group_key, by_severity in groups.items():
        for severity in sorted(by_severity, key=severity_rank):
            entries = sorted(by_severity[severity])
            if is_dockerfile:
                # Dockerfile format: Rule ID | Severity | Message | Resolution
                rows.extend(
                    [group_key, severity, message, resolution]
                    for message, resolution in entries
                )
            else:
                # Image format: Package | CVEs | Severity
                rows.append([group_key, ', '.join(entries), severity])

    if is_dockerfile:
        headers = ['Rule ID', 'Severity', 'Message', 'Resolution']
    else:
        headers = ['Package', 'CVEs', 'Severity']

    # Format for webhook - simple structure
    if rows:
        content_lines = [' | '.join(headers), ' | '.join(['---'] * len(headers))]
        content_lines.extend(' | '.join(str(cell) for cell in row) for row in rows)
        content = '\n'.join(content_lines)
    else:
        content = "No vulnerabilities found."

    # Create title based on scan type
    if scan_type == 'vuln':
        title = f'Socket CVE Scanning Results: {item_name}'
    elif scan_type == 'dockerfile':
        title = f'Socket Dockerfile Results: {item_name}'
    else:  # image
        title = f'Socket Image Scanning Results: {item_name}'

    return [{
        'title': title,
        'content': content,
        'findings': findings,
    }]