Skip to content

Commit bd94ce1

Browse files
committed
python script as workflow.json generator
Demonstrator for a Python script that can generate a typical MC workflow. It produces a JSON file that can be run by a dedicated scheduler/runner, which takes care of runtime optimizations. It also demonstrates how embedding can be treated as an option.
1 parent d95e972 commit bd94ce1

File tree

1 file changed

+208
-0
lines changed

1 file changed

+208
-0
lines changed
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
#!/usr/bin/env python3
2+
3+
#
4+
# A script producing a consistent MC->RECO->AOD workflow with optional embedding.
5+
#
6+
7+
import argparse
8+
from os import environ
9+
import json
10+
11+
# Command-line interface of the workflow generator.
parser = argparse.ArgumentParser(description='Create a PWGHF embedding pipeline')
# Numeric options are parsed as integers so that a malformed value is rejected
# by argparse with a usage message, instead of surfacing later as a traceback
# (-tf) or being copied unchecked into the generated commands (-nb, -ns, -j).
parser.add_argument('-nb', help='number of background events / timeframe', type=int, default=20)
parser.add_argument('-ns', help='number of signal events / timeframe', type=int, default=20)
parser.add_argument('-tf', help='number of timeframes', type=int, default=2)
parser.add_argument('-j', help='number of workers (if applicable)', type=int, default=8)
parser.add_argument('-e', help='simengine', default='TGeant4')
parser.add_argument('-o', help='output workflow file', default='workflow.json')
# --embedding takes a value; the default is the boolean True, a value given on
# the command line arrives as a string.
parser.add_argument('--embedding', help='whether to embedd into background', default=True)
# --noIPC takes a value as well; it stays None when the option is not given.
parser.add_argument('--noIPC', help='disable shared memory in DPL')
args = parser.parse_args()
print(args)
22+
23+
# make sure O2DPG + O2 is loaded
O2DPG_ROOT = environ.get('O2DPG_ROOT')
O2_ROOT = environ.get('O2_ROOT')

# A missing environment is only reported; the hard exit is (still) disabled,
# so the workflow file is produced regardless.
for location, message in ((O2DPG_ROOT, 'Error: This needs O2DPG loaded'),
                          (O2_ROOT, 'Error: This needs O2 loaded')):
    if location is None:
        print(message)
        # exit(1)
34+
35+
# ----------- START WORKFLOW CONSTRUCTION -----------------------------
36+
37+
# settings taken over from the command line
SIMENGINE = args.e
NWORKERS = args.j
NTIMEFRAMES = int(args.tf)
NBKGEVENTS = args.nb
NSIGEVENTS = args.ns
# module selection string passed on to the o2-sim invocations
MODULES = "--skipModules ZDC"

# the workflow is a dictionary holding the ordered list of task descriptions
workflow = {'stages': []}
46+
47+
# number of tasks created so far (incremented by every createTask call)
taskcounter = 0


def createTask(name='', needs=None, tf=-1, cwd='./'):
    """Create a new task description and increment the global task counter.

    Args:
        name: identifier of the task.
        needs: list of names of tasks this task depends on. When omitted,
            a fresh empty list is created for each task, so tasks never
            share (and cannot accidentally modify) a common default list.
        tf: timeframe id the task is associated with (-1 by default).
        cwd: working directory in which the task is to be run.

    Returns:
        A dict with the keys 'name', 'cmd' (initially empty), 'needs',
        'resources' (cpu and mem both initialised to -1), 'timeframe',
        'labels' (initially empty) and 'cwd'.
    """
    global taskcounter
    taskcounter = taskcounter + 1
    if needs is None:
        needs = []
    return {
        'name': name,
        'cmd': '',
        'needs': needs,
        'resources': {'cpu': -1, 'mem': -1},
        'timeframe': tf,
        'labels': [],
        'cwd': cwd,
    }
52+
53+
def getDPL_global_options():
    """Return the option string shared by the DPL workflow commands.

    Without --noIPC on the command line, the options request a shared-memory
    segment size (taken from $SHMSIZE, with a built-in fallback) and a session
    named after the current value of the global task counter. With --noIPC,
    the options carry --no-IPC instead.
    """
    if args.noIPC is None:
        return "-b --run --shm-segment-size ${SHMSIZE:-50000000000} --session " + str(taskcounter)
    return "-b --run --no-IPC"
