-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbuilder.py
More file actions
455 lines (370 loc) · 16.3 KB
/
builder.py
File metadata and controls
455 lines (370 loc) · 16.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
import json
import logging
import os
from collections import defaultdict
from dataclasses import _MISSING_TYPE, dataclass, field
from pathlib import Path
from typing import Any
import yaml
from epicsdbbuilder.recordbase import Record
from lxml import etree, objectify
from lxml.objectify import ObjectifiedElement
from softioc.builder import records
from techui_builder.generate import Generator
from techui_builder.models import Entity, TechUi
from techui_builder.validator import Validator
logger_ = logging.getLogger(__name__)
@dataclass
class JsonMap:
    """One node of the screen tree that is serialised to ``JsonMap.json``.

    Nodes are created by ``Builder._generate_json_map`` and converted to plain
    dictionaries by ``_serialise_json_map``, which omits any field that still
    holds its default value.
    """

    # Path of the .bob file this node describes, stored as a string
    file: str
    # Name shown for the screen; written to JSON under the key "displayName"
    display_name: str | None
    # Set to False for a linked file that was not found on disk
    exists: bool = True
    # NOTE(review): never assigned by the code in this module — confirm usage
    duplicate: bool = False
    # Nodes for the screens linked from this one
    children: list["JsonMap"] = field(default_factory=list)
    # Macros taken from the action that links to this screen
    macros: dict[str, str] = field(default_factory=dict)
    # Message of the exception raised while reading the file, "" if none
    error: str = ""
@dataclass
class Builder:
    """
    This class provides the functionality to process the required
    techui.yaml file into screens mapped from ioc.yaml and
    *-mapping.yaml files.

    By default it looks for a `techui.yaml` file relative to the current
    working directory. Optionally a custom path can be declared.
    """

    # Path of the techui.yaml file; parsed into ``self.conf`` in __post_init__
    techui: Path = field(default=Path("techui.yaml"))

    # Entities read from the services' ioc.yaml files, grouped by their P field
    entities: defaultdict[str, list[Entity]] = field(
        default_factory=lambda: defaultdict(list), init=False
    )

    # DEVSTA calc records made by _create_devsta_pv, keyed by component prefix
    devsta_pvs: dict[str, Record] = field(default_factory=dict, init=False)

    # Directory searched by _extract_services for service folders. It has no
    # default and is not assigned by any method shown here, so presumably the
    # caller sets it before setup() is run — TODO confirm
    _services_dir: Path = field(init=False, repr=False)
    # NOTE(review): not read or written by any method shown here — confirm usage
    _gui_map: dict = field(init=False, repr=False)
    # Directory holding the .bob screens, JsonMap.json and config/devsta.db
    _write_directory: Path = field(default=Path("opis"), init=False, repr=False)
def __post_init__(self):
    """Read the techui.yaml file and store the validated model in ``self.conf``."""
    # Populate beamline and components
    raw_yaml = self.techui.read_text(encoding="utf-8")
    parsed_yaml = yaml.safe_load(raw_yaml)
    self.conf = TechUi.model_validate(parsed_yaml)
def setup(self):
    """Run initial setup, e.g. extracting entries from service ioc.yaml.

    In order: collect the entities from every service, clean previously
    generated files out of the write directory, then create the screen
    generator for that directory and the beamline url.
    """
    self._extract_services()

    output_dir = self._write_directory
    self.clean_files()

    beamline_url = self.conf.beamline.url
    self.generator = Generator(output_dir, beamline_url)
def clean_files(self):
    """Delete previously generated files from the write directory.

    Every ``*.bob`` file in the write directory except ``index.bob`` is handed
    to a new ``Validator`` (stored on ``self.validator``). The bobs that the
    validator did not list in ``validate`` are treated as generated, stored in
    ``self.generated_bobs`` and removed, together with ``JsonMap.json`` when it
    is present.
    """
    exclude = {"index.bob"}
    bobs = [
        bob
        for bob in self._write_directory.glob("*.bob")
        if bob.name not in exclude
    ]

    self.validator = Validator(bobs)
    self.validator.check_bobs()

    # Get bobs that are only present in the bobs list (i.e. generated).
    # This must be a plain difference: a symmetric difference would also
    # return any validator entry missing from bobs and queue it for deletion.
    self.generated_bobs = list(set(bobs) - set(self.validator.validate.values()))

    logger_.info("Preserving edited screens for validation.")
    logger_.debug(f"Screens to validate: {list(self.validator.validate.keys())}")

    logger_.info("Cleaning synoptic/ of generated screens.")

    generated_files = list(self.generated_bobs)

    # The JsonMap file is generated too, so remove it if it exists
    json_map_file = self._write_directory.joinpath("JsonMap.json")
    if json_map_file.exists():
        generated_files.append(json_map_file)

    # Remove any generated files that exist
    for file_ in generated_files:
        logger_.debug(f"Removing generated file: {file_.name}")
        os.remove(file_)
