Skip to content

Commit 3104cdc

Browse files
committed
workflow runner: run specific targets or by labels
+ some cleanup
1 parent 4af3205 commit 3104cdc

File tree

2 files changed

+93
-35
lines changed

2 files changed

+93
-35
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 85 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ def __init__(self, edges, N):
4848
# Recursive function to find all topological orderings of a given DAG
4949
def findAllTopologicalOrders(graph, path, discovered, N, allpaths, maxnumber=1):
5050
if len(allpaths) >= maxnumber:
51-
# print ('More than 2000 paths found')
5251
return
5352

5453
# do for every vertex
@@ -103,7 +102,6 @@ def printAllTopologicalOrders(graph, maxnumber=1):
103102
def analyseGraph(edges, nodes):
104103
# Number of nodes in the graph
105104
N = len(nodes)
106-
107105

108106
# candidate list trivial
109107
nextjobtrivial = { n:[] for n in nodes }
@@ -114,24 +112,10 @@ def analyseGraph(edges, nodes):
114112
if nextjobtrivial[-1].count(e[1]):
115113
nextjobtrivial[-1].remove(e[1])
116114

117-
# find topological orderings of the graph -> not used for moment
115+
# find topological orderings of the graph
118116
# create a graph from edges
119117
graph = Graph(edges, N)
120118
orderings = printAllTopologicalOrders(graph)
121-
# find out "can be followed by" for each node
122-
# can be followed does not mean that all requirements are met though
123-
# nextjob={}
124-
# for plan in allorderings:
125-
# previous = -1 # means start
126-
# for e in plan:
127-
# if nextjob.get(previous)!=None:
128-
# nextjob[previous].add(e)
129-
# else:
130-
# nextjob[previous]=set()
131-
# nextjob[previous].add(e)
132-
# previous=e
133-
134-
# print(nextjob)
135119

136120
return (orderings, nextjobtrivial)
137121

@@ -177,27 +161,90 @@ def build_graph(taskuniverse, workflowspec):
177161

178162

179163
# loads the workflow specification
180-
# returns a tuple of (all_topological_ordering, possible_next_job_dict, nodeset)
181164
def load_workflow(workflowfile):
182165
fp=open(workflowfile)
183166
workflowspec=json.load(fp)
184167
return workflowspec
185168

186169

170+
# filters the original workflowspec according to wanted targets or labels
171+
# returns a new workflowspec
172+
def filter_workflow(workflowspec, targets=[], targetlabels=[]):
173+
if len(targets)==0:
174+
return workflowspec
175+
if len(targetlabels)==0 and len(targets)==1 and targets[0]=="*":
176+
return workflowspec
177+
178+
transformedworkflowspec = workflowspec
179+
180+
def task_matches(t):
181+
for filt in targets:
182+
if filt=="*":
183+
return True
184+
if re.match(filt, t)!=None:
185+
return True
186+
return False
187+
188+
def task_matches_labels(t):
189+
# when no labels are given at all it's ok
190+
if len(targetlabels)==0:
191+
return True
192+
193+
for l in t['labels']:
194+
if targetlabels.count(l)!=0:
195+
return True
196+
return False
197+
198+
# The following sequence of operations works and is somewhat structured.
199+
# However, it builds lookups used elsewhere as well, so some CPU might be saved by reusing
200+
# some structures across functions or by doing less passes on the data.
201+
202+
# helper lookup
203+
tasknametoid = { t['name']:i for i, t in enumerate(workflowspec['stages'],0) }
204+
205+
# build full target list
206+
full_target_list = [ t for t in workflowspec['stages'] if task_matches(t['name']) and task_matches_labels(t) ]
207+
full_target_name_list = [ t['name'] for t in full_target_list ]
208+
209+
# build full dependency list for a task t
210+
def getallrequirements(t):
211+
_l=[]
212+
for r in t['needs']:
213+
fulltask = workflowspec['stages'][tasknametoid[r]]
214+
_l.append(fulltask)
215+
_l=_l+getallrequirements(fulltask)
216+
return _l
217+
218+
full_requirements_list = [ getallrequirements(t) for t in full_target_list ]
219+
220+
# make flat and fetch names only
221+
full_requirements_name_list = list(set([ item['name'] for sublist in full_requirements_list for item in sublist ]))
222+
223+
# inner "lambda" helper answering if a task "name" is needed by given targets
224+
def needed_by_targets(name):
225+
if full_target_name_list.count(name)!=0:
226+
return True
227+
if full_requirements_name_list.count(name)!=0:
228+
return True
229+
return False
230+
231+
# we finally copy everything matching the targets as well
232+
# as all their requirements
233+
transformedworkflowspec['stages']=[ l for l in workflowspec['stages'] if needed_by_targets(l['name']) ]
234+
return transformedworkflowspec
235+
236+
187237
# builds topological orderings (for each timeframe)
188-
def build_topological_orderings(workflowspec):
238+
def build_dag_properties(workflowspec):
189239
globaltaskuniverse = [ (l, i) for i, l in enumerate(workflowspec['stages'], 1) ]
190240
timeframeset = set( l['timeframe'] for l in workflowspec['stages'] )
191241

