Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/go-json-experiment/json v0.0.0-20250223041408-d3c622f1b874
github.com/go-playground/validator/v10 v10.26.0
github.com/go-viper/mapstructure/v2 v2.4.0
github.com/golang-jwt/jwt/v5 v5.2.3
github.com/golang-jwt/jwt/v5 v5.3.0
github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/grafana/otel-profiling-go v0.5.1
Expand All @@ -37,7 +37,8 @@ require (
github.com/mr-tron/base58 v1.2.0
github.com/pelletier/go-toml v1.9.5
github.com/pelletier/go-toml/v2 v2.2.4
github.com/prometheus/client_golang v1.22.0
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/scylladb/go-reflectx v1.0.1
github.com/shopspring/decimal v1.4.0
Expand All @@ -54,6 +55,7 @@ require (
github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d
github.com/stellar/go-stellar-sdk v0.5.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/contrib/bridges/prometheus v0.68.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0
go.opentelemetry.io/otel v1.43.0
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.12.2
Expand Down Expand Up @@ -133,9 +135,8 @@ require (
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.20.1 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/sanity-io/litter v1.5.5 // indirect
Expand All @@ -148,6 +149,7 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v2 v2.4.4 // indirect
golang.org/x/mod v0.33.0 // indirect
golang.org/x/net v0.52.0 // indirect
golang.org/x/sys v0.42.0 // indirect
Expand Down
20 changes: 12 additions & 8 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 10 additions & 8 deletions pkg/beholder/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,16 +567,18 @@ func newMeterProvider(cfg Config, resource *sdkresource.Resource, auth Auth, cre
if err != nil {
return nil, err
}
mp := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(
sdkmetric.NewPeriodicReader(
exporter,
sdkmetric.WithInterval(cfg.MetricReaderInterval), // Default is 10s
)),

readerOpts := []sdkmetric.PeriodicReaderOption{
sdkmetric.WithInterval(cfg.MetricReaderInterval), // Default is 10s
}
for _, p := range cfg.MetricProducers {
readerOpts = append(readerOpts, sdkmetric.WithProducer(p))
}
return sdkmetric.NewMeterProvider(
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, readerOpts...)),
sdkmetric.WithResource(resource),
sdkmetric.WithView(cfg.MetricViews...),
)
return mp, nil
), nil
}