def _create_devsta_pv(self, prefix: str, inputs: list[str]):
    """Create the ``<prefix>:DEVSTA`` calc record and store it by prefix.

    Args:
        prefix: Component prefix; used in the record name and as the key in
            ``self.devsta_pvs``.
        inputs: PVs for the record's input links. Only the first 12 are used
            (INPA to INPL); missing ones are filled with "".
    """
    # Pad with "" and cut to the 12 links the CALC expression refers to
    padded_inputs = (list(inputs) + [""] * 12)[:12]
    input_links = {
        f"INP{letter}": link for letter, link in zip("ABCDEFGHIJKL", padded_inputs)
    }

    self.devsta_pvs[prefix] = records.calc(  # pyright: ignore[reportAttributeAccessIssue]
        f"{prefix}:DEVSTA",
        CALC="(A|B|C|D|E|F|G|H|I|J|K|L)>0?1:0",
        SCAN="1 second",
        ACKT="NO",
        **input_links,
    )
def write_devsta_pvs(self):
    """Write every stored DEVSTA record to ``config/devsta.db``.

    The ``config`` directory is created inside the write directory when it is
    missing. The file starts with a three line banner marking it as
    autogenerated, followed by the output of each record's ``Print``.
    """
    conf_dir = self._write_directory.joinpath("config")

    # Create the config/ dir if it doesn't exist
    if not conf_dir.exists():
        conf_dir.mkdir()

    # Header explaining the file is autogenerated
    rule = "#" * 51 + "\n"
    banner = (
        "#" * 2 + " THIS FILE HAS BEEN AUTOGENERATED; DO NOT EDIT " + "#" * 2 + "\n"
    )

    with open(conf_dir.joinpath("devsta.db"), "w") as db_file:
        db_file.writelines([rule, banner, rule])

        # Write the devsta PVs
        for record in self.devsta_pvs.values():
            record.Print(db_file)
def _extract_services(self):
"""
Finds the services folders in the services directory
and extracts all entites
"""
# Loop over every dir in services, ignoring anything that isn't a service
for service in self._services_dir.glob(f"{self.conf.beamline.long_dom}-*-*-*"):
# If service doesn't exist, file open will fail throwing exception
try:
self._extract_entities(ioc_yaml=service.joinpath("config/ioc.yaml"))
except OSError:
logger_.error(
f"No ioc.yaml file for service: [bold]{service.name}[/bold]. \
Does it exist?"
)
def _extract_entities(self, ioc_yaml: Path):
    """
    Read ``ioc_yaml`` and store every entry of its ``entities`` list that has
    a ``P`` field in ``self.entities``, grouped by that ``P`` value.
    """
    with open(ioc_yaml) as ioc_file:
        ioc_conf: dict[str, list[dict[str, str]]] = yaml.safe_load(ioc_file)

    for raw_entity in ioc_conf["entities"]:
        # Entries without a P field are ignored
        if "P" not in raw_entity:
            continue

        # Create Entity and append to entity list; optional fields are None
        # when absent
        parsed_entity = Entity(
            type=raw_entity["type"],
            desc=raw_entity.get("desc"),
            P=raw_entity["P"],
            M=raw_entity.get("M"),
            R=raw_entity.get("R"),
        )
        self.entities[parsed_entity.P].append(parsed_entity)
