Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions src/uipath_langchain/agent/tools/client_side_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""Factory for creating client-side tools that execute on the client SDK."""

import json
from typing import Annotated, Any

from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId, StructuredTool
from uipath.agent.models.agent import AgentClientSideToolResourceConfig
from uipath.eval.mocks import mockable

from uipath_langchain._utils.durable_interrupt import durable_interrupt
from uipath_langchain.agent.react.jsonschema_pydantic_converter import (
create_model as create_model_from_schema,
)
from uipath_langchain.chat.hitl import CLIENT_SIDE_TOOL_MARKER

from .utils import sanitize_tool_name


def create_client_side_tool(
resource: AgentClientSideToolResourceConfig,
) -> StructuredTool:
"""Create a client-side tool that pauses the graph and waits for the client to execute it.

The tool uses @durable_interrupt to suspend the graph. The client SDK receives
an executingToolCall event, runs its registered handler, and sends endToolCall
back through CAS. The bridge routes that endToolCall to wait_for_resume(),
which unblocks the graph with the client's result.
"""
tool_name = sanitize_tool_name(resource.name)
input_model = create_model_from_schema(resource.input_schema)

async def client_side_tool_fn(
*, tool_call_id: Annotated[str, InjectedToolCallId], **kwargs: Any
) -> Any:
@mockable(
name=resource.name,
description=resource.description,
input_schema=input_model.model_json_schema(),
output_schema=(resource.output_schema or {}),
example_calls=getattr(resource.properties, "example_calls", None),
)
async def execute_tool() -> dict[str, Any]:
"""Execute client-side tool, pausing for client response."""

@durable_interrupt
async def wait_for_client_execution() -> dict[str, Any]:
return {
"tool_call_id": tool_call_id,
"tool_name": tool_name,
"input": kwargs,
"is_execution_phase": True,
}

result = await wait_for_client_execution()
return result.get("output", result) if isinstance(result, dict) else result

result = await execute_tool()

if isinstance(result, dict):
try:
content = json.dumps(result)
except TypeError:
content = str(result)
else:
content = str(result) if result is not None else ""

return ToolMessage(
content=content,
tool_call_id=tool_call_id,
response_metadata={CLIENT_SIDE_TOOL_MARKER: True},
)

tool = StructuredTool(
name=tool_name,
description=resource.description or f"Client-side tool: {tool_name}",
args_schema=input_model,
coroutine=client_side_tool_fn,
metadata={
CLIENT_SIDE_TOOL_MARKER: True,
"output_schema": resource.output_schema,
},
)

return tool
5 changes: 5 additions & 0 deletions src/uipath_langchain/agent/tools/tool_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from langchain_core.language_models import BaseChatModel
from langchain_core.tools import BaseTool
from uipath.agent.models.agent import (
AgentClientSideToolResourceConfig,
AgentContextResourceConfig,
AgentEscalationResourceConfig,
AgentIntegrationToolResourceConfig,
Expand All @@ -18,6 +19,7 @@

from uipath_langchain.chat.hitl import REQUIRE_CONVERSATIONAL_CONFIRMATION

from .client_side_tool import create_client_side_tool
from .context_tool import create_context_tool
from .escalation_tool import create_escalation_tool
from .extraction_tool import create_ixp_extraction_tool
Expand Down Expand Up @@ -120,4 +122,7 @@ async def _build_tool_for_resource(
elif isinstance(resource, AgentIxpVsEscalationResourceConfig):
return create_ixp_escalation_tool(resource)

elif isinstance(resource, AgentClientSideToolResourceConfig):
return create_client_side_tool(resource)

Comment on lines +125 to +127
return None
10 changes: 7 additions & 3 deletions src/uipath_langchain/agent/tools/tool_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
find_latest_ai_message,
)
from uipath_langchain.chat.hitl import (
CLIENT_SIDE_TOOL_MARKER,
REQUIRE_CONVERSATIONAL_CONFIRMATION,
request_conversational_tool_confirmation,
)
Expand Down Expand Up @@ -279,10 +280,13 @@ async def _afunc(state: AgentGraphState) -> OutputType:

tool = getattr(tool_node, "tool", None)

# Preserve tool ref so the runtime can discover which tools need confirmation
# (see runtime.py _get_tool_confirmation_info)
# Preserve tool ref so the runtime can discover tool metadata
# (confirmation requirements, client-side markers, etc.)
metadata = getattr(tool, "metadata", None) or {}
if isinstance(tool, BaseTool) and metadata.get(REQUIRE_CONVERSATIONAL_CONFIRMATION):
if isinstance(tool, BaseTool) and (
metadata.get(REQUIRE_CONVERSATIONAL_CONFIRMATION)
or metadata.get(CLIENT_SIDE_TOOL_MARKER)
):
return RunnableCallableWithTool(
func=_func, afunc=_afunc, name=tool_name, tool=tool
)
Expand Down
7 changes: 7 additions & 0 deletions src/uipath_langchain/chat/hitl.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
CANCELLED_MESSAGE = "Cancelled by user"
ARGS_MODIFIED_MESSAGE = "User has modified the tool arguments"

CLIENT_SIDE_TOOL_MARKER = "uipath_client_tool"
CONVERSATIONAL_APPROVED_TOOL_ARGS = "conversational_approved_tool_args"
REQUIRE_CONVERSATIONAL_CONFIRMATION = "require_conversational_confirmation"

Expand Down Expand Up @@ -126,12 +127,18 @@ def request_approval(
"""
tool_call_id: str = tool_args.pop("tool_call_id")

