diff --git a/data_validation.py b/data_validation.py index 4fbd1c7..776713f 100644 --- a/data_validation.py +++ b/data_validation.py @@ -1,21 +1,30 @@ from prefect import task, flow, get_run_logger import time as ttime -from tiled.client import from_profile -from prefect.blocks.system import Secret +from tiled.client import from_uri + + +@task +def get_run(uid, api_key=None): + tiled_client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key) + run = tiled_client["srx/raw"][uid] + return run + + +@task +def read_stream(run, stream): + return run[stream].read() @task(retries=2, retry_delay_seconds=10) -def read_all_streams(uid, beamline_acronym): - api_key = Secret.load("tiled-srx-api-key", _sync=True).get() - tiled_client = from_profile("nsls2", api_key=api_key) +def read_all_streams(uid, api_key=None): logger = get_run_logger() - run = tiled_client[beamline_acronym]["raw"][uid] + run = get_run(uid, api_key=api_key) logger.info(f"Validating uid {run.start['uid']}") start_time = ttime.monotonic() for stream in run: logger.info(f"{stream}:") stream_start_time = ttime.monotonic() - stream_data = run[stream].read() + stream_data = read_stream(run, stream) stream_elapsed_time = ttime.monotonic() - stream_start_time logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") @@ -24,5 +33,5 @@ def read_all_streams(uid, beamline_acronym): @flow -def data_validation(uid): - read_all_streams(uid, beamline_acronym="srx") +def data_validation(uid, api_key=None): + read_all_streams(uid, api_key=api_key) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 5ae867b..8b5cae0 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -2,18 +2,25 @@ from prefect import task, flow, get_run_logger from prefect.blocks.notifications import SlackWebhook -from prefect.blocks.system import Secret from prefect.context import FlowRunContext from xanes_exporter import xanes_exporter from xrf_hdf5_exporter 
import xrf_hdf5_exporter from logscan import logscan - -from tiled.client import from_profile +from dotenv import load_dotenv +import os +from data_validation import get_run CATALOG_NAME = "srx" +def get_api_key_from_env(): + with open("/srv/container.secret", "r") as secrets: + load_dotenv(stream=secrets) + api_key = os.environ["TILED_API_KEY"] + return api_key + + def slack(func): """ Send a message to mon-prefect slack channel about the flow-run status. @@ -24,7 +31,7 @@ def slack(func): the flow. To keep the naming of workflows consistent, the name of this inner function had to match the expected name. """ - def end_of_run_workflow(stop_doc): + def end_of_run_workflow(stop_doc, api_key=None): flow_run_name = FlowRunContext.get().flow_run.dict().get("name") # Load slack credentials that are saved in Prefect. @@ -35,10 +42,8 @@ def end_of_run_workflow(stop_doc): uid = stop_doc["run_start"] # Get the scan_id. - api_key = Secret.load("tiled-srx-api-key", _sync=True).get() - tiled_client = from_profile("nsls2", api_key=api_key)[CATALOG_NAME] - tiled_client_raw = tiled_client["raw"] - scan_id = tiled_client_raw[uid].start["scan_id"] + run = get_run(uid, api_key=api_key) + scan_id = run.start["scan_id"] # Send a message to mon-bluesky if bluesky-run failed. 
if stop_doc.get("exit_status") == "fail": @@ -74,11 +79,13 @@ def log_completion(): @flow @slack -def end_of_run_workflow(stop_doc): +def end_of_run_workflow(stop_doc, api_key=None, dry_run=False): uid = stop_doc["run_start"] + if not api_key: + api_key = get_api_key_from_env() - # data_validation(uid, return_state=True) - xanes_exporter(uid) - xrf_hdf5_exporter(uid) - logscan(uid) + # data_validation(uid, return_state=True, api_key=api_key) + xanes_exporter(uid, api_key=api_key, dry_run=dry_run) + xrf_hdf5_exporter(uid, api_key=api_key, dry_run=dry_run) + logscan(uid, api_key=api_key, dry_run=dry_run) log_completion() diff --git a/logscan.py b/logscan.py index 702c2d7..3e89ac0 100644 --- a/logscan.py +++ b/logscan.py @@ -1,12 +1,7 @@ from pathlib import Path from prefect import flow, task, get_run_logger -from prefect.blocks.system import Secret -from tiled.client import from_profile - -api_key = Secret.load("tiled-srx-api-key", _sync=True).get() -tiled_client = from_profile("nsls2", api_key=api_key)["srx"] -tiled_client_raw = tiled_client["raw"] +from data_validation import get_run def find_scanid(logfile_path, scanid): @@ -21,10 +16,10 @@ def find_scanid(logfile_path, scanid): @task -def logscan_detailed(scanid): +def logscan_detailed(scanid, api_key=None, dry_run=False): logger = get_run_logger() - h = tiled_client_raw[scanid] + h = get_run(scanid, api_key=api_key) if ( "Beamline Commissioning (beamline staff only)".lower() @@ -71,14 +66,17 @@ def logscan_detailed(scanid): out_str += "\n" # Write to file - with open(logfile_path, "a") as userlogf: - userlogf.write(out_str) - logger.info(f"Added {h.start['scan_id']} to the logs") + if dry_run: + logger.info(f"Dry run: scan_id: {h.start['scan_id']} output: {out_str}") + else: + with open(logfile_path, "a") as userlogf: + userlogf.write(out_str) + logger.info(f"Added {h.start['scan_id']} to the logs") @flow(log_prints=True) -def logscan(ref): +def logscan(ref, api_key=None, dry_run=False): logger = get_run_logger() 
logger.info("Start writing logfile...") - logscan_detailed(ref) + logscan_detailed(ref, api_key=api_key, dry_run=dry_run) logger.info("Finish writing logfile.") diff --git a/prefect.yaml b/prefect.yaml index 8d7d721..36ac550 100644 --- a/prefect.yaml +++ b/prefect.yaml @@ -24,14 +24,12 @@ deployments: schedule: {} work_pool: job_variables: - env: - TILED_SITE_PROFILES: /nsls2/software/etc/tiled/profiles image: ghcr.io/nsls2/srx-workflows:main image_pull_policy: Always network: slirp4netns volumes: - /nsls2/data/srx/proposals:/nsls2/data/srx/proposals - - /nsls2/software/etc/tiled:/nsls2/software/etc/tiled + - /srv/prefect3-docker-worker-srx/app:/srv container_create_kwargs: userns_mode: "keep-id:uid=402949,gid=402949" # workflow-srx:workflow-srx auto_remove: true diff --git a/xanes_exporter.py b/xanes_exporter.py index 3670f25..248a433 100644 --- a/xanes_exporter.py +++ b/xanes_exporter.py @@ -1,18 +1,11 @@ from prefect import flow, task, get_run_logger -from prefect.blocks.system import Secret -from tiled.client import from_profile - +from data_validation import get_run import time as ttime import numpy as np import xraylib as xrl import pandas as pd -api_key = Secret.load("tiled-srx-api-key", _sync=True).get() -tiled_client = from_profile("nsls2", api_key=api_key)["srx"] -tiled_client_raw = tiled_client["raw"] - - def xanes_textout( scanid=-1, header=[], @@ -21,6 +14,7 @@ def xanes_textout( usercolumn={}, usercolumnname=[], output=True, + api_key=None, ): """ scan: can be scan_id (integer) or uid (string). 
default=-1 (last scan run) @@ -35,7 +29,7 @@ def xanes_textout( """ - h = tiled_client_raw[scanid] + h = get_run(scanid, api_key=api_key) if ( "Beamline Commissioning (beamline staff only)".lower() in h.start["proposal"]["type"].lower() @@ -121,13 +115,13 @@ def xanes_textout( @task -def xas_step_exporter(scanid): +def xas_step_exporter(scanid, api_key=None, dry_run=False): logger = get_run_logger() # Custom header list headeritem = [] # Load header for our scan - h = tiled_client_raw[scanid] + h = get_run(scanid, api_key=api_key) if h.start["scan"].get("type") != "XAS_STEP": logger.info("Incorrect document type. Not running exporter on this document.") @@ -216,22 +210,26 @@ def xas_step_exporter(scanid): # usercolumnitem['If-{:02}'.format(i)] = roisum # usercolumnitem['If-{:02}'.format(i)].round(0) - xanes_textout( - scanid=scanid, - header=headeritem, - userheader=userheaderitem, - column=columnitem, - usercolumn=usercolumnitem, - usercolumnname=usercolumnitem.keys(), - output=False, - ) + if dry_run: + logger.info("Dry run: Not exporting xanes") + else: + xanes_textout( + scanid=scanid, + header=headeritem, + userheader=userheaderitem, + column=columnitem, + usercolumn=usercolumnitem, + usercolumnname=usercolumnitem.keys(), + output=False, + api_key=api_key, + ) @task -def xas_fly_exporter(uid): +def xas_fly_exporter(uid, api_key=None, dry_run=False): logger = get_run_logger() # Get a scan header - hdr = tiled_client_raw[uid] + hdr = get_run(uid, api_key=api_key) start_doc = hdr.start # Get proposal directory location @@ -320,27 +318,40 @@ def xas_fly_exporter(uid): staticheader += "# \n# " # Export data to file - with open(fname, "w") as f: - f.write(staticheader) - df.to_csv(fname, float_format="%.3f", sep=" ", mode="a") + if dry_run: + logger.info("Dry run: xas fly exporter") + if len(df) >= 2: + logger.info( + f"Dry run: first and last row: {pd.concat([df.head(1), df.tail(1)])}" + ) + elif len(df) == 1: + logger.info(f"Dry run: row: {df}") + else: + 
logger.info("Dry run: (no data)") + else: + with open(fname, "w") as f: + f.write(staticheader) + df.to_csv(fname, float_format="%.3f", sep=" ", mode="a") @flow(log_prints=True) -def xanes_exporter(ref): +def xanes_exporter(ref, api_key=None, dry_run=False): logger = get_run_logger() logger.info("Start writing file with xanes_exporter...") # Get scan type - scan_type = tiled_client_raw[ref].start.get("scan", {}).get("type", "unknown") + scan_type = ( + get_run(ref, api_key=api_key).start.get("scan", {}).get("type", "unknown") + ) # Redirect to correction function - or pass if scan_type == "XAS_STEP": logger.info("Starting xanes step-scan exporter.") - xas_step_exporter(ref) + xas_step_exporter(ref, api_key=api_key, dry_run=dry_run) logger.info("Finished writing file with xanes step-scan exporter.") elif scan_type == "XAS_FLY": logger.info("Starting xanes fly-scan exporter.") - xas_fly_exporter(ref) + xas_fly_exporter(ref, api_key=api_key, dry_run=dry_run) logger.info("Finished writing file with xanes fly-scan exporter.") else: logger.info(f"xanes exporter for {scan_type=} not available") diff --git a/xrf_hdf5_exporter.py b/xrf_hdf5_exporter.py index 7705c38..53fbc63 100644 --- a/xrf_hdf5_exporter.py +++ b/xrf_hdf5_exporter.py @@ -1,5 +1,4 @@ from prefect import flow, task, get_run_logger -from prefect.blocks.system import Secret import glob import os @@ -8,18 +7,15 @@ import dask from pyxrf.api import make_hdf -from tiled.client import from_profile +from data_validation import get_run + # from pyxrf.api import make_hdf CATALOG_NAME = "srx" -api_key = Secret.load("tiled-srx-api-key", _sync=True).get() -tiled_client = from_profile("nsls2", api_key=api_key)[CATALOG_NAME] -tiled_client_raw = tiled_client["raw"] - @task -def export_xrf_hdf5(scanid): +def export_xrf_hdf5(scanid, api_key=None, dry_run=False): logger = get_run_logger() logger.info(f"{pyxrf.__file__ = }") @@ -27,7 +23,7 @@ def export_xrf_hdf5(scanid): logger.info(f"{dask.__file__ = }") # Load header for our 
scan - h = tiled_client_raw[scanid] + h = get_run(scanid, api_key=api_key) if h.start["scan"]["type"] not in ["XRF_FLY", "XRF_STEP"]: logger.info( @@ -64,17 +60,20 @@ def export_xrf_hdf5(scanid): os.environ["TILED_API_KEY"] = ( api_key # pyxrf assumes Tiled API key as an environment variable ) - make_hdf(scanid, wd=working_dir, prefix=prefix, catalog_name=CATALOG_NAME) + if dry_run: + logger.info("Dry run: not creating HDF5 file using PyXRF") + else: + make_hdf(scanid, wd=working_dir, prefix=prefix, catalog_name=CATALOG_NAME) - # chmod g+w for created file(s) - # context: https://nsls2.slack.com/archives/C04UUSG88VB/p1718911163624149 - for file in glob.glob(f"{working_dir}/{prefix}{scanid}*.h5"): - os.chmod(file, os.stat(file).st_mode | stat.S_IWGRP) + # chmod g+w for created file(s) + # context: https://nsls2.slack.com/archives/C04UUSG88VB/p1718911163624149 + for file in glob.glob(f"{working_dir}/{prefix}{scanid}*.h5"): + os.chmod(file, os.stat(file).st_mode | stat.S_IWGRP) @flow(log_prints=True) -def xrf_hdf5_exporter(scanid): +def xrf_hdf5_exporter(scanid, api_key=None, dry_run=False): logger = get_run_logger() logger.info("Start writing file with xrf_hdf5 exporter...") - export_xrf_hdf5(scanid) + export_xrf_hdf5(scanid, api_key=api_key, dry_run=dry_run) logger.info("Finish writing file with xrf_hdf5 exporter.")