This repository was archived by the owner on Oct 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 172
Expand file tree
/
Copy pathcomm.ts
More file actions
265 lines (230 loc) · 8.92 KB
/
comm.ts
File metadata and controls
265 lines (230 loc) · 8.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import {Injectable, NgZone} from '@angular/core';
import {BehaviorSubject} from 'rxjs/BehaviorSubject';
import {Observable} from 'rxjs/Observable';
import {Observer} from 'rxjs/Observer';
import {fromByteArray} from 'base64-js';
/**
 * Builds an operator function (suitable for `let`) that runs an asynchronous
 * side effect for every source value and then re-emits that same value.
 *
 * Each source value is handed to `fn`, and the Observable it returns is
 * flattened with `concatMap`. Whatever that inner Observable emits is thrown
 * away: `reduce` is seeded with the source value and its accumulator always
 * returns the source value, so the source value is what comes out.
 *
 * NOTE(review): in `(T) => Observable<any>` the `T` is a parameter *name*, not
 * the type parameter, so the callback argument is implicitly `any`. Left
 * untouched here to keep the signature identical for existing callers.
 *
 * @param fn Produces the side-effect Observable for a given source value.
 * @returns A function that maps a source Observable to the pass-through one.
 */
export function doAsync<T>(fn: (T) => Observable<any>): any {
  return function (source: Observable<T>) {
    return source.concatMap(item => {
      const sideEffect = fn(item);
      // Discard the side effect's own emissions; keep only the original item.
      return sideEffect.reduce(() => item, item);
    });
  };
}
import 'rxjs/add/observable/concat';
import 'rxjs/add/observable/defer';
import 'rxjs/add/observable/empty';
import 'rxjs/add/observable/from';
import 'rxjs/add/observable/fromEvent';
import 'rxjs/add/observable/merge';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/concatMap';
import 'rxjs/add/operator/do';
import 'rxjs/add/operator/expand';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/first';
import 'rxjs/add/operator/let';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/publishReplay';
import 'rxjs/add/operator/reduce';
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/switchMap';
import 'rxjs/add/operator/take';
/**
 * Wraps a promise-returning function in an Observable.
 *
 * `promiseFn` is called from the Observable's subscribe function. The resolved
 * value is emitted, completion is signalled in a following promise step, and a
 * rejection (or an exception thrown by one of the earlier steps) is forwarded
 * as an Observable error.
 *
 * @param promiseFn Factory that is invoked to obtain the Promise.
 * @returns An Observable carrying the promise's resolved value.
 */
function fromPromise<T>(promiseFn: (() => Promise<T>)): Observable<T> {
  return Observable.create(subscriber => {
    const emit = (value: T) => subscriber.next(value);
    const finish = () => subscriber.complete();
    const fail = (reason: any) => subscriber.error(reason);
    // Emission and completion stay separate chained steps, with one trailing
    // catch covering a failure in any step before it.
    promiseFn().then(emit).then(finish).catch(fail);
  });
}
/**
 * Shape of the events carried by `NgServiceWorker.updates`.
 */
export interface UpdateEvent {
  /** Discriminates the kind of update notification. */
  type: 'pending' | 'activation';
  /** Version string attached to the event; may be absent. */
  version?: string;
}
/**
 * A push notification registration: a thin wrapper over a `PushSubscription`
 * exposing its endpoint URL and its encryption keys as base64 strings.
 */
export class NgPushRegistration {
  // The wrapped browser subscription object.
  private ps: PushSubscription;

  constructor(ps: any) {
    this.ps = ps;
  }

  /** The endpoint of the wrapped subscription. */
  get url(): string {
    const {endpoint} = this.ps;
    return endpoint;
  }

  /**
   * Reads a key from the subscription and base64-encodes its bytes.
   *
   * @param method Name of the key to read; defaults to `'p256dh'`.
   * @returns The key bytes encoded with `fromByteArray`.
   */
  key(method: string = 'p256dh'): string {
    const raw = this.ps.getKey(method);
    const bytes = new Uint8Array(raw);
    return fromByteArray(bytes);
  }

  /** Get the authentication key (shorthand for `key('auth')`). */
  auth(): string {
    const authKey = this.key('auth');
    return authKey;
  }

  /** Delegates to the wrapped subscription's own `toJSON()`. */
  toJSON(): Object {
    const serialized = this.ps.toJSON();
    return serialized;
  }

