44import logging
55import uuid
66import sys
7+ import asyncio
78
89from concurrent import futures
910from .exceptions import (JsonRpcException , JsonRpcRequestCancelled ,
1213log = logging .getLogger (__name__ )
1314JSONRPC_VERSION = '2.0'
1415CANCEL_METHOD = '$/cancelRequest'
16+ EXIT_METHOD = 'exit'
1517
1618
1719class Endpoint :
@@ -35,9 +37,24 @@ def __init__(self, dispatcher, consumer, id_generator=lambda: str(uuid.uuid4()),
3537 self ._client_request_futures = {}
3638 self ._server_request_futures = {}
3739 self ._executor_service = futures .ThreadPoolExecutor (max_workers = max_workers )
40+ self ._cancelledRequests = set ()
41+ self ._messageQueue = None
42+ self ._consume_task = None
43+
44+ def init_async (self ):
45+ self ._messageQueue = asyncio .Queue ()
46+ self ._consume_task = asyncio .create_task (self .consume_task ())
47+
48+ async def consume_task (self ):
49+ while True :
50+ message = await self ._messageQueue .get ()
51+ await asyncio .to_thread (self .consume , message )
52+ self ._messageQueue .task_done ()
3853
3954 def shutdown (self ):
4055 self ._executor_service .shutdown ()
56+ if self ._consume_task is not None :
57+ self ._consume_task .cancel ()
4158
4259 def notify (self , method , params = None ):
4360 """Send a JSON RPC notification to the client.
@@ -94,6 +111,21 @@ def callback(future):
94111 future .set_exception (JsonRpcRequestCancelled ())
95112 return callback
96113
114+ async def consume_async (self , message ):
115+ """Consume a JSON RPC message from the client and put it into a queue.
116+
117+ Args:
118+ message (dict): The JSON RPC message sent by the client
119+ """
120+ if message ['method' ] == CANCEL_METHOD :
121+ self ._cancelledRequests .add (message .get ('params' )['id' ])
122+
123+ # The exit message needs to be handled directly since the stream cannot be closed asynchronously
124+ if message ['method' ] == EXIT_METHOD :
125+ self .consume (message )
126+ else :
127+ await self ._messageQueue .put (message )
128+
97129 def consume (self , message ):
98130 """Consume a JSON RPC message from the client.
99131
@@ -182,6 +214,9 @@ def _handle_request(self, msg_id, method, params):
182214 except KeyError as e :
183215 raise JsonRpcMethodNotFound .of (method ) from e
184216
217+ if msg_id in self ._cancelledRequests :
218+ raise JsonRpcRequestCancelled ()
219+
185220 handler_result = handler (params )
186221
187222 if callable (handler_result ):
0 commit comments