-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdupefind.py
More file actions
143 lines (121 loc) · 3.83 KB
/
dupefind.py
File metadata and controls
143 lines (121 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import sys
import hashlib
from collections import defaultdict
from tqdm import tqdm
def get_md5(file_path):
    """Return the hex MD5 digest of the file at *file_path*.

    The file is read in fixed-size chunks so arbitrarily large files
    can be hashed without loading them fully into memory.
    """
    digest = hashlib.md5()
    with open(file_path, "rb") as handle:
        chunk = handle.read(4096)
        while chunk:
            digest.update(chunk)
            chunk = handle.read(4096)
    return digest.hexdigest()
def find_duplicates(directory):
    """
    Find duplicate files under *directory* (recursively).

    Files are first grouped by size (a cheap stat call) and only
    same-size candidates are hashed, so most files are never read.

    Returns:
        dict: {md5_hex: [path, path, ...]} containing only groups
        with more than one file.
    """
    size_groups = defaultdict(list)
    all_files = [
        os.path.join(root, name)
        for root, _, names in os.walk(directory)
        for name in names
    ]

    # Step 1: group files by size -- two files can only be identical
    # if their sizes match.
    for file_path in tqdm(all_files, desc="Indexing files", unit="file"):
        try:
            size_groups[os.path.getsize(file_path)].append(file_path)
        except OSError:
            # File vanished or is unreadable (broken symlink, permissions);
            # skip it rather than abort the whole scan. Narrowed from a
            # bare `except Exception` that also hid real bugs.
            continue

    # Step 2: hash only the same-size candidates.
    hash_groups = defaultdict(list)
    candidate_files = [
        path
        for paths in size_groups.values()
        if len(paths) > 1
        for path in paths
    ]
    for file_path in tqdm(candidate_files, desc="Hashing candidates", unit="file"):
        try:
            hash_groups[get_md5(file_path)].append(file_path)
        except OSError:
            # Same best-effort policy as above: unreadable file, skip it.
            continue

    return {h: paths for h, paths in hash_groups.items() if len(paths) > 1}
def calculate_total_size(file_paths):
    """Return the combined size, in bytes, of every path in
    *file_paths* that still exists on disk."""
    total = 0
    for path in file_paths:
        if os.path.exists(path):
            total += os.path.getsize(path)
    return total
def delete_files(file_paths):
    """
    Delete every path in *file_paths*, best-effort.

    Paths that cannot be removed (already gone, permission denied)
    are skipped and do not count toward the total.

    Returns:
        int: total size in bytes of the files actually deleted.
    """
    bytes_deleted = 0
    for file_path in file_paths:
        try:
            # Record the size before unlinking -- it is unavailable after.
            size = os.path.getsize(file_path)
            os.remove(file_path)
        except OSError:
            # Narrowed from a bare `except Exception: pass`, which also
            # swallowed programming errors; OSError covers the expected
            # filesystem failures.
            continue
        bytes_deleted += size
    return bytes_deleted
if __name__ == "__main__":
    # Directory to scan: first CLI argument, else the current directory.
    if len(sys.argv) > 1:
        directory_to_scan = sys.argv[1]
    else:
        directory_to_scan = os.getcwd()
    outfile = os.path.join(os.getcwd(), "output.txt")

    print(f"Scanning for duplicate files in: {directory_to_scan}")
    duplicate_groups = find_duplicates(directory_to_scan)
    total_space_saved = 0
    try:
        # Phase 1: write the duplicate-group report.
        with open(outfile, "w", encoding="utf-8") as f:
            if duplicate_groups:
                f.write("Duplicate File Groups Found:\n\n")
                for file_hash, files in duplicate_groups.items():
                    f.write(f"Hash: {file_hash}\n")
                    for file in files:
                        f.write(f" {file}\n")
                    f.write("\n")
                f.write(
                    f"{sum(len(v) - 1 for v in duplicate_groups.values())} "
                    "duplicate files found.\n"
                )
            else:
                f.write("No duplicate files found.\n")

        # Phase 2: in each group keep the newest copy (by mtime) and
        # queue the rest for deletion.
        delete_queue = []
        for files in duplicate_groups.values():
            try:
                files_sorted = sorted(files, key=os.path.getmtime, reverse=True)
            except OSError:
                # A file vanished between scan and cleanup; skip the
                # group rather than abort the whole run.
                continue
            delete_queue.extend(files_sorted[1:])

        # Potential savings, measured on the files actually queued for
        # deletion. (Previously this was computed from an arbitrary
        # files[1:] slice and then never used.)
        potential_savings = calculate_total_size(delete_queue)

        # Phase 3: delete, with a progress bar.
        for file_path in tqdm(delete_queue, desc="Deleting duplicates", unit="file"):
            total_space_saved += delete_files([file_path])

        # Phase 4: append the cleanup summary to the report.
        with open(outfile, "a", encoding="utf-8") as f:
            f.write("\nCleanup Summary:\n")
            f.write(f"Potential savings: {potential_savings} bytes\n")
            f.write(
                f"Total space saved: {total_space_saved} bytes "
                f"({total_space_saved / (1024 ** 2):.2f} MB)\n"
            )
        print("Cleanup complete.")
        print(
            f"Total space saved: {total_space_saved} bytes "
            f"({total_space_saved / (1024 ** 2):.2f} MB)"
        )
    except Exception as e:
        # Top-level boundary: report the failure instead of a traceback.
        print(f"Error: {e}")