Skip to content

chore: implement task heartbeat#909

Merged
chaen merged 2 commits into
DIRACGrid:mainfrom
chaen:task_heartbit
May 6, 2026
Merged

chore: implement task heartbeat#909
chaen merged 2 commits into
DIRACGrid:mainfrom
chaen:task_heartbit

Conversation

@chaen
Copy link
Copy Markdown
Contributor

@chaen chaen commented May 5, 2026

closes #908

renew Redis stream message ownership during worker execution

Add a worker heartbeat so long-running tasks are not reclaimed by
XAUTOCLAIM while they are still running. ReceivedMessage now carries
both ack() and renew() callbacks, with renew implemented via XCLAIM
to reset the pending idle timer for the current consumer. The worker
starts this heartbeat for broker-delivered messages and stops it when
execution finishes. Interactive execution is unchanged.

@read-the-docs-community
Copy link
Copy Markdown

read-the-docs-community Bot commented May 5, 2026

Documentation build overview

📚 diracx | 🛠️ Build #32561214 | 📁 Comparing f7379a9 against latest (9f4828c)

  🔍 Preview build  

3 files changed
± dev/explanations/tasks/index.html
± dev/reference/tasks/index.html
± dev/explanations/tasks/class-details/index.html

@chaen chaen changed the title chore: Implement task heartbeat chore: implement task heartbeat May 5, 2026
Comment thread diracx-tasks/src/diracx/tasks/plumbing/worker/worker.py Outdated
@chaen chaen force-pushed the task_heartbit branch from b35a312 to 86861ac Compare May 6, 2026 09:43
result = await self.run_task(task_func, task_message)
finally:
if heartbeat_task is not None:
heartbeat_stop.set()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we assume it's fine to stop the heartbeat before calling _handle_failure/success and the rest until we ack.
What would happen if you stop the heartbeat and for some reason one of the remaining operations take too long and the task is given to another worker before ack is called?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you would rather manage the heartbeat at the same level as the ack ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably yes. Or at least a comment explaining why this can/can't happen.
What's your opinion?

(note: may be I just don't fully understand the concepts here though)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you say makes sense, I'll try to adapt it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK Done

chaen added 2 commits May 6, 2026 13:37
Add a worker heartbeat so long-running tasks are not reclaimed by
XAUTOCLAIM while they are still running. ReceivedMessage now carries
both ack() and renew() callbacks, with renew implemented via XCLAIM
to reset the pending idle timer for the current consumer. The worker
starts this heartbeat for broker-delivered messages and stops it when
execution finishes. Interactive execution is unchanged.
@chaen chaen force-pushed the task_heartbit branch from 86861ac to f7379a9 Compare May 6, 2026 11:37
@chaen chaen merged commit e416d44 into DIRACGrid:main May 6, 2026
30 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug]: Long running tasks end up being scheduled multiple times

3 participants