def _generate_screen(self, screen_name: str):
self.generator.build_screen(screen_name)
self.generator.write_screen(screen_name, self._write_directory)
def _validate_screen(self, screen_name: str):
# Get the generated widgets to validate against
widgets = self.generator.widgets
widget_group = self.generator.group
assert widget_group is not None
widget_group_name = widget_group.get_element_value("name")
self.validator.validate_bob(screen_name, widget_group_name, widgets)
def create_screens(self):
    """Create the screens for each component in techui.yaml.

    For every component, a DEVSTA record is created first when the component
    defines ``devsta``. A screen is only handled when the component prefix
    matches the P field of at least one extracted entity; otherwise a warning
    is logged and the component is skipped. Entities of any ``extras``
    prefixes are added to the screen, and unknown extras are logged as errors.
    Screens listed in ``self.validator.validate`` are validated, all others
    are generated.

    Raises:
        SystemExit: With status 1 when no entities have been extracted.
    """
    if not self.entities:
        logger_.critical("No ioc entities found, has setup() been run?")
        # Raise SystemExit directly: the exit() helper is only injected by the
        # site module and exits with status 0, which would report success.
        raise SystemExit(1)

    # Loop over every component defined in techui.yaml and locate
    # any extras defined
    for component_name, component in self.conf.components.items():
        if component.devsta is not None:
            self._create_devsta_pv(component.prefix, component.devsta)

        # ONLY IF there is a matching component and entity, generate a screen
        if component.prefix not in self.entities:
            logger_.warning(
                f"{self.techui.name}: The prefix [bold]{component.prefix}[/bold] "
                f"set in the component [bold]{component_name}[/bold] does not "
                "match any P field in the ioc.yaml files in services"
            )
            continue

        screen_entities: list[Entity] = list(self.entities[component.prefix])

        if component.extras is not None:
            # If component has any extras, add them to the entries to generate
            for extra_p in component.extras:
                # Membership is tested first so that the defaultdict does not
                # gain an empty entry for an unknown prefix
                if extra_p not in self.entities:
                    logger_.error(
                        f"Extra prefix {extra_p} for {component_name} does not "
                        "exist."
                    )
                    continue
                screen_entities.extend(self.entities[extra_p])

        # This is used by both generate and validate,
        # so called beforehand for tidyness
        self.generator.build_widgets(component_name, screen_entities)
        self.generator.build_groups(component_name)

        if component_name in self.validator.validate:
            self._validate_screen(component_name)
        else:
            self._generate_screen(component_name)
def _generate_json_map(self, screen_path: Path, dest_path: Path) -> JsonMap:
    """Recursively generate JSON map from .bob file tree

    Args:
        screen_path: The .bob file to read. Its path relative to the write
            directory becomes the node's ``file`` value.
        dest_path: Directory that the file paths found inside the screen are
            joined to in order to locate the linked screens.

    Returns:
        The node for ``screen_path`` with one child per linked .bob file.
        Any exception raised while reading the screen is stored as text in
        the node's ``error`` field instead of being raised.

    Raises:
        ValueError: From ``relative_to`` when ``screen_path`` is not inside
            the write directory; this happens before the ``try`` block.
    """
    # NOTE(review): screens that link to each other in a cycle would recurse
    # until Python's recursion limit is hit — confirm whether this can occur

    # Create initial node at top of .bob file
    current_node = JsonMap(
        str(screen_path.relative_to(self._write_directory)),
        display_name=None,
    )

    abs_path = screen_path.absolute()

    try:
        # Create xml tree from .bob file
        tree = objectify.parse(abs_path)
        root: ObjectifiedElement = tree.getroot()

        # Set top level display name from root element
        current_node.display_name = self._parse_display_name(
            root.name.text, screen_path
        )

        # Find all <widget> elements
        widgets = [
            w
            for w in root.findall(".//widget")
            if w.get("type", default=None)
            # in ["symbol", "embedded", "action_button"]
            in ["symbol", "action_button"]
        ]

        for widget_elem in widgets:
            # Obtain macros associated with file_elem
            macro_dict: dict[str, str] = {}
            widget_type = widget_elem.get("type", default=None)

            match widget_type:
                case "symbol" | "action_button":
                    open_display = _get_action_group(widget_elem)
                    # Widgets without an open_display action link to nothing
                    if open_display is None:
                        continue

                    # Use file, name, and macro elements
                    file_elem = open_display.file
                    name_elem = widget_elem.name.text
                    macro_dict = self._get_macros(open_display)

                # case "embedded":
                #     file_elem = widget_elem.file
                #     macro_dict = _get_macros(widget_elem)
                case _:
                    continue

            # Extract file path from file_elem
            file_path = Path(file_elem.text.strip() if file_elem.text else "")
            # Only links to .bob files are mapped; skip anything else
            if not file_path.suffix == ".bob":
                continue

            # Create valid displayName
            display_name = self._parse_display_name(name_elem, file_path)

            # TODO: misleading var name?
            next_file_path = dest_path.joinpath(file_path)

            # Crawl the next file
            if next_file_path.is_file():
                # TODO: investigate non-recursive approaches?
                child_node = self._generate_json_map(next_file_path, dest_path)
            else:
                # The linked file is missing, so record it as a leaf node
                child_node = JsonMap(str(file_path), display_name, exists=False)

            child_node.macros = macro_dict

            # TODO: make this work for only list[JsonMap]
            assert isinstance(current_node.children, list)
            # TODO: fix typing
            current_node.children.append(child_node)

    except etree.ParseError as e:
        current_node.error = f"XML parse error: {e}"
    except Exception as e:
        # Any other failure is recorded on the node rather than raised
        current_node.error = str(e)

    # Number the display names that are shared by several children
    self._fix_duplicate_names(current_node)

    return current_node
