Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions src/scancode/cache_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# ScanCode is a trademark of nexB Inc.
# Visit https://aboutcode.org and https://github.com/aboutcode-org/scancode-toolkit/
#
"""
File-level result caching for faster repeated scans.
"""

import hashlib
import json
import os
from pathlib import Path

from commoncode import fileutils
# Version marker mixed into every cache key so entries written under one value
# are not looked up under another.
# NOTE(review): hard-coded to "dev" here; presumably this should come from the
# real ScanCode version -- confirm.
scancode_version = "dev"


class ResultCache:
    """
    Manage cached scan results for files, keyed by a content hash.

    Each entry is a JSON file named ``<sha256>.json`` stored in ``cache_dir``.
    The digest covers the file content, ``scancode_version`` and the scan
    options, so a change to any of them produces a different key.

    Attributes:
        cache_dir: ``pathlib.Path`` of the directory holding the entries.
        stats: Dict with the ``'hits'`` and ``'misses'`` counters.
    """

    # Files are hashed in chunks of this many bytes so that a large file is
    # never loaded in memory as a whole.
    _HASH_CHUNK_SIZE = 1024 * 1024

    def __init__(self, cache_dir=None):
        """
        Initialize the cache manager and create the cache directory.

        Args:
            cache_dir: Custom cache directory path. If None (or empty), uses
                ``~/.cache/scancode/file_results``.

        Raises:
            OSError: If the cache directory cannot be created.
        """
        if cache_dir:
            self.cache_dir = Path(cache_dir)
        else:
            # The default location is under the user home on every platform.
            self.cache_dir = Path.home() / '.cache' / 'scancode' / 'file_results'

        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.stats = {'hits': 0, 'misses': 0}

    def _get_file_hash(self, file_path, scan_options):
        """
        Generate a unique hash for file content + scan configuration.

        Args:
            file_path: Path to the file being scanned.
            scan_options: Dict of enabled scan options
                (e.g., {'license': True}). None is treated as no options.

        Returns:
            SHA256 hex digest string.

        Raises:
            OSError: If the file cannot be opened or read.
        """
        hasher = hashlib.sha256()

        # Feed the content chunk by chunk: the digest is the same as hashing
        # the whole content at once, without the memory cost.
        with open(file_path, 'rb') as f:
            for chunk in iter(lambda: f.read(self._HASH_CHUNK_SIZE), b''):
                hasher.update(chunk)

        # Hash the scan configuration to invalidate entries on option changes.
        # Items are sorted so the key does not depend on dict insertion order.
        options = scan_options or {}
        config_str = f"{scancode_version}:{sorted(options.items())}"
        hasher.update(config_str.encode('utf-8'))

        return hasher.hexdigest()

    def get_cached_result(self, file_path, scan_options):
        """
        Retrieve the cached scan result if available.

        Every call updates exactly one of the ``hits`` or ``misses`` counters.
        An entry that cannot be read or parsed is removed and counted as a
        miss.

        Args:
            file_path: Path to the file.
            scan_options: Dict of scan options.

        Returns:
            Dict with scan results or None if not cached.

        Raises:
            OSError: If the scanned file itself cannot be read for hashing.
        """
        file_hash = self._get_file_hash(file_path, scan_options)
        cache_file = self.cache_dir / f"{file_hash}.json"

        try:
            with open(cache_file, 'r', encoding='utf-8') as f:
                result = json.load(f)
        except FileNotFoundError:
            # No entry for this key: a plain miss.
            self.stats['misses'] += 1
            return None
        except (ValueError, OSError):
            # Corrupted or unreadable entry. ValueError covers both
            # json.JSONDecodeError and UnicodeDecodeError.
            try:
                cache_file.unlink()
            except OSError:
                # Best effort: an entry that cannot be removed must not
                # abort the scan; it is simply reported as a miss again.
                pass
            self.stats['misses'] += 1
            return None

        # Count the hit only once the entry has been loaded successfully.
        self.stats['hits'] += 1
        return result

    def store_result(self, file_path, scan_options, result):
        """
        Store a scan result in the cache.

        The entry is written to a temporary file and then moved in place so
        that readers never see a partially written entry. Failures to write
        or serialize are reported with a printed warning and do not raise.

        Args:
            file_path: Path to the scanned file.
            scan_options: Dict of scan options used.
            result: Scan result dict to cache. Must be JSON-serializable to
                be stored.

        Raises:
            OSError: If the scanned file itself cannot be read for hashing.
        """
        file_hash = self._get_file_hash(file_path, scan_options)
        cache_file = self.cache_dir / f"{file_hash}.json"
        # The process id keeps concurrent writers of the same key apart.
        tmp_file = self.cache_dir / f"{file_hash}.{os.getpid()}.tmp"

        try:
            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(result, f)
            os.replace(tmp_file, cache_file)
        except (OSError, TypeError, ValueError) as e:
            # Don't fail the scan if the cache write fails. TypeError and
            # ValueError are raised by json.dump for results that cannot be
            # serialized.
            print(f"Warning: Failed to write cache: {e}")
            try:
                tmp_file.unlink()
            except OSError:
                # The temporary file may not exist or may not be removable;
                # nothing more can be done here.
                pass

    def clear_cache(self):
        """Remove all cached result files (``*.json``) from the cache directory."""
        for entry in self.cache_dir.glob("*.json"):
            try:
                entry.unlink()
            except OSError:
                # Best effort: skip entries that vanished or cannot be
                # removed and keep clearing the others.
                continue

    def get_stats(self):
        """
        Return cache statistics.

        Returns:
            Dict with the ``'hits'`` and ``'misses'`` counts and
            ``'hit_rate_percent'``, the percentage of hits rounded to one
            decimal (0.0 when there was no lookup).
        """
        hits = self.stats['hits']
        misses = self.stats['misses']
        total = hits + misses
        hit_rate = (hits / total * 100) if total > 0 else 0.0
        return {
            'hits': hits,
            'misses': misses,
            'hit_rate_percent': round(hit_rate, 1),
        }
