Skip to content

Commit 1e1726c

Browse files
committed
Prototype workflow_runner system
This adds a Python tool as an initial proposal for running DPG workflow files.
1 parent bd94ce1 commit 1e1726c

File tree

2 files changed

+485
-0
lines changed

2 files changed

+485
-0
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
#!/usr/bin/env python3
2+
import re
3+
import subprocess
4+
import shlex
5+
import time
6+
import json
7+
import logging
8+
import os
9+
from graphviz import Digraph
10+
11+
#
12+
# Code section to find all topological orderings
13+
# of a DAG. This is used to know when we can schedule
14+
# things in parallel.
15+
#
16+
17+
# class to represent a graph object
18+
# Directed graph represented by adjacency lists plus per-vertex in-degrees.
class Graph:
    """Adjacency-list representation of a DAG with in-degree bookkeeping."""

    def __init__(self, edges, N):
        """Build the graph from (src, dest) edge pairs over N vertices."""
        # one outgoing-neighbour list per vertex
        self.adjList = [[] for _ in range(N)]

        # number of incoming edges per vertex; starts at zero everywhere
        self.indegree = [0] * N

        # record each directed edge and bump the destination's in-degree
        for src, dest in edges:
            self.adjList[src].append(dest)
            self.indegree[dest] += 1
38+
39+
40+
# Recursive function to find all topological orderings of a given DAG
41+
# Recursive backtracking enumeration of the topological orderings of a DAG.
def findAllTopologicalOrders(graph, path, discovered, N, allpaths):
    """Append every topological ordering of `graph` to `allpaths`.

    graph      -- Graph-like object exposing adjList and indegree
    path       -- current partial ordering (mutated in place)
    discovered -- per-vertex "already placed" flags (mutated in place)
    N          -- number of vertices
    allpaths   -- output list collecting complete orderings

    Enumeration stops once 2000 orderings have been collected, to keep
    the combinatorial explosion in check.
    """
    if len(allpaths) >= 2000:
        # cap reached -- stop descending; callers undo their own changes
        return

    for vertex in range(N):
        # a vertex is schedulable once all of its prerequisites are placed
        # (in-degree zero) and it has not been placed itself
        if discovered[vertex] or graph.indegree[vertex] != 0:
            continue

        # tentatively place the vertex: each successor loses one prerequisite
        for succ in graph.adjList[vertex]:
            graph.indegree[succ] -= 1

        path.append(vertex)
        discovered[vertex] = True

        # recurse with the vertex fixed in the ordering
        findAllTopologicalOrders(graph, path, discovered, N, allpaths)

        # backtrack: restore in-degrees, unplace the vertex
        for succ in graph.adjList[vertex]:
            graph.indegree[succ] += 1

        path.pop()
        discovered[vertex] = False

    # a full-length path is one valid topological ordering
    if len(path) == N:
        allpaths.append(path.copy())
76+
77+
78+
# get all topological orderings of a given DAG as a list
79+
# Collect every topological ordering of the given graph into a list.
def printAllTopologicalOrders(graph):
    """Return all topological orderings of `graph` as a list of lists."""
    # number of nodes in the graph
    vertex_count = len(graph.adjList)

    # visited flags for the backtracking search
    seen = [False] * vertex_count

    orderings = []
    # delegate the actual enumeration to the recursive helper
    findAllTopologicalOrders(graph, [], seen, vertex_count, orderings)
    return orderings
92+
93+
# wrapper taking some edges, constructing the graph,
94+
# obtain all topological orderings and some other helper data structures
95+
# wrapper taking some edges, constructing helper scheduling structures
def analyseGraph(edges, nodes):
    """Analyse the task DAG given by (edges, nodes).

    edges -- list of (src, dest) task-id pairs
    nodes -- list of all task ids (not modified)

    Returns a tuple (allorderings, nextjobtrivial):
      allorderings   -- placeholder [[]]; full enumeration of topological
                        orderings is currently disabled (too combinatorial)
      nextjobtrivial -- dict mapping a task id to the ids of tasks that
                        directly depend on it; key -1 lists the start
                        tasks (those with no prerequisites)
    """
    # successor list per task
    nextjobtrivial = {n: [] for n in nodes}
    # start nodes; fix: copy so we do not mutate the caller's `nodes`
    # list when pruning non-start tasks below
    nextjobtrivial[-1] = list(nodes)
    for src, dest in edges:
        nextjobtrivial[src].append(dest)
        # any task with an incoming edge is not a start task
        if dest in nextjobtrivial[-1]:
            nextjobtrivial[-1].remove(dest)

    # Full enumeration via Graph/printAllTopologicalOrders is disabled for
    # the moment; keep the trivial placeholder so callers can still index it.
    allorderings = [[]]

    return (allorderings, nextjobtrivial)
