Skip to content

Commit a719bb7

Browse files
[FME-12059] SDK_UPDATE with metadata
1 parent eaf04b8 commit a719bb7

File tree

9 files changed

+237
-69
lines changed

9 files changed

+237
-69
lines changed

src/readiness/__tests__/readinessManager.spec.ts

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { EventEmitter } from '../../utils/MinEvents';
33
import { IReadinessManager } from '../types';
44
import { SDK_READY, SDK_UPDATE, SDK_SPLITS_ARRIVED, SDK_SEGMENTS_ARRIVED, SDK_READY_FROM_CACHE, SDK_SPLITS_CACHE_LOADED, SDK_READY_TIMED_OUT } from '../constants';
55
import { ISettings } from '../../types';
6+
import { EventMetadata, SdkUpdateMetadataKeys } from '../../sync/polling/types';
67

78
const settings = {
89
startup: {
@@ -300,3 +301,62 @@ test('READINESS MANAGER / Destroy before it was ready and timedout', (done) => {
300301
}, settingsWithTimeout.startup.readyTimeout * 1.5);
301302

302303
});
304+
305+
test('READINESS MANAGER / SDK_UPDATE should emit with metadata', () => {
306+
const readinessManager = readinessManagerFactory(EventEmitter, settings);
307+
308+
// SDK_READY
309+
readinessManager.splits.emit(SDK_SPLITS_ARRIVED);
310+
readinessManager.segments.emit(SDK_SEGMENTS_ARRIVED);
311+
312+
const metadata: EventMetadata = {
313+
[SdkUpdateMetadataKeys.UPDATED_FLAGS]: ['flag1', 'flag2']
314+
};
315+
316+
let receivedMetadata: EventMetadata | undefined;
317+
readinessManager.gate.on(SDK_UPDATE, (meta: EventMetadata) => {
318+
receivedMetadata = meta;
319+
});
320+
321+
readinessManager.splits.emit(SDK_SPLITS_ARRIVED, metadata);
322+
323+
expect(receivedMetadata).toEqual(metadata);
324+
});
325+
326+
test('READINESS MANAGER / SDK_UPDATE should handle undefined metadata', () => {
327+
const readinessManager = readinessManagerFactory(EventEmitter, settings);
328+
329+
// SDK_READY
330+
readinessManager.splits.emit(SDK_SPLITS_ARRIVED);
331+
readinessManager.segments.emit(SDK_SEGMENTS_ARRIVED);
332+
333+
let receivedMetadata: any;
334+
readinessManager.gate.on(SDK_UPDATE, (meta: EventMetadata) => {
335+
receivedMetadata = meta;
336+
});
337+
338+
readinessManager.splits.emit(SDK_SPLITS_ARRIVED);
339+
340+
expect(receivedMetadata).toBeUndefined();
341+
});
342+
343+
test('READINESS MANAGER / SDK_UPDATE should forward metadata from segments', () => {
344+
const readinessManager = readinessManagerFactory(EventEmitter, settings);
345+
346+
// SDK_READY
347+
readinessManager.splits.emit(SDK_SPLITS_ARRIVED);
348+
readinessManager.segments.emit(SDK_SEGMENTS_ARRIVED);
349+
350+
const metadata: EventMetadata = {
351+
[SdkUpdateMetadataKeys.UPDATED_SEGMENTS]: ['segment1', 'segment2']
352+
};
353+
354+
let receivedMetadata: EventMetadata | undefined;
355+
readinessManager.gate.on(SDK_UPDATE, (meta: EventMetadata) => {
356+
receivedMetadata = meta;
357+
});
358+
359+
readinessManager.segments.emit(SDK_SEGMENTS_ARRIVED, metadata);
360+
361+
expect(receivedMetadata).toEqual(metadata);
362+
});

src/readiness/readinessManager.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { ISettings } from '../types';
33
import SplitIO from '../../types/splitio';
44
import { SDK_SPLITS_ARRIVED, SDK_SPLITS_CACHE_LOADED, SDK_SEGMENTS_ARRIVED, SDK_READY_TIMED_OUT, SDK_READY_FROM_CACHE, SDK_UPDATE, SDK_READY } from './constants';
55
import { IReadinessEventEmitter, IReadinessManager, ISegmentsEventEmitter, ISplitsEventEmitter } from './types';
6+
import { SdkUpdateMetadata } from '../sync/polling/types';
67

