Skip to content

Commit 13161c5

Browse files
committed
Topology parser: Add GEN_TOPO_OOM_WORKAROUND option to store DPL json on NFS cache and read it from there
1 parent 02c4aad commit 13161c5

File tree

2 files changed

+89
-64
lines changed

2 files changed

+89
-64
lines changed

DATA/common/setenv.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ if [ $SYNCMODE == 1 ] && [ $CTFINPUT == 1 ]; then
105105
echo SYNCMODE and CTFINPUT are incompatible
106106
exit 1
107107
fi
108-
if [ $WORKFLOWMODE != "run" ] && [ $WORKFLOWMODE != "print" ] && [ $WORKFLOWMODE != "dds" ]; then
108+
if [ $WORKFLOWMODE != "run" ] && [ $WORKFLOWMODE != "print" ] && [ $WORKFLOWMODE != "dds" ] && [ $WORKFLOWMODE != "dump" ]; then
109109
echo Invalid workflow mode
110110
exit 1
111111
fi

DATA/tools/parse

Lines changed: 88 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,49 +4,51 @@ import sys
44
import shlex
55
import tempfile
66
import re
7+
import datetime
8+
import random
79

8-
if not os.path.exists("tools/parse"):
9-
print("Running from incorrect directory")
10+
if not os.path.exists('tools/parse'):
11+
print('Running from incorrect directory')
1012
exit(1)
1113

1214
if 'EPNSYNCMODE' in os.environ and int(os.environ['EPNSYNCMODE']):
1315
sys.path.insert(0, '/usr/share/Modules/init')
1416
import python as mod
1517

1618
if len(sys.argv) != 4:
17-
print("Incorrect number of arguments provided, syntax is parse [description library file] [topology name] [output file name]")
19+
print('Incorrect number of arguments provided, syntax is parse [description library file] [topology name] [output file name]')
1820
exit(1)
1921

20-
if not "FILEWORKDIR" in os.environ:
21-
print("$FILEWORKDIR env variable missing")
22+
if not 'FILEWORKDIR' in os.environ:
23+
print('$FILEWORKDIR env variable missing')
2224
exit(1)
2325

24-
if not "DDWORKFLOW" in os.environ and not "DDMODE" in os.environ:
25-
print("Need either $DDWORKFLOW or $DDMODE env variable")
26+
if not 'DDWORKFLOW' in os.environ and not 'DDMODE' in os.environ:
27+
print('Need either $DDWORKFLOW or $DDMODE env variable')
2628
exit(1)
2729

2830
NO_PROCESSING_MODE=0
29-
if not "DDWORKFLOW" in os.environ:
30-
os.environ['DDWORKFLOW'] = "tools/datadistribution_workflows/dd-" + os.environ['DDMODE'] + ".xml"
31+
if not 'DDWORKFLOW' in os.environ:
32+
os.environ['DDWORKFLOW'] = 'tools/datadistribution_workflows/dd-' + os.environ['DDMODE'] + '.xml'
3133
if os.environ['DDMODE'] == 'discard' or os.environ['DDMODE'] == 'disk':
3234
NO_PROCESSING_MODE=1
3335

34-
print("Using topology", sys.argv[2], "of library", sys.argv[1])
36+
print('Using topology', sys.argv[2], 'of library', sys.argv[1])
3537

3638
if 'WORKFLOWMODE' in os.environ:
3739
if not os.environ['WORKFLOWMODE'] in ['dds', 'print']:
38-
print("Invalid WORKFLOWMODE provided")
40+
print('Invalid WORKFLOWMODE provided')
3941
raise
4042
else:
4143
os.environ['WORKFLOWMODE'] = 'dds'
4244

43-
if 'RECO_NUM_NODES_OVERRIDE' in os.environ and os.environ['RECO_NUM_NODES_OVERRIDE'] != "" and int(os.environ['RECO_NUM_NODES_OVERRIDE']) > 0:
45+
if 'RECO_NUM_NODES_OVERRIDE' in os.environ and os.environ['RECO_NUM_NODES_OVERRIDE'] != '' and int(os.environ['RECO_NUM_NODES_OVERRIDE']) > 0:
4446
reco_num_nodes_override = int(os.environ['RECO_NUM_NODES_OVERRIDE'])
4547
os.environ['RECO_NUM_NODES_WORKFLOW'] = str(reco_num_nodes_override)
4648
else:
4749
reco_num_nodes_override = 0
4850

