Skip to content

Commit 48e77ba

Browse files
rustyconover and claude committed
Add rich multi-type headers and dynamic schema streams to conformance suite
Exercises the full breadth of Arrow type mappings in stream headers (18 fields, including enums, nested structs, optionals, lists, dicts, and annotated types), using seed-based deterministic values for cross-language conformance testing. Adds a dynamic-schema producer with variable column sets, plus an exchange variant for HTTP state-token round-trip coverage.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent d8c9984 commit 48e77ba

6 files changed

Lines changed: 543 additions & 0 deletions

File tree

tests/test_conformance.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
ConformanceService,
2424
ConformanceServiceImpl,
2525
Point,
26+
RichHeader,
2627
Status,
28+
build_dynamic_schema,
29+
build_rich_header,
2730
)
2831
from vgi_rpc.log import Level, Message
2932
from vgi_rpc.rpc import AnnotatedBatch, MethodType, RpcError, RpcServer, make_pipe_pair, rpc_methods
@@ -970,3 +973,167 @@ def test_describe_via_rpc(self) -> None:
970973
finally:
971974
client_transport.close()
972975
thread.join(timeout=5)
976+
977+
978+
# ---------------------------------------------------------------------------
979+
# Dynamic Streams With Rich Multi-Type Headers
980+
# ---------------------------------------------------------------------------
981+
982+
983+
def _assert_rich_header(actual: RichHeader, seed: int) -> None:
    """Check ``actual`` field-by-field against the header built from ``seed``.

    Float-valued fields (including the coordinates of nested points) are
    compared with ``pytest.approx``; every other field must compare equal.
    """
    want = build_rich_header(seed)

    def exact(*field_names: str) -> None:
        # Plain equality for the named attributes, checked in the given order.
        for field_name in field_names:
            assert getattr(actual, field_name) == getattr(want, field_name)

    def same_point(got_pt: Point, want_pt: Point) -> None:
        # Points carry float coordinates, so compare them approximately.
        assert got_pt.x == pytest.approx(want_pt.x)
        assert got_pt.y == pytest.approx(want_pt.y)

    exact("str_field", "bytes_field", "int_field")
    assert actual.float_field == pytest.approx(want.float_field)
    exact("bool_field", "list_of_int", "list_of_str", "dict_field", "enum_field")
    same_point(actual.nested_point, want.nested_point)
    exact("optional_str", "optional_int")

    # The optional nested point must be absent or present on both sides.
    if want.optional_nested is not None:
        assert actual.optional_nested is not None
        same_point(actual.optional_nested, want.optional_nested)
    else:
        assert actual.optional_nested is None

    assert len(actual.list_of_nested) == len(want.list_of_nested)
    for got_pt, want_pt in zip(actual.list_of_nested, want.list_of_nested, strict=True):
        same_point(got_pt, want_pt)

    exact("nested_list", "annotated_int32")
    assert actual.annotated_float32 == pytest.approx(want.annotated_float32)
    exact("dict_str_str")
1013+
1014+
1015+
class TestDynamicRichHeader:
    """Producer streams whose header is a ``RichHeader`` derived from a seed."""

    def test_seed_42(self, conformance_conn: ConnFactory) -> None:
        """seed=42 (PENDING, bool=True, opt_nested present): header plus three counter batches."""
        with conformance_conn() as proxy:
            stream = proxy.produce_with_rich_header(seed=42, count=3)
            hdr = stream.header
            assert hdr is not None
            assert isinstance(hdr, RichHeader)
            _assert_rich_header(hdr, 42)
            produced = list(stream)
            assert len(produced) == 3
            # Each batch carries its position and ten times that position.
            for pos, item in enumerate(produced):
                assert item.batch.column("index")[0].as_py() == pos
                assert item.batch.column("value")[0].as_py() == pos * 10

    def test_seed_7(self, conformance_conn: ConnFactory) -> None:
        """seed=7 (ACTIVE, bool=False, opt_int present): header plus two batches."""
        with conformance_conn() as proxy:
            stream = proxy.produce_with_rich_header(seed=7, count=2)
            hdr = stream.header
            assert hdr is not None
            assert isinstance(hdr, RichHeader)
            _assert_rich_header(hdr, 7)
            produced = list(stream)
            assert len(produced) == 2

    def test_seed_0(self, conformance_conn: ConnFactory) -> None:
        """seed=0 (zero-valued edge case): header plus a single batch."""
        with conformance_conn() as proxy:
            stream = proxy.produce_with_rich_header(seed=0, count=1)
            hdr = stream.header
            assert hdr is not None
            assert isinstance(hdr, RichHeader)
            _assert_rich_header(hdr, 0)
            produced = list(stream)
            assert len(produced) == 1
