Skip to content

Commit f73f77d

Browse files
committed
Limit backlog size to a reasonable value and count drops there
1 parent 7c4f648 commit f73f77d

4 files changed

Lines changed: 20 additions & 6 deletions

File tree

src/baseline.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,12 +47,15 @@ def baseline_step(baseline_state, baseline_cores_available, baseline_running_job
4747
job_queue_2d, baseline_backlog_queue, baseline_next_empty_slot
4848
)
4949

50-
_new_baseline_jobs, baseline_next_empty_slot = add_new_jobs(
50+
_new_baseline_jobs, baseline_next_empty_slot, baseline_backlog_dropped = add_new_jobs(
5151
job_queue_2d, new_jobs_count, new_jobs_durations,
5252
new_jobs_nodes, new_jobs_cores, baseline_next_empty_slot, baseline_backlog_queue
5353
)
5454
metrics.baseline_jobs_submitted += new_jobs_count
5555
metrics.episode_baseline_jobs_submitted += new_jobs_count
56+
if baseline_backlog_dropped > 0:
57+
metrics.baseline_jobs_dropped += baseline_backlog_dropped
58+
metrics.episode_baseline_jobs_dropped += baseline_backlog_dropped
5659

5760
num_launched, baseline_next_empty_slot, _, next_job_id = assign_jobs_to_available_nodes(
5861
job_queue_2d, baseline_state['nodes'], baseline_cores_available,

src/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
MAX_NODES = 335 # Maximum number of nodes
66
MAX_QUEUE_SIZE = 2500 # Maximum number of jobs in the queue
7+
MAX_BACKLOG_SIZE = 50000 # Maximum number of jobs in the backlog (overflow) queue
78
MAX_CHANGE = MAX_NODES
89
MAX_JOB_DURATION = 170 # maximum job runtime in hours
910
# Use a very high cap; age-based dropping is temporarily disabled in code.

src/environment.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,10 +328,13 @@ def step(self, action):
328328

329329
# Add new jobs to queue (overflow goes to helper)
330330
self.env_print(f"[2] Adding {new_jobs_count} new jobs to the queue...")
331-
new_jobs, self.next_empty_slot = add_new_jobs(
331+
new_jobs, self.next_empty_slot, backlog_dropped = add_new_jobs(
332332
job_queue_2d, new_jobs_count, new_jobs_durations,
333333
new_jobs_nodes, new_jobs_cores, self.next_empty_slot, self.backlog_queue
334334
)
335+
if backlog_dropped > 0:
336+
self.metrics.jobs_dropped += backlog_dropped
337+
self.metrics.episode_jobs_dropped += backlog_dropped
335338
self.metrics.jobs_submitted += new_jobs_count
336339
self.metrics.episode_jobs_submitted += new_jobs_count
337340

@@ -350,10 +353,12 @@ def step(self, action):
350353
# Assign jobs to available nodes
351354
self.env_print(f"[4] Assigning jobs to available nodes...")
352355

353-
num_launched_jobs, self.next_empty_slot, num_dropped_this_step, self.next_job_id = assign_jobs_to_available_nodes(
356+
num_dropped_this_step = backlog_dropped
357+
num_launched_jobs, self.next_empty_slot, queue_dropped, self.next_job_id = assign_jobs_to_available_nodes(
354358
job_queue_2d, self.state['nodes'], self.cores_available, self.running_jobs,
355359
self.next_empty_slot, self.next_job_id, self.metrics, is_baseline=False
356360
)
361+
num_dropped_this_step += queue_dropped
357362

358363
self.env_print(f" {num_launched_jobs} jobs launched")
359364

src/job_management.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import numpy as np
44
from src.config import (
5-
MAX_NODES, CORES_PER_NODE
5+
MAX_NODES, CORES_PER_NODE, MAX_BACKLOG_SIZE
66
)
77

88

@@ -125,16 +125,21 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node
125125
new_jobs_nodes: List of nodes required per job
126126
new_jobs_cores: List of cores per node required per job
127127
next_empty_slot: Index of next empty slot in queue
128+
backlog_queue: Optional deque for overflow jobs
128129
129130
Returns:
130-
Tuple of (list of added jobs (real queue + backlog queue), updated next_empty_slot)
131+
Tuple of (list of added jobs, updated next_empty_slot, num_dropped)
131132
"""
132133
new_jobs = []
134+
num_dropped = 0
133135
for i in range(new_jobs_count):
134136
# Check if we have space in the queue
135137
if next_empty_slot >= len(job_queue_2d):
136138
if backlog_queue is None:
137139
break # Queue is full
140+
if len(backlog_queue) >= MAX_BACKLOG_SIZE:
141+
num_dropped += 1
142+
continue # Backlog full, drop incoming job
138143
job_entry = [
139144
new_jobs_durations[i],
140145
0, # Age starts at 0
@@ -159,7 +164,7 @@ def add_new_jobs(job_queue_2d, new_jobs_count, new_jobs_durations, new_jobs_node
159164
while next_empty_slot < len(job_queue_2d) and job_queue_2d[next_empty_slot][0] != 0:
160165
next_empty_slot += 1
161166

162-
return new_jobs, next_empty_slot
167+
return new_jobs, next_empty_slot, num_dropped
163168

164169

165170
def assign_jobs_to_available_nodes(job_queue_2d, nodes, cores_available, running_jobs,

0 commit comments

Comments
 (0)