Skip to content

Commit ce0ec6e

Browse files
benedikt-voelkel (Benedikt Volkel)
and co-authors authored
[WFRunner] Handle resource limits and CPU better (#1532)
* Account for relative CPU factor in case of sampling Studies have shown that being able to backfill tasks can have a difference for CPU efficiency, especially for transport. That is in particular important for high-efficiency jobs such as high-interaction-rate pp simulations. * Abort by default if estimated resources exceed limits * Run anyway, if --optimistic-resources is passed * Fix: Actually reset the overestimated resources to the limits as otherwise the runner would silently quit when nothing else can be done. * In case of dynamically sampled resources and if a corresponding task has been run already: Reset the assigned resources to the limits if they exceed the boundaries. Co-authored-by: Benedikt Volkel <benedikt.volkel@cern.ch>
1 parent 8de3655 commit ce0ec6e

File tree

1 file changed

+52
-20
lines changed

1 file changed

+52
-20
lines changed

MC/bin/o2_dpg_workflow_runner.py

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -535,13 +535,16 @@ class TaskResources:
535535
"""
536536
Container holding resources of a single task
537537
"""
538-
def __init__(self, tid, name, cpu, mem, resource_boundaries):
538+
def __init__(self, tid, name, cpu, cpu_relative, mem, resource_boundaries):
539539
# the task ID belonging to these resources
540540
self.tid = tid
541541
self.name = name
542542
# original CPUs/MEM assigned (persistent)
543543
self.cpu_assigned_original = cpu
544544
self.mem_assigned_original = mem
545+
# relative CPU, to be multiplied with sampled CPU; set by the user, e.g. to allow to backfill tasks
546+
# only takes effect when sampling resources; persistent
547+
self.cpu_relative = cpu_relative if cpu_relative else 1
545548
# CPUs/MEM assigned (transient)
546549
self.cpu_assigned = cpu
547550
self.mem_assigned = mem
@@ -571,6 +574,31 @@ def __init__(self, tid, name, cpu, mem, resource_boundaries):
571574
def is_done(self):
572575
return self.time_collect and not self.booked
573576

577+
def is_within_limits(self):
578+
"""
579+
Check if assigned resources respect limits
580+
"""
581+
cpu_within_limits = True
582+
mem_within_limits = True
583+
if self.cpu_assigned > self.resource_boundaries.cpu_limit:
584+
cpu_within_limits = False
585+
actionlogger.warning("CPU of task %s exceeds limits %d > %d", self.name, self.cpu_assigned, self.resource_boundaries.cpu_limit)
586+
if self.mem_assigned > self.resource_boundaries.mem_limit:
587+
mem_within_limits = False
588+
actionlogger.warning("MEM of task %s exceeds limits %d > %d", self.name, self.mem_assigned, self.resource_boundaries.mem_limit)
589+
return cpu_within_limits and mem_within_limits
590+
591+
def limit_resources(self, cpu_limit=None, mem_limit=None):
592+
"""
593+
Limit resources of this specific task
594+
"""
595+
if not cpu_limit:
596+
cpu_limit = self.resource_boundaries.cpu_limit
597+
if not mem_limit:
598+
mem_limit = self.resource_boundaries.mem_limit
599+
self.cpu_assigned = min(self.cpu_assigned, cpu_limit)
600+
self.mem_assigned = min(self.mem_assigned, mem_limit)
601+
574602
def add(self, time_passed, cpu, mem):
575603
"""
576604
Brief interface to add resources that were measured after time_passed
@@ -610,22 +638,23 @@ def sample_resources(self):
610638
# This task ran already with the assigned resources, so let's set it to the limit
611639
if cpu_sampled > self.resource_boundaries.cpu_limit:
612640
actionlogger.warning("Sampled CPU (%.2f) exceeds assigned CPU limit (%.2f)", cpu_sampled, self.resource_boundaries.cpu_limit)
613-
cpu_sampled = self.resource_boundaries.cpu_limit
641+
elif cpu_sampled < 0:
642+
actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
643+
cpu_sampled = self.cpu_assigned
644+
614645
if mem_sampled > self.resource_boundaries.mem_limit:
615646
actionlogger.warning("Sampled MEM (%.2f) exceeds assigned MEM limit (%.2f)", mem_sampled, self.resource_boundaries.mem_limit)
616-
mem_sampled = self.resource_boundaries.mem_limit
617-
618-
if mem_sampled <= 0:
647+
elif mem_sampled <= 0:
619648
actionlogger.debug("Sampled memory for %s is %.2f <= 0, setting to previously assigned value %.2f", self.name, mem_sampled, self.mem_assigned)
620649
mem_sampled = self.mem_assigned
621-
if cpu_sampled < 0:
622-
actionlogger.debug("Sampled CPU for %s is %.2f < 0, setting to previously assigned value %.2f", self.name, cpu_sampled, self.cpu_assigned)
623-
cpu_sampled = self.cpu_assigned
650+
624651
for res in self.related_tasks:
625652
if res.is_done or res.booked:
626653
continue
627-
res.cpu_assigned = cpu_sampled
654+
res.cpu_assigned = cpu_sampled * res.cpu_relative
628655
res.mem_assigned = mem_sampled
656+
# This task has been run before, stay optimistic and limit the resources in case the sampled ones exceed limits
657+
res.limit_resources()
629658

630659

631660
class ResourceManager:
@@ -676,19 +705,18 @@ def __init__(self, cpu_limit, mem_limit, procs_parallel_max=100, dynamic_resourc
676705
# add 19 to get nice value of low-priority tasks
677706
self.nice_backfill = self.nice_default + 19
678707

679-
def add_task_resources(self, name, related_tasks_name, cpu, mem, semaphore_string=None):
708+
def add_task_resources(self, name, related_tasks_name, cpu, cpu_relative, mem, semaphore_string=None):
680709
"""
681710
Construct and Add a new TaskResources object
682711
"""
683-
resources = TaskResources(len(self.resources), name, cpu, mem, self.resource_boundaries)
684-
if cpu > self.resource_boundaries.cpu_limit or mem > self.resource_boundaries.mem_limit:
685-
actionlogger.warning(f"Resource estimates of id {len(self.resources)} overestimates limits, CPU limit: {self.resource_boundaries.cpu_limit}, MEM limit: {self.resource_boundaries.mem_limit}; might not run")
686-
if not self.resource_boundaries.optimistic_resources:
687-
# exit if we don't dare to try
688-
print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
689-
exit(1)
690-
# or we do dare, let's see what happens...
691-
actionlogger.info("We will try to run this task anyway with maximum available resources")
712+
resources = TaskResources(len(self.resources), name, cpu, cpu_relative, mem, self.resource_boundaries)
713+
if not resources.is_within_limits() and not self.resource_boundaries.optimistic_resources:
714+
# exit if we don't dare to try
715+
print(f"Resources of task {name} are exceeding the boundaries.\nCPU: {cpu} (estimate) vs. {self.resource_boundaries.cpu_limit} (boundary)\nMEM: {mem} (estimated) vs. {self.resource_boundaries.mem_limit} (boundary).")
716+
print("Pass --optimistic-resources to the runner to attempt the run anyway.")
717+
exit(1)
718+
# if we get here, either all is good or the user decided to be optimistic and we limit the resources, by default to the given CPU and mem limits.
719+
resources.limit_resources()
692720

693721
self.resources.append(resources)
694722
# do the following to have the same Semaphore object for all corresponding TaskResources so that we do not need a lookup
@@ -876,7 +904,11 @@ def __init__(self, workflowfile, args, jmax=100):
876904
for task in self.workflowspec['stages']:
877905
# ...and add all initial resource estimates
878906
global_task_name = self.get_global_task_name(task["name"])
879-
self.resource_manager.add_task_resources(task["name"], global_task_name, float(task["resources"]["cpu"]), float(task["resources"]["mem"]), task.get("semaphore"))
907+
try:
908+
cpu_relative = float(task["resources"]["relative_cpu"])
909+
except TypeError:
910+
cpu_relative = 1
911+
self.resource_manager.add_task_resources(task["name"], global_task_name, float(task["resources"]["cpu"]), cpu_relative, float(task["resources"]["mem"]), task.get("semaphore"))
880912

881913
self.procstatus = { tid:'ToDo' for tid in range(len(self.workflowspec['stages'])) }
882914
self.taskneeds= { t:set(self.getallrequirements(t)) for t in self.taskuniverse }

0 commit comments

Comments
 (0)