Default to Zenoh transport on macOS and document replay workflow #1906

Open
bogwi wants to merge 35 commits into dimensionalOS:dev from bogwi:feat/integrate-zenoh
Conversation

@bogwi
Collaborator

@bogwi bogwi commented Apr 23, 2026

Supersedes #1787.

This PR carries forward the Zenoh transport integration from #1787 and wraps it into a merge-ready branch that fixes the remaining macOS Big Office replay gap.

Validation

Typical replay on macOS when Zenoh is installed (default is already Zenoh, so no transport flag is required):

dimos --dtop --replay --replay-db=go2_bigoffice run unitree-go2

The same workload on Linux (default remains lcm until you opt in):

dimos --transport=zenoh --dtop --replay --replay-db=go2_bigoffice run unitree-go2

Notes

  • this PR is intended as the wrapped successor to #1787 (Feat/integrate zenoh), not a separate redesign
  • Linux behavior remains unchanged by default: explicit Zenoh still works, and the default transport remains LCM

@bogwi bogwi changed the title Wrap Zenoh integration for macOS Big Office replay Default to Zenoh transport on macOS and document replay workflow Apr 23, 2026
@greptile-apps
Contributor

greptile-apps Bot commented Apr 23, 2026

Greptile Summary

  • Adds Zenoh as a first-class stream transport backend (ZenohTransport, pZenohTransport, ZenohPubSubBase, ZenohService) with a macOS platform default: when eclipse-zenoh is importable on Darwin, GlobalConfig.transport defaults to "zenoh" instead of "lcm", fixing large replay reliability on macOS.
  • The Rerun bridge's _resolve_pubsubs now selects pubsub backends at runtime from GlobalConfig.transport rather than hardcoding [LCM()]; when transport is Zenoh, both [Zenoh(), LCM()] are subscribed simultaneously to retain LCM-based TF frame delivery.
  • Previously-flagged issues (import-safe test collection via pytest.importorskip, pubsubs override honoring, and the _retry_until import cycle) are resolved; however the documented DIMOS_TRANSPORT env var is still ineffective because the transport field carries no env= alias and pydantic-settings resolves it as TRANSPORT.
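
The platform default described in the summary can be sketched as below. This is only an illustration under stated assumptions: `default_transport` and `zenoh_importable` are hypothetical names standing in for the PR's `_default_transport` factory, whose real body is not shown in this conversation.

```python
from __future__ import annotations

import importlib.util
import platform


def zenoh_importable() -> bool:
    """Return True when the `zenoh` module (installed by eclipse-zenoh) can be found."""
    return importlib.util.find_spec("zenoh") is not None


def default_transport(system: str | None = None, zenoh_available: bool | None = None) -> str:
    """Hypothetical stand-in for the default factory: Zenoh on Darwin if importable, else LCM."""
    # Arguments are injectable so the rule can be checked without touching the host.
    system = platform.system() if system is None else system
    if zenoh_available is None:
        zenoh_available = zenoh_importable()
    return "zenoh" if system == "Darwin" and zenoh_available else "lcm"
```

Calling `default_transport()` with no arguments inspects the current machine; passing both arguments makes the rule easy to test on any platform.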

Confidence Score: 4/5

Safe to merge for macOS replay functionality; the documented DIMOS_TRANSPORT env var is silently ignored (a P1 from a prior review iteration that remains open), but core transport selection via CLI and default factory works correctly.

The implementation is solid — conditional Zenoh import guards, singleton session management, proper test-skip markers, and backward-compatible bridge pubsub resolution are all correct. Score capped at 4 because the DIMOS_TRANSPORT env var documentation mismatch (P1, flagged in a previous review round) is still unresolved in the current code: transport: TransportBackend = Field(default_factory=_default_transport) carries no env="DIMOS_TRANSPORT" alias, so users following the documented export DIMOS_TRANSPORT=zenoh pattern get silent no-ops.

dimos/core/global_config.py — the transport field's missing env= alias means DIMOS_TRANSPORT (documented in transports/index.md and osx.md) has no effect at runtime.

Important Files Changed

| Filename | Overview |
| --- | --- |
| dimos/core/global_config.py | Adds transport field with platform-aware default factory; no env= alias means documented DIMOS_TRANSPORT env var silently has no effect (effective name is TRANSPORT). |
| dimos/core/transport.py | Adds ZENOH_AVAILABLE guard and conditional ZenohTransport/pZenohTransport definitions inside if ZENOH_AVAILABLE: block; correct lazy imports in module_coordinator prevent ImportError when zenoh is absent. |
| dimos/protocol/service/zenohservice.py | Implements singleton Zenoh session pattern with module-level _sessions dict; unconditional import zenoh is safe because this file is only imported inside the if ZENOH_AVAILABLE: guard in transport.py and behind pytest.importorskip in tests. |
| dimos/protocol/pubsub/impl/zenohpubsub.py | Clean Zenoh pub/sub implementation; zenoh import is TYPE_CHECKING-only at module level; thread-safe publisher cache and subscriber tracking with correct double-undeclare prevention. |
| dimos/visualization/rerun/bridge.py | Adds transport-aware _default_pubsubs and _resolve_pubsubs; correctly treats legacy [LCM()] as backward-compat default while honoring explicit non-LCM overrides; dual Zenoh+LCM subscription intentional for TF. |
| dimos/core/coordination/module_coordinator.py | Transport branching in _get_transport_for is clean; dimos{topic} prefix construction correct; LCM configurator gating skipped for Zenoh transport as intended. |
| dimos/core/test_zenoh_transport.py | Good coverage of config defaults, transport branching, and Zenoh transport lifecycle; ZENOH_AVAILABLE skipif markers and _clean_sessions autouse fixture properly isolate tests. |
| dimos/protocol/pubsub/impl/test_zenohpubsub.py | Uses pytest.importorskip("zenoh") at module top; comprehensive pub/sub, unsubscribe idempotency, and topic key-expr round-trip tests. |
| dimos/protocol/service/test_zenohservice.py | Uses pytest.importorskip("zenoh") guard; autouse fixture properly clears sessions; covers singleton sharing, stop-without-close contract, and idempotent start. |
| dimos/core/test_utils.py | Minimal import-safe retry_until helper; no zenoh dependency; correct use of Timer for deadline and event for signalling. |

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    CLI["CLI --transport=lcm|zenoh"] --> GC["GlobalConfig.transport\n(default_factory: Darwin+Zenoh→zenoh, else lcm)"]
    ENV["TRANSPORT env var\n(note: DIMOS_TRANSPORT has no effect)"] --> GC

    GC --> MC["_get_transport_for()\nmodule_coordinator.py"]
    MC -->|transport==lcm| LCMt["LCMTransport / pLCMTransport"]
    MC -->|transport==zenoh| ZT["ZenohTransport / pZenohTransport"]
    MC -->|zenoh but !ZENOH_AVAILABLE| ERR["RuntimeError"]

    GC --> BP["_default_pubsubs()\nbridge.py"]
    BP -->|transport==lcm| LCMb["[LCM()]"]
    BP -->|transport==zenoh| ZB["[Zenoh(), LCM()]\n(LCM retained for TF frames)"]

    ZT --> ZPS["ZenohPubSubBase\n(publisher cache + subscriber list)"]
    ZB --> ZPS
    ZPS --> ZS["ZenohService\n(singleton _sessions dict)"]
    ZS --> ZSess["zenoh.Session\n(shared per ZenohConfig key)"]

    LCMt --> LCM["LCM UDP multicast"]
    LCMb --> LCM

Reviews (10): Last reviewed commit: "Merge branch 'dev' into feat/integrate-z..."

Comment thread dimos/core/test_zenoh_transport.py
Comment thread dimos/visualization/rerun/bridge.py Outdated
Comment thread dimos/protocol/service/test_zenohservice.py
vrinek and others added 26 commits May 4, 2026 19:00
Prepare the codebase for Zenoh integration without changing behavior.
All existing tests pass (1401 passed, 3 xfailed for Phase 2 stubs).

- Add `transport` field to GlobalConfig (default: "lcm")
- Add ZENOH_AVAILABLE guard in transport.py
- Branch _get_transport_for() on global_config.transport
- Gate LCM configurators to only run when transport is "lcm"
- Add ZenohTransport/pZenohTransport behind ZENOH_AVAILABLE guard
- Add zenohpubsub.py stub (raises NotImplementedError)
- Add `zenoh` optional dependency group in pyproject.toml
- Add test_zenoh_transport.py covering all new conditional branches
TDD: tests written first, then implementation.
Follows DDSService pattern — module-level session dict with lock.

- ZenohConfig with mode/connect/listen fields and session_key
- ZenohService.start() opens session if not exists for config
- ZenohService.stop() does NOT close shared session
- session property raises RuntimeError if not started
- Two services with same config share one session (8 tests pass)
TDD: tests written first, then implementation.

- ZenohPubSubBase(ZenohService, AllPubSub[Topic, bytes])
- Publisher caching per key expression (avoids re-declaring)
- Subscriber tracking for cleanup on stop()
- Idempotent unsubscribe (guards against Zenoh ZError)
- subscribe_all() via dimos/** wildcard
- Zenoh and PickleZenoh composed classes (encoder mixins)
- 7 unit tests pass
Both encoder-composed variants pass all spec conformance tests:
- test_store, test_multiple_subscribers, test_unsubscribe
- test_multiple_messages, test_async_iterator
- 25 total tests pass (10 new Zenoh tests)
Remove xfail markers — Phase 2 stubs are now real implementations.
Add transport wrapper integration tests for broadcast/subscribe.

- ZenohTransport wraps Zenoh (LCM-encoded) with DDSTransport pattern
- pZenohTransport wraps PickleZenoh with Topic wrapping for pubsub layer
- Auto-start on first broadcast, stop/restart lifecycle
- 16 tests pass (4 new wrapper tests)
Zenoh appears alongside LCM, SHM in benchmark heatmaps.
Results: competitive with LCM for localhost — 82-149k msgs/sec
for small messages, 0% message loss, <1ms latency.
Transport-level errors (session closed, invalid key expression) are
logged but not raised. Delivery guarantees are handled by Zenoh's
reliability protocol, not by exception propagation.
- Fix dimensionalOS#3: unsubscribe() now only calls undeclare() if it successfully
  removed the subscriber from the list. If stop() already cleared the
  list, unsubscribe() returns without double-undeclaring.
- Fix dimensionalOS#5: on_sample callback wraps payload.to_bytes() in try/except
  to prevent malformed payloads from crashing Zenoh's internal thread.
Check membership before removing instead of catching ValueError.
Reads more clearly and avoids using exceptions for control flow.
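
The double-undeclare guard and the membership check described in the two commits above might look roughly like this. It is a sketch only: `FakeSubscriber` and `SubscriberRegistry` are hypothetical names, and the real `ZenohPubSubBase` is not reproduced here.

```python
import threading


class FakeSubscriber:
    """Placeholder that counts how often undeclare() is called."""
    def __init__(self) -> None:
        self.undeclare_calls = 0

    def undeclare(self) -> None:
        self.undeclare_calls += 1


class SubscriberRegistry:
    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers = []

    def add(self, sub):
        with self._lock:
            self._subscribers.append(sub)

        def unsubscribe() -> None:
            with self._lock:
                # Check membership first instead of catching ValueError.
                if sub not in self._subscribers:
                    return  # already removed by stop() or an earlier call
                self._subscribers.remove(sub)
            # Only the caller that removed the entry undeclares it.
            sub.undeclare()

        return unsubscribe

    def stop(self) -> None:
        # Take ownership of the whole list, then undeclare outside the lock.
        with self._lock:
            subs, self._subscribers = self._subscribers, []
        for sub in subs:
            sub.undeclare()
```

Whichever path removes the subscriber from the list is the only one that undeclares it, so repeated `unsubscribe()` calls and a preceding `stop()` are both harmless.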
Two issues prevented the Rerun bridge from showing data over Zenoh:

1. The bridge hardcoded LCM() as its pubsub. Now resolves lazily at
   start() using self.config.g.transport from the worker's GlobalConfig.

2. Zenoh key expressions cannot contain '#' (forbidden character).
   Type info is now embedded as a '/' segment in the key expression
   (e.g., dimos/pointcloud/sensor_msgs.PointCloud2). _key_expr_to_topic
   reconstructs the Topic with lcm_type for subscribe_all decoding.

Also fixes entity path mapping to strip the dimos/ prefix so Zenoh
entity paths match LCM paths in the Rerun viewer.
- typed_out/untyped_out → typed_data/untyped_data
- Use TypedMsg instead of Image for blueprint integration tests
- Image still used in transport wrapper test (real LCM round-trip)
Replace raw time.sleep() calls with named helpers that document intent.
wait_for_subscribers() explains Zenoh has no "subscriber ready" signal.
Replace manual if-both-received check with threading.Barrier(2).
The previous approach could miss the event if both callbacks ran
concurrently and checked the other's list before it was populated.
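
One way the `threading.Barrier(2)` replacement could be arranged is sketched below. The callback names and the use of the barrier's `action` hook are assumptions made for illustration; the test code in the PR may wire this differently.

```python
import threading


def run_two_callbacks(timeout: float = 5.0) -> bool:
    """Run two callbacks concurrently and report whether both delivered a message."""
    both_received = threading.Event()
    # The action runs exactly once, when both parties have reached the barrier.
    barrier = threading.Barrier(2, action=both_received.set)
    received = {"a": [], "b": []}

    def make_callback(name: str):
        def on_message(msg: str) -> None:
            received[name].append(msg)     # record first ...
            barrier.wait(timeout=timeout)  # ... then rendezvous with the other callback
        return on_message

    threads = [
        threading.Thread(target=make_callback(name), args=(f"msg-{name}",))
        for name in ("a", "b")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return both_received.is_set() and all(received.values())
```

Because each callback appends before waiting, the event can only fire once both lists are populated, regardless of how the two threads interleave.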
Review findings dimensionalOS#2 and dimensionalOS#4:
- Remove Config.pubsubs from RerunBridgeModule — pubsubs are resolved
  lazily at start() from global_config.transport
- Remove _zenoh_topic field from pZenohTransport — construct on demand
  like pLCMTransport does, avoiding dual state
8 new tests covering:
- Typed/untyped topic → key expression conversion
- Key expression → topic with known/unknown/missing type
- Default lcm_type fallback
- Round-trip typed and untyped

Also documents known limitation: if a topic's base path ends with a
segment matching a registered DimosMsg type name, _key_expr_to_topic
will incorrectly split it. In practice this doesn't happen because
stream names (cmd_vel, lidar) don't match type names.
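
The key-expression mapping and its known limitation can be illustrated with a small sketch. `KNOWN_TYPES`, `topic_to_key_expr`, and `key_expr_to_topic` are hypothetical simplifications of the PR's helpers; the real code works on `Topic` objects and a registry of DimosMsg types, and needs Python 3.9+ for `str.removeprefix`.

```python
from __future__ import annotations

PREFIX = "dimos"
# Hypothetical registry of message type names.
KNOWN_TYPES = {"sensor_msgs.PointCloud2", "geometry_msgs.Twist"}


def topic_to_key_expr(name: str, type_name: str | None = None) -> str:
    """Embed the type as a trailing '/' segment, since '#' is not allowed."""
    key = f"{PREFIX}/{name.strip('/')}"
    return f"{key}/{type_name}" if type_name else key


def key_expr_to_topic(key_expr: str) -> tuple[str, str | None]:
    """Split off the last segment when it names a known type."""
    path = key_expr.removeprefix(PREFIX + "/")
    head, sep, last = path.rpartition("/")
    if sep and last in KNOWN_TYPES:
        return "/" + head, last
    return "/" + path, None


typed = topic_to_key_expr("/pointcloud", "sensor_msgs.PointCloud2")
untyped = topic_to_key_expr("/cmd_vel")
# Limitation: an untyped topic whose last segment looks like a type is split.
ambiguous = key_expr_to_topic(topic_to_key_expr("/foo/geometry_msgs.Twist"))
```

In this sketch `ambiguous` comes back as a typed topic `/foo` rather than the untyped topic that was encoded, which is the mis-split the commit message documents.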
Existing blueprints pass pubsubs=[LCM()] to RerunBridgeModule.
Removing the field caused a Pydantic ValidationError (extra_forbidden).
Keep the field but document that it's ignored — start() resolves
the pubsub backend from global_config.transport instead.
TF (transform frames) is hardcoded to LCM in the Module base class.
When transport=zenoh, module streams use Zenoh but TF stays on LCM.
The bridge now listens on both so the robot pose updates in the viewer.
Zenoh tests used time.sleep() to wait for subscriber propagation,
which is either too slow or too flaky in CI. Replace with _retry_until()
that re-publishes in a tight loop until the subscriber's Event fires.
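
A retry helper of the kind described above could be sketched as follows. The signature is an assumption, and this version tracks the deadline with `time.monotonic()` rather than the `Timer` mentioned in the review summary; it is not the PR's `_retry_until`.

```python
import threading
import time
from typing import Callable


def retry_until(
    action: Callable[[], None],
    done: threading.Event,
    timeout: float = 2.0,
    interval: float = 0.01,
) -> bool:
    """Repeat `action` (e.g. a publish) until `done` is set or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        action()
        # Wait briefly for the subscriber callback to signal before retrying.
        if done.wait(interval):
            return True
    return done.is_set()
```

In a test, `action` would publish the message and the subscriber callback would call `done.set()`, so the loop ends as soon as the first delivery succeeds instead of sleeping for a fixed time.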
Calls zenoh.init_log_from_env_or("warn") at module load so that
RUST_LOG=debug surfaces Zenoh's Rust-side transport logs (including
SHM negotiation). Defaults to warn to avoid noise.
uv sync --extra zenoh would resolve dimos[dev] from PyPI instead of
the local project, uninstalling other dependencies. The zenoh extra
only needs eclipse-zenoh — base deps are already installed.
… missing

Align with module_coordinator._get_transport_for: raise RuntimeError instead
of silently falling back to LCM when transport is zenoh and eclipse-zenoh is
not installed.
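
The resulting selection rule for the bridge, including the LCM subscription retained for TF, can be sketched with plain strings in place of the real pubsub objects. `default_pubsubs` below is a hypothetical illustration, not the bridge's actual `_default_pubsubs`.

```python
from __future__ import annotations


def default_pubsubs(transport: str, zenoh_available: bool) -> list[str]:
    """Pick pubsub backends for the viewer bridge from the configured transport."""
    if transport == "lcm":
        return ["lcm"]
    if transport == "zenoh":
        if not zenoh_available:
            # Fail loudly rather than silently falling back to LCM.
            raise RuntimeError("transport is 'zenoh' but eclipse-zenoh is not installed")
        # LCM stays subscribed because TF frames are still published over LCM.
        return ["zenoh", "lcm"]
    raise ValueError(f"unknown transport: {transport!r}")
```

Raising on a missing dependency keeps the bridge consistent with the coordinator, so a misconfigured run fails at startup instead of showing an empty viewer.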
@bogwi bogwi force-pushed the feat/integrate-zenoh branch from 93cfd83 to b8c777a Compare May 4, 2026 10:04
@bogwi bogwi force-pushed the feat/integrate-zenoh branch from 4141991 to fd20ca8 Compare May 4, 2026 13:52
@bogwi
Collaborator Author

bogwi commented May 4, 2026

Greptile Summary

  • P1 — wrong env-var name in docs: docs/usage/transports/index.md (and osx.md) document DIMOS_TRANSPORT=zenoh as the env-var override, but GlobalConfig uses pydantic_settings.BaseSettings with no env_prefix, so the actual variable pydantic-settings reads is TRANSPORT. Setting DIMOS_TRANSPORT will be silently ignored.

Confidence Score: 4/5

Safe to merge with the env-var name corrected in docs; runtime behavior is unaffected by the docs bug.

One P1 finding: the documented env-var DIMOS_TRANSPORT will silently have no effect because GlobalConfig has no env_prefix. The code itself is correct and well-tested; the bug is isolated to documentation. P1 ceiling is 4/5.

docs/usage/transports/index.md and docs/installation/osx.md — both reference the incorrect DIMOS_TRANSPORT env-var name.

Adding the following change to global_config.py is outside the scope of this branch:

model_config = SettingsConfigDict(
        env_prefix="DIMOS_",
        ...

It would be a breaking change: anyone relying on unprefixed env vars for GlobalConfig would lose that behavior, because those settings would load from the environment / .env only with the DIMOS_ prefix. This holds even though cli.md (line 53) already says, - "Environment variables and .env values must be prefixed with DIMOS_". That is not yet done, but the docs for this PR still need to be aligned now so that they do not have to be fixed later in yet another PR.
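
The trade-off can be made concrete with a stdlib-only sketch of how a settings loader derives the variable name from a prefix and a field name. It mimics the default pydantic-settings behavior (prefix plus field name, matched case-insensitively) but does not use the library; `read_setting` is a hypothetical helper.

```python
from __future__ import annotations

from typing import Mapping


def read_setting(field: str, environ: Mapping[str, str], env_prefix: str = "") -> str | None:
    """Look up `env_prefix + field` in `environ`, ignoring case."""
    wanted = (env_prefix + field).upper()
    for name, value in environ.items():
        if name.upper() == wanted:
            return value
    return None


documented = {"DIMOS_TRANSPORT": "zenoh"}  # what the docs tell users to export
unprefixed = {"TRANSPORT": "zenoh"}        # what an empty prefix actually reads

no_prefix_sees_documented = read_setting("transport", documented)
no_prefix_sees_unprefixed = read_setting("transport", unprefixed)
with_prefix_sees_documented = read_setting("transport", documented, "DIMOS_")
with_prefix_sees_unprefixed = read_setting("transport", unprefixed, "DIMOS_")
```

With an empty prefix only `TRANSPORT` is picked up; with `DIMOS_` the documented name works, but every previously unprefixed variable stops loading, which is the breaking change described above.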

@bogwi
Collaborator Author

bogwi commented May 4, 2026

  1. The branch defaults to Zenoh on macOS, but other platforms still need the explicit --transport=zenoh flag.

From dimos/docs/usage/transports/index.md

Typical replay on macOS when Zenoh is installed (default is already Zenoh, so no transport flag is required):

dimos --dtop --replay --replay-db=go2_bigoffice run unitree-go2

The same workload on Linux (default remains lcm until you opt in):

dimos --transport=zenoh --dtop --replay --replay-db=go2_bigoffice run unitree-go2
  2. All issues raised in the original PR #1787 (Feat/integrate zenoh) and the more recent #1941 (Finish Zenoh integration) have been addressed.

Comment thread docs/usage/transports/index.md
Agent and MCP conftest subscribe on LCM while the coordinator uses
global_config.transport, which defaults to Zenoh on Darwin when Zenoh is
installed. Set transport to lcm so spies and modules share the same backend.

The module reloading test runs a separate Python process; set transport to
lcm in the REPL before ModuleCoordinator.build for the same reason.
@bogwi bogwi force-pushed the feat/integrate-zenoh branch from 593bb38 to 791b722 Compare May 5, 2026 00:33
@leshy leshy added the PlzReview label May 5, 2026

3 participants