Skip to content

Commit 810c60d

Browse files
committed
Workflow runner: Several new features
This adds the ability to: produce a shell script that runs the workflow standalone and serialized; restart from a given task, automatically redoing all dependent tasks; otherwise consistently skip tasks that are already done; and an option to simply list the tasks in the workflow.
1 parent 86b33b8 commit 810c60d

File tree

1 file changed

+100
-19
lines changed

1 file changed

+100
-19
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 100 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ def __init__(self, edges, N):
4646

4747

4848
# Recursive function to find all topological orderings of a given DAG
49-
def findAllTopologicalOrders(graph, path, discovered, N, allpaths):
50-
if len(allpaths) >= 2000:
49+
def findAllTopologicalOrders(graph, path, discovered, N, allpaths, maxnumber=1):
50+
if len(allpaths) >= maxnumber:
5151
# print ('More than 2000 paths found')
5252
return
5353

@@ -84,7 +84,7 @@ def findAllTopologicalOrders(graph, path, discovered, N, allpaths):
8484

8585

8686
# get all topological orderings of a given DAG as a list
87-
def printAllTopologicalOrders(graph):
87+
def printAllTopologicalOrders(graph, maxnumber=1):
8888
# get number of nodes in the graph
8989
N = len(graph.adjList)
9090

@@ -95,7 +95,7 @@ def printAllTopologicalOrders(graph):
9595
path = []
9696
allpaths = []
9797
# find all topological ordering and print them
98-
findAllTopologicalOrders(graph, path, discovered, N, allpaths)
98+
findAllTopologicalOrders(graph, path, discovered, N, allpaths, maxnumber=maxnumber)
9999
return allpaths
100100

101101
# wrapper taking some edges, constructing the graph,
@@ -116,9 +116,8 @@ def analyseGraph(edges, nodes):
116116

117117
# find topological orderings of the graph -> not used for moment
118118
# create a graph from edges
119-
# graph = Graph(edges, N)
120-
# allorderings = printAllTopologicalOrders(graph)
121-
allorderings=[[]]
119+
graph = Graph(edges, N)
120+
orderings = printAllTopologicalOrders(graph)
122121
# find out "can be followed by" for each node
123122
# can be followed does not mean that all requirements are met though
124123
# nextjob={}
@@ -134,7 +133,7 @@ def analyseGraph(edges, nodes):
134133

135134
# print(nextjob)
136135

137-
return (allorderings, nextjobtrivial)
136+
return (orderings, nextjobtrivial)
138137

139138

140139
def draw_workflow(workflowspec):
@@ -165,7 +164,7 @@ def draw_workflow(workflowspec):
165164
# builds accompanying structures tasktoid and idtotask
166165
def build_graph(taskuniverse, workflowspec):
167166
tasktoid={ t[0]['name']:i for i, t in enumerate(taskuniverse, 0) }
168-
print (tasktoid)
167+
# print (tasktoid)
169168

170169
nodes = []
171170
edges = []
@@ -208,8 +207,8 @@ def getweight(tid):
208207

209208
task_weights = [ getweight(tid) for tid in range(len(globaltaskuniverse)) ]
210209

211-
print (global_next_tasks)
212-
return { 'nexttasks' : global_next_tasks, 'weights' : task_weights }
210+
# print (global_next_tasks)
211+
return { 'nexttasks' : global_next_tasks, 'weights' : task_weights, 'topological_ordering' : tup[0] }
213212

214213

