This repository was archived by the owner on May 13, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathsimulation.py
More file actions
266 lines (240 loc) · 10.6 KB
/
simulation.py
File metadata and controls
266 lines (240 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
import logging
import math
import pdb
import sys
from typing import Optional, Tuple, List

import numpy as np
import pyforefire as forefire

from forefire_helper import *
# Send log records of level INFO and above to standard output.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
# Module-wide logger; getLogger() called without a name returns the root logger.
logger = logging.getLogger()
class ForeFireSimulation:
    """
    Generic class for a ForeFire simulation.

    Creates a ``pyforefire`` ForeFire instance, stores it in ``self.ff`` and
    sets the numerical parameters that were supplied. Calling the instance
    advances the simulation step by step (see ``__call__``).
    """

    def __init__(
        self,
        propagation_model: str,
        fuels_table: callable,
        spatial_increment: Optional[float] = None,
        minimal_propagative_front_depth: Optional[float] = None,
        perimeter_resolution: Optional[float] = None,
        relax: Optional[float] = None,
        min_speed: Optional[float] = None,
        burned_map_layer: Optional[int] = None,
    ):
        """
        Initialize the ForeFire simulation.

        Args:
            propagation_model (str): name of the Rate of Spread model (e.g. Rothermel, RothermelAndrews2018).
            fuels_table (callable): a callable reading a fuels table
                (e.g. RothermelAndrews2018FuelTable from src/forefire_helper.py)
            spatial_increment (float): distance (in meters) between new and old node position
            minimal_propagative_front_depth (float): resolution (in meters) of the arrival time matrix
            perimeter_resolution (float): maximal distance (in meters) between two nodes
            relax (float): weights the new and old ROS (ROS = relax x new_ROS + (1 - relax) x old_ROS)
            min_speed (float): minimal speed (m/s), i.e. ROS = max(ROS, min_speed)
            burned_map_layer (int): value stored in the ForeFire "bmapLayer" parameter

        Any argument left to ``None`` is not written, so the ForeFire instance
        keeps whatever value it already has for that parameter.

        Raises:
            AssertionError: if a supplied value is outside its documented range.
        """
        # Initialize pyforefire module
        ff = forefire.ForeFire()
        ff["propagationModel"] = propagation_model
        ff["fuelsTable"] = fuels_table()

        # Strictly positive parameters: a falsy value (None, but also 0) is
        # treated as "not provided", as it always has been, so existing callers
        # passing 0 keep working. Negative values are rejected.
        if spatial_increment:
            assert spatial_increment > 0, 'spatial_increment must be strictly positive'
            ff["spatialIncrement"] = spatial_increment
        if minimal_propagative_front_depth:
            assert minimal_propagative_front_depth > 0, 'minimal_propagative_front_depth must be strictly positive'
            ff["minimalPropagativeFrontDepth"] = minimal_propagative_front_depth
        if perimeter_resolution:
            assert perimeter_resolution > 0, 'perimeter_resolution must be strictly positive'
            ff["perimeterResolution"] = perimeter_resolution

        # For the parameters below 0 is a legitimate value (relax is validated
        # against the closed interval [0, 1]), so test against None explicitly:
        # a plain truthiness test would silently drop an explicit 0.
        if relax is not None:
            assert 0 <= relax <= 1, 'relax must be in [0, 1]'
            ff["relax"] = relax
        if min_speed is not None:
            ff["minSpeed"] = min_speed
        if burned_map_layer is not None:
            ff["bmapLayer"] = burned_map_layer

        self.ff = ff

    def __call__(
        self,
        nb_steps: int,
        step_size: float
    ):
        """
        Run the ForeFire simulation.

        Args:
            nb_steps (int): number of steps the simulation will execute
            step_size (float): duration (in seconds) between each step

        Returns:
            list: the items produced by ``printToPathe`` for the ``print[]``
            output of every completed step, concatenated in step order. Empty
            when ``nb_steps`` is zero or negative.

        A ``KeyboardInterrupt`` raised during a step stops the loop and the
        paths collected so far are returned.
        """
        pathes = []
        # NOTE(review): the burned-area series and its time axis are computed
        # at every step but never returned or stored. They are kept so the
        # sequence of calls made to ForeFire is unchanged; either expose them
        # or drop the computation.
        burned_areas = []
        times = []
        for i in range(1, nb_steps + 1):
            try:
                # Advance the simulation to the absolute time of this step
                current_time = i * step_size
                command = "goTo[t=%f]" % current_time
                logger.info(command)
                self.ff.execute(command)

                # Collect the paths printed for the current state
                pathes += printToPathe(self.ff.execute("print[]"))

                # Cells whose "BMap" value is not infinite are counted as burned
                bmap = self.ff.getDoubleArray("BMap")
                burned_cells = np.logical_not(np.isinf(bmap))
                # Each burned cell contributes minimalPropagativeFrontDepth squared
                cell_area = np.power(float(self.ff["minimalPropagativeFrontDepth"]), 2)
                burned_areas.append(np.sum(burned_cells * cell_area))
                times.append(current_time)
            except KeyboardInterrupt:
                # Keyboard interrupt in case the simulation takes a while and
                # we want to show its current state
                break
        return pathes
class UniformForeFireSimulation(ForeFireSimulation):
    """
    Class for a ForeFire simulation with uniform wind, uniform fuel (only one fuel type)
    and uniform slope.
    """

    def __init__(
        self,
        propagation_model: str,
        domain: Tuple[int, int, int, int],
        fuels_table: callable,
        horizontal_wind: float,
        vertical_wind: float,
        fuel_type: float,
        slope: float,
        fire_front: List[List[float]],
        nn_ros_model_path: Optional[str] = None,
        spatial_increment: Optional[float] = None,
        minimal_propagative_front_depth: Optional[float] = None,
        perimeter_resolution: Optional[float] = None,
        relax: Optional[float] = None,
        min_speed: Optional[float] = None,
        burned_map_layer: Optional[int] = None,
        data_resolution: float = 1
    ):
        """
        Build the domain, the data layers and the initial fire front.

        Args:
            propagation_model (str): name of the Rate of Spread model.
            domain (tuple): (SWx, SWy, Lx, Ly); (SWx, SWy) is used as the ``sw``
                corner of the FireDomain, Lx is the domain width and Ly the
                domain height. Lx and Ly are also used as array dimensions.
            fuels_table (callable): a callable reading a fuels table.
            horizontal_wind (float): value stored as "windU" (logged in m/s).
            vertical_wind (float): value stored as "windV" (logged in m/s).
            fuel_type (float): fuel index written in every cell of the fuel map.
            slope (float): slope in degrees; altitude grows linearly along the
                width axis with gradient tan(slope).
            fire_front (list): (x, y) pairs of the initial front nodes
                (front orientation is clockwise).
            nn_ros_model_path (str): if given, stored as "FFANNPropagationModelPath".
            data_resolution (float): divisor of the domain width giving the
                number of altitude samples.

        The remaining arguments are forwarded unchanged to
        ``ForeFireSimulation.__init__``.

        Raises:
            AssertionError: if ``Lx`` is not divisible by ``data_resolution``
                (or a forwarded parameter is out of range).
            ValueError: if ``domain`` does not contain exactly four values.
        """
        super().__init__(
            propagation_model,
            fuels_table,
            spatial_increment,
            minimal_propagative_front_depth,
            perimeter_resolution,
            relax,
            min_speed,
            burned_map_layer,
        )

        # Instantiate domain
        sw_x, sw_y, domain_width, domain_height = domain
        self.ff["SWx"], self.ff["SWy"], self.ff["Lx"], self.ff["Ly"] = domain
        # The NE corner is computed from the caller's numeric values, not from
        # values read back from self.ff: read-backs are wrapped in float()
        # elsewhere in this file, which suggests they come back as strings, and
        # "+" on two strings would concatenate them instead of summing.
        domain_string = (
            f'FireDomain[sw=({sw_x},{sw_y},0);'
            f'ne=({sw_x + domain_width},{sw_y + domain_height},0);t=0]'
        )
        logger.info(domain_string)
        self.ff.execute(domain_string)

        # Propagation model layer
        if nn_ros_model_path:
            self.ff["FFANNPropagationModelPath"] = nn_ros_model_path
        self.ff.addLayer(
            "propagation",
            self.ff["propagationModel"],
            "propagationModel")
        logger.info(f'ROS model: {propagation_model}')

        # Wind layers
        self.ff["windU"] = horizontal_wind
        self.ff["windV"] = vertical_wind
        self.ff.addLayer("data", "windU", "windU")
        self.ff.addLayer("data", "windV", "windV")
        logger.info(f'Uniform wind conditions: horizontal wind: {horizontal_wind} m/s | vertical wind: {vertical_wind} m/s')

        # Fuel layer: a single fuel index repeated over the whole domain
        self.fuel_map = fuel_type * np.ones((1, 1, domain_height, domain_width))
        self.ff.addIndexLayer(
            "table", "fuel",
            float(sw_x), float(sw_y), 0, domain_width, domain_height, 0, self.fuel_map)
        logger.info(f'Fuel map types: {list(np.unique(self.fuel_map))}')

        # Altitude layer: a linear ramp along the width axis, broadcast to every row
        slope_rad = slope * math.pi / 180
        self.altitude_map = np.ones_like(self.fuel_map)
        assert domain_width % data_resolution == 0, "domain_width must be divisible by data_resolution"
        # np.linspace needs an integer sample count; "//" yields a float when
        # data_resolution is a float, hence the int().
        # NOTE(review): the ramp is broadcast into an array whose last axis has
        # domain_width entries, so any data_resolution other than 1 makes the
        # assignment below fail - confirm the intended map resolution.
        nb_samples = int(domain_width // data_resolution)
        self.altitude_map[:, :, :] = np.linspace(0, domain_width, nb_samples) * math.tan(slope_rad)
        # NOTE(review): this layer is anchored at (0, 0) whereas the fuel layer
        # is anchored at (SWx, SWy) - confirm this is intended when SW != 0.
        self.ff.addScalarLayer("table", "altitude", 0, 0, 0, domain_width, domain_height, 0, self.altitude_map)

        # Instantiate fire front (front orientation is clockwise!!)
        self.ff.execute("\tFireFront[id=2;domain=0;t=0]")
        for (xp, yp) in fire_front:
            self.ff.execute(f"\t\tFireNode[domain=0;loc=({xp},{yp},0);vel=(0,0,0);t=0;state=init;frontId=2]")
        logger.info(f'Initial fire front: {fire_front}')
class UniformWindForeFireSimulation(ForeFireSimulation):
    """
    Class for a ForeFire simulation with uniform wind and with fuel and altitude
    given as 2-D maps. The domain starts at (0, 0) and its size is taken from the
    shape of the fuel map.
    """

    def __init__(
        self,
        propagation_model: str,
        fuels_table: callable,
        horizontal_wind: float,
        vertical_wind: float,
        fuel_map: np.ndarray,
        altitude_map: np.ndarray,
        fire_front: List[List[float]],
        nn_ros_model_path: Optional[str] = None,
        spatial_increment: Optional[float] = None,
        minimal_propagative_front_depth: Optional[float] = None,
        perimeter_resolution: Optional[float] = None,
        relax: Optional[float] = None,
        min_speed: Optional[float] = None,
        burned_map_layer: Optional[int] = None,
    ):
        """
        Build the domain, the data layers and the initial fire front.

        Args:
            propagation_model (str): name of the Rate of Spread model.
            fuels_table (callable): a callable reading a fuels table.
            horizontal_wind (float): value stored as "windU" (logged in m/s).
            vertical_wind (float): value stored as "windV" (logged in m/s).
            fuel_map (np.ndarray): 2-D array of fuel indices, shape (height, width);
                its shape defines the domain size.
            altitude_map (np.ndarray): altitude values; must hold as many
                elements as ``fuel_map`` (it is reshaped to the same grid).
            fire_front (list): (x, y) pairs of the initial front nodes
                (front orientation is clockwise).
            nn_ros_model_path (str): if given, stored as "FFANNPropagationModelPath".

        The remaining arguments are forwarded unchanged to
        ``ForeFireSimulation.__init__``.

        Raises:
            ValueError: if ``fuel_map`` is not 2-D or ``altitude_map`` does not
                have the same number of elements as ``fuel_map``.
        """
        super().__init__(
            propagation_model,
            fuels_table,
            spatial_increment,
            minimal_propagative_front_depth,
            perimeter_resolution,
            relax,
            min_speed,
            burned_map_layer,
        )

        # Validate the maps up front: the shape unpacking and the reshape below
        # would raise ValueError anyway, but with much less helpful messages.
        if fuel_map.ndim != 2:
            raise ValueError(f'fuel_map must be 2-D (height, width), got shape {fuel_map.shape}')
        if altitude_map.size != fuel_map.size:
            raise ValueError(
                f'altitude_map (shape {altitude_map.shape}) must have as many elements '
                f'as fuel_map (shape {fuel_map.shape})')

        # Instantiate domain
        domain_height, domain_width = fuel_map.shape
        sw_x, sw_y = 0, 0
        self.ff["SWx"] = sw_x
        self.ff["SWy"] = sw_y
        self.ff["Lx"] = domain_width
        self.ff["Ly"] = domain_height
        # The NE corner is computed from local numeric values, not from values
        # read back from self.ff: read-backs are wrapped in float() elsewhere in
        # this file, which suggests they come back as strings, and "+" on two
        # strings would concatenate them instead of summing.
        domain_string = (
            f'FireDomain[sw=({sw_x},{sw_y},0);'
            f'ne=({sw_x + domain_width},{sw_y + domain_height},0);t=0]'
        )
        logger.info(domain_string)
        self.ff.execute(domain_string)

        # Propagation model layer
        if nn_ros_model_path:
            self.ff["FFANNPropagationModelPath"] = nn_ros_model_path
        self.ff.addLayer(
            "propagation",
            self.ff["propagationModel"],
            "propagationModel")
        logger.info(f'ROS model: {propagation_model}')

        # Wind layers
        self.ff["windU"] = horizontal_wind
        self.ff["windV"] = vertical_wind
        self.ff.addLayer("data", "windU", "windU")
        self.ff.addLayer("data", "windV", "windV")
        logger.info(f'Uniform wind conditions: horizontal wind: {horizontal_wind} m/s | vertical wind: {vertical_wind} m/s')

        # Fuel layer
        self.fuel_map = fuel_map.reshape(1, 1, domain_height, domain_width)
        self.ff.addIndexLayer(
            "table",
            "fuel", float(sw_x), float(sw_y), 0, float(domain_width), float(domain_height), 0,
            self.fuel_map)
        logger.info(f'Fuel map types: {list(np.unique(self.fuel_map))}')

        # Altitude layer
        # NOTE(review): altitude is registered with addIndexLayer("data", ...)
        # here while UniformForeFireSimulation uses addScalarLayer("table", ...)
        # for the same layer name - confirm which call is intended.
        self.altitude_map = altitude_map.reshape(1, 1, domain_height, domain_width)
        self.ff.addIndexLayer(
            "data",
            "altitude", float(sw_x), float(sw_y), 0, float(domain_width), float(domain_height), 0,
            self.altitude_map)

        # Instantiate fire front (front orientation is clockwise!!)
        self.ff.execute("\tFireFront[id=2;domain=0;t=0]")
        for (xp, yp) in fire_front:
            self.ff.execute(f"\t\tFireNode[domain=0;loc=({xp},{yp},0);vel=(0,0,0);t=0;state=init;frontId=2]")
        logger.info(f'Initial fire front: {fire_front}')