feat: add fallback chat model chain in tool loop runner #5109

Merged
Soulter merged 3 commits into master from feat/fallback-chat-models on Feb 15, 2026
Conversation

@Soulter Soulter (Member) commented Feb 15, 2026

Summary

  • add fallback chat provider chain support in the tool loop agent runner
  • add config key `fallback_chat_models` (see the config sketch below)
  • wire fallback providers from the main agent builder into the runner
  • add an ordered multi-select provider UI to the dashboard
  • fallback requests now omit `model` in the payload for non-primary providers
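
A minimal sketch of the new key on a provider entry, for orientation only; `fallback_chat_models` and `id` come from this PR, while every other field name and value here is an assumption:

```python
# Hypothetical provider_config excerpt: fallback providers are referenced by
# provider ID and are tried in order when the primary provider fails.
provider_config = {
    "id": "openai_primary",          # illustrative primary provider ID
    "fallback_chat_models": [        # new config key introduced in this PR
        "claude_backup",             # first fallback candidate
        "local_ollama",              # second fallback candidate
    ],
}
```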

Validation

  • ruff format/check passed
  • pytest: 6 passed in 1.63s


Summary by Sourcery

Add configurable fallback chat model chain support to the tool loop agent runner and surface it through the main agent configuration and dashboard UI.

New Features:

  • Introduce fallback chat providers list in the main agent that is passed into the tool loop agent runner for sequential failover when the primary chat provider fails or returns an error.
  • Add configuration support for specifying ordered fallback chat models, including schema metadata and dashboard wiring to select multiple providers.
  • Extend the provider selection UI component to support ordered multi-select of providers with reordering and removal controls and summary display of selections.

Enhancements:

  • Ensure fallback chat requests omit explicit model selection for non-primary providers while preserving behavior for the primary provider.
  • Avoid duplicate or invalid fallback providers by filtering on provider IDs and types when building the fallback chain.

Tests:

  • Add tests verifying that fallback providers are used when the primary chat provider raises an exception or returns an error response, and that the final assistant response comes from the fallback provider.
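
For orientation, a standalone sketch of the failover behavior those tests assert; `FakeProvider` and `iterate_with_fallback` are hypothetical stand-ins, not the runner's actual API:

```python
import asyncio


class FakeProvider:
    """Hypothetical stand-in for a chat provider."""

    def __init__(self, name: str, fail: bool = False):
        self.name = name
        self.fail = fail

    async def text_chat(self, prompt: str) -> str:
        if self.fail:
            raise RuntimeError(f"{self.name} is unavailable")
        return f"{self.name}: answer to {prompt!r}"


async def iterate_with_fallback(providers, prompt):
    """Try each provider in order and return the first successful response."""
    last_exc = None
    for provider in providers:
        try:
            return await provider.text_chat(prompt)
        except Exception as exc:  # provider failed, try the next one
            last_exc = exc
    raise RuntimeError("all chat models failed") from last_exc


async def main():
    primary = FakeProvider("primary", fail=True)
    fallback = FakeProvider("fallback")
    # The final assistant response comes from the fallback provider.
    print(await iterate_with_fallback([primary, fallback], "hello"))


asyncio.run(main())
```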

@auto-assign auto-assign bot requested review from Fridemn and Raven95676 February 15, 2026 03:41
@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Feb 15, 2026
@dosubot dosubot bot commented Feb 15, 2026

Documentation Updates

1 document was updated by changes in this PR:

pr4697的改动 (changes from pr4697)
@@ -6,10 +6,15 @@
 ### 1. SubAgent architecture
 
 #### Architecture overview
-The proactive agent system introduces a SubAgent architecture. The main agent (MainAgent) is not limited to conversation and task delegation; it can also use its own tool set directly. Specific tools and tasks can also be handled by SubAgents. This effectively avoids prompt bloat and call failures while making the main agent more flexible.
+The main agent can hand a task off to a designated SubAgent via the `transfer_to_<subagent>` tool. This tool now accepts a new `background_mission` parameter:
 
-- Tool assignment: each SubAgent inherits from Persona and has its own tool set, skills, name, and description. The main agent can call its own tools directly, or hand a task to a designated SubAgent via the `transfer_to_<subagent>` tool. With SubAgent orchestration enabled, the main agent keeps its own tools and automatically mounts the handoff tools (transfer_to_*); whether tools duplicated by SubAgents are removed is configurable.
-- Task flow: the main agent can handle tasks directly or dispatch them to SubAgents, and SubAgents can pass tasks between themselves via the transfer tools. The tool sets of the main agent and SubAgents can be assigned flexibly through configuration, with support for deduplication and automatic mounting of handoff tools.
+- `background_mission`: if set to `true`, the task runs asynchronously in background mode; the main agent immediately returns a task ID, and the user is notified of the result through the background-task wake-up mechanism once the task completes. Suitable for long-running or non-urgent tasks.
+
+Task flow supports both synchronous and asynchronous modes:
+- Synchronous mode (default): the main agent waits for the SubAgent to finish the task, then returns the result.
+- Asynchronous mode (background_mission=true): the main agent immediately returns a task ID, the SubAgent runs in the background, and the user is notified automatically on completion.
+
+This way, the main agent can handle tasks itself or delegate them flexibly, with support for asynchronous task flow.
 
 #### Usage example
 Suppose there are two SubAgents, `DataAgent` and `ReportAgent`, responsible for data processing and report generation respectively. After receiving a user request, the main agent can either handle part of the task directly with its own tools, or assign the data-processing task to DataAgent via the `transfer_to_DataAgent` tool and then pass the result to ReportAgent via the `transfer_to_ReportAgent` tool to generate the report.
@@ -25,6 +30,10 @@
 
 #### Configuration
 SubAgent definitions follow the Persona configuration: specify tools, skills, name, description, etc. in the config file.
+
+The transfer_to_* tools now support a `background_mission` parameter:
+- Pass `background_mission: true` in the call to run the task asynchronously in background mode; the main agent immediately returns a task ID, and the user is notified via the background-task wake-up mechanism on completion.
+- If unset or `false`, the call runs synchronously and the main agent waits for the SubAgent to finish the task.
 
 When SubAgent orchestration is not enabled, the main agent can mount its own tool set directly (all tools by default, per persona rules) and call tools directly.
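
For illustration, a hypothetical handoff call matching the documentation above; only the transfer_to_* tool name pattern and `background_mission` come from the doc, the remaining argument names are assumptions:

```python
# Illustrative arguments for an asynchronous handoff to DataAgent.
transfer_call = {
    "tool": "transfer_to_DataAgent",
    "arguments": {
        "task": "Aggregate last week's usage metrics",  # hypothetical task text
        "background_mission": True,  # run in the background; a task ID is returned immediately
    },
}
```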
 


@dosubot dosubot bot added area:core The bug / feature is about astrbot's core, backend area:webui The bug / feature is about webui(dashboard) of astrbot. labels Feb 15, 2026
@sourcery-ai sourcery-ai bot (Contributor) left a comment

Hey - I've found 3 issues, and left some high level feedback:

  • In _iter_llm_responses_with_fallback, the warning log for switching providers uses self.provider.provider_config after self.provider has already been reassigned to the candidate, so the "from" provider ID is incorrect; capture the previous provider ID before overwriting self.provider so the log reflects the actual transition (see the sketch after this list).
  • The logic for normalizing modelValue into selectedProviders in ProviderSelector.vue is duplicated across the watch on modelValue, openDialog, and cancelSelection; consider extracting this into a small helper function to keep the behavior consistent and easier to maintain.
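
A minimal sketch of the fix for the first point, assuming a `candidate` variable and a module-level `logger`; `provider_config.get("id")` is the accessor already used in this PR:

```python
# Capture the outgoing provider's ID before self.provider is reassigned,
# so the "from" side of the warning names the provider that actually failed.
prev_provider_id = self.provider.provider_config.get("id")
self.provider = candidate
logger.warning(
    "Chat provider %s failed, falling back to %s",
    prev_provider_id,
    candidate.provider_config.get("id"),
)
```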
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `_iter_llm_responses_with_fallback`, the warning log for switching providers uses `self.provider.provider_config` after `self.provider` has already been reassigned to the candidate, so the "from" provider ID is incorrect; capture the previous provider ID before overwriting `self.provider` so the log reflects the actual transition.
- The logic for normalizing `modelValue` into `selectedProviders` in `ProviderSelector.vue` is duplicated across the `watch` on `modelValue`, `openDialog`, and `cancelSelection`; consider extracting this into a small helper function to keep the behavior consistent and easier to maintain.

## Individual Comments

### Comment 1
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:220-229` </location>
<code_context>
+            has_stream_output = False
</code_context>

<issue_to_address>
**issue (bug_risk):** Exceptions after streaming has started currently trigger fallback, which may mix outputs from different providers.

Because `has_stream_output` is only checked in the normal response path, an exception after some chunks have been yielded will still trigger fallback. That means the caller can see partial output from provider A followed by a full response from provider B, which is likely incorrect/undefined behavior for consumers.

In the `except Exception` block, consider using `has_stream_output` to either:
- stop and surface an error (or re-raise) once any chunks have been emitted, or
- skip fallback for this candidate after streaming has started.

That would align the exception path with the existing logic that only falls back when no streamed output was produced.
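
A minimal sketch of the first option, assuming the surrounding candidate loop from this diff (`has_stream_output`, `last_exception`, and `idx` are the names used there):

```python
has_stream_output = False
try:
    async for resp in self._iter_llm_responses(include_model=idx == 0):
        if resp.is_chunk:
            has_stream_output = True
            yield resp
            continue
        # ... existing handling of the final (non-chunk) response ...
except Exception as exc:
    if has_stream_output:
        # Chunks from this provider were already yielded: do not fall back,
        # re-raise instead of mixing output from two providers.
        raise
    last_exception = exc
    continue  # nothing streamed yet, so it is safe to try the next candidate
```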
</issue_to_address>

### Comment 2
<location> `astrbot/core/astr_main_agent.py:873-882` </location>
<code_context>
+def _get_fallback_chat_providers(
</code_context>

<issue_to_address>
**suggestion (bug_risk):** Silently ignoring non-list `fallback_chat_models` may hide configuration mistakes; consider surfacing this via logging or validation.

Right now a non-list `fallback_chat_models` (e.g., a string) is just ignored:
```python
fallback_ids = provider.provider_config.get("fallback_chat_models", [])
if not isinstance(fallback_ids, list):
    return []
```
Consider emitting a warning before returning, so operators see the misconfiguration:
```python
if not isinstance(fallback_ids, list):
    logger.warning("fallback_chat_models should be a list, got %r; ignoring.", type(fallback_ids))
    return []
```

Suggested implementation:

```python
def _get_fallback_chat_providers(
    provider: Provider, plugin_context: Context
) -> list[Provider]:
    fallback_ids = provider.provider_config.get("fallback_chat_models", [])
    if not isinstance(fallback_ids, list):
        logger.warning(
            "fallback_chat_models should be a list, got %r; ignoring. provider_id=%r",
            type(fallback_ids),
            provider.provider_config.get("id"),
        )
        return []

```

1. Ensure this module has access to a `logger` instance. If not already present, add something like:
   - `import logging` at the top of the file, and
   - `logger = logging.getLogger(__name__)` near the top-level definitions (or reuse the existing logging pattern in this package, e.g., a shared logger utility).
2. If your logging configuration prefers structured logging or a different style, adjust the `logger.warning` call to match the existing conventions (e.g., using extra/context dicts instead of formatted strings).
</issue_to_address>

### Comment 3
<location> `astrbot/core/agent/runners/tool_loop_agent_runner.py:201` </location>
<code_context>
         else:
             yield await self.provider.text_chat(**payload)

+    async def _iter_llm_responses_with_fallback(
+        self,
+    ) -> T.AsyncGenerator[LLMResponse, None]:
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the fallback loop into smaller helpers, moving model-selection out of `_iter_llm_responses`, and centralizing provider de-duplication to simplify control flow and reduce state tracking complexity.

1. Split `_iter_llm_responses_with_fallback` into smaller helpers to avoid flag juggling  
Right now the iterator is tracking `has_stream_output`, `is_last_candidate`, `last_exception`, and `last_err_response` in a single loop with `break`/`continue`/`return`. You can move the “single provider” logic into a focused helper that reports a clear status, and keep the high‑level fallback loop simple.

For example:

```python
class ProviderRunStatus(enum.Enum):
    SUCCESS = "success"
    ERROR_BEFORE_STREAM = "error_before_stream"
    ERROR_DURING_STREAM = "error_during_stream"


async def _iter_single_provider(
    self, *, include_model: bool
) -> tuple[ProviderRunStatus, LLMResponse | None]:
    """Yield responses for current provider; return status + final response if any."""
    has_stream_output = False
    final_resp: LLMResponse | None = None

    async for resp in self._iter_llm_responses(include_model=include_model):
        if resp.is_chunk:
            has_stream_output = True
            yield resp
            continue

        final_resp = resp
        # distinguish error-before-stream vs after-stream
        if resp.role == "err" and not has_stream_output:
            return ProviderRunStatus.ERROR_BEFORE_STREAM, resp
        return ProviderRunStatus.SUCCESS, resp

    # reached end-of-stream without final_resp (defensive)
    if has_stream_output:
        return ProviderRunStatus.SUCCESS, final_resp
    return ProviderRunStatus.ERROR_BEFORE_STREAM, final_resp
```

Then the fallback wrapper becomes mostly about looping providers and reacting to the status, without internal flags:

```python
async def _iter_llm_responses_with_fallback(self) -> AsyncIterator[LLMResponse]:
    candidates = [self.provider, *self.fallback_providers]
    last_err_response: LLMResponse | None = None
    last_exception: Exception | None = None

    for idx, candidate in enumerate(candidates):
        is_last_candidate = idx == len(candidates) - 1
        self.provider = candidate

        try:
            status, final_resp = await self._iter_single_provider(include_model=(idx == 0))
        except Exception as exc:  # noqa: BLE001
            last_exception = exc
            # log + continue
            continue

        if status is ProviderRunStatus.ERROR_BEFORE_STREAM and not is_last_candidate:
            last_err_response = final_resp
            # log + try next provider
            continue

        # success or last provider error
        if final_resp:
            yield final_resp
        return

    # fallbacks exhausted
    if last_err_response:
        yield last_err_response
        return
    if last_exception:
        yield LLMResponse(
            role="err",
            completion_text=f"All chat models failed: {type(last_exception).__name__}: {last_exception}",
        )
        return
    yield LLMResponse(
        role="err",
        completion_text="All available chat models are unavailable.",
    )
```

This preserves the current behavior (error before any chunk → try next provider; once streaming starts, stick with that provider) but removes the need to reason about multiple flags in the same loop.


2. Decouple “include_model” from `_iter_llm_responses`  
The `include_model` flag exists solely to distinguish primary vs fallback providers. You can keep `_iter_llm_responses` focused on “iterate LLM responses for the *current* provider” by moving model selection into the caller.

For example:

```python
def _build_llm_payload(self, *, model: str | None) -> dict[str, Any]:
    payload: dict[str, Any] = {
        "contexts": self.run_context.messages,
        "func_tool": self.req.func_tool,
        "session_id": self.req.session_id,
        "extra_user_content_parts": self.req.extra_user_content_parts,
    }
    if model:
        payload["model"] = model
    return payload


async def _iter_llm_responses(
    self, payload: dict[str, Any]
) -> AsyncIterator[LLMResponse]:
    if self.streaming:
        async for resp in self.provider.text_chat_stream(**payload):  # type: ignore
            yield resp
    else:
        yield await self.provider.text_chat(**payload)
```

Then callers decide whether to pass `self.req.model` (primary) or `None` (fallback), instead of `_iter_llm_responses` knowing about primary vs fallback:

```python
payload = self._build_llm_payload(model=self.req.model if is_primary else None)
async for resp in self._iter_llm_responses(payload):
    ...
```


3. Centralize provider de‑duplication  
You’re already doing id-based deduplication in `__init__`, and it sounds like `reset` / `_get_fallback_chat_providers` have similar logic. To avoid two places enforcing uniqueness, factor this into a small helper used everywhere:

```python
def _dedupe_providers(
    self, primary: Provider, fallbacks: list[Provider] | None
) -> list[Provider]:
    seen_ids: set[str] = {str(primary.provider_config.get("id", ""))}
    deduped: list[Provider] = []

    for p in fallbacks or []:
        pid = str(p.provider_config.get("id", ""))
        if p is primary:
            continue
        if pid and pid in seen_ids:
            continue
        deduped.append(p)
        if pid:
            seen_ids.add(pid)
    return deduped
```

Use it in `__init__` and any `reset`/`_get_fallback_chat_providers` logic:

```python
self.provider = provider
self.fallback_providers = self._dedupe_providers(provider, fallback_providers)
```

This removes duplicated id-checking and keeps the uniqueness rules in a single, easy-to-audit place.
</issue_to_address>


Comment on lines +220 to +229
has_stream_output = False
try:
    async for resp in self._iter_llm_responses(include_model=idx == 0):
        if resp.is_chunk:
            has_stream_output = True
            yield resp
            continue

        if (
            resp.role == "err"
@sourcery-ai sourcery-ai bot (Contributor) commented:

issue (bug_risk): Exceptions after streaming has started currently trigger fallback, which may mix outputs from different providers.

Because has_stream_output is only checked in the normal response path, an exception after some chunks have been yielded will still trigger fallback. That means the caller can see partial output from provider A followed by a full response from provider B, which is likely incorrect/undefined behavior for consumers.

In the except Exception block, consider using has_stream_output to either:

  • stop and surface an error (or re-raise) once any chunks have been emitted, or
  • skip fallback for this candidate after streaming has started.

That would align the exception path with the existing logic that only falls back when no streamed output was produced.

        else:
            yield await self.provider.text_chat(**payload)

    async def _iter_llm_responses_with_fallback(
@sourcery-ai sourcery-ai bot (Contributor) commented:

issue (complexity): Consider extracting the fallback loop into smaller helpers, moving model-selection out of _iter_llm_responses, and centralizing provider de-duplication to simplify control flow and reduce state tracking complexity.

  1. Split _iter_llm_responses_with_fallback into smaller helpers to avoid flag juggling
    Right now the iterator is tracking has_stream_output, is_last_candidate, last_exception, and last_err_response in a single loop with break/continue/return. You can move the “single provider” logic into a focused helper that reports a clear status, and keep the high‑level fallback loop simple.

For example:

class ProviderRunStatus(enum.Enum):
    SUCCESS = "success"
    ERROR_BEFORE_STREAM = "error_before_stream"
    ERROR_DURING_STREAM = "error_during_stream"


async def _iter_single_provider(
    self, *, include_model: bool
) -> tuple[ProviderRunStatus, LLMResponse | None]:
    """Yield responses for current provider; return status + final response if any."""
    has_stream_output = False
    final_resp: LLMResponse | None = None

    async for resp in self._iter_llm_responses(include_model=include_model):
        if resp.is_chunk:
            has_stream_output = True
            yield resp
            continue

        final_resp = resp
        # distinguish error-before-stream vs after-stream
        if resp.role == "err" and not has_stream_output:
            return ProviderRunStatus.ERROR_BEFORE_STREAM, resp
        return ProviderRunStatus.SUCCESS, resp

    # reached end-of-stream without final_resp (defensive)
    if has_stream_output:
        return ProviderRunStatus.SUCCESS, final_resp
    return ProviderRunStatus.ERROR_BEFORE_STREAM, final_resp

Then the fallback wrapper becomes mostly about looping providers and reacting to the status, without internal flags:

async def _iter_llm_responses_with_fallback(self) -> AsyncIterator[LLMResponse]:
    candidates = [self.provider, *self.fallback_providers]
    last_err_response: LLMResponse | None = None
    last_exception: Exception | None = None

    for idx, candidate in enumerate(candidates):
        is_last_candidate = idx == len(candidates) - 1
        self.provider = candidate

        try:
            status, final_resp = await self._iter_single_provider(include_model=(idx == 0))
        except Exception as exc:  # noqa: BLE001
            last_exception = exc
            # log + continue
            continue

        if status is ProviderRunStatus.ERROR_BEFORE_STREAM and not is_last_candidate:
            last_err_response = final_resp
            # log + try next provider
            continue

        # success or last provider error
        if final_resp:
            yield final_resp
        return

    # fallbacks exhausted
    if last_err_response:
        yield last_err_response
        return
    if last_exception:
        yield LLMResponse(
            role="err",
            completion_text=f"All chat models failed: {type(last_exception).__name__}: {last_exception}",
        )
        return
    yield LLMResponse(
        role="err",
        completion_text="All available chat models are unavailable.",
    )

This preserves the current behavior (error before any chunk → try next provider; once streaming starts, stick with that provider) but removes the need to reason about multiple flags in the same loop.

  2. Decouple “include_model” from _iter_llm_responses
    The include_model flag exists solely to distinguish primary vs fallback providers. You can keep _iter_llm_responses focused on “iterate LLM responses for the current provider” by moving model selection into the caller.

For example:

def _build_llm_payload(self, *, model: str | None) -> dict[str, Any]:
    payload: dict[str, Any] = {
        "contexts": self.run_context.messages,
        "func_tool": self.req.func_tool,
        "session_id": self.req.session_id,
        "extra_user_content_parts": self.req.extra_user_content_parts,
    }
    if model:
        payload["model"] = model
    return payload


async def _iter_llm_responses(
    self, payload: dict[str, Any]
) -> AsyncIterator[LLMResponse]:
    if self.streaming:
        async for resp in self.provider.text_chat_stream(**payload):  # type: ignore
            yield resp
    else:
        yield await self.provider.text_chat(**payload)

Then callers decide whether to pass self.req.model (primary) or None (fallback), instead of _iter_llm_responses knowing about primary vs fallback:

payload = self._build_llm_payload(model=self.req.model if is_primary else None)
async for resp in self._iter_llm_responses(payload):
    ...
  3. Centralize provider de‑duplication
    You’re already doing id-based deduplication in __init__, and it sounds like reset / _get_fallback_chat_providers have similar logic. To avoid two places enforcing uniqueness, factor this into a small helper used everywhere:
def _dedupe_providers(
    self, primary: Provider, fallbacks: list[Provider] | None
) -> list[Provider]:
    seen_ids: set[str] = {str(primary.provider_config.get("id", ""))}
    deduped: list[Provider] = []

    for p in fallbacks or []:
        pid = str(p.provider_config.get("id", ""))
        if p is primary:
            continue
        if pid and pid in seen_ids:
            continue
        deduped.append(p)
        if pid:
            seen_ids.add(pid)
    return deduped

Use it in __init__ and any reset/_get_fallback_chat_providers logic:

self.provider = provider
self.fallback_providers = self._dedupe_providers(provider, fallback_providers)

This removes duplicated id-checking and keeps the uniqueness rules in a single, easy-to-audit place.

@Soulter Soulter merged commit 754144a into master Feb 15, 2026
5 checks passed
@Soulter Soulter deleted the feat/fallback-chat-models branch February 15, 2026 03:51

Labels

  • area:core (The bug / feature is about astrbot's core, backend)
  • area:webui (The bug / feature is about webui(dashboard) of astrbot.)
  • size:L (This PR changes 100-499 lines, ignoring generated files.)
