From 7334205fac48d5cc41f4d758cf6ef46b8fc2e901 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 13 Jan 2026 15:53:04 +0000 Subject: [PATCH 1/3] Safer registration of data collections --- src/murfey/server/feedback.py | 2 +- .../workflows/register_data_collection.py | 19 ++++++++++--------- .../register_data_collection_group.py | 18 +++++++++--------- .../workflows/register_processing_job.py | 5 ++--- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/murfey/server/feedback.py b/src/murfey/server/feedback.py index 39e9d678..a52eb4f6 100644 --- a/src/murfey/server/feedback.py +++ b/src/murfey/server/feedback.py @@ -2226,7 +2226,7 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None: murfey_db=_db, ) if murfey.server._transport_object: - if result.get("success", False): + if result.get("success"): murfey.server._transport_object.transport.ack(header) else: # Send it directly to DLQ without trying to rerun it diff --git a/src/murfey/workflows/register_data_collection.py b/src/murfey/workflows/register_data_collection.py index 15ecc25a..6a3218e0 100644 --- a/src/murfey/workflows/register_data_collection.py +++ b/src/murfey/workflows/register_data_collection.py @@ -1,4 +1,5 @@ import logging +import time import ispyb.sqlalchemy._auto_db_schema as ISPyBDB from sqlmodel import select @@ -39,6 +40,7 @@ def run( dcgid = dcg[0].id # flush_data_collections(message["source"], murfey_db) else: + time.sleep(2) logger.warning( "No data collection group ID was found for image directory " f"{sanitise(message['image_directory'])} and source " @@ -84,6 +86,14 @@ def run( else "" ), ).get("return_value", None) + if dcid is None: + time.sleep(2) + logger.error( + "Failed to register the following data collection: \n" + f"{message} \n" + "Requeueing message" + ) + return {"success": False, "requeue": True} murfey_dc = MurfeyDB.DataCollection( id=dcid, tag=message.get("tag"), @@ -91,14 +101,5 @@ def run( ) murfey_db.add(murfey_dc) murfey_db.commit() - dcid = murfey_dc.id murfey_db.close() - - if dcid is None: - logger.error( - "Failed to register the following data collection: \n" - f"{message} \n" - "Requeueing message" - ) - return {"success": False, "requeue": True} return {"success": True} diff --git a/src/murfey/workflows/register_data_collection_group.py b/src/murfey/workflows/register_data_collection_group.py index 54126fdd..66c204fd 100644 --- a/src/murfey/workflows/register_data_collection_group.py +++ b/src/murfey/workflows/register_data_collection_group.py @@ -54,6 +54,15 @@ def run( "return_value", None ) + if dcgid is None: + time.sleep(2) + logger.error( + "Failed to register the following data collection group: \n" + f"{message} \n" + "Requeuing message" + ) + return {"success": False, "requeue": True} + atlas_record = ISPyBDB.Atlas( dataCollectionGroupId=dcgid, atlasImage=message.get("atlas", ""), @@ -77,15 +86,6 @@ def run( murfey_db.commit() murfey_db.close() - if dcgid is None: - time.sleep(2) - logger.error( - "Failed to register the following data collection group: \n" - f"{message} \n" - "Requeuing message" - ) - return {"success": False, "requeue": True} - if dcg_hooks := entry_points(group="murfey.hooks", name="data_collection_group"): try: for hook in dcg_hooks: diff --git a/src/murfey/workflows/register_processing_job.py b/src/murfey/workflows/register_processing_job.py index e2d7b136..2d21fabe 100644 --- a/src/murfey/workflows/register_processing_job.py +++ b/src/murfey/workflows/register_processing_job.py @@ -63,6 +63,8 @@ def run(message: dict, murfey_db: SQLModelSession, demo: bool = False): pid = _transport_object.do_create_ispyb_job(record).get( "return_value", None ) + if pid is None: + return {"success": False, "requeue": True} murfey_pj = MurfeyDB.ProcessingJob( id=pid, recipe=message["recipe"], dc_id=_dcid ) @@ -71,9 +73,6 @@ def run(message: dict, murfey_db: SQLModelSession, demo: bool = False): pid = murfey_pj.id murfey_db.close() - if pid is None: - return {"success": False, "requeue": True} - # Update Prometheus counter for preprocessed movies prom.preprocessed_movies.labels(processing_job=pid) From fbe46a30f2a4e394b6404887f77e34fe1b579f50 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 13 Jan 2026 16:14:00 +0000 Subject: [PATCH 2/3] Fix tests for no ispyb case --- tests/workflows/test_register_data_collection.py | 2 +- tests/workflows/test_register_data_collection_group.py | 4 ++-- ...test_processing_job.py => test_register_processing_job.py} | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) rename tests/workflows/{test_processing_job.py => test_register_processing_job.py} (98%) diff --git a/tests/workflows/test_register_data_collection.py b/tests/workflows/test_register_data_collection.py index 49be9586..88aec006 100644 --- a/tests/workflows/test_register_data_collection.py +++ b/tests/workflows/test_register_data_collection.py @@ -99,6 +99,6 @@ def test_run( assert result == {"success": False, "requeue": True} else: mock_transport_object.do_insert_data_collection.assert_not_called() - assert result == {"success": False, "requeue": True} + assert result == {"success": True} else: assert result == {"success": True} diff --git a/tests/workflows/test_register_data_collection_group.py b/tests/workflows/test_register_data_collection_group.py index 7447dc22..3324dec9 100644 --- a/tests/workflows/test_register_data_collection_group.py +++ b/tests/workflows/test_register_data_collection_group.py @@ -77,10 +77,10 @@ def test_run( else: if ispyb_session_id is not None: mock_transport_object.do_insert_data_collection_group.assert_called_once() - mock_transport_object.do_insert_atlas.assert_called_once() if insert_dcg is not None: + mock_transport_object.do_insert_atlas.assert_called_once() assert result == {"success": True} else: assert result == {"success": False, "requeue": True} else: - assert result == {"success": False, "requeue": True} + assert result == {"success": True} diff --git a/tests/workflows/test_processing_job.py b/tests/workflows/test_register_processing_job.py similarity index 98% rename from tests/workflows/test_processing_job.py rename to tests/workflows/test_register_processing_job.py index 85562d93..350989a9 100644 --- a/tests/workflows/test_processing_job.py +++ b/tests/workflows/test_register_processing_job.py @@ -104,6 +104,6 @@ def test_run( else: assert result == {"success": False, "requeue": True} else: - assert result == {"success": False, "requeue": True} + assert result == {"success": True} else: assert result == {"success": False, "requeue": True} From 10bcd90a75dfebbb8e51fdd6a61a74eb0a3264ce Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 13 Jan 2026 16:25:06 +0000 Subject: [PATCH 3/3] Update ispyb database version --- .github/workflows/test.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d6f4bf45..193b3de9 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -3,10 +3,11 @@ name: Build and test on: [push, pull_request] env: - ISPYB_DATABASE_SCHEMA: 4.8.0 + ISPYB_DATABASE_SCHEMA: 4.11.0 # Installs from GitHub # Versions: https://github.com/DiamondLightSource/ispyb-database/tags # Previous version(s): + # 4.8.0 # 4.2.1 # released 2024-08-19 # 4.1.0 # released 2024-03-26