-
-
Notifications
You must be signed in to change notification settings - Fork 112
Expand file tree
/
Copy pathbase.py
More file actions
356 lines (295 loc) · 12 KB
/
base.py
File metadata and controls
356 lines (295 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
"""This is part of the MSS Python's module.
Source: https://github.com/BoboTiG/python-mss.
"""
from __future__ import annotations
from abc import ABCMeta, abstractmethod
from datetime import datetime
from threading import Lock
from typing import TYPE_CHECKING, Any
from mss.exception import ScreenShotError
from mss.screenshot import ScreenShot
from mss.tools import to_png
if TYPE_CHECKING: # pragma: nocover
from collections.abc import Callable, Iterator
from mss.models import Monitor, Monitors, Window, Windows
try:
from datetime import UTC
except ImportError: # pragma: nocover
# Python < 3.11
from datetime import timezone
UTC = timezone.utc
lock = Lock()
OPAQUE = 255
class MSSBase(metaclass=ABCMeta):
"""This class will be overloaded by a system specific one."""
__slots__ = {"_monitors", "_windows", "cls_image", "compression_level", "with_cursor"}
def __init__(
self,
/,
*,
compression_level: int = 6,
with_cursor: bool = False,
# Linux only
display: bytes | str | None = None, # noqa: ARG002
# Mac only
max_displays: int = 32, # noqa: ARG002
) -> None:
self.cls_image: type[ScreenShot] = ScreenShot
self.compression_level = compression_level
self.with_cursor = with_cursor
self._monitors: Monitors = []
self._windows: Windows = []
def __enter__(self) -> MSSBase: # noqa:PYI034
"""For the cool call `with MSS() as mss:`."""
return self
def __exit__(self, *_: object) -> None:
"""For the cool call `with MSS() as mss:`."""
self.close()
@abstractmethod
def _cursor_impl(self) -> ScreenShot | None:
"""Retrieve all cursor data. Pixels have to be RGB."""
@abstractmethod
def _grab_impl(self, monitor: Monitor, /) -> ScreenShot:
"""Retrieve all pixels from a monitor. Pixels have to be RGB.
That method has to be run using a threading lock.
"""
@abstractmethod
def _grab_window_impl(self, window: Window, /) -> ScreenShot:
"""Retrieve all pixels from a window. Pixels have to be RGB.
That method has to be run using a threading lock.
"""
@abstractmethod
def _monitors_impl(self) -> None:
"""Get positions of monitors (has to be run using a threading lock).
It must populate self._monitors.
"""
@abstractmethod
def _windows_impl(self) -> None:
"""Get ids of windows (has to be run using a threading lock).
It must populate self._windows.
"""
def close(self) -> None: # noqa:B027
"""Clean-up."""
def grab(self, monitor: Monitor | tuple[int, int, int, int], /) -> ScreenShot:
"""Retrieve screen pixels for a given monitor.
Note: *monitor* can be a tuple like the one PIL.Image.grab() accepts.
:param monitor: The coordinates and size of the box to capture.
See :meth:`monitors <monitors>` for object details.
:return :class:`ScreenShot <ScreenShot>`.
"""
# Convert PIL bbox style
if isinstance(monitor, tuple):
monitor = {
"left": monitor[0],
"top": monitor[1],
"width": monitor[2] - monitor[0],
"height": monitor[3] - monitor[1],
}
with lock:
screenshot = self._grab_impl(monitor)
if self.with_cursor and (cursor := self._cursor_impl()):
return self._merge(screenshot, cursor)
return screenshot
def grab_window(
self, window: Window | str | None = None, /, *, name: str | None = None, process: str | None = None
) -> ScreenShot:
"""Retrieve screen pixels for a given window.
:param window: The window to capture or its name.
See :meth:`windows <windows>` for object details.
:param str name: The window name.
:param str process: The window process name.
:return :class:`ScreenShot <ScreenShot>`.
"""
if isinstance(window, str):
name = window
window = None
if window is None:
windows = self.find_windows(name, process)
if not windows:
msg = f"Window {window!r} not found."
raise ScreenShotError(msg)
window = windows[0]
with lock:
return self._grab_window_impl(window)
@property
def monitors(self) -> Monitors:
"""Get positions of all monitors.
If the monitor has rotation, you have to deal with it
inside this method.
This method has to fill self._monitors with all information
and use it as a cache:
self._monitors[0] is a dict of all monitors together
self._monitors[N] is a dict of the monitor N (with N > 0)
Each monitor is a dict with:
{
'left': the x-coordinate of the upper-left corner,
'top': the y-coordinate of the upper-left corner,
'width': the width,
'height': the height
}
"""
if not self._monitors:
with lock:
self._monitors_impl()
return self._monitors
@property
def windows(self) -> Windows:
"""Get ids, names, and proceesses of all windows.
Unlike monitors, this method does not use a cache, as the list of
windows can change at any time.
Each window is a dict with:
{
'id': the window id or handle,
'name': the window name,
'process': the window process name,
'bounds': the window bounds as a dict with:
{
'left': the x-coordinate of the upper-left corner,
'top': the y-coordinate of the upper-left corner,
'width': the width,
'height': the height
}
}
"""
with lock:
self._windows_impl()
return self._windows
def find_windows(self, name: str | None = None, process: str | None = None) -> Windows:
"""Find windows by name and/or process name.
:param str name: The window name.
:param str process: The window process name.
:return list: List of windows.
"""
windows = self.windows
if name is None and process is None:
return windows
if name is None:
return [window for window in windows if window["process"] == process]
if process is None:
return [window for window in windows if window["name"] == name]
return [window for window in windows if window["name"] == name and window["process"] == process]
def save(
self,
/,
*,
mon: int = 0,
win: str | None = None,
proc: str | None = None,
output: str = "monitor-{mon}.png",
callback: Callable[[str], None] | None = None,
) -> Iterator[str]:
"""Grab a screenshot and save it to a file.
:param int mon: The monitor to screenshot (default=0).
-1: grab one screenshot of all monitors
0: grab one screenshot by monitor
N: grab the screenshot of the monitor N
:param str output: The output filename.
It can take several keywords to customize the filename:
- `{mon}`: the monitor number
- `{top}`: the screenshot y-coordinate of the upper-left corner
- `{left}`: the screenshot x-coordinate of the upper-left corner
- `{width}`: the screenshot's width
- `{height}`: the screenshot's height
- `{date}`: the current date using the default formatter
As it is using the `format()` function, you can specify
formatting options like `{date:%Y-%m-%s}`.
:param callable callback: Callback called before saving the
screenshot to a file. Take the `output` argument as parameter.
:return generator: Created file(s).
"""
monitors = self.monitors
if not monitors:
msg = "No monitor found."
raise ScreenShotError(msg)
if win or proc:
windows = self.find_windows(win, proc)
if not windows:
msg = f"Window {(win or proc)!r} not found."
raise ScreenShotError(msg)
window = windows[0]
fname = output.format(win=win or proc, date=datetime.now(UTC) if "{date" in output else None)
if callable(callback):
callback(fname)
sct = self.grab_window(window)
to_png(sct.rgb, sct.size, level=self.compression_level, output=fname)
yield fname
elif mon == 0:
# One screenshot by monitor
for idx, monitor in enumerate(monitors[1:], 1):
fname = output.format(mon=idx, date=datetime.now(UTC) if "{date" in output else None, **monitor)
if callable(callback):
callback(fname)
sct = self.grab(monitor)
to_png(sct.rgb, sct.size, level=self.compression_level, output=fname)
yield fname
else:
# A screenshot of all monitors together or
# a screenshot of the monitor N.
mon = 0 if mon == -1 else mon
try:
monitor = monitors[mon]
except IndexError as exc:
msg = f"Monitor {mon!r} does not exist."
raise ScreenShotError(msg) from exc
output = output.format(mon=mon, date=datetime.now(UTC) if "{date" in output else None, **monitor)
if callable(callback):
callback(output)
sct = self.grab(monitor)
to_png(sct.rgb, sct.size, level=self.compression_level, output=output)
yield output
def shot(self, /, **kwargs: Any) -> str:
"""Helper to save the screenshot of the 1st monitor, by default.
You can pass the same arguments as for ``save``.
"""
kwargs["mon"] = kwargs.get("mon", 1)
return next(self.save(**kwargs))
@staticmethod
def _merge(screenshot: ScreenShot, cursor: ScreenShot, /) -> ScreenShot:
"""Create composite image by blending screenshot and mouse cursor."""
(cx, cy), (cw, ch) = cursor.pos, cursor.size
(x, y), (w, h) = screenshot.pos, screenshot.size
cx2, cy2 = cx + cw, cy + ch
x2, y2 = x + w, y + h
overlap = cx < x2 and cx2 > x and cy < y2 and cy2 > y
if not overlap:
return screenshot
screen_raw = screenshot.raw
cursor_raw = cursor.raw
cy, cy2 = (cy - y) * 4, (cy2 - y2) * 4
cx, cx2 = (cx - x) * 4, (cx2 - x2) * 4
start_count_y = -cy if cy < 0 else 0
start_count_x = -cx if cx < 0 else 0
stop_count_y = ch * 4 - max(cy2, 0)
stop_count_x = cw * 4 - max(cx2, 0)
rgb = range(3)
for count_y in range(start_count_y, stop_count_y, 4):
pos_s = (count_y + cy) * w + cx
pos_c = count_y * cw
for count_x in range(start_count_x, stop_count_x, 4):
spos = pos_s + count_x
cpos = pos_c + count_x
alpha = cursor_raw[cpos + 3]
if not alpha:
continue
if alpha == OPAQUE:
screen_raw[spos : spos + 3] = cursor_raw[cpos : cpos + 3]
else:
alpha2 = alpha / 255
for i in rgb:
screen_raw[spos + i] = int(cursor_raw[cpos + i] * alpha2 + screen_raw[spos + i] * (1 - alpha2))
return screenshot
@staticmethod
def _cfactory(
attr: Any,
func: str,
argtypes: list[Any],
restype: Any,
/,
errcheck: Callable | None = None,
) -> None:
"""Factory to create a ctypes function and automatically manage errors."""
meth = getattr(attr, func)
meth.argtypes = argtypes
meth.restype = restype
if errcheck:
meth.errcheck = errcheck