Skip to content

Commit 3899edc

Browse files
committed
Add Zenoh pub/sub layer
1 parent e9b3f94 commit 3899edc

8 files changed

Lines changed: 299 additions & 2 deletions

File tree

dimos/core/transport.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, JpegLCM, PickleLCM, Topic as LCMTopic
3636
from dimos.protocol.pubsub.impl.rospubsub import DimosROS, ROSTopic
3737
from dimos.protocol.pubsub.impl.shmpubsub import BytesSharedMemory, PickleSharedMemory
38+
from dimos.protocol.pubsub.impl.zenohpubsub import Topic as ZenohTopic, ZenohPubSub
3839

3940
if TYPE_CHECKING:
4041
from collections.abc import Callable
@@ -319,4 +320,35 @@ def subscribe(
319320
return self.dds.subscribe(self.topic, lambda msg, topic: callback(msg))
320321

321322

322-
class ZenohTransport(PubSubTransport[T]): ...
323+
class ZenohTransport(PubSubTransport[T]):
324+
def __init__(self, topic: str, **kwargs) -> None: # type: ignore[no-untyped-def]
325+
super().__init__(ZenohTopic(topic))
326+
self.zenoh = ZenohPubSub(**kwargs)
327+
self._started: bool = False
328+
self._start_lock = threading.RLock()
329+
330+
def start(self) -> None:
331+
with self._start_lock:
332+
if not self._started:
333+
self.zenoh.start()
334+
self._started = True
335+
336+
def stop(self) -> None:
337+
with self._start_lock:
338+
if self._started:
339+
self.zenoh.stop()
340+
self._started = False
341+
342+
def broadcast(self, _, msg) -> None: # type: ignore[no-untyped-def]
343+
with self._start_lock:
344+
if not self._started:
345+
self.start()
346+
self.zenoh.publish(self.topic, msg)
347+
348+
def subscribe(
349+
self, callback: Callable[[T], None], selfstream: Stream[T] | None = None
350+
) -> Callable[[], None]:
351+
with self._start_lock:
352+
if not self._started:
353+
self.start()
354+
return self.zenoh.subscribe(self.topic, lambda msg, topic: callback(msg))

dimos/protocol/pubsub/benchmark/testdata.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
LCMSharedMemory,
3636
PickleSharedMemory,
3737
)
38+
from dimos.protocol.pubsub.impl.zenohpubsub import Topic as ZenohTopic, ZenohPubSub
3839

3940

4041
def make_data_bytes(size: int) -> bytes:
@@ -272,6 +273,27 @@ def redis_msggen(size: int) -> tuple[str, Any]:
272273
print("Redis not available")
273274

274275