  /**
   * Cancels the wrapped subscription.
   *
   * @returns An Observable of the result of `PushSubscription.unsubscribe()`.
   */
  unsubscribe(): Observable<boolean> {
    // TODO: switch to Observable.fromPromise when it's not broken.
    const cancelSubscription = () => this.ps.unsubscribe();
    return fromPromise(cancelSubscription);
  }
}
/**
 * Application-side handle for communicating with the controlling Service
 * Worker.
 *
 * Messages are posted to the worker together with a `MessageChannel` port, and
 * the worker's replies on that port are surfaced as Observables. The service
 * also exposes long-lived `push` and `updates` streams and a helper for
 * registering with the browser's `PushManager`.
 *
 * If the environment has no `navigator.serviceWorker`, construction still
 * succeeds; no controlling worker is ever observed, so the Observables
 * returned by this service simply never emit.
 */
@Injectable()
export class NgServiceWorker {
  // Typed reference to navigator.serviceWorker (undefined when unsupported).
  private container: ServiceWorkerContainer;

  // Always returns the current controlling worker, or undefined if there isn't one.
  private controllingWorker = new BehaviorSubject<ServiceWorker>(undefined);

  // Always returns the current controlling worker, and waits for one to exist
  // if it does not.
  private awaitSingleControllingWorker: Observable<ServiceWorker>;

  // Shared stream of replies to the 'push' command.
  push: Observable<any>;

  // Shared stream of replies to the 'update' command.
  updates: Observable<UpdateEvent>;

  constructor(private zone: NgZone) {
    // Extract a typed version of navigator.serviceWorker. Guarded so that
    // merely injecting this service does not throw where Service Workers
    // (or `navigator` itself) are unavailable.
    this.container = (typeof navigator !== 'undefined'
        ? navigator['serviceWorker']
        : undefined) as ServiceWorkerContainer;

    if (!!this.container) {
      // Final Observable that will always give back the current controlling worker,
      // and follow changes over time.
      Observable
        // Combine current and future controllers.
        .concat(
          // Current controlling worker (if any).
          Observable.of(this.container.controller),
          // Future changes of the controlling worker.
          Observable
            // Track changes of the controlling worker via the controllerchange event.
            .fromEvent(this.container, 'controllerchange')
            // Read the new controller when it changes.
            .map(_ => this.container.controller)
        )
        // Cache the latest controller for immediate delivery.
        .subscribe(
          worker => this.controllingWorker.next(worker),
          err => this.controllingWorker.error(err),
          () => this.controllingWorker.complete(),
        );
    }

    // To make one-off calls to the worker, awaitSingleControllingWorker waits for
    // a controlling worker to exist.
    this.awaitSingleControllingWorker = this
      .controllingWorker
      .filter(worker => !!worker)
      .take(1);

    // Setup the push Observable as a broadcast mechanism for push notifications.
    // defer() postpones sending the command until something subscribes, and
    // share() lets all subscribers use the same underlying request.
    this.push = Observable
      .defer(() => this.send({cmd: 'push'}))
      .share();

    // Setup the updates Observable as a broadcast mechanism for update notifications.
    this.updates = Observable
      .defer(() => this.send({cmd: 'update'}))
      .share();
  }

  /**
   * Builds an operator that maps each ServiceWorker to the registration whose
   * `active` worker is that ServiceWorker.
   *
   * @returns A function from an Observable of workers to an Observable of
   *     their registrations. Nothing is emitted for a worker if none of the
   *     container's registrations has it as `active`.
   */
  private registrationForWorker(): ((obs: Observable<ServiceWorker>) => Observable<ServiceWorkerRegistration>) {
    return (obs: Observable<ServiceWorker>) => obs
      .switchMap(worker => fromPromise(() => this.container.getRegistrations())
        // Flatten the array of registrations into individual emissions.
        // mergeMap (not expand) is required here: expand would feed every
        // emitted registration back into the projection, and a registration
        // is not something Observable.from() can convert.
        .mergeMap(regs => Observable.from(regs))
        .filter(reg => reg.active === worker)
        .take(1)
      );
  }

