-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathdata_validation.py
More file actions
57 lines (48 loc) · 2.21 KB
/
data_validation.py
File metadata and controls
57 lines (48 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from prefect import task, flow, get_run_logger
import time as ttime
from tiled.client import from_uri
from bluesky_tiled_plugins.writing.validator import validate
# Placeholder that must be edited before use: this value is the leading path
# segment used to index the Tiled client, i.e. "<value>/raw" and
# "<value>/migration" in the run lookup tasks below.
BEAMLINE_OR_ENDSTATION = "!!! Set the endstation or beamline_TLA here !!!"
@task(retries=2, retry_delay_seconds=10)
def get_run(uid, api_key=None):
    """Return the run ``uid`` from the ``raw`` catalog on the Tiled server.

    Registered as a Prefect task with two retries, ten seconds apart.

    Parameters
    ----------
    uid
        Key of the run inside the ``<BEAMLINE_OR_ENDSTATION>/raw`` catalog.
    api_key : optional
        Passed straight through to ``from_uri``.

    Returns
    -------
    The object obtained by indexing the raw catalog with ``uid``.
    """
    client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
    raw_catalog = client[f"{BEAMLINE_OR_ENDSTATION}/raw"]
    return raw_catalog[uid]
# SQL database-backed - remove if this does not exist on the beamline
@task(retries=2, retry_delay_seconds=10)
def get_run_migration(uid, api_key=None):
    """Return the run ``uid`` from the ``migration`` catalog on the Tiled server.

    Registered as a Prefect task with two retries, ten seconds apart.

    Parameters
    ----------
    uid
        Key of the run inside the ``<BEAMLINE_OR_ENDSTATION>/migration``
        catalog.
    api_key : optional
        Passed straight through to ``from_uri``.

    Returns
    -------
    The object obtained by indexing the migration catalog with ``uid``.
    """
    catalog_path = f"{BEAMLINE_OR_ENDSTATION}/migration"
    tiled_client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)
    return tiled_client[catalog_path][uid]
@task(retries=2, retry_delay_seconds=10)
def read_stream(run, stream):
    """Read one stream of a run and return whatever ``.read()`` yields.

    Registered as a Prefect task with two retries, ten seconds apart.

    Parameters
    ----------
    run
        Run object that can be indexed by stream name.
    stream
        Key of the stream to read.
    """
    stream_client = run[stream]
    return stream_client.read()
# Currently configured to run only one of the two checks: bluesky-tiled-plugins (BTP) validation, or reading all streams.
@flow
def data_validation(uid, api_key=None, dry_run=False):
    """Validate one run, falling back to reading every stream.

    The run is fetched with ``get_run_migration`` and handed to ``validate``
    with ``try_reading=True`` and ``raise_on_error=True``.  If that call
    raises ``AttributeError``, the run is fetched again with ``get_run`` and
    each of its streams is read in turn, logging the read time and ``nbytes``
    for every stream.  The total elapsed time is logged at the end in either
    case.

    Parameters
    ----------
    uid
        Identifier of the run to validate.
    api_key : optional
        Forwarded to the run lookup tasks.
    dry_run : bool, optional
        When true, ``validate`` is called with ``fix_errors=False``;
        otherwise it is called with ``fix_errors=True``.
    """
    log = get_run_logger()
    # replace with get_run() if no SQL database
    run_client = get_run_migration(uid, api_key=api_key)
    log.info(f"Validating uid {run_client.start['uid']}")

    flow_began = ttime.monotonic()
    try:
        # validate() only works for SQL database-backed catalogs - remove if
        # not available.  Errors are fixed only when this is not a dry run.
        validate(
            run_client,
            fix_errors=not dry_run,
            try_reading=True,
            raise_on_error=True,
        )
    except AttributeError:
        # check by reading data if not SQL database-backed
        run_client = get_run(uid, api_key=api_key)  # remove if no SQL database
        for stream in run_client:
            log.info(f"{stream}:")
            read_began = ttime.monotonic()
            stream_data = read_stream(run_client, stream)
            stream_elapsed_time = ttime.monotonic() - read_began
            log.info(f"{stream} elapsed_time = {stream_elapsed_time}")
            log.info(f"{stream} nbytes = {stream_data.nbytes:_}")

    # The variable name is part of the logged text via the "=" specifier.
    elapsed_time = ttime.monotonic() - flow_began
    log.info(f"{elapsed_time = }")