Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/murfey/server/feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -2226,7 +2226,7 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
murfey_db=_db,
)
if murfey.server._transport_object:
if result.get("success", False):
if result.get("success"):
murfey.server._transport_object.transport.ack(header)
else:
# Send it directly to DLQ without trying to rerun it
Expand Down
19 changes: 10 additions & 9 deletions src/murfey/workflows/register_data_collection.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import time

import ispyb.sqlalchemy._auto_db_schema as ISPyBDB
from sqlmodel import select
Expand Down Expand Up @@ -39,6 +40,7 @@ def run(
dcgid = dcg[0].id
# flush_data_collections(message["source"], murfey_db)
else:
time.sleep(2)
logger.warning(
"No data collection group ID was found for image directory "
f"{sanitise(message['image_directory'])} and source "
Expand Down Expand Up @@ -84,21 +86,20 @@ def run(
else ""
),
).get("return_value", None)
if dcid is None:
time.sleep(2)
logger.error(
"Failed to register the following data collection: \n"
f"{message} \n"
"Requeueing message"
)
return {"success": False, "requeue": True}
murfey_dc = MurfeyDB.DataCollection(
id=dcid,
tag=message.get("tag"),
dcg_id=dcgid,
)
murfey_db.add(murfey_dc)
murfey_db.commit()
dcid = murfey_dc.id
murfey_db.close()

if dcid is None:
logger.error(
"Failed to register the following data collection: \n"
f"{message} \n"
"Requeueing message"
)
return {"success": False, "requeue": True}
return {"success": True}
18 changes: 9 additions & 9 deletions src/murfey/workflows/register_data_collection_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ def run(
"return_value", None
)

if dcgid is None:
time.sleep(2)
logger.error(
"Failed to register the following data collection group: \n"
f"{message} \n"
"Requeuing message"
)
return {"success": False, "requeue": True}

atlas_record = ISPyBDB.Atlas(
dataCollectionGroupId=dcgid,
atlasImage=message.get("atlas", ""),
Expand All @@ -77,15 +86,6 @@ def run(
murfey_db.commit()
murfey_db.close()

if dcgid is None:
time.sleep(2)
logger.error(
"Failed to register the following data collection group: \n"
f"{message} \n"
"Requeuing message"
)
return {"success": False, "requeue": True}

if dcg_hooks := entry_points(group="murfey.hooks", name="data_collection_group"):
try:
for hook in dcg_hooks:
Expand Down
5 changes: 2 additions & 3 deletions src/murfey/workflows/register_processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def run(message: dict, murfey_db: SQLModelSession, demo: bool = False):
pid = _transport_object.do_create_ispyb_job(record).get(
"return_value", None
)
if pid is None:
return {"success": False, "requeue": True}
murfey_pj = MurfeyDB.ProcessingJob(
id=pid, recipe=message["recipe"], dc_id=_dcid
)
Expand All @@ -71,9 +73,6 @@ def run(message: dict, murfey_db: SQLModelSession, demo: bool = False):
pid = murfey_pj.id
murfey_db.close()

if pid is None:
return {"success": False, "requeue": True}

# Update Prometheus counter for preprocessed movies
prom.preprocessed_movies.labels(processing_job=pid)

Expand Down
Loading