Skip to content

Commit 45ed649

Browse files
committed
Extract receiver-core projection/batching primitives and unify Fiji/Napari batch paths
This change moves streaming receiver mechanics into shared core abstractions and rewires Fiji/Napari receiver implementations to consume those primitives. Highlights: - Add receiver-core package: - batching contract ABC - debounced batch engine - window/component projection contracts and helpers - Refactor streaming base interfaces to explicit ABC boundaries (no protocol/fallback path). - Factor generic layer-key/component-layout helpers for napari receivers. - Refactor Fiji and Napari batch processors to use DebouncedBatchEngine and shared projection semantics instead of duplicated local debounce/state code. - Add architecture docs for streaming receiver projection boundaries. - Add receiver-core tests, including non-blocking enqueue behavior while processing. Outcome: - Single reusable batching/projection path for receiver implementations - reduced duplicate debounce logic and clearer separation of concerns - stronger extractability for downstream streaming servers.
1 parent 7d0f713 commit 45ed649

File tree

15 files changed

+643
-294
lines changed

15 files changed

+643
-294
lines changed

docs/source/architecture/index.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
Architecture
2+
============
3+
4+
High-level architecture for polystore's storage and streaming abstractions.
5+
6+
.. toctree::
7+
:maxdepth: 1
8+
9+
streaming_receiver_projection
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
Streaming Receiver Projection
2+
=============================
3+
4+
Modules
5+
-------
6+
7+
- ``polystore.streaming.receivers.core.contracts``
8+
- ``polystore.streaming.receivers.core.batch_engine``
9+
- ``polystore.streaming.receivers.core.window_projection``
10+
- ``polystore.streaming.receivers.napari.layer_key``
11+
12+
Purpose
13+
-------
14+
15+
Provide reusable, viewer-agnostic receiver-side primitives for streaming
16+
payload projection and batched update scheduling.
17+
18+
Boundary
19+
--------
20+
21+
``polystore`` owns payload semantics and receiver projection mechanics:
22+
23+
- component-mode grouping into window/channel/slice/frame structures
24+
- canonical layer-key derivation from component metadata
25+
- debounced batch processing with bounded wait behavior
26+
- receiver contracts via nominal ABCs
27+
28+
``polystore`` does not own ZMQ transport lifecycle. Transport/server ownership
29+
belongs to ``zmqruntime``.
30+
31+
Core Contracts
32+
--------------
33+
34+
``BatchEngineABC``
35+
Contract for enqueue/flush behavior in receiver-side batch schedulers.
36+
37+
``WindowProjectionABC``
38+
Contract for grouping stream items into projected window structures.
39+
40+
Reference Implementations
41+
-------------------------
42+
43+
``DebouncedBatchEngine``
44+
Thread-safe debounce + max-wait engine for coalescing incoming items before
45+
projection/render updates.
46+
47+
``group_items_by_component_modes``
48+
Canonical grouping utility that projects incoming items by declared component
49+
modes and returns stable ``GroupedWindowItems`` output.
50+
51+
``build_layer_key``
52+
Canonical napari layer-key construction from slice-mode components and data
53+
type.
54+
55+
Design Outcome
56+
--------------
57+
58+
Receiver implementations (for example napari and Fiji wrappers in downstream
59+
applications) can share one projection/batching kernel while keeping
60+
viewer-specific rendering code separate.
61+
62+
See Also
63+
--------
64+
65+
- ``external/zmqruntime/docs/source/architecture/viewer_streaming_architecture.rst``

docs/source/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Polystore provides a pluggable storage backend system with multi-framework I/O s
1111

1212
installation
1313
quickstart
14+
architecture/index
1415
api/index
1516
guides/custom_backends
1617

