import numpy as np
import xarray as xr
import json
import os
import warnings


def _abspath(fname):
    """Return the absolute path of `fname`, expanding a leading '~'."""
    return os.path.abspath(os.path.expanduser(fname))


def _get_filetype(fname):
    """
    Detects whether the file is a Nortek, Signature (Nortek), or RDI
    file by reading the first few bytes of the file.

    Returns
    -------
    None - Doesn't match any known pattern
    'signature' - for Nortek Signature files
    'nortek' - for Nortek (Vector, AWAC) files
    'RDI' - for RDI files
    '<GIT-LFS pointer>' - if the file looks like a Git LFS pointer.
    """
    with open(fname, "rb") as rdr:
        first_bytes = rdr.read(40)  # avoid shadowing the builtin `bytes`
    code = first_bytes[:2].hex()
    if code in ["7f79", "7f7f"]:
        return "RDI"
    elif code in ["a50a"]:
        return "signature"
    elif code in ["a505"]:
        # AWAC
        return "nortek"
    elif first_bytes == b"version https://git-lfs.github.com/spec/":
        return "<GIT-LFS pointer>"
    else:
        return None
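

# A minimal usage sketch (the file path is hypothetical; only the return
# codes documented above are assumed):
#
#     ftype = _get_filetype("data/vector_burst.VEC")
#     if ftype == "<GIT-LFS pointer>":
#         raise IOError("File is a Git LFS pointer; fetch the real file first.")
#     elif ftype is None:
#         raise IOError("File type not recognized.")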


def _find_userdata(filename, userdata=True):
    # Locate the userdata.json file to read alongside the data file.
    # Note `userdata is True`: a path string passed via `userdata` is also
    # truthy and must fall through to the branch below.
    if userdata is True:
        for basefile in [filename.rsplit(".", 1)[0], filename]:
            jsonfile = basefile + ".userdata.json"
            if os.path.isfile(jsonfile):
                return _read_userdata(jsonfile)
    elif isinstance(userdata, str) or hasattr(userdata, "read"):
        return _read_userdata(userdata)
    return {}


def _read_userdata(fname):
    """
    Reads a userdata.json file and returns the data it contains as a
    dictionary.
    """
    with open(fname) as data_file:
        data = json.load(data_file)
    for nm in ["body2head_rotmat", "body2head_vec"]:
        if nm in data:
            new_name = "inst" + nm[4:]
            warnings.warn(
                f"{nm} has been deprecated, please change this to "
                f"{new_name} in {fname}."
            )
            data[new_name] = data.pop(nm)
    if "inst2head_rotmat" in data:
        if data["inst2head_rotmat"] in ["identity", "eye", 1, 1.0]:
            data["inst2head_rotmat"] = np.eye(3)
        else:
            data["inst2head_rotmat"] = np.array(data["inst2head_rotmat"])
    if "inst2head_vec" in data and not isinstance(data["inst2head_vec"], list):
        data["inst2head_vec"] = list(data["inst2head_vec"])
    return data
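

# A hypothetical `<basename>.userdata.json` that `_read_userdata` accepts
# (field values are illustrative, not from any real deployment):
#
#     {
#         "inst2head_rotmat": "identity",
#         "inst2head_vec": [0.0, 0.0, -0.2]
#     }
#
# Here "identity" is expanded to np.eye(3), and a non-list
# "inst2head_vec" (e.g. a tuple) would be coerced to a list.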


def _handle_nan(data):
    """
    Finds trailing NaNs, which cause issues when running the rotation
    algorithms, and deletes them.
    """
    if "time" not in data["coords"]:
        raise Exception("No data recorded in file.")
    nan = np.zeros(data["coords"]["time"].shape, dtype=bool)
    n_time = data["coords"]["time"].size
    if any(np.isnan(data["coords"]["time"])):
        nan |= np.isnan(data["coords"]["time"])
    # Required for the motion-correction algorithm
    var = ["accel", "angrt", "mag"]
    for key in data["data_vars"]:
        if any(val in key for val in var):
            shp = data["data_vars"][key].shape
            if shp[-1] == n_time:
                if len(shp) == 1:
                    if any(np.isnan(data["data_vars"][key])):
                        nan |= np.isnan(data["data_vars"][key])
                elif len(shp) == 2:
                    if any(np.isnan(data["data_vars"][key][-1])):
                        nan |= np.isnan(data["data_vars"][key][-1])
    # The flagged NaNs are assumed to be trailing, so the total count is
    # the number of samples to trim off the end of the time axis.
    trailing = int(nan.sum())
    if trailing > 0:
        data["coords"]["time"] = data["coords"]["time"][:-trailing]
        for key in data["data_vars"]:
            if data["data_vars"][key].shape[-1] == n_time:
                data["data_vars"][key] = data["data_vars"][key][..., :-trailing]
    return data
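

# Sketch of the trimming behavior under the trailing-NaN assumption
# (toy dictionary, not actual reader output):
#
#     data = {
#         "coords": {"time": np.array([0.0, 1.0, 2.0, np.nan])},
#         "data_vars": {"accel": np.ones((3, 4))},
#     }
#     data = _handle_nan(data)
#     # data["coords"]["time"] -> [0., 1., 2.]; accel shape -> (3, 3)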


def _remove_gps_duplicates(dat):
    """
    Removes duplicate and NaN timestamp values in the 'time_gps' coordinate
    and adds a hardware (ADCP DAQ) timestamp corresponding to GPS
    acquisition in the 'hdwtime_gps' variable.

    Parameters
    ----------
    dat : dict
        Data dictionary from the binary reader, containing GPS-related
        data and timestamps under "coords" and "data_vars". Modified in
        place.

    Returns
    -------
    dict
        The input dictionary with duplicates and NaN values removed from
        the 'time_gps' coordinate. A new variable, 'hdwtime_gps', is added
        to indicate the hardware timestamp corresponding to GPS acquisition.
    """
    dat["data_vars"]["hdwtime_gps"] = dat["coords"]["time"]
    # If the time jumps backward by nearly 24 hours at any point, we've
    # skipped a day. `argmin` assumes a single day-skip per file.
    time_diff = np.diff(dat["coords"]["time_gps"])
    if np.any(time_diff < -(23.9 * 3600)):
        idx = int(np.argmin(time_diff))
        dat["coords"]["time_gps"][idx + 1 :] += 24 * 3600
    # Remove duplicate timestamp values, if applicable
    dat["coords"]["time_gps"], idx = np.unique(
        dat["coords"]["time_gps"], return_index=True
    )
    # Remove NaN values, if applicable
    nan = np.zeros(dat["coords"]["time"].shape, dtype=bool)
    if any(np.isnan(dat["coords"]["time_gps"])):
        nan = np.isnan(dat["coords"]["time_gps"])
        dat["coords"]["time_gps"] = dat["coords"]["time_gps"][~nan]
    for key in dat["data_vars"]:
        if ("gps" in key) or ("nmea" in key):
            dat["data_vars"][key] = dat["data_vars"][key][idx]
            if sum(nan) > 0:
                dat["data_vars"][key] = dat["data_vars"][key][~nan]
    return dat
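

# The day-skip correction above targets GPS clocks that report
# seconds-of-day: rolling past midnight shows up as a large negative
# `np.diff` value, and all later samples are shifted forward by one day.
# Toy values:
#
#     time_gps = [86398.0, 86399.0, 0.0, 1.0]   # diff = [1, -86399, 1]
#     # -> corrected to [86398.0, 86399.0, 86400.0, 86401.0]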


def _create_dataset(data):
    """
    Creates an xarray dataset from the dictionary created by the binary
    readers.
    Direction 'dir' coordinates are set in `set_coords`.
    """
    tag = ["_avg", "_b5", "_echo", "_bt", "_gps", "_altraw", "_altraw_avg", "_sl"]

    # If burst velocity not measured
    if "vel" not in data["data_vars"]:
        # Dual profile where burst velocity is not measured but echo sounder is
        if "vel_avg" in data["data_vars"]:
            data["coords"]["time"] = data["coords"]["time_avg"]
        else:
            t_vars = [t for t in data["coords"] if "time" in t]
            data["coords"]["time"] = data["coords"][t_vars[0]]

    ds_dict = {}
    for key in data["coords"]:
        ds_dict[key] = {"dims": (key), "data": data["coords"][key]}

    # Set various coordinate frames
    if "n_beams_avg" in data["attrs"]:
        beams = data["attrs"]["n_beams_avg"]
    else:
        beams = data["attrs"]["n_beams"]
    n_beams = max(min(beams, 4), 3)
    beams = np.arange(1, n_beams + 1, dtype=np.int32)

    ds_dict["beam"] = {"dims": ("beam"), "data": beams}
    ds_dict["dir"] = {"dims": ("dir"), "data": beams}
    data["units"].update({"beam": "1", "dir": "1"})
    data["long_name"].update({"beam": "Beam Reference Frame", "dir": "Reference Frame"})
    # Iterate through data variables and add them to new dictionary
    for key in data["data_vars"]:
        # orientation matrices
        if "mat" in key:
            if "inst" in key:  # beam2inst & inst2head orientation matrices
                if "x1" not in ds_dict:
                    ds_dict["x1"] = {"dims": ("x1"), "data": beams}
                    ds_dict["x2"] = {"dims": ("x2"), "data": beams}
                ds_dict[key] = {"dims": ("x1", "x2"), "data": data["data_vars"][key]}
                data["units"].update({key: "1"})
                data["long_name"].update({key: "Rotation Matrix"})
            elif "orientmat" in key:  # earth2inst orientation matrix
                if any(val in key for val in tag):
                    tg = "_" + key.rsplit("_")[-1]
                else:
                    tg = ""
                ds_dict["earth"] = {"dims": ("earth"), "data": ["E", "N", "U"]}
                ds_dict["inst"] = {"dims": ("inst"), "data": ["X", "Y", "Z"]}
                ds_dict[key] = {
                    "dims": ("earth", "inst", "time" + tg),
                    "data": data["data_vars"][key],
                }
                data["units"].update(
                    {"earth": "1", "inst": "1", key: data["units"]["orientmat"]}
                )
                data["long_name"].update(
                    {
                        "earth": "Earth Reference Frame",
                        "inst": "Instrument Reference Frame",
                        key: data["long_name"]["orientmat"],
                    }
                )

        # quaternion units never change
        elif "quaternions" in key:
            if any(val in key for val in tag):
                tg = "_" + key.rsplit("_")[-1]
            else:
                tg = ""
            if "q" not in ds_dict:
                ds_dict["q"] = {"dims": ("q"), "data": ["w", "x", "y", "z"]}
                data["units"].update({"q": "1"})
                data["long_name"].update({"q": "Quaternion Vector Components"})
            ds_dict[key] = {"dims": ("q", "time" + tg), "data": data["data_vars"][key]}
            data["units"].update({key: data["units"]["quaternions"]})
            data["long_name"].update({key: data["long_name"]["quaternions"]})

        else:
            shp = data["data_vars"][key].shape
            if len(shp) == 1:  # 1D variables
                if "_altraw_avg" in key:
                    tg = "_altraw_avg"
                elif any(val in key for val in tag):
                    tg = "_" + key.rsplit("_")[-1]
                else:
                    tg = ""
                ds_dict[key] = {"dims": ("time" + tg), "data": data["data_vars"][key]}

            elif len(shp) == 2:  # 2D variables
                if key == "echo":
                    ds_dict[key] = {
                        "dims": ("range_echo", "time_echo"),
                        "data": data["data_vars"][key],
                    }
                elif key == "samp_altraw":
                    ds_dict[key] = {
                        "dims": ("n_altraw", "time_altraw"),
                        "data": data["data_vars"][key],
                    }
                elif key == "samp_altraw_avg":
                    ds_dict[key] = {
                        "dims": ("n_altraw_avg", "time_altraw_avg"),
                        "data": data["data_vars"][key],
                    }

                # ADV/ADCP instrument vector data, bottom tracking
                elif shp[0] == n_beams and not any(val in key for val in tag[:3]):
                    if "bt" in key and "time_bt" in data["coords"]:
                        tg = "_bt"
                    else:
                        tg = ""
                    if any(
                        key.rsplit("_")[0] in s
                        for s in ["amp", "corr", "dist", "prcnt_gd"]
                    ):
                        dim0 = "beam"
                    else:
                        dim0 = "dir"
                    ds_dict[key] = {
                        "dims": (dim0, "time" + tg),
                        "data": data["data_vars"][key],
                    }

                # ADCP IMU data
                elif shp[0] == 3:
                    if not any(val in key for val in tag):
                        tg = ""
                    else:
                        tg = [val for val in tag if val in key]
                        tg = tg[0]
                    if "dirIMU" not in ds_dict:
                        ds_dict["dirIMU"] = {"dims": ("dirIMU"), "data": [1, 2, 3]}
                        data["units"].update({"dirIMU": "1"})
                        data["long_name"].update({"dirIMU": "Reference Frame"})
                    ds_dict[key] = {
                        "dims": ("dirIMU", "time" + tg),
                        "data": data["data_vars"][key],
                    }

                elif "b5" in key:
                    ds_dict[key] = {
                        "dims": ("range_b5", "time_b5"),
                        "data": data["data_vars"][key],
                    }

            elif len(shp) == 3:  # 3D variables
                if "vel" in key:
                    dim0 = "dir"
                else:  # amp, corr, prcnt_gd, status
                    dim0 = "beam"

                if not any(val in key for val in tag) or ("_avg" in key):
                    if "_avg" in key:
                        tg = "_avg"
                    else:
                        tg = ""
                    ds_dict[key] = {
                        "dims": (dim0, "range" + tg, "time" + tg),
                        "data": data["data_vars"][key],
                    }
                elif "b5" in key:
                    # "vel_b5" is sometimes stored as (1, range_b5, time_b5)
                    ds_dict[key] = {
                        "dims": ("range_b5", "time_b5"),
                        "data": data["data_vars"][key].squeeze(),
                    }
                elif "sl" in key:
                    ds_dict[key] = {
                        "dims": (dim0, "range_sl", "time"),
                        "data": data["data_vars"][key],
                    }
                else:
                    warnings.warn(f"Variable not included in dataset: {key}")
    # Create dataset
    ds = xr.Dataset.from_dict(ds_dict)

    # Assign data array attributes
    for key in ds.variables:
        for md in ["units", "long_name", "standard_name"]:
            if key in data[md]:
                ds[key].attrs[md] = data[md][key]
            if len(ds[key].shape) > 1:
                ds[key].attrs["coverage_content_type"] = "physicalMeasurement"
            try:  # make sure tagged variables inherit metadata from the base name
                tg = "_" + key.rsplit("_")[-1]
                if any(val in key for val in tag):
                    ds[key].attrs[md] = data[md][key[: -len(tg)]]
            except KeyError:
                pass

    # Assign coordinate attributes
    for ky in ds.dims:
        ds[ky].attrs["coverage_content_type"] = "coordinate"
    r_list = [r for r in ds.coords if "range" in r]
    for ky in r_list:
        ds[ky].attrs["units"] = "m"
        ds[ky].attrs["long_name"] = "Profile " + ky.capitalize().replace("_", " ")
        ds[ky].attrs["description"] = "Distance to the center of each depth bin"
    time_list = [t for t in ds.coords if "time" in t]
    for ky in time_list:
        ds[ky].attrs["units"] = "seconds since 1970-01-01 00:00:00"
        ds[ky].attrs["long_name"] = ky.capitalize().replace("_", " ")
        ds[ky].attrs["standard_name"] = "time"

    # Set dataset metadata
    ds.attrs = data["attrs"]

    return ds
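

# End-to-end sketch (hypothetical: `read_raw` stands in for one of the
# binary readers, which produce a dictionary with "coords", "data_vars",
# "attrs", "units", "long_name", and "standard_name" entries):
#
#     raw = read_raw(_abspath("~/data/deployment.ad2cp"))
#     raw = _handle_nan(raw)
#     ds = _create_dataset(raw)
#     userdata = _find_userdata("~/data/deployment.ad2cp")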