-
Notifications
You must be signed in to change notification settings - Fork 143
Expand file tree
/
Copy pathexiftool_read_video.py
More file actions
474 lines (408 loc) · 15.9 KB
/
exiftool_read_video.py
File metadata and controls
474 lines (408 loc) · 15.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the BSD license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
import dataclasses
import functools
import logging
import typing as T
import xml.etree.ElementTree as ET
from . import exif_read, exiftool_read, geo
from .telemetry import GPSFix, GPSPoint
# Highest TrackN namespace index that is registered (and probed) by this module.
MAX_TRACK_ID = 10

# ExifTool namespace prefix -> XML namespace URI.
# The fixed prefixes come first; Track1..Track{MAX_TRACK_ID} are appended after.
EXIFTOOL_NAMESPACES: dict[str, str] = {
    "Keys": "http://ns.exiftool.org/QuickTime/Keys/1.0/",
    "IFD0": "http://ns.exiftool.org/EXIF/IFD0/1.0/",
    "QuickTime": "http://ns.exiftool.org/QuickTime/QuickTime/1.0/",
    "UserData": "http://ns.exiftool.org/QuickTime/UserData/1.0/",
    "Insta360": "http://ns.exiftool.org/Trailer/Insta360/1.0/",
    "GoPro": "http://ns.exiftool.org/QuickTime/GoPro/1.0/",
}
EXIFTOOL_NAMESPACES.update(
    (
        f"Track{track_id}",
        f"http://ns.exiftool.org/QuickTime/Track{track_id}/1.0/",
    )
    for track_id in range(1, MAX_TRACK_ID + 1)
)
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
# Type variable for the result types that field extraction can be asked for:
# int, float, str, or a list of str.
_FIELD_TYPE = T.TypeVar("_FIELD_TYPE", int, float, str, T.List[str])
# exiftool_read.expand_tag with its `namespaces` argument pre-bound to
# EXIFTOOL_NAMESPACES.
# NOTE(review): presumably maps a "Prefix:Name" tag to its namespaced XML tag --
# confirm against exiftool_read.expand_tag.
expand_tag = functools.partial(exiftool_read.expand_tag, namespaces=EXIFTOOL_NAMESPACES)
def _maybe_float(text: str | None) -> float | None:
    """Convert *text* to a float, returning None when that is not possible.

    None is passed through as None; a value that ``float()`` rejects with
    ValueError or TypeError also yields None instead of raising.
    """
    if text is not None:
        try:
            return float(text)
        except (ValueError, TypeError):
            pass
    return None
def _index_text_by_tag(elements: T.Iterable[ET.Element]) -> dict[str, list[str]]:
    """Group the text of *elements* by element tag.

    Elements whose text is None are skipped. For each tag the texts are kept
    in the order the elements were iterated, and tags appear in the result in
    first-seen order.
    """
    index: dict[str, list[str]] = {}
    for node in elements:
        text = node.text
        if text is None:
            continue
        bucket = index.get(node.tag)
        if bucket is None:
            index[node.tag] = [text]
        else:
            bucket.append(text)
    return index
def _extract_alternative_fields(
    texts_by_tag: dict[str, list[str]],
    fields: T.Sequence[str],
    field_type: T.Type[_FIELD_TYPE],
) -> _FIELD_TYPE | None:
    """Return the value of the first usable field among *fields*.

    Fields are tried in order. Each name is expanded with ``expand_tag`` and
    looked up in *texts_by_tag*; names that are absent are skipped.

    * ``field_type is list``: the full list of texts stored for the tag is
      returned as is.
    * ``field_type`` is ``int``, ``float`` or ``str``: the first text is
      converted with that type. If the conversion raises ValueError or
      TypeError, the next field is tried.

    Returns None when no field produced a value.

    Raises:
        ValueError: if a field is present and *field_type* is none of the
            supported types.
    """
    for field in fields:
        values = texts_by_tag.get(expand_tag(field))
        if values is None:
            continue

        if field_type is int or field_type is float or field_type is str:
            first = values[0]
            try:
                return T.cast(_FIELD_TYPE, field_type(first))
            except (ValueError, TypeError):
                # Unparsable value: fall through to the next alternative field
                continue

        if field_type is list:
            return T.cast(_FIELD_TYPE, values)

        raise ValueError(f"Invalid field type {field_type}")

    return None