src/polystore/streaming/base.py

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
that work with arbitrary numbers of components (not hardcoded to 3).
66
"""
77

8-
from typing import TypeVar, Generic, Dict, Any, Protocol, runtime_checkable
8+
from abc import ABC, abstractmethod
9+
from typing import TypeVar, Generic, Dict, Any, List
910
from dataclasses import dataclass
1011

1112
T = TypeVar('T')
@@ -29,9 +30,10 @@ class TypedData(Generic[T]):
2930
source: str
3031

3132

32-
class ComponentAccessor(Protocol):
33-
"""Protocol for component metadata access (arbitrary number of components)."""
33+
class ComponentAccessor(ABC):
34+
"""ABC for component metadata access (arbitrary number of components)."""
3435

36+
@abstractmethod
3537
def get_by_mode(self, mode: str) -> list:
3638
"""
3739
Get all component names that have this mode (stack/slice/window).
@@ -46,42 +48,68 @@ def get_by_mode(self, mode: str) -> list:
4648
If config has {'channel': 'stack', 'z_index': 'slice', 'well': 'window'}
4749
Then get_by_mode('stack') returns ['channel']
4850
"""
49-
...
51+
raise NotImplementedError
5052

53+
@abstractmethod
5154
def get_value(self, item: Dict[str, Any], component_name: str) -> Any:
5255
"""
5356
Get component value for an item.
5457
5558
Returns:
5659
Value or default (0) if component not in metadata.
5760
"""
58-
...
61+
raise NotImplementedError
5962

63+
@abstractmethod
6064
def collect_values(self, component_names: list) -> list[tuple]:
6165
"""
6266
Collect unique values for given components across all items.
6367
6468
Returns:
6569
Sorted list of tuples for consistent indexing.
6670
"""
67-
...
71+
raise NotImplementedError
6872

6973

70-
class HandlerContext(Protocol):
71-
"""Protocol for handler context with generic component access."""
74+
class HandlerContext(ABC):
75+
"""ABC for handler context with generic component access."""
7276

73-
server: Any
74-
window_key: str
75-
data: 'TypedData[Any]'
76-
display_config: Dict[str, Any]
77-
components: ComponentAccessor
78-
images_dir: str | None
77+
@property
78+
@abstractmethod
79+
def server(self) -> Any:
80+
raise NotImplementedError
81+
82+
@property
83+
@abstractmethod
84+
def window_key(self) -> str:
85+
raise NotImplementedError
86+
87+
@property
88+
@abstractmethod
89+
def data(self) -> "TypedData[Any]":
90+
raise NotImplementedError
91+
92+
@property
93+
@abstractmethod
94+
def display_config(self) -> Dict[str, Any]:
95+
raise NotImplementedError
96+
97+
@property
98+
@abstractmethod
99+
def components(self) -> ComponentAccessor:
100+
raise NotImplementedError
101+
102+
@property
103+
@abstractmethod
104+
def images_dir(self) -> str | None:
105+
raise NotImplementedError
79106

80107

81-
class ItemHandler(Protocol):
82-
"""Type-safe protocol for item handlers with automatic discovery."""
108+
class ItemHandler(ABC):
109+
"""Type-safe ABC for item handlers with automatic discovery."""
83110

84111
@staticmethod
112+
@abstractmethod
85113
def can_handle(data_type: str) -> bool:
86114
"""
87115
Check if this handler can process the given data type.
@@ -92,17 +120,18 @@ def can_handle(data_type: str) -> bool:
92120
Returns:
93121
True if this handler can process this type.
94122
"""
95-
...
123+
raise NotImplementedError
96124

97125
@staticmethod
126+
@abstractmethod
98127
def handle(context: HandlerContext) -> None:
99128
"""
100129
Process items using type-safe context object.
101130
102131
Args:
103132
context: HandlerContext with typed data and component accessor.
104133
"""
105-
...
134+
raise NotImplementedError
106135

107136

108137
@dataclass(frozen=True)
@@ -171,7 +200,7 @@ def collect_values(self, component_names: list) -> list[tuple]:
171200

172201

173202
@dataclass(frozen=True)
174-
class SimpleHandlerContext(HandlerContext):
203+
class SimpleHandlerContext:
175204
"""Concrete implementation of HandlerContext protocol."""
176205

177206
server: Any

src/polystore/streaming/handlers/fiji_images.py

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,7 @@ def can_handle(data_type: str) -> bool:
4040
@staticmethod
4141
def handle(context: HandlerContext) -> None:
4242
"""Build hyperstack from accumulated images."""
43-
# Access data via typed wrapper
44-
images = context.data
45-
46-
# Access components generically (no hardcoded 3 dimensions!)
43+
images = context.data.items
4744
channel_comps = context.components.get_by_mode("channel")
4845
slice_comps = context.components.get_by_mode("slice")
4946
frame_comps = context.components.get_by_mode("frame")
@@ -54,27 +51,20 @@ def handle(context: HandlerContext) -> None:
5451
frame_values = context.components.collect_values(frame_comps)
5552

5653
logger.info(
57-
f"🔬 FIJI IMAGE HANDLER: Processing {len(images.data)} images: "
54+
f"🔬 FIJI IMAGE HANDLER: Processing {len(images)} images: "
5855
f"{len(channel_values)}C x {len(slice_values)}Z x {len(frame_values)}T"
5956
)
60-
61-
# Build hyperstack using server's method
62-
# Note: We don't call _build_single_hyperstack directly - we need to adapt
63-
# the existing server code to work with our new architecture.
64-
# For now, this is a placeholder that logs the handler was called.
65-
# In a full implementation, this would call the actual hyperstack building
66-
# logic from FijiViewerServer.
67-
68-
# The actual implementation would be:
69-
# context.server._build_single_hyperstack(
70-
# window_key=context.window_key,
71-
# images=images.data,
72-
# display_config_dict=context.display_config,
73-
# channel_components=channel_comps,
74-
# slice_components=slice_comps,
75-
# frame_components=frame_comps,
76-
# channel_values=channel_values,
77-
# slice_values=slice_values,
78-
# frame_values=frame_values,
79-
# component_names_metadata=context.display_config.get('component_names_metadata', {}),
80-
# )
57+
context.server._build_single_hyperstack(
58+
window_key=context.window_key,
59+
images=images,
60+
display_config_dict=context.display_config,
61+
channel_components=channel_comps,
62+
slice_components=slice_comps,
63+
frame_components=frame_comps,
64+
channel_values=channel_values,
65+
slice_values=slice_values,
66+
frame_values=frame_values,
67+
component_names_metadata=context.display_config.get(
68+
"component_names_metadata", {}
69+
),
70+
)

src/polystore/streaming/receivers/__init__.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,28 @@
44
Provides reusable batching and debouncing logic for Fiji and Napari viewers.
55
"""
66

