|
| 1 | +package oraclecreator |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "errors" |
| 6 | + "math" |
| 7 | + "strings" |
| 8 | + "sync/atomic" |
| 9 | + |
| 10 | + "github.com/prometheus/client_golang/prometheus" |
| 11 | + prometheus_dto "github.com/prometheus/client_model/go" |
| 12 | + |
| 13 | + "github.com/smartcontractkit/chainlink-common/pkg/logger" |
| 14 | +) |
| 15 | + |
| 16 | +// ObservationMetricsPublisher is the interface for publishing observation metrics to external destinations |
| 17 | +type ObservationMetricsPublisher interface { |
| 18 | + PublishMetric(ctx context.Context, metricName string, value float64, labels map[string]string) |
| 19 | +} |
| 20 | + |
| 21 | +// ObservationMetricsCollector creates and wraps OCR3 observation metrics to intercept updates |
| 22 | +type ObservationMetricsCollector struct { |
| 23 | + logger logger.Logger |
| 24 | + publisher ObservationMetricsPublisher |
| 25 | + cancel context.CancelFunc |
| 26 | + constantLabels map[string]string // Prometheus labels (for WrapRegistererWith) |
| 27 | + beholderLabels map[string]string // Beholder labels (for metrics publishing) |
| 28 | + |
| 29 | + // Wrapped counters |
| 30 | + sentObservationsCounter *wrappedCounter |
| 31 | + includedObservationsCounter *wrappedCounter |
| 32 | +} |
| 33 | + |
| 34 | +// NewObservationMetricsCollector creates a new collector that wraps OCR3 observation metrics |
| 35 | +func NewObservationMetricsCollector( |
| 36 | + logger logger.Logger, |
| 37 | + publisher ObservationMetricsPublisher, |
| 38 | + constantLabels map[string]string, |
| 39 | + beholderLabels map[string]string, |
| 40 | +) *ObservationMetricsCollector { |
| 41 | + _, cancel := context.WithCancel(context.Background()) |
| 42 | + |
| 43 | + collector := &ObservationMetricsCollector{ |
| 44 | + logger: logger, |
| 45 | + publisher: publisher, |
| 46 | + cancel: cancel, |
| 47 | + constantLabels: constantLabels, |
| 48 | + beholderLabels: beholderLabels, |
| 49 | + } |
| 50 | + |
| 51 | + return collector |
| 52 | +} |
| 53 | + |
| 54 | +// CreateWrappedRegisterer returns a registerer that intercepts and wraps observation metrics |
| 55 | +func (c *ObservationMetricsCollector) CreateWrappedRegisterer(baseRegisterer prometheus.Registerer) prometheus.Registerer { |
| 56 | + return &interceptingRegisterer{ |
| 57 | + base: baseRegisterer, |
| 58 | + collector: c, |
| 59 | + } |
| 60 | +} |
| 61 | + |
| 62 | +// Close stops the collector |
| 63 | +func (c *ObservationMetricsCollector) Close() error { |
| 64 | + c.cancel() |
| 65 | + return nil |
| 66 | +} |
| 67 | + |
| 68 | +// wrappedCounter wraps a Prometheus collector (which may be a counter or wrappingCollector) |
| 69 | +// to intercept Collect() calls and track value changes |
| 70 | +type wrappedCounter struct { |
| 71 | + prometheus.Collector |
| 72 | + metricName string |
| 73 | + labels map[string]string // Beholder labels (for metrics publishing) |
| 74 | + publisher ObservationMetricsPublisher |
| 75 | + logger logger.Logger |
| 76 | + lastValueBits uint64 // stores float64 as bits for atomic operations |
| 77 | +} |
| 78 | + |
| 79 | +// Collect intercepts metric collection to detect counter increments |
| 80 | +func (w *wrappedCounter) Collect(ch chan<- prometheus.Metric) { |
| 81 | + // Create a channel to intercept metrics |
| 82 | + interceptCh := make(chan prometheus.Metric, 10) |
| 83 | + |
| 84 | + // Collect from the underlying collector |
| 85 | + go func() { |
| 86 | + w.Collector.Collect(interceptCh) |
| 87 | + close(interceptCh) |
| 88 | + }() |
| 89 | + |
| 90 | + // Forward metrics and track counter value |
| 91 | + for m := range interceptCh { |
| 92 | + // Try to extract the counter value from the metric |
| 93 | + var metricValue float64 |
| 94 | + if err := extractCounterValue(m, &metricValue); err == nil { |
| 95 | + // Load the last value atomically |
| 96 | + lastBits := atomic.LoadUint64(&w.lastValueBits) |
| 97 | + lastValue := math.Float64frombits(lastBits) |
| 98 | + |
| 99 | + if metricValue > lastValue { |
| 100 | + delta := metricValue - lastValue |
| 101 | + // Store the new value atomically |
| 102 | + atomic.StoreUint64(&w.lastValueBits, math.Float64bits(metricValue)) |
| 103 | + |
| 104 | + w.logger.Debugw("Observation metric incremented", |
| 105 | + "metric", w.metricName, |
| 106 | + "value", metricValue, |
| 107 | + "delta", delta, |
| 108 | + "labels", w.labels, |
| 109 | + ) |
| 110 | + |
| 111 | + if w.publisher != nil { |
| 112 | + // Publish the delta, not the cumulative value |
| 113 | + w.publisher.PublishMetric(context.Background(), w.metricName, delta, w.labels) |
| 114 | + } |
| 115 | + } |
| 116 | + } |
| 117 | + |
| 118 | + // Forward the metric to the actual channel |
| 119 | + ch <- m |
| 120 | + } |
| 121 | +} |
| 122 | + |
| 123 | +// extractCounterValue extracts the value from a prometheus.Metric |
| 124 | +// This uses the prometheus dto.Metric structure |
| 125 | +func extractCounterValue(m prometheus.Metric, value *float64) error { |
| 126 | + // Create a DTO metric to write into |
| 127 | + dto := &prometheus_dto.Metric{} |
| 128 | + if err := m.Write(dto); err != nil { |
| 129 | + return err |
| 130 | + } |
| 131 | + |
| 132 | + // Check if it's a counter |
| 133 | + if dto.Counter != nil { |
| 134 | + *value = dto.Counter.GetValue() |
| 135 | + return nil |
| 136 | + } |
| 137 | + |
| 138 | + return errors.New("metric is not a counter") |
| 139 | +} |
| 140 | + |
| 141 | +// Describe implements prometheus.Collector |
| 142 | +func (c *ObservationMetricsCollector) Describe(ch chan<- *prometheus.Desc) { |
| 143 | + if c.sentObservationsCounter != nil { |
| 144 | + c.sentObservationsCounter.Describe(ch) |
| 145 | + } |
| 146 | + if c.includedObservationsCounter != nil { |
| 147 | + c.includedObservationsCounter.Describe(ch) |
| 148 | + } |
| 149 | +} |
| 150 | + |
| 151 | +// Collect implements prometheus.Collector |
| 152 | +func (c *ObservationMetricsCollector) Collect(ch chan<- prometheus.Metric) { |
| 153 | + if c.sentObservationsCounter != nil { |
| 154 | + c.sentObservationsCounter.Collect(ch) |
| 155 | + } |
| 156 | + if c.includedObservationsCounter != nil { |
| 157 | + c.includedObservationsCounter.Collect(ch) |
| 158 | + } |
| 159 | +} |
| 160 | + |
| 161 | +// interceptingRegisterer wraps a Prometheus registerer to intercept specific metric registrations |
| 162 | +type interceptingRegisterer struct { |
| 163 | + base prometheus.Registerer |
| 164 | + collector *ObservationMetricsCollector |
| 165 | +} |
| 166 | + |
| 167 | +func (r *interceptingRegisterer) Register(c prometheus.Collector) error { |
| 168 | + // Try to intercept counter registration |
| 169 | + // This returns either our wrappedCounter (for observation metrics) |
| 170 | + // or the original collector (for other metrics) |
| 171 | + wrapped := r.maybeWrapCollector(c) |
| 172 | + |
| 173 | + // If we wrapped it with our wrappedCounter, we still need to add Prometheus labels |
| 174 | + // If we didn't wrap it, we need to add Prometheus labels to maintain existing behavior |
| 175 | + wrappedWithLabels := prometheus.WrapCollectorWith(r.collector.constantLabels, wrapped) |
| 176 | + |
| 177 | + return r.base.Register(wrappedWithLabels) |
| 178 | +} |
| 179 | + |
| 180 | +func (r *interceptingRegisterer) MustRegister(cs ...prometheus.Collector) { |
| 181 | + wrapped := make([]prometheus.Collector, len(cs)) |
| 182 | + for i, c := range cs { |
| 183 | + // Try to intercept and wrap with our custom wrapper |
| 184 | + maybeWrapped := r.maybeWrapCollector(c) |
| 185 | + // Add Prometheus labels to maintain existing behavior |
| 186 | + wrapped[i] = prometheus.WrapCollectorWith(r.collector.constantLabels, maybeWrapped) |
| 187 | + } |
| 188 | + r.base.MustRegister(wrapped...) |
| 189 | +} |
| 190 | + |
| 191 | +func (r *interceptingRegisterer) Unregister(c prometheus.Collector) bool { |
| 192 | + return r.base.Unregister(c) |
| 193 | +} |
| 194 | + |
| 195 | +// maybeWrapCollector checks if this is one of the observation counters and wraps it |
| 196 | +func (r *interceptingRegisterer) maybeWrapCollector(c prometheus.Collector) prometheus.Collector { |
| 197 | + // Check if this is a Counter by trying to extract its descriptor |
| 198 | + descChan := make(chan *prometheus.Desc, 10) |
| 199 | + go func() { |
| 200 | + c.Describe(descChan) |
| 201 | + close(descChan) |
| 202 | + }() |
| 203 | + |
| 204 | + for desc := range descChan { |
| 205 | + descString := desc.String() |
| 206 | + |
| 207 | + // We need to extract the fqName from the descriptor string |
| 208 | + // Format: Desc{fqName: "metric_name", help: "...", ...} |
| 209 | + // We'll check if the fqName matches exactly, not just contains |
| 210 | + |
| 211 | + // Check if this is one of our target metrics by matching the fqName field |
| 212 | + if strings.Contains(descString, `fqName: "ocr3_sent_observations_total"`) { |
| 213 | + r.collector.logger.Info("Wrapping ocr3_sent_observations_total counter") |
| 214 | + |
| 215 | + // Wrap the collector (whether it's a raw Counter or wrappingCollector) |
| 216 | + wrapped := &wrappedCounter{ |
| 217 | + Collector: c, |
| 218 | + metricName: "ocr3_sent_observations_total", |
| 219 | + labels: r.collector.beholderLabels, |
| 220 | + publisher: r.collector.publisher, |
| 221 | + logger: r.collector.logger, |
| 222 | + } |
| 223 | + r.collector.sentObservationsCounter = wrapped |
| 224 | + return wrapped |
| 225 | + } |
| 226 | + |
| 227 | + if strings.Contains(descString, `fqName: "ocr3_included_observations_total"`) { |
| 228 | + r.collector.logger.Info("Wrapping ocr3_included_observations_total counter") |
| 229 | + |
| 230 | + // Wrap the collector (whether it's a raw Counter or wrappingCollector) |
| 231 | + wrapped := &wrappedCounter{ |
| 232 | + Collector: c, |
| 233 | + metricName: "ocr3_included_observations_total", |
| 234 | + labels: r.collector.beholderLabels, |
| 235 | + publisher: r.collector.publisher, |
| 236 | + logger: r.collector.logger, |
| 237 | + } |
| 238 | + r.collector.includedObservationsCounter = wrapped |
| 239 | + return wrapped |
| 240 | + } |
| 241 | + } |
| 242 | + |
| 243 | + // Not a metric we care about, return as-is |
| 244 | + return c |
| 245 | +} |
0 commit comments