Skip to content

Commit fcdc77e

Browse files
Merge pull request #664 from AutomationSolutionz/multi-client-support-faster
Auto reconnect to log service, added buffering for logs for latest version
2 parents 1a0b1eb + 26b0372 commit fcdc77e

1 file changed

Lines changed: 218 additions & 48 deletions

File tree

Framework/Utilities/live_log_service.py

Lines changed: 218 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,72 @@
11
import json
2+
import ssl
23
import time
4+
from threading import Lock, Thread, Timer
5+
from urllib.parse import urlparse
6+
37
import websocket
4-
import ssl
5-
import json
6-
import os
7-
from threading import Thread
88

99

1010
# Websocket connection object that runs on a different thread.
1111
ws = None
1212
connected = False
13+
ws_url = None
14+
ws_thread = None
15+
should_run = False
16+
connection_lock = Lock()
17+
MIN_RECONNECT_DELAY_SECONDS = 3
18+
MAX_RECONNECT_DELAY_SECONDS = 300
19+
BACKOFF_MULTIPLIER = 2
20+
BUFFER_FLUSH_INTERVAL = 0.5 # seconds
21+
MAX_BUFFER_SIZE = 500
22+
BATCH_MIN_VERSION = (2, 0, 0)
23+
WARN_COLOR = "\033[33m"
24+
RESET_COLOR = "\033[0m"
25+
attempt_opened = False
26+
reconnect_delay_seconds = MIN_RECONNECT_DELAY_SECONDS
27+
server_version = None # version tuple, e.g. (2, 0, 0), set on connect
28+
_log_buffer = [] # buffered log dicts awaiting flush
29+
_buffer_lock = Lock()
30+
_flush_timer = None # type: Timer | None
31+
32+
33+
def _parse_version(version_str):
34+
"""Parse a semver-like string into a tuple of ints, e.g. '2.0.0' -> (2, 0, 0).
35+
36+
Pre-release suffixes (e.g. '2.0.0-beta.1') are stripped before parsing.
37+
"""
38+
try:
39+
core = version_str.strip().split("-")[0] # strip pre-release suffix
40+
return tuple(int(p) for p in core.split("."))
41+
except (ValueError, AttributeError):
42+
return None
43+
44+
45+
def _log(message):
46+
print(f"[live-log] {message}")
47+
48+
49+
def _warn(message):
50+
print(f"{WARN_COLOR}[live-log][warn] {message}{RESET_COLOR}")
51+
52+
53+
def _is_localhost_url(url):
54+
try:
55+
parsed = urlparse(url or "")
56+
return parsed.hostname == "localhost"
57+
except Exception:
58+
return False
59+
60+
61+
def _build_websocket(url):
62+
app = websocket.WebSocketApp(
63+
url,
64+
on_message=on_message,
65+
on_error=on_error,
66+
on_close=on_close,
67+
)
68+
app.on_open = on_open
69+
return app
1370

1471

1572
def send_file(data, ws):
@@ -18,44 +75,66 @@ def send_file(data, ws):
1875
try:
1976
if ws is None:
2077
return
21-
2278
ws.send(data, websocket.ABNF.OPCODE_BINARY)
23-
except:
24-
pass
79+
except Exception:
80+
return
2581

2682

2783
def send(msg, ws):
2884
"""Send plain text through websocket."""
2985

30-
# TODO: Make this "send" method buffered so that it only sends certain
31-
# amount of messages at certain intervals of time. Maybe send it in a queue
32-
# that a background thread reads from every 1s or so.
33-
3486
try:
3587
if ws is None:
3688
return
37-
3889
if not isinstance(msg, str):
3990
msg = json.dumps(msg)
40-
4191
ws.send(msg)
42-
except:
43-
pass
92+
except Exception:
93+
return
4494

4595

46-
def log(module_info, log_level, description):
47-
msg = {
48-
"type": "log",
49-
"msg": {
50-
"module_info": module_info,
51-
"log_level": log_level,
52-
"msg": description,
53-
},
54-
}
96+
def _schedule_flush():
97+
"""Schedule a buffer flush after BUFFER_FLUSH_INTERVAL if not already scheduled."""
98+
global _flush_timer
99+
if _flush_timer is not None:
100+
return # already scheduled
101+
_flush_timer = Timer(BUFFER_FLUSH_INTERVAL, _flush_buffer)
102+
_flush_timer.daemon = True
103+
_flush_timer.start()
104+
105+
106+
def _flush_buffer():
107+
"""Send all buffered log entries as a single batch message."""
108+
global _flush_timer
109+
with _buffer_lock:
110+
_flush_timer = None
111+
if not _log_buffer:
112+
return
113+
batch = list(_log_buffer)
114+
_log_buffer.clear()
115+
msg = {"type": "log", "msg": batch}
55116
global ws
56117
send(msg, ws)
57118

58119