7+
from polystore.streaming.receivers.core import (
8+
BatchEngineABC,
9+
WindowProjectionABC,
10+
DebouncedBatchEngine,
11+
GroupedWindowItems,
12+
group_items_by_component_modes,
13+
)
714
from polystore.streaming.receivers.fiji.fiji_batch_processor import FijiBatchProcessor
8-
from polystore.streaming.receivers.napari.napari_batch_processor import NapariBatchProcessor
15+
from polystore.streaming.receivers.napari import (
16+
NapariBatchProcessor,
17+
normalize_component_layout,
18+
build_layer_key,
19+
)
920

1021
__all__ = [
22+
"BatchEngineABC",
23+
"WindowProjectionABC",
24+
"DebouncedBatchEngine",
25+
"GroupedWindowItems",
26+
"group_items_by_component_modes",
1127
"FijiBatchProcessor",
1228
"NapariBatchProcessor",
29+
"normalize_component_layout",
30+
"build_layer_key",
1331
]
14-
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
"""Shared receiver-core utilities for streaming viewers."""
2+
3+
from polystore.streaming.receivers.core.batch_engine import DebouncedBatchEngine
4+
from polystore.streaming.receivers.core.contracts import (
5+
BatchEngineABC,
6+
WindowProjectionABC,
7+
)
8+
from polystore.streaming.receivers.core.window_projection import (
9+
GroupedWindowItems,
10+
group_items_by_component_modes,
11+
)
12+
13+
__all__ = [
14+
"BatchEngineABC",
15+
"WindowProjectionABC",
16+
"DebouncedBatchEngine",
17+
"GroupedWindowItems",
18+
"group_items_by_component_modes",
19+
]
20+

0 commit comments

Comments
 (0)