-
Notifications
You must be signed in to change notification settings - Fork 85
Expand file tree
/
Copy path_boundary_store.py
More file actions
144 lines (120 loc) · 4.97 KB
/
_boundary_store.py
File metadata and controls
144 lines (120 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""Memmap-backed boundary strip storage for dask tile sweeps.
Stores top/bottom/left/right boundary strips for a 2D tile grid in
flat numpy memory-mapped files, avoiding O(N_tiles) in-memory nested
lists that can OOM on very large inputs.
"""
from __future__ import annotations
import os
import shutil
import tempfile
import numpy as np
class BoundaryStore:
    """Disk-backed storage for boundary strips of a tiled 2D grid.

    Each of the four sides (top, bottom, left, right) is stored as a
    single contiguous memmap file inside a private temporary directory.
    Strip lookup is O(1) via precomputed cumulative offsets, and ``get``
    returns a zero-copy memmap view.

    The store is a context manager; leaving the ``with`` block (or
    calling ``close``) drops the memmaps and deletes the temp directory.

    Parameters
    ----------
    chunks_y : iterable of int
        Tile heights (one per tile row).
    chunks_x : iterable of int
        Tile widths (one per tile column).
    fill_value : float, optional
        Initial fill value for all strips (default 0.0).
    """

    _SIDES = ('top', 'bottom', 'left', 'right')

    def __init__(self, chunks_y, chunks_x, fill_value=0.0):
        # Establish lifecycle state first so close()/__del__ are safe
        # even if construction fails part-way through.
        self._closed = False
        self._tmpdir = None
        for name in self._SIDES:
            setattr(self, f'_{name}', None)

        # Materialise the inputs once and use only the tuples from here
        # on: the arguments may be one-shot iterables, which would be
        # exhausted by the first pass.
        self._chunks_y = tuple(chunks_y)
        self._chunks_x = tuple(chunks_x)
        n_ty = len(self._chunks_y)
        n_tx = len(self._chunks_x)
        total_h = sum(self._chunks_y)
        total_w = sum(self._chunks_x)

        # Cumulative offsets for O(1) strip lookup; entry k is the start
        # of tile k, entry k + 1 its (exclusive) end.
        self._cum_x = np.zeros(n_tx + 1, dtype=np.int64)
        self._cum_x[1:] = np.cumsum(self._chunks_x, dtype=np.int64)
        self._cum_y = np.zeros(n_ty + 1, dtype=np.int64)
        self._cum_y[1:] = np.cumsum(self._chunks_y, dtype=np.int64)

        # top/bottom: strip length = chunks_x[ix], indexed by (iy, ix)
        #   shape (n_ty, total_w) -- row iy holds all top/bottom strips
        # left/right: strip length = chunks_y[iy], indexed by (iy, ix)
        #   shape (n_tx, total_h) -- row ix holds all left/right strips
        layout = (
            ('top', (n_ty, total_w)),
            ('bottom', (n_ty, total_w)),
            ('left', (n_tx, total_h)),
            ('right', (n_tx, total_h)),
        )

        # Create the temp directory only after the inputs were accepted,
        # and remove it again immediately if any memmap cannot be built.
        self._tmpdir = tempfile.mkdtemp(prefix='xrs_bdry_')
        try:
            for name, shape in layout:
                path = os.path.join(self._tmpdir, f'{name}.dat')
                mm = np.memmap(path, dtype=np.float64, mode='w+',
                               shape=shape)
                mm[:] = fill_value
                mm.flush()
                setattr(self, f'_{name}', mm)
        except Exception:
            self.close()
            raise

    def get(self, side, iy, ix):
        """Return a memmap view of the boundary strip for tile (iy, ix).

        Parameters
        ----------
        side : {'top', 'bottom', 'left', 'right'}
            Which boundary of the tile to fetch.
        iy, ix : int
            Tile row and tile column.

        Raises
        ------
        ValueError
            If *side* is not one of the four known names.
        """
        if side in ('top', 'bottom'):
            # One file row per tile row; the tile column selects the span.
            row, offsets, k = iy, self._cum_x, ix
        elif side in ('left', 'right'):
            # One file row per tile column; the tile row selects the span.
            row, offsets, k = ix, self._cum_y, iy
        else:
            raise ValueError(f"Unknown side: {side!r}")
        return getattr(self, f'_{side}')[row, offsets[k]:offsets[k + 1]]

    def set(self, side, iy, ix, data):
        """Write *data* into the boundary strip for tile (iy, ix)."""
        self.get(side, iy, ix)[:] = data

    def close(self):
        """Drop the memmaps and remove the temporary files.

        Safe to call more than once, and safe on an instance whose
        ``__init__`` did not run to completion.
        """
        # A missing flag means __init__ never got started: nothing to do.
        if getattr(self, '_closed', True):
            return
        self._closed = True
        # Release this object's references to the memmaps before the
        # backing files are deleted.
        for name in self._SIDES:
            setattr(self, f'_{name}', None)
        tmpdir = getattr(self, '_tmpdir', None)
        if tmpdir is None:
            return
        try:
            shutil.rmtree(tmpdir)
        except OSError:
            # Best effort: failing to delete scratch files is not fatal.
            pass

    def __del__(self):
        self.close()

    def snapshot(self):
        """Return a lightweight in-memory copy and close this store.

        The returned ``BoundarySnapshot`` has the same ``.get()``
        interface but holds plain numpy arrays instead of memmaps,
        so no temp files remain referenced.
        """
        snap = BoundarySnapshot(self)
        self.close()
        return snap

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
class BoundarySnapshot:
    """In-memory copy of the boundary strips held by a store.

    Created via ``BoundaryStore.snapshot()``; exposes the same
    ``.get(side, iy, ix)`` interface so callers need no changes.
    The snapshot is meant to be read from; it has no ``set`` method,
    although the arrays it holds are ordinary writable numpy arrays.

    Parameters
    ----------
    store : BoundaryStore
        Source store. It must still be open, because its four side
        arrays are copied during construction.

    Raises
    ------
    ValueError
        If any side array of *store* is ``None`` (the state a store is
        in after ``close()``).
    """

    _SIDES = ('top', 'bottom', 'left', 'right')

    def __init__(self, store):
        sources = {name: getattr(store, f'_{name}') for name in self._SIDES}

        # np.array(None) would silently produce a 0-d object array and
        # only fail later inside get(); reject a closed store up front.
        released = [name for name, src in sources.items() if src is None]
        if released:
            raise ValueError(
                "Cannot snapshot a closed store "
                f"(no data for: {', '.join(released)})"
            )

        # The offset tables are shared with the store, not copied.
        self._cum_x = store._cum_x
        self._cum_y = store._cum_y

        # Copy memmap data to plain numpy arrays
        for name, src in sources.items():
            setattr(self, f'_{name}', np.array(src))

    def get(self, side, iy, ix):
        """Return a numpy view of the boundary strip for tile (iy, ix).

        Parameters
        ----------
        side : {'top', 'bottom', 'left', 'right'}
            Which boundary of the tile to fetch.
        iy, ix : int
            Tile row and tile column.

        Raises
        ------
        ValueError
            If *side* is not one of the four known names.
        """
        if side in ('top', 'bottom'):
            # One array row per tile row; the tile column selects the span.
            row, offsets, k = iy, self._cum_x, ix
        elif side in ('left', 'right'):
            # One array row per tile column; the tile row selects the span.
            row, offsets, k = ix, self._cum_y, iy
        else:
            raise ValueError(f"Unknown side: {side!r}")
        return getattr(self, f'_{side}')[row, offsets[k]:offsets[k + 1]]