# If this is a server-side tool (not client-side), execution follows immediately
# after confirmation — mark this as the execution trigger so the bridge emits
# executingToolCall. For client-side tools, the execution interrupt sets this instead.
is_execution_trigger = not (tool.metadata or {}).get(CLIENT_SIDE_TOOL_MARKER, False)

@durable_interrupt
def ask_confirmation():
return {
"tool_call_id": tool_call_id,
"tool_name": tool.name,
"input": tool_args,
"is_execution_phase": is_execution_trigger,
}

response = ask_confirmation()
Expand Down
60 changes: 49 additions & 11 deletions src/uipath_langchain/runtime/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
UiPathConversationContentPartEndEvent,
UiPathConversationContentPartEvent,
UiPathConversationContentPartStartEvent,
UiPathConversationExecutingToolCallEvent,
UiPathConversationMessage,
UiPathConversationMessageData,
UiPathConversationMessageEndEvent,
Expand All @@ -39,6 +40,8 @@
)
from uipath.runtime import UiPathRuntimeStorageProtocol

from uipath_langchain.chat.hitl import CLIENT_SIDE_TOOL_MARKER

from ._citations import CitationStreamProcessor, extract_citations_from_text

logger = logging.getLogger(__name__)
Expand All @@ -60,6 +63,7 @@ def __init__(self, runtime_id: str, storage: UiPathRuntimeStorageProtocol | None
self.storage = storage
self.current_message: AIMessageChunk | AIMessage
self.tools_requiring_confirmation: dict[str, Any] = {}
self.client_side_tools: dict[str, Any] = {} # {tool_name: output_schema}
self.seen_message_ids: set[str] = set()
self._storage_lock = asyncio.Lock()
self._citation_stream_processor = CitationStreamProcessor()
Expand Down Expand Up @@ -436,15 +440,40 @@ async def map_current_message_to_start_tool_call_events(self):
tool_name in self.tools_requiring_confirmation
)
input_schema = self.tools_requiring_confirmation.get(tool_name)
is_client_side = tool_name in self.client_side_tools
output_schema = (
self.client_side_tools.get(tool_name)
if is_client_side
else None
)
events.append(
self.map_tool_call_to_tool_call_start_event(
self.current_message.id,
tool_call,
require_confirmation=require_confirmation or None,
input_schema=input_schema,
is_client_side_tool=is_client_side or None,
output_schema=output_schema,
)
)

# Emit executingToolCall from MessageMapper for tools without
# a durable interrupt. Tools with interrupts (client-side, HITL)
# get executingToolCall from the bridge instead.
if not require_confirmation and not is_client_side:
events.append(
UiPathConversationMessageEvent(
message_id=self.current_message.id,
tool_call=UiPathConversationToolCallEvent(
tool_call_id=tool_call["id"],
executing=UiPathConversationExecutingToolCallEvent(
tool_name=tool_call["name"],
input=tool_call["args"],
),
),
)
)

