Skip to content

Commit 5b97ecc

Browse files
Benedikt Volkelsawenzel
authored andcommitted
Make analysis fully modular
* o2dpg_analysis_test_workflows wraps the analysis side of things now It can be run 1. as script where workflow file with only the analysis tasks is created 2. as a module to be used inside another workflow generation ==> analyses only need to be maintained in ine place * no analyses directly implemented in o2dpg_sim_workflow * changes in o2dpg_sim_workflow fully transparent, --include-analysis works as before and expected * change os.mkdir to os.makedirs in o2_dpg_workflow_runner to avoid the necessity of the user to create certain directories if the soecified cwd of a tasks is deeper than one directory
1 parent 189d3ad commit 5b97ecc

File tree

5 files changed

+144
-99
lines changed

5 files changed

+144
-99
lines changed

MC/analysis_testing/analysis_test.sh

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,8 @@ NTF=$(find ./ -name "tf*" -type d | wc | awk '//{print $1}')
2121
# RC1=$?
2222
# echo "EXIT 1: $RC1"
2323

24-
# run on the merged part
25-
wf_name="workflow_test_analysis.json"
26-
# remove if present...
27-
rm ${wf_name} 2>/dev/null
28-
# ...and recreate
29-
$O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_workflow.py -o ${wf_name}
3024
# run requested analysis
31-
$O2DPG_ROOT/MC/bin/o2_dpg_workflow_runner.py -f ${wf_name} -tt Analysis_${testanalysis}$ --rerun-from Analysis_${testanalysis}$
25+
$O2DPG_ROOT/MC/bin/o2_dpg_workflow_runner.py -f workflow.json -tt Analysis_${testanalysis}$ --rerun-from Analysis_${testanalysis}$
3226
RC2=$?
3327
echo "EXIT 2: $RC2"
3428

MC/analysis_testing/o2dpg_analysis_test_workflow.py

Lines changed: 124 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
print('ERROR: This needs O2, O2DPG and O2PHYICS loaded')
3333
sys.exit(1)
3434

35+
# dynamically import required utilities
3536
module_name = "o2dpg_workflow_utils"
3637
spec = importlib.util.spec_from_file_location(module_name, join(O2DPG_ROOT, "MC", "bin", "o2dpg_workflow_utils.py"))
3738
o2dpg_workflow_utils = importlib.util.module_from_spec(spec)
@@ -40,56 +41,158 @@
4041

4142
from o2dpg_workflow_utils import createTask, dump_workflow
4243