49-
f = open(sys.argv[1], "r")
51+
f = open(sys.argv[1], 'r')
5052
for line in f:
5153
line = line.strip()
5254
if len(line) == 0:
@@ -55,81 +57,104 @@ for line in f:
5557
continue
5658
args = shlex.split(line)
5759
if len(args) <= 1:
58-
print("Toplogy must have at least name and O2 version")
60+
print('Toplogy must have at least name and O2 version')
5961
raise
6062
if len(args[0]) == 0:
61-
print("Empty topology name forbitten")
63+
print('Empty topology name forbitten')
6264
raise
6365
if not args[0].endswith(':'):
64-
print("Topology name ", args[0], "not followed by ':'")
66+
print('Topology name ', args[0], "not followed by ':'")
6567
raise
66-
if args[0] == sys.argv[2] + ":":
68+
if args[0] == sys.argv[2] + ':':
6769
reconodes = 0
6870
reconodesmin = 0
6971
recoworkflows = []
7072
calibworkflows = []
71-
print("Found topology", sys.argv[2], "-", args)
73+
print('Found topology', sys.argv[2], '-', args)
7274
if 'EPNSYNCMODE' in os.environ and int(os.environ['EPNSYNCMODE']) and (not 'GEN_TOPO_RUN_HOME' in os.environ or not int(os.environ['GEN_TOPO_RUN_HOME'])):
7375
restore_O2DPG_ROOT = 'O2DPG_ROOT' in os.environ
7476
if restore_O2DPG_ROOT:
7577
restore_O2DPG_ROOT_val = os.environ['O2DPG_ROOT']
76-
if 'OVERRIDE_PDPSUITE_VERSION' in os.environ and os.environ['OVERRIDE_PDPSUITE_VERSION'] != "":
78+
if 'OVERRIDE_PDPSUITE_VERSION' in os.environ and os.environ['OVERRIDE_PDPSUITE_VERSION'] != '':
7779
args[1] = os.environ['OVERRIDE_PDPSUITE_VERSION']
7880
for i in args[1].split():
79-
if "GEN_TOPO_CACHEABLE" in os.environ and os.environ['GEN_TOPO_CACHEABLE'] == "1":
80-
if i.find("/") == -1 or i.find("/latest") != -1:
81-
print("Must not use non-versioned module", i, "in cacheable workflow (i.e. with repository hash)")
81+
if 'GEN_TOPO_CACHEABLE' in os.environ and os.environ['GEN_TOPO_CACHEABLE'] == '1':
82+
if i.find('/') == -1 or i.find('/latest') != -1:
83+
print('Must not use non-versioned module', i, 'in cacheable workflow (i.e. with repository hash)')
8284
raise
83-
print("Loading module", i)
85+
print('Loading module', i)
8486
mod.module('load', i)
8587
if restore_O2DPG_ROOT:
8688
os.environ['O2DPG_ROOT'] = restore_O2DPG_ROOT_val
8789
if len(args) > 2 and not 'O2_ROOT' in os.environ:
88-
print("O2 not loaded")
90+
print('O2 not loaded')
8991
raise
90-
with tempfile.TemporaryDirectory(prefix="o2_workflow_") as tmpdir:
92+
with tempfile.TemporaryDirectory(prefix='o2_workflow_') as tmpdir:
9193
if NO_PROCESSING_MODE and len(args) > 2:
92-
print("Cannot use DPL workflow together with DD mode", os.environ['DDMODE'])
94+
print('Cannot use DPL workflow together with DD mode', os.environ['DDMODE'])
9395
raise
9496
for i in range(2, len(args)):
95-
filename = tmpdir + "/wf" + str(i) + ".dds"
96-
if args[i].startswith("reco"):
97-
wf = args[i].split(",", 3)
97+
filename = tmpdir + '/wf' + str(i) + '.dds'
98+
if args[i].startswith('reco'):
99+
wf = args[i].split(',', 3)
98100
recoworkflows.append(filename)
99-
elif args[i].startswith("calib"):
100-
wf = args[i].split(",", 2)
101+
elif args[i].startswith('calib'):
102+
wf = args[i].split(',', 2)
101103
filenamecore = filename
102-
# filenamecore = filenamecore+ ":" + wf[1] # Currently disabled, since odc-epn-topo does not accept :[ncores]
104+
# filenamecore = filenamecore+ ':' + wf[1] # Currently disabled, since odc-epn-topo does not accept :[ncores]
103105
wf.append(wf[2])
104-
wf[1] = "1";
106+
wf[1] = '1';
105107
wf[2] = wf[1]
106108
calibworkflows.append(filenamecore)
107109
else:
108-
print("Invalid workflow type", args[i])
110+
print('Invalid workflow type', args[i])
109111
raise
110-
print("Adding", wf[0], "workflow (", wf[2], "-", wf[1], "nodes):", wf[3])
112+
print('Adding', wf[0], 'workflow (', wf[2], '-', wf[1], 'nodes):', wf[3])
111113
reconodes = max(reconodes, int(wf[1]))
112114
reconodesmin = max(reconodesmin, int(wf[2]))
113115
if 'GEN_TOPO_IGNORE_ERROR' in os.environ and int(os.environ['GEN_TOPO_IGNORE_ERROR']):
114-
command_log_filter = "\"^\[\""
116+
command_log_filter = '"^\["'
115117
else:
116-
command_log_filter = "\"^\[INFO\""
117-
command = "GLOBALDPLOPT+=\" -b --dds-workflow-suffix _" + wf[0] + str(i) + "\" " + wf[3] + " | grep -v " + command_log_filter + " > " + filename + " && [ `grep \"^\[\" " + filename + " | wc -l` == 0 ]"
118-
print("Running DPL command", command)
118+
command_log_filter = '"^\[INFO"'
119+
command_preopt = ''
120+
if 'GEN_TOPO_OOM_WORKAROUND' in os.environ and int(os.environ['GEN_TOPO_OOM_WORKAROUND']):
121+
json_cache_path = os.environ['GEN_TOPO_WORKDIR'] + '/json_cache'
122+
filename_xml = filename
123+
if not 'GEN_TOPO_WORKDIR' in os.environ:
124+
print('$GEN_TOPO_WORKDIR not set')
125+
raise
126+
if not os.path.exists(json_cache_path):
127+
os.makedirs(json_cache_path, 0o770)
128+
filename = json_cache_path + '/dpl_tmp_' + datetime.datetime.now().strftime('%y-%m-%d-%H-%M-%S') + wf[0] + str(i) + '_' + str(random.randrange(100000000)) + '.json'
129+
command_preopt += ' WORKFLOWMODE=dump'
130+
command = command_preopt + ' GLOBALDPLOPT+=" -b --dds-workflow-suffix _' + wf[0] + str(i) + '" ' + wf[3] + ' | grep -v ' + command_log_filter + ' > ' + filename + ' && [ `grep "^\[" ' + filename + ' | wc -l` == 0 ]'
131+
print('Running DPL command', command)
119132
if reco_num_nodes_override == 0:
120133
os.environ['RECO_NUM_NODES_WORKFLOW'] = wf[1]
121134
if os.system(command) != 0:
122-
print("Error running command", command)
135+
print('Error running command', command)
123136
ftmp = open(filename, 'r')
124-
rg = re.compile("^<topology")
137+
rg = re.compile('^<topology')
125138
for line in ftmp:
126139
if re.match(rg, line):
127140
break
128141
print(line)
129142
raise
143+
if 'GEN_TOPO_OOM_WORKAROUND' in os.environ and int(os.environ['GEN_TOPO_OOM_WORKAROUND']):
144+
command = 'cat ' + filename + ' | o2-dpl-run -b --dds --dds-workflow-suffix _' + wf[0] + str(i) + ' --resources-monitoring 15 ' + ' | grep -v ' + command_log_filter + ' > ' + filename_xml
145+
print('Running DPL command', command)
146+
if os.system(command) != 0:
147+
print('Error running command', command)
148+
command = 'sed -i \'s,^\( *.exe.*sleep [0-9.]*;\).*\(|[^|]*./exe.\)$,\\1 cat ' + filename + ' \\2,\' ' + filename_xml
149+
os.system('cat ' + filename_xml)
150+
print('Running SED command', command)
151+
if os.system(command) != 0:
152+
print('Error running sed on XML file')
153+
os.system('cat ' + filename_xml)
154+
filename = filename_xml
130155
if reco_num_nodes_override > 0:
131156
reconodes = reco_num_nodes_override
132-
if 'RECO_MAX_FAIL_NODES_OVERRIDE' in os.environ and os.environ['RECO_MAX_FAIL_NODES_OVERRIDE'] != "":
157+
if 'RECO_MAX_FAIL_NODES_OVERRIDE' in os.environ and os.environ['RECO_MAX_FAIL_NODES_OVERRIDE'] != '':
133158
reconodesmin = max(1, reconodes - int(os.environ['RECO_NUM_NODES_OVERRIDE']))
134159
if os.environ['WORKFLOWMODE'] == 'dds':
135160
if 'GEN_TOPO_ODC_EPN_TOPO_CMD' in os.environ:
@@ -139,41 +164,41 @@ for line in f:
139164
if 'GEN_TOPO_ODC_EPN_TOPO_ARGS' in os.environ:
140165
odccommand += ' ' + os.environ['GEN_TOPO_ODC_EPN_TOPO_ARGS']
141166
if reconodes:
142-
replacestring = ""
167+
replacestring = ''
143168
dd_env_variables = ['DD_DISK_FRACTION']
144169
for i in dd_env_variables:
145170
if i in os.environ:
146-
replacestring += " " + i + "=" + os.environ[i]
147-
fddin = open(os.environ['DDWORKFLOW'], "rt")
148-
filename = tmpdir + "/wf_dd.dds";
149-
fddout = open(filename, "wt")
171+
replacestring += ' ' + i + '=' + os.environ[i]
172+
fddin = open(os.environ['DDWORKFLOW'], 'rt')
173+
filename = tmpdir + '/wf_dd.dds';
174+
fddout = open(filename, 'wt')
150175
for line in fddin:
151176
fddout.write(line.replace('GEN_TOPO_TFBUILDER_ENV_VARIABLES', replacestring))
152177
fddin.close()
153178
fddout.close()
154-
odccommand += " --dd " + filename
179+
odccommand += ' --dd ' + filename
155180
if len(recoworkflows):
156-
odccommand += " --reco " + " ".join(recoworkflows)
157-
odccommand += " --n " + str(reconodes)
158-
# odccommand += " --nmin " + str(reconodesmin) # Currently disabled, since odc-epn-topo does not accept --nmin
181+
odccommand += ' --reco ' + ' '.join(recoworkflows)
182+
odccommand += ' --n ' + str(reconodes)
183+
# odccommand += ' --nmin ' + str(reconodesmin) # Currently disabled, since odc-epn-topo does not accept --nmin
159184
if len(calibworkflows):
160-
odccommand += " --calib " + " ".join(calibworkflows)
161-
if "GEN_TOPO_STDERR_LOGGING" in os.environ and int(os.environ['GEN_TOPO_STDERR_LOGGING']):
162-
odccommand += " --mon tools/monitoring_workflows/epnstderrlog.xml"
163-
if args[1] != "":
164-
odccommand += " --prependexe \"module load " + args[1] + " 2>&1 ; \""
165-
odccommand += " -o " + sys.argv[3]
185+
odccommand += ' --calib ' + ' '.join(calibworkflows)
186+
if 'GEN_TOPO_STDERR_LOGGING' in os.environ and int(os.environ['GEN_TOPO_STDERR_LOGGING']):
187+
odccommand += ' --mon tools/monitoring_workflows/epnstderrlog.xml'
188+
if args[1] != '':
189+
odccommand += ' --prependexe "module load ' + args[1] + ' 2>&1 ; "'
190+
odccommand += ' -o ' + sys.argv[3]
166191
if os.system(odccommand) != 0:
167-
print("\nError running odc: ", odccommand)
192+
print('\nError running odc: ', odccommand)
168193
raise
169194
else:
170-
outf = open(sys.argv[3], "w+")
195+
outf = open(sys.argv[3], 'w+')
171196
for i in recoworkflows:
172-
outf.write("# RECO workflow\n\n" + open(i, 'r').read() + "\n\n")
197+
outf.write('# RECO workflow\n\n' + open(i, 'r').read() + '\n\n')
173198
for i in calibworkflows:
174-
outf.write("# CALIB workflow\n\n" + open(i, 'r').read() + "\n\n")
175-
print("Done")
199+
outf.write('# CALIB workflow\n\n' + open(i, 'r').read() + '\n\n')
200+
print('Done')
176201
exit(0)
177202

178-
print("Could not find workflow", sys.argv[2])
203+
print('Could not find workflow', sys.argv[2])
179204
exit(1)

0 commit comments

Comments
 (0)