|
| 1 | +import requests |
| 2 | +import base64 |
| 3 | +import getpass |
| 4 | +import json |
| 5 | +import csv |
| 6 | +from datetime import datetime |
| 7 | + |
| 8 | +def read_creds_file(filename="../.creds"): |
| 9 | + """Read credentials from a .creds file""" |
| 10 | + creds = {} |
| 11 | + try: |
| 12 | + with open(filename, "r") as f: |
| 13 | + for line in f: |
| 14 | + line = line.strip() |
| 15 | + if line and not line.startswith("#"): |
| 16 | + key, value = line.split("=", 1) |
| 17 | + creds[key] = value |
| 18 | + except FileNotFoundError: |
| 19 | + print(f"Warning: {filename} file not found. Please input values.") |
| 20 | + return creds |
| 21 | + |
| 22 | +def get_vulnerabilities(headers, org_id, offset=0, limit=100): |
| 23 | + """Fetch vulnerabilities from the organization""" |
| 24 | + url = f"{contrast_url}/api/ng/organizations/{org_id}/orgtraces/ui?expand=application,session_metadata&offset={offset}&limit={limit}&sort=-severity" |
| 25 | + |
| 26 | + # POST body with filter parameters |
| 27 | + post_body = { |
| 28 | + "quickFilter": "OPEN", |
| 29 | + "modules": [], |
| 30 | + "servers": [], |
| 31 | + "filterTags": [], |
| 32 | + "severities": [], |
| 33 | + "status": [], |
| 34 | + "substatus": [], |
| 35 | + "vulnTypes": [], |
| 36 | + "environments": [], |
| 37 | + "urls": [], |
| 38 | + "sinks": [], |
| 39 | + "securityStandards": [], |
| 40 | + "appVersionTags": [], |
| 41 | + "routes": [], |
| 42 | + "tracked": False, |
| 43 | + "untracked": False, |
| 44 | + "technologies": [], |
| 45 | + "applicationTags": [], |
| 46 | + "applicationMetadataFilters": [], |
| 47 | + "applicationImportances": [], |
| 48 | + "languages": [], |
| 49 | + "licensedOnly": False, |
| 50 | + "protectStatuses": [] |
| 51 | + } |
| 52 | + response = requests.post(url, headers=headers, json=post_body) |
| 53 | + return response |
| 54 | + |
| 55 | +def get_vulnerability_cwe(headers, org_id, vuln_id): |
| 56 | + """Fetch CWE information for a specific vulnerability""" |
| 57 | + url = f"{contrast_url}/api/ng/{org_id}/traces/{vuln_id}/recommendation?expand=skip_links" |
| 58 | + try: |
| 59 | + response = requests.get(url, headers=headers) |
| 60 | + if response.status_code == 200: |
| 61 | + data = response.json() |
| 62 | + cwe_url = data.get("recommendation", {}).get("cwe", "") |
| 63 | + if cwe_url: |
| 64 | + # Extract CWE ID from URL (e.g., "https://cwe.mitre.org/data/definitions/89.html" -> "CWE-89") |
| 65 | + if "/" in cwe_url: |
| 66 | + cwe_number = cwe_url.rstrip("/").split("/")[-1].replace(".html", "") |
| 67 | + return f"CWE-{cwe_number}" |
| 68 | + return cwe_url |
| 69 | + return "" |
| 70 | + except Exception as e: |
| 71 | + print(f"Error fetching CWE for {vuln_id}: {e}") |
| 72 | + return "" |
| 73 | + |
| 74 | +def format_timestamp(timestamp): |
| 75 | + """Convert millisecond timestamp to readable format""" |
| 76 | + if timestamp: |
| 77 | + return datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d %H:%M:%S') |
| 78 | + return "" |
| 79 | + |
| 80 | +def format_date_only(timestamp): |
| 81 | + """Convert millisecond timestamp to date only""" |
| 82 | + if timestamp: |
| 83 | + return datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d') |
| 84 | + return "" |
| 85 | + |
| 86 | +def extract_vulnerability_data(item, headers, org_id): |
| 87 | + """Extract vulnerability data from API response""" |
| 88 | + vuln = item.get("vulnerability", {}) |
| 89 | + app = vuln.get("application", {}) |
| 90 | + bug_tracker = vuln.get("bugTracker", {}) |
| 91 | + vuln_id = vuln.get("uuid", "") |
| 92 | + |
| 93 | + # Extract bug tracker information |
| 94 | + bug_tracker_id = "" |
| 95 | + bug_tracker_url = "" |
| 96 | + bug_tracker_tickets = bug_tracker.get("bugTrackerTickets", []) |
| 97 | + if bug_tracker_tickets and len(bug_tracker_tickets) > 0: |
| 98 | + first_ticket = bug_tracker_tickets[0] |
| 99 | + bug_tracker_id = first_ticket.get("ticketKey", "") |
| 100 | + bug_tracker_url = first_ticket.get("ticketUrl", "") |
| 101 | + |
| 102 | + # Get CWE ID from recommendation endpoint |
| 103 | + cwe_id = "" |
| 104 | + if vuln_id: |
| 105 | + cwe_id = get_vulnerability_cwe(headers, org_id, vuln_id) |
| 106 | + |
| 107 | + # Get parent application name (directly from application object) |
| 108 | + parent_app_name = app.get("parentApplicationName", "") |
| 109 | + |
| 110 | + return { |
| 111 | + "Vulnerability Name": vuln.get("title", ""), |
| 112 | + "Vulnerability ID": vuln.get("uuid", ""), |
| 113 | + "Rule Name": vuln.get("ruleName", ""), |
| 114 | + "Severity": vuln.get("severity", ""), |
| 115 | + "Status": vuln.get("status", ""), |
| 116 | + "First Seen": format_date_only(vuln.get("firstDetected")), |
| 117 | + "First Seen Datetime": format_timestamp(vuln.get("firstDetected")), |
| 118 | + "Last Seen": format_date_only(vuln.get("lastDetected")), |
| 119 | + "Last Seen Datetime": format_timestamp(vuln.get("lastDetected")), |
| 120 | + "Application Name": app.get("name", ""), |
| 121 | + "Application ID": app.get("id", ""), |
| 122 | + "Application Code": app.get("contextPath", ""), |
| 123 | + "Parent Application Name": parent_app_name, |
| 124 | + "CWE ID": cwe_id, |
| 125 | + "Bug Tracker ID": bug_tracker_id, |
| 126 | + "Bug Tracker URL": bug_tracker_url |
| 127 | + } |
| 128 | + |
| 129 | +def main(): |
| 130 | + # Read credentials from .creds file |
| 131 | + creds = read_creds_file() |
| 132 | + |
| 133 | + global contrast_url |
| 134 | + contrast_url = creds.get("CONTRAST_URL", "") |
| 135 | + org_id = creds.get("ORG_ID", "") |
| 136 | + username = creds.get("USERNAME", "") |
| 137 | + api_key = creds.get("API_KEY", "") |
| 138 | + service_key = creds.get("SERVICE_KEY", "") |
| 139 | + |
| 140 | + # Prompt for credentials if not in .creds file |
| 141 | + msg = f"Enter your Contrast URL (blank will use default '{contrast_url}'): " |
| 142 | + contrast_url_input = input(msg) |
| 143 | + if contrast_url_input.strip(): |
| 144 | + contrast_url = contrast_url_input |
| 145 | + else: |
| 146 | + while not contrast_url_input.strip() and not contrast_url.strip(): |
| 147 | + print("Contrast URL cannot be blank.") |
| 148 | + contrast_url_input = input(msg) |
| 149 | + contrast_url = contrast_url_input |
| 150 | + |
| 151 | + msg = f"Enter your Organization ID (blank will use default '{org_id}'): " |
| 152 | + org_id_input = input(msg) |
| 153 | + if org_id_input.strip(): |
| 154 | + org_id = org_id_input |
| 155 | + else: |
| 156 | + while not org_id_input.strip() and not org_id.strip(): |
| 157 | + print("Organization ID cannot be blank.") |
| 158 | + org_id_input = input(msg) |
| 159 | + org_id = org_id_input |
| 160 | + |
| 161 | + msg = f"Enter your username (blank will use default '{username}'): " |
| 162 | + username_input = input(msg) |
| 163 | + if username_input.strip(): |
| 164 | + username = username_input |
| 165 | + else: |
| 166 | + while not username_input.strip() and not username.strip(): |
| 167 | + print("Username cannot be blank.") |
| 168 | + username_input = input(msg) |
| 169 | + username = username_input |
| 170 | + |
| 171 | + msg = f"Enter your API key (blank will use default '****************************'): " |
| 172 | + api_key_input = getpass.getpass(msg) |
| 173 | + if api_key_input.strip(): |
| 174 | + api_key = api_key_input |
| 175 | + else: |
| 176 | + while not api_key_input.strip() and not api_key.strip(): |
| 177 | + print("API key cannot be blank.") |
| 178 | + api_key_input = getpass.getpass(msg) |
| 179 | + api_key = api_key_input |
| 180 | + |
| 181 | + msg = f"Enter your service key (blank will use default '************'): " |
| 182 | + service_key_input = getpass.getpass(msg) |
| 183 | + if service_key_input.strip(): |
| 184 | + service_key = service_key_input |
| 185 | + else: |
| 186 | + while not service_key_input.strip() and not service_key.strip(): |
| 187 | + print("Service key cannot be blank.") |
| 188 | + service_key_input = getpass.getpass(msg) |
| 189 | + service_key = service_key_input |
| 190 | + |
| 191 | + # Setup authentication headers |
| 192 | + auth_str = f"{username}:{service_key}" |
| 193 | + auth_b64 = base64.b64encode(auth_str.encode()).decode() |
| 194 | + headers = { |
| 195 | + "Authorization": f"Basic {auth_b64}", |
| 196 | + "API-Key": api_key, |
| 197 | + "Accept": "application/json", |
| 198 | + "Content-Type": "application/json" |
| 199 | + } |
| 200 | + |
| 201 | + # Define CSV headers |
| 202 | + csv_headers = [ |
| 203 | + "Vulnerability Name", |
| 204 | + "Vulnerability ID", |
| 205 | + "Rule Name", |
| 206 | + "Severity", |
| 207 | + "Status", |
| 208 | + "First Seen", |
| 209 | + "First Seen Datetime", |
| 210 | + "Last Seen", |
| 211 | + "Last Seen Datetime", |
| 212 | + "Application Name", |
| 213 | + "Application ID", |
| 214 | + "Application Code", |
| 215 | + "Parent Application Name", |
| 216 | + "CWE ID", |
| 217 | + "Bug Tracker ID", |
| 218 | + "Bug Tracker URL" |
| 219 | + ] |
| 220 | + |
| 221 | + # Fetch all vulnerabilities with pagination |
| 222 | + all_vulnerabilities = [] |
| 223 | + offset = 0 |
| 224 | + limit = 100 |
| 225 | + total_count = None |
| 226 | + |
| 227 | + print("Fetching vulnerabilities...") |
| 228 | + |
| 229 | + while True: |
| 230 | + print(f"Fetching vulnerabilities {offset} to {offset + limit}...") |
| 231 | + response = get_vulnerabilities(headers, org_id, offset, limit) |
| 232 | + |
| 233 | + if response.status_code != 200: |
| 234 | + print(f"Error fetching vulnerabilities: {response.status_code}") |
| 235 | + print(f"Response: {response.text}") |
| 236 | + break |
| 237 | + |
| 238 | + data = response.json() |
| 239 | + |
| 240 | + # Get total count on first request |
| 241 | + if total_count is None: |
| 242 | + total_count = data.get("count", 0) |
| 243 | + print(f"Total vulnerabilities to fetch: {total_count}") |
| 244 | + |
| 245 | + items = data.get("items", []) |
| 246 | + if not items: |
| 247 | + break |
| 248 | + |
| 249 | + # Extract vulnerability data |
| 250 | + for idx, item in enumerate(items): |
| 251 | + vuln_id = item.get("vulnerability", {}).get("uuid", "") |
| 252 | + print(f" Processing vulnerability {offset + idx + 1}/{total_count}: {vuln_id}") |
| 253 | + vuln_data = extract_vulnerability_data(item, headers, org_id) |
| 254 | + all_vulnerabilities.append(vuln_data) |
| 255 | + |
| 256 | + # Check if we've fetched all vulnerabilities |
| 257 | + offset += limit |
| 258 | + if offset >= total_count: |
| 259 | + break |
| 260 | + |
| 261 | + # Save to JSON for debugging |
| 262 | + with open("vulnerabilities_output.json", "w") as f: |
| 263 | + json.dump(all_vulnerabilities, f, indent=2) |
| 264 | + print(f"Raw data saved to vulnerabilities_output.json") |
| 265 | + |
| 266 | + # Write to CSV |
| 267 | + csv_filename = "vulnerabilities_export.csv" |
| 268 | + with open(csv_filename, "w", newline='', encoding='utf-8') as csvfile: |
| 269 | + writer = csv.DictWriter(csvfile, fieldnames=csv_headers) |
| 270 | + writer.writeheader() |
| 271 | + writer.writerows(all_vulnerabilities) |
| 272 | + |
| 273 | + print(f"\nExport complete!") |
| 274 | + print(f"Total vulnerabilities exported: {len(all_vulnerabilities)}") |
| 275 | + print(f"CSV file created: {csv_filename}") |
| 276 | + |
| 277 | +if __name__ == "__main__": |
| 278 | + main() |
0 commit comments