# nuScenes dev-kit.
# Code written by Holger Caesar, 2019.

import json
import os
import random
import shutil
import sys
import unittest
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock, patch

import numpy as np
from nuscenes import NuScenes
from nuscenes.eval.common.config import config_factory
from nuscenes.eval.tracking.evaluate import TrackingEval
from nuscenes.eval.tracking.utils import category_to_tracking_name
from nuscenes.utils.splits import get_scenes_of_split
from parameterized import parameterized
from tqdm import tqdm
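
# Note: these tests require the NUSCENES environment variable to point to a local nuScenes dataroot
# (basic_test() below asserts this). Illustrative invocation only, the path is an example:
#   NUSCENES=/data/sets/nuscenes python test_evaluate.py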


class TestMain(unittest.TestCase):
    res_mockup = 'nusc_eval.json'
    res_eval_folder = 'tmp'
    splits_file_mockup = 'mocked_splits.json'

    def setUp(self):
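        # setUp writes a small custom-splits file to disk; test_delta_gt below patches
        # nuscenes.utils.splits._get_custom_splits_file_path so that this file is picked up.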
        with open(self.splits_file_mockup, 'w') as f:
            json.dump({
                "mini_custom_train": ["scene-0061", "scene-0553"],
                "mini_custom_val": ["scene-0103", "scene-0916"]
            }, f, indent=2)

    def tearDown(self):
        if os.path.exists(self.res_mockup):
            os.remove(self.res_mockup)
        if os.path.exists(self.res_eval_folder):
            shutil.rmtree(self.res_eval_folder)
        if os.path.exists(self.splits_file_mockup):
            os.remove(self.splits_file_mockup)

    @staticmethod
    def _mock_submission(nusc: NuScenes,
                         split: str,
                         add_errors: bool = False) -> Dict[str, dict]:
        """
        Creates a "reasonable" submission (results and metadata) by looping through the samples of the given split
        and adding one prediction per GT annotation. Predictions will be permuted randomly along all axes.
        :param nusc: NuScenes instance.
        :param split: Dataset split to use.
        :param add_errors: Whether to use GT or add errors to it.
        """
        # Get config.
        cfg = config_factory('tracking_nips_2019')

        def random_class(category_name: str, _add_errors: bool = False) -> Optional[str]:
            # Alter 10% of the valid labels.
            class_names = sorted(cfg.tracking_names)
            tmp = category_to_tracking_name(category_name)
            if tmp is None:
                return None
            else:
                if not _add_errors or np.random.rand() < .9:
                    return tmp
                else:
                    return class_names[np.random.randint(0, len(class_names) - 1)]

        def random_id(instance_token: str, _add_errors: bool = False) -> str:
            # Alter 10% of the valid ids to be a random string, which hopefully corresponds to a new track.
            if not _add_errors or np.random.rand() < .9:
                _tracking_id = instance_token + '_pred'
            else:
                _tracking_id = str(np.random.randint(0, sys.maxsize))
            return _tracking_id

        mock_meta = {
            'use_camera': False,
            'use_lidar': True,
            'use_radar': False,
            'use_map': False,
            'use_external': False,
        }
        mock_results = {}

        # Get all samples in the current evaluation split.
        scenes_of_eval_split: List[str] = get_scenes_of_split(split_name=split, nusc=nusc)
        val_samples = []
        for sample in nusc.sample:
            if nusc.get('scene', sample['scene_token'])['name'] in scenes_of_eval_split:
                val_samples.append(sample)

        # Prepare results.
        instance_to_score = dict()
        for sample in tqdm(val_samples, leave=False):
            sample_res = []
            for ann_token in sample['anns']:
                ann = nusc.get('sample_annotation', ann_token)
                translation = np.array(ann['translation'])
                size = np.array(ann['size'])
                rotation = np.array(ann['rotation'])
                velocity = nusc.box_velocity(ann_token)[:2]
                tracking_id = random_id(ann['instance_token'], _add_errors=add_errors)
                tracking_name = random_class(ann['category_name'], _add_errors=add_errors)

                # Skip annotations for classes not part of the detection challenge.
                if tracking_name is None:
                    continue

                # Skip annotations with 0 lidar/radar points.
                num_pts = ann['num_lidar_pts'] + ann['num_radar_pts']
                if num_pts == 0:
                    continue

                # If we randomly assigned a score in [0, 1] to each box and later averaged over the boxes in a
                # track, the average score would be around 0.5 and we would have 0 predictions above that.
                # Therefore we assign the same score to each box in a track.
                if ann['instance_token'] not in instance_to_score:
                    instance_to_score[ann['instance_token']] = random.random()
                tracking_score = instance_to_score[ann['instance_token']]
                tracking_score = np.clip(tracking_score + random.random() * 0.3, 0, 1)

                if add_errors:
                    translation += 4 * (np.random.rand(3) - 0.5)
                    size *= (np.random.rand(3) + 0.5)
                    rotation += (np.random.rand(4) - 0.5) * .1
                    velocity *= np.random.rand(3)[:2] + 0.5

                sample_res.append({
                    'sample_token': sample['token'],
                    'translation': list(translation),
                    'size': list(size),
                    'rotation': list(rotation),
                    'velocity': list(velocity),
                    'tracking_id': tracking_id,
                    'tracking_name': tracking_name,
                    'tracking_score': tracking_score
                })
            mock_results[sample['token']] = sample_res

        mock_submission = {
            'meta': mock_meta,
            'results': mock_results
        }
        return mock_submission

    def basic_test(self,
                   eval_set: str = 'mini_val',
                   add_errors: bool = False,
                   render_curves: bool = False,
                   dist_fcn: str = '',
                   dist_th_tp: float = 0.0) -> Dict[str, Any]:
        """
        Run the evaluation with fixed randomness on the specified subset, with or without introducing errors in the
        submission.
        :param eval_set: Which split to evaluate on.
        :param add_errors: Whether to use GT as submission or introduce additional errors.
        :param render_curves: Whether to render stats curves to disk.
        :param dist_fcn: Distance function used for matching (e.g. center_distance).
        :param dist_th_tp: Distance threshold for true positives.
        :return: The metrics returned by the evaluation.
        """
        random.seed(42)
        np.random.seed(42)
        assert 'NUSCENES' in os.environ, 'Set NUSCENES env. variable to enable tests.'

        if eval_set.startswith('mini'):
            version = 'v1.0-mini'
        elif eval_set == 'test':
            version = 'v1.0-test'
        else:
            version = 'v1.0-trainval'
        nusc = NuScenes(version=version, dataroot=os.environ['NUSCENES'], verbose=False)

        with open(self.res_mockup, 'w') as f:
            mock = self._mock_submission(nusc, eval_set, add_errors=add_errors)
            json.dump(mock, f, indent=2)

        cfg = config_factory('tracking_nips_2019')
        # Update dist fcn and threshold with those specified.
        cfg.dist_fcn = dist_fcn
        cfg.dist_th_tp = dist_th_tp

        nusc_eval = TrackingEval(cfg, self.res_mockup, eval_set=eval_set, output_dir=self.res_eval_folder,
                                 nusc_version=version, nusc_dataroot=os.environ['NUSCENES'], verbose=False)
        metrics = nusc_eval.main(render_curves=render_curves)

        return metrics

    @unittest.skip
    def test_delta_mock(self,
                        eval_set: str = 'mini_val',
                        render_curves: bool = False):
        """
        This test runs the evaluation for an arbitrary random set of predictions.
        The resulting score is captured in this very test, so that the test fails if a change to the eval code
        changes the results.
        :param eval_set: Which set to evaluate on.
        :param render_curves: Whether to render stats curves to disk.
        """
        # Run the evaluation with errors.
        metrics = self.basic_test(eval_set, add_errors=True, render_curves=render_curves,
                                  dist_fcn='center_distance', dist_th_tp=2.0)

        # Compare metrics to known solution.
        if eval_set == 'mini_val':
            self.assertAlmostEqual(metrics['amota'], 0.23766771095785147)
            self.assertAlmostEqual(metrics['amotp'], 1.5275400961369252)
            self.assertAlmostEqual(metrics['motar'], 0.3726570200013319)
            self.assertAlmostEqual(metrics['mota'], 0.25003943918566174)
            self.assertAlmostEqual(metrics['motp'], 1.2976508610883917)
        else:
            print('Skipping checks due to choice of custom eval_set: %s' % eval_set)

        # Run again with the alternative bev_iou_complement dist_fcn.
        metrics = self.basic_test(eval_set, add_errors=True, render_curves=render_curves,
                                  dist_fcn='bev_iou_complement', dist_th_tp=0.999999)

        # Compare metrics to known solution.
        if eval_set == 'mini_val':
            self.assertAlmostEqual(metrics['amota'], 0.231839679131956)
            self.assertAlmostEqual(metrics['amotp'], 1.3629342647309446)
            self.assertAlmostEqual(metrics['motar'], 0.27918315466340504)
            self.assertAlmostEqual(metrics['mota'], 0.22922560056448252)
            self.assertAlmostEqual(metrics['motp'], 0.7541595548820258)
        else:
            print('Skipping checks due to choice of custom eval_set: %s' % eval_set)

    @parameterized.expand([
        ('mini_val',),
        ('mini_custom_train',)
    ])
    @patch('nuscenes.utils.splits._get_custom_splits_file_path')
    def test_delta_gt(self,
                      eval_set: str,
                      mock__get_custom_splits_file_path: MagicMock,
                      render_curves: bool = False):
        """
        This test runs the evaluation with the ground truth used as predictions.
        This should result in a perfect score for every metric.
        The resulting score is captured in this very test, so that the test fails if a change to the eval code
        changes the results.
        :param eval_set: Which set to evaluate on.
        :param mock__get_custom_splits_file_path: Mock injected by the patch decorator; redirected to the mocked
            splits file written in setUp.
        :param render_curves: Whether to render stats curves to disk.
        """
        mock__get_custom_splits_file_path.return_value = self.splits_file_mockup

        # Run the evaluation without errors.
        metrics = self.basic_test(eval_set, add_errors=False, render_curves=render_curves,
                                  dist_fcn='center_distance', dist_th_tp=2.0)

        # Compare metrics to known solution. Do not check:
        # - MT/TP (hard to figure out here).
        # - AMOTA/AMOTP (unachieved recall values lead to hard unintuitive results).
        if eval_set in ['mini_val', 'mini_custom_train']:
            self.assertAlmostEqual(metrics['amota'], 1.0)
            self.assertAlmostEqual(metrics['amotp'], 0.0, delta=1e-5)
            self.assertAlmostEqual(metrics['motar'], 1.0)
            self.assertAlmostEqual(metrics['recall'], 1.0)
            self.assertAlmostEqual(metrics['mota'], 1.0)
            self.assertAlmostEqual(metrics['motp'], 0.0, delta=1e-5)
            self.assertAlmostEqual(metrics['faf'], 0.0)
            self.assertAlmostEqual(metrics['ml'], 0.0)
            self.assertAlmostEqual(metrics['fp'], 0.0)
            self.assertAlmostEqual(metrics['fn'], 0.0)
            self.assertAlmostEqual(metrics['ids'], 0.0)
            self.assertAlmostEqual(metrics['frag'], 0.0)
            self.assertAlmostEqual(metrics['tid'], 0.0)
            self.assertAlmostEqual(metrics['lgd'], 0.0)
        else:
            print('Skipping checks due to choice of custom eval_set: %s' % eval_set)

        # Run again with the alternative bev_iou_complement dist_fcn (and a very precise threshold).
        metrics = self.basic_test(eval_set, add_errors=False, render_curves=render_curves,
                                  dist_fcn='bev_iou_complement', dist_th_tp=1e-6)

        # Compare metrics to known solution. Do not check:
        # - MT/TP (hard to figure out here).
        # - AMOTA/AMOTP (unachieved recall values lead to hard unintuitive results).
        if eval_set in ['mini_val', 'mini_custom_train']:
            self.assertAlmostEqual(metrics['amota'], 1.0)
            self.assertAlmostEqual(metrics['amotp'], 0.0, delta=1e-5)
            self.assertAlmostEqual(metrics['motar'], 1.0)
            self.assertAlmostEqual(metrics['recall'], 1.0)
            self.assertAlmostEqual(metrics['mota'], 1.0)
            self.assertAlmostEqual(metrics['motp'], 0.0, delta=1e-5)
            self.assertAlmostEqual(metrics['faf'], 0.0)
            self.assertAlmostEqual(metrics['ml'], 0.0)
            self.assertAlmostEqual(metrics['fp'], 0.0)
            self.assertAlmostEqual(metrics['fn'], 0.0)
            self.assertAlmostEqual(metrics['ids'], 0.0)
            self.assertAlmostEqual(metrics['frag'], 0.0)
            self.assertAlmostEqual(metrics['tid'], 0.0)
            self.assertAlmostEqual(metrics['lgd'], 0.0)
        else:
            print('Skipping checks due to choice of custom eval_set: %s' % eval_set)


if __name__ == '__main__':
    unittest.main()