130+
131+
132+
# Render the workflow DAG to 'workflow.gv' using graphviz.
def draw_workflow(workflowspec):
    """Draw the stages of `workflowspec` as a directed dependency graph."""
    dot = Digraph(comment='MC workflow')

    # one node per stage, labelled with the stage name and keyed by index
    nametoindex = {}
    for index, stage in enumerate(workflowspec['stages']):
        stagename = stage['name']
        nametoindex[stagename] = index
        dot.node(str(index), stagename)

    # one edge per declared dependency (requirement -> dependent stage)
    for stage in workflowspec['stages']:
        toindex = nametoindex[stage['name']]
        for req in stage['needs']:
            dot.edge(str(nametoindex[req]), str(toindex))

    dot.render('workflow.gv')
151+
152+
# builds the graph given a "taskuniverse" list
153+
# builds accompagnying structures tasktoid and idtotask
154+
# builds the graph given a "taskuniverse" list
# builds accompanying structure tasktoid internally
def build_graph(taskuniverse, workflowspec):
    """Construct the dependency graph of the given task universe.

    taskuniverse -- list of (stage-dict, original-index) tuples
    workflowspec -- full workflow specification (currently unused here)

    Returns (edges, nodes) where task ids are assigned by position in
    taskuniverse and each edge runs from a requirement to its dependent.
    """
    tasktoid = {t[0]['name']: i for i, t in enumerate(taskuniverse)}
    # fix: was a bare print(); use the module logger so stdout stays clean
    logging.debug(tasktoid)

    nodes = []
    edges = []
    for task in taskuniverse:
        tid = tasktoid[task[0]['name']]
        nodes.append(tid)
        # one edge from every requirement to this task
        for need in task[0]['needs']:
            edges.append((tasktoid[need], tid))

    return (edges, nodes)
166+
167+
168+
# loads the workflow specification
169+
# returns a tuple of (all_topological_ordering, possible_next_job_dict, nodeset)
170+
# loads the workflow specification
def load_workflow(workflowfile):
    """Parse `workflowfile` (JSON) and return the workflow spec dict.

    Fix: use a context manager so the file handle is closed even when
    json.load raises (the original leaked the open handle).
    """
    with open(workflowfile) as fp:
        return json.load(fp)
174+
175+
176+
# builds topological orderings (for each timeframe)
177+
# builds the scheduling structures (for each timeframe)
def build_topological_orderings(workflowspec):
    """Derive per-task scheduling information from `workflowspec`.

    Returns a dict with:
      'nexttasks' -- task id -> list of task ids that may run after it
                     (key -1 lists the start tasks)
      'weights'   -- per-task weight; currently the stage's 'timeframe'
                     value, so scheduling prefers to finish one timeframe
                     before starting the next

    Fixes: removed the unused timeframe-restricted task universe (dead
    computation) and replaced a bare debug print with logging.
    """
    globaltaskuniverse = [(l, i) for i, l in enumerate(workflowspec['stages'], 1)]

    edges, nodes = build_graph(globaltaskuniverse, workflowspec)
    _, global_next_tasks = analyseGraph(edges, nodes)

    # weight can be anything ... for the moment we just prefer to stay
    # within a timeframe
    def getweight(tid):
        return globaltaskuniverse[tid][0]['timeframe']

    task_weights = [getweight(tid) for tid in range(len(globaltaskuniverse))]

    logging.debug(global_next_tasks)
    return {'nexttasks': global_next_tasks, 'weights': task_weights}