68 changes: 68 additions & 0 deletions src/scancode/plugin_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (c) nexB Inc. and others. All rights reserved.
#
"""
CLI plugin to enable result caching.
"""

import click

from plugincode.scan import ScanPlugin
from plugincode.scan import scan_impl
from scancode.cache_manager import ResultCache


@scan_impl
class CachePlugin(ScanPlugin):
    """
    Scan plugin declaring the command line options for file-level result
    caching: ``--cache``, ``--cache-dir`` and ``--force-reindex``.
    """

    options = [
        # Main switch: its value is what is_enabled() returns.
        click.Option(
            ['--cache'],
            help='Enable result caching for faster repeated scans.',
            default=False,
            is_flag=True,
        ),
        # Optional location for the cache; the path does not need to exist.
        click.Option(
            ['--cache-dir'],
            metavar='DIR',
            type=click.Path(exists=False, file_okay=False, dir_okay=True),
            help='Custom directory for cache storage. Default: ~/.cache/scancode/file_results',
        ),
        # NOTE(review): nothing in this class reads this flag -- confirm
        # where it is consumed.
        click.Option(
            ['--force-reindex'],
            help='Ignore cache and perform full rescan of all files.',
            default=False,
            is_flag=True,
        ),
    ]

    def is_enabled(self, cache, **kwargs):
        """Return the value of the ``cache`` option; other options are ignored."""
        return cache

    def setup(self, **kwargs):
        """
        Setup hook for the scan: currently does nothing.
        """


def get_cache_manager(cache, cache_dir, **kwargs):
    """
    Build the cache manager for a scan, or nothing when caching is off.

    Args:
        cache: Truthy when caching is enabled.
        cache_dir: Custom cache directory path, passed to ResultCache.
        **kwargs: Accepted and ignored.

    Returns:
        A ResultCache instance, or None when ``cache`` is falsy.
    """
    return ResultCache(cache_dir=cache_dir) if cache else None
115 changes: 115 additions & 0 deletions tests/scancode/test_cache_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# SPDX-License-Identifier: Apache-2.0
#
# Copyright (c) nexB Inc. and others. All rights reserved.
#
"""
Tests for result caching functionality.
"""

import json
import tempfile
from pathlib import Path

import pytest

from scancode.cache_manager import ResultCache


class TestResultCache:
    """Checks for ResultCache: store, lookup, invalidation, stats, clearing."""

    def test_cache_stores_and_retrieves_results(self):
        """A stored result comes back on lookup and is counted as one hit."""
        with tempfile.TemporaryDirectory() as workdir:
            manager = ResultCache(cache_dir=workdir)

            # File to "scan"
            source = Path(workdir) / 'test.py'
            source.write_text('# Test file\nprint("hello")')

            opts = {'license': True, 'copyright': False}
            manager.store_result(
                source,
                opts,
                {'licenses': ['MIT'], 'path': str(source)},
            )

            found = manager.get_cached_result(source, opts)

            assert found is not None
            assert found['licenses'] == ['MIT']
            assert manager.stats['hits'] == 1

    def test_cache_miss_on_file_change(self):
        """Changing the file content after storing makes the lookup miss."""
        with tempfile.TemporaryDirectory() as workdir:
            manager = ResultCache(cache_dir=workdir)
            source = Path(workdir) / 'test.py'
            opts = {'license': True}

            # Store a result for the first content
            source.write_text('# Version 1')
            manager.store_result(source, opts, {'version': 1})

            # New content: the stored entry no longer applies
            source.write_text('# Version 2')
            found = manager.get_cached_result(source, opts)

            assert found is None
            assert manager.stats['misses'] == 1

    def test_cache_miss_on_scan_options_change(self):
        """A lookup with different scan options misses the stored entry."""
        with tempfile.TemporaryDirectory() as workdir:
            manager = ResultCache(cache_dir=workdir)
            source = Path(workdir) / 'test.py'
            source.write_text('# Test')

            # Stored with license only
            manager.store_result(source, {'license': True}, {'result': 1})

            # Looked up with copyright added
            found = manager.get_cached_result(
                source,
                {'license': True, 'copyright': True},
            )

            assert found is None  # Different options, different key

    def test_cache_statistics(self):
        """One miss then one hit gives a 50% hit rate."""
        with tempfile.TemporaryDirectory() as workdir:
            manager = ResultCache(cache_dir=workdir)
            source = Path(workdir) / 'test.py'
            source.write_text('# Test')
            opts = {'license': True}

            # Nothing stored yet: miss
            manager.get_cached_result(source, opts)

            # Stored, then looked up: hit
            manager.store_result(source, opts, {'data': 'test'})
            manager.get_cached_result(source, opts)

            summary = manager.get_stats()
            assert summary['hits'] == 1
            assert summary['misses'] == 1
            assert summary['hit_rate_percent'] == 50.0

    def test_clear_cache(self):
        """After clear_cache() a previously stored result is gone."""
        with tempfile.TemporaryDirectory() as workdir:
            manager = ResultCache(cache_dir=workdir)
            source = Path(workdir) / 'test.py'
            source.write_text('# Test')
            opts = {'license': True}

            manager.store_result(source, opts, {'data': 'test'})
            manager.clear_cache()

            # The entry was removed, so this is a miss
            found = manager.get_cached_result(source, opts)
            assert found is None