Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions astrbot/core/agent/handoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ def default_parameters(self) -> dict:
"type": "string",
"description": "The input to be handed off to another agent. This should be a clear and concise request or task.",
},
"background_mission": {
"type": "boolean",
"description": (
"If true, run this handoff as a background mission: "
"return immediately and notify the user when the subagent finishes. "
"Use this for long-running or non-urgent tasks. Defaults to false."
),
},
},
}

Expand Down
164 changes: 164 additions & 0 deletions astrbot/core/astr_agent_tool_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ async def execute(cls, tool, run_context, **tool_args):

"""
if isinstance(tool, HandoffTool):
is_bg = tool_args.pop("background_mission", False)
if is_bg:
async for r in cls._execute_handoff_background(
tool, run_context, **tool_args
):
yield r
return
async for r in cls._execute_handoff(tool, run_context, **tool_args):
yield r
return
Expand Down Expand Up @@ -146,6 +153,163 @@ async def _execute_handoff(
content=[mcp.types.TextContent(type="text", text=llm_resp.completion_text)]
)

@classmethod
async def _execute_handoff_background(
cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
**tool_args,
):
"""Execute a handoff as a background mission.

Immediately yields a success response with a task_id, then runs
the subagent asynchronously. When the subagent finishes, a
``CronMessageEvent`` is created so the main LLM can inform the
user of the result – the same pattern used by
``_execute_background`` for regular background tasks.
"""
task_id = uuid.uuid4().hex

async def _run_handoff_in_background() -> None:
try:
await cls._do_handoff_background(
tool=tool,
run_context=run_context,
task_id=task_id,
**tool_args,
)
except Exception as e: # noqa: BLE001
logger.error(
f"Background handoff {task_id} ({tool.name}) failed: {e!s}",
exc_info=True,
)

asyncio.create_task(_run_handoff_in_background())

text_content = mcp.types.TextContent(
type="text",
text=(
f"Background mission submitted. task_id={task_id}. "
f"The subagent '{tool.agent.name}' is working on the task asynchronously. "
f"You will be notified when it finishes."
),
)
yield mcp.types.CallToolResult(content=[text_content])

@classmethod
async def _do_handoff_background(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议通过提取共享的后台执行和唤醒主 agent 逻辑到可复用的 helper,中幅重构 _do_handoff_background,让该方法只负责编排流程。

通过把通用的“运行子 agent + 用 background_task_result 唤醒主 agent”的编排逻辑拆分成小的 helper,并复用 _execute_background 中已有的模式,可以显著降低 _do_handoff_background 的复杂度和重复度。

1. 分离交接结果收集逻辑

下面这段逻辑:

result_text = ""
try:
    async for r in cls._execute_handoff(tool, run_context, **tool_args):
        if isinstance(r, mcp.types.CallToolResult):
            for content in r.content:
                if isinstance(content, mcp.types.TextContent):
                    result_text += content.text + "\n"
except Exception as e:
    result_text = (
        f"error: Background handoff execution failed, internal error: {e!s}"
    )

可以移动到一个职责单一的 helper 中,这样 _do_handoff_background 不再同时处理流式结果与流程编排:

@classmethod
async def _collect_handoff_result(
    cls,
    tool: HandoffTool,
    run_context: ContextWrapper[AstrAgentContext],
    **tool_args: Any,
) -> str:
    result_text = ""
    try:
        async for r in cls._execute_handoff(tool, run_context, **tool_args):
            if isinstance(r, mcp.types.CallToolResult):
                for content in r.content:
                    if isinstance(content, mcp.types.TextContent):
                        result_text += content.text + "\n"
    except Exception as e:
        result_text = (
            f"error: Background handoff execution failed, internal error: {e!s}"
        )
    return result_text

然后 _do_handoff_background 只需调用:

result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)

2. 复用通用的“用后台结果唤醒主 agent”逻辑

_do_handoff_background 的大部分逻辑和 _execute_background / 通用后台模式相似(构建 CronMessageEvent、配置 ProviderRequest、挂载历史、追加 BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT、运行 agent、持久化历史)。

可以将这部分逻辑抽成一个通用工具函数,让 _execute_background_do_handoff_background 都调用它:

async def _wake_main_agent_with_background_result(
    *,
    ctx: AstrAgentContext,
    base_event: AstrMessageEvent,
    note: str,
    background_task_result: dict[str, Any],
) -> None:
    from astrbot.core.astr_main_agent import (
        MainAgentBuildConfig,
        _get_session_conv,
        build_main_agent,
    )

    session = MessageSession.from_str(base_event.unified_msg_origin)
    cron_event = CronMessageEvent(
        context=ctx,
        session=session,
        message=note,
        extras={"background_task_result": background_task_result},
        message_type=session.message_type,
    )
    cron_event.role = base_event.role

    req = ProviderRequest()
    conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
    req.conversation = conv

    context = json.loads(conv.history)
    if context:
        req.contexts = context
        context_dump = req._print_friendly_context()
        req.contexts = []
        req.system_prompt += (
            "\n\nBellow is you and user previous conversation history:\n"
            f"{context_dump}"
        )

    bg = json.dumps(background_task_result, ensure_ascii=False)
    req.system_prompt += BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT.format(
        background_task_result=bg
    )
    req.prompt = (
        "Proceed according to your system instructions. "
        "Output using same language as previous conversation."
        " After completing your task, summarize and output your actions and results."
    )
    if not req.func_tool:
        req.func_tool = ToolSet()
    req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)

    config = MainAgentBuildConfig(tool_call_timeout=3600)
    result = await build_main_agent(
        event=cron_event, plugin_context=ctx, config=config, req=req
    )
    if not result:
        logger.error("Failed to build main agent for background mission.")
        return

    runner = result.agent_runner
    async for _ in runner.step_until_done(30):
        pass
    llm_resp = runner.get_final_llm_resp()

    summary_note = _build_background_summary_note(
        tool_name=background_task_result.get("tool_name"),
        subagent_name=background_task_result.get("subagent_name"),
        task_id=background_task_result.get("task_id"),
        result_text=background_task_result.get("result"),
        llm_resp=llm_resp,
    )
    await persist_agent_history(
        ctx.conversation_manager,
        event=cron_event,
        req=req,
        summary_note=summary_note,
    )
    if not llm_resp:
        logger.warning("background mission agent got no response")

3. 抽取 summary note 构造逻辑

将当前内联构造字符串的逻辑:

task_meta = extras.get("background_task_result", {})
summary_note = (
    f"[BackgroundMission] {task_meta.get('subagent_name', tool.agent.name)} "
    f"(task_id={task_meta.get('task_id', task_id)}) finished. "
    f"Result: {task_meta.get('result') or result_text or 'no content'}"
)
if llm_resp and llm_resp.completion_text:
    summary_note += (
        f"I finished the task, here is the result: {llm_resp.completion_text}"
    )

提取成一个 helper,同时供两种后台路径使用:

def _build_background_summary_note(
    *,
    tool_name: str | None,
    subagent_name: str | None,
    task_id: str,
    result_text: str,
    llm_resp: Any | None,
) -> str:
    base = (
        f"[BackgroundMission] {subagent_name or tool_name or 'background_task'} "
        f"(task_id={task_id}) finished. "
        f"Result: {result_text or 'no content'}"
    )
    if llm_resp and getattr(llm_resp, "completion_text", None):
        return (
            base
            + " I finished the task, here is the result: "
            + llm_resp.completion_text
        )
    return base

4. 让 _do_handoff_background 只负责流程编排

有了上述 helper 之后,_do_handoff_background 就可以简化为只做“接线”工作:

@classmethod
async def _do_handoff_background(
    cls,
    tool: HandoffTool,
    run_context: ContextWrapper<AstrAgentContext],
    task_id: str,
    **tool_args,
) -> None:
    event = run_context.context.event
    ctx = run_context.context.context

    result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)

    note = (
        event.get_extra("background_note")
        or f"Background subagent mission '{tool.agent.name}' finished."
    )
    background_task_result = {
        "task_id": task_id,
        "tool_name": tool.name,
        "subagent_name": tool.agent.name,
        "result": result_text or "",
        "tool_args": tool_args,
    }

    await _wake_main_agent_with_background_result(
        ctx=ctx,
        base_event=event,
        note=note,
        background_task_result=background_task_result,
    )

这样可以在保持现有行为的前提下:

  • 去除重复的 ProviderRequest / 历史处理 / 唤醒逻辑;
  • _do_handoff_background 缩减为一个简短、单一职责的编排方法;
  • 让未来对后台唤醒行为的修改可以同时作用于交接任务和普通后台任务。
Original comment in English

issue (complexity): Consider refactoring _do_handoff_background by extracting shared background-execution and wake-up logic into reusable helpers so the method only orchestrates the flow.

You can significantly reduce complexity and duplication in _do_handoff_background by extracting the generic “run subagent + wake main agent with background_task_result” orchestration into small helpers and reusing the same pattern as _execute_background.

1. Isolate the handoff result collection

All this logic:

result_text = ""
try:
    async for r in cls._execute_handoff(tool, run_context, **tool_args):
        if isinstance(r, mcp.types.CallToolResult):
            for content in r.content:
                if isinstance(content, mcp.types.TextContent):
                    result_text += content.text + "\n"
except Exception as e:
    result_text = (
        f"error: Background handoff execution failed, internal error: {e!s}"
    )

can be moved to a focused helper so _do_handoff_background no longer mixes streaming handling with orchestration:

@classmethod
async def _collect_handoff_result(
    cls,
    tool: HandoffTool,
    run_context: ContextWrapper[AstrAgentContext],
    **tool_args: Any,
) -> str:
    result_text = ""
    try:
        async for r in cls._execute_handoff(tool, run_context, **tool_args):
            if isinstance(r, mcp.types.CallToolResult):
                for content in r.content:
                    if isinstance(content, mcp.types.TextContent):
                        result_text += content.text + "\n"
    except Exception as e:
        result_text = (
            f"error: Background handoff execution failed, internal error: {e!s}"
        )
    return result_text

Then _do_handoff_background calls:

result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)

2. Reuse generic “wake main agent with background result” logic

Most of _do_handoff_background mirrors _execute_background / the general background pattern (build CronMessageEvent, configure ProviderRequest, attach history, add BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT, run agent, persist history).

Move that into a reusable utility that both _execute_background and _do_handoff_background can call:

async def _wake_main_agent_with_background_result(
    *,
    ctx: AstrAgentContext,
    base_event: AstrMessageEvent,
    note: str,
    background_task_result: dict[str, Any],
) -> None:
    from astrbot.core.astr_main_agent import (
        MainAgentBuildConfig,
        _get_session_conv,
        build_main_agent,
    )

    session = MessageSession.from_str(base_event.unified_msg_origin)
    cron_event = CronMessageEvent(
        context=ctx,
        session=session,
        message=note,
        extras={"background_task_result": background_task_result},
        message_type=session.message_type,
    )
    cron_event.role = base_event.role

    req = ProviderRequest()
    conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
    req.conversation = conv

    context = json.loads(conv.history)
    if context:
        req.contexts = context
        context_dump = req._print_friendly_context()
        req.contexts = []
        req.system_prompt += (
            "\n\nBellow is you and user previous conversation history:\n"
            f"{context_dump}"
        )

    bg = json.dumps(background_task_result, ensure_ascii=False)
    req.system_prompt += BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT.format(
        background_task_result=bg
    )
    req.prompt = (
        "Proceed according to your system instructions. "
        "Output using same language as previous conversation."
        " After completing your task, summarize and output your actions and results."
    )
    if not req.func_tool:
        req.func_tool = ToolSet()
    req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)

    config = MainAgentBuildConfig(tool_call_timeout=3600)
    result = await build_main_agent(
        event=cron_event, plugin_context=ctx, config=config, req=req
    )
    if not result:
        logger.error("Failed to build main agent for background mission.")
        return

    runner = result.agent_runner
    async for _ in runner.step_until_done(30):
        pass
    llm_resp = runner.get_final_llm_resp()

    summary_note = _build_background_summary_note(
        tool_name=background_task_result.get("tool_name"),
        subagent_name=background_task_result.get("subagent_name"),
        task_id=background_task_result.get("task_id"),
        result_text=background_task_result.get("result"),
        llm_resp=llm_resp,
    )
    await persist_agent_history(
        ctx.conversation_manager,
        event=cron_event,
        req=req,
        summary_note=summary_note,
    )
    if not llm_resp:
        logger.warning("background mission agent got no response")

3. Extract summary-note construction

Lift this inline string-building:

task_meta = extras.get("background_task_result", {})
summary_note = (
    f"[BackgroundMission] {task_meta.get('subagent_name', tool.agent.name)} "
    f"(task_id={task_meta.get('task_id', task_id)}) finished. "
    f"Result: {task_meta.get('result') or result_text or 'no content'}"
)
if llm_resp and llm_resp.completion_text:
    summary_note += (
        f"I finished the task, here is the result: {llm_resp.completion_text}"
    )

into a helper used by both background paths:

def _build_background_summary_note(
    *,
    tool_name: str | None,
    subagent_name: str | None,
    task_id: str,
    result_text: str,
    llm_resp: Any | None,
) -> str:
    base = (
        f"[BackgroundMission] {subagent_name or tool_name or 'background_task'} "
        f"(task_id={task_id}) finished. "
        f"Result: {result_text or 'no content'}"
    )
    if llm_resp and getattr(llm_resp, "completion_text", None):
        return (
            base
            + " I finished the task, here is the result: "
            + llm_resp.completion_text
        )
    return base

4. Simplify _do_handoff_background to orchestration only

With the above helpers, _do_handoff_background reduces to wiring:

@classmethod
async def _do_handoff_background(
    cls,
    tool: HandoffTool,
    run_context: ContextWrapper[AstrAgentContext],
    task_id: str,
    **tool_args,
) -> None:
    event = run_context.context.event
    ctx = run_context.context.context

    result_text = await cls._collect_handoff_result(tool, run_context, **tool_args)

    note = (
        event.get_extra("background_note")
        or f"Background subagent mission '{tool.agent.name}' finished."
    )
    background_task_result = {
        "task_id": task_id,
        "tool_name": tool.name,
        "subagent_name": tool.agent.name,
        "result": result_text or "",
        "tool_args": tool_args,
    }

    await _wake_main_agent_with_background_result(
        ctx=ctx,
        base_event=event,
        note=note,
        background_task_result=background_task_result,
    )

This keeps all current behavior, but:

  • Removes duplicated ProviderRequest/history/wake-up logic.
  • Shrinks _do_handoff_background to a short, single-responsibility orchestration method.
  • Makes future changes to background wake-up behavior apply to handoff and normal background tasks in one place.

cls,
tool: HandoffTool,
run_context: ContextWrapper[AstrAgentContext],
task_id: str,
**tool_args,
) -> None:
"""Run the subagent handoff and, on completion, wake the main agent."""
from astrbot.core.astr_main_agent import (
MainAgentBuildConfig,
_get_session_conv,
build_main_agent,
)

# ---- 1. Execute the handoff (subagent) ----------------------------
result_text = ""
try:
async for r in cls._execute_handoff(tool, run_context, **tool_args):
if isinstance(r, mcp.types.CallToolResult):
for content in r.content:
if isinstance(content, mcp.types.TextContent):
result_text += content.text + "\n"
except Exception as e:
result_text = (
f"error: Background handoff execution failed, internal error: {e!s}"
)

# ---- 2. Build a CronMessageEvent to wake the main agent -----------
event = run_context.context.event
ctx = run_context.context.context

note = (
event.get_extra("background_note")
or f"Background subagent mission '{tool.agent.name}' finished."
)
extras = {
"background_task_result": {
"task_id": task_id,
"tool_name": tool.name,
"subagent_name": tool.agent.name,
"result": result_text or "",
"tool_args": tool_args,
}
}
session = MessageSession.from_str(event.unified_msg_origin)
cron_event = CronMessageEvent(
context=ctx,
session=session,
message=note,
extras=extras,
message_type=session.message_type,
)
cron_event.role = event.role
config = MainAgentBuildConfig(tool_call_timeout=3600)

req = ProviderRequest()
conv = await _get_session_conv(event=cron_event, plugin_context=ctx)
req.conversation = conv
context = json.loads(conv.history)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): 考虑让 json.loads(conv.history) 对异常或不合法的历史数据更加健壮。

如果 conv.history 可能为空、为 None,或者不是合法 JSON(例如来源于旧数据或部分写入),json.loads 会抛异常并中断后台交接流程。建议用 try/except 包裹,或者先检查是否为有效且非空的 JSON 字符串,并在历史数据无效时将其视为“无上下文(no context)”,以便后台任务仍然可以完成。

Original comment in English

issue (bug_risk): Consider making json.loads(conv.history) more robust against malformed or empty history.

If conv.history can ever be empty, None, or non-JSON (e.g., from older data or partial writes), json.loads will raise and stop the background handoff flow. Consider wrapping this in a try/except or checking for a valid, non-empty JSON string first, and treating invalid history as “no context” so the mission can still complete.

if context:
req.contexts = context
context_dump = req._print_friendly_context()
req.contexts = []
req.system_prompt += (
"\n\nBellow is you and user previous conversation history:\n"
f"{context_dump}"
)

bg = json.dumps(extras["background_task_result"], ensure_ascii=False)
req.system_prompt += BACKGROUND_TASK_RESULT_WOKE_SYSTEM_PROMPT.format(
background_task_result=bg
)
req.prompt = (
"Proceed according to your system instructions. "
"Output using same language as previous conversation."
" After completing your task, summarize and output your actions and results."
)
if not req.func_tool:
req.func_tool = ToolSet()
req.func_tool.add_tool(SEND_MESSAGE_TO_USER_TOOL)

result = await build_main_agent(
event=cron_event, plugin_context=ctx, config=config, req=req
)
if not result:
logger.error("Failed to build main agent for background handoff mission.")
return

runner = result.agent_runner
async for _ in runner.step_until_done(30):
# agent will send message to user via using tools
pass
llm_resp = runner.get_final_llm_resp()
task_meta = extras.get("background_task_result", {})
summary_note = (
f"[BackgroundMission] {task_meta.get('subagent_name', tool.agent.name)} "
f"(task_id={task_meta.get('task_id', task_id)}) finished. "
f"Result: {task_meta.get('result') or result_text or 'no content'}"
)
if llm_resp and llm_resp.completion_text:
summary_note += (
f"I finished the task, here is the result: {llm_resp.completion_text}"
)
await persist_agent_history(
ctx.conversation_manager,
event=cron_event,
req=req,
summary_note=summary_note,
)
if not llm_resp:
logger.warning("background handoff mission agent got no response")
return

@classmethod
async def _execute_background(
cls,
Expand Down