-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathscanner.py
More file actions
89 lines (75 loc) · 3.14 KB
/
scanner.py
File metadata and controls
89 lines (75 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# scanner.py
import importlib
import os
from typing import List, NamedTuple
from tqdm import tqdm
import concurrent.futures
import chardet
class ScanResult(NamedTuple):
    """A single finding produced by one check against one scanned file."""

    # Path of the file the finding belongs to.
    file_path: str
    # Line number reported by the check for this finding.
    line_number: int
    # Title of the check that produced the finding.
    title: str
    # Text of the finding (the scanner fills this with the matched line content).
    message: str
    # Severity label of the check that produced the finding.
    severity: str
class Scanner:
    """Run the configured checks over a single file or a directory tree.

    The ``config`` object must provide ``get_checks()`` (names used both as
    the module name under the ``checks`` package and as the class name inside
    that module) and ``get_file_extensions()`` (file-name suffixes selecting
    which files of a directory are scanned).
    """

    def __init__(self, config):
        """Store ``config`` and instantiate every check it lists."""
        self.config = config
        self.checks = self._load_checks()

    def _load_checks(self):
        """Import ``checks.<name>`` for each configured name and instantiate it.

        Returns:
            A list with one instance of each check class, in configuration
            order. Each class is looked up as the attribute ``<name>`` of the
            module ``checks.<name>`` and is constructed without arguments.

        Raises:
            ImportError: if a check module cannot be imported.
            AttributeError: if a module lacks a class named like the module.
        """
        checks = []
        for check_name in self.config.get_checks():
            module = importlib.import_module(f"checks.{check_name}")
            check_class = getattr(module, check_name)
            checks.append(check_class())
        return checks

    def scan(
        self,
        path: str,
        limit: int = 1000000000,
        num_threads: int = 48,
    ) -> List[ScanResult]:
        """Scan ``path`` and return every finding of every loaded check.

        Args:
            path: A file (scanned regardless of its extension) or a directory
                (walked recursively; only files whose name ends with one of
                the configured extensions are scanned). Any other path yields
                no files.
            limit: Maximum number of files to scan.
            num_threads: Worker count of the thread pool that scans the files.

        Returns:
            The findings of all scanned files, grouped by file in the order
            the files were collected.
        """
        files_to_scan = self._collect_files(path, limit)

        with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
            futures = [
                executor.submit(self._scan_file, file_path)
                for file_path in files_to_scan
            ]
            # Drive the progress bar by completion so it advances as soon as
            # any file is done, independent of submission order.
            progress = tqdm(
                concurrent.futures.as_completed(futures),
                total=len(futures),
                desc="Scanning files",
                unit=" file",
            )
            for _ in progress:
                pass

        # Gather in submission order so the result order does not depend on
        # thread scheduling.
        results = []
        for future in futures:
            results.extend(future.result())
        return results

    def _collect_files(self, path: str, limit: int) -> List[str]:
        """Return at most ``limit`` file paths to scan for ``path``."""
        if limit <= 0:
            return []
        if os.path.isfile(path):
            return [path]

        collected = []
        if os.path.isdir(path):
            # Fetch the suffixes once; str.endswith accepts a tuple of options.
            extensions = tuple(self.config.get_file_extensions())
            for root, _, files in os.walk(path):
                for file_name in files:
                    if not file_name.endswith(extensions):
                        continue
                    collected.append(os.path.join(root, file_name))
                    if len(collected) >= limit:
                        return collected
        return collected

    @staticmethod
    def _read_text(file_path: str) -> str:
        """Read ``file_path`` as text using the encoding chardet detects.

        Falls back to ``latin-1`` (which maps every byte value) when the
        detected encoding cannot decode the file or is not known to Python.
        """
        with open(file_path, 'rb') as f:
            raw_data = f.read()
        # NOTE(review): chardet may report None here; open() then uses the
        # platform default encoding - confirm that is acceptable.
        detected_encoding = chardet.detect(raw_data)['encoding']

        try:
            with open(file_path, 'r', encoding=detected_encoding) as f:
                return f.read()
        except (UnicodeDecodeError, LookupError):
            # LookupError: the detected name has no codec in this Python.
            with open(file_path, 'r', encoding='latin-1') as f:
                return f.read()

    def _scan_file(self, file_path: str) -> List[ScanResult]:
        """Run every loaded check on one file and return its findings.

        Each check is called as ``check.run(content)`` and must yield objects
        with ``line_number`` and ``line_content``; ``title`` and ``severity``
        are read from the check itself.

        Any error while reading the file or running a check is reported on
        stdout and scanning of this file stops; findings collected before the
        error are still returned, so one bad file never aborts a whole scan.
        """
        results = []
        try:
            content = self._read_text(file_path)
            for check in self.checks:
                for result in check.run(content):
                    results.append(ScanResult(
                        file_path=file_path,
                        line_number=result.line_number,
                        title=check.title,
                        message=result.line_content,
                        severity=check.severity,
                    ))
        except Exception as e:
            # Deliberate best-effort boundary: report and keep scanning others.
            print(f"Error scanning file {file_path}: {str(e)}")
        return results