diff --git a/packages/core/src/__tests__/api.test.ts b/packages/core/src/__tests__/api.test.ts index f968a3201..ce7aaf021 100644 --- a/packages/core/src/__tests__/api.test.ts +++ b/packages/core/src/__tests__/api.test.ts @@ -25,7 +25,11 @@ describe('#sendEvents', () => { .mockReturnValue('2001-01-01T00:00:00.000Z'); }); - async function sendAnEventPer(writeKey: string, toUrl: string) { + async function sendAnEventPer( + writeKey: string, + toUrl: string, + retryCount?: number + ) { const mockResponse = Promise.resolve('MANOS'); // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore @@ -60,9 +64,19 @@ describe('#sendEvents', () => { writeKey: writeKey, url: toUrl, events: [event], + retryCount, }); - expect(fetch).toHaveBeenCalledWith(toUrl, { + return event; + } + + it('sends an event', async () => { + const toSegmentBatchApi = 'https://api.segment.io/v1.b'; + const writeKey = 'SEGMENT_KEY'; + + const event = await sendAnEventPer(writeKey, toSegmentBatchApi); + + expect(fetch).toHaveBeenCalledWith(toSegmentBatchApi, { method: 'POST', keepalive: true, body: JSON.stringify({ @@ -72,21 +86,67 @@ describe('#sendEvents', () => { }), headers: { 'Content-Type': 'application/json; charset=utf-8', + 'X-Retry-Count': '0', }, }); - } - - it('sends an event', async () => { - const toSegmentBatchApi = 'https://api.segment.io/v1.b'; - const writeKey = 'SEGMENT_KEY'; - - await sendAnEventPer(writeKey, toSegmentBatchApi); }); it('sends an event to proxy', async () => { const toProxyUrl = 'https://myprox.io/b'; const writeKey = 'SEGMENT_KEY'; - await sendAnEventPer(writeKey, toProxyUrl); + const event = await sendAnEventPer(writeKey, toProxyUrl); + + expect(fetch).toHaveBeenCalledWith(toProxyUrl, { + method: 'POST', + body: JSON.stringify({ + batch: [event], + sentAt: '2001-01-01T00:00:00.000Z', + writeKey: 'SEGMENT_KEY', + }), + headers: { + 'Content-Type': 'application/json; charset=utf-8', + 'X-Retry-Count': '0', + }, + keepalive: true, + }); + }); + + 
it('sends X-Retry-Count header with default value 0', async () => { + const url = 'https://api.segment.io/v1.b'; + await sendAnEventPer('KEY', url); + + expect(fetch).toHaveBeenCalledWith( + url, + expect.objectContaining({ + headers: expect.objectContaining({ + 'X-Retry-Count': '0', + }), + }) + ); + }); + + it('sends X-Retry-Count header with provided retry count', async () => { + const url = 'https://api.segment.io/v1.b'; + await sendAnEventPer('KEY', url, 5); + + expect(fetch).toHaveBeenCalledWith( + url, + expect.objectContaining({ + headers: expect.objectContaining({ + 'X-Retry-Count': '5', + }), + }) + ); + }); + + it('sends X-Retry-Count as string format', async () => { + const url = 'https://api.segment.io/v1.b'; + await sendAnEventPer('KEY', url, 42); + + const callArgs = (fetch as jest.Mock).mock.calls[0]; + const headers = callArgs[1].headers; + expect(typeof headers['X-Retry-Count']).toBe('string'); + expect(headers['X-Retry-Count']).toBe('42'); }); }); diff --git a/packages/core/src/__tests__/internal/fetchSettings.test.ts b/packages/core/src/__tests__/internal/fetchSettings.test.ts index f061afe5a..4599b55bf 100644 --- a/packages/core/src/__tests__/internal/fetchSettings.test.ts +++ b/packages/core/src/__tests__/internal/fetchSettings.test.ts @@ -1,5 +1,5 @@ import { SegmentClient } from '../../analytics'; -import { settingsCDN } from '../../constants'; +import { settingsCDN, defaultHttpConfig } from '../../constants'; import { SEGMENT_DESTINATION_KEY } from '../../plugins/SegmentDestination'; import { getMockLogger, MockSegmentStore } from '../../test-helpers'; import { getURL } from '../../util'; @@ -436,6 +436,80 @@ describe('internal #getSettings', () => { }); }); + describe('CDN integrations validation', () => { + it('treats null integrations as empty (no integrations configured)', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ integrations: null }), + status: 200, + } as 
Response); + + const client = new SegmentClient(clientArgs); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith({}); + }); + + it('treats missing integrations as empty (no integrations configured)', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({}), + status: 200, + } as Response); + + const client = new SegmentClient(clientArgs); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith({}); + }); + + it('falls back to defaults when CDN returns integrations as an array', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ integrations: ['invalid'] }), + status: 200, + } as Response); + + const client = new SegmentClient(clientArgs); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith( + defaultIntegrationSettings.integrations + ); + }); + + it('falls back to defaults when CDN returns integrations as a string', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ integrations: 'invalid' }), + status: 200, + } as Response); + + const client = new SegmentClient(clientArgs); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith( + defaultIntegrationSettings.integrations + ); + }); + + it('stores empty integrations when CDN returns null integrations and no defaults', async () => { + (fetch as jest.MockedFunction).mockResolvedValueOnce({ + ok: true, + json: () => Promise.resolve({ integrations: null }), + status: 200, + } as Response); + + const client = new SegmentClient({ + ...clientArgs, + config: { ...clientArgs.config, defaultSettings: undefined }, + }); + await client.fetchSettings(); + + expect(setSettingsSpy).toHaveBeenCalledWith({}); + }); + }); + describe('httpConfig extraction', () => { it('extracts httpConfig from CDN response and merges with defaults', async () => { 
const serverHttpConfig = { @@ -483,7 +557,7 @@ describe('internal #getSettings', () => { expect(result?.backoffConfig?.jitterPercent).toBe(20); }); - it('returns undefined httpConfig when CDN has no httpConfig', async () => { + it('returns defaultHttpConfig when CDN has no httpConfig', async () => { (fetch as jest.MockedFunction).mockResolvedValueOnce({ ok: true, json: () => Promise.resolve(defaultIntegrationSettings), @@ -496,7 +570,17 @@ describe('internal #getSettings', () => { }); await anotherClient.fetchSettings(); - expect(anotherClient.getHttpConfig()).toBeUndefined(); + const result = anotherClient.getHttpConfig(); + expect(result).toBeDefined(); + expect(result?.rateLimitConfig?.enabled).toBe( + defaultHttpConfig.rateLimitConfig!.enabled + ); + expect(result?.backoffConfig?.enabled).toBe( + defaultHttpConfig.backoffConfig!.enabled + ); + expect(result?.backoffConfig?.statusCodeOverrides).toEqual( + defaultHttpConfig.backoffConfig!.statusCodeOverrides + ); }); it('returns undefined httpConfig when fetch fails', async () => { diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 55e229329..d26d071db 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -202,8 +202,8 @@ export class SegmentClient { } /** - * Retrieves the server-side httpConfig from CDN settings. - * Returns undefined if the CDN did not provide httpConfig (retry features disabled). + * Retrieves the merged httpConfig (defaultHttpConfig ← CDN ← config overrides). + * Returns undefined only if settings have not yet been fetched. 
*/ getHttpConfig(): HttpConfig | undefined { return this.httpConfig; @@ -406,8 +406,13 @@ export class SegmentClient { const resJson: SegmentAPISettings = (await res.json()) as SegmentAPISettings; + // A valid 200 with missing integrations means "no integrations configured" + // Only fall back to defaults for truly malformed types (non-object or array) + if (resJson.integrations == null) { + resJson.integrations = {}; + } + if ( - resJson.integrations == null || typeof resJson.integrations !== 'object' || Array.isArray(resJson.integrations) ) { @@ -429,25 +434,27 @@ export class SegmentClient { resJson.middlewareSettings?.routingRules ?? [] ); - // Extract httpConfig from CDN, merge with defaults, validate and clamp - if (resJson.httpConfig) { - const mergedRateLimit = resJson.httpConfig.rateLimitConfig - ? { - ...defaultHttpConfig.rateLimitConfig!, - ...resJson.httpConfig.rateLimitConfig, - } - : defaultHttpConfig.rateLimitConfig!; - - const mergedBackoff = resJson.httpConfig.backoffConfig - ? { - ...defaultHttpConfig.backoffConfig!, - ...resJson.httpConfig.backoffConfig, - statusCodeOverrides: { - ...defaultHttpConfig.backoffConfig!.statusCodeOverrides, - ...resJson.httpConfig.backoffConfig.statusCodeOverrides, - }, - } - : defaultHttpConfig.backoffConfig!; + // Merge httpConfig: defaultHttpConfig ← CDN ← config overrides + { + const cdnConfig = resJson.httpConfig ?? {}; + const clientConfig = this.config.httpConfig ?? {}; + + const mergedRateLimit = { + ...defaultHttpConfig.rateLimitConfig!, + ...(cdnConfig.rateLimitConfig ?? {}), + ...(clientConfig.rateLimitConfig ?? {}), + }; + + const mergedBackoff = { + ...defaultHttpConfig.backoffConfig!, + ...(cdnConfig.backoffConfig ?? {}), + ...(clientConfig.backoffConfig ?? {}), + statusCodeOverrides: { + ...defaultHttpConfig.backoffConfig!.statusCodeOverrides, + ...(cdnConfig.backoffConfig?.statusCodeOverrides ?? {}), + ...(clientConfig.backoffConfig?.statusCodeOverrides ?? 
{}), + }, + }; const validatedRateLimit = validateRateLimitConfig( mergedRateLimit, @@ -461,7 +468,9 @@ export class SegmentClient { validatedRateLimit ), }; - this.logger.info('Loaded httpConfig from CDN settings.'); + if (resJson.httpConfig) { + this.logger.info('Loaded httpConfig from CDN settings.'); + } } this.logger.info('Received settings from Segment succesfully.'); diff --git a/packages/core/src/api.ts b/packages/core/src/api.ts index 6aa2851a7..8853234e7 100644 --- a/packages/core/src/api.ts +++ b/packages/core/src/api.ts @@ -4,10 +4,12 @@ export const uploadEvents = async ({ writeKey, url, events, + retryCount = 0, }: { writeKey: string; url: string; events: SegmentEvent[]; + retryCount?: number; }) => { return await fetch(url, { method: 'POST', @@ -19,6 +21,7 @@ export const uploadEvents = async ({ }), headers: { 'Content-Type': 'application/json; charset=utf-8', + 'X-Retry-Count': retryCount.toString(), }, }); }; diff --git a/packages/core/src/backoff/RetryManager.ts b/packages/core/src/backoff/RetryManager.ts index c418d350b..240ba7976 100644 --- a/packages/core/src/backoff/RetryManager.ts +++ b/packages/core/src/backoff/RetryManager.ts @@ -29,8 +29,9 @@ export type RetryResult = 'rate_limited' | 'backed_off' | 'limit_exceeded'; * - BACKING_OFF: transient error; exponential backoff until wait expires * * Designed for concurrent batch uploads (Promise.all). Multiple batches can - * fail simultaneously with different errors or partially succeed. The retry - * strategy (eager/lazy) controls how concurrent wait times are consolidated. + * fail simultaneously with different errors or partially succeed. When + * concurrent wait times conflict, the shorter wait is used (eager strategy) + * to retry sooner. * * Uses a global retry counter since batches are re-chunked from the event * queue on each flush and have no stable identities. 
@@ -40,20 +41,17 @@ export class RetryManager { private rateLimitConfig?: RateLimitConfig; private backoffConfig?: BackoffConfig; private logger?: LoggerType; - private retryStrategy: 'eager' | 'lazy'; constructor( storeId: string, persistor: Persistor | undefined, rateLimitConfig?: RateLimitConfig, backoffConfig?: BackoffConfig, - logger?: LoggerType, - retryStrategy: 'eager' | 'lazy' = 'lazy' + logger?: LoggerType ) { this.rateLimitConfig = rateLimitConfig; this.backoffConfig = backoffConfig; this.logger = logger; - this.retryStrategy = retryStrategy; try { this.store = createStore( @@ -293,14 +291,11 @@ export class RetryManager { } /** - * Consolidate two wait-until times based on retry strategy. - * - 'lazy': take the longer wait (most conservative, default) - * - 'eager': take the shorter wait (retry sooner) + * Consolidate two wait-until times using the eager strategy: + * take the shorter wait to retry sooner. */ private applyRetryStrategy(existing: number, incoming: number): number { - return this.retryStrategy === 'eager' - ? 
Math.min(existing, incoming) - : Math.max(existing, incoming); + return Math.min(existing, incoming); } private async transitionToReady(): Promise { diff --git a/packages/core/src/backoff/__tests__/RetryManager.test.ts b/packages/core/src/backoff/__tests__/RetryManager.test.ts index 705ddf475..189d9e83b 100644 --- a/packages/core/src/backoff/__tests__/RetryManager.test.ts +++ b/packages/core/src/backoff/__tests__/RetryManager.test.ts @@ -166,7 +166,7 @@ describe('RetryManager', () => { expect(await rm.getRetryCount()).toBe(2); }); - it('uses longest retry-after when multiple 429s occur', async () => { + it('uses shortest retry-after when multiple 429s occur (eager)', async () => { const now = 1000000; jest.spyOn(Date, 'now').mockReturnValue(now); @@ -181,11 +181,8 @@ describe('RetryManager', () => { await rm.handle429(60); await rm.handle429(120); - // Should wait 120s, not 60s + // Eager: should use 60s (shortest) jest.spyOn(Date, 'now').mockReturnValue(now + 61000); - expect(await rm.canRetry()).toBe(false); - - jest.spyOn(Date, 'now').mockReturnValue(now + 121000); expect(await rm.canRetry()).toBe(true); }); @@ -391,20 +388,22 @@ describe('RetryManager', () => { jest.spyOn(Date, 'now').mockReturnValue(now + 400); expect(await rm.canRetry()).toBe(false); - // Second error before first expires: 0.5 * 2^1 = 1s - jest.spyOn(Date, 'now').mockReturnValue(now + 400); + // Advance past first wait → transitions to READY (retryCount preserved) + jest.spyOn(Date, 'now').mockReturnValue(now + 600); + expect(await rm.canRetry()).toBe(true); + + // Second error: 0.5 * 2^1 = 1s (retryCount is still 1) await rm.handleTransientError(); - // Should now wait for the 1s from second error - jest.spyOn(Date, 'now').mockReturnValue(now + 1300); - expect(await rm.canRetry()).toBe(false); jest.spyOn(Date, 'now').mockReturnValue(now + 1500); + expect(await rm.canRetry()).toBe(false); + jest.spyOn(Date, 'now').mockReturnValue(now + 1700); expect(await rm.canRetry()).toBe(true); }); 
it('clamps backoff to maxBackoffInterval', async () => { - const now = 1000000; - jest.spyOn(Date, 'now').mockReturnValue(now); + let t = 1000000; + jest.spyOn(Date, 'now').mockReturnValue(t); const config: BackoffConfig = { ...defaultBackoffConfig, @@ -418,16 +417,21 @@ describe('RetryManager', () => { mockLogger ); - // Retry many times to exceed maxBackoffInterval - // Without moving time forward so they accumulate - for (let i = 0; i < 10; i++) { + // Advance retryCount high enough that unclamped backoff would exceed 5s + // (0.5 * 2^4 = 8s > 5s). Advance time between errors to avoid eager min. + for (let i = 0; i < 5; i++) { await rm.handleTransientError(); + t += 10000; + jest.spyOn(Date, 'now').mockReturnValue(t); + await rm.canRetry(); // transition to READY } - // Should be clamped to 5s - jest.spyOn(Date, 'now').mockReturnValue(now + 4000); + // Next error: 0.5 * 2^5 = 16s, clamped to 5s + await rm.handleTransientError(); + + jest.spyOn(Date, 'now').mockReturnValue(t + 4000); expect(await rm.canRetry()).toBe(false); - jest.spyOn(Date, 'now').mockReturnValue(now + 6000); + jest.spyOn(Date, 'now').mockReturnValue(t + 6000); expect(await rm.canRetry()).toBe(true); }); @@ -506,12 +510,11 @@ describe('RetryManager', () => { }); }); - describe('retryStrategy', () => { - it('defaults to lazy (uses longest wait time)', async () => { + describe('concurrent wait time consolidation', () => { + it('uses shortest wait time when multiple 429s occur (eager)', async () => { const now = 1000000; jest.spyOn(Date, 'now').mockReturnValue(now); - // No retryStrategy passed → defaults to 'lazy' const rm = new RetryManager( 'test-key', mockPersistor, @@ -523,36 +526,12 @@ describe('RetryManager', () => { await rm.handle429(60); await rm.handle429(120); - // Lazy: should use 120s (longest) - jest.spyOn(Date, 'now').mockReturnValue(now + 61000); - expect(await rm.canRetry()).toBe(false); - - jest.spyOn(Date, 'now').mockReturnValue(now + 121000); - expect(await 
rm.canRetry()).toBe(true); - }); - - it('eager strategy uses shortest wait time', async () => { - const now = 1000000; - jest.spyOn(Date, 'now').mockReturnValue(now); - - const rm = new RetryManager( - 'test-key', - mockPersistor, - defaultRateLimitConfig, - defaultBackoffConfig, - mockLogger, - 'eager' - ); - - await rm.handle429(60); - await rm.handle429(120); - // Eager: should use 60s (shortest) jest.spyOn(Date, 'now').mockReturnValue(now + 61000); expect(await rm.canRetry()).toBe(true); }); - it('lazy strategy uses longest wait time', async () => { + it('uses shortest wait time for transient errors too', async () => { const now = 1000000; jest.spyOn(Date, 'now').mockReturnValue(now); @@ -561,32 +540,7 @@ describe('RetryManager', () => { mockPersistor, defaultRateLimitConfig, defaultBackoffConfig, - mockLogger, - 'lazy' - ); - - await rm.handle429(60); - await rm.handle429(120); - - // Lazy: should use 120s (longest) - jest.spyOn(Date, 'now').mockReturnValue(now + 61000); - expect(await rm.canRetry()).toBe(false); - - jest.spyOn(Date, 'now').mockReturnValue(now + 121000); - expect(await rm.canRetry()).toBe(true); - }); - - it('eager strategy applies to transient errors too', async () => { - const now = 1000000; - jest.spyOn(Date, 'now').mockReturnValue(now); - - const rm = new RetryManager( - 'test-key', - mockPersistor, - defaultRateLimitConfig, - defaultBackoffConfig, - mockLogger, - 'eager' + mockLogger ); // First transient: 0.5 * 2^0 = 0.5s → wait until now + 500ms @@ -763,7 +717,7 @@ describe('RetryManager', () => { }); describe('mixed 429 and transient errors', () => { - it('429 wait time takes precedence over shorter transient backoff', async () => { + it('state stays RATE_LIMITED but wait uses shorter time (eager)', async () => { const now = 1000000; jest.spyOn(Date, 'now').mockReturnValue(now); @@ -775,20 +729,18 @@ describe('RetryManager', () => { mockLogger ); - // Get a 429 first + // Get a 429 first → RATE_LIMITED, waitUntilTime = now + 60s await 
rm.handle429(60); expect(await rm.getRetryCount()).toBe(1); - // Then a transient error before 429 expires + // Transient error at t=10s → backoff = 0.5*2^1 = 1s → waitUntil = now+11s + // Eager: min(now+60s, now+11s) = now+11s, state stays RATE_LIMITED jest.spyOn(Date, 'now').mockReturnValue(now + 10000); await rm.handleTransientError(); expect(await rm.getRetryCount()).toBe(2); - // Should use the longest wait time (429's 60s) - jest.spyOn(Date, 'now').mockReturnValue(now + 50000); - expect(await rm.canRetry()).toBe(false); - - jest.spyOn(Date, 'now').mockReturnValue(now + 61000); + // Eager picks shorter wait, so retryable after ~11s from original now + jest.spyOn(Date, 'now').mockReturnValue(now + 12000); expect(await rm.canRetry()).toBe(true); }); }); diff --git a/packages/core/src/constants.ts b/packages/core/src/constants.ts index 79b216d3d..feb5985f2 100644 --- a/packages/core/src/constants.ts +++ b/packages/core/src/constants.ts @@ -10,7 +10,6 @@ export const defaultConfig: Config = { trackAppLifecycleEvents: false, autoAddSegmentDestination: true, useSegmentEndpoints: false, - retryStrategy: 'lazy', }; export const defaultHttpConfig: HttpConfig = { diff --git a/packages/core/src/errors.ts b/packages/core/src/errors.ts index 0dd05938d..c55fab691 100644 --- a/packages/core/src/errors.ts +++ b/packages/core/src/errors.ts @@ -18,6 +18,7 @@ export enum ErrorType { InitializationError, ResetError, FlushError, + EventsDropped, } /** @@ -27,13 +28,20 @@ export class SegmentError extends Error { type: ErrorType; message: string; innerError?: unknown; + metadata?: Record; - constructor(type: ErrorType, message: string, innerError?: unknown) { + constructor( + type: ErrorType, + message: string, + innerError?: unknown, + metadata?: Record + ) { super(message); Object.setPrototypeOf(this, SegmentError.prototype); this.type = type; this.message = message; this.innerError = innerError; + this.metadata = metadata; } } diff --git 
a/packages/core/src/plugins/QueueFlushingPlugin.ts b/packages/core/src/plugins/QueueFlushingPlugin.ts index 1580ee288..fa92c9c9a 100644 --- a/packages/core/src/plugins/QueueFlushingPlugin.ts +++ b/packages/core/src/plugins/QueueFlushingPlugin.ts @@ -15,12 +15,12 @@ export class QueueFlushingPlugin extends UtilityPlugin { type = PluginType.after; private storeKey: string; - private isPendingUpload = false; private queueStore: Store<{ events: SegmentEvent[] }> | undefined; private onFlush: (events: SegmentEvent[]) => Promise; private isRestoredResolve: () => void; private isRestored: Promise; private timeoutWarned = false; + private flushPromise?: Promise; /** * @param onFlush callback to execute when the queue is flushed (either by reaching the limit or manually) e.g. code to upload events to your destination @@ -63,16 +63,35 @@ export class QueueFlushingPlugin extends UtilityPlugin { async execute(event: SegmentEvent): Promise { await this.queueStore?.dispatch((state) => { - const events = [...state.events, event]; + const stampedEvent = { ...event, _queuedAt: Date.now() }; + const events = [...state.events, stampedEvent]; return { events }; }); return event; } /** - * Calls the onFlush callback with the events in the queue + * Calls the onFlush callback with the events in the queue. + * Ensures only one flush operation runs at a time. */ async flush() { + if (this.flushPromise) { + this.analytics?.logger.info( + 'Flush already in progress, waiting for completion' + ); + await this.flushPromise; + return; + } + + this.flushPromise = this._doFlush(); + try { + await this.flushPromise; + } finally { + this.flushPromise = undefined; + } + } + + private async _doFlush(): Promise { // Wait for the queue to be restored try { await this.isRestored; @@ -103,33 +122,24 @@ export class QueueFlushingPlugin extends UtilityPlugin { } const events = (await this.queueStore?.getState(true))?.events ?? 
[]; - if (!this.isPendingUpload) { - try { - this.isPendingUpload = true; - await this.onFlush(events); - } finally { - this.isPendingUpload = false; - } - } + await this.onFlush(events); } - /** - * Removes one or multiple events from the queue - * @param events events to remove - */ - async dequeue(events: SegmentEvent | SegmentEvent[]) { + async dequeueByMessageIds(messageIds: string[]): Promise { await this.queueStore?.dispatch((state) => { - const eventsToRemove = Array.isArray(events) ? events : [events]; - - if (eventsToRemove.length === 0 || state.events.length === 0) { + if (messageIds.length === 0 || state.events.length === 0) { return state; } - const setToRemove = new Set(eventsToRemove); - const filteredEvents = state.events.filter((e) => !setToRemove.has(e)); + const idsToRemove = new Set(messageIds); + const filteredEvents = state.events.filter( + (e) => e.messageId == null || !idsToRemove.has(e.messageId) + ); + return { events: filteredEvents }; }); } + /** * Clear all events from the queue */ diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index cc7e911e6..ef4c36aca 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -1,5 +1,6 @@ import { DestinationPlugin } from '../plugin'; import { + HttpConfig, PluginType, SegmentAPIIntegration, SegmentAPISettings, @@ -11,20 +12,45 @@ import { uploadEvents } from '../api'; import type { SegmentClient } from '../analytics'; import { DestinationMetadataEnrichment } from './DestinationMetadataEnrichment'; import { QueueFlushingPlugin } from './QueueFlushingPlugin'; -import { defaultApiHost } from '../constants'; -import { checkResponseForErrors, translateHTTPError } from '../errors'; -import { defaultConfig } from '../constants'; +import { defaultApiHost, defaultConfig } from '../constants'; +import { + SegmentError, + ErrorType, + translateHTTPError, + classifyError, + parseRetryAfter, 
+} from '../errors'; +import { RetryManager } from '../backoff/RetryManager'; +import type { RetryResult } from '../backoff'; const MAX_EVENTS_PER_BATCH = 100; const MAX_PAYLOAD_SIZE_IN_KB = 500; export const SEGMENT_DESTINATION_KEY = 'Segment.io'; +type BatchResult = { + batch: SegmentEvent[]; + messageIds: string[]; + status: 'success' | '429' | 'transient' | 'permanent' | 'network_error'; + statusCode?: number; + retryAfterSeconds?: number; +}; + +type ErrorAggregation = { + successfulMessageIds: string[]; + rateLimitResults: BatchResult[]; + hasTransientError: boolean; + permanentErrorMessageIds: string[]; + retryableMessageIds: string[]; +}; + export class SegmentDestination extends DestinationPlugin { type = PluginType.destination; key = SEGMENT_DESTINATION_KEY; private apiHost?: string; + private httpConfig?: HttpConfig; private settingsResolve: () => void; private settingsPromise: Promise; + private retryManager?: RetryManager; constructor() { super(); @@ -34,9 +60,184 @@ export class SegmentDestination extends DestinationPlugin { this.settingsResolve = resolve; } + private async uploadBatch(batch: SegmentEvent[]): Promise { + const config = this.analytics?.getConfig() ?? defaultConfig; + const messageIds = batch + .map((e) => e.messageId) + .filter((id): id is string => id !== undefined && id !== ''); + + const retryCount = this.retryManager + ? await this.retryManager.getRetryCount() + : 0; + + const cleanedBatch = batch.map(({ _queuedAt, ...event }) => event); + + try { + const res = await uploadEvents({ + writeKey: config.writeKey, + url: this.getEndpoint(), + events: cleanedBatch as SegmentEvent[], + retryCount, + }); + + if (res.ok) { + return { batch, messageIds, status: 'success', statusCode: res.status }; + } + + const retryAfterSeconds = + res.status === 429 + ? 
parseRetryAfter( + res.headers.get('Retry-After'), + this.httpConfig?.rateLimitConfig?.maxRetryInterval + ) + : undefined; + + const classification = classifyError(res.status, { + default4xxBehavior: this.httpConfig?.backoffConfig?.default4xxBehavior, + default5xxBehavior: this.httpConfig?.backoffConfig?.default5xxBehavior, + statusCodeOverrides: + this.httpConfig?.backoffConfig?.statusCodeOverrides, + rateLimitEnabled: this.httpConfig?.rateLimitConfig?.enabled, + }); + + if (classification.errorType === 'rate_limit') { + return { + batch, + messageIds, + status: '429', + statusCode: res.status, + retryAfterSeconds: retryAfterSeconds ?? 60, + }; + } else if (classification.errorType === 'transient') { + return { + batch, + messageIds, + status: 'transient', + statusCode: res.status, + }; + } else { + return { + batch, + messageIds, + status: 'permanent', + statusCode: res.status, + }; + } + } catch (e) { + this.analytics?.reportInternalError(translateHTTPError(e)); + return { batch, messageIds, status: 'network_error' }; + } + } + + private aggregateErrors(results: BatchResult[]): ErrorAggregation { + const aggregation: ErrorAggregation = { + successfulMessageIds: [], + rateLimitResults: [], + hasTransientError: false, + permanentErrorMessageIds: [], + retryableMessageIds: [], + }; + + for (const result of results) { + switch (result.status) { + case 'success': + aggregation.successfulMessageIds.push(...result.messageIds); + break; + case '429': + aggregation.rateLimitResults.push(result); + aggregation.retryableMessageIds.push(...result.messageIds); + break; + case 'transient': + case 'network_error': + aggregation.hasTransientError = true; + aggregation.retryableMessageIds.push(...result.messageIds); + break; + case 'permanent': + aggregation.permanentErrorMessageIds.push(...result.messageIds); + break; + } + } + + return aggregation; + } + + /** + * Drop events whose _queuedAt exceeds maxTotalBackoffDuration. + * Returns the remaining fresh events. 
+   */
+  private async pruneExpiredEvents(
+    events: SegmentEvent[]
+  ): Promise<SegmentEvent[]> {
+    const maxAge = this.httpConfig?.backoffConfig?.maxTotalBackoffDuration ?? 0;
+    if (maxAge <= 0) {
+      return events;
+    }
+
+    const now = Date.now();
+    const maxAgeMs = maxAge * 1000;
+    const expiredMessageIds: string[] = [];
+    const freshEvents: SegmentEvent[] = [];
+
+    for (const event of events) {
+      if (event._queuedAt !== undefined && now - event._queuedAt > maxAgeMs) {
+        if (event.messageId !== undefined && event.messageId !== '') {
+          expiredMessageIds.push(event.messageId);
+        }
+      } else {
+        freshEvents.push(event);
+      }
+    }
+
+    if (expiredMessageIds.length > 0) {
+      await this.queuePlugin.dequeueByMessageIds(expiredMessageIds);
+      this.analytics?.reportInternalError(
+        new SegmentError(
+          ErrorType.EventsDropped,
+          `Dropped ${expiredMessageIds.length} events exceeding max age (${maxAge}s)`,
+          undefined,
+          { droppedCount: expiredMessageIds.length, reason: 'max_age_exceeded' }
+        )
+      );
+      this.analytics?.logger.warn(
+        `Pruned ${expiredMessageIds.length} events older than ${maxAge}s`
+      );
+    }
+
+    return freshEvents;
+  }
+
+  /**
+   * Update retry state based on aggregated batch results.
+   * 429 takes precedence over transient errors.
+   * Returns true if retry limits were exceeded (caller should drop events).
+   */
+  private async updateRetryState(
+    aggregation: ErrorAggregation
+  ): Promise<boolean> {
+    if (!this.retryManager) {
+      return false;
+    }
+
+    const has429 = aggregation.rateLimitResults.length > 0;
+    let result: RetryResult | undefined;
+
+    if (has429) {
+      for (const r of aggregation.rateLimitResults) {
+        result = await this.retryManager.handle429(r.retryAfterSeconds ?? 
60); + } + } else if (aggregation.hasTransientError) { + result = await this.retryManager.handleTransientError(); + } else if (aggregation.successfulMessageIds.length > 0) { + await this.retryManager.reset(); + } + + return result === 'limit_exceeded'; + } + private sendEvents = async (events: SegmentEvent[]): Promise => { if (events.length === 0) { - return Promise.resolve(); + await this.retryManager?.reset(); + return; } // We're not sending events until Segment has loaded all settings @@ -44,46 +245,92 @@ export class SegmentDestination extends DestinationPlugin { const config = this.analytics?.getConfig() ?? defaultConfig; - const chunkedEvents: SegmentEvent[][] = chunk( + events = await this.pruneExpiredEvents(events); + if (events.length === 0) { + await this.retryManager?.reset(); + return; + } + + if (this.retryManager && !(await this.retryManager.canRetry())) { + this.analytics?.logger.info('Upload blocked by retry manager'); + return; + } + + const batches: SegmentEvent[][] = chunk( events, config.maxBatchSize ?? 
MAX_EVENTS_PER_BATCH, MAX_PAYLOAD_SIZE_IN_KB ); - let sentEvents: SegmentEvent[] = []; - let numFailedEvents = 0; - - await Promise.all( - chunkedEvents.map(async (batch: SegmentEvent[]) => { - try { - const res = await uploadEvents({ - writeKey: config.writeKey, - url: this.getEndpoint(), - events: batch, - }); - checkResponseForErrors(res); - sentEvents = sentEvents.concat(batch); - } catch (e) { - this.analytics?.reportInternalError(translateHTTPError(e)); - this.analytics?.logger.warn(e); - numFailedEvents += batch.length; - } finally { - await this.queuePlugin.dequeue(sentEvents); - } - }) + const results: BatchResult[] = await Promise.all( + batches.map((batch) => this.uploadBatch(batch)) ); - if (sentEvents.length) { + const aggregation = this.aggregateErrors(results); + + const limitExceeded = await this.updateRetryState(aggregation); + + if (aggregation.successfulMessageIds.length > 0) { + await this.queuePlugin.dequeueByMessageIds( + aggregation.successfulMessageIds + ); if (config.debug === true) { - this.analytics?.logger.info(`Sent ${sentEvents.length} events`); + this.analytics?.logger.info( + `Sent ${aggregation.successfulMessageIds.length} events` + ); } } - if (numFailedEvents) { - this.analytics?.logger.error(`Failed to send ${numFailedEvents} events.`); + if (aggregation.permanentErrorMessageIds.length > 0) { + await this.queuePlugin.dequeueByMessageIds( + aggregation.permanentErrorMessageIds + ); + this.analytics?.reportInternalError( + new SegmentError( + ErrorType.EventsDropped, + `Dropped ${aggregation.permanentErrorMessageIds.length} events due to permanent errors`, + undefined, + { + droppedCount: aggregation.permanentErrorMessageIds.length, + reason: 'permanent_error', + } + ) + ); + this.analytics?.logger.error( + `Dropped ${aggregation.permanentErrorMessageIds.length} events due to permanent errors` + ); + } + + if (limitExceeded && aggregation.retryableMessageIds.length > 0) { + await this.queuePlugin.dequeueByMessageIds( + 
aggregation.retryableMessageIds + ); + this.analytics?.reportInternalError( + new SegmentError( + ErrorType.EventsDropped, + `Dropped ${aggregation.retryableMessageIds.length} events due to retry limit exceeded`, + undefined, + { + droppedCount: aggregation.retryableMessageIds.length, + reason: 'retry_limit_exceeded', + } + ) + ); + this.analytics?.logger.error( + `Dropped ${aggregation.retryableMessageIds.length} events due to retry limit exceeded` + ); } - return Promise.resolve(); + const failedCount = + events.length - + aggregation.successfulMessageIds.length - + aggregation.permanentErrorMessageIds.length; + if (failedCount > 0) { + const has429 = aggregation.rateLimitResults.length > 0; + this.analytics?.logger.warn( + `${failedCount} events will retry (429: ${has429}, transient: ${aggregation.hasTransientError})` + ); + } }; private readonly queuePlugin = new QueueFlushingPlugin(this.sendEvents); @@ -95,7 +342,7 @@ export class SegmentDestination extends DestinationPlugin { let baseURL = ''; let endpoint = ''; if (hasProxy) { - //baseURL is always config?.proxy if hasProxy + //baseURL is always config?.proxy if hasProxy baseURL = config?.proxy ?? ''; if (useSegmentEndpoints) { const isProxyEndsWithSlash = baseURL.endsWith('/'); @@ -111,12 +358,15 @@ export class SegmentDestination extends DestinationPlugin { return defaultApiHost; } } + configure(analytics: SegmentClient): void { super.configure(analytics); + const config = analytics.getConfig(); + // If the client has a proxy we don't need to await for settings apiHost, we can send events directly // Important! If new settings are required in the future you probably want to change this! 
- if (analytics.getConfig().proxy !== undefined) { + if (config.proxy !== undefined) { this.settingsResolve(); } @@ -137,13 +387,32 @@ export class SegmentDestination extends DestinationPlugin { //assign the api host from segment settings (domain/v1) this.apiHost = `https://${segmentSettings.apiHost}/b`; } + + const httpConfig = this.analytics?.getHttpConfig(); + if (httpConfig) { + this.httpConfig = httpConfig; + + if ( + !this.retryManager && + (httpConfig.rateLimitConfig || httpConfig.backoffConfig) + ) { + const config = this.analytics?.getConfig(); + this.retryManager = new RetryManager( + config?.writeKey ?? '', + config?.storePersistor, + httpConfig.rateLimitConfig, + httpConfig.backoffConfig, + this.analytics?.logger + ); + } + } + this.settingsResolve(); } execute(event: SegmentEvent): Promise { // Execute the internal timeline here, the queue plugin will pick up the event and add it to the queue automatically - const enrichedEvent = super.execute(event); - return enrichedEvent; + return super.execute(event); } async flush() { diff --git a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts index b3b02d802..1a483f73d 100644 --- a/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts +++ b/packages/core/src/plugins/__tests__/QueueFlushingPlugin.test.ts @@ -60,6 +60,7 @@ describe('QueueFlushingPlugin', () => { const event: SegmentEvent = { type: EventType.TrackEvent, event: 'test2', + messageId: 'msg-dequeue-1', properties: { test: 'test2', }, @@ -72,7 +73,7 @@ describe('QueueFlushingPlugin', () => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore expect(queuePlugin.queueStore?.getState().events).toHaveLength(1); - await queuePlugin.dequeue(event); + await queuePlugin.dequeueByMessageIds(['msg-dequeue-1']); // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore 
expect(queuePlugin.queueStore?.getState().events).toHaveLength(0); @@ -111,6 +112,7 @@ describe('QueueFlushingPlugin', () => { const event1: SegmentEvent = { type: EventType.TrackEvent, event: 'test1', + messageId: 'msg-count-1', properties: { test: 'test1', }, @@ -119,6 +121,7 @@ describe('QueueFlushingPlugin', () => { const event2: SegmentEvent = { type: EventType.TrackEvent, event: 'test2', + messageId: 'msg-count-2', properties: { test: 'test2', }, @@ -130,7 +133,7 @@ describe('QueueFlushingPlugin', () => { let eventsCount = await queuePlugin.pendingEvents(); expect(eventsCount).toBe(2); - await queuePlugin.dequeue(event1); + await queuePlugin.dequeueByMessageIds(['msg-count-1']); eventsCount = await queuePlugin.pendingEvents(); expect(eventsCount).toBe(1); diff --git a/packages/core/src/plugins/__tests__/SegmentDestination.test.ts b/packages/core/src/plugins/__tests__/SegmentDestination.test.ts index 885097fd9..23f4d410c 100644 --- a/packages/core/src/plugins/__tests__/SegmentDestination.test.ts +++ b/packages/core/src/plugins/__tests__/SegmentDestination.test.ts @@ -9,6 +9,7 @@ import { import { Config, EventType, + HttpConfig, SegmentAPIIntegration, SegmentEvent, TrackEventType, @@ -260,10 +261,12 @@ describe('SegmentDestination', () => { config, settings, events, + httpConfig, }: { config?: Config; settings?: SegmentAPIIntegration; events: SegmentEvent[]; + httpConfig?: HttpConfig; }) => { const plugin = new SegmentDestination(); @@ -278,6 +281,13 @@ describe('SegmentDestination', () => { }); plugin.configure(analytics); + + if (httpConfig !== undefined) { + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + plugin.httpConfig = httpConfig; + } + // The settings store won't match but that's ok, the plugin should rely only on the settings it receives during update plugin.update( { @@ -322,6 +332,7 @@ describe('SegmentDestination', () => { expect(sendEventsSpy).toHaveBeenCalledWith({ url: getURL(defaultApiHost, ''), // default api 
already appended with '/b' writeKey: '123-456', + retryCount: 0, events: events.slice(0, 2).map((e) => ({ ...e, })), @@ -329,6 +340,7 @@ describe('SegmentDestination', () => { expect(sendEventsSpy).toHaveBeenCalledWith({ url: getURL(defaultApiHost, ''), // default api already appended with '/b' writeKey: '123-456', + retryCount: 0, events: events.slice(2, 4).map((e) => ({ ...e, })), @@ -356,6 +368,7 @@ describe('SegmentDestination', () => { expect(sendEventsSpy).toHaveBeenCalledWith({ url: getURL(customEndpoint, '/b'), writeKey: '123-456', + retryCount: 0, events: events.slice(0, 2).map((e) => ({ ...e, })), @@ -407,13 +420,129 @@ describe('SegmentDestination', () => { expect(sendEventsSpy).toHaveBeenCalledWith({ url: expectedUrl, writeKey: '123-456', + retryCount: 0, events: events.map((e) => ({ ...e, })), }); } ); + + describe('event age pruning', () => { + it('prunes events older than maxTotalBackoffDuration', async () => { + const now = Date.now(); + const events = [ + { messageId: 'old-1', _queuedAt: now - 50000 * 1000 }, + { messageId: 'fresh-1' }, + { messageId: 'fresh-2' }, + ] as SegmentEvent[]; + + const { plugin, sendEventsSpy } = createTestWith({ + events, + httpConfig: { + backoffConfig: { + enabled: true, + maxRetryCount: 100, + baseBackoffInterval: 0.5, + maxBackoffInterval: 300, + maxTotalBackoffDuration: 43200, + jitterPercent: 10, + default4xxBehavior: 'drop', + default5xxBehavior: 'retry', + statusCodeOverrides: {}, + }, + }, + }); + + await plugin.flush(); + + expect(sendEventsSpy).toHaveBeenCalledTimes(1); + const sentEvents = sendEventsSpy.mock.calls[0][0].events; + expect(sentEvents).toHaveLength(2); + expect(sentEvents.map((e: SegmentEvent) => e.messageId)).toEqual([ + 'fresh-1', + 'fresh-2', + ]); + }); + + it('does not prune when maxTotalBackoffDuration is 0', async () => { + const now = Date.now(); + const events = [ + { messageId: 'old-1', _queuedAt: now - 50000 * 1000 }, + { messageId: 'fresh-1' }, + ] as SegmentEvent[]; + + const { 
plugin, sendEventsSpy } = createTestWith({ + events, + httpConfig: { + backoffConfig: { + enabled: true, + maxRetryCount: 100, + baseBackoffInterval: 0.5, + maxBackoffInterval: 300, + maxTotalBackoffDuration: 0, + jitterPercent: 10, + default4xxBehavior: 'drop', + default5xxBehavior: 'retry', + statusCodeOverrides: {}, + }, + }, + }); + + await plugin.flush(); + + expect(sendEventsSpy).toHaveBeenCalledTimes(1); + const sentEvents = sendEventsSpy.mock.calls[0][0].events; + expect(sentEvents).toHaveLength(2); + }); + + it('does not prune events without _queuedAt', async () => { + const events = [ + { messageId: 'old-1' }, + { messageId: 'fresh-1' }, + ] as SegmentEvent[]; + + const { plugin, sendEventsSpy } = createTestWith({ + events, + httpConfig: { + backoffConfig: { + enabled: true, + maxRetryCount: 100, + baseBackoffInterval: 0.5, + maxBackoffInterval: 300, + maxTotalBackoffDuration: 43200, + jitterPercent: 10, + default4xxBehavior: 'drop', + default5xxBehavior: 'retry', + statusCodeOverrides: {}, + }, + }, + }); + + await plugin.flush(); + + expect(sendEventsSpy).toHaveBeenCalledTimes(1); + const sentEvents = sendEventsSpy.mock.calls[0][0].events; + expect(sentEvents).toHaveLength(2); + }); + + it('strips _queuedAt before upload', async () => { + const now = Date.now(); + const events = [ + { messageId: 'msg-1', _queuedAt: now - 1000 }, + ] as SegmentEvent[]; + + const { plugin, sendEventsSpy } = createTestWith({ events }); + + await plugin.flush(); + + expect(sendEventsSpy).toHaveBeenCalledTimes(1); + const sentEvents = sendEventsSpy.mock.calls[0][0].events; + expect(sentEvents[0]).not.toHaveProperty('_queuedAt'); + }); + }); }); + describe('getEndpoint', () => { it.each([ ['example.com/v1/', 'https://example.com/v1/'], diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index aef451e61..12e58a43c 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -154,6 +154,8 @@ export type Config = { cdnProxy?: string; 
useSegmentEndpoints?: boolean; // Use if you want to use Segment endpoints errorHandler?: (error: SegmentError) => void; + /** Client-side httpConfig overrides (highest precedence over defaults and CDN). */ + httpConfig?: DeepPartial; }; export type ClientMethods = {