Skip to content

Commit c72ee0e

Browse files
committed
Fix streaming race condition when streaming to multiple viewers
- Use a fresh REQ socket for each send instead of caching one (prevents ZMQ "Operation cannot be accomplished in current state" errors)
- Standardize both Napari and Fiji on the REQ/REP pattern (Napari previously used PUB/SUB)
- Remove the hardcoded VIEWER_TYPE == 'fiji' check in _get_publisher()
- Both backends now block until the viewer acknowledges receipt

Fixes: cannot stream to both Napari and Fiji in quick succession
1 parent d46732b commit c72ee0e

File tree

3 files changed

+76
-32
lines changed

3 files changed

+76
-32
lines changed

src/polystore/fiji_stream.py

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@
1212
"""
1313

1414
import logging
15+
import time
1516
from pathlib import Path
1617
from typing import Any, List, Union
1718

19+
import zmq
20+
1821
from .constants import Backend, TransportMode
1922
from .streaming_constants import StreamingDataType
2023
from .streaming import StreamingBackend
2124
from .roi_converters import FijiROIConverter
25+
from zmqruntime.transport import get_zmq_transport_url, coerce_transport_mode
2226

2327
logger = logging.getLogger(__name__)
2428

@@ -81,7 +85,6 @@ def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], *
8185
port = kwargs['port']
8286
transport_mode = kwargs['transport_mode']
8387
transport_config = kwargs.get('transport_config')
84-
publisher = self._get_publisher(host, port, transport_mode, transport_config=transport_config)
8588
display_config = kwargs['display_config']
8689
microscope_handler = kwargs['microscope_handler']
8790
source = kwargs.get('source', 'unknown_source') # Pre-built source value
@@ -132,16 +135,39 @@ def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], *
132135
transport_config=transport_config,
133136
)
134137

135-
# Send with REQ socket (BLOCKING - worker waits for Fiji to acknowledge)
136-
# Worker blocks until Fiji receives, copies data from shared memory, and sends ack
137-
# This guarantees no messages are lost and shared memory is only closed after Fiji is done
138-
logger.info(f"📤 FIJI BACKEND: Sending batch of {len(batch_images)} images to Fiji on port {port} (REQ/REP - blocking until ack)")
139-
publisher.send_json(message) # Blocking send
138+
# Create FRESH REQ socket for each send - REQ sockets cannot be reused
139+
# This prevents the "Operation cannot be accomplished in current state" error
140+
# when multiple streams happen concurrently
141+
transport_config = transport_config or self._transport_config
142+
url = get_zmq_transport_url(
143+
port,
144+
host=host,
145+
mode=coerce_transport_mode(transport_mode),
146+
config=transport_config,
147+
)
148+
149+
if self._context is None:
150+
self._context = zmq.Context()
151+
152+
socket = self._context.socket(zmq.REQ)
153+
socket.connect(url)
154+
time.sleep(0.1) # Brief delay for connection to establish
155+
156+
try:
157+
# Send with REQ socket (BLOCKING - worker waits for Fiji to acknowledge)
158+
# Worker blocks until Fiji receives, copies data from shared memory, and sends ack
159+
# This guarantees no messages are lost and shared memory is only closed after Fiji is done
160+
logger.info(f"📤 FIJI BACKEND: Sending batch of {len(batch_images)} images to Fiji on port {port} (REQ/REP - blocking until ack)")
161+
socket.send_json(message) # Blocking send
162+
163+
# Wait for acknowledgment from Fiji (REP socket)
164+
# Fiji will only reply after it has copied all data from shared memory
165+
ack_response = socket.recv_json()
166+
logger.info(f"✅ FIJI BACKEND: Received ack from Fiji: {ack_response.get('status', 'unknown')}")
140167

141-
# Wait for acknowledgment from Fiji (REP socket)
142-
# Fiji will only reply after it has copied all data from shared memory
143-
ack_response = publisher.recv_json()
144-
logger.info(f"✅ FIJI BACKEND: Received ack from Fiji: {ack_response.get('status', 'unknown')}")
168+
finally:
169+
# Always close the socket - never reuse REQ sockets
170+
socket.close()
145171

146172
# Clean up publisher's handles after successful send
147173
# Receiver will unlink the shared memory after copying the data

src/polystore/napari_stream.py

Lines changed: 36 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
- Sender (Worker): Creates shared memory, sends reference via ZMQ, closes handle (does NOT unlink)
1010
- Receiver (Napari Server): Attaches to shared memory, copies data, closes handle, unlinks
1111
- Only receiver calls unlink() to prevent FileNotFoundError
12-
- PUB/SUB socket pattern is non-blocking; receiver must copy data before sender closes handle
12+
- REQ/REP socket pattern is blocking; worker waits for acknowledgment before closing shared memory
1313
"""
1414

