Skip to content

Split OriginConsumer into cheap read handle and announcement cursor#1434

Open
kixelated wants to merge 1 commit into
mainfrom
claude/originconsumer-performance-jKMzS
Open

Split OriginConsumer into cheap read handle and announcement cursor#1434
kixelated wants to merge 1 commit into
mainfrom
claude/originconsumer-performance-jKMzS

Conversation

@kixelated
Copy link
Copy Markdown
Collaborator

Summary

Refactor the announcement subscription API to separate concerns: OriginConsumer is now a cheap, cloneable read handle over the broadcast tree, while AnnounceConsumer is the stateful cursor that receives announcement events. This eliminates the expensive per-clone channel allocation and allows multiple independent announcement streams from the same consumer.

Key Changes

  • OriginConsumer refactored to be cheap to clone: Removed the per-instance mpsc::UnboundedReceiver and ConsumerId. Cloning now shares the underlying tree state without allocating any per-cursor resources.

  • New AnnounceConsumer type: Extracted the announcement stream logic into a separate struct that holds the channel receiver and consumer ID. Created via OriginConsumer::announced() or AnnounceProducer::consume().

  • New AnnounceProducer type: Symmetric counterpart to AnnounceConsumer. Provides a cheap read handle to the announcement stream for a subtree, similar to how OriginConsumer works for the broadcast tree.

  • API method renames:

    • OriginConsumer::announced() now returns AnnounceConsumer (was async method returning Option<OriginAnnounce>)
    • OriginConsumer::try_announced()AnnounceConsumer::try_next()
    • OriginConsumer::announced() (async) → AnnounceConsumer::next() (async)
    • Added AnnounceConsumer::is_closed() to check cursor state
  • OriginProducer::announces() method: New method to get an AnnounceProducer for the subtree, providing symmetric API to consume().

  • Updated internal struct: Renamed OriginConsumerNotify to AnnounceConsumerNotify for clarity.

  • Documentation improvements: Updated doc comments to clarify the cheap-to-clone nature of OriginConsumer and the allocation semantics of AnnounceConsumer.

Implementation Details

  • OriginConsumer now derives Clone and implements it directly (no custom impl needed).
  • AnnounceConsumer maintains the original per-cursor state: channel receiver, consumer ID, and root path.
  • Both types support the same root path stripping and absolute path conversion utilities.
  • Test helpers (assert_next, assert_try_next, etc.) moved to AnnounceConsumer impl block.
  • All call sites updated to call .announced() on OriginConsumer to get an AnnounceConsumer before awaiting announcements.
  • Added comprehensive test test_consumer_clone_is_side_effect_free to verify that cloning OriginConsumer does not drain announcement channels and that fresh AnnounceConsumer instances still receive the active backlog.

https://claude.ai/code/session_01BSuKtGPPntcBMEQiGb3Pn6

OriginConsumer used to bundle a tree-read handle with an mpsc cursor
and a registered ConsumerId. Every clone (and every transient
.consume() / .scope() / .with_root() chain) paid for a fresh channel
plus a consume_initial() walk that replayed every active broadcast as
backlog. Callers like get_broadcast, announced_broadcast, and the FFI
wrappers paid that cost just to look up one path or re-root a handle.

Split announcement subscription into AnnounceProducer and
AnnounceConsumer. OriginConsumer is now a cheap, Clone-friendly read
handle; call .announced() on it to allocate the channel and register
the cursor. AnnounceConsumer owns the Drop-side unregistration. The
deprecated OriginProducer::get_broadcast shim and its libmoq
#[allow(deprecated)] caller go away.

Updates the lite/ietf publishers, FFI, libmoq, relay, clock, moq-boy,
hang examples, and integration tests to grab a cursor explicitly
(consumer.announced() -> AnnounceConsumer::next/try_next). Adds a
regression test that cloning OriginConsumer does not drain another
cursor's channel and that a freshly-built AnnounceConsumer still
receives the active backlog.

https://claude.ai/code/session_01BSuKtGPPntcBMEQiGb3Pn6
Comment thread rs/libmoq/src/origin.rs
// for gossip instead of racing it.
// Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume().
#[allow(deprecated)]
origin.get_broadcast(path).ok_or(Error::BroadcastNotFound)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should remove this deprecated method because it's cheap to consume().get_broadcast() now

@kixelated kixelated marked this pull request as ready for review May 21, 2026 17:09
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 21, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1b8346c7-93c1-490a-a32b-de5a78fe6c1e

📥 Commits

Reviewing files that changed from the base of the PR and between 9021565 and 106d480.

📒 Files selected for processing (12)
  • rs/hang/examples/subscribe.rs
  • rs/libmoq/src/origin.rs
  • rs/moq-boy/src/input.rs
  • rs/moq-boy/src/main.rs
  • rs/moq-clock/src/main.rs
  • rs/moq-ffi/src/origin.rs
  • rs/moq-native/tests/backend.rs
  • rs/moq-native/tests/broadcast.rs
  • rs/moq-net/src/ietf/publisher.rs
  • rs/moq-net/src/lite/publisher.rs
  • rs/moq-net/src/model/origin.rs
  • rs/moq-relay/src/web.rs

Walkthrough

This PR refactors the origin announcement streaming API by separating the cheap read handle (OriginConsumer) from the event cursor (AnnounceConsumer). OriginConsumer::announced() changes from an async method that drains announcements to a synchronous factory returning an AnnounceConsumer cursor. Internal notification plumbing switches to AnnounceConsumerNotify. All code that iterates announcements is updated to create persistent cursors and consume them via next() or try_next() rather than re-awaiting on the origin consumer each iteration. Tests are updated to use the new cursor API throughout.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely describes the main refactoring: splitting OriginConsumer into two types with distinct responsibilities (cheap read handle vs. announcement cursor).
Description check ✅ Passed The description comprehensively explains the refactoring's rationale, key API changes, and implementation details, all directly related to the changeset across multiple files.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch claude/originconsumer-performance-jKMzS
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch claude/originconsumer-performance-jKMzS

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants