diff --git a/mapillary_tools/camm/camm_builder.py b/mapillary_tools/camm/camm_builder.py index 6f4c9b08..d22c624a 100644 --- a/mapillary_tools/camm/camm_builder.py +++ b/mapillary_tools/camm/camm_builder.py @@ -1,7 +1,7 @@ import io import typing as T -from .. import geo, types +from .. import geo, telemetry, types from ..mp4 import ( construct_mp4_parser as cparser, mp4_sample_parser as sample_parser, @@ -11,20 +11,65 @@ from . import camm_parser -def build_camm_sample(point: geo.Point) -> bytes: - return camm_parser.CAMMSampleData.build( - { - "type": camm_parser.CAMMType.MIN_GPS.value, - "data": [ - point.lat, - point.lon, - -1.0 if point.alt is None else point.alt, - ], - } - ) +TelemetryMeasurement = T.Union[ + geo.Point, + telemetry.TelemetryMeasurement, +] -def _create_edit_list( +def _build_camm_sample(measurement: TelemetryMeasurement) -> bytes: + if isinstance(measurement, geo.Point): + return camm_parser.CAMMSampleData.build( + { + "type": camm_parser.CAMMType.MIN_GPS.value, + "data": [ + measurement.lat, + measurement.lon, + -1.0 if measurement.alt is None else measurement.alt, + ], + } + ) + elif isinstance(measurement, telemetry.AccelerationData): + # Accelerometer reading in meters/second^2 along XYZ axes of the camera. + return camm_parser.CAMMSampleData.build( + { + "type": camm_parser.CAMMType.ACCELERATION.value, + "data": [ + measurement.x, + measurement.y, + measurement.z, + ], + } + ) + elif isinstance(measurement, telemetry.GyroscopeData): + # Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction. + return camm_parser.CAMMSampleData.build( + { + "type": camm_parser.CAMMType.GYRO.value, + "data": [ + measurement.x, + measurement.y, + measurement.z, + ], + } + ) + elif isinstance(measurement, telemetry.MagnetometerData): + # Ambient magnetic field. + return camm_parser.CAMMSampleData.build( + { + "type": camm_parser.CAMMType.MAGNETIC_FIELD.value, + "data": [ + measurement.x, + measurement.y, + measurement.z, + ], + } + ) + else: + raise ValueError(f"unexpected measurement type {type(measurement)}") + + +def _create_edit_list_from_points( point_segments: T.Sequence[T.Sequence[geo.Point]], movie_timescale: int, media_timescale: int, @@ -82,18 +127,30 @@ def _create_edit_list( } -def convert_points_to_raw_samples( - points: T.Sequence[geo.Point], timescale: int +def _multiplex( + points: T.Sequence[geo.Point], + measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None, +) -> T.List[TelemetryMeasurement]: + mutiplexed: T.List[TelemetryMeasurement] = [*points, *(measurements or [])] + mutiplexed.sort(key=lambda m: m.time) + + return mutiplexed + + +def convert_telemetry_to_raw_samples( + measurements: T.Sequence[TelemetryMeasurement], + timescale: int, ) -> T.Generator[sample_parser.RawSample, None, None]: - for idx, point in enumerate(points): - camm_sample_data = build_camm_sample(point) + for idx, measurement in enumerate(measurements): + camm_sample_data = _build_camm_sample(measurement) - if idx + 1 < len(points): - timedelta = int((points[idx + 1].time - point.time) * timescale) + if idx + 1 < len(measurements): + timedelta = int((measurements[idx + 1].time - measurement.time) * timescale) else: timedelta = 0 + assert 0 <= timedelta <= builder.UINT32_MAX, ( - f"expected timedelta {timedelta} between {points[idx]} and {points[idx + 1]} with timescale {timescale} to be <= UINT32_MAX" + f"expected timedelta {timedelta} between {measurements[idx]} and {measurements[idx + 1]} with timescale {timescale} to be <= UINT32_MAX" ) yield sample_parser.RawSample( @@ -232,7 +289,10 @@ def create_camm_trak( } -def camm_sample_generator2(video_metadata: types.VideoMetadata): +def camm_sample_generator2( + video_metadata: types.VideoMetadata, + telemetry_measurements: T.Optional[T.List[telemetry.TelemetryMeasurement]] = None, +): def _f( fp: T.BinaryIO, moov_children: T.List[builder.BoxDict], @@ -240,11 +300,12 @@ def _f( movie_timescale = builder.find_movie_timescale(moov_children) # make sure the precision of timedeltas not lower than 0.001 (1ms) media_timescale = max(1000, movie_timescale) + measurements = _multiplex(video_metadata.points, telemetry_measurements) camm_samples = list( - convert_points_to_raw_samples(video_metadata.points, media_timescale) + convert_telemetry_to_raw_samples(measurements, media_timescale) ) camm_trak = create_camm_trak(camm_samples, media_timescale) - elst = _create_edit_list( + elst = _create_edit_list_from_points( [video_metadata.points], movie_timescale, media_timescale ) if T.cast(T.Dict, elst["data"])["entries"]: @@ -280,6 +341,8 @@ def _f( ) # if yield, the moov_children will not be modified - return (io.BytesIO(build_camm_sample(point)) for point in video_metadata.points) + return ( + io.BytesIO(_build_camm_sample(measurement)) for measurement in measurements + ) return _f diff --git a/mapillary_tools/camm/camm_parser.py b/mapillary_tools/camm/camm_parser.py index 46b0fa48..a2271f99 100644 --- a/mapillary_tools/camm/camm_parser.py +++ b/mapillary_tools/camm/camm_parser.py @@ -9,13 +9,22 @@ import construct as C -from .. import geo -from ..mp4 import mp4_sample_parser as sample_parser, simple_mp4_parser as sparser +from .. import geo, telemetry +from ..mp4 import simple_mp4_parser as sparser +from ..mp4.mp4_sample_parser import MovieBoxParser, Sample, TrackBoxParser LOG = logging.getLogger(__name__) +TelemetryMeasurement = T.Union[ + geo.Point, + telemetry.AccelerationData, + telemetry.GyroscopeData, + telemetry.MagnetometerData, +] + + # Camera Motion Metadata Spec https://developers.google.com/streetview/publish/camm-spec class CAMMType(Enum): ANGLE_AXIS = 0 @@ -75,9 +84,9 @@ class CAMMType(Enum): ) -def _parse_point_from_sample( - fp: T.BinaryIO, sample: sample_parser.Sample -) -> T.Optional[geo.Point]: +def _parse_telemetry_from_sample( + fp: T.BinaryIO, sample: Sample +) -> T.Optional[TelemetryMeasurement]: fp.seek(sample.raw_sample.offset, io.SEEK_SET) data = fp.read(sample.raw_sample.size) box = CAMMSampleData.parse(data) @@ -99,12 +108,34 @@ def _parse_point_from_sample( alt=box.data.altitude, angle=None, ) + elif box.type == CAMMType.ACCELERATION.value: + return telemetry.AccelerationData( + time=sample.exact_time, + x=box.data[0], + y=box.data[1], + z=box.data[2], + ) + elif box.type == CAMMType.GYRO.value: + return telemetry.GyroscopeData( + time=sample.exact_time, + x=box.data[0], + y=box.data[1], + z=box.data[2], + ) + elif box.type == CAMMType.MAGNETIC_FIELD.value: + return telemetry.MagnetometerData( + time=sample.exact_time, + x=box.data[0], + y=box.data[1], + z=box.data[2], + ) return None -def filter_points_by_elst( - points: T.Iterable[geo.Point], elst: T.Sequence[T.Tuple[float, float]] -) -> T.Generator[geo.Point, None, None]: +def _filter_telemetry_by_elst_segments( + measurements: T.Iterable[TelemetryMeasurement], + elst: T.Sequence[T.Tuple[float, float]], +) -> T.Generator[TelemetryMeasurement, None, None]: empty_elst = [entry for entry in elst if entry[0] == -1] if empty_elst: offset = empty_elst[-1][1] @@ -114,20 +145,26 @@ def filter_points_by_elst( elst = [entry for entry in elst if entry[0] != -1] if not elst: - for p in points: - yield dataclasses.replace(p, time=p.time + offset) + for m in measurements: + if dataclasses.is_dataclass(m): + yield dataclasses.replace(m, time=m.time + offset) + else: + m._replace(time=m.time + offset) return elst.sort(key=lambda entry: entry[0]) elst_idx = 0 - for p in points: + for m in measurements: if len(elst) <= elst_idx: break media_time, duration = elst[elst_idx] - if p.time < media_time: + if m.time < media_time: pass - elif p.time <= media_time + duration: - yield dataclasses.replace(p, time=p.time + offset) + elif m.time <= media_time + duration: + if dataclasses.is_dataclass(m): + yield dataclasses.replace(m, time=m.time + offset) + else: + m._replace(time=m.time + offset) else: elst_idx += 1 @@ -148,46 +185,84 @@ def _is_camm_description(description: T.Dict) -> bool: return description["format"] == b"camm" +def _contains_camm_description(track: TrackBoxParser) -> bool: + descriptions = track.extract_sample_descriptions() + return any(_is_camm_description(d) for d in descriptions) + + +def _filter_telemetry_by_track_elst( + moov: MovieBoxParser, + track: TrackBoxParser, + measurements: T.Iterable[TelemetryMeasurement], +) -> T.List[TelemetryMeasurement]: + elst_boxdata = track.extract_elst_boxdata() + + if elst_boxdata is not None: + elst_entries = elst_boxdata["entries"] + if elst_entries: + # media_timescale + mdhd_boxdata = track.extract_mdhd_boxdata() + media_timescale = mdhd_boxdata["timescale"] + + # movie_timescale + mvhd_boxdata = moov.extract_mvhd_boxdata() + movie_timescale = mvhd_boxdata["timescale"] + + segments = [ + elst_entry_to_seconds( + entry, + movie_timescale=movie_timescale, + media_timescale=media_timescale, + ) + for entry in elst_entries + ] + + return list(_filter_telemetry_by_elst_segments(measurements, segments)) + + return list(measurements) + + def extract_points(fp: T.BinaryIO) -> T.Optional[T.List[geo.Point]]: """ Return a list of points (could be empty) if it is a valid CAMM video, otherwise None """ - points = None + moov = MovieBoxParser.parse_stream(fp) - moov = sample_parser.MovieBoxParser.parse_stream(fp) for track in moov.extract_tracks(): - descriptions = track.extract_sample_descriptions() - if any(_is_camm_description(d) for d in descriptions): - maybe_points = ( - _parse_point_from_sample(fp, sample) + if _contains_camm_description(track): + maybe_measurements = ( + _parse_telemetry_from_sample(fp, sample) for sample in track.extract_samples() if _is_camm_description(sample.description) ) - points = [p for p in maybe_points if p is not None] - if points: - elst_boxdata = track.extract_elst_boxdata() - if elst_boxdata is not None: - elst_entries = elst_boxdata["entries"] - if elst_entries: - # media_timescale - mdhd_boxdata = track.extract_mdhd_boxdata() - media_timescale = mdhd_boxdata["timescale"] - # movie_timescale - mvhd_boxdata = moov.extract_mvhd_boxdata() - movie_timescale = mvhd_boxdata["timescale"] - segments = [ - elst_entry_to_seconds( - entry, - movie_timescale=movie_timescale, - media_timescale=media_timescale, - ) - for entry in elst_entries - ] - points = list(filter_points_by_elst(points, segments)) + points = [m for m in maybe_measurements if isinstance(m, geo.Point)] - return points + return T.cast( + T.List[geo.Point], _filter_telemetry_by_track_elst(moov, track, points) + ) + + return None + + +def extract_telemetry_data(fp: T.BinaryIO) -> T.Optional[T.List[TelemetryMeasurement]]: + moov = MovieBoxParser.parse_stream(fp) + + for track in moov.extract_tracks(): + if _contains_camm_description(track): + maybe_measurements = ( + _parse_telemetry_from_sample(fp, sample) + for sample in track.extract_samples() + if _is_camm_description(sample.description) + ) + measurements = [m for m in maybe_measurements if m is not None] + + measurements = _filter_telemetry_by_track_elst(moov, track, measurements) + + return measurements + + return None def parse_gpx(path: pathlib.Path) -> T.List[geo.Point]: diff --git a/mapillary_tools/geotag/gpmf_parser.py b/mapillary_tools/geotag/gpmf_parser.py index 72c1ce34..f799ae21 100644 --- a/mapillary_tools/geotag/gpmf_parser.py +++ b/mapillary_tools/geotag/gpmf_parser.py @@ -7,7 +7,7 @@ import construct as C -from .. import imu +from .. import telemetry from ..mp4.mp4_sample_parser import MovieBoxParser, Sample, TrackBoxParser from ..telemetry import GPSFix, GPSPoint @@ -132,9 +132,9 @@ class KLVDict(T.TypedDict): @dataclasses.dataclass class TelemetryData: gps: T.List[GPSPoint] - accl: T.List[imu.AccelerationData] - gyro: T.List[imu.GyroscopeData] - magn: T.List[imu.MagnetometerData] + accl: T.List[telemetry.AccelerationData] + gyro: T.List[telemetry.GyroscopeData] + magn: T.List[telemetry.MagnetometerData] def _gps5_timestamp_to_epoch_time(dtstr: str): @@ -507,9 +507,9 @@ def _extract_points_from_samples( ) -> TelemetryData: # To keep GPS points from different devices separated points_by_dvid: T.Dict[int, T.List[GPSPoint]] = {} - accls_by_dvid: T.Dict[int, T.List[imu.AccelerationData]] = {} - gyros_by_dvid: T.Dict[int, T.List[imu.GyroscopeData]] = {} - magns_by_dvid: T.Dict[int, T.List[imu.MagnetometerData]] = {} + accls_by_dvid: T.Dict[int, T.List[telemetry.AccelerationData]] = {} + gyros_by_dvid: T.Dict[int, T.List[telemetry.GyroscopeData]] = {} + magns_by_dvid: T.Dict[int, T.List[telemetry.MagnetometerData]] = {} for sample in samples: fp.seek(sample.raw_sample.offset, io.SEEK_SET) @@ -536,7 +536,7 @@ def _extract_points_from_samples( # interpolate timestamps in between avg_delta = sample.exact_timedelta / len(sample_accls) accls_by_dvid.setdefault(device_id, []).extend( - imu.AccelerationData( + telemetry.AccelerationData( time=sample.exact_time + avg_delta * idx, x=x, y=y, @@ -550,7 +550,7 @@ def _extract_points_from_samples( # interpolate timestamps in between avg_delta = sample.exact_timedelta / len(sample_gyros) gyros_by_dvid.setdefault(device_id, []).extend( - imu.GyroscopeData( + telemetry.GyroscopeData( time=sample.exact_time + avg_delta * idx, x=x, y=y, @@ -564,7 +564,7 @@ def _extract_points_from_samples( # interpolate timestamps in between avg_delta = sample.exact_timedelta / len(sample_magns) magns_by_dvid.setdefault(device_id, []).extend( - imu.MagnetometerData( + telemetry.MagnetometerData( time=sample.exact_time + avg_delta * idx, x=x, y=y, diff --git a/mapillary_tools/imu.py b/mapillary_tools/imu.py deleted file mode 100644 index d49c4172..00000000 --- a/mapillary_tools/imu.py +++ /dev/null @@ -1,25 +0,0 @@ -import typing as T - - -# Gyroscope signal in radians/seconds around XYZ axes of the camera. Rotation is positive in the counterclockwise direction. -class GyroscopeData(T.NamedTuple): - time: float - x: float - y: float - z: float - - -# Accelerometer reading in meters/second^2 along XYZ axes of the camera. -class AccelerationData(T.NamedTuple): - time: float - x: float - y: float - z: float - - -# Ambient magnetic field. -class MagnetometerData(T.NamedTuple): - time: float - x: float - y: float - z: float diff --git a/mapillary_tools/telemetry.py b/mapillary_tools/telemetry.py index 7e0312e1..81ce5888 100644 --- a/mapillary_tools/telemetry.py +++ b/mapillary_tools/telemetry.py @@ -18,3 +18,42 @@ class GPSPoint(Point): fix: T.Optional[GPSFix] precision: T.Optional[float] ground_speed: T.Optional[float] + + +@dataclasses.dataclass(order=True) +class TelemetryMeasurement: + """Base class for all telemetry measurements. + + All telemetry measurements must have a timestamp in seconds. + This is an abstract base class - do not instantiate directly. + Instead use the concrete subclasses: AccelerationData, GyroscopeData, etc. + """ + + time: float + + +@dataclasses.dataclass(order=True) +class GyroscopeData(TelemetryMeasurement): + """Gyroscope signal in radians/seconds around XYZ axes of the camera.""" + + x: float + y: float + z: float + + +@dataclasses.dataclass(order=True) +class AccelerationData(TelemetryMeasurement): + """Accelerometer reading in meters/second^2 along XYZ axes of the camera.""" + + x: float + y: float + z: float + + +@dataclasses.dataclass(order=True) +class MagnetometerData(TelemetryMeasurement): + """Ambient magnetic field.""" + + x: float + y: float + z: float diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 55657bcb..4a8904ba 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -20,6 +20,7 @@ geo, history, ipc, + telemetry, types, upload_api_v4, uploader, @@ -27,7 +28,7 @@ VERSION, ) from .camm import camm_builder, camm_parser -from .geotag import blackvue_parser, utils as video_utils +from .geotag import blackvue_parser, gpmf_parser, utils as video_utils from .mp4 import simple_mp4_builder from .types import FileType @@ -38,6 +39,7 @@ MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN = os.getenv( "MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN" ) +MAPILLARY__EXPERIMENTAL_ENABLE_IMU = os.getenv("MAPILLARY__EXPERIMENTAL_ENABLE_IMU") CAMM_CONVERTABLES = {FileType.CAMM, FileType.BLACKVUE, FileType.GOPRO} @@ -658,7 +660,22 @@ def upload( assert isinstance(video_metadata.md5sum, str), ( "md5sum should be updated" ) - generator = camm_builder.camm_sample_generator2(video_metadata) + + # extract telemetry measurements from GoPro videos + telemetry_measurements: T.List[telemetry.TelemetryMeasurement] = [] + if MAPILLARY__EXPERIMENTAL_ENABLE_IMU == "YES": + if video_metadata.filetype is FileType.GOPRO: + with video_metadata.filename.open("rb") as fp: + telemetry_data = gpmf_parser.extract_telemetry_data(fp) + if telemetry_data: + telemetry_measurements.extend(telemetry_data.accl) + telemetry_measurements.extend(telemetry_data.gyro) + telemetry_measurements.extend(telemetry_data.magn) + telemetry_measurements.sort(key=lambda m: m.time) + + generator = camm_builder.camm_sample_generator2( + video_metadata, telemetry_measurements=telemetry_measurements + ) with video_metadata.filename.open("rb") as src_fp: camm_fp = simple_mp4_builder.transform_mp4(src_fp, generator) event_payload: uploader.Progress = { diff --git a/mapillary_tools/video_data_extraction/video_data_parser_factory.py b/mapillary_tools/video_data_extraction/video_data_parser_factory.py index 9eca1f21..6239ea78 100644 --- a/mapillary_tools/video_data_extraction/video_data_parser_factory.py +++ b/mapillary_tools/video_data_extraction/video_data_parser_factory.py @@ -1,9 +1,7 @@ import typing as T from pathlib import Path -from mapillary_tools.video_data_extraction.extractors.exiftool_runtime_parser import ( - ExiftoolRuntimeParser, -) +from .extractors.exiftool_runtime_parser import ExiftoolRuntimeParser from .cli_options import CliOptions diff --git a/tests/cli/camm_parser.py b/tests/cli/camm_parser.py index 461c33d9..1e80676f 100644 --- a/tests/cli/camm_parser.py +++ b/tests/cli/camm_parser.py @@ -1,11 +1,12 @@ import argparse +import dataclasses import json import pathlib import gpxpy import gpxpy.gpx -from mapillary_tools import utils +from mapillary_tools import telemetry, utils from mapillary_tools.camm import camm_parser from mapillary_tools.geotag import utils as geotag_utils @@ -22,15 +23,50 @@ def _convert(path: pathlib.Path): return track -def main(): +def _parse_args(): parser = argparse.ArgumentParser() + parser.add_argument("--imu", help="Print IMU in JSON") parser.add_argument("camm_video_path", nargs="+") - parsed = parser.parse_args() + return parser.parse_args() + + +def main(): + parsed_args = _parse_args() + + video_paths = utils.find_videos( + [pathlib.Path(p) for p in parsed_args.camm_video_path] + ) + + if parsed_args.imu: + imu_option = parsed_args.imu.split(",") + + for path in video_paths: + with path.open("rb") as fp: + telemetry_measurements = camm_parser.extract_telemetry_data(fp) + + accls = [] + gyros = [] + magns = [] + if telemetry_measurements: + for m in telemetry_measurements: + if isinstance(m, telemetry.AccelerationData): + accls.append(m) + elif isinstance(m, telemetry.GyroscopeData): + gyros.append(m) + elif isinstance(m, telemetry.MagnetometerData): + magns.append(m) - gpx = gpxpy.gpx.GPX() - for p in utils.find_videos([pathlib.Path(p) for p in parsed.camm_video_path]): - gpx.tracks.append(_convert(p)) - print(gpx.to_xml()) + if "accl" in imu_option: + print(json.dumps([dataclasses.asdict(accl) for accl in accls])) + if "gyro" in imu_option: + print(json.dumps([dataclasses.asdict(gyro) for gyro in gyros])) + if "magn" in imu_option: + print(json.dumps([dataclasses.asdict(magn) for magn in magns])) + else: + gpx = gpxpy.gpx.GPX() + for path in video_paths: + gpx.tracks.append(_convert(path)) + print(gpx.to_xml()) if __name__ == "__main__": diff --git a/tests/cli/gpmf_parser.py b/tests/cli/gpmf_parser.py index d271a588..f621fc76 100644 --- a/tests/cli/gpmf_parser.py +++ b/tests/cli/gpmf_parser.py @@ -1,4 +1,5 @@ import argparse +import dataclasses import datetime import io import json @@ -119,47 +120,70 @@ def _parse_samples(path: pathlib.Path) -> T.Generator[T.Dict, None, None]: def _parse_args(): parser = argparse.ArgumentParser() - parser.add_argument("path", nargs="+", help="Path to video file or directory") parser.add_argument("--geojson", help="Print as GeoJSON", action="store_true") + parser.add_argument("--imu", help="Print IMU in JSON") parser.add_argument( "--dump", help="Print as Construct structures", action="store_true" ) + parser.add_argument("path", nargs="+", help="Path to video file or directory") return parser.parse_args() def main(): parsed_args = _parse_args() - features = [] - parsed_samples = [] - gpx = gpxpy.gpx.GPX() - - def _process(path: pathlib.Path): - if parsed_args.dump: - parsed_samples.extend(_parse_samples(path)) - elif parsed_args.geojson: + video_paths = utils.find_videos([pathlib.Path(p) for p in parsed_args.path]) + + if parsed_args.imu: + imu_option = parsed_args.imu.split(",") + for path in video_paths: + with path.open("rb") as fp: + telemetry_data = gpmf_parser.extract_telemetry_data(fp) + if telemetry_data: + if "accl" in imu_option: + print( + json.dumps( + [dataclasses.asdict(accl) for accl in telemetry_data.accl] + ) + ) + if "gyro" in imu_option: + print( + json.dumps( + [dataclasses.asdict(gyro) for gyro in telemetry_data.gyro] + ) + ) + if "magn" in imu_option: + print( + json.dumps( + [dataclasses.asdict(magn) for magn in telemetry_data.magn] + ) + ) + + elif parsed_args.geojson: + features = [] + for path in video_paths: features.extend(_convert_geojson(path)) - else: - _convert_gpx(gpx, path) - - for path in utils.find_videos([pathlib.Path(p) for p in parsed_args.path]): - _process(path) + print( + json.dumps( + { + "type": "FeatureCollection", + "features": features, + } + ) + ) - if parsed_args.dump: + elif parsed_args.dump: + parsed_samples = [] + for path in video_paths: + parsed_samples.extend(_parse_samples(path)) for sample in parsed_samples: print(sample) + else: - if features: - print( - json.dumps( - { - "type": "FeatureCollection", - "features": features, - } - ) - ) - else: - print(gpx.to_xml()) + gpx = gpxpy.gpx.GPX() + for path in video_paths: + _convert_gpx(gpx, path) + print(gpx.to_xml()) if __name__ == "__main__": diff --git a/tests/unit/test_camm_parser.py b/tests/unit/test_camm_parser.py index af2ff845..8eb48c07 100644 --- a/tests/unit/test_camm_parser.py +++ b/tests/unit/test_camm_parser.py @@ -9,23 +9,23 @@ def test_filter_points_by_edit_list(): - assert [] == list(camm_parser.filter_points_by_elst([], [])) + assert [] == list(camm_parser._filter_telemetry_by_elst_segments([], [])) points = [ geo.Point(time=0, lat=0, lon=0, alt=None, angle=None), geo.Point(time=0.23, lat=0, lon=0, alt=None, angle=None), geo.Point(time=0.29, lat=0, lon=0, alt=None, angle=None), geo.Point(time=0.31, lat=0, lon=0, alt=None, angle=None), ] - assert points == list(camm_parser.filter_points_by_elst(points, [])) + assert points == list(camm_parser._filter_telemetry_by_elst_segments(points, [])) assert [dataclasses.replace(p, time=p.time + 4.4) for p in points] == list( - camm_parser.filter_points_by_elst(points, [(-1, 3), (-1, 4.4)]) + camm_parser._filter_telemetry_by_elst_segments(points, [(-1, 3), (-1, 4.4)]) ) assert [ geo.Point(time=0.23 + 4.4, lat=0, lon=0, alt=None, angle=None), geo.Point(time=0.31 + 4.4, lat=0, lon=0, alt=None, angle=None), ] == list( - camm_parser.filter_points_by_elst( + camm_parser._filter_telemetry_by_elst_segments( points, [(-1, 3), (-1, 4.4), (0.21, 0.04), (0.30, 0.04)] ) ) @@ -33,7 +33,9 @@ def test_filter_points_by_edit_list(): assert [ geo.Point(time=0.29 + 4.4, lat=0, lon=0, alt=None, angle=None), geo.Point(time=0.31 + 4.4, lat=0, lon=0, alt=None, angle=None), - ] == list(camm_parser.filter_points_by_elst(points, [(-1, 4.4), (0.24, 0.3)])) + ] == list( + camm_parser._filter_telemetry_by_elst_segments(points, [(-1, 4.4), (0.24, 0.3)]) + ) def build_mp4(metadata: types.VideoMetadata) -> types.VideoMetadata: