-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworker03.js
More file actions
89 lines (70 loc) · 2.31 KB
/
worker03.js
File metadata and controls
89 lines (70 loc) · 2.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// Generated by CoffeeScript 1.8.0
(function() {
'use strict';
/*
WorkQueueMgr Example -- worker03
This program consumes two work queues: 'demo:work-queue-1' and 'demo:work-queue-2'.
It simply prints each message consumed and then "acks" it, so that the
next message will become available. Each work queue operates independently.
Usage:
cd demo/lib
node worker03.js
or, to monitor for memory leaks
node worker03.js mem | grep '>>>'
Use this program in conjunction with provider03. See provider03 source code
for more details.
*/
var WorkQueueMgr, consumeData, createWorkQueues, initEventHandlers, mgr, queue1, queue1Name, queue2, queue2Name, queuesActive, shutDown;
queue1 = null;
queue2 = null;
queue1Name = 'demo:work-queue-1';
queue2Name = 'demo:work-queue-2';
mgr = null;
queuesActive = 0;
WorkQueueMgr = require('node-redis-queue').WorkQueueMgr;
mgr = new WorkQueueMgr();
// Start-up sequence. The callback handed to mgr.connect() wires the event
// handlers, creates both queues and starts consuming, in that order.
// NOTE(review): presumably the callback fires once the Redis connection is
// ready -- confirm against the node-redis-queue documentation.
mgr.connect(function onBrokerReady() {
  console.log('work queue broker ready');

  // Handlers go in first so that failures in the later steps are reported.
  initEventHandlers();
  createWorkQueues();
  consumeData();

  return console.log('waiting for data...');
});
/*
 * Registers the 'error' and 'end' listeners on the work queue manager.
 *
 * 'error': logs the error prefixed with '>>>', marks the process as failed
 *          and shuts down.
 * 'end':   logs that the Redis connection ended and shuts down.
 *
 * Returns whatever mgr.on() returns for the 'end' registration.
 */
initEventHandlers = function() {
  mgr.on('error', function(error) {
    console.log('>>>' + error);
    // shutDown() calls process.exit() without an explicit code, so without
    // this line a failed worker would exit with status 0 and look successful.
    // process.exit() with no argument falls back to process.exitCode.
    process.exitCode = 1;
    return shutDown();
  });
  return mgr.on('end', function() {
    console.log('>>>End Redis connection');
    return shutDown();
  });
};
/*
 * Creates the two work queues through the manager, stores them in queue1 and
 * queue2, and records that two queues are now active. Returns nothing.
 */
createWorkQueues = function() {
  var openQueue = function(name) {
    return mgr.createQueue(name);
  };

  queue1 = openQueue(queue1Name);
  queue2 = openQueue(queue2Name);

  // One count per queue; consumeData() decrements it as each queue stops.
  queuesActive = 2;
};
/*
 * Starts consuming queue1 and then queue2.
 *
 * Every received payload is logged and acknowledged. The ack flag is true
 * only for the stop payload. Each stop payload also decrements queuesActive,
 * and when the count reaches zero the manager connection is ended.
 *
 * Returns the result of queue2.consume().
 */
consumeData = function() {
  // Payload that marks the end of a queue's work.
  var STOP_PAYLOAD = '***stop***';

  var startConsuming = function(queue) {
    console.log('consuming queue "' + queue.queueName + '"');
    return queue.consume(function(payload, ack) {
      var isStop = payload === STOP_PAYLOAD;
      console.log('received message "' + payload + '" in queue "' + queue.queueName + '"');
      // NOTE(review): presumably a true flag tells the library to stop
      // consuming this queue -- confirm against node-redis-queue docs.
      ack(isStop);
      if (!isStop) {
        return;
      }
      queuesActive -= 1;
      if (queuesActive === 0) {
        return mgr.end();
      }
    });
  };

  startConsuming(queue1);
  return startConsuming(queue2);
};
/*
 * Ends the manager's connection and then terminates the process.
 * process.exit() is called without an explicit exit code.
 */
shutDown = function() {
  var exitResult;

  mgr.end();
  exitResult = process.exit();

  return exitResult;
};
}).call(this);