Add webhook circuit breaker #124
📝 Walkthrough

Adds a Redis-backed webhook circuit breaker, with local in-process checks, integrated into outgoing HTTP sends; shortens consumer and webhook/fetch timeouts; introduces WEBHOOK_CB_* env vars and many webhook-related flags; adjusts the AMQP retry delay for circuit-open errors; updates README docs and examples.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Producer as Client/Producer
    participant AMQP as AMQP Consumer
    participant API as OutgoingCloudApi
    participant Redis as Redis
    participant Webhook as Webhook Endpoint
    Producer->>AMQP: publish webhook message
    AMQP->>API: process message -> sendHttp()
    API->>Redis: isWebhookCircuitOpen(session, webhookId)?
    Redis-->>API: open / closed
    alt Circuit Open
        API-->>AMQP: throw WebhookCircuitOpenError (with delayMs)
        AMQP->>AMQP: schedule retry using error.delayMs or WEBHOOK_CB_REQUEUE_DELAY_MS
    else Circuit Closed
        API->>Webhook: HTTP POST (with WEBHOOK_TIMEOUT_MS / FETCH_TIMEOUT_MS)
        alt Success (2xx)
            Webhook-->>API: success
            API->>Redis: closeWebhookCircuit(session, webhookId)
            API-->>AMQP: ack
        else Failure / timeout / non-2xx
            Webhook-->>API: error
            API->>Redis: bumpWebhookCircuitFailure(session, webhookId, ttl)
            Redis-->>API: new failure count
            alt Threshold reached
                API->>Redis: openWebhookCircuit(session, webhookId, openMs)
                API-->>AMQP: throw WebhookCircuitOpenError (with requeue delayMs)
            else Below threshold
                API-->>AMQP: nack / standard retry
            end
        end
    end
```
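The state transitions in the diagram can be sketched with an in-memory stand-in for the Redis calls. This is a minimal sketch, not the PR's implementation: `WebhookBreaker`, `BreakerOptions`, and the method names are hypothetical, and time is passed in explicitly so the behavior is easy to follow.

```typescript
// Hypothetical in-memory stand-in for the Redis-backed state in the diagram.
// Names are illustrative and not taken from the PR's code.
interface BreakerOptions {
  failureThreshold: number // failures needed to open the circuit
  failureTtlMs: number // window in which failures are counted
  openMs: number // how long the circuit stays open
}

class WebhookBreaker {
  private opts: BreakerOptions
  private failures = new Map<string, { count: number; exp: number }>()
  private openUntil = new Map<string, number>()

  constructor(opts: BreakerOptions) {
    this.opts = opts
  }

  // True while the open window for `key` has not yet expired.
  isOpen(key: string, now: number): boolean {
    const until = this.openUntil.get(key)
    if (until === undefined) return false
    if (now >= until) {
      this.openUntil.delete(key) // window elapsed: sends resume
      return false
    }
    return true
  }

  // A successful send closes the circuit and clears the failure count.
  recordSuccess(key: string): void {
    this.failures.delete(key)
    this.openUntil.delete(key)
  }

  // Counts a failure; returns true when this failure opened the circuit.
  recordFailure(key: string, now: number): boolean {
    const prev = this.failures.get(key)
    const state =
      prev !== undefined && now < prev.exp
        ? { count: prev.count + 1, exp: prev.exp }
        : { count: 1, exp: now + this.opts.failureTtlMs }
    this.failures.set(key, state)
    if (state.count < this.opts.failureThreshold) return false
    this.openUntil.set(key, now + this.opts.openMs)
    this.failures.delete(key)
    return true
  }
}
```

With a threshold of 2, the first failure leaves the circuit closed and the second opens it for `openMs` milliseconds, after which `isOpen` reports closed again.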
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: ✅ 3 passed
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 111-131: sendHttp currently swallows failures when the circuit
breaker is enabled by calling handleCircuitFailure(...) and returning, which
prevents the AMQP consumer from seeing errors and triggering
retries/dead-lettering; after calling handleCircuitFailure(phone, cbId, cbKey,
error) (on fetch/network error) and after handleCircuitFailure(phone, cbId,
cbKey, errText) (on non-OK response), rethrow the original error/errText so the
exception contract is preserved; keep the existing closeWebhookCircuit(phone,
cbId) try/catch and resetCircuitLocal(cbKey) behavior but ensure errors are not
silently returned in sendHttp so the amqpConsume framework can handle them.
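A minimal sketch of the contract this comment asks for: record the failure, then rethrow so the caller still sees an exception. `sendWithBreaker`, `CircuitOpenError`, and the `onFailure` callback are hypothetical stand-ins for `sendHttp`, `WebhookCircuitOpenError`, and `handleCircuitFailure`. It is written synchronously to stay short, whereas the real method is async and would `await` the send.

```typescript
// Hypothetical error type mirroring the role of WebhookCircuitOpenError.
class CircuitOpenError extends Error {
  delayMs: number
  constructor(message: string, delayMs: number) {
    super(message)
    this.name = 'CircuitOpenError'
    this.delayMs = delayMs
  }
}

// Runs `send`, reports any failure to the breaker, and never swallows the error.
// `onFailure` returns true when the failure caused the circuit to open.
function sendWithBreaker<T>(
  send: () => T,
  onFailure: (error: unknown) => boolean,
  requeueDelayMs: number,
): T {
  try {
    return send()
  } catch (error) {
    const opened = onFailure(error)
    if (opened) {
      // Circuit just opened: signal the consumer to requeue with a delay.
      throw new CircuitOpenError('circuit opened after failure', requeueDelayMs)
    }
    // Below threshold: rethrow the original error for standard retry handling.
    throw error
  }
}
```

Either way the function exits by throwing, so a consumer wrapping the call can still nack, retry, or dead-letter the message.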
🧹 Nitpick comments (1)
src/services/outgoing_cloud_api.ts (1)
156-188: Consider adding cleanup for stale local circuit state entries.

The module-level `cbOpenUntil` and `cbFailState` Maps grow unbounded as new webhook URLs are encountered. While `isCircuitOpenLocal` cleans up expired entries on access, entries for webhooks that are no longer called will accumulate indefinitely.

For most deployments with stable webhook configurations this is negligible, but high-churn scenarios could see gradual memory growth.

♻️ Optional: Add periodic cleanup or size cap

```diff
+const MAX_LOCAL_CB_ENTRIES = 1000
+
 const resetCircuitLocal = (key: string) => {
   cbOpenUntil.delete(key)
   cbFailState.delete(key)
 }
+
+const cleanupStaleEntries = () => {
+  const now = Date.now()
+  for (const [key, until] of cbOpenUntil) {
+    if (now >= until) cbOpenUntil.delete(key)
+  }
+  for (const [key, state] of cbFailState) {
+    if (now >= state.exp) cbFailState.delete(key)
+  }
+}
```

Then call `cleanupStaleEntries()` periodically or when the maps exceed `MAX_LOCAL_CB_ENTRIES`.
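One way to wire up the "when maps exceed the cap" option is to prune on write. The sketch below is an assumption about how that could look, not code from the PR: `pruneMap` and `setBounded` are hypothetical helpers, and evicting the oldest-inserted entries when nothing has expired is an added policy the review does not specify.

```typescript
const MAX_LOCAL_CB_ENTRIES = 1000 // cap suggested in the review comment

// Assumed entry shape: `exp` is an epoch-millisecond expiry, as in `state.exp`.
type FailState = { count: number; exp: number }

// Drops expired entries first, then evicts oldest-inserted entries if still over the cap.
const pruneMap = (map: Map<string, FailState>, now: number, max: number): void => {
  for (const [key, state] of map) {
    if (now >= state.exp) map.delete(key)
  }
  // Map iterates in insertion order, so the first keys are the oldest.
  for (const key of map.keys()) {
    if (map.size <= max) break
    map.delete(key)
  }
}

// Writes an entry and prunes only when the cap is exceeded.
const setBounded = (
  map: Map<string, FailState>,
  key: string,
  state: FailState,
  now: number,
  max: number = MAX_LOCAL_CB_ENTRIES,
): void => {
  map.set(key, state)
  if (map.size > max) pruneMap(map, now, max)
}
```

Pruning on write keeps the cost off the hot path until the cap is actually hit, at the price of an occasional linear scan.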
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 47-58: The try/catch around the Redis circuit breaker check is too
broad and swallows the WebhookCircuitOpenError; narrow the try to only call
isWebhookCircuitOpen(phone, cbId) and handle its errors, then perform the throw
new WebhookCircuitOpenError(...) outside the catch so the exception is not
consumed. Specifically, in the cbEnabled branch ensure isWebhookCircuitOpen is
awaited inside a try that only catches/handles Redis/network errors, and if it
returns true then call logger.warn(...) and throw WebhookCircuitOpenError (using
this.cbRequeueDelayMs()) outside the catch; keep the local check using
isCircuitOpenLocal(cbKey, now) unchanged.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
README.md (1)
580-582: Duplicate key in JSON example.

Line 581 defines `"sendGroupMessages"` twice. Most JSON parsers accept this but keep only the last value, so the duplicate is at best redundant and at worst misleading. This appears to be a pre-existing issue, but it should be corrected.

🐛 Proposed fix

```diff
   "token": "kslflkhlkwq",
   "header": "api_access_token",
   "sendGroupMessages": false,
-  "sendGroupMessages": false,
   "sendNewMessages": false,
```
♻️ Duplicate comments (1)
src/services/outgoing_cloud_api.ts (1)
47-59: Critical: `WebhookCircuitOpenError` is swallowed by the empty catch block.

The `throw new WebhookCircuitOpenError(...)` on line 52 is inside the try block, and the empty `catch {}` on line 54 swallows it. When Redis reports the circuit is open, the error is caught and discarded, and execution continues, completely bypassing the Redis-based circuit breaker check.

The try/catch should only wrap the `isWebhookCircuitOpen` call, not the throw statement.

🐛 Proposed fix

```diff
 if (cbEnabled) {
+  let redisOpen = false
   try {
-    const open = await isWebhookCircuitOpen(phone, cbId)
-    if (open) {
-      logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId)
-      throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs())
-    }
-  } catch {}
+    redisOpen = await isWebhookCircuitOpen(phone, cbId)
+  } catch {
+    // Redis unavailable - fall through to local check
+  }
+  if (redisOpen) {
+    logger.warn('WEBHOOK_CB open: skipping send (phone=%s webhook=%s)', phone, cbId)
+    throw new WebhookCircuitOpenError(`WEBHOOK_CB open for ${cbId}`, this.cbRequeueDelayMs())
+  }
   if (isCircuitOpenLocal(cbKey, now)) {
```
🧹 Nitpick comments (1)
src/services/outgoing_cloud_api.ts (1)
151-173: Consider logging the original error details when the circuit handler itself fails.

When the catch block at line 168 handles an exception from the circuit breaker logic, the original webhook error (`error` parameter) is logged at line 171, but the circuit handler exception (`e`) is logged with a generic message. This could make debugging difficult if both Redis and logging fail.

Also, the method returns `false` when an exception occurs (line 172), which means the original error will be thrown but the circuit won't open. This is reasonable as a fallback, but worth documenting.

♻️ Minor improvement for clarity

```diff
   } catch (e) {
-    logger.warn(e as any, 'WEBHOOK_CB failure handler error')
+    logger.warn(e as any, 'WEBHOOK_CB failure handler error (phone=%s webhook=%s)', phone, cbId)
   }
-  try { logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId) } catch {}
+  try {
+    logger.warn(error as any, 'WEBHOOK_CB send failed (phone=%s webhook=%s)', phone, cbId)
+  } catch {
+    // Logging failed - nothing more we can do
+  }
   return false
```
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/services/outgoing_cloud_api.ts`:
- Around line 148-176: The trailing logging and return after the try/catch in
handleCircuitFailure are unreachable because both the try and catch blocks
always return; remove the final try { logger.warn(error...) } catch {} and final
return false, or move that log into the catch block before its return (and
ensure it references the correct error variable), so that there is no dead code
after the try/catch in the handleCircuitFailure method.
- Around line 131-139: The code currently throws a raw string (errText); instead
create and throw an Error object containing response metadata and body so
upstream retains stack/context. Build an Error (e.g., new Error(`HTTP
${response?.status}: ${response?.statusText}`)) and attach properties like
status = response?.status, statusText = response?.statusText, and body =
errText, then pass that Error object into handleCircuitFailure(phone, cbId,
cbKey, err) (matching the earlier catch usage) and, if opened, throw the
WebhookCircuitOpenError as before; otherwise throw the constructed Error.
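The suggestion above could look roughly like the following. This is a sketch under assumptions: `WebhookHttpError` and `ResponseLike` are hypothetical names, and a dedicated subclass is one option alongside attaching properties to a plain `Error` as the comment describes.

```typescript
// Minimal view of the fetch Response fields the error needs.
interface ResponseLike {
  status: number
  statusText: string
}

// Hypothetical error type carrying response metadata and the response body.
class WebhookHttpError extends Error {
  status: number
  statusText: string
  body: string

  constructor(response: ResponseLike, body: string) {
    super(`HTTP ${response.status}: ${response.statusText}`)
    this.name = 'WebhookHttpError'
    this.status = response.status
    this.statusText = response.statusText
    this.body = body
  }
}

// Builds the error for a non-OK response; the caller passes it to the
// failure handler and then throws it (or a circuit-open error) as before.
const toHttpError = (response: ResponseLike, errText: string): WebhookHttpError =>
  new WebhookHttpError(response, errText)
```

Unlike a thrown string, this keeps a stack trace and lets upstream code branch on `status` without parsing the message.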
## Summary
- Add webhook circuit breaker to fail fast when endpoints are offline
- Add Redis-backed counters/state (with in-memory fallback)
- Document new env vars and behavior, plus example config
- Reduce default webhook timeout to 60s to avoid long consumer blocking

## Why
Offline webhook endpoints were causing long blocking and queue backlog. The circuit breaker short-circuits repeated failures and auto-recovers after a cool-down.

## Env vars
- WEBHOOK_CB_ENABLED (default true)
- WEBHOOK_CB_FAILURE_THRESHOLD (default 1)
- WEBHOOK_CB_FAILURE_TTL_MS (default 300000)
- WEBHOOK_CB_OPEN_MS (default 120000)
- WEBHOOK_TIMEOUT_MS (default 60000)

## Notes
- When the circuit is open, sends to that endpoint are skipped until the open window expires.
- After the window, sends resume automatically.
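For illustration, the env vars and defaults listed above could be read as follows. This is a sketch, not the project's config code: `readBreakerConfig` and `intFromEnv` are hypothetical helpers, and treating only `false` or `0` as "disabled" is an assumption about how the boolean is parsed.

```typescript
type Env = Record<string, string | undefined>

// Parses a positive integer from the environment, falling back to the default.
const intFromEnv = (env: Env, name: string, fallback: number): number => {
  const parsed = Number.parseInt(env[name] ?? '', 10)
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback
}

// Defaults mirror the values listed in the PR description.
const readBreakerConfig = (env: Env) => ({
  // Assumption: only the strings "false" and "0" disable the breaker.
  enabled: !['false', '0'].includes((env.WEBHOOK_CB_ENABLED ?? 'true').toLowerCase()),
  failureThreshold: intFromEnv(env, 'WEBHOOK_CB_FAILURE_THRESHOLD', 1),
  failureTtlMs: intFromEnv(env, 'WEBHOOK_CB_FAILURE_TTL_MS', 300000),
  openMs: intFromEnv(env, 'WEBHOOK_CB_OPEN_MS', 120000),
  timeoutMs: intFromEnv(env, 'WEBHOOK_TIMEOUT_MS', 60000),
})
```

In a Node process this would be called as `readBreakerConfig(process.env)`. With the defaults, a single failure opens the circuit for two minutes.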