class ResultCache:
    """
    Manage cached scan results for files, keyed by content and scan setup.

    Each entry is a JSON file named ``<sha256 hex digest>.json`` stored
    directly in ``cache_dir``. The digest covers the file content, the
    module-level ``scancode_version`` and the scan options, so changing any of
    them selects a different entry. Hit and miss counters are kept in
    ``stats``.
    """

    # Files are hashed in chunks so that large files are never loaded whole.
    _HASH_CHUNK_SIZE = 1024 * 1024

    # Shape of the names produced by sha256().hexdigest(): 64 lowercase hex
    # characters. Used to tell cache entries apart from unrelated JSON files.
    _HEX_DIGITS = frozenset('0123456789abcdef')
    _DIGEST_LENGTH = 64

    def __init__(self, cache_dir=None):
        """
        Initialize cache manager and create the cache directory if needed.

        Args:
            cache_dir: Custom cache directory path. If None (or empty), uses
                ``~/.cache/scancode/file_results``.
        """
        if cache_dir:
            self.cache_dir = Path(cache_dir)
        else:
            # Fixed location under the user's home directory.
            self.cache_dir = Path.home() / '.cache' / 'scancode' / 'file_results'

        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.stats = {'hits': 0, 'misses': 0}

    def _get_file_hash(self, file_path, scan_options):
        """
        Generate unique hash for file + scan configuration.

        Args:
            file_path: Path to file being scanned
            scan_options: Dict of enabled scan options (e.g., {'license': True}).
                None is treated like an empty dict.

        Returns:
            SHA256 hex digest string
        """
        hasher = hashlib.sha256()

        # Feed the content incrementally: the digest is the same as hashing
        # the whole content at once, without the memory cost.
        with open(file_path, 'rb') as f:
            while chunk := f.read(self._HASH_CHUNK_SIZE):
                hasher.update(chunk)

        # Hash scan configuration to invalidate on option changes. Items are
        # sorted so the key does not depend on dict insertion order.
        options = sorted((scan_options or {}).items())
        config_str = f"{scancode_version}:{options}"
        hasher.update(config_str.encode('utf-8'))

        return hasher.hexdigest()

    def _entry_path(self, file_path, scan_options):
        """Return the path of the cache entry for this file and options."""
        file_hash = self._get_file_hash(file_path, scan_options)
        return self.cache_dir / f"{file_hash}.json"

    def _is_cache_entry(self, path):
        """Return True if ``path`` is named like an entry written by this class."""
        stem = path.stem
        return len(stem) == self._DIGEST_LENGTH and self._HEX_DIGITS.issuperset(stem)

    def get_cached_result(self, file_path, scan_options):
        """
        Retrieve cached scan result if available.

        A missing entry counts as a miss. An entry that cannot be read or
        decoded also counts as a miss (never as a hit) and is removed so that
        it can be rewritten by a later ``store_result``.

        Args:
            file_path: Path to file
            scan_options: Dict of scan options

        Returns:
            Dict with scan results or None if not cached
        """
        cache_file = self._entry_path(file_path, scan_options)

        # Open directly instead of checking existence first: the entry may be
        # removed by another process between the check and the open.
        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                result = json.load(f)
        except FileNotFoundError:
            self.stats['misses'] += 1
            return None
        except (ValueError, OSError):
            # ValueError covers both invalid JSON and undecodable bytes.
            # Corrupted cache, remove it; failing to remove must not abort.
            try:
                cache_file.unlink(missing_ok=True)
            except OSError:
                pass
            self.stats['misses'] += 1
            return None

        # Count the hit only once the entry has been fully loaded.
        self.stats['hits'] += 1
        return result

    def store_result(self, file_path, scan_options, result):
        """
        Store scan result in cache.

        The entry is written to a temporary file in the cache directory and
        then moved into place, so readers never observe a partial entry.
        A failure to serialize or write is reported on stderr and otherwise
        ignored: caching is best effort and must not fail the scan.

        Args:
            file_path: Path to scanned file
            scan_options: Dict of scan options used
            result: Scan result dict to cache
        """
        # Imported here because only this method needs them.
        import sys
        import tempfile

        cache_file = self._entry_path(file_path, scan_options)

        tmp_path = None
        try:
            # The ".tmp" suffix keeps unfinished files out of the "*.json"
            # namespace used for lookups and clearing.
            with tempfile.NamedTemporaryFile(
                mode='w',
                encoding='utf-8',
                dir=self.cache_dir,
                prefix=f"{cache_file.stem}.",
                suffix='.tmp',
                delete=False,
            ) as tmp:
                tmp_path = tmp.name
                json.dump(result, tmp)
            os.replace(tmp_path, cache_file)
        except (OSError, TypeError, ValueError) as e:
            # Don't fail scan if cache write fails. Use stderr so the warning
            # cannot mix with scan output written to stdout.
            print(f"Warning: Failed to write cache: {e}", file=sys.stderr)
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError:
                    pass

    def clear_cache(self):
        """
        Remove all cached result files.

        Only files named like cache entries (``<64 hex chars>.json``) are
        removed, so unrelated JSON files in a custom cache directory are left
        alone. Files that cannot be removed are skipped.
        """
        for entry in self.cache_dir.glob("*.json"):
            if not self._is_cache_entry(entry):
                continue
            try:
                entry.unlink()
            except OSError:
                # Best effort: e.g. already removed by another process.
                pass

    def get_stats(self):
        """
        Return cache statistics.

        Returns:
            Dict with 'hits', 'misses' and 'hit_rate_percent' (hits as a
            percentage of all lookups, rounded to one decimal; 0.0 when there
            were no lookups).
        """
        hits = self.stats['hits']
        misses = self.stats['misses']
        total = hits + misses
        hit_rate = (hits / total * 100) if total > 0 else 0.0
        return {
            'hits': hits,
            'misses': misses,
            'hit_rate_percent': round(hit_rate, 1)
        }
