-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshared_memory.py
More file actions
90 lines (65 loc) · 2.65 KB
/
shared_memory.py
File metadata and controls
90 lines (65 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""Zero-copy shared memory transport with ``ShmPipeTransport``.

Demonstrates how to set up a shared memory segment and wrap pipe
transports in ``ShmPipeTransport`` for zero-copy Arrow batch transfer
between co-located processes (or threads).

Run::

    python examples/shared_memory.py
"""
from __future__ import annotations
import contextlib
import threading
from typing import Protocol, cast
from vgi_rpc import RpcServer, ShmPipeTransport, make_pipe_pair
from vgi_rpc.rpc import _RpcProxy
from vgi_rpc.shm import ShmSegment
# ---------------------------------------------------------------------------
# 1. Define a Protocol and implementation
# ---------------------------------------------------------------------------
class MathService(Protocol):
    """Structural interface for a small arithmetic service.

    Declares the two operations (``add`` and ``multiply``) that an
    implementation must provide; the method bodies here are placeholders.
    """

    def add(self, a: float, b: float) -> float:
        """Return the sum of ``a`` and ``b``."""
        ...

    def multiply(self, a: float, b: float) -> float:
        """Return the product of ``a`` and ``b``."""
        ...
class MathServiceImpl:
    """Plain in-process implementation of the ``add``/``multiply`` operations."""

    def add(self, a: float, b: float) -> float:
        """Return ``a + b``."""
        total = a + b
        return total

    def multiply(self, a: float, b: float) -> float:
        """Return ``a * b``."""
        product = a * b
        return product
# ---------------------------------------------------------------------------
# 2. Run the example with shared memory transport
# ---------------------------------------------------------------------------
def main() -> None:
    """Call ``MathService`` methods over a shared-memory-backed pipe pair.

    Creates a 4 MB ``ShmSegment``, wraps both ends of a pipe pair in
    ``ShmPipeTransport`` using that segment, serves ``MathServiceImpl`` on
    a daemon thread, and prints the results of ``add`` and ``multiply``
    invoked through a client-side proxy.  On the way out the client
    transport is closed, the server thread is joined (for up to 5 seconds),
    and the segment is unlinked and closed.
    """
    segment_bytes = 4 * 1024 * 1024  # 4 MB
    shm = ShmSegment.create(segment_bytes)
    try:
        # Both pipe ends share the same segment.
        client_end, server_end = make_pipe_pair()
        client_side = ShmPipeTransport(client_end, shm)
        server_side = ShmPipeTransport(server_end, shm)

        # Serve on a daemon thread so a stuck server cannot block interpreter exit.
        rpc_server = RpcServer(MathService, MathServiceImpl())
        worker = threading.Thread(
            target=rpc_server.serve,
            args=(server_side,),
            daemon=True,
        )
        worker.start()

        try:
            # The proxy is cast to the protocol so calls are statically typed.
            svc = cast(MathService, _RpcProxy(MathService, client_side))

            result_add = svc.add(a=2.5, b=3.5)
            print(f"add(2.5, 3.5) = {result_add}")

            result_mul = svc.multiply(a=4.0, b=5.0)
            print(f"multiply(4.0, 5.0) = {result_mul}")
        finally:
            # Close the client side first, then wait (bounded) for the server thread.
            client_side.close()
            worker.join(timeout=5)
    finally:
        shm.unlink()
        # A BufferError raised by close() is deliberately ignored here.
        with contextlib.suppress(BufferError):
            shm.close()
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()