Skip to content

Commit c6e91dc

Browse files
committed
update VideoRTSPStream to enable reuse of same stream for multiple callbacks
1 parent 9c135a7 commit c6e91dc

1 file changed

Lines changed: 83 additions & 7 deletions

File tree

src/om1_vlm/video/video_rtsp_stream.py

Lines changed: 83 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,35 @@ class VideoRTSPStream:
4040
JPEG quality for encoding frames, by default 70
4141
"""
4242

43+
# Per-URL singleton registry
44+
_instances: dict[str, "VideoRTSPStream"] = {}
45+
_instances_lock = threading.Lock()
46+
47+
def __new__(
48+
cls,
49+
rtsp_url: str = "rtsp://localhost:8554/live",
50+
decode_format: str = "H264",
51+
*args,
52+
**kwargs,
53+
):
54+
with cls._instances_lock:
55+
existing = cls._instances.get(rtsp_url)
56+
if existing is not None:
57+
# already have a stream for this URL → reuse and bump refcount
58+
existing._refcount += 1
59+
logger.info(
60+
f"Reusing existing VideoRTSPStream for {rtsp_url}, "
61+
f"refcount={existing._refcount}"
62+
)
63+
return existing
64+
65+
# first time this rtsp_url is seen → create new instance
66+
instance = super().__new__(cls)
67+
cls._instances[rtsp_url] = instance
68+
instance._refcount = 1
69+
logger.info(f"Created new VideoRTSPStream for {rtsp_url}, refcount=1")
70+
return instance
71+
4372
def __init__(
4473
self,
4574
rtsp_url: str = "rtsp://localhost:8554/live",
@@ -50,6 +79,36 @@ def __init__(
5079
resolution: Optional[Tuple[int, int]] = (480, 640),
5180
jpeg_quality: int = 70,
5281
):
82+
# Prevent reinitialising on subsequent "constructions" for same URL
83+
if getattr(self, "_initialized", False):
84+
# register any new callbacks on the existing instance
85+
try:
86+
if frame_callback is not None:
87+
self.register_frame_callback(frame_callback)
88+
logger.info(
89+
f"VideoRTSPStream for {self.rtsp_url}: registered extra frame callback on reused stream"
90+
)
91+
logger.info(
92+
f"Current number of callbacks: {len(self.frame_callbacks)}"
93+
)
94+
except Exception as e:
95+
logger.error(
96+
f"VideoRTSPStream for {self.rtsp_url}: failed to register extra callback(s) on reused stream: {e}"
97+
)
98+
99+
# mismatch logging logic (decode_format only)
100+
if decode_format != self.decode_format:
101+
logger.info(
102+
f"VideoRTSPStream for {self.rtsp_url} already initialized "
103+
f"with decode_format={self.decode_format}, fps={self.fps}, "
104+
f"resolution={self.resolution}, jpeg_quality={self.jpeg_quality}, "
105+
f"ignoring new request with decode_format={decode_format}, "
106+
f"fps={fps}, resolution={resolution}, jpeg_quality={jpeg_quality}"
107+
)
108+
return
109+
110+
self._initialized = True
111+
53112
# RTSP stream parameters
54113
self.rtsp_url = rtsp_url
55114
self.decode_format = decode_format
@@ -235,14 +294,31 @@ def stop(self):
235294
"""
236295
Stop video capture and clean up resources.
237296
238-
Stops the video processing loop and waits for the
239-
processing thread to finish.
297+
Decrement reference count and only actually stop the stream
298+
when the last user releases it.
240299
"""
241-
self.running = False
300+
with self._instances_lock:
301+
self._refcount -= 1
302+
logger.info(
303+
f"VideoRTSPStream.stop called for {self.rtsp_url}, "
304+
f"new refcount={self._refcount}"
305+
)
306+
307+
if self._refcount > 0:
308+
# Still in use by other providers — do NOT tear it down.
309+
return
242310

243-
self._release_capture()
311+
# Last user: really stop and remove from registry
312+
self.running = False
313+
self._release_capture()
244314

245-
if self._video_thread and self._video_thread.is_alive():
246-
self._video_thread.join(timeout=1.0)
315+
if self._video_thread and self._video_thread.is_alive():
316+
self._video_thread.join(timeout=1.0)
247317

248-
logger.info("Stopped video processing thread")
318+
logger.info("Stopped video processing thread")
319+
320+
# Remove from class-level registry
321+
try:
322+
del self._instances[self.rtsp_url]
323+
except KeyError:
324+
pass

0 commit comments

Comments
 (0)