-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathwatchdog_fightCount.py
More file actions
405 lines (342 loc) · 15.1 KB
/
watchdog_fightCount.py
File metadata and controls
405 lines (342 loc) · 15.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
import configparser
import datetime
import logging
import os
import queue
import threading
import tempfile
import time
import zipfile
from collections import defaultdict, Counter
from typing import Dict, List, Optional, Tuple
import requests
import parser
import gw2_data
from watchdog.events import FileSystemEventHandler
from watchdog.observers.polling import PollingObserver
# EXE icon attribution: https://www.flaticon.com/authors/abdul-aziz
# --- Logging setup ---
# Timestamped lines, e.g. "2024-01-01 12:00:00,000 [INFO] message".
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
MAX_WAIT_TIME = 45 # seconds to wait for file completion (extracted .zevtc contents)
LOG_QUEUE = queue.Queue()  # paths of new .evtc/.zevtc files awaiting the worker thread
PROCESSED = set() # deduplication guard: paths already queued (never pruned for process lifetime)
# --- File event handler ---
class MyHandler(FileSystemEventHandler):
    """Watchdog handler that enqueues freshly written ArcDps combat logs."""

    def on_created(self, event):
        self.handle_file_event(event)

    def on_modified(self, event):
        self.handle_file_event(event)

    def handle_file_event(self, event):
        """Queue a combat-log path exactly once; ignore everything else."""
        path = event.src_path
        if event.is_directory:
            return
        if not path.endswith((".evtc", ".zevtc")):
            return
        if path in PROCESSED:  # prevent duplicates (created + modified events)
            return
        LOG_QUEUE.put(path)
        PROCESSED.add(path)
        logger.info(
            "Queued file for processing: %s (queue size: %d)",
            path,
            LOG_QUEUE.qsize(),
        )
# --- Worker thread ---
def log_worker():
    """Consume queued log paths one at a time until a ``None`` sentinel arrives."""
    while True:
        path = LOG_QUEUE.get()  # blocking wait for the next queued file
        if path is None:  # shutdown signal from the main thread
            break
        dequeued_at = datetime.datetime.now()
        _, ext = os.path.splitext(path)
        try:
            wait_for_file_completion(path, ext, dequeued_at)
        except Exception as exc:
            logger.exception("Error handling %s: %s", path, exc)
        LOG_QUEUE.task_done()
        logger.info(
            "Finished processing %s (queue size: %d)",
            path,
            LOG_QUEUE.qsize(),
        )
