-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhttp_server.py
More file actions
113 lines (79 loc) · 2.8 KB
/
http_server.py
File metadata and controls
113 lines (79 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""HTTP server example using Falcon (WSGI) and waitress.
Requires the HTTP extra: ``pip install vgi-rpc[http]``
Start the server::
python examples/http_server.py
Then run the client in another terminal::
python examples/http_client.py
"""
from __future__ import annotations
import socket
import sys
from dataclasses import dataclass
from typing import Protocol
import pyarrow as pa
import waitress
from vgi_rpc import (
CallContext,
Level,
OutputCollector,
ProducerState,
RpcServer,
Stream,
StreamState,
)
from vgi_rpc.http import make_wsgi_app
PORT = 8234
# ---------------------------------------------------------------------------
# Service definition
# ---------------------------------------------------------------------------
@dataclass
class FibState(ProducerState):
"""Produce Fibonacci numbers up to *limit*."""
limit: int
a: int = 0
b: int = 1
def produce(self, out: OutputCollector, ctx: CallContext) -> None:
"""Emit the next Fibonacci number."""
if self.a > self.limit:
out.finish()
return
out.emit_pydict({"fib": [self.a]})
self.a, self.b = self.b, self.a + self.b
class DemoService(Protocol):
"""Demonstration HTTP service."""
def echo(self, message: str) -> str:
"""Echo a message back."""
...
def fibonacci(self, limit: int) -> Stream[StreamState]:
"""Stream Fibonacci numbers up to *limit*."""
...
class DemoServiceImpl:
"""Concrete implementation of DemoService."""
def echo(self, message: str, ctx: CallContext | None = None) -> str:
"""Echo a message back, with optional server-side logging."""
if ctx:
ctx.client_log(Level.INFO, f"Echoing: {message}")
return message
def fibonacci(self, limit: int) -> Stream[FibState]:
"""Stream Fibonacci numbers up to *limit*."""
schema = pa.schema([pa.field("fib", pa.int64())])
return Stream(output_schema=schema, state=FibState(limit=limit))
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def _find_free_port() -> int:
"""Find a free TCP port on localhost."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return int(s.getsockname()[1])
def main() -> None:
"""Start the HTTP server."""
port = int(sys.argv[1]) if len(sys.argv) > 1 else PORT
if port == 0:
port = _find_free_port()
server = RpcServer(DemoService, DemoServiceImpl())
app = make_wsgi_app(server)
print(f"Serving DemoService on http://127.0.0.1:{port}", flush=True)
waitress.serve(app, host="127.0.0.1", port=port, _quiet=True)
if __name__ == "__main__":
main()