Skip to content

Commit 2db1755

Browse files
waleedlatif1claude
andcommitted
fix(data-drains): abortable webhook backoff + S3/webhook optional form fields
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 48f818d commit 2db1755

2 files changed

Lines changed: 66 additions & 6 deletions

File tree

apps/sim/ee/data-drains/destinations/registry.tsx

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use client'
22

33
import type { ComponentType } from 'react'
4-
import { FormField, Input, SecretInput } from '@/components/emcn'
4+
import { FormField, Input, SecretInput, Switch } from '@/components/emcn'
55
import type { CreateDataDrainBody } from '@/lib/api/contracts/data-drains'
66
import type { DestinationType } from '@/lib/data-drains/types'
77

@@ -25,6 +25,8 @@ interface S3State {
2525
bucket: string
2626
region: string
2727
prefix: string
28+
endpoint: string
29+
forcePathStyle: boolean
2830
accessKeyId: string
2931
secretAccessKey: string
3032
}
@@ -35,6 +37,8 @@ const s3FormSpec: DestinationFormSpec<S3State> = {
3537
bucket: '',
3638
region: 'us-east-1',
3739
prefix: '',
40+
endpoint: '',
41+
forcePathStyle: false,
3842
accessKeyId: '',
3943
secretAccessKey: '',
4044
},
@@ -59,6 +63,19 @@ const s3FormSpec: DestinationFormSpec<S3State> = {
5963
placeholder='exports/sim'
6064
/>
6165
</FormField>
66+
<FormField label='Endpoint (optional, S3-compatible stores)'>
67+
<Input
68+
value={state.endpoint}
69+
onChange={(e) => setState({ ...state, endpoint: e.target.value })}
70+
placeholder='https://s3.example.com'
71+
/>
72+
</FormField>
73+
<FormField label='Force path style (MinIO, Ceph)'>
74+
<Switch
75+
checked={state.forcePathStyle}
76+
onCheckedChange={(v) => setState({ ...state, forcePathStyle: v })}
77+
/>
78+
</FormField>
6279
<FormField label='Access key ID'>
6380
<SecretInput
6481
value={state.accessKeyId}
@@ -84,6 +101,8 @@ const s3FormSpec: DestinationFormSpec<S3State> = {
84101
bucket: s.bucket,
85102
region: s.region,
86103
prefix: s.prefix || undefined,
104+
endpoint: s.endpoint || undefined,
105+
forcePathStyle: s.forcePathStyle || undefined,
87106
},
88107
destinationCredentials: {
89108
accessKeyId: s.accessKeyId,
@@ -94,12 +113,14 @@ const s3FormSpec: DestinationFormSpec<S3State> = {
94113

95114
interface WebhookState {
96115
url: string
116+
signatureHeader: string
97117
signingSecret: string
118+
bearerToken: string
98119
}
99120

100121
const webhookFormSpec: DestinationFormSpec<WebhookState> = {
101122
displayName: 'HTTPS webhook',
102-
initialState: { url: '', signingSecret: '' },
123+
initialState: { url: '', signatureHeader: '', signingSecret: '', bearerToken: '' },
103124
FormFields: ({ state, setState }) => (
104125
<>
105126
<FormField label='URL'>
@@ -109,19 +130,38 @@ const webhookFormSpec: DestinationFormSpec<WebhookState> = {
109130
placeholder='https://example.com/sim-drain'
110131
/>
111132
</FormField>
133+
<FormField label='Signature header (optional)'>
134+
<Input
135+
value={state.signatureHeader}
136+
onChange={(e) => setState({ ...state, signatureHeader: e.target.value })}
137+
placeholder='X-Sim-Signature'
138+
/>
139+
</FormField>
112140
<FormField label='Signing secret'>
113141
<SecretInput
114142
value={state.signingSecret}
115143
onChange={(v) => setState({ ...state, signingSecret: v })}
116144
/>
117145
</FormField>
146+
<FormField label='Bearer token (optional)'>
147+
<SecretInput
148+
value={state.bearerToken}
149+
onChange={(v) => setState({ ...state, bearerToken: v })}
150+
/>
151+
</FormField>
118152
</>
119153
),
120154
isComplete: (s) => s.url.length > 0 && s.signingSecret.length >= 8,
121155
toDestinationBranch: (s) => ({
122156
destinationType: 'webhook',
123-
destinationConfig: { url: s.url },
124-
destinationCredentials: { signingSecret: s.signingSecret },
157+
destinationConfig: {
158+
url: s.url,
159+
signatureHeader: s.signatureHeader || undefined,
160+
},
161+
destinationCredentials: {
162+
signingSecret: s.signingSecret,
163+
bearerToken: s.bearerToken || undefined,
164+
},
125165
}),
126166
}
127167

apps/sim/lib/data-drains/destinations/webhook.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { createHmac } from 'node:crypto'
22
import { createLogger } from '@sim/logger'
33
import { toError } from '@sim/utils/errors'
4-
import { sleep } from '@sim/utils/helpers'
54
import { z } from 'zod'
65
import { validateExternalUrl } from '@/lib/core/security/input-validation'
76
import {
@@ -91,6 +90,27 @@ function sign(body: Buffer, secret: string, timestamp: number): string {
9190
return `t=${timestamp},${SIGNATURE_VERSION}=${hmac}`
9291
}
9392

93+
/**
94+
* Resolves after `ms` or as soon as `signal` aborts, whichever happens first.
95+
* The caller checks `signal.aborted` at the top of the next iteration to
96+
* surface the abort — keeping resolution side-effect-free here.
97+
*/
98+
function sleepUntilAborted(ms: number, signal: AbortSignal): Promise<void> {
99+
if (signal.aborted) return Promise.resolve()
100+
return new Promise((resolve) => {
101+
let timeoutId: ReturnType<typeof setTimeout>
102+
const onAbort = () => {
103+
clearTimeout(timeoutId)
104+
resolve()
105+
}
106+
timeoutId = setTimeout(() => {
107+
signal.removeEventListener('abort', onAbort)
108+
resolve()
109+
}, ms)
110+
signal.addEventListener('abort', onAbort, { once: true })
111+
})
112+
}
113+
94114
function backoffWithJitter(attempt: number, retryAfterMs?: number): number {
95115
if (retryAfterMs !== undefined) {
96116
// Floor at 500ms so a misbehaving server returning Retry-After: 0 cannot
@@ -249,7 +269,7 @@ export const webhookDestination: DrainDestination<
249269
retryAfterMs = parseRetryAfter(response.headers.get('retry-after'))
250270
}
251271
if (attempt < MAX_ATTEMPTS) {
252-
await sleep(backoffWithJitter(attempt, retryAfterMs))
272+
await sleepUntilAborted(backoffWithJitter(attempt, retryAfterMs), signal)
253273
}
254274
}
255275
throw lastError instanceof Error

0 commit comments

Comments
 (0)