diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 243dcd16d..75a3540c3 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -314,6 +314,9 @@ def run_wrapper(run_args): run.prepare() run.start() if run.is_scoring: + if run.human_in_the_loop: + run.wait_for_human_validation() + run._update_status(SubmissionStatus.FINISHED) run.push_scores() run.push_output() except DockerImagePullException as e: @@ -470,6 +473,7 @@ def __init__(self, run_args): self.prediction_result = run_args["prediction_result"] self.scoring_result = run_args.get("scoring_result") self.execution_time_limit = run_args["execution_time_limit"] + self.human_in_the_loop = run_args.get("human_in_the_loop", False) # stdout and stderr self.stdout, self.stderr, self.ingestion_stdout, self.ingestion_stderr = ( self._get_stdout_stderr_file_names(run_args) @@ -1445,11 +1449,55 @@ def start(self): ) # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") - self._update_status(SubmissionStatus.FINISHED) + + if not self.human_in_the_loop: + self._update_status(SubmissionStatus.FINISHED) else: self._update_status(SubmissionStatus.SCORING) + def wait_for_human_validation(self): + container_output_dir = self.output_dir + host_output_dir = self._get_host_path(self.output_dir) + + scores_path = os.path.join(host_output_dir, "scores.json") + if not os.path.exists(os.path.join(container_output_dir, "scores.json")): + scores_path = os.path.join(host_output_dir, "scores.txt") + + approved_container = os.path.join(container_output_dir, "hitl_approved") + rejected_container = os.path.join(container_output_dir, "hitl_rejected") + approved_host = os.path.join(host_output_dir, "hitl_approved") + rejected_host = os.path.join(host_output_dir, "hitl_rejected") + + logger.info("=" * 60) + logger.info(f"HUMAN IN THE LOOP — submission {self.submission_id}") + logger.info("Inspect the scores file:") + 
logger.info(f" cat {scores_path}") + logger.info(f"To approve : touch {approved_host}") + logger.info(f"To reject : touch {rejected_host}") + logger.info("=" * 60) + + poll_interval = 3 + max_wait = 60 * 60 * 24 + + elapsed = 0 + while elapsed < max_wait: + if os.path.exists(approved_container): # ← poll the container path + logger.info(f"HITL: submission {self.submission_id} approved, sending scores.") + return + if os.path.exists(rejected_container): # ← poll the container path + raise SubmissionException( + f"HITL: scores rejected by the compute node operator " + f"(submission {self.submission_id})" + ) + time.sleep(poll_interval) + elapsed += poll_interval + + raise SubmissionException( + f"HITL: 24h timeout reached without validation " + f"(submission {self.submission_id})" + ) + def push_scores(self): """This is only ran at the end of the scoring step""" # POST to some endpoint: diff --git a/src/apps/api/serializers/competitions.py b/src/apps/api/serializers/competitions.py index a0fb14fda..3a566446e 100644 --- a/src/apps/api/serializers/competitions.py +++ b/src/apps/api/serializers/competitions.py @@ -272,7 +272,8 @@ class Meta: 'contact_email', 'report', 'whitelist_emails', - 'forum_enabled' + 'forum_enabled', + 'enable_human_in_the_loop' ) def validate_phases(self, phases): @@ -417,7 +418,8 @@ class Meta: 'contact_email', 'report', 'whitelist_emails', - 'forum_enabled' + 'forum_enabled', + 'enable_human_in_the_loop' ) def get_leaderboards(self, instance): diff --git a/src/apps/competitions/models.py b/src/apps/competitions/models.py index e5facb4fc..ed4ce5722 100644 --- a/src/apps/competitions/models.py +++ b/src/apps/competitions/models.py @@ -89,6 +89,7 @@ class Competition(models.Model): # If true, forum is enabled (default=True) forum_enabled = models.BooleanField(default=True) + enable_human_in_the_loop = models.BooleanField(default=False) def __str__(self): return f"competition-{self.title}-{self.pk}-{self.competition_type}" diff --git
a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 71acb85c1..2e052e9ca 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -59,6 +59,7 @@ "contact_email", "fact_sheet", "forum_enabled", + "enable_human_in_the_loop" ] TASK_FIELDS = [ @@ -134,6 +135,7 @@ def _send_to_compute_worker(submission, is_scoring): ), "id": submission.pk, "is_scoring": is_scoring, + "human_in_the_loop": submission.phase.competition.enable_human_in_the_loop, } if ( @@ -212,6 +214,9 @@ def _send_to_compute_worker(submission, is_scoring): time_padding = 60 * 20 # 20 minutes time_limit = submission.phase.execution_time_limit + time_padding + if is_scoring and submission.phase.competition.enable_human_in_the_loop: + time_limit = 60 * 60 * 25 + if ( submission.phase.competition.queue ): # if the competition is running on a custom queue, not the default queue diff --git a/src/static/riot/competitions/editor/_competition_details.tag b/src/static/riot/competitions/editor/_competition_details.tag index 199edd404..da135fef0 100644 --- a/src/static/riot/competitions/editor/_competition_details.tag +++ b/src/static/riot/competitions/editor/_competition_details.tag @@ -211,6 +211,22 @@ + +