-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwindow_monitor.py
More file actions
279 lines (253 loc) · 11.4 KB
/
window_monitor.py
File metadata and controls
279 lines (253 loc) · 11.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
# nuitka-project: --standalone
# nuitka-project: --onefile
# nuitka-project: --windows-console-mode=disable
# nuitka-project: --file-version=0.9
# nuitka-project: --product-version=0.9
# nuitka-project: --company-name=tcpsoft
# nuitka-project: --product-name=window_monitor
# nuitka-project: --file-description=窗口记录工具,定期记录所有窗口,并记录到json文件,当资源管理器崩溃或者系统重启,可以查看json恢复你的工作
# nuitka-project: --copyright=© 2026 tcpsoft
# nuitka-project: --output-filename=window_monitor_nogui.exe
# Build command: nuitka window_monitor.py
# encoding=utf-8
import os,sys,re,json,time
import psutil
import win32gui
import win32process
import pymsgbox
import traceback
# from subprocess import PIPE, Popen
# Calling the command line from Python: https://zhuanlan.zhihu.com/p/329957363
import clr
# Calling C# from Python: https://zhuanlan.zhihu.com/p/145617607
# Directory that contains the launched script/executable (realpath of argv[0]).
exe_dir = os.path.dirname(os.path.realpath(sys.argv[0]))
os.chdir(exe_dir) # Switch to that directory; the JSON and log file names used below are relative
def add_dll_symbol(dll_filename):
    """Load the VirtualDesktop .NET assembly and publish its names as module globals.

    Args:
        dll_filename: Assembly name passed to ``clr.AddReference``. The
            ``VirtualDesktopDLL`` folder next to the executable is added to
            ``sys.path`` first (presumably that is where the assembly is
            resolved from -- confirm against the pythonnet documentation).

    Returns:
        The ``(VirtualDesktop, Desktop)`` pair that was imported.
    """
    global VirtualDesktop, Desktop  # other functions in this module use these names
    dll_dir = os.path.join(exe_dir, "VirtualDesktopDLL")
    # Calling this more than once must not keep growing sys.path.
    if dll_dir not in sys.path:
        sys.path.append(dll_dir)
    # clr.FindAssembly was tried for loading the C# DLL, did not work, and was abandoned.
    clr.AddReference(dll_filename)
    # These imports only resolve once the assembly is referenced, so they
    # cannot live at the top of the file.
    import VirtualDesktop
    from VirtualDesktop import Desktop
    return VirtualDesktop, Desktop
# Snapshot being built by the current scan; update_hwnd_arr() rebinds this to a dict.
now_hwnd_all = []
# except_list = ['Progman',"Windows.UI.Core.CoreWindow", "ApplicationFrameWindow"]
# Saved snapshots, newest first (save_last_hwnd_all_to_history inserts at index 0).
all_history = []
# Defaults; program_main() overwrites both from the config file.
max_history_length = 500
seconds_per_loop = 10
# File names are relative to the working directory.
program_history_json = "_program_history.json"
program_history_backup_json = "_program_history_backup.json"
config_json = "_config.json"
config_title_replace = "_config_title_replace.json"
# Parsed contents of the two config files; filled in by init_load_cfg() / program_main().
cfg = None
cfg_title_replace = None
log_txt = "_log.txt"
# Most recently saved snapshot; None until program_main() initialises it.
last_hwnd_all = None
def log(message):
    """Append one timestamped line to the log file and echo it to stdout.

    Args:
        message: Text to record; must be a ``str``.
    """
    with open(log_txt, "a", encoding="utf8") as log_file:
        stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        line = "".join(("[ ", stamp, "] ", message, "\n"))
        # print() adds its own newline, so the console shows a blank line after each entry.
        print(line)
        log_file.write(line)
# Record the working directory; this is module-level, so it also runs on import.
log(exe_dir)
# Helper used as the window-enumeration callback
def _get_all_hwnd_func(hwnd, mouse):
    """Record one top-level window in the snapshot that is currently being built.

    Passed to ``win32gui.EnumWindows`` by ``update_hwnd_arr``. A window is
    recorded only when it is a valid, enabled, visible window with a non-empty
    title and its virtual desktop can be determined. The recorded row is
    ``(desktop index as str, desktop name, process id, hwnd, exe name, title)``.

    Args:
        hwnd: Window handle supplied by the enumeration.
        mouse: Extra value forwarded by ``EnumWindows``; unused.

    Returns:
        None in every case.
    """
    global now_hwnd_all, cfg_title_replace
    if not (win32gui.IsWindow(hwnd) and win32gui.IsWindowEnabled(hwnd) and win32gui.IsWindowVisible(hwnd)):
        return
    title = win32gui.GetWindowText(hwnd)
    if title == "":
        return

    # Title rewrite rules from the title-replace config. Example rule:
    #   title             = '下载:0 KB/s, 上传:0 KB/s - BitComet(比特彗星) (64-bit) 2.19'
    #   condition pattern = r'BitComet\(比特彗星\) \(64-bit\)'
    #   replace old       = r'下载:[^,]+, 上传:[^ ]+'
    #   replace new       = '下载:XX/s, 上传:XX/s'
    # (presumably so that a constantly changing readout in a title does not
    # register as a window change on every scan)
    for title_rule in cfg_title_replace:
        # Plain substring condition.
        if title_rule["condition_string"] != "" and title_rule["condition_string"] in title:
            title = re.sub(title_rule["re_replace_old"], title_rule["re_replace_new"], title)
        # Regular-expression condition; evaluated against the possibly already rewritten title.
        if title_rule["re_condition_pattern"] != "" and re.search(title_rule["re_condition_pattern"], title):
            title = re.sub(title_rule["re_replace_old"], title_rule["re_replace_new"], title)

    # An earlier version shelled out to VirtualDesktop.exe once per window;
    # the in-process DLL call below replaced it.
    try:
        rc = VirtualDesktop.Desktop.FromDesktop(VirtualDesktop.Desktop.FromWindow(hwnd))
    except Exception:
        # Best effort: windows whose desktop cannot be resolved are skipped.
        return
    desktop_name = VirtualDesktop.Desktop.DesktopNameFromIndex(rc)
    thread, processId = win32process.GetWindowThreadProcessId(hwnd)
    try:
        exename = psutil.Process(processId).name()
    except psutil.NoSuchProcess:
        # The owning process exited between enumeration and lookup; without
        # this guard the exception would abort the whole scan.
        return
    except psutil.AccessDenied:
        # Keep the window in the record even though its executable name is unreadable.
        exename = ""
    now_hwnd_all["arr"].append((str(rc), desktop_name, processId, hwnd, exename, title))
def update_hwnd_arr(return_value=False):
    """Rebuild the global ``now_hwnd_all`` snapshot from the windows that exist now.

    The snapshot is a dict with ``timestamp`` (epoch seconds), ``time``
    (formatted local time), ``note`` (column legend), ``count`` and ``arr``
    (rows appended by ``_get_all_hwnd_func``, sorted by desktop id, exe name
    and window handle). If no window was recorded, a blocking message box
    with troubleshooting hints is shown.

    Args:
        return_value: When true, return the snapshot dict.

    Returns:
        The snapshot dict if ``return_value`` is true, otherwise None.
    """
    global now_hwnd_all
    # A single clock read feeds both fields so they always describe the same instant.
    now = time.time()
    now_hwnd_all = {"timestamp": int(now),
                    "time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(now)),
                    "note": "桌面id, 桌面名称, 进程id, 句柄id, exe名, 窗口标题",
                    "count": 0,
                    "arr": []}
    # Column positions inside a row; see the "note" legend above.
    desktop_index, hwnd_index, exe_index = 0, 3, 4
    # The callback appends to now_hwnd_all["arr"], so the dict must already be in place.
    win32gui.EnumWindows(_get_all_hwnd_func, 0)
    # NOTE(review): the desktop id is stored as a string, so "10" sorts before
    # "2". Left as is so the row order stays comparable with saved history.
    now_hwnd_all["arr"].sort(key=lambda row: (row[desktop_index], row[exe_index], row[hwnd_index]))
    now_hwnd_all["count"] = len(now_hwnd_all["arr"])
    if now_hwnd_all["count"] == 0:
        msgbox_text = ("window_monitor 检测到窗口列表为空\n\n"
                       "1.可能是DLL不适配,建议阅读README.md尝试修改_config.json中的DLL名\n"
                       "2.可能是资源管理器重启过,建议重启本程序")
        pymsgbox.alert(msgbox_text, "window_monitor")
    if return_value:
        return now_hwnd_all
def get_all_pid():  # prints everything, which is too much output; deprecated
    """Deprecated placeholder that does nothing and returns None.

    The abandoned implementation walked ``psutil.pids()`` and printed each pid
    together with its process name.
    """
    return None
# Optional helper
def show_all():
    """Refresh the snapshot and print a header line followed by every recorded row."""
    update_hwnd_arr()
    print("index, desktopname, processId, hwnd, exename, _title")
    recorded_rows = now_hwnd_all["arr"]
    for row in recorded_rows:
        print(row)
# Optional helper
def get_exe_wind(exe_name=None):
    """Refresh the snapshot, then print and return the rows of one executable.

    Args:
        exe_name: Executable name to match against the exe column; ``None``
            selects every row.

    Returns:
        A list of the matching rows, in snapshot order.
    """
    update_hwnd_arr()
    exe_column = 4
    print("index, desktopname, processId, hwnd, exename, _title")
    matches = [
        row for row in now_hwnd_all["arr"]
        if exe_name is None or row[exe_column] == exe_name
    ]
    for row in matches:
        print(row)
    return matches
# get_all_pid() # deprecated
# get_exe_wind("explorer.exe")
# get_exe_wind()
# show_all()
def get_json(json_filename):
    """Load a JSON document, treating a missing or empty file as an empty list.

    Args:
        json_filename: Path of the JSON file to read.

    Returns:
        The parsed JSON value, or ``[]`` when the file does not exist or is empty.

    Raises:
        SystemExit: After showing a message box, when the file content is not
            valid JSON.
    """
    # Check whether the JSON file exists at all.
    if not os.path.exists(json_filename):
        return []
    # utf-8-sig reads plain UTF-8 unchanged and additionally drops a leading
    # BOM, which json.loads would otherwise reject.
    with open(json_filename, "r", encoding="utf-8-sig") as f:
        json_text = f.read()
    if json_text == "":
        print(f"{json_filename}文件为空,get_json读取为[]")
        return []
    try:
        return json.loads(json_text)
    except ValueError:  # json.JSONDecodeError is a ValueError
        pymsgbox.alert(f"window_monitor 打开json失败:{json_filename}\n"
                       "建议检查json文件然后重新运行本程序\n"
                       "(注:通常是资源管理器卡死或重启或程序异常中止导致的写入异常,需将两份历史记录手动同步)", "window_monitor")
        # sys.exit rather than the bare exit(): the latter is injected by the
        # site module for interactive use and is not meant for programs.
        sys.exit()
def write_obj_to_json(obj, out_json_filename, indent=4, end=""):
    """Serialise ``obj`` as JSON and write it to a file, replacing its content.

    Args:
        obj: JSON-serialisable value.
        out_json_filename: Destination path; written as UTF-8.
        indent: Indent width passed to ``json.dumps``.
        end: Text written after the JSON document.

    Raises:
        TypeError: If ``obj`` is not JSON-serialisable. The destination file
            is left untouched in that case.
    """
    # Serialise before opening: open(..., "w") truncates immediately, so a
    # failure inside json.dumps must not cost us the previous file content.
    text = json.dumps(obj, indent=indent, ensure_ascii=False)
    # Fold some of the line breaks produced by the indenting back into spaces.
    # NOTE(review): confirm the exact run of spaces inside these two patterns
    # against the committed file; a copy that went through whitespace
    # normalisation would show them shortened.
    text = text.replace("\n "," ")
    text = text.replace("\n ]"," ]")
    with open(out_json_filename, "w", encoding="utf8") as f:
        f.write(text)
        f.write(end)
def get_last_hwnd_all_from_json():
    """Load the saved history into ``all_history`` and return its newest entry.

    Both the main and the backup history file are read; the longer of the two
    becomes ``all_history``, with the main file winning a tie.

    Returns:
        The first (newest) history entry, or None when the history is empty.
    """
    global all_history
    primary = get_json(program_history_json)
    backup = get_json(program_history_backup_json)
    all_history = backup if len(backup) > len(primary) else primary
    return all_history[0] if len(all_history) > 0 else None
def save_last_hwnd_all_to_history(a_last):
    """Put a snapshot at the front of the history, trim it, and write the main file.

    Args:
        a_last: Snapshot dict to record as the newest entry.
    """
    all_history.insert(0, a_last)
    # Drop however many of the oldest entries exceed the limit in one step, so
    # the limit also holds right after max_history_length has been lowered.
    overflow = len(all_history) - max_history_length
    if overflow > 0:
        # Slice deletion mutates the shared list in place; no rebinding needed.
        del all_history[-overflow:]
    write_obj_to_json(all_history, program_history_json)
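# Sizing example: 10000 records at one record every 3 seconds cover about 8.3 hours (the actual limits are read from _config.json).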
def _same_window_rows(rows_a, rows_b):
    """Compare two row sequences by content, ignoring list-versus-tuple row types."""
    return [list(row) for row in rows_a] == [list(row) for row in rows_b]


def program_main():
    """Run the monitor: load config, load the DLL, then record window changes forever.

    Reads ``max_history_length``, ``seconds_per_loop`` and
    ``VirtualDesktop_DLL_name`` from the config file. On each pass the current
    windows are scanned and, if they differ from the last saved snapshot, the
    new snapshot is saved to the main history file. The backup history file is
    rewritten halfway through every interval (presumably so that the two files
    are never being written at the same moment).

    This function does not return.
    """
    global last_hwnd_all, max_history_length, seconds_per_loop
    global cfg, cfg_title_replace
    cfg = get_json(config_json)
    cfg_title_replace = get_json(config_title_replace)
    max_history_length = cfg["max_history_length"]
    seconds_per_loop = cfg["seconds_per_loop"]
    virtualdesktop_dll_name = cfg["VirtualDesktop_DLL_name"]
    add_dll_symbol(virtualdesktop_dll_name)
    log("virtualdesktop_dll_name: " + virtualdesktop_dll_name)
    log(f"最长历史纪录 {max_history_length} ,正在每 {seconds_per_loop} 秒一次监视所有窗口状态...")

    if last_hwnd_all is None:  # the program has just started
        last_hwnd_all = get_last_hwnd_all_from_json()
        if last_hwnd_all is None:  # no saved history: start from the current windows
            update_hwnd_arr()
            last_hwnd_all = now_hwnd_all
            save_last_hwnd_all_to_history(last_hwnd_all)

    while True:
        # Scan again and compare with the last saved snapshot. Rows loaded
        # from JSON are lists while freshly scanned rows are tuples, so a
        # plain != would always report a change right after a restart.
        update_hwnd_arr()
        if not _same_window_rows(last_hwnd_all["arr"], now_hwnd_all["arr"]):
            last_hwnd_all = now_hwnd_all
            log("saving...")
            save_last_hwnd_all_to_history(last_hwnd_all)
        time.sleep(seconds_per_loop/2)
        write_obj_to_json(all_history, program_history_backup_json)
        time.sleep(seconds_per_loop/2)
def init_load_cfg():
    """Fill the module-level ``cfg`` and ``cfg_title_replace`` from their JSON files."""
    global cfg, cfg_title_replace
    # Main settings first, then the title rewrite rules.
    cfg = get_json(config_json)
    cfg_title_replace = get_json(config_title_replace)
init_load_cfg() # Runs at import time to avoid errors from cfg and related variables not being set when restore_windows.py imports this module
if __name__ == "__main__":
    # A trace function is installed when a debugger (e.g. VS Code) launched us.
    is_debugging = sys.gettrace()
    if not is_debugging:
        # Normal run: log the traceback and show it in a dialog.
        try:
            program_main()
        except Exception as e:
            error_str = traceback.format_exc()
            log(f"{type(e).__name__}: {str(e)}\n{error_str}")
            pymsgbox.alert(error_str, "window_monitor: 发生错误")
    else:
        # Under a debugger: let exceptions propagate so the debugger can stop on them.
        program_main()
# NOTE(review): presumably leftover debugger breakpoint anchors; neither name is read anywhere in the visible source.
dbg = 1
a=1