Skip to content

Commit b53b07e

Browse files
authored
Fix asyncio.CancelledError propagated to writer/reader (#800)
1 parent fad111c commit b53b07e

4 files changed

Lines changed: 87 additions & 2 deletions

File tree

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,11 @@ async def _read_messages_loop(self):
727727
logger.exception("unexpected message in stream reader: %s" % e)
728728

729729
self._state_changed.set()
730+
except asyncio.CancelledError as e:
731+
logger.debug("reader stream %s error: %s", self._id, e)
732+
if not self._closed:
733+
self._set_first_error(issues.ConnectionLost("gRPC stream cancelled"))
734+
raise
730735
except Exception as e:
731736
logger.debug("reader stream %s error: %s", self._id, e)
732737
self._set_first_error(e)

ydb/_topic_reader/topic_reader_asyncio_test.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,3 +1566,40 @@ async def stream_create(
15661566

15671567
reader_stream_mock_with_error.wait_error.assert_any_await()
15681568
reader_stream_mock_with_error.wait_messages.assert_any_await()
1569+
1570+
async def test_wait_error_returns_on_cancelled_error_from_receive(self, default_reader_settings):
1571+
receive_call = 0
1572+
1573+
async def receive(timeout=None):
1574+
nonlocal receive_call
1575+
receive_call += 1
1576+
if receive_call == 1:
1577+
# first call: return init response (from _start)
1578+
return StreamReadMessage.FromServer(
1579+
server_status=ServerStatus(ydb_status_codes_pb2.StatusIds.SUCCESS, []),
1580+
server_message=StreamReadMessage.InitResponse(session_id="test-session"),
1581+
)
1582+
# subsequent calls (from _read_messages_loop): simulate gRPC-level CancelledError
1583+
raise asyncio.CancelledError()
1584+
1585+
stream = mock.MagicMock(spec=StreamMock)
1586+
stream.receive = receive
1587+
stream.write = mock.Mock()
1588+
stream.close = mock.Mock()
1589+
1590+
reader = ReaderStream(4, default_reader_settings)
1591+
await reader._start(stream, default_reader_settings._init_message())
1592+
1593+
# Bug: wait_error() hangs forever because _first_error is never set.
1594+
# After the fix: wait_error() returns with a retriable ConnectionLost error.
1595+
try:
1596+
await asyncio.wait_for(reader.wait_error(), timeout=1.0)
1597+
except asyncio.TimeoutError:
1598+
pytest.fail(
1599+
"wait_error() hung forever after CancelledError from receive() — "
1600+
"bug: CancelledError bypasses 'except Exception' in _read_messages_loop"
1601+
)
1602+
except Exception:
1603+
pass # any error is fine, we just need wait_error() to not hang
1604+
1605+
await reader.close(False)

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,12 @@ async def _connection_loop(self):
478478
tasks = [send_loop, receive_loop]
479479
done, _ = await asyncio.wait([send_loop, receive_loop], return_when=asyncio.FIRST_COMPLETED)
480480
done.pop().result() # need for raise exception - reason of stop task
481-
except issues.Error as err:
481+
except (asyncio.CancelledError, issues.Error) as err:
482+
if isinstance(err, asyncio.CancelledError):
483+
if self._closed:
484+
return
485+
err = issues.ConnectionLost("gRPC stream cancelled")
486+
482487
err_info = check_retriable_error(err, retry_settings, attempt)
483488
if not err_info.is_retriable or self._tx is not None: # no retries in tx writer
484489
logger.debug("writer reconnector %s stop connection loop due to %s", self._id, err)
@@ -492,7 +497,7 @@ async def _connection_loop(self):
492497
)
493498
await asyncio.sleep(err_info.sleep_timeout_seconds)
494499

495-
except (asyncio.CancelledError, Exception) as err:
500+
except Exception as err:
496501
self._stop(err)
497502
return
498503
finally:

ydb/_topic_writer/topic_writer_asyncio_test.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,44 @@ async def test_reconnect_and_resent_non_acked_messages_on_retriable_error(
455455
second_writer.from_server.put_nowait(self.make_default_ack_message(seq_no=2))
456456
await reconnector.close(flush=True)
457457

458+
async def test_reconnect_on_cancelled_error_from_receive(self, default_driver, default_settings, monkeypatch):
459+
stream_creates = 0
460+
stream_2_created = asyncio.Event()
461+
462+
class StreamWriterCancelOnFirstReceive(TestWriterAsyncIOReconnector.StreamWriterMock):
463+
def __init__(self):
464+
super().__init__()
465+
self._first_receive = True
466+
467+
async def receive(self):
468+
if self._first_receive:
469+
self._first_receive = False
470+
raise asyncio.CancelledError()
471+
await asyncio.Future() # stream 2 stays alive
472+
473+
async def create_mock(*args, **kwargs):
474+
nonlocal stream_creates
475+
stream_creates += 1
476+
writer = StreamWriterCancelOnFirstReceive()
477+
writer.last_seqno = TestWriterAsyncIOReconnector.init_last_seqno
478+
if stream_creates >= 2:
479+
stream_2_created.set()
480+
return writer
481+
482+
with mock.patch.object(WriterAsyncIOStream, "create", create_mock):
483+
reconnector = WriterAsyncIOReconnector(default_driver, default_settings)
484+
try:
485+
# Bug: stream 2 is never created — _stop(CancelledError) kills the writer permanently.
486+
# After the fix: writer reconnects and stream 2 is created.
487+
await asyncio.wait_for(stream_2_created.wait(), timeout=2.0)
488+
except asyncio.TimeoutError:
489+
pytest.fail(
490+
"Writer did not reconnect after CancelledError from receive() — "
491+
"bug: _stop(CancelledError) permanently kills writer"
492+
)
493+
finally:
494+
await reconnector.close(False)
495+
458496
async def test_stop_on_unexpected_exception(self, reconnector: WriterAsyncIOReconnector, get_stream_writer):
459497
class TestException(Exception):
460498
pass

0 commit comments

Comments
 (0)