Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/v1alpha1/reservation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ const (
ReservationTypeLabelFailover = "failover"
)

// Annotation keys for Reservation metadata.
const (
// AnnotationCreatorRequestID tracks the request ID that created this reservation.
// Used for end-to-end traceability across API calls, controller reconciles, and scheduler invocations.
AnnotationCreatorRequestID = "reservations.cortex.cloud/creator-request-id"
)

// CommittedResourceAllocation represents a workload's assignment to a committed resource reservation slot.
// The workload could be a VM (Nova/IronCore), Pod (Kubernetes), or other resource.
type CommittedResourceAllocation struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (api *HTTPAPI) processCommitmentChanges(ctx context.Context, w http.Respons
manager := NewReservationManager(api.client)
requireRollback := false
failedCommitments := make(map[string]string) // commitmentUUID to reason for failure, for better response messages in case of rollback
logger.Info("processing commitment change request", "availabilityZone", req.AZ, "dryRun", req.DryRun, "affectedProjects", len(req.ByProject))
creatorRequestID := reservations.GlobalRequestIDFromContext(ctx)

knowledge := &reservations.FlavorGroupKnowledgeClient{Client: api.client}
flavorGroups, err := knowledge.GetAllFlavorGroups(ctx, nil)
Expand Down Expand Up @@ -194,8 +194,7 @@ ProcessLoop:
}

for _, commitment := range resourceChanges.Commitments {
// Additional per-commitment validation if needed
logger.Info("processing commitment change", "commitmentUUID", commitment.UUID, "projectID", projectID, "resourceName", resourceName, "oldStatus", commitment.OldStatus.UnwrapOr("none"), "newStatus", commitment.NewStatus.UnwrapOr("none"))
logger.V(1).Info("processing commitment", "commitmentUUID", commitment.UUID, "oldStatus", commitment.OldStatus.UnwrapOr("none"), "newStatus", commitment.NewStatus.UnwrapOr("none"))

// TODO add configurable upper limit validation for commitment size (number of instances) to prevent excessive reservation creation
// TODO add domain
Expand Down Expand Up @@ -247,8 +246,10 @@ ProcessLoop:
requireRollback = true
break ProcessLoop
}
// Set creator request ID for traceability across controller reconciles
stateDesired.CreatorRequestID = creatorRequestID

logger.Info("applying commitment state change", "commitmentUUID", commitment.UUID, "oldState", stateBefore, "desiredState", stateDesired)
logger.V(1).Info("applying commitment state change", "commitmentUUID", commitment.UUID, "oldMemory", stateBefore.TotalMemoryBytes, "desiredMemory", stateDesired.TotalMemoryBytes)

touchedReservations, deletedReservations, err := manager.ApplyCommitmentState(ctx, logger, stateDesired, flavorGroups, "changeCommitmentsApi")
if err != nil {
Expand All @@ -257,7 +258,7 @@ ProcessLoop:
requireRollback = true
break ProcessLoop
}
logger.Info("applied commitment state change", "commitmentUUID", commitment.UUID, "touchedReservations", len(touchedReservations), "deletedReservations", len(deletedReservations))
logger.V(1).Info("applied commitment state change", "commitmentUUID", commitment.UUID, "touchedReservations", len(touchedReservations), "deletedReservations", len(deletedReservations))
reservationsToWatch = append(reservationsToWatch, touchedReservations...)
}
}
Expand Down Expand Up @@ -318,6 +319,7 @@ ProcessLoop:
}