201+
202+
203+
#
204+
# functions for execution; encapsulated in a WorkflowExecutor class
205+
#
206+
class WorkflowExecutor:
    """Executes the tasks of a workflow as parallel subprocesses,
    respecting dependencies, a memory budget, and a parallel-job cap.

    Fixes over the original: submit()/execute() now use self.args
    instead of relying on the module-level global `args`; submit() no
    longer passes cwd='' to Popen (which fails) when a stage declares
    no working directory; debug print replaced by logging.
    """

    # Constructor
    def __init__(self, workflowfile, args, jmax=100):
        """Load the workflow file and precompute scheduling structures.

        workflowfile -- path to the JSON workflow specification
        args         -- parsed command-line arguments (argparse namespace)
        jmax         -- maximal number of parallel tasks
        """
        self.args = args
        self.workflowfile = workflowfile
        self.workflowspec = load_workflow(workflowfile)
        workflow = build_topological_orderings(self.workflowspec)
        if args.visualize_workflow:
            draw_workflow(self.workflowspec)
        self.possiblenexttask = workflow['nexttasks']
        self.taskweights = workflow['weights']
        # fix: was a bare print(); keep stdout clean
        logging.debug(self.possiblenexttask)

        # bidirectional task-id <-> task-name mappings
        self.taskuniverse = [l['name'] for l in self.workflowspec['stages']]
        self.idtotask = [0 for _ in self.taskuniverse]
        self.tasktoid = {}
        for i in range(len(self.taskuniverse)):
            self.tasktoid[self.taskuniverse[i]] = i
            self.idtotask[i] = self.taskuniverse[i]

        # per-task memory estimate and global memory accounting
        self.maxmemperid = [self.workflowspec['stages'][tid]['resources']['mem'] for tid in range(len(self.taskuniverse))]
        self.curmembooked = 0
        self.memlimit = args.mem_limit  # some configurable number
        # lifecycle per task id: 'ToDo' -> 'Running' -> 'Done'
        self.procstatus = {tid: 'ToDo' for tid in range(len(self.workflowspec['stages']))}
        # transitive requirements per task name
        self.taskneeds = {t: set(self.getallrequirements(t)) for t in self.taskuniverse}
        self.stoponfailure = True
        self.max_jobs_parallel = int(jmax)
        self.scheduling_iteration = 0

    def getallrequirements(self, t):
        """Return the transitive list of requirement names of task t
        (may contain duplicates; callers convert to a set)."""
        reqs = []
        for r in self.workflowspec['stages'][self.tasktoid[t]]['needs']:
            reqs.append(r)
            reqs = reqs + self.getallrequirements(r)
        return reqs

    # submits a task as subprocess and records Popen instance
    def submit(self, tid):
        """Launch task tid as a bash subprocess.

        Creates the task's working directory if needed. Returns the
        Popen instance, or None if the working directory could not be
        created. In dry-run mode an echo command is run instead.
        """
        logging.debug("Submitting task " + str(self.idtotask[tid]))
        c = self.workflowspec['stages'][tid]['cmd']
        workdir = self.workflowspec['stages'][tid]['cwd']
        if not workdir == '':
            if os.path.exists(workdir) and not os.path.isdir(workdir):
                logging.error('Cannot create working dir ... some other resource exists already')
                return None

            if not os.path.isdir(workdir):
                os.mkdir(workdir)

        # fix: Popen rejects cwd=''; pass None when no working dir given
        cwd = workdir if workdir != '' else None

        self.procstatus[tid] = 'Running'
        # fix: use self.args, not the module-level global `args`
        if self.args.dry_run:
            drycommand = "echo \' " + str(self.scheduling_iteration) + " : would do " + str(self.workflowspec['stages'][tid]['name']) + "\'"
            return subprocess.Popen(['/bin/bash', '-c', drycommand], cwd=cwd)

        return subprocess.Popen(['/bin/bash', '-c', c], cwd=cwd)

    def ok_to_submit(self, tid):
        """True if booking tid's memory estimate stays below the limit."""
        return self.curmembooked + self.maxmemperid[tid] < self.memlimit

    def try_job_from_candidates(self, taskcandidates, process_list):
        """Launch candidate tasks while memory and job slots allow.

        Launched tasks are removed from taskcandidates and appended to
        process_list as (tid, Popen) tuples. Stops at the first task
        that does not fit the current resource budget.
        """
        self.scheduling_iteration = self.scheduling_iteration + 1
        # iterate over a snapshot since taskcandidates is mutated below
        initialcandidates = taskcandidates.copy()
        for tid in initialcandidates:
            logging.debug("trying to submit" + str(tid))
            if self.ok_to_submit(tid) and len(process_list) < self.max_jobs_parallel:
                p = self.submit(tid)
                if p != None:
                    self.curmembooked += self.maxmemperid[tid]
                    process_list.append((tid, p))
                    taskcandidates.remove(tid)
            else:
                break

    def stop_pipeline_and_exit(self, process_list):
        """Kill all remaining subprocesses and exit with error code 1."""
        for p in process_list:
            p[1].kill()

        exit(1)

    def waitforany(self, process_list, finished):
        """Poll running tasks and collect completed task ids in `finished`.

        Releases the memory booked by completed tasks and marks them
        'Done'. Aborts the whole pipeline if any task failed and
        stoponfailure is set. Returns True while nothing has finished
        yet (caller should sleep and poll again).
        """
        failuredetected = False
        for p in list(process_list):
            logging.debug("polling" + str(p))
            # in dry-run mode every job is treated as instantly finished
            returncode = 0
            if not self.args.dry_run:
                returncode = p[1].poll()
            if returncode != None:
                logging.info('Task' + str(self.idtotask[p[0]]) + ' finished with status ' + str(returncode))
                # account for cleared resources
                self.curmembooked -= self.maxmemperid[p[0]]
                self.procstatus[p[0]] = 'Done'
                finished.append(p[0])
                process_list.remove(p)
                if returncode != 0:
                    failuredetected = True

        if failuredetected and self.stoponfailure:
            logging.info('Stopping pipeline due to failure in a stage')
            self.stop_pipeline_and_exit(process_list)

        # empty finished means we have to wait more
        return len(finished) == 0

    def is_good_candidate(self, candid, finishedtasks):
        """True if task candid is still to do and all its requirements
        are among the finished task ids."""
        if self.procstatus[candid] != 'ToDo':
            return False
        needs = set([self.tasktoid[t] for t in self.taskneeds[self.idtotask[candid]]])
        return set(finishedtasks).intersection(needs) == needs

    def execute(self):
        """Main control loop: schedule and run tasks until all are done."""
        # start with the tasks that have no prerequisites
        candidates = [tid for tid in self.possiblenexttask[-1]]

        process_list = []  # list of (task id, Popen instance) tuples
        finishedtasks = []
        while True:
            # sort candidate list according to task weights (ascending)
            candidates = [(tid, self.taskweights[tid]) for tid in candidates]
            candidates.sort(key=lambda tup: tup[1])
            # remove weights again
            candidates = [tid for tid, _ in candidates]

            logging.debug(candidates)
            self.try_job_from_candidates(candidates, process_list)

            finished = []
            while self.waitforany(process_list, finished):
                # fix: use self.args, not the module-level global `args`
                if not self.args.dry_run:
                    time.sleep(1)
                else:
                    time.sleep(0.01)

            logging.debug("finished " + str(finished))
            finishedtasks = finishedtasks + finished

            # someone returned -> determine new candidates
            for tid in finished:
                if self.possiblenexttask.get(tid) != None:
                    potential_candidates = list(self.possiblenexttask[tid])
                    for candid in potential_candidates:
                        # only a real candidate when all requirements are
                        # met and it is not queued already
                        if self.is_good_candidate(candid, finishedtasks) and candidates.count(candid) == 0:
                            candidates.append(candid)

            logging.debug("New candidates " + str(candidates))

            # done when nothing is queued and nothing is running
            if len(candidates) == 0 and len(process_list) == 0:
                break
361+
362+
import argparse
from psutil import virtual_memory

# command-line interface of the workflow runner
# (fix: typos in user-facing help text; debug prints routed to the log)
parser = argparse.ArgumentParser(description='Parallel executor of a DAG data pipeline under resource constraints.')
parser.add_argument('-f', '--workflowfile', help='Input workflow file name', required=True)
parser.add_argument('-jmax', '--maxjobs', help='number of maximal parallel tasks', default=100)
parser.add_argument('--dry-run', action='store_true', help='show what you would do')
parser.add_argument('--visualize-workflow', action='store_true', help='saves a graph visualization of workflow')
parser.add_argument('--target-stages', help='Runs the pipeline by target labels (example "TPC" or "digi")')
# default memory limit: total physical memory of this machine
parser.add_argument('--mem-limit', help='set memory limit as scheduling constraint', default=virtual_memory().total)
args = parser.parse_args()

logging.basicConfig(filename='example.log', filemode='w', level=logging.DEBUG)
logging.debug(args)
logging.debug(args.workflowfile)

executor = WorkflowExecutor(args.workflowfile, jmax=args.maxjobs, args=args)
executor.execute()

0 commit comments

Comments
 (0)