43-
44-
def create_ana_task(name, cmd, output_dir, input_aod, shmsegmentsize="--shm-segment-size 2000000000",
44+
# The default analysis tasks that can be created by this script
45+
ANALYSIS_DEFAULT = ("Efficiency", "EventTrackQA", "MCHistograms", "Validation", "PIDFull", "PWGMMMFT", "EventSelectionQA", "WeakDecayTutorial")
46+
# The default DPL JSON configuration to use
47+
CONFIGURATION_JSON_DEFAULT = "json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json"
48+
# Default analysis label to be put in the workflow JSON per analysis
49+
ANALYSIS_LABEL = "Analysis"
50+
# Default tuple of lists for QC upload task
51+
DEFAULT_ANALYSES_FOR_QC_UPLOAD = [("Efficiency", ("AnalysisResults.root",)),
52+
("EventTrackQA", ("AnalysisResults.root",)),
53+
("MCHistograms", ("AnalysisResults.root",)),
54+
("Validation", ("AnalysisResults.root",)),
55+
("PIDFull", ("AnalysisResults.root",)),
56+
("PWGMMMFT", ("AnalysisResults.root",)),
57+
("EventSelectionQA", ("AnalysisResults.root",))]
58+
59+
def full_ana_name(raw_ana_name):
60+
"""Make the standard name of the analysis how it should appear in the workflow"""
61+
return f"{ANALYSIS_LABEL}_{raw_ana_name}"
62+
63+
def create_ana_task(name, cmd, output_dir, input_aod, *, needs=None, shmsegmentsize="--shm-segment-size 2000000000",
4564
aodmemoryratelimit="--aod-memory-rate-limit 500000000",
4665
readers="--readers 1", extraarguments="-b"):
4766
"""Quick helper to create analysis task
67+
68+
This creates an analysis task from various arguments
69+
70+
Args:
71+
name: str
72+
desired analysis name
73+
cmd: str
74+
command line to run
75+
input_aod: str
76+
path to input AOD
77+
Keyword args (optional):
78+
needs: tuple, list
79+
list of other tasks to be run before
80+
shmsegmentsize: str
81+
O2/DPL argument string for shared mem size
82+
aodmemoryratelimit: str
83+
O2/DPL argument string for AOD memory rate limit
84+
readers: O2/DPL argument string
85+
number of readers
86+
extraarguments: str
87+
O2/DPL argument string for any other desired arguments to be added to the executed cmd
88+
Return:
89+
dict: the task dictionary
4890
"""
91+
# if another workflow want to use it from the outside, allow to inject dependencies before analyses can be run
92+
if needs is None:
93+
# set to empty list
94+
needs = []
4995
input_aod = f" --aod-file {abspath(input_aod)}"
50-
task = createTask(name=f"Analysis_{name}", cwd=join(output_dir, name), lab=["ANALYSIS", name], cpu=1, mem='2000')
96+
task = createTask(name=full_ana_name(name), cwd=join(output_dir, name), lab=[ANALYSIS_LABEL, name], cpu=1, mem='2000', needs=needs)
5197
task['cmd'] = f"{cmd} {shmsegmentsize} {aodmemoryratelimit} {readers} {input_aod} {extraarguments}"
5298
return task
5399

54-
55-
def run(args):
56-
57-
input_file = expanduser(args.input_file)
58-
output_dir = expanduser(args.analysis_dir)
59-
if not exists(output_dir):
60-
makedirs(output_dir)
61-
62-
workflow = []
63-
100+
def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, config_json=CONFIGURATION_JSON_DEFAULT, needs=None):
101+
"""Add default analyses to user workflow
102+
103+
Args:
104+
workflow: list
105+
list of tasks to add the analyses to
106+
input_aod: str
107+
path to AOD to be analysed
108+
output_dir: str
109+
top-level output directory under which the analysis is executed and potential results are saved
110+
Keyword arguments:
111+
config_json: str
112+
path to DPL JSON configuration
113+
needs: tuple, list
114+
list of other tasks to be run before
115+
"""
64116
# Efficiency
65-
workflow.append(create_ana_task("Efficiency", "o2-analysis-timestamp --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackextension --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackselection --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-event-selection --configuration json://${O2DPG_ROOT}/MC/config/QC/json/event-track-qa.json | o2-analysis-qa-efficiency --eff-mc 1 --eff-mc-pos 1 --eff-mc-neg 1 --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json ", output_dir, input_file))
117+
workflow.append(create_ana_task("Efficiency", f"o2-analysis-timestamp --configuration {config_json} | o2-analysis-trackextension --configuration {config_json} | o2-analysis-trackselection --configuration {config_json} | o2-analysis-event-selection --configuration {config_json} | o2-analysis-qa-efficiency --eff-mc 1 --eff-mc-pos 1 --eff-mc-neg 1 --configuration {config_json} ", output_dir, input_aod, needs=needs))
66118

67119
# Event and track QA
68-
workflow.append(create_ana_task("EventTrackQA", 'o2-analysis-timestamp --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-event-selection --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackextension --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackselection --configuration json://${O2DPG_ROOT}/MC/config/QC/json/event-track-qa.json | o2-analysis-qa-event-track --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json', output_dir, input_file))
120+
workflow.append(create_ana_task("EventTrackQA", f'o2-analysis-timestamp --configuration {config_json} | o2-analysis-event-selection --configuration {config_json} | o2-analysis-trackextension --configuration {config_json} | o2-analysis-trackselection --configuration {config_json} | o2-analysis-qa-event-track --configuration {config_json}', output_dir, input_aod, needs=needs))
69121

