1919import time
2020import types
2121import typing
22- from concurrent .futures import ThreadPoolExecutor
22+ from concurrent .futures import ThreadPoolExecutor , as_completed
2323from datetime import datetime
2424from functools import partial
2525from logging import INFO
@@ -204,7 +204,10 @@ def _overload_log(
204204 response = self ._log (** kwargs )
205205 except Exception as e :
206206 error_message = str (e ).replace ("\n " , " " )
207- sys .stderr .write (f"{ RED } Failed to log: { error_message [:100 ]} ...{ RESET } \n " )
207+ if len (error_message ) > 100 :
208+ sys .stderr .write (f"{ RED } Failed to log: { error_message [:100 ]} ...{ RESET } \n " )
209+ else :
210+ sys .stderr .write (f"{ RED } Failed to log: { error_message } { RESET } \n " )
208211 raise e
209212
210213 # Notify the run_eval utility about one Log being created
@@ -362,13 +365,25 @@ def upload_callback(log_id: str):
362365 end_time = datetime .now (),
363366 )
364367 error_message = str (e ).replace ("\n " , " " )
365- sys .stderr .write (
366- f"\n { RED } Your { hl_file .type } 's `callable` failed for Datapoint: { dp .id } . Error: { error_message [:100 ]} ...{ RESET } \n "
367- )
368+ if len (error_message ) > 100 :
369+ sys .stderr .write (
370+ f"\n { RED } Your { hl_file .type } 's `callable` failed for Datapoint: { dp .id } . Error: { error_message [:100 ]} ...{ RESET } \n "
371+ )
372+ else :
373+ sys .stderr .write (
374+ f"\n { RED } Your { hl_file .type } 's `callable` failed for Datapoint: { dp .id } . Error: { error_message } { RESET } \n "
375+ )
368376
369377 with ThreadPoolExecutor (max_workers = workers ) as executor :
378+ futures = []
370379 for datapoint in hl_dataset .datapoints :
371- executor .submit (_process_datapoint , datapoint )
380+ futures .append (executor .submit (_process_datapoint , datapoint ))
381+ # Program hangs if any uncaught exceptions are not handled here
382+ for future in as_completed (futures ):
383+ try :
384+ future .result ()
385+ except Exception :
386+ pass
372387
373388 stats = _wait_for_evaluation_to_complete (
374389 client = client ,
@@ -901,13 +916,8 @@ def _run_local_evaluators(
901916 progress_bar : _SimpleProgressBar ,
902917):
903918 """Run local Evaluators on the Log and send the judgments to Humanloop."""
904- # If there are no local evaluators, we don't need to do the log lookup.
905- if len (local_evaluators ) == 0 :
906- progress_bar .increment ()
907- return
908-
919+ # Need to get the full log to pass to the evaluators
909920 try :
910- # Need to get the full log to pass to the evaluators
911921 log = client .logs .get (id = log_id )
912922 if not isinstance (log , dict ):
913923 log_dict = log .dict ()
@@ -921,6 +931,7 @@ def _run_local_evaluators(
921931 log_dict = log .dict ()
922932 else :
923933 log_dict = log
934+ time .sleep (2 )
924935 datapoint_dict = datapoint .dict () if datapoint else None
925936
926937 for local_evaluator_tuple in local_evaluators :
@@ -953,14 +964,24 @@ def _run_local_evaluators(
953964 end_time = datetime .now (),
954965 )
955966 error_message = str (e ).replace ("\n " , " " )
956- sys .stderr .write (
957- f"{ RED } Evaluator { local_evaluator .path } failed with error { error_message [:100 ]} ...{ RESET } \n "
958- )
967+ if len (error_message ) > 100 :
968+ sys .stderr .write (
969+ f"{ RED } Evaluator { local_evaluator .path } failed with error { error_message [:100 ]} ...{ RESET } \n "
970+ )
971+ else :
972+ sys .stderr .write (
973+ f"{ RED } Evaluator { local_evaluator .path } failed with error { error_message } { RESET } \n "
974+ )
959975 except Exception as e :
960976 error_message = str (e ).replace ("\n " , " " )
961- sys .stderr .write (
962- f"{ RED } Failed to run local Evaluators for source datapoint { datapoint .dict ()['id' ] if datapoint else None } : { error_message [:100 ]} ...{ RESET } \n "
963- )
977+ if len (error_message ) > 100 :
978+ sys .stderr .write (
979+ f"{ RED } Failed to run local Evaluators for source datapoint { datapoint .dict ()['id' ] if datapoint else None } : { error_message [:100 ]} ...{ RESET } \n "
980+ )
981+ else :
982+ sys .stderr .write (
983+ f"{ RED } Failed to run local Evaluators for source datapoint { datapoint .dict ()['id' ] if datapoint else None } : { error_message } { RESET } \n "
984+ )
964985 pass
965986 finally :
966987 progress_bar .increment ()
0 commit comments