-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtesting_pipe.py
More file actions
89 lines (59 loc) · 2.61 KB
/
testing_pipe.py
File metadata and controls
89 lines (59 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""Testing a service with ``serve_pipe()`` — no subprocess or network needed.
``serve_pipe`` runs the server on a background thread and gives you a typed
proxy. This is the fastest way to write unit tests for your RPC service.

Run::

    python examples/testing_pipe.py
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
import pyarrow as pa
from vgi_rpc import CallContext, OutputCollector, ProducerState, Stream, StreamState, serve_pipe
# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation (same as production code)
# ---------------------------------------------------------------------------
class MathService(Protocol):
    """RPC interface exposing one unary call and one producer stream."""

    def add(self, a: float, b: float) -> float:
        """Return the sum of *a* and *b*."""

    def countdown(self, n: int) -> Stream[StreamState]:
        """Stream the integers from *n* down to 1."""
@dataclass
class CountdownState(ProducerState):
    """Producer-stream state that counts down from ``n``.

    ``n`` holds the next value to emit and is decremented after each emission.
    """

    n: int

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit the current ``n`` and decrement it, or finish once ``n`` is not positive.

        ``ctx`` is accepted as part of the signature but is not used here.
        """
        if self.n > 0:
            # One call emits a single-element "value" column holding the current count.
            out.emit_pydict({"value": [self.n]})
            self.n -= 1
        else:
            out.finish()
# Output schema handed to ``Stream`` by ``MathServiceImpl.countdown``:
# a single int64 field named "value".
_COUNTDOWN_SCHEMA = pa.schema([pa.field("value", pa.int64())])
class MathServiceImpl:
    """Server-side implementation of the ``MathService`` methods."""

    def add(self, a: float, b: float) -> float:
        """Return the sum of *a* and *b*."""
        total = a + b
        return total

    def countdown(self, n: int) -> Stream[CountdownState]:
        """Return a stream whose state starts counting down at *n*."""
        initial_state = CountdownState(n=n)
        return Stream(output_schema=_COUNTDOWN_SCHEMA, state=initial_state)
# ---------------------------------------------------------------------------
# 2. Test the service using serve_pipe()
# ---------------------------------------------------------------------------
def main() -> None:
    """Call both ``MathService`` methods through ``serve_pipe`` and check the results."""
    implementation = MathServiceImpl()
    with serve_pipe(MathService, implementation) as proxy:
        # Unary method: a single call that returns a single value.
        result = proxy.add(a=2.0, b=3.0)
        assert result == 5.0
        print(f"add(2, 3) = {result}")

        # Producer stream: collect the "value" entry of every row of every batch.
        values: list[int] = []
        for batch in proxy.countdown(n=3):
            for row in batch.batch.to_pylist():
                values.append(row["value"])
        assert values == [3, 2, 1]
        print(f"countdown(3) = {values}")

    print("All assertions passed!")
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()