diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index a592604f0..14a1b3c59 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -449,7 +449,7 @@ async def send_detailed_results(self, file_path): ) ) except Exception as e: - logger.error("This error might result in a Execution Time Exceeded error" + e) + logger.error(f"This error might result in a Execution Time Exceeded error: {e}") if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -668,8 +668,7 @@ async def _run_container_engine_cmd(self, container, kind): ) except Exception as e: logger.error( - "There was an error trying to connect to the websocket on the codabench instance" - + e + f"There was an error trying to connect to the websocket on the codabench instance: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -718,8 +717,7 @@ async def _run_container_engine_cmd(self, container, kind): logger.error(e) except Exception as e: logger.error( - "There was an error while starting the container and getting the logs" - + e + f"There was an error while starting the container and getting the logs: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -748,7 +746,7 @@ async def _run_container_engine_cmd(self, container, kind): Exception, ) as e: logger.error(e) - return_Code = {"StatusCode": e} + return_Code = {"StatusCode": 1} self.logs[kind] = { "returncode": return_Code["StatusCode"], @@ -1129,12 +1127,16 @@ def start(self): self._run_program_directory(ingestion_program_dir, kind="ingestion"), self.watch_detailed_results(), loop=loop, + return_exceptions=True, ) + task_results = [] # will store results/exceptions from gather signal.signal(signal.SIGALRM, alarm_handler) signal.alarm(self.execution_time_limit) try: - loop.run_until_complete(gathered_tasks) + # run tasks + # keep what gather returned so we can detect async errors later + task_results = loop.run_until_complete(gathered_tasks) or [] except ExecutionTimeLimitExceeded: error_message = f"Execution Time Limit exceeded. Limit was {self.execution_time_limit} seconds" logger.error(error_message) @@ -1159,7 +1161,7 @@ def start(self): logger.error(e) except Exception as e: logger.error( - "There was a problem killing " + str(containers_to_kill) + e + f"There was a problem killing {containers_to_kill}: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1175,7 +1177,12 @@ def start(self): elapsed_time = logs["end"] - logs["start"] else: elapsed_time = self.execution_time_limit - return_code = logs["returncode"] + # Normalize the return_code + return_code = ( + logs["returncode"] + if logs["returncode"] is None or isinstance(logs["returncode"], int) + else 1 + ) if return_code is None: logger.warning("No return code from Process. Killing it") if kind == "ingestion": @@ -1189,7 +1196,7 @@ def start(self): logger.error(e) except Exception as e: logger.error( - "There was a problem killing " + str(containers_to_kill) + e + f"There was a problem killing {containers_to_kill}: {e}" ) if os.environ.get("LOG_LEVEL", "info").lower() == "debug": logger.exception(e) @@ -1212,6 +1219,16 @@ def start(self): signal.alarm(0) if self.is_scoring: + # Check if scoring program failed + program_results, _, _ = task_results + # Gather returns either normal values or exception instances when return_exceptions=True + had_async_exc = isinstance(program_results, BaseException) and not isinstance(program_results, asyncio.CancelledError) + program_rc = getattr(self, "program_exit_code", None) + failed_rc = program_rc not in (0, None) + if had_async_exc or failed_rc: + self._update_status(STATUS_FAILED, extra_information=f"program_rc={program_rc}, async={task_results}") + # Raise so upstream marks failed immediately + raise SubmissionException("Child task failed or non-zero return code") self._update_status(STATUS_FINISHED) else: self._update_status(STATUS_SCORING)