  /**
   * Sends a single message to the worker, and awaits one (or more) responses.
   *
   * Replies arrive on a dedicated MessageChannel port. A reply object carrying
   * both `$ngsw` and `id` properties records the id used for cancellation, a
   * `null` reply completes the stream, and any other reply is emitted as a
   * value.
   *
   * @param worker The worker to post the message to.
   * @param message The command object; it is tagged in place with `$ngsw: true`.
   * @returns A replaying Observable of the worker's replies.
   */
  private sendToWorker(worker: ServiceWorker, message: Object): Observable<any> {
    // A MessageChannel is sent with the message so responses can be correlated.
    const channel = new MessageChannel();

    // Observe replies.
    const result = new Observable<any>(observer => {
      let cancelId = null;
      const listener = (event: MessageEvent) => {
        const data = event.data;
        if (!!data && typeof data === "object" && data.hasOwnProperty('$ngsw') && data.hasOwnProperty('id')) {
          // Control message: remember the id to quote when cancelling.
          cancelId = data['id'];
        } else if (data === null) {
          // A null payload marks the end of the reply stream.
          observer.complete();
          channel.port1.removeEventListener('message', listener);
          return;
        } else {
          observer.next(data);
        }
      };
      channel.port1.addEventListener('message', listener);
      // Teardown: stop listening and tell the worker to cancel.
      // NOTE(review): the Subscription returned by connect() below is not
      // retained, so this teardown appears to run only once the stream has
      // completed — confirm that sending 'cancel' at that point is intended.
      return () => {
        channel.port1.removeEventListener('message', listener);
        this.sendToWorker(worker, {cmd: 'cancel', id: cancelId});
      };
    })
    // The message will be sent before the consumer has a chance to subscribe to
    // the response Observable, so publishReplay() records any responses and ensures
    // they arrive properly.
    .publishReplay();

    // Connecting actually creates the event subscription and starts recording
    // for replay.
    result.connect();

    // Start receiving message(s).
    channel.port1.start();

    // Set a magic value in the message.
    message['$ngsw'] = true;

    // Transfer port2 to the worker so it can reply on this channel.
    worker.postMessage(message, [channel.port2]);

    return result;
  }

  /**
   * Send a message to the current controlling worker, waiting for one if needed.
   *
   * @param message The command object to deliver.
   * @returns An Observable of the worker's replies.
   */
  private send(message: Object): Observable<any> {
    return this
      // Wait for a controlling worker to exist.
      .awaitSingleControllingWorker
      // Send the message and await any replies. switchMap is chosen so if a new
      // controlling worker arrives somehow, the message will still get through.
      .switchMap(worker => this.sendToWorker(worker, message));
  }

  // Send a 'ping' to the worker. The returned Observable will complete when the worker
  // acknowledges the message. This provides a test that the worker is alive and listening.
  ping(): Observable<any> {
    return this.send({
      cmd: 'ping'
    });
  }

  // Send the 'log' command to the worker and return its replies.
  log(): Observable<string> {
    return this.send({
      cmd: 'log'
    });
  }

  // Send the 'activateUpdate' command for the given version and return the
  // worker's replies.
  activateUpdate(version: string): Observable<boolean> {
    return this.send({
      cmd: 'activateUpdate',
      version,
    });
  }

  /**
   * Obtains a push registration from the PushManager of the controlling
   * worker's registration, reusing an existing subscription when there is one.
   *
   * @returns An Observable that emits a single `NgPushRegistration` and then
   *     completes, or errors if the PushManager promises reject. Emissions are
   *     delivered inside the Angular zone.
   */
  registerForPush(): Observable<NgPushRegistration> {
    return this
      // Wait for a controlling worker to exist.
      .awaitSingleControllingWorker
      // Get the ServiceWorkerRegistration for the worker.
      .let(this.registrationForWorker())
      // Access the PushManager used to control push notifications.
      .map((registration: ServiceWorkerRegistration) => registration.pushManager)
      .switchMap(pushManager => {
        // Create an Observable to wrap the Promises of the PushManager API.
        // TODO: switch to Observable.fromPromise when it's not broken.
        // This is extracted as a variable so Typescript infers types correctly.
        let reg: Observable<NgPushRegistration> = Observable.create(observer => {
          // Function that maps subscriptions to an Angular-specific representation.
          let regFromSub = (sub: PushSubscription) => new NgPushRegistration(sub);
          pushManager
            // First, check for an existing subscription.
            .getSubscription()
            .then(sub => {
              // If there is one, we don't need to register, just return it.
              if (!!sub) {
                return regFromSub(sub);
              }
              // No existing subscription, register (with userVisibleOnly: true).
              return pushManager
                .subscribe({userVisibleOnly: true})
                .then(regFromSub);
            })
            // Map from promises to the Observable being returned.
            .then(sub => this.zone.run(() => observer.next(sub)))
            .then(() => this.zone.run(() => observer.complete()))
            .catch(err => this.zone.run(() => observer.error(err)));
        });
        return reg;
      });
  }

  // Send the 'checkUpdate' command to the worker and return its replies.
  checkForUpdate(): Observable<boolean> {
    return this.send({cmd: 'checkUpdate'});
  }
}