70122
# MCHistograms (no complex workflow / piping required atm)
71-
workflow.append(create_ana_task("MCHistograms", 'o2-analysistutorial-mc-histograms', output_dir, input_file))
123+
workflow.append(create_ana_task("MCHistograms", 'o2-analysistutorial-mc-histograms', output_dir, input_aod, needs=needs))
72124

73125
# Valitation (no complex workflow / piping required atm)
74-
workflow.append(create_ana_task("Validation", 'o2-analysis-validation', output_dir, input_file))
126+
workflow.append(create_ana_task("Validation", 'o2-analysis-validation', output_dir, input_aod, needs=needs))
75127

76128
# Full PID
77-
workflow.append(create_ana_task("PIDFull", 'o2-analysis-dq-table-maker-mc --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json --severity error --shm-segment-size 12000000000 --aod-writer-json aodWriterTempConfig.json | o2-analysis-timestamp --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-event-selection --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-multiplicity-table --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackselection --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackextension --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-pid-tof --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-pid-tof-full --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-pid-tof-beta --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-pid-tpc-full --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json', output_dir, input_file))
129+
workflow.append(create_ana_task("PIDFull", f'o2-analysis-dq-table-maker-mc --configuration {config_json} --severity error --shm-segment-size 12000000000 --aod-writer-json aodWriterTempConfig.json | o2-analysis-timestamp --configuration {config_json} | o2-analysis-event-selection --configuration {config_json} | o2-analysis-multiplicity-table --configuration {config_json} | o2-analysis-trackselection --configuration {config_json} | o2-analysis-trackextension --configuration {config_json} | o2-analysis-pid-tof --configuration {config_json} | o2-analysis-pid-tof-full --configuration {config_json} | o2-analysis-pid-tof-beta --configuration {config_json} | o2-analysis-pid-tpc-full --configuration {config_json}', output_dir, input_aod, needs=needs))
78130

79131
# PWGMM MFT dNdeta
80-
workflow.append(create_ana_task("PWGMMMFT", 'o2-analysis-timestamp --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackselection --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackextension --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-event-selection --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-multiplicity-table --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-trackselection --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json | o2-analysis-mm-dndeta-mft --configuration json://${O2DPG_ROOT}/MC/config/analysis_testing/json/analysis-testing.json', output_dir, input_file))
132+
workflow.append(create_ana_task("PWGMMMFT", f'o2-analysis-timestamp --configuration {config_json} | o2-analysis-trackselection --configuration {config_json} | o2-analysis-trackextension --configuration {config_json} | o2-analysis-event-selection --configuration {config_json} | o2-analysis-multiplicity-table --configuration {config_json} | o2-analysis-trackselection --configuration {config_json} | o2-analysis-mm-dndeta-mft --configuration {config_json}', output_dir, input_aod, needs=needs))
133+
134+
# Event selection QA
135+
workflow.append(create_ana_task("EventSelectionQA", f'o2-analysis-timestamp --configuration {config_json} | o2-analysis-event-selection --configuration {config_json} | o2-analysis-event-selection-qa --configuration {config_json}', output_dir, input_aod, needs=needs))
81136

82137
# weak decay tutorial task (no complex workflow / piping required atm), NOTE: produces no output
83-
workflow.append(create_ana_task("WeakDecayTutorial", 'o2-analysistutorial-weak-decay-iteration', output_dir, input_file))
138+
workflow.append(create_ana_task("WeakDecayTutorial", 'o2-analysistutorial-weak-decay-iteration', output_dir, input_aod, needs=needs))
84139

