|
1 | 1 | import json |
| 2 | +import os |
2 | 3 | import shlex |
3 | 4 | import sqlite3 |
4 | 5 | import sys |
5 | | -from collections import deque # added |
| 6 | +from collections import deque |
6 | 7 | from typing import Any, Dict, List, Optional |
| 8 | +from urllib.parse import urlparse |
7 | 9 |
|
8 | 10 | from mitmproxy import http |
| 11 | +from mitmproxy.io import FlowReader |
9 | 12 |
|
10 | 13 | from .scope import ScopeManager |
11 | 14 | from .utils import get_safe_text |
@@ -285,6 +288,64 @@ def get_by_ids(self, flow_ids: List[str]) -> List[Dict[str, Any]]: |
285 | 288 | ) |
286 | 289 | return results |
287 | 290 |
|
def import_from_file(
    self,
    file_path: str,
    append: bool = False,
    scope: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Import flows from a HAR or mitmproxy flow file.

    Uses mitmproxy's FlowReader which auto-detects format (HAR if JSON,
    native tnetstring otherwise).

    The file path and extension are validated *before* any existing
    traffic is cleared, so a bad path never destroys stored data.

    Args:
        file_path: Path to .har or .mitm/.flow file.
        append: If False, clear existing traffic before import (only
            once the file has passed validation).
        scope: Optional list of domains to filter by during import.
            A flow is kept when its host equals a domain or is a
            subdomain of it. Matching is case-insensitive.

    Returns:
        Dict with import stats: {"imported": int, "skipped": int, "errors": int}.
        "skipped" counts non-HTTP and out-of-scope flows; "errors" counts
        flows that failed to save, plus one if the file itself turned out
        to be unreadable part-way through.
    """
    stats = {"imported": 0, "skipped": 0, "errors": 0}

    # Validate first: clearing is destructive and must not happen for an
    # input we are going to reject anyway.
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}", file=sys.stderr)
        return stats

    allowed_exts = (".har", ".mitm", ".flow")
    # str.endswith accepts a tuple of suffixes.
    if not str(file_path).lower().endswith(allowed_exts):
        print(f"Unsupported file extension: {file_path}", file=sys.stderr)
        return stats

    if not append:
        self.clear()

    # urlparse().hostname is always lower-case, so normalise the scope
    # once up front instead of comparing mixed-case entries per flow.
    domains = [d.lower() for d in scope] if scope else []

    # Local import: only needed on the read path, and keeps this method
    # self-contained with respect to the module's import block.
    from mitmproxy.exceptions import FlowReadException

    with open(file_path, "rb") as f:
        reader = FlowReader(f)
        try:
            for flow in reader.stream():
                try:
                    if not isinstance(flow, http.HTTPFlow):
                        stats["skipped"] += 1
                        continue

                    if domains:
                        host = urlparse(flow.request.url).hostname or ""
                        if not any(
                            host == d or host.endswith("." + d)
                            for d in domains
                        ):
                            stats["skipped"] += 1
                            continue

                    self.save_flow(flow)
                    stats["imported"] += 1
                except Exception as e:
                    # Best-effort import: one bad flow must not abort
                    # the rest of the file.
                    stats["errors"] += 1
                    print(
                        f"Skipped flow during import: {e}",
                        file=sys.stderr,
                    )
        except FlowReadException as e:
            # Raised by the reader itself (corrupt or truncated file), so
            # the per-flow handler above cannot see it. Flows read before
            # the failure stay imported.
            stats["errors"] += 1
            print(
                f"Aborted import, unreadable flow data: {e}",
                file=sys.stderr,
            )

    return stats
| 348 | + |
288 | 349 | def _generate_curl(self, request: SimpleRequest) -> str: |
289 | 350 | try: |
290 | 351 | cmd = ["curl", "-X", request.method] |
|
0 commit comments