-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathdriveWatcher.js
More file actions
101 lines (93 loc) · 3.02 KB
/
driveWatcher.js
File metadata and controls
101 lines (93 loc) · 3.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
const { EventEmitter } = require('events')
const low = require('last-one-wins')
const pump = require('pump')
const streamx = require('streamx')
class DriveWatcher extends EventEmitter {
constructor (client, drive, opts = {}) {
super()
this.client = client
this.drive = drive
this.recursive = !!opts.recursive
this.drivesByPath = new Map([[ '/', drive ]])
this.versionsByPath = new Map()
this.unwatchesByPath = new Map()
this.watchers = []
this.timer = null
this.emittingStats = false
}
_createDiffer (path, drive) {
// Last-one-wins in case the watch is triggered many times in quick succession.
const self = this
return low(onupdate)
async function onupdate (_, cb) {
const lastVersion = self.versionsByPath.get(path)
try {
const diffStream = await drive.createDiffStream(lastVersion)
const currentVersion = await drive.version()
self.versionsByPath.set(path, currentVersion)
return pump(diffStream, new streamx.Transform({
transform: (data, cb) => {
for (const watcher of self.watchers) {
watcher(p.join(path, data.name))
}
return cb(null)
}
}), err => {
if (err) return cb(err)
return cb(null)
})
} catch (err) {
return cb(err)
}
}
}
async _emitStats () {
if (this.emittingStats) return
this.emittingStats = true
var total = 0
var downloaded = 0
var peers = 0
for (const [path, drive] of this.drivesByPath) {
const driveStats = await drive.stats()
for (const { path, content } of driveStats.stats) {
if (path !== '/' || !content) continue
downloaded += content.downloadedBlocks
total += content.totalBlocks
peers = content.peers
}
}
this.emit('stats', { total, downloaded, peers })
this.emittingStats = false
}
async start () {
// TODO: Handle dynamic (un)mounting.
this.versionsByPath.set('/', await this.drive.version())
this.unwatchesByPath.set('/', this.drive.watch('/', this._createDiffer('/', this.drive)))
const allMounts = await this.drive.mounts({ memory: false, recursive: this.recursive })
for (const { path, opts } of allMounts) {
if (path === '/') continue
const childDrive = await this.client.drive.get({ key: opts.key })
this.drivesByPath.set(path, childDrive)
this.versionsByPath.set(path, opts.version)
this.unwatchesByPath.set(path, childDrive.watch('/', this._createDiffer(path, childDrive)))
}
this.timer = setInterval(this._emitStats.bind(this), 1000)
}
watch (_, onwatch) {
// The watch path is ignored for drives.
this.watchers.push(onwatch)
return () => {
this.watchers.splice(this.watchers.indexOf(onwatch), 1)
}
}
async close () {
for (const [path, unwatch] of this.unwatchesByPath) {
await unwatch()
}
if (this.timer) {
clearInterval(this.timer)
this.timer = null
}
}
}
module.exports = DriveWatcher