Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion auctioneer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,25 @@ func (c *Client) connectAndAuthenticate(ctx context.Context,
c.subscribedAcctsMtx.Lock()
c.subscribedAccts[acctPubKey] = sub
c.subscribedAcctsMtx.Unlock()

// The subscription needs to be in the map before authenticate runs so
// that readIncomingStream can route the server's Challenge/Error
// responses back to it. But if the handshake doesn't reach the
// GetSuccess branch below, the entry is stale — a later subscribe
// attempt for the same account would hit the "already subscribed"
// guard at the top of this function and silently no-op without ever
// sending a fresh Commit. Clean up here so only genuinely live
// subscriptions remain in the map.
success := false
defer func() {
if success {
return
}
c.subscribedAcctsMtx.Lock()
delete(c.subscribedAccts, acctPubKey)
c.subscribedAcctsMtx.Unlock()
}()

err := sub.authenticate(ctx)
if err != nil {
log.Errorf("Authentication failed for account %x: %v",
Expand All @@ -661,6 +680,16 @@ func (c *Client) connectAndAuthenticate(ctx context.Context,
// Ah, so it's the server shutting down, so let's re-
// try our connection.
if err == ErrServerErrored {
// HandleServerShutdown clears subscribedAccts
// and re-runs StartAccountSubscription for
// every previously-live account, including
// this one. That inner call installs a fresh
// entry in the map for acctPubKey. We must
// suppress our deferred cleanup so it doesn't
// turn around and delete that fresh entry on
// the way out, leaving the resubscribed
// account unreachable from sendToSubscription.
success = true
return sub, false, c.HandleServerShutdown(nil)
}

Expand All @@ -679,8 +708,11 @@ func (c *Client) connectAndAuthenticate(ctx context.Context,

// Did the server find the account we're interested in?
switch {
// Account exists, everything's good to continue.
// Account exists, everything's good to continue. This is the
// only path that keeps the subscription in the map; every
// other exit triggers the deferred cleanup above.
case srvMsg.GetSuccess() != nil:
success = true
return sub, true, nil

// We got an error. If we're in recovery mode, this could either
Expand Down
238 changes: 236 additions & 2 deletions auctioneer/client_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package auctioneer

import (
"bytes"
"context"
"errors"
"io"
"testing"
"time"

"github.com/lightninglabs/pool/auctioneerrpc"
"github.com/lightninglabs/pool/order"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -43,19 +46,25 @@ func TestJitterBackoffBounds(t *testing.T) {

// fakeServerStream is a minimal implementation of
// ChannelAuctioneer_SubscribeBatchAuctionClient that returns predetermined
// results from Recv. It is only sufficient for driving the client's read loop.
// results from Recv and captures client-sent messages on `sent` (when
// non-nil). It is only sufficient for driving the client's read loop and
// the auth handshake.
type fakeServerStream struct {
grpc.ClientStream

recv chan recvResult
sent chan *auctioneerrpc.ClientAuctionMessage
}

type recvResult struct {
msg *auctioneerrpc.ServerAuctionMessage
err error
}

func (s *fakeServerStream) Send(*auctioneerrpc.ClientAuctionMessage) error {
func (s *fakeServerStream) Send(msg *auctioneerrpc.ClientAuctionMessage) error {
if s.sent != nil {
s.sent <- msg
}
return nil
}

Expand Down Expand Up @@ -203,3 +212,228 @@ func TestReadIncomingStreamContextCanceledDoesNotReconnect(t *testing.T) {
// Expected: no error surfaced.
}
}

// TestConnectAndAuthenticateCleansUpOnError drives a full
// connectAndAuthenticate call in recovery mode against a scripted fake stream
// for each per-account error path that the auctioneer can return after the
// Subscribe message, and asserts the subscription entry is always removed
// from c.subscribedAccts on return.
//
// Regression: previously the entry was added to the map before authenticate
// ran (so readIncomingStream could route the server's Challenge/Error back
// to it) and was never removed on error paths. A later StartAccountSubscription
// for the same account — typically when handleStateOpen runs after on-chain
// confirmation — would hit the "already subscribed" early-return guard at the
// top of connectAndAuthenticate and silently no-op without sending a fresh
// Commit. The per-account 3-way handshake never ran and the trader stayed
// filtered as offline at matching time until the process restarted.
func TestConnectAndAuthenticateCleansUpOnError(t *testing.T) {
t.Parallel()

var pubKey [33]byte
copy(pubKey[:], testAccountDesc.PubKey.SerializeCompressed())

cases := []struct {
name string
errResp *auctioneerrpc.SubscribeError
checkRes func(t *testing.T, sub *acctSubscription,
canRecover bool, err error)
}{
{
// Realistic case: RecoverAccounts probes a key the
// auctioneer hasn't yet seen on chain.
name: "account does not exist",
errResp: &auctioneerrpc.SubscribeError{
ErrorCode: auctioneerrpc.SubscribeError_ACCOUNT_DOES_NOT_EXIST,
TraderKey: pubKey[:],
},
checkRes: func(t *testing.T, sub *acctSubscription,
canRecover bool, err error) {

if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if canRecover {
t.Fatal("expected canRecover=false")
}
if sub == nil {
t.Fatal("expected non-nil subscription")
}
},
},
{
// The auctioneer knows about a reservation for this
// key but the funding tx hasn't confirmed yet. The
// function returns a non-nil sub *and* a typed error,
// which makes the cleanup invariant especially easy
// to get wrong.
name: "incomplete account reservation",
errResp: &auctioneerrpc.SubscribeError{
ErrorCode: auctioneerrpc.SubscribeError_INCOMPLETE_ACCOUNT_RESERVATION,
TraderKey: pubKey[:],
AccountReservation: &auctioneerrpc.AuctionAccount{
Value: 100_000,
Expiry: 144,
TraderKey: pubKey[:],
AuctioneerKey: bytes.Repeat([]byte{0x02}, 33),
BatchKey: bytes.Repeat([]byte{0x03}, 33),
HeightHint: 1,
},
},
checkRes: func(t *testing.T, sub *acctSubscription,
canRecover bool, err error) {

var resErr *AcctResNotCompletedError
if !errors.As(err, &resErr) {
t.Fatalf("expected "+
"AcctResNotCompletedError, "+
"got %v", err)
}
if !canRecover {
t.Fatal("expected canRecover=true")
}
if sub == nil {
t.Fatal("expected non-nil subscription")
}
},
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()

runCleanupCase(t, pubKey, tc.errResp, tc.checkRes)
})
}
}

// runCleanupCase wires up a fresh Client + fake stream, drives a full
// connectAndAuthenticate handshake in recovery mode, feeds the supplied
// error response back at the Subscribe step, runs the caller's assertions on
// the return values, and finally asserts that subscribedAccts is empty.
func runCleanupCase(t *testing.T, pubKey [33]byte,
errResp *auctioneerrpc.SubscribeError,
checkRes func(t *testing.T, sub *acctSubscription, canRecover bool,
err error)) {

stream := &fakeServerStream{
recv: make(chan recvResult, 1),
sent: make(chan *auctioneerrpc.ClientAuctionMessage, 2),
}

mainErrChan := make(chan error, 1)
c := &Client{
cfg: &Config{
Signer: testSigner,
BatchVersion: order.LatestBatchVersion,
},
serverStream: stream,
FromServerChan: make(chan *auctioneerrpc.ServerAuctionMessage),
StreamErrChan: mainErrChan,
errChanSwitch: NewErrChanSwitch(mainErrChan),
quit: make(chan struct{}),
subscribedAccts: make(map[[33]byte]*acctSubscription),
}
c.errChanSwitch.Start()
defer c.errChanSwitch.Stop()
defer close(c.quit)

// Run the read loop in the background so server responses are routed
// to the subscription's msgChan via subscribedAccts lookups.
readDone := make(chan struct{})
go func() {
c.readIncomingStream()
close(readDone)
}()

type result struct {
sub *acctSubscription
canRecover bool
err error
}
resCh := make(chan result, 1)
go func() {
sub, canRecover, err := c.connectAndAuthenticate(
context.Background(), testAccountDesc, true,
)
resCh <- result{sub, canRecover, err}
}()

// Step 1: capture the Commit and echo its commitHash back in the
// Challenge so readIncomingStream can route it to the right sub.
var commitHash []byte
select {
case msg := <-stream.sent:
commit, ok := msg.Msg.(*auctioneerrpc.ClientAuctionMessage_Commit)
if !ok {
t.Fatalf("expected Commit, got %T", msg.Msg)
}
commitHash = commit.Commit.CommitHash
case <-time.After(defaultTimeout):
t.Fatal("did not receive Commit from client")
}

// Step 2: feed back the Challenge.
stream.recv <- recvResult{
msg: &auctioneerrpc.ServerAuctionMessage{
Msg: &auctioneerrpc.ServerAuctionMessage_Challenge{
Challenge: &auctioneerrpc.ServerChallenge{
Challenge: []byte{1, 2, 3, 4},
CommitHash: commitHash,
},
},
},
}

// Step 3: drain the Subscribe message so authenticate() returns.
select {
case msg := <-stream.sent:
if _, ok := msg.Msg.(*auctioneerrpc.ClientAuctionMessage_Subscribe); !ok {
t.Fatalf("expected Subscribe, got %T", msg.Msg)
}
case <-time.After(defaultTimeout):
t.Fatal("did not receive Subscribe from client")
}

// Step 4: server responds with the supplied error.
stream.recv <- recvResult{
msg: &auctioneerrpc.ServerAuctionMessage{
Msg: &auctioneerrpc.ServerAuctionMessage_Error{
Error: errResp,
},
},
}

// Step 5: connectAndAuthenticate should return; let the caller assert
// the return values.
var res result
select {
case res = <-resCh:
case <-time.After(defaultTimeout):
t.Fatal("connectAndAuthenticate did not return")
}
checkRes(t, res.sub, res.canRecover, res.err)

// In every error case, the subscription must NOT be left in the map.
// A later StartAccountSubscription for this account would otherwise
// hit the "already subscribed" guard and silently no-op without ever
// sending a fresh Commit.
c.subscribedAcctsMtx.Lock()
_, present := c.subscribedAccts[pubKey]
c.subscribedAcctsMtx.Unlock()
if present {
t.Fatal("subscribedAccts entry was not cleaned up; later " +
"subscribes for the same account would silently no-op")
}

// Clean up the background read loop. Sending io.EOF unblocks the
// Recv call and lets readIncomingStream exit cleanly.
stream.recv <- recvResult{err: io.EOF}
select {
case <-readDone:
case <-time.After(defaultTimeout):
t.Fatal("read loop did not exit after EOF")
}
}
Loading