forked from JianLIUhep/RCTutils
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrct_verify_flag.py
More file actions
157 lines (123 loc) · 5.65 KB
/
rct_verify_flag.py
File metadata and controls
157 lines (123 loc) · 5.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import requests
import json
import argparse
import urllib3
from datetime import datetime
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def load_config(config_file):
    """Read a JSON configuration file and return its contents as a dict.

    Args:
        config_file: Path to the JSON configuration file.

    Returns:
        The parsed configuration dictionary.
    """
    with open(config_file, 'r') as handle:
        return json.load(handle)
def fetch_data_pass_ids(api_base_url, token):
    """Fetches all data passes and returns a dictionary mapping names to IDs.

    Args:
        api_base_url: Base URL of the bookkeeping API.
        token: API authentication token appended as a query parameter.

    Returns:
        Dict mapping data-pass name -> numeric ID; empty dict on HTTP error.
    """
    url = f"{api_base_url}/dataPasses?token={token}"
    # NOTE(review): TLS verification is disabled throughout this script,
    # presumably for a self-signed internal endpoint.
    response = requests.get(url, verify=False)
    if response.status_code != 200:
        print(f"Error fetching data pass IDs: {response.status_code} - {response.text}")
        return {}
    name_to_id = {}
    for data_pass in response.json().get('data', []):
        name_to_id[data_pass['name']] = data_pass['id']
    return name_to_id
def fetch_runs(api_base_url, data_pass_id, token):
    """Fetches a list of runs for a given data pass ID from the API.

    Args:
        api_base_url: Base URL of the bookkeeping API.
        data_pass_id: Numeric ID of the data pass to filter runs on.
        token: API authentication token appended as a query parameter.

    Returns:
        List of run dicts, each augmented with a 'detectors_involved' list
        parsed from the comma-separated 'detectors' field. Empty list on
        HTTP error.
    """
    url = f"{api_base_url}/runs?filter[dataPassIds][]={data_pass_id}&token={token}"
    response = requests.get(url, verify=False)
    if response.status_code != 200:
        print(f"Error fetching runs: {response.status_code} - {response.text}")
        return []
    runs = response.json().get('data', [])
    # Extract detectors involved in each run.
    # BUG FIX: guard against a null 'detectors' field (run.get(...).split
    # raised AttributeError on None) and strip whitespace so entries like
    # "CPV, EMC" still match a plain detector name in the caller's
    # membership test.
    for run in runs:
        detectors = run.get('detectors') or ''
        run['detectors_involved'] = [d.strip() for d in detectors.split(',')]
    return runs
def fetch_flags(api_base_url, data_pass_id, run_number, detector_id, token):
    """Fetches quality flags for a specific detector and run.

    Only the most recently updated flag is kept for each (from, to)
    time interval.

    Args:
        api_base_url: Base URL of the flag-fetching API.
        data_pass_id: Numeric ID of the data pass.
        run_number: Run number the flags belong to.
        detector_id: Numeric ID of the detector.
        token: API authentication token.

    Returns:
        List of the latest flag dicts per interval; empty list on HTTP
        error or when no flags exist.
    """
    url = f"{api_base_url}/perDataPass?dataPassId={data_pass_id}&runNumber={run_number}&dplDetectorId={detector_id}&token={token}"
    response = requests.get(url, verify=False)
    if response.status_code != 200:
        print(f"Error fetching flags: {response.status_code} - {response.text}")
        return []
    all_flags = response.json().get('data', [])
    if not all_flags:
        return []
    # Newest first, so the first flag seen for an interval is the latest.
    ordered = sorted(all_flags, key=lambda flag: flag['updatedAt'], reverse=True)
    latest_by_interval = {}
    for flag in ordered:
        interval = (flag['from'], flag['to'])
        if interval not in latest_by_interval:
            latest_by_interval[interval] = flag
    return list(latest_by_interval.values())
def is_run_excluded(run_number, excluded_runs):
    """Return True when run_number appears in the excluded runs list."""
    return any(run_number == excluded for excluded in excluded_runs)
# ---- Command-line interface -------------------------------------------------
parser = argparse.ArgumentParser(description="Verify quality control flags.")
parser.add_argument('config', type=str, help='Path to the configuration file')
parser.add_argument('--data_pass', type=str, required=True, help='Data pass name to use')
parser.add_argument('--detector', type=str, required=True, help='Detector name to use')
parser.add_argument('--min_run', type=int, help='Minimum run number')
parser.add_argument('--max_run', type=int, help='Maximum run number')
parser.add_argument('--excluded_runs', type=int, nargs='*', default=[], help='List of run numbers to exclude')
parser.add_argument('--comment', type=str, default=None, help='Optional verification comment')
args = parser.parse_args()

# Load configuration from the specified JSON file.
config = load_config(args.config)

# Credentials and endpoint URLs all come from the config file.
TOKEN = config['token']
API_BASE_URL = config['run_api_url']
FLAG_FETCH_API_URL = config['flag_fetch_api_url']
FLAG_VERIFY_API_URL = config['flag_verify_api_url']

# Resolve the data pass name to its numeric ID.
data_pass_ids = fetch_data_pass_ids(API_BASE_URL, TOKEN)
data_pass_id = data_pass_ids.get(args.data_pass)
if not data_pass_id:
    print(f"No data pass ID found for {args.data_pass}")
    exit(1)

# Fetch runs for the specified data pass ID.
runs = fetch_runs(API_BASE_URL, data_pass_id, TOKEN)

# Resolve the detector name to its numeric ID via the config mapping.
detector_id = config['detector_ids'].get(args.detector)
if not detector_id:
    print(f"No detector ID found for {args.detector}")
    exit(1)

# Iterate over the fetched runs, apply the filters, and verify every
# remaining flag with a POST request.
for run in runs:
    run_number = run['runNumber']
    # Skip runs the user explicitly excluded.
    if is_run_excluded(run_number, args.excluded_runs):
        print(f"Skipping excluded run {run_number}")
        continue
    # Enforce the optional [min_run, max_run] range.
    if (args.min_run is not None and run_number < args.min_run) or (args.max_run is not None and run_number > args.max_run):
        print(f"Skipping run {run_number} outside the specified range")
        continue
    # Only verify runs where the requested detector participated.
    involved_detectors = run['detectors_involved']
    if args.detector not in involved_detectors:
        print(f"Skipping run {run_number} as detector {args.detector} is not involved")
        continue
    # Fetch the (latest-per-interval) flags for this run and detector.
    flags = fetch_flags(FLAG_FETCH_API_URL, data_pass_id, run_number, detector_id, TOKEN)
    for flag in flags:
        flag_id = flag['id']
        url = f"{FLAG_VERIFY_API_URL}/{flag_id}/verify"
        params = {
            "token": TOKEN
        }
        data = {
            "comment": args.comment  # Optional comment, may be None
        }
        print(f"Verifying flag {flag_id} for run {run_number}")
        # BUG FIX: pass verify=False here as well — every other request in
        # this script disables TLS verification, and without it this POST
        # raises SSLError against the same self-signed endpoint.
        response = requests.post(url, params=params, json=data, verify=False)
        # BUG FIX: the result was previously discarded (prints commented
        # out), so failed verifications went unnoticed. Report non-200s.
        if response.status_code != 200:
            print(f"Run {run_number}, Flag {flag_id} - verification failed: {response.status_code} - {response.text}")