forked from leanEthereum/leanSpec
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
195 lines (146 loc) · 5.91 KB
/
server.py
File metadata and controls
195 lines (146 loc) · 5.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
"""
API server for checkpoint sync, node status, and metrics endpoints.
Provides HTTP endpoints for:
- /lean/v0/states/finalized - Serve finalized checkpoint state as SSZ
- /lean/v0/checkpoints/justified - Return latest justified checkpoint information
- /lean/v0/health - Health check endpoint
- /metrics - Prometheus metrics endpoint
This matches the checkpoint sync API implemented in zeam.
"""
from __future__ import annotations
import asyncio
import logging
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from aiohttp import web
from lean_spec.subspecs.metrics import generate_metrics
if TYPE_CHECKING:
from lean_spec.subspecs.forkchoice import Store
logger = logging.getLogger(__name__)
async def _handle_health(_request: web.Request) -> web.Response:
"""
Handle health check endpoint.
Response format:
- status: The status of the API server. Always return "healthy" when the API endpoint is served.
- service: The API service name. Fixed to "lean-rpc-api".
"""
return web.json_response({"status": "healthy", "service": "lean-rpc-api"})
async def _handle_metrics(_request: web.Request) -> web.Response:
"""Handle Prometheus metrics endpoint."""
return web.Response(
body=generate_metrics(),
content_type="text/plain; version=0.0.4",
charset="utf-8",
)
@dataclass(frozen=True, slots=True)
class ApiServerConfig:
"""Configuration for the API server."""
host: str = "0.0.0.0"
"""Host address to bind to."""
port: int = 5052
"""Port to listen on."""
enabled: bool = True
"""Whether the API server is enabled."""
@dataclass(slots=True)
class ApiServer:
"""
HTTP API server for checkpoint sync and node status.
Provides endpoints for:
- Checkpoint sync: Download finalized state for fast sync
- Health checks: Verify node is running
Uses aiohttp to handle HTTP protocol details efficiently.
"""
config: ApiServerConfig
"""Server configuration."""
store_getter: Callable[[], Store | None] | None = None
"""Callable that returns the current Store instance."""
_runner: web.AppRunner | None = field(default=None, init=False)
"""The aiohttp application runner."""
_site: web.TCPSite | None = field(default=None, init=False)
"""The TCP site for the server."""
@property
def store(self) -> Store | None:
"""Get the current Store instance."""
return self.store_getter() if self.store_getter else None
async def start(self) -> None:
"""Start the API server in the background."""
if not self.config.enabled:
logger.info("API server is disabled")
return
app = web.Application()
app.add_routes(
[
web.get("/lean/v0/health", _handle_health),
web.get("/metrics", _handle_metrics),
web.get("/lean/v0/states/finalized", self._handle_finalized_state),
web.get("/lean/v0/checkpoints/justified", self._handle_justified_checkpoint),
]
)
self._runner = web.AppRunner(app)
await self._runner.setup()
self._site = web.TCPSite(self._runner, self.config.host, self.config.port)
await self._site.start()
logger.info(f"API server listening on {self.config.host}:{self.config.port}")
async def run(self) -> None:
"""
Run the API server until shutdown.
This method blocks until stop() is called.
"""
await self.start()
# Keep running until stopped
while self._runner is not None:
await asyncio.sleep(1)
def stop(self) -> None:
"""Request graceful shutdown."""
if self._runner is not None:
asyncio.create_task(self._async_stop())
async def _async_stop(self) -> None:
"""Gracefully stop the server."""
if self._runner:
await self._runner.cleanup()
self._runner = None
self._site = None
logger.info("API server stopped")
async def _handle_finalized_state(self, _request: web.Request) -> web.Response:
"""
Handle finalized checkpoint state endpoint.
Serves the finalized state as SSZ binary at /lean/v0/states/finalized.
This endpoint is used for checkpoint sync - clients can download
the finalized state to bootstrap quickly instead of syncing from genesis.
"""
store = self.store
if store is None:
raise web.HTTPServiceUnavailable(reason="Store not initialized")
finalized = store.latest_finalized
if finalized.root not in store.states:
raise web.HTTPNotFound(reason="Finalized state not available")
state = store.states[finalized.root]
# Run CPU-intensive SSZ encoding in a separate thread
try:
ssz_bytes = await asyncio.to_thread(state.encode_bytes)
except Exception as e:
logger.error(f"Failed to encode state: {e}")
raise web.HTTPInternalServerError(reason="Encoding failed") from e
return web.Response(body=ssz_bytes, content_type="application/octet-stream")
async def _handle_justified_checkpoint(self, _request: web.Request) -> web.Response:
"""
Handle latest justified checkpoint endpoint.
Returns checkpoint info as JSON at /lean/v0/checkpoints/justified.
Useful for monitoring consensus progress and fork choice state.
Response format:
{
"slot": <slot_number>,
"root": "<hex_root_hash>"
}
"""
store = self.store
if store is None:
raise web.HTTPServiceUnavailable(reason="Store not initialized")
justified = store.latest_justified
return web.json_response(
{
"slot": justified.slot,
"root": "0x" + justified.root.hex(),
}
)