fix: PSv2 follow-up fixes from integration tests (#1135)
Conversation
- Add connect_timeout=5, allow_reconnect=False to NATS connections to prevent leaked reconnection loops from blocking Django's event loop
- Guard /tasks endpoint against terminal-status jobs (return empty tasks instead of attempting NATS reserve)
- IncompleteJobFilter now excludes jobs by top-level status in addition to progress JSON stages
- Add stale worker cleanup to integration test script

Found during PSv2 integration testing where stale ADC workers with default DataLoader parallelism overwhelmed the single uvicorn worker thread by flooding /tasks with concurrent NATS reserve requests.

Co-Authored-By: Claude <noreply@anthropic.com>
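The terminal-status guard described above can be sketched in plain Python (the enum values and function names here are illustrative, not the actual project code): a job already in a final state short-circuits to an empty task list without ever touching the queue.

```python
# Sketch of the /tasks terminal-status guard: final-state jobs return []
# instead of triggering a NATS reserve. Names are illustrative only.
from enum import Enum


class JobState(str, Enum):
    PENDING = "PENDING"
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    REVOKED = "REVOKED"


FINAL_STATES = {JobState.SUCCESS, JobState.FAILURE, JobState.REVOKED}


def tasks_for_job(status: JobState, reserve_fn) -> list:
    """Return [] for terminal jobs without calling the queue at all."""
    if status in FINAL_STATES:
        return []
    return reserve_fn()


# A finished job never triggers a queue reservation:
print(tasks_for_job(JobState.SUCCESS, lambda: ["task-1"]))  # []
print(tasks_for_job(JobState.STARTED, lambda: ["task-1"]))  # ['task-1']
```

The key point is that the guard runs before any NATS connection is opened, so stale workers polling finished jobs cost only a database status check.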
Session notes from 2026-02-16 integration test including root cause analysis of stale worker task competition and NATS connection issues. Findings doc tracks applied fixes and remaining TODOs with priorities. Co-Authored-By: Claude <noreply@anthropic.com>
📝 Walkthrough

Adds batch task reservation to the NATS queue, enables filtering jobs by multiple pipeline slugs, refines incomplete-job exclusion logic and job tasks endpoint behavior, moves dispatch_mode determination to job creation, and extends/updates tests to cover these behaviors.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client as Client
    participant JobAPI as Job Tasks Endpoint
    participant QueueMgr as TaskQueueManager
    participant NATS as NATS/JetStream
    Client->>JobAPI: GET /tasks (job_id, count)
    alt job in final state
        JobAPI-->>Client: [] (empty list)
    else job not final
        JobAPI->>QueueMgr: reserve_tasks(job_id, count, timeout)
        QueueMgr->>NATS: ensure stream & consumer (with timeouts)
        NATS-->>QueueMgr: stream/consumer OK
        QueueMgr->>NATS: pull_subscribe & fetch(count, timeout)
        NATS-->>QueueMgr: [msg1, msg2, ...]
        QueueMgr->>QueueMgr: map messages -> PipelineProcessingTask list
        QueueMgr-->>JobAPI: [Task1, Task2, ...]
        JobAPI-->>Client: return tasks list
    end
```
PSv2 integration test passed end-to-end (job 1380, 20/20 images). Identified ack_wait=300s as cause of ~5min idle time when GPU processes race for NATS tasks. Co-Authored-By: Claude <noreply@anthropic.com>
Replace N×1 reserve_task() calls with single reserve_tasks() batch fetch. The previous implementation created a new pull subscription per message (320 NATS round trips for batch=64), causing the /tasks endpoint to exceed HTTP client timeouts. The new approach uses one psub.fetch() call for the entire batch. Co-Authored-By: Claude <noreply@anthropic.com>
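The round-trip difference can be sketched with mocks, in the same spirit as the project's unit tests. The `psub.fetch(count, timeout=...)` call shape matches the nats-py pull-subscription API; the two helper functions are illustrative, not the actual implementation.

```python
# Contrast N x 1 reservation (one pull fetch per task) with a single
# batched fetch. AsyncMock stands in for a nats-py pull subscription.
import asyncio
from unittest.mock import AsyncMock


async def reserve_one_at_a_time(psub, count):
    msgs = []
    for _ in range(count):
        # One NATS round trip per task: this is what exceeded HTTP timeouts.
        msgs.extend(await psub.fetch(1, timeout=5))
    return msgs


async def reserve_batch(psub, count):
    # One round trip for the whole batch.
    return await psub.fetch(count, timeout=5)


async def main():
    psub = AsyncMock()
    psub.fetch.return_value = ["msg"]

    await reserve_one_at_a_time(psub, 64)
    per_task_calls = psub.fetch.call_count

    psub.reset_mock()
    await reserve_batch(psub, 64)
    batched_calls = psub.fetch.call_count

    print(per_task_calls, batched_calls)  # 64 1


asyncio.run(main())
```

With a batch size of 64, the old approach issued 64 fetch round trips (plus the per-message subscription setup the commit mentions), while the batched path issues exactly one.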
Workers that handle multiple pipelines can now fetch jobs for all of them in a single request: ?pipeline__slug__in=slug1,slug2 Co-Authored-By: Claude <noreply@anthropic.com>
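The semantics of the comma-separated parameter can be illustrated without Django (the real implementation is a django-filter `BaseInFilter` over the Job queryset; plain dicts keep this sketch runnable):

```python
# Illustrative only: ?pipeline__slug__in=slug1,slug2 becomes an __in
# lookup over pipeline slugs. Real code filters a Django queryset.
def filter_jobs_by_slugs(jobs: list[dict], slug_param: str) -> list[dict]:
    slugs = [s.strip() for s in slug_param.split(",") if s.strip()]
    return [job for job in jobs if job["pipeline_slug"] in slugs]


jobs = [
    {"id": 1, "pipeline_slug": "moths-v1"},
    {"id": 2, "pipeline_slug": "beetles-v2"},
    {"id": 3, "pipeline_slug": "moths-v2"},
]
print(filter_jobs_by_slugs(jobs, "moths-v1,moths-v2"))  # jobs 1 and 3
```

A worker serving both `moths-v1` and `moths-v2` thus needs one request instead of one per pipeline.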
These files are session notes, planning docs, and test scripts that should stay local rather than be part of the PR. Co-Authored-By: Claude <noreply@anthropic.com>
Pull request overview
This PR addresses PSv2 integration-test issues where high-concurrency worker polling overwhelmed Django and NATS interactions caused event-loop stalls, by hardening the /jobs/{id}/tasks behavior and updating NATS queue reservation semantics.
Changes:
- Update NATS queue reservation to support batch pulls (`reserve_tasks`) and adjust connection behavior.
- Prevent `/tasks` from reserving tasks for terminal-status jobs; switch to batched NATS reserve calls.
- Extend job filtering to support `pipeline__slug__in` and strengthen `incomplete_only` filtering by top-level job status (with a new unit test for multi-pipeline filtering).
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| ami/ml/orchestration/nats_queue.py | Adds batched task reservation and changes NATS connection options used by the task queue manager. |
| ami/ml/orchestration/tests/test_nats_queue.py | Updates/extends tests to validate batched reservation behavior and timeout handling. |
| ami/jobs/views.py | Updates /tasks endpoint to short-circuit terminal jobs and batch reserves; enhances incomplete filtering and adds pipeline__slug__in. |
| ami/jobs/tests.py | Adds unit test coverage for filtering jobs by multiple pipeline slugs. |
… flags ML jobs with a pipeline now get dispatch_mode set during setup() instead of waiting until run() is called by the Celery worker. This lets the UI show the correct mode immediately after job creation. Co-Authored-By: Claude <noreply@anthropic.com>
Actionable comments posted: 2
🧹 Nitpick comments (3)
ami/ml/orchestration/tests/test_nats_queue.py (2)
98-98: Move `import nats.errors` to module level. Mid-test imports are unconventional and hide dependencies.

📝 Proposed fix

```diff
+import nats.errors
+
 import unittest
 from unittest.mock import AsyncMock, MagicMock, patch

 from ami.ml.orchestration.nats_queue import TaskQueueManager
 from ami.ml.schemas import PipelineProcessingTask
```

Then remove the inline import at line 98.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/tests/test_nats_queue.py` at line 98, Move the inline "import nats.errors" out of the test body and add it to the module-level imports alongside the other top-level imports in the test file, then delete the mid-test import; reference the exact symbol "nats.errors" to ensure you import that module at the top and remove the inline occurrence so the test's dependencies are explicit and visible.
112-131: `test_reserve_tasks_single` omits the fetch-parameter assertion present in `test_reserve_tasks_success`. For consistency, add the same `mock_psub.fetch.assert_called_once_with(1, timeout=5)` check to verify the count and timeout are passed through correctly.

📝 Proposed addition

```diff
         self.assertEqual(len(tasks), 1)
         self.assertEqual(tasks[0].reply_subject, "reply.subject.123")
+        mock_psub.fetch.assert_called_once_with(1, timeout=5)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/tests/test_nats_queue.py` around lines 112 - 131, The test test_reserve_tasks_single is missing the fetch-parameter assertion that ensures the pull-subscribe fetch receives the right count and timeout; update the test (inside the async with TaskQueueManager() as manager block, using the mock_psub variable) to include mock_psub.fetch.assert_called_once_with(1, timeout=5) after calling manager.reserve_tasks so it mirrors test_reserve_tasks_success and verifies the count and timeout are passed through correctly.

ami/ml/orchestration/nats_queue.py (1)
207-209: Use `logger.exception` to preserve the stack trace in the catch-all handler.

`logger.error` discards the traceback, making it hard to distinguish connection errors, deserialization failures, or NATS API errors from "no tasks." `logger.exception` includes the traceback at no extra cost and directly addresses the Ruff TRY400 warning.

📝 Proposed fix

```diff
         except Exception as e:
-            logger.error(f"Failed to reserve tasks from stream for job '{job_id}': {e}")
+            logger.exception(f"Failed to reserve tasks from stream for job '{job_id}': {e}")
             return []
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 207 - 209, Replace the catch-all error logging in the except block for reserving tasks (the "except Exception as e" handler that logs "Failed to reserve tasks from stream for job '{job_id}': {e}") to call logger.exception instead of logger.error so the traceback is preserved; locate the handler in ami/ml/orchestration/nats_queue.py (the block referencing job_id and returning []), change the logger invocation to logger.exception with the same message, and keep returning [] so behavior remains the same while including full stack traces (this also satisfies the Ruff TRY400 warning).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/views.py`:
- Around line 249-254: The get_tasks coroutine currently calls await
manager.reserve_tasks(...) without a timeout and uses
async_to_sync(get_tasks)(), so a slow/unresponsive NATS/JetStream can block the
worker; update get_tasks to wrap the reserve_tasks call with
asyncio.wait_for(reserve_tasks_call, timeout=...) (and/or wrap the whole
get_tasks body in asyncio.wait_for) and add explicit timeouts to the JetStream
helper functions in nats_queue.py (specifically _ensure_stream and
_ensure_consumer around js.stream_info, js.add_stream, js.consumer_info,
js.add_consumer) using a shared reasonable constant (e.g., JS_OP_TIMEOUT) to
ensure operations fail fast instead of hanging.
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 47-48: The docstring example passes a string literal 'job123' to
reserve_tasks which is typed as reserve_tasks(self, job_id: int, ...); update
the example to pass an integer (e.g., 123) or otherwise construct/convert the
job_id as an int so the example matches the function signature, and ensure
related example calls (like acknowledge_task(tasks[0].reply_subject)) remain
consistent with the manager API (keep references to reserve_tasks and
acknowledge_task to locate the snippet).
---
Nitpick comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 207-209: Replace the catch-all error logging in the except block
for reserving tasks (the "except Exception as e" handler that logs "Failed to
reserve tasks from stream for job '{job_id}': {e}") to call logger.exception
instead of logger.error so the traceback is preserved; locate the handler in
ami/ml/orchestration/nats_queue.py (the block referencing job_id and returning
[]), change the logger invocation to logger.exception with the same message, and
keep returning [] so behavior remains the same while including full stack traces
(this also satisfies the Ruff TRY400 warning).
In `@ami/ml/orchestration/tests/test_nats_queue.py`:
- Line 98: Move the inline "import nats.errors" out of the test body and add it
to the module-level imports alongside the other top-level imports in the test
file, then delete the mid-test import; reference the exact symbol "nats.errors"
to ensure you import that module at the top and remove the inline occurrence so
the test's dependencies are explicit and visible.
- Around line 112-131: The test test_reserve_tasks_single is missing the
fetch-parameter assertion that ensures the pull-subscribe fetch receives the
right count and timeout; update the test (inside the async with
TaskQueueManager() as manager block, using the mock_psub variable) to include
mock_psub.fetch.assert_called_once_with(1, timeout=5) after calling
manager.reserve_tasks so it mirrors test_reserve_tasks_success and verifies the
count and timeout are passed through correctly.
…olicy Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations via asyncio.wait_for() so a hung NATS connection fails fast instead of blocking the caller's thread indefinitely. Also restore the intended reconnect policy (2 attempts, 1s wait) that was lost in a prior force push. Co-Authored-By: Claude <noreply@anthropic.com>
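The `asyncio.wait_for()` pattern this commit applies to the JetStream metadata calls is standard asyncio: a hung awaitable is cancelled after the timeout instead of blocking the caller indefinitely. In this runnable sketch, `stream_info` stands in for `js.stream_info()`, and the timeout is shortened from the real 10s so the demo finishes quickly.

```python
# asyncio.wait_for() cuts off a hung JetStream metadata call after
# NATS_JETSTREAM_TIMEOUT seconds instead of blocking forever.
import asyncio

NATS_JETSTREAM_TIMEOUT = 0.1  # 10s in the real code; shortened for the demo


async def stream_info():
    # Simulates a hung NATS connection that never responds.
    await asyncio.sleep(60)


async def main():
    try:
        await asyncio.wait_for(stream_info(), timeout=NATS_JETSTREAM_TIMEOUT)
    except asyncio.TimeoutError:
        print("JetStream operation timed out; failing fast")


asyncio.run(main())
```

`wait_for` also cancels the inner coroutine on timeout, so the hung operation does not keep occupying the event loop after the caller gives up.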
Code review

Found 1 issue:
antenna/ami/ml/orchestration/nats_queue.py Lines 102 to 105 in 37c0210
antenna/ami/ml/orchestration/nats_queue.py Lines 202 to 232 in 37c0210

Note on PR overlap: PR #1142 (
asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was caught by the broad `except Exception` in reserve_tasks(), silently returning [] and making NATS outages indistinguishable from empty queues. Workers would then poll immediately, recreating the flooding problem.

- Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
- Catch TimeoutError and OSError in the /tasks view, return 503
- Restore allow_reconnect=False (fail-fast on connection issues)
- Add return type annotation to get_connection()

Co-Authored-By: Claude <noreply@anthropic.com>
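The exception-ordering fix described above can be sketched as follows (the function body is illustrative, not the project's actual `reserve_tasks()`): the narrow `except asyncio.TimeoutError: raise` must come before the broad `except Exception`, otherwise the timeout is swallowed and returned as an empty list.

```python
# Re-raise timeouts before the catch-all, so a NATS outage surfaces
# (the view maps it to a 503) instead of masquerading as an empty queue.
import asyncio


async def reserve_tasks(fetch):
    try:
        return await fetch()
    except asyncio.TimeoutError:
        raise  # propagate: outage, not "no tasks"
    except Exception:
        return []  # genuinely empty queue / recoverable errors


async def main():
    async def hung():
        raise asyncio.TimeoutError

    async def broken():
        raise ValueError("bad message")

    try:
        await reserve_tasks(hung)
    except asyncio.TimeoutError:
        print("outage surfaced as 503")
    print(await reserve_tasks(broken))  # []


asyncio.run(main())
```

Without the explicit re-raise, both branches would return `[]` and workers would immediately poll again, recreating the flood.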
- Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid log spam from frequent polling)
- Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker for 5s on empty queues
- Fix docstring examples using string 'job123' for int-typed job_id

Co-Authored-By: Claude <noreply@anthropic.com>
Actionable comments posted: 1
🧹 Nitpick comments (1)
ami/ml/orchestration/nats_queue.py (1)
258-283: `asyncio.TimeoutError` from `delete_consumer`/`delete_stream` is silently swallowed.

The `except Exception` handlers in `delete_consumer` (line 281) and `delete_stream` (line 307) will catch `asyncio.TimeoutError` raised by the `wait_for` wrappers. For cleanup operations this is likely acceptable (log + return `False`), but it's worth being aware that a NATS outage during cleanup won't propagate, unlike the reserve/ensure paths where it does. If that's intentional, no change needed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 258 - 283, The current broad except in delete_consumer and delete_stream swallows asyncio.TimeoutError from asyncio.wait_for; change the error handling to explicitly catch asyncio.TimeoutError (from the wait_for call wrapping self.js.delete_consumer / self.js.delete_stream) and re-raise it after optionally logging, while keeping the existing catch-all Exception branch to log other failures and return False; update the try/except blocks in delete_consumer and delete_stream to first except asyncio.TimeoutError as e: logger.error(...); raise and then except Exception as e: logger.error(...); return False so timeouts propagate but other cleanup errors remain handled.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/jobs/views.py`:
- Around line 254-258: The exception handler around the call to
async_to_sync(get_tasks) in the view currently catches asyncio.TimeoutError and
OSError but misses NATS client exceptions; update the except tuple to include
nats.errors.Error (e.g. except (asyncio.TimeoutError, OSError,
nats.errors.Error) as e) and add the corresponding import for nats.errors.Error
at the top of the module so NATS-specific exceptions raised by get_tasks are
caught and logged via logger.warning("NATS unavailable while fetching tasks for
job %s: %s", job.pk, e) and return the 503 response as intended.
---
Nitpick comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 258-283: The current broad except in delete_consumer and
delete_stream swallows asyncio.TimeoutError from asyncio.wait_for; change the
error handling to explicitly catch asyncio.TimeoutError (from the wait_for call
wrapping self.js.delete_consumer / self.js.delete_stream) and re-raise it after
optionally logging, while keeping the existing catch-all Exception branch to log
other failures and return False; update the try/except blocks in delete_consumer
and delete_stream to first except asyncio.TimeoutError as e: logger.error(...);
raise and then except Exception as e: logger.error(...); return False so
timeouts propagate but other cleanup errors remain handled.
NoServersError, ConnectionClosedError, and other NATS exceptions inherit from nats.errors.Error (not OSError), so they escaped the handler and returned 500 instead of 503. Co-Authored-By: Claude <noreply@anthropic.com>
🧹 Nitpick comments (2)
ami/jobs/views.py (2)
37-37: `BaseInFilter` should be combined with `CharFilter`, not used standalone.

Per the official django-filter reference, `BaseInFilter` "is expected to be used in conjunction with another filter class, as this class only validates that the incoming value is comma-separated. The secondary filter is then used to validate the individual values." Using it standalone skips per-value validation — correct for slugs today since the base `Filter` defaults to `CharField`, but the documented contract calls for the mixin pattern.

♻️ Proposed fix

```diff
+class CharInFilter(filters.BaseInFilter, filters.CharFilter):
+    pass
+
+
 class JobFilterSet(filters.FilterSet):
     """Custom filterset to enable pipeline name filtering."""

     pipeline__slug = filters.CharFilter(field_name="pipeline__slug", lookup_expr="exact")
-    pipeline__slug__in = filters.BaseInFilter(field_name="pipeline__slug", lookup_expr="in")
+    pipeline__slug__in = CharInFilter(field_name="pipeline__slug", lookup_expr="in")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/jobs/views.py` at line 37, pipeline__slug__in currently uses BaseInFilter standalone; instead define and use a mixin combining BaseInFilter with CharFilter (e.g., create InCharFilter = type("InCharFilter", (filters.BaseInFilter, filters.CharFilter), {}) or a small class class InCharFilter(filters.BaseInFilter, filters.CharFilter): pass) and replace pipeline__slug__in = filters.BaseInFilter(...) with pipeline__slug__in = InCharFilter(field_name="pipeline__slug", lookup_expr="in") so each comma-separated value is validated as a char/slug.
61-72: `JobState.final_states()` is called twice — assign once.

`final_states()` is evaluated at line 62 inline and again at line 65. Assign it once at the top of the `if` block.

♻️ Proposed fix

```diff
         if incomplete_only:
+            final_states = JobState.final_states()
             # Exclude jobs with a terminal top-level status
-            queryset = queryset.exclude(status__in=JobState.final_states())
+            queryset = queryset.exclude(status__in=final_states)

             # Also exclude jobs where the "results" stage has a final state status
-            final_states = JobState.final_states()
             exclude_conditions = Q()
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/jobs/views.py` around lines 61 - 72, The code calls JobState.final_states() twice; store its result once in a local variable at the start of the block and reuse it when building exclude_conditions and when excluding terminal top-level statuses. Update the block that sets queryset = queryset.exclude(status__in=JobState.final_states()) and the loop that builds exclude_conditions so both reference the same final_states variable (e.g., final_states = JobState.final_states()) and remove the redundant second call; keep usage of queryset, exclude_conditions, and the progress__stages__contains JSON condition intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@ami/jobs/views.py`:
- Line 37: pipeline__slug__in currently uses BaseInFilter standalone; instead
define and use a mixin combining BaseInFilter with CharFilter (e.g., create
InCharFilter = type("InCharFilter", (filters.BaseInFilter, filters.CharFilter),
{}) or a small class class InCharFilter(filters.BaseInFilter,
filters.CharFilter): pass) and replace pipeline__slug__in =
filters.BaseInFilter(...) with pipeline__slug__in =
InCharFilter(field_name="pipeline__slug", lookup_expr="in") so each
comma-separated value is validated as a char/slug.
- Around line 61-72: The code calls JobState.final_states() twice; store its
result once in a local variable at the start of the block and reuse it when
building exclude_conditions and when excluding terminal top-level statuses.
Update the block that sets queryset =
queryset.exclude(status__in=JobState.final_states()) and the loop that builds
exclude_conditions so both reference the same final_states variable (e.g.,
final_states = JobState.final_states()) and remove the redundant second call;
keep usage of queryset, exclude_conditions, and the progress__stages__contains
JSON condition intact.
Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to this branch:

- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8) when not explicitly set in the environment

Co-Authored-By: Claude <noreply@anthropic.com>
…1142)

* feat: configurable NATS tuning and gunicorn worker management

Rebase onto main after #1135 merge. Keep only the additions unique to this branch:

- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8) when not explicitly set in the environment

* fix: address PR review comments

- Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
- Update TaskQueueManager docstring with Args section
- Simplify production WEB_CONCURRENCY fallback (just use nproc)

---------

Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
Summary
Fixes discovered during PSv2 integration testing where consumer workers overwhelmed Django by flooding `/tasks` with concurrent NATS reserve requests.

- `get_connection()` now sets `allow_reconnect=False` with a 5s connect timeout, preventing leaked reconnection loops from blocking Django's async event loop
- JetStream metadata operations (`_ensure_stream`, `_ensure_consumer`, publish, delete) now have 10s timeouts via `asyncio.wait_for()` to fail fast when NATS is unreachable
- `asyncio.TimeoutError` propagates through `reserve_tasks()` to the `/tasks` view, which returns a 503 instead of silently returning empty tasks during NATS outages
- `/tasks` endpoint returns empty for terminal-status jobs instead of attempting NATS reserve
- `/tasks` endpoint batches NATS reserve calls with a short fetch timeout (0.5s) to keep the worker responsive
- `IncompleteJobFilter` excludes jobs by top-level status (not just progress JSON stages), so workers don't pick up manually-failed jobs
- `pipeline__slug__in` filter so workers can fetch jobs for multiple pipelines in a single request
- `dispatch_mode` at job creation time (in `setup()`) so the UI shows the correct mode before the job runs

Context

During integration testing the consumer's multiple DataLoader subprocesses generated 147 concurrent `/tasks` requests against Django's single uvicorn worker thread. Combined with NATS connections that defaulted to unlimited reconnect, this blocked the entire event loop and made Django unresponsive.

Changes

NATS connection safety

- `ami/ml/orchestration/nats_queue.py`: `allow_reconnect=False` with `connect_timeout=5` for fail-fast behavior
- `ami/ml/orchestration/nats_queue.py`: Add `NATS_JETSTREAM_TIMEOUT` (10s) to all JetStream metadata operations via `asyncio.wait_for()`, with `asyncio.TimeoutError` re-raised explicitly (not swallowed by generic exception handlers)
- `ami/ml/orchestration/nats_queue.py`: Batch `reserve_tasks()` replaces per-task `reserve_task()`, with conditional log levels (INFO when tasks reserved, DEBUG otherwise)
- `ami/jobs/views.py`: Catch `asyncio.TimeoutError` and `OSError` in `/tasks` endpoint, return 503 instead of 500
- `ami/jobs/views.py`: Return empty tasks for terminal-status jobs; pass `timeout=0.5` to keep worker responsive
- `ami/jobs/views.py`: `IncompleteJobFilter` checks top-level `status` field

Multi-pipeline job filtering

- `ami/jobs/views.py`: `pipeline__slug__in = BaseInFilter(...)` enables `?pipeline__slug__in=slug1,slug2`

Dispatch mode at creation time

- `ami/jobs/models.py`: Set `dispatch_mode` in `Job.setup()` based on project feature flags, so the UI reflects the correct mode immediately

Follow-up work

- `TASK_TTR` and `max_ack_pending` (see reconciliation plan on PSv2: Improve task fetching & web worker concurrency configuration #1142)
- `/tasks` early return and `incomplete_only` status exclusion

Test plan

- `incomplete_only=1` excludes FAILURE/SUCCESS/REVOKED jobs
- `pipeline__slug__in` filter returns correct jobs (unit test)
- `dispatch_mode` set correctly at creation time (unit test)

Summary by CodeRabbit
New Features
Bug Fixes
Tests