From 1eec1788272edd661f8a7522d8568dcb0fdc34f5 Mon Sep 17 00:00:00 2001 From: "hanane.chrifelasri" Date: Wed, 17 Jun 2026 13:39:46 +0200 Subject: [PATCH] Fix data integrity: ensure FINISHED status only after successful score upload The compute_worker must upload scores before marking submission as FINISHED. Previously, status was updated first, creating a race condition where the leaderboard could read FINISHED submissions without scores. Changes: - Reorder run_wrapper: call push_scores() and push_output() before _update_status(FINISHED) - Add retry logic in push_scores() with exponential backoff (3 attempts) - Increase timeout to 30s for score uploads --- compute_worker/compute_worker.py | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index c02abcb0f..8dd3dfa3d 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -313,9 +313,13 @@ def run_wrapper(run_args): try: run.prepare() run.start() + # Upload scores and outputs before marking as finished if run.is_scoring: run.push_scores() run.push_output() + # Mark as finished only after successful upload + if run.is_scoring: + run._update_status(SubmissionStatus.FINISHED) except DockerImagePullException as e: msg = str(e).strip() if msg: @@ -1445,7 +1449,7 @@ def start(self): ) # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") - self._update_status(SubmissionStatus.FINISHED) + # Status will be set to FINISHED after successful upload else: self._update_status(SubmissionStatus.SCORING) @@ -1483,9 +1487,29 @@ def push_scores(self): "scores": scores, } logger.info(f"Submitting these scores to {url}: {scores} with data = {data}") - resp = self.requests_session.post(url, json=data) - logger.info(resp) - logger.info(str(resp.content)) + + # Retry score upload with exponential backoff + max_retries = 3 + for attempt in range(max_retries): + try: + resp = self.requests_session.post(url, json=data, timeout=30) + resp.raise_for_status() + logger.info(f"Scores uploaded successfully: {resp.status_code}") + logger.info(str(resp.content)) + return + except Exception as e: + wait_time = 2 ** attempt + if attempt < max_retries - 1: + logger.warning( + f"Score upload attempt {attempt + 1}/{max_retries} failed: {e}. " + f"Retrying in {wait_time}s..." + ) + time.sleep(wait_time) + else: + logger.error(f"All {max_retries} score upload attempts failed") + raise SubmissionException( + f"Failed to upload scores after {max_retries} attempts: {e}" + ) def push_output(self): """Output is pushed at the end of both prediction and scoring steps."""