|
258 | 258 |
|
259 | 259 | (def ^:private max-init-retries 3) |
260 | 260 |
|
261 | | -(defn ^:private transient-http-error? |
262 | | - "Checks if the exception root cause is a transient HTTP error (e.g. chunked |
263 | | - encoding EOF) that warrants a retry. This can happen when infrastructure |
264 | | - (load balancers, proxies) closes HTTP streaming connections." |
| 261 | +(defn ^:private transient-transport-error? |
| 262 | + "Checks if the exception (or its direct cause) is a transient transport error |
| 263 | + that warrants a retry. This covers infrastructure issues like load balancers |
| 264 | + or proxies closing connections, network interruptions, and connection resets." |
265 | 265 | [^Exception e] |
266 | | - (let [cause (.getCause e)] |
267 | | - (and (instance? IOException cause) |
268 | | - (when-let [msg (.getMessage cause)] |
269 | | - (or (string/includes? msg "chunked transfer encoding") |
270 | | - (string/includes? msg "EOF reached while reading")))))) |
| 266 | + (letfn [(transient-io-msg? [^String msg] |
| 267 | + (or (string/includes? msg "chunked transfer encoding") |
| 268 | + (string/includes? msg "EOF reached while reading") |
| 269 | + (string/includes? msg "Connection reset") |
| 270 | + (string/includes? msg "Connection refused") |
| 271 | + (string/includes? msg "Broken pipe") |
| 272 | + (string/includes? msg "Read timed out")))] |
| 273 | + (or (and (instance? IOException e) |
| 274 | + (when-let [msg (.getMessage e)] |
| 275 | + (transient-io-msg? msg))) |
| 276 | + (when-let [cause (.getCause e)] |
| 277 | + (and (instance? IOException cause) |
| 278 | + (when-let [msg (.getMessage cause)] |
| 279 | + (transient-io-msg? msg))))))) |
271 | 280 |
|
272 | 281 | (defn ^:private initialize-server! [name db* config metrics on-server-updated] |
273 | 282 | (let [db @db* |
|
327 | 336 | :ok) |
328 | 337 | (catch Exception e |
329 | 338 | (try (pp/stop-client-transport! transport false) (catch Exception _)) |
330 | | - (if (and (transient-http-error? e) (< attempt max-init-retries)) |
| 339 | + (if (and (transient-transport-error? e) (< attempt max-init-retries)) |
331 | 340 | (do |
332 | 341 | (logger/warn logger-tag (format "Transient HTTP error initializing MCP server %s (attempt %d/%d), retrying: %s" |
333 | 342 | name attempt max-init-retries |
|
528 | 537 | (def ^:private reinit-poll-interval-ms 100) |
529 | 538 | (def ^:private tool-call-timeout-ms 120000) |
530 | 539 |
|
| 540 | +(defn ^:private tool-call-error [msg] |
| 541 | + {:error true |
| 542 | + :contents [{:type :text :text msg}]}) |
| 543 | + |
531 | 544 | (defn ^:private do-call-tool |
532 | 545 | "Execute a tool call. When needs-reinit?* is provided (HTTP transport), runs |
533 | 546 | pmc/call-tool in a future and polls for transport errors (404/5xx) so we can |
|
537 | 550 | (locking mcp-client |
538 | 551 | (when needs-reinit?* |
539 | 552 | (reset! needs-reinit?* false)) |
540 | | - (let [call-opts {:on-error (fn [_id jsonrpc-error] |
541 | | - (logger/warn logger-tag "Error calling tool:" (:message jsonrpc-error)) |
| 553 | + (let [error-msg* (atom nil) |
| 554 | + call-opts {:on-error (fn [_id jsonrpc-error] |
| 555 | + (let [msg (or (:message jsonrpc-error) "Unknown JSON-RPC error")] |
| 556 | + (logger/warn logger-tag "Error calling tool:" msg) |
| 557 | + (reset! error-msg* msg)) |
542 | 558 | nil)} |
543 | 559 | call-future (future (pmc/call-tool mcp-client name arguments call-opts)) |
544 | | - result (if needs-reinit?* |
545 | | - (loop [elapsed (long 0)] |
546 | | - (cond |
547 | | - (realized? call-future) |
548 | | - (deref call-future) |
549 | | - |
550 | | - @needs-reinit?* |
551 | | - (do (future-cancel call-future) nil) |
552 | | - |
553 | | - (>= elapsed (long tool-call-timeout-ms)) |
554 | | - (do (future-cancel call-future) nil) |
555 | | - |
556 | | - :else |
557 | | - (do (Thread/sleep (long reinit-poll-interval-ms)) |
558 | | - (recur (+ elapsed (long reinit-poll-interval-ms)))))) |
559 | | - (deref call-future))] |
560 | | - (if result |
| 560 | + result (try |
| 561 | + (if needs-reinit?* |
| 562 | + (loop [elapsed (long 0)] |
| 563 | + (cond |
| 564 | + (realized? call-future) |
| 565 | + (deref call-future) |
| 566 | + |
| 567 | + @needs-reinit?* |
| 568 | + (do (future-cancel call-future) ::connection-lost) |
| 569 | + |
| 570 | + (>= elapsed (long tool-call-timeout-ms)) |
| 571 | + (do (future-cancel call-future) ::timeout) |
| 572 | + |
| 573 | + :else |
| 574 | + (do (Thread/sleep (long reinit-poll-interval-ms)) |
| 575 | + (recur (+ elapsed (long reinit-poll-interval-ms)))))) |
| 576 | + (deref call-future)) |
| 577 | + (catch Exception e |
| 578 | + (future-cancel call-future) |
| 579 | + (if (transient-transport-error? e) |
| 580 | + (do (logger/warn logger-tag (format "Transient transport error, retrying tool call: %s" (.getMessage e))) |
| 581 | + ::retry) |
| 582 | + (do (logger/warn logger-tag (format "Error during tool call: %s" (or (.getMessage e) (.getName (class e))))) |
| 583 | + {::error-msg (or (.getMessage e) (.getName (class e)))}))))] |
| 584 | + (cond |
| 585 | + (= ::retry result) |
| 586 | + nil |
| 587 | + |
| 588 | + (= ::timeout result) |
| 589 | + (tool-call-error (format "MCP tool call timed out after %ds" (/ tool-call-timeout-ms 1000))) |
| 590 | + |
| 591 | + (= ::connection-lost result) |
| 592 | + (tool-call-error "MCP server connection lost during tool call") |
| 593 | + |
| 594 | + (::error-msg result) |
| 595 | + (tool-call-error (format "MCP server error: %s" (::error-msg result))) |
| 596 | + |
| 597 | + (map? result) |
561 | 598 | {:error (:isError result) |
562 | 599 | :contents (into [] (keep ->content) (:content result))} |
563 | | - {:error true |
564 | | - :contents nil})))) |
| 600 | + |
| 601 | + :else |
| 602 | + (tool-call-error (or @error-msg* "MCP server returned empty response")))))) |
565 | 603 |
|
566 | 604 | (defn ^:private reinit-and-call-tool! [server-name mcp-client db* config metrics name arguments] |
567 | 605 | (reinitialize-server! server-name mcp-client db* config metrics) |
568 | 606 | (if-let [new-client (get-in @db* [:mcp-clients server-name :client])] |
569 | 607 | (let [new-needs-reinit?* (get-in @db* [:mcp-clients server-name :needs-reinit?*])] |
570 | 608 | (do-call-tool new-client name arguments new-needs-reinit?*)) |
571 | | - {:error true |
572 | | - :contents nil})) |
| 609 | + (tool-call-error (format "Failed to re-initialize MCP server '%s'" server-name)))) |
573 | 610 |
|
574 | 611 | (defn call-tool! [name arguments {:keys [db db* config metrics]}] |
575 | 612 | (if-let [[server-name mcp-client needs-reinit?*] |
|
582 | 619 | ;; Already flagged (e.g. GET stream 5xx) — reinit before attempting the call |
583 | 620 | (reinit-and-call-tool! server-name mcp-client db* config metrics name arguments) |
584 | 621 | (let [result (do-call-tool mcp-client name arguments needs-reinit?*)] |
585 | | - (if (and (:error result) needs-reinit?* @needs-reinit?* db* config metrics) |
| 622 | + (cond |
| 623 | + ;; nil = transient transport error, retry once |
| 624 | + (nil? result) |
| 625 | + (do-call-tool mcp-client name arguments needs-reinit?*) |
| 626 | + |
586 | 627 | ;; Flagged during the call (e.g. POST 404) — reinit and retry |
| 628 | + (and (:error result) needs-reinit?* @needs-reinit?* db* config metrics) |
587 | 629 | (reinit-and-call-tool! server-name mcp-client db* config metrics name arguments) |
588 | | - result))) |
589 | | - {:error true |
590 | | - :contents nil})) |
| 630 | + |
| 631 | + :else result))) |
| 632 | + (tool-call-error (format "Tool '%s' not found in any connected MCP server" name)))) |
591 | 633 |
|
592 | 634 | (defn all-prompts [db] |
593 | 635 | (into [] |
|
0 commit comments