Skip to content
277 changes: 173 additions & 104 deletions internal/knowledge/kpis/plugins/compute/resource_capacity_kvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"strconv"
"strings"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/knowledge/db"
"github.com/cobaltcore-dev/cortex/internal/knowledge/kpis/plugins"
"github.com/cobaltcore-dev/cortex/pkg/conf"
Expand All @@ -29,14 +32,17 @@ func getBuildingBlock(hostName string) string {
return "unknown"
}

// hostReservationResources holds aggregated CPU and memory reservation quantities for a single hypervisor.
type hostReservationResources struct {
	cpu resource.Quantity // summed CPU quantity of the reservations attributed to the host
	memory resource.Quantity // summed memory quantity of the reservations attributed to the host
}

// KVMResourceCapacityKPI exports per-host capacity metrics for KVM hypervisors.
type KVMResourceCapacityKPI struct {
	// Common base for all KPIs that provides standard functionality.
	plugins.BaseKPI[struct{}] // No options passed through yaml config
	utilizedCapacityPerHost *prometheus.Desc // descriptor for cortex_kvm_host_capacity_utilized
	paygCapacityPerHost *prometheus.Desc // descriptor for cortex_kvm_host_capacity_payg
	failoverCapacityPerHost *prometheus.Desc // descriptor for cortex_kvm_host_capacity_failover
	reservedCapacityPerHost *prometheus.Desc // descriptor for cortex_kvm_host_capacity_reserved
	totalCapacityPerHost *prometheus.Desc // descriptor for cortex_kvm_host_capacity_total
	capacityPerHost *prometheus.Desc // descriptor for cortex_kvm_host_capacity_usage (per-type usage)
}

