Skip to content

Commit 9defa56

Browse files
authored
Merge pull request #817 from mapillary/reduce-ccn
Reduce cyclomatic complexity in GPS parser functions
2 parents e17462f + 709a157 commit 9defa56

3 files changed

Lines changed: 219 additions & 159 deletions

File tree

mapillary_tools/blackvue_parser.py

Lines changed: 71 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,50 @@ def _parse_nmea_lines(
193193
yield epoch_ms, message
194194

195195

196+
def _detect_timezone_offset(
197+
parsed_lines: list[tuple[float, pynmea2.NMEASentence]],
198+
) -> float:
199+
"""
200+
Detect timezone offset between camera clock and GPS time.
201+
202+
Tries RMC messages first (most reliable - has full date+time),
203+
then falls back to GGA/GLL (less reliable - time only, no date).
204+
Returns 0.0 if no offset could be determined.
205+
"""
206+
first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None
207+
208+
for epoch_sec, message in parsed_lines:
209+
if message.sentence_type == "RMC":
210+
if hasattr(message, "is_valid") and message.is_valid:
211+
offset = _compute_timezone_offset_from_rmc(epoch_sec, message)
212+
if offset is not None:
213+
LOG.debug(
214+
"Computed timezone offset %.1fs from RMC (%s %s)",
215+
offset,
216+
message.datestamp,
217+
message.timestamp,
218+
)
219+
return offset
220+
221+
if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]:
222+
if hasattr(message, "is_valid") and message.is_valid:
223+
first_valid_gga_gll = (epoch_sec, message)
224+
225+
# Fallback: if no RMC found, try GGA/GLL (less reliable - no date info)
226+
if first_valid_gga_gll is not None:
227+
epoch_sec, message = first_valid_gga_gll
228+
offset = _compute_timezone_offset_from_time_only(epoch_sec, message)
229+
if offset is not None:
230+
LOG.debug(
231+
"Computed timezone offset %.1fs from %s (fallback, no date info)",
232+
offset,
233+
message.sentence_type,
234+
)
235+
return offset
236+
237+
return 0.0
238+
239+
196240
def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
197241
"""
198242
>>> list(_parse_gps_box(b"[1623057074211]$GPGGA,202530.00,5109.0262,N,11401.8407,W,5,40,0.5,1097.36,M,-17.00,M,18,TSTR*61"))
@@ -210,83 +254,47 @@ def _parse_gps_box(gps_data: bytes) -> list[telemetry.GPSPoint]:
210254
>>> list(_parse_gps_box(b"[1623057074211]$GPVTG,,T,,M,0.078,N,0.144,K,D*28[1623057075215]"))
211255
[]
212256
"""
213-
timezone_offset: float | None = None
214257
parsed_lines: list[tuple[float, pynmea2.NMEASentence]] = []
215-
first_valid_gga_gll: tuple[float, pynmea2.NMEASentence] | None = None
216258

217-
# First pass: collect parsed_lines and compute timezone offset from the first valid RMC message
259+
# First pass: collect parsed_lines
218260
for epoch_ms, message in _parse_nmea_lines(gps_data):
219261
# Rounding needed to avoid floating point precision issues
220262
epoch_sec = round(epoch_ms / 1000, 3)
221263
parsed_lines.append((epoch_sec, message))
222-
if timezone_offset is None and message.sentence_type == "RMC":
223-
if hasattr(message, "is_valid") and message.is_valid:
224-
timezone_offset = _compute_timezone_offset_from_rmc(epoch_sec, message)
225-
if timezone_offset is not None:
226-
LOG.debug(
227-
"Computed timezone offset %.1fs from RMC (%s %s)",
228-
timezone_offset,
229-
message.datestamp,
230-
message.timestamp,
231-
)
232-
# Track first valid GGA/GLL for fallback
233-
if first_valid_gga_gll is None and message.sentence_type in ["GGA", "GLL"]:
234-
if hasattr(message, "is_valid") and message.is_valid:
235-
first_valid_gga_gll = (epoch_sec, message)
236-
237-
# Fallback: if no RMC found, try GGA/GLL (less reliable - no date info)
238-
if timezone_offset is None and first_valid_gga_gll is not None:
239-
epoch_sec, message = first_valid_gga_gll
240-
timezone_offset = _compute_timezone_offset_from_time_only(epoch_sec, message)
241-
if timezone_offset is not None:
242-
LOG.debug(
243-
"Computed timezone offset %.1fs from %s (fallback, no date info)",
244-
timezone_offset,
245-
message.sentence_type,
246-
)
247264