78
function splitsEventEmitterFactory(EventEmitter: new () => SplitIO.IEventEmitter): ISplitsEventEmitter {
89
const splitsEventEmitter = objectAssign(new EventEmitter(), {
@@ -15,7 +16,7 @@ function splitsEventEmitterFactory(EventEmitter: new () => SplitIO.IEventEmitter
1516
// `isSplitKill` condition avoids an edge-case of wrongly emitting SDK_READY if:
1617
// - `/memberships` fetch and SPLIT_KILL occurs before `/splitChanges` fetch, and
1718
// - storage has cached splits (for which case `splitsStorage.killLocally` can return true)
18-
splitsEventEmitter.on(SDK_SPLITS_ARRIVED, (isSplitKill: boolean) => { if (!isSplitKill) splitsEventEmitter.splitsArrived = true; });
19+
splitsEventEmitter.on(SDK_SPLITS_ARRIVED, (metadata: SdkUpdateMetadata, isSplitKill: boolean) => { if (!isSplitKill) splitsEventEmitter.splitsArrived = true; });
1920
splitsEventEmitter.once(SDK_SPLITS_CACHE_LOADED, () => { splitsEventEmitter.splitsCacheLoaded = true; });
2021

2122
return splitsEventEmitter;
@@ -98,12 +99,12 @@ export function readinessManagerFactory(
9899
}
99100
}
100101

101-
function checkIsReadyOrUpdate(diff: any) {
102+
function checkIsReadyOrUpdate(metadata: SdkUpdateMetadata) {
102103
if (isDestroyed) return;
103104
if (isReady) {
104105
try {
105106
syncLastUpdate();
106-
gate.emit(SDK_UPDATE, diff);
107+
gate.emit(SDK_UPDATE, metadata);
107108
} catch (e) {
108109
// throws user callback exceptions in next tick
109110
setTimeout(() => { throw e; }, 0);

src/storages/inRedis/__tests__/TelemetryCacheInRedis.spec.ts

Lines changed: 69 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -10,78 +10,88 @@ const latencyKey = `${prefix}.telemetry.latencies`;
1010
const initKey = `${prefix}.telemetry.init`;
1111
const fieldVersionablePrefix = `${metadata.s}/${metadata.n}/${metadata.i}`;
1212

13-
test('TELEMETRY CACHE IN REDIS', async () => {
13+
describe('TELEMETRY CACHE IN REDIS', () => {
14+
let connection: RedisAdapter;
15+
let cache: TelemetryCacheInRedis;
16+
let keysBuilder: KeyBuilderSS;
1417

15-
const keysBuilder = new KeyBuilderSS(prefix, metadata);
16-
const connection = new RedisAdapter(loggerMock);
17-
const cache = new TelemetryCacheInRedis(loggerMock, keysBuilder, connection);
18+
beforeEach(async () => {
19+
keysBuilder = new KeyBuilderSS(prefix, metadata);
20+
connection = new RedisAdapter(loggerMock);
21+
cache = new TelemetryCacheInRedis(loggerMock, keysBuilder, connection);
1822

19-
// recordException
20-
expect(await cache.recordException('tr')).toBe(1);
21-
expect(await cache.recordException('tr')).toBe(2);
22-
expect(await cache.recordException('tcfs')).toBe(1);
23+
await connection.del(exceptionKey);
24+
await connection.del(latencyKey);
25+
await connection.del(initKey);
26+
});
2327

24-
expect(await connection.hget(exceptionKey, fieldVersionablePrefix + '/track')).toBe('2');
25-
expect(await connection.hget(exceptionKey, fieldVersionablePrefix + '/treatment')).toBe(null);
26-
expect(await connection.hget(exceptionKey, fieldVersionablePrefix + '/treatmentsWithConfigByFlagSets')).toBe('1');
28+
test('TELEMETRY CACHE IN REDIS', async () => {
2729

28-
// recordLatency
29-
expect(await cache.recordLatency('tr', 1.6)).toBe(1);
30-
expect(await cache.recordLatency('tr', 1.6)).toBe(2);
31-
expect(await cache.recordLatency('tfs', 1.6)).toBe(1);
30+
// recordException
31+
expect(await cache.recordException('tr')).toBe(1);
32+
expect(await cache.recordException('tr')).toBe(2);
33+
expect(await cache.recordException('tcfs')).toBe(1);
3234

33-
expect(await connection.hget(latencyKey, fieldVersionablePrefix + '/track/2')).toBe('2');
34-
expect(await connection.hget(latencyKey, fieldVersionablePrefix + '/treatment/2')).toBe(null);
35-
expect(await connection.hget(latencyKey, fieldVersionablePrefix + '/treatmentsByFlagSets/2')).toBe('1');
35+
expect(await connection.hget(exceptionKey, fieldVersionablePrefix + '/track')).toBe('2');
36+
expect(await connection.hget(exceptionKey, fieldVersionablePrefix + '/treatment')).toBe(null);
37+
expect(await connection.hget(exceptionKey, fieldVersionablePrefix + '/treatmentsWithConfigByFlagSets')).toBe('1');
3638

37-
// recordConfig
38-
expect(await cache.recordConfig()).toBe(1);
39-
expect(JSON.parse(await connection.hget(initKey, fieldVersionablePrefix) as string)).toEqual({
40-
oM: 1,
41-
st: 'redis',
42-
aF: 0,
43-
rF: 0
44-
});
39+
// recordLatency
40+
expect(await cache.recordLatency('tr', 1.6)).toBe(1);
41+
expect(await cache.recordLatency('tr', 1.6)).toBe(2);
42+
expect(await cache.recordLatency('tfs', 1.6)).toBe(1);
4543

46-
// popLatencies
47-
const latencies = await cache.popLatencies();
48-
latencies.forEach((latency, m) => {
49-
expect(JSON.parse(m)).toEqual(metadata);
50-
expect(latency).toEqual({
51-
tfs: [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
52-
tr: [0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
53-
});
54-
});
55-
expect(await connection.hget(latencyKey, fieldVersionablePrefix + '/track/2')).toBe(null);
56-
57-
// popExceptions
58-
const exceptions = await cache.popExceptions();
59-
exceptions.forEach((exception, m) => {
60-
expect(JSON.parse(m)).toEqual(metadata);
61-
expect(exception).toEqual({
62-
tcfs: 1,
63-
tr: 2,
64-
});
65-
});
66-
expect(await connection.hget(exceptionKey, fieldVersionablePrefix + '/track')).toBe(null);
44+
expect(await connection.hget(latencyKey, fieldVersionablePrefix + '/track/2')).toBe('2');
45+
expect(await connection.hget(latencyKey, fieldVersionablePrefix + '/treatment/2')).toBe(null);
46+
expect(await connection.hget(latencyKey, fieldVersionablePrefix + '/treatmentsByFlagSets/2')).toBe('1');
6747

68-
// popConfig
69-
const configs = await cache.popConfigs();
70-
configs.forEach((config, m) => {
71-
expect(JSON.parse(m)).toEqual(metadata);
72-
expect(config).toEqual({
48+
// recordConfig
49+
expect(await cache.recordConfig()).toBe(1);
50+
expect(JSON.parse(await connection.hget(initKey, fieldVersionablePrefix) as string)).toEqual({
7351
oM: 1,
7452
st: 'redis',
7553
aF: 0,
7654
rF: 0
7755
});
78-
});
79-
expect(await connection.hget(initKey, fieldVersionablePrefix)).toBe(null);
8056

81-
// pops when there is no data
82-
expect((await cache.popLatencies()).size).toBe(0);
83-
expect((await cache.popExceptions()).size).toBe(0);
84-
expect((await cache.popConfigs()).size).toBe(0);
57+
// popLatencies
58+
const latencies = await cache.popLatencies();
59+
latencies.forEach((latency, m) => {
60+
expect(JSON.parse(m)).toEqual(metadata);
61+
expect(latency).toEqual({
62+
tfs: [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
63+
tr: [0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
64+
});
65+
});
66+
expect(await connection.hget(latencyKey, fieldVersionablePrefix + '/track/2')).toBe(null);
67+
68+
// popExceptions
69+
const exceptions = await cache.popExceptions();
70+
exceptions.forEach((exception, m) => {
71+
expect(JSON.parse(m)).toEqual(metadata);
72+
expect(exception).toEqual({
73+
tcfs: 1,
74+
tr: 2,
75+
});
76+
});
77+
expect(await connection.hget(exceptionKey, fieldVersionablePrefix + '/track')).toBe(null);
8578

86-
await connection.disconnect();
79+
// popConfig
80+
const configs = await cache.popConfigs();
81+
configs.forEach((config, m) => {
82+
expect(JSON.parse(m)).toEqual(metadata);
83+
expect(config).toEqual({
84+
oM: 1,
85+
st: 'redis',
86+
aF: 0,
87+
rF: 0
88+
});
89+
});
90+
expect(await connection.hget(initKey, fieldVersionablePrefix)).toBe(null);
91+
92+
// pops when there is no data
93+
expect((await cache.popLatencies()).size).toBe(0);
94+
expect((await cache.popExceptions()).size).toBe(0);
95+
expect((await cache.popConfigs()).size).toBe(0);
96+
});
8797
});

src/sync/polling/types.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,13 @@ export interface IPollingManagerCS extends IPollingManager {
3131
remove(matchingKey: string): void;
3232
get(matchingKey: string): IMySegmentsSyncTask | undefined
3333
}
34+
35+
export enum SdkUpdateMetadataKeys {
36+
UPDATED_FLAGS = 'updatedFlags',
37+
UPDATED_SEGMENTS = 'updatedSegments'
38+
}
39+
40+
export type SdkUpdateMetadata = {
41+
[SdkUpdateMetadataKeys.UPDATED_FLAGS]?: string[]
42+
[SdkUpdateMetadataKeys.UPDATED_SEGMENTS]?: string[]
43+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import { readinessManagerFactory } from '../../../../readiness/readinessManager';
2+
import { SegmentsCacheInMemory } from '../../../../storages/inMemory/SegmentsCacheInMemory';
3+
import { segmentChangesUpdaterFactory } from '../segmentChangesUpdater';
4+
import { fullSettings } from '../../../../utils/settingsValidation/__tests__/settings.mocks';
5+
import { EventEmitter } from '../../../../utils/MinEvents';
6+
import { loggerMock } from '../../../../logger/__tests__/sdkLogger.mock';
7+
import { ISegmentChangesFetcher } from '../../fetchers/types';
8+
import { ISegmentChangesResponse } from '../../../../dtos/types';
9+
import { SDK_SEGMENTS_ARRIVED } from '../../../../readiness/constants';
10+
11+
describe('segmentChangesUpdater', () => {
12+
const segments = new SegmentsCacheInMemory();
13+
const updateSegments = jest.spyOn(segments, 'update');
14+
15+
const readinessManager = readinessManagerFactory(EventEmitter, fullSettings);
16+
const segmentsEmitSpy = jest.spyOn(readinessManager.segments, 'emit');
17+
18+
beforeEach(() => {
19+
jest.clearAllMocks();
20+
segments.clear();
21+
readinessManager.segments.segmentsArrived = false;
22+
});
23+
24+
test('test with segments update - should emit updatedSegments and NOT updatedFlags', async () => {
25+
const segmentName = 'test-segment';
26+
const segmentChange: ISegmentChangesResponse = {
27+
name: segmentName,
28+
added: ['key1', 'key2'],
29+
removed: [],
30+
since: -1,
31+
till: 123
32+
};
33+
34+
const mockSegmentChangesFetcher: ISegmentChangesFetcher = jest.fn().mockResolvedValue([segmentChange]);
35+
36+
const segmentChangesUpdater = segmentChangesUpdaterFactory(
37+
loggerMock,
38+
mockSegmentChangesFetcher,
39+
segments,
40+
readinessManager,
41+
1000,
42+
1
43+
);
44+
45+
segments.registerSegments([segmentName]);
46+
47+
await segmentChangesUpdater(undefined, segmentName);
48+
49+
expect(updateSegments).toHaveBeenCalledWith(segmentName, segmentChange.added, segmentChange.removed, segmentChange.till);
50+
expect(segmentsEmitSpy).toBeCalledWith(SDK_SEGMENTS_ARRIVED, { updatedSegments: [segmentName] });
51+
});
52+
});

0 commit comments

Comments
 (0)