From e7fa1d86bf9ee9a1857ba4ec7e7af8faa7dcd024 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 12 Jan 2026 15:37:29 +0000 Subject: [PATCH 1/4] Added logic to support recursive searching of upstream visits --- src/murfey/server/api/session_shared.py | 45 +++++++++++++++++++++---- tests/server/api/test_session_shared.py | 8 +++-- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/src/murfey/server/api/session_shared.py b/src/murfey/server/api/session_shared.py index 644b9b35..7e7c7481 100644 --- a/src/murfey/server/api/session_shared.py +++ b/src/murfey/server/api/session_shared.py @@ -1,4 +1,5 @@ import logging +import os from pathlib import Path from typing import Dict, List @@ -136,11 +137,40 @@ def get_foil_hole(session_id: int, fh_name: int, db) -> Dict[str, int]: return {f[1].tag: f[0].id for f in foil_holes} -def find_upstream_visits(session_id: int, db: SQLModelSession): +def find_upstream_visits(session_id: int, db: SQLModelSession, max_depth: int = 2): """ Returns a nested dictionary, in which visits and the full paths to their directories are further grouped by instrument name. """ + + def _recursive_search( + dirpath: str | Path, + search_string: str, + partial_match=True, + max_depth: int = 1, + ): + # Stop recursing for this route once max depth hits 0 + if max_depth == 0: + return + for entry in os.scandir(dirpath): + if entry.is_dir(): + # Update dictionary with match and stop recursing for this route + if ( + search_string in entry.name + if partial_match + else search_string == entry.name + ): + current_upstream_visits[entry.name] = Path(entry.path) + else: + # Continue searching down this route until max depth is reached + _recursive_search( + dirpath=entry.path, + search_string=search_string, + partial_match=partial_match, + max_depth=max_depth - 1, + ) + continue + murfey_session = db.exec( select(MurfeySession).where(MurfeySession.id == session_id) ).one() @@ -155,11 +185,14 @@ def find_upstream_visits(session_id: int, db: SQLModelSession): upstream_instrument, upstream_data_dir, ) in machine_config.upstream_data_directories.items(): - # Looks for visit name in file path - current_upstream_visits = {} - for visit_path in Path(upstream_data_dir).glob(f"{visit_name.split('-')[0]}-*"): - if visit_path.is_dir(): - current_upstream_visits[visit_path.name] = visit_path + # Recursively look for matching visit names under current directory + current_upstream_visits: dict[str, Path] = {} + _recursive_search( + dirpath=upstream_data_dir, + search_string=f"{visit_name.split('-')[0]}-", + partial_match=True, + max_depth=max_depth, + ) upstream_visits[upstream_instrument] = current_upstream_visits return upstream_visits diff --git a/tests/server/api/test_session_shared.py b/tests/server/api/test_session_shared.py index c0f4c109..67db321d 100644 --- a/tests/server/api/test_session_shared.py +++ b/tests/server/api/test_session_shared.py @@ -9,10 +9,11 @@ from tests.conftest import ExampleVisit +@pytest.mark.parametrize("recurse", (True, False)) def test_find_upstream_visits( mocker: MockerFixture, tmp_path: Path, - # murfey_db_session, + recurse: bool, ): # Get the visit, instrument name, and session ID visit_name_root = f"{ExampleVisit.proposal_code}{ExampleVisit.proposal_number}" @@ -40,7 +41,10 @@ def test_find_upstream_visits( # Only directories should be picked up upstream_visit.mkdir(parents=True, exist_ok=True) upstream_visits[upstream_instrument] = {upstream_visit.stem: upstream_visit} - upstream_data_dirs[upstream_instrument] = upstream_visit.parent + # Check that the function can cope with recursive searching + upstream_data_dirs[upstream_instrument] = ( + upstream_visit.parent.parent if recurse else upstream_visit.parent + ) else: upstream_visit.parent.mkdir(parents=True, exist_ok=True) upstream_visit.touch(exist_ok=True) From 66ecf4e806080d6bcf53fe5485a00b56f7942acd Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 12 Jan 2026 18:21:11 +0000 Subject: [PATCH 2/4] Add logic to recursive search function to create dictionary, update it recursively, and return it instead of using a variable from outside the function scope --- src/murfey/server/api/session_shared.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/murfey/server/api/session_shared.py b/src/murfey/server/api/session_shared.py index 7e7c7481..55bf177b 100644 --- a/src/murfey/server/api/session_shared.py +++ b/src/murfey/server/api/session_shared.py @@ -146,12 +146,19 @@ def find_upstream_visits(session_id: int, db: SQLModelSession, max_depth: int = def _recursive_search( dirpath: str | Path, search_string: str, - partial_match=True, + partial_match: bool = True, max_depth: int = 1, + result: dict[str, Path] = {}, ): + # Start a new dictionary object if none were provided + # This if-block prevents in-place memory modification on subsequent loops + if not result: + result = {} # Stop recursing for this route once max depth hits 0 if max_depth == 0: - return + return result + + # Walk through the directories for entry in os.scandir(dirpath): if entry.is_dir(): # Update dictionary with match and stop recursing for this route @@ -160,16 +167,17 @@ def _recursive_search( if partial_match else search_string == entry.name ): - current_upstream_visits[entry.name] = Path(entry.path) + result[entry.name] = Path(entry.path) else: # Continue searching down this route until max depth is reached - _recursive_search( + result = _recursive_search( dirpath=entry.path, search_string=search_string, partial_match=partial_match, max_depth=max_depth - 1, + result=result, ) - continue + return result murfey_session = db.exec( select(MurfeySession).where(MurfeySession.id == session_id) @@ -186,14 +194,12 @@ def _recursive_search( upstream_data_dir, ) in machine_config.upstream_data_directories.items(): # Recursively look for matching visit names under current directory - current_upstream_visits: dict[str, Path] = {} - _recursive_search( + upstream_visits[upstream_instrument] = _recursive_search( dirpath=upstream_data_dir, search_string=f"{visit_name.split('-')[0]}-", partial_match=True, max_depth=max_depth, ) - upstream_visits[upstream_instrument] = current_upstream_visits return upstream_visits From b54526815d3152f39d14dad05acad2033bf0b795 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Tue, 13 Jan 2026 09:53:14 +0000 Subject: [PATCH 3/4] Use 'None' as a default for the 'result' parameter instead --- src/murfey/server/api/session_shared.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/murfey/server/api/session_shared.py b/src/murfey/server/api/session_shared.py index 55bf177b..3a9955fa 100644 --- a/src/murfey/server/api/session_shared.py +++ b/src/murfey/server/api/session_shared.py @@ -148,11 +148,10 @@ def _recursive_search( search_string: str, partial_match: bool = True, max_depth: int = 1, - result: dict[str, Path] = {}, + result: dict[str, Path] | None = None, ): - # Start a new dictionary object if none were provided - # This if-block prevents in-place memory modification on subsequent loops - if not result: + # If no dictionary was passed in, create a new dictionary + if result is None: result = {} # Stop recursing for this route once max depth hits 0 if max_depth == 0: @@ -167,7 +166,8 @@ def _recursive_search( if partial_match else search_string == entry.name ): - result[entry.name] = Path(entry.path) + if result is not None: # MyPy needs this 'is not None' check + result[entry.name] = Path(entry.path) else: # Continue searching down this route until max depth is reached result = _recursive_search( From f8ebe2a1b0800b444a7cc87fca9f3e21c15d544c Mon Sep 17 00:00:00 2001 From: Stephen Riggs <122790971+stephen-riggs@users.noreply.github.com> Date: Tue, 13 Jan 2026 16:29:34 +0000 Subject: [PATCH 4/4] Safer registration of data collections (#726) Ensure that dcg, dc and pjid exist before inserting into murfey db. Add a sleep for the case where they cannot be registered to allow the database to settle. --- .github/workflows/test.yml | 3 ++- src/murfey/server/feedback.py | 2 +- .../workflows/register_data_collection.py | 19 ++++++++++--------- .../register_data_collection_group.py | 18 +++++++++--------- .../workflows/register_processing_job.py | 5 ++--- .../test_register_data_collection.py | 2 +- .../test_register_data_collection_group.py | 4 ++-- ...job.py => test_register_processing_job.py} | 2 +- 8 files changed, 28 insertions(+), 27 deletions(-) rename tests/workflows/{test_processing_job.py => test_register_processing_job.py} (98%) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d6f4bf45..193b3de9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,10 +3,11 @@ name: Build and test on: [push, pull_request] env: - ISPYB_DATABASE_SCHEMA: 4.8.0 + ISPYB_DATABASE_SCHEMA: 4.11.0 # Installs from GitHub # Versions: https://github.com/DiamondLightSource/ispyb-database/tags # Previous version(s): + # 4.8.0 # 4.2.1 # released 2024-08-19 # 4.1.0 # released 2024-03-26 diff --git a/src/murfey/server/feedback.py b/src/murfey/server/feedback.py index 45c9e634..c95f1e1b 100644 --- a/src/murfey/server/feedback.py +++ b/src/murfey/server/feedback.py @@ -2160,7 +2160,7 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None: murfey_db=_db, ) if murfey.server._transport_object: - if result.get("success", False): + if result.get("success"): murfey.server._transport_object.transport.ack(header) else: # Send it directly to DLQ without trying to rerun it diff --git a/src/murfey/workflows/register_data_collection.py b/src/murfey/workflows/register_data_collection.py index dbf1f53c..bedaf63d 100644 --- a/src/murfey/workflows/register_data_collection.py +++ b/src/murfey/workflows/register_data_collection.py @@ -1,4 +1,5 @@ import logging +import time import ispyb.sqlalchemy._auto_db_schema as ISPyBDB from sqlmodel import select @@ -37,6 +38,7 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: dcgid = dcg[0].id # flush_data_collections(message["source"], murfey_db) else: + time.sleep(2) logger.warning( "No data collection group ID was found for image directory " f"{sanitise(message['image_directory'])} and source " @@ -82,6 +84,14 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: else "" ), ).get("return_value", None) + if dcid is None: + time.sleep(2) + logger.error( + "Failed to register the following data collection: \n" + f"{message} \n" + "Requeueing message" + ) + return {"success": False, "requeue": True} murfey_dc = MurfeyDB.DataCollection( id=dcid, tag=message.get("tag"), @@ -89,14 +99,5 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: ) murfey_db.add(murfey_dc) murfey_db.commit() - dcid = murfey_dc.id murfey_db.close() - - if dcid is None: - logger.error( - "Failed to register the following data collection: \n" - f"{message} \n" - "Requeueing message" - ) - return {"success": False, "requeue": True} return {"success": True} diff --git a/src/murfey/workflows/register_data_collection_group.py b/src/murfey/workflows/register_data_collection_group.py index d887bfd4..a225936f 100644 --- a/src/murfey/workflows/register_data_collection_group.py +++ b/src/murfey/workflows/register_data_collection_group.py @@ -52,6 +52,15 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: "return_value", None ) + if dcgid is None: + time.sleep(2) + logger.error( + "Failed to register the following data collection group: \n" + f"{message} \n" + "Requeuing message" + ) + return {"success": False, "requeue": True} + atlas_record = ISPyBDB.Atlas( dataCollectionGroupId=dcgid, atlasImage=message.get("atlas", ""), @@ -75,15 +84,6 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: murfey_db.commit() murfey_db.close() - if dcgid is None: - time.sleep(2) - logger.error( - "Failed to register the following data collection group: \n" - f"{message} \n" - "Requeuing message" - ) - return {"success": False, "requeue": True} - if dcg_hooks := entry_points(group="murfey.hooks", name="data_collection_group"): try: for hook in dcg_hooks: diff --git a/src/murfey/workflows/register_processing_job.py b/src/murfey/workflows/register_processing_job.py index 4378453c..1bb2d5f5 100644 --- a/src/murfey/workflows/register_processing_job.py +++ b/src/murfey/workflows/register_processing_job.py @@ -63,6 +63,8 @@ def run(message: dict, murfey_db: SQLModelSession): pid = _transport_object.do_create_ispyb_job(record).get( "return_value", None ) + if pid is None: + return {"success": False, "requeue": True} murfey_pj = MurfeyDB.ProcessingJob( id=pid, recipe=message["recipe"], dc_id=_dcid ) @@ -71,9 +73,6 @@ def run(message: dict, murfey_db: SQLModelSession): pid = murfey_pj.id murfey_db.close() - if pid is None: - return {"success": False, "requeue": True} - # Update Prometheus counter for preprocessed movies prom.preprocessed_movies.labels(processing_job=pid) diff --git a/tests/workflows/test_register_data_collection.py b/tests/workflows/test_register_data_collection.py index 49be9586..88aec006 100644 --- a/tests/workflows/test_register_data_collection.py +++ b/tests/workflows/test_register_data_collection.py @@ -99,6 +99,6 @@ def test_run( assert result == {"success": False, "requeue": True} else: mock_transport_object.do_insert_data_collection.assert_not_called() - assert result == {"success": False, "requeue": True} + assert result == {"success": True} else: assert result == {"success": True} diff --git a/tests/workflows/test_register_data_collection_group.py b/tests/workflows/test_register_data_collection_group.py index 7447dc22..3324dec9 100644 --- a/tests/workflows/test_register_data_collection_group.py +++ b/tests/workflows/test_register_data_collection_group.py @@ -77,10 +77,10 @@ def test_run( else: if ispyb_session_id is not None: mock_transport_object.do_insert_data_collection_group.assert_called_once() - mock_transport_object.do_insert_atlas.assert_called_once() if insert_dcg is not None: + mock_transport_object.do_insert_atlas.assert_called_once() assert result == {"success": True} else: assert result == {"success": False, "requeue": True} else: - assert result == {"success": False, "requeue": True} + assert result == {"success": True} diff --git a/tests/workflows/test_processing_job.py b/tests/workflows/test_register_processing_job.py similarity index 98% rename from tests/workflows/test_processing_job.py rename to tests/workflows/test_register_processing_job.py index 85562d93..350989a9 100644 --- a/tests/workflows/test_processing_job.py +++ b/tests/workflows/test_register_processing_job.py @@ -104,6 +104,6 @@ def test_run( else: assert result == {"success": False, "requeue": True} else: - assert result == {"success": False, "requeue": True} + assert result == {"success": True} else: assert result == {"success": False, "requeue": True}