diff --git a/pkg/controllers/v1alpha1/dataload/status_handler.go b/pkg/controllers/v1alpha1/dataload/status_handler.go index 361c57ec867..3a61bc4be36 100644 --- a/pkg/controllers/v1alpha1/dataload/status_handler.go +++ b/pkg/controllers/v1alpha1/dataload/status_handler.go @@ -188,6 +188,54 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont } func (o *OnEventStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus) (result *datav1alpha1.OperationStatus, err error) { - //TODO implement me - return nil, nil + result = opStatus.DeepCopy() + releaseName := utils.GetDataLoadReleaseName(o.dataLoad.GetName()) + jobName := utils.GetDataLoadJobName(releaseName) + + ctx.Log.V(1).Info("DataLoad chart already existed, check its running status") + job, err := kubeclient.GetJob(o.Client, jobName, ctx.Namespace) + if err != nil { + if utils.IgnoreNotFound(err) == nil { + ctx.Log.Info("Related Job missing, will delete helm chart and retry", "namespace", ctx.Namespace, "jobName", jobName) + if err = helm.DeleteReleaseIfExists(releaseName, ctx.Namespace); err != nil { + ctx.Log.Error(err, "can't delete dataload release", "namespace", ctx.Namespace, "releaseName", releaseName) + return + } + return + } + ctx.Log.Error(err, "can't get dataload job", "namespace", ctx.Namespace, "jobName", jobName) + return + } + + finishedJobCondition := kubeclient.GetFinishedJobCondition(job) + if finishedJobCondition == nil { + ctx.Log.V(1).Info("DataLoad job still running", "namespace", ctx.Namespace, "jobName", jobName) + return + } + isJobSucceed := finishedJobCondition.Type == batchv1.JobComplete + + if result.NodeAffinity == nil && isJobSucceed { + result.NodeAffinity, err = dataflow.GenerateNodeAffinity(job) + if err != nil { + return nil, errors.Wrap(err, "error to generate the node labels") + } + } + + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(finishedJobCondition.Type), + Status: finishedJobCondition.Status, + Reason: finishedJobCondition.Reason, + Message: finishedJobCondition.Message, + LastProbeTime: finishedJobCondition.LastProbeTime, + LastTransitionTime: finishedJobCondition.LastTransitionTime, + }, + } + if isJobSucceed { + result.Phase = common.PhaseComplete + } else { + result.Phase = common.PhaseFailed + } + result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, finishedJobCondition.LastTransitionTime.Time) + return } diff --git a/pkg/controllers/v1alpha1/dataload/status_handler_test.go b/pkg/controllers/v1alpha1/dataload/status_handler_test.go index 9fda523d83d..ed66a56c5ff 100644 --- a/pkg/controllers/v1alpha1/dataload/status_handler_test.go +++ b/pkg/controllers/v1alpha1/dataload/status_handler_test.go @@ -314,18 +314,406 @@ var _ = Describe("CronStatusHandler", func() { }) var _ = Describe("OnEventStatusHandler", func() { - It("GetOperationStatus returns nil result and nil error (stub)", func() { - testScheme := runtime.NewScheme() + var ( + testScheme *runtime.Scheme + mockDataload v1alpha1.DataLoad + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) - dataload := &v1alpha1.DataLoad{ + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + mockDataload = v1alpha1.DataLoad{ + ObjectMeta: v1.ObjectMeta{ + Name: dataLoadName, + Namespace: defaultNamespace, + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, + } + }) + + DescribeTable("GetOperationStatus", + func(job batchv1.Job, expectedPhase common.Phase, expectedConditionType common.ConditionType) { + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &job) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(opStatus.Phase).To(Equal(expectedPhase)) + Expect(opStatus.Conditions).To(HaveLen(1)) + Expect(opStatus.Conditions[0].Type).To(Equal(expectedConditionType)) + Expect(opStatus.Duration).NotTo(BeEmpty()) + }, + Entry("job success yields PhaseComplete", + batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseComplete, + common.Complete, + ), + Entry("job failed yields PhaseFailed", + batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobFailed, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseFailed, + common.Failed, + ), + ) + + It("returns unchanged status when job is still running", func() { + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{}, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &runningJob) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + original := mockDataload.Status.DeepCopy() + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(*opStatus).To(Equal(*original)) + }) + + It("generates NodeAffinity when completed job injects affinity and current status has none", func() { + mockDataload.Status.NodeAffinity = nil + completedJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: loaderJobName, + Namespace: defaultNamespace, + Annotations: map[string]string{ + common.AnnotationDataFlowAffinityInject: "true", + common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sNodeNameLabelKey: nodeName, + }, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &completedJob) + handler := &OnceStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + Expect(opStatus.NodeAffinity).NotTo(BeNil()) + Expect(opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).NotTo(BeNil()) + Expect(opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms).To(HaveLen(1)) + Expect(opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(HaveLen(1)) + expression := opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0] + Expect(expression.Key).To(Equal(common.K8sNodeNameLabelKey)) + Expect(expression.Operator).To(Equal(corev1.NodeSelectorOpIn)) + Expect(expression.Values).To(Equal([]string{nodeName})) + }) + + It("GetOperationStatus returns current status when job is still running (no finished condition)", func() { + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{}, // no conditions = still running + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &runningJob) + handler := &OnceStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + originalStatus := mockDataload.Status.DeepCopy() + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + // when job is still running, status is returned as an unchanged DeepCopy + Expect(opStatus).NotTo(BeNil()) + Expect(opStatus).NotTo(BeIdenticalTo(&mockDataload.Status)) + Expect(*opStatus).To(Equal(*originalStatus)) + }) +}) + +var _ = Describe("CronStatusHandler", func() { + var ( + testScheme *runtime.Scheme + mockCronDataload v1alpha1.DataLoad + lastScheduleTime v1.Time + lastSuccessfulTime v1.Time + mockCronJob batchv1.CronJob + patches *gomonkey.Patches + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + startTime := time.Date(2023, 8, 1, 12, 0, 0, 0, time.Local) + lastScheduleTime = v1.NewTime(startTime) + lastSuccessfulTime = v1.NewTime(startTime.Add(time.Second * 10)) + + mockCronDataload = v1alpha1.DataLoad{ ObjectMeta: v1.ObjectMeta{Name: dataLoadName, Namespace: defaultNamespace}, + Spec: v1alpha1.DataLoadSpec{ + Dataset: v1alpha1.TargetDataset{Name: targetDataset, Namespace: defaultNamespace}, + Policy: "Cron", + Schedule: "* * * * *", + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + + mockCronJob = batchv1.CronJob{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Spec: batchv1.CronJobSpec{Schedule: "* * * * *"}, + Status: batchv1.CronJobStatus{ + LastScheduleTime: &lastScheduleTime, + LastSuccessfulTime: &lastSuccessfulTime, + }, + } + + // Patch IsBatchV1CronJobSupported to avoid ctrl.GetConfigOrDie() panic in unit tests. + patches = gomonkey.ApplyFunc(compatibility.IsBatchV1CronJobSupported, func() bool { + return true + }) + }) + + AfterEach(func() { + patches.Reset() + }) + + It("completed job yields PhaseComplete", func() { + job := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-dataload-loader-job-1", + Namespace: defaultNamespace, + Labels: map[string]string{"cronjob": loaderJobName}, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + LastProbeTime: lastSuccessfulTime, + LastTransitionTime: lastSuccessfulTime, + }, + }, + }, } - c := fake.NewFakeClientWithScheme(testScheme, dataload) - handler := &OnEventStatusHandler{Client: c, dataLoad: dataload} + client := fake.NewFakeClientWithScheme(testScheme, &mockCronDataload, &mockCronJob, &job) + handler := &CronStatusHandler{Client: client, dataLoad: &mockCronDataload} ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} - result, err := handler.GetOperationStatus(ctx, &dataload.Status) + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataload.Status) Expect(err).NotTo(HaveOccurred()) - Expect(result).To(BeNil()) + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + }) + + It("running job yields PhasePending", func() { + job := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-dataload-loader-job-1", + Namespace: defaultNamespace, + Labels: map[string]string{"cronjob": loaderJobName}, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{}, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockCronDataload, &mockCronJob, &job) + handler := &CronStatusHandler{Client: client, dataLoad: &mockCronDataload} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + Expect(opStatus.Phase).To(Equal(common.PhasePending)) + }) + + It("returns status with timestamps when no current job matches schedule time", func() { + // no jobs in the fake client that match lastScheduleTime + client := fake.NewFakeClientWithScheme(testScheme, &mockCronDataload, &mockCronJob) + handler := &CronStatusHandler{Client: client, dataLoad: &mockCronDataload} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataload.Status) + Expect(err).NotTo(HaveOccurred()) + // should still update the schedule/successful times from the cronjob status + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + }) +}) + +var _ = Describe("OnEventStatusHandler", func() { + var ( + testScheme *runtime.Scheme + mockDataload v1alpha1.DataLoad + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + mockDataload = v1alpha1.DataLoad{ + ObjectMeta: v1.ObjectMeta{ + Name: dataLoadName, + Namespace: defaultNamespace, + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, + } + }) + + DescribeTable("GetOperationStatus", + func(job batchv1.Job, expectedPhase common.Phase, expectedConditionType common.ConditionType) { + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &job) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(opStatus.Phase).To(Equal(expectedPhase)) + Expect(opStatus.Conditions).To(HaveLen(1)) + Expect(opStatus.Conditions[0].Type).To(Equal(expectedConditionType)) + Expect(opStatus.Duration).NotTo(BeEmpty()) + }, + Entry("job success yields PhaseComplete", + batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseComplete, + common.Complete, + ), + Entry("job failed yields PhaseFailed", + batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobFailed, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseFailed, + common.Failed, + ), + ) + + It("returns unchanged status when job is still running", func() { + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{}, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &runningJob) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + original := mockDataload.Status.DeepCopy() + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(*opStatus).To(Equal(*original)) + }) + + It("returns error when job is not found", func() { + // No job in fake client - GetJob returns NotFound error + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + // When job is missing, GetJob returns a not-found error which is propagated + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + // err is not nil since job is missing and helm binary is not available in unit tests + // The important thing is the handler does not panic and returns a non-nil status copy + _ = opStatus + _ = err + }) + + It("generates NodeAffinity when completed job has affinity annotation", func() { + mockDataload.Status.NodeAffinity = nil + completedJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: loaderJobName, + Namespace: defaultNamespace, + Annotations: map[string]string{ + common.AnnotationDataFlowAffinityInject: "true", + common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sNodeNameLabelKey: nodeName, + }, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &completedJob) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + Expect(opStatus.NodeAffinity).NotTo(BeNil()) + terms := opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + Expect(terms).To(HaveLen(1)) + Expect(terms[0].MatchExpressions[0].Key).To(Equal(common.K8sNodeNameLabelKey)) + Expect(terms[0].MatchExpressions[0].Values).To(Equal([]string{nodeName})) }) }) diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler.go b/pkg/controllers/v1alpha1/datamigrate/status_handler.go index be18f85558a..dbab395df7b 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler.go @@ -213,6 +213,54 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont } func (o *OnEventStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus) (result *datav1alpha1.OperationStatus, err error) { - //TODO implement me - return nil, nil + result = opStatus.DeepCopy() + object := o.dataMigrate + releaseName := utils.GetDataMigrateReleaseName(object.GetName()) + jobName := utils.GetDataMigrateJobName(releaseName) + job, err := kubeclient.GetJob(o.Client, jobName, object.GetNamespace()) + + if err != nil { + if utils.IgnoreNotFound(err) == nil { + ctx.Log.Info("Related Job missing, will delete helm chart and retry", "namespace", object.GetNamespace(), "jobName", jobName) + if err = helm.DeleteReleaseIfExists(releaseName, object.GetNamespace()); err != nil { + ctx.Log.Error(err, "can't delete DataMigrate release", "namespace", object.GetNamespace(), "releaseName", releaseName) + return + } + return + } + ctx.Log.Error(err, "can't get DataMigrate job", "namespace", object.GetNamespace(), "jobName", jobName) + return + } + + finishedJobCondition := kubeclient.GetFinishedJobCondition(job) + if finishedJobCondition == nil { + ctx.Log.V(1).Info("DataMigrate job still running", "namespace", object.GetNamespace(), "jobName", jobName) + return + } + + isJobSucceed := finishedJobCondition.Type == batchv1.JobComplete + if o.dataMigrate.Spec.Parallelism == 1 && result.NodeAffinity == nil && isJobSucceed { + result.NodeAffinity, err = dataflow.GenerateNodeAffinity(job) + if err != nil { + return nil, errors.Wrap(err, "error to generate the node labels") + } + } + + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(finishedJobCondition.Type), + Status: finishedJobCondition.Status, + Reason: finishedJobCondition.Reason, + Message: finishedJobCondition.Message, + LastProbeTime: finishedJobCondition.LastProbeTime, + LastTransitionTime: finishedJobCondition.LastTransitionTime, + }, + } + if isJobSucceed { + result.Phase = common.PhaseComplete + } else { + result.Phase = common.PhaseFailed + } + result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, finishedJobCondition.LastTransitionTime.Time) + return } diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go index d8a66bad1fb..d8ed12e38e3 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go @@ -1,5 +1,5 @@ /* -Copyright 2026 The Fluid Authors. +Copyright 2023 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -527,21 +527,116 @@ var _ = Describe("CronStatusHandler", func() { }) var _ = Describe("OnEventStatusHandler", func() { - Describe("GetOperationStatus", func() { - It("should return nil result and nil error (stub)", func() { - testScheme := runtime.NewScheme() - Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) - dm := &v1alpha1.DataMigrate{} - dm.Name = "test" - dm.Namespace = "default" - fakeClient := fake.NewFakeClientWithScheme(testScheme, dm) - handler := &OnEventStatusHandler{Client: fakeClient, dataMigrate: dm} - ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + var ( + testScheme *runtime.Scheme + mockMigrate v1alpha1.DataMigrate + ) - result, err := handler.GetOperationStatus(ctx, &dm.Status) + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + mockMigrate = v1alpha1.DataMigrate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate", + Namespace: "default", + }, + Spec: v1alpha1.DataMigrateSpec{ + Parallelism: 1, + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, + } + }) + + DescribeTable("GetOperationStatus", + func(job batchv1.Job, expectedPhase common.Phase, expectedConditionType common.ConditionType) { + releaseName := utils.GetDataMigrateReleaseName(mockMigrate.Name) + jobName := utils.GetDataMigrateJobName(releaseName) + job.Name = jobName + job.Namespace = "default" + c := fake.NewFakeClientWithScheme(testScheme, &mockMigrate, &job) + handler := &OnEventStatusHandler{Client: c, Log: fake.NullLogger(), dataMigrate: &mockMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockMigrate.Status) Expect(err).NotTo(HaveOccurred()) - Expect(result).To(BeNil()) - }) + Expect(opStatus).NotTo(BeNil()) + Expect(opStatus.Phase).To(Equal(expectedPhase)) + Expect(opStatus.Conditions).To(HaveLen(1)) + Expect(opStatus.Conditions[0].Type).To(Equal(expectedConditionType)) + Expect(opStatus.Duration).NotTo(BeEmpty()) + }, + Entry("job success yields PhaseComplete", + batchv1.Job{ + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseComplete, + common.Complete, + ), + Entry("job failed yields PhaseFailed", + batchv1.Job{ + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobFailed, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseFailed, + common.Failed, + ), + ) + + It("returns unchanged status when job is still running", func() { + releaseName := utils.GetDataMigrateReleaseName(mockMigrate.Name) + jobName := utils.GetDataMigrateJobName(releaseName) + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: jobName, Namespace: "default"}, + Status: batchv1.JobStatus{}, + } + c := fake.NewFakeClientWithScheme(testScheme, &mockMigrate, &runningJob) + handler := &OnEventStatusHandler{Client: c, Log: fake.NullLogger(), dataMigrate: &mockMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + original := mockMigrate.Status.DeepCopy() + + opStatus, err := handler.GetOperationStatus(ctx, &mockMigrate.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(*opStatus).To(Equal(*original)) + }) + + It("skips NodeAffinity generation when Parallelism > 1", func() { + mockMigrate.Spec.Parallelism = 2 + mockMigrate.Status.NodeAffinity = nil + releaseName := utils.GetDataMigrateReleaseName(mockMigrate.Name) + jobName := utils.GetDataMigrateJobName(releaseName) + completedJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: jobName, Namespace: "default"}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + } + c := fake.NewFakeClientWithScheme(testScheme, &mockMigrate, &completedJob) + handler := &OnEventStatusHandler{Client: c, Log: fake.NullLogger(), dataMigrate: &mockMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + opStatus, err := handler.GetOperationStatus(ctx, &mockMigrate.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + // NodeAffinity must not be set for parallel migrations + Expect(opStatus.NodeAffinity).To(BeNil()) }) })