215214
#
@@ -226,7 +225,7 @@ def __init__(self, workflowfile, args, jmax=100):
226225
draw_workflow(self.workflowspec)
227226
self.possiblenexttask = workflow['nexttasks']
228227
self.taskweights = workflow['weights']
229-
print (self.possiblenexttask)
228+
self.topological_orderings = workflow['topological_ordering']
230229
self.taskuniverse = [ l['name'] for l in self.workflowspec['stages'] ]
231230
self.idtotask = [ 0 for l in self.taskuniverse ]
232231
self.tasktoid = {}
@@ -242,13 +241,37 @@ def __init__(self, workflowfile, args, jmax=100):
242241
self.stoponfailure = True
243242
self.max_jobs_parallel = int(jmax)
244243
self.scheduling_iteration = 0
245-
244+
246245
def getallrequirements(self, t):
    """Collect, recursively, the names of all tasks that task *t* needs.

    Duplicates may appear when several paths lead to the same prerequisite;
    callers must tolerate that (the original behaved the same way).
    """
    requirements = []
    for needed in self.workflowspec['stages'][self.tasktoid[t]]['needs']:
        requirements.append(needed)
        requirements.extend(self.getallrequirements(needed))
    return requirements
251+
252+
# find all tasks that depend on a given task (id)
def find_all_dependent_tasks(self, tid):
    """Return the ids of task *tid* plus all tasks transitively depending on it.

    Uses an iterative depth-first traversal with a visited set instead of
    naive recursion: on diamond-shaped dependency graphs the recursive form
    re-visits shared descendants exponentially often, and on a (malformed)
    cyclic workflow it would recurse forever. The result is the same
    de-duplicated id list (order unspecified, as before).
    """
    visited = set()
    stack = [tid]
    while stack:
        current = stack.pop()
        if current in visited:
            continue
        visited.add(current)
        # every task that can only run after 'current' must be redone as well
        stack.extend(self.possiblenexttask[current])
    return list(visited)
260+
261+
# removes the done flag from tasks that need to be run again
def remove_done_flag(self, listoftaskids):
    """Strip the taskwrapper "done" marker from each given task id.

    Honours the module-level ``args.dry_run``: in dry-run mode it only
    reports what would happen instead of deleting anything.
    """
    for tid in listoftaskids:
        stage = self.workflowspec['stages'][tid]
        name = stage['name']
        workdir = stage['cwd']
        # name and workdir define the "done" file as used by taskwrapper
        # this assumes that taskwrapper is used to actually check if something is to be rerun
        done_filename = workdir + '/' + name + '.log_done'
        if args.dry_run:
            print ("Would mark task " + name + " as to be done again")
        else:
            print ("Marking task " + name + " as to be done again")
            if os.path.exists(done_filename) and os.path.isfile(done_filename):
                os.remove(done_filename)
252275

253276
# submits a task as subprocess and records Popen instance
254277
def submit(self, tid):
@@ -257,11 +280,11 @@ def submit(self, tid):
257280
workdir = self.workflowspec['stages'][tid]['cwd']
258281
if not workdir=='':
259282
if os.path.exists(workdir) and not os.path.isdir(workdir):
260-
logging.error('Cannot create working dir ... some other resource exists already')
261-
return None
283+
logging.error('Cannot create working dir ... some other resource exists already')
284+
return None
262285

263286
if not os.path.isdir(workdir):
264-
os.mkdir(workdir)
287+
os.mkdir(workdir)
265288

266289
self.procstatus[tid]='Running'
267290
if args.dry_run:
@@ -329,7 +352,61 @@ def is_good_candidate(self, candid, finishedtasks):
329352
return True
330353
return False
331354

355+
def emit_code_for_task(self, tid, lines):
    """Append the shell lines that execute task *tid* to the *lines* list.

    Emits (when the task has a working directory): create the dir if
    missing, cd into it, run the task command, cd back via $OLDPWD.
    Fix over the original: a task with an empty 'cwd' (which submit()
    explicitly allows) previously produced `mkdir ` / `cd ` with an empty
    argument — `cd` with no argument silently jumps to $HOME, breaking the
    generated script. Now the directory scaffolding is emitted only for a
    non-empty workdir.
    """
    logging.debug("Submitting task " + str(self.idtotask[tid]))
    c = self.workflowspec['stages'][tid]['cmd']
    workdir = self.workflowspec['stages'][tid]['cwd']
    if workdir != '':
        # try to make folder
        lines.append('[ ! -d ' + workdir + ' ] && mkdir ' + workdir + '\n')
        # cd folder
        lines.append('cd ' + workdir + '\n')
        # do command
        lines.append(c + '\n')
        # cd back
        lines.append('cd $OLDPWD\n')
    else:
        # no working directory: just run the command in place
        lines.append(c + '\n')
