PSv2: Improve task fetching & web worker concurrency configuration #1142

Merged
mihow merged 2 commits into RolnickLab:main from uw-ssec:carlos/taskperf
Feb 21, 2026
Conversation


@carlosgjs carlosgjs commented Feb 20, 2026

Summary

This pull request introduces performance tuning for the NATS task queue and configurable Gunicorn worker counts.

See: RolnickLab/ami-data-companion#113

Task queue management improvements:

  • Added support for configurable max_ack_pending in TaskQueueManager, allowing customization of the maximum number of unacknowledged tasks per consumer (default: 100, settable via settings.NATS_MAX_ACK_PENDING).
  • Reduced the default NATS visibility timeout (TASK_TTR) from 300s to 30s, now configurable via settings.NATS_TASK_TTR.
  • Updated local and production Django start scripts to allow configurable Gunicorn worker counts based on environment variables and CPU cores.

Changes added but not visible after merging with #1135

These changes were part of the original PR but were superseded by #1135 (fix/nats-connection-safety) during the rebase.

Testing

  • Validated significant improvements in task-fetching performance via the ami worker

Test:

docker compose run --rm django python manage.py test_ml_job_e2e "ami-1000" quebec_vermont_moths_2023 --dispatch-mode async_api

Before this change and with current ami worker (sync results):

✅ Job completed successfully

⏱ Total runtime: 192.54 seconds

📊 Final Results:
  Collect: 100.0% (SUCCESS)
    Total Images: 1000
  Process: 100.0% (SUCCESS)
    Processed: 1000
  Results: 100.0% (SUCCESS)
    Captures: 1000
    Detections: 5626
    Classifications: 5626

Before this change and with RolnickLab/ami-data-companion#113 (async results):
Across multiple tests, ~5% of tasks failed to process and had to be redelivered by NATS 5 minutes later, hence the much longer runtime.
TASK_TTR=300

✅ Job completed successfully
⏱ Total runtime: 647.46 seconds
📊 Final Results:
  Collect: 100.0% (SUCCESS)
    Total Images: 1000
  Process: 100.0% (SUCCESS)
    Processed: 1000
  Results: 100.0% (SUCCESS)
    Captures: 1000
    Detections: 5626
    Classifications: 5626

Repeated test with lower TASK_TTR=30

✅ Job completed successfully
⏱ Total runtime: 154.36 seconds
📊 Final Results:
  Collect: 100.0% (SUCCESS)
    Total Images: 1000
  Process: 100.0% (SUCCESS)
    Processed: 1000
  Results: 100.0% (SUCCESS)
    Captures: 1000
    Detections: 5626
    Classifications: 5626

After this change and with RolnickLab/ami-data-companion#113: (no NATS re-delivers)

✅ Job completed successfully
⏱ Total runtime: 102.23 seconds
📊 Final Results:
  Collect: 100.0% (SUCCESS)
    Total Images: 1000
  Process: 100.0% (SUCCESS)
    Processed: 1000
  Results: 100.0% (SUCCESS)
    Captures: 1000
    Detections: 5626
    Classifications: 5626

After this change and with current ami worker

✅ Job completed successfully
⏱ Total runtime: 200.61 seconds
📊 Final Results:
  Collect: 100.0% (SUCCESS)
    Total Images: 1000
  Process: 100.0% (SUCCESS)
    Processed: 1000
  Results: 100.0% (SUCCESS)
    Captures: 1000
    Detections: 5626
    Classifications: 5626

Performance Summary Table

(seconds)

| Configuration | AMI Sync Results | AMI Async Results |
| --- | --- | --- |
| Before this change | 192.54 | 647.46 (TASK_TTR=300), 154.36 (TASK_TTR=30) |
| After this change | 200.61 | 102.23 |

Dataloader Benchmark

without changes:

python -m trapdata.antenna.benchmark --job-id 98 --num-workers 4 --batch-size 16
Throughput:
  9.82 images/second (total)

with changes:

python -m trapdata.antenna.benchmark --job-id 97 --num-workers 4 --batch-size 16
Throughput:
  14.88 images/second (total)

  • Verified in the logs the number of workers and that they get created:
django-1  | Starting Gunicorn with 48 worker(s)...
django-1  | Starting Gunicorn in production mode with 48 workers...
django-1  | /usr/local/lib/python3.11/site-packages/gunicorn/util.py:25: UserWarning: pkg_resources is deprecated as an API. See https://setuptools.pypa.io/en/latest/pkg_resources.html. The pkg_resources package is slated for removal as early as 2025-11-30. Refrain from using this package or pin to Setuptools<81.
django-1  |   import pkg_resources
django-1  | [2026-02-20 17:29:06 +0000] [1] [INFO] Starting gunicorn 20.1.0
django-1  | [2026-02-20 17:29:06 +0000] [1] [INFO] Listening at: http://0.0.0.0:8000 (1)
django-1  | [2026-02-20 17:29:06 +0000] [1] [INFO] Using worker: uvicorn.workers.UvicornWorker
django-1  | [2026-02-20 17:29:06 +0000] [57] [INFO] Booting worker with pid: 57
django-1  | [2026-02-20 17:29:06 +0000] [58] [INFO] Booting worker with pid: 58
django-1  | [2026-02-20 17:29:06 +0000] [59] [INFO] Booting worker with pid: 59
django-1  | [2026-02-20 17:29:06 +0000] [60] [INFO] Booting worker with pid: 60
django-1  | [2026-02-20 17:29:06 +0000] [61] [INFO] Booting worker with pid: 61
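The startup log above comes from the new worker configuration. Roughly, the selection logic can be sketched like this; the GUNICORN_WORKERS override and the (2 * cores + 1) production formula are mentioned in this PR, but the script body here is illustrative and differs from the actual start scripts.

```shell
#!/bin/sh
# Sketch: pick a Gunicorn worker count from an env override, falling back
# to CPU-based auto-detection (2 * cores + 1, the production formula).
WORKERS="${GUNICORN_WORKERS:-0}"
if [ "$WORKERS" -eq 0 ]; then
    CPU_CORES="$(nproc)"
    WORKERS=$((CPU_CORES * 2 + 1))
fi
echo "Starting Gunicorn with ${WORKERS} worker(s)..."
```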

Deployment Notes

No database migrations are expected; the changes touch Django settings and the start scripts only. New optional configuration: the NATS_MAX_ACK_PENDING and NATS_TASK_TTR Django settings, and the GUNICORN_WORKERS / WEB_CONCURRENCY environment variables for worker counts. Defaults apply when these are unset.

Checklist

  • I have tested these changes appropriately.
  • I have added and/or modified relevant tests.
  • I updated relevant documentation or comments.
  • I have verified that this PR follows the project's coding standards.
  • Any dependent changes have already been merged to main.

Summary by CodeRabbit

  • Chores
    • Made task queue acknowledgment capacity configurable (default: 100, via NATS_MAX_ACK_PENDING).
    • Reduced NATS visibility timeout from 300s to 30s (configurable via NATS_TASK_TTR).
    • Improved local and production startup controls: Gunicorn worker auto-sizing and optional direct dev server mode.


netlify bot commented Feb 20, 2026

Deploy Preview for antenna-ssec canceled.

🔨 Latest commit: a7bf533
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-ssec/deploys/699923f4e535400008244117


netlify bot commented Feb 20, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit: a7bf533
🔍 Latest deploy log: https://app.netlify.com/projects/antenna-preview/deploys/699923f4d071da000820ef8b


coderabbitai bot commented Feb 20, 2026

📝 Walkthrough

Configuration settings for NATS queue (Task-To-Run and acknowledgment pending limits) are now customizable via environment settings. Django startup scripts refactored to unify Gunicorn and Uvicorn execution with conditional logic for debugger, worker count, and reload behavior, plus intelligent CPU-core-based worker defaults in production.

Changes

| Cohort / File(s) | Summary |
| --- | --- |
| NATS Queue Configuration (ami/ml/orchestration/nats_queue.py) | TASK_TTR now configurable via settings.NATS_TASK_TTR (default 30s). TaskQueueManager constructor accepts an optional max_ack_pending parameter, replacing the hard-coded 100 in consumer setup. |
| Django Local Startup (compose/local/django/start) | Unified Gunicorn/Uvicorn startup flow replacing the VS Code debug path. Added WORKERS variable (derived from WEB_CONCURRENCY), a USE_UVICORN toggle, and conditional branches for debugger mode (1 worker with debugpy), single worker with auto-reload, or multi-worker execution. |
| Django Production Startup (compose/production/django/start) | WEB_CONCURRENCY defaults to CPU core count (nproc) when unset. Added an informational echo showing the worker count. Gunicorn command and collectstatic step remain unchanged. |

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~12 minutes

Poem

🐰 Hops of joy for configs free,
TTR and ack now decree!
Gunicorn dances, Uvicorn too,
Worker counts match cores anew,
No more hard-coded chains,
Flexibility now reigns!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 66.67%, below the required threshold of 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (2 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately summarizes the two main changes: task fetching improvements and web worker concurrency configuration enhancements. |
| Description check | ✅ Passed | The pull request description comprehensively covers all required template sections with detailed summaries, change lists, test results, deployment notes, and a complete checklist. |


@carlosgjs carlosgjs changed the title from Carlos/taskperf to PSv2: Task fetching perf Feb 20, 2026
@carlosgjs carlosgjs marked this pull request as ready for review February 20, 2026 18:50
Copilot AI review requested due to automatic review settings February 20, 2026 18:50

Copilot AI left a comment


Pull request overview

This pull request introduces performance optimizations for task fetching in the ML pipeline orchestration system. The changes enable batch task reservation from NATS JetStream and configurable worker counts for Gunicorn, resulting in significant performance improvements (from ~647 seconds to ~102 seconds in end-to-end tests).

Changes:

  • Enhanced TaskQueueManager to support configurable max_ack_pending limits and batch task reservation via new ntasks parameter
  • Modified deployment scripts to allow configurable Gunicorn worker counts based on environment variables with CPU-based auto-detection
  • Updated API endpoint to utilize batch task fetching instead of sequential single-task fetching

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.

| File | Description |
| --- | --- |
| ami/ml/orchestration/nats_queue.py | Added configurable max_ack_pending and refactored reserve_task to support batch reservation |
| ami/jobs/views.py | Refactored to use batch reserve_task and changed logging level for result processing |
| compose/production/django/start | Added configurable Gunicorn workers with CPU-based default (2*cores + 1) |
| compose/local/django/start | Added configurable Gunicorn workers with CPU-based default and conditional reload |

Comments suppressed due to low confidence (1)

ami/ml/orchestration/nats_queue.py:179

  • The docstring for reserve_task is outdated and doesn't document the new parameters and return type. Update the documentation to include the ntasks parameter, and change the return type description from "PipelineProcessingTask with reply_subject set for acknowledgment, or None if no task available" to "List of PipelineProcessingTask objects with reply_subject set for acknowledgment, or None if no tasks available". Also document that ntasks defaults to 1.
        """
        Reserve a task from the specified stream.

        Args:
            job_id: The job ID (integer primary key) to pull tasks from
            timeout: Timeout in seconds for reservation (default: 5 seconds)

        Returns:
            PipelineProcessingTask with reply_subject set for acknowledgment, or None if no task available



@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ami/ml/orchestration/nats_queue.py (2)

168-179: ⚠️ Potential issue | 🟡 Minor

ntasks is untyped, and the docstring still describes a single-task return.

Two minor cleanup items:

  1. ntasks=1 has no type annotation; should be ntasks: int = 1.
  2. The Returns: block still reads "PipelineProcessingTask with reply_subject set for acknowledgment" — it should reflect list[PipelineProcessingTask] | None.
📝 Proposed fix
```diff
 async def reserve_task(
-    self, job_id: int, ntasks=1, timeout: float | None = None
+    self, job_id: int, ntasks: int = 1, timeout: float | None = None
 ) -> list[PipelineProcessingTask] | None:
     """
     Reserve a task from the specified stream.

     Args:
         job_id: The job ID (integer primary key) to pull tasks from
+        ntasks: Number of tasks to fetch in a single batch (default: 1)
         timeout: Timeout in seconds for reservation (default: 5 seconds)

     Returns:
-        PipelineProcessingTask with reply_subject set for acknowledgment, or None if no task available
+        List of PipelineProcessingTask objects with reply_subject set, or None if no tasks available
     """
```

116-133: ⚠️ Potential issue | 🟠 Major

Add consumer configuration update logic when max_ack_pending changes.

The _ensure_consumer method logs the existing consumer's max_ack_pending but does not compare it to the configured self.max_ack_pending. If a consumer was previously created with an older default value, it retains that configuration after deployment; the new default only applies to newly created consumers.

Since nats-py supports updating consumers by calling add_consumer() again with the same durable_name and an updated ConsumerConfig, consider adding a comparison and update step:

```python
try:
    info = await self.js.consumer_info(stream_name, consumer_name)
    if info.config.max_ack_pending != self.max_ack_pending:
        # Update consumer config
        updated_config = info.config.evolve(max_ack_pending=self.max_ack_pending)
        await self.js.add_consumer(stream=stream_name, config=updated_config)
        logger.info(f"Updated consumer {consumer_name} max_ack_pending to {self.max_ack_pending}")
    else:
        logger.debug(f"Consumer {consumer_name} already exists with max_ack_pending={info.config.max_ack_pending}")
except Exception:
    # Consumer doesn't exist, create it
    ...
```

Without this, the throughput improvements targeted by this PR won't apply to existing consumers until they are explicitly deleted and recreated.

🧹 Nitpick comments (5)
ami/ml/orchestration/nats_queue.py (2)

51-53: Use is not None instead of or for max_ack_pending defaulting.

The or pattern silently ignores an explicitly passed value of 0 (or any other falsy integer) because 0 is falsy in Python. The type annotation is int | None, so the sentinel for "use default" is None, not 0. While 0 is not a valid NATS max_ack_pending value, the pattern is semantically imprecise and could mask future callers passing -1 (unlimited).

♻️ Proposed fix
```diff
-        self.max_ack_pending = max_ack_pending or getattr(settings, "NATS_MAX_ACK_PENDING", 1000)
+        self.max_ack_pending = max_ack_pending if max_ack_pending is not None else getattr(settings, "NATS_MAX_ACK_PENDING", 1000)
```
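The pitfall called out in this comment can be demonstrated in a few lines:

```python
# Why `x or default` is the wrong idiom for optional integer parameters:
# 0 is falsy in Python, so an explicitly passed 0 is silently replaced.
def with_or(max_ack_pending=None):
    return max_ack_pending or 1000

def with_none_check(max_ack_pending=None):
    return max_ack_pending if max_ack_pending is not None else 1000

print(with_or(0))          # 1000 -- the caller's explicit 0 was discarded
print(with_none_check(0))  # 0    -- the caller's value is preserved
```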

199-217: Stale comment + Ruff TRY300: move return tasks to the else block.

Line 200's # Fetch a single message comment is now incorrect — the call fetches ntasks messages.

Additionally, Ruff flags that return tasks at line 217 sits inside the try body (TRY300). Moving it to the else clause makes the happy-path/error-path separation explicit: the else runs only when no exception was raised, and the finally still guarantees psub.unsubscribe().

♻️ Proposed fix
```diff
             tasks = []
             try:
-                # Fetch a single message
+                # Fetch up to ntasks messages
                 msgs = await psub.fetch(ntasks, timeout=timeout)

                 if msgs:
                     for msg in msgs:
                         task_data = json.loads(msg.data.decode())
                         metadata = msg.metadata

                         # Parse the task data into PipelineProcessingTask
                         task = PipelineProcessingTask(**task_data)
                         # Set the reply_subject for acknowledgment
                         task.reply_subject = msg.reply

                         logger.debug(
                             f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}"
                         )
                         tasks.append(task)
-                return tasks

             except nats.errors.TimeoutError as e:
                 # No messages available
                 logger.debug(f"No tasks available in stream for job '{job_id}': {e}")
                 return None
+            else:
+                return tasks
             finally:
                 # Always unsubscribe
                 await psub.unsubscribe()
```
compose/production/django/start (1)

9-18: LGTM — optionally document the intentional formula difference from the local script.

The production script uses CPU_CORES * 2 + 1 (standard Gunicorn I/O-worker formula) while the local counterpart just uses nproc. Both share the same GUNICORN_WORKERS env var name for overrides, so a developer who sets GUNICORN_WORKERS=0 expecting auto-detection will get different worker counts in the two environments. A brief inline comment explaining the intentional divergence would help avoid confusion.

📝 Suggested comment to clarify the intentional difference
```diff
 # Configurable number of Gunicorn workers (default: auto-detect based on CPU cores)
+# Uses (cores * 2 + 1) in production for I/O-bound workloads; the local script uses
+# plain nproc to keep development lighter.
 WORKERS=${GUNICORN_WORKERS:-0}
```
ami/jobs/views.py (1)

228-228: Add a max_value cap to the batch parameter to bound memory usage.

Without an upper bound, a client can request an arbitrarily large batch (e.g., batch=1000000). While timeout=0.1 limits how long the NATS fetch blocks, psub.fetch(ntasks, ...) still allocates a response buffer sized to the return count before the timeout fires. A reasonable cap (e.g., 1000, matching max_ack_pending) would prevent accidental or malicious oversized requests.

♻️ Proposed fix
```diff
-            batch = IntegerField(required=True, min_value=1).clean(request.query_params.get("batch"))
+            batch = IntegerField(required=True, min_value=1, max_value=1000).clean(request.query_params.get("batch"))
```
compose/local/django/start (1)

9-31: Auto-detection silently opts out of --reload on multi-core developer machines.

When GUNICORN_WORKERS is unset, WORKERS=$(nproc) resolves to the machine's core count. On any machine with more than 1 core the else branch fires — "production mode" without auto-reload — which may surprise developers who expect --reload to be the default for local work. Explicitly setting GUNICORN_WORKERS=1 works around it, but that's a non-obvious requirement. Consider defaulting local auto-detection to 1 worker (reload-enabled) and only going multi-worker when GUNICORN_WORKERS is explicitly set to a value > 1.

♻️ Proposed adjustment to default local behaviour
```diff
 # Configurable number of Gunicorn workers (default: auto-detect based on CPU cores)
 WORKERS=${GUNICORN_WORKERS:-0}
 if [ "$WORKERS" -eq 0 ]; then
-    WORKERS=$(nproc)
+    # Default to single worker with reload for local development.
+    # Set GUNICORN_WORKERS > 1 explicitly for multi-worker performance testing.
+    WORKERS=1
 fi
```
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 44-48: The docstring in nats_queue.py incorrectly states the
default max_ack_pending is 500; update the docstring to match the actual default
used in __init__ (getattr(settings, "NATS_MAX_ACK_PENDING", 1000)) or change the
default in __init__ to 500 so they match; refer to the __init__ method and the
getattr call for NATS_MAX_ACK_PENDING / max_ack_pending to locate and correct
the discrepancy.
- Around line 54-56: The current logger.info in TaskQueueManager logs
self.nats_url which may contain embedded credentials; add a small helper (e.g.,
_redact_url) that uses urllib.parse.urlparse/urlunparse to remove or replace the
authority (user:password@) portion (keep hostname:port) and call logger.info
with the redacted value instead of self.nats_url; update the TaskQueueManager
init logging line to use this helper so credentials are never written to logs.



mihow commented Feb 21, 2026

Reconciliation plan with PR #1135

PR #1135 (fix/nats-connection-safety) overlaps with this PR in three files: nats_queue.py, views.py, and test_nats_queue.py. After reviewing both side by side, here's a proposed path forward:

Merge #1135 first — it addresses the reliability issues found during PSv2 integration testing (JetStream timeouts, allow_reconnect=False, terminal-job short-circuit). Once merged, this PR should rebase onto main and incorporate the following changes on top:

Keep from this PR (rebase onto #1135):

  1. Configurable max_ack_pending — add NATS_MAX_ACK_PENDING Django setting, pass to TaskQueueManager.__init__. The current hardcoded 100 in #1135 (fix: PSv2 follow-up fixes from integration tests) may be too low for workers pulling large batches.

  2. Configurable TASK_TTR — the 300s → 30s reduction showed significant throughput gains in your benchmarks. Suggest making it a Django setting (NATS_TASK_TTR) rather than a hardcoded value, since the right number depends on model inference time.

  3. Gunicorn worker auto-detection — the production formula (2*cpus+1) is good. For local dev, consider defaulting to 1 worker (not nproc) to preserve --reload behavior and avoid excessive memory usage.

Adapt to #1135's API:

Not needed after #1135 merges:

Happy to help with the rebase if useful.


mihow commented Feb 21, 2026

Update after investigating the production server.

TL;DR: Production is already running 4 Gunicorn workers, and Gunicorn has a built-in env var for this that was already set.

Production has:

  • 16 vCPUs, 32 GiB RAM (Intel Xeon Skylake) — server is barely loaded (~6 GiB used)
  • The django container has 1 gunicorn master + 4 UvicornWorker children already running
  • This is because gunicorn natively reads the WEB_CONCURRENCY environment variable as its --workers default (source), and production already has WEB_CONCURRENCY=4 set in the env file

The auto-detect logic (nproc * 2 + 1) in the start script is still useful as a fallback when the env var isn't set.

Claude says: "the formula is designed for synchronous WSGI workers. For async ASGI workers (UvicornWorker), each process handles many concurrent connections via the event loop, so 33 workers on a 16-core machine would be a lot. Something like nproc or nproc / 2 + 1 is more appropriate for ASGI."

Maybe we set WEB_CONCURRENCY to 8?
Also, we will have multiple containers or VMs running Django at some point (horizontal scaling).

I'll leave a comment on #1145 with a plan for merging the two approaches.

@mihow mihow changed the title PSv2: Task fetching perf PSv2: Improve task fetching & web worker concurrency configuration Feb 21, 2026
Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to
this branch:

- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production
  parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8)
  when not explicitly set in the environment

Co-Authored-By: Claude <noreply@anthropic.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
ami/ml/orchestration/tests/test_nats_queue.py (1)

91-106: ⚠️ Potential issue | 🟡 Minor

test_reserve_task_no_messages tests the wrong code path — real NATS "no messages" raises TimeoutError, not an empty list.

In the nats.py client, fetch raises nats.errors.TimeoutError (converting asyncio.TimeoutError / CancelledError) when no messages arrive within the timeout — it never returns []. The current mock (return_value=[]) exercises the if msgs: dead-code branch that returns [], while the real empty-queue path goes through except nats.errors.TimeoutError and returns None.

This means:

  1. The None return case that callers guard against with tasks or [] is never tested.
  2. The PR reviewer's note to "mock nats.errors.TimeoutError for no-messages cases" was not addressed.

Additionally, len(tasks or []) at line 105 is redundant since assertIsNotNone on line 104 already ensures tasks is not None in this scenario.

🧪 Proposed fix
 async def test_reserve_task_no_messages(self):
     """Test reserve_task when no messages are available."""
     nc, js = self._create_mock_nats_connection()

     mock_psub = MagicMock()
-    mock_psub.fetch = AsyncMock(return_value=[])
+    mock_psub.fetch = AsyncMock(side_effect=nats.errors.TimeoutError)
     mock_psub.unsubscribe = AsyncMock()
     js.pull_subscribe = AsyncMock(return_value=mock_psub)

     with patch("ami.ml.orchestration.nats_queue.get_connection", AsyncMock(return_value=(nc, js))):
         async with TaskQueueManager() as manager:
             tasks = await manager.reserve_task(123)

-            self.assertIsNotNone(tasks)
-            self.assertEqual(len(tasks or []), 0)
+            # TimeoutError path currently returns None; callers use `tasks or []`
+            self.assertIsNone(tasks)
             mock_psub.unsubscribe.assert_called_once()

Note: if reserve_task is updated to return [] instead of None on TimeoutError (see separate comment on nats_queue.py), update the assertion to assertIsNotNone + assertEqual(len(tasks), 0).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/tests/test_nats_queue.py` around lines 91 - 106, The
test test_reserve_task_no_messages currently mocks mock_psub.fetch to return []
which doesn't match real NATS behavior; change the mock to raise
nats.errors.TimeoutError instead (e.g., mock_psub.fetch =
AsyncMock(side_effect=nats.errors.TimeoutError())) so reserve_task's
TimeoutError except branch is exercised; ensure you still patch get_connection
to return (nc, js), keep assertions aligned with reserve_task's contract (if
reserve_task returns None on timeout assertIsNone or keep tasks or [] handling
removed and assertEqual(len(tasks), 0) only if reserve_task is updated to return
[]), and verify mock_psub.unsubscribe is still asserted
(mock_psub.unsubscribe.assert_called_once()).
ami/ml/orchestration/nats_queue.py (1)

216-228: ⚠️ Potential issue | 🟡 Minor

TimeoutError returns None while an empty fetch returns [] — inconsistent "no tasks" signal.

There are two distinct "no tasks available" return values:

  • An empty msgs from fetch → return tasks (i.e., [])
  • A nats.errors.TimeoutError from fetch → return None

Since TimeoutError is the actual "no messages" path in NATS JetStream (confirmed by the nats.py client source), returning None here is the normal case. Callers (ami/jobs/views.py) already guard with tasks or [], but having two different "empty" representations is error-prone and was flagged in the PR review ("returning a list (never None)"). Additionally, Ruff flags the return tasks at line 216 as TRY300 — it should move to an else block.

♻️ Proposed fix — return `[]` consistently and move `return` to `else`
             try:
                 # Fetch messages (up to ntasks)
                 msgs = await psub.fetch(ntasks, timeout=timeout)

                 if msgs:
                     for msg in msgs:
                         task_data = json.loads(msg.data.decode())
                         metadata = msg.metadata

                         # Parse the task data into PipelineProcessingTask
                         task = PipelineProcessingTask(**task_data)
                         # Set the reply_subject for acknowledgment
                         task.reply_subject = msg.reply

                         logger.debug(
                             f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}"
                         )
                         tasks.append(task)
-                return tasks

             except nats.errors.TimeoutError as e:
                 # No messages available
                 logger.debug(f"No tasks available in stream for job '{job_id}': {e}")
-                return None
+                return []
+            else:
+                return tasks
             finally:
                 # Always unsubscribe
                 await psub.unsubscribe()

The return type annotation can also be tightened to -> list[PipelineProcessingTask] once None is removed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/nats_queue.py` around lines 216 - 228, In the function
containing the JetStream fetch/unsubscribe logic (the reserve task path that
logs "Failed to reserve task from stream for job '{job_id}'"), make the "no
tasks" return value consistent by returning an empty list ([]) for the
nats.errors.TimeoutError path instead of None, and move the existing "return
tasks" out of the inner try into an else block (so the return happens only when
no exception was raised) to satisfy Ruff's TRY300; after this change you can
tighten the function's return annotation to list[PipelineProcessingTask] if
desired.
🧹 Nitpick comments (1)
ami/ml/orchestration/nats_queue.py (1)

32-32: TASK_TTR is still hardcoded — make it configurable via a Django setting.

The PR reviewer mihow explicitly requested making TASK_TTR configurable via NATS_TASK_TTR. At 30 seconds, tasks that take longer will be re-delivered without operators being able to tune it without a code change.

♻️ Proposed fix
-TASK_TTR = 30  # Default Time-To-Run (visibility timeout) in seconds
+TASK_TTR = getattr(settings, "NATS_TASK_TTR", 30)  # Time-To-Run (visibility timeout) in seconds
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/nats_queue.py` at line 32, TASK_TTR is hardcoded to 30s;
make it configurable via a Django setting named NATS_TASK_TTR by reading from
django.conf.settings (e.g., TASK_TTR = getattr(settings, "NATS_TASK_TTR", 30))
in ami/ml/orchestration/nats_queue.py so operators can override the visibility
timeout; ensure you coerce/validate the value as an int (fallback to 30 on
invalid/missing values) and keep the TASK_TTR symbol used elsewhere unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 52-55: The constructor for TaskQueueManager (__init__) incorrectly
uses "or" which treats an explicit 0 for max_ack_pending as falsy; change the
assignment to explicitly check for None (e.g., if max_ack_pending is not None:
use it; else fall back to getattr(settings, "NATS_MAX_ACK_PENDING", 1000)) so
that 0 is respected as a valid value for self.max_ack_pending while preserving
the settings default behavior.
- Around line 167-178: The docstring for reserve_task is stale: add
documentation for the ntasks parameter, fix the Returns to reflect the actual
signature (list[PipelineProcessingTask] | None), and clarify when None is
returned (e.g., on reservation timeout/TimeoutError) vs when an empty list may
be returned; update the Args block to include ntasks: int = 1 and confirm
timeout: float | None default, and update the Returns description to state it
returns a list of PipelineProcessingTask (with reply_subject set for ack) or
None if the reservation timed out.

In `@compose/local/django/start`:
- Around line 24-30: Replace deprecated uvicorn.workers.UvicornWorker usage with
the new package and class name: install and use the uvicorn-worker package and
change all occurrences of the worker flag from "uvicorn.workers.UvicornWorker"
to "uvicorn_worker.UvicornWorker" (update the three occurrences in the start
script where exec gunicorn ... -k uvicorn.workers.UvicornWorker appears and the
debug line similarly) so the Gunicorn -k option points to
uvicorn_worker.UvicornWorker and remains compatible with Uvicorn 0.30.0+.

In `@compose/production/django/start`:
- Around line 11-14: The calculation for WEB_CONCURRENCY incorrectly only caps
the upper bound and can yield 0; update the logic around CPU_CORES (value from
nproc) so WEB_CONCURRENCY is at least 1 and at most 8 (e.g., compute max(1,
min(CPU_CORES, 8)) or add a guard that sets WEB_CONCURRENCY to 1 if CPU_CORES is
less than 1) — adjust the block that uses nproc/CPU_CORES and the exported
WEB_CONCURRENCY variable to enforce the "(minimum 1, maximum 8)" requirement.
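The falsy-zero pitfall flagged in the first inline comment is easy to reproduce. The sketch below uses a hypothetical free function rather than the real `TaskQueueManager.__init__`, but the guard is the same one the review asks for.

```python
DEFAULT_MAX_ACK_PENDING = 100  # assumed default, matching the settings fallback

def resolve_max_ack_pending(max_ack_pending=None):
    # Buggy version: `max_ack_pending or DEFAULT_MAX_ACK_PENDING`
    # silently replaces an explicit 0 with the default, because 0 is falsy.
    if max_ack_pending is not None:
        return max_ack_pending
    return DEFAULT_MAX_ACK_PENDING

print(resolve_max_ack_pending(0))     # 0 — explicit zero is respected
print(resolve_max_ack_pending(None))  # 100 — fallback default
print(0 or DEFAULT_MAX_ACK_PENDING)   # 100 — the bug the review flags
```

The same `is not None` check applies to any constructor parameter where 0, "", or False is a meaningful value.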


@mihow
Collaborator

mihow commented Feb 21, 2026

Summary of changes after rebase:

What was kept (after adapting to the new changes in main from #1135):

  • Configurable max_ack_pending — The init(max_ack_pending=...) parameter and
    settings.NATS_MAX_ACK_PENDING pattern stayed. Default is 100 (same as #1135's hardcoded value), but
    now tunable per-environment.
  • Reduced TASK_TTR — The 300→30s reduction stayed, now as settings.NATS_TASK_TTR so it's configurable.
    Benchmarks showed significant throughput gains with shorter visibility timeouts.
  • Gunicorn local dev (compose/local/django/start) — The full rewrite stayed: gunicorn+UvicornWorker as
    default, WEB_CONCURRENCY support, USE_UVICORN=1 escape hatch, debugger support with single worker.
  • Production auto-scaling (compose/production/django/start) — The WEB_CONCURRENCY auto-detection from
    nproc capped at 8 stayed, plus the echo for visibility.

What was dropped (superseded by #1135):

Incorporated from #1135 via rebase:

  • asyncio.wait_for() timeouts on all JetStream metadata operations (stream_info, add_stream,
    consumer_info, add_consumer) with NATS_JETSTREAM_TIMEOUT=10s, preventing hung connections from
    blocking Django workers.

Net effect: The performance tuning ideas (lower TTR, configurable ack pending, gunicorn workers) are
preserved and made configurable via Django settings. The reliability/safety layer from #1135
(timeouts, error handling, fail-fast connections) is untouched underneath.

- Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
- Update TaskQueueManager docstring with Args section
- Simplify production WEB_CONCURRENCY fallback (just use nproc)

Co-Authored-By: Claude <noreply@anthropic.com>
@mihow mihow merged commit 3431b68 into RolnickLab:main Feb 21, 2026
7 checks passed