Refactor OpenAI client to fix connection leaks and improve error telemetry (#247)
Conversation
jjk-g
left a comment
Thanks for adding! One nit
/lgtm
achandrasekar
left a comment
Can you add how the change was tested? If you have any numbers on improvements, that'd be great too.
Please address the linting and type check issue above.
@LukeAVanDrie friendly ping for linting and type check errors.
Slightly refactor `openAIModelServerClient` to accept a custom
`aiohttp.ClientSession` per request, which allows us to use exactly 1
client session per worker.
Prior to this commit, a new `aiohttp.ClientSession` is created for each
request. This is not only inefficient and lowers throughput; in certain
environments, it also leads to inotify watch issues:
aiodns - WARNING - Failed to create DNS resolver channel with
automatic monitoring of resolver configuration changes. This usually
means the system ran out of inotify watches. Falling back to socket
state callback. Consider increasing the system inotify watch limit:
Failed to initialize c-ares channel
Indeed, because a new DNS resolver is created for each `ClientSession`,
creating many `ClientSession`s eventually exhausts inotify watches.
Sharing `ClientSession`s solves this issue.
Relevant links:
- https://docs.aiohttp.org/en/stable/http_request_lifecycle.html
- https://stackoverflow.com/questions/62707369/one-aiohttp-clientsession-per-thread
- home-assistant/core#144457 (comment)
Relevant PR: kubernetes-sigs#247
(doesn't address the issue of worker sharing).
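A minimal sketch of the session-sharing idea above, assuming a hypothetical `send_request` helper (the real client's signatures differ):

```python
import aiohttp


async def send_request(session: aiohttp.ClientSession, url: str, payload: dict) -> dict:
    # Reuse the caller's session instead of constructing a new one per request,
    # so the DNS resolver and connection pool are created only once per worker.
    async with session.post(url, json=payload) as response:
        return await response.json()


async def worker_loop(url: str, payloads: list[dict]) -> list[dict]:
    # Exactly one ClientSession for the lifetime of this worker.
    async with aiohttp.ClientSession() as session:
        return [await send_request(session, url, p) for p in payloads]
```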
Slightly refactor `openAIModelServerClient` to add a new method,
`process_request_with_session`, that accepts a custom
`ReusableHTTPClientSession` per request, which allows the caller to
reuse an HTTP client session per worker.
The existing method, `process_request`, now creates a fresh HTTP client
session and then calls `process_request_with_session`, preserving the
previous behavior.
Prior to this commit, a new `aiohttp.ClientSession` is created for each
request. This is not only inefficient and lowers throughput; in certain
environments, it also leads to inotify watch issues:
aiodns - WARNING - Failed to create DNS resolver channel with
automatic monitoring of resolver configuration changes. This usually
means the system ran out of inotify watches. Falling back to socket
state callback. Consider increasing the system inotify watch limit:
Failed to initialize c-ares channel
Indeed, because a new DNS resolver is created for each `ClientSession`,
creating many `ClientSession`s eventually exhausts inotify watches.
Sharing `ClientSession`s solves this issue.
Relevant links:
- https://docs.aiohttp.org/en/stable/http_request_lifecycle.html
- https://stackoverflow.com/questions/62707369/one-aiohttp-clientsession-per-thread
- home-assistant/core#144457 (comment)
Relevant PR: kubernetes-sigs#247
(doesn't address the issue of worker sharing).
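A rough sketch of the two-method layering described above; the class and signatures are illustrative simplifications, not the actual `openAIModelServerClient` API:

```python
import aiohttp


class OpenAIClientSketch:
    def __init__(self, uri: str) -> None:
        self.uri = uri

    async def process_request_with_session(
        self, session: aiohttp.ClientSession, route: str, payload: dict
    ) -> dict:
        # Core request logic; the caller owns (and may reuse) the session.
        async with session.post(self.uri + route, json=payload) as response:
            return await response.json()

    async def process_request(self, route: str, payload: dict) -> dict:
        # Backwards-compatible path: create a one-off session, then delegate.
        async with aiohttp.ClientSession() as session:
            return await self.process_request_with_session(session, route, payload)
```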
@LukeAVanDrie Thanks for the contribution. Can you please rebase this PR?
Yes, apologies for the long delay here. I will make sure to update the description with my testing results and verify @diamondburned's concern regarding multiprocessing.
@LukeAVanDrie any updates on this PR?
Reuses `aiohttp.ClientSession` across requests in `openAIModelServerClient` to reduce connection overhead. This change improves client-side throughput and latency.
Additional improvements:
- Refines error handling to distinguish between network errors (like `aiohttp.ClientError`), non-200 HTTP status codes, and errors during response processing.
- Ensures non-200 responses with text bodies are captured.
- Guarantees the response body is always consumed to release connections.
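A rough sketch of the error-handling shape described above; the helper name and the returned error strings are placeholders, not the benchmark's actual metric types:

```python
import aiohttp


async def post_with_error_capture(
    session: aiohttp.ClientSession, url: str, payload: dict
) -> tuple[dict | None, str | None]:
    """Returns (parsed_body, error_description); exactly one of them is None."""
    try:
        response = await session.post(url, json=payload)
    except aiohttp.ClientError as exc:
        # Network-level failure: there is no response object to drain.
        return None, f"client_error: {exc}"

    try:
        if response.status != 200:
            # Capture non-JSON error bodies (e.g. a 502 HTML page) as raw text.
            return None, f"http_{response.status}: {await response.text()}"
        try:
            return await response.json(), None
        except Exception as exc:
            # Error while processing an otherwise successful response.
            return None, f"response_processing_error: {exc}"
    finally:
        # Always drain the body so the connection is returned to the pool.
        await response.read()
```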
@jjk-g Sorry for letting this PR hang. It looks like the original issues I set out to fix in this PR were handled in #282 and #289. I've rebased and refactored this PR to focus on hardening the error handling and preventing connection leaks. Updated the description accordingly.
…instance` checks and add type hint for `caught_exception`.
Since there isn't an existing unit test suite for these clients, I wrote a standalone asynchronous test harness to verify the behavior locally. I set up a mock server whose chat completions endpoint always returns a 502 with an HTML body. Using this script, I specifically verified three things:
1. A non-200 response with an HTML body is intercepted and recorded as raw text instead of crashing the JSON parser.
2. The error type and message are captured in the metrics collector.
3. The connection is released back to the pool after the request (no leaked sockets in `connector._acquired`).
Here is the exact test script if you'd like to reproduce it locally.
Local test harness script:

```python
import asyncio
import aiohttp
from aiohttp import web
import json
import logging
import time
import sys
from inference_perf.client.requestdatacollector import RequestDataCollector
from inference_perf.config import APIConfig, APIType
from inference_perf.apis import InferenceAPIData, InferenceInfo
from inference_perf.client.modelserver.openai_client import openAIModelServerClient
from unittest.mock import MagicMock


# --- MOCKS AND STUBS ---
class DummyCollector(RequestDataCollector):
    def __init__(self):
        self.records = []

    def record_metric(self, metric):
        self.records.append(metric)

    def get_metrics(self):
        return self.records


class MockTokenizer:
    def __init__(self): ...
    def count_tokens(self, text): return 5


class DummyData(InferenceAPIData):
    def get_api_type(self) -> APIType: return APIType.Chat
    def get_route(self): return "/v1/chat/completions"

    async def to_payload(self, effective_model_name, max_tokens, ignore_eos, streaming):
        return {"dummy": "payload"}

    async def process_response(self, response, config, tokenizer, lora_adapter=None):
        data = await response.json()
        return InferenceInfo()


# --- MOCK SERVER ---
async def handle_502(request):
    # Simulate a typical bad gateway response that returns an HTML body, NOT json!
    return web.Response(
        status=502,
        text="<html><body><h1>502 Bad Gateway</h1></body></html>",
        content_type="text/html"
    )


async def handle_models(request):
    return web.json_response({"data": [{"id": "mock-model"}]})


async def start_server():
    app = web.Application()
    app.router.add_post('/v1/chat/completions', handle_502)
    app.router.add_get('/v1/models', handle_models)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    return runner


class DummyClient(openAIModelServerClient):
    def get_prometheus_metric_metadata(self):
        return None

    def get_supported_apis(self):
        return [APIType.Completion, APIType.Chat]


# --- TEST HARNESS ---
async def run_test():
    server = await start_server()
    uri = "http://localhost:8080"
    config = APIConfig(streaming=False)
    collector = DummyCollector()

    class MockDummyClient(DummyClient):
        def __init__(self, **kwargs):
            self.model_name = kwargs["model_name"]
            self.max_completion_tokens = 30
            self.ignore_eos = kwargs.get("ignore_eos", True)
            self.metrics_collector = kwargs["metrics_collector"]
            self.max_tcp_connections = kwargs["max_tcp_connections"]
            self.additional_filters = kwargs.get("additional_filters", [])
            self.api_key = kwargs.get("api_key")
            self.cert_path = kwargs.get("cert_path")
            self.key_path = kwargs.get("key_path")
            self.lora_config = kwargs.get("lora_config")
            self.tokenizer = MockTokenizer()
            self.api_config = kwargs["api_config"]
            self.timeout = None
            self.uri = kwargs["uri"]
            self.session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=self.max_tcp_connections))
            self._session_lock = asyncio.Lock()
            self._session = None

    client = MockDummyClient(
        metrics_collector=collector,
        api_config=config,
        uri=uri,
        model_name="mock-model",
        tokenizer_config=None,
        max_tcp_connections=1,
        additional_filters=[],
        ignore_eos=True,
    )
    data = DummyData()

    print("--- Sending Request 1 (Expect 502 with HTML body) ---")
    await client.process_request(data, 1, 0.0)

    # Introspect the aiohttp connection pool:
    connector = client._session.session.connector
    print("--- Connection State Post-Request ---")
    print(f"Acquired connections: {len(connector._acquired)}")

    recorded = collector.records
    print("\nRecorded Metric:")
    if recorded:
        print(f"Error captured: {recorded[0].error.error_type}")
        print(f"Error msg captured: {recorded[0].error.error_msg}")

    assert len(connector._acquired) == 0, "Connection leak detected! Connector has unreleased acquired sockets."

    await client.close()
    await server.cleanup()
    print("\nVerification Successful: Connections did not leak.")


if __name__ == "__main__":
    asyncio.run(run_test())
```

As this is no longer a perf-focused PR, I didn't collect any numbers there.
Since there were no existing unit tests for this file, I went ahead and added a baseline pytest suite. While it doesn't cover every legacy edge case in the file yet, it specifically tests the new error-handling branches introduced in this PR to satisfy the coverage check. Hopefully, this boilerplate sets up a good foundation for us to easily expand test coverage on this client in the future.
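As a rough illustration, a self-contained test in that style can exercise the 502-with-HTML-body scenario directly against a mock server. This sketch uses aiohttp's test utilities and pytest-asyncio rather than the suite's actual fixtures:

```python
import pytest
from aiohttp import web
from aiohttp.test_utils import TestClient, TestServer


async def _handle_502(request: web.Request) -> web.Response:
    # Mimic a bad gateway that returns HTML instead of JSON.
    return web.Response(
        status=502,
        text="<html><body><h1>502 Bad Gateway</h1></body></html>",
        content_type="text/html",
    )


@pytest.mark.asyncio
async def test_non_200_html_body_is_captured_as_text() -> None:
    app = web.Application()
    app.router.add_post("/v1/chat/completions", _handle_502)
    async with TestClient(TestServer(app)) as client:
        resp = await client.post("/v1/chat/completions", json={"dummy": "payload"})
        try:
            assert resp.status == 502
            # The error branch should record the raw text body, not parse JSON.
            assert "502 Bad Gateway" in await resp.text()
        finally:
            # Drain the body so the connection returns to the pool.
            await resp.read()
```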
Thanks for the updates and the new tests!
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED.
This pull-request has been approved by: jjk-g, LukeAVanDrie.
Refactor OpenAI client to fix connection leaks and improve error telemetry (kubernetes-sigs#247)
This PR originally aimed to improve client performance via connection pooling. However, since those improvements were merged upstream in kubernetes-sigs#282 and kubernetes-sigs#289, I have rebased and repurposed this PR to strictly harden the error handling and prevent connection leaks during 5xx Gateway Errors and malformed JSON payloads.
Changes:
- Fixed JSON parser crashes: Validated HTTP 200 status before parsing the response body via `process_response()`, allowing non-200 responses (like 502 HTML pages) to be safely intercepted and recorded as raw text without throwing a `JSONDecodeError`.
- Prevented connection leaks: Added a finally block to guarantee the `aiohttp` response body is universally consumed, ensuring sockets are reliably released and returned to the connection pool even during unexpected exceptions.
- Unified metrics collection: Consolidated `RequestLifecycleMetric` creation to execute exactly once per request. This eliminates duplicated logic and ensures that failed requests and timeouts accurately record end-to-end wall-clock duration.
- Improved error categorization: Replaced a generic Exception catch-all with dedicated exception handlers for `aiohttp.ClientError` and `asyncio.TimeoutError`, ensuring Prometheus metrics surface exact failure modes instead of generic timeouts.
Testing: Verified locally using an isolated async test server designed to inject 502 HTML responses and malformed mock payloads. Evaluated the internal state of the `aiohttp` pool (`connector._acquired`) to ensure that `finally: await response.text()` successfully released the socket back to the connector pool without exhaustion.
This PR originally aimed to improve client performance via connection pooling. However, since those improvements were merged upstream in #282 and #289, I have rebased and repurposed this PR to strictly harden the error handling and prevent connection leaks during 5xx Gateway Errors and malformed JSON payloads.
Changes:
process_response(), allowing non-200 responses (like 502 HTML pages) to be safely intercepted and recorded as raw text without throwing aJSONDecodeError.aiohttpresponse body is universally consumed, ensuring sockets are reliably released and returned to the connection pool even during unexpected exceptions.RequestLifecycleMetriccreation to execute exactly once per request. This eliminates duplicated logic and ensures that failed requests and timeouts accurately record end-to-end wall-clock duration.aiohttp.ClientErrorandasyncio.TimeoutError, ensuring Prometheus metrics surface exact failure modes instead of generic timeouts.Testing: Verified locally using an isolated async test server designed to inject 502 HTML responses and malformed mock payloads. Evaluated the internal state of the
aiohttppool (connector._acquired) to ensure thatfinally: await response.text()successfully released the socket back to the connector pool without exhaustion.
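A small sketch of the leak check mentioned above, inspecting aiohttp's private `_acquired` set after a request has been fully consumed (the helper name is illustrative):

```python
import aiohttp


def assert_connections_released(session: aiohttp.ClientSession) -> None:
    # `_acquired` is a private aiohttp detail: the set of connections currently
    # checked out of the pool. After every response body has been drained,
    # it should be empty; a non-empty set indicates a leaked socket.
    connector = session.connector
    assert connector is not None
    assert len(connector._acquired) == 0, f"leaked connections: {len(connector._acquired)}"
```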