368+
369+
370+
# produce a bash script that runs workflow standalone
def produce_script(self, filename):
    """Write a bash script to *filename* that runs the workflow serially.

    The tasks are emitted in one valid topological ordering; the script
    sets JOBUTILS_SKIPDONE so already-completed tasks are skipped when
    the script is (re)run.
    """
    # pick one of the correct task orderings
    taskorder = self.topological_orderings[0]

    lines = []
    # header
    lines.append('#!/usr/bin/env bash\n')
    lines.append('#THIS FILE IS AUTOGENERATED\n')
    lines.append('JOBUTILS_SKIPDONE=ON\n')
    for tid in taskorder:
        print ('Doing task ' + self.idtotask[tid])
        self.emit_code_for_task(tid, lines)

    # context manager guarantees the file is flushed and closed even if
    # an error occurs while writing (the original leaked the handle then)
    with open(filename, "w") as outF:
        outF.writelines(lines)
387+
388+
332389
def execute(self):
390+
os.environ['JOBUTILS_SKIPDONE'] = "ON"
391+
# some maintenance / init work
392+
if args.list_tasks:
393+
print ('List of tasks in this workflow:')
394+
for i in self.workflowspec['stages']:
395+
print (i['name'])
396+
exit (0)
397+
398+
if args.produce_script:
399+
self.produce_script(args.produce_script)
400+
exit (0)
401+
402+
if args.rerun_from:
403+
if self.tasktoid.get(args.rerun_from)!=None:
404+
taskid=self.tasktoid[args.rerun_from]
405+
self.remove_done_flag(self.find_all_dependent_tasks(taskid))
406+
else:
407+
print('task ' + args.rerun_from + ' not found; cowardly refusing to do anything ')
408+
exit (1)
409+
333410
# main control loop
334411
currenttimeframe=1
335412
candidates = [ tid for tid in self.possiblenexttask[-1] ]
@@ -379,17 +456,21 @@ def execute(self):
379456
# let's assume 16GB
380457
max_system_mem=16*1024*1024*1024
381458

382-
parser = argparse.ArgumentParser(description='Parellel execution of a (O2-DPG) DAG data pipeline under resource contraints.')
459+
parser = argparse.ArgumentParser(description='Parallel execution of a (O2-DPG) DAG data/job pipeline under resource contraints.',
460+
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
461+
383462
parser.add_argument('-f','--workflowfile', help='Input workflow file name', required=True)
384463
parser.add_argument('-jmax','--maxjobs', help='number of maximal parallel tasks', default=100)
385464
parser.add_argument('--dry-run', action='store_true', help='show what you would do')
386465
parser.add_argument('--visualize-workflow', action='store_true', help='saves a graph visualization of workflow')
387466
parser.add_argument('--target-stages', help='Runs the pipeline by target labels (example "TPC" or "digi")')
467+
parser.add_argument('--produce-script', help='Produces a shell script that runs the workflow in serialized manner and quits.', default='workflow_script.sh')
468+
parser.add_argument('--rerun-from', help='Reruns the workflow starting from given task. All dependent jobs will be rerun.')
469+
parser.add_argument('--list-tasks', help='Simply list all tasks by name and quit.', action='store_true')
388470

389-
parser.add_argument('--mem-limit', help='set memory limit as scheduling constraint', default=max_system_mem)
471+
parser.add_argument('--mem-limit', help='Set memory limit as scheduling constraint', default=max_system_mem)
390472
args = parser.parse_args()
391473
print (args)
392-
print (args.workflowfile)
393474

394475
logging.basicConfig(filename='example.log', filemode='w', level=logging.DEBUG)
395476
executor=WorkflowExecutor(args.workflowfile,jmax=args.maxjobs,args=args)

0 commit comments

Comments
 (0)