def _same_gps_point(left: GPSPoint, right: GPSPoint) -> bool:
    """
    Tell whether two GPS points are duplicates of each other.

    Only time, lon, lat, epoch_time and angle are compared; other attributes
    (e.g. alt) are ignored.

    >>> left = GPSPoint(time=56.0, lat=36.741385, lon=29.021274, alt=141.6, angle=1.54, epoch_time=None, fix=None, precision=None, ground_speed=None)
    >>> right = GPSPoint(time=56.0, lat=36.741385, lon=29.021274, alt=142.4, angle=1.54, epoch_time=None, fix=None, precision=None, ground_speed=None)
    >>> _same_gps_point(left, right)
    True
    """
    compared_attributes = ("time", "lon", "lat", "epoch_time", "angle")
    return all(
        getattr(left, name) == getattr(right, name) for name in compared_attributes
    )
def _deduplicate_gps_points(
    track: list[GPSPoint], same_gps_point: T.Callable[[GPSPoint, GPSPoint], bool]
) -> list[GPSPoint]:
    """Collapse runs of consecutive duplicate points, keeping the first of each run.

    *same_gps_point* is called as ``same_gps_point(last_kept, candidate)``;
    the candidate is dropped when it returns a truthy value. The input list
    is not modified; a new list is returned.
    """
    kept: list[GPSPoint] = []
    for candidate in track:
        if kept and same_gps_point(kept[-1], candidate):
            continue
        kept.append(candidate)
    return kept
def _aggregate_gps_track(
    texts_by_tag: dict[str, list[str]],
    time_tag: str | None,
    lon_tag: str,
    lat_tag: str,
    alt_tag: str | None = None,
    gps_time_tag: str | None = None,
    direction_tag: str | None = None,
    ground_speed_tag: str | None = None,
) -> list[GPSPoint]:
    """
    Build a GPS track from the per-tag text values in texts_by_tag.

    Longitudes and latitudes are required and must have the same number of values,
    otherwise a warning is logged and an empty track is returned.

    Some cameras store time information in the SampleTime tag (and each sample has
    multiple GPS data points), therefore the time_tag is optional:
    - If time_tag is None, all returned points have time = 0.0.
    - Otherwise the number of timestamps must match the number of coordinates
      (a warning is logged and an empty track is returned if not), and the times
      of the returned points are shifted so that the first point is at 0.

    Altitudes, directions and ground speeds are optional; missing values are None.
    If gps_time_tag is given and parsable, every point gets the same epoch_time.

    Points without a timestamp, longitude or latitude are dropped. The result is
    sorted by time with consecutive duplicates removed.
    """

    def _texts(tag: str) -> list[str]:
        # All texts recorded under the tag, or an empty list if there are none
        return _extract_alternative_fields(texts_by_tag, [tag], list) or []

    def _padded_floats(tag: str | None, length: int) -> list[float | None]:
        # Parse the values of an optional tag, padding with None up to length
        parsed: list[float | None] = (
            [] if tag is None else [_maybe_float(text) for text in _texts(tag)]
        )
        missing = length - len(parsed)
        if missing > 0:
            parsed.extend([None] * missing)
        return parsed

    # coordinates (required)
    lons = [_maybe_float(text) for text in _texts(lon_tag)]
    lats = [_maybe_float(text) for text in _texts(lat_tag)]
    expected_length = len(lats)
    if len(lons) != expected_length:
        # no idea what to do if we have different number of lons and lats
        LOG.warning(
            "Found different number of longitudes %d and latitudes %d",
            len(lons),
            expected_length,
        )
        return []

    # timestamps (optional)
    timestamps: list[float | None]
    if time_tag is None:
        timestamps = [0.0] * expected_length
    else:
        parsed_datetimes = [
            exif_read.parse_gps_datetime(text) for text in _texts(time_tag)
        ]
        timestamps = [
            None if parsed is None else geo.as_unix_time(parsed)
            for parsed in parsed_datetimes
        ]
        if len(timestamps) != expected_length:
            # no idea what to do if we have different number of timestamps and coordinates
            LOG.warning(
                "Found different number of timestamps %d and coordinates %d",
                len(timestamps),
                expected_length,
            )
            return []

    assert len(timestamps) == expected_length

    # altitudes, directions and speeds (all optional)
    alts = _padded_floats(alt_tag, expected_length)
    directions = _padded_floats(direction_tag, expected_length)
    ground_speeds = _padded_floats(ground_speed_tag, expected_length)

    # GPS timestamp (optional): a single value shared by all points
    epoch_time = None
    if gps_time_tag is not None:
        gps_time_text = _extract_alternative_fields(texts_by_tag, [gps_time_tag], str)
        gps_datetime = (
            None
            if gps_time_text is None
            else exif_read.parse_gps_datetime(gps_time_text)
        )
        if gps_datetime is not None:
            epoch_time = geo.as_unix_time(gps_datetime)

    # build track
    track: list[GPSPoint] = []
    rows = zip(timestamps, lons, lats, alts, directions, ground_speeds)
    for timestamp, lon, lat, alt, direction, ground_speed in rows:
        if timestamp is not None and lon is not None and lat is not None:
            candidate = GPSPoint(
                time=timestamp,
                lon=lon,
                lat=lat,
                alt=alt,
                angle=direction,
                epoch_time=epoch_time,
                fix=None,
                precision=None,
                ground_speed=ground_speed,
            )
            if track and _same_gps_point(track[-1], candidate):
                continue
            track.append(candidate)

    track.sort(key=lambda item: item.time)
    track = _deduplicate_gps_points(track, same_gps_point=_same_gps_point)

    # make times relative to the first point when they came from time_tag
    if time_tag is not None and track:
        origin = track[0].time
        for item in track:
            item.time -= origin

    return track
