Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ gomods: ## Install gomods
go install github.com/jmank88/gomods@v0.1.6

.PHONY: tidy
tidy: gomods
tidy: gomods ## Tidy go.mod and go.sum files
gomods tidy

.PHONY: mockery
mockery: ## Install mockery.
mockery: ## Install mockery
go install github.com/vektra/mockery/v2@v2.53.3

.PHONY: codecgen
Expand All @@ -20,15 +20,20 @@ protoc: ## Install protoc
go install google.golang.org/protobuf/cmd/protoc-gen-go@`go list -m -json google.golang.org/protobuf | jq -r .Version`

.PHONY: generate
generate: gomods codecgen mockery protoc modgraph
generate: gomods codecgen mockery protoc modgraph ## Generate code for all modules
export PATH="$(HOME)/.local/bin:$(PATH)"; gomods -s gethwrappers,contracts/cre/ -go generate ./...
find . -type f -name .mockery.yaml -not -path "./contracts/" -not -path "./gethwrappers/" -execdir mockery \; ## Execute mockery for all .mockery.yaml files

.PHONY: rm-mocked
rm-mocked:
rm-mocked: ## Remove mocked code
grep -rl "^// Code generated by mockery" | grep --exclude-dir ./contracts/ --exclude-dir ./gethwrappers/ .go$ | xargs -r rm

.PHONY: modgraph
modgraph: gomods
modgraph: gomods ## Generate module graph
go install github.com/jmank88/modgraph@v0.1.0
./modgraph > go.md

.PHONY: help
help: ## Show help for all targets
@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | \
awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}'
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/smartcontractkit/chainlink-framework/metrics v0.0.0-20251020150604-8ab84f7bad1a
github.com/smartcontractkit/chainlink-framework/multinode v0.0.0-20251021173435-e86785845942
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251124151448-0448aefdaab9
github.com/smartcontractkit/chainlink-protos/svr v1.1.0
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20250815105909-75499abc4335
github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e
github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,8 @@ github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251124151448-0448ae
github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20251124151448-0448aefdaab9/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM=
github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY=
github.com/smartcontractkit/chainlink-protos/svr v1.1.0 h1:79Z9N9dMbMVRGaLoDPAQ+vOwbM+Hnx8tIN2xCPG8H4o=
github.com/smartcontractkit/chainlink-protos/svr v1.1.0/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3 h1:X8Pekpv+cy0eW1laZTwATuYLTLZ6gRTxz1ZWOMtU74o=
github.com/smartcontractkit/chainlink-protos/svr v1.1.1-0.20260203131522-bb8bc5c423b3/go.mod h1:TcOliTQU6r59DwG4lo3U+mFM9WWyBHGuFkkxQpvSujo=
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20250815105909-75499abc4335 h1:7bxYNrPpygn8PUSBiEKn8riMd7CXMi/4bjTy0fHhcrY=
github.com/smartcontractkit/chainlink-tron/relayer v0.0.11-0.20250815105909-75499abc4335/go.mod h1:ccjEgNeqOO+bjPddnL4lUrNLzyCvGCxgBjJdhFX3wa8=
github.com/smartcontractkit/chainlink-tron/relayer/gotron-sdk v0.0.5-0.20250528121202-292529af39df h1:36e3ROIZyV/qE8SvFOACXtXfMOMd9vG4+zY2v2ScXkI=
Expand Down
4 changes: 3 additions & 1 deletion pkg/txm/clientwrappers/dualbroadcast/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ type MetaClient struct {
}

