This repository was archived by the owner on Jan 11, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmanager.py
More file actions
89 lines (78 loc) · 3.77 KB
/
manager.py
File metadata and controls
89 lines (78 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import asyncio
from typing import Dict, Optional
from logger import Logger
from fastapi import HTTPException
class BaseManager:
    """Track per-request response queues obtained from a WebSocket manager.

    The manager keeps a ``request_id -> queue`` mapping. Queues are obtained
    from ``ws_manager.register_listener(request_id)`` and released again with
    ``ws_manager.unregister_listener(request_id)``.
    """

    def __init__(self, ws_manager):
        """Store the WebSocket manager and start with no registered queues.

        Args:
            ws_manager: Object providing ``register_listener(request_id)`` and
                ``unregister_listener(request_id)``.
        """
        self.logger = Logger(self.__class__.__name__)
        self.ws_manager = ws_manager
        # request_id -> queue returned by ws_manager.register_listener
        self.queues: Dict[str, asyncio.Queue] = {}
        self.logger.info(f"{self.__class__.__name__} initialized")

    async def create_queue(self, request_id: str) -> asyncio.Queue:
        """Register a listener for *request_id* and remember its queue.

        Args:
            request_id: Identifier of the request to listen for.

        Returns:
            The queue returned by ``ws_manager.register_listener``. An existing
            entry for the same ``request_id`` is overwritten.
        """
        self.logger.info(f"Creating queue for request_id: {request_id}")
        queue = self.ws_manager.register_listener(request_id)
        self.queues[request_id] = queue
        return queue

    async def cleanup_queue(self, request_id: str) -> None:
        """Unregister the listener for *request_id* and forget its queue.

        Safe to call for an id that is not (or no longer) tracked locally; the
        listener is still unregistered from the WebSocket manager.

        Args:
            request_id: Identifier of the request to clean up.
        """
        self.logger.info(f"Cleaning up queue for request_id: {request_id}")
        self.ws_manager.unregister_listener(request_id)
        self.queues.pop(request_id, None)

    async def wait_for_response(self, request_id: str, timeout: float) -> Optional[Dict]:
        """Wait for the next message on the request's queue, with a timeout.

        The local queue entry is always removed before this method exits,
        whether it returns, raises, or is cancelled.

        NOTE: this only drops the local reference in ``self.queues``; it does
        not call ``ws_manager.unregister_listener``. Use ``cleanup_queue`` to
        unregister the listener.

        Args:
            request_id: Identifier previously passed to ``create_queue``.
            timeout: Maximum number of seconds to wait for a message.

        Returns:
            The received response when it is truthy and its ``"code"`` is 200.

        Raises:
            HTTPException: 404 if no queue is tracked for ``request_id``;
                408 if nothing arrives within ``timeout``; 500 (with the
                response's ``"message"`` or ``"Unknown error"``) for any
                other response, including an empty one.
        """
        try:
            queue = self.queues.get(request_id)
            # Explicit None check: presence, not truthiness, is what matters.
            if queue is None:
                self.logger.error(f"Queue not found for request_id: {request_id}")
                raise HTTPException(status_code=404, detail=f"Request not found: {request_id}")

            try:
                # wait_for cancels the pending queue.get() on timeout and when
                # this coroutine itself is cancelled, so no task is orphaned.
                response = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                self.logger.error(f"Timeout waiting for response for request_id: {request_id}")
                raise HTTPException(
                    status_code=408, detail=f"Request timeout: {request_id}"
                ) from None

            if response and response.get("code") == 200:
                return response

            # An empty/None response has no "message" to read; report it as an
            # unknown error instead of failing with AttributeError.
            error_msg = response.get("message", "Unknown error") if response else "Unknown error"
            raise HTTPException(status_code=500, detail=error_msg)
        except Exception as e:
            self.logger.error(f"Error waiting for response: {str(e)}")
            raise
        finally:
            # Runs on success, on error and on cancellation (CancelledError is
            # not an Exception subclass, so only `finally` covers it).
            self.queues.pop(request_id, None)