Skip to content

Commit 7078052

Browse files
committed
pipeline_runner: Support for environment variables and a few other improvements
* allow setting local environment variables per task * skip already-done tasks directly here
1 parent df5589c commit 7078052

File tree

2 files changed

+61
-20
lines changed

2 files changed

+61
-20
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
# Code section to find all topological orderings
2121
# of a DAG. This is used to know when we can schedule
2222
# things in parallel.
23-
#
23+
# Taken from https://www.geeksforgeeks.org/all-topological-sorts-of-a-directed-acyclic-graph/
2424

2525
# class to represent a graph object
2626
class Graph:
@@ -97,6 +97,8 @@ def printAllTopologicalOrders(graph, maxnumber=1):
9797
findAllTopologicalOrders(graph, path, discovered, N, allpaths, maxnumber=maxnumber)
9898
return allpaths
9999

100+
# <--- end code section for topological sorts
101+
100102
# wrapper taking some edges, constructing the graph,
101103
# obtain all topological orderings and some other helper data structures
102104
def analyseGraph(edges, nodes):
@@ -311,14 +313,18 @@ def find_all_dependent_tasks(self, tid):
311313

312314
return list(set(daughterlist))
313315

316+
def get_done_filename(self, tid):
317+
name = self.workflowspec['stages'][tid]['name']
318+
workdir = self.workflowspec['stages'][tid]['cwd']
319+
# name and workdir define the "done" file as used by taskwrapper
320+
# this assumes that taskwrapper is used to actually check if something is to be rerun
321+
done_filename = workdir + '/' + name + '.log_done'
322+
return done_filename
323+
314324
# removes the done flag from tasks that need to be run again
315325
def remove_done_flag(self, listoftaskids):
316326
for tid in listoftaskids:
317-
name = self.workflowspec['stages'][tid]['name']
318-
workdir = self.workflowspec['stages'][tid]['cwd']
319-
# name and workdir define the "done" file as used by taskwrapper
320-
# this assumes that taskwrapper is used to actually check if something is to be rerun
321-
done_filename = workdir + '/' + name + '.log_done'
327+
done_filename = self.get_done_filename(tid)
322328
if args.dry_run:
323329
print ("Would mark task " + name + " as to be done again")
324330
else:
@@ -343,21 +349,39 @@ def submit(self, tid):
343349
if args.dry_run:
344350
drycommand="echo \' " + str(self.scheduling_iteration) + " : would do " + str(self.workflowspec['stages'][tid]['name']) + "\'"
345351
return subprocess.Popen(['/bin/bash','-c',drycommand], cwd=workdir)
346-
347-
return subprocess.Popen(['/bin/bash','-c',c], cwd=workdir)
352+
353+
taskenv = os.environ.copy()
354+
# add task specific environment
355+
if self.workflowspec['stages'][tid].get('env')!=None:
356+
taskenv.update(self.workflowspec['stages'][tid]['env'])
357+
358+
return subprocess.Popen(['/bin/bash','-c',c], cwd=workdir, env=taskenv)
348359

349360
def ok_to_submit(self, tid):
350361
if self.curmembooked + self.maxmemperid[tid] < self.memlimit:
351362
return True
352363
else:
353364
return False
354365

355-
def try_job_from_candidates(self, taskcandidates, process_list):
366+
def ok_to_skip(self, tid):
367+
done_filename = self.get_done_filename(tid)
368+
if os.path.exists(done_filename) and os.path.isfile(done_filename):
369+
return True
370+
return False
371+
372+
def try_job_from_candidates(self, taskcandidates, process_list, finished):
356373
self.scheduling_iteration = self.scheduling_iteration + 1
357374
initialcandidates=taskcandidates.copy()
358375
for tid in initialcandidates:
359376
logging.debug ("trying to submit" + str(tid))
360-
if self.ok_to_submit(tid) and len(process_list) < self.max_jobs_parallel:
377+
# check early if we could skip
378+
# better to do it here (instead of relying on taskwrapper)
379+
if self.ok_to_skip(tid):
380+
finished.append(tid)
381+
taskcandidates.remove(tid)
382+
continue
383+
384+
elif self.ok_to_submit(tid) and len(process_list) < self.max_jobs_parallel:
361385
p=self.submit(tid)
362386
if p!=None:
363387
self.curmembooked+=self.maxmemperid[tid]
@@ -375,8 +399,10 @@ def stop_pipeline_and_exit(self, process_list):
375399

