-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathsegmentChangesUpdater.ts
More file actions
105 lines (95 loc) · 5.46 KB
/
segmentChangesUpdater.ts
File metadata and controls
105 lines (95 loc) · 5.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import { ISegmentChangesFetcher } from '../fetchers/types';
import { ISegmentsCacheBase } from '../../../storages/types';
import { IReadinessManager } from '../../../readiness/types';
import { SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants';
import { ILogger } from '../../../logger/types';
import { LOG_PREFIX_INSTANTIATION, LOG_PREFIX_SYNC_SEGMENTS } from '../../../logger/constants';
import { timeout } from '../../../utils/promise/timeout';
/**
 * Call signature of a segments updater: every argument is optional and the
 * result is a promise of a boolean.
 */
type ISegmentChangesUpdater = (
  fetchOnlyNew?: boolean,
  segmentName?: string,
  noCache?: boolean,
  till?: number
) => Promise<boolean>;
/**
 * Factory of SegmentChanges updater, a task that:
 *  - fetches segment changes using `segmentChangesFetcher`
 *  - updates the `segments` storage
 *  - emits `SDK_SEGMENTS_ARRIVED` through `readiness.segments` (when a readiness manager is provided)
 *
 * @param log - logger instance
 * @param segmentChangesFetcher - fetcher of `/segmentChanges`
 * @param segments - segments storage, with sync or async methods
 * @param readiness - optional readiness manager. Not required for synchronizer or producer mode.
 * @param requestTimeoutBeforeReady - optional timeout handed to the `timeout` helper to wrap fetch promises.
 * It is only applied until the first update round completes successfully; a falsy value disables it.
 * @param retriesOnFailureBeforeReady - optional number of extra attempts per segment when its fetch or
 * storage update fails. It is only applied until the first update round completes successfully.
 * @returns the segments updater function
 */
export function segmentChangesUpdaterFactory(
  log: ILogger,
  segmentChangesFetcher: ISegmentChangesFetcher,
  segments: ISegmentsCacheBase,
  readiness?: IReadinessManager,
  requestTimeoutBeforeReady?: number,
  retriesOnFailureBeforeReady?: number,
): ISegmentChangesUpdater {

  // True until the first update round in which every segment is processed without errors.
  // While true, fetches are wrapped with a timeout, failures are retried, and
  // SDK_SEGMENTS_ARRIVED is emitted even if no segment data changed.
  let readyOnAlreadyExistentState = true;

  // Decorator passed to the fetcher: wraps the request promise with a timeout before the first successful round.
  function _promiseDecorator<T>(promise: Promise<T>) {
    if (readyOnAlreadyExistentState && requestTimeoutBeforeReady) promise = timeout(requestTimeoutBeforeReady, promise);
    return promise;
  }

  /**
   * Fetches the changes of a single segment and applies them to the storage.
   *
   * @param segmentName - name of the segment to update
   * @param noCache - forwarded to the fetcher
   * @param till - forwarded to the fetcher
   * @param fetchOnlyNew - if true, the segment is skipped when the storage already has a change number for it
   * @param retries - remaining number of attempts to perform if the fetch or the storage update fails
   * @returns promise that resolves to true if at least one storage update reported a change, false otherwise.
   * It rejects with the last error once retries are exhausted.
   */
  function updateSegment(segmentName: string, noCache?: boolean, till?: number, fetchOnlyNew?: boolean, retries?: number): Promise<boolean> {
    log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing segment ${segmentName}`);
    // Storage may be sync or async: normalize its result into a promise
    const sincePromise = Promise.resolve(segments.getChangeNumber(segmentName));

    return sincePromise.then(since => {
      // if fetchOnlyNew flag, avoid processing already fetched segments
      return fetchOnlyNew && since !== undefined ?
        false :
        // A missing change number is sent as -1
        segmentChangesFetcher(since || -1, segmentName, noCache, till, _promiseDecorator).then((changes) => {
          return Promise.all(changes.map(x => {
            log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`);
            return segments.update(segmentName, x.added, x.removed, x.till);
          })).then((updates) => {
            return updates.some(update => update);
          });
        }).catch(error => {
          if (retries) {
            // NOTE: the number logged here is the count of remaining retries, which decreases on each attempt
            log.warn(`${LOG_PREFIX_SYNC_SEGMENTS}Retrying fetch of segment ${segmentName} (attempt #${retries}). Reason: ${error}`);
            return updateSegment(segmentName, noCache, till, fetchOnlyNew, retries - 1);
          }
          throw error;
        });
    });
  }

  /**
   * Segments updater returns a promise that resolves with a `false` boolean value if it fails at least to fetch a segment or synchronize it with the storage.
   * Thus, a false result doesn't imply that SDK_SEGMENTS_ARRIVED was not emitted.
   * Returned promise will not be rejected.
   *
   * @param fetchOnlyNew - if true, only fetch the segments that do not exist yet in the storage, i.e., those for which `segments.getChangeNumber` returns `undefined`.
   * This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE or RB_SEGMENT_UPDATE notifications.
   * @param segmentName - segment name to fetch. By passing `undefined` it fetches the list of segments registered at the storage
   * @param noCache - true to revalidate data to fetch on a SEGMENT_UPDATE notifications.
   * @param till - till target for the provided segmentName, for CDN bypass.
   */
  return function segmentChangesUpdater(fetchOnlyNew?: boolean, segmentName?: string, noCache?: boolean, till?: number) {
    log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Started segments update`);

    // If not a segment name provided, read list of available segments names to be updated.
    // The storage call is guarded so that a synchronous exception becomes a rejection handled by the
    // `catch` below, instead of escaping the updater and breaking its "never rejects" contract.
    let segmentsPromise: Promise<string[]>;
    try {
      segmentsPromise = Promise.resolve(segmentName ? [segmentName] : segments.getRegisteredSegments());
    } catch (error) {
      segmentsPromise = Promise.reject(error);
    }

    return segmentsPromise.then(segmentNames => {
      // Retries are only enabled before the first successful round
      const updaters = segmentNames.map(name => updateSegment(name, noCache, till, fetchOnlyNew, readyOnAlreadyExistentState ? retriesOnFailureBeforeReady : 0));

      return Promise.all(updaters).then(shouldUpdateFlags => {
        // Emit if at least one segment changed, or if this is the first round completed without errors
        if (shouldUpdateFlags.some(update => update) || readyOnAlreadyExistentState) {
          readyOnAlreadyExistentState = false;
          if (readiness) readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
        }
        return true;
      });
    })
      // Handles rejected promises at `segmentChangesFetcher`, `segments.getRegisteredSegments` and other segment storage operations.
      .catch(error => {
        if (error && error.statusCode === 403) {
          // If the operation is forbidden, it may be due to permissions. Destroy the SDK instance.
          // @TODO although factory status is destroyed, synchronization is not stopped
          if (readiness) readiness.setDestroyed();
          log.error(`${LOG_PREFIX_INSTANTIATION}: you passed a client-side type authorizationKey, please grab an SDK Key from the Split user interface that is of type server-side.`);
        } else {
          log.warn(`${LOG_PREFIX_SYNC_SEGMENTS}Error while doing fetch of segments. ${error}`);
        }

        return false;
      });
  };
}