-
-
Notifications
You must be signed in to change notification settings - Fork 76
Expand file tree
/
Copy pathredis.ts
More file actions
119 lines (109 loc) · 2.85 KB
/
redis.ts
File metadata and controls
119 lines (109 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import Redis from "ioredis";
import { CeleryBackend } from ".";
/**
* celery key preifx for redis result key
* @private
* @constant
*
* @type {string}
*/
const keyPrefix = "celery-task-meta-";
/**
* @exports
*/
export default class RedisBackend implements CeleryBackend {
redis: Redis;
/**
* Redis backend class
* @constructor RedisBackend
* @param {string} url the connection string of redis
* @param {object} opts the options object for redis connect of ioredis
*/
constructor(url: string, opts: object) {
this.redis = new Redis(url, {...opts});
}
/**
* codes from here: https://github.com/OptimalBits/bull/blob/129c6e108ce67ca343c8532161d06742d92b651c/lib/utils.js#L21-L44
* @method RedisBackend#isReady
* @returns {Promise} promises that continues if redis connected.
*/
public isReady(): Promise<void> {
return new Promise((resolve, reject) => {
if (this.redis.status === "ready") {
resolve();
} else {
let handleError; // eslint-disable-line prefer-const
const handleReady = () => {
this.redis.removeListener("error", handleError);
resolve();
};
handleError = err => {
this.redis.removeListener("ready", handleReady);
reject(err);
};
this.redis.once("ready", handleReady);
this.redis.once("error", handleError);
}
});
}
/**
* @method RedisBackend#disconnect
* @returns {Promise} promises that continues if redis disconnected.
*/
public disconnect(): Promise<string> {
return this.redis.quit();
}
/**
* @method RedisBackend#storeResult
* @param {string} taskId
* @param {*} result
* @param {string} state
*/
public storeResult(
taskId: string,
result: any,
state: string
): Promise<["OK", number]> {
return this.set(
`${keyPrefix}${taskId}`,
JSON.stringify({
status: state,
result: state == 'FAILURE' ? null : result,
traceback: result,
children: [],
task_id: taskId,
date_done: new Date().toISOString()
})
);
}
/**
* @method RedisBackend#getTaskMeta
* @param {string} taskId
* @returns {Promise}
*/
public getTaskMeta(taskId: string): Promise<object> {
return this.get(`${keyPrefix}${taskId}`).then(msg => JSON.parse(msg));
}
/**
* @method RedisBackend#set
* @private
* @param {String} key
* @param {String} value
* @returns {Promise}
*/
private set(key: string, value: string): Promise<["OK", number]> {
return Promise.all([
this.redis.setex(key, 86400, value),
this.redis.publish(key, value) // publish command for subscribe
]);
}
/**
* @method RedisBackend#get
* @private
* @param {string} key
* @return {Promise}
*/
private get(key: string): Promise<string> {
return this.redis.get(key);
}
}