Skip to content

Commit 1e396ce

Browse files
committed
Extract GPS epoch timestamps from RMKN maker note in Ricoh Theta videos
Ricoh Theta cameras write CAMM Type 5 GPS data (lat/lon/alt only, no epoch timestamps). However, the RMKN (Ricoh Maker Note) box in the MP4 udta container includes a GPS IFD with GPSDateStamp and GPSTimeStamp tags—true GPS-derived UTC timestamps recorded at the start of video recording.

This change:
- Parses the RMKN TIFF/EXIF structure in camm_parser.py to extract the GPS datetime from the GPS IFD (tags 0x001D and 0x0007).
- Adds a gps_datetime field to CAMMInfo to carry the extracted timestamp.
- Enriches CAMM Type 5 points with computed epoch timestamps in CAMMVideoExtractor using the formula: epoch = rmkn_gps_epoch + (point.time - first_point.time).
- Converts enriched points to CAMMGPSPoint (Type 6) so downstream consumers receive proper GPS epoch times.
- Adds unit tests for RMKN parsing (valid, little-endian, missing GPS IFD, truncated, bad magic) and point enrichment logic.
1 parent abc0056 commit 1e396ce

3 files changed

Lines changed: 352 additions & 2 deletions

File tree

mapillary_tools/camm/camm_parser.py

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88

99
import abc
1010
import dataclasses
11+
import datetime
1112
import io
1213
import logging
14+
import struct
1315
import typing as T
1416
from enum import Enum
1517

@@ -58,16 +60,22 @@ class CAMMInfo:
5860
magn: list[telemetry.MagnetometerData] | None = None
5961
make: str = ""
6062
model: str = ""
63+
# GPS datetime from RMKN (Ricoh Maker Note) EXIF data, if available.
64+
# This is a true GPS-derived UTC timestamp corresponding to the
65+
# first CAMM Type 5 GPS point in the video.
66+
gps_datetime: datetime.datetime | None = None
6167

6268

6369
def extract_camm_info(fp: T.BinaryIO, telemetry_only: bool = False) -> CAMMInfo | None:
6470
moov = MovieBoxParser.parse_stream(fp)
6571

6672
make, model = "", ""
73+
gps_datetime: datetime.datetime | None = None
6774
if not telemetry_only:
6875
udta_boxdata = moov.extract_udta_boxdata()
6976
if udta_boxdata is not None:
7077
make, model = _extract_camera_make_and_model_from_utda_boxdata(udta_boxdata)
78+
gps_datetime = _extract_gps_datetime_from_udta_boxdata(udta_boxdata)
7179

