11 changes: 5 additions & 6 deletions cmd/main.go
@@ -51,7 +51,6 @@ import (
"github.com/cobaltcore-dev/cortex/internal/scheduling/pods"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/commitments"
reservationscontroller "github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/controller"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations/failover"
"github.com/cobaltcore-dev/cortex/pkg/conf"
"github.com/cobaltcore-dev/cortex/pkg/monitoring"
@@ -489,16 +488,16 @@ func main() {
}
if slices.Contains(mainConfig.EnabledControllers, "reservations-controller") {
setupLog.Info("enabling controller", "controller", "reservations-controller")
monitor := reservationscontroller.NewControllerMonitor(multiclusterClient)
monitor := reservations.NewMonitor(multiclusterClient)
metrics.Registry.MustRegister(&monitor)
reservationsControllerConfig := conf.GetConfigOrDie[reservationscontroller.Config]()
commitmentsConfig := conf.GetConfigOrDie[commitments.Config]()

if err := (&reservationscontroller.ReservationReconciler{
if err := (&commitments.CommitmentReservationController{
Client: multiclusterClient,
Scheme: mgr.GetScheme(),
Conf: reservationsControllerConfig,
Conf: commitmentsConfig,
}).SetupWithManager(mgr, multiclusterClient); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Reservation")
setupLog.Error(err, "unable to create controller", "controller", "CommitmentReservation")
os.Exit(1)
}
}
1 change: 1 addition & 0 deletions helm/bundles/cortex-nova/templates/pipelines_kvm.yaml
@@ -236,6 +236,7 @@ spec:

This is the pipeline used for KVM hypervisors (qemu and cloud-hypervisor).
Specifically, this pipeline is used for general-purpose workloads.
It is also used for committed-resource (CR) and high-availability (HA) reservation requests.
type: filter-weigher
createDecisions: false
# Fetch all placement candidates, ignoring nova's preselection.
11 changes: 8 additions & 3 deletions helm/bundles/cortex-nova/values.yaml
@@ -139,9 +139,14 @@ cortex-scheduling-controllers:
- failover-reservations-controller
enabledTasks:
- nova-decisions-cleanup-task
# Endpoints configuration for reservations controller
endpoints:
novaExternalScheduler: "http://localhost:8080/scheduler/nova/external"
# NovaExternalScheduler is the URL of the nova external scheduler API for CR reservations
novaExternalScheduler: "http://localhost:8080/scheduler/nova/external"
# FlavorGroupPipelines maps flavor group IDs to pipeline names for CR reservations
# This allows different scheduling strategies per flavor group (e.g., HANA vs GP)
flavorGroupPipelines:
"2152": "kvm-hana-bin-packing-all-filters-enabled" # HANA flavor group
"2101": "kvm-general-purpose-load-balancing-all-filters-enabled" # General Purpose flavor group
"*": "kvm-general-purpose-load-balancing-all-filters-enabled" # Catch-all fallback
# OvercommitMappings is a list of mappings that map hypervisor traits to
# overcommit ratios. Note that this list is applied in order, so if there
# are multiple mappings applying to the same hypervisors, the last mapping
internal/scheduling/reservations/commitments/api_change_commitments.go
@@ -16,14 +16,18 @@ import (
"github.com/cobaltcore-dev/cortex/api/v1alpha1"
"github.com/cobaltcore-dev/cortex/internal/scheduling/reservations"
"github.com/go-logr/logr"
"github.com/google/uuid"
. "github.com/majewsky/gg/option"
"github.com/sapcc/go-api-declarations/liquid"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var apiLog = ctrl.Log.WithName("commitment-reservation-api")

// sortedKeys returns map keys sorted alphabetically for deterministic iteration.
func sortedKeys[K ~string, V any](m map[K]V) []K {
keys := make([]K, 0, len(m))
@@ -43,16 +47,23 @@ func sortedKeys[K ~string, V any](m map[K]V) []K {
// This endpoint handles commitment changes by creating/updating/deleting Reservation CRDs based on the commitment lifecycle.
// A request may contain multiple commitment changes which are processed in a single transaction. If any change fails, all changes are rolled back.
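//
// Illustrative request sketch (hedged: the authoritative schema is
// liquid.CommitmentChangeRequest; the JSON field names and values below are
// assumptions, and only the fields this handler reads are shown):
//
//	POST /v1/change-commitments
//	{
//	  "infoVersion": 1,        // must match the server's current info version
//	  "az": "az-a",            // availability zone, read as req.AZ
//	  "dryRun": false,         // dry runs are rejected for now
//	  "byProject": {
//	    "<projectID>": { ... } // per-resource commitment changes
//	  }
//	}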
func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Request) {
// Check if API is enabled
if !api.config.EnableChangeCommitmentsAPI {
http.Error(w, "change-commitments API is disabled", http.StatusServiceUnavailable)
return
}

// Serialize all change-commitments requests
api.changeMutex.Lock()
defer api.changeMutex.Unlock()

// Extract or generate request ID for tracing
requestID := r.Header.Get("X-Request-ID")
if requestID == "" {
requestID = fmt.Sprintf("req-%d", time.Now().UnixNano())
requestID = uuid.New().String()
}
log := commitmentApiLog.WithValues("requestID", requestID, "endpoint", "/v1/change-commitments")
ctx := reservations.WithGlobalRequestID(context.Background(), requestID)
logger := APILoggerFromContext(ctx).WithValues("endpoint", "/v1/change-commitments")

// Only accept POST method
if r.Method != http.MethodPost {
@@ -63,20 +74,20 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque
// Parse request body
var req liquid.CommitmentChangeRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
log.Error(err, "invalid request body")
logger.Error(err, "invalid request body")
http.Error(w, "Invalid request body: "+err.Error(), http.StatusBadRequest)
return
}

log.Info("received change commitments request", "affectedProjects", len(req.ByProject), "dryRun", req.DryRun, "availabilityZone", req.AZ)
logger.Info("received change commitments request", "affectedProjects", len(req.ByProject), "dryRun", req.DryRun, "availabilityZone", req.AZ)

// Initialize response
resp := liquid.CommitmentChangeResponse{}

// Check for dry run -> early reject, not supported yet
if req.DryRun {
resp.RejectionReason = "Dry run not supported yet"
log.Info("rejecting dry run request")
logger.Info("rejecting dry run request")
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
if err := json.NewEncoder(w).Encode(resp); err != nil {
@@ -87,7 +98,7 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque

// Process commitment changes
// For now, we'll implement a simplified path that checks capacity for immediate start CRs
if err := api.processCommitmentChanges(w, log, req, &resp); err != nil {
if err := api.processCommitmentChanges(ctx, w, logger, req, &resp); err != nil {
// Error already written to response by processCommitmentChanges
return
}
@@ -100,17 +111,16 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque
}
}

func (api *HTTPAPI) processCommitmentChanges(w http.ResponseWriter, log logr.Logger, req liquid.CommitmentChangeRequest, resp *liquid.CommitmentChangeResponse) error {
ctx := context.Background()
func (api *HTTPAPI) processCommitmentChanges(ctx context.Context, w http.ResponseWriter, logger logr.Logger, req liquid.CommitmentChangeRequest, resp *liquid.CommitmentChangeResponse) error {
manager := NewReservationManager(api.client)
requireRollback := false
failedCommitments := make(map[string]string) // commitmentUUID to reason for failure, for better response messages in case of rollback
log.Info("processing commitment change request", "availabilityZone", req.AZ, "dryRun", req.DryRun, "affectedProjects", len(req.ByProject))
logger.Info("processing commitment change request", "availabilityZone", req.AZ, "dryRun", req.DryRun, "affectedProjects", len(req.ByProject))

knowledge := &reservations.FlavorGroupKnowledgeClient{Client: api.client}
flavorGroups, err := knowledge.GetAllFlavorGroups(ctx, nil)
if err != nil {
log.Info("failed to get flavor groups from knowledge extractor", "error", err)
logger.Info("failed to get flavor groups from knowledge extractor", "error", err)
resp.RejectionReason = "caches not ready"
retryTime := time.Now().Add(1 * time.Minute)
resp.RetryAt = Some(retryTime)
@@ -124,7 +134,7 @@ }
}

if req.InfoVersion != currentVersion {
log.Info("version mismatch in commitment change request",
logger.Info("version mismatch in commitment change request",
"requestVersion", req.InfoVersion,
"currentVersion", currentVersion)
http.Error(w, fmt.Sprintf("Version mismatch: request version %d, current version %d. Please refresh and retry.",
@@ -163,7 +173,7 @@ ProcessLoop:

for _, commitment := range resourceChanges.Commitments {
// Additional per-commitment validation if needed
log.Info("processing commitment change", "commitmentUUID", commitment.UUID, "projectID", projectID, "resourceName", resourceName, "oldStatus", commitment.OldStatus.UnwrapOr("none"), "newStatus", commitment.NewStatus.UnwrapOr("none"))
logger.Info("processing commitment change", "commitmentUUID", commitment.UUID, "projectID", projectID, "resourceName", resourceName, "oldStatus", commitment.OldStatus.UnwrapOr("none"), "newStatus", commitment.NewStatus.UnwrapOr("none"))

// TODO add configurable upper limit validation for commitment size (number of instances) to prevent excessive reservation creation
// TODO add domain
@@ -174,7 +184,7 @@ ProcessLoop:
v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource,
}); err != nil {
failedCommitments[string(commitment.UUID)] = "failed to list reservations"
log.Info(fmt.Sprintf("failed to list reservations for commitment %s: %v", commitment.UUID, err))
logger.Info("failed to list reservations for commitment", "commitmentUUID", commitment.UUID, "error", err)
requireRollback = true
break ProcessLoop
}
@@ -200,7 +210,7 @@ ProcessLoop:
stateBefore, err = FromReservations(existing_reservations.Items)
if err != nil {
failedCommitments[string(commitment.UUID)] = "failed to parse existing commitment reservations"
log.Info(fmt.Sprintf("failed to get existing state for commitment %s: %v", commitment.UUID, err))
logger.Info("failed to get existing state for commitment", "commitmentUUID", commitment.UUID, "error", err)
requireRollback = true
break ProcessLoop
}
@@ -210,35 +220,35 @@ ProcessLoop:
// get desired state
stateDesired, err := FromChangeCommitmentTargetState(commitment, string(projectID), flavorGroupName, flavorGroup, string(req.AZ))
if err != nil {
failedCommitments[string(commitment.UUID)] = "failed to determine desired commitment state"
log.Info(fmt.Sprintf("failed to get desired state for commitment %s: %v", commitment.UUID, err))
failedCommitments[string(commitment.UUID)] = err.Error()
logger.Info("failed to get desired state for commitment", "commitmentUUID", commitment.UUID, "error", err)
requireRollback = true
break ProcessLoop
}

log.Info("applying commitment state change", "commitmentUUID", commitment.UUID, "oldState", stateBefore, "desiredState", stateDesired)
logger.Info("applying commitment state change", "commitmentUUID", commitment.UUID, "oldState", stateBefore, "desiredState", stateDesired)

touchedReservations, deletedReservations, err := manager.ApplyCommitmentState(ctx, log, stateDesired, flavorGroups, "changeCommitmentsApi")
touchedReservations, deletedReservations, err := manager.ApplyCommitmentState(ctx, logger, stateDesired, flavorGroups, "changeCommitmentsApi")
if err != nil {
failedCommitments[string(commitment.UUID)] = "failed to apply commitment state"
log.Info(fmt.Sprintf("failed to apply commitment state for commitment %s: %v", commitment.UUID, err))
logger.Info("failed to apply commitment state for commitment", "commitmentUUID", commitment.UUID, "error", err)
requireRollback = true
break ProcessLoop
}
log.Info("applied commitment state change", "commitmentUUID", commitment.UUID, "touchedReservations", len(touchedReservations), "deletedReservations", len(deletedReservations))
logger.Info("applied commitment state change", "commitmentUUID", commitment.UUID, "touchedReservations", len(touchedReservations), "deletedReservations", len(deletedReservations))
reservationsToWatch = append(reservationsToWatch, touchedReservations...)
}
}
}

// TODO make the rollback defer safe
if !requireRollback {
log.Info("applied commitment changes, now watching for reservation readiness", "reservationsToWatch", len(reservationsToWatch))
logger.Info("applied commitment changes, now watching for reservation readiness", "reservationsToWatch", len(reservationsToWatch))

time_start := time.Now()

if failedReservations, errors := watchReservationsUntilReady(ctx, log, api.client, reservationsToWatch, api.config.ChangeAPIWatchReservationsTimeout, api.config.ChangeAPIWatchReservationsPollInterval); len(failedReservations) > 0 || len(errors) > 0 {
log.Info("reservations failed to become ready, initiating rollback",
if failedReservations, errors := watchReservationsUntilReady(ctx, logger, api.client, reservationsToWatch, api.config.ChangeAPIWatchReservationsTimeout, api.config.ChangeAPIWatchReservationsPollInterval); len(failedReservations) > 0 || len(errors) > 0 {
logger.Info("reservations failed to become ready, initiating rollback",
"failedReservations", len(failedReservations),
"errors", errors)

@@ -251,7 +261,7 @@ ProcessLoop:
requireRollback = true
}

log.Info("finished watching reservation", "totalSchedulingTimeSeconds", time.Since(time_start).Seconds())
logger.Info("finished watching reservation", "totalSchedulingTimeSeconds", time.Since(time_start).Seconds())
}

if requireRollback {
@@ -265,29 +275,29 @@ ProcessLoop:
resp.RejectionReason = reasonBuilder.String()
}

log.Info("rollback of commitment changes")
logger.Info("rollback of commitment changes")
for commitmentUUID, state := range statesBefore {
// Rollback to statesBefore for this commitment
log.Info("applying rollback for commitment", "commitmentUUID", commitmentUUID, "stateBefore", state)
_, _, err := manager.ApplyCommitmentState(ctx, log, state, flavorGroups, "changeCommitmentsApiRollback")
logger.Info("applying rollback for commitment", "commitmentUUID", commitmentUUID, "stateBefore", state)
_, _, err := manager.ApplyCommitmentState(ctx, logger, state, flavorGroups, "changeCommitmentsApiRollback")
if err != nil {
log.Info("failed to apply rollback state for commitment", "commitmentUUID", commitmentUUID, "error", err)
logger.Info("failed to apply rollback state for commitment", "commitmentUUID", commitmentUUID, "error", err)
// continue with best effort rollback for other projects
}
}

log.Info("finished applying rollbacks for commitment changes", "reasonOfRollback", resp.RejectionReason)
logger.Info("finished applying rollbacks for commitment changes", "reasonOfRollback", resp.RejectionReason)
return nil
}

log.Info("commitment changes accepted")
logger.Info("commitment changes accepted")
return nil
}

// watchReservationsUntilReady polls until all reservations reach Ready=True or timeout.
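//
// Minimal usage sketch (the timeout and poll interval here are illustrative;
// the caller above passes api.config.ChangeAPIWatchReservationsTimeout and
// api.config.ChangeAPIWatchReservationsPollInterval):
//
//	failed, errs := watchReservationsUntilReady(ctx, logger, k8sClient,
//	    reservationsToWatch, 2*time.Minute, 5*time.Second)
//	if len(failed) > 0 || len(errs) > 0 {
//	    // at least one reservation never became ready: roll back
//	}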
func watchReservationsUntilReady(
ctx context.Context,
log logr.Logger,
logger logr.Logger,
k8sClient client.Client,
reservations []v1alpha1.Reservation,
timeout time.Duration,
@@ -310,7 +320,7 @@ func watchReservationsUntilReady(
return failedReservations, errors
}

allChecked := true
allAreReady := true

for _, res := range reservationsToWatch {
// Fetch current state
@@ -321,9 +331,9 @@ }
}

if err := k8sClient.Get(ctx, nn, &current); err != nil {
allChecked = false
allAreReady = false
// Reservation is still in process of being created, or there is a transient error, continue waiting for it
log.V(1).Info("transient error getting reservation, will retry", "reservation", res.Name, "error", err)
logger.V(1).Info("transient error getting reservation, will retry", "reservation", res.Name, "error", err)
stillWaiting = append(stillWaiting, res)
continue
}
@@ -336,32 +346,40 @@

if readyCond == nil {
// Condition not set yet, keep waiting
allChecked = false
allAreReady = false
stillWaiting = append(stillWaiting, res)
continue
}

switch readyCond.Status {
case metav1.ConditionTrue:
// TODO use more than readyCondition
// check if host is not set in spec or status: if so, no capacity left to schedule the reservation
if current.Spec.TargetHost == "" || current.Status.Host == "" {
allAreReady = false
failedReservations = append(failedReservations, current)
logger.Info("insufficient capacity for reservation", "reservation", current.Name, "reason", readyCond.Reason, "message", readyCond.Message, "targetHostInSpec", current.Spec.TargetHost, "hostInStatus", current.Status.Host)
} else {
// Reservation is successfully scheduled, no further action needed
logger.Info("reservation ready", "reservation", current.Name, "host", current.Spec.TargetHost)
}

case metav1.ConditionFalse:
allChecked = false
failedReservations = append(failedReservations, res)
case metav1.ConditionUnknown:
Comment on lines 363 to 365

⚠️ Potential issue | 🟡 Minor

Inconsistent use of res vs current when appending failed reservations.

At line 367, when ConditionFalse is detected, res (the original from reservationsToWatch) is appended to failedReservations. However, for ConditionTrue with missing host (line 359), current (the freshly fetched state) is appended. This inconsistency could lead to stale data in the failedReservations slice.

Proposed fix for consistency
 		case metav1.ConditionFalse:
-			failedReservations = append(failedReservations, res)
+			failedReservations = append(failedReservations, current)

allChecked = false
allAreReady = false
stillWaiting = append(stillWaiting, res)
}
}

if allChecked || len(stillWaiting) == 0 {
log.Info("all reservations checked",
if allAreReady || len(stillWaiting) == 0 {
logger.Info("all reservations checked",
"failed", len(failedReservations))
return failedReservations, errors
}

reservationsToWatch = stillWaiting
// Log progress
log.Info("waiting for reservations to become ready",
logger.V(1).Info("waiting for reservations to become ready",
"notReady", len(reservationsToWatch),
"total", len(reservations),
"timeRemaining", time.Until(deadline).Round(time.Second))