1515
import logging
16+
import time
1617
from pathlib import Path
1718
from typing import Any, List, Union
1819

@@ -22,6 +23,7 @@
2223
from .streaming_constants import StreamingDataType
2324
from .streaming import StreamingBackend
2425
from .roi_converters import NapariROIConverter
26+
from zmqruntime.transport import get_zmq_transport_url, coerce_transport_mode
2527

2628
logger = logging.getLogger(__name__)
2729

@@ -82,7 +84,6 @@ def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], *
8284
port = kwargs['port']
8385
transport_mode = kwargs['transport_mode']
8486
transport_config = kwargs.get('transport_config')
85-
publisher = self._get_publisher(host, port, transport_mode, transport_config=transport_config)
8687
display_config = kwargs['display_config']
8788
microscope_handler = kwargs['microscope_handler']
8889
source = kwargs.get('source', 'unknown_source') # Pre-built source value
@@ -114,22 +115,43 @@ def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], *
114115
transport_config=transport_config,
115116
)
116117

117-
# Send non-blocking to prevent hanging if Napari is slow to process (matches Fiji pattern)
118-
send_succeeded = False
119-
try:
120-
publisher.send_json(message, flags=zmq.NOBLOCK)
121-
send_succeeded = True
118+
# Create FRESH REQ socket for each send - REQ sockets cannot be reused
119+
# This prevents the "Operation cannot be accomplished in current state" error
120+
# when multiple streams happen concurrently
121+
transport_config = transport_config or self._transport_config
122+
url = get_zmq_transport_url(
123+
port,
124+
host=host,
125+
mode=coerce_transport_mode(transport_mode),
126+
config=transport_config,
127+
)
122128

123-
except zmq.Again:
124-
logger.warning(f"Napari viewer busy, dropped batch of {len(batch_images)} images (port {port})")
129+
if self._context is None:
130+
self._context = zmq.Context()
125131

126-
except Exception as e:
127-
logger.error(f"Failed to send batch to Napari on port {port}: {e}", exc_info=True)
128-
raise # Re-raise the exception so the pipeline knows it failed
132+
socket = self._context.socket(zmq.REQ)
133+
socket.connect(url)
134+
time.sleep(0.1) # Brief delay for connection to establish
135+
136+
try:
137+
# Send with REQ socket (BLOCKING - worker waits for Napari to acknowledge)
138+
# Worker blocks until Napari receives, copies data from shared memory, and sends ack
139+
# This guarantees no messages are lost and shared memory is only closed after Napari is done
140+
logger.info(f"📤 NAPARI BACKEND: Sending batch of {len(batch_images)} images to Napari on port {port} (REQ/REP - blocking until ack)")
141+
socket.send_json(message) # Blocking send
142+
143+
# Wait for acknowledgment from Napari (REP socket)
144+
# Napari will only reply after it has copied all data from shared memory
145+
ack_response = socket.recv_json()
146+
logger.info(f"✅ NAPARI BACKEND: Received ack from Napari: {ack_response.get('status', 'unknown')}")
129147

130148
finally:
131-
# Unified cleanup: close our handle after successful send, close+unlink after failure
132-
self._cleanup_shared_memory_blocks(batch_images, unlink=not send_succeeded)
149+
# Always close the socket - never reuse REQ sockets
150+
socket.close()
151+
152+
# Clean up publisher's handles after successful send
153+
# Receiver will unlink the shared memory after copying the data
154+
self._cleanup_shared_memory_blocks(batch_images, unlink=False)
133155

134156
# cleanup() now inherited from ABC
135157

src/polystore/streaming/_streaming_backend.py

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,16 +135,12 @@ def _get_publisher(self, host: str, port: int, transport_mode: TransportMode, tr
135135
if self._context is None:
136136
self._context = zmq.Context()
137137

138-
# Use REQ socket for Fiji (synchronous request/reply - worker blocks until Fiji acks)
139-
# Use PUB socket for Napari (broadcast pattern)
140-
socket_type = zmq.REQ if self.VIEWER_TYPE == 'fiji' else zmq.PUB
141-
publisher = self._context.socket(socket_type)
142-
143-
if socket_type == zmq.PUB:
144-
publisher.setsockopt(zmq.SNDHWM, 100000) # Only for PUB sockets
138+
# Use REQ socket for all viewers (synchronous request/reply)
139+
# All viewers must send acknowledgment after processing
140+
publisher = self._context.socket(zmq.REQ)
145141

146142
publisher.connect(url)
147-
socket_name = "REQ" if socket_type == zmq.REQ else "PUB"
143+
socket_name = "REQ"
148144
logger.info(f"{self.VIEWER_TYPE} streaming {socket_name} socket connected to {url}")
149145
time.sleep(0.1)
150146
self._publishers[key] = publisher

0 commit comments

Comments (0)