-
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathdashboard.py
More file actions
143 lines (117 loc) · 3.57 KB
/
dashboard.py
File metadata and controls
143 lines (117 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
from collections import defaultdict, deque
from dataclasses import dataclass
import datetime
from typing import Any, Optional
import anyio
import anyio.abc
from bson import ObjectId
import fastapi
import psutil
from fastapi.staticfiles import StaticFiles
from .config import ROOT_PATH
from .web import query_per_second_statistics
from .utils import scheduler
from tianxiu2b2t.utils import runtime
class StreamNotice:
    """Helper that serves a ``text/event-stream`` response per client request.

    Two per-request tables are kept, both ``defaultdict``s so an entry is
    created on first lookup:

    * ``clients`` maps a request to an ``ObjectId`` (default: the all-zero
      id) -- presumably the last event id seen by that client; nothing in
      this class writes to it yet, confirm against callers.
    * ``events`` maps a request to an ``anyio.Event``; also not used by the
      methods below yet.
    """

    def __init__(
        self
    ):
        self.clients: defaultdict[fastapi.Request, ObjectId] = defaultdict(
            lambda: ObjectId('0' * 24)
        )
        self.events: defaultdict[fastapi.Request, anyio.Event] = defaultdict(anyio.Event)

    def add_client(self, request: fastapi.Request) -> fastapi.responses.StreamingResponse:
        """Return a streaming response whose body is produced by :meth:`stream`.

        Args:
            request: The incoming request; its connection state drives the
                lifetime of the stream.

        Returns:
            A ``StreamingResponse`` with media type ``text/event-stream``.
        """
        return fastapi.responses.StreamingResponse(
            self.stream(request),
            media_type="text/event-stream",
        )

    async def stream(self, request: fastapi.Request):
        """Yield empty chunks until the client behind *request* disconnects.

        Ordinary errors raised while polling the connection end the stream
        quietly (best effort). Cancellation and generator-close signals are
        ``BaseException``s and are deliberately NOT caught, so the framework
        can tear the stream down normally.
        """
        try:
            # NOTE(review): there is no delay between iterations, so this
            # polls as fast as the event loop allows -- confirm this is
            # intended while ``put`` is still a stub.
            while not await request.is_disconnected():
                yield ''
        except Exception:
            # Best effort: a failing connection simply ends the stream.
            pass

    async def put(self, event: str, data: Any):
        """Placeholder for publishing *event* with *data*; not implemented."""
        ...
@dataclass
class MemoryInfo:
    """A single memory-usage sample made of two integer counters."""

    # Resident set size of the sampled process.
    rss: int
    # Virtual memory size of the sampled process.
    vms: int
class SystemInfo:
    """Rolling sampler of the current process's CPU and memory usage.

    ``update`` appends one CPU and one memory sample per iteration and then
    sleeps for one second; each history keeps at most 600 samples, i.e.
    roughly the last ten minutes.
    """

    def __init__(
        self
    ):
        self.process = psutil.Process()
        self.cpus: deque[float] = deque(maxlen=600)
        self.memory: deque[MemoryInfo] = deque(maxlen=600)
        # 1 while the sampling loop should keep going, 0 otherwise.
        self._running = 0

    def setup(
        self,
    ):
        """Mark the sampler as running and schedule ``update`` to start now."""
        self._running = 1
        scheduler.add_job(
            self.update,
            trigger="date",
            next_run_time=datetime.datetime.now(),
            id="systeminfo"
        )

    def stop(
        self,
    ):
        """Ask the sampling loop to exit; it stops at its next iteration check."""
        self._running = 0

    async def update(
        self,
    ):
        """Sample CPU and memory once per second until stopped.

        The loop ends when ``stop`` clears the running flag or when taking a
        sample raises an ordinary exception. Cancellation is not caught, so
        it propagates to whoever runs this coroutine.
        """
        while self._running:
            try:
                self.cpus.append(self.process.cpu_percent(interval=None))
                # Only rss/vms are recorded, so the cheaper memory_info()
                # is enough; memory_full_info() adds fields we never read.
                memory = self.process.memory_info()
                self.memory.append(MemoryInfo(
                    rss=memory.rss,
                    vms=memory.vms,
                ))
            except Exception:
                # Best effort: give up sampling rather than crash the job.
                break
            await anyio.sleep(1)

    def get_info(self) -> dict[str, Any]:
        """Return a snapshot built from the most recent samples.

        Returns:
            A dict with keys:
            ``cpu`` -- latest CPU sample, or 0 when none was taken yet;
            ``memory`` -- latest ``MemoryInfo``, or ``MemoryInfo(0, 0)``;
            ``sysload`` -- the result of ``psutil.getloadavg()``;
            ``load`` -- mean of all retained CPU samples, or 0 when empty;
            ``process_runtime`` -- the value of ``runtime.perf_counter()``
            (presumably time since process start -- confirm in that helper).
        """
        return {
            "cpu": self.cpus[-1] if self.cpus else 0,
            "memory": self.memory[-1] if self.memory else MemoryInfo(0, 0),
            "sysload": psutil.getloadavg(),
            "load": sum(self.cpus) / len(self.cpus) if self.cpus else 0,
            "process_runtime": runtime.perf_counter(),
        }
# Module-level singletons shared by this module's functions:
# ``notice`` backs the /api/receive stream, ``systeminfo`` backs /api/system
# and is started/stopped by the module-level setup()/stop().
notice = StreamNotice()
systeminfo = SystemInfo()
async def setup(
    app: fastapi.FastAPI,
    task_group: anyio.abc.TaskGroup,
):
    """Start system sampling and register the dashboard routes on *app*.

    Args:
        app: Application that receives the routes and the static mount.
        task_group: Accepted for the caller's convenience; not used here.

    Routes are registered in this order, which matters because the
    catch-all page routes come last: ``/favicon.ico``, the ``/assets``
    static mount, ``/api/receive``, ``/api/qps``, ``/api/system`` and
    finally the page routes that all return ``index.html``.
    """
    # NOTE: a DEBUG-only early return used to be sketched here; it is disabled.
    systeminfo.setup()

    assets_dir = ROOT_PATH / "assets"

    @app.get("/favicon.ico")
    def _():
        return fastapi.responses.FileResponse(assets_dir / "favicon.ico")

    app.mount("/assets", StaticFiles(directory=assets_dir), name="assets")

    @app.get("/api/receive")
    async def _(request: fastapi.Request):
        # Hands the request to the shared stream helper.
        return notice.add_client(request)

    @app.get("/api/qps")
    async def _(request: fastapi.Request, interval: Optional[int] = 5):
        # With an interval the statistics are merged; without one, all are returned.
        if interval is not None:
            return query_per_second_statistics.merge_data(interval)
        return query_per_second_statistics.get_all()

    @app.get("/api/system")
    async def _():
        return systeminfo.get_info()

    # Every remaining one- or two-segment path serves the same HTML page.
    @app.get("/")
    @app.get("/{page}")
    @app.get("/{page}/{pages}")
    def index():
        return fastapi.responses.FileResponse(assets_dir / "index.html")
def stop() -> None:
    """Shut down the dashboard by calling ``stop()`` on the module-level ``systeminfo``."""
    systeminfo.stop()