248-
# If no offset could be determined, use 0 (camera clock assumed correct)
249-
if timezone_offset is None:
250-
timezone_offset = 0.0
265+
timezone_offset = _detect_timezone_offset(parsed_lines)
251266

252267
points_by_sentence_type: dict[str, list[telemetry.GPSPoint]] = {}
253268

254269
# Second pass: apply offset to all GPS points
255270
for epoch_sec, message in parsed_lines:
271+
if message.sentence_type not in ("GGA", "RMC", "GLL"):
272+
continue
273+
if not message.is_valid:
274+
continue
275+
256276
corrected_epoch = round(epoch_sec + timezone_offset, 3)
257277

258-
# https://tavotech.com/gps-nmea-sentence-structure/
259-
if message.sentence_type in ["GGA"]:
260-
if not message.is_valid:
261-
continue
262-
point = telemetry.GPSPoint(
263-
time=corrected_epoch,
264-
lat=message.latitude,
265-
lon=message.longitude,
266-
alt=message.altitude,
267-
angle=None,
268-
epoch_time=corrected_epoch,
269-
fix=telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None,
270-
precision=None,
271-
ground_speed=None,
272-
)
273-
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)
274-
275-
elif message.sentence_type in ["RMC", "GLL"]:
276-
if not message.is_valid:
277-
continue
278-
point = telemetry.GPSPoint(
279-
time=corrected_epoch,
280-
lat=message.latitude,
281-
lon=message.longitude,
282-
alt=None,
283-
angle=None,
284-
epoch_time=corrected_epoch,
285-
fix=None,
286-
precision=None,
287-
ground_speed=None,
288-
)
289-
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)
278+
# GGA has altitude and fix; RMC and GLL do not
279+
if message.sentence_type == "GGA":
280+
alt = message.altitude
281+
fix = telemetry.GPSFix.FIX_3D if message.gps_qual >= 1 else None
282+
else:
283+
alt = None
284+
fix = None
285+
286+
point = telemetry.GPSPoint(
287+
time=corrected_epoch,
288+
lat=message.latitude,
289+
lon=message.longitude,
290+
alt=alt,
291+
angle=None,
292+
epoch_time=corrected_epoch,
293+
fix=fix,
294+
precision=None,
295+
ground_speed=None,
296+
)
297+
points_by_sentence_type.setdefault(message.sentence_type, []).append(point)
290298

291299
# This is the extraction order in exiftool
292300
if "RMC" in points_by_sentence_type:

mapillary_tools/exiftool_read_video.py