1053+
1054+
1055+
class TestDynamicSchemaProducer:
    """Test producer streams with dynamic output schema and rich header.

    Each test calls ``produce_dynamic_schema`` with a different combination of
    ``include_strings`` / ``include_floats``, validates the ``RichHeader``
    against the seed that was requested, and then validates the produced
    batches: the column set, the always-present ``index`` column, and
    whichever optional columns were asked for.
    """

    def test_all_columns(self, conformance_conn: ConnFactory) -> None:
        """Dynamic schema with all columns: index + label + score."""
        with conformance_conn() as proxy:
            session = proxy.produce_dynamic_schema(seed=42, count=3, include_strings=True, include_floats=True)
            header = session.header
            assert header is not None
            assert isinstance(header, RichHeader)
            _assert_rich_header(header, 42)
            batches = list(session)
            assert len(batches) == 3
            expected_schema = build_dynamic_schema(include_strings=True, include_floats=True)
            for i, ab in enumerate(batches):
                assert ab.batch.schema.equals(expected_schema)
                assert ab.batch.column("index")[0].as_py() == i
                assert ab.batch.column("label")[0].as_py() == f"row-{i}"
                assert ab.batch.column("score")[0].as_py() == pytest.approx(i * 1.5)

    def test_strings_only(self, conformance_conn: ConnFactory) -> None:
        """Dynamic schema with strings only: index + label."""
        with conformance_conn() as proxy:
            session = proxy.produce_dynamic_schema(seed=7, count=2, include_strings=True, include_floats=False)
            header = session.header
            assert header is not None
            # Same header type check as test_all_columns, for consistency.
            assert isinstance(header, RichHeader)
            _assert_rich_header(header, 7)
            batches = list(session)
            assert len(batches) == 2
            for i, ab in enumerate(batches):
                assert ab.batch.schema.names == ["index", "label"]
                # ``index`` is always part of the schema, so verify it here too.
                assert ab.batch.column("index")[0].as_py() == i
                assert ab.batch.column("label")[0].as_py() == f"row-{i}"

    def test_floats_only(self, conformance_conn: ConnFactory) -> None:
        """Dynamic schema with floats only: index + score."""
        with conformance_conn() as proxy:
            session = proxy.produce_dynamic_schema(seed=5, count=2, include_strings=False, include_floats=True)
            header = session.header
            assert header is not None
            # Same header type check as test_all_columns, for consistency.
            assert isinstance(header, RichHeader)
            _assert_rich_header(header, 5)
            batches = list(session)
            assert len(batches) == 2
            for i, ab in enumerate(batches):
                assert ab.batch.schema.names == ["index", "score"]
                # ``index`` is always part of the schema, so verify it here too.
                assert ab.batch.column("index")[0].as_py() == i
                assert ab.batch.column("score")[0].as_py() == pytest.approx(i * 1.5)

    def test_minimal(self, conformance_conn: ConnFactory) -> None:
        """Dynamic schema minimal: index only."""
        with conformance_conn() as proxy:
            session = proxy.produce_dynamic_schema(seed=0, count=1, include_strings=False, include_floats=False)
            header = session.header
            assert header is not None
            # Same header type check as test_all_columns, for consistency.
            assert isinstance(header, RichHeader)
            _assert_rich_header(header, 0)
            batches = list(session)
            assert len(batches) == 1
            assert batches[0].batch.schema.names == ["index"]
            assert batches[0].batch.column("index")[0].as_py() == 0
1112+
1113+
1114+
class TestRichHeaderExchange:
    """Exchange streams whose header is a ``RichHeader`` derived from a seed."""

    def test_header_then_exchange(self, conformance_conn: ConnFactory) -> None:
        """seed=5, factor=2.5: validate the header, then exchange one batch."""
        with conformance_conn() as proxy:
            stream = proxy.exchange_with_rich_header(seed=5, factor=2.5)
            hdr = stream.header
            assert hdr is not None
            assert isinstance(hdr, RichHeader)
            _assert_rich_header(hdr, 5)
            with stream:
                request = AnnotatedBatch.from_pydict({"value": [4.0]})
                reply = stream.exchange(request)
                # 4.0 in with factor 2.5 is expected back as 10.0.
                assert reply.batch.column("value")[0].as_py() == pytest.approx(10.0)

    def test_different_seed(self, conformance_conn: ConnFactory) -> None:
        """seed=12, factor=1.0: validate the header, then exchange one batch."""
        with conformance_conn() as proxy:
            stream = proxy.exchange_with_rich_header(seed=12, factor=1.0)
            hdr = stream.header
            assert hdr is not None
            assert isinstance(hdr, RichHeader)
            _assert_rich_header(hdr, 12)
            with stream:
                request = AnnotatedBatch.from_pydict({"value": [7.0]})
                reply = stream.exchange(request)
                # With factor 1.0 the value is expected back unchanged.
                assert reply.batch.column("value")[0].as_py() == pytest.approx(7.0)

vgi_rpc/conformance/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,10 @@
2929
BoundingBox,
3030
ConformanceHeader,
3131
Point,
32+
RichHeader,
3233
Status,
34+
build_dynamic_schema,
35+
build_rich_header,
3336
)
3437