276+
@contextmanager
277+
def zenoh_pubsub_channel() -> Generator[ZenohPubSub, None, None]:
278+
zenoh_pubsub = ZenohPubSub()
279+
zenoh_pubsub.start()
280+
yield zenoh_pubsub
281+
zenoh_pubsub.stop()
282+
283+
284+
def zenoh_msggen(size: int) -> tuple[ZenohTopic, bytes]:
285+
"""Generate raw bytes for Zenoh pubsub benchmark."""
286+
return (ZenohTopic("benchmark/zenoh"), make_data_bytes(size))
287+
288+
289+
testcases.append(
290+
Case(
291+
pubsub_context=zenoh_pubsub_channel,
292+
msg_gen=zenoh_msggen,
293+
)
294+
)
295+
296+
275297
from dimos.protocol.pubsub.impl.rospubsub import (
276298
ROS_AVAILABLE,
277299
DimosROS,

dimos/protocol/pubsub/impl/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@
44
PickleLCM as PickleLCM,
55
)
66
from dimos.protocol.pubsub.impl.memory import Memory as Memory
7+
from dimos.protocol.pubsub.impl.zenohpubsub import ZenohPubSub as ZenohPubSub
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
# Copyright 2025-2026 Dimensional Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from collections.abc import Callable
18+
from dataclasses import dataclass
19+
import threading
20+
from typing import TYPE_CHECKING, Any, TypeAlias
21+
22+
from dimos.protocol.pubsub.spec import PubSub
23+
from dimos.protocol.service.zenohservice import ZenohService
24+
from dimos.utils.logging_config import setup_logger
25+
26+
if TYPE_CHECKING:
27+
import zenoh
28+
29+
logger = setup_logger()
30+
31+
32+
@dataclass(frozen=True)
33+
class Topic:
34+
"""Represents a Zenoh topic (key expression)."""
35+
36+
name: str
37+
38+
def __str__(self) -> str:
39+
return self.name
40+
41+
42+
MessageCallback: TypeAlias = Callable[[Any, Topic], None]
43+
44+
45+
class ZenohPubSub(ZenohService, PubSub[Topic, Any]):
46+
def __init__(self, **kwargs: Any) -> None:
47+
super().__init__(**kwargs)
48+
self._publishers: dict[Topic, zenoh.Publisher] = {}
49+
self._publisher_lock = threading.Lock()
50+
self._subscribers: list[zenoh.Subscriber] = []
51+
self._subscriber_lock = threading.Lock()
52+
53+
def _get_publisher(self, topic: Topic) -> zenoh.Publisher:
54+
"""Get or create a Publisher for the given topic."""
55+
with self._publisher_lock:
56+
if topic not in self._publishers:
57+
self._publishers[topic] = self.session.declare_publisher(topic.name)
58+
return self._publishers[topic]
59+
60+
def publish(self, topic: Topic, message: bytes | str) -> None:
61+
"""Publish a message to a Zenoh topic."""
62+
publisher = self._get_publisher(topic)
63+
try:
64+
publisher.put(message)
65+
except Exception as e:
66+
logger.error(f"Error publishing to topic {topic}: {e}", exc_info=True)
67+
68+
def subscribe(self, topic: Topic, callback: MessageCallback) -> Callable[[], None]:
69+
"""Subscribe to a Zenoh topic with a callback.
70+
71+
Each call declares its own Zenoh subscriber (Zenoh spawns a
72+
background thread per callback handler). Unsubscribe undeclares it.
73+
"""
74+
75+
def on_sample(sample: zenoh.Sample) -> None:
76+
callback(sample.payload.to_bytes(), topic)
77+
78+
sub = self.session.declare_subscriber(topic.name, on_sample)
79+
with self._subscriber_lock:
80+
self._subscribers.append(sub)
81+
82+
def unsubscribe() -> None:
83+
sub.undeclare()
84+
with self._subscriber_lock:
85+
try:
86+
self._subscribers.remove(sub)
87+
except ValueError:
88+
pass
89+
90+
return unsubscribe
91+
92+
def stop(self) -> None:
93+
"""Stop the Zenoh pub/sub and clean up resources."""
94+
with self._subscriber_lock:
95+
for subscriber in self._subscribers:
96+
subscriber.undeclare()
97+
self._subscribers.clear()
98+
with self._publisher_lock:
99+
for publisher in self._publishers.values():
100+
publisher.undeclare()
101+
self._publishers.clear()
102+
super().stop()
103+
104+
105+
__all__ = [
106+
"MessageCallback",
107+
"Topic",
108+
"ZenohPubSub",
109+
]

dimos/protocol/pubsub/test_spec.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from dimos.msgs.geometry_msgs import Vector3
2626
from dimos.protocol.pubsub.impl.lcmpubsub import LCM, Topic
2727
from dimos.protocol.pubsub.impl.memory import Memory
28+
from dimos.protocol.pubsub.impl.zenohpubsub import Topic as ZenohTopic, ZenohPubSub
2829

2930

3031
@contextmanager
@@ -124,6 +125,16 @@ def lcm_context() -> Generator[LCM, None, None]:
124125
)
125126

126127

128+
@contextmanager
129+
def zenoh_context() -> Generator[ZenohPubSub, None, None]:
130+
zenoh_pubsub = ZenohPubSub()
131+
zenoh_pubsub.start()
132+
yield zenoh_pubsub
133+
zenoh_pubsub.stop()
134+
135+
136+
testdata.append((zenoh_context, ZenohTopic("test/zenoh/spec"), [b"value1", b"value2", b"value3"]))
137+
127138
from dimos.protocol.pubsub.impl.shmpubsub import PickleSharedMemory
128139

129140

dimos/protocol/service/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from dimos.protocol.service.lcmservice import LCMService
22
from dimos.protocol.service.spec import Configurable as Configurable, Service as Service
3+
from dimos.protocol.service.zenohservice import ZenohService as ZenohService
34

