Skip to content
This repository was archived by the owner on May 31, 2020. It is now read-only.

Commit 2445558

Browse files
holmmher
authored andcommitted
Support additional options for broker and backend (#84)
* Use same BROKER_OPTIONS for backend connection * Remove block that should never happen And wouldn’t work if it did * Add error handling to backend connection * Make broker and backend options more flexible
1 parent 3e6c60b commit 2445558

2 files changed

Lines changed: 80 additions & 84 deletions

File tree

celery.js

Lines changed: 39 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,23 @@ var createMessage = require('./protocol').createMessage;
1010
var debug = process.env.NODE_CELERY_DEBUG === '1' ? console.info : function() {};
1111

1212
var supportedProtocols = ['amqp', 'amqps', 'redis'];
13-
function checkProtocol(kind, protocol) {
13+
function getProtocol(kind, options) {
14+
const protocol = url.parse(options.url).protocol.slice(0, -1);
15+
if (protocol === 'amqps') {
16+
protocol = 'amqp';
17+
}
1418
if (supportedProtocols.indexOf(protocol) === -1) {
1519
throw new Error(util.format('Unsupported %s type: %s', kind, protocol));
1620
}
21+
debug(kind + ' type: ' + protocol);
22+
23+
return protocol;
24+
}
25+
26+
function addProtocolDefaults(protocol, options) {
27+
if (protocol === 'amqp') {
28+
options.heartbeat = options.heartbeat || 580;
29+
}
1730
}
1831

1932
function Configuration(options) {
@@ -29,8 +42,20 @@ function Configuration(options) {
2942
self.TASK_RESULT_EXPIRES = self.TASK_RESULT_EXPIRES * 1000 || 86400000; // Default 1 day
3043

3144
// broker
32-
self.BROKER_URL = self.BROKER_URL || 'amqp://';
33-
self.BROKER_OPTIONS = self.BROKER_OPTIONS || { url: self.BROKER_URL, heartbeat: 580 };
45+
self.BROKER_OPTIONS = self.BROKER_OPTIONS || {};
46+
self.BROKER_OPTIONS.url = self.BROKER_URL || 'amqp://';
47+
self.broker_type = getProtocol('broker', self.BROKER_OPTIONS);
48+
addProtocolDefaults(self.broker_type, self.BROKER_OPTIONS);
49+
50+
// backend
51+
self.RESULT_BACKEND_OPTIONS = self.RESULT_BACKEND_OPTIONS || {};
52+
if (self.RESULT_BACKEND === self.broker_type) {
53+
self.RESULT_BACKEND = self.BROKER_URL;
54+
}
55+
self.RESULT_BACKEND_OPTIONS.url = self.RESULT_BACKEND || self.BROKER_URL;
56+
self.backend_type = getProtocol('backend', self.RESULT_BACKEND_OPTIONS);
57+
addProtocolDefaults(self.backend_type, self.RESULT_BACKEND_OPTIONS);
58+
3459
self.DEFAULT_QUEUE = self.DEFAULT_QUEUE || 'celery';
3560
self.DEFAULT_EXCHANGE = self.DEFAULT_EXCHANGE || '';
3661
self.DEFAULT_EXCHANGE_TYPE = self.DEFAULT_EXCHANGE_TYPE || 'direct';
@@ -39,46 +64,11 @@ function Configuration(options) {
3964
self.IGNORE_RESULT = self.IGNORE_RESULT || false;
4065
self.TASK_RESULT_DURABLE = undefined !== self.TASK_RESULT_DURABLE ? self.TASK_RESULT_DURABLE : true; // Set Durable true by default (Celery 3.1.7)
4166
self.ROUTES = self.ROUTES || {};
42-
43-
self.broker_type = url.parse(self.BROKER_URL).protocol.slice(0, -1);
44-
if (self.broker_type === 'amqps')
45-
self.broker_type = 'amqp';
46-
debug('Broker type: ' + self.broker_type);
47-
checkProtocol('broker', self.broker_type);
48-
49-
// backend
50-
if (!self.RESULT_BACKEND || (self.RESULT_BACKEND === self.broker_type)) {
51-
self.RESULT_BACKEND = self.BROKER_URL;
52-
}
53-
54-
self.backend_type = url.parse(self.RESULT_BACKEND).protocol.slice(0, -1);
55-
if (self.backend_type === 'amqps')
56-
self.backend_type = 'amqp';
57-
debug('Backend type: ' + self.backend_type);
58-
checkProtocol('backend', self.backend_type);
5967
}
6068

61-
function RedisBroker(broker_url) {
69+
function RedisBroker(conf) {
6270
var self = this;
63-
var purl = url.parse(broker_url);
64-
var database;
65-
66-
if (purl.pathname) {
67-
database = purl.pathname.slice(1);
68-
}
69-
70-
self.redis = redis.createClient(purl.port || 6379,
71-
purl.hostname || 'localhost');
72-
73-
if (purl.auth) {
74-
debug('Authenticating broker...');
75-
self.redis.auth(purl.auth.split(':')[1]);
76-
debug('Broker authenticated...');
77-
}
78-
79-
if (database) {
80-
self.redis.select(database);
81-
}
71+
self.redis = redis.createClient(conf.BROKER_OPTIONS);
8272

8373
self.end = function() {
8474
self.redis.end(true);
@@ -128,26 +118,10 @@ util.inherits(RedisBroker, events.EventEmitter);
128118

129119
function RedisBackend(conf) {
130120
var self = this;
131-
var purl = url.parse(conf.RESULT_BACKEND);
132-
var database;
121+
self.redis = redis.createClient(conf.RESULT_BACKEND_OPTIONS);
133122

134-
if (purl.pathname) {
135-
database = purl.pathname.slice(1);
136-
}
137-
138-
debug('Connecting to backend...');
139-
if (purl.auth) {
140-
self.redis = redis.createClient(purl.port, purl.hostname, {'auth_pass': purl.auth.split(':')[1]});
141-
} else {
142-
self.redis = redis.createClient(purl.port, purl.hostname);
143-
}
144-
// needed because we'll use `psubscribe`
145123
var backend_ex = self.redis.duplicate();
146124

147-
if (database) {
148-
self.redis.select(database);
149-
}
150-
151125
self.redis.on('error', function(err) {
152126
self.emit('error', err);
153127
});
@@ -156,7 +130,7 @@ function RedisBackend(conf) {
156130
self.emit('end');
157131
});
158132

159-
self.quit = function() {
133+
self.disconnect = function() {
160134
backend_ex.quit();
161135
self.redis.quit();
162136
};
@@ -211,24 +185,21 @@ function Client(conf) {
211185
self.emit('message', msg);
212186
});
213187
} else if (self.conf.backend_type === 'amqp') {
214-
self.backend = amqp.createConnection({
215-
url: self.conf.BROKER_URL,
216-
heartbeat: 580
217-
}, {
188+
self.backend = amqp.createConnection(self.conf.RESULT_BACKEND_OPTIONS, {
218189
defaultExchangeName: self.conf.DEFAULT_EXCHANGE
219190
});
220-
} else if (self.conf.backend_type === self.conf.broker_type) {
221-
if (self.conf.backend_type === 'amqp') {
222-
self.backend = self.broker;
223-
}
224191
}
225192

193+
self.backend.on('error', function(err) {
194+
self.emit('error', err);
195+
});
196+
226197
// backend ready...
227198
self.backend.on('ready', function() {
228199
debug('Connecting to broker...');
229200

230201
if (self.conf.broker_type === 'redis') {
231-
self.broker = new RedisBroker(self.conf.BROKER_URL);
202+
self.broker = new RedisBroker(self.conf);
232203
} else if (self.conf.broker_type === 'amqp') {
233204
self.broker = amqp.createConnection(self.conf.BROKER_OPTIONS, {
234205
defaultExchangeName: self.conf.DEFAULT_EXCHANGE
@@ -260,11 +231,7 @@ Client.prototype.createTask = function(name, options, exchange) {
260231

261232
Client.prototype.end = function() {
262233
this.broker.disconnect();
263-
if (this.conf.backend_type === 'redis') {
264-
this.backend.quit();
265-
} else if (this.conf.broker_type !== this.conf.backend_type) {
266-
this.backend.quit();
267-
}
234+
this.backend.disconnect();
268235
};
269236

270237
Client.prototype.call = function(name /*[args], [kwargs], [options], [callback]*/ ) {

tests/test_celery.js

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,34 +20,63 @@ var conf_redis = {
2020

2121
describe('celery functional tests', function() {
2222
describe('initialization', function() {
23-
it('should create a client without error', function(done) {
24-
var client1 = celery.createClient(conf_amqp),
25-
client2 = celery.createClient(conf_invalid);
23+
it('should create a valid amqp client without error', function(done) {
24+
var client = celery.createClient(conf_amqp);
2625

27-
client1.on('connect', function() {
28-
client1.end();
26+
assert.equal(client.conf.BROKER_OPTIONS.url, 'amqp://');
27+
assert.equal(client.conf.BROKER_OPTIONS.heartbeat, 580);
28+
assert.equal(client.conf.broker_type, 'amqp');
29+
30+
assert.equal(client.conf.RESULT_BACKEND_OPTIONS.url, 'amqp://');
31+
assert.equal(client.conf.RESULT_BACKEND_OPTIONS.heartbeat, 580);
32+
assert.equal(client.conf.backend_type, 'amqp');
33+
34+
client.on('connect', function() {
35+
client.end();
2936
});
3037

31-
client1.on('error', function(exception) {
32-
console.log(exception);
38+
client.on('error', function(exception) {
3339
assert.ok(false);
3440
});
3541

36-
client1.once('end', function() {
42+
client.once('end', function() {
3743
done();
3844
});
45+
});
46+
47+
it('should create a valid redis client without error', function(done) {
48+
var client = celery.createClient(conf_redis);
49+
50+
assert.equal(client.conf.BROKER_OPTIONS.url, 'redis://');
51+
assert.equal(client.conf.broker_type, 'redis');
52+
53+
assert.equal(client.conf.RESULT_BACKEND_OPTIONS.url, 'redis://');
54+
assert.equal(client.conf.backend_type, 'redis');
55+
56+
client.on('connect', function() {
57+
client.end();
58+
});
3959

40-
client2.on('ready', function() {
60+
client.on('error', function(exception) {
4161
assert.ok(false);
4262
});
4363

44-
client2.on('error', function(exception) {
45-
assert.ok(exception);
64+
client.once('end', function() {
65+
done();
4666
});
67+
});
4768

48-
client2.once('end', function() {
69+
it('should throw error on invalid amqp client', function(done) {
70+
var client = celery.createClient(conf_invalid);
71+
72+
client.on('ready', function() {
4973
assert.ok(false);
5074
});
75+
76+
client.on('error', function(exception) {
77+
assert.ok(exception);
78+
done();
79+
});
5180
});
5281
});
5382

0 commit comments

Comments
 (0)