#!/usr/bin/env python3

from os import environ, getcwd
from copy import deepcopy
import json

# List of active detectors
ACTIVE_DETECTORS = ["all"]
INACTIVE_DETECTORS = []


def activate_detector(det):
    try:
        # first of all, remove "all" if a specific detector is passed
        ind = ACTIVE_DETECTORS.index("all")
        del ACTIVE_DETECTORS[ind]
    except ValueError:
        pass
    ACTIVE_DETECTORS.append(det)


def deactivate_detector(det):
    INACTIVE_DETECTORS.append(det)


def isActive(det):
    def check(detector):
        return detector not in INACTIVE_DETECTORS and ("all" in ACTIVE_DETECTORS or detector in ACTIVE_DETECTORS)
    if det == "ITS":
        # special remapping for the upgrade, only needed in one direction
        # since IT3 output pretends to be ITS
        return check("ITS") or check("IT3")
    return check(det)
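
# Illustrative usage of the detector switches (hypothetical detector names,
# not from the source):
#   activate_detector("TPC")    # drops the implicit "all" and enables TPC only
#   deactivate_detector("TRD")  # isActive("TRD") is now False even under "all"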


def compute_n_workers(interaction_rate, collision_system, n_workers_user=8, n_workers_min=1, interaction_rate_linear_below=300000):
    """
    Compute the number of workers as

        n_workers = m * IR + n_workers_min

    based on
    https://indico.cern.ch/event/1395900/contributions/5868567/attachments/2823967/4932440/20240320_slides_cpu_eff.pdf, slide 3.
    Assume n_workers_user=8 to be ideal for pp at IR >= interaction_rate_linear_below.
    Start with n_workers_min workers at IR=0 and scale linearly up to
    interaction_rate_linear_below; above that (or for PbPb), return n_workers_user.
    """
    if collision_system == "PbPb" or interaction_rate >= interaction_rate_linear_below:
        return n_workers_user
    n_workers_min = max(1, n_workers_min)
    # slope of the linear ramp between n_workers_min and n_workers_user
    m = (n_workers_user - n_workers_min) / interaction_rate_linear_below
    # at least 1 worker
    return max(1, round(m * interaction_rate + n_workers_min))
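
# Worked example of the ramp above (illustrative numbers, assuming the defaults
# n_workers_user=8, n_workers_min=1, knee at 300 kHz):
#   pp at IR=150000:   m = (8-1)/300000; round(m*150000 + 1) = round(4.5) = 4
#   pp at IR>=300000:  returns n_workers_user = 8
#   PbPb at any IR:    returns n_workers_user = 8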


def relativeCPU(n_rel, n_workers):
    # compute the number of CPUs from a given number of workers n_workers
    # and a fraction n_rel; min() catches cases where n_rel > 1 and
    # n_workers * n_rel would exceed n_workers
    return round(min(n_workers, n_workers * n_rel), 2)


def trimString(cmd):
    # trim unnecessary spaces
    return ' '.join(cmd.split())


def make_workflow_filename(filename):
    # append the .json extension if not already there
    if not filename.lower().endswith(".json"):
        return filename + ".json"
    return filename


def update_workflow_resource_requirements(workflow, n_workers):
    """Update resource requirements/settings based on the number of workers
    """
    for s in workflow:
        if s["resources"]["relative_cpu"]:
            s["resources"]["cpu"] = relativeCPU(s["resources"]["relative_cpu"], n_workers)


def createTask(name='', needs=None, tf=-1, cwd='./', lab=None, cpu=1, relative_cpu=None, mem=500, n_workers=8):
    """Creates a new task. A task is a dictionary with typically the following attributes

    Args:
        name: str
            task name
        needs: list
            list of task names this task depends on
        tf: int
            associated timeframe
        cwd: str
            working directory of this task, will be created automatically
        lab: list
            list of labels to be attached
        cpu: float
            absolute number of CPUs this task uses/needs on average
        relative_cpu: float or None
            if given, cpu is recomputed based on the number of available workers
        mem: int
            memory size needed by this task
        n_workers: int
            number of available workers, used together with relative_cpu

    Returns:
        dict representing the task
    """
    # avoid mutable default arguments which would be shared between calls
    if needs is None:
        needs = []
    if lab is None:
        lab = []
    if relative_cpu is not None:
        # recompute if a relative number of CPUs is requested
        cpu = relativeCPU(relative_cpu, n_workers)
    return {'name': name,
            'cmd': '',
            'needs': needs,
            'resources': {'cpu': cpu, 'relative_cpu': relative_cpu, 'mem': mem},
            'timeframe': tf,
            'labels': lab,
            'cwd': cwd}
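
# Illustrative usage (hypothetical names and numbers, not from the source):
#   task = createTask(name="sim_tf1", needs=["init"], tf=1, cpu=2, mem=2000)
#   task["cmd"] = "echo hello"  # the actual command is filled in by the caller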


def createGlobalInitTask(keys_values=None, set_defaults=True):
    """Returns a special task that is recognized by the executor as
    a task whose environment section is to be globally applied to all tasks of
    a workflow.

    Args:
        keys_values: dict or None
            dictionary of environment variables and values to be globally applied to all tasks;
            if sharing keys with the defaults, keys_values takes precedence
        set_defaults: bool
            whether or not some default values will be added

    Returns:
        dict: task dictionary
    """
    # dictionary holding the global environment to be passed to the task
    env_dict = {}

    if set_defaults:
        if environ.get('ALICEO2_CCDB_LOCALCACHE') is None:
            print("ALICEO2_CCDB_LOCALCACHE not set; setting to default " + getcwd() + '/ccdb')
            env_dict['ALICEO2_CCDB_LOCALCACHE'] = getcwd() + "/ccdb"
        else:
            # fixes the workflow to use and remember the externally provided path
            env_dict['ALICEO2_CCDB_LOCALCACHE'] = environ.get('ALICEO2_CCDB_LOCALCACHE')
        env_dict['IGNORE_VALIDITYCHECK_OF_CCDB_LOCALCACHE'] = '${ALICEO2_CCDB_LOCALCACHE:+"ON"}'

    if keys_values:
        # keys_values takes priority in case of identical keys
        env_dict |= keys_values

    t = createTask(name='__global_init_task__')
    t['cmd'] = 'NO-COMMAND'
    t['env'] = env_dict
    return t
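
# A minimal sketch of how the global init task is used (MY_VAR is a made-up
# example variable):
#   init = createGlobalInitTask({"MY_VAR": "1"})
#   # init['env'] now holds ALICEO2_CCDB_LOCALCACHE (plus MY_VAR); the executor
#   # applies this environment section to all tasks of the workflow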


def summary_workflow(workflow):
    print("=== WORKFLOW SUMMARY ===\n")
    print(f"-> There are {len(workflow)} tasks")


def dump_workflow(workflow, filename, meta=None):
    """write this workflow to a file

    Args:
        workflow: list
            stages of this workflow
        filename: str
            name of the output file
        meta: dict or None
            optional metadata to be stored alongside the stages
    """
    # sanity checks on the list of tasks
    check_workflow(workflow)
    taskwrapper_string = "${O2_ROOT}/share/scripts/jobutils2.sh; taskwrapper"
    # prepare for dumping, deepcopy to detach from this instance
    to_dump = deepcopy(workflow)
    for s in to_dump:
        if s["cmd"] and s["name"] != '__global_init_task__' and taskwrapper_string not in s["cmd"]:
            # wrap in taskwrapper if not done already; only if the cmd string is not empty
            s['cmd'] = '. ' + taskwrapper_string + ' ' + s['name'] + '.log \'' + s['cmd'] + '\''
        # remove unnecessary whitespace for better readability
        s['cmd'] = trimString(s['cmd'])
        # remove None entries from the needs list
        s['needs'] = [n for n in s['needs'] if n is not None]
    # make the final dict to be dumped
    to_dump = {"stages": to_dump}
    filename = make_workflow_filename(filename)
    to_dump["meta"] = meta if meta else {}
    with open(filename, 'w') as outfile:
        json.dump(to_dump, outfile, indent=2)
    print(f"Workflow saved at {filename}")


def read_workflow(filename):
    workflow = None
    filename = make_workflow_filename(filename)
    with open(filename, "r") as wf_file:
        loaded = json.load(wf_file)
        workflow = loaded["stages"]
        meta = loaded.get("meta", {})
    return workflow, meta
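
# Illustrative round trip (hypothetical file name, not from the source):
#   dump_workflow(tasks, "workflow")         # writes workflow.json
#   tasks, meta = read_workflow("workflow")  # extension is appended automatically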


def check_workflow_dependencies(workflow, collect_warnings, collect_errors):
    """check dependencies among tasks

    Args:
        workflow: list
            stages of this workflow
        collect_warnings: list
            collect all warnings that might come up
        collect_errors: list
            collect all errors that might come up
    """
    is_sane = True
    needed = []
    names = []
    for s in workflow:
        needed.extend(s["needs"])
        names.append(s["name"])
    # remove potential duplicates
    needed = list(set(needed))
    for n in needed:
        if n not in names:
            # for now, only add a warning since tasks might still be added
            collect_warnings.append(f"WARNING: Task {n} is needed but is not in tasks (might be added later)")
            is_sane = False
    return is_sane


def check_workflow_unique_names(workflow, collect_warnings, collect_errors):
    """check for uniqueness of task names

    Args:
        workflow: list
            stages of this workflow
        collect_warnings: list
            collect all warnings that might come up
        collect_errors: list
            collect all errors that might come up
    """
    is_sane = True
    dupl = []
    for s in workflow:
        if s["name"] in dupl:
            # that is an error since, for instance, adding another task would not solve it
            collect_errors.append(f"Task with name {s['name']} already defined")
            is_sane = False
            continue
        dupl.append(s["name"])
    return is_sane


def check_workflow(workflow):
    """Conduct sanity checks for this workflow
    """
    collect_warnings = []
    collect_errors = []
    # run both checks unconditionally so that all warnings/errors are collected
    deps_sane = check_workflow_dependencies(workflow, collect_warnings, collect_errors)
    names_sane = check_workflow_unique_names(workflow, collect_warnings, collect_errors)
    is_sane = deps_sane and names_sane

    print(f"=== There are {len(collect_warnings)} warnings ===")
    for w in collect_warnings:
        print(w)
    print(f"=== There are {len(collect_errors)} errors ===")
    for e in collect_errors:
        print(e)
    if is_sane:
        print("===> The workflow looks sane")
    else:
        print("===> Please check warnings and errors!")
    return is_sane
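
# Illustrative check (hypothetical tasks):
#   wf = [createTask(name="a"), createTask(name="b", needs=["a"])]
#   check_workflow(wf)  # prints 0 warnings / 0 errors and returns True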


# Adjusts the software version for RECO (and beyond) stages, if desired.
# The function implements a specific wish from operations to be able to operate
# with different sim and reco software versions (due to the different speed of
# development, fixes and patching).
def adjust_RECO_environment(workflowspec, package=""):
    if len(package) == 0:
        return

    # we try to extract the stage from the package string (can be given via '@' separation)
    # example O2sim::daily-xxx@DIGI ---> apply this environment from the DIGI stage on
    # example O2sim::daily-xxx@RECO ---> apply this environment from the RECO stage on
    # example O2sim::daily-xxx      ---> apply this environment from the RECO stage on (default case)
    from_stage = "RECO"
    if package.count('@') == 1:
        package, from_stage = package.split('@')

    # We essentially need to go through the graph and apply the mapping,
    # so take the workflow spec and see if the task itself or any of the tasks
    # it depends on is labeled with from_stage ---> typical graph traversal with caching
    # helper structures
    taskuniverse = [l['name'] for l in workflowspec['stages']]
    tasktoid = {name: i for i, name in enumerate(taskuniverse)}
    matches_label = {}

    # internal helper for recursive graph traversal
    def matches_or_inherits_label(taskid, label, cache):
        if cache.get(taskid) is not None:
            return cache[taskid]
        result = False
        if label in workflowspec['stages'][taskid]['labels']:
            result = True
        else:
            # check mother tasks
            for mothertask in workflowspec['stages'][taskid]['needs']:
                motherid = tasktoid[mothertask]
                if matches_or_inherits_label(motherid, label, cache):
                    result = True
                    break
        cache[taskid] = result
        return result

    # fills the matches_label dictionary
    for taskid in range(len(workflowspec['stages'])):
        if matches_or_inherits_label(taskid, from_stage, matches_label):
            # now we do the final adjustment (as an annotation) in the workflow itself
            stage = workflowspec['stages'][taskid]
            if not stage.get('disable_alternative_reco_software', False):
                if stage.get("alternative_alienv_package") is None:
                    stage["alternative_alienv_package"] = package
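
# Illustrative call (hypothetical package tag, not from the source):
#   adjust_RECO_environment(spec, "O2sim::daily-xyz@RECO")
#   # every task carrying or inheriting the RECO label gets
#   # 'alternative_alienv_package' set to "O2sim::daily-xyz"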


def merge_dicts(dict1, dict2):
    """
    merges dict2 into dict1 (potentially overwriting values)
    """
    for key, value in dict2.items():
        if key in dict1 and isinstance(dict1[key], dict) and isinstance(value, dict):
            # if both are dictionaries, merge them recursively
            merge_dicts(dict1[key], value)
        else:
            # otherwise, overwrite dict1's value with dict2's value
            dict1[key] = value
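
# A quick illustration of the recursive merge (made-up values):
#   d1 = {"a": {"x": 1}, "b": 2}
#   merge_dicts(d1, {"a": {"y": 3}, "b": 4})
#   # d1 is now {"a": {"x": 1, "y": 3}, "b": 4}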