Default to Zenoh transport on macOS and document replay workflow#1906
Default to Zenoh transport on macOS and document replay workflow#1906bogwi wants to merge 35 commits intodimensionalOS:devfrom
Conversation
Greptile Summary
Confidence Score: 4/5Safe to merge for macOS replay functionality; the documented DIMOS_TRANSPORT env var is silently ignored (a P1 from a prior review iteration that remains open), but core transport selection via CLI and default factory works correctly. The implementation is solid — conditional Zenoh import guards, singleton session management, proper test-skip markers, and backward-compatible bridge pubsub resolution are all correct. Score capped at 4 because the DIMOS_TRANSPORT env var documentation mismatch (P1, flagged in a previous review round) is still unresolved in the current code: dimos/core/global_config.py — the Important Files Changed
Flowchart%%{init: {'theme': 'neutral'}}%%
flowchart TD
CLI["CLI --transport=lcm|zenoh"] --> GC["GlobalConfig.transport\n(default_factory: Darwin+Zenoh→zenoh, else lcm)"]
ENV["TRANSPORT env var\n(note: DIMOS_TRANSPORT has no effect)"] --> GC
GC --> MC["_get_transport_for()\nmodule_coordinator.py"]
MC -->|transport==lcm| LCMt["LCMTransport / pLCMTransport"]
MC -->|transport==zenoh| ZT["ZenohTransport / pZenohTransport"]
MC -->|zenoh but !ZENOH_AVAILABLE| ERR["RuntimeError"]
GC --> BP["_default_pubsubs()\nbridge.py"]
BP -->|transport==lcm| LCMb["[LCM()]"]
BP -->|transport==zenoh| ZB["[Zenoh(), LCM()]\n(LCM retained for TF frames)"]
ZT --> ZPS["ZenohPubSubBase\n(publisher cache + subscriber list)"]
ZB --> ZPS
ZPS --> ZS["ZenohService\n(singleton _sessions dict)"]
ZS --> ZSess["zenoh.Session\n(shared per ZenohConfig key)"]
LCMt --> LCM["LCM UDP multicast"]
LCMb --> LCM
Reviews (10): Last reviewed commit: "Merge branch 'dev' into feat/integrate-z..." | Re-trigger Greptile |
Prepare the codebase for Zenoh integration without changing behavior. All existing tests pass (1401 passed, 3 xfailed for Phase 2 stubs). - Add `transport` field to GlobalConfig (default: "lcm") - Add ZENOH_AVAILABLE guard in transport.py - Branch _get_transport_for() on global_config.transport - Gate LCM configurators to only run when transport is "lcm" - Add ZenohTransport/pZenohTransport behind ZENOH_AVAILABLE guard - Add zenohpubsub.py stub (raises NotImplementedError) - Add `zenoh` optional dependency group in pyproject.toml - Add test_zenoh_transport.py covering all new conditional branches
TDD: tests written first, then implementation. Follows DDSService pattern — module-level session dict with lock. - ZenohConfig with mode/connect/listen fields and session_key - ZenohService.start() opens session if not exists for config - ZenohService.stop() does NOT close shared session - session property raises RuntimeError if not started - Two services with same config share one session (8 tests pass)
TDD: tests written first, then implementation. - ZenohPubSubBase(ZenohService, AllPubSub[Topic, bytes]) - Publisher caching per key expression (avoids re-declaring) - Subscriber tracking for cleanup on stop() - Idempotent unsubscribe (guards against Zenoh ZError) - subscribe_all() via dimos/** wildcard - Zenoh and PickleZenoh composed classes (encoder mixins) - 7 unit tests pass
Both encoder-composed variants pass all spec conformance tests: - test_store, test_multiple_subscribers, test_unsubscribe - test_multiple_messages, test_async_iterator - 25 total tests pass (10 new Zenoh tests)
Remove xfail markers — Phase 2 stubs are now real implementations. Add transport wrapper integration tests for broadcast/subscribe. - ZenohTransport wraps Zenoh (LCM-encoded) with DDSTransport pattern - pZenohTransport wraps PickleZenoh with Topic wrapping for pubsub layer - Auto-start on first broadcast, stop/restart lifecycle - 16 tests pass (4 new wrapper tests)
Zenoh appears alongside LCM, SHM in benchmark heatmaps. Results: competitive with LCM for localhost — 82-149k msgs/sec for small messages, 0% message loss, <1ms latency.
Transport-level errors (session closed, invalid key expression) are logged but not raised. Delivery guarantees are handled by Zenoh's reliability protocol, not by exception propagation.
- Fix dimensionalOS#3: unsubscribe() now only calls undeclare() if it successfully removed the subscriber from the list. If stop() already cleared the list, unsubscribe() returns without double-undeclaring. - Fix dimensionalOS#5: on_sample callback wraps payload.to_bytes() in try/except to prevent malformed payloads from crashing Zenoh's internal thread.
Check membership before removing instead of catching ValueError. Reads more clearly and avoids using exceptions for control flow.
Two issues prevented the Rerun bridge from showing data over Zenoh: 1. The bridge hardcoded LCM() as its pubsub. Now resolves lazily at start() using self.config.g.transport from the worker's GlobalConfig. 2. Zenoh key expressions cannot contain '#' (forbidden character). Type info is now embedded as a '/' segment in the key expression (e.g., dimos/pointcloud/sensor_msgs.PointCloud2). _key_expr_to_topic reconstructs the Topic with lcm_type for subscribe_all decoding. Also fixes entity path mapping to strip the dimos/ prefix so Zenoh entity paths match LCM paths in the Rerun viewer.
- typed_out/untyped_out → typed_data/untyped_data - Use TypedMsg instead of Image for blueprint integration tests - Image still used in transport wrapper test (real LCM round-trip)
Replace raw time.sleep() calls with named helpers that document intent. wait_for_subscribers() explains Zenoh has no "subscriber ready" signal.
Replace manual if-both-received check with threading.Barrier(2). The previous approach could miss the event if both callbacks ran concurrently and checked the other's list before it was populated.
Review findings dimensionalOS#2 and dimensionalOS#4: - Remove Config.pubsubs from RerunBridgeModule — pubsubs are resolved lazily at start() from global_config.transport - Remove _zenoh_topic field from pZenohTransport — construct on demand like pLCMTransport does, avoiding dual state
8 new tests covering: - Typed/untyped topic → key expression conversion - Key expression → topic with known/unknown/missing type - Default lcm_type fallback - Round-trip typed and untyped Also documents known limitation: if a topic's base path ends with a segment matching a registered DimosMsg type name, _key_expr_to_topic will incorrectly split it. In practice this doesn't happen because stream names (cmd_vel, lidar) don't match type names.
Existing blueprints pass pubsubs=[LCM()] to RerunBridgeModule. Removing the field caused a Pydantic ValidationError (extra_forbidden). Keep the field but document that it's ignored — start() resolves the pubsub backend from global_config.transport instead.
TF (transform frames) is hardcoded to LCM in the Module base class. When transport=zenoh, module streams use Zenoh but TF stays on LCM. The bridge now listens on both so the robot pose updates in the viewer.
Zenoh tests used time.sleep() to wait for subscriber propagation, which is either too slow or too flaky in CI. Replace with _retry_until() that re-publishes in a tight loop until the subscriber's Event fires.
Calls zenoh.init_log_from_env_or("warn") at module load so that
RUST_LOG=debug surfaces Zenoh's Rust-side transport logs (including
SHM negotiation). Defaults to warn to avoid noise.
uv sync --extra zenoh would resolve dimos[dev] from PyPI instead of the local project, uninstalling other dependencies. The zenoh extra only needs eclipse-zenoh — base deps are already installed.
… missing Align with module_coordinator._get_transport_for: raise RuntimeError instead of silently falling back to LCM when transport is zenoh and eclipse-zenoh is not installed.
93cfd83 to
b8c777a
Compare
…v pip instead of uv sync --extra zenoh
4141991 to
fd20ca8
Compare
This is not the purpose of this branch to add these changes to It will be breaking cause anyone relying on unprefixed env vars for GlobalConfig will no longer get that effect; they need DIMOS_ for those settings to load from the environment / .env even though the cli.md in line 53 says, - "Environment variables and |
From Typical replay on macOS when Zenoh is installed (default is already Zenoh, so no transport flag is required): dimos --dtop --replay --replay-db=go2_bigoffice run unitree-go2The same workload on Linux (default remains dimos --transport=zenoh --dtop --replay --replay-db=go2_bigoffice run unitree-go2
|
Agent and MCP conftest subscribe on LCM while the coordinator uses global_config.transport, which defaults to Zenoh on Darwin when Zenoh is installed. Set transport to lcm so spies and modules share the same backend. The module reloading test runs a separate Python process; set transport to lcm in the REPL before ModuleCoordinator.build for the same reason.
593bb38 to
791b722
Compare
Supersedes #1787.
This PR carries forward the Zenoh transport integration from #1787 and wraps it into a merge-ready branch that fixes the remaining macOS Big Office replay gap.
Validation
Typical replay on macOS when Zenoh is installed (default is already Zenoh, so no transport flag is required):
The same workload on Linux (default remains
lcmuntil you opt in):Notes