Lines changed: 93 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,86 @@ def _deduplicate_gps_points(
120120
return deduplicated_track
121121

122122

123+
def _aggregate_float_values_same_length(
124+
texts_by_tag: dict[str, list[str]],
125+
tag: str | None,
126+
expected_length: int,
127+
) -> list[float | None]:
128+
if tag is not None:
129+
vals = [
130+
_maybe_float(val)
131+
for val in _extract_alternative_fields(texts_by_tag, [tag], list) or []
132+
]
133+
else:
134+
vals = []
135+
while len(vals) < expected_length:
136+
vals.append(None)
137+
return vals
138+
139+
140+
def _aggregate_epoch_times(
141+
texts_by_tag: dict[str, list[str]],
142+
gps_time_tag: str | None,
143+
time_tag: str | None,
144+
timestamps: list[float | None],
145+
expected_length: int,
146+
) -> list[float | None]:
147+
"""Aggregate GPS epoch times from tags, with fallback to per-point timestamps."""
148+
if gps_time_tag is not None:
149+
gps_epoch_times: list[float | None] = [
150+
geo.as_unix_time(dt) if dt is not None else None
151+
for dt in (
152+
exif_read.parse_gps_datetime(text)
153+
for text in _extract_alternative_fields(
154+
texts_by_tag, [gps_time_tag], list
155+
)
156+
or []
157+
)
158+
]
159+
if len(gps_epoch_times) != expected_length:
160+
LOG.warning(
161+
"Found different number of GPS epoch times %d and coordinates %d",
162+
len(gps_epoch_times),
163+
expected_length,
164+
)
165+
gps_epoch_times = [None] * expected_length
166+
return gps_epoch_times
167+
elif time_tag is not None:
168+
# Use per-point GPS timestamps as epoch times
169+
return [t for t in timestamps]
170+
else:
171+
return [None] * expected_length
172+
173+
174+
def _aggregate_timestamps(
175+
texts_by_tag: dict[str, list[str]],
176+
time_tag: str | None,
177+
expected_length: int,
178+
) -> list[float | None] | None:
179+
"""Aggregate timestamps from the time tag.
180+
181+
Returns the timestamp list, or None if the lengths don't match
182+
(caller should return [] in that case).
183+
"""
184+
if time_tag is not None:
185+
dts = [
186+
exif_read.parse_gps_datetime(text)
187+
for text in _extract_alternative_fields(texts_by_tag, [time_tag], list)
188+
or []
189+
]
190+
timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts]
191+
if expected_length != len(timestamps):
192+
LOG.warning(
193+
"Found different number of timestamps %d and coordinates %d",
194+
len(timestamps),
195+
expected_length,
196+
)
197+
return None
198+
else:
199+
timestamps = [0.0] * expected_length
200+
return timestamps
201+
202+
123203
def _aggregate_gps_track(
124204
texts_by_tag: dict[str, list[str]],
125205
time_tag: str | None,
@@ -159,73 +239,29 @@ def _aggregate_gps_track(
159239
expected_length = len(lats)
160240

161241
# aggregate timestamps (optional)
162-
if time_tag is not None:
163-
dts = [
164-
exif_read.parse_gps_datetime(text)
165-
for text in _extract_alternative_fields(texts_by_tag, [time_tag], list)
166-
or []
167-
]
168-
timestamps = [geo.as_unix_time(dt) if dt is not None else None for dt in dts]
169-
if expected_length != len(timestamps):
170-
# no idea what to do if we have different number of timestamps and coordinates
171-
LOG.warning(
172-
"Found different number of timestamps %d and coordinates %d",
173-
len(timestamps),
174-
expected_length,
175-
)
176-
return []
177-
else:
178-
timestamps = [0.0] * expected_length
242+
timestamps = _aggregate_timestamps(texts_by_tag, time_tag, expected_length)
243+
if timestamps is None:
244+
return []
179245

180246
assert len(timestamps) == expected_length
181247

182-
def _aggregate_float_values_same_length(
183-
tag: str | None,
184-
) -> list[float | None]:
185-
if tag is not None:
186-
vals = [
187-
_maybe_float(val)
188-
for val in _extract_alternative_fields(texts_by_tag, [tag], list) or []
189-
]
190-
else:
191-
vals = []
192-
while len(vals) < expected_length:
193-
vals.append(None)
194-
return vals
195-
196248
# aggregate altitudes (optional)
197-
alts = _aggregate_float_values_same_length(alt_tag)
249+
alts = _aggregate_float_values_same_length(texts_by_tag, alt_tag, expected_length)
198250

199251
# aggregate directions (optional)
200-
directions = _aggregate_float_values_same_length(direction_tag)
252+
directions = _aggregate_float_values_same_length(
253+
texts_by_tag, direction_tag, expected_length
254+
)
201255

202256
# aggregate speeds (optional)
203-
ground_speeds = _aggregate_float_values_same_length(ground_speed_tag)
257+
ground_speeds = _aggregate_float_values_same_length(
258+
texts_by_tag, ground_speed_tag, expected_length
259+
)
204260

205261
# GPS epoch times (optional)
206-
if gps_time_tag is not None:
207-
gps_epoch_times: list[float | None] = [
208-
geo.as_unix_time(dt) if dt is not None else None
209-
for dt in (
210-
exif_read.parse_gps_datetime(text)
211-
for text in _extract_alternative_fields(
212-
texts_by_tag, [gps_time_tag], list
213-
)
214-
or []
215-
)
216-
]
217-
if len(gps_epoch_times) != expected_length:
218-
LOG.warning(
219-
"Found different number of GPS epoch times %d and coordinates %d",
220-
len(gps_epoch_times),
221-
expected_length,
222-
)
223-
gps_epoch_times = [None] * expected_length
224-
elif time_tag is not None:
225-
# Use per-point GPS timestamps as epoch times
226-
gps_epoch_times = [t for t in timestamps]
227-
else:
228-
gps_epoch_times = [None] * expected_length
262+
gps_epoch_times = _aggregate_epoch_times(
263+
texts_by_tag, gps_time_tag, time_tag, timestamps, expected_length
264+
)
229265

230266
# build track
231267
track: list[GPSPoint] = []

0 commit comments

Comments
 (0)