Testing: Management command for e2e test of ML jobs #1143
carlosgjs wants to merge 8 commits into RolnickLab:main from
Conversation
👷 Deploy request for antenna-ssec pending review.
✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough
This PR introduces a new Django management command that orchestrates end-to-end testing of ML jobs. The command accepts a SourceImageCollection name, Pipeline slug, and dispatch mode, then creates a job, enqueues it, and monitors its progress in real time with live status updates until completion or user interruption.
Changes
Sequence Diagram

```mermaid
sequenceDiagram
    actor User
    participant Command as test_ml_job_e2e Command
    participant DB as Database
    participant JobCreator as Job Creation
    participant Dispatcher as Job Queue/Dispatcher
    participant Monitor as Status Monitor
    participant Output as Progress Display
    User->>Command: Execute with collection & pipeline args
    Command->>DB: Fetch SourceImageCollection
    alt Collection not found
        Command->>Output: Error message
    end
    Command->>DB: Fetch Pipeline
    alt Pipeline not found
        Command->>Output: Error message
    end
    Command->>JobCreator: Create Job with dispatch mode
    JobCreator->>DB: Save Job
    Command->>Dispatcher: Enqueue job
    Command->>Monitor: Start monitoring loop
    loop Every 2 seconds or until final state
        Monitor->>DB: Poll job status
        Monitor->>Output: Display progress (elapsed time, status, stage progress)
        alt Final state reached
            Monitor->>Monitor: Exit loop
        end
    end
    Command->>Output: Show final results (runtime, stages, job ID)
    Command->>User: Return control
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
🧹 Nitpick comments (2)
ami/jobs/management/commands/test_ml_job_e2e.py (2)
40-44: Same `from None` chaining needed here. Per Ruff B904, chain the re-raised CommandError to suppress the noisy inner traceback.

Proposed fix

except Pipeline.DoesNotExist:
-    raise CommandError(f"Pipeline '{pipeline_slug}' not found")
+    raise CommandError(f"Pipeline '{pipeline_slug}' not found") from None

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/jobs/management/commands/test_ml_job_e2e.py` around lines 40 - 44, In the try/except that looks up Pipeline via Pipeline.objects.get(slug=pipeline_slug), update the exception re-raise to suppress the inner traceback by chaining to None: replace the current "raise CommandError(f\"Pipeline '{pipeline_slug}' not found\")" with raising CommandError from None (i.e., raise CommandError(...) from None) so the caught Pipeline.DoesNotExist does not emit a noisy inner traceback when CommandError is thrown.
71-85: Consider adding an optional timeout to prevent indefinite monitoring. For an E2E test utility, an unbounded polling loop is a minor concern (Ctrl+C works), but a `--timeout` argument would make this more robust for CI/scripted usage.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/jobs/management/commands/test_ml_job_e2e.py` around lines 71 - 85, Add an optional timeout to the E2E job monitor so it cannot poll forever: update the _monitor_job method to accept a timeout (e.g., timeout=None) and while looping check if elapsed >= timeout (when timeout is set) and abort/raise a clear TimeoutError or mark the job as failed and call _show_final_results; wire this timeout into the management command by adding a --timeout CLI option in the command class (parse as seconds, default None) and pass it into _monitor_job when invoked so CI or scripts can limit wait time.
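The suggested bounded loop can be sketched in standalone Python. This is not the actual `_monitor_job` implementation; the `monitor` function, its injectable `sleep`/`clock` parameters (for testability), and the state names are all illustrative assumptions:

```python
import time


def monitor(poll_fn, timeout=None, interval=2.0,
            final_states=("SUCCESS", "FAILURE"),
            sleep=time.sleep, clock=time.monotonic):
    """Poll poll_fn() until it returns a final state; return that state.

    When `timeout` (seconds) is set and elapses first, raise TimeoutError
    instead of polling forever.
    """
    start = clock()
    while True:
        status = poll_fn()
        if status in final_states:
            return status
        elapsed = clock() - start
        if timeout is not None and elapsed >= timeout:
            raise TimeoutError(f"job still in state {status!r} after {elapsed:.0f}s")
        sleep(interval)
```

Wiring a `--timeout` CLI option (default `None`) through to this loop would preserve today's unbounded behavior for interactive use while letting CI scripts fail fast.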
try:
    collection = SourceImageCollection.objects.get(name=collection_name)
    self.stdout.write(self.style.SUCCESS(f"✓ Found collection: {collection.name}"))
    self.stdout.write(f"  Project: {collection.project.name}")
except SourceImageCollection.DoesNotExist:
    raise CommandError(f"SourceImageCollection '{collection_name}' not found")
SourceImageCollection.objects.get(name=...) may raise MultipleObjectsReturned.
The name field on SourceImageCollection is not unique. If multiple collections share the same name, this will throw an unhandled MultipleObjectsReturned exception with a raw traceback. Either catch MultipleObjectsReturned or use .filter(...).first() with an appropriate message.
Also, per static analysis (B904), chain the original exception for better diagnostics.
Proposed fix

try:
    collection = SourceImageCollection.objects.get(name=collection_name)
    self.stdout.write(self.style.SUCCESS(f"✓ Found collection: {collection.name}"))
    self.stdout.write(f"  Project: {collection.project.name}")
except SourceImageCollection.DoesNotExist:
-    raise CommandError(f"SourceImageCollection '{collection_name}' not found")
+    raise CommandError(f"SourceImageCollection '{collection_name}' not found") from None
+except SourceImageCollection.MultipleObjectsReturned:
+    raise CommandError(
+        f"Multiple SourceImageCollections found with name '{collection_name}'. Use a unique name."
+    ) from None

🧰 Tools
🪛 Ruff (0.15.1)
[warning] 37-37: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
[warning] 37-37: Avoid specifying long messages outside the exception class
(TRY003)
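The effect of the B904 fix can be shown in a few lines of standalone Python. The `CommandError` stand-in and `lookup` helper below are illustrative, not the project's code:

```python
class CommandError(Exception):
    """Stand-in for django.core.management.CommandError in this sketch."""


def lookup(slug, table):
    try:
        return table[slug]
    except KeyError:
        # "from None" suppresses the implicit exception context, so the
        # user sees only the CommandError, not the inner KeyError traceback.
        raise CommandError(f"Pipeline '{slug}' not found") from None
```

Without `from None`, Python prints the caught exception under "During handling of the above exception, another exception occurred", which is exactly the noise B904 asks you to make intentional (via `from err`) or suppress (via `from None`).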
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ami/jobs/management/commands/test_ml_job_e2e.py` around lines 32 - 37, The
current use of SourceImageCollection.objects.get(name=collection_name) can raise
MultipleObjectsReturned; modify the lookup to handle duplicates by either
catching SourceImageCollection.MultipleObjectsReturned (or
MultipleObjectsReturned from django.core.exceptions) and raising a CommandError
chaining the original exception (raise CommandError(...) from e) with a clear
message, or replace .get(...) with
SourceImageCollection.objects.filter(name=collection_name).first() and then
raise CommandError if result is None or if you detect multiple matches and want
to surface that as an error; update the block around
SourceImageCollection.objects.get, the except path, and ensure you chain the
original exception when re-raising CommandError.
if job.status == JobState.SUCCESS:
    self.stdout.write(self.style.SUCCESS(f"✅ Job completed successfully"))
elif job.status == JobState.FAILURE:
    self.stdout.write(self.style.ERROR(f"❌ Job failed"))
else:
    self.stdout.write(self.style.WARNING(f"⚠ Job ended with status: {job.status}"))
Remove extraneous f prefix from string literals without placeholders.
Lines 110, 112, and 114 use f-strings but contain no interpolation. Ruff F541 flags this.
Proposed fix

if job.status == JobState.SUCCESS:
-    self.stdout.write(self.style.SUCCESS(f"✅ Job completed successfully"))
+    self.stdout.write(self.style.SUCCESS("✅ Job completed successfully"))
elif job.status == JobState.FAILURE:
-    self.stdout.write(self.style.ERROR(f"❌ Job failed"))
+    self.stdout.write(self.style.ERROR("❌ Job failed"))
else:
    self.stdout.write(self.style.WARNING(f"⚠ Job ended with status: {job.status}"))
🧰 Tools
🪛 Ruff (0.15.1)
[error] 110-110: f-string without any placeholders
Remove extraneous f prefix
(F541)
[error] 112-112: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ami/jobs/management/commands/test_ml_job_e2e.py` around lines 109 - 114, The
three self.stdout.write calls that log job outcomes (the branches checking
JobState.SUCCESS, JobState.FAILURE, and the else branch using job.status) use
f-strings with no interpolation; remove the extraneous leading "f" from those
string literals in the block where job.status is evaluated so they become normal
strings (e.g., change f"✅ Job completed successfully" etc.); update the three
occurrences in the function/command where these self.stdout.write(...) calls
appear (test_ml_job_e2e.py) to silence Ruff F541.
for param in stage.params:
    if param.value:
        self.stdout.write(f"  {param.name}: {param.value}")
`if param.value:` silently hides falsy values like 0, False, or "".
If a stage parameter can legitimately hold 0 or another falsy value, it will be skipped. Use if param.value is not None: to only skip truly absent values.
Proposed fix

for param in stage.params:
-    if param.value:
+    if param.value is not None:
        self.stdout.write(f"  {param.name}: {param.value}")
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@ami/jobs/management/commands/test_ml_job_e2e.py` around lines 123 - 125, The
loop that writes stage parameters skips falsy but valid values because it tests
"if param.value:"; change that check to explicitly test for None so values like
0, False or empty string are preserved — update the conditional in the loop
iterating over stage.params (the block that calls self.stdout.write(f"
{param.name}: {param.value}")) to use "if param.value is not None:" so only
absent values are skipped.
Pull request overview
This pull request adds a new Django management command test_ml_job_e2e for end-to-end testing of ML job processing. The command creates ML jobs with specified collections and pipelines, monitors their progress in real-time, and displays final results in the console. This is a useful debugging and testing tool for developers working with the ML job pipeline.
Changes:
- Added test_ml_job_e2e.py management command with support for creating and monitoring ML jobs through different dispatch modes (internal, sync_api, async_api)
- Implements real-time progress monitoring with console updates every 2 seconds showing job status and per-stage progress
- Provides final results summary including stage completion status and parameter values
if key not in last_progress or abs(last_progress[key] - progress_pct) > 0.1:
    last_progress[key] = progress_pct
The last_progress dictionary is updated here when progress changes, but this tracking is not used to control whether the progress line is displayed. The display happens unconditionally on line 102 regardless of whether progress changed. Consider either removing this unused tracking logic or using it to avoid redundant screen refreshes when nothing has changed, similar to how process_single_image.py handles this on lines 105-112.
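One way to make the tracking meaningful is to fold the check and the update into a single predicate that gates the display. This is a standalone sketch, not the command's code; the `progress_changed` helper and `eps` threshold are illustrative assumptions:

```python
def progress_changed(last_progress, key, progress_pct, eps=0.1):
    """Record progress_pct and return True only when it moved more than eps.

    Returns False (and leaves last_progress untouched) for sub-threshold
    changes, so the caller can skip redundant screen refreshes.
    """
    if key not in last_progress or abs(last_progress[key] - progress_pct) > eps:
        last_progress[key] = progress_pct
        return True
    return False
```

The monitoring loop would then call `self.stdout.write(...)` only when `progress_changed(...)` returns True, rather than unconditionally on every poll.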
for stage in job.progress.stages:
    self.stdout.write(f"  {stage.name}: {stage.progress*100:.1f}% ({stage.status})")
    for param in stage.params:
        if param.value:
The condition if param.value: will skip displaying parameters with falsy values including 0, empty strings, False, and None. This could hide meaningful zero values like "Failed: 0" or "Processed: 0". Consider using if param.value is not None and param.value != "" to only skip unset parameters while still displaying zeros and other meaningful falsy values.
-        if param.value:
+        if param.value is not None and param.value != "":
This pull request introduces a new management command for end-to-end testing of ML jobs in the Django project. The command allows users to create and monitor ML jobs using specified collections and pipelines, providing real-time progress updates and final results in the console.
New ML job testing command:
- Added the test_ml_job_e2e.py management command for end-to-end testing of ML job processing, including job creation, progress monitoring, and final results display.
- Supports specifying SourceImageCollection, Pipeline, and JobDispatchMode via command-line arguments, with informative error handling for missing resources.
Summary by CodeRabbit
Release Notes