diff --git a/async_postgres/pg_connection/notify.nim b/async_postgres/pg_connection/notify.nim index cb6ea40..0373da4 100644 --- a/async_postgres/pg_connection/notify.nim +++ b/async_postgres/pg_connection/notify.nim @@ -30,7 +30,7 @@ proc onNotify*(conn: PgConnection, callback: NotifyCallback) = conn.notifyCallback = callback proc onListenError*( - conn: PgConnection, callback: proc(msg: string) {.gcsafe, raises: [].} + conn: PgConnection, callback: proc(err: ref PgListenError) {.gcsafe, raises: [].} ) = ## Set a callback invoked when the listen pump dies permanently (reconnection ## failed, or the connection was lost with no channels left to re-subscribe). @@ -76,24 +76,34 @@ proc reconnectInPlace*(conn: PgConnection) {.async.} = # Background pump and start/stop -proc notifyListenDeath(conn: PgConnection, msg: string) {.raises: [].} = +proc newListenError( + msg: string, reconnectionAttempted: bool +): ref PgListenError {.raises: [].} = + (ref PgListenError)(msg: msg, reconnectionAttempted: reconnectionAttempted) + +proc notifyListenDeath( + conn: PgConnection, msg: string, reconnectionAttempted: bool +) {.raises: [].} = ## Mark the listen pump as permanently dead and notify both APIs: the pull ## API via `notifyWaiter.fail` and the push API via `listenErrorCallback`. - conn.listenErrorMsg = msg + conn.listenError = newListenError(msg, reconnectionAttempted) conn.state = csClosed if conn.notifyWaiter != nil and not conn.notifyWaiter.finished: - let err = newException(PgError, msg) + # Fail the pull-API waiter with a *fresh* exception, never the stored + # `conn.listenError`: that object is handed to `listenErrorCallback` and copied + # by `checkListenAlive`, so it is kept as a value that is never itself raised. + # + # asyncdispatch types `Future.fail`'s callback chain as raising the base + # `Exception`, so catching `Exception` (not `CatchableError`) is what keeps + # this proc `raises: []` — same idiom as `dispatchNotification`. 
`fail` runs # no user callbacks synchronously here, so nothing real is masked, and # swallowing it guarantees the push-API callback below still fires. try: - conn.notifyWaiter.fail(err) + conn.notifyWaiter.fail(newListenError(msg, reconnectionAttempted)) except Exception: discard if conn.listenErrorCallback != nil: - conn.listenErrorCallback(msg) + conn.listenErrorCallback(conn.listenError) proc listenPump*(conn: PgConnection) {.async.} = ## Background loop: repeatedly receives messages, dispatching notifications. @@ -121,7 +131,7 @@ proc listenPump*(conn: PgConnection) {.async.} = return # Cancelled from close() except CatchableError: if conn.listenChannels.len == 0: - conn.notifyListenDeath("Listen connection lost") + conn.notifyListenDeath("Listen connection lost", false) return # Auto-reconnect with exponential backoff let maxAttempts = conn.listenReconnectMaxAttempts @@ -147,7 +157,8 @@ proc listenPump*(conn: PgConnection) {.async.} = if not reconnected: conn.notifyListenDeath( "Listen connection lost: reconnection failed after " & $maxAttempts & - " attempts" + " attempts", + true, ) return @@ -218,8 +229,10 @@ proc checkNotifyOverflow(conn: PgConnection) = proc checkListenAlive(conn: PgConnection) = ## Raise if the listen pump has died permanently. - if conn.listenErrorMsg.len > 0: - raise newException(PgConnectionError, conn.listenErrorMsg) + if conn.listenError != nil: + # Raise a fresh copy, not the stored object: re-raising one shared ref + # across repeated calls would let its stack trace grow unbounded. + raise newListenError(conn.listenError.msg, conn.listenError.reconnectionAttempted) if conn.state == csClosed: raise newException(PgConnectionError, "Connection is closed") @@ -229,7 +242,7 @@ proc waitNotification*( ## Wait for the next notification from the buffer. ## If the buffer is empty, blocks until a notification arrives or timeout expires. ## Raises PgNotifyOverflowError if notifications were dropped due to queue overflow. 
- ## Raises PgError if the listen pump has died (e.g. reconnection failed). + ## Raises PgListenError if the listen pump has died (e.g. reconnection failed). conn.checkNotifyOverflow() conn.checkListenAlive() if conn.notifyQueue.len > 0: diff --git a/async_postgres/pg_connection/types.nim b/async_postgres/pg_connection/types.nim index bc665fe..dc96568 100644 --- a/async_postgres/pg_connection/types.nim +++ b/async_postgres/pg_connection/types.nim @@ -195,7 +195,7 @@ type notifyWaiter*: Future[void] sendBuf*: seq[byte] ## Reusable send buffer for COPY IN batching notifyDropped*: int ## Count of notifications dropped due to queue overflow - listenErrorMsg*: string ## Set when listen pump fails permanently + listenError*: ref PgListenError ## Set when listen pump fails permanently listenReconnectMaxAttempts*: int ## Max reconnect attempts on listen pump failure. Default 10. ## 0 or negative = unlimited retries (retry until close()). @@ -203,7 +203,7 @@ type ## Max seconds between reconnect attempts (backoff cap). Default 30. reconnectCallback*: proc() {.gcsafe, raises: [].} notifyOverflowCallback*: proc(dropped: int) {.gcsafe, raises: [].} - listenErrorCallback*: proc(msg: string) {.gcsafe, raises: [].} + listenErrorCallback*: proc(err: ref PgListenError) {.gcsafe, raises: [].} ## Invoked when the listen pump dies permanently (reconnection failed or ## the connection was lost with nothing left to re-subscribe). Lets push ## API (`onNotify`) users learn the pump is gone — the pull API surfaces diff --git a/async_postgres/pg_errors.nim b/async_postgres/pg_errors.nim index f127c4a..1158385 100644 --- a/async_postgres/pg_errors.nim +++ b/async_postgres/pg_errors.nim @@ -40,3 +40,9 @@ type PgNotifyOverflowError* = object of PgError dropped*: int ## Number of notifications dropped due to queue overflow + + PgListenError* = object of PgConnectionError + ## Listen pump died permanently (reconnection failed or connection lost + ## with no channels left to re-subscribe). 
+ reconnectionAttempted*: bool + ## True if the pump attempted reconnection before giving up. diff --git a/tests/test_e2e.nim b/tests/test_e2e.nim index 2ba4972..d8bc04a 100644 --- a/tests/test_e2e.nim +++ b/tests/test_e2e.nim @@ -4882,10 +4882,12 @@ when hasChronos: var errored = false var errMsg = "" + var reconnectionAttempted = false listener.onListenError( - proc(msg: string) {.gcsafe, raises: [].} = + proc(err: ref PgListenError) {.gcsafe, raises: [].} = errored = true - errMsg = msg + errMsg = err.msg + reconnectionAttempted = err.reconnectionAttempted ) await listener.listen("listen_err_cb") @@ -4910,8 +4912,59 @@ when hasChronos: doAssert errored doAssert errMsg.len > 0 doAssert "reconnection failed" in errMsg + doAssert reconnectionAttempted doAssert listener.state == csClosed - doAssert listener.listenErrorMsg.len > 0 + doAssert listener.listenError != nil + doAssert listener.listenError.reconnectionAttempted + + await listener.close() + + waitFor t() + + test "waitNotification raises fresh PgListenError after permanent death": + proc t() {.async.} = + let listener = await connect(plainConfig()) + listener.listenReconnectMaxAttempts = 1 + listener.listenReconnectMaxBackoff = 1 + + await listener.listen("listen_err_fresh") + + # Make reconnection fail, then kill the backend so the pump dies for good. + listener.config.port = 1 + let killer = await connect(plainConfig()) + try: + discard await killer.exec( + "SELECT pg_terminate_backend($1)", @[toPgParam(listener.pid)] + ) + except PgError: + discard + await killer.close() + + # backoff=1s, then the single reconnect attempt fails → permanent death. + await sleepAsync(milliseconds(4000)) + doAssert listener.listenError != nil + + # The pull API surfaces the structured PgListenError, not a bare PgError. 
+ var first: ref PgListenError = nil + try: + discard await listener.waitNotification() + except PgListenError as e: + first = e + doAssert first != nil + doAssert first.reconnectionAttempted + doAssert "reconnection failed" in first.msg + # Each raise is a fresh instance, never the stored object — re-raising a + # single shared ref would let its stack trace accumulate across calls. + doAssert first != listener.listenError + + var second: ref PgListenError = nil + try: + discard await listener.waitNotification() + except PgListenError as e: + second = e + doAssert second != nil + doAssert second != first + doAssert second != listener.listenError await listener.close()