From d81607af2474be71a4ba49495d5a9edaca45c332 Mon Sep 17 00:00:00 2001 From: djkazic Date: Thu, 21 May 2026 11:41:28 -0400 Subject: [PATCH 1/3] auctioneer: fix initial per-account subscribe failure --- auctioneer/client.go | 24 +++++- auctioneer/client_test.go | 163 +++++++++++++++++++++++++++++++++++++- 2 files changed, 184 insertions(+), 3 deletions(-) diff --git a/auctioneer/client.go b/auctioneer/client.go index 1f2fbfaf..b569cd7f 100644 --- a/auctioneer/client.go +++ b/auctioneer/client.go @@ -643,6 +643,25 @@ func (c *Client) connectAndAuthenticate(ctx context.Context, c.subscribedAcctsMtx.Lock() c.subscribedAccts[acctPubKey] = sub c.subscribedAcctsMtx.Unlock() + + // The subscription needs to be in the map before authenticate runs so + // that readIncomingStream can route the server's Challenge/Error + // responses back to it. But if the handshake doesn't reach the + // GetSuccess branch below, the entry is stale — a later subscribe + // attempt for the same account would hit the "already subscribed" + // guard at the top of this function and silently no-op without ever + // sending a fresh Commit. Clean up here so only genuinely live + // subscriptions remain in the map. + success := false + defer func() { + if success { + return + } + c.subscribedAcctsMtx.Lock() + delete(c.subscribedAccts, acctPubKey) + c.subscribedAcctsMtx.Unlock() + }() + err := sub.authenticate(ctx) if err != nil { log.Errorf("Authentication failed for account %x: %v", @@ -679,8 +698,11 @@ func (c *Client) connectAndAuthenticate(ctx context.Context, // Did the server find the account we're interested in? switch { - // Account exists, everything's good to continue. + // Account exists, everything's good to continue. This is the + // only path that keeps the subscription in the map; every + // other exit triggers the deferred cleanup above. case srvMsg.GetSuccess() != nil: + success = true return sub, true, nil // We got an error. If we're in recovery mode, this could either diff --git a/auctioneer/client_test.go b/auctioneer/client_test.go index b6bc74d6..ae6211af 100644 --- a/auctioneer/client_test.go +++ b/auctioneer/client_test.go @@ -1,12 +1,14 @@ package auctioneer import ( + "context" "errors" "io" "testing" "time" "github.com/lightninglabs/pool/auctioneerrpc" + "github.com/lightninglabs/pool/order" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -43,11 +45,14 @@ func TestJitterBackoffBounds(t *testing.T) { // fakeServerStream is a minimal implementation of // ChannelAuctioneer_SubscribeBatchAuctionClient that returns predetermined -// results from Recv. It is only sufficient for driving the client's read loop. +// results from Recv and captures client-sent messages on `sent` (when +// non-nil). It is only sufficient for driving the client's read loop and +// the auth handshake. type fakeServerStream struct { grpc.ClientStream recv chan recvResult + sent chan *auctioneerrpc.ClientAuctionMessage } type recvResult struct { @@ -55,7 +60,10 @@ type recvResult struct { err error } -func (s *fakeServerStream) Send(*auctioneerrpc.ClientAuctionMessage) error { +func (s *fakeServerStream) Send(msg *auctioneerrpc.ClientAuctionMessage) error { + if s.sent != nil { + s.sent <- msg + } return nil } @@ -203,3 +211,154 @@ func TestReadIncomingStreamContextCanceledDoesNotReconnect(t *testing.T) { // Expected: no error surfaced. } } + +// TestConnectAndAuthenticateCleansUpOnAccountDoesNotExist drives a full +// connectAndAuthenticate call in recovery mode against a scripted fake stream +// that responds with ACCOUNT_DOES_NOT_EXIST after the client's Subscribe. It +// asserts the subscription entry is removed from c.subscribedAccts on return. +// +// Regression: previously the entry was added to the map before authenticate +// ran (so readIncomingStream could route the server's Challenge/Error back +// to it) and was never removed on error paths. A later StartAccountSubscription +// for the same account — typically when handleStateOpen runs after on-chain +// confirmation — would hit the "already subscribed" early-return guard at the +// top of connectAndAuthenticate and silently no-op without sending a fresh +// Commit. The per-account 3-way handshake never ran and the trader stayed +// filtered as offline at matching time until the process restarted. +func TestConnectAndAuthenticateCleansUpOnAccountDoesNotExist(t *testing.T) { + t.Parallel() + + stream := &fakeServerStream{ + recv: make(chan recvResult, 1), + sent: make(chan *auctioneerrpc.ClientAuctionMessage, 2), + } + + mainErrChan := make(chan error, 1) + c := &Client{ + cfg: &Config{ + Signer: testSigner, + BatchVersion: order.LatestBatchVersion, + }, + serverStream: stream, + FromServerChan: make(chan *auctioneerrpc.ServerAuctionMessage), + StreamErrChan: mainErrChan, + errChanSwitch: NewErrChanSwitch(mainErrChan), + quit: make(chan struct{}), + subscribedAccts: make(map[[33]byte]*acctSubscription), + } + c.errChanSwitch.Start() + defer c.errChanSwitch.Stop() + defer close(c.quit) + + // Run the read loop in the background so server responses are routed + // to the subscription's msgChan via subscribedAccts lookups. + readDone := make(chan struct{}) + go func() { + c.readIncomingStream() + close(readDone) + }() + + type result struct { + sub *acctSubscription + canRecover bool + err error + } + resCh := make(chan result, 1) + go func() { + sub, canRecover, err := c.connectAndAuthenticate( + context.Background(), testAccountDesc, true, + ) + resCh <- result{sub, canRecover, err} + }() + + // Step 1: capture the Commit and echo its commitHash back in the + // Challenge so readIncomingStream can route it to the right sub. + var commitHash []byte + select { + case msg := <-stream.sent: + commit, ok := msg.Msg.(*auctioneerrpc.ClientAuctionMessage_Commit) + if !ok { + t.Fatalf("expected Commit, got %T", msg.Msg) + } + commitHash = commit.Commit.CommitHash + case <-time.After(defaultTimeout): + t.Fatal("did not receive Commit from client") + } + + // Step 2: feed back the Challenge. + stream.recv <- recvResult{ + msg: &auctioneerrpc.ServerAuctionMessage{ + Msg: &auctioneerrpc.ServerAuctionMessage_Challenge{ + Challenge: &auctioneerrpc.ServerChallenge{ + Challenge: []byte{1, 2, 3, 4}, + CommitHash: commitHash, + }, + }, + }, + } + + // Step 3: drain the Subscribe message so authenticate() returns. + select { + case msg := <-stream.sent: + if _, ok := msg.Msg.(*auctioneerrpc.ClientAuctionMessage_Subscribe); !ok { + t.Fatalf("expected Subscribe, got %T", msg.Msg) + } + case <-time.After(defaultTimeout): + t.Fatal("did not receive Subscribe from client") + } + + // Step 4: server responds with ACCOUNT_DOES_NOT_EXIST (the realistic + // case where RecoverAccounts probes a key the auctioneer hasn't yet + // seen on chain). + var pubKey [33]byte + copy(pubKey[:], testAccountDesc.PubKey.SerializeCompressed()) + stream.recv <- recvResult{ + msg: &auctioneerrpc.ServerAuctionMessage{ + Msg: &auctioneerrpc.ServerAuctionMessage_Error{ + Error: &auctioneerrpc.SubscribeError{ + ErrorCode: auctioneerrpc.SubscribeError_ACCOUNT_DOES_NOT_EXIST, + TraderKey: pubKey[:], + }, + }, + }, + } + + // Step 5: connectAndAuthenticate should return (sub, false, nil) ... + var res result + select { + case res = <-resCh: + case <-time.After(defaultTimeout): + t.Fatal("connectAndAuthenticate did not return") + } + if res.err != nil { + t.Fatalf("unexpected error: %v", res.err) + } + if res.canRecover { + t.Fatal("expected canRecover=false on ACCOUNT_DOES_NOT_EXIST") + } + if res.sub == nil { + t.Fatal("expected non-nil subscription") + } + + // ... and the subscription must NOT be left in the map. A later + // StartAccountSubscription for this account would otherwise hit the + // "already subscribed" guard and silently no-op without ever sending + // a fresh Commit. + c.subscribedAcctsMtx.Lock() + _, present := c.subscribedAccts[pubKey] + c.subscribedAcctsMtx.Unlock() + if present { + t.Fatal("subscribedAccts entry was not cleaned up after " + + "ACCOUNT_DOES_NOT_EXIST; later subscribes for the " + + "same account would silently no-op") + } + + // Clean up the background read loop. Sending io.EOF unblocks the + // Recv call and lets readIncomingStream exit cleanly. + stream.recv <- recvResult{err: io.EOF} + select { + case <-readDone: + case <-time.After(defaultTimeout): + t.Fatal("read loop did not exit after EOF") + } +} From 0f42ef11ce35b32a9c59ecd4fc501542d05cee29 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 21 May 2026 17:33:20 -0700 Subject: [PATCH 2/3] auctioneer: keep resubscribed entry alive on ErrServerErrored path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit connectAndAuthenticate's deferred cleanup runs unconditionally on any non-success exit. On the ErrServerErrored branch we hand off to HandleServerShutdown, which itself clears subscribedAccts and re-runs StartAccountSubscription for every previously-live account — installing a fresh entry at acctPubKey. Without suppressing the defer, the outer return would then turn around and delete that fresh entry, leaving the just-resubscribed account unreachable from sendToSubscription. Set success = true before invoking HandleServerShutdown so the inner resubscribe's map entry survives. The defer's invariant — "only delete what this call originally inserted" — still holds because the inner StartAccountSubscription has already replaced our insertion with its own before HandleServerShutdown returns. --- auctioneer/client.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/auctioneer/client.go b/auctioneer/client.go index b569cd7f..c4469bd6 100644 --- a/auctioneer/client.go +++ b/auctioneer/client.go @@ -680,6 +680,16 @@ func (c *Client) connectAndAuthenticate(ctx context.Context, // Ah, so it's the server shutting down, so let's re- // try our connection. if err == ErrServerErrored { + // HandleServerShutdown clears subscribedAccts + // and re-runs StartAccountSubscription for + // every previously-live account, including + // this one. That inner call installs a fresh + // entry in the map for acctPubKey. We must + // suppress our deferred cleanup so it doesn't + // turn around and delete that fresh entry on + // the way out, leaving the resubscribed + // account unreachable from sendToSubscription. + success = true return sub, false, c.HandleServerShutdown(nil) } From 1e5334350ef6d7110caab2be6ff73a0c0aac8e9b Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 21 May 2026 17:33:27 -0700 Subject: [PATCH 3/3] auctioneer: cover INCOMPLETE_ACCOUNT_RESERVATION in cleanup test The existing TestConnectAndAuthenticateCleansUpOnAccountDoesNotExist only exercised one of the per-account error paths whose cleanup is now guarded by the deferred delete. The INCOMPLETE_ACCOUNT_RESERVATION case is the most error-prone of the bunch: it returns a non-nil sub *and* a typed AcctResNotCompletedError, which makes the "sub got returned, so the entry must still be in the map" mental model easy to fall into. Refactor the test into a table-driven TestConnectAndAuthenticateCleansUpOnError with a runCleanupCase helper that drives the full handshake and asserts the post-condition, and add the INCOMPLETE_ACCOUNT_RESERVATION case alongside the original ACCOUNT_DOES_NOT_EXIST case. --- auctioneer/client_test.go | 135 +++++++++++++++++++++++++++++--------- 1 file changed, 105 insertions(+), 30 deletions(-) diff --git a/auctioneer/client_test.go b/auctioneer/client_test.go index ae6211af..fd730a03 100644 --- a/auctioneer/client_test.go +++ b/auctioneer/client_test.go @@ -1,6 +1,7 @@ package auctioneer import ( + "bytes" "context" "errors" "io" @@ -212,10 +213,11 @@ func TestReadIncomingStreamContextCanceledDoesNotReconnect(t *testing.T) { } } -// TestConnectAndAuthenticateCleansUpOnAccountDoesNotExist drives a full +// TestConnectAndAuthenticateCleansUpOnError drives a full // connectAndAuthenticate call in recovery mode against a scripted fake stream -// that responds with ACCOUNT_DOES_NOT_EXIST after the client's Subscribe. It -// asserts the subscription entry is removed from c.subscribedAccts on return. +// for each per-account error path that the auctioneer can return after the +// Subscribe message, and asserts the subscription entry is always removed +// from c.subscribedAccts on return. // // Regression: previously the entry was added to the map before authenticate // ran (so readIncomingStream could route the server's Challenge/Error back @@ -225,9 +227,97 @@ func TestReadIncomingStreamContextCanceledDoesNotReconnect(t *testing.T) { // top of connectAndAuthenticate and silently no-op without sending a fresh // Commit. The per-account 3-way handshake never ran and the trader stayed // filtered as offline at matching time until the process restarted. -func TestConnectAndAuthenticateCleansUpOnAccountDoesNotExist(t *testing.T) { +func TestConnectAndAuthenticateCleansUpOnError(t *testing.T) { t.Parallel() + var pubKey [33]byte + copy(pubKey[:], testAccountDesc.PubKey.SerializeCompressed()) + + cases := []struct { + name string + errResp *auctioneerrpc.SubscribeError + checkRes func(t *testing.T, sub *acctSubscription, + canRecover bool, err error) + }{ + { + // Realistic case: RecoverAccounts probes a key the + // auctioneer hasn't yet seen on chain. + name: "account does not exist", + errResp: &auctioneerrpc.SubscribeError{ + ErrorCode: auctioneerrpc.SubscribeError_ACCOUNT_DOES_NOT_EXIST, + TraderKey: pubKey[:], + }, + checkRes: func(t *testing.T, sub *acctSubscription, + canRecover bool, err error) { + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if canRecover { + t.Fatal("expected canRecover=false") + } + if sub == nil { + t.Fatal("expected non-nil subscription") + } + }, + }, + { + // The auctioneer knows about a reservation for this + // key but the funding tx hasn't confirmed yet. The + // function returns a non-nil sub *and* a typed error, + // which makes the cleanup invariant especially easy + // to get wrong. + name: "incomplete account reservation", + errResp: &auctioneerrpc.SubscribeError{ + ErrorCode: auctioneerrpc.SubscribeError_INCOMPLETE_ACCOUNT_RESERVATION, + TraderKey: pubKey[:], + AccountReservation: &auctioneerrpc.AuctionAccount{ + Value: 100_000, + Expiry: 144, + TraderKey: pubKey[:], + AuctioneerKey: bytes.Repeat([]byte{0x02}, 33), + BatchKey: bytes.Repeat([]byte{0x03}, 33), + HeightHint: 1, + }, + }, + checkRes: func(t *testing.T, sub *acctSubscription, + canRecover bool, err error) { + + var resErr *AcctResNotCompletedError + if !errors.As(err, &resErr) { + t.Fatalf("expected "+ + "AcctResNotCompletedError, "+ + "got %v", err) + } + if !canRecover { + t.Fatal("expected canRecover=true") + } + if sub == nil { + t.Fatal("expected non-nil subscription") + } + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + runCleanupCase(t, pubKey, tc.errResp, tc.checkRes) + }) + } +} + +// runCleanupCase wires up a fresh Client + fake stream, drives a full +// connectAndAuthenticate handshake in recovery mode, feeds the supplied +// error response back at the Subscribe step, runs the caller's assertions on +// the return values, and finally asserts that subscribedAccts is empty. +func runCleanupCase(t *testing.T, pubKey [33]byte, + errResp *auctioneerrpc.SubscribeError, + checkRes func(t *testing.T, sub *acctSubscription, canRecover bool, + err error)) { + stream := &fakeServerStream{ recv: make(chan recvResult, 1), sent: make(chan *auctioneerrpc.ClientAuctionMessage, 2), @@ -307,50 +397,35 @@ func TestConnectAndAuthenticateCleansUpOnAccountDoesNotExist(t *testing.T) { t.Fatal("did not receive Subscribe from client") } - // Step 4: server responds with ACCOUNT_DOES_NOT_EXIST (the realistic - // case where RecoverAccounts probes a key the auctioneer hasn't yet - // seen on chain). - var pubKey [33]byte - copy(pubKey[:], testAccountDesc.PubKey.SerializeCompressed()) + // Step 4: server responds with the supplied error. stream.recv <- recvResult{ msg: &auctioneerrpc.ServerAuctionMessage{ Msg: &auctioneerrpc.ServerAuctionMessage_Error{ - Error: &auctioneerrpc.SubscribeError{ - ErrorCode: auctioneerrpc.SubscribeError_ACCOUNT_DOES_NOT_EXIST, - TraderKey: pubKey[:], - }, + Error: errResp, }, }, } - // Step 5: connectAndAuthenticate should return (sub, false, nil) ... + // Step 5: connectAndAuthenticate should return; let the caller assert + // the return values. var res result select { case res = <-resCh: case <-time.After(defaultTimeout): t.Fatal("connectAndAuthenticate did not return") } - if res.err != nil { - t.Fatalf("unexpected error: %v", res.err) - } - if res.canRecover { - t.Fatal("expected canRecover=false on ACCOUNT_DOES_NOT_EXIST") - } - if res.sub == nil { - t.Fatal("expected non-nil subscription") - } + checkRes(t, res.sub, res.canRecover, res.err) - // ... and the subscription must NOT be left in the map. A later - // StartAccountSubscription for this account would otherwise hit the - // "already subscribed" guard and silently no-op without ever sending - // a fresh Commit. + // In every error case, the subscription must NOT be left in the map. + // A later StartAccountSubscription for this account would otherwise + // hit the "already subscribed" guard and silently no-op without ever + // sending a fresh Commit. c.subscribedAcctsMtx.Lock() _, present := c.subscribedAccts[pubKey] c.subscribedAcctsMtx.Unlock() if present { - t.Fatal("subscribedAccts entry was not cleaned up after " + - "ACCOUNT_DOES_NOT_EXIST; later subscribes for the " + - "same account would silently no-op") + t.Fatal("subscribedAccts entry was not cleaned up; later " + + "subscribes for the same account would silently no-op") } // Clean up the background read loop. Sending io.EOF unblocks the