From 5dd99109b168bcd3999a183eb3933508f35ffd40 Mon Sep 17 00:00:00 2001 From: ChMG Date: Sun, 19 Apr 2026 14:03:04 +0200 Subject: [PATCH 1/2] fix for Prometheus stream_* metrics --- pkg/controller/model.go | 19 +++++++++++++++- pkg/controller/prom.go | 50 ++++++++++++++++++++++++++++------------- 2 files changed, 53 insertions(+), 16 deletions(-) diff --git a/pkg/controller/model.go b/pkg/controller/model.go index d962b28..aa69dc4 100644 --- a/pkg/controller/model.go +++ b/pkg/controller/model.go @@ -233,6 +233,23 @@ type A10nspInterfacesResponse struct { type StreamSummaryResponse struct { Code int `json:"code"` Streams []struct { + FlowId int `json:"flow-id"` + Name string `json:"name"` + Type string `json:"type"` + SubType string `json:"sub-type"` + Direction string `json:"direction"` + TxPPS int `json:"tx-pps"` + RxPPS int `json:"rx-pps"` + RxLoss int `json:"rx-loss"` + SessionId int `json:"session-id"` + } `json:"stream-summary"` +} + +// StreamInfoResponse response for stream-info socket command. +type StreamInfoResponse struct { + Status string `json:"status"` + Code int `json:"code"` + StreamInfo struct { FlowId int `json:"flow-id"` Name string `json:"name"` Type string `json:"type"` @@ -244,5 +261,5 @@ type StreamSummaryResponse struct { RxBytes int `json:"rx-bytes"` RxLoss int `json:"rx-loss"` SessionId int `json:"session-id"` - } `json:"stream-summary"` + } `json:"stream-info"` } diff --git a/pkg/controller/prom.go b/pkg/controller/prom.go index 54ca972..9a59ef0 100644 --- a/pkg/controller/prom.go +++ b/pkg/controller/prom.go @@ -750,31 +750,51 @@ func (p *Prom) collectInstanceA10nspInterfaces(instance string, ch chan<- promet } func (p *Prom) collectInstanceStreams(instance string, ch chan<- prometheus.Metric) { - // Invoke command. - command := SocketCommand{ + // Invoke command stream-summary. + command_summary := SocketCommand{ Command: "stream-summary", } - result, err := p.repository.Command(instance, command) + result_summary, err := p.repository.Command(instance, command_summary) if err != nil { log.Warn().Msgf("failed to execute stream-summary: %s", err.Error()) return } - // Decode response. - var cr StreamSummaryResponse - err = json.NewDecoder(strings.NewReader(string(result))).Decode(&cr) + // Decode stream-summary response. + var cr_summary StreamSummaryResponse + err = json.NewDecoder(strings.NewReader(string(result_summary))).Decode(&cr_summary) if err != nil { log.Warn().Msgf("failed to decode stream-summary: %s", err.Error()) return } - // Return Metrics. - for _, stream := range cr.Streams { - fid := strconv.Itoa(stream.FlowId) - sid := strconv.Itoa(stream.SessionId) - ch <- prometheus.MustNewConstMetric(p.StreamTxPackets, prometheus.CounterValue, float64(stream.TxPackets), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType) - ch <- prometheus.MustNewConstMetric(p.StreamTxBytes, prometheus.CounterValue, float64(stream.TxBytes), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType) - ch <- prometheus.MustNewConstMetric(p.StreamRxPackets, prometheus.CounterValue, float64(stream.RxPackets), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType) - ch <- prometheus.MustNewConstMetric(p.StreamRxBytes, prometheus.CounterValue, float64(stream.RxBytes), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType) - ch <- prometheus.MustNewConstMetric(p.StreamRxLoss, prometheus.CounterValue, float64(stream.RxLoss), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType) + // Loop stream-summary response. + for _, stream_summary := range cr_summary.Streams { + // Invoke command stream-info. + command_info := SocketCommand{ + Command: "stream-info", + Arguments: map[string]interface{}{ + "flow-id": stream_summary.FlowId, + }, + } + result_info, err := p.repository.Command(instance, command_info) + if err != nil { + log.Warn().Msgf("failed to execute stream-info: %s", err.Error()) + return + } + // Decode stream-info response. + var cr_info StreamInfoResponse + err = json.NewDecoder(strings.NewReader(string(result_info))).Decode(&cr_info) + if err != nil { + log.Warn().Msgf("failed to decode stream-info: %s", err.Error()) + return + } + // Return Metrics. + fid := strconv.Itoa(cr_info.StreamInfo.FlowId) + sid := strconv.Itoa(cr_info.StreamInfo.SessionId) + ch <- prometheus.MustNewConstMetric(p.StreamTxPackets, prometheus.CounterValue, float64(cr_info.StreamInfo.TxPackets), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamTxBytes, prometheus.CounterValue, float64(cr_info.StreamInfo.TxBytes), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamRxPackets, prometheus.CounterValue, float64(cr_info.StreamInfo.RxPackets), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamRxBytes, prometheus.CounterValue, float64(cr_info.StreamInfo.RxBytes), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamRxLoss, prometheus.CounterValue, float64(cr_info.StreamInfo.RxLoss), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) } } From 57ca623501f1952e6179ffb348c9c6917f734a47 Mon Sep 17 00:00:00 2001 From: ChMG Date: Fri, 22 May 2026 10:11:33 +0200 Subject: [PATCH 2/2] add more values to stream metric --- pkg/controller/model.go | 29 +++++++++++------- pkg/controller/prom.go | 66 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 79 insertions(+), 16 deletions(-) diff --git a/pkg/controller/model.go b/pkg/controller/model.go index aa69dc4..ac1bdba 100644 --- a/pkg/controller/model.go +++ b/pkg/controller/model.go @@ -250,16 +250,23 @@ type StreamInfoResponse struct { Status string `json:"status"` Code int `json:"code"` StreamInfo struct { - FlowId int `json:"flow-id"` - Name string `json:"name"` - Type string `json:"type"` - SubType string `json:"sub-type"` - Direction string `json:"direction"` - TxPackets int `json:"tx-packets"` - TxBytes int `json:"tx-bytes"` - RxPackets int `json:"rx-packets"` - RxBytes int `json:"rx-bytes"` - RxLoss int `json:"rx-loss"` - SessionId int `json:"session-id"` + FlowId int `json:"flow-id"` + Name string `json:"name"` + Type string `json:"type"` + SubType string `json:"sub-type"` + Direction string `json:"direction"` + TxPackets int `json:"tx-packets"` + TxBytes int `json:"tx-bytes"` + TxPPS int `json:"tx-pps"` + TxBPSL2 int `json:"tx-bps-l2"` + RxPackets int `json:"rx-packets"` + RxBytes int `json:"rx-bytes"` + RxLoss int `json:"rx-loss"` + RxPPS int `json:"rx-pps"` + RxBPSL2 int `json:"rx-bps-l2"` + RxBPSL3 int `json:"rx-bps-l3"` + RxDelayUsMin int `json:"rx-delay-us-min"` + RxDelayUsMax int `json:"rx-delay-us-max"` + SessionId int `json:"session-id"` } `json:"stream-info"` } diff --git a/pkg/controller/prom.go b/pkg/controller/prom.go index 9a59ef0..347862e 100644 --- a/pkg/controller/prom.go +++ b/pkg/controller/prom.go @@ -76,9 +76,16 @@ const ( metricIfRxLossPacketsStreams = "interfaces_rx_loss_packets_streams" metricStreamTxPackets = "stream_tx_packets" metricStreamTxBytes = "stream_tx_bytes" + metricStreamTxPPS = "stream_tx_pps" + metricStreamTxBPSL2 = "stream_tx_bps_l2" metricStreamRxPackets = "stream_rx_packets" metricStreamRxBytes = "stream_rx_bytes" metricStreamRxLoss = "stream_rx_loss" + metricStreamRxPPS = "stream_rx_pps" + metricStreamRxBPSL2 = "stream_rx_bps_l2" + metricStreamRxBPSL3 = "stream_rx_bps_l3" + metricStreamRxDelayUsMin = "stream_rx_delay_us_min" + metricStreamRxDelayUsMax = "stream_rx_delay_us_max" labelHostname = "hostname" labelInstanceName = "instance_name" @@ -158,11 +165,18 @@ type Prom struct { IfRxPPSStreams *prometheus.Desc IfRxLossPacketsStreams *prometheus.Desc // Streams. - StreamTxPackets *prometheus.Desc - StreamTxBytes *prometheus.Desc - StreamRxPackets *prometheus.Desc - StreamRxBytes *prometheus.Desc - StreamRxLoss *prometheus.Desc + StreamTxPackets *prometheus.Desc + StreamTxBytes *prometheus.Desc + StreamTxPPS *prometheus.Desc + StreamTxBPSL2 *prometheus.Desc + StreamRxPackets *prometheus.Desc + StreamRxBytes *prometheus.Desc + StreamRxLoss *prometheus.Desc + StreamRxPPS *prometheus.Desc + StreamRxBPSL2 *prometheus.Desc + StreamRxBPSL3 *prometheus.Desc + StreamRxDelayUsMin *prometheus.Desc + StreamRxDelayUsMax *prometheus.Desc } // NewProm creates a new prometheus export object. @@ -416,6 +430,14 @@ func NewProm(repository Repository) *Prom { "Stream TX bytes", []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, ) + p.StreamTxPPS = prometheus.NewDesc(metricStreamTxPPS, + "Stream TX PPS", + []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, + ) + p.StreamTxBPSL2 = prometheus.NewDesc(metricStreamTxBPSL2, + "Stream TX BPS L2", + []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, + ) p.StreamRxPackets = prometheus.NewDesc(metricStreamRxPackets, "Stream RX packets", []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, @@ -428,6 +450,26 @@ func NewProm(repository Repository) *Prom { "Stream RX loss", []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, ) + p.StreamRxPPS = prometheus.NewDesc(metricStreamRxPPS, + "Stream RX PPS", + []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, + ) + p.StreamRxBPSL2 = prometheus.NewDesc(metricStreamRxBPSL2, + "Stream RX BPS L2", + []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, + ) + p.StreamRxBPSL3 = prometheus.NewDesc(metricStreamRxBPSL3, + "Stream RX BPS L3", + []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, + ) + p.StreamRxDelayUsMin = prometheus.NewDesc(metricStreamRxDelayUsMin, + "Stream RX delay us min", + []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, + ) + p.StreamRxDelayUsMax = prometheus.NewDesc(metricStreamRxDelayUsMax, + "Stream RX delay us max", + []string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname}, + ) // Register all metrics and return. p.Registry.MustRegister(p) @@ -497,9 +539,16 @@ func (p *Prom) Describe(ch chan<- *prometheus.Desc) { ch <- p.IfRxLossPacketsStreams ch <- p.StreamTxPackets ch <- p.StreamTxBytes + ch <- p.StreamTxPPS + ch <- p.StreamTxBPSL2 ch <- p.StreamRxPackets ch <- p.StreamRxBytes ch <- p.StreamRxLoss + ch <- p.StreamRxPPS + ch <- p.StreamRxBPSL2 + ch <- p.StreamRxBPSL3 + ch <- p.StreamRxDelayUsMin + ch <- p.StreamRxDelayUsMax } // Collect implements required collect function for all metrics collectors. @@ -792,9 +841,16 @@ func (p *Prom) collectInstanceStreams(instance string, ch chan<- prometheus.Metr sid := strconv.Itoa(cr_info.StreamInfo.SessionId) ch <- prometheus.MustNewConstMetric(p.StreamTxPackets, prometheus.CounterValue, float64(cr_info.StreamInfo.TxPackets), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) ch <- prometheus.MustNewConstMetric(p.StreamTxBytes, prometheus.CounterValue, float64(cr_info.StreamInfo.TxBytes), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamTxPPS, prometheus.CounterValue, float64(cr_info.StreamInfo.TxPPS), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamTxBPSL2, prometheus.CounterValue, float64(cr_info.StreamInfo.TxBPSL2), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) ch <- prometheus.MustNewConstMetric(p.StreamRxPackets, prometheus.CounterValue, float64(cr_info.StreamInfo.RxPackets), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) ch <- prometheus.MustNewConstMetric(p.StreamRxBytes, prometheus.CounterValue, float64(cr_info.StreamInfo.RxBytes), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) ch <- prometheus.MustNewConstMetric(p.StreamRxLoss, prometheus.CounterValue, float64(cr_info.StreamInfo.RxLoss), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamRxPPS, prometheus.CounterValue, float64(cr_info.StreamInfo.RxPPS), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamRxBPSL2, prometheus.CounterValue, float64(cr_info.StreamInfo.RxBPSL2), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamRxBPSL3, prometheus.CounterValue, float64(cr_info.StreamInfo.RxBPSL3), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamRxDelayUsMin, prometheus.CounterValue, float64(cr_info.StreamInfo.RxDelayUsMin), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) + ch <- prometheus.MustNewConstMetric(p.StreamRxDelayUsMax, prometheus.CounterValue, float64(cr_info.StreamInfo.RxDelayUsMax), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType) } }