Skip to content

Commit b4b8de6

Browse files
committed
[tests] Add FakeSubtractor to test event boundary merging
1 parent 7db4203 commit b4b8de6

1 file changed

Lines changed: 93 additions & 2 deletions

File tree

tests/test_scan_context.py

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
import platform
1717
import typing as ty
1818

19+
import numpy as np
1920
import pytest
21+
from scenedetect import FrameTimecode
2022

2123
from dvr_scan.region import Point
22-
from dvr_scan.scanner import DetectorType, MotionScanner
23-
from dvr_scan.subtractor import SubtractorCNT, SubtractorCudaMOG2
24+
from dvr_scan.scanner import DetectorType, MotionEvent, MotionScanner
25+
from dvr_scan.subtractor import Subtractor, SubtractorCNT, SubtractorCudaMOG2
26+
from dvr_scan.video_joiner import VideoJoiner
2427

2528
MACHINE_ARCH = platform.machine().upper()
2629

@@ -255,3 +258,91 @@ def test_merge_within_time_before(traffic_camera_video):
255258
compare_event_lists(
256259
event_list, TRAFFIC_CAMERA_EVENTS_MERGE_WITHIN_TIME_BEFORE, EVENT_FRAME_TOLERANCE
257260
)
261+
262+
263+
class FakeVideo(VideoJoiner):
264+
def __init__(self):
265+
self._position = FrameTimecode(0, fps=self.framerate)
266+
267+
pass
268+
269+
@property
270+
def paths(self):
271+
return ["fake_path.mp4"]
272+
273+
@property
274+
def resolution(self):
275+
return (1, 1)
276+
277+
@property
278+
def framerate(self) -> float:
279+
return 1.0
280+
281+
@property
282+
def total_frames(self) -> int:
283+
return 1000
284+
285+
@property
286+
def decode_failures(self) -> float:
287+
return 0
288+
289+
@property
290+
def position(self) -> FrameTimecode:
291+
return self._position + 1
292+
293+
@property
294+
def position_ms(self) -> float:
295+
return self._position.get_seconds() / 1000.0
296+
297+
def read(self, decode: bool = True) -> ty.Optional[np.ndarray]:
298+
if self._position.get_frames() >= self.total_frames:
299+
return None
300+
img = np.zeros((self.resolution[1], self.resolution[0], 3), dtype=np.uint8)
301+
self._position += 1
302+
return img
303+
304+
def seek(self, target: FrameTimecode):
305+
pass
306+
307+
308+
def test_fake_video():
309+
# With default subtractor it won't have any motion, it's just empty frames.
310+
scanner = MotionScanner(FakeVideo())
311+
assert scanner.scan().event_list == []
312+
313+
314+
# A fake subtractor we control to give a specific set of frame scores to test boundary and event
315+
# merging behaviors.
316+
class FakeSubtractor(Subtractor):
317+
def __init__(self, events: ty.List[MotionEvent]):
318+
self._frame_num = 0
319+
assert events
320+
self._events = events
321+
self._curr_event = 0
322+
323+
def apply(self, frame: np.ndarray) -> np.ndarray:
324+
self._frame_num += 1
325+
frame = np.copy(frame[:, :, 0])
326+
if self._curr_event >= len(self._events):
327+
return frame
328+
if self._frame_num > self._events[self._curr_event].end:
329+
self._curr_event += 1
330+
return frame
331+
if self._frame_num > self._events[self._curr_event].start:
332+
return np.add(frame, 254) # Scores of 255 are rejected by default.
333+
return frame
334+
335+
@staticmethod
336+
def is_available():
337+
return True
338+
339+
340+
def test_fake_subtractor():
341+
scanner = MotionScanner(FakeVideo())
342+
base_time = FrameTimecode(0, scanner._input.framerate)
343+
expected_events = [MotionEvent(start=(base_time + 100), end=(base_time + 999))]
344+
scanner._subtractor = FakeSubtractor(events=expected_events)
345+
# TODO(#72): This should be the same as the above list ideally, figure out why it's not.
346+
assert scanner.scan().event_list == [
347+
MotionEvent(start=(base_time + 99), end=(base_time + 1001))
348+
]

0 commit comments

Comments
 (0)