Skip to content

Ivan/record replay wrap#1976

Open
leshy wants to merge 39 commits intodevfrom
ivan/record-replay-wrap
Open

Ivan/record replay wrap#1976
leshy wants to merge 39 commits intodevfrom
ivan/record-replay-wrap

Conversation

@leshy
Copy link
Copy Markdown
Contributor

@leshy leshy commented May 5, 2026

Problem

Closes DIM-XXX

Solution

Breaking Changes

How to Test

Contributor License Agreement

  • I have read and approved the CLA.

@leshy leshy marked this pull request as ready for review May 5, 2026 07:15
Comment thread dimos/record/record_replay.py Outdated
Comment thread dimos/core/module.py Outdated
Comment thread dimos/core/module.py
_SANITIZE_RE = re.compile(r"[^A-Za-z0-9_]")


def topic_to_stream_name(channel: str) -> str:
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no need for update_stream_meta and store metadata interventions to save the type of a stream and full stream name, given this function is reversible? streams know their type in their metadata, so they can reconstruct full topics?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think the function is reversible. /color/image and /color_image both get transformed to _color_image, no?

Comment thread dimos/core/coordination/module_coordinator.py
Comment thread dimos/core/coordination/module_coordinator.py
Comment thread dimos/record/record_replay.py
Comment thread dimos/memory2/observationstore/sqlite.py
@dimensionalOS dimensionalOS deleted a comment from greptile-apps Bot May 5, 2026
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 5, 2026

Greptile Summary

  • Introduces RecordReplay, a SqliteStore-backed record/replay system with a Textual TUI (RecorderApp), auto-record/replay hooks in ModuleCoordinator.build(), and per-module start_recording_outputs/stop_recording_outputs RPCs on ModuleBase.
  • loop() is converted from sync (threading.Event) to async (asyncio.Event), requiring call sites to be updated to await coord.loop() inside asyncio.run() — all callers in the diff are correctly updated.
  • A minor format-string concatenation issue leaves a missing space in the "no modules to disable" warning log message; see inline comment.

Confidence Score: 3/5

Not safe to merge until the open concerns from previous review rounds are resolved — particularly the leaked RecordReplay resource in replay() and playback not republishing messages onto the LCM bus.

New findings in this pass are P2 only (format string cosmetic). However, prior review threads documented P1 issues that remain present in the diff: the RecordReplay instance created in ModuleCoordinator.replay() is a local variable never stored on the coordinator and never closed (resource/task leak), and _playback_loop only logs to Rerun rather than republishing decoded messages onto the LCM bus, so downstream live modules receive no data during replay. Multiple P1s that are unaddressed pull the score below the 4/5 ceiling.

dimos/core/coordination/module_coordinator.py (replay lifecycle / resource management) and dimos/record/record_replay.py (_playback_loop republish path)

Important Files Changed

