Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,22 @@ func main() {
// API endpoint.
mux := http.NewServeMux()

// Shared mutex for serializing CR state changes between the syncer and change-commitments API.
// This uses Kubernetes Leases for distributed coordination across pods/deployments.
// Both cortex-nova-scheduling (API) and cortex-nova-knowledge (syncer) pods will
// use the same Lease resource to serialize CR state changes.
crMutexConfig := commitments.DefaultDistributedMutexConfig()
// Use the scheduling domain namespace for the lease (typically where cortex CRDs live)
crMutexConfig.LeaseNamespace = "default" // Will be overridden by pod's namespace
if ns := os.Getenv("POD_NAMESPACE"); ns != "" {
crMutexConfig.LeaseNamespace = ns
}
// Use pod name as holder identity for debugging
if podName := os.Getenv("POD_NAME"); podName != "" {
crMutexConfig.HolderIdentity = podName
}
var crMutex commitments.CRMutexInterface = commitments.NewDistributedMutex(multiclusterClient, crMutexConfig)

// The pipeline monitor is a bucket for all metrics produced during the
// execution of individual steps (see step monitor below) and the overall
// pipeline.
Expand Down Expand Up @@ -343,7 +359,7 @@ func main() {

// Initialize commitments API for LIQUID interface (with Nova client for usage reporting)
commitmentsConfig := conf.GetConfigOrDie[commitments.Config]()
commitmentsAPI := commitments.NewAPIWithConfig(multiclusterClient, commitmentsConfig, novaClient)
commitmentsAPI := commitments.NewAPIWithConfig(multiclusterClient, commitmentsConfig, novaClient, crMutex)
commitmentsAPI.Init(mux, metrics.Registry, ctrl.Log.WithName("commitments-api"))

deschedulingsController := &nova.DetectorPipelineController{
Expand Down Expand Up @@ -671,7 +687,7 @@ func main() {
setupLog.Info("starting commitments syncer")
syncerMonitor := commitments.NewSyncerMonitor()
must.Succeed(metrics.Registry.Register(syncerMonitor))
syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor)
syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor, crMutex)
syncerConfig := conf.GetConfigOrDie[commitments.SyncerConfig]()
syncerDefaults := commitments.DefaultSyncerConfig()
if syncerConfig.SyncInterval == 0 {
Expand Down
14 changes: 9 additions & 5 deletions internal/scheduling/reservations/commitments/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"net/http"
"strings"
"sync"

"github.com/cobaltcore-dev/cortex/internal/scheduling/nova"
"github.com/go-logr/logr"
Expand All @@ -30,15 +29,19 @@ type HTTPAPI struct {
usageMonitor ReportUsageAPIMonitor
capacityMonitor ReportCapacityAPIMonitor
infoMonitor InfoAPIMonitor
// Mutex to serialize change-commitments requests
changeMutex sync.Mutex
// Shared mutex to serialize CR state changes with the syncer (distributed across pods)
crMutex CRMutexInterface
}

func NewAPI(client client.Client) *HTTPAPI {
return NewAPIWithConfig(client, DefaultConfig(), nil)
return NewAPIWithConfig(client, DefaultConfig(), nil, nil)
}

func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaClient) *HTTPAPI {
func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaClient, crMutex CRMutexInterface) *HTTPAPI {
// If no shared mutex provided, create a local one (for backwards compatibility in tests)
if crMutex == nil {
crMutex = &LocalCRMutex{}
}
return &HTTPAPI{
client: client,
config: config,
Expand All @@ -47,6 +50,7 @@ func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaC
usageMonitor: NewReportUsageAPIMonitor(),
capacityMonitor: NewReportCapacityAPIMonitor(),
infoMonitor: NewInfoAPIMonitor(),
crMutex: crMutex,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,19 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque
return
}

// Serialize all change-commitments requests
api.changeMutex.Lock()
defer api.changeMutex.Unlock()

// Serialize all change-commitments requests (shared with syncer via distributed lock)
ctx := reservations.WithGlobalRequestID(context.Background(), "committed-resource-"+requestID)
_, unlock, err := api.crMutex.Lock(ctx)
if err != nil {
logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/commitments/v1/change-commitments")
logger.Error(err, "failed to acquire distributed lock for change-commitments")
statusCode = http.StatusServiceUnavailable
http.Error(w, "Failed to acquire lock, please retry later: "+err.Error(), statusCode)
api.recordMetrics(req, resp, statusCode, startTime)
return
}
defer unlock()

logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/commitments/v1/change-commitments")

// Only accept POST method
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ func newCommitmentTestEnv(
// Use custom config if provided, otherwise use default
var api *HTTPAPI
if customConfig != nil {
api = NewAPIWithConfig(wrappedClient, *customConfig, nil)
api = NewAPIWithConfig(wrappedClient, *customConfig, nil, nil)
} else {
api = NewAPI(wrappedClient)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func newUsageTestEnv(
}

// Create API with mock Nova client
api := NewAPIWithConfig(k8sClient, DefaultConfig(), novaClient)
api := NewAPIWithConfig(k8sClient, DefaultConfig(), novaClient, nil)
mux := http.NewServeMux()
registry := prometheus.NewRegistry()
api.Init(mux, registry, log.Log)
Expand Down
Loading