|
1 | 1 | #!/usr/bin/env python3 |
| 2 | + |
| 3 | +# started February 2021, sandro.wenzel@cern.ch |
| 4 | + |
2 | 5 | import re |
3 | 6 | import subprocess |
4 | 7 | import shlex |
5 | 8 | import time |
6 | 9 | import json |
7 | 10 | import logging |
8 | 11 | import os |
9 | | -from graphviz import Digraph |
| 12 | +try: |
| 13 | + from graphviz import Digraph |
| 14 | + havegraphviz=True |
| 15 | +except ImportError: |
| 16 | + havegraphviz=False |
| 17 | + |
10 | 18 |
|
11 | 19 | # |
12 | 20 | # Code section to find all topological orderings |
@@ -130,6 +138,10 @@ def analyseGraph(edges, nodes): |
130 | 138 |
|
131 | 139 |
|
132 | 140 | def draw_workflow(workflowspec): |
| 141 | + if not havegraphviz: |
| 142 | + print('graphviz not installed, cannot draw workflow') |
| 143 | + return |
| 144 | + |
133 | 145 | dot = Digraph(comment='MC workflow') |
134 | 146 | nametoindex={} |
135 | 147 | index=0 |
@@ -360,15 +372,21 @@ def execute(self): |
360 | 372 | break |
361 | 373 |
|
362 | 374 | import argparse |
363 | | -from psutil import virtual_memory |
364 | | - |
365 | | -parser = argparse.ArgumentParser(description='Parellel execute of a DAG data pipeline under resource contraints.') |
| 375 | +try: |
| 376 | + from psutil import virtual_memory |
| 377 | + max_system_mem=virtual_memory().total |
| 378 | +except ImportError: |
| 379 | + # let's assume 16GB |
| 380 | + max_system_mem=16*1024*1024*1024 |
| 381 | + |
| 382 | +parser = argparse.ArgumentParser(description='Parellel execution of a (O2-DPG) DAG data pipeline under resource contraints.') |
366 | 383 | parser.add_argument('-f','--workflowfile', help='Input workflow file name', required=True) |
367 | 384 | parser.add_argument('-jmax','--maxjobs', help='number of maximal parallel tasks', default=100) |
368 | 385 | parser.add_argument('--dry-run', action='store_true', help='show what you would do') |
369 | 386 | parser.add_argument('--visualize-workflow', action='store_true', help='saves a graph visualization of workflow') |
370 | 387 | parser.add_argument('--target-stages', help='Runs the pipeline by target labels (example "TPC" or "digi")') |
371 | | -parser.add_argument('--mem-limit', help='set memory limit as scheduling constraint', default=virtual_memory().total) |
| 388 | + |
| 389 | +parser.add_argument('--mem-limit', help='set memory limit as scheduling constraint', default=max_system_mem) |
372 | 390 | args = parser.parse_args() |
373 | 391 | print (args) |
374 | 392 | print (args.workflowfile) |
|
0 commit comments