Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,17 @@ lint-fix: golangci-lint ## Run golangci-lint linter and perform fixes
test: ## Run all tests.
go test ./...

.PHONY: testsum
testsum: gotestsum ## Run all tests (clean output for passing, verbose for failing). Options: WATCH=1, RUN=<pattern>, PACKAGE=<pkg>, FORMAT=<fmt> (e.g., standard-verbose for all output)
$(GOTESTSUM) \
$(if $(WATCH),--watch) \
--format $(if $(FORMAT),$(FORMAT),testname) \
--hide-summary=all \
-- \
$(if $(VERBOSE),-v) \
$(if $(RUN),-run $(RUN)) \
$(if $(PACKAGE),$(PACKAGE),./...)

.PHONY: generate
generate: deepcopy crds ## Regenerate CRDs and DeepCopy after API type changes.

Expand All @@ -45,9 +56,11 @@ $(LOCALBIN):

CONTROLLER_GEN ?= $(LOCALBIN)/controller-gen
GOLANGCI_LINT = $(LOCALBIN)/golangci-lint
GOTESTSUM = $(LOCALBIN)/gotestsum

CONTROLLER_TOOLS_VERSION ?= v0.20.0
GOLANGCI_LINT_VERSION ?= v2.9.0
GOTESTSUM_VERSION ?= v1.13.0

.PHONY: controller-gen
controller-gen: $(CONTROLLER_GEN) ## Download controller-gen locally if necessary.
Expand All @@ -59,6 +72,11 @@ golangci-lint: $(GOLANGCI_LINT) ## Download golangci-lint locally if necessary.
$(GOLANGCI_LINT): $(LOCALBIN)
$(call go-install-tool,$(GOLANGCI_LINT),github.com/golangci/golangci-lint/v2/cmd/golangci-lint,$(GOLANGCI_LINT_VERSION))

.PHONY: gotestsum
gotestsum: $(GOTESTSUM) ## Download gotestsum locally if necessary.
$(GOTESTSUM): $(LOCALBIN)
$(call go-install-tool,$(GOTESTSUM),gotest.tools/gotestsum,$(GOTESTSUM_VERSION))

