|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
import sys
import os
import time
import json
import argparse
from copy import deepcopy
from os import environ, makedirs
from os.path import join, expanduser, exists, dirname
from os.path import split as ossplit
import array as arr
| 12 | + |
| 13 | +# Creates a time anchored MC workflow; positioned within a given run-number (as function of production size etc) |
| 14 | + |
| 15 | +# Example: |
| 16 | +# ${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow_anchored.py -tf 500 --split-id ${s} --cycle ${cycle} --prod-split 100 --run-number 505600 \ |
| 17 | +# -- -gen pythia8 -eCM 900 -col pp -gen pythia8 -proc inel \ |
| 18 | +# -ns 22 -e TGeant4 \ |
| 19 | +# -j 8 -interactionRate 2000 \ |
| 20 | +# -field +2 \ |
| 21 | +# -confKey "Diamond.width[2]=6" |
| 22 | +# (the first set of arguments is used to determine anchoring point; the second set of arguments are passed forward to workflow creation) |
| 23 | + |
| 24 | + |
| 25 | +# this is PyROOT; enables reading ROOT C++ objects |
| 26 | +from ROOT import o2, TFile, TString, std |
| 27 | + |
| 28 | + |
| 29 | +# these need to go into a module / support layer |
class CCDBAccessor:
    """
    Small convenience wrapper around the O2 CCDB api (CcdbApi) and the
    BasicCCDBManager singleton used for object fetching.
    """

    def __init__(self, url):
        """Initialize both CCDB access paths against the given server URL."""
        # This is used for some special operations (listing, header queries)
        self.api = o2.ccdb.CcdbApi()
        self.api.init(url)

        # this is used for the actual fetching for now
        # fix: keep a handle to the manager — fetch() relies on self.mgr,
        # which the original never assigned
        self.mgr = o2.ccdb.BasicCCDBManager.instance()
        self.mgr.setURL(url)

    def list(self, path, dump_path=None):
        """
        List the objects below a CCDB path.

        Returns the parsed "objects" section of the server's JSON reply
        (a list of dicts) and optionally dumps it to dump_path.
        """
        ret = json.loads(self.api.list(path, False, "application/json"))
        if ret and "objects" in ret:
            ret = ret["objects"]
        if ret and dump_path:
            print(f"CCDB object information for path {path} stored in {dump_path}")
            # fix: original called an undefined helper dump_json();
            # write the JSON directly instead
            if dirname(dump_path):
                makedirs(dirname(dump_path), exist_ok=True)
            with open(dump_path, "w") as fp:
                json.dump(ret, fp, indent=2)
        return ret

    def fetch(self, path, obj_type, timestamp=None, meta_info=None):
        """
        Fetch the object of C++ type obj_type stored at path.

        When timestamp is falsy the manager's current timestamp is used,
        otherwise the manager is moved to the given timestamp.
        Returns a (timestamp, object) tuple.

        TODO We could use CcdbApi::snapshot at some point, needs revision
        """
        if not timestamp:
            timestamp = self.mgr.getTimestamp()
        else:
            self.mgr.setTimestamp(timestamp)

        if not meta_info:
            obj = self.mgr.get[obj_type](path)
        else:
            obj = self.mgr.getSpecific[obj_type](path, meta_info)

        return timestamp, obj

    def fetch_header(self, path, timestamp=None):
        """
        Retrieve only the headers of the object at path valid for timestamp
        (None maps to -1, i.e. the latest object).
        """
        meta_info = std.map["std::string", "std::string"]()
        if timestamp is None:
            timestamp = -1
        header = self.api.retrieveHeaders(path, meta_info, timestamp)
        return header
| 72 | + |
| 73 | + |
| 74 | + |
def retrieve_sor_eor(ccdbreader, run_number):
    """
    Retrieves start-of-run (SOR) and end-of-run (EOR) timestamps for a
    given run number from the RCT/RunInformation CCDB entry.

    Returns {"SOR": int, "EOR": int} or None when no run info is found.
    """
    path_run_info = "RCT/RunInformation"
    # the run number doubles as the query timestamp for this CCDB path
    header = ccdbreader.fetch_header(path_run_info, run_number)
    if not header:
        # fix: original f-string referenced an undefined variable `r`
        print(f"WARNING: Cannot find run information for run number {run_number}")
        return None
    # return this as a dictionary
    return {"SOR": int(header["SOR"]), "EOR": int(header["EOR"])}
| 87 | + |
| 88 | + |
def retrieve_GRP(ccdbreader, timestamp):
    """
    Retrieves the GRP object valid at a given timestamp.

    Returns the GRP object, or None when it cannot be found.
    """
    grp_path = "GLO/GRP/GRP"
    # probe the headers first so we can warn clearly when the object is absent
    header = ccdbreader.fetch_header(grp_path, timestamp)
    if not header:
        print(f"WARNING: Could not download GRP object for timestamp {timestamp}")
        return None
    # fix: original called an undefined name `reader` instead of `ccdbreader`
    _, grp = ccdbreader.fetch(grp_path, "o2::parameters::GRPObject", timestamp=timestamp)
    return grp