7280
gps_only_construct = _construct_with_selected_camm_types(
7381
[CAMMType.MIN_GPS, CAMMType.GPS]
@@ -121,7 +129,7 @@ def extract_camm_info(fp: T.BinaryIO, telemetry_only: bool = False) -> CAMMInfo
121129
elif isinstance(measurement, telemetry.CAMMGPSPoint):
122130
gps.append(measurement)
123131

124-
return CAMMInfo(mini_gps=mini_gps, gps=gps, make=make, model=model)
132+
return CAMMInfo(mini_gps=mini_gps, gps=gps, make=make, model=model, gps_datetime=gps_datetime)
125133

126134
return None
127135

@@ -551,6 +559,146 @@ def _parse_quietly(data: bytes, type: bytes) -> bytes:
551559
return parsed["data"]
552560

553561

562+
def _extract_gps_datetime_from_udta_boxdata(
    utda_boxdata: dict,
) -> datetime.datetime | None:
    """Return the first GPS datetime found in an RMKN box of the udta data.

    Every entry of ``utda_boxdata`` whose ``type`` is ``b"RMKN"`` has its
    ``data`` handed to ``_extract_gps_datetime_from_rmkn``; the first result
    that is not ``None`` is returned. ``None`` is returned when there is no
    RMKN entry or none of them yields a datetime.
    """
    # Both stages are lazy, so entries are inspected in order and parsing
    # stops as soon as one RMKN payload produces a datetime.
    rmkn_payloads = (entry.data for entry in utda_boxdata if entry.type == b"RMKN")
    parsed = (_extract_gps_datetime_from_rmkn(payload) for payload in rmkn_payloads)
    return next((found for found in parsed if found is not None), None)
572+
573+
574+
# TIFF field type codes that the RMKN parser needs to tell apart.
_TIFF_TYPE_SHORT = 3
_TIFF_TYPE_RATIONAL = 5
_TIFF_TYPE_SRATIONAL = 10

# struct format character for one component of a (S)RATIONAL value.
_TIFF_RATIONAL_FORMATS = {
    _TIFF_TYPE_RATIONAL: "I",
    _TIFF_TYPE_SRATIONAL: "i",
}

# Size in bytes of one IFD entry: tag(2) + type(2) + count(4) + value/offset(4).
_TIFF_IFD_ENTRY_SIZE = 12


def _extract_gps_datetime_from_rmkn(rmkn_data: bytes) -> datetime.datetime | None:
    """Extract GPS datetime from RMKN (Ricoh Maker Note) EXIF data.

    The RMKN box contains TIFF/EXIF data with a GPS IFD that includes
    GPSDateStamp and GPSTimeStamp tags. These are true GPS-derived UTC
    timestamps recorded by the camera at the start of video recording.

    Args:
        rmkn_data: Raw payload of the RMKN box, starting at the TIFF header.

    Returns:
        A timezone-aware UTC datetime, or None when the data is truncated,
        is not a TIFF structure, lacks the GPS IFD or either GPS tag, or
        holds values that do not form a valid date/time.
    """
    if len(rmkn_data) < 8:
        return None

    # TIFF header: 2-byte byte-order mark, 2-byte magic (42), 4-byte IFD0 offset.
    byte_order = rmkn_data[:2]
    if byte_order == b"MM":
        endian = ">"
    elif byte_order == b"II":
        endian = "<"
    else:
        return None

    magic, ifd0_offset = struct.unpack_from(f"{endian}HI", rmkn_data, 2)
    if magic != 42:
        return None

    # IFD0 holds the pointer to the GPS IFD (tag 0x8825).
    gps_ifd_offset = _find_ifd_tag_long(rmkn_data, endian, ifd0_offset, 0x8825)
    if gps_ifd_offset is None:
        return None

    # GPS IFD: GPSDateStamp (0x001D) and GPSTimeStamp (0x0007).
    gps_date_str = _read_ifd_ascii_tag(rmkn_data, endian, gps_ifd_offset, 0x001D)
    gps_time_rationals = _read_ifd_rational_tag(
        rmkn_data, endian, gps_ifd_offset, 0x0007, count=3
    )
    if gps_date_str is None or gps_time_rationals is None:
        return None

    try:
        # GPSDateStamp is "YYYY:MM:DD"; anything after the third field is ignored.
        year, month, day = (int(part) for part in gps_date_str.strip().split(":")[:3])

        # GPSTimeStamp is 3 RATIONAL values: hours, minutes, seconds.
        (hour_num, hour_den), (min_num, min_den), (sec_num, sec_den) = gps_time_rationals
        hour = hour_num // hour_den
        minute = min_num // min_den
        second, sec_remainder = divmod(sec_num, sec_den)
        microsecond = (sec_remainder * 1_000_000) // sec_den

        return datetime.datetime(
            year, month, day, hour, minute, second, microsecond,
            tzinfo=datetime.timezone.utc,
        )
    except (ValueError, IndexError, ZeroDivisionError, OverflowError):
        # OverflowError: the datetime constructor rejects components that do
        # not fit a C int this way (e.g. a corrupt numerator of 0xFFFFFFFF),
        # rather than with ValueError.
        return None


def _find_ifd_entry(
    data: bytes, endian: str, ifd_offset: int, target_tag: int
) -> tuple[int, int, int] | None:
    """Locate the first entry for ``target_tag`` in the IFD at ``ifd_offset``.

    Returns:
        ``(field_type, count, value_field_offset)`` where
        ``value_field_offset`` is the position in ``data`` of the entry's
        4-byte value/offset field, or None when the IFD is out of bounds or
        the tag is not present in the entries that fit inside ``data``.
    """
    if ifd_offset < 0 or ifd_offset + 2 > len(data):
        return None
    (num_entries,) = struct.unpack_from(f"{endian}H", data, ifd_offset)
    for index in range(num_entries):
        entry_offset = ifd_offset + 2 + index * _TIFF_IFD_ENTRY_SIZE
        if entry_offset + _TIFF_IFD_ENTRY_SIZE > len(data):
            # The entry table is truncated; nothing further can be read.
            break
        tag, field_type, count = struct.unpack_from(f"{endian}HHI", data, entry_offset)
        if tag == target_tag:
            return field_type, count, entry_offset + 8
    return None


def _find_ifd_tag_long(
    data: bytes, endian: str, ifd_offset: int, target_tag: int
) -> int | None:
    """Find the integer value stored inline for a specific tag in a TIFF IFD.

    The value field is read as a 4-byte LONG, except when the entry declares
    the SHORT type: a SHORT occupies only the first two bytes of the value
    field, so reading four bytes would give a wrong number in big-endian data.

    Returns:
        The unsigned value, or None when the tag is not found.
    """
    entry = _find_ifd_entry(data, endian, ifd_offset, target_tag)
    if entry is None:
        return None
    field_type, _count, value_offset = entry
    value_format = "H" if field_type == _TIFF_TYPE_SHORT else "I"
    (value,) = struct.unpack_from(f"{endian}{value_format}", data, value_offset)
    return value


def _read_ifd_ascii_tag(
    data: bytes, endian: str, ifd_offset: int, target_tag: int
) -> str | None:
    """Read an ASCII string tag from a TIFF IFD.

    Strings of up to 4 bytes are stored inline in the entry's value field;
    longer ones are stored at the offset the value field points to.

    Returns:
        The string with trailing NUL bytes removed (non-ASCII bytes are
        replaced), or None when the tag is not found or the string would
        extend past the end of ``data``.
    """
    entry = _find_ifd_entry(data, endian, ifd_offset, target_tag)
    if entry is None:
        return None
    _field_type, count, start = entry
    if count > 4:
        (start,) = struct.unpack_from(f"{endian}I", data, start)
        if start + count > len(data):
            return None
    raw = data[start:start + count]
    return raw.rstrip(b"\x00").decode("ascii", errors="replace")


def _read_ifd_rational_tag(
    data: bytes, endian: str, ifd_offset: int, target_tag: int, count: int = 1
) -> list[tuple[int, int]] | None:
    """Read RATIONAL values (numerator/denominator pairs) from a TIFF IFD tag.

    Args:
        data: Buffer holding the TIFF structure.
        endian: struct byte-order prefix, ``">"`` or ``"<"``.
        ifd_offset: Offset of the IFD inside ``data``.
        target_tag: Tag number to look up.
        count: Number of rationals the caller wants.

    Returns:
        ``count`` ``(numerator, denominator)`` tuples, or None when the tag is
        not found, is not declared as RATIONAL/SRATIONAL, declares fewer than
        ``count`` values, or its values extend past the end of ``data``.
    """
    entry = _find_ifd_entry(data, endian, ifd_offset, target_tag)
    if entry is None:
        return None
    field_type, stored_count, value_offset = entry

    # Only trust the value field as an offset to rationals when the entry says
    # so, and never read more values than the entry declares.
    component_format = _TIFF_RATIONAL_FORMATS.get(field_type)
    if component_format is None or stored_count < count:
        return None
    if count <= 0:
        return []

    (offset,) = struct.unpack_from(f"{endian}I", data, value_offset)
    if offset + count * 8 > len(data):
        return None
    components = struct.unpack_from(
        f"{endian}{2 * count}{component_format}", data, offset
    )
    return list(zip(components[0::2], components[1::2]))
700+
701+
554702
def _extract_camera_make_and_model_from_utda_boxdata(
555703
utda_boxdata: dict,
556704
) -> tuple[str, str]:

mapillary_tools/geotag/video_extractors/native.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
from __future__ import annotations
77

8+
import logging
89
import sys
910
import typing as T
1011
from pathlib import Path
@@ -20,6 +21,8 @@
2021
from ...mp4 import construct_mp4_parser, simple_mp4_parser
2122
from .base import BaseVideoExtractor
2223

24+
LOG = logging.getLogger(__name__)
25+
2326

2427
class GoProVideoExtractor(BaseVideoExtractor):
2528
@override
@@ -69,15 +72,76 @@ def extract(self) -> types.VideoMetadata:
6972
if not camm_info.gps and not camm_info.mini_gps:
7073
raise exceptions.MapillaryGPXEmptyError("Empty GPS data found")
7174

75+
if camm_info.gps:
76+
points: T.List[geo.Point] = T.cast(T.List[geo.Point], camm_info.gps)
77+
elif camm_info.mini_gps and camm_info.gps_datetime:
78+
# Type 5 points have no epoch timestamps, but the RMKN
79+
# maker note contains a GPS-derived UTC timestamp for the
80+
# first point. Use it to assign epoch times to all points.
81+
points = self._enrich_with_gps_datetime(
82+
camm_info.mini_gps, camm_info.gps_datetime
83+
)
84+
else:
85+
points = camm_info.mini_gps
86+
7287
return types.VideoMetadata(
7388
filename=self.video_path,
7489
filesize=utils.get_file_size(self.video_path),
7590
filetype=types.FileType.CAMM,
76-
points=T.cast(T.List[geo.Point], camm_info.gps or camm_info.mini_gps),
91+
points=points,
7792
make=camm_info.make,
7893
model=camm_info.model,
7994
)
8095

96+
@staticmethod
def _enrich_with_gps_datetime(
    points: T.List[geo.Point],
    gps_datetime: "datetime.datetime",
) -> T.List[geo.Point]:
    """Assign GPS epoch timestamps to Type 5 points using an RMKN reference.

    The gps_datetime (from the RMKN maker note) is a GPS-derived UTC
    timestamp corresponding to the first CAMM Type 5 GPS point.
    Each subsequent point's epoch is computed as:

        epoch = gps_epoch + (point.time - first_point.time)

    Args:
        points: Points to enrich; each must expose ``time``, ``lat``,
            ``lon``, ``alt`` and ``angle``.
        gps_datetime: Reference datetime for the first point. A naive value
            is interpreted as UTC.

    Returns:
        A new list of ``telemetry.CAMMGPSPoint`` objects, one per input
        point and in the same order. An empty ``points`` is returned as is.
    """
    import datetime

    if not points:
        return points

    # ``timestamp()`` treats a naive datetime as local time, which would
    # shift every epoch by the machine's UTC offset. The reference is
    # documented above as UTC, so pin it explicitly.
    if gps_datetime.tzinfo is None:
        gps_datetime = gps_datetime.replace(tzinfo=datetime.timezone.utc)

    gps_epoch = gps_datetime.timestamp()
    first_time = points[0].time

    LOG.info(
        "Enriching %d CAMM Type 5 points with GPS epoch from "
        "RMKN timestamp %s",
        len(points),
        gps_datetime.isoformat(),
    )

    # Only position, angle and time come from the source points; accuracy
    # and velocity fields are filled with 0.0 because the inputs do not
    # carry them.
    return [
        telemetry.CAMMGPSPoint(
            time=p.time,
            lat=p.lat,
            lon=p.lon,
            alt=p.alt,
            angle=p.angle,
            time_gps_epoch=gps_epoch + (p.time - first_time),
            # NOTE(review): presumably 3 = 3D fix and 2 = 2D fix; confirm
            # against the CAMM specification.
            gps_fix_type=3 if p.alt is not None else 2,
            horizontal_accuracy=0.0,
            vertical_accuracy=0.0,
            velocity_east=0.0,
            velocity_north=0.0,
            velocity_up=0.0,
            speed_accuracy=0.0,
        )
        for p in points
    ]
144+
81145

82146
class BlackVueVideoExtractor(BaseVideoExtractor):
83147
@override

0 commit comments

Comments
 (0)