def _get_macros(self, element: ObjectifiedElement):
    """Return the macros below ``element`` as a ``{tag: text}`` dictionary.

    Children of the ``macros`` element that have no text are left out. An
    empty dictionary is returned when there is no ``macros`` element.
    """
    if not hasattr(element, "macros"):
        return {}

    macro_elems = element.macros.getchildren()
    if macro_elems is None:
        return {}

    return {
        str(macro_elem.tag): macro_elem.text
        for macro_elem in macro_elems
        if macro_elem.text is not None
    }
def _parse_display_name(self, name: str | None, file_path: Path) -> str | None:
"""Parse display name from <name> tag or file_path"""
if name:
# Return name tag text as displayName
return name
elif file_path.name:
# Use tail without file ext as displayName
return file_path.name[: -sum(len(suffix) for suffix in file_path.suffixes)]
else:
# Populate displayName with null
return None
def _fix_duplicate_names(self, node: JsonMap) -> None:
    """Recursively number the display names shared by sibling nodes.

    When several children of ``node`` have the same non-empty display name,
    each gets " 1", " 2", ... appended in list order. The same is then done
    for the children of every child.
    """
    siblings = node.children
    if not siblings:
        return

    # First pass: count how often each display name occurs
    occurrences: defaultdict[str | None, int] = defaultdict(int)
    for sibling in siblings:
        if sibling.display_name:
            occurrences[sibling.display_name] += 1

    # Second pass: number the duplicates, tracking the position per name
    position: defaultdict[str | None, int] = defaultdict(int)
    for sibling in siblings:
        shared_name = sibling.display_name
        if shared_name and occurrences[shared_name] > 1:
            position[shared_name] += 1
            sibling.display_name = f"{shared_name} {position[shared_name]}"

        # Recursively fix children
        self._fix_duplicate_names(sibling)
def write_json_map(
    self,
    synoptic: Path = Path("example/t01-services/synoptic/index.bob"),
    dest: Path = Path("example/t01-services/synoptic"),
):
    """
    Build the screen tree starting at ``synoptic`` and write it as JSON to
    ``JsonMap.json`` inside ``dest``.

    Raises:
        FileNotFoundError: When ``synoptic`` does not exist.
    """
    if not synoptic.exists():
        raise FileNotFoundError(
            f"Cannot generate json map for {synoptic}. Has it been generated?"
        )

    root_node = self._generate_json_map(synoptic, dest)

    with open(dest.joinpath("JsonMap.json"), "w") as json_file:
        # JsonMap objects are converted by _serialise_json_map
        serialised = json.dumps(root_node, indent=4, default=_serialise_json_map)
        json_file.write(serialised + "\n")
# Function to convert the JsonMap objects into dictionaries,
# while ignoring default values
def _serialise_json_map(map: JsonMap) -> dict[str, Any]:
    """Convert a JsonMap node, and its children, into a plain dictionary.

    Fields whose value equals the field's default are left out, and the
    ``display_name`` key is renamed to ``displayName``.
    """
    field_specs = JsonMap.__dataclass_fields__

    def _is_default(name: str, current: Any) -> bool:
        spec = field_specs[name]
        if isinstance(spec.default_factory, _MISSING_TYPE):
            # No default factory: compare with the plain default value
            return current == spec.default
        # A default factory is used (e.g. list, dict, ...)
        return current == spec.default_factory()

    serialised: dict[str, Any] = {}

    # Loop over everything in the json map object's dictionary
    for name, current in map.__dict__.items():
        # If children has nested JsonMap objects, serialise those too
        if name == "children" and len(current) > 0:
            current = [_serialise_json_map(child) for child in current]

        # Only include items that are not the default value
        if not _is_default(name, current):
            serialised[name] = current

    # Rename display_name to displayName for JSON camel case convention
    if "display_name" in serialised:
        serialised["displayName"] = serialised.pop("display_name")

    return serialised
# File and desc are under the "actions",
# so the corresponding tag needs to be found
def _get_action_group(element: ObjectifiedElement) -> ObjectifiedElement | None:
    """Return the first ``open_display`` action of ``element``.

    None is returned when there is no such action. A missing ``actions``
    group is logged as an error and also gives None.
    """
    try:
        actions_group = element.actions
        assert actions_group is not None
        open_display_actions = (
            action
            for action in actions_group.iterchildren("action")
            if action.get("type", default=None) == "open_display"
        )
        return next(open_display_actions, None)
    except AttributeError:
        # TODO: Find better way of handling there being no "actions" group
        logger_.error(f"Actions group not found in component: {element.text}")
        return None