
Commit f8f27e8

Author: Andrei Bratu (committed)
Colors on print messages, hanging threads, allow no type in file, cancel run on program abort
1 parent 93f1a6d commit f8f27e8
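The headline behavioural change is cancelling the Evaluation run when the program is aborted. A minimal sketch of that pattern, assuming a Humanloop-style `client.evaluations.update_evaluation_run` call shaped like the one in the diff below and a worker pool named `pool` (names here are illustrative, not the SDK's public API):

import signal
import sys
from concurrent.futures import ThreadPoolExecutor

def install_abort_handlers(client, evaluation_id: str, run_id: str, pool: ThreadPoolExecutor) -> None:
    """Mark the run as cancelled and stop workers when the process is interrupted or killed."""

    def handle_exit_signal(signum, frame):
        # Flip the run to "cancelled" on Humanloop instead of leaving it apparently running
        client.evaluations.update_evaluation_run(
            id=evaluation_id,
            run_id=run_id,
            status="cancelled",
        )
        # Do not wait for in-flight evaluator jobs; the process is exiting anyway
        pool.shutdown(wait=False)
        sys.exit(signum)

    # Ctrl+C (SIGINT) and a plain kill (SIGTERM) both route through the same handler
    signal.signal(signal.SIGINT, handle_exit_signal)
    signal.signal(signal.SIGTERM, handle_exit_signal)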

File tree

2 files changed: +152 -72 lines changed

src/humanloop/eval_utils/run.py

Lines changed: 149 additions & 69 deletions
@@ -13,17 +13,27 @@
 import inspect
 import json
 import logging
+import signal
 import sys
 import threading
 import time
 import types
 import typing
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime
 from functools import partial
 from logging import INFO
-from typing import Callable, Dict, List, Literal, Optional, Sequence, Tuple, TypeVar, Union
-import warnings
+from typing import (
+    Callable,
+    Dict,
+    List,
+    Literal,
+    Optional,
+    Sequence,
+    Tuple,
+    TypeVar,
+    Union,
+)

 from humanloop import EvaluatorResponse, FlowResponse, PromptResponse, ToolResponse
 from humanloop.core.api_error import ApiError
@@ -40,7 +50,10 @@
 # We use TypedDicts for requests, which is consistent with the rest of the SDK
 from humanloop.evaluators.client import EvaluatorsClient
 from humanloop.flows.client import FlowsClient
-from humanloop.otel.constants import HUMANLOOP_INTERCEPTED_HL_CALL_RESPONSE, HUMANLOOP_INTERCEPTED_HL_CALL_SPAN_NAME
+from humanloop.otel.constants import (
+    HUMANLOOP_INTERCEPTED_HL_CALL_RESPONSE,
+    HUMANLOOP_INTERCEPTED_HL_CALL_SPAN_NAME,
+)
 from humanloop.otel.helpers import write_to_opentelemetry_span
 from humanloop.prompts.client import PromptsClient
 from humanloop.requests import CodeEvaluatorRequestParams as CodeEvaluatorDict
@@ -186,7 +199,11 @@ def _overload_log(
         try:
             response = self._log(**kwargs)
         except Exception as e:
-            logger.error(f"Failed to log: {e}")
+            error_message = str(e).replace("\n", " ")
+            if len(error_message) > 100:
+                sys.stderr.write(f"{RED}Failed to log: {error_message[:100]}...{RESET}\n")
+            else:
+                sys.stderr.write(f"{RED}Failed to log: {error_message}{RESET}\n")
             raise e

         # Notify the run_eval utility about one Log being created
@@ -249,26 +266,40 @@ def run_eval(
         function=function_,
     )

-    # Header of the CLI Report
-    logger.info(f"\n{CYAN}Navigate to your Evaluation:{RESET}\n{evaluation.url}\n")
-    logger.info(f"{CYAN}{type_.capitalize()} Version ID: {hl_file.version_id}{RESET}")
-    logger.info(f"{CYAN}Run ID: {run.id}{RESET}")
+    def handle_exit_signal(signum, frame):
+        client.evaluations.update_evaluation_run(
+            id=evaluation.id,
+            run_id=run.id,
+            status="cancelled",
+        )
+        evaluators_worker_pool.shutdown(wait=False)
+        sys.exit(signum)

-    _PROGRESS_BAR = _SimpleProgressBar(len(hl_dataset.datapoints))
+    signal.signal(signal.SIGINT, handle_exit_signal)
+    signal.signal(signal.SIGTERM, handle_exit_signal)
+
+    # Header of the CLI Report
+    sys.stdout.write(f"\n{CYAN}Navigate to your Evaluation:{RESET}\n{evaluation.url}\n\n")
+    sys.stdout.write(f"{CYAN}{type_.capitalize()} Version ID: {hl_file.version_id}{RESET}\n")
+    sys.stdout.write(f"{CYAN}Run ID: {run.id}{RESET}\n")

     # This will apply apply the local callable to each datapoint
     # and log the results to Humanloop

     # Generate locally if a file `callable` is provided
     if function_ is None:
         # TODO: trigger run when updated API is available
-        logger.info(f"{CYAN}\nRunning '{hl_file.name}' over the Dataset '{hl_dataset.name}'{RESET}")
+        sys.stdout.write(f"{CYAN}\nRunning '{hl_file.name}' over the Dataset '{hl_dataset.name}'{RESET}\n")
     else:
         # Running the evaluation locally
-        logger.info(
-            f"{CYAN}\nRunning '{hl_file.name}' over the Dataset '{hl_dataset.name}' using {workers} workers{RESET} "
+        sys.stdout.write(
+            f"{CYAN}\nRunning '{hl_file.name}' over the Dataset '{hl_dataset.name}' using {workers} workers...{RESET}\n\n"
         )

+    _PROGRESS_BAR = _SimpleProgressBar(len(hl_dataset.datapoints))
+
+    if function_ is not None:
+        # Generate locally if a file `callable` is provided
         def _process_datapoint(dp: Datapoint):
             def upload_callback(log_id: str):
                 """Logic ran after the Log has been created."""
@@ -314,6 +345,7 @@ def upload_callback(log_id: str):
                     start_time=start_time,
                     end_time=datetime.now(),
                     source_datapoint_id=dp.id,
+                    run_id=run.id,
                 )
             except Exception as e:
                 log_func(
324356
start_time=start_time,
325357
end_time=datetime.now(),
326358
)
327-
logger.warning(
328-
msg=f"\nYour {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. \n Error: {str(e)}"
329-
)
359+
error_message = str(e).replace("\n", " ")
360+
if len(error_message) > 100:
361+
sys.stderr.write(
362+
f"\n{RED}Your {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. Error: {error_message[:100]}...{RESET}\n"
363+
)
364+
else:
365+
sys.stderr.write(
366+
f"\n{RED}Your {hl_file.type}'s `callable` failed for Datapoint: {dp.id}. Error: {error_message}{RESET}\n"
367+
)
330368

331369
with ThreadPoolExecutor(max_workers=workers) as executor:
370+
futures = []
332371
for datapoint in hl_dataset.datapoints:
333-
executor.submit(_process_datapoint, datapoint)
372+
futures.append(executor.submit(_process_datapoint, datapoint))
373+
# Program hangs if any uncaught exceptions are not handled here
374+
for future in as_completed(futures):
375+
try:
376+
future.result()
377+
except Exception:
378+
pass
334379

335380
stats = _wait_for_evaluation_to_complete(
336381
client=client,
337382
evaluation=evaluation,
338383
run=run,
339384
)
340-
logger.info(f"\n{CYAN}View your Evaluation:{RESET}\n{evaluation.url}\n")
385+
sys.stderr.write(f"\n{CYAN}View your Evaluation:{RESET}\n{evaluation.url}\n")
341386

342387
# Print Evaluation results
343-
logger.info(stats.report)
388+
sys.stderr.write(stats.report)
344389

345-
return _get_checks(
390+
checks = _get_checks(
346391
client=client,
347392
evaluation=evaluation,
348393
stats=stats,
349394
evaluators=evaluators,
350395
run=run,
351396
)
397+
evaluators_worker_pool.shutdown(wait=False)
398+
return checks
352399

353400

354401
class _SimpleProgressBar:
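The hanging-threads fix in the hunk above amounts to keeping every submitted future and draining them with `as_completed`, so worker exceptions are observed explicitly rather than left unhandled (the new comment in the diff notes the program can hang otherwise). A standalone sketch of the same pattern using only the standard library, where `process` is a stand-in for `_process_datapoint`:

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_all(items, process, workers: int = 4) -> None:
    """Run `process` over `items` on a thread pool and collect every outcome."""
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(process, item) for item in items]
        for future in as_completed(futures):
            try:
                future.result()  # re-raises the worker's exception, if any
            except Exception as exc:
                # Report and keep going; the remaining items should still run
                print(f"worker failed: {exc}")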
@@ -366,6 +413,9 @@ def __init__(self, total: int):
     def increment(self):
         """Increment the progress bar by one finished task."""
         with self._lock:
+            # NOTE: There is a deadlock here that needs further investigation
+            if self._progress == self._total:
+                return
             self._progress += 1
             if self._start_time is None:
                 self._start_time = time.time()
391441
sys.stderr.write("\033[K") # Clear the line from the cursor to the end
392442
sys.stderr.write(progress_display)
393443

394-
if self._progress >= self._total:
395-
sys.stderr.write("\n")
396-
397444

398445
@dataclass
399446
class _LocalEvaluator:
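This progress bar, together with the stats poller in the next hunk, repaints in place using standard ANSI escape sequences such as `\r` (return to column 0), `\033[K` (clear to end of line), and `\033[A` (move the cursor up one line). A small illustrative sketch of the multi-line variant, independent of the SDK:

import sys
import time

def redraw(block: str, previous_lines: int) -> int:
    """Overwrite the previously printed block on stderr; return its new line count."""
    sys.stderr.write("\033[A" * previous_lines)  # jump back to the top of the old block
    for line in block.splitlines():
        sys.stderr.write("\r\033[K")  # column 0, then clear the stale content
        sys.stderr.write(line + "\n")
    return len(block.splitlines())

if __name__ == "__main__":
    printed = 0
    for pct in range(0, 101, 20):
        printed = redraw(f"Evaluators\nprogress: {pct}%", printed)
        time.sleep(0.2)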
@@ -413,15 +460,23 @@ def _wait_for_evaluation_to_complete(
 ):
     # Wait for the Evaluation to complete then print the results
     complete = False
+
+    wrote_explainer = False
+
     while not complete:
         stats = client.evaluations.get_stats(id=evaluation.id)
-        logger.info(f"\r{stats.progress}")
         run_stats = next(
             (run_stats for run_stats in stats.run_stats if run_stats.run_id == run.id),
             None,
         )
         complete = run_stats is not None and run_stats.status == "completed"
         if not complete:
+            if not wrote_explainer:
+                sys.stderr.write("\n\nWaiting for Evaluators on Humanloop runtime...\n")
+                wrote_explainer = True
+            sys.stderr.write(stats.progress)
+            # Move the cursor up in stderr a number of lines equal to the number of lines in stats.progress
+            sys.stderr.write("\033[A" * (stats.progress.count("\n")))
             time.sleep(5)
     return stats

@@ -508,13 +563,14 @@ def _get_file_type(file: File) -> FileType:
     # Determine the `type` of the `file` to Evaluate - if not `type` provided, default to `flow`
     try:
         type_ = typing.cast(FileType, file.pop("type"))
-        logger.info(
-            f"{CYAN}Evaluating your {type_} function corresponding to `{file.get('path') or file.get('id')}` on Humanloop{RESET} \n\n"
+        sys.stdout.write(
+            f"{CYAN}Evaluating your {type_} function corresponding to `{file.get('path') or file.get('id')}` on Humanloop{RESET}\n\n"
         )
         return type_ or "flow"
     except KeyError as _:
         type_ = "flow"
-        logger.warning("No `file` type specified, defaulting to flow.")
+        sys.stdout.write(f"{CYAN}No `file` type specified, defaulting to flow.{RESET}\n")
+        return type_


 def _get_file_callable(file: File, type_: FileType) -> Optional[Callable]:
@@ -524,7 +580,9 @@ def _get_file_callable(file: File, type_: FileType) -> Optional[Callable]:
         if type_ == "flow":
             raise ValueError("You must provide a `callable` for your Flow `file` to run a local eval.")
         else:
-            logger.info(f"No `callable` provided for your {type_} file - will attempt to generate logs on Humanloop.")
+            sys.stdout.write(
+                f"No `callable` provided for your {type_} file - will attempt to generate logs on Humanloop.\n"
+            )
     return function_


@@ -548,7 +606,7 @@ def _upsert_file(
         try:
             Prompt.model_validate(version)
         except ValidationError as error_:
-            logger.error(msg="Invalid Prompt `version` in your `file` request. \n\nValidation error: \n)")
+            sys.stdout.write(f"Invalid Prompt `version` in your `file` request. \n\nValidation error: \n{error_}")
             raise error_
         try:
             hl_file = client.prompts.upsert(**file_dict)
@@ -559,7 +617,7 @@
         try:
             Tool.model_validate(version)
         except ValidationError as error_:
-            logger.error(msg="Invalid Tool `version` in your `file` request. \n\nValidation error: \n)")
+            sys.stdout.write(f"Invalid Tool `version` in your `file` request. \n\nValidation error: \n{error_}")
             raise error_
         hl_file = client.tools.upsert(**file_dict)

@@ -783,12 +841,12 @@ def _check_evaluation_threshold(
         evaluator_stat = evaluator_stats_by_path[evaluator_path]
         score = _get_score_from_evaluator_stat(stat=evaluator_stat)
         if score >= threshold:
-            logger.info(
+            sys.stderr.write(
                 f"{GREEN}✅ Latest eval [{score}] above threshold [{threshold}] for evaluator {evaluator_path}.{RESET}"
             )
             return True
         else:
-            logger.info(
+            sys.stderr.write(
                 f"{RED}❌ Latest score [{score}] below the threshold [{threshold}] for evaluator {evaluator_path}.{RESET}"
             )
             return False
@@ -817,7 +875,7 @@ def _check_evaluation_improvement(
         evaluation=evaluation,
     )
     if len(stats.run_stats) == 1:
-        logger.info(f"{YELLOW}⚠️ No previous versions to compare with.{RESET}")
+        sys.stderr.write(f"{YELLOW}⚠️ No previous versions to compare with.{RESET}\n")
         return True, 0, 0

     previous_evaluator_stats_by_path = _get_evaluator_stats_by_path(
@@ -833,10 +891,10 @@
             raise ValueError(f"Could not find score for Evaluator {evaluator_path}.")
         diff = round(latest_score - previous_score, 2)
         if diff >= 0:
-            logger.info(f"{CYAN}Change of [{diff}] for Evaluator {evaluator_path}{RESET}")
+            sys.stderr.write(f"{CYAN}Change of [{diff}] for Evaluator {evaluator_path}{RESET}\n")
             return True, latest_score, diff
         else:
-            logger.info(f"{CYAN}Change of [{diff}] for Evaluator {evaluator_path}{RESET}")
+            sys.stderr.write(f"{CYAN}Change of [{diff}] for Evaluator {evaluator_path}{RESET}\n")
             return False, latest_score, diff
     else:
         raise ValueError(f"Evaluator {evaluator_path} not found in the stats.")
@@ -852,45 +910,67 @@ def _run_local_evaluators(
 ):
     """Run local Evaluators on the Log and send the judgments to Humanloop."""
     # Need to get the full log to pass to the evaluators
-    log = client.logs.get(id=log_id)
-    if not isinstance(log, dict):
-        log_dict = log.dict()
-    else:
-        log_dict = log
-    # Wait for the Flow trace to complete before running evaluators
-    while file_type == "flow" and log_dict["trace_status"] != "complete":
+    try:
         log = client.logs.get(id=log_id)
         if not isinstance(log, dict):
             log_dict = log.dict()
         else:
             log_dict = log
-    datapoint_dict = datapoint.dict() if datapoint else None
-    for local_evaluator, eval_function in local_evaluators:
-        start_time = datetime.now()
-        try:
-            if local_evaluator.spec.arguments_type == "target_required":
-                judgement = eval_function(
-                    log_dict,
-                    datapoint_dict,
-                )
+        # Wait for the Flow trace to complete before running evaluators
+        while file_type == "flow" and log_dict["trace_status"] != "complete":
+            log = client.logs.get(id=log_id)
+            if not isinstance(log, dict):
+                log_dict = log.dict()
             else:
-                judgement = eval_function(log_dict)
-
-            _ = client.evaluators.log(
-                version_id=local_evaluator.version_id,
-                parent_id=log_id,
-                judgment=judgement,
-                id=local_evaluator.id,
-                start_time=start_time,
-                end_time=datetime.now(),
+                log_dict = log
+        datapoint_dict = datapoint.dict() if datapoint else None
+
+        for local_evaluator, eval_function in local_evaluators:
+            start_time = datetime.now()
+            try:
+                if local_evaluator.spec.arguments_type == "target_required":
+                    judgement = eval_function(
+                        log_dict,
+                        datapoint_dict,
+                    )
+                else:
+                    judgement = eval_function(log_dict)
+
+                _ = client.evaluators.log(
+                    version_id=local_evaluator.version_id,
+                    parent_id=log_id,
+                    judgment=judgement,
+                    id=local_evaluator.id,
+                    start_time=start_time,
+                    end_time=datetime.now(),
+                )
+            except Exception as e:
+                _ = client.evaluators.log(
+                    parent_id=log_id,
+                    id=local_evaluator.id,
+                    error=str(e),
+                    start_time=start_time,
+                    end_time=datetime.now(),
+                )
+                error_message = str(e).replace("\n", " ")
+                if len(error_message) > 100:
+                    sys.stderr.write(
+                        f"{RED}Evaluator {local_evaluator.path} failed with error {error_message[:100]}...{RESET}\n"
+                    )
+                else:
+                    sys.stderr.write(
+                        f"{RED}Evaluator {local_evaluator.path} failed with error {error_message}{RESET}\n"
+                    )
+    except Exception as e:
+        error_message = str(e).replace("\n", " ")
+        if len(error_message) > 100:
+            sys.stderr.write(
+                f"{RED}Failed to run local Evaluators for source datapoint {datapoint.dict()['id'] if datapoint else None}: {error_message[:100]}...{RESET}\n"
             )
-        except Exception as e:
-            _ = client.evaluators.log(
-                parent_id=log_id,
-                id=local_evaluator.id,
-                error=str(e),
-                start_time=start_time,
-                end_time=datetime.now(),
+        else:
+            sys.stderr.write(
+                f"{RED}Failed to run local Evaluators for source datapoint {datapoint.dict()['id'] if datapoint else None}: {error_message}{RESET}\n"
             )
-            logger.warning(f"\nEvaluator {local_evaluator.path} failed with error {str(e)}")
-        progress_bar.increment()
+        pass
+    finally:
+        progress_bar.increment()