-
-
Notifications
You must be signed in to change notification settings - Fork 76
Expand file tree
/
Copy pathworker.ts
More file actions
188 lines (167 loc) · 4.97 KB
/
worker.ts
File metadata and controls
188 lines (167 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import Base from "./base";
import { Message } from "../kombu/message";
export default class Worker extends Base {
  /**
   * Registered task handlers keyed by task name. Every stored handler is the
   * promise-returning wrapper created in {@link Worker#register}.
   */
  handlers: Record<string, (...args: any[]) => Promise<any>> = {};

  /** Promises of the tasks that are currently executing. */
  activeTasks: Set<Promise<any>> = new Set();

  /**
   * register task handler on worker handlers
   * @method Worker#register
   * @param {String} name the name of task for dispatching.
   * @param {Function} handler the function for task handling
   * @throws {Error} when `handler` is falsy or a handler is already
   * registered under `name`.
   *
   * @example
   * worker.register('tasks.add', (a, b) => a + b);
   * worker.start();
   */
  public register(name: string, handler: Function): void {
    if (!handler) {
      throw new Error("Undefined handler");
    }
    // Own-property check: names such as "constructor" or "toString" are
    // inherited from Object.prototype and must not count as registered.
    if (Object.prototype.hasOwnProperty.call(this.handlers, name)) {
      throw new Error("Already handler setted");
    }

    // Normalise the handler so it always yields a promise: synchronous return
    // values are wrapped, synchronous throws become rejections.
    this.handlers[name] = function registHandler(...args: any[]): Promise<any> {
      try {
        return Promise.resolve(handler(...args));
      } catch (err) {
        return Promise.reject(err);
      }
    };
  }

  /**
   * start celery worker to run
   *
   * Errors raised while starting or consuming are logged with
   * `console.error` and not re-thrown, so the returned promise does not
   * reject because of them.
   * @method Worker#start
   * @example
   * worker.register('tasks.add', (a, b) => a + b);
   * worker.start();
   */
  public start(): Promise<any> {
    console.info("celery.node worker start...");
    console.info(`registed task: ${Object.keys(this.handlers)}`);
    return this.run().catch(err => console.error(err));
  }

  /**
   * Waits for `isReady()` and then starts processing tasks.
   * @method Worker#run
   * @private
   *
   * @returns {Promise}
   */
  private run(): Promise<any> {
    return this.isReady().then(() => this.processTasks());
  }

  /**
   * Builds the consumer for the queue named by `conf.CELERY_QUEUE` and
   * invokes it.
   * @method Worker#processTasks
   * @private
   *
   * @returns function results
   */
  private processTasks(): Promise<any> {
    const consumer = this.getConsumer(this.conf.CELERY_QUEUE);
    return consumer();
  }

  /**
   * Creates a function that, when called, subscribes the task handler to the
   * given queue on the broker.
   * @method Worker#getConsumer
   * @private
   *
   * @param {String} queue queue name for task route
   */
  private getConsumer(queue: string): Function {
    const onMessage = this.createTaskHandler();
    return (): any => this.broker.subscribe(queue, onMessage);
  }

  /**
   * Creates the callback invoked for every received message.
   *
   * The callback resolves the task name (from the message headers, or from
   * the decoded body when the headers carry none), normalises the message
   * into `[args, kwargs]` plus a headers object, and calls the registered
   * handler as `handler(...args, kwargs, headers)`. The outcome is stored via
   * `backend.storeResult` as "SUCCESS" or "FAILURE".
   * @method Worker#createTaskHandler
   *
   * @returns {Function} message callback; it returns a promise that settles
   * once the task has finished, and throws synchronously when no handler is
   * registered for the task name.
   */
  public createTaskHandler(): Function {
    const onTaskReceived = (message: Message): any => {
      if (!message) {
        return Promise.resolve();
      }

      // Decode once and reuse the result for every branch below.
      const decoded = message.decode();

      let taskName = message.headers["task"];
      const taskNameInHeaders = Boolean(taskName);
      if (!taskNameInHeaders) {
        // protocol v1
        taskName = decoded["task"];
      }

      // strategy
      let body;
      let headers;
      if (taskNameInHeaders && !("args" in decoded)) {
        // The body is already the [args, kwargs, embed] sequence.
        body = decoded; // message.body;
        headers = message.headers;
      } else {
        // Dict-shaped body: also reached when the task name came from the
        // headers but the body still has an "args" key, so the decoded body is
        // always used as the payload here (never null).
        const payload = decoded;
        const args = payload["args"] || [];
        const kwargs = payload["kwargs"] || {};
        const embed = {
          callbacks: payload["callbacks"],
          errbacks: payload["errbacks"],
          chord: payload["chord"],
          chain: null
        };
        // Read the snake_case key like "root_id" below; fall back to the
        // camelCase key that was read previously.
        const parentId = payload["parent_id"] || payload["parentId"];

        body = [args, kwargs, embed];
        headers = {
          lang: payload["lang"],
          task: payload["task"],
          id: payload["id"],
          rootId: payload["root_id"],
          parentId,
          // Misspelled alias kept so handlers reading it keep working.
          parantId: parentId,
          group: payload["group"],
          meth: payload["meth"],
          shadow: payload["shadow"],
          eta: payload["eta"],
          expires: payload["expires"],
          retries: payload["retries"] || 0,
          timelimit: payload["timelimit"] || [null, null],
          kwargsrepr: payload["kwargsrepr"],
          origin: payload["origin"]
        };
      }

      // request
      const [args, kwargs /*, embed */] = body;
      const taskId = headers["id"];
      const self = headers;

      // Only handlers registered through register() are eligible; inherited
      // Object.prototype members must not be dispatched as tasks.
      const handler = Object.prototype.hasOwnProperty.call(
        this.handlers,
        taskName
      )
        ? this.handlers[taskName]
        : undefined;
      if (!handler) {
        throw new Error(`Missing process handler for task ${taskName}`);
      }

      console.info(
        `celery.node Received task: ${taskName}[${taskId}], args: ${args}, kwargs: ${JSON.stringify(
          kwargs
        )}`
      );

      const timeStart = process.hrtime();
      const taskPromise = handler(...args, kwargs, self)
        .then(result => {
          const diff = process.hrtime(timeStart);
          console.info(
            `celery.node Task ${taskName}[${taskId}] succeeded in ${diff[0] +
              diff[1] / 1e9}s: ${result}`
          );
          this.backend.storeResult(taskId, result, "SUCCESS");
          this.activeTasks.delete(taskPromise);
        })
        .catch(err => {
          // Untrack first so a synchronous throw from storeResult cannot leave
          // a settled task in activeTasks.
          this.activeTasks.delete(taskPromise);
          console.info(`celery.node Task ${taskName}[${taskId}] failed: [${err}]`);
          this.backend.storeResult(taskId, err, "FAILURE");
        });

      // record the executing task
      this.activeTasks.add(taskPromise);

      return taskPromise;
    };

    return onTaskReceived;
  }

  /**
   * @method Worker#whenCurrentJobsFinished
   *
   * @returns Promise that resolves when all jobs are finished
   */
  public async whenCurrentJobsFinished(): Promise<any[]> {
    return Promise.all(Array.from(this.activeTasks));
  }

  /**
   * Not implemented: always throws.
   * @method Worker#stop
   *
   * @todo implement here
   */
  // eslint-disable-next-line class-methods-use-this
  public stop(): any {
    throw new Error("not implemented yet");
  }
}