Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 41 additions & 62 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ import (
"github.com/openshift/cluster-version-operator/pkg/payload/precondition"
preconditioncv "github.com/openshift/cluster-version-operator/pkg/payload/precondition/clusterversion"
"github.com/openshift/cluster-version-operator/pkg/risk"
"github.com/openshift/cluster-version-operator/pkg/risk/adminack"
"github.com/openshift/cluster-version-operator/pkg/risk/aggregate"
"github.com/openshift/cluster-version-operator/pkg/risk/alert"
deletionrisk "github.com/openshift/cluster-version-operator/pkg/risk/deletion"
"github.com/openshift/cluster-version-operator/pkg/risk/overrides"
updatingrisk "github.com/openshift/cluster-version-operator/pkg/risk/updating"
upgradeablerisk "github.com/openshift/cluster-version-operator/pkg/risk/upgradeable"
)

const (
Expand Down Expand Up @@ -130,21 +136,13 @@ type Operator struct {
queue workqueue.TypedRateLimitingInterface[any]
// availableUpdatesQueue tracks checking for updates from the update server.
availableUpdatesQueue workqueue.TypedRateLimitingInterface[any]
// upgradeableQueue tracks checking for upgradeable.
upgradeableQueue workqueue.TypedRateLimitingInterface[any]

// statusLock guards access to modifying available updates
statusLock sync.Mutex
availableUpdates *availableUpdates

// upgradeableStatusLock guards access to modifying Upgradeable conditions
upgradeableStatusLock sync.Mutex
upgradeable *upgradeable
upgradeableChecks []upgradeableCheck

// upgradeableCheckIntervals drives minimal intervals between Upgradeable status
// synchronization
upgradeableCheckIntervals upgradeableCheckIntervals
// upgradeable tracks the risks that feed the ClusterVersion Upgradeable condition.
upgradeable risk.Source

// conditionRegistry is used to evaluate whether a particular condition is risky or not.
conditionRegistry clusterconditions.ConditionRegistry
Expand Down Expand Up @@ -257,7 +255,6 @@ func New(

statusInterval: 15 * time.Second,
minimumUpdateCheckInterval: minimumInterval,
upgradeableCheckIntervals: defaultUpgradeableCheckIntervals(),
payloadDir: overridePayloadDir,
updateService: updateService,

Expand All @@ -267,13 +264,11 @@ func New(
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "clusterversion"}),
availableUpdatesQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "availableupdates"}),
upgradeableQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "upgradeable"}),

hypershift: hypershift,
exclude: exclude,
clusterProfile: clusterProfile,
conditionRegistry: standard.NewConditionRegistry(promqlTarget),
risks: alert.New("Alert", promqlTarget),
injectClusterIdIntoPromQL: injectClusterIdIntoPromQL,

