Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/model-provider-errors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@electric-ax/agents-runtime": patch
"@electric-ax/agents-server-ui": patch
---

Add default model-provider timeout/error handling for agent runs and render durable run errors in the UI.
3 changes: 3 additions & 0 deletions packages/agents-runtime/src/context-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,9 @@ export function createHandlerContext<TState extends StateProxy = StateProxy>(
getApiKey: activeAgentConfig.getApiKey,

onPayload: activeAgentConfig.onPayload,

modelTimeoutMs: activeAgentConfig.modelTimeoutMs,
modelMaxRetries: activeAgentConfig.modelMaxRetries,
})
const handle = adapterFactory({
entityUrl: config.entityUrl,
Expand Down
7 changes: 7 additions & 0 deletions packages/agents-runtime/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,13 @@ export type {
export { buildSections, buildTimelineEntries } from './use-chat'
export type { EntityTimelineEntry } from './use-chat'
export { appendPathToUrl } from './url'
export {
ModelProviderError,
classifyModelProviderError,
modelProviderErrorMessage,
toModelProviderError,
} from './model-provider-error'
export type { ModelProviderErrorCode } from './model-provider-error'

export {
defaultProjection,
Expand Down
145 changes: 145 additions & 0 deletions packages/agents-runtime/src/model-provider-error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
export type ModelProviderErrorCode =
| `MODEL_PROVIDER_TIMEOUT`
| `MODEL_PROVIDER_UNREACHABLE`
| `MODEL_PROVIDER_AUTH_FAILED`
| `MODEL_PROVIDER_RATE_LIMITED`
| `MODEL_PROVIDER_UNAVAILABLE`
| `MODEL_PROVIDER_ERROR`

export class ModelProviderError extends Error {
readonly code: ModelProviderErrorCode
readonly provider?: string
readonly model?: string

constructor(opts: {
code: ModelProviderErrorCode
message: string
provider?: string
model?: string
cause?: unknown
}) {
super(
opts.message,
opts.cause === undefined ? undefined : { cause: opts.cause }
)
this.name = `ModelProviderError`
this.code = opts.code
this.provider = opts.provider
this.model = opts.model
}
}

function stringifyError(error: unknown): string {
if (error instanceof Error) {
const cause = (error as { cause?: unknown }).cause
return [
error.name,
error.message,
cause === undefined ? `` : stringifyError(cause),
]
.filter(Boolean)
.join(` `)
}
return String(error)
}

export function classifyModelProviderError(
error: unknown
): ModelProviderErrorCode {
const text = stringifyError(error).toLowerCase()

if (
/\b(aborterror|timeouterror)\b/.test(text) ||
text.includes(`timeout`) ||
text.includes(`timed out`)
) {
return `MODEL_PROVIDER_TIMEOUT`
}

if (
text.includes(`401`) ||
text.includes(`invalid api key`) ||
text.includes(`authentication`) ||
text.includes(`unauthorized`)
) {
return `MODEL_PROVIDER_AUTH_FAILED`
}

if (text.includes(`429`) || text.includes(`rate limit`)) {
return `MODEL_PROVIDER_RATE_LIMITED`
}

if (
text.includes(`502`) ||
text.includes(`503`) ||
text.includes(`504`) ||
text.includes(`overloaded`) ||
text.includes(`unavailable`)
) {
return `MODEL_PROVIDER_UNAVAILABLE`
}

if (
text.includes(`enotfound`) ||
text.includes(`econnrefused`) ||
text.includes(`econnreset`) ||
text.includes(`eai_again`) ||
text.includes(`fetch failed`) ||
text.includes(`failed to fetch`) ||
text.includes(`network`)
) {
return `MODEL_PROVIDER_UNREACHABLE`
}

return `MODEL_PROVIDER_ERROR`
}

export function modelProviderErrorMessage(opts: {
code: ModelProviderErrorCode
provider?: string
}): string {
const provider = opts.provider
? displayProvider(opts.provider)
: `the model provider`
switch (opts.code) {
case `MODEL_PROVIDER_TIMEOUT`:
return `${provider} did not respond before the timeout. Check your Internet connection or provider status.`
case `MODEL_PROVIDER_UNREACHABLE`:
return `Could not reach ${provider}. Check your Internet connection or ${provider} status.`
case `MODEL_PROVIDER_AUTH_FAILED`:
return `${provider} rejected the API key. Check your model provider credentials.`
case `MODEL_PROVIDER_RATE_LIMITED`:
return `${provider} rate limited the request. Please wait and try again.`
case `MODEL_PROVIDER_UNAVAILABLE`:
return `${provider} is currently unavailable. Check provider status and try again.`
case `MODEL_PROVIDER_ERROR`:
return `${provider} returned an error. Check the runtime logs for provider details.`
}
}

export function toModelProviderError(
error: unknown,
opts: { provider?: string; model?: string }
): ModelProviderError {
if (error instanceof ModelProviderError) return error
const code = classifyModelProviderError(error)
const detail = error instanceof Error ? error.message : String(error)
return new ModelProviderError({
code,
provider: opts.provider,
model: opts.model,
message: `${modelProviderErrorMessage({ code, provider: opts.provider })} (${detail})`,
cause: error,
})
}

function displayProvider(provider: string): string {
switch (provider.toLowerCase()) {
case `anthropic`:
return `Anthropic`
case `openai`:
return `OpenAI`
default:
return provider
}
}
16 changes: 16 additions & 0 deletions packages/agents-runtime/src/outbound-bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export async function loadOutboundIdSeed(
export interface OutboundBridge {
onRunStart: () => void
onRunEnd: (opts?: { finishReason?: string }) => void
onError: (opts: { errorCode: string; message: string }) => void
onStepStart: (opts?: { modelProvider?: string; modelId?: string }) => void
onStepEnd: (opts?: {
finishReason?: string
Expand Down Expand Up @@ -193,6 +194,21 @@ export function createOutboundBridge(
currentRunKey = null
},

onError(opts: { errorCode: string; message: string }) {
if (!currentRunKey) return
writeEvent(
entityStateSchema.errors.insert({
key: `${currentRunKey}:error-${crypto.randomUUID()}`,
value: {
error_code: opts.errorCode,
message: opts.message,
run_id: currentRunKey,
...(currentStepKey ? { step_id: currentStepKey } : {}),
} as never,
}) as ChangeEvent
)
},

onStepStart(opts?: { modelProvider?: string; modelId?: string }) {
const runKey = requireActiveRun(`onStepStart`)
currentStepKey = `step-${counters.step++}`
Expand Down
90 changes: 83 additions & 7 deletions packages/agents-runtime/src/pi-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import { getModel } from '@mariozechner/pi-ai'
import { createOutboundBridge } from './outbound-bridge'
import { MOONSHOT_PROVIDER, getMoonshotModel } from './moonshot-models'
import { runtimeLog } from './log'
import {
ModelProviderError,
toModelProviderError,
} from './model-provider-error'
import type { OutboundIdSeed } from './outbound-bridge'
import type { ChangeEvent } from '@durable-streams/state'
import type {
Expand Down Expand Up @@ -42,6 +46,25 @@ export interface PiAdapterOptions {
provider: string
) => Promise<string | undefined> | string | undefined
onPayload?: SimpleStreamOptions[`onPayload`]
modelTimeoutMs?: number
modelMaxRetries?: number
}

const DEFAULT_MODEL_TIMEOUT_MS = 30_000
const DEFAULT_MODEL_MAX_RETRIES = 0

function readPositiveIntEnv(name: string): number | undefined {
const raw = process.env[name]
if (!raw) return undefined
const parsed = Number(raw)
return Number.isFinite(parsed) && parsed > 0 ? Math.floor(parsed) : undefined
}

function readNonNegativeIntEnv(name: string): number | undefined {
const raw = process.env[name]
if (!raw) return undefined
const parsed = Number(raw)
return Number.isFinite(parsed) && parsed >= 0 ? Math.floor(parsed) : undefined
}

interface PiAgentAdapterConfig {
Expand Down Expand Up @@ -227,8 +250,16 @@ export function createPiAgentAdapter(
model: opts.model,
...(opts.provider && { provider: opts.provider }),
})

const agent = new Agent({
const modelTimeoutMs =
opts.modelTimeoutMs ??
readPositiveIntEnv(`ELECTRIC_AGENTS_MODEL_TIMEOUT_MS`) ??
DEFAULT_MODEL_TIMEOUT_MS
const modelMaxRetries =
opts.modelMaxRetries ??
readNonNegativeIntEnv(`ELECTRIC_AGENTS_MODEL_MAX_RETRIES`) ??
DEFAULT_MODEL_MAX_RETRIES

const agentOptions = {
initialState: {
systemPrompt: opts.systemPrompt,
tools: opts.tools as Array<never>,
Expand All @@ -238,7 +269,18 @@ export function createPiAgentAdapter(
...(opts.streamFn && { streamFn: opts.streamFn }),
...(opts.getApiKey && { getApiKey: opts.getApiKey }),
...(opts.onPayload && { onPayload: opts.onPayload }),
})
// Pi forwards these options to provider stream calls in current releases.
// Keep them as a top-level passthrough so unreachable providers settle
// even when the caller did not provide a custom stream function. Older
// type definitions don't expose them yet, so keep our timeout fallback
// below as the hard guarantee.
timeoutMs: modelTimeoutMs,
maxRetries: modelMaxRetries,
}

const agent = new Agent(
agentOptions as ConstructorParameters<typeof Agent>[0]
)

function processAgentEvents(
resolveWhenDone: () => void,
Expand Down Expand Up @@ -361,8 +403,11 @@ export function createPiAgentAdapter(
})

if (isError) {
throw new Error(
`pi-agent message_end error: ${msg.errorMessage ?? `unknown error`} (stopReason=${msg.stopReason ?? `none`})`
throw toModelProviderError(
new Error(
`pi-agent message_end error: ${msg.errorMessage ?? `unknown error`} (stopReason=${msg.stopReason ?? `none`})`
),
{ provider: model.provider, model: model.id }
)
}
break
Expand Down Expand Up @@ -437,20 +482,38 @@ export function createPiAgentAdapter(
let settled = false
let unsubscribe = (): void => {}
let abortFallback: ReturnType<typeof setTimeout> | null = null
let modelTimeout: ReturnType<typeof setTimeout> | null = null
const clearAbortFallback = (): void => {
if (!abortFallback) return
clearTimeout(abortFallback)
abortFallback = null
}
const clearModelTimeout = (): void => {
if (!modelTimeout) return
clearTimeout(modelTimeout)
modelTimeout = null
}
const finish = (finishReason: `stop` | `aborted` | `error`): void => {
if (settled) return
settled = true
clearAbortFallback()
clearModelTimeout()
running = false
abortSignal?.removeEventListener(`abort`, abortRun)
unsubscribe()
bridge.onRunEnd({ finishReason })
}
const failWithProviderError = (err: unknown): ModelProviderError => {
const providerError = toModelProviderError(err, {
provider: model.provider,
model: model.id,
})
bridge.onError({
errorCode: providerError.code,
message: providerError.message,
})
return providerError
}
const abortRun = (): void => {
if (settled) return
abortedRun = true
Expand All @@ -476,12 +539,24 @@ export function createPiAgentAdapter(
},
(err) => {
if (settled) return
const providerError = failWithProviderError(err)
finish(`error`)
reject(err)
reject(providerError)
}
)

abortSignal?.addEventListener(`abort`, abortRun, { once: true })
modelTimeout = setTimeout(() => {
if (settled) return
const providerError = failWithProviderError(
new Error(
`model provider request timed out after ${modelTimeoutMs}ms`
)
)
agent.abort()
finish(`error`)
reject(providerError)
}, modelTimeoutMs)
const runPromise =
input !== undefined ? agent.prompt(input) : agent.continue()
if (abortSignal?.aborted) {
Expand All @@ -491,8 +566,9 @@ export function createPiAgentAdapter(
Promise.resolve(runPromise).catch((err: Error) => {
if (settled) return
if (abortedRun) return
const providerError = failWithProviderError(err)
finish(`error`)
reject(err)
reject(providerError)
})
})
},
Expand Down
Loading
Loading