# go-install-tool will 'go install' any package with custom target and name of binary, if it doesn't exist
# $1 - target path with name of binary
# $2 - package url which can be installed
Expand Down
4 changes: 4 additions & 0 deletions api/v1alpha1/reservation_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ type CommittedResourceReservationSpec struct {
// +kubebuilder:validation:Optional
ResourceName string `json:"resourceName,omitempty"`

// CommitmentUUID is the UUID of the commitment that this reservation corresponds to.
// +kubebuilder:validation:Optional
CommitmentUUID string `json:"commitmentUUID,omitempty"`

// ResourceGroup is the group/category of the resource (e.g., flavor group for Nova)
// +kubebuilder:validation:Optional
ResourceGroup string `json:"resourceGroup,omitempty"`
Expand Down
15 changes: 15 additions & 0 deletions docs/develop.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ Cortex is developed using the Go programming language. To get started with the d

Run `make` in your terminal from the cortex root directory to perform linting and testing tasks.

### Working on Tests

```bash
# Watch mode for continuous testing; print logs for failed tests only
make testsum WATCH=1
```

The `testsum` target provides cleaner output by showing only full verbose output for failing tests.

**Available options:**
- `WATCH=1` - Automatically re-run tests when files change
- `RUN=<pattern>` - Run specific tests matching the pattern
- `PACKAGE=<pkg>` - Test specific package(s)
- `FORMAT=<fmt>` - Change output format (e.g., `standard-verbose` for verbose output on all tests)

## Helm Charts

Helm charts bundle the application into a package, containing all the [Kubernetes](https://kubernetes.io/docs/tutorials/hello-minikube/) resources needed to run the application. The configuration for the application is specified in the [Helm `values.yaml`](cortex.secrets.example.yaml).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ spec:
Key: Workload UUID (VM UUID for Nova, Pod UID for Pods, Machine UID for IronCore, etc.)
Value: allocation state and metadata
type: object
commitmentUUID:
description: CommitmentUUID is the UUID of the commitment that
this reservation corresponds to.
type: string
creator:
description: |-
Creator identifies the system or component that created this reservation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"errors"
"fmt"
"net/http"
"sort"
"strings"
"time"

"github.com/cobaltcore-dev/cortex/api/v1alpha1"
Expand All @@ -23,12 +25,24 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// sortedKeys returns map keys sorted alphabetically for deterministic iteration.
func sortedKeys[K ~string, V any](m map[K]V) []K {
keys := make([]K, 0, len(m))
for k := range m {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool {
return string(keys[i]) < string(keys[j])
})
return keys
}

const (
// watchTimeout is how long to wait for all reservations to become ready
watchTimeout = 20 * time.Second
watchTimeout = 2 * time.Second

// pollInterval is how frequently to poll reservation status
pollInterval = 1 * time.Second
pollInterval = 100 * time.Millisecond
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
)

// implements POST /v1/change-commitments from Limes LIQUID API:
Expand Down Expand Up @@ -99,6 +113,7 @@ func (api *HTTPAPI) processCommitmentChanges(w http.ResponseWriter, log logr.Log
ctx := context.Background()
manager := NewReservationManager(api.client)
requireRollback := false
failedCommitments := make(map[string]string) // commitmentUUID to reason for failure, for better response messages in case of rollback
log.Info("processing commitment change request", "availabilityZone", req.AZ, "dryRun", req.DryRun, "affectedProjects", len(req.ByProject))

knowledge := &reservations.FlavorGroupKnowledgeClient{Client: api.client}
Expand Down Expand Up @@ -135,8 +150,10 @@ func (api *HTTPAPI) processCommitmentChanges(w http.ResponseWriter, log logr.Log
}

ProcessLoop:
for projectID, projectChanges := range req.ByProject {
for resourceName, resourceChanges := range projectChanges.ByResource {
for _, projectID := range sortedKeys(req.ByProject) {
projectChanges := req.ByProject[projectID]
for _, resourceName := range sortedKeys(projectChanges.ByResource) {
resourceChanges := projectChanges.ByResource[resourceName]
// Validate resource name pattern (instances_group_*)
flavorGroupName, err := getFlavorGroupNameFromResource(string(resourceName))
if err != nil {
Expand All @@ -157,14 +174,16 @@ ProcessLoop:
// Additional per-commitment validation if needed
log.Info("processing commitment change", "commitmentUUID", commitment.UUID, "projectID", projectID, "resourceName", resourceName, "oldStatus", commitment.OldStatus.UnwrapOr("none"), "newStatus", commitment.NewStatus.UnwrapOr("none"))

// TODO add configurable upper limit validation for commitment size (number of instances) to prevent excessive reservation creation
// TODO add domain

// List all committed resource reservations, then filter by name prefix
var all_reservations v1alpha1.ReservationList
if err := api.client.List(ctx, &all_reservations, client.MatchingLabels{
v1alpha1.LabelReservationType: v1alpha1.ReservationTypeLabelCommittedResource,
}); err != nil {
resp.RejectionReason = fmt.Sprintf("failed to list reservations for commitment %s: %v", commitment.UUID, err)
failedCommitments[string(commitment.UUID)] = "failed to list reservations"
log.Info(fmt.Sprintf("failed to list reservations for commitment %s: %v", commitment.UUID, err))
requireRollback = true
break ProcessLoop
}
Expand All @@ -189,7 +208,8 @@ ProcessLoop:
} else {
stateBefore, err = FromReservations(existing_reservations.Items)
if err != nil {
resp.RejectionReason = fmt.Sprintf("failed to get existing state for commitment %s: %v", commitment.UUID, err)
failedCommitments[string(commitment.UUID)] = "failed to parse existing commitment reservations"
log.Info(fmt.Sprintf("failed to get existing state for commitment %s: %v", commitment.UUID, err))
requireRollback = true
break ProcessLoop
}
Expand All @@ -199,7 +219,8 @@ ProcessLoop:
// get desired state
stateDesired, err := FromChangeCommitmentTargetState(commitment, string(projectID), flavorGroupName, flavorGroup, string(req.AZ))
if err != nil {
resp.RejectionReason = fmt.Sprintf("failed to get desired state for commitment %s: %v", commitment.UUID, err)
failedCommitments[string(commitment.UUID)] = "failed to determine desired commitment state"
log.Info(fmt.Sprintf("failed to get desired state for commitment %s: %v", commitment.UUID, err))
requireRollback = true
break ProcessLoop
}
Expand All @@ -208,7 +229,8 @@ ProcessLoop:

touchedReservations, deletedReservations, err := manager.ApplyCommitmentState(ctx, log, stateDesired, flavorGroups, "changeCommitmentsApi")
if err != nil {
resp.RejectionReason = fmt.Sprintf("failed to apply commitment state for commitment %s: %v", commitment.UUID, err)
failedCommitments[string(commitment.UUID)] = "failed to apply commitment state"
log.Info(fmt.Sprintf("failed to apply commitment state for commitment %s: %v", commitment.UUID, err))
requireRollback = true
break ProcessLoop
}
Expand All @@ -224,17 +246,31 @@ ProcessLoop:

time_start := time.Now()

if err := watchReservationsUntilReady(ctx, log, api.client, reservationsToWatch, watchTimeout); err != nil {
if failedReservations, errors := watchReservationsUntilReady(ctx, log, api.client, reservationsToWatch, watchTimeout); len(failedReservations) > 0 || len(errors) > 0 {
log.Info("reservations failed to become ready, initiating rollback",
"reason", err.Error())
resp.RejectionReason = fmt.Sprintf("Not all reservations can be fulfilled: %v", err)
"failedReservations", len(failedReservations),
"errors", errors)

for _, res := range failedReservations {
failedCommitments[res.Spec.CommittedResourceReservation.CommitmentUUID] = "not sufficient capacity"
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
requireRollback = true
}

log.Info("finished watching reservation", "totalSchedulingTimeSeconds", time.Since(time_start).Seconds())
}

if requireRollback {
// Build rejection reason from failed commitments
if len(failedCommitments) > 0 {
var reasonBuilder strings.Builder
reasonBuilder.WriteString(fmt.Sprintf("%d commitment(s) failed to apply: ", len(failedCommitments)))
for commitmentUUID, reason := range failedCommitments {
reasonBuilder.WriteString(fmt.Sprintf("\n- commitment %s: %s", commitmentUUID, reason))
}
resp.RejectionReason = reasonBuilder.String()
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

log.Info("rollback of commitment changes")
for commitmentUUID, state := range statesBefore {
// Rollback to statesBefore for this commitment
Expand All @@ -247,16 +283,10 @@ ProcessLoop:
}

log.Info("finished applying rollbacks for commitment changes", "reasonOfRollback", resp.RejectionReason)

// TODO improve human-readable reasoning based on actual failure, i.e. polish resp.RejectionReason
return nil
}

log.Info("commitment changes accepted")
if resp.RejectionReason != "" {
log.Info("unexpected non-empty rejection reason without rollback", "reason", resp.RejectionReason)
resp.RejectionReason = ""
}
return nil
}

Expand All @@ -267,23 +297,27 @@ func watchReservationsUntilReady(
k8sClient client.Client,
reservations []v1alpha1.Reservation,
timeout time.Duration,
) error {
) (failedReservations []v1alpha1.Reservation, errors []error) {

if len(reservations) == 0 {
return nil
return failedReservations, nil
}

deadline := time.Now().Add(timeout)

reservationsToWatch := make([]v1alpha1.Reservation, len(reservations))
copy(reservationsToWatch, reservations)

for {
if time.Now().After(deadline) {
return fmt.Errorf("timeout after %v waiting for reservations to become ready", timeout)
errors = append(errors, fmt.Errorf("timeout after %v waiting for reservations to become ready", timeout))
return failedReservations, errors
}

allReady := true
var notReadyReasons []string
allChecked := true

for _, res := range reservations {
check:
for i, res := range reservationsToWatch {
// Fetch current state
var current v1alpha1.Reservation
nn := types.NamespacedName{
Expand All @@ -292,12 +326,16 @@ func watchReservationsUntilReady(
}

if err := k8sClient.Get(ctx, nn, &current); err != nil {
allChecked = false
if apierrors.IsNotFound(err) {
// Reservation is still in process of being created
allReady = false
// Reservation is still in process of being created, continue waiting for it
continue
}
return fmt.Errorf("failed to get reservation %s: %w", res.Name, err)
// remove reservation from waiting
failedReservations = append(failedReservations, res)
reservationsToWatch = append(reservationsToWatch[:i], reservationsToWatch[i+1:]...)
errors = append(errors, fmt.Errorf("failed to get reservation %s: %w", res.Name, err))
break check // break because iterating list was modified
}

// Check Ready condition
Expand All @@ -308,37 +346,36 @@ func watchReservationsUntilReady(

if readyCond == nil {
// Condition not set yet, keep waiting
allReady = false
notReadyReasons = append(notReadyReasons,
res.Name+": condition not set")
allChecked = false
continue
}

switch readyCond.Status {
case metav1.ConditionTrue:
// This reservation is ready
continue
// TODO use more than readyCondition
allChecked = false
reservationsToWatch = append(reservationsToWatch[:i], reservationsToWatch[i+1:]...)
break check // break because iterating list was modified
case metav1.ConditionFalse:
// Explicit failure - stop immediately
return fmt.Errorf("reservation %s failed: %s (reason: %s)",
res.Name, readyCond.Message, readyCond.Reason)
allChecked = false
failedReservations = append(failedReservations, res)
reservationsToWatch = append(reservationsToWatch[:i], reservationsToWatch[i+1:]...)
break check // break because iterating list was modified
case metav1.ConditionUnknown:
// Still processing
allReady = false
notReadyReasons = append(notReadyReasons,
fmt.Sprintf("%s: %s", res.Name, readyCond.Message))
allChecked = false
}
}

if allReady {
log.Info("all reservations are ready",
"count", len(reservations))
return nil
if allChecked {
log.Info("all reservations checked",
"failed", len(failedReservations))
return failedReservations, errors
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// Log progress
log.Info("waiting for reservations to become ready",
"notReady", len(notReadyReasons),
"notReady", len(reservationsToWatch),
"total", len(reservations),
"timeRemaining", time.Until(deadline).Round(time.Second))

Expand All @@ -347,7 +384,7 @@ func watchReservationsUntilReady(
case <-time.After(pollInterval):
// Continue polling
case <-ctx.Done():
return fmt.Errorf("context cancelled while waiting for reservations: %w", ctx.Err())
return failedReservations, append(errors, fmt.Errorf("context cancelled while waiting for reservations: %w", ctx.Err()))
}
}
}
Loading
Loading