-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathconftest.py
More file actions
86 lines (66 loc) · 2.2 KB
/
conftest.py
File metadata and controls
86 lines (66 loc) · 2.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
from datetime import datetime, timezone
from typing import Any, Literal, Mapping
import nanoid
import pytest
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from replit_river.error_schema import RiverError
from replit_river.rpc import (
GenericRpcHandlerBuilder,
TransportMessage,
)
# Modular fixtures
pytest_plugins = [
"tests.v1.river_fixtures.logging",
"tests.v1.river_fixtures.clientserver",
"tests.v2.fixtures.bound_client",
"tests.v2.fixtures.raw_ws_server",
]
HandlerKind = Literal["rpc", "subscription-stream", "upload-stream", "stream"]
HandlerMapping = Mapping[tuple[str, str], tuple[HandlerKind, GenericRpcHandlerBuilder]]
def transport_message(
seq: int = 0,
ack: int = 0,
streamId: str = "test_stream",
from_: str = "client",
to: str = "server",
control_flag: int = 0,
payload: Any = {},
) -> TransportMessage:
return TransportMessage(
id=str(nanoid.generate()),
from_=from_,
to=to,
streamId=streamId,
seq=seq,
ack=ack,
payload=payload,
controlFlags=control_flag,
)
def serialize_request(request: str) -> dict:
return {"data": request}
def deserialize_request(request: dict) -> str:
return request.get("data") or ""
def serialize_response(response: str) -> dict:
return {
"data": response,
"data2": datetime.now(timezone.utc),
"data-3": {"data-test": "test"},
}
def deserialize_response(response: dict) -> str:
return response.get("data") or ""
def deserialize_error(response: dict) -> RiverError:
return RiverError.model_validate(response)
@pytest.fixture(scope="session")
def span_exporter() -> InMemorySpanExporter:
exporter = InMemorySpanExporter()
processor = SimpleSpanProcessor(exporter)
provider = TracerProvider()
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
return exporter
@pytest.fixture(autouse=True)
def reset_span_exporter(span_exporter: InMemorySpanExporter) -> None:
span_exporter.clear()