Skip to content

refactor: generalize ZMQ pub/sub over message type via MessageCodec#300

Open
nv-alicheng wants to merge 2 commits intomainfrom
feat/alicheng-generic-pubsub
Open

refactor: generalize ZMQ pub/sub over message type via MessageCodec#300
nv-alicheng wants to merge 2 commits intomainfrom
feat/alicheng-generic-pubsub

Conversation

@nv-alicheng
Copy link
Copy Markdown
Collaborator

Summary

  • Replaces EventRecord-specific publisher/subscriber classes with generic ZmqMessagePublisher[T] / ZmqMessageSubscriber[T] parameterized by a MessageCodec[T] Protocol.
  • EventRecordCodec preserves the existing wire format and decode-error wrapping behavior, so all current callers (EventPublisherService, EventLoggerService, MetricsAggregatorService) keep their semantics — they just thread the codec through super().__init__.
  • Exposes sndhwm / linger / conflate / rcvhwm knobs so future callers can choose drop-old vs delivery-guarantee semantics per message type.

Why

The next step is a MetricsPublisher that publishes MetricsSnapshot for a live TUI consumer. Building it directly on top of the EventRecord-specific pub/sub would duplicate the batching / pending-queue / async-writer machinery, or shoehorn snapshots through an EventRecord-shaped wrapper. Generalizing first is one rename pass + a codec extraction; afterward the metrics publisher is a thin composition.

Design context lives at .cursor_artifacts/zmq_metrics_publisher.md (not part of this PR; just for reference).

Behavior preservation

  • MessageSubscriber._on_readable catches msgspec.DecodeError only — same as before.
  • EventRecordCodec.on_decode_error returns the same ErrorEventType.GENERIC wrapper that the old _on_readable constructed inline.
  • All existing socket options (SNDHWM=0, LINGER=-1, IMMEDIATE=1, RCVHWM=0) are the new parameter defaults.

Test plan

  • tests/unit/transport/test_zmq_pool_transport.py — updated to use ZmqMessagePublisher(EventRecordCodec(), ...)
  • tests/unit/async_utils/test_event_publisher.pyCollectingEventSubscriber updated
  • All 154 tests pass in tests/unit/transport/, tests/unit/async_utils/test_event_publisher.py, tests/unit/async_utils/services/event_logger/, tests/unit/async_utils/services/metrics_aggregator/ (run in inference_endpoint_dev container)
  • pre-commit run clean (ruff, ruff-format, mypy, license headers, prettier)
  • Reviewer to confirm no behavior change in production paths beyond the explicit ones called out above

🤖 Generated with Claude Code

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 28, 2026

MLCommons CLA bot All contributors have signed the MLCommons CLA ✍️ ✅

@github-actions github-actions Bot requested review from arekay-nv and nvzhihanj April 28, 2026 23:11
Comment thread src/inference_endpoint/async_utils/transport/protocol.py Fixed
Comment thread src/inference_endpoint/async_utils/transport/protocol.py Fixed
Comment thread src/inference_endpoint/async_utils/transport/protocol.py Fixed
Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request generalizes the pub/sub transport layer by introducing a generic MessageCodec protocol and refactoring the existing EventRecord-specific publisher and subscriber into generic MessagePublisher[T] and MessageSubscriber[T] classes. The ZMQ implementations were updated to support these generics and now include additional socket configuration options such as conflate and high-water marks. Feedback was provided to broaden the exception handling in the subscriber's read loop to ensure the implementation remains truly generic and adheres to the codec protocol contract.

try:
event_record = decode_event_record(payload)
items.append(self._codec.decode(payload))
except msgspec.DecodeError as e:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The MessageSubscriber should catch Exception instead of msgspec.DecodeError to remain truly generic and adhere to the MessageCodec protocol contract. The protocol defines on_decode_error as accepting any Exception, and different codec implementations might raise various error types (e.g., ValueError, TypeError, or library-specific exceptions like json.JSONDecodeError) during decoding. Catching only msgspec.DecodeError here couples the base protocol implementation to a specific library and could lead to unhandled exceptions crashing the reader loop if a non-msgspec codec is used.

Suggested change
except msgspec.DecodeError as e:
except Exception as e:

Replace EventRecord-specific publisher/subscriber classes with generic
ZmqMessagePublisher[T] / ZmqMessageSubscriber[T] parameterized by a
MessageCodec[T] Protocol. EventRecordCodec preserves existing wire format
and decode-error wrapping behavior. Sets up the generic transport that
the upcoming MetricsSnapshot publisher will reuse.

- protocol.py: drop EventRecordPublisher/Subscriber ABCs; add MessageCodec,
  MessagePublisher[T], MessageSubscriber[T].
- pubsub.py: rewrite as ZmqMessagePublisher[T]/ZmqMessageSubscriber[T];
  expose sndhwm/linger/conflate so future callers (e.g. live snapshots)
  can choose drop-old vs. delivery-guarantee semantics.
- record.py: add EventRecordCodec next to encode/decode helpers.
- Update EventPublisherService, EventLoggerService,
  MetricsAggregatorService and tests to use the generic classes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@nv-alicheng nv-alicheng force-pushed the feat/alicheng-generic-pubsub branch from c810acd to bd5ab85 Compare April 28, 2026 23:36
