Commit a7fb33a

rustyconover and claude committed
Base64-encode binary state token in Arrow IPC metadata for UTF-8 safety
The STATE_KEY metadata value contained raw binary data (HMAC-signed token with serialized Arrow IPC and SHA-256 digest) that violated the Arrow IPC requirement for UTF-8 metadata values. This broke cross-language Arrow consumers.

- Rename STATE_KEY from vgi_rpc.stream_state to vgi_rpc.stream_state#b64 to signal that the value is base64-encoded binary data
- Base64-encode in _pack_state_token, base64-decode in _unpack_state_token
- Add tests for UTF-8 validity and pack/unpack roundtrip
- Update wire protocol docs, README, and docstrings

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 163e448 commit a7fb33a
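
The encode/decode change described in the commit message can be sketched roughly as follows. This is a simplified illustration, not the code from `_server.py`: the helper names `pack_token` and `unpack_token` are hypothetical, and the payload is treated as one opaque byte string rather than the versioned layout the real token uses.

```python
import base64
import binascii
import hashlib
import hmac

MAC_LEN = hashlib.sha256().digest_size  # 32 bytes for SHA-256


def pack_token(payload: bytes, signing_key: bytes) -> bytes:
    """Sign the payload, then base64-encode payload + MAC (hypothetical helper)."""
    mac = hmac.new(signing_key, payload, hashlib.sha256).digest()
    return base64.b64encode(payload + mac)


def unpack_token(token: bytes, signing_key: bytes) -> bytes:
    """Base64-decode, verify the MAC, and return the payload (hypothetical helper)."""
    try:
        raw = base64.b64decode(token, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise ValueError("Malformed state token") from exc
    if len(raw) < MAC_LEN:
        raise ValueError("Malformed state token")
    payload, mac = raw[:-MAC_LEN], raw[-MAC_LEN:]
    expected = hmac.new(signing_key, payload, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many MAC bytes matched.
    if not hmac.compare_digest(mac, expected):
        raise ValueError("Tampered state token")
    return payload


token = pack_token(b"\x00\xff\xfe", b"key")
token.decode("utf-8")  # base64 output is ASCII, so this does not raise
assert unpack_token(token, b"key") == b"\x00\xff\xfe"
```

Decoding with `validate=True` rejects any byte outside the base64 alphabet, so a raw binary token from before the rename would likely fail at the decode step rather than at MAC verification.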

8 files changed

Lines changed: 59 additions & 19 deletions

CLAUDE.md

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ Then you can commit.

 - **`logging_utils.py`**`VgiJsonFormatter`, a `logging.Formatter` subclass that serializes log records as single-line JSON. Not auto-imported; must be imported explicitly from `vgi_rpc.logging_utils`.

-- **`metadata.py`** — Shared helpers for `pa.KeyValueMetadata`. Centralises well-known metadata key constants (`vgi_rpc.method`, `vgi_rpc.stream_state`, `vgi_rpc.log_level`, `vgi_rpc.log_message`, `vgi_rpc.log_extra`, `vgi_rpc.server_id`, `vgi_rpc.request_version`, `vgi_rpc.location`, `vgi_rpc.shm_offset`, etc.) and provides encoding, merging, and key-stripping utilities used by `rpc/`, `http/`, `log.py`, `external.py`, `shm.py`, and `introspect.py`.
+- **`metadata.py`** — Shared helpers for `pa.KeyValueMetadata`. Centralises well-known metadata key constants (`vgi_rpc.method`, `vgi_rpc.stream_state#b64`, `vgi_rpc.log_level`, `vgi_rpc.log_message`, `vgi_rpc.log_extra`, `vgi_rpc.server_id`, `vgi_rpc.request_version`, `vgi_rpc.location`, `vgi_rpc.shm_offset`, etc.) and provides encoding, merging, and key-stripping utilities used by `rpc/`, `http/`, `log.py`, `external.py`, `shm.py`, and `introspect.py`.

 - **`introspect.py`** — Introspection support. Provides the built-in `__describe__` RPC method, `MethodDescription`, `ServiceDescription`, `build_describe_batch`, `parse_describe_batch`, and `introspect()`. Enabled on `RpcServer` via `enable_describe=True`.

README.md

Lines changed: 2 additions & 2 deletions
@@ -1420,7 +1420,7 @@ All framework metadata keys live in the `vgi_rpc.` namespace:
 |---|---|---|
 | `vgi_rpc.method` | batch metadata | Target RPC method name |
 | `vgi_rpc.request_version` | batch metadata | Wire protocol version (`"1"`) |
-| `vgi_rpc.stream_state` | batch metadata | Serialized stream state (HTTP transport) |
+| `vgi_rpc.stream_state#b64` | batch metadata | Base64-encoded serialized stream state (HTTP transport). The `#b64` suffix indicates the value is base64-encoded binary data. |
 | `vgi_rpc.log_level` | batch metadata | Log level on zero-row log/error batches |
 | `vgi_rpc.log_message` | batch metadata | Log message text |
 | `vgi_rpc.log_extra` | batch metadata | JSON-encoded extra fields |
@@ -1480,7 +1480,7 @@ All endpoints use `Content-Type: application/vnd.apache.arrow.stream`.
 | `{prefix}/{method}/init` | POST | Stream initialization (producer and exchange) |
 | `{prefix}/{method}/exchange` | POST | Stream continuation (producer and exchange) |

-Over HTTP, streaming is **stateless**: each exchange carries serialized `StreamState` in a signed token in the `vgi_rpc.stream_state` batch metadata key. Producer stream init returns data batches directly; exchange stream init returns a state token.
+Over HTTP, streaming is **stateless**: each exchange carries serialized `StreamState` in a signed token in the `vgi_rpc.stream_state#b64` batch metadata key. The token is base64-encoded to ensure the metadata value is valid UTF-8. Producer stream init returns data batches directly; exchange stream init returns a state token.

 For streams with headers, the `/init` response body contains the header IPC stream prepended to the main output IPC stream. The `/exchange` endpoint never re-sends the header — it is only included in the initial response.

docs/WIRE_PROTOCOL.md

Lines changed: 9 additions & 8 deletions
@@ -83,7 +83,7 @@ where they appear, and their semantics:

 | Key (bytes) | Value | Description |
 |-------------|-------|-------------|
-| `vgi_rpc.stream_state` | Opaque binary (signed token) | Serialized stream state for stateless HTTP exchanges. |
+| `vgi_rpc.stream_state#b64` | Base64-encoded binary (signed token) | Serialized stream state for stateless HTTP exchanges. The `#b64` suffix signals that the value is base64-encoded binary data. |

 ### Shared memory pointer batch metadata

@@ -277,7 +277,7 @@ receive(batch, custom_metadata):
     IF custom_metadata contains "vgi_rpc.shm_offset":
         → SHM POINTER batch → resolve via shared memory (see Section 11)

-    IF custom_metadata contains "vgi_rpc.stream_state":
+    IF custom_metadata contains "vgi_rpc.stream_state#b64":
         → STATE TOKEN batch → stream continuation (see Section 10)

     // Zero-row batch with unrecognized metadata
@@ -578,7 +578,7 @@ Response body:

 All produced data batches are included inline. If the response would exceed
 `max_stream_response_bytes`, the server truncates the output and appends a
-**continuation batch**: a zero-row batch with `vgi_rpc.stream_state` in its
+**continuation batch**: a zero-row batch with `vgi_rpc.stream_state#b64` in its
 custom metadata. The client then follows up with `/exchange` requests.

 #### Exchange stream init response
@@ -590,7 +590,7 @@ Response body:
 ```

 The zero-row batch carries the signed state token in
-`vgi_rpc.stream_state` custom metadata.
+`vgi_rpc.stream_state#b64` custom metadata.

 ### Stream exchange (HTTP)

@@ -601,8 +601,8 @@ Request body: IPC stream (input_schema, 1 input batch with state token in metad
 Response body: IPC stream (output_schema, 0..N log batches, 1 data batch with updated state token, EOS)
 ```

-The request batch's custom metadata MUST contain `vgi_rpc.stream_state`
-with the current state token.
+The request batch's custom metadata MUST contain `vgi_rpc.stream_state#b64`
+with the current state token (base64-encoded).

 For **producer continuation**, the input is a zero-row batch on empty schema
 with the state token. The response may contain multiple data batches and
@@ -611,12 +611,13 @@ may end with another continuation token.
 For **exchange**, the input carries real data plus the state token. The
 response data batch carries an updated state token for the next exchange.

-The client MUST strip `vgi_rpc.stream_state` from the batch metadata before
+The client MUST strip `vgi_rpc.stream_state#b64` from the batch metadata before
 exposing it to application code.

 ### State token binary format

-The state token is an opaque signed blob with the following wire format (v2):
+The state token is an opaque signed blob, base64-encoded for UTF-8 safe
+metadata storage. After base64-decoding, the binary wire format (v2) is:

 ```
 Offset Size Field

tests/test_http.py

Lines changed: 33 additions & 3 deletions
@@ -280,6 +280,7 @@ def test_tampered_token_400(self, resumable_client: _SyncTestClient) -> None:

     def test_wrong_token_version_400(self, resumable_client: _SyncTestClient) -> None:
         """Token with an unsupported version byte returns 400."""
+        import base64
         import hashlib
         import hmac as hmac_mod
         import struct
@@ -305,7 +306,7 @@ def test_wrong_token_version_400(self, resumable_client: _SyncTestClient) -> Non
             + input_bytes
         )
         mac = hmac_mod.new(b"test-key", payload, hashlib.sha256).digest()
-        token = payload + mac
+        token = base64.b64encode(payload + mac)

         req_buf = BytesIO()
         state_md = pa.KeyValueMetadata({STATE_KEY: token})
@@ -360,6 +361,7 @@ def test_union_state_uses_numeric_tag(self) -> None:

     def test_expired_token_400(self) -> None:
         """Token with a timestamp 2 hours in the past is rejected as expired."""
+        import base64
         import hashlib
         import hmac as hmac_mod
         import struct
@@ -386,7 +388,7 @@ def test_expired_token_400(self) -> None:
             + input_bytes
         )
         mac = hmac_mod.new(b"test-key", payload, hashlib.sha256).digest()
-        token = payload + mac
+        token = base64.b64encode(payload + mac)

         req_buf = BytesIO()
         state_md = pa.KeyValueMetadata({STATE_KEY: token})
@@ -411,6 +413,7 @@ def test_expired_token_400(self) -> None:

     def test_token_ttl_zero_disables_expiry(self) -> None:
         """Token with old timestamp is accepted when token_ttl=0."""
+        import base64
         import hashlib
         import hmac as hmac_mod
         import struct
@@ -439,7 +442,7 @@ def test_token_ttl_zero_disables_expiry(self) -> None:
             + input_bytes
         )
         mac = hmac_mod.new(b"test-key", payload, hashlib.sha256).digest()
-        token = payload + mac
+        token = base64.b64encode(payload + mac)

         req_buf = BytesIO()
         state_md = pa.KeyValueMetadata({STATE_KEY: token})
@@ -465,6 +468,33 @@ def test_token_ttl_zero_disables_expiry(self) -> None:
         assert "expired" not in err.error_message.lower()
         c.close()

+    def test_pack_token_is_valid_utf8(self) -> None:
+        """Packed state token is valid UTF-8 (base64-encoded)."""
+        from vgi_rpc.http._server import _pack_state_token
+
+        # Use bytes with high bytes that are invalid UTF-8 on their own
+        token = _pack_state_token(b"\x00\xff\xfe", b"\x80\x81", b"\x90\x91", b"key", 0)
+        # Token must be valid UTF-8 (base64 produces only ASCII)
+        token.decode("utf-8")  # Should not raise
+
+    def test_pack_unpack_roundtrip_with_base64(self) -> None:
+        """Pack and unpack produce consistent results through base64 encoding."""
+        from vgi_rpc.http._server import _pack_state_token, _unpack_state_token
+
+        state = b"test-state-data"
+        schema = b"test-schema-data"
+        input_schema = b"test-input-schema"
+        key = b"signing-key"
+
+        token = _pack_state_token(state, schema, input_schema, key, 1000)
+        # Verify it's valid UTF-8
+        token.decode("utf-8")
+
+        s, sch, inp = _unpack_state_token(token, key)
+        assert s == state
+        assert sch == schema
+        assert inp == input_schema
+

 # ---------------------------------------------------------------------------
 # Tests: ExternalStorage over HTTP transport

vgi_rpc/http/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -15,7 +15,7 @@
 - **Stream Exchange**: ``POST /vgi/{method}/exchange``

 Streaming is implemented statelessly: each exchange is a separate HTTP POST
-carrying serialized state in Arrow custom metadata (``vgi_rpc.stream_state``).
+carrying serialized state in Arrow custom metadata (``vgi_rpc.stream_state#b64``).
 When ``max_stream_response_bytes`` is set, producer-stream responses are
 split across multiple exchanges; the client transparently resumes via
 ``POST /vgi/{method}/exchange``.

vgi_rpc/http/_server.py

Lines changed: 11 additions & 2 deletions
@@ -10,6 +10,7 @@

 from __future__ import annotations

+import base64
 import hashlib
 import hmac
 import html as _html
@@ -189,7 +190,7 @@ def _pack_state_token(
         created_at: Token creation time as seconds since epoch.

     Returns:
-        The opaque signed token.
+        The opaque signed token, base64-encoded for UTF-8 safe metadata.

     """
     payload = (
@@ -203,7 +204,7 @@ def _pack_state_token(
         + input_schema_bytes
     )
     mac = hmac.new(signing_key, payload, hashlib.sha256).digest()
-    return payload + mac
+    return base64.b64encode(payload + mac)


 def _unpack_state_token(token: bytes, signing_key: bytes, token_ttl: int = 0) -> tuple[bytes, bytes, bytes]:
@@ -222,6 +223,14 @@ def _unpack_state_token(token: bytes, signing_key: bytes, token_ttl: int = 0) ->
         _RpcHttpError: On malformed, tampered, or expired tokens (HTTP 400).

     """
+    try:
+        token = base64.b64decode(token, validate=True)
+    except Exception:
+        raise _RpcHttpError(
+            RuntimeError("Malformed state token"),
+            status_code=HTTPStatus.BAD_REQUEST,
+        )
+
     if len(token) < _MIN_TOKEN_LEN:
         raise _RpcHttpError(
             RuntimeError("Malformed state token"),

vgi_rpc/metadata.py

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@
 # ---------------------------------------------------------------------------

 RPC_METHOD_KEY = b"vgi_rpc.method"
-STATE_KEY = b"vgi_rpc.stream_state"
+STATE_KEY = b"vgi_rpc.stream_state#b64"
 LOG_LEVEL_KEY = b"vgi_rpc.log_level"
 LOG_MESSAGE_KEY = b"vgi_rpc.log_message"
 LOG_EXTRA_KEY = b"vgi_rpc.log_extra"
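
The wire protocol docs in this commit require clients to strip the state key from batch metadata before handing it to application code. A minimal sketch of that step, using a plain `dict` as a stand-in for `pa.KeyValueMetadata`; the function name `split_state_token` is hypothetical and not part of `vgi_rpc`, and it assumes the client treats the token as opaque and leaves it base64-encoded.

```python
from __future__ import annotations

STATE_KEY = b"vgi_rpc.stream_state#b64"


def split_state_token(
    metadata: dict[bytes, bytes],
) -> tuple[bytes | None, dict[bytes, bytes]]:
    """Return (state token or None, metadata without the state key).

    Hypothetical helper: the token is returned unchanged (still base64)
    so it can be sent back as-is on the next exchange request.
    """
    remaining = {k: v for k, v in metadata.items() if k != STATE_KEY}
    return metadata.get(STATE_KEY), remaining


token, app_metadata = split_state_token(
    {STATE_KEY: b"c3RhdGU=", b"vgi_rpc.log_level": b"INFO"}
)
assert token == b"c3RhdGU="
assert STATE_KEY not in app_metadata
```

Because the key name changed, a lookup under the old `vgi_rpc.stream_state` name would find nothing in this sketch, which is one reason the rename makes the format change visible to consumers.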

vgi_rpc/rpc/__init__.py

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@

 Over HTTP, streaming is stateless: each exchange is a separate
 ``POST /vgi/{method}/exchange`` carrying the input batch and serialized
-``StreamState`` in Arrow custom metadata (``vgi_rpc.stream_state``).
+``StreamState`` in Arrow custom metadata (``vgi_rpc.stream_state#b64``).

 State-Based Stream Model
 -------------------------
