|
1 | 1 | import io |
2 | 2 | import typing as T |
3 | 3 |
|
4 | | -from .. import geo, telemetry, types |
| 4 | +from .. import geo, types |
5 | 5 | from ..mp4 import ( |
6 | 6 | construct_mp4_parser as cparser, |
7 | 7 | mp4_sample_parser as sample_parser, |
|
11 | 11 | from . import camm_parser |
12 | 12 |
|
13 | 13 |
|
14 | | -TelemetryMeasurement = T.Union[ |
15 | | - geo.Point, |
16 | | - telemetry.TelemetryMeasurement, |
17 | | -] |
18 | | - |
19 | | - |
20 | | -def _build_camm_sample(measurement: TelemetryMeasurement) -> bytes: |
21 | | - if isinstance(measurement, geo.Point): |
22 | | - return camm_parser.CAMMSampleData.build( |
23 | | - { |
24 | | - "type": camm_parser.CAMMType.MIN_GPS.value, |
25 | | - "data": [ |
26 | | - measurement.lat, |
27 | | - measurement.lon, |
28 | | - -1.0 if measurement.alt is None else measurement.alt, |
29 | | - ], |
30 | | - } |
31 | | - ) |
32 | | - elif isinstance(measurement, telemetry.AccelerationData): |
33 | | - # Accelerometer reading in meters/second^2 along XYZ axes of the camera. |
34 | | - return camm_parser.CAMMSampleData.build( |
35 | | - { |
36 | | - "type": camm_parser.CAMMType.ACCELERATION.value, |
37 | | - "data": [ |
38 | | - measurement.x, |
39 | | - measurement.y, |
40 | | - measurement.z, |
41 | | - ], |
42 | | - } |
43 | | - ) |
44 | | - elif isinstance(measurement, telemetry.GyroscopeData): |
45 | | - # Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction. |
46 | | - return camm_parser.CAMMSampleData.build( |
47 | | - { |
48 | | - "type": camm_parser.CAMMType.GYRO.value, |
49 | | - "data": [ |
50 | | - measurement.x, |
51 | | - measurement.y, |
52 | | - measurement.z, |
53 | | - ], |
54 | | - } |
55 | | - ) |
56 | | - elif isinstance(measurement, telemetry.MagnetometerData): |
57 | | - # Ambient magnetic field. |
58 | | - return camm_parser.CAMMSampleData.build( |
59 | | - { |
60 | | - "type": camm_parser.CAMMType.MAGNETIC_FIELD.value, |
61 | | - "data": [ |
62 | | - measurement.x, |
63 | | - measurement.y, |
64 | | - measurement.z, |
65 | | - ], |
66 | | - } |
67 | | - ) |
68 | | - else: |
69 | | - raise ValueError(f"unexpected measurement type {type(measurement)}") |
| 14 | +def _build_camm_sample(measurement: camm_parser.TelemetryMeasurement) -> bytes: |
| 15 | + for sample_entry_cls in camm_parser.SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.values(): |
| 16 | + if sample_entry_cls.serializable(measurement): |
| 17 | + return sample_entry_cls.serialize(measurement) |
| 18 | + raise ValueError(f"Unsupported measurement type {type(measurement)}") |
70 | 19 |
|
71 | 20 |
|
72 | 21 | def _create_edit_list_from_points( |
@@ -121,16 +70,19 @@ def _create_edit_list_from_points( |
121 | 70 |
|
122 | 71 | def _multiplex( |
123 | 72 | points: T.Sequence[geo.Point], |
124 | | - measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None, |
125 | | -) -> T.List[TelemetryMeasurement]: |
126 | | - mutiplexed: T.List[TelemetryMeasurement] = [*points, *(measurements or [])] |
| 73 | + measurements: T.Optional[T.List[camm_parser.TelemetryMeasurement]] = None, |
| 74 | +) -> T.List[camm_parser.TelemetryMeasurement]: |
| 75 | + mutiplexed: T.List[camm_parser.TelemetryMeasurement] = [ |
| 76 | + *points, |
| 77 | + *(measurements or []), |
| 78 | + ] |
127 | 79 | mutiplexed.sort(key=lambda m: m.time) |
128 | 80 |
|
129 | 81 | return mutiplexed |
130 | 82 |
|
131 | 83 |
|
132 | 84 | def convert_telemetry_to_raw_samples( |
133 | | - measurements: T.Sequence[TelemetryMeasurement], |
| 85 | + measurements: T.Sequence[camm_parser.TelemetryMeasurement], |
134 | 86 | timescale: int, |
135 | 87 | ) -> T.Generator[sample_parser.RawSample, None, None]: |
136 | 88 | for idx, measurement in enumerate(measurements): |
@@ -283,7 +235,7 @@ def create_camm_trak( |
283 | 235 |
|
284 | 236 | def camm_sample_generator2( |
285 | 237 | video_metadata: types.VideoMetadata, |
286 | | - telemetry_measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None, |
| 238 | + telemetry_measurements: T.Optional[T.List[camm_parser.TelemetryMeasurement]] = None, |
287 | 239 | ): |
288 | 240 | def _f( |
289 | 241 | fp: T.BinaryIO, |
|
0 commit comments