Skip to content

Commit 5528f05

Browse files
committed
feat: allow to pass on args to worker function
1 parent baccd1e commit 5528f05

5 files changed

Lines changed: 1104 additions & 1410 deletions

File tree

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,14 @@ function onJobCompleted(err, ...args) {
9696

9797
### Async/Await in-flight request caching
9898

99-
#### Fanout(jobId: String, [timeout: Number], job: Function)
99+
#### Fanout(jobId: String, [timeout: Number], job: Function, [...args]: any[])
100100

101101
Use `fanout(...)` method for the easiest way to handle job subscriptions where
102102
one actor must perform long-running job, but as soon as it's done - everyone who
103103
queued for the results of this job must be notified.
104104

105+
`args` are optional args to be passed on to the job function. If you need to preserve context - use .bind
106+
105107
Sample of code is provided to make use of this feature:
106108

107109
```js

__tests__/integration.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ describe('integration tests', () => {
206206

207207
it('#fanout: multiple jobs are completed only once', () => {
208208
const args = ['completed'];
209+
const arg1 = 'arg1';
209210
const job = sinon.spy(() => args);
210211
const onComplete = sinon.spy();
211212
const unexpectedError = sinon.spy();
@@ -218,14 +219,15 @@ describe('integration tests', () => {
218219
const id = String(idx % 3);
219220

220221
try {
221-
onComplete(id, await queueManager.dlock.fanout(id, job));
222+
onComplete(id, await queueManager.dlock.fanout(id, job, arg1));
222223
} catch (e) {
223224
unexpectedError(e);
224225
}
225226
})
226227
.delay(100)
227228
.then(() => {
228229
assert.equal(job.callCount, 3);
230+
assert.equal(job.withArgs(arg1).callCount, 3);
229231
assert.equal(onComplete.withArgs('0', args).callCount, 4);
230232
assert.equal(onComplete.withArgs('1', args).callCount, 3);
231233
assert.equal(onComplete.withArgs('2', args).callCount, 3);
@@ -238,6 +240,7 @@ describe('integration tests', () => {
238240
const job = sinon.spy(async () => {
239241
await Promise.delay(3000);
240242
});
243+
const arg1 = 'arg1';
241244
const onComplete = sinon.spy();
242245
const timeoutError = sinon.spy();
243246
const unexpectedError = sinon.spy();
@@ -246,7 +249,7 @@ describe('integration tests', () => {
246249
const id = String(idx % 3);
247250

248251
try {
249-
const result = await queueManager.dlock.fanout(id, 1500, job);
252+
const result = await queueManager.dlock.fanout(id, 1500, job, arg1);
250253
onComplete(result);
251254
} catch (e) {
252255
if (e.message === 'queue-no-response') {
@@ -258,6 +261,7 @@ describe('integration tests', () => {
258261
});
259262

260263
assert.equal(job.callCount, 3);
264+
assert.equal(job.withArgs(arg1).callCount, 3);
261265
assert.equal(onComplete.callCount, 0);
262266
assert.equal(timeoutError.callCount, 10);
263267
assert.equal(timeoutError.withArgs(sinon.match({ message: 'queue-no-response' })).callCount, 10);

package.json

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,19 @@
4444
"serialize-error": "^7.0.1"
4545
},
4646
"devDependencies": {
47-
"@babel/cli": "^7.10.1",
48-
"@babel/core": "^7.10.2",
47+
"@babel/cli": "^7.10.3",
48+
"@babel/core": "^7.10.3",
4949
"@babel/plugin-transform-strict-mode": "^7.10.1",
5050
"@makeomatic/deploy": "^10.2.1",
5151
"codecov": "^3.7.0",
5252
"cross-env": "^7.0.2",
53-
"eslint": "^7.1.0",
54-
"eslint-config-makeomatic": "^5.0.0",
55-
"eslint-plugin-import": "^2.20.2",
53+
"eslint": "^7.3.1",
54+
"eslint-config-makeomatic": "^5.0.2",
55+
"eslint-plugin-import": "^2.21.2",
5656
"eslint-plugin-promise": "^4.2.1",
5757
"eslint-plugin-unicorn": "^20.1.0",
5858
"ioredis": "^4.17.3",
59-
"jest": "^26.0.1",
59+
"jest": "^26.1.0",
6060
"sinon": "^9.0.2"
6161
},
6262
"engine": {

src/distributed-callback-queue.js

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,33 @@ class DistributedCallbackQueue {
185185
* @param {String} suffix job key
186186
* @param {Number} [timeout] when job is considered to be failed and error is returned instead
187187
* @param {Function} worker async job to be performed by the party that gets the lock
188+
* @param {Mixed[]} [args] passed on to worker as args
188189
*
189190
* @return {Function} job handler that must be invoked with a worker that returns a promise
190191
*/
191192
async fanout(suffix, ...props) {
192-
// verify we've got correct args in
193-
assert(props.length >= 1 && props.length <= 2);
194-
const worker = props.pop();
195-
const timeout = props.pop();
193+
const propsAmount = props.length;
194+
assert(propsAmount >= 1, 'must have at least job function passed');
195+
196+
// eslint-disable-next-line prefer-const
197+
let [timeout, worker, ...workerArgs] = props;
198+
199+
// in case of 1 arg
200+
switch (propsAmount) {
201+
case 1:
202+
worker = timeout;
203+
timeout = undefined;
204+
break;
205+
default:
206+
if (typeof timeout === 'function') {
207+
workerArgs.unshift(worker);
208+
worker = timeout;
209+
timeout = undefined;
210+
}
211+
}
212+
196213
assert(typeof worker === 'function', 'ensure that you pass a function as a worker');
214+
assert(typeof timeout === 'number' || typeof timeout === 'undefined', 'invalid timeout value');
197215

198216
// allows us to reject-and-halt (eg. on timeout) even if the #push'ed lock has not yet been acquired
199217
let jobAbortReject;
@@ -244,7 +262,7 @@ class DistributedCallbackQueue {
244262
}
245263

246264
// wrap so that we have concept of "cancelling" work
247-
const performWork = worker();
265+
const performWork = worker(...workerArgs);
248266

249267
try {
250268
const result = await Promise.race([

0 commit comments

Comments
 (0)