@@ -1129,12 +1129,16 @@ def start(self):
11291129 self ._run_program_directory (ingestion_program_dir , kind = "ingestion" ),
11301130 self .watch_detailed_results (),
11311131 loop = loop ,
1132+ return_exceptions = True ,
11321133 )
11331134
1135+ task_results = [] # will store results/exceptions from gather
11341136 signal .signal (signal .SIGALRM , alarm_handler )
11351137 signal .alarm (self .execution_time_limit )
11361138 try :
11371139 loop .run_until_complete (gathered_tasks )
1140+ # keep what gather returned so we can detect async errors later
1141+ task_results = list (gathered_tasks .result () or [])
11381142 except ExecutionTimeLimitExceeded :
11391143 error_message = f"Execution Time Limit exceeded. Limit was { self .execution_time_limit } seconds"
11401144 logger .error (error_message )
@@ -1211,6 +1215,18 @@ def start(self):
12111215 logger .info ("Program finished" )
12121216 signal .alarm (0 )
12131217
1218+ # Failure "gate" BEFORE changing status
1219+ # An async task error?
1220+ had_async_exc = any (isinstance (r , BaseException ) for r in task_results )
1221+ # Non-zero exit from either container counts as failure for this phase
1222+ program_rc = getattr (self , "program_exit_code" , None )
1223+ ingestion_rc = getattr (self , "ingestion_program_exit_code" , None )
1224+ failed_rc = any (rc not in (0 , None ) for rc in (program_rc , ingestion_rc ))
1225+ if had_async_exc or failed_rc :
1226+ self ._update_status (STATUS_FAILED , extra_information = f"program_rc={ program_rc } , ingestion_rc={ ingestion_rc } " )
1227+ # Raise so upstream marks failed immediately
1228+ raise SubmissionException ("Child task failed or non-zero return code" )
1229+
12141230 if self .is_scoring :
12151231 self ._update_status (STATUS_FINISHED )
12161232 else :
0 commit comments