| 100 | + |
| 101 | + |
def determine_timestamp(sor, eor, splitinfo, cycle, ntf):
    """
    Determines the timestamp and production offset variable based
    on the global properties of the production (MC split, etc) and the
    properties of the run.

    Args:
        sor: start-of-run timestamp (ms)
        eor: end-of-run timestamp (ms)
        splitinfo: [this_job_id, total_number_of_jobs]
        cycle: MC cycle iteration (sampling offset)
        ntf: number of timeframes per MC job
    Returns:
        (timestamp, production_offset); NOTE(review): the timestamp is
        currently just the SOR — no per-job time offset is applied yet.
    """
    thisjobID, totaljobs = splitinfo
    print(f"Start-of-run : {sor}")
    print(f"End-of-run : {eor}")
    time_length_inmus = 1000. * (eor - sor)  # time length in micro seconds (SOR/EOR are ms)
    HBF_per_timeframe = 256  # 256 orbits per timeframe --> should be taken from GRP or common constant in all O2DPG

    # this should be taken from the C++ code (via PyROOT and library access to these constants)
    LHCMaxBunches = 3564                             # max N bunches
    LHCRFFreq = 400.789e6                            # LHC RF frequency in Hz
    LHCBunchSpacingNS = 10 * 1.e9 / LHCRFFreq        # bunch spacing in ns (10 RFbuckets)
    LHCOrbitNS = LHCMaxBunches * LHCBunchSpacingNS   # orbit duration in ns
    LHCOrbitMUS = LHCOrbitNS * 1e-3                  # orbit duration in microseconds

    # total capacity of the run in timeframes / orbits
    ntimeframes = time_length_inmus / (HBF_per_timeframe * LHCOrbitMUS)
    norbits = time_length_inmus / LHCOrbitMUS
    print(f"This run has space for {ntimeframes} timeframes")
    print(f"This run has {norbits} orbits")

    # ntimeframes is the total number of timeframes possible
    # if we have totaljobs number of jobs
    timeframesperjob = ntimeframes // totaljobs
    print(f"Each job can do {timeframesperjob} maximally at a prod split of {totaljobs}")
    print(f"With each job doing {ntf} timeframes, this corresponds to a filling rate of ", ntf / timeframesperjob)
    # how many cycle iterations are needed to exhaust the per-job budget
    maxcycles = timeframesperjob // ntf
    print(f"We can do this amount of cycle iterations to achieve 100%: ", maxcycles)

    # (dead locals timestamp_delta / orbitsperjob from the original removed)
    return sor, int(thisjobID * maxcycles) + cycle
| 138 | + |
def main():
    """Determine the anchoring point for this MC job and delegate workflow
    creation to o2dpg_sim_workflow.py with the forwarded arguments."""
    parser = argparse.ArgumentParser(description='Creates an O2DPG simulation workflow, anchored to a given LHC run. The workflows are time anchored at regular positions within a run as a function of production size, split-id and cycle.')

    parser.add_argument("--run-number", type=int, help="Run number to anchor to", required=True)
    # fix: help text typo "RUL" -> "URL"
    parser.add_argument("--ccdb-url", dest="ccdb_url", help="CCDB access URL", default="http://alice-ccdb.cern.ch")
    parser.add_argument("--prod-split", type=int, help="The number of MC jobs that sample from the given time range", default=1)
    parser.add_argument("--cycle", type=int, help="MC cycle. Determines the sampling offset", default=0)
    # fix: unbalanced parenthesis in the original help text
    parser.add_argument("--split-id", type=int, help="The split id of this job within the whole production (--prod-split)", default=0)
    parser.add_argument("-tf", type=int, help="number of timeframes per job", default=1)
    parser.add_argument('forward', nargs=argparse.REMAINDER)  # forward args passed to actual workflow creation
    args = parser.parse_args()

    # make a CCDB accessor object
    ccdbreader = CCDBAccessor(args.ccdb_url)
    # fetch the EOR/SOR
    sor_eor = retrieve_sor_eor(ccdbreader, args.run_number)
    if not sor_eor:
        print("No time info found")
        sys.exit(1)

    # determine timestamp, and production offset for the final
    # MC job to run
    timestamp, prod_offset = determine_timestamp(sor_eor["SOR"], sor_eor["EOR"],
                                                 [args.split_id, args.prod_split],
                                                 args.cycle, args.tf)
    # this is anchored to
    print("Determined timestamp to be : ", timestamp)
    print("Determined offset to be : ", prod_offset)

    # we finally pass forward to the unanchored MC workflow creation
    # TODO: this needs to be done in a pythonic way clearly
    forwardargs = " ".join([a for a in args.forward if a != '--'])
    forwardargs += f" -tf {args.tf} --timestamp {timestamp} --production-offset {prod_offset} -run {args.run_number}"
    cmd = "${O2DPG_ROOT}/MC/bin/o2dpg_sim_workflow.py " + forwardargs
    print("Creating time-anchored workflow...")
    # NOTE(review): forwarded args are interpolated into a shell string;
    # consider subprocess.run([...], shell=False) to avoid quoting/injection issues
    os.system(cmd)
    # TODO:
    # - we can anchor many more things at this level:
    #   * field
    #   * interaction rate
    #   * vertex position
    #   * ...
    # - develop this into swiss-army tool:
    #   * determine prod split based on sampling-fraction (support for production manager etc)

if __name__ == "__main__":
    sys.exit(main())
0 commit comments