Split OriginConsumer into cheap read handle and announcement cursor#1434
Split OriginConsumer into cheap read handle and announcement cursor#1434kixelated wants to merge 1 commit into
Conversation
OriginConsumer used to bundle a tree-read handle with an mpsc cursor and a registered ConsumerId. Every clone (and every transient .consume() / .scope() / .with_root() chain) paid for a fresh channel plus a consume_initial() walk that replayed every active broadcast as backlog. Callers like get_broadcast, announced_broadcast, and the FFI wrappers paid that cost just to look up one path or re-root a handle. Split announcement subscription into AnnounceProducer and AnnounceConsumer. OriginConsumer is now a cheap, Clone-friendly read handle; call .announced() on it to allocate the channel and register the cursor. AnnounceConsumer owns the Drop-side unregistration. The deprecated OriginProducer::get_broadcast shim and its libmoq #[allow(deprecated)] caller go away. Updates the lite/ietf publishers, FFI, libmoq, relay, clock, moq-boy, hang examples, and integration tests to grab a cursor explicitly (consumer.announced() -> AnnounceConsumer::next/try_next). Adds a regression test that cloning OriginConsumer does not drain another cursor's channel and that a freshly-built AnnounceConsumer still receives the active backlog. https://claude.ai/code/session_01BSuKtGPPntcBMEQiGb3Pn6
| // for gossip instead of racing it. | ||
| // Uses the deprecated direct lookup to avoid the per-call cost of OriginProducer::consume(). | ||
| #[allow(deprecated)] | ||
| origin.get_broadcast(path).ok_or(Error::BroadcastNotFound) |
There was a problem hiding this comment.
We should remove this deprecated method because it's cheap to consume().get_broadcast() now
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (12)
WalkthroughThis PR refactors the origin announcement streaming API by separating the cheap read handle ( 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
✨ Simplify code
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary
Refactor the announcement subscription API to separate concerns:
OriginConsumeris now a cheap, cloneable read handle over the broadcast tree, whileAnnounceConsumeris the stateful cursor that receives announcement events. This eliminates the expensive per-clone channel allocation and allows multiple independent announcement streams from the same consumer.Key Changes
OriginConsumerrefactored to be cheap to clone: Removed the per-instancempsc::UnboundedReceiverandConsumerId. Cloning now shares the underlying tree state without allocating any per-cursor resources.New
AnnounceConsumertype: Extracted the announcement stream logic into a separate struct that holds the channel receiver and consumer ID. Created viaOriginConsumer::announced()orAnnounceProducer::consume().New
AnnounceProducertype: Symmetric counterpart toAnnounceConsumer. Provides a cheap read handle to the announcement stream for a subtree, similar to howOriginConsumerworks for the broadcast tree.API method renames:
OriginConsumer::announced()now returnsAnnounceConsumer(was async method returningOption<OriginAnnounce>)OriginConsumer::try_announced()→AnnounceConsumer::try_next()OriginConsumer::announced()(async) →AnnounceConsumer::next()(async)AnnounceConsumer::is_closed()to check cursor stateOriginProducer::announces()method: New method to get anAnnounceProducerfor the subtree, providing symmetric API toconsume().Updated internal struct: Renamed
OriginConsumerNotifytoAnnounceConsumerNotifyfor clarity.Documentation improvements: Updated doc comments to clarify the cheap-to-clone nature of
OriginConsumerand the allocation semantics ofAnnounceConsumer.Implementation Details
OriginConsumernow derivesCloneand implements it directly (no custom impl needed).AnnounceConsumermaintains the original per-cursor state: channel receiver, consumer ID, and root path.assert_next,assert_try_next, etc.) moved toAnnounceConsumerimpl block..announced()onOriginConsumerto get anAnnounceConsumerbefore awaiting announcements.test_consumer_clone_is_side_effect_freeto verify that cloningOriginConsumerdoes not drain announcement channels and that freshAnnounceConsumerinstances still receive the active backlog.https://claude.ai/code/session_01BSuKtGPPntcBMEQiGb3Pn6