-
Notifications
You must be signed in to change notification settings - Fork 104
Expand file tree
/
Copy pathlangfuse_gradio.py
More file actions
113 lines (86 loc) · 3.01 KB
/
langfuse_gradio.py
File metadata and controls
113 lines (86 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""Reason-and-Act Knowledge Retrieval Agent via the OpenAI Agent SDK.
Log traces to LangFuse for observability and evaluation.
"""
import asyncio
import contextlib
import logging
import signal
import sys
import agents
import gradio as gr
from dotenv import load_dotenv
from gradio.components.chatbot import ChatMessage
from openai import AsyncOpenAI
from prompts_i import REACT_INSTRUCTIONS
from src.utils import (
AsyncWeaviateKnowledgeBase,
Configs,
get_weaviate_async_client,
oai_agent_stream_to_gradio_messages,
pretty_print,
setup_langfuse_tracer,
)
from src.utils.langfuse.shared_client import langfuse_client
# Load environment variables (API keys, Weaviate endpoints) before any client is built.
load_dotenv(verbose=True)

logging.basicConfig(level=logging.INFO)

# Model name passed to the agent. NOTE(review): a Gemini model name is used with
# AsyncOpenAI(), which presumably targets an OpenAI-compatible gateway configured
# via environment variables — confirm against the deployment config.
AGENT_LLM_NAME = "gemini-2.5-flash"

# Connection settings sourced from environment variables.
configs = Configs.from_env_var()

# Async Weaviate client; both HTTP and gRPC endpoints come from the configs above.
async_weaviate_client = get_weaviate_async_client(
    http_host=configs.weaviate_http_host,
    http_port=configs.weaviate_http_port,
    http_secure=configs.weaviate_http_secure,
    grpc_host=configs.weaviate_grpc_host,
    grpc_port=configs.weaviate_grpc_port,
    grpc_secure=configs.weaviate_grpc_secure,
    api_key=configs.weaviate_api_key,
)

# Reads its API key / base URL from the environment (populated by load_dotenv).
async_openai_client = AsyncOpenAI()

# Knowledge base backed by the "enwiki_20250520" Weaviate collection.
async_knowledgebase = AsyncWeaviateKnowledgeBase(
    async_weaviate_client,
    collection_name="enwiki_20250520",
)
async def _cleanup_clients() -> None:
"""Close async clients."""
await async_weaviate_client.close()
await async_openai_client.close()
def _handle_sigint(signum: int, frame: object) -> None:
"""Handle SIGINT signal to gracefully shutdown."""
with contextlib.suppress(Exception):
asyncio.get_event_loop().run_until_complete(_cleanup_clients())
sys.exit(0)
async def _main(question: str, gr_messages: list[ChatMessage]):
    """Answer *question* with a ReAct Wikipedia agent, streaming chat updates.

    Builds an Agents-SDK agent backed by the Weaviate knowledge base, runs it
    in streaming mode, and yields the growing ``ChatMessage`` list so Gradio
    can render partial progress. The whole run is recorded as a LangFuse span
    with the question as input and the agent's final answer as output.
    """
    setup_langfuse_tracer()

    wikipedia_agent = agents.Agent(
        name="Wikipedia Agent",
        instructions=REACT_INSTRUCTIONS,
        tools=[agents.function_tool(async_knowledgebase.search_knowledgebase)],
        model=agents.OpenAIChatCompletionsModel(
            model=AGENT_LLM_NAME, openai_client=async_openai_client
        ),
    )

    with langfuse_client.start_as_current_span(name="Agents-SDK-Trace") as trace_span:
        trace_span.update(input=question)
        stream = agents.Runner.run_streamed(wikipedia_agent, input=question)
        async for event in stream.stream_events():
            # Accumulate in place and surface every non-empty intermediate state.
            gr_messages.extend(oai_agent_stream_to_gradio_messages(event))
            if gr_messages:
                yield gr_messages
        trace_span.update(output=stream.final_output)
    pretty_print(gr_messages)
    yield gr_messages
# Gradio chat UI wired to the streaming agent entry point defined above.
demo = gr.ChatInterface(
    _main,
    title="2.1 OAI Agent SDK ReAct + LangFuse",
    # "messages" mode passes/receives ChatMessage dicts rather than (user, bot) tuples.
    type="messages",
    examples=[
        "At which university did the SVP Software Engineering"
        " at Apple (as of June 2025) earn their engineering degree?",
    ],
)
if __name__ == "__main__":
    # NOTE(review): the duplicate `configs = Configs.from_env_var()` that stood
    # here was removed — the module level already builds `configs` from the same
    # environment at import time, so re-reading it was a redundant side effect.
    signal.signal(signal.SIGINT, _handle_sigint)
    try:
        # Bind to all interfaces so the app is reachable from outside a container.
        demo.launch(server_name="0.0.0.0")
    finally:
        # Always release client connections, even if launch() raises.
        asyncio.run(_cleanup_clients())