From 6795571cc051aae7de53d2876eca354b3aeab75c Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Wed, 25 Mar 2026 11:00:09 -0400 Subject: [PATCH 01/17] update for dotenv * centralize on one get_run() * use dotenv for Tiled API key --- data_validation.py | 27 ++++++++++++++++++--------- end_of_run_workflow.py | 17 ++++++++--------- logscan.py | 8 +++++--- xanes_exporter.py | 26 +++++++++++--------------- xrf_hdf5_exporter.py | 16 ++++++---------- 5 files changed, 48 insertions(+), 46 deletions(-) diff --git a/data_validation.py b/data_validation.py index 4fbd1c7..776713f 100644 --- a/data_validation.py +++ b/data_validation.py @@ -1,21 +1,30 @@ from prefect import task, flow, get_run_logger import time as ttime -from tiled.client import from_profile -from prefect.blocks.system import Secret +from tiled.client import from_uri + + +@task +def get_run(uid, api_key=None): + tiled_client = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key) + run = tiled_client["srx/raw"][uid] + return run + + +@task +def read_stream(run, stream): + return run[stream].read() @task(retries=2, retry_delay_seconds=10) -def read_all_streams(uid, beamline_acronym): - api_key = Secret.load("tiled-srx-api-key", _sync=True).get() - tiled_client = from_profile("nsls2", api_key=api_key) +def read_all_streams(uid, api_key=None): logger = get_run_logger() - run = tiled_client[beamline_acronym]["raw"][uid] + run = get_run(uid, api_key=api_key) logger.info(f"Validating uid {run.start['uid']}") start_time = ttime.monotonic() for stream in run: logger.info(f"{stream}:") stream_start_time = ttime.monotonic() - stream_data = run[stream].read() + stream_data = read_stream(run, stream) stream_elapsed_time = ttime.monotonic() - stream_start_time logger.info(f"{stream} elapsed_time = {stream_elapsed_time}") logger.info(f"{stream} nbytes = {stream_data.nbytes:_}") @@ -24,5 +33,5 @@ def read_all_streams(uid, beamline_acronym): @flow -def data_validation(uid): - read_all_streams(uid, beamline_acronym="srx") 
+def data_validation(uid, api_key=None): + read_all_streams(uid, api_key=api_key) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 5ae867b..73b24e7 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -9,7 +9,7 @@ from xrf_hdf5_exporter import xrf_hdf5_exporter from logscan import logscan -from tiled.client import from_profile +from data_validation import get_run CATALOG_NAME = "srx" @@ -36,9 +36,8 @@ def end_of_run_workflow(stop_doc): # Get the scan_id. api_key = Secret.load("tiled-srx-api-key", _sync=True).get() - tiled_client = from_profile("nsls2", api_key=api_key)[CATALOG_NAME] - tiled_client_raw = tiled_client["raw"] - scan_id = tiled_client_raw[uid].start["scan_id"] + tiled_client = get_run(uid, api_key=api_key) + scan_id = tiled_client(uid, api_key=api_key).start["scan_id"] # Send a message to mon-bluesky if bluesky-run failed. if stop_doc.get("exit_status") == "fail": @@ -74,11 +73,11 @@ def log_completion(): @flow @slack -def end_of_run_workflow(stop_doc): +def end_of_run_workflow(stop_doc, api_key=None): uid = stop_doc["run_start"] - # data_validation(uid, return_state=True) - xanes_exporter(uid) - xrf_hdf5_exporter(uid) - logscan(uid) + # data_validation(uid, return_state=True, api_key=api) + xanes_exporter(uid, api_key=api_key) + xrf_hdf5_exporter(uid, api_key=api_key) + logscan(uid, api_key=api_key) log_completion() diff --git a/logscan.py b/logscan.py index 702c2d7..3f20f33 100644 --- a/logscan.py +++ b/logscan.py @@ -3,6 +3,8 @@ from prefect.blocks.system import Secret from tiled.client import from_profile +from data_validation import get_run + api_key = Secret.load("tiled-srx-api-key", _sync=True).get() tiled_client = from_profile("nsls2", api_key=api_key)["srx"] @@ -21,10 +23,10 @@ def find_scanid(logfile_path, scanid): @task -def logscan_detailed(scanid): +def logscan_detailed(scanid, api_key=None): logger = get_run_logger() - h = tiled_client_raw[scanid] + h = get_run(scanid, api_key=api_key) if ( "Beamline 
Commissioning (beamline staff only)".lower() @@ -77,7 +79,7 @@ def logscan_detailed(scanid): @flow(log_prints=True) -def logscan(ref): +def logscan(ref, api_key=None): logger = get_run_logger() logger.info("Start writing logfile...") logscan_detailed(ref) diff --git a/xanes_exporter.py b/xanes_exporter.py index 3670f25..db96432 100644 --- a/xanes_exporter.py +++ b/xanes_exporter.py @@ -1,18 +1,11 @@ from prefect import flow, task, get_run_logger -from prefect.blocks.system import Secret -from tiled.client import from_profile - +from data_validation import get_run import time as ttime import numpy as np import xraylib as xrl import pandas as pd -api_key = Secret.load("tiled-srx-api-key", _sync=True).get() -tiled_client = from_profile("nsls2", api_key=api_key)["srx"] -tiled_client_raw = tiled_client["raw"] - - def xanes_textout( scanid=-1, header=[], @@ -21,6 +14,7 @@ def xanes_textout( usercolumn={}, usercolumnname=[], output=True, + api_key=None, ): """ scan: can be scan_id (integer) or uid (string). default=-1 (last scan run) @@ -35,7 +29,7 @@ def xanes_textout( """ - h = tiled_client_raw[scanid] + h = get_run(scanid, api_key=api_key) if ( "Beamline Commissioning (beamline staff only)".lower() in h.start["proposal"]["type"].lower() @@ -121,13 +115,13 @@ def xanes_textout( @task -def xas_step_exporter(scanid): +def xas_step_exporter(scanid, api_key=None): logger = get_run_logger() # Custom header list headeritem = [] # Load header for our scan - h = tiled_client_raw[scanid] + h = get_run(scanid, api_key=api_key) if h.start["scan"].get("type") != "XAS_STEP": logger.info("Incorrect document type. 
Not running exporter on this document.") @@ -228,10 +222,10 @@ def xas_step_exporter(scanid): @task -def xas_fly_exporter(uid): +def xas_fly_exporter(uid, api_key=None): logger = get_run_logger() # Get a scan header - hdr = tiled_client_raw[uid] + hdr = get_run(uid, api_key=api_key) start_doc = hdr.start # Get proposal directory location @@ -326,12 +320,14 @@ def xas_fly_exporter(uid): @flow(log_prints=True) -def xanes_exporter(ref): +def xanes_exporter(ref, api_key=None): logger = get_run_logger() logger.info("Start writing file with xanes_exporter...") # Get scan type - scan_type = tiled_client_raw[ref].start.get("scan", {}).get("type", "unknown") + scan_type = ( + get_run(ref, api_key=api_key).start.get("scan", {}).get("type", "unknown") + ) # Redirect to correction function - or pass if scan_type == "XAS_STEP": diff --git a/xrf_hdf5_exporter.py b/xrf_hdf5_exporter.py index 7705c38..fea68e4 100644 --- a/xrf_hdf5_exporter.py +++ b/xrf_hdf5_exporter.py @@ -1,5 +1,4 @@ from prefect import flow, task, get_run_logger -from prefect.blocks.system import Secret import glob import os @@ -8,18 +7,15 @@ import dask from pyxrf.api import make_hdf -from tiled.client import from_profile +from data_validation import get_run + # from pyxrf.api import make_hdf CATALOG_NAME = "srx" -api_key = Secret.load("tiled-srx-api-key", _sync=True).get() -tiled_client = from_profile("nsls2", api_key=api_key)[CATALOG_NAME] -tiled_client_raw = tiled_client["raw"] - @task -def export_xrf_hdf5(scanid): +def export_xrf_hdf5(scanid, api_key=None): logger = get_run_logger() logger.info(f"{pyxrf.__file__ = }") @@ -27,7 +23,7 @@ def export_xrf_hdf5(scanid): logger.info(f"{dask.__file__ = }") # Load header for our scan - h = tiled_client_raw[scanid] + h = get_run(scanid, api_key=api_key) if h.start["scan"]["type"] not in ["XRF_FLY", "XRF_STEP"]: logger.info( @@ -73,8 +69,8 @@ def export_xrf_hdf5(scanid): @flow(log_prints=True) -def xrf_hdf5_exporter(scanid): +def xrf_hdf5_exporter(scanid, 
api_key=None): logger = get_run_logger() logger.info("Start writing file with xrf_hdf5 exporter...") - export_xrf_hdf5(scanid) + export_xrf_hdf5(scanid, api_key=api_key) logger.info("Finish writing file with xrf_hdf5 exporter.") From 918d89d1e9e8cf70e050302cee6f9f24a4078fc8 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Wed, 25 Mar 2026 11:01:44 -0400 Subject: [PATCH 02/17] remove site profile references --- prefect.yaml | 3 --- 1 file changed, 3 deletions(-) diff --git a/prefect.yaml b/prefect.yaml index 8d7d721..1bcb462 100644 --- a/prefect.yaml +++ b/prefect.yaml @@ -24,14 +24,11 @@ deployments: schedule: {} work_pool: job_variables: - env: - TILED_SITE_PROFILES: /nsls2/software/etc/tiled/profiles image: ghcr.io/nsls2/srx-workflows:main image_pull_policy: Always network: slirp4netns volumes: - /nsls2/data/srx/proposals:/nsls2/data/srx/proposals - - /nsls2/software/etc/tiled:/nsls2/software/etc/tiled container_create_kwargs: userns_mode: "keep-id:uid=402949,gid=402949" # workflow-srx:workflow-srx auto_remove: true From 013710fac3bfefeec88026de45c5a5bf1c6df34c Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 17:38:59 -0400 Subject: [PATCH 03/17] Update end_of_run_workflow.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- end_of_run_workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 73b24e7..68ef3cf 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -36,8 +36,8 @@ def end_of_run_workflow(stop_doc): # Get the scan_id. api_key = Secret.load("tiled-srx-api-key", _sync=True).get() - tiled_client = get_run(uid, api_key=api_key) - scan_id = tiled_client(uid, api_key=api_key).start["scan_id"] + run = get_run(uid, api_key=api_key) + scan_id = run.start["scan_id"] # Send a message to mon-bluesky if bluesky-run failed. 
if stop_doc.get("exit_status") == "fail": From 76dacd457899fab8800e9e0dc1d5cb270b96774a Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 17:41:15 -0400 Subject: [PATCH 04/17] remove use of Prefect Block --- end_of_run_workflow.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 68ef3cf..1f9600f 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -2,7 +2,6 @@ from prefect import task, flow, get_run_logger from prefect.blocks.notifications import SlackWebhook -from prefect.blocks.system import Secret from prefect.context import FlowRunContext from xanes_exporter import xanes_exporter @@ -35,7 +34,6 @@ def end_of_run_workflow(stop_doc): uid = stop_doc["run_start"] # Get the scan_id. - api_key = Secret.load("tiled-srx-api-key", _sync=True).get() run = get_run(uid, api_key=api_key) scan_id = run.start["scan_id"] From ddfa92d30f13cfa90d4525f7ca7fca8f458acc4f Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 17:42:38 -0400 Subject: [PATCH 05/17] ensure end_of_run_workflow receives API key --- end_of_run_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 1f9600f..64d1ac5 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -23,7 +23,7 @@ def slack(func): the flow. To keep the naming of workflows consistent, the name of this inner function had to match the expected name. """ - def end_of_run_workflow(stop_doc): + def end_of_run_workflow(stop_doc, api_key=None): flow_run_name = FlowRunContext.get().flow_run.dict().get("name") # Load slack credentials that are saved in Prefect. 
From ec1d9a2345b4200c503ea45ac3660bd66ec73e81 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 17:46:12 -0400 Subject: [PATCH 06/17] pass through api_key when necessary --- xanes_exporter.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/xanes_exporter.py b/xanes_exporter.py index db96432..1ec54c9 100644 --- a/xanes_exporter.py +++ b/xanes_exporter.py @@ -218,6 +218,7 @@ def xas_step_exporter(scanid, api_key=None): usercolumn=usercolumnitem, usercolumnname=usercolumnitem.keys(), output=False, + api_key=api_key, ) @@ -332,11 +333,11 @@ def xanes_exporter(ref, api_key=None): # Redirect to correction function - or pass if scan_type == "XAS_STEP": logger.info("Starting xanes step-scan exporter.") - xas_step_exporter(ref) + xas_step_exporter(ref, api_key=api_key) logger.info("Finished writing file with xanes step-scan exporter.") elif scan_type == "XAS_FLY": logger.info("Starting xanes fly-scan exporter.") - xas_fly_exporter(ref) + xas_fly_exporter(ref, api_key=api_key) logger.info("Finished writing file with xanes fly-scan exporter.") else: logger.info(f"xanes exporter for {scan_type=} not available") From 2cc51403767f94ecad3a88088533ad052fdade0d Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 17:52:43 -0400 Subject: [PATCH 07/17] remove old code * unused code to get Tiled client - replace by using get_run() from data_validation() --- logscan.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/logscan.py b/logscan.py index 3f20f33..3910204 100644 --- a/logscan.py +++ b/logscan.py @@ -1,16 +1,9 @@ from pathlib import Path from prefect import flow, task, get_run_logger -from prefect.blocks.system import Secret -from tiled.client import from_profile from data_validation import get_run -api_key = Secret.load("tiled-srx-api-key", _sync=True).get() -tiled_client = from_profile("nsls2", api_key=api_key)["srx"] -tiled_client_raw = tiled_client["raw"] - - def find_scanid(logfile_path, scanid): is_scanid = False 
with open(logfile_path) as lf: From 44750764f3cc8e2a632c62c014643718b07b452c Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 18:36:32 -0400 Subject: [PATCH 08/17] start adding dry_run --- end_of_run_workflow.py | 8 ++++---- logscan.py | 15 +++++++++------ 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 64d1ac5..15078d1 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -71,11 +71,11 @@ def log_completion(): @flow @slack -def end_of_run_workflow(stop_doc, api_key=None): +def end_of_run_workflow(stop_doc, api_key=None, dry_run=False): uid = stop_doc["run_start"] # data_validation(uid, return_state=True, api_key=api) - xanes_exporter(uid, api_key=api_key) - xrf_hdf5_exporter(uid, api_key=api_key) - logscan(uid, api_key=api_key) + xanes_exporter(uid, api_key=api_key, dry_run=dry_run) + xrf_hdf5_exporter(uid, api_key=api_key, dry_run=dry_run) + logscan(uid, api_key=api_key, dry_run=dry_run) log_completion() diff --git a/logscan.py b/logscan.py index 3910204..41eb37c 100644 --- a/logscan.py +++ b/logscan.py @@ -16,7 +16,7 @@ def find_scanid(logfile_path, scanid): @task -def logscan_detailed(scanid, api_key=None): +def logscan_detailed(scanid, api_key=None, dry_run=None): logger = get_run_logger() h = get_run(scanid, api_key=api_key) @@ -66,14 +66,17 @@ def logscan_detailed(scanid, api_key=None): out_str += "\n" # Write to file - with open(logfile_path, "a") as userlogf: - userlogf.write(out_str) - logger.info(f"Added {h.start['scan_id']} to the logs") + if dry_run: + logger.info(f"Dry run: scan_id: {h.start['scan_id']} output: {out_str}") + else: + with open(logfile_path, "a") as userlogf: + userlogf.write(out_str) + logger.info(f"Added {h.start['scan_id']} to the logs") @flow(log_prints=True) -def logscan(ref, api_key=None): +def logscan(ref, api_key=None, dry_run=None): logger = get_run_logger() logger.info("Start writing logfile...") - logscan_detailed(ref) + 
logscan_detailed(ref, dry_run=dry_run) logger.info("Finish writing logfile.") From 14ffe01445432ee45ec80c8689bdb2fe1b9b4726 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 18:47:53 -0400 Subject: [PATCH 09/17] update xanes_exporter for dry_run * output more info for xas_fly_exporter as it seems to have a better structure --- xanes_exporter.py | 49 ++++++++++++++++++++++++++++++----------------- 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/xanes_exporter.py b/xanes_exporter.py index 1ec54c9..5ddbb55 100644 --- a/xanes_exporter.py +++ b/xanes_exporter.py @@ -115,7 +115,7 @@ def xanes_textout( @task -def xas_step_exporter(scanid, api_key=None): +def xas_step_exporter(scanid, api_key=None, dry_run=None): logger = get_run_logger() # Custom header list @@ -210,20 +210,23 @@ def xas_step_exporter(scanid, api_key=None): # usercolumnitem['If-{:02}'.format(i)] = roisum # usercolumnitem['If-{:02}'.format(i)].round(0) - xanes_textout( - scanid=scanid, - header=headeritem, - userheader=userheaderitem, - column=columnitem, - usercolumn=usercolumnitem, - usercolumnname=usercolumnitem.keys(), - output=False, - api_key=api_key, - ) + if dry_run: + logger.info(f"Dry run: Not exporting xanes") + else: + xanes_textout( + scanid=scanid, + header=headeritem, + userheader=userheaderitem, + column=columnitem, + usercolumn=usercolumnitem, + usercolumnname=usercolumnitem.keys(), + output=False, + api_key=api_key, + ) @task -def xas_fly_exporter(uid, api_key=None): +def xas_fly_exporter(uid, api_key=None, dry_run=dry_run): logger = get_run_logger() # Get a scan header hdr = get_run(uid, api_key=api_key) @@ -315,13 +318,23 @@ def xas_fly_exporter(uid, api_key=None): staticheader += "# \n# " # Export data to file - with open(fname, "w") as f: - f.write(staticheader) - df.to_csv(fname, float_format="%.3f", sep=" ", mode="a") + if dry_run: + logger.info("Dry run: xas fly exporter") + if len(df) >=2: + logger.info("Dry run: first and last row: 
{pd.concat([df.head(1), df.tail(1)])}") + elif len(df) == 1: + logger.info("Dry run: row: {df}") + else: + logger.info("Dry run: (no data)") + else: + with open(fname, "w") as f: + f.write(staticheader) + df.to_csv(fname, float_format="%.3f", sep=" ", mode="a") + @flow(log_prints=True) -def xanes_exporter(ref, api_key=None): +def xanes_exporter(ref, api_key=None, dry_run=False): logger = get_run_logger() logger.info("Start writing file with xanes_exporter...") @@ -333,11 +346,11 @@ def xanes_exporter(ref, api_key=None): # Redirect to correction function - or pass if scan_type == "XAS_STEP": logger.info("Starting xanes step-scan exporter.") - xas_step_exporter(ref, api_key=api_key) + xas_step_exporter(ref, api_key=api_key, dry_run=dry_run) logger.info("Finished writing file with xanes step-scan exporter.") elif scan_type == "XAS_FLY": logger.info("Starting xanes fly-scan exporter.") - xas_fly_exporter(ref, api_key=api_key) + xas_fly_exporter(ref, api_key=api_key, dry_run=dry_run) logger.info("Finished writing file with xanes fly-scan exporter.") else: logger.info(f"xanes exporter for {scan_type=} not available") From b1e7f208030546e0fbe3976c436a84da82a8ac98 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 18:52:02 -0400 Subject: [PATCH 10/17] fix dry_run default value in logscan --- logscan.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/logscan.py b/logscan.py index 41eb37c..0a0e50d 100644 --- a/logscan.py +++ b/logscan.py @@ -16,7 +16,7 @@ def find_scanid(logfile_path, scanid): @task -def logscan_detailed(scanid, api_key=None, dry_run=None): +def logscan_detailed(scanid, api_key=None, dry_run=False): logger = get_run_logger() h = get_run(scanid, api_key=api_key) @@ -75,7 +75,7 @@ def logscan_detailed(scanid, api_key=None, dry_run=None): @flow(log_prints=True) -def logscan(ref, api_key=None, dry_run=None): +def logscan(ref, api_key=None, dry_run=False): logger = get_run_logger() logger.info("Start writing logfile...") 
logscan_detailed(ref, dry_run=dry_run) From 214462e13093f6bee4fc8b21a9ac780171af6204 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Fri, 27 Mar 2026 18:52:23 -0400 Subject: [PATCH 11/17] pass through dry_run, do not write out HDF5 file via PyXRF if not True --- xrf_hdf5_exporter.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/xrf_hdf5_exporter.py b/xrf_hdf5_exporter.py index fea68e4..53fbc63 100644 --- a/xrf_hdf5_exporter.py +++ b/xrf_hdf5_exporter.py @@ -15,7 +15,7 @@ @task -def export_xrf_hdf5(scanid, api_key=None): +def export_xrf_hdf5(scanid, api_key=None, dry_run=False): logger = get_run_logger() logger.info(f"{pyxrf.__file__ = }") @@ -60,17 +60,20 @@ def export_xrf_hdf5(scanid, api_key=None): os.environ["TILED_API_KEY"] = ( api_key # pyxrf assumes Tiled API key as an environment variable ) - make_hdf(scanid, wd=working_dir, prefix=prefix, catalog_name=CATALOG_NAME) + if dry_run: + logger.info("Dry run: not creating HDF5 file using PyXRF") + else: + make_hdf(scanid, wd=working_dir, prefix=prefix, catalog_name=CATALOG_NAME) - # chmod g+w for created file(s) - # context: https://nsls2.slack.com/archives/C04UUSG88VB/p1718911163624149 - for file in glob.glob(f"{working_dir}/{prefix}{scanid}*.h5"): - os.chmod(file, os.stat(file).st_mode | stat.S_IWGRP) + # chmod g+w for created file(s) + # context: https://nsls2.slack.com/archives/C04UUSG88VB/p1718911163624149 + for file in glob.glob(f"{working_dir}/{prefix}{scanid}*.h5"): + os.chmod(file, os.stat(file).st_mode | stat.S_IWGRP) @flow(log_prints=True) -def xrf_hdf5_exporter(scanid, api_key=None): +def xrf_hdf5_exporter(scanid, api_key=None, dry_run=False): logger = get_run_logger() logger.info("Start writing file with xrf_hdf5 exporter...") - export_xrf_hdf5(scanid, api_key=api_key) + export_xrf_hdf5(scanid, api_key=api_key, dry_run=dry_run) logger.info("Finish writing file with xrf_hdf5 exporter.") From 3b04cc8e21a6ba9f66216c754f35621bd4b84666 Mon Sep 17 00:00:00 2001 
From: Jun Aishima Date: Fri, 27 Mar 2026 19:04:37 -0400 Subject: [PATCH 12/17] pre-commit fixes, default values for dry_run for functions --- xanes_exporter.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/xanes_exporter.py b/xanes_exporter.py index 5ddbb55..248a433 100644 --- a/xanes_exporter.py +++ b/xanes_exporter.py @@ -115,7 +115,7 @@ def xanes_textout( @task -def xas_step_exporter(scanid, api_key=None, dry_run=None): +def xas_step_exporter(scanid, api_key=None, dry_run=False): logger = get_run_logger() # Custom header list @@ -211,7 +211,7 @@ def xas_step_exporter(scanid, api_key=None, dry_run=None): # usercolumnitem['If-{:02}'.format(i)].round(0) if dry_run: -        logger.info(f"Dry run: Not exporting xanes") +        logger.info("Dry run: Not exporting xanes") else: xanes_textout( scanid=scanid, @@ -226,7 +226,7 @@ def xas_step_exporter(scanid, api_key=None, dry_run=None): @task -def xas_fly_exporter(uid, api_key=None, dry_run=dry_run): +def xas_fly_exporter(uid, api_key=None, dry_run=False): logger = get_run_logger() # Get a scan header hdr = get_run(uid, api_key=api_key) @@ -320,8 +320,10 @@ def xas_fly_exporter(uid, api_key=None, dry_run=dry_run): # Export data to file if dry_run: logger.info("Dry run: xas fly exporter") -        if len(df) >=2: -            logger.info("Dry run: first and last row: {pd.concat([df.head(1), df.tail(1)])}") +        if len(df) >= 2: +            logger.info( +                f"Dry run: first and last row: {pd.concat([df.head(1), df.tail(1)])}" +            ) elif len(df) == 1: logger.info("Dry run: row: {df}") else: @@ -332,7 +334,6 @@ def xas_fly_exporter(uid, api_key=None, dry_run=dry_run): df.to_csv(fname, float_format="%.3f", sep=" ", mode="a") - @flow(log_prints=True) def xanes_exporter(ref, api_key=None, dry_run=False): logger = get_run_logger() From b60e2cf98f49e975ac4bd52ba67921c8e62e114e Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Mon, 30 Mar 2026 11:12:22 -0400 Subject: [PATCH 13/17] add missing mount of directory for container.secret --- 
prefect.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/prefect.yaml b/prefect.yaml index 1bcb462..36ac550 100644 --- a/prefect.yaml +++ b/prefect.yaml @@ -29,6 +29,7 @@ deployments: network: slirp4netns volumes: - /nsls2/data/srx/proposals:/nsls2/data/srx/proposals + - /srv/prefect3-docker-worker-srx/app:/srv container_create_kwargs: userns_mode: "keep-id:uid=402949,gid=402949" # workflow-srx:workflow-srx auto_remove: true From 0816671ce8d8a3ff13c94e7122aa32c297a6d922 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Mon, 30 Mar 2026 18:04:29 -0400 Subject: [PATCH 14/17] add reading API key from container.secret file with dotenv --- end_of_run_workflow.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 15078d1..8924fa7 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -7,12 +7,20 @@ from xanes_exporter import xanes_exporter from xrf_hdf5_exporter import xrf_hdf5_exporter from logscan import logscan - +from dotenv import load_dotenv +import os from data_validation import get_run CATALOG_NAME = "srx" +def get_api_key_from_env(api_key=None): + with open("/srv/container.secret", "r") as secrets: + load_dotenv(stream=secrets) + api_key = os.environ["TILED_API_KEY"] + return api_key + + def slack(func): """ Send a message to mon-prefect slack channel about the flow-run status. 
@@ -73,6 +81,8 @@ def log_completion(): @slack def end_of_run_workflow(stop_doc, api_key=None, dry_run=False): uid = stop_doc["run_start"] + if not api_key: + api_key = get_api_key_from_env(api_key=None) # data_validation(uid, return_state=True, api_key=api) xanes_exporter(uid, api_key=api_key, dry_run=dry_run) From 7383ee586390a6dd96dff7933323e9004591cda0 Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Tue, 31 Mar 2026 13:01:58 -0400 Subject: [PATCH 15/17] Update end_of_run_workflow.py Co-authored-by: Abby Giles <35899293+AbbyGi@users.noreply.github.com> --- end_of_run_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 8924fa7..50f7f23 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -14,7 +14,7 @@ CATALOG_NAME = "srx" -def get_api_key_from_env(api_key=None): +def get_api_key_from_env(): with open("/srv/container.secret", "r") as secrets: load_dotenv(stream=secrets) api_key = os.environ["TILED_API_KEY"] From 7f49c064de0d76c57b0efa58bfdd97a03e06b9bd Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Tue, 31 Mar 2026 13:02:56 -0400 Subject: [PATCH 16/17] remove kwarg from function call --- end_of_run_workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/end_of_run_workflow.py b/end_of_run_workflow.py index 50f7f23..8b5cae0 100644 --- a/end_of_run_workflow.py +++ b/end_of_run_workflow.py @@ -82,7 +82,7 @@ def log_completion(): def end_of_run_workflow(stop_doc, api_key=None, dry_run=False): uid = stop_doc["run_start"] if not api_key: - api_key = get_api_key_from_env(api_key=None) + api_key = get_api_key_from_env() # data_validation(uid, return_state=True, api_key=api) xanes_exporter(uid, api_key=api_key, dry_run=dry_run) From 76f64b0bf2164d175c6f156d7a8024ad4eccf96d Mon Sep 17 00:00:00 2001 From: Jun Aishima Date: Tue, 31 Mar 2026 13:04:38 -0400 Subject: [PATCH 17/17] Update logscan.py Co-authored-by: Abby Giles 
<35899293+AbbyGi@users.noreply.github.com> --- logscan.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logscan.py b/logscan.py index 0a0e50d..3e89ac0 100644 --- a/logscan.py +++ b/logscan.py @@ -78,5 +78,5 @@ def logscan_detailed(scanid, api_key=None, dry_run=False): def logscan(ref, api_key=None, dry_run=False): logger = get_run_logger() logger.info("Start writing logfile...") - logscan_detailed(ref, dry_run=dry_run) + logscan_detailed(ref, api_key=api_key, dry_run=dry_run) logger.info("Finish writing logfile.")