Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ func main() {
// API endpoint.
mux := http.NewServeMux()

// Shared mutex for serializing CR state changes between the syncer and change-commitments API.
// This ensures atomicity when applying Limes state snapshots.
crMutex := &commitments.CRMutex{}

// The pipeline monitor is a bucket for all metrics produced during the
// execution of individual steps (see step monitor below) and the overall
// pipeline.
Expand Down Expand Up @@ -343,7 +347,7 @@ func main() {

// Initialize commitments API for LIQUID interface (with Nova client for usage reporting)
commitmentsConfig := conf.GetConfigOrDie[commitments.Config]()
commitmentsAPI := commitments.NewAPIWithConfig(multiclusterClient, commitmentsConfig, novaClient)
commitmentsAPI := commitments.NewAPIWithConfig(multiclusterClient, commitmentsConfig, novaClient, crMutex)
commitmentsAPI.Init(mux, metrics.Registry, ctrl.Log.WithName("commitments-api"))

deschedulingsController := &nova.DetectorPipelineController{
Expand Down Expand Up @@ -671,7 +675,7 @@ func main() {
setupLog.Info("starting commitments syncer")
syncerMonitor := commitments.NewSyncerMonitor()
must.Succeed(metrics.Registry.Register(syncerMonitor))
syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor)
syncer := commitments.NewSyncer(multiclusterClient, syncerMonitor, crMutex)
syncerConfig := conf.GetConfigOrDie[commitments.SyncerConfig]()
syncerDefaults := commitments.DefaultSyncerConfig()
if syncerConfig.SyncInterval == 0 {
Expand Down
38 changes: 34 additions & 4 deletions internal/scheduling/reservations/commitments/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,31 @@ type UsageNovaClient interface {
ListProjectServers(ctx context.Context, projectID string) ([]nova.ServerDetail, error)
}

// CRMutex serializes CR state changes between the syncer and change-commitments API.
// This ensures that the syncer's Limes state snapshot is applied atomically without
// interference from concurrent change-commitments API calls. The Lock and Unlock
// methods are no-ops if the receiver is nil, allowing safe use when either component
// is disabled.
// NOTE(review): this is an in-memory mutex, so it only serializes goroutines within
// a single process — it provides no cross-pod coordination. Confirm that the syncer
// and the change-commitments API are always deployed in the same process before
// relying on the atomicity guarantee stated above.
// TODO: If the syncer and API are moved to separate pods, replace with a K8s
// distributed lock (e.g., Lease-based coordination).
type CRMutex struct {
	// mu is the underlying in-process mutex; accessed only via Lock/Unlock so
	// the nil-receiver check can be applied consistently.
	mu sync.Mutex
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Find and inspect the CRMutex definition
find . -name "api.go" -path "*/commitments/*" | head -5

Repository: cobaltcore-dev/cortex

Length of output: 118


🏁 Script executed:

# Check main.go for mutex creation
find . -name "main.go" -type f | head -10

Repository: cobaltcore-dev/cortex

Length of output: 165


🏁 Script executed:

# Look at the directory structure to understand the project layout
ls -la internal/scheduling/reservations/commitments/ 2>/dev/null || echo "Path may differ"
git ls-files | grep -E "(main|api|commitments)" | head -20

Repository: cobaltcore-dev/cortex

Length of output: 3527


🏁 Script executed:

# Read the api.go file to see CRMutex definition
cat -n internal/scheduling/reservations/commitments/api.go | head -50

Repository: cobaltcore-dev/cortex

Length of output: 1892


🏁 Script executed:

# Read main.go to see how CRMutex is initialized
cat -n cmd/main.go | head -100

Repository: cobaltcore-dev/cortex

Length of output: 4468


🏁 Script executed:

# Search for where CRMutex is used
rg "CRMutex" --type go -B 2 -A 2

Repository: cobaltcore-dev/cortex

Length of output: 6169


🏁 Script executed:

# Search for deployment configuration or documentation about multi-replica setup
find . -name "*.yaml" -o -name "*.yml" -o -name "Dockerfile" -o -name "README*" | grep -E "(deploy|helm|kube|docker|README)" | head -20

Repository: cobaltcore-dev/cortex

Length of output: 955


🏁 Script executed:

# Look for environment/deployment configurations
git ls-files | grep -E "(helm|deploy|kube|config)" | head -20

Repository: cobaltcore-dev/cortex

Length of output: 887


🏁 Script executed:

# Check if there are multiple instances expected to run
rg "replicas|replica|multi-pod|multi-instance" --type go -i | head -20

Repository: cobaltcore-dev/cortex

Length of output: 2820


🏁 Script executed:

# Look at the full main.go to understand the complete context of crMutex usage
sed -n '1,300p' cmd/main.go | tail -100

Repository: cobaltcore-dev/cortex

Length of output: 4633


🏁 Script executed:

# Check if leader election is actually being used and how it impacts the syncer/API
rg "LeaderElection" --type go -B 2 -A 2 | head -40

Repository: cobaltcore-dev/cortex

Length of output: 2152


🏁 Script executed:

# Look at how the syncer and API are registered with the manager
sed -n '280,400p' cmd/main.go

Repository: cobaltcore-dev/cortex

Length of output: 5630


🏁 Script executed:

# Check helm values to understand the deployment architecture
cat helm/bundles/cortex-cinder/values.yaml | grep -A 20 "replicas\|deployment" | head -40

Repository: cobaltcore-dev/cortex

Length of output: 688


🏁 Script executed:

# Check if the syncer and API are actually in separate pods or the same process
rg "NewSyncer|syncer.*crMutex" --type go -B 2 -A 5

Repository: cobaltcore-dev/cortex

Length of output: 5176


🏁 Script executed:

# Look for where the syncer is initialized in main.go to confirm co-location
sed -n '380,500p' cmd/main.go | grep -A 20 -B 5 -i "syncer"

Repository: cobaltcore-dev/cortex

Length of output: 47


🏁 Script executed:

# Search for any split deployment or separate syncer pod configuration
rg "syncer|commitments" helm/bundles/cortex-cinder/values.yaml -A 3 -B 3

Repository: cobaltcore-dev/cortex

Length of output: 47


🏁 Script executed:

# Check the complete syncer initialization and confirm both syncer and API are conditional
sed -n '300,450p' cmd/main.go | grep -A 30 "commitments-sync-task"

Repository: cobaltcore-dev/cortex

Length of output: 47


🏁 Script executed:

# Verify if there's any cross-pod coordination or distributed locking currently in place
rg "Lease|distributed|coordination|lock" --type go internal/scheduling/reservations/commitments/ -i

Repository: cobaltcore-dev/cortex

Length of output: 2653


🏁 Script executed:

# Check if leader election is actually enabled in practice and if multi-replica scenarios are supported
cat helm/bundles/cortex-cinder/values.yaml | head -50

Repository: cobaltcore-dev/cortex

Length of output: 1283


🏁 Script executed:

# Verify the actual syncer initialization block in main.go
rg "commitments-sync-task|commitments-api" cmd/main.go -B 2 -A 15

Repository: cobaltcore-dev/cortex

Length of output: 2330


🏁 Script executed:

# Check if there's any documentation about deployment models or whether split deployments are planned
find . -name "*.md" -type f | xargs grep -l "syncer\|commitments\|deployment" | head -5

Repository: cobaltcore-dev/cortex

Length of output: 230


🏁 Script executed:

# Check the full syncer initialization to confirm both components are in the same process currently
sed -n '430,470p' cmd/main.go

Repository: cobaltcore-dev/cortex

Length of output: 1948


🏁 Script executed:

# Look for any task.Runner or separate process initialization for syncer
rg "SetupWithManager.*syncer\|commitments.*SetupWithManager" --type go -B 3 -A 3

Repository: cobaltcore-dev/cortex

Length of output: 47


🏁 Script executed:

# Check the deployment documentation to understand the expected deployment model
cat docs/reservations/committed-resource-reservations.md | head -100

Repository: cobaltcore-dev/cortex

Length of output: 5148


🏁 Script executed:

# Look for any existing distributed coordination mechanisms or TODO comments about multi-replica
rg "TODO|FIXME|XXX" internal/scheduling/reservations/commitments/ --type go | grep -i "replica\|pod\|distributed"

Repository: cobaltcore-dev/cortex

Length of output: 194


🏁 Script executed:

# Confirm the actual scenario: are syncer and API always co-located?
rg "nova-pipeline-controllers.*commitments-sync-task\|commitments-sync-task.*nova-pipeline-controllers" helm/ --type yaml

Repository: cobaltcore-dev/cortex

Length of output: 47


🏁 Script executed:

# Confirm whether the syncer and API are conditionally enabled or always together
rg "nova-pipeline-controllers" helm/bundles/cortex-cinder/values.yaml -A 5 -B 5

Repository: cobaltcore-dev/cortex

Length of output: 47


🏁 Script executed:

# Check if there are other configurations that could enable them separately
git ls-files | xargs grep -l "commitments-sync-task\|nova-pipeline-controllers" | grep -E "(yaml|helm)" | head -5

Repository: cobaltcore-dev/cortex

Length of output: 101


🏁 Script executed:

# Check the nova deployment configuration to see how syncer and API are enabled
cat helm/bundles/cortex-nova/values.yaml | grep -A 20 -B 5 "commitments-sync-task\|enabledControllers\|enabledTasks"

Repository: cobaltcore-dev/cortex

Length of output: 2273


🏁 Script executed:

# Verify the helm deployment splits - check if cortex-nova-knowledge and cortex-nova-scheduling are separate
cat helm/bundles/cortex-nova/Chart.yaml

Repository: cobaltcore-dev/cortex

Length of output: 1119


🏁 Script executed:

# Confirm the dependencies and deployment structure
git ls-files helm/bundles/cortex-nova/ | grep -E "values|deployment|templates"

Repository: cobaltcore-dev/cortex

Length of output: 815


CRMutex provides no cross-pod synchronization in current deployment.

The syncer and API run in separate Kubernetes deployments: cortex-nova-knowledge (syncer) and cortex-nova-scheduling (API). Each pod independently creates its own crMutex instance in main.go. In multi-replica scenarios, a request to the API pod can race concurrent writes from the syncer running in a different pod, violating the atomicity guarantee claimed in the docstring. The current in-memory mutex only serializes within a single pod. A distributed coordination mechanism (e.g., Kubernetes Lease-based lock) is required to properly serialize CR state changes across pods.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@internal/scheduling/reservations/commitments/api.go` around lines 24 - 33,
The CRMutex type and its Lock/Unlock methods only use an in-memory sync.Mutex
and therefore do not provide cross-pod serialization; replace or augment CRMutex
to use a Kubernetes Lease-based distributed lock (or another cluster-wide
coordination primitive) inside the CRMutex methods so Lock/Unlock block across
pods, ensure the existing nil-receiver semantics remain safe, and update the
CRMutex construction logic in main.go (where crMutex is created) to initialize
the Kubernetes client/Lease lock instance and pass it into CRMutex so API and
syncer pods contend on the same Lease.


// Lock acquires the underlying mutex. Calling Lock on a nil receiver is a
// no-op, which keeps the lock optional for components that run standalone.
func (m *CRMutex) Lock() {
	if m == nil {
		return
	}
	m.mu.Lock()
}

// Unlock releases the underlying mutex. Calling Unlock on a nil receiver is a
// no-op, mirroring the nil-safe behavior of Lock.
func (m *CRMutex) Unlock() {
	if m == nil {
		return
	}
	m.mu.Unlock()
}

// HTTPAPI implements Limes LIQUID commitment validation endpoints.
type HTTPAPI struct {
client client.Client
Expand All @@ -30,15 +55,19 @@ type HTTPAPI struct {
usageMonitor ReportUsageAPIMonitor
capacityMonitor ReportCapacityAPIMonitor
infoMonitor InfoAPIMonitor
// Mutex to serialize change-commitments requests
changeMutex sync.Mutex
// Shared mutex to serialize CR state changes with the syncer
crMutex *CRMutex
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idiomatic way in Go is as follows. You can use sync.Mutex as a field in your type struct.

type HTTPAPI struct {
  mutex sync.Mutex
}

And when you're initializing the struct, you can directly use your mutex. No nil check needed.

api := HTTPAPI{}
api.mutex.Lock() // This should work without explicit initialization

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is because the mutex is a field of the struct here, and initializing the surrounding struct initializes the zero-value mutex along with it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The mutex wrapper shouldn't be necessary.

}

func NewAPI(client client.Client) *HTTPAPI {
return NewAPIWithConfig(client, DefaultConfig(), nil)
return NewAPIWithConfig(client, DefaultConfig(), nil, nil)
}

func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaClient) *HTTPAPI {
func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaClient, crMutex *CRMutex) *HTTPAPI {
// If no shared mutex provided, create a local one (for backwards compatibility in tests)
if crMutex == nil {
crMutex = &CRMutex{}
}
return &HTTPAPI{
client: client,
config: config,
Expand All @@ -47,6 +76,7 @@ func NewAPIWithConfig(client client.Client, config Config, novaClient UsageNovaC
usageMonitor: NewReportUsageAPIMonitor(),
capacityMonitor: NewReportCapacityAPIMonitor(),
infoMonitor: NewInfoAPIMonitor(),
crMutex: crMutex,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func (api *HTTPAPI) HandleChangeCommitments(w http.ResponseWriter, r *http.Reque
return
}

// Serialize all change-commitments requests
api.changeMutex.Lock()
defer api.changeMutex.Unlock()
// Serialize all change-commitments requests (shared with syncer)
api.crMutex.Lock()
defer api.crMutex.Unlock()

ctx := reservations.WithGlobalRequestID(context.Background(), "committed-resource-"+requestID)
logger := LoggerFromContext(ctx).WithValues("component", "api", "endpoint", "/commitments/v1/change-commitments")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ func newCommitmentTestEnv(
// Use custom config if provided, otherwise use default
var api *HTTPAPI
if customConfig != nil {
api = NewAPIWithConfig(wrappedClient, *customConfig, nil)
api = NewAPIWithConfig(wrappedClient, *customConfig, nil, nil)
} else {
api = NewAPI(wrappedClient)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func newUsageTestEnv(
}

// Create API with mock Nova client
api := NewAPIWithConfig(k8sClient, DefaultConfig(), novaClient)
api := NewAPIWithConfig(k8sClient, DefaultConfig(), novaClient, nil)
mux := http.NewServeMux()
registry := prometheus.NewRegistry()
api.Init(mux, registry, log.Log)
Expand Down
16 changes: 14 additions & 2 deletions internal/scheduling/reservations/commitments/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,20 @@ type Syncer struct {
client.Client
// Monitor for metrics
monitor *SyncerMonitor
// Shared mutex to serialize CR state changes with the change-commitments API
crMutex *CRMutex
}

func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor) *Syncer {
func NewSyncer(k8sClient client.Client, monitor *SyncerMonitor, crMutex *CRMutex) *Syncer {
// If no shared mutex provided, create a local one (for backwards compatibility in tests)
if crMutex == nil {
crMutex = &CRMutex{}
}
return &Syncer{
CommitmentsClient: NewCommitmentsClient(),
Client: k8sClient,
monitor: monitor,
crMutex: crMutex,
}
}

Expand Down Expand Up @@ -183,8 +190,13 @@ func (s *Syncer) getCommitmentStates(ctx context.Context, log logr.Logger, flavo
}

// SyncReservations fetches commitments from Limes and synchronizes Reservation CRDs.
// The mutex is held for the entire operation to ensure atomicity - the Limes state
// snapshot must be applied without interference from concurrent change-commitments API calls.
func (s *Syncer) SyncReservations(ctx context.Context) error {
// TODO handle concurrency with change API: consider creation time of reservations and status ready
// Acquire the shared CR mutex for the entire sync operation.
// This ensures the Limes state snapshot is applied atomically.
s.crMutex.Lock()
defer s.crMutex.Unlock()

// Create context with request ID for this sync execution
runID := fmt.Sprintf("sync-%d", time.Now().Unix())
Expand Down
5 changes: 5 additions & 0 deletions internal/scheduling/reservations/commitments/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func TestSyncer_SyncReservations_InstanceCommitments(t *testing.T) {
syncer := &Syncer{
CommitmentsClient: mockClient,
Client: k8sClient,
crMutex: &CRMutex{},
}

err := syncer.SyncReservations(context.Background())
Expand Down Expand Up @@ -400,6 +401,7 @@ func TestSyncer_SyncReservations_UpdateExisting(t *testing.T) {
syncer := &Syncer{
CommitmentsClient: mockClient,
Client: k8sClient,
crMutex: &CRMutex{},
}

err := syncer.SyncReservations(context.Background())
Expand Down Expand Up @@ -499,6 +501,7 @@ func TestSyncer_SyncReservations_UnitMismatch(t *testing.T) {
CommitmentsClient: mockClient,
Client: k8sClient,
monitor: monitor,
crMutex: &CRMutex{},
}

err := syncer.SyncReservations(context.Background())
Expand Down Expand Up @@ -582,6 +585,7 @@ func TestSyncer_SyncReservations_UnitMatch(t *testing.T) {
CommitmentsClient: mockClient,
Client: k8sClient,
monitor: monitor,
crMutex: &CRMutex{},
}

err := syncer.SyncReservations(context.Background())
Expand Down Expand Up @@ -666,6 +670,7 @@ func TestSyncer_SyncReservations_EmptyUUID(t *testing.T) {
syncer := &Syncer{
CommitmentsClient: mockClient,
Client: k8sClient,
crMutex: &CRMutex{},
}

err := syncer.SyncReservations(context.Background())
Expand Down
Loading