45
__all__ = [
56
"Configurable",
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Copyright 2025-2026 Dimensional Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
from dataclasses import dataclass, field
18+
import json
19+
import threading
20+
from typing import Any
21+
22+
import zenoh
23+
24+
from dimos.protocol.service.spec import Service
25+
from dimos.utils.logging_config import setup_logger
26+
27+
logger = setup_logger()
28+
29+
_sessions: dict[str, zenoh.Session] = {}
30+
_sessions_lock = threading.Lock()
31+
32+
33+
@dataclass
34+
class ZenohConfig:
35+
"""Configuration for Zenoh service."""
36+
37+
mode: str = "peer"
38+
connect: list[str] = field(default_factory=list)
39+
listen: list[str] = field(default_factory=list)
40+
41+
@property
42+
def session_key(self) -> str:
43+
"""Produce a hashable key for singleton session lookup."""
44+
return f"{self.mode}|{json.dumps(sorted(self.connect))}|{json.dumps(sorted(self.listen))}"
45+
46+
47+
class ZenohService(Service[ZenohConfig]):
48+
default_config = ZenohConfig
49+
50+
def __init__(self, **kwargs: Any) -> None:
51+
super().__init__(**kwargs)
52+
53+
def start(self) -> None:
54+
"""Start the Zenoh service."""
55+
key = self.config.session_key
56+
with _sessions_lock:
57+
if key not in _sessions:
58+
config = zenoh.Config()
59+
config.insert_json5("mode", json.dumps(self.config.mode))
60+
if self.config.connect:
61+
config.insert_json5("connect/endpoints", json.dumps(self.config.connect))
62+
if self.config.listen:
63+
config.insert_json5("listen/endpoints", json.dumps(self.config.listen))
64+
_sessions[key] = zenoh.open(config)
65+
logger.info(f"Zenoh service started in {self.config.mode} mode")
66+
super().start()
67+
68+
def stop(self) -> None:
69+
"""Stop the Zenoh service."""
70+
super().stop()
71+
72+
@property
73+
def session(self) -> zenoh.Session:
74+
"""Get the Zenoh Session instance for this service's config."""
75+
key = self.config.session_key
76+
if key not in _sessions:
77+
raise RuntimeError("Zenoh session not initialized")
78+
return _sessions[key]
79+
80+
81+
__all__ = [
82+
"ZenohConfig",
83+
"ZenohService",
84+
]

docs/usage/transports.md

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ P: box "PubSub" rad 5px
6464
# Descriptions below
6565
text "robot configs" at B.s + (0.1, -0.2in)
6666
text "camera, nav" at M.s + (0, -0.2in)
67-
text "LCM, SHM, ROS" at T.s + (0, -0.2in)
67+
text "LCM, SHM, Zenoh" at T.s + (0, -0.2in)
6868
text "pub/sub API" at P.s + (0, -0.2in)
6969
```
7070

@@ -289,6 +289,42 @@ shm.stop()
289289
Received: [{'data': [1, 2, 3]}]
290290
```
291291

292+
### Zenoh
293+
294+
Zenoh is a high-performance pub/sub protocol. Topics are untyped key expressions; payloads are raw `bytes` or `str` (serialization is your choice).
295+
296+
```python session=zenoh_demo ansi=false
297+
import json
298+
from dataclasses import dataclass, asdict
299+
from dimos.protocol.pubsub.impl.zenohpubsub import ZenohPubSub, Topic
300+
301+
@dataclass
302+
class SensorReading:
303+
value: float
304+
305+
zenoh = ZenohPubSub()
306+
zenoh.start()
307+
308+
received = []
309+
sensor_topic = Topic(name="sensors/temperature")
310+
311+
zenoh.subscribe(sensor_topic, lambda msg, t: received.append(SensorReading(**json.loads(msg))))
312+
zenoh.publish(sensor_topic, json.dumps(asdict(SensorReading(value=22.5))))
313+
314+
import time
315+
time.sleep(0.1)
316+
317+
print(f"Received: {received}")
318+
zenoh.stop()
319+
```
320+
321+
<!--Result:-->
322+
```
323+
Received: [SensorReading(value=22.5)]
324+
```
325+
326+
Zenoh is interoperable with any Zenoh client (Rust, C, C++, etc.) since it sends raw bytes on the wire. Use any serialization format you like (JSON, Protobuf, LCM binary, etc.) — the transport doesn't impose one.
327+
292328
### DDS Transport
293329

294330
For network communication, DDS uses the Data Distribution Service (DDS) protocol:
@@ -434,4 +470,5 @@ python -m pytest -svm tool -k "not bytes" dimos/protocol/pubsub/benchmark/test_b
434470
| `LCM` | Robot LAN broadcast (UDP multicast) | Yes | Yes | Best-effort; can drop packets on LAN |
435471
| `Redis` | Network pubsub via Redis server | Yes | Yes | Central broker; adds hop |
436472
| `ROS` | ROS 2 topic communication | Yes | Yes | Integrates with RViz/ROS tools |
473+
| `Zenoh` | High-perf network pubsub | Yes | Yes | Raw bytes/str; no serialization overhead |
437474
| `DDS` | Cyclone DDS without ROS (WIP) | Yes | Yes | WIP |

0 commit comments

Comments
 (0)