From 8dbd6748e4ffed2144b12eb091a60c8d058e6989 Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Tue, 12 May 2026 03:00:36 +0530 Subject: [PATCH 1/6] feat: compose split routed experts --- .../clients/openai_chat_completions_client.py | 39 +++++------ .../clients/openai_completions_client.py | 24 ++++++- verifiers/clients/renderer_client.py | 21 +++++- verifiers/clients/routed_experts.py | 64 +++++++++++++++++++ verifiers/utils/response_utils.py | 2 +- 5 files changed, 121 insertions(+), 29 deletions(-) create mode 100644 verifiers/clients/routed_experts.py diff --git a/verifiers/clients/openai_chat_completions_client.py b/verifiers/clients/openai_chat_completions_client.py index c755d8dd4..c2eab2a78 100644 --- a/verifiers/clients/openai_chat_completions_client.py +++ b/verifiers/clients/openai_chat_completions_client.py @@ -1,10 +1,7 @@ -import base64 import functools from collections.abc import Iterable, Mapping from typing import Any, TypeAlias, cast -import numpy as np - from openai import ( AsyncOpenAI, AuthenticationError, @@ -36,6 +33,7 @@ from openai.types.shared_params import FunctionDefinition from verifiers.clients.client import Client +from verifiers.clients.routed_experts import compose_split_routed_experts from verifiers.errors import ( EmptyModelResponseError, InvalidModelResponseError, @@ -459,27 +457,22 @@ def parse_tokens(response: OpenAIChatResponse) -> ResponseTokens | None: logprobs_content = response.choices[0].logprobs["content"] completion_logprobs = [token["logprob"] for token in logprobs_content] - has_routed_experts = ( - isinstance( - routed_experts := getattr(choice, "routed_experts", None), dict - ) - and "data" in routed_experts - and "shape" in routed_experts - ) - if has_routed_experts: - routed_experts = cast(dict[str, Any], routed_experts) - routed_experts = cast( - list[list[list[int]]], - ( - np.frombuffer( - base64.b85decode(routed_experts["data"]), dtype=np.int32 - ) - .reshape(routed_experts["shape"]) - .tolist() - ), - ) # [seq_len, layers, topk] + response_any = cast(Any, response) + choice_any = cast(Any, choice) + if hasattr(response_any, "prompt_routed_experts") or hasattr( + choice_any, "routed_experts" + ): + prompt_routed_experts = response_any.prompt_routed_experts + completion_routed_experts = choice_any.routed_experts else: - routed_experts = None + prompt_routed_experts = None + completion_routed_experts = None + routed_experts = compose_split_routed_experts( + prompt_routed_experts=prompt_routed_experts, + completion_routed_experts=completion_routed_experts, + prompt_len=len(prompt_ids), + completion_len=len(completion_ids), + ) return ResponseTokens( prompt_ids=prompt_ids, prompt_mask=prompt_mask, diff --git a/verifiers/clients/openai_completions_client.py b/verifiers/clients/openai_completions_client.py index f7115322a..b24ad0626 100644 --- a/verifiers/clients/openai_completions_client.py +++ b/verifiers/clients/openai_completions_client.py @@ -1,3 +1,5 @@ +from typing import Any, cast + from openai import ( AsyncOpenAI, ) @@ -9,6 +11,7 @@ get_usage_field, handle_openai_overlong_prompt, ) +from verifiers.clients.routed_experts import compose_split_routed_experts from verifiers.errors import ( EmptyModelResponseError, InvalidModelResponseError, @@ -82,8 +85,7 @@ async def get_native_response( ) -> OpenAITextResponse: if tools: raise ValueError( - "Completions API does not support tools. " - "Use chat_completions or messages client_type instead." + "Completions API does not support tools. Use chat_completions or messages client_type instead." ) def normalize_sampling_args(sampling_args: SamplingArgs): @@ -170,12 +172,30 @@ def parse_tokens(response: OpenAITextResponse) -> ResponseTokens | None: ) if completion_logprobs is None: return None + choice = response.choices[0] + response_any = cast(Any, response) + choice_any = cast(Any, choice) + if hasattr(response_any, "prompt_routed_experts") or hasattr( + choice_any, "routed_experts" + ): + prompt_routed_experts = response_any.prompt_routed_experts + completion_routed_experts = choice_any.routed_experts + else: + prompt_routed_experts = None + completion_routed_experts = None + routed_experts = compose_split_routed_experts( + prompt_routed_experts=prompt_routed_experts, + completion_routed_experts=completion_routed_experts, + prompt_len=len(prompt_ids), + completion_len=len(completion_ids), + ) return ResponseTokens( prompt_ids=prompt_ids, prompt_mask=prompt_mask, completion_ids=completion_ids, completion_mask=completion_mask, completion_logprobs=completion_logprobs, + routed_experts=routed_experts, ) return Response( diff --git a/verifiers/clients/renderer_client.py b/verifiers/clients/renderer_client.py index d793a3b67..cfc881ec8 100644 --- a/verifiers/clients/renderer_client.py +++ b/verifiers/clients/renderer_client.py @@ -17,22 +17,22 @@ from typing import Any, ClassVar, cast from openai import AsyncOpenAI - from renderers import Message as RendererMessage from renderers import ( Renderer, RendererPool, + ToolCallFunction, ToolSpec, create_renderer_pool, ) from renderers import ToolCall as RendererToolCall -from renderers import ToolCallFunction from renderers.client import generate from verifiers.clients.client import Client from verifiers.clients.openai_chat_completions_client import ( handle_openai_overlong_prompt, ) +from verifiers.clients.routed_experts import compose_split_routed_experts from verifiers.errors import EmptyModelResponseError from verifiers.types import ( AssistantMessage, @@ -572,6 +572,21 @@ async def from_native_response(self, response: dict[str, Any]) -> Response: prompt_ids = response.get("prompt_ids", []) completion_ids = response.get("completion_ids", []) completion_logprobs = response.get("completion_logprobs", []) + if ( + "prompt_routed_experts" in response + or "completion_routed_experts" in response + ): + prompt_routed_experts = response["prompt_routed_experts"] + completion_routed_experts = response["completion_routed_experts"] + else: + prompt_routed_experts = None + completion_routed_experts = None + routed_experts = compose_split_routed_experts( + prompt_routed_experts=prompt_routed_experts, + completion_routed_experts=completion_routed_experts, + prompt_len=len(prompt_ids), + completion_len=len(completion_ids), + ) tokens = ResponseTokens( prompt_ids=prompt_ids, @@ -579,7 +594,7 @@ async def from_native_response(self, response: dict[str, Any]) -> Response: completion_ids=completion_ids, completion_mask=[1] * len(completion_ids), completion_logprobs=completion_logprobs, - routed_experts=response.get("routed_experts"), + routed_experts=routed_experts, ) # /inference/v1/generate doesn't return usage; reconstruct from tokens. diff --git a/verifiers/clients/routed_experts.py b/verifiers/clients/routed_experts.py new file mode 100644 index 000000000..abe33b30c --- /dev/null +++ b/verifiers/clients/routed_experts.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, TypeAlias, cast + +RoutedExperts: TypeAlias = list[list[list[int]]] + + +@dataclass(frozen=True) +class _DecodedRoutedExperts: + values: RoutedExperts + shape: tuple[int, int, int] + + +def _decode_routed_experts(raw: Any) -> _DecodedRoutedExperts: + routed_experts = cast(RoutedExperts, raw) + seq_len = len(routed_experts) + num_layers = len(routed_experts[0]) + topk = len(routed_experts[0][0]) + + return _DecodedRoutedExperts( + values=routed_experts, + shape=(seq_len, num_layers, topk), + ) + + +def compose_split_routed_experts( + *, + prompt_routed_experts: Any, + completion_routed_experts: Any, + prompt_len: int, + completion_len: int, +) -> RoutedExperts | None: + """Compose split routed experts and align them to prompt + completion tokens. + + vLLM returns prompt routing at the response level and generated-token routing + on the choice. The final generated token has no routing decision because it + was not fed into another forward pass, so this appends one zero entry when a + completion exists. + """ + + has_prompt = prompt_routed_experts is not None + has_completion = completion_routed_experts is not None + if not has_prompt and not has_completion: + return None + + prompt = _decode_routed_experts(prompt_routed_experts) + assert prompt.shape[0] == prompt_len + + expected_completion_routed_len = max(completion_len - 1, 0) + if expected_completion_routed_len == 0: + completion_values = [] + else: + completion = _decode_routed_experts(completion_routed_experts) + assert completion.shape[1:] == prompt.shape[1:] + assert completion.shape[0] == expected_completion_routed_len + completion_values = completion.values + + if completion_len == 0: + return prompt.values + + assert prompt.shape[1] > 0 and prompt.shape[2] > 0 + zero_entry = [[0] * prompt.shape[2] for _ in range(prompt.shape[1])] + return prompt.values + completion_values + [zero_entry] diff --git a/verifiers/utils/response_utils.py b/verifiers/utils/response_utils.py index 4e7c8b480..778570886 100644 --- a/verifiers/utils/response_utils.py +++ b/verifiers/utils/response_utils.py @@ -54,7 +54,7 @@ async def parse_response_tokens( completion_mask = tokens.completion_mask[: max_seq_len - prompt_len] completion_logprobs = tokens.completion_logprobs[: max_seq_len - prompt_len] if routed_experts is not None: - routed_experts = routed_experts[: max_seq_len - prompt_len] + routed_experts = routed_experts[:max_seq_len] else: is_truncated = False else: From b76a6bb7901fff2316a2b4277ba2ea1f9f8f11dd Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Tue, 12 May 2026 08:45:17 +0530 Subject: [PATCH 2/6] feat: decode base64 routed experts --- verifiers/clients/routed_experts.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/verifiers/clients/routed_experts.py b/verifiers/clients/routed_experts.py index abe33b30c..58be685f5 100644 --- a/verifiers/clients/routed_experts.py +++ b/verifiers/clients/routed_experts.py @@ -1,8 +1,11 @@ from __future__ import annotations +import base64 from dataclasses import dataclass from typing import Any, TypeAlias, cast +import numpy as np + RoutedExperts: TypeAlias = list[list[list[int]]] @@ -13,6 +16,17 @@ class _DecodedRoutedExperts: def _decode_routed_experts(raw: Any) -> _DecodedRoutedExperts: + if isinstance(raw, dict): + assert raw["encoding"] == "base64" + assert raw["dtype"] == "int16" + shape = tuple(raw["shape"]) + assert len(shape) == 3 + values = np.frombuffer(base64.b64decode(raw["data"]), dtype=np.int16).reshape(shape).tolist() + return _DecodedRoutedExperts( + values=cast(RoutedExperts, values), + shape=cast(tuple[int, int, int], shape), + ) + routed_experts = cast(RoutedExperts, raw) seq_len = len(routed_experts) num_layers = len(routed_experts[0]) From 591914a012d0ef7085e1bed4667742a478bdc54d Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Tue, 12 May 2026 19:47:56 +0530 Subject: [PATCH 3/6] Revert "fix: narrow send_cancel BaseException catch to Exception (#1198)" This reverts commit 5be2f9ee9e79a0c713b555e4ae08acf97043bdde. --- tests/test_env_server.py | 66 ------------------------ verifiers/serve/client/zmq_env_client.py | 5 +- 2 files changed, 1 insertion(+), 70 deletions(-) diff --git a/tests/test_env_server.py b/tests/test_env_server.py index 58348afc7..1d84c6160 100644 --- a/tests/test_env_server.py +++ b/tests/test_env_server.py @@ -326,72 +326,6 @@ async def fail(): await client.close() -class TestSendCancelErrorHandling: - """Tests that ``send_cancel`` swallows transport errors but not - cancellation or interrupt signals.""" - - @pytest.mark.asyncio - async def test_send_cancel_swallows_connection_error(self): - """Transport-layer failures are silently swallowed (best-effort cleanup).""" - client = make_client() - try: - client.socket.send_multipart = AsyncMock( - side_effect=ConnectionError("closed") - ) - # Should not raise - await client.send_cancel("req_1") - finally: - await client.close() - - @pytest.mark.asyncio - async def test_send_cancel_swallows_oserror(self): - """OSError from the socket layer is silently swallowed.""" - client = make_client() - try: - client.socket.send_multipart = AsyncMock(side_effect=OSError("nope")) - await client.send_cancel("req_1") - finally: - await client.close() - - @pytest.mark.asyncio - async def test_send_cancel_propagates_cancelled_error(self): - """``asyncio.CancelledError`` must propagate, not be silenced. - - Especially important in a method literally named ``send_cancel``: the - prior ``except BaseException`` was denying the caller's own - cancellation. - """ - client = make_client() - try: - client.socket.send_multipart = AsyncMock(side_effect=asyncio.CancelledError) - with pytest.raises(asyncio.CancelledError): - await client.send_cancel("req_1") - finally: - await client.close() - - @pytest.mark.asyncio - async def test_send_cancel_propagates_keyboard_interrupt(self): - """``KeyboardInterrupt`` must propagate through best-effort cleanup.""" - client = make_client() - try: - client.socket.send_multipart = AsyncMock(side_effect=KeyboardInterrupt) - with pytest.raises(KeyboardInterrupt): - await client.send_cancel("req_1") - finally: - await client.close() - - @pytest.mark.asyncio - async def test_send_cancel_propagates_system_exit(self): - """``SystemExit`` must propagate through best-effort cleanup.""" - client = make_client() - try: - client.socket.send_multipart = AsyncMock(side_effect=SystemExit) - with pytest.raises(SystemExit): - await client.send_cancel("req_1") - finally: - await client.close() - - class TestCancelForwarding: """Tests that client-side cancellation is forwarded through the router. diff --git a/verifiers/serve/client/zmq_env_client.py b/verifiers/serve/client/zmq_env_client.py index 5c2bf70f8..cd149a795 100644 --- a/verifiers/serve/client/zmq_env_client.py +++ b/verifiers/serve/client/zmq_env_client.py @@ -146,10 +146,7 @@ async def send_cancel(self, request_id: str) -> None: """Send a cancel signal (empty payload) to the server for a request.""" try: await self.socket.send_multipart([request_id.encode(), b""]) - except Exception: - # Best-effort cancel notification; transport/socket errors here are - # deliberately swallowed. Cancellation and interrupt signals still - # propagate (they are not `Exception` subclasses). + except BaseException: pass async def cancel_all_pending( From d70c07fdfd0e55523422669a949066dad3949569 Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Tue, 12 May 2026 21:15:24 +0530 Subject: [PATCH 4/6] feat: keep routed experts compact --- verifiers/clients/routed_experts.py | 90 +++++++++++++++-------------- verifiers/types.py | 16 ++++- verifiers/utils/response_utils.py | 6 +- 3 files changed, 66 insertions(+), 46 deletions(-) diff --git a/verifiers/clients/routed_experts.py b/verifiers/clients/routed_experts.py index 58be685f5..9795e8f9d 100644 --- a/verifiers/clients/routed_experts.py +++ b/verifiers/clients/routed_experts.py @@ -1,40 +1,50 @@ from __future__ import annotations import base64 -from dataclasses import dataclass -from typing import Any, TypeAlias, cast +from typing import Any, Mapping, cast -import numpy as np +from verifiers.types import RoutedExperts -RoutedExperts: TypeAlias = list[list[list[int]]] +INT16_BYTES = 2 -@dataclass(frozen=True) -class _DecodedRoutedExperts: - values: RoutedExperts - shape: tuple[int, int, int] +def _shape_numel(shape: list[int]) -> int: + seq_len, num_layers, topk = shape + return seq_len * num_layers * topk -def _decode_routed_experts(raw: Any) -> _DecodedRoutedExperts: - if isinstance(raw, dict): - assert raw["encoding"] == "base64" - assert raw["dtype"] == "int16" - shape = tuple(raw["shape"]) - assert len(shape) == 3 - values = np.frombuffer(base64.b64decode(raw["data"]), dtype=np.int16).reshape(shape).tolist() - return _DecodedRoutedExperts( - values=cast(RoutedExperts, values), - shape=cast(tuple[int, int, int], shape), - ) +def _token_stride(shape: list[int]) -> int: + return shape[1] * shape[2] * INT16_BYTES + + +def _validate_routed_experts(payload: RoutedExperts) -> RoutedExperts: + assert payload.dtype == "int16" + assert len(payload.data) == _shape_numel(payload.shape) * INT16_BYTES + return payload + + +def _decode_routed_experts(raw: Any) -> RoutedExperts: + if isinstance(raw, RoutedExperts): + return _validate_routed_experts(raw) - routed_experts = cast(RoutedExperts, raw) - seq_len = len(routed_experts) - num_layers = len(routed_experts[0]) - topk = len(routed_experts[0][0]) + if hasattr(raw, "model_dump"): + raw = raw.model_dump(mode="python") - return _DecodedRoutedExperts( - values=routed_experts, - shape=(seq_len, num_layers, topk), + raw = cast(Mapping[str, Any], raw) + assert raw["encoding"] == "base64" + assert raw["dtype"] == "int16" + shape = [int(dim) for dim in raw["shape"]] + data = base64.b64decode(raw["data"]) + return _validate_routed_experts(RoutedExperts(shape=shape, data=data)) + + +def slice_routed_experts(payload: RoutedExperts, end: int) -> RoutedExperts: + payload = _validate_routed_experts(payload) + assert 0 <= end <= payload.shape[0] + stride = _token_stride(payload.shape) + return RoutedExperts( + shape=[end, payload.shape[1], payload.shape[2]], + data=payload.data[: end * stride], ) @@ -45,17 +55,9 @@ def compose_split_routed_experts( prompt_len: int, completion_len: int, ) -> RoutedExperts | None: - """Compose split routed experts and align them to prompt + completion tokens. - - vLLM returns prompt routing at the response level and generated-token routing - on the choice. The final generated token has no routing decision because it - was not fed into another forward pass, so this appends one zero entry when a - completion exists. - """ + """Compose split prompt/completion routing into compact int16 bytes.""" - has_prompt = prompt_routed_experts is not None - has_completion = completion_routed_experts is not None - if not has_prompt and not has_completion: + if prompt_routed_experts is None and completion_routed_experts is None: return None prompt = _decode_routed_experts(prompt_routed_experts) @@ -63,16 +65,20 @@ def compose_split_routed_experts( expected_completion_routed_len = max(completion_len - 1, 0) if expected_completion_routed_len == 0: - completion_values = [] + completion_data = b"" else: completion = _decode_routed_experts(completion_routed_experts) assert completion.shape[1:] == prompt.shape[1:] assert completion.shape[0] == expected_completion_routed_len - completion_values = completion.values + completion_data = completion.data if completion_len == 0: - return prompt.values + return prompt - assert prompt.shape[1] > 0 and prompt.shape[2] > 0 - zero_entry = [[0] * prompt.shape[2] for _ in range(prompt.shape[1])] - return prompt.values + completion_values + [zero_entry] + stride = _token_stride(prompt.shape) + return _validate_routed_experts( + RoutedExperts( + shape=[prompt_len + completion_len, prompt.shape[1], prompt.shape[2]], + data=prompt.data + completion_data + (b"\0" * stride), + ) + ) diff --git a/verifiers/types.py b/verifiers/types.py index f0dc4ac55..3d8184232 100644 --- a/verifiers/types.py +++ b/verifiers/types.py @@ -172,13 +172,25 @@ class Usage(CustomBaseModel): total_tokens: int +class RoutedExperts(CustomBaseModel): + dtype: Literal["int16"] = "int16" + shape: list[int] + data: bytes + + @field_validator("shape") + @classmethod + def validate_shape(cls, value: list[int]) -> list[int]: + assert len(value) == 3 + return value + + class ResponseTokens(CustomBaseModel): prompt_ids: list[int] prompt_mask: list[int] completion_ids: list[int] completion_mask: list[int] completion_logprobs: list[float] - routed_experts: list[list[list[int]]] | None = None # [seq_len, layers, topk] + routed_experts: RoutedExperts | None = None # [seq_len, layers, topk] FinishReason = Literal["stop", "length", "tool_calls"] | None @@ -215,7 +227,7 @@ class TrajectoryStepTokens(TypedDict): completion_logprobs: list[float] overlong_prompt: bool is_truncated: bool - routed_experts: list[list[list[int]]] | None # [seq_len, layers, topk] + routed_experts: RoutedExperts | None # [seq_len, layers, topk] class TokenUsage(TypedDict): diff --git a/verifiers/utils/response_utils.py b/verifiers/utils/response_utils.py index 778570886..ff50dade0 100644 --- a/verifiers/utils/response_utils.py +++ b/verifiers/utils/response_utils.py @@ -1,3 +1,4 @@ +from verifiers.clients.routed_experts import slice_routed_experts from verifiers.types import ( AssistantMessage, Messages, @@ -47,14 +48,15 @@ async def parse_response_tokens( completion_ids = [] completion_mask = [] completion_logprobs = [] - routed_experts = [] if routed_experts is not None else None + if routed_experts is not None: + routed_experts = slice_routed_experts(routed_experts, max_seq_len) elif prompt_len + completion_len > max_seq_len: is_truncated = True completion_ids = tokens.completion_ids[: max_seq_len - prompt_len] completion_mask = tokens.completion_mask[: max_seq_len - prompt_len] completion_logprobs = tokens.completion_logprobs[: max_seq_len - prompt_len] if routed_experts is not None: - routed_experts = routed_experts[:max_seq_len] + routed_experts = slice_routed_experts(routed_experts, max_seq_len) else: is_truncated = False else: From 2423ba34800e4da69b00f67fa7289a7524b55009 Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Wed, 13 May 2026 00:22:43 +0530 Subject: [PATCH 5/6] Revert "Revert "fix: narrow send_cancel BaseException catch to Exception (#1198)"" This reverts commit 591914a012d0ef7085e1bed4667742a478bdc54d. --- tests/test_env_server.py | 66 ++++++++++++++++++++++++ verifiers/serve/client/zmq_env_client.py | 5 +- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/tests/test_env_server.py b/tests/test_env_server.py index 1d84c6160..58348afc7 100644 --- a/tests/test_env_server.py +++ b/tests/test_env_server.py @@ -326,6 +326,72 @@ async def fail(): await client.close() +class TestSendCancelErrorHandling: + """Tests that ``send_cancel`` swallows transport errors but not + cancellation or interrupt signals.""" + + @pytest.mark.asyncio + async def test_send_cancel_swallows_connection_error(self): + """Transport-layer failures are silently swallowed (best-effort cleanup).""" + client = make_client() + try: + client.socket.send_multipart = AsyncMock( + side_effect=ConnectionError("closed") + ) + # Should not raise + await client.send_cancel("req_1") + finally: + await client.close() + + @pytest.mark.asyncio + async def test_send_cancel_swallows_oserror(self): + """OSError from the socket layer is silently swallowed.""" + client = make_client() + try: + client.socket.send_multipart = AsyncMock(side_effect=OSError("nope")) + await client.send_cancel("req_1") + finally: + await client.close() + + @pytest.mark.asyncio + async def test_send_cancel_propagates_cancelled_error(self): + """``asyncio.CancelledError`` must propagate, not be silenced. + + Especially important in a method literally named ``send_cancel``: the + prior ``except BaseException`` was denying the caller's own + cancellation. + """ + client = make_client() + try: + client.socket.send_multipart = AsyncMock(side_effect=asyncio.CancelledError) + with pytest.raises(asyncio.CancelledError): + await client.send_cancel("req_1") + finally: + await client.close() + + @pytest.mark.asyncio + async def test_send_cancel_propagates_keyboard_interrupt(self): + """``KeyboardInterrupt`` must propagate through best-effort cleanup.""" + client = make_client() + try: + client.socket.send_multipart = AsyncMock(side_effect=KeyboardInterrupt) + with pytest.raises(KeyboardInterrupt): + await client.send_cancel("req_1") + finally: + await client.close() + + @pytest.mark.asyncio + async def test_send_cancel_propagates_system_exit(self): + """``SystemExit`` must propagate through best-effort cleanup.""" + client = make_client() + try: + client.socket.send_multipart = AsyncMock(side_effect=SystemExit) + with pytest.raises(SystemExit): + await client.send_cancel("req_1") + finally: + await client.close() + + class TestCancelForwarding: """Tests that client-side cancellation is forwarded through the router. diff --git a/verifiers/serve/client/zmq_env_client.py b/verifiers/serve/client/zmq_env_client.py index cd149a795..5c2bf70f8 100644 --- a/verifiers/serve/client/zmq_env_client.py +++ b/verifiers/serve/client/zmq_env_client.py @@ -146,7 +146,10 @@ async def send_cancel(self, request_id: str) -> None: """Send a cancel signal (empty payload) to the server for a request.""" try: await self.socket.send_multipart([request_id.encode(), b""]) - except BaseException: + except Exception: + # Best-effort cancel notification; transport/socket errors here are + # deliberately swallowed. Cancellation and interrupt signals still + # propagate (they are not `Exception` subclasses). pass async def cancel_all_pending( From 0c6fcff41605bd1d5f654e42069b0dda52e9419a Mon Sep 17 00:00:00 2001 From: S1ro1 Date: Wed, 13 May 2026 00:30:57 +0530 Subject: [PATCH 6/6] chore: narrow routed experts payload handling --- .../clients/openai_chat_completions_client.py | 26 ++++++++--------- .../clients/openai_completions_client.py | 28 ++++++++----------- verifiers/clients/renderer_client.py | 21 ++++++-------- verifiers/clients/routed_experts.py | 1 + 4 files changed, 34 insertions(+), 42 deletions(-) diff --git a/verifiers/clients/openai_chat_completions_client.py b/verifiers/clients/openai_chat_completions_client.py index c2eab2a78..84cd671f3 100644 --- a/verifiers/clients/openai_chat_completions_client.py +++ b/verifiers/clients/openai_chat_completions_client.py @@ -457,22 +457,20 @@ def parse_tokens(response: OpenAIChatResponse) -> ResponseTokens | None: logprobs_content = response.choices[0].logprobs["content"] completion_logprobs = [token["logprob"] for token in logprobs_content] - response_any = cast(Any, response) - choice_any = cast(Any, choice) - if hasattr(response_any, "prompt_routed_experts") or hasattr( - choice_any, "routed_experts" + response_extra = response.model_extra or {} + choice_extra = choice.model_extra or {} + if ( + "prompt_routed_experts" not in response_extra + and "routed_experts" not in choice_extra ): - prompt_routed_experts = response_any.prompt_routed_experts - completion_routed_experts = choice_any.routed_experts + routed_experts = None else: - prompt_routed_experts = None - completion_routed_experts = None - routed_experts = compose_split_routed_experts( - prompt_routed_experts=prompt_routed_experts, - completion_routed_experts=completion_routed_experts, - prompt_len=len(prompt_ids), - completion_len=len(completion_ids), - ) + routed_experts = compose_split_routed_experts( + prompt_routed_experts=response_extra["prompt_routed_experts"], + completion_routed_experts=choice_extra["routed_experts"], + prompt_len=len(prompt_ids), + completion_len=len(completion_ids), + ) return ResponseTokens( prompt_ids=prompt_ids, prompt_mask=prompt_mask, diff --git a/verifiers/clients/openai_completions_client.py b/verifiers/clients/openai_completions_client.py index b24ad0626..1fe82f849 100644 --- a/verifiers/clients/openai_completions_client.py +++ b/verifiers/clients/openai_completions_client.py @@ -1,5 +1,3 @@ -from typing import Any, cast - from openai import ( AsyncOpenAI, ) @@ -173,22 +171,20 @@ def parse_tokens(response: OpenAITextResponse) -> ResponseTokens | None: if completion_logprobs is None: return None choice = response.choices[0] - response_any = cast(Any, response) - choice_any = cast(Any, choice) - if hasattr(response_any, "prompt_routed_experts") or hasattr( - choice_any, "routed_experts" + response_extra = response.model_extra or {} + choice_extra = choice.model_extra or {} + if ( + "prompt_routed_experts" not in response_extra + and "routed_experts" not in choice_extra ): - prompt_routed_experts = response_any.prompt_routed_experts - completion_routed_experts = choice_any.routed_experts + routed_experts = None else: - prompt_routed_experts = None - completion_routed_experts = None - routed_experts = compose_split_routed_experts( - prompt_routed_experts=prompt_routed_experts, - completion_routed_experts=completion_routed_experts, - prompt_len=len(prompt_ids), - completion_len=len(completion_ids), - ) + routed_experts = compose_split_routed_experts( + prompt_routed_experts=response_extra["prompt_routed_experts"], + completion_routed_experts=choice_extra["routed_experts"], + prompt_len=len(prompt_ids), + completion_len=len(completion_ids), + ) return ResponseTokens( prompt_ids=prompt_ids, prompt_mask=prompt_mask, diff --git a/verifiers/clients/renderer_client.py b/verifiers/clients/renderer_client.py index cfc881ec8..c689564d7 100644 --- a/verifiers/clients/renderer_client.py +++ b/verifiers/clients/renderer_client.py @@ -573,20 +573,17 @@ async def from_native_response(self, response: dict[str, Any]) -> Response: completion_ids = response.get("completion_ids", []) completion_logprobs = response.get("completion_logprobs", []) if ( - "prompt_routed_experts" in response - or "completion_routed_experts" in response + "prompt_routed_experts" not in response + and "completion_routed_experts" not in response ): - prompt_routed_experts = response["prompt_routed_experts"] - completion_routed_experts = response["completion_routed_experts"] + routed_experts = None else: - prompt_routed_experts = None - completion_routed_experts = None - routed_experts = compose_split_routed_experts( - prompt_routed_experts=prompt_routed_experts, - completion_routed_experts=completion_routed_experts, - prompt_len=len(prompt_ids), - completion_len=len(completion_ids), - ) + routed_experts = compose_split_routed_experts( + prompt_routed_experts=response["prompt_routed_experts"], + completion_routed_experts=response["completion_routed_experts"], + prompt_len=len(prompt_ids), + completion_len=len(completion_ids), + ) tokens = ResponseTokens( prompt_ids=prompt_ids, diff --git a/verifiers/clients/routed_experts.py b/verifiers/clients/routed_experts.py index 9795e8f9d..080c491a9 100644 --- a/verifiers/clients/routed_experts.py +++ b/verifiers/clients/routed_experts.py @@ -19,6 +19,7 @@ def _token_stride(shape: list[int]) -> int: def _validate_routed_experts(payload: RoutedExperts) -> RoutedExperts: assert payload.dtype == "int16" + assert len(payload.shape) == 3 assert len(payload.data) == _shape_numel(payload.shape) * INT16_BYTES return payload