376400
def waitforany(self, process_list, finished):
377401
failuredetected = False
402+
if len(process_list)==0:
403+
return False
404+
378405
for p in list(process_list):
379-
logging.debug ("polling" + str(p))
380406
returncode = 0
381407
if not self.args.dry_run:
382408
returncode = p[1].poll()
@@ -407,15 +433,26 @@ def is_good_candidate(self, candid, finishedtasks):
407433

408434
def emit_code_for_task(self, tid, lines):
409435
logging.debug("Submitting task " + str(self.idtotask[tid]))
410-
c = self.workflowspec['stages'][tid]['cmd']
411-
workdir = self.workflowspec['stages'][tid]['cwd']
436+
taskspec = self.workflowspec['stages'][tid]
437+
c = taskspec['cmd']
438+
workdir = taskspec['cwd']
439+
env = taskspec.get('env')
412440
# in general:
413441
# try to make folder
414442
lines.append('[ ! -d ' + workdir + ' ] && mkdir ' + workdir + '\n')
415443
# cd folder
416444
lines.append('cd ' + workdir + '\n')
445+
# set local environment
446+
if env!=None:
447+
for e in env.items():
448+
lines.append('export ' + e[0] + '=' + str(e[1]) + '\n')
417449
# do command
418450
lines.append(c + '\n')
451+
# unset local environment
452+
if env!=None:
453+
for e in env.items():
454+
lines.append('unset ' + e[0] + '\n')
455+
419456
# cd back
420457
lines.append('cd $OLDPWD\n')
421458

@@ -473,16 +510,18 @@ def execute(self):
473510
# remove weights
474511
candidates = [ tid for tid,_ in candidates ]
475512

476-
logging.debug(candidates)
477-
self.try_job_from_candidates(candidates, process_list)
478-
479513
finished = []
480-
while self.waitforany(process_list, finished):
514+
logging.debug(candidates)
515+
self.try_job_from_candidates(candidates, process_list, finished)
516+
517+
finished_from_started = []
518+
while self.waitforany(process_list, finished_from_started):
481519
if not args.dry_run:
482-
time.sleep(1)
520+
time.sleep(1) # <--- make this incremental (small wait at beginning)
483521
else:
484522
time.sleep(0.01)
485-
523+
524+
finished = finished + finished_from_started
486525
logging.debug("finished " + str( finished))
487526
finishedtasks=finishedtasks + finished
488527

MC/doc/WorkflowRunner.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ still in development. Currently, it follows the following scheme:
5858
},
5959
{
6060
"name": "task2",
61+
"env": { "MY_ENV":1 }
6162
"cmd": "o2-sim-digitizer-workflow"
6263
"needs": [ "task1" ],
6364
"resources": {
@@ -80,7 +81,8 @@ Further keys in this format are:
8081
| `resources` | estimated resource usage for average cpu load (250 = 2.5 CPUs) and maximal memory in MB. Used for scheduling. -1 is used for unknown or don't care. |
8182
| `timeframe` | timeframe index or -1 if not associated to any timeframe. May have influence on order of execution (prefer finish timeframe first) |
8283
| `cwd` | the working directory where this is to be executed |
83-
| `label` | a list labels, describing this stage. Can be used to execute workfow in stages (such as 'do all digitization', 'run everthing for ITS'
84+
| `label` | a list of labels describing this stage. Can be used to execute the workflow in stages (such as 'do all digitization', 'run everything for ITS') |
85+
| `env` | local environment variables needed by the task |
8486

8587
While a workflow may be written by hand, it's more practical to have it programmatically generated by scripts that are sensitive to configuration and options. A current example following the PWGHF embedding exercise can be found here [create_embedding_workflow](https://github.com/AliceO2Group/O2DPG/blob/master/MC/run/PWGHF/create_embedding_workflow.py)
8688

0 commit comments

Comments
 (0)