From 3a0fe0f66ed82900d7e22ab7ff82c360b725f523 Mon Sep 17 00:00:00 2001 From: didayolo Date: Sat, 11 Oct 2025 15:28:27 +0200 Subject: [PATCH 1/4] Update compute worker to FAILED when needed --- compute_worker/compute_worker.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index a592604f0..311add58b 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1129,12 +1129,16 @@ def start(self): self._run_program_directory(ingestion_program_dir, kind="ingestion"), self.watch_detailed_results(), loop=loop, + return_exceptions=True, ) + task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) try: loop.run_until_complete(gathered_tasks) + # keep what gather returned so we can detect async errors later + task_results = list(gathered_tasks.result() or []) except ExecutionTimeLimitExceeded: error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds" logger.error(error_message) @@ -1211,6 +1215,18 @@ def start(self): logger.info("Program finished") signal.alarm(0) + # Failure "gate" BEFORE changing status + # An async task error? + had_async_exc = any(isinstance(r, BaseException) for r in task_results) + # Non-zero exit from either container counts as failure for this phase + program_rc = getattr(self, "program_exit_code", None) + ingestion_rc = getattr(self, "ingestion_program_exit_code", None) + failed_rc = any(rc not in (0, None) for rc in (program_rc, ingestion_rc)) + if had_async_exc or failed_rc: + self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}") + # Raise so upstream marks failed immediately + raise SubmissionException("Child task failed or non-zero return code") + if self.is_scoring: self._update_status(STATUS_FINISHED) else: From 72b0b1c0f9973b808e7cc8da8522c96da322ad24 Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 6 Feb 2026 11:54:41 +0100 Subject: [PATCH 2/4] Catch only real errors, improve formatting of Exception --- compute_worker/compute_worker.py | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 311add58b..ae708d12c 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -449,7 +449,7 @@ async def send_detailed_results(self, file_path): ) ) except Exception as e: - logger.error("This error might result in a Execution Time Exceeded error" + e) + logger.error(f"This error might result in a Execution Time Exceeded error: {e}") if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -668,8 +668,7 @@ async def _run_container_engine_cmd(self, container, kind): ) except Exception as e: logger.error( - "There was an error trying to connect to the websocket on the codabench instance" - + e + f"There was an error trying to connect to the websocket on the codabench instance: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -718,8 +717,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error(e) except Exception as e: logger.error( - "There was an error while starting the container and getting the logs" - + e + f"There was an error while starting the container and getting the logs: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1136,9 +1134,9 @@ def start(self): signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) try: - loop.run_until_complete(gathered_tasks) + # run tasks # keep what gather returned so we can detect async errors later - task_results = list(gathered_tasks.result() or []) + task_results = loop.run_until_complete(gathered_tasks) or [] except ExecutionTimeLimitExceeded: error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds" logger.error(error_message) @@ -1163,7 +1161,7 @@ def start(self): logger.error(e) except Exception as e: logger.error( - "There was a problem killing " + str(containers_to_kill) + e + f"There was a problem killing {containers_to_kill}: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1193,7 +1191,7 @@ def start(self): logger.error(e) except Exception as e: logger.error( - "There was a problem killing " + str(containers_to_kill) + e + f"There was a problem killing {containers_to_kill}: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1216,14 +1214,18 @@ def start(self): signal.alarm(0) # Failure "gate" BEFORE changing status - # An async task error? - had_async_exc = any(isinstance(r, BaseException) for r in task_results) - # Non-zero exit from either container counts as failure for this phase + def is_real_async_failure(r): + # gather returns either normal values or exception instances when return_exceptions=True + return isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError) + had_async_exc = any(is_real_async_failure(r) for r in task_results) program_rc = getattr(self, "program_exit_code", None) ingestion_rc = getattr(self, "ingestion_program_exit_code", None) failed_rc = any(rc not in (0, None) for rc in (program_rc, ingestion_rc)) if had_async_exc or failed_rc: - self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}") + self._update_status( + STATUS_FAILED, + extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}, async={task_results}", + ) # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") From 464b0aa79b11b296e9d5659301b402b460b4590c Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 6 Feb 2026 12:26:27 +0100 Subject: [PATCH 3/4] Make status code integers, add logs --- compute_worker/compute_worker.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index ae708d12c..28eaa0f8d 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -746,7 +746,7 @@ async def _run_container_engine_cmd(self, container, kind): Exception, ) as e: logger.error(e) - return_Code = {"StatusCode": e} + return_Code = {"StatusCode": 1} self.logs[kind] = { "returncode": return_Code["StatusCode"], @@ -1177,7 +1177,12 @@ def start(self): elapsed_time = logs["end"] - logs["start"] else: elapsed_time = self.execution_time_limit - return_code = logs["returncode"] + # Normalize the return_code + return_code = ( + logs["returncode"] + if logs["returncode"] is None or isinstance(logs["returncode"], int) + else 1 + ) if return_code is None: logger.warning("No return code from Process. Killing it") if kind == "ingestion": @@ -1229,6 +1234,10 @@ def is_real_async_failure(r): # Raise so upstream marks failed immediately raise SubmissionException("Child task failed or non-zero return code") + logger.info( + "PROGRAM STATUS: is_scoring=%s program_rc=%r ingestion_rc=%r task_results=%r", + self.is_scoring, program_rc, ingestion_rc, task_results + ) if self.is_scoring: self._update_status(STATUS_FINISHED) else: From 2c49a31896555ecce1577d51329209f021be1e7f Mon Sep 17 00:00:00 2001 From: didayolo Date: Fri, 6 Feb 2026 13:41:55 +0100 Subject: [PATCH 4/4] For scoring program only --- compute_worker/compute_worker.py | 30 ++++++++++-------------------- 1 file changed, 10 insertions(+), 20 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index 28eaa0f8d..14a1b3c59 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -1218,27 +1218,17 @@ def start(self): logger.info("Program finished") signal.alarm(0) - # Failure "gate" BEFORE changing status - def is_real_async_failure(r): - # gather returns either normal values or exception instances when return_exceptions=True - return isinstance(r, BaseException) and not isinstance(r, asyncio.CancelledError) - had_async_exc = any(is_real_async_failure(r) for r in task_results) - program_rc = getattr(self, "program_exit_code", None) - ingestion_rc = getattr(self, "ingestion_program_exit_code", None) - failed_rc = any(rc not in (0, None) for rc in (program_rc, ingestion_rc)) - if had_async_exc or failed_rc: - self._update_status( - STATUS_FAILED, - extra_information=f"program_rc={program_rc}, ingestion_rc={ingestion_rc}, async={task_results}", - ) - # Raise so upstream marks failed immediately - raise SubmissionException("Child task failed or non-zero return code") - - logger.info( - "PROGRAM STATUS: is_scoring=%s program_rc=%r ingestion_rc=%r task_results=%r", - self.is_scoring, program_rc, ingestion_rc, task_results - ) if self.is_scoring: + # Check if scoring program failed + program_results, _, _ = task_results + # Gather returns either normal values or exception instances when return_exceptions=True + had_async_exc = isinstance(program_results, BaseException) and not isinstance(program_results, asyncio.CancelledError) + program_rc = getattr(self, "program_exit_code", None) + failed_rc = program_rc not in (0, None) + if had_async_exc or failed_rc: + self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, async={task_results}") + # Raise so upstream marks failed immediately + raise SubmissionException("Child task failed or non-zero return code") self._update_status(STATUS_FINISHED) else: self._update_status(STATUS_SCORING)