85-
dump_workflow(workflow, args.output)
140+
def add_analysis_qc_upload_tasks(workflow, prodcution_tag, run_number, *ana_tasks_expected_outputs):
141+
"""add o2-qc-upload-root-objects to specified analysis tasks
142+
143+
The analysis name has simply to be present in the workflow. Then adding these upload tasks works
144+
for any analysis because it does not have to have any knowledge about the analysis.
145+
146+
Args:
147+
workflow: list
148+
current list of tasks
149+
ana_tasks_expected_outputs: list of tuples
150+
[(AnalysisName_1, (expected_output_1_1, expected_output_1_2, ...)), ..., (AnalysisName_N, (expected_output_N_1, expected_output_N_2, ...)) ]
151+
"""
152+
if not ana_tasks_expected_outputs:
153+
ana_tasks_expected_outputs = DEFAULT_ANALYSES_FOR_QC_UPLOAD
154+
155+
for ana_name_raw, expcted_outputs in ana_tasks_expected_outputs:
156+
ana_name = full_ana_name(ana_name_raw)
157+
for pot_ana in workflow:
158+
# search through workflow stages if we can find the requested analysis
159+
if pot_ana["name"] != ana_name:
160+
continue
161+
print(f"Adding QC upload task for analysis {ana_name_raw}")
162+
cwd = pot_ana["cwd"]
163+
qc_tag = f"Analysis{ana_name_raw}"
164+
needs = [ana_name]
165+
for eo in expcted_outputs:
166+
# this seems unnecessary but to ensure backwards compatible behaviour...
167+
rename_output = eo.strip(".root")
168+
rename_output = f"{rename_output}_{ana_name_raw}.root"
169+
# add upload task for each expected output file
170+
task = createTask(name=f"{ANALYSIS_LABEL}_finalize_{ana_name_raw}_{rename_output}", cwd=cwd, lab=[f"{ANALYSIS_LABEL}Upload", ana_name_raw], cpu=1, mem='2000', needs=needs)
171+
# This has now to be renamed for upload, as soon as that is done, the output is renamed back to its original, there is in general no point of renaming it on disk only because one specific tasks needs a renamed version of it
172+
rename_cmd = f"mv {eo} {rename_output}"
173+
rename_back_cmd = f"mv {rename_output} {eo}"
174+
task["cmd"] = f"{rename_cmd} && o2-qc-upload-root-objects --input-file ./{rename_output} --qcdb-url ccdb-test.cern.ch:8080 --task-name Analysis{ana_name_raw} --detector-code AOD --provenance qc_mc --pass-name passMC --period-name {prodcution_tag} --run-number {run_number} && {rename_back_cmd} "
175+
workflow.append(task)
176+
177+
def run(args):
178+
"""digetsing what comes from the command line"""
179+
output_dir = expanduser(args.analysis_dir)
180+
if not exists(output_dir):
181+
makedirs(output_dir)
86182

183+
workflow = []
184+
add_analysis_tasks(workflow, args.input_file, output_dir)
185+
if args.with_qc_upload:
186+
add_analysis_qc_upload_tasks(workflow, args.with_qc_upload[0], args.with_qc_upload[1])
187+
dump_workflow(workflow, args.output)
87188

88189
def main():
190+
"""entry point when run directly from command line"""
89191
parser = argparse.ArgumentParser(description='Create analysi test workflow')
90192
parser.add_argument("-f", "--input-file", dest="input_file", default="./AO2D.root", help="full path to the AO2D input")
91193
parser.add_argument("-a", "--analysis-dir", dest="analysis_dir", default="./Analysis", help="the analysis output directory")
92194
parser.add_argument("-o", "--output", default="./workflow_analysis_test.json", help="the workflow output directory")
195+
parser.add_argument("--with-qc-upload", dest="with_qc_upload", nargs=2, help="2. args: production tag and run number number")
93196
parser.set_defaults(func=run)
94197

95198
args = parser.parse_args()

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,7 @@ def submit(self, tid, nice=os.nice(0)):
529529
return None
530530

531531
if not os.path.isdir(workdir):
532-
os.mkdir(workdir)
532+
os.makedirs(workdir)
533533

534534
self.procstatus[tid]='Running'
535535
if args.dry_run:

0 commit comments

Comments
 (0)