Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions pkg/controller/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,35 @@ type StreamSummaryResponse struct {
Type string `json:"type"`
SubType string `json:"sub-type"`
Direction string `json:"direction"`
TxPackets int `json:"tx-packets"`
TxBytes int `json:"tx-bytes"`
RxPackets int `json:"rx-packets"`
RxBytes int `json:"rx-bytes"`
TxPPS int `json:"tx-pps"`
RxPPS int `json:"rx-pps"`
RxLoss int `json:"rx-loss"`
SessionId int `json:"session-id"`
} `json:"stream-summary"`
}

// StreamInfoResponse response for stream-info socket command.
type StreamInfoResponse struct {
Status string `json:"status"`
Code int `json:"code"`
StreamInfo struct {
FlowId int `json:"flow-id"`
Name string `json:"name"`
Type string `json:"type"`
SubType string `json:"sub-type"`
Direction string `json:"direction"`
TxPackets int `json:"tx-packets"`
TxBytes int `json:"tx-bytes"`
TxPPS int `json:"tx-pps"`
TxBPSL2 int `json:"tx-bps-l2"`
RxPackets int `json:"rx-packets"`
RxBytes int `json:"rx-bytes"`
RxLoss int `json:"rx-loss"`
RxPPS int `json:"rx-pps"`
RxBPSL2 int `json:"rx-bps-l2"`
RxBPSL3 int `json:"rx-bps-l3"`
RxDelayUsMin int `json:"rx-delay-us-min"`
RxDelayUsMax int `json:"rx-delay-us-max"`
SessionId int `json:"session-id"`
} `json:"stream-info"`
}
116 changes: 96 additions & 20 deletions pkg/controller/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,16 @@ const (
metricIfRxLossPacketsStreams = "interfaces_rx_loss_packets_streams"
metricStreamTxPackets = "stream_tx_packets"
metricStreamTxBytes = "stream_tx_bytes"
metricStreamTxPPS = "stream_tx_pps"
metricStreamTxBPSL2 = "stream_tx_bps_l2"
metricStreamRxPackets = "stream_rx_packets"
metricStreamRxBytes = "stream_rx_bytes"
metricStreamRxLoss = "stream_rx_loss"
metricStreamRxPPS = "stream_rx_pps"
metricStreamRxBPSL2 = "stream_rx_bps_l2"
metricStreamRxBPSL3 = "stream_rx_bps_l3"
metricStreamRxDelayUsMin = "stream_rx_delay_us_min"
metricStreamRxDelayUsMax = "stream_rx_delay_us_max"

labelHostname = "hostname"
labelInstanceName = "instance_name"
Expand Down Expand Up @@ -158,11 +165,18 @@ type Prom struct {
IfRxPPSStreams *prometheus.Desc
IfRxLossPacketsStreams *prometheus.Desc
// Streams.
StreamTxPackets *prometheus.Desc
StreamTxBytes *prometheus.Desc
StreamRxPackets *prometheus.Desc
StreamRxBytes *prometheus.Desc
StreamRxLoss *prometheus.Desc
StreamTxPackets *prometheus.Desc
StreamTxBytes *prometheus.Desc
StreamTxPPS *prometheus.Desc
StreamTxBPSL2 *prometheus.Desc
StreamRxPackets *prometheus.Desc
StreamRxBytes *prometheus.Desc
StreamRxLoss *prometheus.Desc
StreamRxPPS *prometheus.Desc
StreamRxBPSL2 *prometheus.Desc
StreamRxBPSL3 *prometheus.Desc
StreamRxDelayUsMin *prometheus.Desc
StreamRxDelayUsMax *prometheus.Desc
}

// NewProm creates a new prometheus export object.
Expand Down Expand Up @@ -416,6 +430,14 @@ func NewProm(repository Repository) *Prom {
"Stream TX bytes",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)
p.StreamTxPPS = prometheus.NewDesc(metricStreamTxPPS,
"Stream TX PPS",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)
p.StreamTxBPSL2 = prometheus.NewDesc(metricStreamTxBPSL2,
"Stream TX BPS L2",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)
p.StreamRxPackets = prometheus.NewDesc(metricStreamRxPackets,
"Stream RX packets",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
Expand All @@ -428,6 +450,26 @@ func NewProm(repository Repository) *Prom {
"Stream RX loss",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)
p.StreamRxPPS = prometheus.NewDesc(metricStreamRxPPS,
"Stream RX PPS",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)
p.StreamRxBPSL2 = prometheus.NewDesc(metricStreamRxBPSL2,
"Stream RX BPS L2",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)
p.StreamRxBPSL3 = prometheus.NewDesc(metricStreamRxBPSL3,
"Stream RX BPS L3",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)
p.StreamRxDelayUsMin = prometheus.NewDesc(metricStreamRxDelayUsMin,
"Stream RX delay us min",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)
p.StreamRxDelayUsMax = prometheus.NewDesc(metricStreamRxDelayUsMax,
"Stream RX delay us max",
[]string{labelInstanceName, labelFlowId, labelSessionId, labelStreamName, labelStreamDirection, labelStreamType, labelStreamSubType}, prometheus.Labels{labelHostname: hostname},
)

// Register all metrics and return.
p.Registry.MustRegister(p)
Expand Down Expand Up @@ -497,9 +539,16 @@ func (p *Prom) Describe(ch chan<- *prometheus.Desc) {
ch <- p.IfRxLossPacketsStreams
ch <- p.StreamTxPackets
ch <- p.StreamTxBytes
ch <- p.StreamTxPPS
ch <- p.StreamTxBPSL2
ch <- p.StreamRxPackets
ch <- p.StreamRxBytes
ch <- p.StreamRxLoss
ch <- p.StreamRxPPS
ch <- p.StreamRxBPSL2
ch <- p.StreamRxBPSL3
ch <- p.StreamRxDelayUsMin
ch <- p.StreamRxDelayUsMax
}

// Collect implements required collect function for all metrics collectors.
Expand Down Expand Up @@ -750,31 +799,58 @@ func (p *Prom) collectInstanceA10nspInterfaces(instance string, ch chan<- promet
}

func (p *Prom) collectInstanceStreams(instance string, ch chan<- prometheus.Metric) {
// Invoke command.
command := SocketCommand{
// Invoke command stream-summary.
command_summary := SocketCommand{
Command: "stream-summary",
}
result, err := p.repository.Command(instance, command)
result_summary, err := p.repository.Command(instance, command_summary)
if err != nil {
log.Warn().Msgf("failed to execute stream-summary: %s", err.Error())
return
}
// Decode response.
var cr StreamSummaryResponse
err = json.NewDecoder(strings.NewReader(string(result))).Decode(&cr)
// Decode stream-summary response.
var cr_summary StreamSummaryResponse
err = json.NewDecoder(strings.NewReader(string(result_summary))).Decode(&cr_summary)
if err != nil {
log.Warn().Msgf("failed to decode stream-summary: %s", err.Error())
return
}
// Return Metrics.
for _, stream := range cr.Streams {
fid := strconv.Itoa(stream.FlowId)
sid := strconv.Itoa(stream.SessionId)
ch <- prometheus.MustNewConstMetric(p.StreamTxPackets, prometheus.CounterValue, float64(stream.TxPackets), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamTxBytes, prometheus.CounterValue, float64(stream.TxBytes), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxPackets, prometheus.CounterValue, float64(stream.RxPackets), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxBytes, prometheus.CounterValue, float64(stream.RxBytes), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxLoss, prometheus.CounterValue, float64(stream.RxLoss), instance, fid, sid, stream.Name, stream.Direction, stream.Type, stream.SubType)
// Loop stream-summary response.
for _, stream_summary := range cr_summary.Streams {
// Invoke command stream-info.
command_info := SocketCommand{
Command: "stream-info",
Arguments: map[string]interface{}{
"flow-id": stream_summary.FlowId,
},
}
result_info, err := p.repository.Command(instance, command_info)
if err != nil {
log.Warn().Msgf("failed to execute stream-info: %s", err.Error())
return
}
// Decode stream-info response.
var cr_info StreamInfoResponse
err = json.NewDecoder(strings.NewReader(string(result_info))).Decode(&cr_info)
if err != nil {
log.Warn().Msgf("failed to decode stream-info: %s", err.Error())
return
}
// Return Metrics.
fid := strconv.Itoa(cr_info.StreamInfo.FlowId)
sid := strconv.Itoa(cr_info.StreamInfo.SessionId)
ch <- prometheus.MustNewConstMetric(p.StreamTxPackets, prometheus.CounterValue, float64(cr_info.StreamInfo.TxPackets), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamTxBytes, prometheus.CounterValue, float64(cr_info.StreamInfo.TxBytes), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamTxPPS, prometheus.CounterValue, float64(cr_info.StreamInfo.TxPPS), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamTxBPSL2, prometheus.CounterValue, float64(cr_info.StreamInfo.TxBPSL2), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxPackets, prometheus.CounterValue, float64(cr_info.StreamInfo.RxPackets), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxBytes, prometheus.CounterValue, float64(cr_info.StreamInfo.RxBytes), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxLoss, prometheus.CounterValue, float64(cr_info.StreamInfo.RxLoss), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxPPS, prometheus.CounterValue, float64(cr_info.StreamInfo.RxPPS), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxBPSL2, prometheus.CounterValue, float64(cr_info.StreamInfo.RxBPSL2), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxBPSL3, prometheus.CounterValue, float64(cr_info.StreamInfo.RxBPSL3), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxDelayUsMin, prometheus.CounterValue, float64(cr_info.StreamInfo.RxDelayUsMin), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
ch <- prometheus.MustNewConstMetric(p.StreamRxDelayUsMax, prometheus.CounterValue, float64(cr_info.StreamInfo.RxDelayUsMax), instance, fid, sid, cr_info.StreamInfo.Name, cr_info.StreamInfo.Direction, cr_info.StreamInfo.Type, cr_info.StreamInfo.SubType)
}
}

Expand Down