Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions src/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from src.job_management import (
process_ongoing_jobs,
add_new_jobs,
assign_jobs_to_available_nodes,
assign_jobs_with_backlog_refill,
fill_queue_from_backlog,
age_backlog_queue,
)
Expand Down Expand Up @@ -54,9 +54,10 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job
metrics.baseline_jobs_submitted += new_jobs_count
metrics.episode_baseline_jobs_submitted += new_jobs_count

_, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
_, baseline_next_empty_slot, _, next_job_id, _ = assign_jobs_with_backlog_refill(
job_queue_2d, baseline_state['nodes'], baseline_cores_available,
baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics, is_baseline=True
baseline_running_jobs, baseline_next_empty_slot, next_job_id, metrics,
baseline_backlog_queue, is_baseline=True
)

num_used_nodes = np.sum(baseline_state['nodes'] > 0)
Expand Down
2 changes: 2 additions & 0 deletions src/callbacks.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ def _on_step(self) -> bool:
self.logger.record("metrics/jobs_completed", env.metrics.episode_jobs_completed)
self.logger.record("metrics/completion_rate", completion_rate)
self.logger.record("metrics/avg_wait_hours", avg_wait)
self.logger.record("metrics/on_nodes", env.metrics.episode_on_nodes[-1] if env.metrics.episode_on_nodes else 0)
self.logger.record("metrics/used_nodes", env.metrics.episode_used_nodes[-1] if env.metrics.episode_used_nodes else 0)
self.logger.record("metrics/max_queue_size", env.metrics.episode_max_queue_size_reached)
self.logger.record("metrics/max_backlog_size", env.metrics.episode_max_backlog_size_reached)
self.logger.record("metrics/jobs_dropped", env.metrics.episode_jobs_dropped)
Expand Down
27 changes: 23 additions & 4 deletions src/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
)
from src.job_management import (
process_ongoing_jobs, add_new_jobs,
assign_jobs_to_available_nodes, fill_queue_from_backlog, age_backlog_queue
assign_jobs_with_backlog_refill, fill_queue_from_backlog, age_backlog_queue
)
from src.node_management import adjust_nodes
from src.reward_calculation import RewardCalculator
Expand Down Expand Up @@ -158,6 +158,18 @@ def __init__(self,
shape=(MAX_QUEUE_SIZE * 4,),
dtype=np.int32
),
'backlog_size': spaces.Box(
low=0,
high=np.iinfo(np.int32).max,
shape=(1,),
dtype=np.int32
),
'backlog_assigned': spaces.Box(
low=0,
high=np.iinfo(np.int32).max,
shape=(1,),
dtype=np.int32
),
Comment on lines +161 to +172
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Add time features to the observation space.

The observation still lacks explicit time information (e.g., hour-of-episode or hour-of-day), which is required for the state representation. Please add a time feature to observation_space and update it each step/reset.

✅ Suggested addition (example)
             'predicted_prices': spaces.Box(
                 low=-1000,
                 high=1000,
                 shape=(24,),
                 dtype=np.float32
             ),
+            'time': spaces.Box(
+                low=0,
+                high=EPISODE_HOURS - 1,
+                shape=(1,),
+                dtype=np.int32
+            ),
         })
Based on learnings: State space representation should include node counts, job queue status, electricity prices, and time information.
🤖 Prompt for AI Agents
In `@src/environment.py` around lines 161 - 172, Observation lacks explicit time
info: add a time feature to the observation_space dict (e.g., 'time_of_day' or
'episode_hour') alongside existing keys like 'backlog_size' and
'backlog_assigned', and ensure the environment updates this feature on every
reset() and step() (or inside the existing _get_observation/_observe method) so
the returned observation contains the current time-of-day or hour-of-episode
value; update observation_space dtype/shape appropriately (e.g., Box(low=0,
high=23, shape=(1,), dtype=np.int32) or normalized float) and set the value
during reset() initialization and each step() before returning observations.

# predicted prices for the next 24h
'predicted_prices': spaces.Box(
low=-1000,
Expand All @@ -175,6 +187,8 @@ def _reset_timeline_state(self, start_index):
'nodes': np.zeros(MAX_NODES, dtype=np.int32),
# Initialize job queue to be empty
'job_queue': np.zeros((MAX_QUEUE_SIZE * 4), dtype=np.int32),
'backlog_size': np.array([0], dtype=np.int32),
'backlog_assigned': np.array([0], dtype=np.int32),
# Initialize predicted prices array
'predicted_prices': self.prices.predicted_prices.copy(),
}
Expand Down Expand Up @@ -303,12 +317,17 @@ def step(self, action):
# Assign jobs to available nodes
self.env_print(f"[4] Assigning jobs to available nodes...")