func NewMetaClient(lggr logger.Logger, c MetaClientRPC, ks MetaClientKeystore, customURL *url.URL, chainID *big.Int) (*MetaClient, error) {
metrics, err := NewMetaMetrics(chainID.String())
metrics, err := NewMetaMetrics(chainID.String(), lggr)
if err != nil {
return nil, fmt.Errorf("failed to create Meta metrics: %w", err)
}
Expand Down Expand Up @@ -173,11 +173,13 @@ func (a *MetaClient) SendTransaction(ctx context.Context, tx *types.Transaction,
meta, err := a.SendRequest(ctx, tx, attempt, *meta.DualBroadcastParams, tx.ToAddress)
if err != nil {
a.metrics.RecordSendRequestError(ctx)
a.metrics.emitAtlasError(ctx, "send_request", a.customURL, err, tx)
return fmt.Errorf("error sending request for transactionID(%d): %w", tx.ID, err)
}
if meta != nil {
if err := a.SendOperation(ctx, tx, attempt, *meta); err != nil {
a.metrics.RecordSendOperationError(ctx)
a.metrics.emitAtlasError(ctx, "send_operation", a.customURL, err, tx)
return fmt.Errorf("failed to send operation for transactionID(%d): %w", tx.ID, err)
}
return nil
Expand Down
68 changes: 67 additions & 1 deletion pkg/txm/clientwrappers/dualbroadcast/meta_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,20 @@ package dualbroadcast

import (
"context"
"fmt"
"net/url"
"strconv"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/beholder"
pb "github.com/smartcontractkit/chainlink-protos/svr/v1"
)

// MetaMetrics handles all Meta-related metrics via OTEL
Expand All @@ -18,10 +25,12 @@ type MetaMetrics struct {
latencyHistogram metric.Int64Histogram
bidHistogram metric.Int64Histogram
errorCounter metric.Int64Counter
emitter beholder.Emitter
lggr logger.SugaredLogger
}

// NewMetaMetrics creates a new MetaMetrics instance
func NewMetaMetrics(chainID string) (*MetaMetrics, error) {
func NewMetaMetrics(chainID string, lggr logger.Logger) (*MetaMetrics, error) {
statusCodeCounter, err := beholder.GetMeter().Int64Counter("meta_endpoint_status_codes")
if err != nil {
return nil, err
Expand All @@ -48,6 +57,8 @@ func NewMetaMetrics(chainID string) (*MetaMetrics, error) {
latencyHistogram: latencyHistogram,
bidHistogram: bidHistogram,
errorCounter: errorCounter,
emitter: beholder.GetEmitter(),
lggr: logger.Sugared(logger.Named(lggr, "Txm.MetaClient.MetaMetrics")),
}, nil
}

Expand Down Expand Up @@ -98,3 +109,58 @@ func (m *MetaMetrics) RecordSendOperationError(ctx context.Context) {
),
)
}

// emitAtlasError emits an OTel event to track FastLane Atlas errors
func (m *MetaMetrics) emitAtlasError(ctx context.Context, errType string, customURL *url.URL, cause error, tx *types.Transaction) {
var nonce string
if tx.Nonce != nil {
nonce = fmt.Sprintf("%d", *tx.Nonce)
}

meta, err := tx.GetMeta()
if err != nil {
m.lggr.Errorw(fmt.Sprintf("Failed to get meta for tx. Error to emit was: %v", cause), "txId", tx.ID, "err", err)
return
}

var destAddress string
if meta != nil && meta.FwdrDestAddress != nil {
destAddress = meta.FwdrDestAddress.String()
}

msg := &pb.FastLaneAtlasError{
ChainId: m.chainID,
FromAddress: tx.FromAddress.Hex(),
ToAddress: tx.ToAddress.Hex(),
FeedAddress: destAddress,
Nonce: nonce,
ErrorType: errType,
ErrorMessage: cause.Error(),
TransactionId: tx.ID,
AtlasUrl: customURL.String(),
CreatedAt: time.Now().UnixMicro(),
}

messageBytes, err := proto.Marshal(msg)
if err != nil {
m.lggr.Errorw("Failed to marshal Atlas error event", "err", err)
return
}

attrKVs := []any{
"beholder_domain", "svr",
"beholder_entity", "svr.v1.FastLaneAtlasError",
"beholder_data_schema", "/fastlane-atlas-error/versions/1",
}

mStr := protojson.MarshalOptions{
UseProtoNames: true,
EmitUnpopulated: true,
}.Format(msg)
m.lggr.Infow("[Beholder.emit]", "message", mStr, "attributes", attrKVs)
Copy link
Copy Markdown
Contributor

@dimriou dimriou Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on the fence as to whether this needs to be info or debug.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only get a lot of errors in case of erroneous circumstances, then I think it'd be good to have these logs

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But since this is a node log, we get this information from other sources as well, so this just confirms that it sends the Beholder message on an info level. Anyway, as I mentioned, I don't have a strong preference so we can keep it like that.


if emitErr := m.emitter.Emit(ctx, messageBytes, attrKVs...); emitErr != nil {
m.lggr.Errorw("Failed to emit Atlas error event", "err", emitErr)
}
m.lggr.Debugw("Successfully emitted Atlas error event to Beholder", "message", mStr, "attributes", attrKVs)
}
171 changes: 169 additions & 2 deletions pkg/txm/clientwrappers/dualbroadcast/meta_metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,41 @@
package dualbroadcast

import (
"context"
"encoding/json"
"errors"
"math/big"
"net/url"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-evm/pkg/txm/types"
pb "github.com/smartcontractkit/chainlink-protos/svr/v1"
)

func TestMetaMetrics(t *testing.T) {
t.Parallel()
chainID := "1"

t.Run("NewMetaMetrics", func(t *testing.T) {
metrics, err := NewMetaMetrics(chainID)
t.Parallel()
metrics, err := NewMetaMetrics(chainID, logger.Test(t))
require.NoError(t, err)
assert.NotNil(t, metrics)
assert.Equal(t, chainID, metrics.chainID)
})

t.Run("RecordBasicMetrics", func(t *testing.T) {
metrics, err := NewMetaMetrics(chainID)
t.Parallel()
metrics, err := NewMetaMetrics(chainID, logger.Test(t))
require.NoError(t, err)

ctx := t.Context()
Expand All @@ -32,3 +48,154 @@ func TestMetaMetrics(t *testing.T) {
metrics.RecordSendOperationError(ctx)
})
}

// mockBeholderEmitter is a mock for beholder.Emitter
type mockBeholderEmitter struct {
mock.Mock
}

func (m *mockBeholderEmitter) Emit(ctx context.Context, body []byte, attrKVs ...any) error {
args := m.Called(ctx, body, attrKVs)
return args.Error(0)
}

func (m *mockBeholderEmitter) Close() error {
args := m.Called()
return args.Error(0)
}

func TestMetaClient_emitAtlasError(t *testing.T) {
t.Parallel()
testChainID := big.NewInt(1)
testURL, _ := url.Parse("https://atlas.example.com")
lggr := logger.Test(t)

t.Run("emits error with all fields populated", func(t *testing.T) {
t.Parallel()
mockEmitter := new(mockBeholderEmitter)
metrics, err := NewMetaMetrics(testChainID.String(), lggr)
require.NoError(t, err)
metrics.emitter = mockEmitter

u, err := url.Parse("https://example.com")
require.NoError(t, err)

nonce := uint64(450)
fwdrDestAddress := common.HexToAddress("0xCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC")
metaBytes, err := json.Marshal(types.TxMeta{FwdrDestAddress: &fwdrDestAddress})
require.NoError(t, err)
meta := sqlutil.JSON(metaBytes)

tx := &types.Transaction{
ID: 123,
Nonce: &nonce,
FromAddress: common.HexToAddress("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"),
ToAddress: common.HexToAddress("0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"),
Meta: &meta,
}

var capturedBody []byte
mockEmitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
capturedBody = args.Get(1).([]byte)
}).
Return(nil)

metrics.emitAtlasError(t.Context(), "send_request", u, errors.New("test error message"), tx)

mockEmitter.AssertExpectations(t)

// Verify the emitted message
var emittedMsg pb.FastLaneAtlasError
err = proto.Unmarshal(capturedBody, &emittedMsg)
require.NoError(t, err)

assert.Equal(t, testChainID.String(), emittedMsg.ChainId)
assert.Equal(t, tx.FromAddress.Hex(), emittedMsg.FromAddress)
assert.Equal(t, tx.ToAddress.Hex(), emittedMsg.ToAddress)
assert.Equal(t, fwdrDestAddress.String(), emittedMsg.FeedAddress)
assert.Equal(t, "450", emittedMsg.Nonce)
assert.Equal(t, "send_request", emittedMsg.ErrorType)
assert.Equal(t, "test error message", emittedMsg.ErrorMessage)
assert.Equal(t, uint64(123), emittedMsg.TransactionId)
assert.Equal(t, u.String(), emittedMsg.AtlasUrl)
})

t.Run("emits error with nil nonce", func(t *testing.T) {
t.Parallel()
mockEmitter := new(mockBeholderEmitter)
metrics, err := NewMetaMetrics(testChainID.String(), lggr)
require.NoError(t, err)
metrics.emitter = mockEmitter

tx := &types.Transaction{
ID: 456,
Nonce: nil, // nil nonce
FromAddress: common.HexToAddress("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"),
ToAddress: common.HexToAddress("0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"),
Meta: nil,
}

var capturedBody []byte
mockEmitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).
Run(func(args mock.Arguments) {
capturedBody = args.Get(1).([]byte)
}).
Return(nil)

metrics.emitAtlasError(t.Context(), "error_type", testURL, errors.New("some error"), tx)

mockEmitter.AssertExpectations(t)

var emittedMsg pb.FastLaneAtlasError
err = proto.Unmarshal(capturedBody, &emittedMsg)
require.NoError(t, err)

assert.Equal(t, "", emittedMsg.Nonce) // empty string when nonce is nil
assert.Equal(t, "", emittedMsg.FeedAddress) // empty string when meta is nil
})

t.Run("handles emit error gracefully", func(t *testing.T) {
t.Parallel()
mockEmitter := new(mockBeholderEmitter)
metrics, err := NewMetaMetrics(testChainID.String(), lggr)
require.NoError(t, err)
metrics.emitter = mockEmitter

tx := &types.Transaction{
ID: 999,
FromAddress: common.HexToAddress("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"),
ToAddress: common.HexToAddress("0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"),
}

mockEmitter.On("Emit", mock.Anything, mock.Anything, mock.Anything).
Return(errors.New("emit failed"))

// Should not panic, just log the error
metrics.emitAtlasError(t.Context(), "error_type", testURL, errors.New("some error"), tx)

mockEmitter.AssertExpectations(t)
})

t.Run("handles invalid meta JSON gracefully", func(t *testing.T) {
t.Parallel()
mockEmitter := new(mockBeholderEmitter)
metrics, err := NewMetaMetrics(testChainID.String(), lggr)
require.NoError(t, err)
metrics.emitter = mockEmitter

invalidJSON := sqlutil.JSON([]byte("invalid json"))
tx := &types.Transaction{
ID: 111,
FromAddress: common.HexToAddress("0xAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"),
ToAddress: common.HexToAddress("0xBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"),
Meta: &invalidJSON,
}

// Should not call Emit because GetMeta will fail
metrics.emitAtlasError(t.Context(), "error_type", testURL, errors.New("some error"), tx)

// Emit should not be called when meta parsing fails
mockEmitter.AssertNotCalled(t, "Emit", mock.Anything, mock.Anything, mock.Anything)
})
}
Loading