-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathjob.go
More file actions
167 lines (143 loc) · 4.07 KB
/
job.go
File metadata and controls
167 lines (143 loc) · 4.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package maa
import "errors"
// Job is a handle to an asynchronous operation identified by id.
// Its status can be polled through Status, or the caller can block on Wait,
// which records the resulting status in finalStatus.
type Job struct {
	id          int64
	finalStatus Status // status recorded by Wait; Status polls while it is still invalid

	// statusFunc is called by Status to query the status for id, and waitFunc
	// is called by Wait to obtain the status that gets cached in finalStatus.
	statusFunc, waitFunc func(id int64) Status
}
// newJob builds a Job for the given id. statusFunc is stored for status
// polling and waitFunc for blocking waits; finalStatus is left at its zero
// value.
func newJob(id int64, statusFunc func(id int64) Status, waitFunc func(id int64) Status) *Job {
	created := new(Job)
	created.id = id
	created.statusFunc = statusFunc
	created.waitFunc = waitFunc
	return created
}
// Status returns the current status of the job.
//
// Once Wait has recorded a status that is not invalid, that cached value is
// returned and statusFunc is no longer consulted. Otherwise the status is
// queried through statusFunc. A Job that has no statusFunc (for example the
// zero value of this exported type) reports its stored, invalid status
// instead of panicking on a nil function call.
func (j *Job) Status() Status {
	if !j.finalStatus.Invalid() {
		return j.finalStatus
	}
	if j.statusFunc == nil {
		// Nothing to poll with; finalStatus is invalid here, which is the
		// most accurate answer available.
		return j.finalStatus
	}
	return j.statusFunc(j.id)
}
// Invalid reports whether the job's current status, as returned by Status,
// is invalid.
func (j *Job) Invalid() bool {
	current := j.Status()
	return current.Invalid()
}
// Pending reports whether the job's current status, as returned by Status,
// is pending.
func (j *Job) Pending() bool {
	current := j.Status()
	return current.Pending()
}
// Running reports whether the job's current status, as returned by Status,
// is running.
func (j *Job) Running() bool {
	current := j.Status()
	return current.Running()
}
// Success reports whether the job's current status, as returned by Status,
// is success.
func (j *Job) Success() bool {
	current := j.Status()
	return current.Success()
}
// Failure reports whether the job's current status, as returned by Status,
// is a failure.
func (j *Job) Failure() bool {
	current := j.Status()
	return current.Failure()
}
// Done reports whether the job's current status, as returned by Status,
// is done (either success or failure).
func (j *Job) Done() bool {
	current := j.Status()
	return current.Done()
}
// Wait blocks until the job completes and returns the job itself, so calls
// can be chained (for example j.Wait().Success()).
//
// The status returned by waitFunc is cached in finalStatus. While the cached
// status is still invalid, each call invokes waitFunc again; once it is not,
// Wait returns immediately. A Job that has no waitFunc (for example the zero
// value of this exported type) is returned unchanged instead of panicking on
// a nil function call.
func (j *Job) Wait() *Job {
	if j.finalStatus.Invalid() && j.waitFunc != nil {
		j.finalStatus = j.waitFunc(j.id)
	}
	return j
}
// TaskJob extends Job with task-specific functionality.
// It wraps a Job and adds methods to retrieve task details and to override
// the pipeline, and it carries an error that short-circuits those methods
// when it is non-nil.
type TaskJob struct {
// job is the wrapped Job; Status and Wait delegate to it, and its id is
// passed to the callbacks below.
job *Job
// getTaskDetailFunc is called by GetDetail with the job id. GetDetail
// returns an error when it is nil.
getTaskDetailFunc func(id int64) (*TaskDetail, error)
// overridePipelineFunc is called by OverridePipeline with the job id and the
// override value. OverridePipeline returns an error when it is nil.
overridePipelineFunc func(id int64, override any) error
// err, when non-nil, makes Status report StatusFailure, makes Wait return
// without waiting, and is returned by GetDetail and OverridePipeline.
err error
}
// newTaskJob builds a TaskJob around a fresh Job created from id, statusFunc
// and waitFunc. The detail and override callbacks and err are stored as
// given; none of them is validated here.
func newTaskJob(
	id int64,
	statusFunc func(id int64) Status,
	waitFunc func(id int64) Status,
	getTaskDetailFunc func(id int64) (*TaskDetail, error),
	overridePipelineFunc func(id int64, override any) error,
	err error,
) *TaskJob {
	tj := &TaskJob{err: err}
	tj.job = newJob(id, statusFunc, waitFunc)
	tj.getTaskDetailFunc = getTaskDetailFunc
	tj.overridePipelineFunc = overridePipelineFunc
	return tj
}
// Status returns the current status of the task job.
// A task job that carries an error always reports StatusFailure; otherwise
// the status of the wrapped Job is returned.
func (j *TaskJob) Status() Status {
	if j.err == nil {
		return j.job.Status()
	}
	return StatusFailure
}
// Wait blocks until the task job completes and returns the TaskJob itself.
// A task job that carries an error returns immediately without waiting on
// the wrapped Job.
func (j *TaskJob) Wait() *TaskJob {
	if j.err != nil {
		return j
	}
	j.job.Wait()
	return j
}
// Error returns the error stored in the task job, or nil if none was set.
// While this error is non-nil, Status reports StatusFailure, Wait returns
// without waiting, and GetDetail and OverridePipeline return this error.
func (j *TaskJob) Error() error {
return j.err
}
// Invalid reports whether the task job's current status, as returned by
// Status, is invalid.
func (j *TaskJob) Invalid() bool {
	current := j.Status()
	return current.Invalid()
}
// Pending reports whether the task job's current status, as returned by
// Status, is pending.
func (j *TaskJob) Pending() bool {
	current := j.Status()
	return current.Pending()
}
// Running reports whether the task job's current status, as returned by
// Status, is running.
func (j *TaskJob) Running() bool {
	current := j.Status()
	return current.Running()
}
// Success reports whether the task job's current status, as returned by
// Status, is success.
func (j *TaskJob) Success() bool {
	current := j.Status()
	return current.Success()
}
// Failure reports whether the task job's current status, as returned by
// Status, is a failure.
func (j *TaskJob) Failure() bool {
	current := j.Status()
	return current.Failure()
}
// Done reports whether the task job's current status, as returned by Status,
// is done (either success or failure).
func (j *TaskJob) Done() bool {
	current := j.Status()
	return current.Done()
}
// GetDetail retrieves the detailed information of the task by calling
// getTaskDetailFunc with the wrapped job's id.
//
// It returns the task job's own error if one is set, and a descriptive error
// if no detail callback was provided; in both cases the detail is nil.
func (j *TaskJob) GetDetail() (*TaskDetail, error) {
	switch {
	case j.err != nil:
		return nil, j.err
	case j.getTaskDetailFunc == nil:
		return nil, errors.New("getTaskDetailFunc is nil")
	}
	return j.getTaskDetailFunc(j.job.id)
}
// OverridePipeline overrides the pipeline for a running task by calling
// overridePipelineFunc with the wrapped job's id and override, which is
// passed through unchanged. Per the original documentation, override can be
// a JSON string or any data type that can be marshaled to JSON.
//
// It returns the task job's own error if one is set, and a descriptive error
// if no override callback was provided.
func (j *TaskJob) OverridePipeline(override any) error {
	switch {
	case j.err != nil:
		return j.err
	case j.overridePipelineFunc == nil:
		return errors.New("overridePipelineFunc is nil")
	}
	return j.overridePipelineFunc(j.job.id, override)
}