Skip to content

Commit 25724c2

Browse files
committed
Always set app_lock when claiming a task.
This removes the need to refresh a task from the database before executing it. This also removes some excessive logging from the dispatch methods.
1 parent 0ebb8da commit 25724c2

4 files changed

Lines changed: 32 additions & 79 deletions

File tree

pulpcore/app/models/task.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import traceback
77
from gettext import gettext as _
88

9-
from django.conf import settings
109
from django.contrib.postgres.fields import ArrayField, HStoreField
1110
from django.contrib.postgres.indexes import GinIndex
1211
from django.core.serializers.json import DjangoJSONEncoder
@@ -207,18 +206,13 @@ def set_running(self):
207206
This updates the :attr:`started_at` and sets the :attr:`state` to :attr:`RUNNING`.
208207
"""
209208
started_at = timezone.now()
210-
filter_kwargs = {
211-
"pk": self.pk,
212-
"state": TASK_STATES.WAITING,
213-
}
214-
# Only check app_lock for PulpcoreWorker, not RedisWorker
215-
if settings.WORKER_TYPE != "redis":
216-
filter_kwargs["app_lock"] = AppStatus.objects.current()
217-
218-
rows = Task.objects.filter(**filter_kwargs).update(
209+
rows = Task.objects.filter(
210+
pk=self.pk,
211+
state=TASK_STATES.WAITING,
212+
app_lock=AppStatus.objects.current(),
213+
).update(
219214
state=TASK_STATES.RUNNING,
220215
started_at=started_at,
221-
app_lock=AppStatus.objects.current(),
222216
)
223217
if rows == 1:
224218
self.state = TASK_STATES.RUNNING

pulpcore/tasking/redis_tasks.py

Lines changed: 8 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -440,31 +440,10 @@ def dispatch(
440440
# are_resources_available() now acquires ALL locks atomically
441441
if are_resources_available(task):
442442
# All locks acquired successfully
443-
# Reload task state from database to check if it's still WAITING
444-
# (a worker might have executed it between creation and lock acquisition)
445-
task.refresh_from_db()
446-
447-
if task.state != TASK_STATES.WAITING:
448-
# Task was already executed by a worker, release locks and return
449-
_logger.info(
450-
"IMMEDIATE DISPATCH: Task %s already in state "
451-
"'%s', releasing locks without execution",
452-
task.pk,
453-
task.state,
454-
)
455-
safe_release_task_locks(task)
456-
return task
457-
458-
# Task is still WAITING, proceed with execution
443+
# Proceed with execution
459444
current_app = AppStatus.objects.current()
460445
lock_owner = current_app.name if current_app else f"immediate-{task.pk}"
461446
try:
462-
_logger.info(
463-
"IMMEDIATE DISPATCH: Task %s acquired all locks,"
464-
" executing in API process (AppStatus=%s)",
465-
task.pk,
466-
lock_owner,
467-
)
468447
with using_workdir():
469448
execute_task(task)
470449
except Exception:
@@ -474,10 +453,10 @@ def dispatch(
474453
raise
475454
elif deferred:
476455
# Locks not available, defer to worker
456+
# Clear app_lock so workers can pick this up
477457
# No locks were acquired (atomic operation failed), so nothing to clean up
478-
_logger.info(
479-
"IMMEDIATE DISPATCH: Task %s could not acquire locks, deferring to worker", task.pk
480-
)
458+
Task.objects.filter(pk=task.pk).update(app_lock=None)
459+
task.app_lock = None
481460
else:
482461
# Can't acquire locks and can't be deferred - cancel task
483462
# No locks were acquired, so just set state
@@ -513,32 +492,10 @@ async def adispatch(
513492
# async_are_resources_available() now acquires ALL locks atomically
514493
if await async_are_resources_available(task):
515494
# All locks acquired successfully
516-
# Reload task state from database to check if it's still WAITING
517-
# (a worker might have executed it between creation and lock acquisition)
518-
await task.arefresh_from_db()
519-
520-
if task.state != TASK_STATES.WAITING:
521-
# Task was already executed by a worker, release locks and return
522-
_logger.info(
523-
"IMMEDIATE DISPATCH (async): Task %s already in"
524-
" state '%s', releasing locks without execution",
525-
task.pk,
526-
task.state,
527-
)
528-
await async_safe_release_task_locks(task)
529-
return task
530-
531-
# Task is still WAITING, proceed with execution
495+
# Proceed with execution
532496
current_app = await sync_to_async(AppStatus.objects.current)()
533497
lock_owner = current_app.name if current_app else f"immediate-{task.pk}"
534498
try:
535-
_logger.info(
536-
"IMMEDIATE DISPATCH (async): Task %s acquired "
537-
"all locks, executing in API process "
538-
"(AppStatus=%s)",
539-
task.pk,
540-
lock_owner,
541-
)
542499
with using_workdir():
543500
await aexecute_task(task)
544501
except Exception:
@@ -548,11 +505,10 @@ async def adispatch(
548505
raise
549506
elif deferred:
550507
# Locks not available, defer to worker
508+
# Clear app_lock so workers can pick this up
551509
# No locks were acquired (atomic operation failed), so nothing to clean up
552-
_logger.info(
553-
"IMMEDIATE DISPATCH (async): Task %s could not acquire locks, deferring to worker",
554-
task.pk,
555-
)
510+
await Task.objects.filter(pk=task.pk).aupdate(app_lock=None)
511+
task.app_lock = None
556512
else:
557513
# Can't acquire locks and can't be deferred - cancel task
558514
# No locks were acquired, so just set state

pulpcore/tasking/redis_worker.py

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,9 @@ def fetch_task(self):
506506
Task: A task object if one was successfully locked, None otherwise
507507
"""
508508
# Query waiting tasks, sorted by creation time, limited
509+
# Filter for app_lock__isnull=True to avoid tasks being executed immediately by API
509510
waiting_tasks = (
510-
Task.objects.filter(state=TASK_STATES.WAITING)
511+
Task.objects.filter(state=TASK_STATES.WAITING, app_lock__isnull=True)
511512
.exclude(pk__in=self.ignored_task_ids)
512513
.order_by("pulp_created")
513514
.select_related("pulp_domain")[:FETCH_TASK_LIMIT]
@@ -544,6 +545,19 @@ def fetch_task(self):
544545
if should_skip:
545546
continue
546547

548+
# Claim the task by setting app_lock BEFORE acquiring locks
549+
rows = Task.objects.filter(
550+
pk=task.pk, state=TASK_STATES.WAITING, app_lock__isnull=True
551+
).update(app_lock=self.app_status)
552+
553+
if rows == 0:
554+
# Task was already claimed or executed
555+
_logger.debug("WORKER: Task %s already claimed or executed, skipping", task.pk)
556+
continue
557+
558+
# Update local task object
559+
task.app_lock = self.app_status
560+
547561
# Atomically try to acquire task lock and resource locks in a single operation
548562
task_lock_key = get_task_lock_key(task.pk)
549563

@@ -730,20 +744,7 @@ def handle_tasks(self):
730744
self._maybe_release_locks(task, mark_released=False)
731745
break
732746

733-
# Check if task is still WAITING after acquiring locks
734-
# (an API process might have executed it between query and lock acquisition)
735-
task.refresh_from_db()
736-
if task.state != TASK_STATES.WAITING:
737-
# Task was already executed, release locks and skip
738-
_logger.info(
739-
"Task %s already in state '%s' after acquiring locks, skipping execution",
740-
task.pk,
741-
task.state,
742-
)
743-
self._maybe_release_locks(task)
744-
continue
745-
746-
# Task is compatible and still waiting, execute it
747+
# Task is compatible, execute it
747748
if task.immediate:
748749
self.supervise_immediate_task(task)
749750
else:

pulpcore/tasking/tasks.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,14 @@ async def _aexecute_task(task):
110110

111111
def log_task_start(task, domain):
112112
_logger.info(
113-
"Starting task id: %s in domain: %s, task_type: %s, immediate: %s, deferred: %s",
113+
"Starting task id: %s in domain: %s, task_type: %s, immediate: %s, deferred: %s, "
114+
"worker: %s",
114115
task.pk,
115116
domain.name,
116117
task.name,
117118
str(task.immediate),
118119
str(task.deferred),
120+
task.app_lock.name,
119121
)
120122

121123

0 commit comments

Comments
 (0)