// newLoggerOpts creates options for a logger exporter
Expand Down
1 change: 1 addition & 0 deletions pkg/beholder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Config struct {
MetricViews []metric.View
// MetricCompressor sets the gRPC compressor for metrics. Valid values: "gzip" (default), "none".
MetricCompressor string
MetricProducers []metric.Producer // For example, a prometheus bridge

// Custom Events via Chip Ingress Emitter
ChipIngressEmitterEnabled bool
Expand Down
32 changes: 25 additions & 7 deletions pkg/beholder/config_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package beholder_test

import (
_ "embed"
"encoding/json"
"flag"
"fmt"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
otelattr "go.opentelemetry.io/otel/attribute"
"go.uber.org/zap/zapcore"

Expand All @@ -14,7 +21,12 @@ const (
packageName = "beholder"
)

func ExampleConfig() {
//go:embed testdata/config-example.json
var configExample string

var update = flag.Bool("update", false, "update golden test files")

func TestConfig(t *testing.T) {
config := beholder.Config{
InsecureConnection: true,
CACertFile: "",
Expand All @@ -31,7 +43,7 @@ func ExampleConfig() {
EmitterExportInterval: 1 * time.Second,
EmitterMaxQueueSize: 2048,
// true uses batched async export for custom messages.
EmitterBatchProcessor: true,
EmitterBatchProcessor: true,
// OTel message log exporter retry config
LogRetryConfig: nil,
// Trace
Expand Down Expand Up @@ -60,14 +72,20 @@ func ExampleConfig() {
AuthKeySigner: nil,
AuthHeadersTTL: 0,
}
fmt.Printf("%+v\n", config)

b, err := json.MarshalIndent(config, "", " ")
require.NoError(t, err)

if *update {
require.NoError(t, os.WriteFile("testdata/config-example.json", b, 0644))
} else {
assert.Equal(t, configExample, string(b))
}

config.LogRetryConfig = &beholder.RetryConfig{
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 1 * time.Minute, // Set to zero to disable retry
}
fmt.Printf("%+v\n", *config.LogRetryConfig)
// Output:
// {InsecureConnection:true CACertFile: OtelExporterGRPCEndpoint:localhost:4317 OtelExporterHTTPEndpoint:localhost:4318 ResourceAttributes:[{Key:package_name Value:{vtype:4 numeric:0 stringly:beholder slice:<nil>}} {Key:sender Value:{vtype:4 numeric:0 stringly:beholderclient slice:<nil>}}] EmitterExportTimeout:1s EmitterExportInterval:1s EmitterExportMaxBatchSize:512 EmitterMaxQueueSize:2048 EmitterBatchProcessor:true TraceSampleRatio:1 TraceBatchTimeout:1s TraceSpanExporter:<nil> TraceRetryConfig:<nil> TraceCompressor:gzip MetricReaderInterval:1s MetricRetryConfig:<nil> MetricViews:[] MetricCompressor:gzip ChipIngressEmitterEnabled:false ChipIngressEmitterGRPCEndpoint: ChipIngressInsecureConnection:false ChipIngressBatchEmitterEnabled:false ChipIngressBufferSize:0 ChipIngressMaxBatchSize:0 ChipIngressSendInterval:0s ChipIngressSendTimeout:0s ChipIngressDrainTimeout:0s ChipIngressMaxConcurrentSends:0 ChipIngressLogger:<nil> LogExportTimeout:1s LogExportInterval:1s LogExportMaxBatchSize:512 LogMaxQueueSize:2048 LogBatchProcessor:true LogRetryConfig:<nil> LogStreamingEnabled:false LogLevel:info LogCompressor:gzip AuthHeaders:map[] AuthHeadersTTL:0s AuthKeySigner:<nil> AuthPublicKeyHex:}
// {InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}
assert.Equal(t, "{InitialInterval:5s MaxInterval:30s MaxElapsedTime:1m0s}", fmt.Sprintf("%+v", *config.LogRetryConfig))
}
61 changes: 61 additions & 0 deletions pkg/beholder/testdata/config-example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
{
"InsecureConnection": true,
"CACertFile": "",
"OtelExporterGRPCEndpoint": "localhost:4317",
"OtelExporterHTTPEndpoint": "localhost:4318",
"ResourceAttributes": [
{
"Key": "package_name",
"Value": {
"Type": "STRING",
"Value": "beholder"
}
},
{
"Key": "sender",
"Value": {
"Type": "STRING",
"Value": "beholderclient"
}
}
],
"EmitterExportTimeout": 1000000000,
"EmitterExportInterval": 1000000000,
"EmitterExportMaxBatchSize": 512,
"EmitterMaxQueueSize": 2048,
"EmitterBatchProcessor": true,
"TraceSampleRatio": 1,
"TraceBatchTimeout": 1000000000,
"TraceSpanExporter": null,
"TraceRetryConfig": null,
"TraceCompressor": "gzip",
"MetricReaderInterval": 1000000000,
"MetricRetryConfig": null,
"MetricViews": null,
"MetricCompressor": "gzip",
"MetricProducers": null,
"ChipIngressEmitterEnabled": false,
"ChipIngressEmitterGRPCEndpoint": "",
"ChipIngressInsecureConnection": false,
"ChipIngressBatchEmitterEnabled": false,
"ChipIngressBufferSize": 0,
"ChipIngressMaxBatchSize": 0,
"ChipIngressSendInterval": 0,
"ChipIngressSendTimeout": 0,
"ChipIngressDrainTimeout": 0,
"ChipIngressMaxConcurrentSends": 0,
"ChipIngressLogger": null,
"LogExportTimeout": 1000000000,
"LogExportInterval": 1000000000,
"LogExportMaxBatchSize": 512,
"LogMaxQueueSize": 2048,
"LogBatchProcessor": true,
"LogRetryConfig": null,
"LogStreamingEnabled": false,
"LogLevel": "info",
"LogCompressor": "gzip",
"AuthHeaders": {},
"AuthHeadersTTL": 0,
"AuthKeySigner": null,
"AuthPublicKeyHex": ""
}
11 changes: 11 additions & 0 deletions pkg/loop/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ const (
envTelemetryLogMaxQueueSize = "CL_TELEMETRY_LOG_MAX_QUEUE_SIZE"
envTelemetryTraceCompressor = "CL_TELEMETRY_TRACE_COMPRESSOR"
envTelemetryMetricCompressor = "CL_TELEMETRY_METRIC_COMPRESSOR"
envTelemetryPrometheusBridgeEnabled = "CL_TELEMETRY_PROMETHEUS_BRIDGE_ENABLED"
envTelemetryPrometheusBridgePrefixes = "CL_TELEMETRY_PROMETHEUS_BRIDGE_PREFIXES"
envTelemetryLogCompressor = "CL_TELEMETRY_LOG_COMPRESSOR"

envChipIngressEndpoint = "CL_CHIP_INGRESS_ENDPOINT"
Expand Down Expand Up @@ -164,6 +166,8 @@ type EnvConfig struct {
TelemetryLogMaxQueueSize int
TelemetryTraceCompressor string
TelemetryMetricCompressor string
TelemetryPrometheusBridgeEnabled bool
TelemetryPrometheusBridgePrefixes []string
TelemetryLogCompressor string

TracingEnabled bool
Expand Down Expand Up @@ -255,6 +259,8 @@ func (e *EnvConfig) AsCmdEnv() (env []string) {
add(envTelemetryLogMaxQueueSize, strconv.Itoa(e.TelemetryLogMaxQueueSize))
add(envTelemetryTraceCompressor, e.TelemetryTraceCompressor)
add(envTelemetryMetricCompressor, e.TelemetryMetricCompressor)
add(envTelemetryPrometheusBridgeEnabled, strconv.FormatBool(e.TelemetryPrometheusBridgeEnabled))
add(envTelemetryPrometheusBridgePrefixes, strings.Join(e.TelemetryPrometheusBridgePrefixes, ","))
add(envTelemetryLogCompressor, e.TelemetryLogCompressor)

add(envChipIngressEndpoint, e.ChipIngressEndpoint)
Expand Down Expand Up @@ -484,6 +490,11 @@ func (e *EnvConfig) parse() error {
}
e.TelemetryTraceCompressor = os.Getenv(envTelemetryTraceCompressor)
e.TelemetryMetricCompressor = os.Getenv(envTelemetryMetricCompressor)
e.TelemetryPrometheusBridgeEnabled, err = getBool(envTelemetryPrometheusBridgeEnabled)
if err != nil {
return fmt.Errorf("failed to parse %s: %w", envTelemetryPrometheusBridgeEnabled, err)
}
e.TelemetryPrometheusBridgePrefixes = strings.Split(os.Getenv(envTelemetryPrometheusBridgePrefixes), ",")
e.TelemetryLogCompressor = os.Getenv(envTelemetryLogCompressor)
// Optional
e.ChipIngressEndpoint = os.Getenv(envChipIngressEndpoint)
Expand Down
6 changes: 6 additions & 0 deletions pkg/loop/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ func TestEnvConfig_parse(t *testing.T) {
envTelemetryEmitterExportMaxBatchSize: "100",
envTelemetryEmitterMaxQueueSize: "1000",
envTelemetryLogStreamingEnabled: "false",
envTelemetryPrometheusBridgeEnabled: "true",
envTelemetryPrometheusBridgePrefixes: "foo,bar",

envChipIngressEndpoint: "chip-ingress.example.com:50051",
envChipIngressInsecureConnection: "true",
Expand Down Expand Up @@ -195,6 +197,8 @@ var envCfgFull = EnvConfig{
TelemetryEmitterExportMaxBatchSize: 100,
TelemetryEmitterMaxQueueSize: 1000,
TelemetryLogStreamingEnabled: false,
TelemetryPrometheusBridgeEnabled: true,
TelemetryPrometheusBridgePrefixes: []string{"foo", "bar"},

ChipIngressEndpoint: "chip-ingress.example.com:50051",
ChipIngressInsecureConnection: true,
Expand Down Expand Up @@ -257,6 +261,8 @@ func TestEnvConfig_AsCmdEnv(t *testing.T) {
assert.Equal(t, "100", got[envTelemetryEmitterExportMaxBatchSize])
assert.Equal(t, "1000", got[envTelemetryEmitterMaxQueueSize])
assert.Equal(t, "false", got[envTelemetryLogStreamingEnabled])
assert.Equal(t, "true", got[envTelemetryPrometheusBridgeEnabled])
assert.Equal(t, "foo,bar", got[envTelemetryPrometheusBridgePrefixes])

// Assert ChipIngress environment variables
assert.Equal(t, "chip-ingress.example.com:50051", got[envChipIngressEndpoint])
Expand Down
13 changes: 13 additions & 0 deletions pkg/loop/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
otelpyroscope "github.com/grafana/otel-profiling-go"
"github.com/grafana/pyroscope-go"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
prombridge "go.opentelemetry.io/contrib/bridges/prometheus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
Expand All @@ -29,6 +31,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/settings/limits"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/pg"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/promutil"
)

// NewStartedServer returns a started Server.
Expand Down Expand Up @@ -186,6 +189,16 @@ func (s *Server) start(opts ...ServerOpt) error {
MetricCompressor: s.EnvConfig.TelemetryMetricCompressor,
}

if s.EnvConfig.TelemetryPrometheusBridgeEnabled {
var bridgeOpts []prombridge.Option
if prefixes := s.EnvConfig.TelemetryPrometheusBridgePrefixes; len(prefixes) > 0 {
bridgeOpts = append(bridgeOpts, prombridge.WithGatherer(
&promutil.PrefixGatherer{Gatherer: prometheus.DefaultGatherer, Prefixes: prefixes}),
)
}
beholderCfg.MetricProducers = append(beholderCfg.MetricProducers, prombridge.NewMetricProducer(bridgeOpts...))
}

// Configure beholder auth - the client will determine rotating vs static mode
// Rotating mode: when AuthHeadersTTL is set, client creates internal lazySigner
// Static mode: no TTL is provided it is assumed that the headers are static
Expand Down
34 changes: 34 additions & 0 deletions pkg/sqlutil/promutil/promutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package promutil

import (
"strings"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

// PrefixGatherer is [prometheus.Gatherer] that will only produce metrics matching one of the prefixes.
type PrefixGatherer struct {
Gatherer prometheus.Gatherer
Prefixes []string
}

func (g *PrefixGatherer) Gather() ([]*dto.MetricFamily, error) {
var ret []*dto.MetricFamily
all, err := g.Gatherer.Gather()
if err != nil {
return nil, err
Comment on lines +16 to +20
}
for _, m := range all {
if m.Name == nil {
continue
}
for _, prefix := range g.Prefixes {
if strings.HasPrefix(*m.Name, prefix) {
ret = append(ret, m)
break
}
Comment on lines +26 to +30
}
}
return ret, nil
}
Loading