PSv2: Improve task fetching & web worker concurrency configuration #1142
mihow merged 2 commits into RolnickLab:main from
Conversation
✅ Deploy Preview for antenna-ssec canceled.
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough
Configuration settings for the NATS queue (task Time-To-Run and acknowledgment pending limits) are now customizable via environment settings. Django startup scripts were refactored to unify Gunicorn and Uvicorn execution with conditional logic for debugger, worker count, and reload behavior, plus intelligent CPU-core-based worker defaults in production.
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~12 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Pull request overview
This pull request introduces performance optimizations for task fetching in the ML pipeline orchestration system. The changes enable batch task reservation from NATS JetStream and configurable worker counts for Gunicorn, resulting in significant performance improvements (from ~647 seconds to ~102 seconds in end-to-end tests).
Changes:
- Enhanced `TaskQueueManager` to support configurable `max_ack_pending` limits and batch task reservation via a new `ntasks` parameter
- Modified deployment scripts to allow configurable Gunicorn worker counts based on environment variables, with CPU-based auto-detection
- Updated the API endpoint to use batch task fetching instead of sequential single-task fetching
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| ami/ml/orchestration/nats_queue.py | Added configurable max_ack_pending and refactored reserve_task to support batch reservation |
| ami/jobs/views.py | Refactored to use batch reserve_task and changed logging level for result processing |
| compose/production/django/start | Added configurable Gunicorn workers with CPU-based default (2*cores + 1) |
| compose/local/django/start | Added configurable Gunicorn workers with CPU-based default and conditional reload |
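The two auto-detect defaults summarized in the table (production: `2 * cores + 1`; local: plain `nproc`) can be sketched in Python for clarity. The helper names below are hypothetical; the real logic lives in the shell start scripts.

```python
import os

def production_workers(gunicorn_workers: int = 0) -> int:
    """Production default: 2 * CPU cores + 1 (the standard Gunicorn formula),
    unless a GUNICORN_WORKERS-style override (> 0) is given."""
    if gunicorn_workers > 0:
        return gunicorn_workers
    cores = os.cpu_count() or 1
    return cores * 2 + 1

def local_workers(gunicorn_workers: int = 0) -> int:
    """Local default: plain core count (the shell script's $(nproc))."""
    if gunicorn_workers > 0:
        return gunicorn_workers
    return os.cpu_count() or 1
```

An explicit `GUNICORN_WORKERS` value wins in both environments; only the auto-detected fallback differs.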
Comments suppressed due to low confidence (1)
ami/ml/orchestration/nats_queue.py:179
- The docstring for reserve_task is outdated and doesn't document the new parameters and return type. Update the documentation to include the ntasks parameter, and change the return type description from "PipelineProcessingTask with reply_subject set for acknowledgment, or None if no task available" to "List of PipelineProcessingTask objects with reply_subject set for acknowledgment, or None if no tasks available". Also document that ntasks defaults to 1.
"""
Reserve a task from the specified stream.
Args:
job_id: The job ID (integer primary key) to pull tasks from
timeout: Timeout in seconds for reservation (default: 5 seconds)
Returns:
PipelineProcessingTask with reply_subject set for acknowledgment, or None if no task available
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
ami/ml/orchestration/nats_queue.py (2)
168-179: ⚠️ Potential issue | 🟡 Minor
`ntasks` is untyped, and the docstring still describes a single-task return. Two minor cleanup items:
- `ntasks=1` has no type annotation; it should be `ntasks: int = 1`.
- The `Returns:` block still reads "PipelineProcessingTask with reply_subject set for acknowledgment" — it should reflect `list[PipelineProcessingTask] | None`.
📝 Proposed fix

```diff
 async def reserve_task(
-    self, job_id: int, ntasks=1, timeout: float | None = None
+    self, job_id: int, ntasks: int = 1, timeout: float | None = None
 ) -> list[PipelineProcessingTask] | None:
     """
     Reserve a task from the specified stream.
     Args:
         job_id: The job ID (integer primary key) to pull tasks from
+        ntasks: Number of tasks to fetch in a single batch (default: 1)
         timeout: Timeout in seconds for reservation (default: 5 seconds)
     Returns:
-        PipelineProcessingTask with reply_subject set for acknowledgment, or None if no task available
+        List of PipelineProcessingTask objects with reply_subject set, or None if no tasks available
     """
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 168 - 179, In reserve_task, add an explicit type annotation to the ntasks parameter (ntasks: int = 1) and update the docstring Returns section to indicate it returns a list[PipelineProcessingTask] or None (e.g., "Returns: list[PipelineProcessingTask] | None — list of PipelineProcessingTask objects with reply_subject set for acknowledgment, or None if no tasks available"); reference the reserve_task function and the PipelineProcessingTask type so the change is applied in that function's signature and its docstring.
116-133: ⚠️ Potential issue | 🟠 Major
Add consumer configuration update logic when `max_ack_pending` changes.
The `_ensure_consumer` method logs the existing consumer's `max_ack_pending` but does not compare it to the configured `self.max_ack_pending`. If a consumer was previously created with an older default value, it retains that configuration after deployment; the new default only applies to newly created consumers.
Since nats-py supports updating consumers by calling `add_consumer()` again with the same `durable_name` and an updated `ConsumerConfig`, consider adding a comparison and update step:

```python
try:
    info = await self.js.consumer_info(stream_name, consumer_name)
    if info.config.max_ack_pending != self.max_ack_pending:
        # Update consumer config
        updated_config = info.config.evolve(max_ack_pending=self.max_ack_pending)
        await self.js.add_consumer(stream=stream_name, config=updated_config)
        logger.info(f"Updated consumer {consumer_name} max_ack_pending to {self.max_ack_pending}")
    else:
        logger.debug(
            f"Consumer {consumer_name} already exists with max_ack_pending={info.config.max_ack_pending}"
        )
except Exception:
    # Consumer doesn't exist, create it
    ...
```

Without this, the throughput improvements targeted by this PR won't apply to existing consumers until they are explicitly deleted and recreated.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 116 - 133, In _ensure_consumer, after calling self.js.consumer_info(stream_name, consumer_name) check if info.config.max_ack_pending != self.max_ack_pending and if so call self.js.add_consumer(...) with an updated config (use info.config.evolve(max_ack_pending=self.max_ack_pending) or construct a new ConsumerConfig preserving existing fields and setting max_ack_pending) and log the update (e.g., "Updated consumer {consumer_name} max_ack_pending to {self.max_ack_pending}"); otherwise keep the existing debug log; leave the existing create path (ConsumerConfig(...)) in the except block unchanged.
🧹 Nitpick comments (5)
ami/ml/orchestration/nats_queue.py (2)
51-53: Use `is not None` instead of `or` for `max_ack_pending` defaulting.
The `or` pattern silently ignores an explicitly passed value of `0` (or any other falsy integer) because `0` is falsy in Python. The type annotation is `int | None`, so the sentinel for "use default" is `None`, not `0`. While `0` is not a valid NATS `max_ack_pending` value, the pattern is semantically imprecise and could mask future callers passing `-1` (unlimited).
♻️ Proposed fix

```diff
- self.max_ack_pending = max_ack_pending or getattr(settings, "NATS_MAX_ACK_PENDING", 1000)
+ self.max_ack_pending = max_ack_pending if max_ack_pending is not None else getattr(settings, "NATS_MAX_ACK_PENDING", 1000)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 51 - 53, The __init__ currently uses "max_ack_pending = max_ack_pending or getattr(..., 1000)" which treats falsy integers (like 0) as unset; in the __init__ of the NATS queue class replace that "or" pattern with an explicit None check so the default is used only when the caller passed None (e.g., self.max_ack_pending = max_ack_pending if max_ack_pending is not None else getattr(settings, "NATS_MAX_ACK_PENDING", 1000)); keep the nats_url assignment as-is.
199-217: Stale comment + Ruff TRY300: move `return tasks` to the `else` block.
Line 200's `# Fetch a single message` comment is now incorrect — the call fetches `ntasks` messages.
Additionally, Ruff flags that `return tasks` at line 217 sits inside the `try` body (TRY300). Moving it to the `else` clause makes the happy-path/error-path separation explicit: the `else` runs only when no exception was raised, and the `finally` still guarantees `psub.unsubscribe()`.
♻️ Proposed fix

```diff
     tasks = []
     try:
-        # Fetch a single message
+        # Fetch up to ntasks messages
         msgs = await psub.fetch(ntasks, timeout=timeout)
         if msgs:
             for msg in msgs:
                 task_data = json.loads(msg.data.decode())
                 metadata = msg.metadata
                 # Parse the task data into PipelineProcessingTask
                 task = PipelineProcessingTask(**task_data)
                 # Set the reply_subject for acknowledgment
                 task.reply_subject = msg.reply
                 logger.debug(
                     f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}"
                 )
                 tasks.append(task)
-        return tasks
     except nats.errors.TimeoutError as e:
         # No messages available
         logger.debug(f"No tasks available in stream for job '{job_id}': {e}")
         return None
+    else:
+        return tasks
     finally:
         # Always unsubscribe
         await psub.unsubscribe()
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 199 - 217, Update the outdated comment and move the `return tasks` out of the `try` block into an `else` so the happy path is separated from exceptions (this addresses Ruff TRY300); specifically, change the comment above `psub.fetch(ntasks, timeout=timeout)` to indicate it fetches up to `ntasks` messages (not a single message), and after processing messages into `PipelineProcessingTask` instances (keeping `task.reply_subject = msg.reply` and appending to `tasks`), place the `return tasks` in the `else` clause of the `try/except/finally` construct so that `psub.unsubscribe()` in the `finally` still always runs while successful returns occur only when no exception was raised.

compose/production/django/start (1)
9-18: LGTM — optionally document the intentional formula difference from the local script.
The production script uses `CPU_CORES * 2 + 1` (standard Gunicorn I/O-worker formula) while the local counterpart just uses `nproc`. Both share the same `GUNICORN_WORKERS` env var name for overrides, so a developer who sets `GUNICORN_WORKERS=0` expecting auto-detection will get different worker counts in the two environments. A brief inline comment explaining the intentional divergence would help avoid confusion.
📝 Suggested comment to clarify the intentional difference

```diff
 # Configurable number of Gunicorn workers (default: auto-detect based on CPU cores)
+# Uses (cores * 2 + 1) in production for I/O-bound workloads; the local script uses
+# plain nproc to keep development lighter.
 WORKERS=${GUNICORN_WORKERS:-0}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@compose/production/django/start` around lines 9 - 18, Add a concise inline comment above the WORKERS calculation explaining the intentional divergence in auto-detection: note that the production script computes WORKERS using the Gunicorn recommended formula (CPU_CORES * 2 + 1) while the local script uses nproc, and that both respect the same GUNICORN_WORKERS env var (with 0 triggering auto-detect), so developers understand why setting GUNICORN_WORKERS=0 yields different counts between environments; reference the WORKERS, GUNICORN_WORKERS, CPU_CORES and nproc symbols in the comment.

ami/jobs/views.py (1)
228: Add a `max_value` cap to the `batch` parameter to bound memory usage.
Without an upper bound, a client can request an arbitrarily large batch (e.g., `batch=1000000`). While `timeout=0.1` limits how long the NATS fetch blocks, `psub.fetch(ntasks, ...)` still allocates a response buffer sized to the return count before the timeout fires. A reasonable cap (e.g., 1000, matching `max_ack_pending`) would prevent accidental or malicious oversized requests.
♻️ Proposed fix

```diff
- batch = IntegerField(required=True, min_value=1).clean(request.query_params.get("batch"))
+ batch = IntegerField(required=True, min_value=1, max_value=1000).clean(request.query_params.get("batch"))
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/jobs/views.py` at line 228, Add an upper bound to the batch IntegerField to prevent unbounded memory usage: when constructing the batch validator (the IntegerField(...) call that assigns to batch reading request.query_params.get("batch")), include a max_value (suggested 1000 to match max_ack_pending) so the cleaned batch value is capped; this ensures requests passed to psub.fetch(ntasks, ...) cannot request arbitrarily large response buffers.

compose/local/django/start (1)
9-31: Auto-detection silently opts out of `--reload` on multi-core developer machines.
When `GUNICORN_WORKERS` is unset, `WORKERS=$(nproc)` resolves to the machine's core count. On any machine with more than 1 core the `else` branch fires — "production mode" without auto-reload — which may surprise developers who expect `--reload` to be the default for local work. Explicitly setting `GUNICORN_WORKERS=1` works around it, but that's a non-obvious requirement. Consider defaulting local auto-detection to 1 worker (reload-enabled) and only going multi-worker when `GUNICORN_WORKERS` is explicitly set to a value > 1.
♻️ Proposed adjustment to default local behaviour

```diff
 # Configurable number of Gunicorn workers (default: auto-detect based on CPU cores)
 WORKERS=${GUNICORN_WORKERS:-0}
 if [ "$WORKERS" -eq 0 ]; then
-    WORKERS=$(nproc)
+    # Default to single worker with reload for local development.
+    # Set GUNICORN_WORKERS > 1 explicitly for multi-worker performance testing.
+    WORKERS=1
 fi
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@compose/local/django/start` around lines 9 - 31, The script currently auto-sets WORKERS=$(nproc) when GUNICORN_WORKERS is unset, which disables --reload on multi-core dev machines; change the logic to default to a single reload-enabled worker unless the user explicitly sets GUNICORN_WORKERS > 1. Concretely, replace the unconditional nproc assignment with a check of whether GUNICORN_WORKERS is set/non-empty: if GUNICORN_WORKERS is set use its numeric value (and keep multi-worker behavior only when >1), otherwise set WORKERS=1 so the dev default uses the single-worker --reload branch; ensure you still honor an explicitly provided GUNICORN_WORKERS (and fall back/validate numeric input if needed).
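The `max_value` cap suggested for the `batch` query parameter above can be sketched without Django, using a stdlib stand-in for `IntegerField(...).clean()` (the helper name is hypothetical):

```python
def clean_batch(raw, min_value: int = 1, max_value: int = 1000) -> int:
    """Validate a `batch` query parameter as an integer within [min_value, max_value].

    Mirrors what Django's IntegerField(min_value=..., max_value=...).clean() enforces.
    """
    try:
        value = int(raw)
    except (TypeError, ValueError):
        raise ValueError(f"batch must be an integer, got {raw!r}")
    if not (min_value <= value <= max_value):
        raise ValueError(f"batch must be between {min_value} and {max_value}")
    return value
```

With the cap in place, oversized values are rejected before any fetch buffer is sized from them.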
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 44-48: The docstring in nats_queue.py incorrectly states the
default max_ack_pending is 500; update the docstring to match the actual default
used in __init__ (getattr(settings, "NATS_MAX_ACK_PENDING", 1000)) or change the
default in __init__ to 500 so they match; refer to the __init__ method and the
getattr call for NATS_MAX_ACK_PENDING / max_ack_pending to locate and correct
the discrepancy.
- Around line 54-56: The current logger.info in TaskQueueManager logs
self.nats_url which may contain embedded credentials; add a small helper (e.g.,
_redact_url) that uses urllib.parse.urlparse/urlunparse to remove or replace the
authority (user:password@) portion (keep hostname:port) and call logger.info
with the redacted value instead of self.nats_url; update the TaskQueueManager
init logging line to use this helper so credentials are never written to logs.
Reconciliation plan with PR #1135
Merge #1135 first — it addresses the reliability issues found during PSv2 integration testing (JetStream timeouts,
Keep from this PR (rebase onto #1135):
Adapt to #1135's API:
Not needed after #1135 merges:
Happy to help with the rebase if useful.
Update after investigating the production server. TL;DR: Production is already running 4 Gunicorn workers, and Gunicorn has a built-in env var for this that was already set. Production has:
The auto-detect logic (
Claude says: "the formula is designed for synchronous WSGI workers. For async ASGI workers (UvicornWorker), each process handles many concurrent connections via the event loop, so 33 workers on a 16-core machine would be a lot. Something like
Maybe we set WEB_CONCURRENCY to 8? I'll leave a comment on #1145 with a plan for merging the two approaches.
Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to this branch:
- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8) when not explicitly set in the environment

Co-Authored-By: Claude <noreply@anthropic.com>
bcd97f5 to 7e4db08
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
ami/ml/orchestration/tests/test_nats_queue.py (1)
91-106: ⚠️ Potential issue | 🟡 Minor
`test_reserve_task_no_messages` tests the wrong code path — real NATS "no messages" raises `TimeoutError`, not an empty list.
In the nats.py client, `fetch` raises `nats.errors.TimeoutError` (converting `asyncio.TimeoutError`/`CancelledError`) when no messages arrive within the timeout — it never returns `[]`. The current mock (`return_value=[]`) exercises the `if msgs:` dead-code branch that returns `[]`, while the real empty-queue path goes through `except nats.errors.TimeoutError` and returns `None`.
This means:
- The `None` return case that callers guard against with `tasks or []` is never tested.
- The PR reviewer's note to "mock `nats.errors.TimeoutError` for no-messages cases" was not addressed.
Additionally, `len(tasks or [])` at line 105 is redundant since `assertIsNotNone` on line 104 already ensures `tasks` is not `None` in this scenario.
🧪 Proposed fix

```diff
 async def test_reserve_task_no_messages(self):
     """Test reserve_task when no messages are available."""
     nc, js = self._create_mock_nats_connection()
     mock_psub = MagicMock()
-    mock_psub.fetch = AsyncMock(return_value=[])
+    mock_psub.fetch = AsyncMock(side_effect=nats.errors.TimeoutError)
     mock_psub.unsubscribe = AsyncMock()
     js.pull_subscribe = AsyncMock(return_value=mock_psub)
     with patch("ami.ml.orchestration.nats_queue.get_connection", AsyncMock(return_value=(nc, js))):
         async with TaskQueueManager() as manager:
             tasks = await manager.reserve_task(123)
-            self.assertIsNotNone(tasks)
-            self.assertEqual(len(tasks or []), 0)
+            # TimeoutError path currently returns None; callers use `tasks or []`
+            self.assertIsNone(tasks)
     mock_psub.unsubscribe.assert_called_once()
```

Note: if `reserve_task` is updated to return `[]` instead of `None` on `TimeoutError` (see separate comment on `nats_queue.py`), update the assertion to `assertIsNotNone` + `assertEqual(len(tasks), 0)`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/tests/test_nats_queue.py` around lines 91 - 106, The test test_reserve_task_no_messages currently mocks mock_psub.fetch to return [] which doesn't match real NATS behavior; change the mock to raise nats.errors.TimeoutError instead (e.g., mock_psub.fetch = AsyncMock(side_effect=nats.errors.TimeoutError())) so reserve_task's TimeoutError except branch is exercised; ensure you still patch get_connection to return (nc, js), keep assertions aligned with reserve_task's contract (if reserve_task returns None on timeout assertIsNone or keep tasks or [] handling removed and assertEqual(len(tasks), 0) only if reserve_task is updated to return []), and verify mock_psub.unsubscribe is still asserted (mock_psub.unsubscribe.assert_called_once()).

ami/ml/orchestration/nats_queue.py (1)
216-228: ⚠️ Potential issue | 🟡 Minor
`TimeoutError` returns `None` while an empty fetch returns `[]` — inconsistent "no tasks" signal.
There are two distinct "no tasks available" return values:
- An empty `msgs` from `fetch` → `return tasks` (i.e., `[]`)
- A `nats.errors.TimeoutError` from `fetch` → `return None`
Since `TimeoutError` is the actual "no messages" path in NATS JetStream (confirmed by the nats.py client source), returning `None` here is the normal case. Callers (`ami/jobs/views.py`) already guard with `tasks or []`, but having two different "empty" representations is error-prone and was flagged in the PR review ("returning a list (never None)"). Additionally, Ruff flags the `return tasks` at line 216 as TRY300 — it should move to an `else` block.
♻️ Proposed fix — return `[]` consistently and move `return` to `else`

```diff
 try:
     # Fetch messages (up to ntasks)
     msgs = await psub.fetch(ntasks, timeout=timeout)
     if msgs:
         for msg in msgs:
             task_data = json.loads(msg.data.decode())
             metadata = msg.metadata
             # Parse the task data into PipelineProcessingTask
             task = PipelineProcessingTask(**task_data)
             # Set the reply_subject for acknowledgment
             task.reply_subject = msg.reply
             logger.debug(
                 f"Reserved task from stream for job '{job_id}', sequence {metadata.sequence.stream}"
             )
             tasks.append(task)
-    return tasks
 except nats.errors.TimeoutError as e:
     # No messages available
     logger.debug(f"No tasks available in stream for job '{job_id}': {e}")
-    return None
+    return []
+else:
+    return tasks
 finally:
     # Always unsubscribe
     await psub.unsubscribe()
```

The return type annotation can also be tightened to `-> list[PipelineProcessingTask]` once `None` is removed.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` around lines 216 - 228, In the function containing the JetStream fetch/unsubscribe logic (the reserve task path that logs "Failed to reserve task from stream for job '{job_id}'"), make the "no tasks" return value consistent by returning an empty list ([]) for the nats.errors.TimeoutError path instead of None, and move the existing "return tasks" out of the inner try into an else block (so the return happens only when no exception was raised) to satisfy Ruff's TRY300; after this change you can tighten the function's return annotation to list[PipelineProcessingTask] if desired.
🧹 Nitpick comments (1)
ami/ml/orchestration/nats_queue.py (1)
32: `TASK_TTR` is still hardcoded — make it configurable via a Django setting.
The PR reviewer mihow explicitly requested making `TASK_TTR` configurable via `NATS_TASK_TTR`. At 30 seconds, tasks that take longer will be re-delivered, without operators being able to tune it short of a code change.
♻️ Proposed fix

```diff
-TASK_TTR = 30  # Default Time-To-Run (visibility timeout) in seconds
+TASK_TTR = getattr(settings, "NATS_TASK_TTR", 30)  # Time-To-Run (visibility timeout) in seconds
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/orchestration/nats_queue.py` at line 32, TASK_TTR is hardcoded to 30s; make it configurable via a Django setting named NATS_TASK_TTR by reading from django.conf.settings (e.g., TASK_TTR = getattr(settings, "NATS_TASK_TTR", 30)) in ami/ml/orchestration/nats_queue.py so operators can override the visibility timeout; ensure you coerce/validate the value as an int (fallback to 30 on invalid/missing values) and keep the TASK_TTR symbol used elsewhere unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 52-55: The constructor for TaskQueueManager (__init__) incorrectly
uses "or" which treats an explicit 0 for max_ack_pending as falsy; change the
assignment to explicitly check for None (e.g., if max_ack_pending is not None:
use it; else fall back to getattr(settings, "NATS_MAX_ACK_PENDING", 1000)) so
that 0 is respected as a valid value for self.max_ack_pending while preserving
the settings default behavior.
- Around line 167-178: The docstring for reserve_task is stale: add
documentation for the ntasks parameter, fix the Returns to reflect the actual
signature (list[PipelineProcessingTask] | None), and clarify when None is
returned (e.g., on reservation timeout/TimeoutError) vs when an empty list may
be returned; update the Args block to include ntasks: int = 1 and confirm
timeout: float | None default, and update the Returns description to state it
returns a list of PipelineProcessingTask (with reply_subject set for ack) or
None if the reservation timed out.
In `@compose/local/django/start`:
- Around line 24-30: Replace deprecated uvicorn.workers.UvicornWorker usage with
the new package and class name: install and use the uvicorn-worker package and
change all occurrences of the worker flag from "uvicorn.workers.UvicornWorker"
to "uvicorn_worker.UvicornWorker" (update the three occurrences in the start
script where exec gunicorn ... -k uvicorn.workers.UvicornWorker appears and the
debug line similarly) so the Gunicorn -k option points to
uvicorn_worker.UvicornWorker and remains compatible with Uvicorn 0.30.0+.
In `@compose/production/django/start`:
- Around line 11-14: The calculation for WEB_CONCURRENCY incorrectly only caps
the upper bound and can yield 0; update the logic around CPU_CORES (value from
nproc) so WEB_CONCURRENCY is at least 1 and at most 8 (e.g., compute max(1,
min(CPU_CORES, 8)) or add a guard that sets WEB_CONCURRENCY to 1 if CPU_CORES is
less than 1) — adjust the block that uses nproc/CPU_CORES and the exported
WEB_CONCURRENCY variable to enforce the "(minimum 1, maximum 8)" requirement.
Summary of changes after rebase:
What was kept (after adapting to the new changes in main from #1135):
What was dropped (superseded by #1135):
Incorporated from #1135 via rebase:
Net effect: The performance tuning ideas (lower TTR, configurable ack pending, gunicorn workers) are
- Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
- Update TaskQueueManager docstring with Args section
- Simplify production WEB_CONCURRENCY fallback (just use nproc)

Co-Authored-By: Claude <noreply@anthropic.com>
Summary
This pull request introduces performance tuning for the NATS task queue and configurable Gunicorn worker counts.
See: RolnickLab/ami-data-companion#113
Task queue management improvements:
- Configurable `max_ack_pending` in `TaskQueueManager`, allowing customization of the maximum number of unacknowledged tasks per consumer (default: 100, settable via `settings.NATS_MAX_ACK_PENDING`).
- Configurable task Time-To-Run via `settings.NATS_TASK_TTR`.

Changes added but not visible after merging with #1135
These changes were part of the original PR but were superseded by #1135 (`fix/nats-connection-safety`) during rebase:
- `reserve_task` method extended to support batch reservation of tasks (`ntasks` parameter), returning a list of tasks instead of a single task. (Now handled by #1135's `reserve_tasks(count=...)` API.)
- `views.py` — replaced sequential single-task fetching with batch calls. (Already implemented differently in #1135.)

Testing
Test:
Before this change and with current ami worker (sync results):
Before this change and with RolnickLab/ami-data-companion#113 (async results)
Across multiple tests ~5% of tasks failed to process and had to be re-delivered by NATS 5 mins later, hence the much longer time.
TASK_TTR=300
Repeated test with lower TASK_TTR=30
After this change and with RolnickLab/ami-data-companion#113: (no NATS re-delivers)
After this change and with current ami worker
Performance Summary Table (seconds): 154.36 (TASK_TTR=30)
Dataloader Benchmark
without changes:
with changes:
Deployment Notes
Include instructions if this PR requires specific steps for its deployment (database migrations, config changes, etc.)
Checklist
Summary by CodeRabbit
- Configurable maximum pending acknowledgments (`NATS_MAX_ACK_PENDING`).
- Configurable task Time-To-Run (`NATS_TASK_TTR`).