57+
58+
59+
# embedding is active for the boolean default as well as for an explicit 'True' string
doembedding = args.embedding in ('True', True)

if doembedding:
    # ---- background transport task -------
    BKGtask = createTask(name='bkgsim')
    BKGtask['cmd'] = ('o2-sim -e ' + SIMENGINE
                      + ' -j ' + str(NWORKERS)
                      + ' -n ' + str(NBKGEVENTS)
                      + ' -g pythia8hi ' + str(MODULES)
                      + ' -o bkg --configFile ${O2DPG_ROOT}/MC/config/common/ini/basic.ini')
    workflow['stages'].append(BKGtask)
66+
67+
# loop over timeframes; every timeframe gets its own working directory and
# its own chain of tasks (generator config -> transport -> digitization ->
# reconstruction -> AOD placeholder -> cleanup placeholder)
for tf in range(1, NTIMEFRAMES + 1):
    tfsuffix = str(tf)
    timeframeworkdir = 'tf' + tfsuffix

    # ---- signal generator configuration -------
    RNDSEED = 0      # 0 means random seed !
    PTHATMIN = 0.    # [default = 0]
    PTHATMAX = -1.   # [default = -1]

    # produce the signal configuration
    SGN_CONFIG_task = createTask(name='gensgnconf_' + tfsuffix, tf=tf, cwd=timeframeworkdir)
    SGN_CONFIG_task['cmd'] = (
        '${O2DPG_ROOT}/MC/config/common/pythia8/utils/mkpy8cfg.py'
        ' --output=pythia8_' + tfsuffix + '.cfg'
        ' --seed=' + str(RNDSEED) +
        ' --idA=2212'
        ' --idB=2212'
        ' --eCM=13000.'
        ' --process=ccbar'
        ' --ptHatMin=' + str(PTHATMIN) +
        ' --ptHatMax=' + str(PTHATMAX)
    )
    workflow['stages'].append(SGN_CONFIG_task)

    # ---- signal transport task -------
    signalprefix = 'sgn_' + tfsuffix
    if doembedding:
        # link background files to current working dir for this timeframe
        LinkBKGtask = createTask(name='linkbkg_' + tfsuffix, needs=[BKGtask['name']], tf=tf, cwd=timeframeworkdir)
        LinkBKGtask['cmd'] = 'ln -s ../bkg*.root .'
        workflow['stages'].append(LinkBKGtask)

        # with embedding, the signal transport additionally waits for the background
        signalneeds = [SGN_CONFIG_task['name'], BKGtask['name'], LinkBKGtask['name']]
        embeddinto = "--embedIntoFile bkg_Kine.root"
    else:
        signalneeds = [SGN_CONFIG_task['name']]
        embeddinto = ""

    SGNtask = createTask(name='sgnsim_' + tfsuffix, needs=signalneeds, tf=tf, cwd=timeframeworkdir)
    # alternative (currently disabled) command using the external HF generator:
    #SGNtask['cmd']='o2-sim -e '+str(SIMENGINE) + ' ' + str(MODULES) + ' -n ' + str(NSIGEVENTS) + ' -j ' + str(NWORKERS) + ' -g extgen \
    #     --configFile ${O2DPG_ROOT}/MC/config/PWGHF/ini/GeneratorHF.ini \
    #     --configKeyValues \"GeneratorPythia8.config=pythia8_'+ str(tf) +'.cfg\"' \
    #     + ' -o ' + signalprefix + ' ' + embeddinto
    SGNtask['cmd'] = ('o2-sim -e ' + str(SIMENGINE) + ' ' + str(MODULES)
                      + ' -n ' + str(NSIGEVENTS) + ' -j ' + str(NWORKERS)
                      + ' -g pythia8 ' + ' -o ' + signalprefix + ' ' + embeddinto)
    workflow['stages'].append(SGNtask)

    # some tasks further below still want geometry + grp in fixed names, so we provide it here
    # Alternatively, since we have timeframe isolation, we could just work with standard o2sim_ files
    LinkGRPFileTask = createTask(name='linkGRP_' + tfsuffix, needs=[SGNtask['name']], tf=tf, cwd=timeframeworkdir)
    LinkGRPFileTask['cmd'] = ('ln -s ' + signalprefix + '_grp.root o2sim_grp.root ; ln -s '
                              + signalprefix + '_geometry.root o2sim_geometry.root')
    workflow['stages'].append(LinkGRPFileTask)

    # -----------
    # digitization
    # -----------

    CONTEXTFILE = 'collisioncontext.root'

    if doembedding:
        simsoption = ' --sims ' + 'bkg,' + signalprefix
    else:
        simsoption = ' --sims ' + signalprefix

    # NOTE: getDPL_global_options() embeds the current task counter, so it is
    # always evaluated right after the createTask call of the task it belongs to.
    TPCDigitask = createTask(name='tpcdigi_' + tfsuffix, needs=[SGNtask['name'], LinkGRPFileTask['name']], tf=tf, cwd=timeframeworkdir)
    TPCDigitask['cmd'] = ('o2-sim-digitizer-workflow ' + getDPL_global_options()
                          + ' -n ' + str(args.ns) + simsoption
                          + ' --onlyDet TPC --interactionRate 50000 --tpc-lanes ' + str(NWORKERS)
                          + ' --outcontext ' + str(CONTEXTFILE))
    workflow['stages'].append(TPCDigitask)

    # The TRD digi task has a dependency on TPC only because of the digitization context (and because they both use CPU efficiently)
    # TODO: activate only if TRD present
    TRDDigitask = createTask(name='trddigi_' + tfsuffix, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir)
    TRDDigitask['cmd'] = ('o2-sim-digitizer-workflow ' + getDPL_global_options()
                          + ' -n ' + str(args.ns) + simsoption
                          + ' --onlyDet TRD --interactionRate 50000 --configKeyValues \"TRDSimParams.digithreads=' + str(NWORKERS)
                          + '\" --incontext ' + str(CONTEXTFILE))
    workflow['stages'].append(TRDDigitask)

    # digitization of all remaining detectors, reusing the context written by the TPC task
    RESTDigitask = createTask(name='restdigi_' + tfsuffix, needs=[TPCDigitask['name'], LinkGRPFileTask['name']], tf=tf, cwd=timeframeworkdir)
    RESTDigitask['cmd'] = ('o2-sim-digitizer-workflow ' + getDPL_global_options()
                           + ' -n ' + str(args.ns) + simsoption
                           + ' --skipDet TRD,TPC --interactionRate 50000 --incontext ' + str(CONTEXTFILE))
    workflow['stages'].append(RESTDigitask)

    # -----------
    # reco
    # -----------

    # TODO: check value for MaxTimeBin; a large value had to be set temporarily in order to avoid crashes based on "exceeding timeframe limit"
    TPCRECOtask = createTask(name='tpcreco_' + tfsuffix, needs=[TPCDigitask['name']], tf=tf, cwd=timeframeworkdir)
    TPCRECOtask['cmd'] = ('o2-tpc-reco-workflow ' + getDPL_global_options()
                          + ' --tpc-digit-reader "--infile tpcdigits.root" --input-type digits --output-type clusters,tracks,send-clusters-per-sector --configKeyValues "GPU_global.continuousMaxTimeBin=100000;GPU_proc.ompThreads=' + str(NWORKERS) + '"')
    workflow['stages'].append(TPCRECOtask)

    ITSRECOtask = createTask(name='itsreco_' + tfsuffix, needs=[RESTDigitask['name']], tf=tf, cwd=timeframeworkdir)
    ITSRECOtask['cmd'] = 'o2-its-reco-workflow --trackerCA --tracking-mode async ' + getDPL_global_options()
    workflow['stages'].append(ITSRECOtask)

    FT0RECOtask = createTask(name='ft0reco_' + tfsuffix, needs=[RESTDigitask['name']], tf=tf, cwd=timeframeworkdir)
    FT0RECOtask['cmd'] = 'o2-ft0-reco-workflow ' + getDPL_global_options()
    workflow['stages'].append(FT0RECOtask)

    ITSTPCMATCHtask = createTask(name='itstpcMatch_' + tfsuffix, needs=[TPCRECOtask['name'], ITSRECOtask['name']], tf=tf, cwd=timeframeworkdir)
    ITSTPCMATCHtask['cmd'] = ('o2-tpcits-match-workflow ' + getDPL_global_options()
                              + ' --tpc-track-reader \"tpctracks.root\" --tpc-native-cluster-reader \"--infile tpc-native-clusters.root\"')
    workflow['stages'].append(ITSTPCMATCHtask)

    # this can be combined with TRD digitization if beneficial
    TRDTRAPtask = createTask(name='trdtrap_' + tfsuffix, needs=[TRDDigitask['name']], tf=tf, cwd=timeframeworkdir)
    TRDTRAPtask['cmd'] = 'o2-trd-trap-sim'
    workflow['stages'].append(TRDTRAPtask)

    TRDTRACKINGtask = createTask(name='trdreco_' + tfsuffix,
                                 needs=[TRDTRAPtask['name'], ITSTPCMATCHtask['name'], TPCRECOtask['name'], ITSRECOtask['name']],
                                 tf=tf, cwd=timeframeworkdir)
    TRDTRACKINGtask['cmd'] = 'o2-trd-global-tracking'
    workflow['stages'].append(TRDTRACKINGtask)

    TOFRECOtask = createTask(name='tofmatch_' + tfsuffix, needs=[ITSTPCMATCHtask['name'], RESTDigitask['name']], tf=tf, cwd=timeframeworkdir)
    TOFRECOtask['cmd'] = 'o2-tof-reco-workflow ' + getDPL_global_options()
    workflow['stages'].append(TOFRECOtask)

    PVFINDERtask = createTask(name='pvfinder_' + tfsuffix, needs=[ITSTPCMATCHtask['name'], FT0RECOtask['name']], tf=tf, cwd=timeframeworkdir)
    PVFINDERtask['cmd'] = 'o2-primary-vertexing-workflow ' + getDPL_global_options()
    workflow['stages'].append(PVFINDERtask)

    # -----------
    # produce AOD
    # -----------

    # enable later. It still has memory access problems
    # taskwrapper aod_${tf}.log o2-aod-producer-workflow --aod-writer-keep dangling --aod-writer-resfile "AO2D" --aod-writer-resmode UPDATE --aod-timeframe-id ${tf} $gloOpt
    AODtask = createTask(name='aod_' + tfsuffix,
                         needs=[PVFINDERtask['name'], TOFRECOtask['name'], TRDTRACKINGtask['name']],
                         tf=tf, cwd=timeframeworkdir)
    AODtask['cmd'] = ' echo "Would do AOD (enable later)" '
    workflow['stages'].append(AODtask)

    # cleanup step for this timeframe (we clean up disk space early so as to make possible checkpoint dumps smaller)
    CLEANUPtask = createTask(name='cleanup_' + tfsuffix, needs=[AODtask['name']], tf=tf, cwd=timeframeworkdir)
    CLEANUPtask['cmd'] = ' echo "Doing cleanup" '
    workflow['stages'].append(CLEANUPtask)
189+
190+
191+
def trimString(cmd):
    """Return cmd with leading/trailing whitespace removed and every inner
    run of whitespace (including newlines and tabs) collapsed to one space."""
    tokens = cmd.split()
    return ' '.join(tokens)
193+
194+
# insert taskwrapper stuff: every command is sourced through jobutils.sh and
# handed, single-quoted, to taskwrapper together with '<task name>.log';
# redundant whitespace is removed from the resulting command line
for stage in workflow['stages']:
    wrapped = '. ${O2_ROOT}/share/scripts/jobutils.sh; taskwrapper ' + stage['name'] + '.log \'' + stage['cmd'] + '\''
    stage['cmd'] = trimString(wrapped)


# write workflow to json
workflowfile = args.o
with open(workflowfile, 'w') as outfile:
    json.dump(workflow, outfile, indent=2)

exit(0)

0 commit comments

Comments
 (0)