func (KVMResourceCapacityKPI) GetName() string {
Expand All @@ -47,60 +53,9 @@ func (k *KVMResourceCapacityKPI) Init(db *db.DB, client client.Client, opts conf
if err := k.BaseKPI.Init(db, client, opts); err != nil {
return err
}
k.utilizedCapacityPerHost = prometheus.NewDesc(
"cortex_kvm_host_capacity_utilized",
"Utilized resources on the KVM hosts (individually by host).",
[]string{
"compute_host",
"resource",
"availability_zone",
"building_block",
"cpu_architecture",
"workload_type",
"enabled",
"decommissioned",
"external_customer",
"maintenance",
},
nil,
)
k.paygCapacityPerHost = prometheus.NewDesc(
"cortex_kvm_host_capacity_payg",
"PAYG resources available on the KVM hosts (individually by host).",
[]string{
"compute_host",
"resource",
"availability_zone",
"building_block",
"cpu_architecture",
"workload_type",
"enabled",
"decommissioned",
"external_customer",
"maintenance",
},
nil,
)
k.reservedCapacityPerHost = prometheus.NewDesc(
"cortex_kvm_host_capacity_reserved",
"Reserved resources on the KVM hosts (individually by host).",
[]string{
"compute_host",
"resource",
"availability_zone",
"building_block",
"cpu_architecture",
"workload_type",
"enabled",
"decommissioned",
"external_customer",
"maintenance",
},
nil,
)
k.failoverCapacityPerHost = prometheus.NewDesc(
"cortex_kvm_host_capacity_failover",
"Failover resources on the KVM hosts (individually by host).",
k.totalCapacityPerHost = prometheus.NewDesc(
"cortex_kvm_host_capacity_total",
"Total resource capacity on the KVM hosts (individually by host).",
[]string{
"compute_host",
"resource",
Expand All @@ -115,12 +70,13 @@ func (k *KVMResourceCapacityKPI) Init(db *db.DB, client client.Client, opts conf
},
nil,
)
k.totalCapacityPerHost = prometheus.NewDesc(
"cortex_kvm_host_capacity_total",
"Total resources on the KVM hosts (individually by host).",
k.capacityPerHost = prometheus.NewDesc(
"cortex_kvm_host_capacity_usage",
"Resource capacity usage on the KVM hosts (individually by host).",
[]string{
"compute_host",
"resource",
"type",
"availability_zone",
"building_block",
"cpu_architecture",
Expand All @@ -136,23 +92,96 @@ func (k *KVMResourceCapacityKPI) Init(db *db.DB, client client.Client, opts conf
}

// Describe sends the descriptors of all metrics this KPI can emit, so the
// Prometheus registry can check collected metrics against them.
//
// Only the two descriptors created in Init are sent; the earlier per-category
// descriptors (utilized/payg/reserved/failover) were folded into the single
// cortex_kvm_host_capacity_usage metric with a "type" label, so sending their
// stale (nil) descriptors here would be incorrect.
func (k *KVMResourceCapacityKPI) Describe(ch chan<- *prometheus.Desc) {
	ch <- k.totalCapacityPerHost
	ch <- k.capacityPerHost
}

// aggregateReservationsByHost groups Ready reservations by host, returning per-host
// failover totals and committed-resource "not yet in use" totals.
func aggregateReservationsByHost(reservations []v1alpha1.Reservation) (
failoverByHost map[string]hostReservationResources,
committedNotInUseByHost map[string]hostReservationResources,
) {

failoverByHost = make(map[string]hostReservationResources)
committedNotInUseByHost = make(map[string]hostReservationResources)

for _, reservation := range reservations {
if reservation.Spec.SchedulingDomain != v1alpha1.SchedulingDomainNova {
continue
}

readyCondition := meta.FindStatusCondition(reservation.Status.Conditions, v1alpha1.ReservationConditionReady)
if readyCondition == nil || readyCondition.Status != metav1.ConditionTrue {
continue
}

host := reservation.Status.Host
if host == "" {
continue
}

switch reservation.Spec.Type {
case v1alpha1.ReservationTypeFailover:
entry := failoverByHost[host]
cpuQty := reservation.Spec.Resources[hv1.ResourceCPU]
entry.cpu.Add(cpuQty)
memQty := reservation.Spec.Resources[hv1.ResourceMemory]
entry.memory.Add(memQty)
failoverByHost[host] = entry

case v1alpha1.ReservationTypeCommittedResource:
// Total reserved resources for this reservation.
cpuTotal := reservation.Spec.Resources[hv1.ResourceCPU]
memTotal := reservation.Spec.Resources[hv1.ResourceMemory]

// Sum allocated resources across all workloads.
var cpuAllocated, memAllocated resource.Quantity
if reservation.Spec.CommittedResourceReservation != nil {
for _, alloc := range reservation.Spec.CommittedResourceReservation.Allocations {
cpuAllocated.Add(alloc.Resources[hv1.ResourceCPU])
memAllocated.Add(alloc.Resources[hv1.ResourceMemory])
}
}

// Not yet in use = total - allocated, clamped to zero.
cpuNotInUse := cpuTotal.DeepCopy()
cpuNotInUse.Sub(cpuAllocated)
if cpuNotInUse.Cmp(resource.MustParse("0")) < 0 {
cpuNotInUse = resource.MustParse("0")
}

memNotInUse := memTotal.DeepCopy()
memNotInUse.Sub(memAllocated)
if memNotInUse.Cmp(resource.MustParse("0")) < 0 {
memNotInUse = resource.MustParse("0")
}

entry := committedNotInUseByHost[host]
entry.cpu.Add(cpuNotInUse)
entry.memory.Add(memNotInUse)
committedNotInUseByHost[host] = entry
}
}

return failoverByHost, committedNotInUseByHost
}

func (k *KVMResourceCapacityKPI) Collect(ch chan<- prometheus.Metric) {
// The hypervisor resource auto-discovers its current utilization.
// We can use the hypervisor status to calculate the total capacity
// and then subtract the actual resource allocation from virtual machines.
hvs := &hv1.HypervisorList{}
if err := k.Client.List(context.Background(), hvs); err != nil {
slog.Error("failed to list hypervisors", "error", err)
return
}

reservations := &v1alpha1.ReservationList{}
if err := k.Client.List(context.Background(), reservations); err != nil {
slog.Error("failed to list reservations", "error", err)
return
}

failoverByHost, committedNotInUseByHost := aggregateReservationsByHost(reservations.Items)

for _, hypervisor := range hvs.Items {
if hypervisor.Status.EffectiveCapacity == nil {
slog.Warn("hypervisor with nil effective capacity, skipping", "host", hypervisor.Name)
Expand Down Expand Up @@ -182,27 +211,28 @@ func (k *KVMResourceCapacityKPI) Collect(ch chan<- prometheus.Metric) {
ramUsed = resource.MustParse("0")
}

exportCapacityMetricKVM(ch, k.totalCapacityPerHost, "cpu", cpuTotal.AsApproximateFloat64(), hypervisor)
exportCapacityMetricKVM(ch, k.totalCapacityPerHost, "ram", ramTotal.AsApproximateFloat64(), hypervisor)
// Get reservation data for this hypervisor (zero-value if absent).
failoverRes := failoverByHost[hypervisor.Name]
committedRes := committedNotInUseByHost[hypervisor.Name]

cpuReserved := committedRes.cpu
ramReserved := committedRes.memory
cpuFailover := failoverRes.cpu
ramFailover := failoverRes.memory

exportCapacityMetricKVM(ch, k.utilizedCapacityPerHost, "cpu", cpuUsed.AsApproximateFloat64(), hypervisor)
exportCapacityMetricKVM(ch, k.utilizedCapacityPerHost, "ram", ramUsed.AsApproximateFloat64(), hypervisor)
labels := hostLabelsFromHypervisor(hypervisor)

// WARNING: Using dummy data for now.
// TODO Replace with actual data from reservations capacity CRDs
cpuReserved := resource.MustParse("100")
ramReserved := resource.MustParse("1Gi")
k.emitTotal(ch, "cpu", cpuTotal.AsApproximateFloat64(), labels)
k.emitTotal(ch, "ram", ramTotal.AsApproximateFloat64(), labels)

exportCapacityMetricKVM(ch, k.reservedCapacityPerHost, "cpu", cpuReserved.AsApproximateFloat64(), hypervisor)
exportCapacityMetricKVM(ch, k.reservedCapacityPerHost, "ram", ramReserved.AsApproximateFloat64(), hypervisor)
k.emitUsage(ch, "cpu", cpuUsed.AsApproximateFloat64(), "utilized", labels)
k.emitUsage(ch, "ram", ramUsed.AsApproximateFloat64(), "utilized", labels)

// WARNING: Using dummy data for now.
// TODO Replace with actual data from failover capacity CRDs
cpuFailover := resource.MustParse("100")
ramFailover := resource.MustParse("1Gi")
k.emitUsage(ch, "cpu", cpuReserved.AsApproximateFloat64(), "reserved", labels)
k.emitUsage(ch, "ram", ramReserved.AsApproximateFloat64(), "reserved", labels)

exportCapacityMetricKVM(ch, k.failoverCapacityPerHost, "cpu", cpuFailover.AsApproximateFloat64(), hypervisor)
exportCapacityMetricKVM(ch, k.failoverCapacityPerHost, "ram", ramFailover.AsApproximateFloat64(), hypervisor)
k.emitUsage(ch, "cpu", cpuFailover.AsApproximateFloat64(), "failover", labels)
k.emitUsage(ch, "ram", ramFailover.AsApproximateFloat64(), "failover", labels)

// Calculate PAYG capacity
paygCPU := cpuTotal.DeepCopy()
Expand All @@ -215,21 +245,27 @@ func (k *KVMResourceCapacityKPI) Collect(ch chan<- prometheus.Metric) {
paygRAM.Sub(ramReserved)
paygRAM.Sub(ramFailover)

exportCapacityMetricKVM(ch, k.paygCapacityPerHost, "cpu", paygCPU.AsApproximateFloat64(), hypervisor)
exportCapacityMetricKVM(ch, k.paygCapacityPerHost, "ram", paygRAM.AsApproximateFloat64(), hypervisor)
k.emitUsage(ch, "cpu", paygCPU.AsApproximateFloat64(), "payg", labels)
k.emitUsage(ch, "ram", paygRAM.AsApproximateFloat64(), "payg", labels)
}
}

func exportCapacityMetricKVM(ch chan<- prometheus.Metric, metric *prometheus.Desc, resource string, value float64, hypervisor hv1.Hypervisor) {
bb := getBuildingBlock(hypervisor.Name)

availabilityZone := hypervisor.Labels["topology.kubernetes.io/zone"]
// kvmHostLabels holds precomputed label values derived from a hypervisor.
type kvmHostLabels struct {
	computeHost string // hypervisor object name
	availabilityZone string // from the "topology.kubernetes.io/zone" label
	buildingBlock string // derived from the host name via getBuildingBlock
	cpuArchitecture string // e.g. "cascade-lake"
	workloadType string // e.g. "general-purpose"
	enabled string // "true"/"false", preformatted for Prometheus labels
	decommissioned string // "true"/"false", preformatted for Prometheus labels
	externalCustomer string // "true"/"false", preformatted for Prometheus labels
	maintenance string // "true"/"false", preformatted for Prometheus labels
}

enabled := true
func hostLabelsFromHypervisor(hypervisor hv1.Hypervisor) kvmHostLabels {
decommissioned := false
externalCustomer := false
maintenance := false

workloadType := "general-purpose"
cpuArchitecture := "cascade-lake"

Expand All @@ -246,19 +282,52 @@ func exportCapacityMetricKVM(ch chan<- prometheus.Metric, metric *prometheus.Des
}
}

return kvmHostLabels{
computeHost: hypervisor.Name,
availabilityZone: hypervisor.Labels["topology.kubernetes.io/zone"],
buildingBlock: getBuildingBlock(hypervisor.Name),
cpuArchitecture: cpuArchitecture,
workloadType: workloadType,
enabled: strconv.FormatBool(true),
decommissioned: strconv.FormatBool(decommissioned),
externalCustomer: strconv.FormatBool(externalCustomer),
maintenance: strconv.FormatBool(false),
}
}

// emitTotal writes one gauge sample of the total-capacity metric for a single
// host and resource, using the precomputed host label values.
func (k *KVMResourceCapacityKPI) emitTotal(ch chan<- prometheus.Metric, resourceName string, value float64, l kvmHostLabels) {
	labelValues := []string{
		l.computeHost,
		resourceName,
		l.availabilityZone,
		l.buildingBlock,
		l.cpuArchitecture,
		l.workloadType,
		l.enabled,
		l.decommissioned,
		l.externalCustomer,
		l.maintenance,
	}
	ch <- prometheus.MustNewConstMetric(k.totalCapacityPerHost, prometheus.GaugeValue, value, labelValues...)
}

// emitUsage writes one gauge sample of the capacity-usage metric for a single
// host and resource. capacityType fills the metric's "type" label and
// distinguishes the usage category ("utilized", "reserved", "failover", "payg").
//
// The argument list matches the 11 labels declared for capacityPerHost in Init;
// a leftover argument set from a superseded revision (referencing out-of-scope
// names such as metric, hypervisor, bb) has been removed, as it broke
// compilation and duplicated the label values.
func (k *KVMResourceCapacityKPI) emitUsage(ch chan<- prometheus.Metric, resourceName string, value float64, capacityType string, l kvmHostLabels) {
	ch <- prometheus.MustNewConstMetric(
		k.capacityPerHost,
		prometheus.GaugeValue,
		value,
		l.computeHost,
		resourceName,
		capacityType,
		l.availabilityZone,
		l.buildingBlock,
		l.cpuArchitecture,
		l.workloadType,
		l.enabled,
		l.decommissioned,
		l.externalCustomer,
		l.maintenance,
	)
}
Loading
Loading