def _aggregate_samples(
    elements: T.Iterable[ET.Element],
    sample_time_tag: str,
    sample_duration_tag: str,
) -> T.Generator[tuple[float, float, list[ET.Element]], None, None]:
    """Split a flat element stream into samples delimited by sample-time elements.

    Yields ``(sample_time, sample_duration, elements)`` tuples. A sample is
    emitted when the next sample-time element is seen (or at the end of the
    stream), and only if both its time and a duration are known (not None).

    Notes on the bookkeeping, as implemented:
    - Sample-time and sample-duration elements themselves are not included in
      the yielded element lists.
    - The element buffer is only reset when a sample is yielded, so elements
      seen before the first complete sample end up in that sample.
    - The duration is not cleared between samples; it keeps its last parsed
      value until another duration element is seen.
    """
    time_tag = expand_tag(sample_time_tag)
    duration_tag = expand_tag(sample_duration_tag)

    pending: list[ET.Element] = []
    current_time: float | None = None
    current_duration: float | None = None

    for element in elements:
        tag = element.tag

        if tag == time_tag:
            # A new sample starts: flush the previous one if it is complete
            if current_time is not None and current_duration is not None:
                yield (current_time, current_duration, pending)
                pending = []
            current_time = _maybe_float(element.text)
            continue

        if tag == duration_tag:
            current_duration = _maybe_float(element.text)
            continue

        pending.append(element)

    # Flush the trailing sample
    if current_time is not None and current_duration is not None:
        yield (current_time, current_duration, pending)
