From 8bfc9d5b45c5d9f1cd3acdab7beb540176c1db6a Mon Sep 17 00:00:00 2001 From: Li Baoming <1508269885@qq.com> Date: Tue, 24 Mar 2026 18:09:50 +0800 Subject: [PATCH 1/2] feat: add test output watcher daemon --- db/__init__.py | 34 ++++-- db/watcher.py | 287 +++++++++++++++++++++++++++++++++++++++++++++ docs/quickstart.md | 52 ++++++++ 3 files changed, 360 insertions(+), 13 deletions(-) create mode 100644 db/watcher.py diff --git a/db/__init__.py b/db/__init__.py index 8a05b13..54d2c01 100644 --- a/db/__init__.py +++ b/db/__init__.py @@ -3,26 +3,34 @@ InfiniMetrics MongoDB integration module. This module provides: -- MongoDB connection management -- Test result repository -- Dispatcher summary repository -- Data import from JSON/CSV files +- MongoDB connection management (client.py) +- Database configuration (config.py) +- Test result repository (repository.py) +- Data import from JSON/CSV files (importer.py) +- File watcher for auto-import (watcher.py) Usage: - from db import MongoDBClient, DatabaseConfig, TestRunRepository + from db import MongoDBClient, DataImporter, TestRunRepository # Connect to MongoDB client = MongoDBClient() - db = client.get_database() + repo = TestRunRepository(client.get_collection("test_runs")) - # Get repositories - test_runs = TestRunRepository(client.get_collection("test_runs")) - summaries = DispatcherSummaryRepository(client.get_collection("dispatcher_summaries")) - - # Import data - from db import DataImporter - importer = DataImporter(test_runs) + # Import existing results + importer = DataImporter(repo) importer.import_directory(Path("./output")) + + # Or use the watcher for auto-import + from db.watcher import Watcher + watcher = Watcher(output_dir=Path("./output")) + watcher.run_forever() + +CLI Usage: + # Start watching for new files + python -m db.watcher + + # One-time scan + python -m db.watcher --scan """ # Conditionally import MongoDB-dependent modules diff --git a/db/watcher.py b/db/watcher.py new file mode 100644 index 0000000..780ce09 --- 
/dev/null +++ b/db/watcher.py @@ -0,0 +1,287 @@ +#!/usr/bin/env python3 +""" +File watcher for auto-importing test results to MongoDB. + +Usage: + # Start watching for new files + python -m db.watcher + + # One-time scan (import existing files) + python -m db.watcher --scan +""" + +import argparse +import json +import logging +import signal +import sys +import time +from pathlib import Path +from typing import Dict, List, Set + +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler + +from .client import MongoDBClient +from .config import DatabaseConfig +from .importer import DataImporter +from .repository import DispatcherSummaryRepository, TestRunRepository +from .utils import is_dispatcher_summary_file, should_skip_file + +logger = logging.getLogger(__name__) + + +class Watcher: + """Watch directories and auto-import test results to MongoDB. + + - output_dir: imports test result files to test_runs collection + - summary_dir: stores summary metadata to dispatcher_summaries collection + """ + + def __init__( + self, + output_dir: Path = None, + summary_dir: Path = None, + mongo_uri: str = None, + ): + self.output_dir = Path(output_dir) if output_dir else Path("./output") + self.summary_dir = Path(summary_dir) if summary_dir else Path("./summary_output") + + # Initialize MongoDB connection + config = DatabaseConfig.from_env() + if mongo_uri: + config.mongo_uri = mongo_uri + + client = MongoDBClient(config) + if not client.health_check(): + raise ConnectionError(f"Cannot connect to MongoDB: {config.mongo_uri}") + + # Repositories + self.test_run_repo = TestRunRepository( + client.get_collection(config.collection_name) + ) + self.summary_repo = DispatcherSummaryRepository( + client.get_collection(config.summary_collection_name) + ) + self.importer = DataImporter(self.test_run_repo) + + self._observer = None + self._processed: Set[str] = set() + + def scan(self) -> Dict[str, Dict[str, List[str]]]: + """ + One-time scan and import 
all existing files. + + Returns: + { + "output": {"imported": [...], "skipped": [...], "failed": [...]}, + "summary": {"imported": [...], "skipped": [...], "failed": [...]} + } + """ + result = { + "output": {"imported": [], "skipped": [], "failed": []}, + "summary": {"imported": [], "skipped": [], "failed": []}, + } + + # Import test results from output directory + if self.output_dir.exists(): + logger.info(f"Scanning output: {self.output_dir}") + output_result = self.importer.import_directory( + self.output_dir, recursive=True, include_summaries=False + ) + result["output"]["imported"].extend(output_result.get("imported", [])) + result["output"]["skipped"].extend(output_result.get("skipped", [])) + result["output"]["failed"].extend(output_result.get("failed", [])) + logger.info( + f"Output: {len(result['output']['imported'])} imported, " + f"{len(result['output']['skipped'])} skipped, " + f"{len(result['output']['failed'])} failed" + ) + + # Store summary metadata (not re-import test results) + if self.summary_dir.exists(): + logger.info(f"Scanning summaries: {self.summary_dir}") + for summary_file in sorted(self.summary_dir.glob("dispatcher_summary_*.json")): + summary_result = self._import_summary_metadata(summary_file) + result["summary"]["imported"].extend(summary_result.get("imported", [])) + result["summary"]["skipped"].extend(summary_result.get("skipped", [])) + result["summary"]["failed"].extend(summary_result.get("failed", [])) + logger.info( + f"Summary: {len(result['summary']['imported'])} imported, " + f"{len(result['summary']['skipped'])} skipped, " + f"{len(result['summary']['failed'])} failed" + ) + + return result + + def _import_summary_metadata(self, summary_path: Path) -> Dict[str, List[str]]: + """ + Import summary metadata to dispatcher_summaries collection. + + This stores the summary info (which tests ran, success/failure) + without re-importing the actual test results. 
+ """ + result = {"imported": [], "skipped": [], "failed": []} + + try: + with open(summary_path, "r", encoding="utf-8") as f: + summary_data = json.load(f) + + timestamp = summary_data.get("timestamp") + if not timestamp: + logger.warning(f"No timestamp in summary: {summary_path}") + result["failed"].append(str(summary_path)) + return result + + # Check if already exists + if self.summary_repo.exists(timestamp): + logger.debug(f"Summary already exists: {timestamp}") + result["skipped"].append(timestamp) + return result + + # Add source file info + summary_data.setdefault("_metadata", {}) + summary_data["_metadata"]["source_file"] = summary_path.name + + # Insert summary metadata + self.summary_repo.upsert(summary_data) + result["imported"].append(timestamp) + logger.info(f"Imported summary: {timestamp}") + + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in {summary_path}: {e}") + result["failed"].append(str(summary_path)) + except Exception as e: + logger.error(f"Failed to import summary {summary_path}: {e}") + result["failed"].append(str(summary_path)) + + return result + + def start(self): + """Start watching for new files.""" + logger.info("Performing initial scan...") + self.scan() + + # Setup file event handler + handler = _Handler(self, self._processed) + + self._observer = Observer() + if self.output_dir.exists(): + self._observer.schedule(handler, str(self.output_dir), recursive=True) + logger.info(f"Watching: {self.output_dir}") + if self.summary_dir.exists(): + self._observer.schedule(handler, str(self.summary_dir), recursive=True) + logger.info(f"Watching: {self.summary_dir}") + + self._observer.start() + logger.info("Watcher started. 
Press Ctrl+C to stop.") + + def stop(self): + """Stop watching.""" + if self._observer: + self._observer.stop() + self._observer.join() + logger.info("Watcher stopped") + + def run_forever(self): + """Start watcher and block until interrupted.""" + self.start() + try: + while self._observer and self._observer.is_alive(): + time.sleep(1) + except KeyboardInterrupt: + logger.info("Interrupted by user") + finally: + self.stop() + + +class _Handler(FileSystemEventHandler): + """Internal handler for file system events.""" + + def __init__(self, watcher: Watcher, processed: Set[str]): + self.watcher = watcher + self.processed = processed + + def on_created(self, event): + if event.is_directory: + return + self._process_file(Path(event.src_path)) + + def on_modified(self, event): + if event.is_directory: + return + path = Path(event.src_path) + if str(path) not in self.processed: + self._process_file(path) + + def _process_file(self, path: Path): + if path.suffix != ".json": + return + + path_str = str(path) + if path_str in self.processed: + return + + # Wait for file to be fully written + time.sleep(0.5) + + if is_dispatcher_summary_file(path): + logger.info(f"New summary: {path.name}") + result = self.watcher._import_summary_metadata(path) + if result["imported"]: + self.processed.add(path_str) + logger.info(f"Imported summary metadata") + elif not should_skip_file(path): + logger.info(f"New test result: {path.name}") + run_id = self.watcher.importer.import_test_result(path) + if run_id: + self.processed.add(path_str) + logger.info(f"Imported: {run_id}") + + +def main(): + parser = argparse.ArgumentParser( + description="Watch for test results and import to MongoDB" + ) + parser.add_argument("--scan", action="store_true", help="One-time scan only") + parser.add_argument("--output-dir", default="./output", help="Output directory") + parser.add_argument("--summary-dir", default="./summary_output", help="Summary directory") + parser.add_argument("--mongo-uri", 
help="MongoDB connection URI") + parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") + args = parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG if args.verbose else logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%H:%M:%S", + ) + + try: + watcher = Watcher( + output_dir=args.output_dir, + summary_dir=args.summary_dir, + mongo_uri=args.mongo_uri, + ) + + if args.scan: + result = watcher.scan() + print("\n=== Output Files (test_runs) ===") + print(f"Imported: {len(result['output']['imported'])}") + print(f"Skipped: {len(result['output']['skipped'])}") + print(f"Failed: {len(result['output']['failed'])}") + print("\n=== Summary Files (dispatcher_summaries) ===") + print(f"Imported: {len(result['summary']['imported'])}") + print(f"Skipped: {len(result['summary']['skipped'])}") + print(f"Failed: {len(result['summary']['failed'])}") + else: + watcher.run_forever() + + except ConnectionError as e: + print(f"Error: {e}") + sys.exit(1) + except KeyboardInterrupt: + pass + + +if __name__ == "__main__": + main() diff --git a/docs/quickstart.md b/docs/quickstart.md index b09810b..8b4cb7c 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -97,6 +97,58 @@ Results are saved in: } ``` +## MongoDB Integration (Optional) + +InfiniMetrics supports storing test results in MongoDB for persistent storage and dashboard visualization. 
+ +### Prerequisites + +```bash +# Install MongoDB dependencies +pip install pymongo watchdog + +# Ensure MongoDB is running locally or set connection URL +export MONGO_URI="mongodb://localhost:27017" +``` + +### CLI Usage + +```bash +# Start file watcher (auto-import new results, runs forever) +python -m db.watcher --output-dir ./output --summary-dir ./summary_output + +# One-time scan only (import existing files and exit) +python -m db.watcher --scan +``` + +### Python API + +```python +from pathlib import Path +from db.watcher import Watcher + +# Create watcher and start monitoring +watcher = Watcher( + output_dir=Path("./output"), + summary_dir=Path("./summary_output") +) + +# One-time scan +result = watcher.scan() +print(f"Imported: {len(result['output']['imported'])}") + +# Or run forever (auto-import new files) +watcher.run_forever() # Blocks until Ctrl+C +``` + +### Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `MONGO_URI` | `mongodb://localhost:27017` | MongoDB connection URL | +| `MONGO_DB_NAME` | `infinimetrics` | Database name | +| `MONGO_COLLECTION` | `test_runs` | Test results collection | + ## Next Steps - **Configure Tests**: See [Configuration Guide](./configuration.md) for customization From 5e77d00715a1dfaae98f4a9e2573dcec77f8ec3e Mon Sep 17 00:00:00 2001 From: Li Baoming <1508269885@qq.com> Date: Tue, 24 Mar 2026 18:11:46 +0800 Subject: [PATCH 2/2] Reformat code --- db/watcher.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/db/watcher.py b/db/watcher.py index 780ce09..260b2be 100644 --- a/db/watcher.py +++ b/db/watcher.py @@ -45,7 +45,9 @@ def __init__( mongo_uri: str = None, ): self.output_dir = Path(output_dir) if output_dir else Path("./output") - self.summary_dir = Path(summary_dir) if summary_dir else Path("./summary_output") + self.summary_dir = ( + Path(summary_dir) if summary_dir else Path("./summary_output") + ) # Initialize MongoDB connection config = 
DatabaseConfig.from_env() @@ -101,7 +103,9 @@ def scan(self) -> Dict[str, Dict[str, List[str]]]: # Store summary metadata (not re-import test results) if self.summary_dir.exists(): logger.info(f"Scanning summaries: {self.summary_dir}") - for summary_file in sorted(self.summary_dir.glob("dispatcher_summary_*.json")): + for summary_file in sorted( + self.summary_dir.glob("dispatcher_summary_*.json") + ): summary_result = self._import_summary_metadata(summary_file) result["summary"]["imported"].extend(summary_result.get("imported", [])) result["summary"]["skipped"].extend(summary_result.get("skipped", [])) @@ -245,7 +249,9 @@ def main(): ) parser.add_argument("--scan", action="store_true", help="One-time scan only") parser.add_argument("--output-dir", default="./output", help="Output directory") - parser.add_argument("--summary-dir", default="./summary_output", help="Summary directory") + parser.add_argument( + "--summary-dir", default="./summary_output", help="Summary directory" + ) parser.add_argument("--mongo-uri", help="MongoDB connection URI") parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") args = parser.parse_args()