// watchReservationsUntilReady polls until all reservations reach Ready=True or timeout.
// Returns failed reservations and any errors encountered.
func watchReservationsUntilReady(
ctx context.Context,
logger logr.Logger,
Expand All @@ -332,19 +334,32 @@ func watchReservationsUntilReady(
}

deadline := time.Now().Add(timeout)
startTime := time.Now()
totalReservations := len(reservations)

reservationsToWatch := make([]v1alpha1.Reservation, len(reservations))
copy(reservationsToWatch, reservations)

// Track successful reservations for summary
var successfulReservations []string
pollCount := 0

for {
pollCount++
var stillWaiting []v1alpha1.Reservation
if time.Now().After(deadline) {
errors = append(errors, fmt.Errorf("timeout after %v waiting for reservations to become ready", timeout))
// Log summary on timeout
logger.Info("reservation watch completed (timeout)",
"total", totalReservations,
"ready", len(successfulReservations),
"failed", len(failedReservations),
"timedOut", len(reservationsToWatch),
"duration", time.Since(startTime).Round(time.Millisecond),
"polls", pollCount)
return failedReservations, errors
}

allAreReady := true

for _, res := range reservationsToWatch {
// Fetch current state
var current v1alpha1.Reservation
Expand All @@ -354,9 +369,7 @@ func watchReservationsUntilReady(
}

if err := k8sClient.Get(ctx, nn, &current); err != nil {
allAreReady = false
// Reservation is still in process of being created, or there is a transient error, continue waiting for it
logger.V(1).Info("transient error getting reservation, will retry", "reservation", res.Name, "error", err)
// Reservation is still in process of being created, or there is a transient error
stillWaiting = append(stillWaiting, res)
continue
}
Expand All @@ -369,43 +382,40 @@ func watchReservationsUntilReady(

if readyCond == nil {
// Condition not set yet, keep waiting
allAreReady = false
stillWaiting = append(stillWaiting, res)
continue
}

switch readyCond.Status {
case metav1.ConditionTrue:
// check if host is not set in spec or status: if so, no capacity left to schedule the reservation
// Only consider truly ready if Status.Host is populated
if current.Spec.TargetHost == "" || current.Status.Host == "" {
allAreReady = false
failedReservations = append(failedReservations, current)
logger.Info("insufficient capacity for reservation", "reservation", current.Name, "reason", readyCond.Reason, "message", readyCond.Message, "targetHostInSpec", current.Spec.TargetHost, "hostInStatus", current.Status.Host)
} else {
// Reservation is successfully scheduled, no further action needed
logger.Info("reservation ready", "reservation", current.Name, "host", current.Spec.TargetHost)
stillWaiting = append(stillWaiting, res)
continue
}
// Reservation is successfully scheduled - track for summary
successfulReservations = append(successfulReservations, current.Name)

case metav1.ConditionFalse:
failedReservations = append(failedReservations, res)
// Any failure reason counts as failed
failedReservations = append(failedReservations, current)
case metav1.ConditionUnknown:
allAreReady = false
stillWaiting = append(stillWaiting, res)
}
}

if allAreReady || len(stillWaiting) == 0 {
logger.Info("all reservations checked",
"failed", len(failedReservations))
if len(stillWaiting) == 0 {
// All reservations have reached a terminal state - log summary
logger.Info("reservation watch completed",
"total", totalReservations,
"ready", len(successfulReservations),
"failed", len(failedReservations),
"duration", time.Since(startTime).Round(time.Millisecond),
"polls", pollCount)
return failedReservations, errors
}

reservationsToWatch = stillWaiting
// Log progress
logger.V(1).Info("waiting for reservations to become ready",
"notReady", len(reservationsToWatch),
"total", len(reservations),
"timeRemaining", time.Until(deadline).Round(time.Second))

// Wait before next poll
select {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ type ChangeCommitmentsAPIMonitor struct {
}

// NewChangeCommitmentsAPIMonitor creates a new monitor with Prometheus metrics.
// Metrics are pre-initialized with zero values for common HTTP status codes
// to ensure they appear in Prometheus before the first request.
func NewChangeCommitmentsAPIMonitor() ChangeCommitmentsAPIMonitor {
return ChangeCommitmentsAPIMonitor{
m := ChangeCommitmentsAPIMonitor{
requestCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_committed_resource_change_api_requests_total",
Help: "Total number of committed resource change API requests by HTTP status code",
Expand All @@ -35,6 +37,21 @@ func NewChangeCommitmentsAPIMonitor() ChangeCommitmentsAPIMonitor {
Help: "Total number of commitment change requests that timed out while waiting for reservations to become ready",
}),
}

// Pre-initialize metrics with zero values for common HTTP status codes.
// This ensures metrics exist in Prometheus before the first request,
// preventing "metric missing" warnings in alerting rules.
for _, statusCode := range []string{"200", "400", "409", "500", "503"} {
m.requestCounter.WithLabelValues(statusCode)
m.requestDuration.WithLabelValues(statusCode)
}

// Pre-initialize commitment change result labels
for _, result := range []string{"accepted", "rejected"} {
m.commitmentChanges.WithLabelValues(result)
}

return m
}

// Describe implements prometheus.Collector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,13 @@ func TestChangeCommitmentsAPIMonitor_MetricLabels(t *testing.T) {
// Verify request counter has correct labels
for _, family := range families {
if *family.Name == "cortex_committed_resource_change_api_requests_total" {
if len(family.Metric) != 3 {
t.Errorf("Expected 3 request counter metrics, got %d", len(family.Metric))
// At minimum we expect the 3 labels we added (200, 409, 503)
// Plus pre-initialized labels (400, 500) - so >= 5 total
if len(family.Metric) < 3 {
t.Errorf("Expected at least 3 request counter metrics, got %d", len(family.Metric))
}

// Check label names
// Check all metrics have the status_code label
for _, metric := range family.Metric {
labelNames := make(map[string]bool)
for _, label := range metric.Label {
Expand All @@ -120,11 +122,13 @@ func TestChangeCommitmentsAPIMonitor_MetricLabels(t *testing.T) {
}

if *family.Name == "cortex_committed_resource_change_api_request_duration_seconds" {
if len(family.Metric) != 1 {
t.Errorf("Expected 1 histogram metric, got %d", len(family.Metric))
// At minimum we expect the label we used (200)
// Plus pre-initialized labels - so >= 1 total
if len(family.Metric) < 1 {
t.Errorf("Expected at least 1 histogram metric, got %d", len(family.Metric))
}

// Check label names
// Check all metrics have the status_code label
for _, metric := range family.Metric {
labelNames := make(map[string]bool)
for _, label := range metric.Label {
Expand All @@ -138,11 +142,13 @@ func TestChangeCommitmentsAPIMonitor_MetricLabels(t *testing.T) {
}

if *family.Name == "cortex_committed_resource_change_api_commitment_changes_total" {
if len(family.Metric) != 2 {
t.Errorf("Expected 2 commitment changes metrics, got %d", len(family.Metric))
// At minimum we expect the 2 labels we added (success, rejected)
// Plus pre-initialized labels (accepted) - so >= 2 total
if len(family.Metric) < 2 {
t.Errorf("Expected at least 2 commitment changes metrics, got %d", len(family.Metric))
}

// Check label names
// Check all metrics have the result label
for _, metric := range family.Metric {
labelNames := make(map[string]bool)
for _, label := range metric.Label {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,8 @@ func (env *CommitmentTestEnv) processNewReservation(res *v1alpha1.Reservation) {
}
}

// markReservationSchedulerProcessedStatus updates a reservation to have Ready=True status (scheduling can be succeeded or not - depending on host status)
// markReservationSchedulerProcessedStatus updates a reservation status based on scheduling result.
// If host is non-empty, sets Ready=True (success). If host is empty, sets Ready=False with NoHostsFound (failure).
func (env *CommitmentTestEnv) markReservationSchedulerProcessedStatus(res *v1alpha1.Reservation, host string) {
ctx := context.Background()

Expand All @@ -1288,16 +1289,28 @@ func (env *CommitmentTestEnv) markReservationSchedulerProcessedStatus(res *v1alp
return
}

// Then update status
// Then update status - Ready=True only if host was found, Ready=False otherwise
res.Status.Host = host
res.Status.Conditions = []metav1.Condition{
{
Type: v1alpha1.ReservationConditionReady,
Status: metav1.ConditionTrue,
Reason: "ReservationActive",
Message: "Reservation is ready (set by test controller)",
LastTransitionTime: metav1.Now(),
},
if host != "" {
res.Status.Conditions = []metav1.Condition{
{
Type: v1alpha1.ReservationConditionReady,
Status: metav1.ConditionTrue,
Reason: "ReservationActive",
Message: "Reservation is ready (set by test controller)",
LastTransitionTime: metav1.Now(),
},
}
} else {
res.Status.Conditions = []metav1.Condition{
{
Type: v1alpha1.ReservationConditionReady,
Status: metav1.ConditionFalse,
Reason: "NoHostsFound",
Message: "No hosts with sufficient capacity (set by test controller)",
LastTransitionTime: metav1.Now(),
},
}
}
if err := env.K8sClient.Status().Update(ctx, res); err != nil {
env.T.Logf("Warning: Failed to update reservation status: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ type ReportCapacityAPIMonitor struct {
}

// NewReportCapacityAPIMonitor creates a new monitor with Prometheus metrics.
// Metrics are pre-initialized with zero values for common HTTP status codes
// to ensure they appear in Prometheus before the first request.
func NewReportCapacityAPIMonitor() ReportCapacityAPIMonitor {
return ReportCapacityAPIMonitor{
m := ReportCapacityAPIMonitor{
requestCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_committed_resource_capacity_api_requests_total",
Help: "Total number of committed resource capacity API requests by HTTP status code",
Expand All @@ -26,6 +28,16 @@ func NewReportCapacityAPIMonitor() ReportCapacityAPIMonitor {
Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10},
}, []string{"status_code"}),
}

// Pre-initialize metrics with zero values for common HTTP status codes.
// This ensures metrics exist in Prometheus before the first request,
// preventing "metric missing" warnings in alerting rules.
for _, statusCode := range []string{"200", "500", "503"} {
m.requestCounter.WithLabelValues(statusCode)
m.requestDuration.WithLabelValues(statusCode)
}

return m
}

// Describe implements prometheus.Collector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ type ReportUsageAPIMonitor struct {
}

// NewReportUsageAPIMonitor creates a new monitor with Prometheus metrics.
// Metrics are pre-initialized with zero values for common HTTP status codes
// to ensure they appear in Prometheus before the first request.
func NewReportUsageAPIMonitor() ReportUsageAPIMonitor {
return ReportUsageAPIMonitor{
m := ReportUsageAPIMonitor{
requestCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "cortex_committed_resource_usage_api_requests_total",
Help: "Total number of committed resource usage API requests by HTTP status code",
Expand All @@ -26,6 +28,16 @@ func NewReportUsageAPIMonitor() ReportUsageAPIMonitor {
Buckets: []float64{0.1, 0.25, 0.5, 1, 2.5, 5, 10},
}, []string{"status_code"}),
}

// Pre-initialize metrics with zero values for common HTTP status codes.
// This ensures metrics exist in Prometheus before the first request,
// preventing "metric missing" warnings in alerting rules.
for _, statusCode := range []string{"200", "400", "404", "500", "503"} {
m.requestCounter.WithLabelValues(statusCode)
m.requestDuration.WithLabelValues(statusCode)
}

return m
}

// Describe implements prometheus.Collector.
Expand Down
Loading
Loading