@scan_impl
class CachePlugin(ScanPlugin):
    """
    Declare the command line options for file-level result caching.

    NOTE(review): ``--force-reindex`` is declared here but nothing in this
    module reads its value; confirm where it is meant to be consumed.
    """

    options = [
        click.Option(
            ['--cache'],
            help='Enable result caching for faster repeated scans.',
            is_flag=True,
            default=False,
        ),
        click.Option(
            ['--cache-dir'],
            metavar='DIR',
            type=click.Path(file_okay=False, dir_okay=True, exists=False),
            help=(
                'Custom directory for cache storage. '
                'Default: ~/.cache/scancode/file_results'
            ),
        ),
        click.Option(
            ['--force-reindex'],
            help='Ignore cache and perform full rescan of all files.',
            is_flag=True,
            default=False,
        ),
    ]

    def is_enabled(self, cache, **kwargs):
        """Return the value of the ``--cache`` flag; other options are ignored."""
        return cache

    def setup(self, **kwargs):
        """
        Do nothing: no cache state is created at setup time.
        """
        return None


def get_cache_manager(cache, cache_dir, **kwargs):
    """
    Build the cache manager for a scan, or nothing when caching is off.

    Args:
        cache: Boolean, whether caching is enabled
        cache_dir: Custom cache directory path, passed through to ResultCache
        **kwargs: Any other options; accepted and ignored

    Returns:
        ResultCache instance or None if caching disabled
    """
    return ResultCache(cache_dir=cache_dir) if cache else None
class TestResultCache:
    """Exercise ResultCache against files created in a throwaway directory."""

    @staticmethod
    def _write_source(directory, text):
        """Create ``test.py`` with ``text`` inside ``directory`` and return its path."""
        source = Path(directory) / 'test.py'
        source.write_text(text)
        return source

    def test_cache_stores_and_retrieves_results(self):
        """A stored result is returned by a later lookup and counted as a hit."""
        with tempfile.TemporaryDirectory() as workdir:
            result_cache = ResultCache(cache_dir=workdir)
            source = self._write_source(workdir, '# Test file\nprint("hello")')

            options = {'license': True, 'copyright': False}
            stored = {'licenses': ['MIT'], 'path': str(source)}
            result_cache.store_result(source, options, stored)

            found = result_cache.get_cached_result(source, options)

            assert found is not None
            assert found['licenses'] == ['MIT']
            assert result_cache.stats['hits'] == 1

    def test_cache_miss_on_file_change(self):
        """Changing the file content after storing makes the lookup miss."""
        with tempfile.TemporaryDirectory() as workdir:
            result_cache = ResultCache(cache_dir=workdir)
            options = {'license': True}

            # Store a result for the first version of the file.
            source = self._write_source(workdir, '# Version 1')
            result_cache.store_result(source, options, {'version': 1})

            # Rewrite the file, then look it up again.
            source.write_text('# Version 2')
            found = result_cache.get_cached_result(source, options)

            assert found is None
            assert result_cache.stats['misses'] == 1

    def test_cache_miss_on_scan_options_change(self):
        """Looking up with different scan options makes the lookup miss."""
        with tempfile.TemporaryDirectory() as workdir:
            result_cache = ResultCache(cache_dir=workdir)
            source = self._write_source(workdir, '# Test')

            # Stored with license only, looked up with copyright added.
            result_cache.store_result(source, {'license': True}, {'result': 1})
            found = result_cache.get_cached_result(
                source, {'license': True, 'copyright': True}
            )

            assert found is None  # Should miss due to different options

    def test_cache_statistics(self):
        """One miss followed by one hit is reported as a 50% hit rate."""
        with tempfile.TemporaryDirectory() as workdir:
            result_cache = ResultCache(cache_dir=workdir)
            source = self._write_source(workdir, '# Test')
            options = {'license': True}

            # Nothing stored yet: miss.
            result_cache.get_cached_result(source, options)

            # Stored, then looked up: hit.
            result_cache.store_result(source, options, {'data': 'test'})
            result_cache.get_cached_result(source, options)

            report = result_cache.get_stats()
            assert report['hits'] == 1
            assert report['misses'] == 1
            assert report['hit_rate_percent'] == 50.0

    def test_clear_cache(self):
        """After clearing, a previously stored result is no longer found."""
        with tempfile.TemporaryDirectory() as workdir:
            result_cache = ResultCache(cache_dir=workdir)
            source = self._write_source(workdir, '# Test')
            options = {'license': True}

            result_cache.store_result(source, options, {'data': 'test'})
            result_cache.clear_cache()

            assert result_cache.get_cached_result(source, options) is None