-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtesting_http.py
More file actions
149 lines (108 loc) · 4.33 KB
/
testing_http.py
File metadata and controls
149 lines (108 loc) · 4.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""Testing the HTTP transport without a running server.

``make_sync_client`` wraps a Falcon ``TestClient`` so you can exercise the
full HTTP transport stack (authentication, serialization, streaming) in-process
with zero network I/O.

Requires ``pip install vgi-rpc[http]``.

Run::

    python examples/testing_http.py
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
import falcon
import pyarrow as pa
from vgi_rpc import (
AuthContext,
CallContext,
OutputCollector,
ProducerState,
RpcServer,
Stream,
StreamState,
serve_pipe,
)
from vgi_rpc.http import http_connect, make_sync_client
# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation
# ---------------------------------------------------------------------------
class DemoService(Protocol):
    """RPC interface exercised by the HTTP testing examples.

    The three methods cover a plain unary call (``greet``), a call whose
    result depends on the caller (``whoami``), and a producer stream
    (``countdown``).
    """

    def greet(self, name: str) -> str:
        """Return a greeting addressed to *name*."""

    def whoami(self) -> str:
        """Return the identity of the caller."""

    def countdown(self, n: int) -> Stream[StreamState]:
        """Stream the integers from *n* down to 1."""
@dataclass
class CountdownState(ProducerState):
    """Producer-stream state that counts down from ``n``.

    ``n`` holds the next value to emit and is decremented after every
    emitted batch.
    """

    n: int

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit the current value of ``n``, or finish once it is exhausted.

        While ``n`` is positive, one batch with a single ``value`` entry is
        emitted and ``n`` is decremented. When ``n`` is zero or negative the
        stream is finished and nothing is emitted.
        """
        current = self.n
        if current <= 0:
            out.finish()
        else:
            out.emit_pydict({"value": [current]})
            self.n = current - 1
# Output schema of the countdown stream: a single int64 column named "value".
_COUNTDOWN_SCHEMA = pa.schema([("value", pa.int64())])
class DemoServiceImpl:
    """Server-side implementation of the ``DemoService`` methods."""

    def greet(self, name: str) -> str:
        """Build the greeting for *name*."""
        greeting = f"Hello, {name}!"
        return greeting

    def whoami(self, ctx: CallContext) -> str:
        """Report the caller's principal, or ``"anonymous"`` when it is unset.

        The principal is read from ``ctx.auth``; any falsy value (``None`` or
        an empty string) is reported as ``"anonymous"``.
        """
        principal = ctx.auth.principal
        if principal:
            return principal
        return "anonymous"

    def countdown(self, n: int) -> Stream[CountdownState]:
        """Start a producer stream that counts down from *n*."""
        initial_state = CountdownState(n=n)
        return Stream(output_schema=_COUNTDOWN_SCHEMA, state=initial_state)
# ---------------------------------------------------------------------------
# 2. Helper: create an authenticate callback
# ---------------------------------------------------------------------------
def _authenticate(req: falcon.Request) -> AuthContext:
    """Map the request's ``Authorization`` header to an ``AuthContext``.

    Only the exact header value ``Bearer test-token`` is accepted, and it is
    resolved to the principal ``alice`` in the ``bearer`` domain. A missing
    header or any other value yields the anonymous context.
    """
    header_value = req.get_header("Authorization") or ""
    if header_value != "Bearer test-token":
        return AuthContext.anonymous()
    return AuthContext(domain="bearer", authenticated=True, principal="alice")
# ---------------------------------------------------------------------------
# 3. Test with make_sync_client (HTTP transport, no real server)
# ---------------------------------------------------------------------------
def main() -> None:
    """Exercise ``DemoService`` over in-process HTTP and over a pipe.

    Three scenarios run in order; each result is checked with ``assert`` and
    then printed:

    1. An HTTP client without authentication: ``greet``, ``whoami`` and the
       ``countdown`` stream.
    2. An HTTP client built with ``_authenticate`` and a default bearer
       header: ``whoami`` only.
    3. The pipe transport via ``serve_pipe``: ``greet`` only.
    """
    rpc_server = RpcServer(DemoService, DemoServiceImpl())

    # --- Basic HTTP testing -------------------------------------------------
    anonymous_client = make_sync_client(rpc_server)
    with http_connect(DemoService, client=anonymous_client) as svc:
        result = svc.greet(name="World")
        assert result == "Hello, World!"
        print(f"greet('World') = {result}")

        # No authenticate callback is configured, so the caller is anonymous.
        who = svc.whoami()
        assert who == "anonymous"
        print(f"whoami() = {who}")

        # Producer stream over HTTP: flatten every batch into its row values.
        values: list[int] = []
        for batch in svc.countdown(n=3):
            values.extend(row["value"] for row in batch.batch.to_pylist())
        assert values == [3, 2, 1]
        print(f"countdown(3) = {values}")

    # --- Authenticated HTTP testing -----------------------------------------
    bearer_headers = {"Authorization": "Bearer test-token"}
    authenticated_client = make_sync_client(
        rpc_server,
        authenticate=_authenticate,
        default_headers=bearer_headers,
    )
    with http_connect(DemoService, client=authenticated_client) as svc:
        who = svc.whoami()
        assert who == "alice"
        print(f"whoami() [authenticated] = {who}")

    # --- Pipe transport for comparison --------------------------------------
    with serve_pipe(DemoService, DemoServiceImpl()) as svc:
        result = svc.greet(name="Pipe")
        assert result == "Hello, Pipe!"
        print(f"greet('Pipe') [pipe] = {result}")

    print("All assertions passed!")
# Run the examples only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()