forked from splitio/javascript-commons
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsegmentChangesUpdater.ts
More file actions
151 lines (141 loc) · 5.88 KB
/
segmentChangesUpdater.ts
File metadata and controls
151 lines (141 loc) · 5.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import { ISegmentChangesFetcher } from '../fetchers/types';
import { ISegmentsCacheBase } from '../../../storages/types';
import { IReadinessManager } from '../../../readiness/types';
import { MaybeThenable } from '../../../dtos/types';
import { findIndex } from '../../../utils/lang';
import { SDK_SEGMENTS_ARRIVED } from '../../../readiness/constants';
import { ILogger } from '../../../logger/types';
import {
LOG_PREFIX_INSTANTIATION,
LOG_PREFIX_SYNC_SEGMENTS,
} from '../../../logger/constants';
import { thenable } from '../../../utils/promise/thenable';
type ISegmentChangesUpdater = (
fetchOnlyNew?: boolean,
segmentName?: string,
noCache?: boolean,
till?: number
) => Promise<boolean>;
/**
* Factory of SegmentChanges updater, a task that:
* - fetches segment changes using `segmentChangesFetcher`
* - updates `segmentsCache`
* - uses `segmentsEventEmitter` to emit events related to segments data updates
*
* @param log logger instance
* @param segmentChangesFetcher fetcher of `/segmentChanges`
* @param segments segments storage, with sync or async methods
* @param readiness optional readiness manager. Not required for synchronizer or producer mode.
*/
export function segmentChangesUpdaterFactory(
log: ILogger,
segmentChangesFetcher: ISegmentChangesFetcher,
segments: ISegmentsCacheBase,
numConcurrentSegmentFetches: number,
readiness?: IReadinessManager
): ISegmentChangesUpdater {
let readyOnAlreadyExistentState = true;
async function updateSegment(
segmentName: string,
noCache?: boolean,
till?: number,
fetchOnlyNew?: boolean
) {
log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Processing segment ${segmentName}`);
const since = await segments.getChangeNumber(segmentName);
// if fetchOnlyNew flag, avoid processing already fetched segments
if (fetchOnlyNew && since !== -1) return -1;
const changes = await segmentChangesFetcher(
since,
segmentName,
noCache,
till
);
let changeNumber = -1;
const results: MaybeThenable<boolean | void>[] = [];
changes.forEach((x) => {
if (x.added.length > 0)
results.push(segments.addToSegment(segmentName, x.added));
if (x.removed.length > 0)
results.push(segments.removeFromSegment(segmentName, x.removed));
if (x.added.length > 0 || x.removed.length > 0) {
results.push(segments.setChangeNumber(segmentName, x.till));
changeNumber = x.till;
}
log.debug(
`${LOG_PREFIX_SYNC_SEGMENTS}Processed ${segmentName} with till = ${x.till}. Added: ${x.added.length}. Removed: ${x.removed.length}`
);
});
// If at least one storage operation result is a promise, join all in a single promise.
if (results.some((result) => thenable(result)))
return Promise.all(results).then(() => changeNumber);
return changeNumber;
}
/**
* Segments updater returns a promise that resolves with a `false` boolean value if it fails at least to fetch a segment or synchronize it with the storage.
* Thus, a false result doesn't imply that SDK_SEGMENTS_ARRIVED was not emitted.
* Returned promise will not be rejected.
*
* @param {boolean | undefined} fetchOnlyNew if true, only fetch the segments that not exists, i.e., which `changeNumber` is equal to -1.
* This param is used by SplitUpdateWorker on server-side SDK, to fetch new registered segments on SPLIT_UPDATE notifications.
* @param {string | undefined} segmentName segment name to fetch. By passing `undefined` it fetches the list of segments registered at the storage
* @param {boolean | undefined} noCache true to revalidate data to fetch on a SEGMENT_UPDATE notifications.
* @param {number | undefined} till till target for the provided segmentName, for CDN bypass.
*/
return async function segmentChangesUpdater(
fetchOnlyNew?: boolean,
segmentName?: string,
noCache?: boolean,
till?: number
) {
log.debug(`${LOG_PREFIX_SYNC_SEGMENTS}Started segments update`);
// If not a segment name provided, read list of available segments names to be updated.
const segmentNames = await (segmentName
? [segmentName]
: segments.getRegisteredSegments());
try {
let shouldUpdateFlags: number[] = [];
// chunk in order to avoid an unbounded amount of simultaneous segment fetch requests
const chunkSize = numConcurrentSegmentFetches;
for (let i = 0; i < segmentNames.length; i += chunkSize) {
const chunk = segmentNames.slice(i, i + chunkSize);
shouldUpdateFlags = shouldUpdateFlags.concat(
await Promise.all(
chunk.map((segmentName) =>
updateSegment(segmentName, noCache, till, fetchOnlyNew)
)
)
);
}
// if at least one segment fetch succeeded, mark segments ready
if (
findIndex(shouldUpdateFlags, (v) => v !== -1) !== -1 ||
readyOnAlreadyExistentState
) {
readyOnAlreadyExistentState = false;
if (readiness) readiness.segments.emit(SDK_SEGMENTS_ARRIVED);
}
return true;
} catch (error) {
// Handles rejected promises at `segmentChangesFetcher`, `segments.getRegisteredSegments` and other segment storage operations.
if (
error &&
typeof error === 'object' &&
'statusCode' in error &&
(error as any).statusCode === 403
) {
// If the operation is forbidden, it may be due to permissions. Destroy the SDK instance.
// @TODO although factory status is destroyed, synchronization is not stopped
if (readiness) readiness.destroy();
log.error(
`${LOG_PREFIX_INSTANTIATION}: you passed a client-side type authorizationKey, please grab an SDK Key from the Split user interface that is of type server-side.`
);
} else {
log.warn(
`${LOG_PREFIX_SYNC_SEGMENTS}Error while doing fetch of segments. ${error}`
);
}
return false;
}
};
}