-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathmodels.py
More file actions
326 lines (261 loc) · 10.4 KB
/
models.py
File metadata and controls
326 lines (261 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
"""Models for IPP."""
# pylint: disable=R0912,R0915
from __future__ import annotations
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
from yarl import URL
from .parser import parse_ieee1284_device_id, parse_make_and_model
# Readable labels for the integer ``printer-state`` value of an IPP response.
# State.from_dict looks codes up here; a code missing from this table is
# passed through unchanged by that lookup.
PRINTER_STATES = {3: "idle", 4: "printing", 5: "stopped"}
@dataclass
class Info:
    """Object holding information from IPP.

    Instances are normally created with :meth:`from_dict` from the attribute
    dictionary of an IPP response.
    """

    name: str
    printer_name: str
    printer_uri_supported: list[str]
    uptime: int
    command_set: str | None = None
    location: str | None = None
    manufacturer: str | None = None
    model: str | None = None
    printer_info: str | None = None
    serial: str | None = None
    uuid: str | None = None
    version: str | None = None
    more_info: str | None = None

    @staticmethod
    def from_dict(data: dict[str, Any]) -> Info:
        """Return Info object from IPP response.

        The display ``name`` is chosen in this order:

        1. the ``printer-make-and-model`` string, when non-empty;
        2. ``"<MFG> <MDL>"`` when the device id supplies both keys;
        3. ``printer-name``, unless it merely repeats the path of one of the
           supported URIs;
        4. the literal ``"IPP Printer"``.

        Args:
            data: Attribute dictionary of an IPP response.

        Returns:
            A populated ``Info`` instance.
        """
        uuid_urn_prefix = "urn:uuid:"

        printer_name = data.get("printer-name", "")
        make_model = data.get("printer-make-and-model", "")
        device_id = data.get("printer-device-id", "")
        uri_supported = data.get("printer-uri-supported", [])
        uuid = data.get("printer-uuid")

        # A single URI may arrive as a bare value; normalise to a list.
        if not isinstance(uri_supported, list):
            uri_supported = [str(uri_supported)]

        # A printer-name equal to a URI path is not used as a display name.
        stripped_name = printer_name.lstrip("/")
        name_is_uri_path = any(
            URL(uri).path.lstrip("/") == stripped_name for uri in uri_supported
        )
        fallback_name = "" if name_is_uri_path else printer_name

        make, model = parse_make_and_model(make_model)
        parsed_device_id = parse_ieee1284_device_id(device_id)

        # Non-empty values from the device id override the parsed
        # make-and-model values.
        name_parts = []
        if mfg := parsed_device_id.get("MFG"):
            make = mfg
            name_parts.append(make)
        if mdl := parsed_device_id.get("MDL"):
            model = mdl
            name_parts.append(model)
        cmd = parsed_device_id.get("CMD") or None
        serial = parsed_device_id.get("SN") or None

        if make_model:
            name = make_model
        elif len(name_parts) == 2:
            name = " ".join(name_parts)
        elif fallback_name:
            name = fallback_name
        else:
            name = "IPP Printer"

        # Strip the URN prefix only when it is actually present, so a value
        # without it is kept whole instead of losing its first characters.
        if not uuid:
            uuid = None
        elif uuid.startswith(uuid_urn_prefix):
            uuid = uuid[len(uuid_urn_prefix):]

        return Info(
            command_set=cmd,
            location=data.get("printer-location", ""),
            name=name,
            manufacturer=make,
            model=model,
            printer_name=printer_name,
            printer_info=data.get("printer-info"),
            printer_uri_supported=uri_supported,
            serial=serial,
            uptime=data.get("printer-up-time", 0),
            uuid=uuid,
            version=data.get("printer-firmware-string-version"),
            more_info=data.get("printer-more-info"),
        )
@dataclass
class Marker:
    """Object holding marker (ink) info from IPP.

    One instance describes a single marker entry; field order is part of the
    positional constructor signature.
    """

    # Position of this entry in the printer's marker lists.
    marker_id: int
    # Kind of marker as reported by the printer.
    marker_type: str
    # Marker name as reported by the printer.
    name: str
    # Marker colour as reported by the printer.
    color: str
    # Current, low-threshold and high-threshold levels.
    level: int
    low_level: int
    high_level: int
@dataclass
class Uri:
    """Object holding URI info from IPP.

    Pairs one printer URI with its authentication and security values;
    either of those may be ``None``.
    """

    # The printer URI itself.
    uri: str
    # Authentication value for this URI, or None.
    authentication: str | None
    # Security value for this URI, or None.
    security: str | None
@dataclass
class State:
    """Object holding the IPP printer state."""

    printer_state: str
    reasons: str | None
    message: str | None

    @staticmethod
    def from_dict(data: dict[str, Any]) -> State:
        """Return State object from IPP response.

        ``printer-state`` (default ``0``) is translated through
        ``PRINTER_STATES``; a code missing from that table is passed through
        unchanged. A ``printer-state-reasons`` value of ``"none"`` is stored
        as ``None``.
        """
        code = data.get("printer-state", 0)
        raw_reasons = data.get("printer-state-reasons")
        label = PRINTER_STATES.get(code, code)
        return State(
            printer_state=label,
            reasons=None if raw_reasons == "none" else raw_reasons,
            message=data.get("printer-state-message"),
        )
@dataclass
class Printer:
    """Object holding the IPP printer information."""

    info: Info
    markers: list[Marker]
    state: State
    uris: list[Uri]
    booted_at: datetime

    def as_dict(self) -> dict[str, Any]:
        """Return dictionary version of this printer.

        Nested dataclasses are converted with ``asdict``; ``booted_at`` is
        returned as the ``datetime`` object itself.
        """
        return {
            "info": asdict(self.info),
            "state": asdict(self.state),
            "markers": [asdict(marker) for marker in self.markers],
            "uris": [asdict(uri) for uri in self.uris],
            "booted_at": self.booted_at,
        }

    def update_from_dict(self, data: dict[str, Any]) -> Printer:
        """Return updated Printer object from IPP response data.

        All parts are rebuilt from ``data`` in place and ``self`` is
        returned. ``booted_at`` is recomputed only when the newly reported
        uptime is lower than the previous one; otherwise the existing value
        is kept.
        """
        last_uptime = self.info.uptime
        self.info = Info.from_dict(data)
        self.markers = Printer.merge_marker_data(data)
        self.state = State.from_dict(data)
        self.uris = Printer.merge_uri_data(data)

        # NOTE(review): a lower uptime presumably means the printer
        # restarted - confirm against the printers this targets.
        if self.info.uptime < last_uptime:
            self.booted_at = _utcnow() - timedelta(seconds=self.info.uptime)

        return self

    @staticmethod
    def from_dict(data: dict[str, Any]) -> Printer:
        """Return Printer object from IPP response data.

        ``booted_at`` is the current UTC time minus the reported uptime.
        """
        info = Info.from_dict(data)
        return Printer(
            info=info,
            markers=Printer.merge_marker_data(data),
            state=State.from_dict(data),
            uris=Printer.merge_uri_data(data),
            booted_at=(_utcnow() - timedelta(seconds=info.uptime)),
        )

    @staticmethod
    def _align(
        value: Any,
        size: int,
        default: Any,
        scalar_type: type,
    ) -> list[Any]:
        """Return ``size`` entries taken from ``value``, padded with ``default``.

        A list supplies entries by position and surplus items are ignored.
        A lone ``scalar_type`` value is accepted only when ``size`` is 1,
        because it cannot be matched to a position otherwise. Any other
        input yields ``size`` copies of ``default``.
        """
        aligned = [default] * size
        if isinstance(value, list):
            usable = value[:size]
            aligned[: len(usable)] = usable
        elif isinstance(value, scalar_type) and size == 1:
            aligned[0] = value
        return aligned

    @staticmethod
    def merge_marker_data(  # noqa: PLR0912, C901
        data: dict[str, Any],
    ) -> list[Marker]:
        """Return Marker data from IPP response.

        ``marker-names`` determines how many markers exist. The colour,
        level, type, high-level and low-level attributes are matched to the
        names by position; missing entries default to ``""``, ``-2``,
        ``"unknown"``, ``100`` and ``0`` respectively.

        Returns:
            Markers sorted by name, or an empty list when ``marker-names``
            is missing, empty, or neither a list nor a string.
        """
        names = data.get("marker-names")
        if not names:
            return []
        if isinstance(names, str):
            names = [names]
        elif not isinstance(names, list):
            return []

        count = len(names)
        colors = Printer._align(data.get("marker-colors"), count, "", str)
        levels = Printer._align(data.get("marker-levels"), count, -2, int)
        highs = Printer._align(data.get("marker-high-levels"), count, 100, int)
        lows = Printer._align(data.get("marker-low-levels"), count, 0, int)
        types = Printer._align(data.get("marker-types"), count, "unknown", str)

        markers = [
            Marker(
                marker_id=marker_id,
                marker_type=types[marker_id],
                name=marker_name,
                color=colors[marker_id],
                level=levels[marker_id],
                high_level=highs[marker_id],
                low_level=lows[marker_id],
            )
            for marker_id, marker_name in enumerate(names)
        ]
        # marker_id keeps the original position; only the list order changes.
        markers.sort(key=lambda marker: marker.name)

        return markers

    @staticmethod
    def merge_uri_data(data: dict[str, Any]) -> list[Uri]:  # noqa: PLR0912
        """Return URI data from IPP response.

        ``printer-uri-supported`` determines how many URIs exist. The
        ``uri-authentication-supported`` and ``uri-security-supported``
        attributes are matched by position; missing entries and the string
        ``"none"`` both become ``None``.

        Returns:
            One ``Uri`` per supported URI in response order, or an empty
            list when ``printer-uri-supported`` is missing, empty, or
            neither a list nor a string.
        """
        supported = data.get("printer-uri-supported")
        if not supported:
            return []
        if isinstance(supported, str):
            supported = [supported]
        elif not isinstance(supported, list):
            return []

        count = len(supported)
        # _str_or_none leaves the None padding untouched, so it is safe to
        # apply it to every aligned entry.
        auth = [
            _str_or_none(entry)
            for entry in Printer._align(
                data.get("uri-authentication-supported"), count, None, str
            )
        ]
        security = [
            _str_or_none(entry)
            for entry in Printer._align(
                data.get("uri-security-supported"), count, None, str
            )
        ]

        return [
            Uri(uri=uri, authentication=auth[uri_id], security=security[uri_id])
            for uri_id, uri in enumerate(supported)
        ]
def _utcnow() -> datetime:
"""Return the current date and time in UTC."""
return datetime.now(tz=timezone.utc)
def _str_or_none(value: str) -> str | None:
"""Return string while handling string representations of None."""
if value == "none":
return None
return value