Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions auctioneer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1311,13 +1311,21 @@ func (c *Client) HandleServerShutdown(err error) error {
delete(c.subscribedAccts, key)
}
c.subscribedAcctsMtx.Unlock()

// Attempt every account even if some fail — bailing on the first
// error here would leave the remaining accounts silently
// un-subscribed and offline to the matchmaker until the next
// reconnect or process restart.
var errs []error
for _, acctKey := range acctKeys {
err := c.StartAccountSubscription(context.Background(), acctKey)
if err != nil {
return err
log.Errorf("Failed to re-subscribe account %x: %v",
acctKey.PubKey.SerializeCompressed(), err)
errs = append(errs, err)
}
}
return nil
return errors.Join(errs...)
}

// unmarshallServerAccount parses the account information sent from the
Expand Down
210 changes: 210 additions & 0 deletions auctioneer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"context"
"errors"
"io"
"sync"
"testing"
"time"

"github.com/btcsuite/btcd/btcec/v2"
"github.com/lightninglabs/pool/account"
"github.com/lightninglabs/pool/auctioneerrpc"
"github.com/lightninglabs/pool/clientdb"
"github.com/lightninglabs/pool/order"
"github.com/lightningnetwork/lnd/keychain"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -73,6 +78,43 @@ func (s *fakeServerStream) Recv() (*auctioneerrpc.ServerAuctionMessage, error) {
return r.msg, r.err
}

func (s *fakeServerStream) CloseSend() error {
return nil
}

// fakeAuctioneerClient embeds the real ChannelAuctioneerClient interface so it
// satisfies all 14 methods by nil-deref (none are called in this test other
// than the two we override below).
type fakeAuctioneerClient struct {
auctioneerrpc.ChannelAuctioneerClient

stream auctioneerrpc.ChannelAuctioneer_SubscribeBatchAuctionClient
}

func (f *fakeAuctioneerClient) Terms(ctx context.Context,
in *auctioneerrpc.TermsRequest,
opts ...grpc.CallOption) (*auctioneerrpc.TermsResponse, error) {

return &auctioneerrpc.TermsResponse{}, nil
}

func (f *fakeAuctioneerClient) SubscribeBatchAuction(ctx context.Context,
opts ...grpc.CallOption) (
auctioneerrpc.ChannelAuctioneer_SubscribeBatchAuctionClient, error) {

return f.stream, nil
}

// noPendingBatchSource is a BatchSource stub that always reports "no pending
// batch", letting checkPendingBatch return cleanly.
type noPendingBatchSource struct{}

func (noPendingBatchSource) PendingBatchSnapshot() (
*clientdb.LocalBatchSnapshot, error) {

return nil, account.ErrNoPendingBatch
}

// newTestClient returns a Client wired up just enough to drive
// readIncomingStream against a fake server stream.
func newTestClient(stream auctioneerrpc.ChannelAuctioneer_SubscribeBatchAuctionClient,
Expand Down Expand Up @@ -437,3 +479,171 @@ func runCleanupCase(t *testing.T, pubKey [33]byte,
t.Fatal("read loop did not exit after EOF")
}
}