def _aggregate_gps_track_by_sample_time(
    sample_iterator: T.Iterable[tuple[float, float, list[ET.Element]]],
    lon_tag: str,
    lat_tag: str,
    alt_tag: str | None = None,
    gps_time_tag: str | None = None,
    direction_tag: str | None = None,
    ground_speed_tag: str | None = None,
    gps_fix_tag: str | None = None,
    gps_precision_tag: str | None = None,
) -> list[GPSPoint]:
    """Build a GPS track from samples, timing the points by sample time.

    Each item of *sample_iterator* is ``(sample_time, sample_duration, elements)``.
    The GPS points found in a sample are spread evenly over the sample:
    point ``idx`` gets ``time = sample_time + idx * sample_duration / len(points)``.

    Args:
        sample_iterator: the samples to aggregate.
        lon_tag, lat_tag: tags of the longitude and latitude values (required).
        alt_tag, direction_tag, ground_speed_tag: optional tags for the
            altitude, direction and ground speed values.
        gps_time_tag: optional tag of the GPS timestamp within a sample. It is
            forwarded to ``_aggregate_gps_track``, which uses it to set
            ``epoch_time`` on the points of that sample.
        gps_fix_tag: optional tag whose first value in a sample is parsed as
            ``GPSFix(int(text))``; the fix is None if that raises ValueError.
        gps_precision_tag: optional tag whose first value in a sample is parsed
            as a float and multiplied by 100 (see the comment in the body).

    Returns:
        All points of all samples, sorted by time. The fix and precision of a
        sample are applied to every point of that sample.
    """
    expanded_gps_fix_tag = (
        expand_tag(gps_fix_tag) if gps_fix_tag is not None else None
    )
    expanded_gps_precision_tag = (
        expand_tag(gps_precision_tag) if gps_precision_tag is not None else None
    )

    track: list[GPSPoint] = []

    for sample_time, sample_duration, elements in sample_iterator:
        texts_by_tag = _index_text_by_tag(elements)

        gps_fix = None
        if expanded_gps_fix_tag is not None:
            gps_fix_texts = texts_by_tag.get(expanded_gps_fix_tag)
            if gps_fix_texts:
                try:
                    gps_fix = GPSFix(int(gps_fix_texts[0]))
                except ValueError:
                    # Not an integer, or not a known fix value
                    gps_fix = None

        gps_precision = None
        if expanded_gps_precision_tag is not None:
            gps_precision_texts = texts_by_tag.get(expanded_gps_precision_tag)
            if gps_precision_texts:
                gps_precision = _maybe_float(gps_precision_texts[0])
                if gps_precision is not None:
                    # GPS precision in ExifTool (i.e. horizontal positioning error) are in meters.
                    # https://exiftool.org/forum/index.php?topic=11565.0
                    # Here we multiply by 100 to be compatible with the GPSP
                    # described in https://github.com/gopro/gpmf-parser
                    gps_precision = gps_precision * 100

        # Aggregate GPS points in the sample.
        # time_tag is None because point times are derived from the sample time below.
        # gps_time_tag must be forwarded here, otherwise the parameter is silently ignored.
        points = _aggregate_gps_track(
            texts_by_tag,
            time_tag=None,
            lon_tag=lon_tag,
            lat_tag=lat_tag,
            alt_tag=alt_tag,
            gps_time_tag=gps_time_tag,
            direction_tag=direction_tag,
            ground_speed_tag=ground_speed_tag,
        )
        if not points:
            continue

        # Distribute the points evenly over the sample duration
        avg_timedelta = sample_duration / len(points)
        track.extend(
            dataclasses.replace(
                point,
                time=sample_time + idx * avg_timedelta,
                fix=gps_fix,
                precision=gps_precision,
            )
            for idx, point in enumerate(points)
        )

    track.sort(key=lambda point: point.time)
    return track
