Skip to content

Commit 0752a37

Browse files
UN-3211 [FEAT] HTTP session lifecycle management for workers API clients
- Add _owns_session flag to prevent singleton shared session from being closed by individual clients - Wire API_CLIENT_POOL_SIZE into HTTPAdapter connection pools - Add idempotent close() and __del__ destructor to BaseAPIClient - Add try/finally cleanup in api-deployment and callback tasks - Add on_worker_process_shutdown hook and early-return guard in postrun - Add 25 unit tests for session lifecycle behavior Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 38b3ce1 commit 0752a37

10 files changed

Lines changed: 1028 additions & 310 deletions

File tree

workers/api-deployment/tasks.py

Lines changed: 19 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -168,10 +168,11 @@ def _unified_api_execution(
168168
Returns:
169169
Execution result dictionary
170170
"""
171+
api_client = None
171172
try:
172173
# Set up execution context using shared utilities
173174
organization_id = schema_name
174-
config, api_client = WorkerExecutionContext.setup_execution_context(
175+
_, api_client = WorkerExecutionContext.setup_execution_context(
175176
organization_id, execution_id, workflow_id
176177
)
177178

@@ -233,22 +234,13 @@ def _unified_api_execution(
233234
f"files_processed={len(converted_files)}",
234235
)
235236

236-
# CRITICAL: Clean up StateStore to prevent data leaks between tasks
237-
try:
238-
from shared.infrastructure.context import StateStore
239-
240-
StateStore.clear_all()
241-
logger.debug("🧹 Cleaned up StateStore context to prevent data leaks")
242-
except Exception as cleanup_error:
243-
logger.warning(f"Failed to cleanup StateStore context: {cleanup_error}")
244-
245237
return result
246238

247239
except Exception as e:
248240
logger.error(f"API execution failed: {e}")
249241

250242
# Handle execution error with standardized pattern
251-
if "api_client" in locals():
243+
if api_client is not None:
252244
WorkerExecutionContext.handle_execution_error(
253245
api_client, execution_id, e, logger, f"api_execution_{task_type}"
254246
)
@@ -261,26 +253,29 @@ def _unified_api_execution(
261253
f"error={str(e)}",
262254
)
263255

264-
# CRITICAL: Clean up StateStore to prevent data leaks between tasks (error path)
265-
try:
266-
from shared.infrastructure.context import StateStore
267-
268-
StateStore.clear_all()
269-
logger.debug(
270-
"🧹 Cleaned up StateStore context to prevent data leaks (error path)"
271-
)
272-
except Exception as cleanup_error:
273-
logger.warning(
274-
f"Failed to cleanup StateStore context on error: {cleanup_error}"
275-
)
276-
277256
return {
278257
"execution_id": execution_id,
279258
"status": "ERROR",
280259
"error": str(e),
281260
"files_processed": 0,
282261
}
283262

263+
finally:
264+
# Clean up API client session to prevent socket FD leaks
265+
if api_client is not None:
266+
try:
267+
api_client.close()
268+
except Exception:
269+
pass
270+
271+
# Clean up StateStore to prevent data leaks between tasks
272+
try:
273+
from shared.infrastructure.context import StateStore
274+
275+
StateStore.clear_all()
276+
except Exception as cleanup_error:
277+
logger.warning(f"Failed to cleanup StateStore context: {cleanup_error}")
278+
284279

285280
@app.task(
286281
bind=True,

0 commit comments

Comments
 (0)