-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrecorder.py
More file actions
115 lines (98 loc) · 4.72 KB
/
recorder.py
File metadata and controls
115 lines (98 loc) · 4.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import os
import shutil
import time
from datetime import timedelta, datetime
from threading import Event, Thread
from typing import Optional, List
import rtsp
class Recorder:
    """Capture frames from several RTSP cameras on a background thread and save them to disk.

    Attributes
    ----------
    image_dirs : List[str]
        The paths of both the root image directory (index 0) and the channel
        directories (index ``i + 1`` receives the images of camera ``i``)
    num_cameras : int
        How many cameras there are
    dt_offset : float
        How many seconds to add (or subtract if the number is negative) to the timestamp
        of the images to better align with the actual timestamps pasted on the images themselves
    capture_delay : float
        How many seconds to wait in between capturing images
    verbose : bool
        Whether to log to the console information about what is happening while the script is running
    cameras : List[rtsp.Client]
        The Client objects that frames are read from
    _recorder_thread : Optional[threading.Thread]
        The thread that handles saving captured images to the disk (None while not recording)
    _stop_recording_event : threading.Event
        An event that notifies the _recorder_thread to stop
    """

    # NOTE(review): this default embeds a username/password and a fixed host in the
    # source. It is kept so existing callers behave exactly as before; prefer passing
    # ``camera_url_template`` built from configuration instead.
    DEFAULT_CAMERA_URL_TEMPLATE = 'rtsp://admin:123456@10.43.3.113/ch0{channel}/0'

    def __init__(self, image_dirs: List[str], num_cameras: int, dt_offset: float, capture_delay: float,
                 delete_old_images: bool = True, verbose: bool = True,
                 camera_url_template: Optional[str] = None):
        """
        Parameters
        ----------
        image_dirs : List[str]
            The paths of both the root image directory and the channel directories
        num_cameras : int
            How many cameras there are
        dt_offset : float
            How many seconds to add (or subtract if the number is negative) to the timestamp
            of the images to better align with the actual timestamps pasted on the images themselves
        capture_delay : float
            How many seconds to wait in between capturing images
        delete_old_images : bool, default=True
            Whether to delete the existing "images" directory (if True, the entirety of the directory
            listed as the first index of image_dirs will be deleted)
        verbose : bool, default=True
            Whether to log to the console information about what is happening while the script is running
        camera_url_template : Optional[str], default=None
            A ``str.format`` template containing a ``{channel}`` field that is filled with the
            1-based camera number. When None, DEFAULT_CAMERA_URL_TEMPLATE is used.

        Raises
        ------
        ValueError
            If there are cameras but image_dirs does not hold the root directory plus
            one channel directory per camera
        """
        # fail here rather than with an IndexError inside the background thread
        if num_cameras > 0 and len(image_dirs) <= num_cameras:
            raise ValueError(
                f'image_dirs must contain the root directory plus one directory per camera '
                f'(expected at least {num_cameras + 1} entries, got {len(image_dirs)})')

        self.image_dirs = image_dirs
        self.num_cameras = num_cameras
        self.dt_offset = dt_offset
        self.capture_delay = capture_delay
        self.verbose = verbose

        # delete the entire images directory if it exists
        if delete_old_images:
            try:
                shutil.rmtree(self.image_dirs[0])
            except FileNotFoundError:
                pass

        # add any directories that should exist (missing parents are created too)
        for image_dir in self.image_dirs:
            os.makedirs(image_dir, exist_ok=True)

        # connect to all the cameras
        if self.verbose:
            print('Initializing cameras')
        if camera_url_template is None:
            camera_url_template = self.DEFAULT_CAMERA_URL_TEMPLATE
        self.cameras = [rtsp.Client(camera_url_template.format(channel=i + 1), verbose=self.verbose)
                        for i in range(self.num_cameras)]

        self._recorder_thread: Optional[Thread] = None
        self._stop_recording_event = Event()

    def _record_loop(self) -> None:
        """Read one frame per camera every capture_delay seconds until told to stop."""
        while not self._stop_recording_event.is_set():
            iter_start = time.monotonic()
            for i, camera in enumerate(self.cameras):
                img = camera.read()
                if img is None:
                    continue
                # ':' is not allowed in Windows file names, so it is replaced with '_'
                timestamp = str(datetime.now() + timedelta(seconds=self.dt_offset)).replace(':', '_')
                # os.path.join keeps the path valid on every platform
                img.save(os.path.join(self.image_dirs[i + 1], f'{timestamp}.jpg'))
            # wait for capture delay to take more pics, accounting for the amount of time it
            # took to take the pics; waiting on the event lets stop_recording() wake us at once
            remaining = self.capture_delay - (time.monotonic() - iter_start)
            self._stop_recording_event.wait(max(0.0, remaining))

    def start_recording(self):
        """Start recording and saving images to the disk

        Does nothing if a recording thread is already running.
        """
        if self._recorder_thread is not None and self._recorder_thread.is_alive():
            # a second thread would orphan the first one and duplicate every image
            return
        if self.verbose:
            print('Starting to record')
        # clear a stop request left over from a previous stop_recording() call
        self._stop_recording_event.clear()
        self._recorder_thread = Thread(target=self._record_loop)
        self._recorder_thread.start()

    def stop_recording(self):
        """Stop recording and saving images to the disk

        Safe to call even if start_recording() was never called; the cameras
        are closed either way.
        """
        self._stop_recording_event.set()
        recorder_thread = self._recorder_thread
        if recorder_thread is not None:
            recorder_thread.join()  # wait for thread to finish
            self._recorder_thread = None
        if self.verbose:
            print('Finished recording')
        # disconnect from the cameras
        for camera in self.cameras:
            camera.close()