if self.storage is not None:
await self.storage.set_value(
self.runtime_id,
Expand Down Expand Up @@ -476,19 +505,24 @@ async def map_tool_message_to_events(
# Keep as string if not valid JSON
pass

events = [
UiPathConversationMessageEvent(
message_id=message_id,
tool_call=UiPathConversationToolCallEvent(
tool_call_id=message.tool_call_id,
end=UiPathConversationToolCallEndEvent(
timestamp=self.get_timestamp(),
output=content_value,
is_error=message.status == "error",
# Suppress endToolCall for client-side tools — the client already has the result (it produced it).
is_client_side = message.response_metadata.get(CLIENT_SIDE_TOOL_MARKER, False)
events: list[UiPathConversationMessageEvent] = []

if not is_client_side:
events.append(
Comment on lines +508 to +513
UiPathConversationMessageEvent(
message_id=message_id,
tool_call=UiPathConversationToolCallEvent(
tool_call_id=message.tool_call_id,
end=UiPathConversationToolCallEndEvent(
timestamp=self.get_timestamp(),
output=content_value,
is_error=message.status == "error",
),
),
),
)
)
]

if is_last_tool_call:
events.append(self.map_to_message_end_event(message_id))
Expand Down Expand Up @@ -546,6 +580,8 @@ def map_tool_call_to_tool_call_start_event(
*,
require_confirmation: bool | None = None,
input_schema: Any | None = None,
is_client_side_tool: bool | None = None,
output_schema: Any | None = None,
) -> UiPathConversationMessageEvent:
return UiPathConversationMessageEvent(
message_id=message_id,
Expand All @@ -557,6 +593,8 @@ def map_tool_call_to_tool_call_start_event(
input=tool_call["args"],
require_confirmation=require_confirmation,
input_schema=input_schema,
is_client_side_tool=is_client_side_tool,
output_schema=output_schema,
),
),
)
Expand Down
48 changes: 27 additions & 21 deletions src/uipath_langchain/runtime/runtime.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
from collections.abc import Iterator
from typing import Any, AsyncGenerator
from uuid import uuid4

Expand Down Expand Up @@ -31,7 +32,7 @@
from uipath.runtime.schema import UiPathRuntimeSchema

from uipath_langchain.agent.tools.tool_node import RunnableCallableWithTool
from uipath_langchain.chat.hitl import get_confirmation_schema
from uipath_langchain.chat.hitl import CLIENT_SIDE_TOOL_MARKER, get_confirmation_schema
from uipath_langchain.runtime.errors import LangGraphErrorCode, LangGraphRuntimeError
from uipath_langchain.runtime.messages import UiPathChatMessagesMapper
from uipath_langchain.runtime.schema import get_entrypoints_schema, get_graph_schema
Expand Down Expand Up @@ -68,6 +69,7 @@ def __init__(
self.callbacks: list[BaseCallbackHandler] = callbacks or []
self.chat = UiPathChatMessagesMapper(self.runtime_id, storage)
self.chat.tools_requiring_confirmation = self._get_tool_confirmation_info()
self.chat.client_side_tools = self._get_client_side_tools()
self._middleware_node_names: set[str] = self._detect_middleware_nodes()
Comment on lines 71 to 73

async def execute(
Expand Down Expand Up @@ -490,38 +492,42 @@ def _detect_middleware_nodes(self) -> set[str]:

return middleware_nodes

def _get_tool_confirmation_info(self) -> dict[str, Any]:
"""Build {tool_name: input_schema} for tools requiring confirmation.

Walks compiled graph nodes once at runtime init. This is needed because coded agents
(create_agent) export a compiled graph as the only artifact — there's no side channel
to pass confirmation metadata from the build step to the runtime.
"""
schemas: dict[str, Any] = {}
def _iter_graph_tools(self) -> Iterator[BaseTool]:
"""Yield all BaseTool instances from compiled graph nodes."""
for node_spec in self.graph.nodes.values():
bound = getattr(node_spec, "bound", None)
if bound is None:
continue

# Coded agents: one tool per node
if isinstance(bound, RunnableCallableWithTool):
schema = get_confirmation_schema(bound.tool)
if schema is not None:
schemas[bound.tool.name] = schema
tool = getattr(bound, "tool", None)
if isinstance(tool, BaseTool):
yield tool
continue

# Low-code agents: multiple tools in one node
tools_by_name = getattr(bound, "tools_by_name", None)
if isinstance(tools_by_name, dict):
for tool in tools_by_name.values():
if not isinstance(tool, BaseTool):
continue
schema = get_confirmation_schema(tool)
if schema is not None:
schemas[tool.name] = schema
for t in tools_by_name.values():
if isinstance(t, BaseTool):
yield t

def _get_tool_confirmation_info(self) -> dict[str, Any]:
"""Build {tool_name: input_schema} for tools requiring confirmation."""
schemas: dict[str, Any] = {}
for tool in self._iter_graph_tools():
schema = get_confirmation_schema(tool)
if schema is not None:
schemas[tool.name] = schema
return schemas

def _get_client_side_tools(self) -> dict[str, Any]:
"""Build {tool_name: output_schema} for client-side tools."""
tools: dict[str, Any] = {}
for tool in self._iter_graph_tools():
metadata = getattr(tool, "metadata", None) or {}
if metadata.get(CLIENT_SIDE_TOOL_MARKER):
tools[tool.name] = metadata.get("output_schema")
return tools

def _is_middleware_node(self, node_name: str) -> bool:
"""Check if a node name represents a middleware node."""
return node_name in self._middleware_node_names
Expand Down
Loading
Loading