From 68cb4abf91c5716dba58ad480bb0ef40038a244a Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Wed, 18 Mar 2026 13:27:53 -0500 Subject: [PATCH 1/9] feat(e2e-cli): add flush-retry loop to simulate real flush policy MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CLI previously called flush() once and exited, so events that received retryable errors (5xx, 429) stayed in the queue with no retry. This implements the retry loop that flush policies drive in a real app: flush → check pending → wait for backoff → repeat. - Flush-retry loop respects maxRetries from test config - Forward-compatible with tapi RetryManager (reads backoff state when available, falls back to fixed delay on master) - Tracks permanently dropped events via logger interception - Reports success=false when events remain or are dropped - Computes sentBatches from delivered event count - Enables retry test suite in e2e-config.json Co-Authored-By: Claude Opus 4.6 --- e2e-cli/e2e-config.json | 2 +- e2e-cli/src/cli.ts | 100 +++++++++++++++++++++++++++++++++++++--- 2 files changed, 94 insertions(+), 8 deletions(-) diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json index 443a40dbf..904c8057b 100644 --- a/e2e-cli/e2e-config.json +++ b/e2e-cli/e2e-config.json @@ -1,6 +1,6 @@ { "sdk": "react-native", - "test_suites": "basic,settings", + "test_suites": "basic,retry,settings", "auto_settings": true, "patch": null, "env": {} diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index 6f704d0e5..c5fdb78b7 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -12,7 +12,9 @@ import { SegmentClient } from '../../packages/core/src/analytics'; import { SovranStorage } from '../../packages/core/src/storage/sovranStorage'; import { Logger } from '../../packages/core/src/logger'; +import { PluginType } from '../../packages/core/src/types'; import type { Config, JsonMap } from '../../packages/core/src/types'; +import { SegmentDestination } from '../../packages/core/src/plugins/SegmentDestination'; import type { Persistor } from '@segment/sovran-react-native'; // ============================================================================ @@ -305,16 +307,100 @@ async function main() { } } - // Flush all queued events through the real pipeline - await client.flush(); + // ================================================================== + // Flush-retry loop + // ================================================================== + // + // Simulates the flush policy cadence that drives uploads in a real + // app. The SDK's flush policies (timer every 30s, count at 20 + // events) are not active in the CLI, so we drive flush cycles + // manually. + // + // Each cycle: flush → check pending → wait for backoff → repeat + // until the queue is empty or maxRetries is exceeded. + + const maxRetries = input.config?.maxRetries ?? 10; + let flushAttempts = 0; + let permanentDropCount = 0; + + // Intercept logger.error to detect permanently dropped events. + // The tapi branch logs "Dropped N events due to permanent errors" + // when events receive non-retryable status codes (4xx). On master + // this message doesn't occur, so the counter stays at 0. + const origLoggerError = logger.error; + logger.error = (message?: unknown, ...rest: unknown[]) => { + const msg = String(message ?? ''); + const match = msg.match(/Dropped (\d+) events/); + if (match) { + permanentDropCount += parseInt(match[1], 10); + } + origLoggerError.call(logger, message, ...rest); + }; + + // Find the SegmentDestination to access retry state (if available). + const segmentDest = client + .getPlugins(PluginType.destination) + .find((p): p is SegmentDestination => p instanceof SegmentDestination); + + while (flushAttempts <= maxRetries) { + await client.flush(); + flushAttempts++; + + const pending = await client.pendingEvents(); + if (pending === 0) break; + if (flushAttempts > maxRetries) break; + + // Wait for RetryManager backoff before next flush cycle. + // The tapi branch adds a RetryManager that tracks backoff state + // (READY / BACKING_OFF / RATE_LIMITED). When available, we sleep + // until waitUntilTime so the next flush can proceed. + // On master (no RetryManager), we use a short fixed delay. + let waited = false; + if (segmentDest) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const retryMgr = (segmentDest as any).retryManager; + if (retryMgr?.store) { + try { + const state = await retryMgr.store.getState(true); + if (state.state && state.state !== 'READY') { + const delay = Math.max(0, state.waitUntilTime - Date.now()); + await new Promise((r) => setTimeout(r, delay + 50)); + waited = true; + } + } catch { + // RetryManager state access failed — fall through to fixed delay + } + } + } + if (!waited) { + await new Promise((r) => setTimeout(r, 100)); + } + } - // Brief delay to let async upload operations settle - await new Promise((resolve) => setTimeout(resolve, 500)); + // Restore logger + logger.error = origLoggerError; - client.cleanup(); + // Compute results + const finalPending = await client.pendingEvents(); + const totalEvents = input.sequences.reduce( + (sum, seq) => sum + seq.events.length, + 0 + ); + const delivered = Math.max( + 0, + totalEvents - finalPending - permanentDropCount + ); + const sentBatches = + delivered > 0 + ? Math.ceil(delivered / Math.max(1, input.config?.flushAt ?? 1)) + : 0; + + // success: true only when all events were delivered (none remaining, + // none permanently dropped). + const success = finalPending === 0 && permanentDropCount === 0; - // sentBatches: SDK doesn't expose batch count tracking - output = { success: true, sentBatches: 0 }; + client.cleanup(); + output = { success, sentBatches }; } catch (e) { const error = e instanceof Error ? e.message : String(e); output = { success: false, error, sentBatches: 0 }; From e264c38cf7b1ebd80a2543785e84f98d43669f06 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Wed, 18 Mar 2026 13:57:24 -0500 Subject: [PATCH 2/9] style: remove redundant inline comments from e2e CLI Co-Authored-By: Claude Opus 4.6 --- e2e-cli/src/cli.ts | 69 +++++----------------------------------------- 1 file changed, 7 insertions(+), 62 deletions(-) diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index c5fdb78b7..6b437cd00 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -17,10 +17,6 @@ import type { Config, JsonMap } from '../../packages/core/src/types'; import { SegmentDestination } from '../../packages/core/src/plugins/SegmentDestination'; import type { Persistor } from '@segment/sovran-react-native'; -// ============================================================================ -// CLI Input/Output Types -// ============================================================================ - interface AnalyticsEvent { type: 'identify' | 'track' | 'page' | 'screen' | 'alias' | 'group'; userId?: string; @@ -63,10 +59,6 @@ interface CLIOutput { sentBatches: number; } -// ============================================================================ -// In-memory Persistor for Node.js (replaces AsyncStorage) -// ============================================================================ - const memStore = new Map(); const MemoryPersistor: Persistor = { get: async (key: string): Promise => @@ -76,10 +68,6 @@ const MemoryPersistor: Persistor = { }, }; -// ============================================================================ -// Main CLI Logic -// ============================================================================ - async function main() { const args = process.argv.slice(2); let inputStr: string | undefined; @@ -107,7 +95,6 @@ async function main() { try { const input: CLIInput = JSON.parse(inputStr); - // Build SDK config const config: Config = { writeKey: input.writeKey, trackAppLifecycleEvents: false, @@ -115,17 +102,14 @@ async function main() { autoAddSegmentDestination: true, storePersistor: MemoryPersistor, storePersistorSaveDelay: 0, - // When apiHost is provided (mock tests), use proxy to direct events there ...(input.apiHost && { proxy: input.apiHost, useSegmentEndpoints: true, }), - // When cdnHost is provided (mock tests), use cdnProxy to direct CDN requests there ...(input.cdnHost && { cdnProxy: input.cdnHost, useSegmentEndpoints: true, }), - // Provide default settings so SDK doesn't require CDN response defaultSettings: { integrations: { 'Segment.io': { @@ -142,29 +126,22 @@ async function main() { }), }; - // Create storage with in-memory persistor const store = new SovranStorage({ storeId: input.writeKey, storePersistor: MemoryPersistor, storePersistorSaveDelay: 0, }); - // Suppress SDK internal logs to keep E2E test output clean. - // CLI-level warnings/errors still surface via console.warn/console.error. const logger = new Logger(true); const client = new SegmentClient({ config, logger, store }); - - // Initialize — adds plugins, resolves settings, processes pending events await client.init(); - // Process event sequences for (const sequence of input.sequences) { if (sequence.delayMs > 0) { await new Promise((resolve) => setTimeout(resolve, sequence.delayMs)); } for (const evt of sequence.events) { - // Validate event has a type if (!evt.type) { console.warn('[WARN] Skipping event: missing event type', evt); continue; @@ -173,7 +150,6 @@ async function main() { try { switch (evt.type) { case 'track': { - // Required: event name if (!evt.event || typeof evt.event !== 'string') { console.warn( `[WARN] Skipping track event: missing or invalid event name`, @@ -182,7 +158,6 @@ async function main() { continue; } - // Optional: properties (validate if present) const properties = evt.properties as JsonMap | undefined; if ( evt.properties !== undefined && @@ -200,8 +175,6 @@ async function main() { } case 'identify': { - // Optional userId (Segment allows anonymous identify) - // Optional traits (validate if present) const traits = evt.traits as JsonMap | undefined; if ( evt.traits !== undefined && @@ -220,8 +193,7 @@ async function main() { case 'screen': case 'page': { - // RN SDK has no page(); map to screen for cross-SDK test compat - // Required: screen/page name + // RN SDK has no page(); map to screen for cross-SDK compat if (!evt.name || typeof evt.name !== 'string') { console.warn( `[WARN] Skipping ${evt.type} event: missing or invalid name`, @@ -230,7 +202,6 @@ async function main() { continue; } - // Optional: properties (validate if present) const properties = evt.properties as JsonMap | undefined; if ( evt.properties !== undefined && @@ -248,7 +219,6 @@ async function main() { } case 'group': { - // Required: groupId if (!evt.groupId || typeof evt.groupId !== 'string') { console.warn( `[WARN] Skipping group event: missing or invalid groupId`, @@ -257,7 +227,6 @@ async function main() { continue; } - // Optional: traits (validate if present) const traits = evt.traits as JsonMap | undefined; if ( evt.traits !== undefined && @@ -275,7 +244,6 @@ async function main() { } case 'alias': { - // Required: userId if (!evt.userId || typeof evt.userId !== 'string') { console.warn( `[WARN] Skipping alias event: missing or invalid userId`, @@ -296,7 +264,6 @@ async function main() { continue; } } catch (error) { - // Log but don't fail the entire sequence if one event fails console.error( `[ERROR] Failed to process ${evt.type} event:`, error, @@ -307,26 +274,15 @@ async function main() { } } - // ================================================================== - // Flush-retry loop - // ================================================================== - // - // Simulates the flush policy cadence that drives uploads in a real - // app. The SDK's flush policies (timer every 30s, count at 20 - // events) are not active in the CLI, so we drive flush cycles - // manually. - // - // Each cycle: flush → check pending → wait for backoff → repeat - // until the queue is empty or maxRetries is exceeded. - + // The SDK's flush policies (timer/count) are not active in the CLI, + // so we drive flush cycles manually until the queue drains or + // maxRetries is exceeded. const maxRetries = input.config?.maxRetries ?? 10; let flushAttempts = 0; let permanentDropCount = 0; - // Intercept logger.error to detect permanently dropped events. - // The tapi branch logs "Dropped N events due to permanent errors" - // when events receive non-retryable status codes (4xx). On master - // this message doesn't occur, so the counter stays at 0. + // Intercept to detect "Dropped N events due to permanent errors" + // logged by SegmentDestination when events get non-retryable codes. const origLoggerError = logger.error; logger.error = (message?: unknown, ...rest: unknown[]) => { const msg = String(message ?? ''); @@ -337,7 +293,6 @@ async function main() { origLoggerError.call(logger, message, ...rest); }; - // Find the SegmentDestination to access retry state (if available). const segmentDest = client .getPlugins(PluginType.destination) .find((p): p is SegmentDestination => p instanceof SegmentDestination); @@ -350,11 +305,6 @@ async function main() { if (pending === 0) break; if (flushAttempts > maxRetries) break; - // Wait for RetryManager backoff before next flush cycle. - // The tapi branch adds a RetryManager that tracks backoff state - // (READY / BACKING_OFF / RATE_LIMITED). When available, we sleep - // until waitUntilTime so the next flush can proceed. - // On master (no RetryManager), we use a short fixed delay. let waited = false; if (segmentDest) { // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -368,7 +318,7 @@ async function main() { waited = true; } } catch { - // RetryManager state access failed — fall through to fixed delay + // Fall through to fixed delay } } } @@ -377,10 +327,8 @@ async function main() { } } - // Restore logger logger.error = origLoggerError; - // Compute results const finalPending = await client.pendingEvents(); const totalEvents = input.sequences.reduce( (sum, seq) => sum + seq.events.length, @@ -394,9 +342,6 @@ async function main() { delivered > 0 ? Math.ceil(delivered / Math.max(1, input.config?.flushAt ?? 1)) : 0; - - // success: true only when all events were delivered (none remaining, - // none permanently dropped). const success = finalPending === 0 && permanentDropCount === 0; client.cleanup(); From b6c52ad7dc4ba7c3add39a264074bc3f728273d0 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Wed, 18 Mar 2026 14:06:32 -0500 Subject: [PATCH 3/9] fix: restore pre-existing comments removed in previous commit Co-Authored-By: Claude Opus 4.6 --- e2e-cli/src/cli.ts | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index 6b437cd00..6a9ed9a89 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -17,6 +17,10 @@ import type { Config, JsonMap } from '../../packages/core/src/types'; import { SegmentDestination } from '../../packages/core/src/plugins/SegmentDestination'; import type { Persistor } from '@segment/sovran-react-native'; +// ============================================================================ +// CLI Input/Output Types +// ============================================================================ + interface AnalyticsEvent { type: 'identify' | 'track' | 'page' | 'screen' | 'alias' | 'group'; userId?: string; @@ -59,6 +63,10 @@ interface CLIOutput { sentBatches: number; } +// ============================================================================ +// In-memory Persistor for Node.js (replaces AsyncStorage) +// ============================================================================ + const memStore = new Map(); const MemoryPersistor: Persistor = { get: async (key: string): Promise => @@ -68,6 +76,10 @@ const MemoryPersistor: Persistor = { }, }; +// ============================================================================ +// Main CLI Logic +// ============================================================================ + async function main() { const args = process.argv.slice(2); let inputStr: string | undefined; @@ -95,6 +107,7 @@ async function main() { try { const input: CLIInput = JSON.parse(inputStr); + // Build SDK config const config: Config = { writeKey: input.writeKey, trackAppLifecycleEvents: false, @@ -102,14 +115,17 @@ async function main() { autoAddSegmentDestination: true, storePersistor: MemoryPersistor, storePersistorSaveDelay: 0, + // When apiHost is provided (mock tests), use proxy to direct events there ...(input.apiHost && { proxy: input.apiHost, useSegmentEndpoints: true, }), + // When cdnHost is provided (mock tests), use cdnProxy to direct CDN requests there ...(input.cdnHost && { cdnProxy: input.cdnHost, useSegmentEndpoints: true, }), + // Provide default settings so SDK doesn't require CDN response defaultSettings: { integrations: { 'Segment.io': { @@ -126,22 +142,29 @@ async function main() { }), }; + // Create storage with in-memory persistor const store = new SovranStorage({ storeId: input.writeKey, storePersistor: MemoryPersistor, storePersistorSaveDelay: 0, }); + // Suppress SDK internal logs to keep E2E test output clean. + // CLI-level warnings/errors still surface via console.warn/console.error. const logger = new Logger(true); const client = new SegmentClient({ config, logger, store }); + + // Initialize — adds plugins, resolves settings, processes pending events await client.init(); + // Process event sequences for (const sequence of input.sequences) { if (sequence.delayMs > 0) { await new Promise((resolve) => setTimeout(resolve, sequence.delayMs)); } for (const evt of sequence.events) { + // Validate event has a type if (!evt.type) { console.warn('[WARN] Skipping event: missing event type', evt); continue; @@ -150,6 +173,7 @@ async function main() { try { switch (evt.type) { case 'track': { + // Required: event name if (!evt.event || typeof evt.event !== 'string') { console.warn( `[WARN] Skipping track event: missing or invalid event name`, @@ -158,6 +182,7 @@ async function main() { continue; } + // Optional: properties (validate if present) const properties = evt.properties as JsonMap | undefined; if ( evt.properties !== undefined && @@ -175,6 +200,8 @@ async function main() { } case 'identify': { + // Optional userId (Segment allows anonymous identify) + // Optional traits (validate if present) const traits = evt.traits as JsonMap | undefined; if ( evt.traits !== undefined && @@ -193,7 +220,8 @@ async function main() { case 'screen': case 'page': { - // RN SDK has no page(); map to screen for cross-SDK compat + // RN SDK has no page(); map to screen for cross-SDK test compat + // Required: screen/page name if (!evt.name || typeof evt.name !== 'string') { console.warn( `[WARN] Skipping ${evt.type} event: missing or invalid name`, @@ -202,6 +230,7 @@ async function main() { continue; } + // Optional: properties (validate if present) const properties = evt.properties as JsonMap | undefined; if ( evt.properties !== undefined && @@ -219,6 +248,7 @@ async function main() { } case 'group': { + // Required: groupId if (!evt.groupId || typeof evt.groupId !== 'string') { console.warn( `[WARN] Skipping group event: missing or invalid groupId`, @@ -227,6 +257,7 @@ async function main() { continue; } + // Optional: traits (validate if present) const traits = evt.traits as JsonMap | undefined; if ( evt.traits !== undefined && @@ -244,6 +275,7 @@ async function main() { } case 'alias': { + // Required: userId if (!evt.userId || typeof evt.userId !== 'string') { console.warn( `[WARN] Skipping alias event: missing or invalid userId`, @@ -264,6 +296,7 @@ async function main() { continue; } } catch (error) { + // Log but don't fail the entire sequence if one event fails console.error( `[ERROR] Failed to process ${evt.type} event:`, error, From bca1e8e74676d55cbdaa2dae4f708c6a05bfaf87 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Wed, 18 Mar 2026 14:16:01 -0500 Subject: [PATCH 4/9] refactor: use real flush policies instead of manual retry loop Instead of manually calling flush() in a loop and reading private RetryManager state, let the SDK's built-in flush policies drive retries. TimerFlushPolicy fires every flushInterval (100ms default for e2e), and the RetryManager gates actual uploads during backoff. The CLI just triggers the initial flush, then polls pendingEvents() until the queue drains or 30s timeout. Co-Authored-By: Claude Opus 4.6 --- e2e-cli/src/cli.ts | 57 +++++++++++++++------------------------------- 1 file changed, 18 insertions(+), 39 deletions(-) diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index 6a9ed9a89..e56f3f823 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -12,9 +12,7 @@ import { SegmentClient } from '../../packages/core/src/analytics'; import { SovranStorage } from '../../packages/core/src/storage/sovranStorage'; import { Logger } from '../../packages/core/src/logger'; -import { PluginType } from '../../packages/core/src/types'; import type { Config, JsonMap } from '../../packages/core/src/types'; -import { SegmentDestination } from '../../packages/core/src/plugins/SegmentDestination'; import type { Persistor } from '@segment/sovran-react-native'; // ============================================================================ @@ -137,9 +135,11 @@ async function main() { ...(input.config?.flushAt !== undefined && { flushAt: input.config.flushAt, }), - ...(input.config?.flushInterval !== undefined && { - flushInterval: input.config.flushInterval, - }), + // Default to short interval so the TimerFlushPolicy drives fast + // retry cycles. In production this is 30s; for e2e tests we want + // the timer to fire frequently so retries aren't bottlenecked by + // the poll interval. The RetryManager still gates actual uploads. + flushInterval: input.config?.flushInterval ?? 0.1, }; // Create storage with in-memory persistor @@ -307,11 +307,11 @@ async function main() { } } - // The SDK's flush policies (timer/count) are not active in the CLI, - // so we drive flush cycles manually until the queue drains or - // maxRetries is exceeded. - const maxRetries = input.config?.maxRetries ?? 10; - let flushAttempts = 0; + // Let the SDK's flush policies drive uploads and retries. + // CountFlushPolicy triggers the initial flush when flushAt events + // accumulate. TimerFlushPolicy retries every flushInterval seconds + // while events remain queued. On the tapi branch, the RetryManager + // gates actual uploads during backoff periods. let permanentDropCount = 0; // Intercept to detect "Dropped N events due to permanent errors" @@ -326,38 +326,17 @@ async function main() { origLoggerError.call(logger, message, ...rest); }; - const segmentDest = client - .getPlugins(PluginType.destination) - .find((p): p is SegmentDestination => p instanceof SegmentDestination); - - while (flushAttempts <= maxRetries) { - await client.flush(); - flushAttempts++; + // Trigger the initial flush, then wait for the queue to drain. + // Subsequent retries are driven by the TimerFlushPolicy. + await client.flush(); + const timeoutMs = 30_000; + const pollMs = 50; + const start = Date.now(); + while (Date.now() - start < timeoutMs) { const pending = await client.pendingEvents(); if (pending === 0) break; - if (flushAttempts > maxRetries) break; - - let waited = false; - if (segmentDest) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const retryMgr = (segmentDest as any).retryManager; - if (retryMgr?.store) { - try { - const state = await retryMgr.store.getState(true); - if (state.state && state.state !== 'READY') { - const delay = Math.max(0, state.waitUntilTime - Date.now()); - await new Promise((r) => setTimeout(r, delay + 50)); - waited = true; - } - } catch { - // Fall through to fixed delay - } - } - } - if (!waited) { - await new Promise((r) => setTimeout(r, 100)); - } + await new Promise((r) => setTimeout(r, pollMs)); } logger.error = origLoggerError; From 36c18e340070072da48249ddd28cf75c2f8814d6 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Wed, 18 Mar 2026 17:09:26 -0500 Subject: [PATCH 5/9] fix(e2e-cli): wire maxRetries, error reporting, and BROWSER_BATCHING - Pass maxRetries from test config into httpConfig overrides so the SDK enforces retry limits during e2e tests - Set output.error when permanentDropCount > 0 so failure reporting tests get a truthy error field - Add BROWSER_BATCHING=true to e2e-config.json to skip tests that assume ephemeral per-request batching (RN uses persistent queue re-chunking) - Add jsx: react to tsconfig.json for .tsx transitive imports Co-Authored-By: Claude Opus 4.6 --- e2e-cli/e2e-config.json | 4 +++- e2e-cli/src/cli.ts | 19 ++++++++++++++++++- e2e-cli/tsconfig.json | 1 + 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/e2e-cli/e2e-config.json b/e2e-cli/e2e-config.json index 904c8057b..1098c2f20 100644 --- a/e2e-cli/e2e-config.json +++ b/e2e-cli/e2e-config.json @@ -3,5 +3,7 @@ "test_suites": "basic,retry,settings", "auto_settings": true, "patch": null, - "env": {} + "env": { + "BROWSER_BATCHING": "true" + } } diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index e56f3f823..58a40a237 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -140,6 +140,17 @@ async function main() { // the timer to fire frequently so retries aren't bottlenecked by // the poll interval. The RetryManager still gates actual uploads. flushInterval: input.config?.flushInterval ?? 0.1, + // Wire maxRetries from test harness into httpConfig overrides + ...(input.config?.maxRetries !== undefined && { + httpConfig: { + rateLimitConfig: { + maxRetryCount: input.config.maxRetries, + }, + backoffConfig: { + maxRetryCount: input.config.maxRetries, + }, + }, + }), }; // Create storage with in-memory persistor @@ -357,7 +368,13 @@ async function main() { const success = finalPending === 0 && permanentDropCount === 0; client.cleanup(); - output = { success, sentBatches }; + output = { + success, + sentBatches, + ...(permanentDropCount > 0 && { + error: `${permanentDropCount} events permanently dropped`, + }), + }; } catch (e) { const error = e instanceof Error ? e.message : String(e); output = { success: false, error, sentBatches: 0 }; diff --git a/e2e-cli/tsconfig.json b/e2e-cli/tsconfig.json index 49513ad2d..f8eeca9c7 100644 --- a/e2e-cli/tsconfig.json +++ b/e2e-cli/tsconfig.json @@ -6,6 +6,7 @@ "strict": true, "esModuleInterop": true, "skipLibCheck": true, + "jsx": "react", "forceConsistentCasingInFileNames": true, // Note: TypeScript is used only for type-checking; build output is generated by esbuild (see build.js). "noEmit": true, From 97af822aac5dc09ead3b9807bf9351a5cb6af0b9 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Wed, 18 Mar 2026 17:29:58 -0500 Subject: [PATCH 6/9] style: remove verbose inline comments from e2e CLI Co-Authored-By: Claude Opus 4.6 --- e2e-cli/src/cli.ts | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index 58a40a237..806b1c871 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -105,7 +105,6 @@ async function main() { try { const input: CLIInput = JSON.parse(inputStr); - // Build SDK config const config: Config = { writeKey: input.writeKey, trackAppLifecycleEvents: false, @@ -113,17 +112,14 @@ async function main() { autoAddSegmentDestination: true, storePersistor: MemoryPersistor, storePersistorSaveDelay: 0, - // When apiHost is provided (mock tests), use proxy to direct events there ...(input.apiHost && { proxy: input.apiHost, useSegmentEndpoints: true, }), - // When cdnHost is provided (mock tests), use cdnProxy to direct CDN requests there ...(input.cdnHost && { cdnProxy: input.cdnHost, useSegmentEndpoints: true, }), - // Provide default settings so SDK doesn't require CDN response defaultSettings: { integrations: { 'Segment.io': { @@ -135,12 +131,7 @@ async function main() { ...(input.config?.flushAt !== undefined && { flushAt: input.config.flushAt, }), - // Default to short interval so the TimerFlushPolicy drives fast - // retry cycles. In production this is 30s; for e2e tests we want - // the timer to fire frequently so retries aren't bottlenecked by - // the poll interval. The RetryManager still gates actual uploads. flushInterval: input.config?.flushInterval ?? 0.1, - // Wire maxRetries from test harness into httpConfig overrides ...(input.config?.maxRetries !== undefined && { httpConfig: { rateLimitConfig: { @@ -153,19 +144,14 @@ async function main() { }), }; - // Create storage with in-memory persistor const store = new SovranStorage({ storeId: input.writeKey, storePersistor: MemoryPersistor, storePersistorSaveDelay: 0, }); - // Suppress SDK internal logs to keep E2E test output clean. - // CLI-level warnings/errors still surface via console.warn/console.error. const logger = new Logger(true); const client = new SegmentClient({ config, logger, store }); - - // Initialize — adds plugins, resolves settings, processes pending events await client.init(); // Process event sequences @@ -307,7 +293,6 @@ async function main() { continue; } } catch (error) { - // Log but don't fail the entire sequence if one event fails console.error( `[ERROR] Failed to process ${evt.type} event:`, error, @@ -318,15 +303,7 @@ async function main() { } } - // Let the SDK's flush policies drive uploads and retries. - // CountFlushPolicy triggers the initial flush when flushAt events - // accumulate. TimerFlushPolicy retries every flushInterval seconds - // while events remain queued. On the tapi branch, the RetryManager - // gates actual uploads during backoff periods. let permanentDropCount = 0; - - // Intercept to detect "Dropped N events due to permanent errors" - // logged by SegmentDestination when events get non-retryable codes. const origLoggerError = logger.error; logger.error = (message?: unknown, ...rest: unknown[]) => { const msg = String(message ?? ''); @@ -337,8 +314,6 @@ async function main() { origLoggerError.call(logger, message, ...rest); }; - // Trigger the initial flush, then wait for the queue to drain. - // Subsequent retries are driven by the TimerFlushPolicy. await client.flush(); const timeoutMs = 30_000; From 84ec73f308b733bac47f6d839b65fcec6f73ce38 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Thu, 19 Mar 2026 11:14:41 -0500 Subject: [PATCH 7/9] refactor(e2e-cli): simplify cli.ts with extracted helper functions Extract buildConfig, dispatchEvent, waitForQueueDrain, and interceptDropCount from main(). Drop redundant per-event validation warnings and error handling. 367 -> 256 lines. Co-Authored-By: Claude Opus 4.6 --- e2e-cli/src/cli.ts | 313 +++++++++++++++------------------------------ 1 file changed, 101 insertions(+), 212 deletions(-) diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index 806b1c871..2c2e5955e 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -1,8 +1,8 @@ /** * E2E CLI for React Native analytics SDK testing * - * Runs the real SDK pipeline (SegmentClient → Timeline → SegmentDestination → - * QueueFlushingPlugin → uploadEvents) with stubs for React Native runtime + * Runs the real SDK pipeline (SegmentClient -> Timeline -> SegmentDestination -> + * QueueFlushingPlugin -> uploadEvents) with stubs for React Native runtime * dependencies so everything executes on Node.js. * * Usage: @@ -75,13 +75,104 @@ const MemoryPersistor: Persistor = { }; // ============================================================================ -// Main CLI Logic +// Helper Functions +// ============================================================================ + +function buildConfig(input: CLIInput): Config { + return { + writeKey: input.writeKey, + trackAppLifecycleEvents: false, + trackDeepLinks: false, + autoAddSegmentDestination: true, + storePersistor: MemoryPersistor, + storePersistorSaveDelay: 0, + ...(input.apiHost && { proxy: input.apiHost, useSegmentEndpoints: true }), + ...(input.cdnHost && { + cdnProxy: input.cdnHost, + useSegmentEndpoints: true, + }), + defaultSettings: { + integrations: { + 'Segment.io': { + apiKey: input.writeKey, + apiHost: 'api.segment.io/v1', + }, + }, + }, + ...(input.config?.flushAt !== undefined && { + flushAt: input.config.flushAt, + }), + flushInterval: input.config?.flushInterval ?? 0.1, + ...(input.config?.maxRetries !== undefined && { + httpConfig: { + rateLimitConfig: { maxRetryCount: input.config.maxRetries }, + backoffConfig: { maxRetryCount: input.config.maxRetries }, + }, + }), + }; +} + +async function dispatchEvent( + client: SegmentClient, + evt: AnalyticsEvent +): Promise { + switch (evt.type) { + case 'track': + if (!evt.event) return; + await client.track(evt.event, evt.properties as JsonMap | undefined); + break; + case 'identify': + await client.identify(evt.userId, evt.traits as JsonMap | undefined); + break; + case 'screen': + case 'page': + if (!evt.name) return; + await client.screen(evt.name, evt.properties as JsonMap | undefined); + break; + case 'group': + if (!evt.groupId) return; + await client.group(evt.groupId, evt.traits as JsonMap | undefined); + break; + case 'alias': + if (!evt.userId) return; + await client.alias(evt.userId); + break; + } +} + +async function waitForQueueDrain( + client: SegmentClient, + timeoutMs = 30_000 +): Promise { + const pollMs = 50; + const start = Date.now(); + while (Date.now() - start < timeoutMs) { + if ((await client.pendingEvents()) === 0) return; + await new Promise((r) => setTimeout(r, pollMs)); + } +} + +function interceptDropCount(logger: Logger): () => number { + let count = 0; + const origError = logger.error; + logger.error = (message?: unknown, ...rest: unknown[]) => { + const match = String(message ?? '').match(/Dropped (\d+) events/); + if (match) count += parseInt(match[1], 10); + origError.call(logger, message, ...rest); + }; + return () => { + logger.error = origError; + return count; + }; +} + +// ============================================================================ +// Main // ============================================================================ async function main() { const args = process.argv.slice(2); let inputStr: string | undefined; - for (let i = 0; i < args.length; i++) { if (args[i] === '--input' && i + 1 < args.length) { inputStr = args[i + 1]; @@ -91,11 +182,7 @@ async function main() { if (!inputStr) { console.log( - JSON.stringify({ - success: false, - error: 'No input provided', - sentBatches: 0, - }) + JSON.stringify({ success: false, error: 'No input provided', sentBatches: 0 }) ); process.exit(1); } @@ -104,228 +191,30 @@ async function main() { try { const input: CLIInput = JSON.parse(inputStr); - - const config: Config = { - writeKey: input.writeKey, - trackAppLifecycleEvents: false, - trackDeepLinks: false, - autoAddSegmentDestination: true, - storePersistor: MemoryPersistor, - storePersistorSaveDelay: 0, - ...(input.apiHost && { - proxy: input.apiHost, - useSegmentEndpoints: true, - }), - ...(input.cdnHost && { - cdnProxy: input.cdnHost, - useSegmentEndpoints: true, - }), - defaultSettings: { - integrations: { - 'Segment.io': { - apiKey: input.writeKey, - apiHost: 'api.segment.io/v1', - }, - }, - }, - ...(input.config?.flushAt !== undefined && { - flushAt: input.config.flushAt, - }), - flushInterval: input.config?.flushInterval ?? 0.1, - ...(input.config?.maxRetries !== undefined && { - httpConfig: { - rateLimitConfig: { - maxRetryCount: input.config.maxRetries, - }, - backoffConfig: { - maxRetryCount: input.config.maxRetries, - }, - }, - }), - }; + const config = buildConfig(input); const store = new SovranStorage({ storeId: input.writeKey, storePersistor: MemoryPersistor, storePersistorSaveDelay: 0, }); - const logger = new Logger(true); const client = new SegmentClient({ config, logger, store }); await client.init(); - // Process event sequences for (const sequence of input.sequences) { if (sequence.delayMs > 0) { await new Promise((resolve) => setTimeout(resolve, sequence.delayMs)); } - for (const evt of sequence.events) { - // Validate event has a type - if (!evt.type) { - console.warn('[WARN] Skipping event: missing event type', evt); - continue; - } - - try { - switch (evt.type) { - case 'track': { - // Required: event name - if (!evt.event || typeof evt.event !== 'string') { - console.warn( - `[WARN] Skipping track event: missing or invalid event name`, - evt - ); - continue; - } - - // Optional: properties (validate if present) - const properties = evt.properties as JsonMap | undefined; - if ( - evt.properties !== undefined && - (evt.properties === null || - Array.isArray(evt.properties) || - typeof evt.properties !== 'object') - ) { - console.warn( - `[WARN] Track event "${evt.event}" has invalid properties, proceeding without them` - ); - } - - await client.track(evt.event, properties); - break; - } - - case 'identify': { - // Optional userId (Segment allows anonymous identify) - // Optional traits (validate if present) - const traits = evt.traits as JsonMap | undefined; - if ( - evt.traits !== undefined && - (evt.traits === null || - Array.isArray(evt.traits) || - typeof evt.traits !== 'object') - ) { - console.warn( - `[WARN] Identify event has invalid traits, proceeding without them` - ); - } - - await client.identify(evt.userId, traits); - break; - } - - case 'screen': - case 'page': { - // RN SDK has no page(); map to screen for cross-SDK test compat - // Required: screen/page name - if (!evt.name || typeof evt.name !== 'string') { - console.warn( - `[WARN] Skipping ${evt.type} event: missing or invalid name`, - evt - ); - continue; - } - - // Optional: properties (validate if present) - const properties = evt.properties as JsonMap | undefined; - if ( - evt.properties !== undefined && - (evt.properties === null || - Array.isArray(evt.properties) || - typeof evt.properties !== 'object') - ) { - console.warn( - `[WARN] Screen "${evt.name}" has invalid properties, proceeding without them` - ); - } - - await client.screen(evt.name, properties); - break; - } - - case 'group': { - // Required: groupId - if (!evt.groupId || typeof evt.groupId !== 'string') { - console.warn( - `[WARN] Skipping group event: missing or invalid groupId`, - evt - ); - continue; - } - - // Optional: traits (validate if present) - const traits = evt.traits as JsonMap | undefined; - if ( - evt.traits !== undefined && - (evt.traits === null || - Array.isArray(evt.traits) || - typeof evt.traits !== 'object') - ) { - console.warn( - `[WARN] Group event for "${evt.groupId}" has invalid traits, proceeding without them` - ); - } - - await client.group(evt.groupId, traits); - break; - } - - case 'alias': { - // Required: userId - if (!evt.userId || typeof evt.userId !== 'string') { - console.warn( - `[WARN] Skipping alias event: missing or invalid userId`, - evt - ); - continue; - } - - await client.alias(evt.userId); - break; - } - - default: - console.warn( - `[WARN] Skipping event: unknown event type "${evt.type}"`, - evt - ); - continue; - } - } catch (error) { - console.error( - `[ERROR] Failed to process ${evt.type} event:`, - error, - evt - ); - continue; - } + await dispatchEvent(client, evt); } } - let permanentDropCount = 0; - const origLoggerError = logger.error; - logger.error = (message?: unknown, ...rest: unknown[]) => { - const msg = String(message ?? ''); - const match = msg.match(/Dropped (\d+) events/); - if (match) { - permanentDropCount += parseInt(match[1], 10); - } - origLoggerError.call(logger, message, ...rest); - }; - + const getDropCount = interceptDropCount(logger); await client.flush(); - - const timeoutMs = 30_000; - const pollMs = 50; - const start = Date.now(); - while (Date.now() - start < timeoutMs) { - const pending = await client.pendingEvents(); - if (pending === 0) break; - await new Promise((r) => setTimeout(r, pollMs)); - } - - logger.error = origLoggerError; + await waitForQueueDrain(client); + const permanentDropCount = getDropCount(); const finalPending = await client.pendingEvents(); const totalEvents = input.sequences.reduce( From c0515d74979a2129d4efacaa0936757120987960 Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Thu, 19 Mar 2026 11:31:59 -0500 Subject: [PATCH 8/9] fix(e2e-cli): improve robustness of flush-retry helpers - Return boolean from waitForQueueDrain to indicate drain vs timeout - Add JSDoc for interceptDropCount noting coupling to SegmentDestination log format - Add console.warn in dispatchEvent when events are skipped for missing fields - Add comment on sentBatches approximation Co-Authored-By: Claude Opus 4.6 --- e2e-cli/src/cli.ts | 40 ++++++++++++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index 2c2e5955e..ffe0ba665 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -118,7 +118,10 @@ async function dispatchEvent( ): Promise { switch (evt.type) { case 'track': - if (!evt.event) return; + if (!evt.event) { + console.warn(`[e2e] skipping track: missing event name`); + return; + } await client.track(evt.event, evt.properties as JsonMap | undefined); break; case 'identify': @@ -126,32 +129,51 @@ async function dispatchEvent( break; case 'screen': case 'page': - if (!evt.name) return; + if (!evt.name) { + console.warn(`[e2e] skipping ${evt.type}: missing name`); + return; + } await client.screen(evt.name, evt.properties as JsonMap | undefined); break; case 'group': - if (!evt.groupId) return; + if (!evt.groupId) { + console.warn(`[e2e] skipping group: missing groupId`); + return; + } await client.group(evt.groupId, evt.traits as JsonMap | undefined); break; case 'alias': - if (!evt.userId) return; + if (!evt.userId) { + console.warn(`[e2e] skipping alias: missing userId`); + return; + } await client.alias(evt.userId); break; + default: + console.warn(`[e2e] skipping event: unknown type "${evt.type}"`); } } +/** Polls pendingEvents() until the queue is empty or timeout. Returns true if drained. */ async function waitForQueueDrain( client: SegmentClient, timeoutMs = 30_000 -): Promise { +): Promise { const pollMs = 50; const start = Date.now(); while (Date.now() - start < timeoutMs) { - if ((await client.pendingEvents()) === 0) return; + if ((await client.pendingEvents()) === 0) return true; await new Promise((r) => setTimeout(r, pollMs)); } + return false; } +/** + * Intercepts logger.error() to count permanently dropped events. + * Coupled to the log format in SegmentDestination.ts — matches: + * "Dropped N events due to permanent errors" + * "Dropped N events due to retry limit exceeded" + */ function interceptDropCount(logger: Logger): () => number { let count = 0; const origError = logger.error; @@ -213,10 +235,10 @@ async function main() { const getDropCount = interceptDropCount(logger); await client.flush(); - await waitForQueueDrain(client); + const drained = await waitForQueueDrain(client); const permanentDropCount = getDropCount(); - const finalPending = await client.pendingEvents(); + const finalPending = drained ? 0 : await client.pendingEvents(); const totalEvents = input.sequences.reduce( (sum, seq) => sum + seq.events.length, 0 @@ -225,6 +247,8 @@ async function main() { 0, totalEvents - finalPending - permanentDropCount ); + // Approximate: SDK doesn't expose actual batch count, so we derive it + // from delivered event count and configured batch size. const sentBatches = delivered > 0 ? Math.ceil(delivered / Math.max(1, input.config?.flushAt ?? 1)) From ec82caaa74e7b360add82d306da5791eca58b8cd Mon Sep 17 00:00:00 2001 From: Andrea Bueide Date: Thu, 19 Mar 2026 11:43:45 -0500 Subject: [PATCH 9/9] feat(core): expose droppedEvents() on SegmentClient Replace fragile log-interception pattern in e2e-cli with a proper counter on SegmentDestination. The CLI now calls client.droppedEvents() instead of monkey-patching logger.error and regex-matching drop messages. - Add droppedEventCount property to SegmentDestination - Add droppedEvents() accessor on SegmentClient (mirrors pendingEvents) - Remove interceptDropCount helper from e2e-cli Co-Authored-By: Claude Opus 4.6 --- e2e-cli/src/cli.ts | 23 +------------------ packages/core/src/analytics.ts | 14 +++++++++++ .../core/src/plugins/SegmentDestination.ts | 1 + 3 files changed, 16 insertions(+), 22 deletions(-) diff --git a/e2e-cli/src/cli.ts b/e2e-cli/src/cli.ts index ffe0ba665..a23a33047 100644 --- a/e2e-cli/src/cli.ts +++ b/e2e-cli/src/cli.ts @@ -168,26 +168,6 @@ async function waitForQueueDrain( return false; } -/** - * Intercepts logger.error() to count permanently dropped events. - * Coupled to the log format in SegmentDestination.ts — matches: - * "Dropped N events due to permanent errors" - * "Dropped N events due to retry limit exceeded" - */ -function interceptDropCount(logger: Logger): () => number { - let count = 0; - const origError = logger.error; - logger.error = (message?: unknown, ...rest: unknown[]) => { - const match = String(message ?? '').match(/Dropped (\d+) events/); - if (match) count += parseInt(match[1], 10); - origError.call(logger, message, ...rest); - }; - return () => { - logger.error = origError; - return count; - }; -} - // ============================================================================ // Main // ============================================================================ @@ -233,10 +213,9 @@ async function main() { } } - const getDropCount = interceptDropCount(logger); await client.flush(); const drained = await waitForQueueDrain(client); - const permanentDropCount = getDropCount(); + const permanentDropCount = client.droppedEvents(); const finalPending = drained ? 0 : await client.pendingEvents(); const totalEvents = input.sequences.reduce( diff --git a/packages/core/src/analytics.ts b/packages/core/src/analytics.ts index 0b496ca3f..c40881751 100644 --- a/packages/core/src/analytics.ts +++ b/packages/core/src/analytics.ts @@ -1048,6 +1048,20 @@ export class SegmentClient { return totalEventsCount; } + + /** + * Method to get count of events permanently dropped by SegmentDestination. + */ + droppedEvents(): number { + let count = 0; + for (const plugin of this.getPlugins()) { + if (plugin instanceof SegmentDestination) { + count += plugin.droppedEventCount; + } + } + return count; + } + private resumeTimeoutId?: ReturnType; private waitingPlugins = new Set(); diff --git a/packages/core/src/plugins/SegmentDestination.ts b/packages/core/src/plugins/SegmentDestination.ts index cc7e911e6..21e9b157a 100644 --- a/packages/core/src/plugins/SegmentDestination.ts +++ b/packages/core/src/plugins/SegmentDestination.ts @@ -22,6 +22,7 @@ export const SEGMENT_DESTINATION_KEY = 'Segment.io'; export class SegmentDestination extends DestinationPlugin { type = PluginType.destination; key = SEGMENT_DESTINATION_KEY; + droppedEventCount = 0; private apiHost?: string; private settingsResolve: () => void; private settingsPromise: Promise;