forked from FirebaseExtended/firebase-queue
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathqueue.js
More file actions
116 lines (96 loc) · 3.24 KB
/
queue.js
File metadata and controls
116 lines (96 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
'use strict'
const QueueWorker = require('./queue_worker.js')
module.exports = Queue
/**
*
* @param {{
* tasksRef: any,
* processTask: any,
* reportError: any,
* options?: {
* spec?: {
* finishedState?: string | null,
* startState?: null,
* inProgressState?: string | undefined,
* errorState?: string | undefined
* }
* numWorkers?: number
* }
* }} x
*/
function Queue({
tasksRef,
processTask,
reportError,
options: {
spec: {
startState = null,
inProgressState = 'in_progress',
finishedState = null,
errorState = 'error'
} = {},
numWorkers = 1
} = {}
}) {
if (!(this instanceof Queue)) throw new Error('You forgot the `new` keyword: `new Queue(...)`')
const spec = { startState, inProgressState, finishedState, errorState }
check(tasksRef, isFirebaseRef,
'tasksRef must be a Firebase reference')
check(processTask, isFunction,
'processTask must be a function')
check(reportError, isFunction,
'reportError must be a function')
check(inProgressState, isString,
'options.spec.inProgressState must be a string')
check(startState, isNull, [isString, not(inProgressState)],
'options.spec.startState must be null or a string that !== inProgressState')
check(finishedState, isNull, [isString, not(inProgressState), not(startState)],
'options.spec.finishedState must be null or a string that !== inProgressState and !== startState')
check(errorState, [isString, not(inProgressState), not(startState), not(finishedState)],
'options.spec.errorState must be a string that !== inProgressState and !== startState and !== finishedState')
check(numWorkers, isPositiveInteger,
'options.numWorkers must be a positive integer')
const queueId = tasksRef.push().key
let shutdownStarted = null
let removeWorkers = createWorkers()
this.shutdown = shutdown
async function shutdown() {
if (shutdownStarted) return shutdownStarted
shutdownStarted = removeWorkers()
removeWorkers = null // make sure no references to workers are being kept and allow garbage collection
return shutdownStarted
}
function createWorkers() {
const workers = [...Array(numWorkers).keys()].map(createWorker)
return async () => {
await Promise.all(workers.map(worker => worker.shutdown()))
}
function createWorker(index) {
return new QueueWorker({
processId: `${queueId}:${index}`,
tasksRef,
spec,
processTask,
reportError
})
}
}
function isFunction(x) { return typeof x === 'function' }
function isFirebaseRef(x) { return x && [x.on, x.off, x.transaction, x.orderByChild, x.push].every(isFunction) }
function isString(x) { return typeof x === 'string' }
function isNull(x) { return x === null }
function not(y) { return x => x !== y }
function isPositiveInteger(x) { return typeof x === 'number' && x >= 1 && x % 1 === 0 }
function check(val, ...rest) {
const message = rest[rest.length - 1]
const or = rest.slice(0, rest.length -1)
const valid = or.reduce(
(result, and) => result || [].concat(and).reduce(
(result, isValid) => result && isValid(val),
true
),
false
)
if (!valid) throw new Error(message)
}
}