@@ -101,6 +101,23 @@ def upload_csv_to_s3(local_csv_path: str, dataset_id: int) -> None:
101101 raise RuntimeError (f"Failed to upload CSV to S3: { response .text } " )
102102
103103
104+ def upload_metrics_to_s3 (metrics_file_path : str , dataset_id : int ) -> None :
105+ """Upload the metrics file to S3 using S3Ferry."""
106+ destination_file_path = f"/datasets/{ dataset_id } /metrics.json"
107+ source_file_path = metrics_file_path .replace ("/app/" , "" )
108+ logger .info (f"Uploading { metrics_file_path } to S3 as { destination_file_path } " )
109+ response = s3_ferry_service .transfer_file (
110+ destination_file_path = destination_file_path ,
111+ destination_storage_type = "S3" ,
112+ source_file_path = source_file_path ,
113+ source_storage_type = "FS" ,
114+ )
115+ logger .info (f"Metrics S3 upload status: { response .status_code } " )
116+ logger .info (f"Metrics S3 upload response: { response .text } " )
117+ if response .status_code not in [200 , 201 ]:
118+ raise RuntimeError (f"Failed to upload metrics to S3: { response .text } " )
119+
120+
104121def notify_progress_uploading_to_s3 (session_id : int ) -> None :
105122 """Notify progress update: New Dataset Uploading to S3."""
106123 payload = {
@@ -239,7 +256,7 @@ def _log_cleanup_results(cleanup_summary: list) -> None:
239256
240257
241258def process_callback_background (
242- file_path : str , encoded_results : str , session_id : int
259+ file_path : str , encoded_results : str , session_id : int , metrics_file : str
243260) -> None :
244261 """Process the dataset generation callback: upload CSV to S3 and send status update."""
245262 try :
@@ -302,6 +319,13 @@ def process_callback_background(
302319
303320 notify_dataset_update (output_csv_path )
304321 upload_csv_to_s3 (output_csv_path , dataset_id )
322+
323+ try :
324+ upload_metrics_to_s3 (metrics_file , dataset_id )
325+ logger .info (f"Metrics file uploaded successfully for dataset { dataset_id } " )
326+ except Exception as e :
327+ logger .warning (f"S3 upload failure for metrics file: { e } " )
328+
305329 send_status_update (dataset_id , encoded_results )
306330
307331 logger .info ("Processing completed successfully" )
@@ -330,6 +354,9 @@ def parse_args():
330354 parser .add_argument (
331355 "--session-id" , required = True , help = "Session ID for the callback"
332356 )
357+ parser .add_argument (
358+ "--metrics-file" , required = True , help = "Metrics file path for the callback"
359+ )
333360 return parser .parse_args ()
334361
335362
@@ -339,10 +366,11 @@ def main():
339366 try :
340367 logger .info ("Starting callback processing..." )
341368 logger .info (f"File path: { args .file_path } " )
369+ logger .info (f"Metrics file: { args .metrics_file } " )
342370 logger .info (f"Encoded results length: { len (args .encoded_results )} characters" )
343371
344372 process_callback_background (
345- args .file_path , args .encoded_results , args .session_id
373+ args .file_path , args .encoded_results , args .session_id , args . metrics_file
346374 )
347375
348376 response = {
0 commit comments