6363import sys
6464import importlib .util
6565import argparse
66- from os import environ
66+ from os import environ , makedirs
6767from os .path import join , exists , abspath , expanduser
6868import json
6969
9090from o2dpg_analysis_test_utils import *
9191
9292
93- def create_ana_task (name , cmd , output_dir , * , needs = None , extraarguments = "-b" , is_mc = False ):
93+ def create_ana_task (name , cmd , output_dir , * , cpu = 1 , mem = '2000' , needs = None , extraarguments = "-b" , is_mc = False ):
9494 """Quick helper to create analysis task
9595
9696 This creates an analysis task from various arguments
@@ -114,7 +114,7 @@ def create_ana_task(name, cmd, output_dir, *, needs=None, extraarguments="-b", i
114114 if needs is None :
115115 # set to empty list
116116 needs = []
117- task = createTask (name = full_ana_name (name ), cwd = join (output_dir , name ), lab = [ANALYSIS_LABEL , name ], cpu = 1 , mem = '2000' , needs = needs )
117+ task = createTask (name = full_ana_name (name ), cwd = join (output_dir , name ), lab = [ANALYSIS_LABEL , name ], cpu = cpu , mem = mem , needs = needs )
118118 if is_mc :
119119 task ["labels" ].append (ANALYSIS_LABEL_ON_MC )
120120 task ['cmd' ] = f"{ cmd } { extraarguments } "
@@ -138,38 +138,6 @@ def load_analyses(analyses_only=None, include_disabled_analyses=False):
138138 return collect_analyses
139139
140140
def add_analysis_post_processing_tasks(workflow):
    """Add a post-processing step to analysis tasks if possible.

    For each analysis task already present in the workflow, look for a
    matching ROOT post-processing macro
    (<O2DPG_ROOT>/MC/analysis_testing/post_processing/<analysis_name>.C).
    If the macro exists and the analysis declares expected output files,
    append a task that runs the macro on those outputs. Analyses without
    expected output or without a macro are silently skipped.

    Args:
        workflow: list
            current list of task dictionaries; extended in place
    """
    analyses_to_add_for = {}
    # collect analyses in current workflow, keyed by their full task name
    for task in workflow:
        if ANALYSIS_LABEL in task["labels"]:
            analyses_to_add_for[task["name"]] = task

    # include disabled analyses so a post-processing step is attached to any
    # analysis task a user added explicitly, not only enabled defaults
    for ana in load_analyses(include_disabled_analyses=True):
        if not ana["expected_output"]:
            # nothing to post-process for this analysis
            continue
        ana_name_raw = ana["name"]
        post_processing_macro = join(O2DPG_ROOT, "MC", "analysis_testing", "post_processing", f"{ana_name_raw}.C")
        if not exists(post_processing_macro):
            # no dedicated macro shipped for this analysis
            continue
        ana_name = full_ana_name(ana_name_raw)
        if ana_name not in analyses_to_add_for:
            # the analysis is known but was not scheduled in this workflow
            continue
        pot_ana = analyses_to_add_for[ana_name]
        cwd = pot_ana["cwd"]
        # post-processing depends on the analysis task it consumes
        needs = [ana_name]
        task = createTask(name=f"{ANALYSIS_LABEL}_post_processing_{ana_name_raw}", cwd=join(cwd, "post_processing"), lab=[ANALYSIS_LABEL, f"{ANALYSIS_LABEL}PostProcessing", ana_name_raw], cpu=1, mem='2000', needs=needs)
        # expected outputs live in the analysis task's directory, one level up
        # from the post_processing working directory
        input_files = ",".join([f"../{eo}" for eo in ana["expected_output"]])
        # shell-escaped ROOT macro argument list: \("<inputs>","./"\)
        # parentheses and quotes must be escaped so the shell passes them
        # through to ROOT verbatim
        cmd = f"\\(\\\"{input_files}\\\",\\\"./\\\"\\)"
        # NOTE(review): macro path and argument list are concatenated without a
        # space, as ROOT expects macro.C(args) — confirm against original file
        task["cmd"] = f"root -l -b -q {post_processing_macro}{cmd}"
        workflow.append(task)
173141def get_additional_workflows (input_aod ):
174142 additional_workflows = []
175143
@@ -207,7 +175,7 @@ def get_additional_workflows(input_aod):
207175 return additional_workflows
208176
209177
210- def add_analysis_tasks (workflow , input_aod = "./AO2D.root" , output_dir = "./Analysis" , * , analyses_only = None , is_mc = True , collision_system = None , needs = None , autoset_converters = False , include_disabled_analyses = False , timeout = None , add_common_args = None ):
178+ def add_analysis_tasks (workflow , input_aod = "./AO2D.root" , output_dir = "./Analysis" , * , analyses_only = None , is_mc = True , collision_system = None , needs = None , autoset_converters = False , include_disabled_analyses = False , timeout = None , split_analyses = False ):
211179 """Add default analyses to user workflow
212180
213181 Args:
@@ -238,38 +206,71 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
238206 data_or_mc = ANALYSIS_VALID_MC if is_mc else ANALYSIS_VALID_DATA
239207 collision_system = get_collision_system (collision_system )
240208
209+ # list of lists, each sub-list corresponds to one analysis pipe to be executed
210+ analysis_pipes = []
211+ # collect the names corresponding to analysis pipes
212+ analysis_names = []
213+ # cpu and mem of each task
214+ analysis_cpu_mem = []
215+ # a list of all tasks to be put together
216+ merged_analysis_pipe = additional_workflows .copy ()
217+ # cpu and mem of merged analyses
218+ merged_analysis_cpu_mem = [0 , 0 ]
219+
241220 for ana in load_analyses (analyses_only , include_disabled_analyses = include_disabled_analyses ):
242221 if is_mc and not ana .get ("valid_mc" , False ):
243222 print (f"INFO: Analysis { ana ['name' ]} not added since not valid in MC" )
244223 continue
245224 if not is_mc and not ana .get ("valid_data" , False ):
246225 print (f"INFO: Analysis { ana ['name' ]} not added since not valid in data" )
247226 continue
248-
249- configuration = get_configuration (ana ["name" ], data_or_mc , collision_system )
250- if not configuration :
251- print (f"INFO: Analysis { ana ['name' ]} excluded due to no valid configuration" )
227+ if analyses_only and ana ['name' ] not in analyses_only :
228+ # filter on analyses if requested
252229 continue
253- print (f"INFO: Analysis { ana ['name' ]} uses configuration { configuration } " )
254230
255- add_common_args_ana = get_common_args_as_string (ana , add_common_args )
256- if not add_common_args_ana :
257- print (f"ERROR: Cannot parse common args for analysis { ana ['name' ]} " )
231+ if split_analyses :
232+ # only the individual analyses, no merged
233+ analysis_pipes .append (ana ['tasks' ])
234+ analysis_names .append (ana ['name' ])
235+ analysis_cpu_mem .append ((1 , 2000 ))
258236 continue
259237
260- for i in additional_workflows :
261- if i not in ana ["tasks" ]:
262- # print("Appending extra task", i, "to analysis", ana["name"], "as it is not there yet and needed for conversion")
263- ana ["tasks" ].append (i )
264- piped_analysis = f" --configuration { configuration } | " .join (ana ["tasks" ])
265- piped_analysis += f" --configuration { configuration } --aod-file { input_aod } "
266- piped_analysis += add_common_args_ana
238+ merged_analysis_pipe .extend (ana ['tasks' ])
239+ # underestimate what a single analysis would take in the merged case.
240+ # Putting everything into one big pipe does not mean that the resources scale the same!
241+ merged_analysis_cpu_mem [0 ] += 0.5
242+ merged_analysis_cpu_mem [1 ] += 700
243+
244+ if not split_analyses :
245+ # add the merged analysis
246+ analysis_pipes .append (merged_analysis_pipe )
247+ analysis_names .append ('MergedAnalyses' )
248+ # take at least the resources estimated for a single analysis
249+ analysis_cpu_mem .append ((max (1 , merged_analysis_cpu_mem [0 ]), max (2000 , merged_analysis_cpu_mem [1 ])))
250+
251+ # now we need to create the output directory where we want the final configurations to go
252+ output_dir_config = join (output_dir , 'config' )
253+ if not exists (output_dir_config ):
254+ makedirs (output_dir_config )
255+
256+ configuration = adjust_and_get_configuration_path (data_or_mc , collision_system , output_dir_config )
257+
258+ for analysis_name , analysis_pipe , analysis_res in zip (analysis_names , analysis_pipes , analysis_cpu_mem ):
259+ # remove duplicates if they are there for nay reason (especially in the merged case)
260+ analysis_pipe = list (set (analysis_pipe ))
261+ analysis_pipe_assembled = []
262+ for executable_string in analysis_pipe :
263+ # the input executable might come already with some configurations, the very first token is the actual executable
264+ executable_string += f' --configuration json://{ configuration } '
265+ analysis_pipe_assembled .append (executable_string )
266+
267+ # put together, add AOD and timeout if requested
268+ analysis_pipe_assembled = ' | ' .join (analysis_pipe_assembled )
269+ analysis_pipe_assembled += f' --aod-file { input_aod } --shm-segment-size 3000000000 --readers 1 --aod-memory-rate-limit 500000000'
267270 if timeout is not None :
268- piped_analysis += f" --time-limit { timeout } "
269- workflow .append (create_ana_task (ana ["name" ], piped_analysis , output_dir , needs = needs , is_mc = is_mc ))
271+ analysis_pipe_assembled += f' --time-limit { timeout } '
270272
271- # append potential post-processing
272- add_analysis_post_processing_tasks (workflow )
273+ workflow .append (create_ana_task (analysis_name , analysis_pipe_assembled , output_dir , cpu = analysis_res [0 ], mem = analysis_res [1 ], needs = needs , is_mc = is_mc ))
273274
274275
275276def add_analysis_qc_upload_tasks (workflow , period_name , run_number , pass_name ):
@@ -300,7 +301,6 @@ def add_analysis_qc_upload_tasks(workflow, period_name, run_number, pass_name):
300301 # search through workflow stages if we can find the requested analysis
301302 pot_ana = analyses_to_add_for [ana_name ]
302303 cwd = pot_ana ["cwd" ]
303- qc_tag = f"Analysis{ ana_name_raw } "
304304 needs = [ana_name ]
305305 provenance = "qc_mc" if ANALYSIS_LABEL_ON_MC in pot_ana ["labels" ] else "qc"
306306 for eo in ana ["expected_output" ]:
@@ -325,7 +325,7 @@ def run(args):
325325 ### setup global environment variables which are valid for all tasks, set as first task
326326 global_env = {"ALICEO2_CCDB_CONDITION_NOT_AFTER" : args .condition_not_after } if args .condition_not_after else None
327327 workflow = [createGlobalInitTask (global_env )]
328- add_analysis_tasks (workflow , args .input_file , expanduser (args .analysis_dir ), is_mc = args .is_mc , analyses_only = args .only_analyses , autoset_converters = args .autoset_converters , include_disabled_analyses = args .include_disabled , timeout = args .timeout , collision_system = args .collision_system , add_common_args = args .add_common_args )
328+ add_analysis_tasks (workflow , args .input_file , expanduser (args .analysis_dir ), is_mc = args .is_mc , analyses_only = args .only_analyses , autoset_converters = args .autoset_converters , include_disabled_analyses = args .include_disabled , timeout = args .timeout , collision_system = args .collision_system , split_analyses = args .split_analyses )
329329 if args .with_qc_upload :
330330 add_analysis_qc_upload_tasks (workflow , args .period_name , args .run_number , args .pass_name )
331331 if not workflow :
@@ -351,8 +351,8 @@ def main():
351351 parser .add_argument ("--autoset-converters" , dest = "autoset_converters" , action = "store_true" , help = "Compatibility mode to automatically set the converters for the analysis" )
352352 parser .add_argument ("--timeout" , type = int , default = None , help = "Timeout for analysis tasks in seconds." )
353353 parser .add_argument ("--collision-system" , dest = "collision_system" , help = "Set the collision system. If not set, tried to be derived from ALIEN_JDL_LPMInterationType. Fallback to pp" )
354- parser .add_argument ("--add-common-args" , dest = "add_common_args" , nargs = "*" , help = "Pass additional common arguments per analysis, for instance --add-common-args EMCAL-shm-segment-size 2500000000 will add --shm-segment-size 2500000000 to the EMCAL analysis" )
355354 parser .add_argument ('--condition-not-after' , dest = "condition_not_after" , type = int , help = "only consider CCDB objects not created after this timestamp (for TimeMachine)" , default = 3385078236000 )
355+ parser .add_argument ('--split-analyses' , dest = 'split_analyses' , action = 'store_true' , help = 'Split into single analyses pipes to be executed.' )
356356
357357 parser .set_defaults (func = run )
358358 args = parser .parse_args ()
0 commit comments