Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions async_postgres/pg_connection/notify.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ proc onNotify*(conn: PgConnection, callback: NotifyCallback) =
conn.notifyCallback = callback

proc onListenError*(
conn: PgConnection, callback: proc(msg: string) {.gcsafe, raises: [].}
conn: PgConnection, callback: proc(err: ref PgListenError) {.gcsafe, raises: [].}
) =
## Set a callback invoked when the listen pump dies permanently (reconnection
## failed, or the connection was lost with no channels left to re-subscribe).
Expand Down Expand Up @@ -76,24 +76,34 @@ proc reconnectInPlace*(conn: PgConnection) {.async.} =

# Background pump and start/stop

proc notifyListenDeath(conn: PgConnection, msg: string) {.raises: [].} =
proc newListenError(
    msg: string, reconnectionAttempted: bool
): ref PgListenError {.raises: [].} =
  ## Allocate a new `PgListenError` carrying `msg` and the
  ## `reconnectionAttempted` flag. Each call yields a distinct ref object.
  new(result)
  result.msg = msg
  result.reconnectionAttempted = reconnectionAttempted

proc notifyListenDeath(
conn: PgConnection, msg: string, reconnectionAttempted: bool
) {.raises: [].} =
## Mark the listen pump as permanently dead and notify both APIs: the pull
## API via `notifyWaiter.fail` and the push API via `listenErrorCallback`.
conn.listenErrorMsg = msg
conn.listenError = newListenError(msg, reconnectionAttempted)
conn.state = csClosed
if conn.notifyWaiter != nil and not conn.notifyWaiter.finished:
let err = newException(PgError, msg)
# Fail the pull-API waiter with a *fresh* exception, never the stored
# `conn.listenError`: that object is re-raised by `checkListenAlive` on every
# later call, so sharing one ref would let its stack trace accumulate.
#
# asyncdispatch types `Future.fail`'s callback chain as raising the base
# `Exception`, so catching `Exception` (not `CatchableError`) is what keeps
# this proc `raises: []` — same idiom as `dispatchNotification`. `fail` runs
# no user callbacks synchronously here, so nothing real is masked, and
# swallowing it guarantees the push-API callback below still fires.
try:
conn.notifyWaiter.fail(err)
conn.notifyWaiter.fail(newListenError(msg, reconnectionAttempted))
except Exception:
discard
if conn.listenErrorCallback != nil:
conn.listenErrorCallback(msg)
conn.listenErrorCallback(conn.listenError)

proc listenPump*(conn: PgConnection) {.async.} =
## Background loop: repeatedly receives messages, dispatching notifications.
Expand Down Expand Up @@ -121,7 +131,7 @@ proc listenPump*(conn: PgConnection) {.async.} =
return # Cancelled from close()
except CatchableError:
if conn.listenChannels.len == 0:
conn.notifyListenDeath("Listen connection lost")
conn.notifyListenDeath("Listen connection lost", false)
return
# Auto-reconnect with exponential backoff
let maxAttempts = conn.listenReconnectMaxAttempts
Expand All @@ -147,7 +157,8 @@ proc listenPump*(conn: PgConnection) {.async.} =
if not reconnected:
conn.notifyListenDeath(
"Listen connection lost: reconnection failed after " & $maxAttempts &
" attempts"
" attempts",
true,
)
return

Expand Down Expand Up @@ -218,8 +229,10 @@ proc checkNotifyOverflow(conn: PgConnection) =

proc checkListenAlive(conn: PgConnection) =
## Raise if the listen pump has died permanently.
if conn.listenErrorMsg.len > 0:
raise newException(PgConnectionError, conn.listenErrorMsg)
if conn.listenError != nil:
# Raise a fresh copy, not the stored object: re-raising one shared ref
# across repeated calls would let its stack trace grow unbounded.
raise newListenError(conn.listenError.msg, conn.listenError.reconnectionAttempted)
if conn.state == csClosed:
raise newException(PgConnectionError, "Connection is closed")