requiredFeatureSet: featureSet,
Expand All @@ -286,12 +281,6 @@ func New(
if _, err := cvInformer.Informer().AddEventHandler(optr.clusterVersionEventHandler()); err != nil {
return nil, err
}
if _, err := cmConfigInformer.Informer().AddEventHandler(optr.adminAcksEventHandler()); err != nil {
return nil, err
}
if _, err := cmConfigManagedInformer.Informer().AddEventHandler(optr.adminGatesEventHandler()); err != nil {
return nil, err
}
if _, err := coInformer.Informer().AddEventHandler(optr.clusterOperatorEventHandler()); err != nil {
return nil, err
}
Expand All @@ -307,13 +296,45 @@ func New(

optr.proxyLister = proxyInformer.Lister()
optr.cmConfigLister = cmConfigInformer.Lister().ConfigMaps(internal.ConfigNamespace)
optr.cacheSynced = append(optr.cacheSynced, cmConfigInformer.Informer().HasSynced)
optr.cmConfigManagedLister = cmConfigManagedInformer.Lister().ConfigMaps(internal.ConfigManagedNamespace)
optr.cacheSynced = append(optr.cacheSynced, cmConfigManagedInformer.Informer().HasSynced)

optr.featureGateLister = featureGateInformer.Lister()
optr.cacheSynced = append(optr.cacheSynced, featureGateInformer.Informer().HasSynced)

// make sure this is initialized after all the listers are initialized
optr.upgradeableChecks = optr.defaultUpgradeableChecks()
riskSourceCallback := func() { optr.availableUpdatesQueue.Add(optr.queueKey()) }

risks := []risk.Source{}
if source, err := updatingrisk.New("ClusterVersionUpdating", optr.name, cvInformer, riskSourceCallback); err != nil {
return optr, err
} else {
risks = append(risks, source)
}
if source, err := overrides.New("ClusterVersionOverrides", optr.name, cvInformer, riskSourceCallback); err != nil {
return optr, err
} else {
risks = append(risks, source)
}
risks = append(risks, deletionrisk.New("ResourceDeletionInProgress", optr.currentVersion))
if source, err := adminack.New("AdminAck", optr.currentVersion, cmConfigManagedInformer, cmConfigInformer, riskSourceCallback); err != nil {
return optr, err
} else {
risks = append(risks, source)
}
if source, err := upgradeablerisk.New("ClusterOperatorUpgradeable", optr.currentVersion, coInformer, riskSourceCallback); err != nil {
return optr, err
} else {
risks = append(risks, source)
}

optr.upgradeable = aggregate.New(risks...)

optr.risks = aggregate.New(
alert.New("Alert", promqlTarget),
optr.upgradeable,
)

optr.configuration = configuration.NewClusterVersionOperatorConfiguration(operatorClient, operatorInformerFactory)

Expand Down Expand Up @@ -468,7 +489,6 @@ func loadConfigMapVerifierDataFromUpdate(update *payload.Update, clientBuilder s
func (optr *Operator) Run(runContext context.Context, shutdownContext context.Context) error {
defer optr.queue.ShutDown()
defer optr.availableUpdatesQueue.ShutDown()
defer optr.upgradeableQueue.ShutDown()
defer optr.configuration.Queue().ShutDown()
stopCh := runContext.Done()

Expand Down Expand Up @@ -525,25 +545,12 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
klog.Infof("The ClusterVersionOperatorConfiguration feature gate is disabled or HyperShift is detected; the configuration sync routine will not run.")
}

resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
wait.UntilWithContext(runContext, func(runContext context.Context) {
optr.worker(runContext, optr.upgradeableQueue, optr.upgradeableSyncFunc(false))
}, time.Second)
resultChannel <- asyncResult{name: "upgradeable"}
}()

resultChannelCount++
go func() {
defer utilruntime.HandleCrash()
wait.UntilWithContext(runContext, func(runContext context.Context) {
// run the worker, then when the queue is closed sync one final time to flush any pending status
optr.worker(runContext, optr.queue, func(runContext context.Context, key string) error { return optr.sync(runContext, key) })
// This is to ensure upgradeableCondition to be synced and thus to avoid the race caused by the throttle
if err := optr.upgradeableSyncFunc(true)(shutdownContext, optr.queueKey()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to perform final upgradeable sync: %v", err))
}
if err := optr.sync(shutdownContext, optr.queueKey()); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to perform final sync: %v", err))
}
Expand Down Expand Up @@ -593,7 +600,6 @@ func (optr *Operator) Run(runContext context.Context, shutdownContext context.Co
shutdown = true
optr.queue.ShutDown()
optr.availableUpdatesQueue.ShutDown()
optr.upgradeableQueue.ShutDown()
optr.configuration.Queue().ShutDown()
}
}
Expand All @@ -613,12 +619,10 @@ func (optr *Operator) clusterVersionEventHandler() cache.ResourceEventHandler {
AddFunc: func(_ interface{}) {
optr.queue.Add(workQueueKey)
optr.availableUpdatesQueue.Add(workQueueKey)
optr.upgradeableQueue.Add(workQueueKey)
},
UpdateFunc: func(_, _ interface{}) {
optr.queue.Add(workQueueKey)
optr.availableUpdatesQueue.Add(workQueueKey)
optr.upgradeableQueue.Add(workQueueKey)
},
DeleteFunc: func(_ interface{}) {
optr.queue.Add(workQueueKey)
Expand Down Expand Up @@ -829,31 +833,6 @@ func (optr *Operator) availableUpdatesSync(ctx context.Context, key string) erro
return optr.syncAvailableUpdates(ctx, config)
}

// upgradeableSyncFunc returns a function that is triggered on cluster version change (and periodic requeues) to
// sync upgradeableCondition. It only modifies cluster version.
func (optr *Operator) upgradeableSyncFunc(ignoreThrottlePeriod bool) func(_ context.Context, key string) error {
return func(_ context.Context, key string) error {
startTime := time.Now()
klog.V(2).Infof("Started syncing upgradeable %q", key)
defer func() {
klog.V(2).Infof("Finished syncing upgradeable %q (%v)", key, time.Since(startTime))
}()

config, err := optr.cvLister.Get(optr.name)
if apierrors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
if errs := validation.ValidateClusterVersion(config, optr.shouldReconcileAcceptRisks()); len(errs) > 0 {
return nil
}

return optr.syncUpgradeable(config, ignoreThrottlePeriod)
}
}

// isOlderThanLastUpdate returns true if the cluster version is older than
// the last update we saw.
func (optr *Operator) isOlderThanLastUpdate(config *configv1.ClusterVersion) bool {
Expand Down
5 changes: 5 additions & 0 deletions pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/openshift/client-go/config/clientset/versioned/fake"
"github.com/openshift/library-go/pkg/manifest"

"github.com/openshift/cluster-version-operator/pkg/clusterconditions"
"github.com/openshift/cluster-version-operator/pkg/clusterconditions/always"
"github.com/openshift/cluster-version-operator/pkg/featuregates"
"github.com/openshift/cluster-version-operator/pkg/internal"
"github.com/openshift/cluster-version-operator/pkg/payload"
Expand Down Expand Up @@ -114,13 +116,16 @@ func setupCVOTest(payloadDir string) (*Operator, map[string]apiruntime.Object, *
fmt.Printf("Cannot create cluster version object, err: %#v\n", err)
}

registry := clusterconditions.NewConditionRegistry()
registry.Register("Always", &always.Always{})
o := &Operator{
namespace: "test",
name: "version",
queue: workqueue.NewTypedRateLimitingQueueWithConfig[any](workqueue.DefaultTypedControllerRateLimiter[any](), workqueue.TypedRateLimitingQueueConfig[any]{Name: "cvo-loop-test"}),
client: client,
enabledCVOFeatureGates: featuregates.DefaultCvoGates("version"),
cvLister: &clientCVLister{client: client},
conditionRegistry: registry,
exclude: "exclude-test",
eventRecorder: record.NewFakeRecorder(100),
clusterProfile: payload.DefaultClusterProfile,
Expand Down
Loading