def wait_for_file_completion(file_path: str, file_ext: str, start_time: datetime.datetime) -> None:
    """
    Waits until a newly created log file stops changing before processing it.

    Polls the file's mtime and size every 0.5 s; once both are unchanged for
    4 consecutive checks (~2 s) the file is considered complete and handed to
    process_new_log(). Gives up after an adaptive timeout scaled by file size.

    Args:
        file_path: Path of the log file to monitor.
        file_ext: Extension including the dot (".evtc" or ".zevtc").
        start_time: When the file was dequeued (forwarded for timing logs).
    """
    # Dynamic scaling based on file size: +60 s per 5 MB, plus a 60 s floor.
    def estimate_wait_time(size_bytes: int) -> int:
        size_factor = 1 + (size_bytes // 5_000_000)
        return size_factor * 60 + 60 # seconds
    # Initial parameters
    check_interval = 0.5 # check every 500 ms
    last_modified = 0
    stable_count = 0
    max_retries = 200  # NOTE(review): unused — bounding is done by max_wait_time below
    # Track dynamic wait time extension for large files
    base_wait_time = 100 # seconds
    last_size = 0
    logger.info("Monitoring %s for completion...", file_path)
    # Check for existence first (event may fire before the file is visible).
    while not os.path.exists(file_path):
        time.sleep(1)
        logger.debug("Waiting for file to appear: %s", file_path)
    # Determine an adaptive maximum wait time from the current size.
    try:
        size_now = os.path.getsize(file_path)
        max_wait_time = max(base_wait_time, estimate_wait_time(size_now))
    except Exception:
        max_wait_time = base_wait_time
    start_wait = time.time()
    while time.time() - start_wait < max_wait_time:
        try:
            if not os.path.exists(file_path):
                logger.warning("File disappeared before completion: %s", file_path)
                return
            current_mod = os.path.getmtime(file_path)
            current_size = os.path.getsize(file_path)
            # Check if file has stabilized (same mtime/size, non-empty).
            if current_mod == last_modified and current_size == last_size and current_size > 0:
                stable_count += 1
                if stable_count >= 4: # 4 consecutive stable checks = stable for ~2.0s
                    logger.info("File appears complete: %s", file_path)
                    process_new_log(file_path, file_ext, start_time)
                    return
            else:
                # File changed since last poll — reset the stability counter.
                stable_count = 0
                last_modified = current_mod
                last_size = current_size
        except (OSError, PermissionError):
            # Writer may still hold the file open; retry next interval.
            logger.debug("File %s not yet accessible, waiting...", file_path)
        time.sleep(check_interval)
    logger.warning("Timeout waiting for %s to become stable.", file_path)
# --- Agent utilities ---
def set_team_changes(agents: List, events: List) -> None:
    """Assign teams to agents based on event statechanges."""
    # Statechange 22 carries a team assignment; prefer dst_agent, fall back to value.
    assignments: Dict[int, int] = {}
    for ev in events:
        if ev.is_statechange != 22 or not ev.src_agent:
            continue
        team = ev.dst_agent or ev.value
        if team != 0:
            assignments[ev.src_agent] = team
    for agent in agents:
        # Skip non-player agents (is_elite sentinel) and agents already teamed.
        if agent.is_elite == 4294967295 or agent.team:
            continue
        team = assignments.get(agent.address)
        if team in gw2_data.team_ids:
            agent.team = gw2_data.team_ids[team]
def set_agent_instance_id(agents: List, events: List) -> None:
    """Assign first seen instance IDs to agents."""
    # First non-statechange-22 sighting of each src_agent wins.
    first_seen: Dict[int, int] = {}
    for ev in events:
        if ev.is_statechange == 22 or not ev.src_instid or not ev.src_agent:
            continue
        first_seen.setdefault(ev.src_agent, ev.src_instid)
    for agent in agents:
        # Skip non-player agents and agents that already carry an instance ID.
        if agent.is_elite == 4294967295 or agent.instid:
            continue
        found = first_seen.get(agent.address)
        if found:
            agent.instid = found
def summarize_non_squad_players(
    agents: List,
) -> Tuple[int, Dict[int, Counter], Dict[str, Counter], Optional[int]]:
    """
    Summarize squad and non-squad players.
    Returns: (squad_count, non_squad_summary, squad_comp, squad_color)
    """
    non_squad_summary: Dict[int, Counter] = defaultdict(Counter)
    squad_comp: Dict[str, Counter] = defaultdict(Counter)
    seen_squad: set[int] = set()
    seen_other: set[int] = set()
    squad_color: Optional[int] = None
    squad_count = 0
    for agent in agents:
        # Skip non-player agents and agents lacking an identity or team.
        if agent.is_elite == 4294967295 or agent.instid is None or agent.team is None:
            continue
        if ":" in agent.name: # Squad members carry "Character:Account" names
            if agent.instid in seen_squad:
                continue
            seen_squad.add(agent.instid)
            squad_count += 1
            prof = gw2_data.elites.get(agent.is_elite, gw2_data.profs[agent.profession])
            squad_comp["Squad"][prof] += 1
            if squad_color is None:
                squad_color = agent.team  # first squad member fixes the "Allies" color
        elif agent.instid not in seen_other:
            seen_other.add(agent.instid)
            prof = gw2_data.elites.get(agent.is_elite, gw2_data.profs[agent.profession])
            non_squad_summary[agent.team][prof] += 1
    return squad_count, non_squad_summary, squad_comp, squad_color
# --- Discord integration ---
def send_to_discord(
    webhook_url: str,
    file_path: str,
    summary: Dict,
    squad_count: int,
    squad_comp: Dict,
    squad_color: Optional[int],
) -> None:
    """Send analysis results to Discord via webhook."""
    DISCORD_EMOJI = {"Red": ":red_square:", "Green": ":green_square:", "Blue": ":blue_square:"}
    base_name = os.path.basename(file_path)
    if not summary:
        payload = {"content": f"No valid data to analyze in {base_name}"}
    else:
        fields = []
        # One embed field per team, professions ordered by descending count.
        for team in sorted(summary):
            counts = summary[team]
            ordered = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
            report = "|" + "".join(
                f" {gw2_data.prof_abbrv[prof]}: {count} |" for prof, count in ordered
            )
            label = "Allies" if team == squad_color else team
            fields.append(
                {
                    "name": f"{DISCORD_EMOJI.get(team, '')} Team {label}: {sum(counts.values())}",
                    "value": report,
                    "inline": False,
                }
            )
        # Squad headcount and composition trail the team reports.
        fields.append(
            {"name": ":pink_heart: Squad Count:", "value": f"{squad_count} squad members", "inline": False}
        )
        squad_parts = [
            f"{gw2_data.prof_abbrv[prof]}: {squad_comp['Squad'][prof]} |"
            for prof in squad_comp["Squad"]
        ]
        fields.append(
            {"name": "Squad Composition:", "value": "|" + " ".join(squad_parts), "inline": False}
        )
        embed = {
            "title": f"Player counts for fight: {base_name}",
            "color": 5793266, # Blurple
            "fields": fields,
            "author": {
                "icon_url": "https://wiki.guildwars2.com/images/c/cb/Commander_tag_%28purple%29.png",
                "name": "Fight Watchdog",
                "url": "https://github.com/Drevarr/EVTC_parser",
            },
            "footer": {"text": "Drevarr's Fight Log Monitor"},
            "timestamp": datetime.datetime.now(datetime.UTC).isoformat(),
        }
        payload = {"embeds": [embed]}
    try:
        response = requests.post(webhook_url, json=payload, timeout=10)
        response.raise_for_status()
        logger.info("Successfully sent analysis to Discord for %s", file_path)
    except Exception as e:
        logger.error("Error sending to Discord: %s", e)
# --- Log processing ---
def process_new_log(log_file: str, file_ext: str, start_time: datetime.datetime) -> None:
    """
    Parse a completed .evtc/.zevtc combat log, summarize players per team,
    and report the result to Discord (if configured) and to stdout.

    Args:
        log_file: Path to the combat log (already confirmed stable on disk).
        file_ext: Extension including the dot (".evtc" or ".zevtc").
        start_time: When the file was dequeued; used to log processing time.
    """
    logger.info("Starting processing of %s", log_file)
    agents, skills, events, header = [], [], [], None
    try:
        if file_ext.lower() == ".zevtc":
            # .zevtc is a zip archive wrapping the raw .evtc data.
            logger.info("Processing .zevtc file: %s", log_file)
            with tempfile.TemporaryDirectory() as temp_dir:
                with zipfile.ZipFile(log_file, "r") as zip_ref:
                    zip_ref.extractall(temp_dir)
                # NOTE(review): if the archive held several files, later parses
                # would overwrite earlier results; in practice there is one entry.
                for extracted_file in os.listdir(temp_dir):
                    extracted_path = os.path.join(temp_dir, extracted_file)
                    logger.debug("Extracted file: %s", extracted_path)
                    last_size = -1
                    start_wait = time.time()
                    # Poll until the extracted file's size stops changing.
                    while time.time() - start_wait < MAX_WAIT_TIME:
                        try:
                            current_size = os.path.getsize(extracted_path)
                            if current_size == last_size and current_size > 0:
                                logger.info("Parsing extracted file: %s", extracted_path)
                                header, agents, skills, events = parser.parse_evtc(extracted_path)
                                break
                            last_size = current_size
                        except (IOError, PermissionError):
                            logger.debug("File %s not yet accessible", extracted_path)
                        time.sleep(1)
                    else:
                        # while-else: loop expired without a successful parse.
                        logger.warning("Timeout waiting for %s to complete.", extracted_path)
                        return
        elif file_ext.lower() == ".evtc":
            logger.info("Processing .evtc file: %s", log_file)
            if os.path.getsize(log_file) == 0:
                logger.error("Error: %s is empty", log_file)
                return
            # Sanity-check the magic bytes before handing off to the parser.
            with open(log_file, "rb") as f:
                header_bytes = f.read(12)
            if not header_bytes.startswith(b"EVTC"):
                logger.error("Error: %s is not a valid EVTC file", log_file)
                return
            header, agents, skills, events = parser.parse_evtc(log_file)
        if not all([header, agents, skills, events]):
            logger.error("Error: Incomplete data from parser for %s", log_file)
            return
        logger.info("Parsed %s: %d agents, %d skills, %d events", log_file, len(agents), len(skills), len(events))
    except zipfile.BadZipFile as e:
        logger.error("Failed to extract %s: %s", log_file, e)
        return
    except Exception as e:
        logger.exception("Error processing %s: %s", log_file, e)
        return
    # Enrich agents with team/instance data derived from the event stream.
    logger.info("Setting team changes for %d agents", len(agents))
    set_team_changes(agents, events)
    logger.info("Setting agent instance IDs")
    set_agent_instance_id(agents, events)
    logger.info("Summarizing non-squad players")
    squad_count, team_report, squad_comp, squad_color = summarize_non_squad_players(agents)
    logger.info("Squad players: %d", squad_count)
    end_time = datetime.datetime.now()
    logger.info("File %s processed, %d agents, %d skills, %d events", log_file, len(agents), len(skills), len(events))
    logger.info("Processing Time: %s", end_time - start_time)
    # WEBHOOK_URL is a module-level global assigned in the __main__ block.
    if WEBHOOK_URL:
        logger.info("Sending to Discord webhook: %s", WEBHOOK_URL)
        send_to_discord(WEBHOOK_URL, log_file, team_report, squad_count, squad_comp, squad_color)
    else:
        logger.warning("No WEBHOOK_URL configured, skipping Discord send")
    # Console summary mirrors the Discord embed for local runs.
    print("\n===== Log Summary =====")
    print(f"File: {os.path.basename(log_file)}")
    print(f"Squad members: {squad_count}")
    print("Squad composition:")
    squad_comp_line = ""
    for prof, count in squad_comp["Squad"].items():
        squad_comp_line += f"{gw2_data.prof_abbrv[prof]}: {count}, "
    print(f" {squad_comp_line.rstrip(', ')}")
    if not team_report:
        print("No non-squad players found.")
    else:
        for team, counter in team_report.items():
            team_count = sum(counter.values())
            team_name = "Allies" if team == squad_color else f"Team {team}"
            print(f"\n{team_name} ({team_count} players):")
            prof_count_line = ""
            for prof, count in counter.items():
                prof_count_line += f"{gw2_data.prof_abbrv[prof]}: {count}, "
            print(f" {team_name} Comp: {prof_count_line.rstrip(', ')}")
    print("========================\n")
    # Parser cleanup hook — presumably releases parser-held buffers; see parser.free_evtc_data.
    parser.free_evtc_data(header, agents, skills, events)
# --- Main entry ---
if __name__ == "__main__":
    # Load settings. ARCDPS_LOG_DIR is required; WEBHOOK_URL is optional —
    # process_new_log() already falls back to console-only output when empty.
    config_ini = configparser.ConfigParser()
    config_ini.read("config.ini")
    ARCDPS_LOG_DIR = config_ini["Settings"]["ARCDPS_LOG_DIR"]
    if not os.path.isdir(ARCDPS_LOG_DIR):
        raise ValueError(f"Directory {ARCDPS_LOG_DIR} does not exist or is not a directory")
    if not os.access(ARCDPS_LOG_DIR, os.R_OK):
        raise ValueError(f"No read permission for directory {ARCDPS_LOG_DIR}")
    LOG_DELAY = int(config_ini["Settings"].get("LOG_DELAY", 5))
    # Fix: a missing WEBHOOK_URL key used to raise KeyError at startup; treat
    # it as "not configured" instead, matching the warning path downstream.
    WEBHOOK_URL = config_ini["Settings"].get("WEBHOOK_URL", "")
    # Start worker thread (single consumer draining LOG_QUEUE sequentially).
    worker = threading.Thread(target=log_worker, daemon=True)
    worker.start()
    logger.info("Watching for new ArcDps logs in %s", ARCDPS_LOG_DIR)
    event_handler = MyHandler()
    observer = PollingObserver() # PollingObserver is more reliable cross-platform
    observer.schedule(event_handler, ARCDPS_LOG_DIR, recursive=True)
    observer.start()
    try:
        # Idle loop: watchdog and the worker thread do all the work.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        LOG_QUEUE.put(None) # signal worker to stop
        worker.join()
        observer.join()