120+
def log(module_info, log_level, description):
121+
entry = {
122+
"module_info": module_info,
123+
"log_level": log_level,
124+
"msg": description,
125+
}
126+
127+
if server_version is not None and server_version >= BATCH_MIN_VERSION:
128+
with _buffer_lock:
129+
if len(_log_buffer) < MAX_BUFFER_SIZE:
130+
_log_buffer.append(entry)
131+
_schedule_flush()
132+
else:
133+
msg = {"type": "log", "msg": entry}
134+
global ws
135+
send(msg, ws)
136+
137+
59138
def binary(data):
60139
global ws
61140
send_file(data, ws)
@@ -64,15 +143,41 @@ def binary(data):
64143
def close():
65144
global ws
66145
global connected
146+
global should_run
147+
global _flush_timer
148+
global server_version
149+
150+
# Prevent new logs from being buffered while we drain.
151+
server_version = None
152+
153+
# Flush any remaining buffered logs before closing.
154+
with _buffer_lock:
155+
if _flush_timer is not None:
156+
_flush_timer.cancel()
157+
_flush_timer = None
158+
_flush_buffer()
159+
67160
connected = False
68-
if ws != None:
161+
should_run = False
162+
if ws is not None:
69163
try:
70164
ws.close(status=1000, reason="Test Set run complete")
71-
except:
72-
pass
165+
except Exception:
166+
return
73167

74168

75169
def on_message(ws, message):
170+
global server_version
171+
try:
172+
data = json.loads(message)
173+
if isinstance(data, dict) and data.get("type") == "version":
174+
server_version = _parse_version(data["msg"])
175+
_log(f"Server version: {server_version}")
176+
if server_version is not None and server_version >= BATCH_MIN_VERSION:
177+
_log("Batch log buffering enabled.")
178+
return
179+
except Exception:
180+
pass
76181
print("[ws] Message:\n", message)
77182

78183

@@ -82,46 +187,111 @@ def on_error(ws, error):
82187
elif isinstance(error, OSError):
83188
# Prevent bad file descriptor error from showing
84189
return
85-
# print("[ws] Error. Connection closed\n")
190+
_log(f"Connection failed: {error}")
86191

87192

88193
def on_close(ws=None, a=None, b=None):
89194
global connected
195+
global should_run
196+
global reconnect_delay_seconds
197+
global server_version
198+
global _flush_timer
90199
connected = False
91-
# print("[ws] Connection closed.")
200+
server_version = None
201+
# Cancel any pending flush timer on disconnect.
202+
with _buffer_lock:
203+
if _flush_timer is not None:
204+
_flush_timer.cancel()
205+
_flush_timer = None
206+
_log_buffer.clear()
207+
if should_run:
208+
_log(
209+
f"Disconnected. Reconnecting in {reconnect_delay_seconds}s..."
210+
)
92211

93212

94213
def on_open(ws):
95214
global connected
215+
global attempt_opened
96216
connected = True
97-
# print("[ws] Live Log Connection established.")
217+
attempt_opened = True
218+
_log("Live log connection established.")
98219

99220

100-
def run_ws_thread(ws):
101-
try:
102-
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE}, )
103-
except:
104-
pass
221+
def run_ws_thread():
222+
global ws
223+
global connected
224+
global should_run
225+
global ws_url
226+
global attempt_opened
227+
global reconnect_delay_seconds
228+
229+
while should_run:
230+
next_ws = None
231+
attempt_opened = False
232+
try:
233+
next_ws = _build_websocket(ws_url)
234+
with connection_lock:
235+
ws = next_ws
236+
next_ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
237+
except Exception as e:
238+
_log(f"Connection attempt failed: {e}")
239+
finally:
240+
connected = False
241+
with connection_lock:
242+
if ws is not None and next_ws is not None and ws is next_ws:
243+
ws = None
244+
245+
if (
246+
should_run
247+
and (not attempt_opened)
248+
and _is_localhost_url(ws_url)
249+
):
250+
_warn(
251+
"Detected 'localhost' and failed to establish live-log connection. "
252+
"If your server is in WSL, use '127.0.0.1' instead."
253+
)
254+
255+
if should_run:
256+
time.sleep(reconnect_delay_seconds)
257+
if attempt_opened:
258+
reconnect_delay_seconds = MIN_RECONNECT_DELAY_SECONDS
259+
else:
260+
reconnect_delay_seconds = min(
261+
MAX_RECONNECT_DELAY_SECONDS,
262+
reconnect_delay_seconds * BACKOFF_MULTIPLIER,
263+
)
105264

106265

107266
def connect(url):
108267
global ws
109268
global connected
269+
global ws_url
270+
global ws_thread
271+
global should_run
272+
global server_version
273+
global reconnect_delay_seconds
110274

111275
# Uncomment next line for debugging.
112276
# websocket.enableTrace(True)
113-
ws = websocket.WebSocketApp(url,
114-
on_message=on_message,
115-
on_error=on_error,
116-
on_close=on_close)
117-
118-
ws.on_open = on_open
277+
previous_url = ws_url
278+
ws_url = url
279+
should_run = True
280+
reconnect_delay_seconds = MIN_RECONNECT_DELAY_SECONDS
281+
282+
if ws_thread is not None and ws_thread.is_alive():
283+
if previous_url != url:
284+
_log(f"Live log server changed. Switching immediately to: {url}")
285+
server_version = None
286+
with connection_lock:
287+
current_ws = ws
288+
if current_ws is not None:
289+
try:
290+
current_ws.close(status=1000, reason="Switching live log server")
291+
except Exception:
292+
pass
293+
return
119294

120-
t = Thread(target=run_ws_thread, args=(ws,))
121-
t.start()
295+
ws_thread = Thread(target=run_ws_thread, daemon=True)
296+
ws_thread.start()
122297

123-
# Retry for 6s with 0.3s interval.
124-
# for _ in range(20):
125-
# if connected:
126-
# break
127-
# time.sleep(0.3)

0 commit comments

Comments
 (0)