This repository was archived by the owner on Apr 3, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathtest_spatial_sampling.py
More file actions
103 lines (82 loc) · 3.07 KB
/
test_spatial_sampling.py
File metadata and controls
103 lines (82 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import os
import unittest
import numpy as np
import pandas as pd
from shapely import wkt
from shapely.geometry import Point
from mapswipe_workers.utils.spatial_sampling import (
distance_on_sphere,
filter_points,
spatial_sampling,
)
class TestDistanceCalculations(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open(
os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"..",
"fixtures",
"mapillary_sequence.csv",
),
"r",
) as file:
df = pd.read_csv(file)
df["geometry"] = df["geometry"].apply(wkt.loads)
cls.fixture_df = df
def test_distance_on_sphere(self):
p1 = Point(-74.006, 40.7128)
p2 = Point(-118.2437, 34.0522)
distance = distance_on_sphere((p1.x, p1.y), (p2.x, p2.y))
expected_distance = 3940 # Approximate known distance in km
self.assertTrue(np.isclose(distance, expected_distance, atol=50))
def test_filter_points(self):
data = {
"geometry": [
"POINT (-74.006 40.7128)",
"POINT (-75.006 41.7128)",
"POINT (-76.006 42.7128)",
"POINT (-77.006 43.7128)",
]
}
df = pd.DataFrame(data)
df["geometry"] = df["geometry"].apply(wkt.loads)
df["long"] = df["geometry"].apply(
lambda geom: geom.x if geom.geom_type == "Point" else None
)
df["lat"] = df["geometry"].apply(
lambda geom: geom.y if geom.geom_type == "Point" else None
)
threshold_distance = 100
filtered_df = filter_points(df, threshold_distance)
self.assertIsInstance(filtered_df, pd.DataFrame)
self.assertLessEqual(len(filtered_df), len(df))
def test_spatial_sampling_ordering(self):
data = {
"geometry": [
"POINT (-74.006 40.7128)",
"POINT (-75.006 41.7128)",
"POINT (-76.006 42.7128)",
"POINT (-77.006 43.7128)",
],
"captured_at": [1, 2, 3, 4],
"sequence_id": ["1", "1", "1", "1"],
}
df = pd.DataFrame(data)
df["geometry"] = df["geometry"].apply(wkt.loads)
interval_length = 0.1
filtered_gdf = spatial_sampling(df, interval_length)
self.assertTrue(filtered_gdf["captured_at"].is_monotonic_decreasing)
def test_spatial_sampling_with_sequence(self):
threshold_distance = 0.01
filtered_df = spatial_sampling(self.fixture_df, threshold_distance)
self.assertIsInstance(filtered_df, pd.DataFrame)
self.assertLess(len(filtered_df), len(self.fixture_df))
filtered_df.reset_index(drop=True, inplace=True)
for i in range(len(filtered_df) - 1):
geom1 = filtered_df.loc[i, "geometry"]
geom2 = filtered_df.loc[i + 1, "geometry"]
distance = geom1.distance(geom2)
self.assertLess(distance, threshold_distance)
if __name__ == "__main__":
unittest.main()