Backlogger: Add refill from backlogger during job assignment #20
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -163,7 +163,8 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node | |
|
|
||
|
|
||
| def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running_jobs, | ||
| -                                    next_empty_slot, next_job_id, metrics, is_baseline=False): | ||
| +                                    next_empty_slot, next_job_id, metrics, is_baseline=False, | ||
| +                                    age_waiting_jobs=True): | ||
| """ | ||
| Assign jobs from queue to available nodes. | ||
|
|
||
|
|
@@ -250,6 +251,49 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running | |
| # metrics.jobs_dropped += 1 | ||
| # metrics.episode_jobs_dropped += 1 | ||
| #else: | ||
| - job_queue_2d[job_idx][1] = new_age | ||
| + if age_waiting_jobs: | ||
| +     job_queue_2d[job_idx][1] = new_age | ||
|
|
||
| return num_processed_jobs, next_empty_slot, num_dropped, next_job_id | ||
|
|
||
|
|
||
| +def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs, | ||
| +                                    next_empty_slot, next_job_id, metrics, backlog_queue, | ||
| +                                    is_baseline=False): | ||
| +    """ | ||
| +    Assign jobs, refilling the queue from backlog when it becomes empty. | ||
| +    Loop until no progress is made or resources/backlog are exhausted. | ||
| +    """ | ||
| +    total_launched_jobs = 0 | ||
| +    total_dropped_jobs = 0 | ||
| +    backlog_assigned = 0 | ||
| +    backlog_loaded_remaining = 0 | ||
| + | ||
| +    while True: | ||
| +        num_launched_jobs, next_empty_slot, num_dropped, next_job_id = assign_jobs_to_available_nodes( | ||
| +            job_queue_2d, nodes, cores_available, running_jobs, | ||
| +            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline, | ||
| +            age_waiting_jobs=(backlog_loaded_remaining == 0) | ||
| +        ) | ||
| +        total_launched_jobs += num_launched_jobs | ||
| +        total_dropped_jobs += num_dropped | ||
| +        if backlog_loaded_remaining > 0 and num_launched_jobs > 0: | ||
| +            assigned_from_backlog = min(num_launched_jobs, backlog_loaded_remaining) | ||
| +            backlog_assigned += assigned_from_backlog | ||
| +            backlog_loaded_remaining -= assigned_from_backlog | ||
| + | ||
| +        queue_empty = np.all(job_queue_2d[:, 0] == 0) | ||
| +        backlog_has_jobs = len(backlog_queue) > 0 | ||
| +        resources_available = np.any((nodes >= 0) & (cores_available > 0)) | ||
| + | ||
| +        moved_from_backlog = 0 | ||
| +        if queue_empty and backlog_has_jobs and resources_available: | ||
| +            next_empty_slot, moved_from_backlog = fill_queue_from_backlog( | ||
| +                job_queue_2d, backlog_queue, next_empty_slot | ||
| +            ) | ||
| +            backlog_loaded_remaining += moved_from_backlog | ||
| + | ||
| +        if moved_from_backlog == 0: | ||
| +            break | ||
|
|
||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| +    return total_launched_jobs, next_empty_slot, total_dropped_jobs, next_job_id, backlog_assigned | ||
|
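The termination behaviour of the refill loop in this hunk is easier to check on a toy model. The following is a minimal sketch, not the PR's implementation: it replaces the NumPy queue with a plain list of core counts (0 marks an empty slot), and `assign_once`, `refill`, and `assign_with_refill` are hypothetical stand-ins for `assign_jobs_to_available_nodes`, `fill_queue_from_backlog`, and `assign_jobs_with_backlog_refill`.

```python
from collections import deque


def assign_once(queue, free_cores):
    """Launch every queued job that still fits; hypothetical stand-in."""
    launched = 0
    for i, cores_needed in enumerate(queue):
        if cores_needed and cores_needed <= free_cores:
            free_cores -= cores_needed
            queue[i] = 0  # 0 marks an empty slot
            launched += 1
    return launched, free_cores


def refill(queue, backlog):
    """Move backlog jobs into empty slots; return how many were moved."""
    moved = 0
    for i in range(len(queue)):
        if queue[i] == 0 and backlog:
            queue[i] = backlog.popleft()
            moved += 1
    return moved


def assign_with_refill(queue, backlog, free_cores):
    """Assign, refill when the queue is empty, and stop once nothing moves."""
    total_launched = 0
    from_backlog = 0
    loaded_remaining = 0  # refilled jobs not yet credited as launched
    while True:
        launched, free_cores = assign_once(queue, free_cores)
        total_launched += launched
        if loaded_remaining > 0 and launched > 0:
            credited = min(launched, loaded_remaining)
            from_backlog += credited
            loaded_remaining -= credited

        moved = 0
        if all(slot == 0 for slot in queue) and backlog and free_cores > 0:
            moved = refill(queue, backlog)
            loaded_remaining += moved

        if moved == 0:  # no refill happened, so another pass cannot progress
            break
    return total_launched, from_backlog, free_cores


queue = [2, 1]
backlog = deque([1, 1, 4])
result = assign_with_refill(queue, backlog, free_cores=5)
```

In this toy run the first pass launches both queued jobs, the refill loads two backlog jobs, the second pass launches them, and the loop stops because no cores remain for the last backlog entry.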
Comment on lines +260 to +299
Contributor
Backlog refill can violate the 1000-job queue cap.
With backlog enabled, overflow jobs can accumulate beyond the fixed queue size, which risks exceeding the 1000-job limit (and potentially the per-hour and max-duration constraints) unless those limits are enforced elsewhere. Please cap combined queue+backlog size and validate new job counts/durations before enqueueing or refilling.
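One way the requested cap could look is sketched below. This is an illustration under assumptions, not code from the PR: `admit_to_backlog` and `max_duration_hours` are hypothetical names, only the 1000-job figure comes from the comment, and the per-hour limit is left out because its value is not given here.

```python
from collections import deque

MAX_TOTAL_JOBS = 1000  # the queue cap named in the review comment


def admit_to_backlog(queued_count, backlog, new_durations, max_duration_hours):
    """Append valid jobs to the backlog without exceeding the combined cap.

    queued_count is the number of jobs already in the fixed-size queue.
    Returns (accepted, rejected).
    """
    accepted = rejected = 0
    for duration in new_durations:
        room = MAX_TOTAL_JOBS - (queued_count + len(backlog))
        valid = 1 <= duration <= max_duration_hours
        if room > 0 and valid:
            backlog.append(duration)
            accepted += 1
        else:
            rejected += 1  # over the cap or outside the duration range
    return accepted, rejected


backlog = deque([1] * 5)
counts = admit_to_backlog(
    queued_count=993,
    backlog=backlog,
    new_durations=[2, 0, 3, 99, 4, 5],
    max_duration_hours=24,  # assumed limit, for illustration only
)
```

Checking the cap against `queued_count + len(backlog)` on every append keeps the combined total bounded even when a single call carries more jobs than there is room for.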
Add time features to the observation space.
The observation still lacks explicit time information (e.g., hour-of-episode or hour-of-day), which is required for the state representation. Please add a time feature to observation_space and update it each step/reset.

✅ Suggested addition (example)
      'predicted_prices': spaces.Box(
          low=-1000, high=1000, shape=(24,), dtype=np.float32
      ),
+     'time': spaces.Box(
+         low=0,
+         high=EPISODE_HOURS - 1,
+         shape=(1,),
+         dtype=np.int32
+     ),
  })
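The "update it each step/reset" part of the suggestion can be sketched without the environment itself. The class below is a hypothetical helper, not part of the PR: `TimeFeature` is an invented name, the value of `EPISODE_HOURS` is assumed (the suggestion names the constant but not its value), and a plain list stands in for the `np.int32` array of shape `(1,)` that the `spaces.Box` entry would expect.

```python
EPISODE_HOURS = 336  # assumed value, for illustration only


class TimeFeature:
    """Tracks hour-of-episode so it can be included in each observation."""

    def __init__(self, episode_hours=EPISODE_HOURS):
        self.episode_hours = episode_hours
        self.hour = 0

    def reset(self):
        """Start a new episode at hour 0."""
        self.hour = 0
        return self.observe()

    def step(self):
        """Advance one hour, clamped to the upper bound episode_hours - 1."""
        self.hour = min(self.hour + 1, self.episode_hours - 1)
        return self.observe()

    def observe(self):
        # One-element list, mirroring shape=(1,) in the suggested Box.
        return {"time": [self.hour]}


clock = TimeFeature(episode_hours=3)
observations = [clock.reset(), clock.step(), clock.step(), clock.step()]
```

Clamping at `episode_hours - 1` keeps the value inside the bounds declared for the suggested `'time'` entry even if `step` is called once more after the final hour.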