class ExifToolReadVideo:
    """Read the camera make/model and the GPS track of a video from an ExifTool XML tree.

    The texts of the root's child elements are indexed by tag once at
    construction time; make/model lookups and the QuickTime-style GPS
    extraction work on that index, while the TrackN-based GPS extraction
    walks the root's children again in document order.
    """

    def __init__(
        self,
        etree: ET.ElementTree,
    ) -> None:
        """Index the element texts of *etree*.

        Raises:
            ValueError: if the tree has no root element.
        """
        self.etree = etree
        root = self.etree.getroot()
        if root is None:
            raise ValueError("ElementTree root is None")
        self._texts_by_tag = _index_text_by_tag(root)
        self._all_tags = set(self._texts_by_tag)

    def extract_gps_track(self) -> list[geo.Point]:
        """Return the first non-empty GPS track found, or an empty list.

        Sources are tried in this order: the QuickTime namespace, the
        Insta360 namespace, then the TrackN namespaces.
        """
        # "QuickTime": blackvue and many other cameras
        # "Insta360": insta360 has its own tag
        for namespace in ("QuickTime", "Insta360"):
            track_with_fix = self._extract_gps_track_from_quicktime(
                namespace=namespace
            )
            if track_with_fix:
                return T.cast(T.List[geo.Point], track_with_fix)

        # mostly for gopro
        track_with_fix = self._extract_gps_track_from_track()
        if track_with_fix:
            return T.cast(T.List[geo.Point], track_with_fix)

        return []

    def _extract_make_and_model(self) -> tuple[str | None, str | None]:
        """Return ``(make, model)``, each stripped of surrounding whitespace, or None.

        The GoPro and Insta360 namespaces are checked first, in that order. A
        namespace is used only if it provides a model; if it has no make, the
        namespace name itself is used as the make. Otherwise make and model
        are looked up independently in IFD0, UserData and Keys.
        """
        for vendor in ("GoPro", "Insta360"):
            make = self._extract_alternative_fields([f"{vendor}:Make"], str)
            model = self._extract_alternative_fields([f"{vendor}:Model"], str)
            if model is None:
                continue
            if make is None:
                make = vendor
            return make.strip(), model.strip()

        make = self._extract_alternative_fields(
            ["IFD0:Make", "UserData:Make", "Keys:Make"], str
        )
        model = self._extract_alternative_fields(
            ["IFD0:Model", "UserData:Model", "Keys:Model"], str
        )
        return (
            None if make is None else make.strip(),
            None if model is None else model.strip(),
        )

    def extract_make(self) -> str | None:
        """Return the camera make, or None if it cannot be determined."""
        return self._extract_make_and_model()[0]

    def extract_model(self) -> str | None:
        """Return the camera model, or None if it cannot be determined."""
        return self._extract_make_and_model()[1]

    def _extract_gps_track_from_track(self) -> list[GPSPoint]:
        """Return the GPS track of the first TrackN namespace that yields points.

        Track1..Track{MAX_TRACK_ID} are probed in order. A track is considered
        only if SampleTime, SampleDuration, GPSLongitude and GPSLatitude tags
        all exist for it.

        Raises:
            ValueError: if the tree has no root element.
        """
        root = self.etree.getroot()
        if root is None:
            raise ValueError("ElementTree root is None")

        required_names = (
            "SampleTime",
            "SampleDuration",
            "GPSLongitude",
            "GPSLatitude",
        )
        for track_id in range(1, MAX_TRACK_ID + 1):
            track_ns = f"Track{track_id}"
            required_tags = {
                expand_tag(f"{track_ns}:{name}") for name in required_names
            }
            if not self._all_tags_exists(required_tags):
                continue

            samples = _aggregate_samples(
                root,
                f"{track_ns}:SampleTime",
                f"{track_ns}:SampleDuration",
            )
            track = _aggregate_gps_track_by_sample_time(
                samples,
                lon_tag=f"{track_ns}:GPSLongitude",
                lat_tag=f"{track_ns}:GPSLatitude",
                alt_tag=f"{track_ns}:GPSAltitude",
                direction_tag=f"{track_ns}:GPSTrack",
                ground_speed_tag=f"{track_ns}:GPSSpeed",
                gps_fix_tag=f"{track_ns}:GPSMeasureMode",
                gps_precision_tag=f"{track_ns}:GPSHPositioningError",
            )
            if track:
                return track

        return []

    def _extract_alternative_fields(
        self,
        fields: T.Sequence[str],
        field_type: T.Type[_FIELD_TYPE],
    ) -> _FIELD_TYPE | None:
        """Look up *fields* in this instance's tag index (see the module-level function)."""
        return _extract_alternative_fields(self._texts_by_tag, fields, field_type)

    def _all_tags_exists(self, tags: set[str]) -> bool:
        """Return True if every tag in *tags* is present in the tag index."""
        return all(tag in self._all_tags for tag in tags)

    def _extract_gps_track_from_quicktime(
        self, namespace: str = "QuickTime"
    ) -> list[GPSPoint]:
        """Return the GPS track stored under *namespace*, or an empty list.

        GPSDateTime, GPSLongitude and GPSLatitude tags must all exist in the
        namespace; GPSAltitude and GPSTrack are used when available.
        """
        required_tags = {
            expand_tag(f"{namespace}:GPSDateTime"),
            expand_tag(f"{namespace}:GPSLongitude"),
            expand_tag(f"{namespace}:GPSLatitude"),
        }
        if self._all_tags_exists(required_tags):
            return _aggregate_gps_track(
                self._texts_by_tag,
                time_tag=f"{namespace}:GPSDateTime",
                lon_tag=f"{namespace}:GPSLongitude",
                lat_tag=f"{namespace}:GPSLatitude",
                alt_tag=f"{namespace}:GPSAltitude",
                direction_tag=f"{namespace}:GPSTrack",
            )
        return []