num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes(
job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False
num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id, backlog_assigned = (
assign_jobs_with_backlog_refill(
job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
self.next_empty_slot, self.next_job_id, self.metrics, self.backlog_queue,
is_baseline=False
)
)

self.env_print(f" {num_launched_jobs} jobs launched")
self.state['backlog_size'][0] = len(self.backlog_queue)
self.state['backlog_assigned'][0] = backlog_assigned
Comment thread
coderabbitai[bot] marked this conversation as resolved.

# Calculate node utilization stats
num_used_nodes = np.sum(self.state['nodes'] > 0)
Expand Down
48 changes: 46 additions & 2 deletions src/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node


def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running_jobs,
next_empty_slot, next_job_id, metrics, is_baseline=False):
next_empty_slot, next_job_id, metrics, is_baseline=False,
age_waiting_jobs=True):
"""
Assign jobs from queue to available nodes.

Expand Down Expand Up @@ -250,6 +251,49 @@ def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running
# metrics.jobs_dropped += 1
# metrics.episode_jobs_dropped += 1
#else:
job_queue_2d[job_idx][1] = new_age
if age_waiting_jobs:
job_queue_2d[job_idx][1] = new_age

return num_processed_jobs, next_empty_slot, num_dropped, next_job_id


def assign_jobs_with_backlog_refill(job_queue_2d, nodes, cores_available, running_jobs,
                                   next_empty_slot, next_job_id, metrics, backlog_queue,
                                   is_baseline=False):
    """
    Assign queued jobs to nodes, topping the queue up from the backlog
    whenever it runs empty, and repeat until no further progress is made.

    Returns a 5-tuple: (total jobs launched, updated next_empty_slot,
    total jobs dropped, updated next_job_id, jobs launched that came
    from the backlog).
    """
    launched_total = 0
    dropped_total = 0
    backlog_assigned = 0
    # Jobs pulled in from the backlog that have not been launched yet.
    # While any remain, waiting jobs are NOT aged on the extra assignment
    # passes (they were already aged on the first pass of this call).
    pending_from_backlog = 0

    keep_going = True
    while keep_going:
        launched, next_empty_slot, dropped, next_job_id = assign_jobs_to_available_nodes(
            job_queue_2d, nodes, cores_available, running_jobs,
            next_empty_slot, next_job_id, metrics, is_baseline=is_baseline,
            age_waiting_jobs=(pending_from_backlog == 0)
        )
        launched_total += launched
        dropped_total += dropped

        # Credit launches to the backlog first, up to the number still pending.
        if pending_from_backlog > 0 and launched > 0:
            from_backlog = launched if launched < pending_from_backlog else pending_from_backlog
            backlog_assigned += from_backlog
            pending_from_backlog -= from_backlog

        # Refill only when the queue has fully drained, the backlog holds
        # jobs, and some node still has spare cores.
        refilled = 0
        if len(backlog_queue) > 0:
            if np.all(job_queue_2d[:, 0] == 0):
                if np.any((nodes >= 0) & (cores_available > 0)):
                    next_empty_slot, refilled = fill_queue_from_backlog(
                        job_queue_2d, backlog_queue, next_empty_slot
                    )
                    pending_from_backlog += refilled

        # No refill means no new work to attempt: stop iterating.
        keep_going = refilled > 0

    return launched_total, next_empty_slot, dropped_total, next_job_id, backlog_assigned
Comment on lines +260 to +299
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Backlog refill can violate the 1000‑job queue cap.

With backlog enabled, overflow jobs can accumulate beyond the fixed queue size, which risks exceeding the 1000‑job limit (and potentially the per‑hour and max‑duration constraints) unless those limits are enforced elsewhere. Please cap combined queue+backlog size and validate new job counts/durations before enqueueing or refilling.
As per coding guidelines: Job queue constraints must enforce maximum 1000 jobs, maximum 1500 new jobs per hour, and maximum 170h runtime per job.

🤖 Prompt for AI Agents
In `@src/job_management.py` around lines 258 - 296, The backlog refill path can
exceed the 1000-job queue cap and skip per-hour/new-job and max-duration checks;
update assign_jobs_with_backlog_refill (and the helper fill_queue_from_backlog)
to enforce limits before moving jobs from backlog: compute combined_size =
current_queue_count(job_queue_2d) + len(backlog_queue) and only move
min(moved_from_backlog, 1000 - current_queue_count) jobs, validate each job's
new_job_count <= 1500_per_hour and runtime <= 170_hours before enqueueing,
reject or drop any jobs that violate these rules (increment total_dropped_jobs
and backlog_assigned appropriately), and ensure metrics/logging are updated when
jobs are dropped or skipped.