Per Gemini review on PR #300: catching only msgspec.DecodeError in
MessageSubscriber._on_readable bakes the codec implementation into the
supposedly-generic base class. A future codec backed by json, pickle,
etc. raises different exception types and would bypass on_decode_error,
crashing the reader.

- protocol.py: widen the catch back to Exception so the base class makes
  no assumption about which decoder library a codec uses; drop the now-
  unused msgspec import.
- record.py: tighten EventRecordCodec.on_decode_error to wrap only
  msgspec.DecodeError and re-raise other exceptions. Preserves the
  previous behavior parity (only malformed-payload errors become
  ErrorEventType.GENERIC records; programmer bugs in the decode path
  still surface).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

def encode(self, item: T) -> tuple[bytes, bytes]:
"""Return (topic, payload). topic must be exactly TOPIC_FRAME_SIZE bytes."""
...
def decode(self, payload: bytes) -> T:
"""Decode payload back to T. May raise; the caller routes failures
through on_decode_error."""
...
def on_decode_error(self, payload: bytes, exc: Exception) -> T | None:
"""Fallback for malformed payloads. Return a sentinel item or None
to drop the message."""
...
@nv-alicheng nv-alicheng marked this pull request as ready for review May 5, 2026 18:20
@nv-alicheng nv-alicheng requested a review from a team May 5, 2026 18:20
Copy link
Copy Markdown
Collaborator

@arekay-nv arekay-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review Council — Multi-AI Code Review

Reviewed by Codex + Claude | Depth: standard | HEAD: faaff7b

Found 1 issue across 1 file: 0 critical, 0 high, 1 medium, 0 low.

Codex returned no actionable regressions: "The refactor from EventRecord-specific pub/sub classes to generic codec-driven message transport appears internally consistent." Claude flagged one test-coverage gap on the new on_decode_error codec method (see inline comment).

Note (not posted as inline): a few docs/**/DESIGN.md files still reference the old symbol names (encode_event_record, decode_event_record, ZmqEventRecordPublisher, ZmqEventRecordSubscriber). Worth a follow-up doc sweep.

)
from inference_endpoint.core.types import ErrorData, PromptData, TextModelOutput

_codec = EventRecordCodec()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Review Council — Claude] medium · testing

EventRecordCodec.on_decode_error (in core/record.py:187) is brand-new public-API logic introduced by this PR with two distinct branches:

def on_decode_error(self, payload: bytes, exc: Exception) -> EventRecord:
    if not isinstance(exc, msgspec.DecodeError):
        raise exc
    return EventRecord(
        event_type=ErrorEventType.GENERIC,
        data=ErrorData(error_type=type(exc).__name__, error_message=str(exc)),
    )

grep -r on_decode_error tests/ src/ returns zero hits outside the dispatch site in protocol.py. Neither the wrap-on-DecodeError branch nor the re-raise-on-other-exception branch is exercised. This is the only piece of behavior that meaningfully differs between the old inline _on_readable decode-error handling and the new codec abstraction (the previous implementation unconditionally wrapped any msgspec.DecodeError; the new code re-raises non-DecodeError exceptions, which is a real behavioral change).

More importantly, this is the cold path that fires when something else has already gone wrong — exactly the path you don't want untested. The re-raise branch in particular escapes _on_readable (which catches only StopIteration), so a non-DecodeError propagates out of the asyncio reader callback and silently de-registers the subscriber.

Suggested addition in TestEventRecordRoundTrip (or a new TestEventRecordCodecOnDecodeError):

  1. Garbage payload returns a wrapped EventRecord(event_type=ErrorEventType.GENERIC, data=ErrorData(...)) whose error_message matches str(exc).
  2. on_decode_error(b"...", ValueError("not a decode error")) re-raises ValueError (verifies the not isinstance(exc, msgspec.DecodeError) branch).

@arekay-nv
Copy link
Copy Markdown
Collaborator

Review Council — Multi-AI Code Review

Reviewed by Codex + Claude | Depth: standard | HEAD: faaff7b

Found 1 issue across 1 file:

  • 0 critical/high
  • 1 medium
  • 0 low
# File Line Severity Category Reviewer(s) Summary
1 tests/unit/core/test_record.py 32 medium testing Claude New EventRecordCodec.on_decode_error has 2 distinct branches (wrap on msgspec.DecodeError, re-raise on anything else) — neither exercised; grep shows zero test references; this is the only behavioral change introduced by the codec refactor

Codex verdict

The refactor from EventRecord-specific pub/sub classes to generic codec-driven message transport appears internally consistent. I did not find an actionable regression in the changed code paths or any missed in-repo call sites that would break existing behavior.

Cross-reviewer agreement: only one finding overall, from Claude. Codex independently concluded the refactor is clean.

Follow-up (not posted inline)

A few design docs reference the old symbol names — likely a small doc sweep:

  • docs/load_generator/DESIGN.md
  • docs/async_utils/DESIGN.md
  • docs/async_utils/services/DESIGN.md
  • docs/async_utils/services/event_logger/DESIGN.md

Stale references include encode_event_record, decode_event_record, ZmqEventRecordPublisher, ZmqEventRecordSubscriber.

Inline review: #pullrequestreview-4231671106

🤖 Posted by /review-council skill.

Copy link
Copy Markdown
Collaborator

@arekay-nv arekay-nv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#300 🎉 !

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants