Skip to content

Commit da96f66

Browse files
committed
Fixed how a dropped Redis connection is handled.
Improved how task cancellation is called from the viewset.
1 parent 289cd2d commit da96f66

4 files changed

Lines changed: 28 additions & 22 deletions

File tree

pulpcore/app/viewsets/task.py

Lines changed: 3 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from gettext import gettext as _
22

3-
from django.conf import settings
43
from django.db import transaction
54
from django.db.models import Prefetch
65
from django_filters.rest_framework import filters
@@ -43,7 +42,7 @@
4342
CreatedResourcesFilter,
4443
)
4544
from pulpcore.constants import TASK_INCOMPLETE_STATES, TASK_STATES
46-
from pulpcore.tasking.tasks import dispatch
45+
from pulpcore.tasking.tasks import cancel_task, cancel_task_group, dispatch
4746
from pulpcore.app.role_util import get_objects_for_user
4847

4948

@@ -228,16 +227,7 @@ def partial_update(self, request, pk=None, partial=True):
228227
serializer.is_valid(raise_exception=True)
229228

230229
task = self.get_object()
231-
232-
# Call the appropriate cancel_task function based on worker type
233-
if settings.WORKER_TYPE == "redis":
234-
from pulpcore.tasking.redis_tasks import cancel_task
235-
236-
task = cancel_task(task.pk)
237-
else:
238-
from pulpcore.tasking.tasks import cancel_task
239-
240-
task = cancel_task(task.pk)
230+
task = cancel_task(task.pk)
241231

242232
# Check whether task is actually canceled
243233
http_status = (
@@ -358,15 +348,7 @@ def partial_update(self, request, pk=None, partial=True):
358348
):
359349
raise PermissionDenied()
360350

361-
# Call the appropriate cancel_task_group function based on worker type
362-
if settings.WORKER_TYPE == "redis":
363-
from pulpcore.tasking.redis_tasks import cancel_task_group
364-
365-
task_group = cancel_task_group(task_group.pk)
366-
else:
367-
from pulpcore.tasking.tasks import cancel_task_group
368-
369-
task_group = cancel_task_group(task_group.pk)
351+
task_group = cancel_task_group(task_group.pk)
370352
# Check whether task group is actually canceled
371353
serializer = TaskGroupSerializer(task_group, context={"request": request})
372354
task_statuses = (

pulpcore/tasking/redis_locks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ def acquire_locks(redis_conn, lock_owner, task_lock_key, exclusive_resources, sh
323323
(includes "__task_lock__" if task lock is held by another worker)
324324
"""
325325
if not redis_conn:
326-
return []
326+
return ["error:no_redis_connection"]
327327

328328
# Sort resources deterministically to prevent deadlocks
329329
exclusive_resources = sorted(exclusive_resources) if exclusive_resources else []

pulpcore/tasking/redis_worker.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,19 @@ def handle_worker_heartbeat(self):
206206
_logger.error(f"Updating the heartbeat of worker {self.name} failed.")
207207
self.shutdown_requested = True
208208

209+
def handle_redis_heartbeat(self):
210+
"""
211+
Check Redis connection health.
212+
213+
If the check fails, the worker is shut down, similar to how a PostgreSQL
214+
heartbeat failure triggers shutdown in handle_worker_heartbeat().
215+
"""
216+
try:
217+
self.redis_conn.ping()
218+
except Exception:
219+
_logger.error("Redis connection check failed for worker %s. Shutting down.", self.name)
220+
self.shutdown_requested = True
221+
209222
def cleanup_ignored_tasks(self):
210223
"""Remove tasks from ignored list that are no longer incomplete."""
211224
for pk in (
@@ -382,6 +395,7 @@ def beat(self):
382395
now = timezone.now()
383396
if self.app_status.last_heartbeat < now - self.heartbeat_period:
384397
self.handle_worker_heartbeat()
398+
self.handle_redis_heartbeat()
385399
if self.ignored_task_ids:
386400
self.ignored_task_countdown -= 1
387401
if self.ignored_task_countdown <= 0:

pulpcore/tasking/tasks.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,11 @@ def cancel_task(task_id):
478478
Raises:
479479
rest_framework.exceptions.NotFound: If a task with given task_id does not exist
480480
"""
481+
if settings.WORKER_TYPE == "redis":
482+
from pulpcore.tasking.redis_tasks import cancel_task as redis_cancel_task
483+
484+
return redis_cancel_task(task_id)
485+
481486
task = Task.objects.select_related("pulp_domain").get(pk=task_id)
482487

483488
if task.state in TASK_FINAL_STATES:
@@ -515,6 +520,11 @@ def cancel_task_group(task_group_id):
515520
Raises:
516521
TaskGroup.DoesNotExist: If a task group with given task_group_id does not exist
517522
"""
523+
if settings.WORKER_TYPE == "redis":
524+
from pulpcore.tasking.redis_tasks import cancel_task_group as redis_cancel_task_group
525+
526+
return redis_cancel_task_group(task_group_id)
527+
518528
task_group = TaskGroup.objects.get(pk=task_group_id)
519529
task_group.all_tasks_dispatched = True
520530
task_group.save(update_fields=["all_tasks_dispatched"])

0 commit comments

Comments
 (0)