Skip to content

Commit 2b67992

Browse files
committed
relayer: defer getClient calls
1 parent 672ceae commit 2b67992

5 files changed

Lines changed: 61 additions & 32 deletions

File tree

relayer/chain/chain.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -119,16 +119,12 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger,
119119
ds: ds,
120120
}
121121

122-
getClient := func() (aptos.AptosRpcClient, error) {
123-
return ch.GetClient()
124-
}
125-
126-
ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, getClient)
122+
ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, ch.GetClient)
127123
if err != nil {
128124
return nil, err
129125
}
130126

131-
ch.logPoller, err = logpoller.NewLogPoller(lggr, ch.chainInfo(), getClient, ds, cfg.LogPoller)
127+
ch.logPoller, err = logpoller.NewLogPoller(lggr, ch.chainInfo(), ch.GetClient, ds, cfg.LogPoller)
132128
if err != nil {
133129
return nil, fmt.Errorf("failed to create log poller: %w", err)
134130
}
@@ -140,7 +136,7 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger,
140136
Config: *cfg.BalanceMonitor,
141137
Logger: lggr,
142138
Keystore: loopKs,
143-
NewClient: getClient,
139+
NewClient: ch.GetClient,
144140
})
145141
if err != nil {
146142
return nil, err

relayer/logpoller/event_poller.go

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"strings"
77
"time"
88

9-
cache "github.com/patrickmn/go-cache"
9+
"github.com/patrickmn/go-cache"
1010

1111
"github.com/aptos-labs/aptos-go-sdk"
1212
"github.com/aptos-labs/aptos-go-sdk/api"
@@ -166,7 +166,12 @@ func (l *AptosLogPoller) syncEvent(ctx context.Context, boundAddress aptos.Accou
166166
var resource map[string]any
167167

168168
if !found {
169-
resource, err = l.client.AccountResource(eventAccountAddress, eventHandle)
169+
var client aptos.AptosRpcClient
170+
client, err = l.getClient()
171+
if err != nil {
172+
return fmt.Errorf("failed to get client: %w", err)
173+
}
174+
resource, err = client.AccountResource(eventAccountAddress, eventHandle)
170175
if err != nil {
171176
return fmt.Errorf("syncEvent: failed to fetch the resource: %w", err)
172177
}
@@ -179,7 +184,12 @@ func (l *AptosLogPoller) syncEvent(ctx context.Context, boundAddress aptos.Accou
179184
resource, ok = resourceAny.(map[string]any)
180185
if !ok {
181186
l.lggr.Errorw("Failed to cast cached resource to map[string]any", "key", cacheKey)
182-
resource, err = l.client.AccountResource(eventAccountAddress, eventHandle)
187+
var client aptos.AptosRpcClient
188+
client, err = l.getClient()
189+
if err != nil {
190+
return fmt.Errorf("failed to get client: %w", err)
191+
}
192+
resource, err = client.AccountResource(eventAccountAddress, eventHandle)
183193
if err != nil {
184194
return fmt.Errorf("syncEvent: failed to fetch the resource after cache cast failure: %w", err)
185195
}
@@ -195,13 +205,19 @@ func (l *AptosLogPoller) syncEvent(ctx context.Context, boundAddress aptos.Accou
195205
batchSize := l.config.EventBatchSize
196206
var totalProcessed int = 0
197207

208+
var client aptos.AptosRpcClient
209+
client, err = l.getClient()
210+
if err != nil {
211+
return fmt.Errorf("failed to get client: %w", err)
212+
}
213+
198214
eventLoop:
199215
for {
200216
select {
201217
case <-ctx.Done():
202218
return ctx.Err()
203219
default:
204-
events, err := l.client.EventsByCreationNumber(eventAccountAddress, creationNumber, &latestOffset, &batchSize)
220+
events, err := client.EventsByCreationNumber(eventAccountAddress, creationNumber, &latestOffset, &batchSize)
205221
if err != nil {
206222
l.lggr.Errorw("syncEvent: failed to fetch new events", "error", err)
207223
return fmt.Errorf("syncEvent: failed to fetch events: %w", err)
@@ -328,7 +344,11 @@ func (l *AptosLogPoller) computeEventAccountAddress(boundAddress aptos.AccountAd
328344
Args: [][]byte{},
329345
}
330346

331-
data, err := l.client.View(viewPayload)
347+
client, err := l.getClient()
348+
if err != nil {
349+
return eventAccountAddress, err
350+
}
351+
data, err := client.View(viewPayload)
332352
if err != nil {
333353
return eventAccountAddress, fmt.Errorf("failed to call view function: %+w", err)
334354
}
@@ -354,7 +374,12 @@ func (l *AptosLogPoller) getBlockHead(version uint64) (types.Head, error) {
354374
block, ok = cachedBlockAny.(*api.Block)
355375
if !ok {
356376
l.lggr.Errorw("Failed to cast cached block to *api.Block", "key", cacheKey)
357-
block, err = l.client.BlockByVersion(version, false)
377+
var client aptos.AptosRpcClient
378+
client, err = l.getClient()
379+
if err != nil {
380+
return types.Head{}, fmt.Errorf("failed to get client: %w", err)
381+
}
382+
block, err = client.BlockByVersion(version, false)
358383
if err != nil {
359384
return types.Head{}, fmt.Errorf("failed to get block by version after cache cast failure: %w", err)
360385
}
@@ -363,7 +388,12 @@ func (l *AptosLogPoller) getBlockHead(version uint64) (types.Head, error) {
363388
l.lggr.Debugw("Using cached block", "version", version)
364389
}
365390
} else {
366-
block, err = l.client.BlockByVersion(version, false)
391+
var client aptos.AptosRpcClient
392+
client, err = l.getClient()
393+
if err != nil {
394+
return types.Head{}, fmt.Errorf("failed to get client: %w", err)
395+
}
396+
block, err = client.BlockByVersion(version, false)
367397
if err != nil {
368398
return types.Head{}, fmt.Errorf("failed to get block by version: %w", err)
369399
}

relayer/logpoller/logpoller.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
"sync"
88
"time"
99

10-
cache "github.com/patrickmn/go-cache"
10+
"github.com/patrickmn/go-cache"
1111

1212
"github.com/aptos-labs/aptos-go-sdk"
1313

@@ -31,7 +31,7 @@ type AptosLogPoller struct {
3131
lggr logger.Logger
3232
dbStore *db.DBStore
3333
config *Config
34-
client aptos.AptosRpcClient
34+
getClient func() (aptos.AptosRpcClient, error)
3535
chainInfo types.ChainInfo
3636

3737
mu sync.RWMutex
@@ -49,11 +49,6 @@ type AptosLogPoller struct {
4949
}
5050

5151
func NewLogPoller(lggr logger.Logger, chainInfo types.ChainInfo, getClient func() (aptos.AptosRpcClient, error), ds sqlutil.DataSource, cfg *Config) (*AptosLogPoller, error) {
52-
client, err := getClient()
53-
if err != nil {
54-
return nil, err
55-
}
56-
5752
if cfg == nil {
5853
cfg = &DefaultConfigSet
5954
}
@@ -67,7 +62,7 @@ func NewLogPoller(lggr logger.Logger, chainInfo types.ChainInfo, getClient func(
6762
lggr: logger.Named(lggr, "AptosLogPoller"),
6863
dbStore: dbStore,
6964
config: cfg,
70-
client: client,
65+
getClient: getClient,
7166
chainInfo: chainInfo,
7267

7368
modules: make(map[string]*moduleInfo),

relayer/logpoller/tx_poller.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,12 @@ func (l *AptosLogPoller) syncTransmitterTxs(ctx context.Context, transmitter apt
188188
case <-ctx.Done():
189189
return totalProcessed, ctx.Err()
190190
default:
191-
txns, err := l.client.AccountTransactions(transmitter, &sequenceNumber, &batchSize)
191+
var client aptos.AptosRpcClient
192+
client, err = l.getClient()
193+
if err != nil {
194+
return totalProcessed, fmt.Errorf("failed to get client: %w", err)
195+
}
196+
txns, err := client.AccountTransactions(transmitter, &sequenceNumber, &batchSize)
192197
if err != nil {
193198
return totalProcessed, fmt.Errorf("failed to fetch transactions: %w", err)
194199
}

relayer/txm/txm.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,16 @@ type AptosTxm struct {
4242
done sync.WaitGroup
4343
stop chan struct{}
4444

45-
client aptos.AptosRpcClient
45+
getClient func() (aptos.AptosRpcClient, error)
4646
}
4747

4848
// TODO: Config input is not validated for sanity
4949
func New(lgr logger.Logger, keystore loop.Keystore, config Config, getClient func() (aptos.AptosRpcClient, error)) (*AptosTxm, error) {
50-
client, err := getClient()
51-
if err != nil {
52-
return nil, err
53-
}
54-
5550
return &AptosTxm{
5651
baseLogger: logger.Named(lgr, "AptosTxm"),
5752
keystore: keystore,
5853
config: config,
59-
client: client,
54+
getClient: getClient,
6055

6156
transactions: map[string]*AptosTx{},
6257
transactionsLastPruneTime: getTimestampSecs(),
@@ -469,8 +464,12 @@ func (a *AptosTxm) getTransactionAttempt(tx *AptosTx) uint64 {
469464
}
470465

471466
func (a *AptosTxm) signAndBroadcast(tx *AptosTx) {
472-
client := a.client
473467
ctxLogger := GetContexedTxLogger(a.baseLogger, tx.ID, tx.Metadata)
468+
client, err := a.getClient()
469+
if err != nil {
470+
ctxLogger.Error("Unable to sign and broadcast: failed to get client")
471+
return
472+
}
474473

475474
txStore := a.accountStore.GetTxStore(tx.FromAddress.String())
476475
if txStore == nil {
@@ -598,7 +597,11 @@ func (a *AptosTxm) confirmLoop() {
598597
}
599598

600599
func (a *AptosTxm) checkUnconfirmed() {
601-
client := a.client
600+
client, err := a.getClient()
601+
if err != nil {
602+
a.baseLogger.Error("Unable to check unconfirmed: failed to get client")
603+
return
604+
}
602605
allUnconfirmedTxs := a.accountStore.GetAllUnconfirmed()
603606

604607
for accountAddress, unconfirmedTxs := range allUnconfirmedTxs {

0 commit comments

Comments
 (0)