Skip to content

Commit 5cb27fe

Browse files
authored
Merge pull request #122 from boazmichaely/add-acs-rich-policy-report
Add ACS Rich Policy Report utility
2 parents 45b7f88 + 5dc8bd3 commit 5cb27fe

3 files changed

Lines changed: 476 additions & 0 deletions

File tree

Lines changed: 396 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,396 @@
1+
#!/usr/bin/env python3
2+
"""
3+
Script to generate an enriched report of ACS (Advanced Cluster Security) policies with
4+
human-readable MITRE ATT&CK information and export them to a CSV file.
5+
6+
This tool fetches the MITRE ATT&CK framework to provide tactics and techniques with
7+
descriptions, not just IDs.
8+
9+
Usage with API Token:
10+
export ROX_API_TOKEN="your-api-token"
11+
export ROX_CENTRAL_ADDRESS="central.example.com:443"
12+
python3 ACS_rich_policy_report.py [output_file.csv]
13+
14+
Usage with Username/Password:
15+
export ROX_ADMIN_USER="admin"
16+
export ROX_ADMIN_PASSWORD="your-password"
17+
export ROX_CENTRAL_ADDRESS="central.example.com:443"
18+
python3 ACS_rich_policy_report.py [output_file.csv]
19+
20+
Environment Variables:
21+
ROX_API_TOKEN: API token for authentication (option 1)
22+
ROX_ADMIN_USER: Admin username (option 2)
23+
ROX_ADMIN_PASSWORD: Admin password (option 2)
24+
ROX_CENTRAL_ADDRESS: ACS Central address (e.g., central.example.com:443)
25+
"""
26+
27+
import requests
28+
import csv
29+
import json
30+
import sys
31+
import os
32+
from typing import List, Dict, Any, Optional
33+
import urllib3
34+
35+
# Disable SSL warnings for demo/development environments
36+
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
37+
38+
39+
class ACSRichPolicyReporter:
40+
def __init__(self, central_address: str, api_token: Optional[str] = None,
41+
username: Optional[str] = None, password: Optional[str] = None):
42+
"""
43+
Initialize the ACS Rich Policy Reporter
44+
45+
Args:
46+
central_address: ACS Central address (e.g., central.example.com:443)
47+
api_token: API token for authentication (optional)
48+
username: Admin username (optional)
49+
password: Admin password (optional)
50+
"""
51+
# Clean up the address to build proper URL
52+
self.central_address = central_address.strip()
53+
54+
# Remove protocol if present
55+
if self.central_address.startswith('http://'):
56+
self.central_address = self.central_address[7:]
57+
elif self.central_address.startswith('https://'):
58+
self.central_address = self.central_address[8:]
59+
60+
# Build base URL
61+
self.base_url = f"https://{self.central_address}"
62+
63+
self.session = requests.Session()
64+
self.session.headers.update({
65+
'Content-Type': 'application/json'
66+
})
67+
68+
# Set up authentication
69+
if api_token:
70+
self.session.headers.update({
71+
'Authorization': f'Bearer {api_token}'
72+
})
73+
self.auth_method = "API Token"
74+
elif username and password:
75+
self.session.auth = (username, password)
76+
self.auth_method = "Username/Password"
77+
else:
78+
raise ValueError("Either api_token or username/password must be provided")
79+
80+
def fetch_mitre_framework(self) -> Dict[str, Dict[str, str]]:
81+
"""
82+
Fetch MITRE ATT&CK framework from ACS
83+
84+
Returns:
85+
Dictionary mapping MITRE IDs to names
86+
"""
87+
url = f"{self.base_url}/v1/mitreattackvectors"
88+
89+
try:
90+
response = self.session.get(url, verify=False)
91+
response.raise_for_status()
92+
data = response.json()
93+
94+
mitre_map = {}
95+
vectors = data.get('mitreAttackVectors', [])
96+
97+
for vector in vectors:
98+
tactic = vector.get('tactic', {})
99+
tactic_id = tactic.get('id', '')
100+
tactic_name = tactic.get('name', '')
101+
102+
if tactic_id and tactic_name:
103+
mitre_map[tactic_id] = tactic_name
104+
105+
# Map techniques
106+
techniques = vector.get('techniques', [])
107+
for technique in techniques:
108+
tech_id = technique.get('id', '')
109+
tech_name = technique.get('name', '')
110+
if tech_id and tech_name:
111+
mitre_map[tech_id] = tech_name
112+
113+
return mitre_map
114+
except requests.exceptions.RequestException as e:
115+
print(f"Warning: Could not fetch MITRE framework: {e}", file=sys.stderr)
116+
return {}
117+
118+
def fetch_policy_list(self) -> List[str]:
119+
"""
120+
Fetch list of policy IDs from the ACS API
121+
122+
Returns:
123+
List of policy IDs
124+
"""
125+
url = f"{self.base_url}/v1/policies"
126+
127+
print(f"Fetching policy list from: {url}")
128+
print(f"Using authentication method: {self.auth_method}")
129+
130+
try:
131+
response = self.session.get(url, verify=False)
132+
response.raise_for_status()
133+
data = response.json()
134+
policies = data.get('policies', [])
135+
return [p.get('id') for p in policies if p.get('id')]
136+
except requests.exceptions.RequestException as e:
137+
print(f"\nError fetching policy list: {e}", file=sys.stderr)
138+
if hasattr(e, 'response') and e.response is not None:
139+
print(f"Response status: {e.response.status_code}", file=sys.stderr)
140+
print(f"Response body: {e.response.text[:500]}", file=sys.stderr)
141+
sys.exit(1)
142+
143+
def fetch_policy_details(self, policy_id: str) -> Dict[str, Any]:
144+
"""
145+
Fetch full details for a specific policy
146+
147+
Args:
148+
policy_id: Policy ID
149+
150+
Returns:
151+
Full policy dictionary with all details including MITRE ATT&CK data
152+
"""
153+
url = f"{self.base_url}/v1/policies/{policy_id}"
154+
155+
try:
156+
response = self.session.get(url, verify=False)
157+
response.raise_for_status()
158+
# API returns policy object directly, not wrapped
159+
return response.json()
160+
except requests.exceptions.RequestException as e:
161+
print(f"Warning: Could not fetch details for policy {policy_id}: {e}", file=sys.stderr)
162+
return {}
163+
164+
def fetch_policies(self) -> List[Dict[str, Any]]:
165+
"""
166+
Fetch all policies with full details from the ACS API
167+
168+
Returns:
169+
List of policy dictionaries with full details
170+
"""
171+
# First get the list of policy IDs
172+
policy_ids = self.fetch_policy_list()
173+
print(f"Found {len(policy_ids)} policies")
174+
175+
# Fetch full details for each policy
176+
print("Fetching full details for each policy...")
177+
policies = []
178+
for i, policy_id in enumerate(policy_ids, 1):
179+
if i % 10 == 0:
180+
print(f" Progress: {i}/{len(policy_ids)}")
181+
policy = self.fetch_policy_details(policy_id)
182+
if policy:
183+
policies.append(policy)
184+
185+
return policies
186+
187+
def format_mitre_tactics(self, mitre_vectors: List[Dict[str, Any]], mitre_map: Dict[str, str]) -> str:
188+
"""
189+
Format MITRE ATT&CK tactics with human-readable names
190+
191+
Args:
192+
mitre_vectors: List of MITRE ATT&CK vector dictionaries
193+
mitre_map: Mapping of MITRE IDs to names
194+
195+
Returns:
196+
Formatted string with tactics and names
197+
"""
198+
if not mitre_vectors:
199+
return ""
200+
201+
tactics = []
202+
for vector in mitre_vectors:
203+
tactic_id = vector.get('tactic', '')
204+
if tactic_id:
205+
tactic_name = mitre_map.get(tactic_id, '')
206+
if tactic_name:
207+
tactics.append(f"{tactic_id} ({tactic_name})")
208+
else:
209+
tactics.append(tactic_id)
210+
211+
return ", ".join(tactics)
212+
213+
def format_mitre_techniques(self, mitre_vectors: List[Dict[str, Any]], mitre_map: Dict[str, str]) -> str:
214+
"""
215+
Format MITRE ATT&CK techniques with human-readable names
216+
217+
Args:
218+
mitre_vectors: List of MITRE ATT&CK vector dictionaries
219+
mitre_map: Mapping of MITRE IDs to names
220+
221+
Returns:
222+
Formatted string with techniques and names
223+
"""
224+
if not mitre_vectors:
225+
return ""
226+
227+
formatted_parts = []
228+
for vector in mitre_vectors:
229+
tactic_id = vector.get('tactic', '')
230+
techniques = vector.get('techniques', [])
231+
232+
if tactic_id and techniques:
233+
for technique_id in techniques:
234+
technique_name = mitre_map.get(technique_id, '')
235+
if technique_name:
236+
formatted_parts.append(f"{tactic_id}: {technique_id} ({technique_name})")
237+
else:
238+
formatted_parts.append(f"{tactic_id}: {technique_id}")
239+
240+
return " | ".join(formatted_parts)
241+
242+
def export_to_csv(self, policies: List[Dict[str, Any]], mitre_map: Dict[str, str], output_file: str = "acs_policies.csv"):
243+
"""
244+
Export policies to a CSV file
245+
246+
Args:
247+
policies: List of policy dictionaries
248+
mitre_map: Mapping of MITRE IDs to names
249+
output_file: Output CSV filename
250+
"""
251+
if not policies:
252+
print("No policies found to export", file=sys.stderr)
253+
return
254+
255+
# Define CSV columns
256+
fieldnames = [
257+
'Policy ID',
258+
'Policy Name',
259+
'Description',
260+
'Severity',
261+
'Disabled',
262+
'Categories',
263+
'MITRE ATT&CK Tactics',
264+
'MITRE ATT&CK Techniques',
265+
'Lifecycle Stages',
266+
'Is Default',
267+
'Enforcement'
268+
]
269+
270+
with open(output_file, 'w', newline='', encoding='utf-8') as csvfile:
271+
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
272+
writer.writeheader()
273+
274+
for policy in policies:
275+
# Extract MITRE ATT&CK information
276+
mitre_vectors = policy.get('mitreAttackVectors', [])
277+
tactics_str = self.format_mitre_tactics(mitre_vectors, mitre_map)
278+
techniques_str = self.format_mitre_techniques(mitre_vectors, mitre_map)
279+
280+
# Extract other fields
281+
categories = ", ".join(policy.get('categories', []))
282+
lifecycle_stages = ", ".join(policy.get('lifecycleStages', []))
283+
284+
# Extract enforcement actions
285+
enforcement_actions = []
286+
for action in policy.get('enforcementActions', []):
287+
enforcement_actions.append(action)
288+
enforcement_str = ", ".join(enforcement_actions)
289+
290+
row = {
291+
'Policy ID': policy.get('id', ''),
292+
'Policy Name': policy.get('name', ''),
293+
'Description': policy.get('description', ''),
294+
'Severity': policy.get('severity', ''),
295+
'Disabled': policy.get('disabled', False),
296+
'Categories': categories,
297+
'MITRE ATT&CK Tactics': tactics_str,
298+
'MITRE ATT&CK Techniques': techniques_str,
299+
'Lifecycle Stages': lifecycle_stages,
300+
'Is Default': policy.get('isDefault', False),
301+
'Enforcement': enforcement_str
302+
}
303+
304+
writer.writerow(row)
305+
306+
print(f"\nSuccessfully exported {len(policies)} policies to {output_file}")
307+
308+
def run(self, output_file: str = "acs_policies.csv"):
309+
"""
310+
Main execution method
311+
312+
Args:
313+
output_file: Output CSV filename
314+
"""
315+
# Fetch MITRE ATT&CK framework
316+
print("Fetching MITRE ATT&CK framework...")
317+
mitre_map = self.fetch_mitre_framework()
318+
print(f"Loaded {len(mitre_map)} MITRE ATT&CK entries")
319+
print()
320+
321+
# Fetch policies
322+
policies = self.fetch_policies()
323+
print(f"\nSuccessfully fetched {len(policies)} policies with full details")
324+
325+
# Count policies with MITRE ATT&CK data
326+
policies_with_mitre = sum(1 for p in policies if p.get('mitreAttackVectors'))
327+
print(f"Policies with MITRE ATT&CK data: {policies_with_mitre}")
328+
329+
print(f"\nExporting to {output_file}...")
330+
self.export_to_csv(policies, mitre_map, output_file)
331+
332+
333+
def main():
334+
"""
335+
Main entry point
336+
"""
337+
# Read from standard ACS environment variables
338+
api_token = os.getenv('ROX_API_TOKEN')
339+
username = os.getenv('ROX_ADMIN_USER')
340+
password = os.getenv('ROX_ADMIN_PASSWORD')
341+
central_address = os.getenv('ROX_CENTRAL_ADDRESS')
342+
343+
# Validation
344+
if not central_address:
345+
print("ERROR: ROX_CENTRAL_ADDRESS environment variable is not set", file=sys.stderr)
346+
print("\nUsage:", file=sys.stderr)
347+
print(" Option 1 - With API Token:", file=sys.stderr)
348+
print(" export ROX_API_TOKEN='your-api-token'", file=sys.stderr)
349+
print(" export ROX_CENTRAL_ADDRESS='central.example.com:443'", file=sys.stderr)
350+
print(" python3 ACS_rich_policy_report.py [output_file.csv]", file=sys.stderr)
351+
print("\n Option 2 - With Username/Password:", file=sys.stderr)
352+
print(" export ROX_ADMIN_USER='admin'", file=sys.stderr)
353+
print(" export ROX_ADMIN_PASSWORD='your-password'", file=sys.stderr)
354+
print(" export ROX_CENTRAL_ADDRESS='central.example.com:443'", file=sys.stderr)
355+
print(" python3 ACS_rich_policy_report.py [output_file.csv]", file=sys.stderr)
356+
sys.exit(1)
357+
358+
if not api_token and not (username and password):
359+
print("ERROR: Either ROX_API_TOKEN or both ROX_ADMIN_USER and ROX_ADMIN_PASSWORD must be set", file=sys.stderr)
360+
print("\nUsage:", file=sys.stderr)
361+
print(" Option 1 - With API Token:", file=sys.stderr)
362+
print(" export ROX_API_TOKEN='your-api-token'", file=sys.stderr)
363+
print(" export ROX_CENTRAL_ADDRESS='central.example.com:443'", file=sys.stderr)
364+
print(" python3 ACS_rich_policy_report.py [output_file.csv]", file=sys.stderr)
365+
print("\n Option 2 - With Username/Password:", file=sys.stderr)
366+
print(" export ROX_ADMIN_USER='admin'", file=sys.stderr)
367+
print(" export ROX_ADMIN_PASSWORD='your-password'", file=sys.stderr)
368+
print(" export ROX_CENTRAL_ADDRESS='central.example.com:443'", file=sys.stderr)
369+
print(" python3 ACS_rich_policy_report.py [output_file.csv]", file=sys.stderr)
370+
sys.exit(1)
371+
372+
# Get output file from command line argument or use default
373+
if len(sys.argv) > 1:
374+
output_file = sys.argv[1]
375+
else:
376+
output_file = "acs_policies.csv"
377+
378+
print("=" * 60)
379+
print("ACS Rich Policy Report with MITRE ATT&CK Information")
380+
print("=" * 60)
381+
print(f"Central Address: {central_address}")
382+
print(f"Output File: {output_file}")
383+
print("=" * 60)
384+
print()
385+
386+
try:
387+
reporter = ACSRichPolicyReporter(central_address, api_token, username, password)
388+
reporter.run(output_file)
389+
print("\nDone!")
390+
except Exception as e:
391+
print(f"\nError: {e}", file=sys.stderr)
392+
sys.exit(1)
393+
394+
395+
if __name__ == "__main__":
396+
main()

0 commit comments

Comments
 (0)