From 5b06ab29fe4e622b8b57d7b0e7145a05fe97690f Mon Sep 17 00:00:00 2001 From: mblos Date: Thu, 26 Mar 2026 08:18:05 +0100 Subject: [PATCH 1/2] Commitments syncer and change-api share mutex lock --- cmd/main.go | 8 +++- .../reservations/commitments/api.go | 38 +++++++++++++++++-- .../commitments/api_change_commitments.go | 6 +-- .../api_change_commitments_test.go | 2 +- .../commitments/api_report_usage_test.go | 2 +- .../reservations/commitments/syncer.go | 16 +++++++- .../reservations/commitments/syncer_test.go | 5 +++ 7 files changed, 64 insertions(+), 13 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 5d8acf1e7..7fd1c9f03 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -308,6 +308,10 @@ func main() { // API endpoint. mux := http.NewServeMux() + // Shared mutex for serializing CR state changes between the syncer and change-commitments API. + // This ensures atomicity when applying Limes state snapshots. + crMutex := &commitments.CRMutex{} + // The pipeline monitor is a bucket for all metrics produced during the // execution of individual steps (see step monitor below) and the overall // pipeline. 
@@ -343,7 +347,7 @@ func main() { // Initialize commitments API for LIQUID interface (with Nova client for usage reporting) commitmentsConfig := conf.GetConfigOrDie[commitments.Config]() - commitmentsAPI := commitments.NewAPIWithConfig(multiclusterClient, commitmentsConfig, novaClient) + commitmentsAPI := commitments.NewAPIWithConfig(multiclusterClient, commitmentsConfig, novaClient, crMutex) commitmentsAPI.Init(mux, metrics.Registry, ctrl.Log.WithName("commitments-api")) deschedulingsController := &nova.DetectorPipelineController{ @@ -671,7 +675,7 @@ func main() { setupLog.Info("starting commitments syncer") syncerMonitor := commitments.NewSyncerMonitor() must.Succeed(metrics.Registry.Register(syncerMonitor)) - syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor) + syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor, crMutex) syncerConfig := conf.GetConfigOrDie[commitments.SyncerConfig]() syncerDefaults := commitments.DefaultSyncerConfig() if syncerConfig.SyncInterval == 0 { diff --git a/internal/scheduling/reservations/commitments/api.go b/internal/scheduling/reservations/commitments/api.go index 06fb97be1..bdefeab81 100644 --- a/internal/scheduling/reservations/commitments/api.go +++ b/internal/scheduling/reservations/commitments/api.go @@ -21,6 +21,31 @@ type UsageNovaClient interface { ListProjectServers(ctx context.Context, projectID string) ([]nova.ServerDetail, error) } +// CRMutex serializes CR state changes between the syncer and change-commitments API. +// This ensures that the syncer's Limes state snapshot is applied atomically without +// interference from concurrent change-commitments API calls. The Lock and Unlock +// methods are no-ops if the receiver is nil, allowing safe use when either component +// is disabled. +// TODO: If the syncer and API are moved to separate pods, replace with a K8s +// distributed lock (e.g., Lease-based coordination). +type CRMutex struct { + mu sync.Mutex +} + +// Lock acquires the mutex. 
No-op if receiver is nil. +func (m *CRMutex) Lock() { + if m != nil { + m.mu.Lock() + } +} + +// Unlock releases the mutex. No-op if receiver is nil. +func (m *CRMutex) Unlock() { + if m != nil { + m.mu.Unlock() + } +} + // HTTPAPI implements Limes LIQUID commitment validation endpoints. type HTTPAPI struct { client client.Client @@ -30,15 +55,19 @@ type HTTPAPI struct { usageMonitor ReportUsageAPIMonitor capacityMonitor ReportCapacityAPIMonitor infoMonitor InfoAPIMonitor - // Mutex to serialize change-commitments requests - changeMutex sync.Mutex + // Shared mutex to serialize CR state changes with the syncer + crMutex *CRMutex } func NewAPI(client client.Client) *HTTPAPI { - return NewAPIWithConfig(client, DefaultConfig(), nil) + return NewAPIWithConfig(client, DefaultConfig(), nil, nil) } -func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaClient) *HTTPAPI { +func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaClient, crMutex *CRMutex) *HTTPAPI { + // If no shared mutex provided, create a local one (for backwards compatibility in tests) + if crMutex == nil { + crMutex = &CRMutex{} + } return &HTTPAPI{ client: client, config: config, @@ -47,6 +76,7 @@ func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaC usageMonitor: NewReportUsageAPIMonitor(), capacityMonitor: NewReportCapacityAPIMonitor(), infoMonitor: NewInfoAPIMonitor(), + crMutex: crMutex, } } diff --git a/internal/scheduling/reservations/commitments/api_change_commitments.go b/internal/scheduling/reservations/commitments/api_change_commitments.go index 5c2436267..b835a7976 100644 --- a/internal/scheduling/reservations/commitments/api_change_commitments.go +++ b/internal/scheduling/reservations/commitments/api_change_commitments.go @@ -64,9 +64,9 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque return } - // Serialize all change-commitments requests - api.changeMutex.Lock() - defer 
api.changeMutex.Unlock() + // Serialize all change-commitments requests (shared with syncer) + api.crMutex.Lock() + defer api.crMutex.Unlock() ctx := reservations.WithGlobalRequestID(context.Background(), "committed-resource-"+requestID) logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/commitments/v1/change-commitments") diff --git a/internal/scheduling/reservations/commitments/api_change_commitments_test.go b/internal/scheduling/reservations/commitments/api_change_commitments_test.go index c304d9e5a..a7ebc89a6 100644 --- a/internal/scheduling/reservations/commitments/api_change_commitments_test.go +++ b/internal/scheduling/reservations/commitments/api_change_commitments_test.go @@ -999,7 +999,7 @@ func newCommitmentTestEnv( // Use custom config if provided, otherwise use default var api *HTTPAPI if customConfig != nil { - api = NewAPIWithConfig(wrappedClient, *customConfig, nil) + api = NewAPIWithConfig(wrappedClient, *customConfig, nil, nil) } else { api = NewAPI(wrappedClient) } diff --git a/internal/scheduling/reservations/commitments/api_report_usage_test.go b/internal/scheduling/reservations/commitments/api_report_usage_test.go index 26a55332d..5f2995df3 100644 --- a/internal/scheduling/reservations/commitments/api_report_usage_test.go +++ b/internal/scheduling/reservations/commitments/api_report_usage_test.go @@ -537,7 +537,7 @@ func newUsageTestEnv( } // Create API with mock Nova client - api := NewAPIWithConfig(k8sClient, DefaultConfig(), novaClient) + api := NewAPIWithConfig(k8sClient, DefaultConfig(), novaClient, nil) mux := http.NewServeMux() registry := prometheus.NewRegistry() api.Init(mux, registry, log.Log) diff --git a/internal/scheduling/reservations/commitments/syncer.go b/internal/scheduling/reservations/commitments/syncer.go index cb3d2fdb4..643e6dda5 100644 --- a/internal/scheduling/reservations/commitments/syncer.go +++ b/internal/scheduling/reservations/commitments/syncer.go @@ -43,13 +43,20 @@ type Syncer struct 
{ client.Client // Monitor for metrics monitor *SyncerMonitor + // Shared mutex to serialize CR state changes with the change-commitments API + crMutex *CRMutex } -func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor) *Syncer { +func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor, crMutex *CRMutex) *Syncer { + // If no shared mutex provided, create a local one (for backwards compatibility in tests) + if crMutex == nil { + crMutex = &CRMutex{} + } return &Syncer{ CommitmentsClient: NewCommitmentsClient(), Client: k8sClient, monitor: monitor, + crMutex: crMutex, } } @@ -183,8 +190,13 @@ func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavo } // SyncReservations fetches commitments from Limes and synchronizes Reservation CRDs. +// The mutex is held for the entire operation to ensure atomicity - the Limes state +// snapshot must be applied without interference from concurrent change-commitments API calls. func (s *Syncer) SyncReservations(ctx context.Context) error { - // TODO handle concurrency with change API: consider creation time of reservations and status ready + // Acquire the shared CR mutex for the entire sync operation. + // This ensures the Limes state snapshot is applied atomically. 
+ s.crMutex.Lock() + defer s.crMutex.Unlock() // Create context with request ID for this sync execution runID := fmt.Sprintf("sync-%d", time.Now().Unix()) diff --git a/internal/scheduling/reservations/commitments/syncer_test.go b/internal/scheduling/reservations/commitments/syncer_test.go index b09115453..ff85b6e04 100644 --- a/internal/scheduling/reservations/commitments/syncer_test.go +++ b/internal/scheduling/reservations/commitments/syncer_test.go @@ -264,6 +264,7 @@ func TestSyncer_SyncReservations_InstanceCommitments(t *testing.T) { syncer := &Syncer{ CommitmentsClient: mockClient, Client: k8sClient, + crMutex: &CRMutex{}, } err := syncer.SyncReservations(context.Background()) @@ -400,6 +401,7 @@ func TestSyncer_SyncReservations_UpdateExisting(t *testing.T) { syncer := &Syncer{ CommitmentsClient: mockClient, Client: k8sClient, + crMutex: &CRMutex{}, } err := syncer.SyncReservations(context.Background()) @@ -499,6 +501,7 @@ func TestSyncer_SyncReservations_UnitMismatch(t *testing.T) { CommitmentsClient: mockClient, Client: k8sClient, monitor: monitor, + crMutex: &CRMutex{}, } err := syncer.SyncReservations(context.Background()) @@ -582,6 +585,7 @@ func TestSyncer_SyncReservations_UnitMatch(t *testing.T) { CommitmentsClient: mockClient, Client: k8sClient, monitor: monitor, + crMutex: &CRMutex{}, } err := syncer.SyncReservations(context.Background()) @@ -666,6 +670,7 @@ func TestSyncer_SyncReservations_EmptyUUID(t *testing.T) { syncer := &Syncer{ CommitmentsClient: mockClient, Client: k8sClient, + crMutex: &CRMutex{}, } err := syncer.SyncReservations(context.Background()) From 0922a35f829af390b96bb54bec373bc577580f5b Mon Sep 17 00:00:00 2001 From: mblos Date: Thu, 26 Mar 2026 08:54:05 +0100 Subject: [PATCH 2/2] replace go mutex with k8s lease based lock --- cmd/main.go | 16 +- .../reservations/commitments/api.go | 34 +-- .../commitments/api_change_commitments.go | 16 +- .../commitments/distributed_mutex.go | 277 ++++++++++++++++++ 
.../commitments/distributed_mutex_test.go | 207 +++++++++++++ .../reservations/commitments/syncer.go | 24 +- .../reservations/commitments/syncer_test.go | 10 +- 7 files changed, 534 insertions(+), 50 deletions(-) create mode 100644 internal/scheduling/reservations/commitments/distributed_mutex.go create mode 100644 internal/scheduling/reservations/commitments/distributed_mutex_test.go diff --git a/cmd/main.go b/cmd/main.go index 7fd1c9f03..0864b22fb 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -309,8 +309,20 @@ func main() { mux := http.NewServeMux() // Shared mutex for serializing CR state changes between the syncer and change-commitments API. - // This ensures atomicity when applying Limes state snapshots. - crMutex := &commitments.CRMutex{} + // This uses Kubernetes Leases for distributed coordination across pods/deployments. + // Both cortex-nova-scheduling (API) and cortex-nova-knowledge (syncer) pods will + // use the same Lease resource to serialize CR state changes. + crMutexConfig := commitments.DefaultDistributedMutexConfig() + // Use the scheduling domain namespace for the lease (typically where cortex CRDs live) + crMutexConfig.LeaseNamespace = "default" // Will be overridden by pod's namespace + if ns := os.Getenv("POD_NAMESPACE"); ns != "" { + crMutexConfig.LeaseNamespace = ns + } + // Use pod name as holder identity for debugging + if podName := os.Getenv("POD_NAME"); podName != "" { + crMutexConfig.HolderIdentity = podName + } + var crMutex commitments.CRMutexInterface = commitments.NewDistributedMutex(multiclusterClient, crMutexConfig) // The pipeline monitor is a bucket for all metrics produced during the // execution of individual steps (see step monitor below) and the overall diff --git a/internal/scheduling/reservations/commitments/api.go b/internal/scheduling/reservations/commitments/api.go index bdefeab81..79dc923a6 100644 --- a/internal/scheduling/reservations/commitments/api.go +++ b/internal/scheduling/reservations/commitments/api.go @@ 
-7,7 +7,6 @@ import ( "context" "net/http" "strings" - "sync" "github.com/cobaltcore-dev/cortex/internal/scheduling/nova" "github.com/go-logr/logr" @@ -21,31 +20,6 @@ type UsageNovaClient interface { ListProjectServers(ctx context.Context, projectID string) ([]nova.ServerDetail, error) } -// CRMutex serializes CR state changes between the syncer and change-commitments API. -// This ensures that the syncer's Limes state snapshot is applied atomically without -// interference from concurrent change-commitments API calls. The Lock and Unlock -// methods are no-ops if the receiver is nil, allowing safe use when either component -// is disabled. -// TODO: If the syncer and API are moved to separate pods, replace with a K8s -// distributed lock (e.g., Lease-based coordination). -type CRMutex struct { - mu sync.Mutex -} - -// Lock acquires the mutex. No-op if receiver is nil. -func (m *CRMutex) Lock() { - if m != nil { - m.mu.Lock() - } -} - -// Unlock releases the mutex. No-op if receiver is nil. -func (m *CRMutex) Unlock() { - if m != nil { - m.mu.Unlock() - } -} - // HTTPAPI implements Limes LIQUID commitment validation endpoints. 
type HTTPAPI struct { client client.Client @@ -55,18 +29,18 @@ type HTTPAPI struct { usageMonitor ReportUsageAPIMonitor capacityMonitor ReportCapacityAPIMonitor infoMonitor InfoAPIMonitor - // Shared mutex to serialize CR state changes with the syncer - crMutex *CRMutex + // Shared mutex to serialize CR state changes with the syncer (distributed across pods) + crMutex CRMutexInterface } func NewAPI(client client.Client) *HTTPAPI { return NewAPIWithConfig(client, DefaultConfig(), nil, nil) } -func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaClient, crMutex *CRMutex) *HTTPAPI { +func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaClient, crMutex CRMutexInterface) *HTTPAPI { // If no shared mutex provided, create a local one (for backwards compatibility in tests) if crMutex == nil { - crMutex = &CRMutex{} + crMutex = &LocalCRMutex{} } return &HTTPAPI{ client: client, diff --git a/internal/scheduling/reservations/commitments/api_change_commitments.go b/internal/scheduling/reservations/commitments/api_change_commitments.go index b835a7976..57c7b9c04 100644 --- a/internal/scheduling/reservations/commitments/api_change_commitments.go +++ b/internal/scheduling/reservations/commitments/api_change_commitments.go @@ -64,11 +64,19 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque return } - // Serialize all change-commitments requests (shared with syncer) - api.crMutex.Lock() - defer api.crMutex.Unlock() - + // Serialize all change-commitments requests (shared with syncer via distributed lock) ctx := reservations.WithGlobalRequestID(context.Background(), "committed-resource-"+requestID) + _, unlock, err := api.crMutex.Lock(ctx) + if err != nil { + logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/commitments/v1/change-commitments") + logger.Error(err, "failed to acquire distributed lock for change-commitments") + statusCode = http.StatusServiceUnavailable + 
http.Error(w, "Failed to acquire lock, please retry later: "+err.Error(), statusCode) + api.recordMetrics(req, resp, statusCode, startTime) + return + } + defer unlock() + logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/commitments/v1/change-commitments") // Only accept POST method diff --git a/internal/scheduling/reservations/commitments/distributed_mutex.go b/internal/scheduling/reservations/commitments/distributed_mutex.go new file mode 100644 index 000000000..c7f6775c7 --- /dev/null +++ b/internal/scheduling/reservations/commitments/distributed_mutex.go @@ -0,0 +1,277 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "context" + "fmt" + "os" + "sync" + "time" + + "github.com/go-logr/logr" + coordinationv1 "k8s.io/api/coordination/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + // DefaultLeaseDuration is how long a lease is valid before it can be taken over + DefaultLeaseDuration = 30 * time.Second + // DefaultRetryInterval is how often to retry acquiring the lease + DefaultRetryInterval = 500 * time.Millisecond + // DefaultAcquireTimeout is the maximum time to wait for acquiring the lease + DefaultAcquireTimeout = 60 * time.Second +) + +// DistributedMutexConfig holds configuration for the distributed mutex +type DistributedMutexConfig struct { + // LeaseName is the name of the Lease resource + LeaseName string + // LeaseNamespace is the namespace where the Lease resource is created + LeaseNamespace string + // LeaseDuration is how long the lease is valid + LeaseDuration time.Duration + // RetryInterval is how often to retry acquiring the lease + RetryInterval time.Duration + // AcquireTimeout is the maximum time to wait for the lease + AcquireTimeout time.Duration + // HolderIdentity identifies this 
instance (typically pod name) + HolderIdentity string +} + +// DefaultDistributedMutexConfig returns a config with sensible defaults +func DefaultDistributedMutexConfig() DistributedMutexConfig { + hostname, err := os.Hostname() + if err != nil { + hostname = "unknown-host" + } + return DistributedMutexConfig{ + LeaseName: "cortex-cr-mutex", + LeaseNamespace: "default", + LeaseDuration: DefaultLeaseDuration, + RetryInterval: DefaultRetryInterval, + AcquireTimeout: DefaultAcquireTimeout, + HolderIdentity: hostname, + } +} + +// DistributedMutex provides distributed locking using Kubernetes Leases. +// It serializes CR state changes between the syncer and change-commitments API +// across different pods/deployments. +type DistributedMutex struct { + client client.Client + config DistributedMutexConfig + logger logr.Logger + + // localMu prevents concurrent Lock() calls from the same pod + localMu sync.Mutex + // held tracks if this instance currently holds the lease + held bool +} + +// NewDistributedMutex creates a new distributed mutex using Kubernetes Leases +func NewDistributedMutex(k8sClient client.Client, config DistributedMutexConfig) *DistributedMutex { + return &DistributedMutex{ + client: k8sClient, + config: config, + logger: log.Log.WithName("distributed-mutex"), + } +} + +// Lock acquires the distributed lock, blocking until successful or context is cancelled. +// Returns a context that should be used for the locked operation and an unlock function. 
+func (m *DistributedMutex) Lock(ctx context.Context) (context.Context, func(), error) { + m.localMu.Lock() + + // Create a timeout context for acquisition + acquireCtx, acquireCancel := context.WithTimeout(ctx, m.config.AcquireTimeout) + defer acquireCancel() + + ticker := time.NewTicker(m.config.RetryInterval) + defer ticker.Stop() + + startTime := time.Now() + attempts := 0 + + for { + attempts++ + acquired, err := m.tryAcquire(acquireCtx) + if err != nil { + m.localMu.Unlock() + return ctx, nil, fmt.Errorf("failed to acquire lease: %w", err) + } + if acquired { + m.held = true + m.logger.V(1).Info("acquired distributed lock", + "leaseName", m.config.LeaseName, + "holder", m.config.HolderIdentity, + "attempts", attempts, + "duration", time.Since(startTime)) + + // Return unlock function that releases both local and distributed lock + unlockOnce := sync.Once{} + unlock := func() { + unlockOnce.Do(func() { + m.release(context.Background()) // Use background context for cleanup + m.held = false + m.localMu.Unlock() + }) + } + return ctx, unlock, nil + } + + select { + case <-acquireCtx.Done(): + m.localMu.Unlock() + return ctx, nil, fmt.Errorf("timeout waiting for distributed lock after %d attempts: %w", attempts, acquireCtx.Err()) + case <-ticker.C: + // Continue trying + } + } +} + +// tryAcquire attempts to acquire or renew the lease once +func (m *DistributedMutex) tryAcquire(ctx context.Context) (bool, error) { + now := metav1.NewMicroTime(time.Now()) + leaseDurationSeconds := int32(m.config.LeaseDuration.Seconds()) + + lease := &coordinationv1.Lease{} + err := m.client.Get(ctx, types.NamespacedName{ + Name: m.config.LeaseName, + Namespace: m.config.LeaseNamespace, + }, lease) + + if errors.IsNotFound(err) { + // Create new lease + newLease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: m.config.LeaseName, + Namespace: m.config.LeaseNamespace, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: &m.config.HolderIdentity, + 
LeaseDurationSeconds: &leaseDurationSeconds, + AcquireTime: &now, + RenewTime: &now, + }, + } + if err := m.client.Create(ctx, newLease); err != nil { + if errors.IsAlreadyExists(err) { + // Race condition - another pod created it first, retry + return false, nil + } + return false, err + } + return true, nil + } + if err != nil { + return false, err + } + + // Check if we already hold the lease + if lease.Spec.HolderIdentity != nil && *lease.Spec.HolderIdentity == m.config.HolderIdentity { + // Renew our own lease + lease.Spec.RenewTime = &now + if err := m.client.Update(ctx, lease); err != nil { + if errors.IsConflict(err) { + return false, nil // Retry + } + return false, err + } + return true, nil + } + + // Check if lease has expired + if lease.Spec.RenewTime != nil && lease.Spec.LeaseDurationSeconds != nil { + expireTime := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second) + if time.Now().After(expireTime) { + previousHolder := lease.Spec.HolderIdentity // lease expired; capture old holder before takeover so the log below reports it correctly + lease.Spec.HolderIdentity = &m.config.HolderIdentity + lease.Spec.AcquireTime = &now + lease.Spec.RenewTime = &now + lease.Spec.LeaseDurationSeconds = &leaseDurationSeconds + if err := m.client.Update(ctx, lease); err != nil { + if errors.IsConflict(err) { + return false, nil // Another pod took it first, retry + } + return false, err + } + m.logger.Info("took over expired lease", + "leaseName", m.config.LeaseName, + "previousHolder", previousHolder) + return true, nil + } + } + + // Lease is held by another pod and not expired + m.logger.V(1).Info("waiting for lease", + "leaseName", m.config.LeaseName, + "currentHolder", lease.Spec.HolderIdentity) + return false, nil +} + +// release releases the distributed lock +func (m *DistributedMutex) release(ctx context.Context) { + lease := &coordinationv1.Lease{} + err := m.client.Get(ctx, types.NamespacedName{ + Name: m.config.LeaseName, + Namespace: m.config.LeaseNamespace, + }, lease) + if err != nil { + 
m.logger.Error(err, "failed to get lease for release") + return + } + + // Only release if we hold it + if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != m.config.HolderIdentity { + m.logger.V(1).Info("not releasing lease - not held by us", + "leaseName", m.config.LeaseName, + "holder", lease.Spec.HolderIdentity) + return + } + + // Set expire time to now by setting renew time in the past + pastTime := metav1.NewMicroTime(time.Now().Add(-m.config.LeaseDuration)) + lease.Spec.RenewTime = &pastTime + + if err := m.client.Update(ctx, lease); err != nil { + m.logger.Error(err, "failed to release lease") + return + } + m.logger.V(1).Info("released distributed lock", "leaseName", m.config.LeaseName) +} + +// CRMutexInterface defines the interface for CR state serialization. +// This allows switching between local (in-memory) and distributed (Lease-based) implementations. +type CRMutexInterface interface { + // Lock acquires the lock. Returns an unlock function that must be called to release. + // The returned context should be used for the locked operation. + Lock(ctx context.Context) (context.Context, func(), error) +} + +// Ensure implementations satisfy the interface +var _ CRMutexInterface = (*DistributedMutex)(nil) +var _ CRMutexInterface = (*LocalCRMutex)(nil) + +// LocalCRMutex is an in-memory mutex for single-pod deployments or testing. +// It wraps sync.Mutex to implement CRMutexInterface. 
+type LocalCRMutex struct { + mu sync.Mutex +} + +// Lock acquires the local mutex +func (m *LocalCRMutex) Lock(ctx context.Context) (context.Context, func(), error) { + m.mu.Lock() + unlockOnce := sync.Once{} + unlock := func() { + unlockOnce.Do(func() { + m.mu.Unlock() + }) + } + return ctx, unlock, nil +} diff --git a/internal/scheduling/reservations/commitments/distributed_mutex_test.go b/internal/scheduling/reservations/commitments/distributed_mutex_test.go new file mode 100644 index 000000000..da2f7714d --- /dev/null +++ b/internal/scheduling/reservations/commitments/distributed_mutex_test.go @@ -0,0 +1,207 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package commitments + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + coordinationv1 "k8s.io/api/coordination/v1" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +func TestLocalCRMutex_Lock(t *testing.T) { + mu := &LocalCRMutex{} + ctx := context.Background() + + // Acquire lock + _, unlock, err := mu.Lock(ctx) + if err != nil { + t.Fatalf("Lock failed: %v", err) + } + + // Verify unlock function works + unlock() + + // Should be able to acquire again after unlock + _, unlock2, err := mu.Lock(ctx) + if err != nil { + t.Fatalf("Second lock failed: %v", err) + } + unlock2() +} + +func TestLocalCRMutex_ConcurrentAccess(t *testing.T) { + mu := &LocalCRMutex{} + ctx := context.Background() + var counter int64 + var wg sync.WaitGroup + + // Run multiple goroutines that try to increment counter + for range 10 { + wg.Add(1) + go func() { + defer wg.Done() + _, unlock, err := mu.Lock(ctx) + if err != nil { + t.Errorf("Lock failed: %v", err) + return + } + defer unlock() + // Critical section - should be serialized + val := atomic.LoadInt64(&counter) + time.Sleep(time.Millisecond) + atomic.StoreInt64(&counter, val+1) + }() + } + + wg.Wait() + + // All increments should have happened + if 
counter != 10 { + t.Errorf("Expected counter=10, got %d", counter) + } +} + +func TestDistributedMutex_AcquireAndRelease(t *testing.T) { + scheme := runtime.NewScheme() + if err := coordinationv1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add scheme: %v", err) + } + + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + + config := DistributedMutexConfig{ + LeaseName: "test-mutex", + LeaseNamespace: "default", + LeaseDuration: 5 * time.Second, + RetryInterval: 100 * time.Millisecond, + AcquireTimeout: 10 * time.Second, + HolderIdentity: "test-pod-1", + } + + mu := NewDistributedMutex(k8sClient, config) + ctx := context.Background() + + // Acquire lock + _, unlock, err := mu.Lock(ctx) + if err != nil { + t.Fatalf("Lock failed: %v", err) + } + + // Verify lease was created + lease := &coordinationv1.Lease{} + if err := k8sClient.Get(ctx, client.ObjectKey{Name: "test-mutex", Namespace: "default"}, lease); err != nil { + t.Fatalf("Failed to get lease: %v", err) + } + + if lease.Spec.HolderIdentity == nil || *lease.Spec.HolderIdentity != "test-pod-1" { + t.Errorf("Expected holder identity 'test-pod-1', got %v", lease.Spec.HolderIdentity) + } + + // Release lock + unlock() + + // After release, the lease should have renew time in the past (expired) + if err := k8sClient.Get(ctx, client.ObjectKey{Name: "test-mutex", Namespace: "default"}, lease); err != nil { + t.Fatalf("Failed to get lease after release: %v", err) + } + + // Check that the lease is now acquirable by another holder + if lease.Spec.RenewTime != nil && lease.Spec.LeaseDurationSeconds != nil { + expireTime := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second) + if time.Now().Before(expireTime) { + t.Errorf("Lease should be expired after release, but expires at %v", expireTime) + } + } +} + +func TestDistributedMutex_ReacquireSamePod(t *testing.T) { + scheme := runtime.NewScheme() + if err := coordinationv1.AddToScheme(scheme); err != nil { 
+ t.Fatalf("Failed to add scheme: %v", err) + } + + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). + Build() + + config := DistributedMutexConfig{ + LeaseName: "test-mutex-reacquire", + LeaseNamespace: "default", + LeaseDuration: 5 * time.Second, + RetryInterval: 100 * time.Millisecond, + AcquireTimeout: 10 * time.Second, + HolderIdentity: "test-pod-1", + } + + mu := NewDistributedMutex(k8sClient, config) + ctx := context.Background() + + // Acquire lock twice from the same mutex instance (same holder) + _, unlock1, err := mu.Lock(ctx) + if err != nil { + t.Fatalf("First lock failed: %v", err) + } + unlock1() + + // Should be able to reacquire + _, unlock2, err := mu.Lock(ctx) + if err != nil { + t.Fatalf("Second lock failed: %v", err) + } + unlock2() +} + +func TestDistributedMutex_Timeout(t *testing.T) { + scheme := runtime.NewScheme() + if err := coordinationv1.AddToScheme(scheme); err != nil { + t.Fatalf("Failed to add scheme: %v", err) + } + + k8sClient := fake.NewClientBuilder(). + WithScheme(scheme). 
+ Build() + + // First mutex acquires the lock + config1 := DistributedMutexConfig{ + LeaseName: "test-mutex-timeout", + LeaseNamespace: "default", + LeaseDuration: 30 * time.Second, // Long lease + RetryInterval: 50 * time.Millisecond, + AcquireTimeout: 10 * time.Second, + HolderIdentity: "holder-1", + } + mu1 := NewDistributedMutex(k8sClient, config1) + ctx := context.Background() + + _, _, err := mu1.Lock(ctx) + if err != nil { + t.Fatalf("First lock failed: %v", err) + } + // Don't release - keep the lock held + + // Second mutex tries to acquire with short timeout + config2 := DistributedMutexConfig{ + LeaseName: "test-mutex-timeout", + LeaseNamespace: "default", + LeaseDuration: 5 * time.Second, + RetryInterval: 50 * time.Millisecond, + AcquireTimeout: 200 * time.Millisecond, // Short timeout + HolderIdentity: "holder-2", + } + mu2 := NewDistributedMutex(k8sClient, config2) + + _, _, err = mu2.Lock(ctx) + if err == nil { + t.Error("Expected timeout error, but lock succeeded") + } +} diff --git a/internal/scheduling/reservations/commitments/syncer.go b/internal/scheduling/reservations/commitments/syncer.go index 643e6dda5..56ef9659f 100644 --- a/internal/scheduling/reservations/commitments/syncer.go +++ b/internal/scheduling/reservations/commitments/syncer.go @@ -43,14 +43,14 @@ type Syncer struct { client.Client // Monitor for metrics monitor *SyncerMonitor - // Shared mutex to serialize CR state changes with the change-commitments API - crMutex *CRMutex + // Shared mutex to serialize CR state changes with the change-commitments API (distributed across pods) + crMutex CRMutexInterface } -func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor, crMutex *CRMutex) *Syncer { +func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor, crMutex CRMutexInterface) *Syncer { // If no shared mutex provided, create a local one (for backwards compatibility in tests) if crMutex == nil { - crMutex = &CRMutex{} + crMutex = &LocalCRMutex{} } return &Syncer{ 
CommitmentsClient: NewCommitmentsClient(), @@ -193,14 +193,20 @@ func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavo // The mutex is held for the entire operation to ensure atomicity - the Limes state // snapshot must be applied without interference from concurrent change-commitments API calls. func (s *Syncer) SyncReservations(ctx context.Context) error { - // Acquire the shared CR mutex for the entire sync operation. - // This ensures the Limes state snapshot is applied atomically. - s.crMutex.Lock() - defer s.crMutex.Unlock() - // Create context with request ID for this sync execution runID := fmt.Sprintf("sync-%d", time.Now().Unix()) ctx = WithNewGlobalRequestID(ctx) + + // Acquire the shared CR mutex for the entire sync operation (distributed across pods). + // This ensures the Limes state snapshot is applied atomically. + _, unlock, err := s.crMutex.Lock(ctx) + if err != nil { + logger := LoggerFromContext(ctx).WithValues("component", "syncer", "runID", runID) + logger.Error(err, "failed to acquire distributed lock for sync") + return fmt.Errorf("failed to acquire distributed lock: %w", err) + } + defer unlock() + logger := LoggerFromContext(ctx).WithValues("component", "syncer", "runID", runID) logger.Info("starting commitment sync with sync interval", "interval", DefaultSyncerConfig().SyncInterval) diff --git a/internal/scheduling/reservations/commitments/syncer_test.go b/internal/scheduling/reservations/commitments/syncer_test.go index ff85b6e04..f6b7b3eb5 100644 --- a/internal/scheduling/reservations/commitments/syncer_test.go +++ b/internal/scheduling/reservations/commitments/syncer_test.go @@ -264,7 +264,7 @@ func TestSyncer_SyncReservations_InstanceCommitments(t *testing.T) { syncer := &Syncer{ CommitmentsClient: mockClient, Client: k8sClient, - crMutex: &CRMutex{}, + crMutex: &LocalCRMutex{}, } err := syncer.SyncReservations(context.Background()) @@ -401,7 +401,7 @@ func TestSyncer_SyncReservations_UpdateExisting(t 
*testing.T) { syncer := &Syncer{ CommitmentsClient: mockClient, Client: k8sClient, - crMutex: &CRMutex{}, + crMutex: &LocalCRMutex{}, } err := syncer.SyncReservations(context.Background()) @@ -501,7 +501,7 @@ func TestSyncer_SyncReservations_UnitMismatch(t *testing.T) { CommitmentsClient: mockClient, Client: k8sClient, monitor: monitor, - crMutex: &CRMutex{}, + crMutex: &LocalCRMutex{}, } err := syncer.SyncReservations(context.Background()) @@ -585,7 +585,7 @@ func TestSyncer_SyncReservations_UnitMatch(t *testing.T) { CommitmentsClient: mockClient, Client: k8sClient, monitor: monitor, - crMutex: &CRMutex{}, + crMutex: &LocalCRMutex{}, } err := syncer.SyncReservations(context.Background()) @@ -670,7 +670,7 @@ func TestSyncer_SyncReservations_EmptyUUID(t *testing.T) { syncer := &Syncer{ CommitmentsClient: mockClient, Client: k8sClient, - crMutex: &CRMutex{}, + crMutex: &LocalCRMutex{}, } err := syncer.SyncReservations(context.Background())