refactor: generalize ZMQ pub/sub over message type via MessageCodec #300
nv-alicheng wants to merge 2 commits into main
MLCommons CLA bot: All contributors have signed the MLCommons CLA ✍️ ✅
Code Review
This pull request generalizes the pub/sub transport layer by introducing a generic MessageCodec protocol and refactoring the existing EventRecord-specific publisher and subscriber into generic MessagePublisher[T] and MessageSubscriber[T] classes. The ZMQ implementations were updated to support these generics and now include additional socket configuration options such as conflate and high-water marks. Feedback was provided to broaden the exception handling in the subscriber's read loop to ensure the implementation remains truly generic and adheres to the codec protocol contract.
      try:
    -     event_record = decode_event_record(payload)
    +     items.append(self._codec.decode(payload))
      except msgspec.DecodeError as e:
The MessageSubscriber should catch Exception instead of msgspec.DecodeError to remain truly generic and adhere to the MessageCodec protocol contract. The protocol defines on_decode_error as accepting any Exception, and different codec implementations might raise various error types (e.g., ValueError, TypeError, or library-specific exceptions like json.JSONDecodeError) during decoding. Catching only msgspec.DecodeError here couples the base protocol implementation to a specific library and could lead to unhandled exceptions crashing the reader loop if a non-msgspec codec is used.
    - except msgspec.DecodeError as e:
    + except Exception as e:
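The suggestion above can be illustrated with a minimal, stdlib-only sketch. It is not the project's `_on_readable`; the helper `decode_batch` and the `JsonDictCodec` class are hypothetical names invented here, and only the `decode` / `on_decode_error` method names come from the PR. It shows why a broad `except Exception` lets a non-msgspec codec route its own error types through `on_decode_error`.

```python
from __future__ import annotations

import json
from typing import Any


def decode_batch(codec: Any, payloads: list[bytes]) -> list[Any]:
    """Decode each payload, routing any decoder failure through the codec.

    Hypothetical helper: the base class makes no assumption about which
    exception type the codec's decoder library raises.
    """
    items: list[Any] = []
    for payload in payloads:
        try:
            items.append(codec.decode(payload))
        except Exception as exc:  # broad on purpose: the codec decides what to do
            fallback = codec.on_decode_error(payload, exc)
            if fallback is not None:
                items.append(fallback)
    return items


class JsonDictCodec:
    """Hypothetical codec backed by the json module instead of msgspec."""

    def decode(self, payload: bytes) -> dict:
        return json.loads(payload)

    def on_decode_error(self, payload: bytes, exc: Exception) -> dict | None:
        # Return a sentinel item describing the failure.
        return {"error": type(exc).__name__}


results = decode_batch(JsonDictCodec(), [b'{"a": 1}', b"{not json"])
```

With a narrower `except msgspec.DecodeError`, the `json.JSONDecodeError` raised for the second payload would bypass `on_decode_error` and propagate out of the loop.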
Replace EventRecord-specific publisher/subscriber classes with generic ZmqMessagePublisher[T] / ZmqMessageSubscriber[T] parameterized by a MessageCodec[T] Protocol. EventRecordCodec preserves existing wire format and decode-error wrapping behavior. Sets up the generic transport that the upcoming MetricsSnapshot publisher will reuse.

- protocol.py: drop EventRecordPublisher/Subscriber ABCs; add MessageCodec, MessagePublisher[T], MessageSubscriber[T].
- pubsub.py: rewrite as ZmqMessagePublisher[T]/ZmqMessageSubscriber[T]; expose sndhwm/linger/conflate so future callers (e.g. live snapshots) can choose drop-old vs. delivery-guarantee semantics.
- record.py: add EventRecordCodec next to encode/decode helpers.
- Update EventPublisherService, EventLoggerService, MetricsAggregatorService and tests to use the generic classes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
c810acd to bd5ab85
Per Gemini review on PR #300: catching only msgspec.DecodeError in MessageSubscriber._on_readable bakes the codec implementation into the supposedly-generic base class. A future codec backed by json, pickle, etc. raises different exception types and would bypass on_decode_error, crashing the reader.

- protocol.py: widen the catch back to Exception so the base class makes no assumption about which decoder library a codec uses; drop the now-unused msgspec import.
- record.py: tighten EventRecordCodec.on_decode_error to wrap only msgspec.DecodeError and re-raise other exceptions. Preserves the previous behavior parity (only malformed-payload errors become ErrorEventType.GENERIC records; programmer bugs in the decode path still surface).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
    def encode(self, item: T) -> tuple[bytes, bytes]:
        """Return (topic, payload). topic must be exactly TOPIC_FRAME_SIZE bytes."""
        ...

    def decode(self, payload: bytes) -> T:
        """Decode payload back to T. May raise; the caller routes failures
        through on_decode_error."""
        ...

    def on_decode_error(self, payload: bytes, exc: Exception) -> T | None:
        """Fallback for malformed payloads. Return a sentinel item or None
        to drop the message."""
        ...
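As a rough illustration of what satisfying this three-method contract might look like for a new message type, here is a stdlib-only sketch. Everything except the method names and the `TOPIC_FRAME_SIZE` identifier is an assumption: the `Heartbeat` type, the `HeartbeatCodec` class, the JSON wire format, and the value 8 for `TOPIC_FRAME_SIZE` are all invented for the example and are not taken from the PR.

```python
from __future__ import annotations

import json
from dataclasses import asdict, dataclass

# Assumed value for illustration only; the real constant lives in the project.
TOPIC_FRAME_SIZE = 8


@dataclass(frozen=True)
class Heartbeat:
    """Hypothetical message type, not part of the PR."""

    worker_id: int
    seq: int


class HeartbeatCodec:
    """Hypothetical codec exposing encode / decode / on_decode_error."""

    # Pad the topic so it is exactly TOPIC_FRAME_SIZE bytes long.
    _TOPIC = b"hb".ljust(TOPIC_FRAME_SIZE, b"\x00")

    def encode(self, item: Heartbeat) -> tuple[bytes, bytes]:
        return self._TOPIC, json.dumps(asdict(item)).encode("utf-8")

    def decode(self, payload: bytes) -> Heartbeat:
        # May raise (json.JSONDecodeError, TypeError, ...); the caller is
        # expected to route failures through on_decode_error.
        return Heartbeat(**json.loads(payload))

    def on_decode_error(self, payload: bytes, exc: Exception) -> Heartbeat | None:
        # Drop malformed heartbeats instead of producing a sentinel item.
        return None


codec = HeartbeatCodec()
topic, payload = codec.encode(Heartbeat(worker_id=3, seq=7))
roundtrip = codec.decode(payload)
```

Returning `None` from `on_decode_error` is one of the two options the docstring allows; a codec that needs to surface failures downstream would return a sentinel item instead.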
arekay-nv left a comment
Review Council — Multi-AI Code Review
Reviewed by Codex + Claude | Depth: standard | HEAD: faaff7b
Found 1 issue across 1 file: 0 critical, 0 high, 1 medium, 0 low.
Codex returned no actionable regressions: "The refactor from EventRecord-specific pub/sub classes to generic codec-driven message transport appears internally consistent." Claude flagged one test-coverage gap on the new on_decode_error codec method (see inline comment).
Note (not posted as inline): a few docs/**/DESIGN.md files still reference the old symbol names (encode_event_record, decode_event_record, ZmqEventRecordPublisher, ZmqEventRecordSubscriber). Worth a follow-up doc sweep.
    )
    from inference_endpoint.core.types import ErrorData, PromptData, TextModelOutput


    _codec = EventRecordCodec()
[Review Council — Claude] medium · testing
EventRecordCodec.on_decode_error (in core/record.py:187) is brand-new public-API logic introduced by this PR with two distinct branches:
    def on_decode_error(self, payload: bytes, exc: Exception) -> EventRecord:
        if not isinstance(exc, msgspec.DecodeError):
            raise exc
        return EventRecord(
            event_type=ErrorEventType.GENERIC,
            data=ErrorData(error_type=type(exc).__name__, error_message=str(exc)),
        )

grep -r on_decode_error tests/ src/ returns zero hits outside the dispatch site in protocol.py. Neither the wrap-on-DecodeError branch nor the re-raise-on-other-exception branch is exercised. This is the only piece of behavior that meaningfully differs between the old inline _on_readable decode-error handling and the new codec abstraction (the previous implementation unconditionally wrapped any msgspec.DecodeError; the new code re-raises non-DecodeError exceptions, which is a real behavioral change).
More importantly, this is the cold path that fires when something else has already gone wrong — exactly the path you don't want untested. The re-raise branch in particular escapes _on_readable (which catches only StopIteration), so a non-DecodeError propagates out of the asyncio reader callback and silently de-registers the subscriber.
Suggested addition in TestEventRecordRoundTrip (or a new TestEventRecordCodecOnDecodeError):
- Garbage payload returns a wrapped EventRecord(event_type=ErrorEventType.GENERIC, data=ErrorData(...)) whose error_message matches str(exc).
- on_decode_error(b"...", ValueError("not a decode error")) re-raises ValueError (verifies the not isinstance(exc, msgspec.DecodeError) branch).
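The two suggested cases could be sketched roughly as below. To keep the example self-contained it uses stand-ins rather than the real imports: `DecodeError`, `ErrorData`, `EventRecord`, `GENERIC`, and this copy of `EventRecordCodec` are simplified substitutes for `msgspec.DecodeError` and the project's own types, and the test function names are hypothetical. In the repository the tests would import the real classes instead.

```python
from dataclasses import dataclass


class DecodeError(Exception):
    """Stand-in for msgspec.DecodeError (assumption, keeps the sketch stdlib-only)."""


@dataclass
class ErrorData:
    """Stand-in for the project's ErrorData."""

    error_type: str
    error_message: str


@dataclass
class EventRecord:
    """Stand-in; the real EventRecord has more fields."""

    event_type: str
    data: ErrorData


GENERIC = "generic"  # stand-in for ErrorEventType.GENERIC


class EventRecordCodec:
    """Copy of the quoted on_decode_error logic, wired to the stand-ins."""

    def on_decode_error(self, payload: bytes, exc: Exception) -> EventRecord:
        if not isinstance(exc, DecodeError):
            raise exc
        return EventRecord(
            event_type=GENERIC,
            data=ErrorData(error_type=type(exc).__name__, error_message=str(exc)),
        )


def test_wraps_decode_error() -> None:
    exc = DecodeError("truncated payload")
    record = EventRecordCodec().on_decode_error(b"\x00garbage", exc)
    assert record.event_type == GENERIC
    assert record.data.error_type == "DecodeError"
    assert record.data.error_message == str(exc)


def test_reraises_other_exceptions() -> None:
    exc = ValueError("not a decode error")
    try:
        EventRecordCodec().on_decode_error(b"...", exc)
    except ValueError as raised:
        assert raised is exc  # the original exception object propagates
    else:
        raise AssertionError("expected ValueError to propagate")
```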
Codex verdict
Cross-reviewer agreement: only one finding overall, from Claude. Codex independently concluded the refactor is clean.
Follow-up (not posted inline)
A few design docs reference the old symbol names, likely a small doc sweep:
Stale references include
Inline review: #pullrequestreview-4231671106
🤖 Posted by
Summary
- Replace EventRecord-specific publisher/subscriber classes with generic ZmqMessagePublisher[T] / ZmqMessageSubscriber[T] parameterized by a MessageCodec[T] Protocol.
- EventRecordCodec preserves the existing wire format and decode-error wrapping behavior, so all current callers (EventPublisherService, EventLoggerService, MetricsAggregatorService) keep their semantics; they just thread the codec through super().__init__.
- Expose sndhwm/linger/conflate/rcvhwm knobs so future callers can choose drop-old vs delivery-guarantee semantics per message type.

Why
The next step is a MetricsPublisher that publishes MetricsSnapshot for a live TUI consumer. Building it directly on top of the EventRecord-specific pub/sub would duplicate the batching / pending-queue / async-writer machinery, or shoehorn snapshots through an EventRecord-shaped wrapper. Generalizing first is one rename pass + a codec extraction; afterward the metrics publisher is a thin composition.

Design context lives at .cursor_artifacts/zmq_metrics_publisher.md (not part of this PR; just for reference).

Behavior preservation
- MessageSubscriber._on_readable catches msgspec.DecodeError only, same as before.
- EventRecordCodec.on_decode_error returns the same ErrorEventType.GENERIC wrapper that the old _on_readable constructed inline.
- The previous socket options (SNDHWM=0, LINGER=-1, IMMEDIATE=1, RCVHWM=0) are the new parameter defaults.

Test plan
- tests/unit/transport/test_zmq_pool_transport.py: updated to use ZmqMessagePublisher(EventRecordCodec(), ...)
- tests/unit/async_utils/test_event_publisher.py: CollectingEventSubscriber updated
- tests/unit/transport/, tests/unit/async_utils/test_event_publisher.py, tests/unit/async_utils/services/event_logger/, tests/unit/async_utils/services/metrics_aggregator/ (run in inference_endpoint_dev container)
- pre-commit run clean (ruff, ruff-format, mypy, license headers, prettier)

🤖 Generated with Claude Code