3538
__all__ = [
@@ -42,7 +45,10 @@
4245
"ConformanceSuite",
4346
"LogCollector",
4447
"Point",
48+
"RichHeader",
4549
"Status",
50+
"build_dynamic_schema",
51+
"build_rich_header",
4652
"list_conformance_tests",
4753
"run_conformance",
4854
]

vgi_rpc/conformance/_impl.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
BoundingBox,
2727
ConformanceHeader,
2828
CounterState,
29+
DynamicProducerState,
2930
EmptyProducerState,
3031
ErrorAfterNState,
3132
FailOnExchangeNState,
@@ -34,9 +35,12 @@
3435
LoggingExchangeState,
3536
LoggingProducerState,
3637
Point,
38+
RichHeader,
3739
ScaleExchangeState,
3840
SingleProducerState,
3941
Status,
42+
build_dynamic_schema,
43+
build_rich_header,
4044
)
4145

4246
# Reusable schemas
@@ -314,3 +318,38 @@ def exchange_with_header(self, factor: float) -> Stream[ScaleExchangeState, Conf
314318
input_schema=_SCALE_INPUT_SCHEMA,
315319
header=header,
316320
)
321+
322+
# ------------------------------------------------------------------
323+
# Dynamic Streams With Rich Multi-Type Headers
324+
# ------------------------------------------------------------------
325+
326+
def produce_with_rich_header(self, seed: int, count: int) -> Stream[HeaderProducerState, RichHeader]:
    """Return a counter-schema producer stream carrying the rich header for ``seed``."""
    producer_state = HeaderProducerState(count=count)
    rich_header = build_rich_header(seed)
    return Stream(output_schema=_COUNTER_SCHEMA, state=producer_state, header=rich_header)
333+
334+
def produce_dynamic_schema(
    self, seed: int, count: int, include_strings: bool, include_floats: bool
) -> Stream[DynamicProducerState, RichHeader]:
    """Return a producer stream whose schema follows the two column flags, with the rich header for ``seed``."""
    # Built in the same order the original keyword arguments were evaluated.
    schema = build_dynamic_schema(include_strings, include_floats)
    producer_state = DynamicProducerState(
        count=count,
        include_strings=include_strings,
        include_floats=include_floats,
    )
    rich_header = build_rich_header(seed)
    return Stream(output_schema=schema, state=producer_state, header=rich_header)
347+
348+
def exchange_with_rich_header(self, seed: int, factor: float) -> Stream[ScaleExchangeState, RichHeader]:
    """Return a scale exchange stream carrying the rich header for ``seed``."""
    exchange_state = ScaleExchangeState(factor=factor)
    rich_header = build_rich_header(seed)
    return Stream(
        output_schema=_SCALE_OUTPUT_SCHEMA,
        state=exchange_state,
        input_schema=_SCALE_INPUT_SCHEMA,
        header=rich_header,
    )

vgi_rpc/conformance/_protocol.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
BoundingBox,
2424
ConformanceHeader,
2525
Point,
26+
RichHeader,
2627
Status,
2728
)
2829

@@ -259,3 +260,44 @@ def exchange_error_on_init(self) -> Stream[StreamState]:
259260
def exchange_with_header(self, factor: float) -> Stream[StreamState, ConformanceHeader]:
260261
"""Exchange stream with a header."""
261262
...
263+
264+
# ------------------------------------------------------------------
265+
# Dynamic Streams With Rich Multi-Type Headers
266+
# ------------------------------------------------------------------
267+
268+
def produce_with_rich_header(self, seed: int, count: int) -> Stream[StreamState, RichHeader]:
    """Produce batches with a rich multi-type stream header.

    Args:
        seed: Determines all header field values deterministically.
        count: Number of {index, value} batches to produce.

    Returns:
        A stream whose header is a ``RichHeader``.

    """
    ...
277+
278+
def produce_dynamic_schema(
    self, seed: int, count: int, include_strings: bool, include_floats: bool
) -> Stream[StreamState, RichHeader]:
    """Produce batches with a dynamic output schema and rich header.

    The output schema changes based on ``include_strings`` and
    ``include_floats``. Always includes ``index: int64``.

    Args:
        seed: Determines all header field values deterministically.
        count: Number of batches to produce.
        include_strings: Whether to include a ``label: utf8`` column.
        include_floats: Whether to include a ``score: float64`` column.

    Returns:
        A stream whose header is a ``RichHeader``.

    """
    ...
294+
295+
def exchange_with_rich_header(self, seed: int, factor: float) -> Stream[StreamState, RichHeader]:
    """Exchange stream with a rich multi-type header.

    Args:
        seed: Determines all header field values deterministically.
        factor: Multiplier applied to input values.

    Returns:
        A stream whose header is a ``RichHeader``.

    """
    ...

0 commit comments

Comments
 (0)