Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions socket_basics/core/connector/opengrep/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,30 +35,45 @@ def format_notifications(groups: Dict[str, List[Dict[str, Any]]]) -> List[Dict[s
for subtype, items in groups.items():
if not items: # Skip empty groups
continue

rows = []
findings: List[Dict[str, Any]] = []
for item in items:
c = item['component']
a = item['alert']
props = a.get('props', {}) or {}
full_path = props.get('filePath', a.get('location', {}).get('path')) or '-'

try:
file_name = Path(full_path).name
except Exception:
file_name = full_path


rule = props.get('ruleId', a.get('title', ''))
severity = a.get('severity', '')
lines = f"{props.get('startLine','')}-{props.get('endLine','')}"

rows.append([
props.get('ruleId', a.get('title', '')),
a.get('severity', ''),
rule,
severity,
file_name,
full_path,
f"{props.get('startLine','')}-{props.get('endLine','')}",
lines,
props.get('codeSnippet', '') or '',
subtype,
'opengrep'
])


findings.append({
'rule': rule,
'severity': severity,
'file': file_name,
'path': full_path,
'lines': lines,
'language': subtype,
'scanner': 'opengrep',
})

# Create a separate dataset for each subtype/language group
display_name = subtype_names.get(subtype, subtype.upper())
headers = ['Rule', 'Severity', 'File', 'Path', 'Lines', 'Code', 'SubType', 'Scanner']
Expand All @@ -67,13 +82,14 @@ def format_notifications(groups: Dict[str, List[Dict[str, Any]]]) -> List[Dict[s
content_rows = []
for row in rows:
content_rows.append(' | '.join(str(cell) for cell in row))

content = '\n'.join([header_row, separator_row] + content_rows) if rows else f"No {display_name} issues found."

tables.append({
'title': display_name,
'content': content
'content': content,
'findings': findings,
})

# Return list of tables - one per language group
return tables
20 changes: 16 additions & 4 deletions socket_basics/core/connector/socket_tier1/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ def _make_purl(comp: Dict[str, Any]) -> str:


def format_notifications(components_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""Format for generic webhook - flexible structured format."""
"""Format for generic webhook - flexible structured format with findings."""
rows = []
findings: List[Dict[str, Any]] = []
for comp in components_list:
comp_name = str(comp.get('name') or comp.get('id') or '-')

for a in comp.get('alerts', []):
props = a.get('props', {}) or {}
purl = str(props.get('purl') or _make_purl(comp) or comp_name)
cve_id = str(props.get('ghsaId') or props.get('cveId') or a.get('title') or '')
severity = str(a.get('severity') or props.get('severity') or '')
reachability = str(props.get('reachability') or '')

rows.append([
cve_id,
severity,
Expand All @@ -46,6 +47,16 @@ def format_notifications(components_list: List[Dict[str, Any]]) -> List[Dict[str
str(props.get('ghsaId', '')),
'socket-tier1'
])

findings.append({
'package': comp_name,
'version': str(comp.get('version', '')),
'purl': purl,
'cve': cve_id,
'severity': severity,
'reachability': reachability,
'scanner': 'socket-tier1',
})

# Format as structured data for webhook
if not rows:
Expand All @@ -62,5 +73,6 @@ def format_notifications(components_list: List[Dict[str, Any]]) -> List[Dict[str

return [{
'title': 'Socket Tier1 Reachability Analysis',
'content': content
'content': content,
'findings': findings,
}]
32 changes: 26 additions & 6 deletions socket_basics/core/connector/trivy/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ def format_notifications(mapping: Dict[str, Any], item_name: str = "Unknown", sc
"""
# Group vulnerabilities by package and severity
package_groups = defaultdict(lambda: defaultdict(set)) # Use set to avoid duplicates

findings: List[Dict[str, Any]] = []

if scan_type == 'dockerfile':
# Process dockerfile components
for comp in mapping.values():
Expand All @@ -28,27 +29,45 @@ def format_notifications(mapping: Dict[str, Any], item_name: str = "Unknown", sc
severity = str(alert.get('severity', ''))
message = str(alert.get('description', ''))
resolution = str(props.get('resolution', ''))

rule_info = f"{rule_id}|{message}|{resolution}"
package_groups[rule_id][severity].add(rule_info)


findings.append({
'rule': rule_id,
'severity': severity,
'message': message,
'resolution': resolution,
'scanner': 'trivy',
})

else: # image or vuln
# Process package vulnerability components
for comp in mapping.values():
comp_name = str(comp.get('name') or comp.get('id') or '-')
comp_version = str(comp.get('version', ''))
ecosystem = comp.get('qualifiers', {}).get('ecosystem', 'unknown')

if comp_version:
package_key = f"pkg:{ecosystem}/{comp_name}@{comp_version}"
else:
package_key = f"pkg:{ecosystem}/{comp_name}"

for alert in comp.get('alerts', []):
props = alert.get('props', {}) or {}
cve_id = str(props.get('vulnerabilityId', '') or alert.get('title', ''))
severity = str(alert.get('severity', ''))
package_groups[package_key][severity].add(cve_id)

findings.append({
'package': comp_name,
'version': comp_version,
'ecosystem': ecosystem,
'purl': package_key,
'cves': [cve_id],
'severity': severity,
'scanner': 'trivy',
})

# Create rows with proper formatting
rows = []
Expand Down Expand Up @@ -111,5 +130,6 @@ def format_notifications(mapping: Dict[str, Any], item_name: str = "Unknown", sc

return [{
'title': title,
'content': content
'content': content,
'findings': findings,
}]
18 changes: 15 additions & 3 deletions socket_basics/core/connector/trufflehog/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@


def format_notifications(mapping: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Format for generic webhook - flexible structured format."""
"""Format for generic webhook - flexible structured format with findings."""
rows = []
findings: List[Dict[str, Any]] = []
for comp in mapping.values():
for a in comp.get('alerts', []):
props = a.get('props', {}) or {}
Expand All @@ -20,7 +21,7 @@ def format_notifications(mapping: Dict[str, Any]) -> List[Dict[str, Any]]:
redacted = str(props.get('redactedValue', ''))
verified = props.get('verified', False)
secret_type = str(props.get('secretType', ''))

rows.append([
detector,
severity,
Expand All @@ -31,6 +32,16 @@ def format_notifications(mapping: Dict[str, Any]) -> List[Dict[str, Any]]:
str(verified),
'trufflehog'
])

# Omit redacted_value from structured findings to avoid leaking secrets
findings.append({
'detector': detector,
'severity': severity,
'file': file_path,
'line': line,
'verified': verified,
'scanner': 'trufflehog',
})

# Format as structured data
if not rows:
Expand All @@ -47,5 +58,6 @@ def format_notifications(mapping: Dict[str, Any]) -> List[Dict[str, Any]]:

return [{
'title': 'TruffleHog Secret Detection Results',
'content': content
'content': content,
'findings': findings,
}]
38 changes: 27 additions & 11 deletions socket_basics/core/notification/webhook_notifier.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
from typing import Any, Dict
from datetime import datetime, timezone
from typing import Any, Dict, List
import logging

import requests

from socket_basics.core.notification.base import BaseNotifier
from socket_basics.core.config import get_webhook_url

logger = logging.getLogger(__name__)


def _compute_summary(findings: List[Dict[str, Any]]) -> Dict[str, int]:
"""Compute severity counts from a findings list."""
counts: Dict[str, int] = {'total': len(findings), 'critical': 0, 'high': 0, 'medium': 0, 'low': 0}
for f in findings:
sev = str(f.get('severity', '')).lower()
if sev in counts:
counts[sev] += 1
return counts


class WebhookNotifier(BaseNotifier):
"""Webhook notifier: sends security findings to HTTP webhook endpoints.

Expand Down Expand Up @@ -47,12 +60,10 @@ def notify(self, facts: Dict[str, Any]) -> None:

# Send each notification as a separate webhook
for item in valid_notifications:
title = item['title']
content = item['content']
self._send_webhook(facts, title, content)
self._send_webhook(facts, item)

def _send_webhook(self, facts: Dict[str, Any], title: str, content: str) -> None:
"""Send a single webhook with title and content."""
def _send_webhook(self, facts: Dict[str, Any], item: Dict[str, Any]) -> None:
"""Send a single webhook with structured payload."""
if not self.url:
logger.warning('WebhookNotifier: no webhook URL configured')
return
Expand All @@ -61,25 +72,30 @@ def _send_webhook(self, facts: Dict[str, Any], title: str, content: str) -> None
repo = facts.get('repository', 'Unknown')
branch = facts.get('branch', 'Unknown')

# Create webhook payload with pre-formatted content
title = item['title']
content = item['content']
findings = item.get('findings', [])

payload = {
'repository': repo,
'branch': branch,
'scanner': 'socket-security',
'timestamp': facts.get('timestamp'),
'timestamp': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
'scan_type': title,
'summary': _compute_summary(findings),
'findings': findings,
'notification': {
'title': title,
'content': content
'content': content,
}
}

try:
import requests
resp = requests.post(self.url, json=payload, timeout=10)
if resp.status_code >= 400:
logger.warning('WebhookNotifier: HTTP error %s: %s', resp.status_code, resp.text[:200])
else:
logger.info('WebhookNotifier: sent webhook for "%s"', title)

except Exception as e:
logger.error('WebhookNotifier: exception sending webhook: %s', e)
Loading