diff --git a/auctioneer/client.go b/auctioneer/client.go index c4469bd6..a25ea9e3 100644 --- a/auctioneer/client.go +++ b/auctioneer/client.go @@ -1311,13 +1311,21 @@ func (c *Client) HandleServerShutdown(err error) error { delete(c.subscribedAccts, key) } c.subscribedAcctsMtx.Unlock() + + // Attempt every account even if some fail — bailing on the first + // error here would leave the remaining accounts silently + // un-subscribed and offline to the matchmaker until the next + // reconnect or process restart. + var errs []error for _, acctKey := range acctKeys { err := c.StartAccountSubscription(context.Background(), acctKey) if err != nil { - return err + log.Errorf("Failed to re-subscribe account %x: %v", + acctKey.PubKey.SerializeCompressed(), err) + errs = append(errs, err) } } - return nil + return errors.Join(errs...) } // unmarshallServerAccount parses the account information sent from the diff --git a/auctioneer/client_test.go b/auctioneer/client_test.go index fd730a03..ad6bcc90 100644 --- a/auctioneer/client_test.go +++ b/auctioneer/client_test.go @@ -5,11 +5,16 @@ import ( "context" "errors" "io" + "sync" "testing" "time" + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightninglabs/pool/account" "github.com/lightninglabs/pool/auctioneerrpc" + "github.com/lightninglabs/pool/clientdb" "github.com/lightninglabs/pool/order" + "github.com/lightningnetwork/lnd/keychain" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -73,6 +78,43 @@ func (s *fakeServerStream) Recv() (*auctioneerrpc.ServerAuctionMessage, error) { return r.msg, r.err } +func (s *fakeServerStream) CloseSend() error { + return nil +} + +// fakeAuctioneerClient embeds the real ChannelAuctioneerClient interface so it +// satisfies all 14 methods by nil-deref (none are called in this test other +// than the two we override below). +type fakeAuctioneerClient struct { + auctioneerrpc.ChannelAuctioneerClient + + stream auctioneerrpc.ChannelAuctioneer_SubscribeBatchAuctionClient +} + +func (f *fakeAuctioneerClient) Terms(ctx context.Context, + in *auctioneerrpc.TermsRequest, + opts ...grpc.CallOption) (*auctioneerrpc.TermsResponse, error) { + + return &auctioneerrpc.TermsResponse{}, nil +} + +func (f *fakeAuctioneerClient) SubscribeBatchAuction(ctx context.Context, + opts ...grpc.CallOption) ( + auctioneerrpc.ChannelAuctioneer_SubscribeBatchAuctionClient, error) { + + return f.stream, nil +} + +// noPendingBatchSource is a BatchSource stub that always reports "no pending +// batch", letting checkPendingBatch return cleanly. +type noPendingBatchSource struct{} + +func (noPendingBatchSource) PendingBatchSnapshot() ( + *clientdb.LocalBatchSnapshot, error) { + + return nil, account.ErrNoPendingBatch +} + // newTestClient returns a Client wired up just enough to drive // readIncomingStream against a fake server stream. func newTestClient(stream auctioneerrpc.ChannelAuctioneer_SubscribeBatchAuctionClient, @@ -437,3 +479,171 @@ func runCleanupCase(t *testing.T, pubKey [33]byte, t.Fatal("read loop did not exit after EOF") } } + +// TestHandleServerShutdownPartialResubscribeFailure asserts that +// HandleServerShutdown attempts to re-subscribe every account even when one +// of the handshakes fails server-side. The current loop bails on the first +// error, silently leaving the remaining accounts un-subscribed. +func TestHandleServerShutdownPartialResubscribeFailure(t *testing.T) { + t.Parallel() + + // Three distinct account keys. + keys := make([]*keychain.KeyDescriptor, 3) + for i := range keys { + priv, err := btcec.NewPrivateKey() + if err != nil { + t.Fatalf("could not generate key: %v", err) + } + keys[i] = &keychain.KeyDescriptor{PubKey: priv.PubKey()} + } + keyBytes := func(k *keychain.KeyDescriptor) [33]byte { + var b [33]byte + copy(b[:], k.PubKey.SerializeCompressed()) + return b + } + + // Stream that connectServerStream will hand back after closeStream. + stream := &fakeServerStream{ + recv: make(chan recvResult, 8), + sent: make(chan *auctioneerrpc.ClientAuctionMessage, 8), + } + + mainErrChan := make(chan error, 4) + c := &Client{ + cfg: &Config{ + Signer: testSigner, + BatchVersion: order.LatestBatchVersion, + MinBackoff: time.Millisecond, + MaxBackoff: time.Millisecond, + BatchSource: noPendingBatchSource{}, + }, + client: &fakeAuctioneerClient{stream: stream}, + FromServerChan: make(chan *auctioneerrpc.ServerAuctionMessage), + StreamErrChan: mainErrChan, + errChanSwitch: NewErrChanSwitch(mainErrChan), + quit: make(chan struct{}), + subscribedAccts: make(map[[33]byte]*acctSubscription), + } + c.errChanSwitch.Start() + defer c.errChanSwitch.Stop() + defer close(c.quit) + + // Pre-populate the subscribed accounts. HandleServerShutdown only reads + // acctKey out of each subscription to seed its re-subscribe loop; the + // channels here are placeholders. + for _, k := range keys { + c.subscribedAccts[keyBytes(k)] = &acctSubscription{ + acctKey: k, + msgChan: make(chan *auctioneerrpc.ServerAuctionMessage), + quit: make(chan struct{}), + } + } + + // Orchestrator: walk each handshake through Challenge + final + // response. The first attempt always gets ACCOUNT_DOES_NOT_EXIST; + // the rest get Success. Failing on the first attempt (rather than a + // fixed key) keeps the assertion deterministic under Go's randomized + // map iteration order. + var ( + attemptedMtx sync.Mutex + attempted = make(map[[33]byte]struct{}) + ) + go func() { + first := true + for { + var msg *auctioneerrpc.ClientAuctionMessage + select { + case msg = <-stream.sent: + case <-c.quit: + return + } + commit, ok := msg.Msg.(*auctioneerrpc.ClientAuctionMessage_Commit) + if !ok { + continue + } + + // Send Challenge back with the matching commitHash so + // readIncomingStream can route it. + stream.recv <- recvResult{ + msg: &auctioneerrpc.ServerAuctionMessage{ + Msg: &auctioneerrpc.ServerAuctionMessage_Challenge{ + Challenge: &auctioneerrpc.ServerChallenge{ + Challenge: []byte{1, 2, 3, 4}, + CommitHash: commit.Commit.CommitHash, + }, + }, + }, + } + + // Wait for the Subscribe with the trader key. + var subMsg *auctioneerrpc.ClientAuctionMessage + select { + case subMsg = <-stream.sent: + case <-c.quit: + return + } + sub, ok := subMsg.Msg.(*auctioneerrpc.ClientAuctionMessage_Subscribe) + if !ok { + continue + } + + var traderKey [33]byte + copy(traderKey[:], sub.Subscribe.TraderKey) + attemptedMtx.Lock() + attempted[traderKey] = struct{}{} + attemptedMtx.Unlock() + + final := &auctioneerrpc.ServerAuctionMessage{ + Msg: &auctioneerrpc.ServerAuctionMessage_Success{ + Success: &auctioneerrpc.SubscribeSuccess{ + TraderKey: sub.Subscribe.TraderKey, + }, + }, + } + if first { + final = &auctioneerrpc.ServerAuctionMessage{ + Msg: &auctioneerrpc.ServerAuctionMessage_Error{ + Error: &auctioneerrpc.SubscribeError{ + ErrorCode: auctioneerrpc.SubscribeError_ACCOUNT_DOES_NOT_EXIST, + TraderKey: sub.Subscribe.TraderKey, + }, + }, + } + first = false + } + stream.recv <- recvResult{msg: final} + } + }() + + // Drive HandleServerShutdown in a goroutine. + shutdownErr := make(chan error, 1) + go func() { + shutdownErr <- c.HandleServerShutdown(nil) + }() + + // Wait for HandleServerShutdown to return. With the bug, it returns + // after the first handshake's error. With a fix, it returns after + // all three handshakes complete. + select { + case <-shutdownErr: + case <-time.After(2 * time.Second): + t.Fatal("HandleServerShutdown did not return") + } + + // Every account must have been attempted, even though one handshake + // failed. Otherwise that single failure silently took the rest of the + // trader's accounts offline. + attemptedMtx.Lock() + got := len(attempted) + attemptedMtx.Unlock() + if got < len(keys) { + t.Fatalf("expected re-subscribe to attempt all %d accounts, "+ + "only attempted %d — the loop bails on first error "+ + "and leaves later accounts silently un-subscribed", + len(keys), got) + } + + // Terminate the readIncomingStream goroutine that connectServerStream + // spawned, so it doesn't leak past the test. + stream.recv <- recvResult{err: io.EOF} +}