192-
# timeframes are independent so we can restrict graph to them
193-
# (this makes the graph analysis less computational/combinatorial)
194-
timeframe_task_universe = { tf:[ (l, i) for i, l in enumerate(workflowspec['stages'], 1) if (l['timeframe']==tf or l['timeframe']==-1) ] for tf in timeframeset if tf!=-1 }
195242
edges, nodes = build_graph(globaltaskuniverse, workflowspec)
196243
tup = analyseGraph(edges, nodes)
197244
#
198245
global_next_tasks = tup[1]
199246

200-
# weight can be anything ... for the moment we just prefer to stay within a timeframe
247+
# weight influences the scheduling order; it can be anything user-defined ... for the moment we just prefer to stay within a timeframe
201248
def getweight(tid):
202249
return globaltaskuniverse[tid][0]['timeframe']
203250

@@ -220,7 +267,13 @@ def __init__(self, workflowfile, args, jmax=100):
220267
self.args=args
221268
self.workflowfile = workflowfile
222269
self.workflowspec = load_workflow(workflowfile)
223-
workflow = build_topological_orderings(self.workflowspec)
270+
self.workflowspec = filter_workflow(self.workflowspec, args.target_tasks, args.target_labels)
271+
272+
if len(self.workflowspec['stages']) == 0:
273+
print ('Workflow is empty. Nothing to do')
274+
exit (0)
275+
276+
workflow = build_dag_properties(self.workflowspec)
224277
if args.visualize_workflow:
225278
draw_workflow(self.workflowspec)
226279
self.possiblenexttask = workflow['nexttasks']
@@ -392,7 +445,7 @@ def execute(self):
392445
if args.list_tasks:
393446
print ('List of tasks in this workflow:')
394447
for i in self.workflowspec['stages']:
395-
print (i['name'])
448+
print (i['name'] + ' (' + str(i['labels']) + ')')
396449
exit (0)
397450

398451
if args.produce_script != None:
@@ -460,17 +513,19 @@ def execute(self):
460513
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
461514

462515
parser.add_argument('-f','--workflowfile', help='Input workflow file name', required=True)
463-
parser.add_argument('-jmax','--maxjobs', help='number of maximal parallel tasks', default=100)
464-
parser.add_argument('--dry-run', action='store_true', help='show what you would do')
465-
parser.add_argument('--visualize-workflow', action='store_true', help='saves a graph visualization of workflow')
466-
parser.add_argument('--target-stages', help='Runs the pipeline by target labels (example "TPC" or "digi")')
516+
parser.add_argument('-jmax','--maxjobs', help='Number of maximal parallel tasks.', default=100)
517+
parser.add_argument('--dry-run', action='store_true', help='Show what you would do.')
518+
parser.add_argument('--visualize-workflow', action='store_true', help='Saves a graph visualization of workflow.')
519+
parser.add_argument('--target-labels', nargs='+', help='Runs the pipeline by target labels (example "TPC" or "DIGI").\
520+
This condition is used as logical AND together with --target-tasks.', default=[])
521+
parser.add_argument('-tt','--target-tasks', nargs='+', help='Runs the pipeline by target tasks (example "tpcdigi"). By default everything in the graph is run. Regular expressions supported.', default=["*"])
467522
parser.add_argument('--produce-script', help='Produces a shell script that runs the workflow in serialized manner and quits.')
468523
parser.add_argument('--rerun-from', help='Reruns the workflow starting from given task. All dependent jobs will be rerun.')
469524
parser.add_argument('--list-tasks', help='Simply list all tasks by name and quit.', action='store_true')
470525

471526
parser.add_argument('--mem-limit', help='Set memory limit as scheduling constraint', default=max_system_mem)
472527
args = parser.parse_args()
473-
print (args)
528+
print (args)
474529

475530
logging.basicConfig(filename='example.log', filemode='w', level=logging.DEBUG)
476531
executor=WorkflowExecutor(args.workflowfile,jmax=args.maxjobs,args=args)

MC/doc/WorkflowRunner.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,17 @@ Redo a certain task in the workflow and all its direct or indirect dependencies
123123
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --rerun-from tpcdigi_1
124124
```
125125

126-
## Future targeted features:
126+
Run workflow for targets matching trdtrap (regular expression works)
127+
```
128+
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --target-tasks trdtrap
129+
```
127130

128-
Run until everyting marked "RECO" is done
131+
Run everything marked "RECO"
129132
```
130-
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --stages RECO
133+
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --target-stages RECO
131134
```
132135

133-
Rerun worflow until AOD, skipping all tasks already done
136+
Rerun workflow until AOD, skipping all tasks already done (task skipping is default)
134137
```
135-
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --stages AOD
138+
${O2DPG_ROOT}/MC/bin/o2_dpg_workflow_runner.py -f workflow.json --target-stages AOD
136139
```

0 commit comments

Comments
 (0)