Skip to content

Commit f6282d0

Browse files
abueide and claude committed
fix: wire shutdown lifecycle, handle limit-exceeded signal, age-based pruning
- Override shutdown() instead of standalone destroy() to integrate with the plugin lifecycle — prevents auto-flush timer leak on client cleanup
- Handle RetryResult 'limit_exceeded' from RetryManager: log warning and let per-event age pruning (pruneExpiredEvents via _queuedAt) handle event drops rather than dropping all retryable events on global counter reset
- Import RetryResult type for type-safe limit checking

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c642fc8 commit f6282d0

File tree

1 file changed

+35
-14
lines changed

1 file changed

+35
-14
lines changed

packages/core/src/plugins/SegmentDestination.ts

Lines changed: 35 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ import { QueueFlushingPlugin } from './QueueFlushingPlugin';
1515
import { defaultApiHost, defaultConfig } from '../constants';
1616
import { translateHTTPError, classifyError, parseRetryAfter } from '../errors';
1717
import { RetryManager } from '../backoff/RetryManager';
18+
import type { RetryResult } from '../backoff';
1819

1920
const MAX_EVENTS_PER_BATCH = 100;
2021
const MAX_PAYLOAD_SIZE_IN_KB = 500;
@@ -100,9 +101,19 @@ export class SegmentDestination extends DestinationPlugin {
100101
retryAfterSeconds: retryAfterSeconds ?? 60,
101102
};
102103
} else if (classification.errorType === 'transient') {
103-
return { batch, messageIds, status: 'transient', statusCode: res.status };
104+
return {
105+
batch,
106+
messageIds,
107+
status: 'transient',
108+
statusCode: res.status,
109+
};
104110
} else {
105-
return { batch, messageIds, status: 'permanent', statusCode: res.status };
111+
return {
112+
batch,
113+
messageIds,
114+
status: 'permanent',
115+
statusCode: res.status,
116+
};
106117
}
107118
} catch (e) {
108119
this.analytics?.reportInternalError(translateHTTPError(e));
@@ -146,8 +157,7 @@ export class SegmentDestination extends DestinationPlugin {
146157
private async pruneExpiredEvents(
147158
events: SegmentEvent[]
148159
): Promise<SegmentEvent[]> {
149-
const maxAge =
150-
this.httpConfig?.backoffConfig?.maxTotalBackoffDuration ?? 0;
160+
const maxAge = this.httpConfig?.backoffConfig?.maxTotalBackoffDuration ?? 0;
151161
if (maxAge <= 0) {
152162
return events;
153163
}
@@ -180,26 +190,30 @@ export class SegmentDestination extends DestinationPlugin {
180190
/**
181191
* Update retry state based on aggregated batch results.
182192
* 429 takes precedence over transient errors.
193+
* Returns true if retry limits were exceeded (caller should drop events).
183194
*/
184195
private async updateRetryState(
185196
aggregation: ErrorAggregation
186-
): Promise<void> {
197+
): Promise<boolean> {
187198
if (!this.retryManager) {
188-
return;
199+
return false;
189200
}
190201

191202
const has429 = aggregation.rateLimitResults.length > 0;
203+
let result: RetryResult | undefined;
192204

193205
if (has429) {
194206
// Each call lets RetryManager.applyRetryStrategy consolidate wait times
195-
for (const result of aggregation.rateLimitResults) {
196-
await this.retryManager.handle429(result.retryAfterSeconds ?? 60);
207+
for (const r of aggregation.rateLimitResults) {
208+
result = await this.retryManager.handle429(r.retryAfterSeconds ?? 60);
197209
}
198210
} else if (aggregation.hasTransientError) {
199-
await this.retryManager.handleTransientError();
211+
result = await this.retryManager.handleTransientError();
200212
} else if (aggregation.successfulMessageIds.length > 0) {
201213
await this.retryManager.reset();
202214
}
215+
216+
return result === 'limit_exceeded';
203217
}
204218

205219
private sendEvents = async (events: SegmentEvent[]): Promise<void> => {
@@ -235,7 +249,7 @@ export class SegmentDestination extends DestinationPlugin {
235249

236250
const aggregation = this.aggregateErrors(results);
237251

238-
await this.updateRetryState(aggregation);
252+
const limitExceeded = await this.updateRetryState(aggregation);
239253

240254
if (aggregation.successfulMessageIds.length > 0) {
241255
await this.queuePlugin.dequeueByMessageIds(
@@ -257,6 +271,16 @@ export class SegmentDestination extends DestinationPlugin {
257271
);
258272
}
259273

274+
// When retry limits are exceeded, the RetryManager resets to READY.
275+
// We do NOT drop events here — individual events are only dropped when
276+
// they exceed maxTotalBackoffDuration via pruneExpiredEvents (per-event age).
277+
// The global retry counter reset just allows the next flush cycle to retry.
278+
if (limitExceeded) {
279+
this.analytics?.logger.warn(
280+
'Retry limits exceeded, counter reset. Stale events will be pruned by age on next flush.'
281+
);
282+
}
283+
260284
const failedCount =
261285
events.length -
262286
aggregation.successfulMessageIds.length -
@@ -356,10 +380,7 @@ export class SegmentDestination extends DestinationPlugin {
356380
return this.queuePlugin.flush();
357381
}
358382

359-
/**
360-
* Clean up resources. Clears RetryManager auto-flush timer.
361-
*/
362-
destroy(): void {
383+
shutdown(): void {
363384
this.retryManager?.destroy();
364385
}
365386
}

0 commit comments

Comments (0)