fix: PSv2 follow-up fixes from integration tests #1135

Merged
mihow merged 12 commits into main from fix/nats-connection-safety
Feb 21, 2026
Conversation


@mihow mihow commented Feb 17, 2026

Summary

Fixes discovered during PSv2 integration testing where consumer workers overwhelmed Django by flooding /tasks with concurrent NATS reserve requests.

  • NATS get_connection() now sets allow_reconnect=False with a 5s connect timeout, preventing leaked reconnection loops from blocking Django's async event loop
  • All JetStream operations (_ensure_stream, _ensure_consumer, publish, delete) now have 10s timeouts via asyncio.wait_for() to fail fast when NATS is unreachable
  • asyncio.TimeoutError propagates through reserve_tasks() to the /tasks view, which returns a 503 instead of silently returning empty tasks during NATS outages
  • /tasks endpoint returns empty for terminal-status jobs instead of attempting NATS reserve
  • /tasks endpoint batches NATS reserve calls with a short fetch timeout (0.5s) to keep the worker responsive
  • IncompleteJobFilter excludes jobs by top-level status (not just progress JSON stages), so workers don't pick up manually-failed jobs
  • Add pipeline__slug__in filter so workers can fetch jobs for multiple pipelines in a single request
  • Set dispatch_mode at job creation time (in setup()) so the UI shows the correct mode before the job runs
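
The fail-fast timeout pattern behind these bullets can be sketched with stdlib asyncio alone (names here are illustrative, not the project's exact code):

```python
import asyncio

# JETSTREAM_TIMEOUT mirrors the NATS_JETSTREAM_TIMEOUT (10s) described above.
JETSTREAM_TIMEOUT = 10.0

async def with_timeout(coro, timeout: float = JETSTREAM_TIMEOUT):
    """Wrap a JetStream-style call so it fails fast instead of hanging the loop."""
    return await asyncio.wait_for(coro, timeout=timeout)

async def demo() -> str:
    async def hung_nats_call():
        await asyncio.sleep(60)  # stands in for an unreachable NATS server

    try:
        await with_timeout(hung_nats_call(), timeout=0.01)
    except asyncio.TimeoutError:
        return "timed out"  # caller can now surface a 503 instead of blocking
    return "completed"

print(asyncio.run(demo()))  # → timed out
```

Because `asyncio.wait_for` cancels the wrapped call, a hung NATS operation never parks the single uvicorn worker's event loop.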

Context

During integration testing the consumer's multiple DataLoader subprocesses generated 147 concurrent /tasks requests against Django's single uvicorn worker thread. Combined with NATS connections that defaulted to unlimited reconnect, this blocked the entire event loop and made Django unresponsive.

Changes

NATS connection safety

  • ami/ml/orchestration/nats_queue.py: allow_reconnect=False with connect_timeout=5 for fail-fast behavior
  • ami/ml/orchestration/nats_queue.py: Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations via asyncio.wait_for(), with asyncio.TimeoutError re-raised explicitly (not swallowed by generic exception handlers)
  • ami/ml/orchestration/nats_queue.py: Batch reserve_tasks() replaces per-task reserve_task(), with conditional log levels (INFO when tasks reserved, DEBUG otherwise)
  • ami/jobs/views.py: Catch asyncio.TimeoutError and OSError in /tasks endpoint, return 503 instead of 500
  • ami/jobs/views.py: Return empty tasks for terminal-status jobs; pass timeout=0.5 to keep worker responsive
  • ami/jobs/views.py: IncompleteJobFilter checks top-level status field
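
A framework-free sketch of the 503 mapping described above (the real view uses DRF responses and async_to_sync; the function and return shape here are illustrative):

```python
import asyncio

def status_for_task_fetch(fetch):
    """Translate queue failures into an HTTP status instead of a silent empty list."""
    try:
        tasks = fetch()
    except (asyncio.TimeoutError, OSError):
        # NATS unreachable: answer 503 so workers back off, rather than a
        # 200 with no tasks that looks identical to an empty queue.
        return 503, []
    return 200, tasks

def nats_down():
    raise asyncio.TimeoutError

status, tasks = status_for_task_fetch(nats_down)
print(status, tasks)  # → 503 []
```

The point of the distinction is worker behavior: a 503 tells the consumer to back off, while an empty 200 invites an immediate re-poll.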

Multi-pipeline job filtering

  • ami/jobs/views.py: pipeline__slug__in = BaseInFilter(...) enables ?pipeline__slug__in=slug1,slug2
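
A worker using the new filter would build a request like this (the endpoint path is a hypothetical example; the comma-separated value syntax follows django-filter's BaseInFilter convention):

```python
from urllib.parse import urlencode

# Hypothetical worker-side request construction; slugs are made up.
params = {
    "incomplete_only": 1,
    "pipeline__slug__in": ",".join(["moth-detector-v1", "moth-detector-v2"]),
}
url = "/api/v2/jobs/?" + urlencode(params, safe=",")
print(url)  # → /api/v2/jobs/?incomplete_only=1&pipeline__slug__in=moth-detector-v1,moth-detector-v2
```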

Dispatch mode at creation time

  • ami/jobs/models.py: Set dispatch_mode in Job.setup() based on project feature flags, so the UI reflects the correct mode immediately
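
A minimal sketch of the creation-time decision, assuming the project.feature_flags.async_pipeline_workers flag mentioned in the walkthrough (mode names other than ASYNC_API are placeholders, not taken from the codebase):

```python
from enum import Enum

class DispatchMode(str, Enum):
    ASYNC_API = "async_api"      # named in the PR walkthrough
    CELERY_SYNC = "celery_sync"  # hypothetical fallback mode name

def dispatch_mode_for(feature_flags: dict) -> DispatchMode:
    """Decide the mode at creation time so the UI shows it before run() executes."""
    if feature_flags.get("async_pipeline_workers"):
        return DispatchMode.ASYNC_API
    return DispatchMode.CELERY_SYNC

print(dispatch_mode_for({"async_pipeline_workers": True}).name)  # → ASYNC_API
```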

Follow-up work

Test plan

  • Run PSv2 integration test with clean state (no stale workers)
  • Verify Django stays responsive during worker task fetching
  • Verify terminal-status jobs return empty tasks without hitting NATS
  • Verify incomplete_only=1 excludes FAILURE/SUCCESS/REVOKED jobs
  • pipeline__slug__in filter returns correct jobs (unit test)
  • ML job dispatch_mode set correctly at creation time (unit test)

Summary by CodeRabbit

  • New Features

    • Added filtering jobs by multiple pipeline slugs.
    • Batch task reservation capability for improved efficiency.
  • Bug Fixes

    • Improved incomplete job filtering to exclude terminal statuses.
    • Job dispatch mode now correctly initialized based on project configuration.
    • Enhanced error handling for task queue unavailability (503 response).
  • Tests

    • Added tests for multi-slug pipeline filtering.
    • Added tests for ML job dispatch mode behavior.

mihow and others added 2 commits February 16, 2026 22:43
- Add connect_timeout=5, allow_reconnect=False to NATS connections to
  prevent leaked reconnection loops from blocking Django's event loop
- Guard /tasks endpoint against terminal-status jobs (return empty tasks
  instead of attempting NATS reserve)
- IncompleteJobFilter now excludes jobs by top-level status in addition
  to progress JSON stages
- Add stale worker cleanup to integration test script

Found during PSv2 integration testing where stale ADC workers with
default DataLoader parallelism overwhelmed the single uvicorn worker
thread by flooding /tasks with concurrent NATS reserve requests.

Co-Authored-By: Claude <noreply@anthropic.com>
Session notes from 2026-02-16 integration test including root cause
analysis of stale worker task competition and NATS connection issues.
Findings doc tracks applied fixes and remaining TODOs with priorities.

Co-Authored-By: Claude <noreply@anthropic.com>

netlify bot commented Feb 17, 2026

Deploy Preview for antenna-preview canceled.

Name Link
🔨 Latest commit 6eb2854
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/69991459c109ae0008bb4275


netlify bot commented Feb 17, 2026

Deploy Preview for antenna-ssec canceled.

Name Link
🔨 Latest commit 6eb2854
🔍 Latest deploy log https://app.netlify.com/projects/antenna-ssec/deploys/69991459d7ce1e0007e1657a


coderabbitai bot commented Feb 17, 2026

📝 Walkthrough

Adds batch task reservation to the NATS queue, enables filtering jobs by multiple pipeline slugs, refines incomplete-job exclusion logic and job tasks endpoint behavior, moves dispatch_mode determination to job creation, and extends/updates tests to cover these behaviors.

Changes

Cohort / File(s) Summary
Job Filtering & API
ami/jobs/views.py
Added pipeline__slug__in filter; refined IncompleteJobFilter to exclude top-level terminal jobs and jobs whose "results" stage is final; tasks endpoint now early-returns for final jobs, calls reserve_tasks, and handles NATS unavailability with 503.
Job Models / Dispatch
ami/jobs/models.py
Set dispatch_mode at Job.setup when a pipeline exists (based on project.feature_flags.async_pipeline_workers); removed runtime switching in MLJob.run and only queue images when ASYNC_API.
Job Tests
ami/jobs/tests.py
Added test_filter_by_pipeline_slug_in; added creation-time test_ml_job_dispatch_mode_set_on_creation (in two test contexts); adjusted dispatch-mode filtering test to use a non-ML job_type_key and pipeline-less internal job.
NATS Task Queue Implementation
ami/ml/orchestration/nats_queue.py
Replaced reserve_task() with reserve_tasks(job_id, count, timeout) returning a list of tasks; introduced NATS_JETSTREAM_TIMEOUT; enforced timeouts for stream/consumer/publish/delete ops; improved error handling and logging; propagate timeouts and return empty lists on fetch errors.
NATS Queue Tests
ami/ml/orchestration/tests/test_nats_queue.py
Updated tests to expect list results from reserve_tasks (including single-item lists), verify distinct reply subjects and id preservation, adapt timeout and connection-not-open error paths to list/TimeoutError semantics.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant JobAPI as Job Tasks Endpoint
    participant QueueMgr as TaskQueueManager
    participant NATS as NATS/JetStream

    Client->>JobAPI: GET /tasks (job_id, count)
    alt job in final state
        JobAPI-->>Client: [] (empty list)
    else job not final
        JobAPI->>QueueMgr: reserve_tasks(job_id, count, timeout)
    QueueMgr->>NATS: ensure stream & consumer (with 10s timeouts)
        NATS-->>QueueMgr: stream/consumer OK
        QueueMgr->>NATS: pull_subscribe & fetch(count, timeout)
        NATS-->>QueueMgr: [msg1, msg2, ...]
        QueueMgr->>QueueMgr: map messages -> PipelineProcessingTask list
        QueueMgr-->>JobAPI: [Task1, Task2, ...]
        JobAPI-->>Client: return tasks list
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Suggested labels

backend, ml

Poem

🐇 I hopped into queues with a twitch and a wink,
Messages bundled — far faster than I’d think.
Pipelines sorted and dispatch set at birth,
Tests hop along to prove code’s new worth.
May the CI carrots be many and green!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title accurately reflects the main purpose of this PR—delivering follow-up fixes to address issues discovered during PSv2 integration tests, including NATS connection safety, dispatcher handling, and multi-pipeline filtering.
Description check ✅ Passed The PR description is comprehensive, covering summary, detailed changes across all modified files, context, test plan, and follow-up work. All required template sections are present and well-populated.
Docstring Coverage ✅ Passed Docstring coverage is 84.62% which is sufficient. The required threshold is 80.00%.


mihow and others added 3 commits February 16, 2026 23:04
PSv2 integration test passed end-to-end (job 1380, 20/20 images).
Identified ack_wait=300s as cause of ~5min idle time when GPU
processes race for NATS tasks.

Co-Authored-By: Claude <noreply@anthropic.com>
Replace N×1 reserve_task() calls with single reserve_tasks() batch
fetch. The previous implementation created a new pull subscription per
message (320 NATS round trips for batch=64), causing the /tasks endpoint
to exceed HTTP client timeouts. The new approach uses one psub.fetch()
call for the entire batch.
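
The arithmetic behind the quoted figures, assuming five NATS round trips per reservation (a breakdown consistent with 320 trips for batch=64, but not itemized in the source):

```python
# Back-of-envelope round-trip count for the old vs. new reservation paths.
CALLS_PER_RESERVATION = 5  # assumed: ensure stream/consumer, subscribe, fetch, unsubscribe
batch = 64

old_round_trips = CALLS_PER_RESERVATION * batch  # one pull subscription per message
new_round_trips = CALLS_PER_RESERVATION          # one psub.fetch(count) for the batch

print(old_round_trips, new_round_trips)  # → 320 5
```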

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow changed the title from "fix: prevent NATS connection flooding and stale job task fetching" to "fix: PSv2 follow-up fixes from integration tests" on Feb 17, 2026
mihow and others added 2 commits February 20, 2026 16:56
Workers that handle multiple pipelines can now fetch jobs for all of them
in a single request: ?pipeline__slug__in=slug1,slug2

Co-Authored-By: Claude <noreply@anthropic.com>
These files are session notes, planning docs, and test scripts that
should stay local rather than be part of the PR.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow marked this pull request as ready for review February 21, 2026 01:11
Copilot AI review requested due to automatic review settings February 21, 2026 01:11
Contributor

Copilot AI left a comment


Pull request overview

This PR addresses PSv2 integration-test issues where high-concurrency worker polling overwhelmed Django and NATS interactions caused event-loop stalls, by hardening the /jobs/{id}/tasks behavior and updating NATS queue reservation semantics.

Changes:

  • Update NATS queue reservation to support batch pulls (reserve_tasks) and adjust connection behavior.
  • Prevent /tasks from reserving tasks for terminal-status jobs; switch to batched NATS reserve calls.
  • Extend job filtering to support pipeline__slug__in and strengthen incomplete_only filtering by top-level job status (with a new unit test for multi-pipeline filtering).

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 5 comments.

File Description
ami/ml/orchestration/nats_queue.py Adds batched task reservation and changes NATS connection options used by the task queue manager.
ami/ml/orchestration/tests/test_nats_queue.py Updates/extends tests to validate batched reservation behavior and timeout handling.
ami/jobs/views.py Updates /tasks endpoint to short-circuit terminal jobs and batch reserves; enhances incomplete filtering and adds pipeline__slug__in.
ami/jobs/tests.py Adds unit test coverage for filtering jobs by multiple pipeline slugs.


… flags

ML jobs with a pipeline now get dispatch_mode set during setup() instead
of waiting until run() is called by the Celery worker. This lets the UI
show the correct mode immediately after job creation.

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (3)
ami/ml/orchestration/tests/test_nats_queue.py (2)

98-98: Move import nats.errors to module level.

Mid-test imports are unconventional and hide dependencies.

📝 Proposed fix
+import nats.errors
+
 import unittest
 from unittest.mock import AsyncMock, MagicMock, patch

 from ami.ml.orchestration.nats_queue import TaskQueueManager
 from ami.ml.schemas import PipelineProcessingTask

Then remove the inline import at line 98.


112-131: test_reserve_tasks_single omits the fetch-parameter assertion present in test_reserve_tasks_success.

For consistency, add the same mock_psub.fetch.assert_called_once_with(1, timeout=5) check to verify the count and timeout are passed through correctly.

📝 Proposed addition
                 self.assertEqual(len(tasks), 1)
                 self.assertEqual(tasks[0].reply_subject, "reply.subject.123")
+                mock_psub.fetch.assert_called_once_with(1, timeout=5)
ami/ml/orchestration/nats_queue.py (1)

207-209: Use logger.exception to preserve the stack trace in the catch-all handler.

logger.error discards the traceback, making it hard to distinguish connection errors, deserialization failures, or NATS API errors from "no tasks." logger.exception includes the traceback at no extra cost and directly addresses the Ruff TRY400 warning.

📝 Proposed fix
         except Exception as e:
-            logger.error(f"Failed to reserve tasks from stream for job '{job_id}': {e}")
+            logger.exception(f"Failed to reserve tasks from stream for job '{job_id}': {e}")
             return []
Inline review comments (posted on the diff):

In ami/jobs/views.py (around lines 249-254): get_tasks awaits manager.reserve_tasks(...) without a timeout and is invoked via async_to_sync(get_tasks)(), so a slow or unresponsive NATS/JetStream can block the worker. Wrap the reserve_tasks call in asyncio.wait_for(..., timeout=...) and add explicit timeouts to the JetStream helpers in nats_queue.py (_ensure_stream and _ensure_consumer around js.stream_info, js.add_stream, js.consumer_info, and js.add_consumer), using a shared constant (e.g. JS_OP_TIMEOUT) so operations fail fast instead of hanging.

In ami/ml/orchestration/nats_queue.py (around lines 47-48): the docstring example passes the string 'job123' to reserve_tasks, which is typed as reserve_tasks(self, job_id: int, ...). Update the example to pass an integer (e.g. 123) and keep related calls such as acknowledge_task(tasks[0].reply_subject) consistent with the manager API.

…olicy

Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations
via asyncio.wait_for() so a hung NATS connection fails fast instead of
blocking the caller's thread indefinitely. Also restore the intended
reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow
Copy link
Collaborator Author

mihow commented Feb 21, 2026

Code review

Found 1 issue:

  1. asyncio.TimeoutError re-raised in _ensure_stream() and _ensure_consumer() is silently caught by the broad except Exception in reserve_tasks(), making NATS outages indistinguishable from an empty queue.

    _ensure_stream() (line 103) and _ensure_consumer() (line 133) explicitly re-raise asyncio.TimeoutError with the comment "NATS unreachable — let caller handle it." However, reserve_tasks() wraps both calls in a try/except Exception (line 230) that catches asyncio.TimeoutError (which is a subclass of Exception in Python 3.11+), logs an error, and returns []. The /tasks endpoint then returns {"tasks": []}, which is identical to the "no tasks available" response. ADC workers cannot distinguish between "NATS is down" and "no work right now," so they poll again immediately — recreating a milder form of the flooding problem this PR is trying to fix.

    Fix: catch asyncio.TimeoutError before Exception in reserve_tasks() and either re-raise it or return a distinguishable signal (e.g., raise so the view can return a 503).

From _ensure_stream():

        logger.debug(f"Stream {stream_name} already exists")
    except asyncio.TimeoutError:
        raise  # NATS unreachable — let caller handle it rather than creating a stream blindly
    except Exception as e:

From reserve_tasks():

    try:
        await self._ensure_stream(job_id)
        await self._ensure_consumer(job_id)
        consumer_name = self._get_consumer_name(job_id)
        subject = self._get_subject(job_id)
        psub = await self.js.pull_subscribe(subject, consumer_name)
        try:
            msgs = await psub.fetch(count, timeout=timeout)
        except nats.errors.TimeoutError:
            logger.debug(f"No tasks available in stream for job '{job_id}'")
            return []
        finally:
            await psub.unsubscribe()
        tasks = []
        for msg in msgs:
            task_data = json.loads(msg.data.decode())
            task = PipelineProcessingTask(**task_data)
            task.reply_subject = msg.reply
            tasks.append(task)
        logger.info(f"Reserved {len(tasks)} tasks from stream for job '{job_id}'")
        return tasks
    except Exception as e:
        logger.error(f"Failed to reserve tasks from stream for job '{job_id}': {e}")
        return []
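
The suggested fix can be sketched stdlib-only, with illustrative names: catch the timeout before the catch-all so NATS outages escape to the view.

```python
import asyncio

async def reserve_tasks(fetch):
    """Sketch of the recommended exception ordering for the batch reserve."""
    try:
        return await fetch()
    except asyncio.TimeoutError:
        raise  # NATS outage: propagate so the view can answer 503
    except Exception:
        return []  # other failures still present as an empty queue

async def nats_unreachable():
    raise asyncio.TimeoutError

try:
    asyncio.run(reserve_tasks(nats_unreachable))
    outcome = "swallowed"
except asyncio.TimeoutError:
    outcome = "propagated"
print(outcome)  # → propagated
```

Ordering matters because asyncio.TimeoutError is a subclass of Exception, so a broad handler listed first would always win.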


Note on PR overlap: PR #1142 (carlos/taskperf) modifies the same three core files (nats_queue.py, views.py, tests/test_nats_queue.py) with a competing batch-reservation implementation. Key differences:

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

mihow and others added 2 commits February 20, 2026 18:01
asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was
caught by the broad `except Exception` in reserve_tasks(), silently
returning [] and making NATS outages indistinguishable from empty queues.
Workers would then poll immediately, recreating the flooding problem.

- Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
- Catch TimeoutError and OSError in the /tasks view, return 503
- Restore allow_reconnect=False (fail-fast on connection issues)
- Add return type annotation to get_connection()

Co-Authored-By: Claude <noreply@anthropic.com>
- Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid
  log spam from frequent polling)
- Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker
  for 5s on empty queues
- Fix docstring examples using string 'job123' for int-typed job_id

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
ami/ml/orchestration/nats_queue.py (1)

258-283: asyncio.TimeoutError from delete_consumer / delete_stream is silently swallowed.

The except Exception handlers in delete_consumer (line 281) and delete_stream (line 307) will catch asyncio.TimeoutError raised by the wait_for wrappers. For cleanup operations this is likely acceptable (log + return False), but it's worth being aware that a NATS outage during cleanup won't propagate, unlike the reserve/ensure paths where it does. If that's intentional, no change needed.

Inline review comment (posted on the diff):

In ami/jobs/views.py (around lines 254-258): the exception handler around the async_to_sync(get_tasks) call catches asyncio.TimeoutError and OSError but misses NATS client exceptions. Extend the tuple to include nats.errors.Error (e.g. except (asyncio.TimeoutError, OSError, nats.errors.Error) as e), add the corresponding import at the top of the module, log via logger.warning("NATS unavailable while fetching tasks for job %s: %s", job.pk, e), and return the 503 response as intended.

NoServersError, ConnectionClosedError, and other NATS exceptions inherit
from nats.errors.Error (not OSError), so they escaped the handler and
returned 500 instead of 503.

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


🧹 Nitpick comments (2)
ami/jobs/views.py (2)

37-37: BaseInFilter should be combined with CharFilter, not used standalone.

Per the official django-filter reference, BaseInFilter "is expected to be used in conjunction with another filter class, as this class only validates that the incoming value is comma-separated. The secondary filter is then used to validate the individual values." Using it standalone skips per-value validation — correct for slugs today since the base Filter defaults to CharField, but the documented contract calls for the mixin pattern.

♻️ Proposed fix
+class CharInFilter(filters.BaseInFilter, filters.CharFilter):
+    pass
+
 class JobFilterSet(filters.FilterSet):
     """Custom filterset to enable pipeline name filtering."""
 
     pipeline__slug = filters.CharFilter(field_name="pipeline__slug", lookup_expr="exact")
-    pipeline__slug__in = filters.BaseInFilter(field_name="pipeline__slug", lookup_expr="in")
+    pipeline__slug__in = CharInFilter(field_name="pipeline__slug", lookup_expr="in")

61-72: JobState.final_states() is called twice — assign once.

final_states() is evaluated at line 62 inline and again at line 65. Assign it once at the top of the if block.

♻️ Proposed fix
         if incomplete_only:
+            final_states = JobState.final_states()
             # Exclude jobs with a terminal top-level status
-            queryset = queryset.exclude(status__in=JobState.final_states())
+            queryset = queryset.exclude(status__in=final_states)
 
             # Also exclude jobs where the "results" stage has a final state status
-            final_states = JobState.final_states()
             exclude_conditions = Q()

@mihow mihow merged commit 17bf10f into main Feb 21, 2026
7 checks passed
@mihow mihow deleted the fix/nats-connection-safety branch February 21, 2026 02:23
mihow added a commit to uw-ssec/antenna that referenced this pull request Feb 21, 2026
Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to
this branch:

- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production
  parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8)
  when not explicitly set in the environment

Co-Authored-By: Claude <noreply@anthropic.com>
mihow added a commit that referenced this pull request Feb 21, 2026
…1142)

* feat: configurable NATS tuning and gunicorn worker management

Rebase onto main after #1135 merge. Keep only the additions unique to
this branch:

- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production
  parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8)
  when not explicitly set in the environment

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: address PR review comments

- Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
- Update TaskQueueManager docstring with Args section
- Simplify production WEB_CONCURRENCY fallback (just use nproc)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>