-
Notifications
You must be signed in to change notification settings - Fork 310
WIP: coordinate parallel import steps in shared namespace with lock/done ConfigMaps for aggregated runs #5050
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2,6 +2,8 @@ package release | |||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||||
| "crypto/sha256" | ||||||||||||||||||||||||||||||
| "encoding/hex" | ||||||||||||||||||||||||||||||
| "encoding/json" | ||||||||||||||||||||||||||||||
| "errors" | ||||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||||
|
|
@@ -30,6 +32,16 @@ import ( | |||||||||||||||||||||||||||||
| "github.com/openshift/ci-tools/pkg/util" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| const ( | ||||||||||||||||||||||||||||||
| releaseImportLockPrefix = "ci-operator-release-import-lock-" | ||||||||||||||||||||||||||||||
| releaseImportDonePrefix = "ci-operator-release-import-done-" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| func releaseImportResourceSuffix(pullSpec string) string { | ||||||||||||||||||||||||||||||
| sum := sha256.Sum256([]byte(pullSpec)) | ||||||||||||||||||||||||||||||
| return hex.EncodeToString(sum[:])[:12] | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // importReleaseStep is responsible for importing release images from | ||||||||||||||||||||||||||||||
| // external image streams for use with tests that need to install or | ||||||||||||||||||||||||||||||
| // upgrade a cluster. It uses the `cli` image within the image stream to | ||||||||||||||||||||||||||||||
|
|
@@ -106,13 +118,86 @@ func (s *importReleaseStep) run(ctx context.Context) error { | |||||||||||||||||||||||||||||
| return fmt.Errorf("could not create stable imagestream: %w", err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // tag the release image in and let it import | ||||||||||||||||||||||||||||||
| pullSpec, err := s.source.PullSpec(ctx) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| logrus.WithField("name", s.name).Debugf("setting originalPullSpec to: %s for multi-stage steps to reference", pullSpec) | ||||||||||||||||||||||||||||||
| s.originalPullSpec = pullSpec | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| ns := s.jobSpec.Namespace() | ||||||||||||||||||||||||||||||
| suffix := releaseImportResourceSuffix(pullSpec) | ||||||||||||||||||||||||||||||
| lockName := releaseImportLockPrefix + s.name + "-" + suffix | ||||||||||||||||||||||||||||||
| doneName := releaseImportDonePrefix + s.name + "-" + suffix | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| var existingDone coreapi.ConfigMap | ||||||||||||||||||||||||||||||
| if err := s.client.Get(ctx, ctrlruntimeclient.ObjectKey{Namespace: ns, Name: doneName}, &existingDone); err == nil { | ||||||||||||||||||||||||||||||
| if existingDone.Data["pullspec"] == pullSpec { | ||||||||||||||||||||||||||||||
| logrus.Infof("release %s import already completed in this namespace (shared aggregate run), skipping duplicate import", s.name) | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } else if !kerrors.IsNotFound(err) { | ||||||||||||||||||||||||||||||
| return fmt.Errorf("get release import completion marker: %w", err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| lockCM := &coreapi.ConfigMap{ | ||||||||||||||||||||||||||||||
| ObjectMeta: metav1.ObjectMeta{ | ||||||||||||||||||||||||||||||
| Namespace: ns, | ||||||||||||||||||||||||||||||
| Name: lockName, | ||||||||||||||||||||||||||||||
| Labels: map[string]string{"ci.openshift.io/release-import-coordination": "true"}, | ||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||
| Data: map[string]string{"holder": s.jobSpec.BuildID}, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| if err := s.client.Create(ctx, lockCM); err != nil { | ||||||||||||||||||||||||||||||
| if !kerrors.IsAlreadyExists(err) { | ||||||||||||||||||||||||||||||
| return fmt.Errorf("create release import lock: %w", err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| logrus.Infof("waiting for another ci-operator to finish release %s import in this namespace", s.name) | ||||||||||||||||||||||||||||||
| if err := waitForReleaseImportDone(ctx, s.client, ns, doneName, pullSpec, utils.DefaultImageImportTimeout); err != nil { | ||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| defer func() { | ||||||||||||||||||||||||||||||
| del := &coreapi.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: lockName}} | ||||||||||||||||||||||||||||||
| if err := s.client.Delete(ctx, del); err != nil && !kerrors.IsNotFound(err) { | ||||||||||||||||||||||||||||||
| logrus.WithError(err).Warn("failed to delete release import lock configmap") | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||||
|
Comment on lines
+161
to
+166
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a fresh context for deferred lock cleanup. The deferred delete reuses Suggested fix defer func() {
+ cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
del := &coreapi.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: lockName}}
- if err := s.client.Delete(ctx, del); err != nil && !kerrors.IsNotFound(err) {
+ if err := s.client.Delete(cleanupCtx, del); err != nil && !kerrors.IsNotFound(err) {
logrus.WithError(err).Warn("failed to delete release import lock configmap")
}
}()📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| if err := s.runImportBody(ctx, streamName, pullSpec, startTime); err != nil { | ||||||||||||||||||||||||||||||
| return err | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| doneCM := &coreapi.ConfigMap{ | ||||||||||||||||||||||||||||||
| ObjectMeta: metav1.ObjectMeta{Namespace: ns, Name: doneName}, | ||||||||||||||||||||||||||||||
| Data: map[string]string{"pullspec": pullSpec}, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| if err := s.client.Create(ctx, doneCM); err != nil && !kerrors.IsAlreadyExists(err) { | ||||||||||||||||||||||||||||||
| return fmt.Errorf("mark release import done: %w", err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| func waitForReleaseImportDone(ctx context.Context, client ctrlruntimeclient.Client, ns, doneName, wantPullSpec string, timeout time.Duration) error { | ||||||||||||||||||||||||||||||
| err := wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) { | ||||||||||||||||||||||||||||||
| cm := &coreapi.ConfigMap{} | ||||||||||||||||||||||||||||||
| err := client.Get(ctx, ctrlruntimeclient.ObjectKey{Namespace: ns, Name: doneName}, cm) | ||||||||||||||||||||||||||||||
| if kerrors.IsNotFound(err) { | ||||||||||||||||||||||||||||||
| return false, nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return false, err | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return cm.Data["pullspec"] == wantPullSpec, nil | ||||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return fmt.Errorf("timed out waiting for release import completion in namespace %s: %w", ns, err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| func (s *importReleaseStep) runImportBody(ctx context.Context, streamName string, pullSpec string, startTime time.Time) error { | ||||||||||||||||||||||||||||||
| // retry importing the image a few times because we might race against establishing credentials/roles | ||||||||||||||||||||||||||||||
| // and be unable to import images on the same cluster | ||||||||||||||||||||||||||||||
| if newPullSpec, err := utils.ImportTagWithRetries(ctx, s.client, s.jobSpec.Namespace(), "release", s.name, pullSpec, api.ImageStreamImportRetries, s.client.MetricsAgent()); err != nil { | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Retry lock acquisition when the current importer exits early.
This path only waits for
doneName. If the lock holder returns any error before creating the done ConfigMap, every peer that already sawAlreadyExistswill sit there untilDefaultImageImportTimeouteven after the lock is gone. That turns one transient importer failure into a namespace-wide timeout. Re-check the lock while waiting and retryCreate()when it disappears instead of waiting solely ondoneName.Also applies to: 182-198
🤖 Prompt for AI Agents