Filename Overview
dimos/record/record_replay.py New RecordReplay class — async playback loop, seek/trim/delete; referenced in previous review threads for leaked instance and playback not publishing to LCM bus
dimos/core/coordination/module_coordinator.py New replay() classmethod and auto-replay/record branches in build(); leaked RecordReplay instance and missing LCM republish from previous threads; minor format-string bug in warning log
dimos/core/module.py Adds start_recording_outputs/stop_recording_outputs RPC methods; correctly uses default-argument pattern to avoid closure capture bug; previous thread concern about msg_name appears resolved
dimos/memory2/observationstore/sqlite.py Adds delete_range with proper try/except rollback; commit() is outside the if ids: block so it always runs — previous thread concern about misplaced commit appears unfounded
dimos/utils/cli/recorder/run_recorder.py New Textual TUI for recording/playback; correctly uses the same asyncio event loop as RecordReplay; CSS_PATH resolves correctly relative to module file
dimos/memory2/stream.py Adds delete_range (delegates to Backend) and publish() helper; data_type property correctly walks source chain; types are consistent
dimos/memory2/backend.py Adds delete_range delegating to metadata_store with blob/vector cleanup; correctly returns count of deleted IDs
dimos/visualization/rerun/bridge.py Switches TypeGuard to TypeIs, rewrites is_rerun_multi with structural pattern matching, fixes gRPC port to 9876, passes grpc_port explicitly to serve_grpc
dimos/core/coordination/blueprints.py Adds record_modules field and default_record_modules() builder method; autoconnect() correctly propagates record_modules across merged blueprints
dimos/core/global_config.py Adds replay_file and record_path optional config fields for CLI-driven record/replay

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    CLI["dimos run / dimos recorder"] -->|asyncio.run| RunCmd["_run async"]
    RunCmd -->|build| Build["ModuleCoordinator.build"]
    Build --> AutoReplay{replay_file set?}
    AutoReplay -->|yes| Replay["ModuleCoordinator.replay"]
    AutoReplay -->|no| NormalBuild["Build + start modules"]
    NormalBuild --> AutoRecord{record_path set?}
    AutoRecord -->|yes| RecordOut["instance.start_recording_outputs\nper record_module"]
    AutoRecord -->|no| Done["Return coordinator"]
    RecordOut --> Done

    Replay --> RR["RecordReplay instance"]
    RR --> Disable["blueprint.disabled_modules"]
    Disable -->|cls.build patched| Build
    RR --> PlayTask["asyncio.create_task\n_playback_loop"]
    PlayTask --> Loop["Merge-sort observations\nby timestamp"]
    Loop --> RerunLog["rec.log to Rerun viewer"]

    ModuleBase["ModuleBase"] --> SQLite["SqliteStore per module"]
    SQLite --> DB[(SQLite DB)]

    TUI["RecorderApp TUI"] --> LCMSub["LCM subscribe"]
    LCMSub --> DB
    TUI --> PlayTask
Loading

Reviews (4): Last reviewed commit: "sql transaction fix" | Re-trigger Greptile

result.append(info)
return tuple(result)

def play(self, speed: float = 1.0) -> None:
Copy link
Copy Markdown
Contributor

@paul-nechifor paul-nechifor May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment at the start of the file says rec.play(pubsub=LCM(), speed=1.0). Where is this play actually publishing?

rec.stop_recording()

# Replay into LCM (viewable via rerun-bridge)
rec.play(pubsub=LCM(), speed=1.0)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have docs for replay in docs. Note, BTW, that rec.play doesn't take pubsub currently.

Comment on lines +164 to +165
stream_name = topic_to_stream_name(topic.pattern)
self._store.stream(stream_name, type(msg)).append(msg)
Copy link
Copy Markdown
Contributor

@paul-nechifor paul-nechifor May 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're not storing the topic. How does the stream know where the message should be replayed to?

The previous PR stored the topic. This tries to guess that the topic is.

else:
from typing import TypeGuard as TypeIs

RERUN_GRPC_PORT = 9876
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 9877 to 9876 intentional? There are hardcoded places where the 9877 is still used:

dimos/core/test_daemon.py:47:    grpc_port: int = 9877,
dimos/core/test_daemon.py:143:        entry = _make_entry(pid=os.getpid(), grpc_port=9877)
dimos/core/test_daemon.py:146:        conflict = check_port_conflicts(grpc_port=9877)
dimos/core/test_daemon.py:154:        conflict = check_port_conflicts(grpc_port=9877)
dimos/core/run_registry.py:47:    grpc_port: int = 9877
dimos/core/run_registry.py:144:def check_port_conflicts(grpc_port: int = 9877) -> RunEntry | None:
dimos/visualization/rerun/bridge.py:196:    connect_url: str = "rerun+http://127.0.0.1:9877/proxy"


@rpc
def start(self) -> None:
# Delay import to reduce import time (~2.4s)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You removed from dimos.msgs.sensor_msgs.PointCloud2 import register_colormap_annotation but left the comment.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants