-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvoid_multi_process.py
More file actions
106 lines (85 loc) · 3.82 KB
/
void_multi_process.py
File metadata and controls
106 lines (85 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import numpy as np
from ase.io import read, write
from ase import Atoms
from scipy.spatial import cKDTree
from sklearn.cluster import DBSCAN
import math
import concurrent.futures
import argparse
def compute_void_radius(atoms, spacing, cutoff, min_points):
cell = atoms.get_cell()
positions = atoms.get_positions()
shifts = np.array([[i,j,k] for i in (-1,0,1) for j in (-1,0,1) for k in (-1,0,1)])
all_pos = (positions[:,None,:] + shifts[None,:,:].dot(cell)).reshape(-1,3)
tree = cKDTree(all_pos)
lengths = np.linalg.norm(cell, axis=1)
n_pts = np.maximum(np.ceil(lengths / spacing).astype(int), 2)
grid_axes = [np.linspace(0, 1, n, endpoint=False) for n in n_pts]
fx, fy, fz = np.meshgrid(*grid_axes, indexing='ij')
grid_frac = np.stack([fx.ravel(), fy.ravel(), fz.ravel()], axis=1)
grid_cart = grid_frac.dot(cell)
dists, _ = tree.query(grid_cart, k=1)
void_mask = dists > cutoff
void_pts = grid_cart[void_mask]
if void_pts.size == 0:
return 0.0, void_pts, cell
eps = spacing * 1.2
labels = DBSCAN(eps=eps, min_samples=1).fit_predict(void_pts)
voxel_vol = spacing ** 3
volumes = []
keep_pts = []
for lbl in np.unique(labels):
pts = void_pts[labels == lbl]
if pts.shape[0] < min_points:
continue
volumes.append(pts.shape[0] * voxel_vol)
keep_pts.append(pts)
total_vol = sum(volumes)
void_all = np.vstack(keep_pts) if keep_pts else np.empty((0, 3))
# Convert volume to equivalent sphere radius
r_eq = (3 * total_vol / (4 * math.pi)) ** (1 / 3) if total_vol > 0 else 0.0
return r_eq, void_all, cell
def compute_density(atoms):
mass_u = atoms.get_masses().sum()
volume_A3 = atoms.get_volume()
mass_g = mass_u * 1.66054e-24
volume_cm3 = volume_A3 * 1e-24
return mass_g / volume_cm3 if volume_cm3 > 0 else 0.0
def export_xyz(pts, cell, filename, symbol='Ar'):
if pts.size == 0:
return
atoms = Atoms(symbols=[symbol] * len(pts), positions=pts, cell=cell, pbc=True)
write(filename, atoms, format='xyz')
def parse_args():
p = argparse.ArgumentParser()
p.add_argument('input', help='Input file: EXTXYZ or LAMMPS dump')
p.add_argument('--spacing', type=float, default=0.29, help='Grid spacing in Å')
p.add_argument('--cutoff', type=float, default=2.9, help='Minimum distance to atoms')
p.add_argument('--min-points', type=int, default=5, help='Min points per void cluster')
p.add_argument('--out', default='voids.xyz', help='XYZ output filename')
p.add_argument('--symbol', default='Ar', help='Symbol used to export void atoms')
p.add_argument('--threads', type=int, default=None, help='Max threads for parallel processing')
return p.parse_args()
def main():
args = parse_args()
try:
traj = read(args.input, index=':')
except Exception:
traj = [read(args.input)]
r0, pts0, cell0 = compute_void_radius(traj[0], args.spacing, args.cutoff, args.min_points)
density = compute_density(traj[0])
print(f"Density of first structure: {density:.3f} g/cm³")
export_xyz(pts0, cell0, args.out, args.symbol)
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as pool:
futures = [pool.submit(compute_void_radius, at, args.spacing, args.cutoff, args.min_points)
for at in traj]
results = [f.result()[0] for f in concurrent.futures.as_completed(futures)]
radii = np.array(results)
mean_r = radii.mean() if radii.size > 0 else 0.0
std_r = radii.std(ddof=1) if radii.size > 1 else 0.0
print(f"Frames processed: {len(radii)}")
print(f"Average eq. radius: {mean_r:.3f} Å")
print(f"Standard deviation: {std_r:.3f} Å")
print(f"Void atoms exported to: {args.out}")
if __name__ == '__main__':
main()