|
1 | 1 | import io |
2 | 2 | import typing as T |
3 | 3 |
|
4 | | -from .. import geo, types |
| 4 | +from .. import geo, telemetry, types |
5 | 5 | from ..mp4 import ( |
6 | 6 | construct_mp4_parser as cparser, |
7 | 7 | mp4_sample_parser as sample_parser, |
|
11 | 11 | from . import camm_parser |
12 | 12 |
|
13 | 13 |
|
14 | | -def _build_camm_sample(measurement: camm_parser.TelemetryMeasurement) -> bytes: |
15 | | - for sample_entry_cls in camm_parser.SAMPLE_ENTRY_CLS_BY_CAMM_TYPE.values(): |
16 | | - if sample_entry_cls.serializable(measurement): |
17 | | - return sample_entry_cls.serialize(measurement) |
18 | | - raise ValueError(f"Unsupported measurement type {type(measurement)}") |
| 14 | +TelemetryMeasurement = T.Union[ |
| 15 | + geo.Point, |
| 16 | + telemetry.TelemetryMeasurement, |
| 17 | +] |
| 18 | + |
| 19 | + |
| 20 | +def _build_camm_sample(measurement: TelemetryMeasurement) -> bytes: |
| 21 | + if isinstance(measurement, geo.Point): |
| 22 | + return camm_parser.CAMMSampleData.build( |
| 23 | + { |
| 24 | + "type": camm_parser.CAMMType.MIN_GPS.value, |
| 25 | + "data": [ |
| 26 | + measurement.lat, |
| 27 | + measurement.lon, |
| 28 | + -1.0 if measurement.alt is None else measurement.alt, |
| 29 | + ], |
| 30 | + } |
| 31 | + ) |
| 32 | + elif isinstance(measurement, telemetry.AccelerationData): |
| 33 | + # Accelerometer reading in meters/second^2 along XYZ axes of the camera. |
| 34 | + return camm_parser.CAMMSampleData.build( |
| 35 | + { |
| 36 | + "type": camm_parser.CAMMType.ACCELERATION.value, |
| 37 | + "data": [ |
| 38 | + measurement.x, |
| 39 | + measurement.y, |
| 40 | + measurement.z, |
| 41 | + ], |
| 42 | + } |
| 43 | + ) |
| 44 | + elif isinstance(measurement, telemetry.GyroscopeData): |
| 45 | + # Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction. |
| 46 | + return camm_parser.CAMMSampleData.build( |
| 47 | + { |
| 48 | + "type": camm_parser.CAMMType.GYRO.value, |
| 49 | + "data": [ |
| 50 | + measurement.x, |
| 51 | + measurement.y, |
| 52 | + measurement.z, |
| 53 | + ], |
| 54 | + } |
| 55 | + ) |
| 56 | + elif isinstance(measurement, telemetry.MagnetometerData): |
| 57 | + # Ambient magnetic field. |
| 58 | + return camm_parser.CAMMSampleData.build( |
| 59 | + { |
| 60 | + "type": camm_parser.CAMMType.MAGNETIC_FIELD.value, |
| 61 | + "data": [ |
| 62 | + measurement.x, |
| 63 | + measurement.y, |
| 64 | + measurement.z, |
| 65 | + ], |
| 66 | + } |
| 67 | + ) |
| 68 | + else: |
| 69 | + raise ValueError(f"unexpected measurement type {type(measurement)}") |
19 | 70 |
|
20 | 71 |
|
21 | 72 | def _create_edit_list_from_points( |
@@ -70,19 +121,16 @@ def _create_edit_list_from_points( |
70 | 121 |
|
71 | 122 | def _multiplex( |
72 | 123 | points: T.Sequence[geo.Point], |
73 | | - measurements: T.Optional[T.List[camm_parser.TelemetryMeasurement]] = None, |
74 | | -) -> T.List[camm_parser.TelemetryMeasurement]: |
75 | | - mutiplexed: T.List[camm_parser.TelemetryMeasurement] = [ |
76 | | - *points, |
77 | | - *(measurements or []), |
78 | | - ] |
| 124 | + measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None, |
| 125 | +) -> T.List[TelemetryMeasurement]: |
| 126 | + mutiplexed: T.List[TelemetryMeasurement] = [*points, *(measurements or [])] |
79 | 127 | mutiplexed.sort(key=lambda m: m.time) |
80 | 128 |
|
81 | 129 | return mutiplexed |
82 | 130 |
|
83 | 131 |
|
84 | 132 | def convert_telemetry_to_raw_samples( |
85 | | - measurements: T.Sequence[camm_parser.TelemetryMeasurement], |
| 133 | + measurements: T.Sequence[TelemetryMeasurement], |
86 | 134 | timescale: int, |
87 | 135 | ) -> T.Generator[sample_parser.RawSample, None, None]: |
88 | 136 | for idx, measurement in enumerate(measurements): |
@@ -235,7 +283,7 @@ def create_camm_trak( |
235 | 283 |
|
236 | 284 | def camm_sample_generator2( |
237 | 285 | video_metadata: types.VideoMetadata, |
238 | | - telemetry_measurements: T.Optional[T.List[camm_parser.TelemetryMeasurement]] = None, |
| 286 | + telemetry_measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None, |
239 | 287 | ): |
240 | 288 | def _f( |
241 | 289 | fp: T.BinaryIO, |
|
0 commit comments