forked from paulvangentcom/python_corona_simulation
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmotion.py
More file actions
357 lines (269 loc) · 13.6 KB
/
motion.py
File metadata and controls
357 lines (269 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
'''
file that contains all function related to population mobility
and related computations
'''
import numpy as np
def update_positions(population):
'''update positions of all people
Uses heading and speed to update all positions for
the next time step
Keyword arguments
-----------------
population : ndarray
the array containing all the population information
'''
#update positions
#x
population[:,1] = population[:,1] + (population[:,3] * population[:,5])
#y
population[:,2] = population[:,2] + (population [:,4] * population[:,5])
return population
def out_of_bounds(population, xbounds, ybounds):
'''checks which people are about to go out of bounds and corrects
Function that updates headings of individuals that are about to
go outside of the world boundaries.
Keyword arguments
-----------------
population : ndarray
the array containing all the population information
xbounds, ybounds : list or tuple
contains the lower and upper bounds of the world [min, max]
'''
#update headings and positions where out of bounds
#update x heading
#determine number of elements that need to be updated
shp = population[:,3][(population[:,1] <= xbounds[:,0]) &
(population[:,3] < 0)].shape
population[:,3][(population[:,1] <= xbounds[:,0]) &
(population[:,3] < 0)] = np.clip(np.random.normal(loc = 0.5,
scale = 0.5/3,
size = shp),
a_min = 0.05, a_max = 1)
shp = population[:,3][(population[:,1] >= xbounds[:,1]) &
(population[:,3] > 0)].shape
population[:,3][(population[:,1] >= xbounds[:,1]) &
(population[:,3] > 0)] = np.clip(-np.random.normal(loc = 0.5,
scale = 0.5/3,
size = shp),
a_min = -1, a_max = -0.05)
#update y heading
shp = population[:,4][(population[:,2] <= ybounds[:,0]) &
(population[:,4] < 0)].shape
population[:,4][(population[:,2] <= ybounds[:,0]) &
(population[:,4] < 0)] = np.clip(np.random.normal(loc = 0.5,
scale = 0.5/3,
size = shp),
a_min = 0.05, a_max = 1)
shp = population[:,4][(population[:,2] >= ybounds[:,1]) &
(population[:,4] > 0)].shape
population[:,4][(population[:,2] >= ybounds[:,1]) &
(population[:,4] > 0)] = np.clip(-np.random.normal(loc = 0.5,
scale = 0.5/3,
size = shp),
a_min = -1, a_max = -0.05)
return population
def update_randoms(population, pop_size, heading_update_chance=0.02,
speed_update_chance=0.02, heading_multiplication=1,
speed_multiplication=1, speed=0.01):
'''updates random states such as heading and speed
Function that randomized the headings and speeds for population members
with settable odds.
Keyword arguments
-----------------
population : ndarray
the array containing all the population information
pop_size : int
the size of the population
heading_update_chance : float
the odds of updating the heading of each member, each time step
speed_update_chance : float
the oodds of updating the speed of each member, each time step
heading_multiplication : int or float
factor to multiply heading with (default headings are between -1 and 1)
speed_multiplication : int or float
factor to multiply speed with (default speeds are between 0.0001 and 0.05
speed : int or float
mean speed of population members, speeds will be taken from gaussian distribution
with mean 'speed' and sd 'speed / 3'
'''
#randomly update heading
#x
update = np.random.random(size=(pop_size,))
shp = update[update <= heading_update_chance].shape
population[:,3][update <= heading_update_chance] = np.random.normal(loc = 0,
scale = 1/3,
size = shp) * heading_multiplication
#y
update = np.random.random(size=(pop_size,))
shp = update[update <= heading_update_chance].shape
population[:,4][update <= heading_update_chance] = np.random.normal(loc = 0,
scale = 1/3,
size = shp) * heading_multiplication
#randomize speeds
update = np.random.random(size=(pop_size,))
shp = update[update <= heading_update_chance].shape
population[:,5][update <= heading_update_chance] = np.random.normal(loc = speed,
scale = speed / 3,
size = shp) * speed_multiplication
population[:,5] = np.clip(population[:,5], a_min=0.0001, a_max=0.05)
return population
#########################
##### PATH PLANNING #####
#########################
def set_destination(population, destinations):
'''sets destination of population
Sets the destination of population if destination marker is not 0.
Updates headings and speeds as well.
Keyword arguments
-----------------
population : ndarray
the array containing all the population information
destinations : ndarray
the array containing all destinations information
'''
#how many destinations are active
active_dests = np.unique(population[:,11][population[:,11] != 0])
#set destination
for d in active_dests:
dest_x = destinations[:,int((d - 1) * 2)]
dest_y = destinations[:,int(((d - 1) * 2) + 1)]
#compute new headings
head_x = dest_x - population[:,1]
head_y = dest_y - population[:,2]
#head_x = head_x / np.sqrt(head_x)
#head_y = head_y / np.sqrt(head_y)
#reinsert headings into population of those not at destination yet
population[:,3][(population[:,11] == d) &
(population[:,12] == 0)] = head_x[(population[:,11] == d) &
(population[:,12] == 0)]
population[:,4][(population[:,11] == d) &
(population[:,12] == 0)] = head_y[(population[:,11] == d) &
(population[:,12] == 0)]
#set speed to 0.01
population[:,5][(population[:,11] == d) &
(population[:,12] == 0)] = 0.02
return population
def check_at_destination(population, destinations, wander_factor=1.5):
'''check who is at their destination already
Takes subset of population with active destination and
tests who is at the required coordinates. Updates at destination
column for people at destination.
Keyword arguments
-----------------
population : ndarray
the array containing all the population information
destinations : ndarray
the array containing all destinations information
wander_factor : int or float
defines how far outside of 'wander range' the destination reached
is triggered
'''
#how many destinations are active
active_dests = np.unique(population[:,11][(population[:,11] != 0)])
#see who is at destination
for d in active_dests:
dest_x = destinations[:,int((d - 1) * 2)]
dest_y = destinations[:,int(((d - 1) * 2) + 1)]
#see who arrived at destination and filter out who already was there
at_dest = population[(np.abs(population[:,1] - dest_x) < (population[:,13] * wander_factor)) &
(np.abs(population[:,2] - dest_y) < (population[:,14] * wander_factor)) &
(population[:,12] == 0)]
if len(at_dest) > 0:
#mark those as arrived
at_dest[:,12] = 1
#insert random headings and speeds for those at destination
at_dest = update_randoms(at_dest, len(at_dest), 1, 1)
#at_dest[:,5] = 0.001
#reinsert into population
population[(np.abs(population[:,1] - dest_x) < (population[:,13] * wander_factor)) &
(np.abs(population[:,2] - dest_y) < (population[:,14] * wander_factor)) &
(population[:,12] == 0)] = at_dest
return population
def keep_at_destination(population, destinations, wander_factor=1):
'''keeps those who have arrived, within wander range
Function that keeps those who have been marked as arrived at their
destination within their respective wander ranges
Keyword arguments
-----------------
population : ndarray
the array containing all the population information
destinations : ndarray
the array containing all destinations information
wander_factor : int or float
defines how far outside of 'wander range' the destination reached
is triggered
'''
#how many destinations are active
active_dests = np.unique(population[:,11][(population[:,11] != 0) &
(population[:,12] == 1)])
for d in active_dests:
dest_x = destinations[:,int((d - 1) * 2)][(population[:,12] == 1) &
(population[:,11] == d)]
dest_y = destinations[:,int(((d - 1) * 2) + 1)][(population[:,12] == 1) &
(population[:,11] == d)]
#see who is marked as arrived
arrived = population[(population[:,12] == 1) &
(population[:,11] == d)]
ids = np.int32(arrived[:,0]) # find unique IDs of arrived persons
#check if there are those out of bounds
#replace x oob
#where x larger than destination + wander, AND heading wrong way, set heading negative
shp = arrived[:,3][arrived[:,1] > (dest_x + (arrived[:,13] * wander_factor))].shape
arrived[:,3][arrived[:,1] > (dest_x + (arrived[:,13] * wander_factor))] = -np.random.normal(loc = 0.5,
scale = 0.5 / 3,
size = shp)
#where x smaller than destination - wander, set heading positive
shp = arrived[:,3][arrived[:,1] < (dest_x - (arrived[:,13] * wander_factor))].shape
arrived[:,3][arrived[:,1] < (dest_x - (arrived[:,13] * wander_factor))] = np.random.normal(loc = 0.5,
scale = 0.5 / 3,
size = shp)
#where y larger than destination + wander, set heading negative
shp = arrived[:,4][arrived[:,2] > (dest_y + (arrived[:,14] * wander_factor))].shape
arrived[:,4][arrived[:,2] > (dest_y + (arrived[:,14] * wander_factor))] = -np.random.normal(loc = 0.5,
scale = 0.5 / 3,
size = shp)
#where y smaller than destination - wander, set heading positive
shp = arrived[:,4][arrived[:,2] < (dest_y - (arrived[:,14] * wander_factor))].shape
arrived[:,4][arrived[:,2] < (dest_y - (arrived[:,14] * wander_factor))] = np.random.normal(loc = 0.5,
scale = 0.5 / 3,
size = shp)
#slow speed
arrived[:,5] = np.random.normal(loc = 0.005,
scale = 0.005 / 3,
size = arrived[:,5].shape)
#reinsert into population
population[(population[:,12] == 1) &
(population[:,11] == d)] = arrived
return population
def reset_destinations(population, ids=[]):
'''clears destination markers
Function that clears all active destination markers from the population
Keyword arguments
-----------------
population : ndarray
the array containing all the population information
ids : ndarray or list
array containing the id's of the population members that need their
destinations reset
'''
if len(ids) == 0:
#if ids empty, reset everyone
population[:,11] = 0
else:
pass
#else, reset id's
pass
def get_motion_parameters(xmin, ymin, xmax, ymax):
'''gets destination center and wander ranges
Function that returns geometric parameters of the destination
that the population members have set.
Keyword arguments:
------------------
xmin, ymin, xmax, ymax : int or float
lower and upper bounds of the destination area set.
'''
x_center = xmin + ((xmax - xmin) / 2)
y_center = ymin + ((ymax - ymin) / 2)
x_wander = (xmax - xmin) / 2
y_wander = (ymax - ymin) / 2
return x_center, y_center, x_wander, y_wander