-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathclientserver.py
More file actions
75 lines (62 loc) · 2.11 KB
/
clientserver.py
File metadata and controls
75 lines (62 loc) · 2.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import asyncio
import logging
from typing import AsyncGenerator, Literal
import pytest
from websockets.server import serve
from replit_river.client import Client
from replit_river.client_transport import UriAndMetadata
from replit_river.server import Server
from replit_river.transport_options import TransportOptions
from tests.conftest import HandlerMapping
from tests.river_fixtures.logging import NoErrors # noqa: E402
@pytest.fixture
def transport_options() -> TransportOptions:
    """Provide the transport options used by the test server and client.

    Returns:
        A ``TransportOptions`` instance constructed with no arguments.
    """
    options = TransportOptions()
    return options
@pytest.fixture
def server_handlers(handlers: HandlerMapping) -> HandlerMapping:
    """Expose the ``handlers`` fixture value under the name ``server_handlers``.

    Args:
        handlers: The handler mapping supplied by the ``handlers`` fixture.

    Returns:
        The very same mapping object, unchanged.
    """
    handler_mapping = handlers
    return handler_mapping
@pytest.fixture
def server(
    transport_options: TransportOptions,
    server_handlers: HandlerMapping,
) -> Server:
    """Build the ``Server`` under test and register the RPC handlers on it.

    Args:
        transport_options: Options forwarded to the ``Server`` constructor.
        server_handlers: Handler mapping passed to ``add_rpc_handlers``.

    Returns:
        A ``Server`` created with ``server_id="test_server"`` on which
        ``add_rpc_handlers(server_handlers)`` has already been called.
    """
    river_server = Server(
        server_id="test_server",
        transport_options=transport_options,
    )
    river_server.add_rpc_handlers(server_handlers)
    return river_server
@pytest.fixture
async def client(
    server: Server,
    transport_options: TransportOptions,
    no_logging_error: NoErrors,
) -> AsyncGenerator[Client, None]:
    """Yield a ``Client`` wired to a websocket listener that runs ``server.serve``.

    Setup:
        * Starts a websocket listener on ``127.0.0.1`` with ``server.serve`` as
          the handler and asserts that it exposes exactly one socket.
        * Creates a ``Client`` (``client_id="test_client"``,
          ``server_id="test_server"``) whose URI factory derives the ``ws://``
          address from that socket's ``getsockname()``.

    Teardown (always executed, in this order):
        1. Close the client (only if it was created).
        2. Sleep for one second.
        3. Close the listener, close the ``Server``, then wait for the
           listener to finish closing.
        4. Call ``no_logging_error()``.

    Args:
        server: The ``Server`` whose ``serve`` coroutine handles connections.
        transport_options: Options forwarded to the ``Client`` constructor.
        no_logging_error: Callable invoked as the last teardown step.

    Yields:
        The configured ``Client``.
    """
    # Stays None if ``serve`` itself fails, so teardown can skip the listener.
    ws_server = None
    try:
        ws_server = await serve(server.serve, "127.0.0.1")
        bound_sockets = list(ws_server.sockets)
        assert len(bound_sockets) == 1, "Too many sockets!"
        listener = bound_sockets[0]

        async def build_uri_and_metadata() -> UriAndMetadata[None]:
            # The address is looked up on every call, not cached up front.
            address = listener.getsockname()
            return {
                "uri": "ws://%s:%d" % address,
                "metadata": None,
            }

        river_client: Client[Literal[None]] = Client[None](
            uri_and_metadata_factory=build_uri_and_metadata,
            client_id="test_client",
            server_id="test_server",
            transport_options=transport_options,
        )
        try:
            yield river_client
        finally:
            logging.debug("Start closing test client : %s", "test_client")
            await river_client.close()
    finally:
        await asyncio.sleep(1)
        logging.debug("Start closing test server")
        if ws_server:
            ws_server.close()
        # The Server is closed between closing the listener and awaiting its
        # shutdown; keep this ordering.
        await server.close()
        if ws_server:
            await ws_server.wait_closed()
        # Server should close normally
        no_logging_error()