Expand All @@ -229,7 +242,7 @@ proc waitNotification*(
## Wait for the next notification from the buffer.
## If the buffer is empty, blocks until a notification arrives or timeout expires.
## Raises PgNotifyOverflowError if notifications were dropped due to queue overflow.
## Raises PgError if the listen pump has died (e.g. reconnection failed).
## Raises PgListenError if the listen pump has died (e.g. reconnection failed).
conn.checkNotifyOverflow()
conn.checkListenAlive()
if conn.notifyQueue.len > 0:
Expand Down
4 changes: 2 additions & 2 deletions async_postgres/pg_connection/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,15 @@ type
notifyWaiter*: Future[void]
sendBuf*: seq[byte] ## Reusable send buffer for COPY IN batching
notifyDropped*: int ## Count of notifications dropped due to queue overflow
listenErrorMsg*: string ## Set when listen pump fails permanently
listenError*: ref PgListenError ## Set when listen pump fails permanently
listenReconnectMaxAttempts*: int
## Max reconnect attempts on listen pump failure. Default 10.
## 0 or negative = unlimited retries (retry until close()).
listenReconnectMaxBackoff*: int
## Max seconds between reconnect attempts (backoff cap). Default 30.
reconnectCallback*: proc() {.gcsafe, raises: [].}
notifyOverflowCallback*: proc(dropped: int) {.gcsafe, raises: [].}
listenErrorCallback*: proc(msg: string) {.gcsafe, raises: [].}
listenErrorCallback*: proc(err: ref PgListenError) {.gcsafe, raises: [].}
## Invoked when the listen pump dies permanently (reconnection failed or
## the connection was lost with nothing left to re-subscribe). Lets push
## API (`onNotify`) users learn the pump is gone — the pull API surfaces
Expand Down
6 changes: 6 additions & 0 deletions async_postgres/pg_errors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,9 @@ type

PgNotifyOverflowError* = object of PgError
dropped*: int ## Number of notifications dropped due to queue overflow

PgListenError* = object of PgConnectionError
## Listen pump died permanently (reconnection failed or connection lost
## with no channels left to re-subscribe).
reconnectionAttempted*: bool
## True if the pump attempted reconnection before giving up.
59 changes: 56 additions & 3 deletions tests/test_e2e.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4882,10 +4882,12 @@ when hasChronos:

var errored = false
var errMsg = ""
var reconnectionAttempted = false
listener.onListenError(
proc(msg: string) {.gcsafe, raises: [].} =
proc(err: ref PgListenError) {.gcsafe, raises: [].} =
errored = true
errMsg = msg
errMsg = err.msg
reconnectionAttempted = err.reconnectionAttempted
)

await listener.listen("listen_err_cb")
Expand All @@ -4910,8 +4912,59 @@ when hasChronos:
doAssert errored
doAssert errMsg.len > 0
doAssert "reconnection failed" in errMsg
doAssert reconnectionAttempted
doAssert listener.state == csClosed
doAssert listener.listenErrorMsg.len > 0
doAssert listener.listenError != nil
doAssert listener.listenError.reconnectionAttempted

await listener.close()

waitFor t()

test "waitNotification raises fresh PgListenError after permanent death":
proc t() {.async.} =
let listener = await connect(plainConfig())
listener.listenReconnectMaxAttempts = 1
listener.listenReconnectMaxBackoff = 1

await listener.listen("listen_err_fresh")

# Make reconnection fail, then kill the backend so the pump dies for good.
listener.config.port = 1
let killer = await connect(plainConfig())
try:
discard await killer.exec(
"SELECT pg_terminate_backend($1)", @[toPgParam(listener.pid)]
)
except PgError:
discard
await killer.close()

# backoff=1s, then the single reconnect attempt fails → permanent death.
await sleepAsync(milliseconds(4000))
doAssert listener.listenError != nil

# The pull API surfaces the structured PgListenError, not a bare PgError.
var first: ref PgListenError = nil
try:
discard await listener.waitNotification()
except PgListenError as e:
first = e
doAssert first != nil
doAssert first.reconnectionAttempted
doAssert "reconnection failed" in first.msg
# Each raise is a fresh instance, never the stored object — re-raising a
# single shared ref would let its stack trace accumulate across calls.
doAssert first != listener.listenError

var second: ref PgListenError = nil
try:
discard await listener.waitNotification()
except PgListenError as e:
second = e
doAssert second != nil
doAssert second != first
doAssert second != listener.listenError

await listener.close()

Expand Down