Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/sync/streaming/UpdateWorkers/SplitsUpdateWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import { IUpdateWorker } from './types';
*/
export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSyncTask: ISplitsSyncTask, splitsEventEmitter: ISplitsEventEmitter, telemetryTracker: ITelemetryTracker, segmentsSyncTask?: ISegmentsSyncTask): IUpdateWorker<[updateData: ISplitUpdateData]> & { killSplit(event: ISplitKillData): void } {

const ff = SplitsUpdateWorker(storage.splits);
const rbs = SplitsUpdateWorker(storage.rbSegments);

function SplitsUpdateWorker(cache: ISplitsCacheSync | IRBSegmentsCacheSync) {
let maxChangeNumber = 0;
let maxChangeNumber = -1;
let handleNewEvent = false;
let isHandlingEvent: boolean;
let cdnBypass: boolean;
Expand All @@ -44,8 +47,7 @@ export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSy

const attempts = backoff.attempts + 1;

// @TODO and with RBS and FF
if (maxChangeNumber <= cache.getChangeNumber()) {
if (ff.isSync() && rbs.isSync()) {
log.debug(`Refresh completed${cdnBypass ? ' bypassing the CDN' : ''} in ${attempts} attempts.`);
isHandlingEvent = false;
return;
Expand Down Expand Up @@ -97,13 +99,13 @@ export function SplitsUpdateWorker(log: ILogger, storage: IStorageSync, splitsSy
stop() {
isHandlingEvent = false;
backoff.reset();
},
isSync() {
return maxChangeNumber <= cache.getChangeNumber();
}
};
}

const ff = SplitsUpdateWorker(storage.splits);
const rbs = SplitsUpdateWorker(storage.rbSegments);

return {
put(parsedData) {
if (parsedData.d && parsedData.c !== undefined) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// @ts-nocheck
import { SDK_SPLITS_ARRIVED } from '../../../../readiness/constants';
import { SplitsCacheInMemory } from '../../../../storages/inMemory/SplitsCacheInMemory';
import { RBSegmentsCacheInMemory } from '../../../../storages/inMemory/RBSegmentsCacheInMemory';
import { SplitsUpdateWorker } from '../SplitsUpdateWorker';
import { FETCH_BACKOFF_MAX_RETRIES } from '../constants';
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';
Expand Down Expand Up @@ -53,19 +54,26 @@ const telemetryTracker = telemetryTrackerFactory(); // no-op telemetry tracker

describe('SplitsUpdateWorker', () => {

const storage = {
splits: new SplitsCacheInMemory(),
rbSegments: new RBSegmentsCacheInMemory()
};

afterEach(() => { // restore
Backoff.__TEST__BASE_MILLIS = undefined;
Backoff.__TEST__MAX_MILLIS = undefined;

storage.splits.clear();
storage.rbSegments.clear();
});

test('put', async () => {

// setup
const cache = new SplitsCacheInMemory();
const splitsSyncTask = splitsSyncTaskMock(cache);
const splitsSyncTask = splitsSyncTaskMock(storage.splits);

Backoff.__TEST__BASE_MILLIS = 1; // retry immediately
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);

// assert calling `splitsSyncTask.execute` if `isExecuting` is false
expect(splitsSyncTask.isExecuting()).toBe(false);
Expand Down Expand Up @@ -102,9 +110,8 @@ describe('SplitsUpdateWorker', () => {
test('put, backoff', async () => {
// setup
Backoff.__TEST__BASE_MILLIS = 50;
const cache = new SplitsCacheInMemory();
const splitsSyncTask = splitsSyncTaskMock(cache, [90, 90, 90]);
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
const splitsSyncTask = splitsSyncTaskMock(storage.splits, [90, 90, 90]);
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);

// while fetch fails, should retry with backoff
splitUpdateWorker.put({ changeNumber: 100 });
Expand All @@ -121,9 +128,8 @@ describe('SplitsUpdateWorker', () => {
// setup
Backoff.__TEST__BASE_MILLIS = 10; // 10 millis instead of 10 sec
Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min
const cache = new SplitsCacheInMemory();
const splitsSyncTask = splitsSyncTaskMock(cache, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
const splitsSyncTask = splitsSyncTaskMock(storage.splits, [...Array(FETCH_BACKOFF_MAX_RETRIES).fill(90), 90, 100]); // 12 executions. Last one is valid
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);

splitUpdateWorker.put({ changeNumber: 100 }); // queued

Expand All @@ -146,9 +152,8 @@ describe('SplitsUpdateWorker', () => {
// setup
Backoff.__TEST__BASE_MILLIS = 10; // 10 millis instead of 10 sec
Backoff.__TEST__MAX_MILLIS = 60; // 60 millis instead of 1 min
const cache = new SplitsCacheInMemory();
const splitsSyncTask = splitsSyncTaskMock(cache, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. No one is valid
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
const splitsSyncTask = splitsSyncTaskMock(storage.splits, Array(FETCH_BACKOFF_MAX_RETRIES * 2).fill(90)); // 20 executions. No one is valid
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);

splitUpdateWorker.put({ changeNumber: 100 }); // queued

Expand All @@ -168,18 +173,17 @@ describe('SplitsUpdateWorker', () => {

test('killSplit', async () => {
// setup
const cache = new SplitsCacheInMemory();
cache.addSplit({ name: 'something' });
cache.addSplit({ name: 'something else' });
storage.splits.addSplit({ name: 'something' });
storage.splits.addSplit({ name: 'something else' });

const splitsSyncTask = splitsSyncTaskMock(cache);
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, splitsEventEmitterMock, telemetryTracker);
const splitsSyncTask = splitsSyncTaskMock(storage.splits);
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, splitsEventEmitterMock, telemetryTracker);

// assert killing split locally, emitting SDK_SPLITS_ARRIVED event, and synchronizing splits if changeNumber is new
splitUpdateWorker.killSplit({ changeNumber: 100, splitName: 'something', defaultTreatment: 'off' }); // splitsCache.killLocally is synchronous
expect(splitsSyncTask.execute).toBeCalledTimes(1); // synchronizes splits if `isExecuting` is false
expect(splitsEventEmitterMock.emit.mock.calls).toEqual([[SDK_SPLITS_ARRIVED, true]]); // emits `SDK_SPLITS_ARRIVED` with `isSplitKill` flag in true, if split kill resolves with update
assertKilledSplit(cache, 100, 'something', 'off');
assertKilledSplit(storage.splits, 100, 'something', 'off');

// assert not killing split locally, not emitting SDK_SPLITS_ARRIVED event, and not synchronizes splits, if changeNumber is old
splitsSyncTask.__resolveSplitsUpdaterCall(100);
Expand All @@ -192,15 +196,14 @@ describe('SplitsUpdateWorker', () => {
expect(splitsSyncTask.execute).toBeCalledTimes(0); // doesn't synchronize splits if killLocally resolved without update
expect(splitsEventEmitterMock.emit).toBeCalledTimes(0); // doesn't emit `SDK_SPLITS_ARRIVED` if killLocally resolved without update

assertKilledSplit(cache, 100, 'something', 'off'); // calling `killLocally` with an old changeNumber made no effect
assertKilledSplit(storage.splits, 100, 'something', 'off'); // calling `killLocally` with an old changeNumber made no effect
});

test('stop', async () => {
// setup
const cache = new SplitsCacheInMemory();
const splitsSyncTask = splitsSyncTaskMock(cache, [95]);
const splitsSyncTask = splitsSyncTaskMock(storage.splits, [95]);
Backoff.__TEST__BASE_MILLIS = 1;
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);

splitUpdateWorker.put({ changeNumber: 100 });

Expand All @@ -212,11 +215,10 @@ describe('SplitsUpdateWorker', () => {

test('put, avoid fetching if payload sent', async () => {

const cache = new SplitsCacheInMemory();
splitNotifications.forEach(notification => {
const pcn = cache.getChangeNumber();
const splitsSyncTask = splitsSyncTaskMock(cache);
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
const pcn = storage.splits.getChangeNumber();
const splitsSyncTask = splitsSyncTaskMock(storage.splits);
const splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
const payload = notification.decoded;
const changeNumber = payload.changeNumber;
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression }); // queued
Expand All @@ -226,17 +228,15 @@ describe('SplitsUpdateWorker', () => {
});

test('put, ccn and pcn validation for IFF', () => {
const cache = new SplitsCacheInMemory();

// ccn = 103 & pcn = 104: Something was missed -> fetch split changes
let ccn = 103;
let pcn = 104;
let changeNumber = 105;
cache.setChangeNumber(ccn);
storage.splits.setChangeNumber(ccn);
const notification = splitNotifications[0];

let splitsSyncTask = splitsSyncTaskMock(cache);
let splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
let splitsSyncTask = splitsSyncTaskMock(storage.splits);
let splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
expect(splitsSyncTask.execute).toBeCalledTimes(1);
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]);
Expand All @@ -246,10 +246,10 @@ describe('SplitsUpdateWorker', () => {
ccn = 110;
pcn = 0;
changeNumber = 111;
cache.setChangeNumber(ccn);
storage.splits.setChangeNumber(ccn);

splitsSyncTask = splitsSyncTaskMock(cache);
splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
splitsSyncTask = splitsSyncTaskMock(storage.splits);
splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
expect(splitsSyncTask.execute).toBeCalledTimes(1);
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, undefined]);
Expand All @@ -259,10 +259,10 @@ describe('SplitsUpdateWorker', () => {
ccn = 120;
pcn = 120;
changeNumber = 121;
cache.setChangeNumber(ccn);
storage.splits.setChangeNumber(ccn);

splitsSyncTask = splitsSyncTaskMock(cache);
splitUpdateWorker = SplitsUpdateWorker(loggerMock, { splits: cache }, splitsSyncTask, telemetryTracker);
splitsSyncTask = splitsSyncTaskMock(storage.splits);
splitUpdateWorker = SplitsUpdateWorker(loggerMock, storage, splitsSyncTask, telemetryTracker);
splitUpdateWorker.put({ changeNumber, pcn, d: notification.data, c: notification.compression });
expect(splitsSyncTask.execute).toBeCalledTimes(1);
expect(splitsSyncTask.execute.mock.calls[0]).toEqual([true, undefined, { payload: notification.decoded, changeNumber }]);
Expand Down