// TestHandleServerShutdownPartialResubscribeFailure asserts that
// HandleServerShutdown attempts to re-subscribe every account even when one
// of the handshakes fails server-side. The current loop bails on the first
// error, silently leaving the remaining accounts un-subscribed.
func TestHandleServerShutdownPartialResubscribeFailure(t *testing.T) {
t.Parallel()

// Three distinct account keys.
keys := make([]*keychain.KeyDescriptor, 3)
for i := range keys {
priv, err := btcec.NewPrivateKey()
if err != nil {
t.Fatalf("could not generate key: %v", err)
}
keys[i] = &keychain.KeyDescriptor{PubKey: priv.PubKey()}
}
keyBytes := func(k *keychain.KeyDescriptor) [33]byte {
var b [33]byte
copy(b[:], k.PubKey.SerializeCompressed())
return b
}

// Stream that connectServerStream will hand back after closeStream.
stream := &fakeServerStream{
recv: make(chan recvResult, 8),
sent: make(chan *auctioneerrpc.ClientAuctionMessage, 8),
}

mainErrChan := make(chan error, 4)
c := &Client{
cfg: &Config{
Signer: testSigner,
BatchVersion: order.LatestBatchVersion,
MinBackoff: time.Millisecond,
MaxBackoff: time.Millisecond,
BatchSource: noPendingBatchSource{},
},
client: &fakeAuctioneerClient{stream: stream},
FromServerChan: make(chan *auctioneerrpc.ServerAuctionMessage),
StreamErrChan: mainErrChan,
errChanSwitch: NewErrChanSwitch(mainErrChan),
quit: make(chan struct{}),
subscribedAccts: make(map[[33]byte]*acctSubscription),
}
c.errChanSwitch.Start()
defer c.errChanSwitch.Stop()
defer close(c.quit)

// Pre-populate the subscribed accounts. HandleServerShutdown only reads
// acctKey out of each subscription to seed its re-subscribe loop; the
// channels here are placeholders.
for _, k := range keys {
c.subscribedAccts[keyBytes(k)] = &acctSubscription{
acctKey: k,
msgChan: make(chan *auctioneerrpc.ServerAuctionMessage),
quit: make(chan struct{}),
}
}

// Orchestrator: walk each handshake through Challenge + final
// response. The first attempt always gets ACCOUNT_DOES_NOT_EXIST;
// the rest get Success. Failing on the first attempt (rather than a
// fixed key) keeps the assertion deterministic under Go's randomized
// map iteration order.
var (
attemptedMtx sync.Mutex
attempted = make(map[[33]byte]struct{})
)
go func() {
first := true
for {
var msg *auctioneerrpc.ClientAuctionMessage
select {
case msg = <-stream.sent:
case <-c.quit:
return
}
commit, ok := msg.Msg.(*auctioneerrpc.ClientAuctionMessage_Commit)
if !ok {
continue
}

// Send Challenge back with the matching commitHash so
// readIncomingStream can route it.
stream.recv <- recvResult{
msg: &auctioneerrpc.ServerAuctionMessage{
Msg: &auctioneerrpc.ServerAuctionMessage_Challenge{
Challenge: &auctioneerrpc.ServerChallenge{
Challenge: []byte{1, 2, 3, 4},
CommitHash: commit.Commit.CommitHash,
},
},
},
}

// Wait for the Subscribe with the trader key.
var subMsg *auctioneerrpc.ClientAuctionMessage
select {
case subMsg = <-stream.sent:
case <-c.quit:
return
}
sub, ok := subMsg.Msg.(*auctioneerrpc.ClientAuctionMessage_Subscribe)
if !ok {
continue
}

var traderKey [33]byte
copy(traderKey[:], sub.Subscribe.TraderKey)
attemptedMtx.Lock()
attempted[traderKey] = struct{}{}
attemptedMtx.Unlock()

final := &auctioneerrpc.ServerAuctionMessage{
Msg: &auctioneerrpc.ServerAuctionMessage_Success{
Success: &auctioneerrpc.SubscribeSuccess{
TraderKey: sub.Subscribe.TraderKey,
},
},
}
if first {
final = &auctioneerrpc.ServerAuctionMessage{
Msg: &auctioneerrpc.ServerAuctionMessage_Error{
Error: &auctioneerrpc.SubscribeError{
ErrorCode: auctioneerrpc.SubscribeError_ACCOUNT_DOES_NOT_EXIST,
TraderKey: sub.Subscribe.TraderKey,
},
},
}
first = false
}
stream.recv <- recvResult{msg: final}
}
}()

// Drive HandleServerShutdown in a goroutine.
shutdownErr := make(chan error, 1)
go func() {
shutdownErr <- c.HandleServerShutdown(nil)
}()

// Wait for HandleServerShutdown to return. With the bug, it returns
// after the first handshake's error. With a fix, it returns after
// all three handshakes complete.
select {
case <-shutdownErr:
case <-time.After(2 * time.Second):
t.Fatal("HandleServerShutdown did not return")
}

// Every account must have been attempted, even though one handshake
// failed. Otherwise that single failure silently took the rest of the
// trader's accounts offline.
attemptedMtx.Lock()
got := len(attempted)
attemptedMtx.Unlock()
if got < len(keys) {
t.Fatalf("expected re-subscribe to attempt all %d accounts, "+
"only attempted %d — the loop bails on first error "+
"and leaves later accounts silently un-subscribed",
len(keys), got)
}

// Terminate the readIncomingStream goroutine that connectServerStream
// spawned, so it doesn't leak past the test.
stream.recv <- recvResult{err: io.EOF}
}
Loading