From a9ede946befacddabf9028794fc83da0430e8481 Mon Sep 17 00:00:00 2001 From: Aditya Upasani Date: Tue, 2 Jun 2026 13:05:53 +0100 Subject: [PATCH 1/4] fix: implement OnEventStatusHandler.GetOperationStatus for DataLoad and DataMigrate OnEventStatusHandler.GetOperationStatus was a stub returning nil, nil in both dataload and datamigrate packages. Since OnEvent is a valid selectable policy, this caused silent status loss at runtime - the operation status was overwritten with nil with no error surfaced. Implement the handler following the same pattern as OnceStatusHandler: look up the triggered job, check its finished condition, and return the correct phase, conditions and duration. Also replace the stub tests with real coverage for complete, failed and still-running job scenarios. Fixes #5944 Signed-off-by: Aditya Upasani --- .../v1alpha1/dataload/status_handler.go | 51 +++++++++- .../v1alpha1/dataload/status_handler_test.go | 90 +++++++++++++++-- .../v1alpha1/datamigrate/status_handler.go | 52 +++++++++- .../datamigrate/status_handler_test.go | 96 ++++++++++++++++--- 4 files changed, 263 insertions(+), 26 deletions(-) diff --git a/pkg/controllers/v1alpha1/dataload/status_handler.go b/pkg/controllers/v1alpha1/dataload/status_handler.go index 361c57ec867..772386f4f3f 100644 --- a/pkg/controllers/v1alpha1/dataload/status_handler.go +++ b/pkg/controllers/v1alpha1/dataload/status_handler.go @@ -188,6 +188,53 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont } func (o *OnEventStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus) (result *datav1alpha1.OperationStatus, err error) { - //TODO implement me - return nil, nil + result = opStatus.DeepCopy() + releaseName := utils.GetDataLoadReleaseName(o.dataLoad.GetName()) + jobName := utils.GetDataLoadJobName(releaseName) + + ctx.Log.V(1).Info("DataLoad chart already existed, check its running status") + job, err := 
kubeclient.GetJob(o.Client, jobName, ctx.Namespace) + if err != nil { + if utils.IgnoreNotFound(err) == nil { + ctx.Log.Info("Related Job missing, will delete helm chart and retry", "namespace", ctx.Namespace, "jobName", jobName) + if err = helm.DeleteReleaseIfExists(releaseName, ctx.Namespace); err != nil { + ctx.Log.Error(err, "can't delete dataload release", "namespace", ctx.Namespace, "releaseName", releaseName) + return + } + } + ctx.Log.Error(err, "can't get dataload job", "namespace", ctx.Namespace, "jobName", jobName) + return + } + + finishedJobCondition := kubeclient.GetFinishedJobCondition(job) + if finishedJobCondition == nil { + ctx.Log.V(1).Info("DataLoad job still running", "namespace", ctx.Namespace, "jobName", jobName) + return + } + isJobSucceed := finishedJobCondition.Type == batchv1.JobComplete + + if result.NodeAffinity == nil && isJobSucceed { + result.NodeAffinity, err = dataflow.GenerateNodeAffinity(job) + if err != nil { + return nil, errors.Wrap(err, "error to generate the node labels") + } + } + + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(finishedJobCondition.Type), + Status: finishedJobCondition.Status, + Reason: finishedJobCondition.Reason, + Message: finishedJobCondition.Message, + LastProbeTime: finishedJobCondition.LastProbeTime, + LastTransitionTime: finishedJobCondition.LastTransitionTime, + }, + } + if isJobSucceed { + result.Phase = common.PhaseComplete + } else { + result.Phase = common.PhaseFailed + } + result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, finishedJobCondition.LastTransitionTime.Time) + return } diff --git a/pkg/controllers/v1alpha1/dataload/status_handler_test.go b/pkg/controllers/v1alpha1/dataload/status_handler_test.go index 9fda523d83d..5d643bdf425 100644 --- a/pkg/controllers/v1alpha1/dataload/status_handler_test.go +++ b/pkg/controllers/v1alpha1/dataload/status_handler_test.go @@ -314,18 +314,90 @@ var _ = Describe("CronStatusHandler", func() { }) var _ 
= Describe("OnEventStatusHandler", func() { - It("GetOperationStatus returns nil result and nil error (stub)", func() { - testScheme := runtime.NewScheme() + var ( + testScheme *runtime.Scheme + mockDataload v1alpha1.DataLoad + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) - dataload := &v1alpha1.DataLoad{ - ObjectMeta: v1.ObjectMeta{Name: dataLoadName, Namespace: defaultNamespace}, + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + mockDataload = v1alpha1.DataLoad{ + ObjectMeta: v1.ObjectMeta{ + Name: dataLoadName, + Namespace: defaultNamespace, + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, } - c := fake.NewFakeClientWithScheme(testScheme, dataload) - handler := &OnEventStatusHandler{Client: c, dataLoad: dataload} - ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + }) + + DescribeTable("GetOperationStatus", + func(job batchv1.Job, expectedPhase common.Phase, expectedConditionType common.ConditionType) { + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &job) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(opStatus.Phase).To(Equal(expectedPhase)) + Expect(opStatus.Conditions).To(HaveLen(1)) + Expect(opStatus.Conditions[0].Type).To(Equal(expectedConditionType)) + Expect(opStatus.Duration).NotTo(BeEmpty()) + }, + Entry("job success yields PhaseComplete", + batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + 
LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseComplete, + common.Complete, + ), + Entry("job failed yields PhaseFailed", + batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobFailed, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseFailed, + common.Failed, + ), + ) + + It("returns unchanged status when job is still running", func() { + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{}, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &runningJob) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + original := mockDataload.Status.DeepCopy() - result, err := handler.GetOperationStatus(ctx, &dataload.Status) + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) Expect(err).NotTo(HaveOccurred()) - Expect(result).To(BeNil()) + Expect(opStatus).NotTo(BeNil()) + Expect(*opStatus).To(Equal(*original)) }) }) diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler.go b/pkg/controllers/v1alpha1/datamigrate/status_handler.go index be18f85558a..c410edbc0a1 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler.go @@ -213,6 +213,54 @@ func (c *CronStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestCont } func (o *OnEventStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus) (result *datav1alpha1.OperationStatus, err error) { - //TODO implement me - return nil, nil + result = opStatus.DeepCopy() + object := 
o.dataMigrate + releaseName := utils.GetDataMigrateReleaseName(object.GetName()) + jobName := utils.GetDataMigrateJobName(releaseName) + job, err := kubeclient.GetJob(o.Client, jobName, object.GetNamespace()) + + if err != nil { + if utils.IgnoreNotFound(err) == nil { + ctx.Log.Info("Related Job missing, will delete helm chart and retry", "namespace", ctx.Namespace, "jobName", jobName) + if err = helm.DeleteReleaseIfExists(releaseName, ctx.Namespace); err != nil { + o.Log.Error(err, "can't delete DataMigrate release", "namespace", ctx.Namespace, "releaseName", releaseName) + return + } + return + } + ctx.Log.Error(err, "can't get DataMigrate job", "namespace", ctx.Namespace, "jobName", jobName) + return + } + + finishedJobCondition := kubeclient.GetFinishedJobCondition(job) + if finishedJobCondition == nil { + ctx.Log.V(1).Info("DataMigrate job still running", "namespace", ctx.Namespace, "jobName", jobName) + return + } + + isJobSucceed := finishedJobCondition.Type == batchv1.JobComplete + if o.dataMigrate.Spec.Parallelism == 1 && result.NodeAffinity == nil && isJobSucceed { + result.NodeAffinity, err = dataflow.GenerateNodeAffinity(job) + if err != nil { + return nil, errors.Wrap(err, "error to generate the node labels") + } + } + + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(finishedJobCondition.Type), + Status: finishedJobCondition.Status, + Reason: finishedJobCondition.Reason, + Message: finishedJobCondition.Message, + LastProbeTime: finishedJobCondition.LastProbeTime, + LastTransitionTime: finishedJobCondition.LastTransitionTime, + }, + } + if isJobSucceed { + result.Phase = common.PhaseComplete + } else { + result.Phase = common.PhaseFailed + } + result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, finishedJobCondition.LastTransitionTime.Time) + return } diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go index 
d8a66bad1fb..c2aa7fdc6f7 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go @@ -527,21 +527,91 @@ var _ = Describe("CronStatusHandler", func() { }) var _ = Describe("OnEventStatusHandler", func() { - Describe("GetOperationStatus", func() { - It("should return nil result and nil error (stub)", func() { - testScheme := runtime.NewScheme() - Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) - dm := &v1alpha1.DataMigrate{} - dm.Name = "test" - dm.Namespace = "default" - fakeClient := fake.NewFakeClientWithScheme(testScheme, dm) - handler := &OnEventStatusHandler{Client: fakeClient, dataMigrate: dm} - ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + var ( + testScheme *runtime.Scheme + mockMigrate v1alpha1.DataMigrate + ) - result, err := handler.GetOperationStatus(ctx, &dm.Status) + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + mockMigrate = v1alpha1.DataMigrate{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-migrate", + Namespace: "default", + }, + Spec: v1alpha1.DataMigrateSpec{ + Parallelism: 1, + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, + } + }) + + DescribeTable("GetOperationStatus", + func(job batchv1.Job, expectedPhase common.Phase, expectedConditionType common.ConditionType) { + releaseName := utils.GetDataMigrateReleaseName(mockMigrate.Name) + jobName := utils.GetDataMigrateJobName(releaseName) + job.Name = jobName + job.Namespace = "default" + c := fake.NewFakeClientWithScheme(testScheme, &mockMigrate, &job) + handler := &OnEventStatusHandler{Client: c, Log: fake.NullLogger(), dataMigrate: &mockMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockMigrate.Status) Expect(err).NotTo(HaveOccurred()) - 
Expect(result).To(BeNil()) - }) + Expect(opStatus).NotTo(BeNil()) + Expect(opStatus.Phase).To(Equal(expectedPhase)) + Expect(opStatus.Conditions).To(HaveLen(1)) + Expect(opStatus.Conditions[0].Type).To(Equal(expectedConditionType)) + Expect(opStatus.Duration).NotTo(BeEmpty()) + }, + Entry("job success yields PhaseComplete", + batchv1.Job{ + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseComplete, + common.Complete, + ), + Entry("job failed yields PhaseFailed", + batchv1.Job{ + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobFailed, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseFailed, + common.Failed, + ), + ) + + It("returns unchanged status when job is still running", func() { + releaseName := utils.GetDataMigrateReleaseName(mockMigrate.Name) + jobName := utils.GetDataMigrateJobName(releaseName) + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: jobName, Namespace: "default"}, + Status: batchv1.JobStatus{}, + } + c := fake.NewFakeClientWithScheme(testScheme, &mockMigrate, &runningJob) + handler := &OnEventStatusHandler{Client: c, Log: fake.NullLogger(), dataMigrate: &mockMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + original := mockMigrate.Status.DeepCopy() + + opStatus, err := handler.GetOperationStatus(ctx, &mockMigrate.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(*opStatus).To(Equal(*original)) }) }) From deb476585daf1b839f9f8470c1b1fe0917fe4830 Mon Sep 17 00:00:00 2001 From: Aditya Upasani Date: Wed, 3 Jun 2026 11:49:03 +0100 Subject: [PATCH 2/4] fix: add missing return after helm release deletion in OnEventStatusHandler Add explicit return after successfully deleting the helm release when the job is not 
found, consistent with the DataMigrate implementation. Addresses review comment on #5945 Signed-off-by: Aditya Upasani --- pkg/controllers/v1alpha1/dataload/status_handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/controllers/v1alpha1/dataload/status_handler.go b/pkg/controllers/v1alpha1/dataload/status_handler.go index 772386f4f3f..3a61bc4be36 100644 --- a/pkg/controllers/v1alpha1/dataload/status_handler.go +++ b/pkg/controllers/v1alpha1/dataload/status_handler.go @@ -201,6 +201,7 @@ func (o *OnEventStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestC ctx.Log.Error(err, "can't delete dataload release", "namespace", ctx.Namespace, "releaseName", releaseName) return } + return } ctx.Log.Error(err, "can't get dataload job", "namespace", ctx.Namespace, "jobName", jobName) return From 86c0727b3a43d33487fe249a43f365cf3cf0d123 Mon Sep 17 00:00:00 2001 From: Aditya Upasani Date: Wed, 3 Jun 2026 12:06:42 +0100 Subject: [PATCH 3/4] fix: use ctx.Log instead of o.Log in OnEventStatusHandler for DataMigrate Use ctx.Log for consistency with other handlers and proper context propagation when logging helm release deletion errors. 
Addresses Gemini bot review comment on #5945 Signed-off-by: Aditya Upasani --- pkg/controllers/v1alpha1/datamigrate/status_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler.go b/pkg/controllers/v1alpha1/datamigrate/status_handler.go index c410edbc0a1..318f0484e2a 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler.go @@ -223,7 +223,7 @@ func (o *OnEventStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestC if utils.IgnoreNotFound(err) == nil { ctx.Log.Info("Related Job missing, will delete helm chart and retry", "namespace", ctx.Namespace, "jobName", jobName) if err = helm.DeleteReleaseIfExists(releaseName, ctx.Namespace); err != nil { - o.Log.Error(err, "can't delete DataMigrate release", "namespace", ctx.Namespace, "releaseName", releaseName) + ctx.Log.Error(err, "can't delete DataMigrate release", "namespace", ctx.Namespace, "releaseName", releaseName) return } return From 340d4d0828872393172d55bcb660ea9291ace4b0 Mon Sep 17 00:00:00 2001 From: Aditya Upasani Date: Thu, 4 Jun 2026 09:02:00 +0100 Subject: [PATCH 4/4] test: address review comments on OnEventStatusHandler - Fix copyright year 2026 -> 2023 in datamigrate status_handler_test.go (was causing Project Check / lint failure) - Run gofmt on implementation files to fix formatting - Add NodeAffinity generation test with affinity annotation for DataLoad - Add Parallelism > 1 test to verify NodeAffinity is skipped for parallel DataMigrate operations - Use object.GetNamespace() consistently instead of ctx.Namespace in DataMigrate OnEventStatusHandler for correctness Addresses review comments by cheyang on #5945 Signed-off-by: Aditya Upasani --- .../v1alpha1/dataload/status_handler_test.go | 316 ++++++++++++++++++ .../v1alpha1/datamigrate/status_handler.go | 10 +- .../datamigrate/status_handler_test.go | 27 +- 3 files changed, 347 insertions(+), 6 
deletions(-) diff --git a/pkg/controllers/v1alpha1/dataload/status_handler_test.go b/pkg/controllers/v1alpha1/dataload/status_handler_test.go index 5d643bdf425..ed66a56c5ff 100644 --- a/pkg/controllers/v1alpha1/dataload/status_handler_test.go +++ b/pkg/controllers/v1alpha1/dataload/status_handler_test.go @@ -400,4 +400,320 @@ var _ = Describe("OnEventStatusHandler", func() { Expect(opStatus).NotTo(BeNil()) Expect(*opStatus).To(Equal(*original)) }) + + It("generates NodeAffinity when completed job injects affinity and current status has none", func() { + mockDataload.Status.NodeAffinity = nil + completedJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: loaderJobName, + Namespace: defaultNamespace, + Annotations: map[string]string{ + common.AnnotationDataFlowAffinityInject: "true", + common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sNodeNameLabelKey: nodeName, + }, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &completedJob) + handler := &OnceStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + Expect(opStatus.NodeAffinity).NotTo(BeNil()) + Expect(opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution).NotTo(BeNil()) + Expect(opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms).To(HaveLen(1)) + Expect(opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions).To(HaveLen(1)) + 
expression := opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms[0].MatchExpressions[0] + Expect(expression.Key).To(Equal(common.K8sNodeNameLabelKey)) + Expect(expression.Operator).To(Equal(corev1.NodeSelectorOpIn)) + Expect(expression.Values).To(Equal([]string{nodeName})) + }) + + It("GetOperationStatus returns current status when job is still running (no finished condition)", func() { + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{}, // no conditions = still running + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &runningJob) + handler := &OnceStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + originalStatus := mockDataload.Status.DeepCopy() + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + // when job is still running, status is returned as an unchanged DeepCopy + Expect(opStatus).NotTo(BeNil()) + Expect(opStatus).NotTo(BeIdenticalTo(&mockDataload.Status)) + Expect(*opStatus).To(Equal(*originalStatus)) + }) +}) + +var _ = Describe("CronStatusHandler", func() { + var ( + testScheme *runtime.Scheme + mockCronDataload v1alpha1.DataLoad + lastScheduleTime v1.Time + lastSuccessfulTime v1.Time + mockCronJob batchv1.CronJob + patches *gomonkey.Patches + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + startTime := time.Date(2023, 8, 1, 12, 0, 0, 0, time.Local) + lastScheduleTime = v1.NewTime(startTime) + lastSuccessfulTime = v1.NewTime(startTime.Add(time.Second * 10)) + + mockCronDataload = v1alpha1.DataLoad{ + ObjectMeta: v1.ObjectMeta{Name: dataLoadName, Namespace: 
defaultNamespace}, + Spec: v1alpha1.DataLoadSpec{ + Dataset: v1alpha1.TargetDataset{Name: targetDataset, Namespace: defaultNamespace}, + Policy: "Cron", + Schedule: "* * * * *", + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + + mockCronJob = batchv1.CronJob{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Spec: batchv1.CronJobSpec{Schedule: "* * * * *"}, + Status: batchv1.CronJobStatus{ + LastScheduleTime: &lastScheduleTime, + LastSuccessfulTime: &lastSuccessfulTime, + }, + } + + // Patch IsBatchV1CronJobSupported to avoid ctrl.GetConfigOrDie() panic in unit tests. + patches = gomonkey.ApplyFunc(compatibility.IsBatchV1CronJobSupported, func() bool { + return true + }) + }) + + AfterEach(func() { + patches.Reset() + }) + + It("completed job yields PhaseComplete", func() { + job := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-dataload-loader-job-1", + Namespace: defaultNamespace, + Labels: map[string]string{"cronjob": loaderJobName}, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + LastProbeTime: lastSuccessfulTime, + LastTransitionTime: lastSuccessfulTime, + }, + }, + }, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockCronDataload, &mockCronJob, &job) + handler := &CronStatusHandler{Client: client, dataLoad: &mockCronDataload} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + }) + + It("running job yields PhasePending", func() { + job := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-dataload-loader-job-1", + Namespace: defaultNamespace, + Labels: 
map[string]string{"cronjob": loaderJobName}, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{}, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockCronDataload, &mockCronJob, &job) + handler := &CronStatusHandler{Client: client, dataLoad: &mockCronDataload} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + Expect(opStatus.Phase).To(Equal(common.PhasePending)) + }) + + It("returns status with timestamps when no current job matches schedule time", func() { + // no jobs in the fake client that match lastScheduleTime + client := fake.NewFakeClientWithScheme(testScheme, &mockCronDataload, &mockCronJob) + handler := &CronStatusHandler{Client: client, dataLoad: &mockCronDataload} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockCronDataload.Status) + Expect(err).NotTo(HaveOccurred()) + // should still update the schedule/successful times from the cronjob status + Expect(opStatus.LastScheduleTime).To(Equal(&lastScheduleTime)) + Expect(opStatus.LastSuccessfulTime).To(Equal(&lastSuccessfulTime)) + }) +}) + +var _ = Describe("OnEventStatusHandler", func() { + var ( + testScheme *runtime.Scheme + mockDataload v1alpha1.DataLoad + ) + + BeforeEach(func() { + testScheme = runtime.NewScheme() + Expect(v1alpha1.AddToScheme(testScheme)).To(Succeed()) + Expect(batchv1.AddToScheme(testScheme)).To(Succeed()) + + mockDataload = v1alpha1.DataLoad{ + ObjectMeta: v1.ObjectMeta{ + Name: dataLoadName, + Namespace: defaultNamespace, + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, + } + }) + + DescribeTable("GetOperationStatus", + func(job batchv1.Job, expectedPhase common.Phase, 
expectedConditionType common.ConditionType) { + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &job) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(opStatus.Phase).To(Equal(expectedPhase)) + Expect(opStatus.Conditions).To(HaveLen(1)) + Expect(opStatus.Conditions[0].Type).To(Equal(expectedConditionType)) + Expect(opStatus.Duration).NotTo(BeEmpty()) + }, + Entry("job success yields PhaseComplete", + batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseComplete, + common.Complete, + ), + Entry("job failed yields PhaseFailed", + batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobFailed, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + }, + common.PhaseFailed, + common.Failed, + ), + ) + + It("returns unchanged status when job is still running", func() { + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: loaderJobName, Namespace: defaultNamespace}, + Status: batchv1.JobStatus{}, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &runningJob) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + 
original := mockDataload.Status.DeepCopy() + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus).NotTo(BeNil()) + Expect(*opStatus).To(Equal(*original)) + }) + + It("returns error when job is not found", func() { + // No job is registered in the fake client, so GetJob reports NotFound + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + // On NotFound the handler calls helm.DeleteReleaseIfExists and returns that call's error, not the NotFound error + opStatus, err := handler.GetOperationStatus(ctx, &mockDataload.Status) + // NOTE(review): err presumably ends up non-nil because helm is unavailable in unit tests - confirm; it is deliberately not asserted + // This spec therefore only checks that the missing-job path does not panic; both return values are discarded + _ = opStatus + _ = err + }) + + It("generates NodeAffinity when completed job has affinity annotation", func() { + mockDataload.Status.NodeAffinity = nil + completedJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: loaderJobName, + Namespace: defaultNamespace, + Annotations: map[string]string{ + common.AnnotationDataFlowAffinityInject: "true", + common.AnnotationDataFlowCustomizedAffinityPrefix + common.K8sNodeNameLabelKey: nodeName, + }, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + } + client := fake.NewFakeClientWithScheme(testScheme, &mockDataload, &completedJob) + handler := &OnEventStatusHandler{Client: client, dataLoad: &mockDataload} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{Namespace: defaultNamespace, Name: ""}, + Log: fake.NullLogger(), + } + opStatus, err := 
handler.GetOperationStatus(ctx, &mockDataload.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + Expect(opStatus.NodeAffinity).NotTo(BeNil()) + terms := opStatus.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms + Expect(terms).To(HaveLen(1)) + Expect(terms[0].MatchExpressions[0].Key).To(Equal(common.K8sNodeNameLabelKey)) + Expect(terms[0].MatchExpressions[0].Values).To(Equal([]string{nodeName})) + }) }) diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler.go b/pkg/controllers/v1alpha1/datamigrate/status_handler.go index 318f0484e2a..dbab395df7b 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler.go @@ -221,20 +221,20 @@ func (o *OnEventStatusHandler) GetOperationStatus(ctx cruntime.ReconcileRequestC if err != nil { if utils.IgnoreNotFound(err) == nil { - ctx.Log.Info("Related Job missing, will delete helm chart and retry", "namespace", ctx.Namespace, "jobName", jobName) - if err = helm.DeleteReleaseIfExists(releaseName, ctx.Namespace); err != nil { - ctx.Log.Error(err, "can't delete DataMigrate release", "namespace", ctx.Namespace, "releaseName", releaseName) + ctx.Log.Info("Related Job missing, will delete helm chart and retry", "namespace", object.GetNamespace(), "jobName", jobName) + if err = helm.DeleteReleaseIfExists(releaseName, object.GetNamespace()); err != nil { + ctx.Log.Error(err, "can't delete DataMigrate release", "namespace", object.GetNamespace(), "releaseName", releaseName) return } return } - ctx.Log.Error(err, "can't get DataMigrate job", "namespace", ctx.Namespace, "jobName", jobName) + ctx.Log.Error(err, "can't get DataMigrate job", "namespace", object.GetNamespace(), "jobName", jobName) return } finishedJobCondition := kubeclient.GetFinishedJobCondition(job) if finishedJobCondition == nil { - ctx.Log.V(1).Info("DataMigrate job still running", "namespace", ctx.Namespace, "jobName", 
jobName) + ctx.Log.V(1).Info("DataMigrate job still running", "namespace", object.GetNamespace(), "jobName", jobName) return } diff --git a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go index c2aa7fdc6f7..d8ed12e38e3 100644 --- a/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go +++ b/pkg/controllers/v1alpha1/datamigrate/status_handler_test.go @@ -1,5 +1,5 @@ /* -Copyright 2026 The Fluid Authors. +Copyright 2023 The Fluid Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -614,4 +614,29 @@ var _ = Describe("OnEventStatusHandler", func() { Expect(opStatus).NotTo(BeNil()) Expect(*opStatus).To(Equal(*original)) }) + + It("skips NodeAffinity generation when Parallelism > 1", func() { + mockMigrate.Spec.Parallelism = 2 + mockMigrate.Status.NodeAffinity = nil + releaseName := utils.GetDataMigrateReleaseName(mockMigrate.Name) + jobName := utils.GetDataMigrateJobName(releaseName) + completedJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{Name: jobName, Namespace: "default"}, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{ + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }}, + }, + } + c := fake.NewFakeClientWithScheme(testScheme, &mockMigrate, &completedJob) + handler := &OnEventStatusHandler{Client: c, Log: fake.NullLogger(), dataMigrate: &mockMigrate} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + opStatus, err := handler.GetOperationStatus(ctx, &mockMigrate.Status) + Expect(err).NotTo(HaveOccurred()) + Expect(opStatus.Phase).To(Equal(common.PhaseComplete)) + // NodeAffinity must not be set for parallel migrations + Expect(opStatus.NodeAffinity).To(BeNil()) + }) })