diff --git a/auctioneer/client.go b/auctioneer/client.go index 1f2fbfaf..c4469bd6 100644 --- a/auctioneer/client.go +++ b/auctioneer/client.go @@ -643,6 +643,25 @@ func (c *Client) connectAndAuthenticate(ctx context.Context, c.subscribedAcctsMtx.Lock() c.subscribedAccts[acctPubKey] = sub c.subscribedAcctsMtx.Unlock() + + // The subscription needs to be in the map before authenticate runs so + // that readIncomingStream can route the server's Challenge/Error + // responses back to it. But if the handshake doesn't reach the + // GetSuccess branch below, the entry is stale — a later subscribe + // attempt for the same account would hit the "already subscribed" + // guard at the top of this function and silently no-op without ever + // sending a fresh Commit. Clean up here so only genuinely live + // subscriptions remain in the map. + success := false + defer func() { + if success { + return + } + c.subscribedAcctsMtx.Lock() + delete(c.subscribedAccts, acctPubKey) + c.subscribedAcctsMtx.Unlock() + }() + err := sub.authenticate(ctx) if err != nil { log.Errorf("Authentication failed for account %x: %v", @@ -661,6 +680,16 @@ func (c *Client) connectAndAuthenticate(ctx context.Context, // Ah, so it's the server shutting down, so let's re- // try our connection. if err == ErrServerErrored { + // HandleServerShutdown clears subscribedAccts + // and re-runs StartAccountSubscription for + // every previously-live account, including + // this one. That inner call installs a fresh + // entry in the map for acctPubKey. We must + // suppress our deferred cleanup so it doesn't + // turn around and delete that fresh entry on + // the way out, leaving the resubscribed + // account unreachable from sendToSubscription. + success = true return sub, false, c.HandleServerShutdown(nil) } @@ -679,8 +708,11 @@ func (c *Client) connectAndAuthenticate(ctx context.Context, // Did the server find the account we're interested in? switch { - // Account exists, everything's good to continue. + // Account exists, everything's good to continue. This is the + // only path that keeps the subscription in the map; every + // other exit triggers the deferred cleanup above. case srvMsg.GetSuccess() != nil: + success = true return sub, true, nil // We got an error. If we're in recovery mode, this could either diff --git a/auctioneer/client_test.go b/auctioneer/client_test.go index b6bc74d6..fd730a03 100644 --- a/auctioneer/client_test.go +++ b/auctioneer/client_test.go @@ -1,12 +1,15 @@ package auctioneer import ( + "bytes" + "context" "errors" "io" "testing" "time" "github.com/lightninglabs/pool/auctioneerrpc" + "github.com/lightninglabs/pool/order" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -43,11 +46,14 @@ func TestJitterBackoffBounds(t *testing.T) { // fakeServerStream is a minimal implementation of // ChannelAuctioneer_SubscribeBatchAuctionClient that returns predetermined -// results from Recv. It is only sufficient for driving the client's read loop. +// results from Recv and captures client-sent messages on `sent` (when +// non-nil). It is only sufficient for driving the client's read loop and +// the auth handshake. type fakeServerStream struct { grpc.ClientStream recv chan recvResult + sent chan *auctioneerrpc.ClientAuctionMessage } type recvResult struct { @@ -55,7 +61,10 @@ type recvResult struct { err error } -func (s *fakeServerStream) Send(*auctioneerrpc.ClientAuctionMessage) error { +func (s *fakeServerStream) Send(msg *auctioneerrpc.ClientAuctionMessage) error { + if s.sent != nil { + s.sent <- msg + } return nil } @@ -203,3 +212,228 @@ func TestReadIncomingStreamContextCanceledDoesNotReconnect(t *testing.T) { // Expected: no error surfaced. } } + +// TestConnectAndAuthenticateCleansUpOnError drives a full +// connectAndAuthenticate call in recovery mode against a scripted fake stream +// for each per-account error path that the auctioneer can return after the +// Subscribe message, and asserts the subscription entry is always removed +// from c.subscribedAccts on return. +// +// Regression: previously the entry was added to the map before authenticate +// ran (so readIncomingStream could route the server's Challenge/Error back +// to it) and was never removed on error paths. A later StartAccountSubscription +// for the same account — typically when handleStateOpen runs after on-chain +// confirmation — would hit the "already subscribed" early-return guard at the +// top of connectAndAuthenticate and silently no-op without sending a fresh +// Commit. The per-account 3-way handshake never ran and the trader stayed +// filtered as offline at matching time until the process restarted. +func TestConnectAndAuthenticateCleansUpOnError(t *testing.T) { + t.Parallel() + + var pubKey [33]byte + copy(pubKey[:], testAccountDesc.PubKey.SerializeCompressed()) + + cases := []struct { + name string + errResp *auctioneerrpc.SubscribeError + checkRes func(t *testing.T, sub *acctSubscription, + canRecover bool, err error) + }{ + { + // Realistic case: RecoverAccounts probes a key the + // auctioneer hasn't yet seen on chain. + name: "account does not exist", + errResp: &auctioneerrpc.SubscribeError{ + ErrorCode: auctioneerrpc.SubscribeError_ACCOUNT_DOES_NOT_EXIST, + TraderKey: pubKey[:], + }, + checkRes: func(t *testing.T, sub *acctSubscription, + canRecover bool, err error) { + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if canRecover { + t.Fatal("expected canRecover=false") + } + if sub == nil { + t.Fatal("expected non-nil subscription") + } + }, + }, + { + // The auctioneer knows about a reservation for this + // key but the funding tx hasn't confirmed yet. The + // function returns a non-nil sub *and* a typed error, + // which makes the cleanup invariant especially easy + // to get wrong. + name: "incomplete account reservation", + errResp: &auctioneerrpc.SubscribeError{ + ErrorCode: auctioneerrpc.SubscribeError_INCOMPLETE_ACCOUNT_RESERVATION, + TraderKey: pubKey[:], + AccountReservation: &auctioneerrpc.AuctionAccount{ + Value: 100_000, + Expiry: 144, + TraderKey: pubKey[:], + AuctioneerKey: bytes.Repeat([]byte{0x02}, 33), + BatchKey: bytes.Repeat([]byte{0x03}, 33), + HeightHint: 1, + }, + }, + checkRes: func(t *testing.T, sub *acctSubscription, + canRecover bool, err error) { + + var resErr *AcctResNotCompletedError + if !errors.As(err, &resErr) { + t.Fatalf("expected "+ + "AcctResNotCompletedError, "+ + "got %v", err) + } + if !canRecover { + t.Fatal("expected canRecover=true") + } + if sub == nil { + t.Fatal("expected non-nil subscription") + } + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + runCleanupCase(t, pubKey, tc.errResp, tc.checkRes) + }) + } +} + +// runCleanupCase wires up a fresh Client + fake stream, drives a full +// connectAndAuthenticate handshake in recovery mode, feeds the supplied +// error response back at the Subscribe step, runs the caller's assertions on +// the return values, and finally asserts that subscribedAccts is empty. +func runCleanupCase(t *testing.T, pubKey [33]byte, + errResp *auctioneerrpc.SubscribeError, + checkRes func(t *testing.T, sub *acctSubscription, canRecover bool, + err error)) { + + stream := &fakeServerStream{ + recv: make(chan recvResult, 1), + sent: make(chan *auctioneerrpc.ClientAuctionMessage, 2), + } + + mainErrChan := make(chan error, 1) + c := &Client{ + cfg: &Config{ + Signer: testSigner, + BatchVersion: order.LatestBatchVersion, + }, + serverStream: stream, + FromServerChan: make(chan *auctioneerrpc.ServerAuctionMessage), + StreamErrChan: mainErrChan, + errChanSwitch: NewErrChanSwitch(mainErrChan), + quit: make(chan struct{}), + subscribedAccts: make(map[[33]byte]*acctSubscription), + } + c.errChanSwitch.Start() + defer c.errChanSwitch.Stop() + defer close(c.quit) + + // Run the read loop in the background so server responses are routed + // to the subscription's msgChan via subscribedAccts lookups. + readDone := make(chan struct{}) + go func() { + c.readIncomingStream() + close(readDone) + }() + + type result struct { + sub *acctSubscription + canRecover bool + err error + } + resCh := make(chan result, 1) + go func() { + sub, canRecover, err := c.connectAndAuthenticate( + context.Background(), testAccountDesc, true, + ) + resCh <- result{sub, canRecover, err} + }() + + // Step 1: capture the Commit and echo its commitHash back in the + // Challenge so readIncomingStream can route it to the right sub. + var commitHash []byte + select { + case msg := <-stream.sent: + commit, ok := msg.Msg.(*auctioneerrpc.ClientAuctionMessage_Commit) + if !ok { + t.Fatalf("expected Commit, got %T", msg.Msg) + } + commitHash = commit.Commit.CommitHash + case <-time.After(defaultTimeout): + t.Fatal("did not receive Commit from client") + } + + // Step 2: feed back the Challenge. + stream.recv <- recvResult{ + msg: &auctioneerrpc.ServerAuctionMessage{ + Msg: &auctioneerrpc.ServerAuctionMessage_Challenge{ + Challenge: &auctioneerrpc.ServerChallenge{ + Challenge: []byte{1, 2, 3, 4}, + CommitHash: commitHash, + }, + }, + }, + } + + // Step 3: drain the Subscribe message so authenticate() returns. + select { + case msg := <-stream.sent: + if _, ok := msg.Msg.(*auctioneerrpc.ClientAuctionMessage_Subscribe); !ok { + t.Fatalf("expected Subscribe, got %T", msg.Msg) + } + case <-time.After(defaultTimeout): + t.Fatal("did not receive Subscribe from client") + } + + // Step 4: server responds with the supplied error. + stream.recv <- recvResult{ + msg: &auctioneerrpc.ServerAuctionMessage{ + Msg: &auctioneerrpc.ServerAuctionMessage_Error{ + Error: errResp, + }, + }, + } + + // Step 5: connectAndAuthenticate should return; let the caller assert + // the return values. + var res result + select { + case res = <-resCh: + case <-time.After(defaultTimeout): + t.Fatal("connectAndAuthenticate did not return") + } + checkRes(t, res.sub, res.canRecover, res.err) + + // In every error case, the subscription must NOT be left in the map. + // A later StartAccountSubscription for this account would otherwise + // hit the "already subscribed" guard and silently no-op without ever + // sending a fresh Commit. + c.subscribedAcctsMtx.Lock() + _, present := c.subscribedAccts[pubKey] + c.subscribedAcctsMtx.Unlock() + if present { + t.Fatal("subscribedAccts entry was not cleaned up; later " + + "subscribes for the same account would silently no-op") + } + + // Clean up the background read loop. Sending io.EOF unblocks the + // Recv call and lets readIncomingStream exit cleanly. + stream.recv <- recvResult{err: io.EOF} + select { + case <-readDone: + case <-time.After(defaultTimeout): + t.Fatal("read loop did not exit after EOF") + } +}