Commit 57f6571

waleedlatif1 and claude committed
refactor(data-drains): trim extraneous comments and defensive code
Audit cleanup before merge:

- service: drop chunk-empty defensive skip (sources already handle it), trim WHAT-comments
- dispatcher: tighten claim-race / rollback / enterprise-cache rationale to a single WHY each
- access: collapse the duplicated module-top + inline comments into one TSDoc on the gate function
- s3: fix orphaned doc block over assertEndpointIsPublic, soften the forcePathStyle TSDoc to match the actual default
- webhook: drop empty close() comment
- docs: clarify that drain reads also require owner/admin, drop the "on the dispatcher tick" implementation detail

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 4fe0d0e commit 57f6571

6 files changed

Lines changed: 33 additions & 58 deletions

apps/docs/content/docs/en/enterprise/data-drains.mdx

Lines changed: 4 additions & 4 deletions
@@ -5,7 +5,7 @@ description: Continuously export workflow logs, audit logs, and Mothership data

 import { FAQ } from '@/components/ui/faq'

-Data Drains let organization owners and admins on Enterprise plans continuously export Sim data to a destination they control — a customer-owned S3 bucket or an HTTPS webhook. A drain runs on a schedule, picks up only new rows since its last successful run, and writes them as NDJSON to the destination.
+Data Drains let organization owners and admins on Enterprise plans continuously export Sim data to a destination they control — a customer-owned S3 bucket or an HTTPS webhook. A drain runs on a schedule, picks up only new rows since its last successful run, and writes them as NDJSON to the destination. Viewing drain configuration and run history is restricted to owners and admins as well, since destinations expose internal bucket names and webhook URLs.

 Drains pair naturally with [Data Retention](/enterprise/data-retention): drain into long-term storage first, then let retention safely delete from Sim.

@@ -32,7 +32,7 @@ A drain exports exactly one source. To export multiple sources, create multiple
 |---|---|
 | **Workflow logs** | Workflow execution records (one row per execution, only after the run reaches a terminal state). |
 | **Job logs** | Background job records (deployed APIs, schedules, webhooks). Only terminal-state rows are exported. |
-| **Audit logs** | Organization and workspace audit events — logins, permission changes, resource creation/deletion, drain configuration changes. |
+| **Audit logs** | Organization- and workspace-scoped audit events — logins, permission changes, resource creation/deletion, drain configuration changes. |
 | **Copilot chats** | Mothership chat history. |
 | **Copilot runs** | Mothership run records (terminal state only). |

@@ -98,7 +98,7 @@ Failed deliveries retry up to 3 times with exponential backoff (500ms, 1s, 2s wi

 | Cadence | Drain runs |
 |---|---|
-| **Hourly** | Once per hour, on the dispatcher tick. |
+| **Hourly** | Once per hour. |
 | **Daily** | Once per day. |

 You can also disable a drain with the **Enabled** toggle (it stops running but is preserved), or trigger an out-of-schedule run with **Run now** on any drain row.
@@ -127,7 +127,7 @@ The **last 10 runs** for each drain are visible by expanding its row in the sett
 <FAQ items={[
   {
     question: "Who can configure data drains?",
-    answer: "Only organization owners and admins can create, edit, run, or delete drains. On Sim Cloud, the organization must be on an Enterprise plan."
+    answer: "Only organization owners and admins can view, create, edit, run, or delete drains. On Sim Cloud, the organization must be on an Enterprise plan."
   },
   {
     question: "Will drained data be duplicated if a run fails?",
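
Reviewer note: the hunk context above mentions retries with exponential backoff (500ms, 1s, 2s). A minimal sketch of that schedule follows. The names `backoffDelayMs` and `deliverWithRetry` are hypothetical, and reading "up to 3 times" as three retries after the first attempt is an assumption. Only the three delays come from the docs text.

```typescript
// Hypothetical helper names; only the 500ms, 1s, 2s delays come from the docs.
const BASE_DELAY_MS = 500
const MAX_RETRIES = 3 // assumption: three retries after the initial attempt

/** Delay before retry number `retry` (0-based): 500, 1000, 2000 ms. */
export function backoffDelayMs(retry: number): number {
  return BASE_DELAY_MS * 2 ** retry
}

/** Runs `deliver`, retrying a failed attempt up to MAX_RETRIES times. */
export async function deliverWithRetry<T>(
  deliver: () => Promise<T>,
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms))
): Promise<T> {
  let lastError: unknown
  for (let attempt = 0; attempt <= MAX_RETRIES; attempt++) {
    try {
      return await deliver()
    } catch (error) {
      lastError = error
      // Sleep only when another attempt will follow.
      if (attempt < MAX_RETRIES) await sleep(backoffDelayMs(attempt))
    }
  }
  throw lastError instanceof Error ? lastError : new Error('Delivery failed after retries')
}
```

Passing `sleep` as a parameter keeps the schedule testable without real timers.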

apps/sim/lib/data-drains/access.ts

Lines changed: 5 additions & 15 deletions
@@ -6,13 +6,6 @@ import { getSession } from '@/lib/auth'
 import { isOrganizationOnEnterprisePlan } from '@/lib/billing/core/subscription'
 import { isBillingEnabled, isDataDrainsEnabled } from '@/lib/core/config/feature-flags'

-/**
- * On Sim Cloud (billing enabled), enterprise plan is the gate. On self-hosted
- * deployments, owners opt in by setting `DATA_DRAINS_ENABLED=true` — and
- * routes 404 until they do, so enterprise customers don't accidentally expose
- * mutating endpoints just by deploying a newer image.
- */
-
 export interface DrainAccessSession {
   user: {
     id: string
@@ -29,10 +22,11 @@ export type DrainAccessResult =
   | { ok: false; response: NextResponse }

 /**
- * Centralizes the auth + membership + role + enterprise-plan gates that all
- * data-drain routes share. Owner/admin role is required for both reads and
- * writes — drain configs expose customer-controlled bucket names and webhook
- * URLs, which a regular member shouldn't be able to enumerate.
+ * Auth + membership + role + enterprise-plan gate shared by every data-drain
+ * route. Owner/admin role is required for reads as well as writes since drain
+ * configs expose customer bucket names and webhook URLs. On Sim Cloud the
+ * gate is the Enterprise plan; on self-hosted it's `DATA_DRAINS_ENABLED`,
+ * which 404s when unset so a newer image doesn't silently expose drains.
  */
 export async function authorizeDrainAccess(
   organizationId: string,
@@ -59,10 +53,6 @@ export async function authorizeDrainAccess(
     }
   }

-  // Feature-flag and enterprise-plan gates apply to reads as well as writes —
-  // drain configs (bucket names, webhook URLs) are sensitive enough that an
-  // org member shouldn't be able to enumerate them on a deployment that
-  // hasn't opted in or after a downgrade off Enterprise.
   if (!isBillingEnabled && !isDataDrainsEnabled) {
     return {
       ok: false,
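
Reviewer note: the consolidated TSDoc describes several gates in one paragraph. The sketch below restates them as a pure decision function, which may make the ordering easier to audit. `evaluateDrainGate` and its input shape are hypothetical and are not the signature of `authorizeDrainAccess`. The 404 for an unset `DATA_DRAINS_ENABLED` comes from the comment, while both 403 status codes are assumptions.

```typescript
type Role = 'owner' | 'admin' | 'member'

interface GateInput {
  role: Role
  billingEnabled: boolean // true on Sim Cloud
  dataDrainsEnabled: boolean // DATA_DRAINS_ENABLED on self-hosted
  onEnterprisePlan: boolean
}

type GateResult = { ok: true } | { ok: false; status: number }

/** Hypothetical pure version of the gate; applies to reads and writes alike. */
export function evaluateDrainGate(input: GateInput): GateResult {
  // Owner/admin only, because drain configs expose bucket names and webhook URLs.
  // The 403 status is an assumption.
  if (input.role !== 'owner' && input.role !== 'admin') return { ok: false, status: 403 }
  // Self-hosted without the opt-in flag: respond as if the route does not exist.
  if (!input.billingEnabled && !input.dataDrainsEnabled) return { ok: false, status: 404 }
  // Sim Cloud: the Enterprise plan is the gate. The 403 status is an assumption.
  if (input.billingEnabled && !input.onEnterprisePlan) return { ok: false, status: 403 }
  return { ok: true }
}
```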

apps/sim/lib/data-drains/destinations/s3.ts

Lines changed: 10 additions & 13 deletions
@@ -32,9 +32,8 @@ const s3ConfigSchema = z.object({
     })
     .optional(),
   /**
-   * Force path-style addressing. MinIO and Ceph RGW need `true`; AWS S3 and
-   * Cloudflare R2 need `false`. No auto-default — explicit choice avoids
-   * silent breakage when a user points at a non-MinIO endpoint.
+   * Force path-style addressing. Set `true` for MinIO / Ceph RGW; defaults
+   * to `false` for AWS S3 and Cloudflare R2.
    */
   forcePathStyle: z.boolean().optional(),
 })
@@ -97,18 +96,12 @@ function isS3ServiceException(error: unknown): error is S3ServiceException {
   )
 }

-/**
- * Wraps an S3 SDK call so customers see actionable error codes
- * (`AccessDenied`, `NoSuchBucket`, `InvalidAccessKeyId`,
- * `SignatureDoesNotMatch`, ...) instead of opaque "UnknownError".
- */
 /**
  * Resolves the optional custom endpoint and confirms it does not point at a
  * private, loopback, or cloud-metadata address. The schema-level
  * `validateExternalUrl` only catches IP literals, so a hostname like
  * `evil.example.com` resolving to `169.254.169.254` would slip past it; the
- * AWS SDK then resolves the host itself, bypassing the SSRF guard. This is
- * the same DNS-aware check used by the webhook destination.
+ * AWS SDK then resolves the host itself, bypassing the SSRF guard.
  */
 async function assertEndpointIsPublic(endpoint: string | undefined): Promise<void> {
   if (!endpoint) return
@@ -118,6 +111,11 @@ async function assertEndpointIsPublic(endpoint: string | undefined): Promise<voi
   }
 }

+/**
+ * Surfaces actionable S3 SDK error codes (`AccessDenied`, `NoSuchBucket`,
+ * `InvalidAccessKeyId`, `SignatureDoesNotMatch`, ...) and preserves the
+ * original error as `cause` so callers can still branch on `code`/`$metadata`.
+ */
 async function withS3ErrorContext<T>(action: string, fn: () => Promise<T>): Promise<T> {
   try {
     return await fn()
@@ -147,9 +145,8 @@ export const s3Destination: DrainDestination<S3DestinationConfig, S3DestinationC
   async test({ config, credentials, signal }) {
     await assertEndpointIsPublic(config.endpoint)
     const client = buildClient(config, credentials)
-    // Probe with an actual write — HeadBucket only checks read/list and
-    // produces both false positives (read-only creds pass, deliver later
-    // fails) and false negatives (write-only IAM policies fail).
+    // Probe with a real write so read-only creds and write-only IAM policies
+    // surface here instead of at the first scheduled run.
     const probeKey = `${normalizePrefix(config.prefix)}.sim-drain-write-probe/${generateShortId(12)}`
     try {
       await withS3ErrorContext('test-put', () =>
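
Reviewer note: the `assertEndpointIsPublic` TSDoc explains why an IP-literal check alone is not enough. The sketch below shows one way a DNS-aware check could look using only Node built-ins. It is not the repository's implementation: `isBlockedAddress` and `assertHostResolvesPublic` are hypothetical names, and the range list is deliberately simplified (for example, it does not decode hex-form IPv4-mapped IPv6 addresses).

```typescript
import { lookup } from 'node:dns/promises'
import { isIP } from 'node:net'

/** True for IPv4 addresses in unspecified, private, loopback, or link-local ranges. */
function isBlockedIPv4(address: string): boolean {
  const parts = address.split('.').map(Number)
  if (parts.length !== 4 || parts.some((p) => !Number.isInteger(p) || p < 0 || p > 255)) {
    return true // malformed input is rejected rather than trusted
  }
  const [a, b] = parts
  return (
    a === 0 || // "this network"
    a === 10 || // RFC 1918
    a === 127 || // loopback
    (a === 169 && b === 254) || // link-local, includes the 169.254.169.254 metadata address
    (a === 172 && b >= 16 && b <= 31) || // RFC 1918
    (a === 192 && b === 168) // RFC 1918
  )
}

/** Simplified IPv6 check: loopback, unspecified, unique-local, link-local, dotted IPv4-mapped. */
function isBlockedIPv6(address: string): boolean {
  const lower = address.toLowerCase()
  const mapped = lower.match(/^::ffff:(\d+\.\d+\.\d+\.\d+)$/)
  if (mapped) return isBlockedIPv4(mapped[1])
  return lower === '::1' || lower === '::' || /^f[cd]/.test(lower) || /^fe[89ab]/.test(lower)
}

export function isBlockedAddress(address: string): boolean {
  const family = isIP(address)
  if (family === 4) return isBlockedIPv4(address)
  if (family === 6) return isBlockedIPv6(address)
  return true // not an IP address at all
}

/** Resolves the endpoint's hostname and rejects it if any record is non-public. */
export async function assertHostResolvesPublic(endpoint: string): Promise<void> {
  const host = new URL(endpoint).hostname.replace(/^\[|\]$/g, '')
  const records: { address: string }[] = isIP(host)
    ? [{ address: host }]
    : await lookup(host, { all: true })
  const blocked = records.find((record) => isBlockedAddress(record.address))
  if (blocked) {
    throw new Error(`Endpoint resolves to a non-public address: ${blocked.address}`)
  }
}
```

A check like this still leaves a gap between the lookup and the later connection, because the client resolves the hostname again when it connects.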

apps/sim/lib/data-drains/destinations/webhook.ts

Lines changed: 1 addition & 4 deletions
@@ -256,10 +256,7 @@ export const webhookDestination: DrainDestination<
           ? lastError
           : new Error('Webhook delivery failed after retries')
       },
-      async close() {
-        // No persistent state to release; the pinned-IP agent is created
-        // per-request inside secureFetchWithPinnedIP.
-      },
+      async close() {},
     }
   },
 }

apps/sim/lib/data-drains/dispatcher.ts

Lines changed: 10 additions & 17 deletions
@@ -84,11 +84,9 @@ export async function dispatchDueDrains(now: Date = new Date()): Promise<{
     return { candidates: 0, dispatched: 0, skipped: 0, reaped }
   }

-  // Cache enterprise checks per-org since we may have several drains per org.
-  // Self-hosted (no billing) deployments have no subscription infrastructure,
-  // so the enterprise predicate would return false for every org and silently
-  // skip every drain — short-circuit to allow when billing is disabled, and
-  // let the cron route's `DATA_DRAINS_ENABLED` gate own the global on/off.
+  // Self-hosted deployments have no subscription infra; `DATA_DRAINS_ENABLED`
+  // is the global on/off there. Cache per-org so a multi-drain org pays one
+  // billing lookup.
   const enterpriseCache = new Map<string, boolean>()
   const isEnterprise = async (orgId: string): Promise<boolean> => {
     if (!isBillingEnabled) return true
@@ -131,27 +129,22 @@ export async function dispatchDueDrains(now: Date = new Date()): Promise<{
       .where(and(eq(dataDrains.id, candidate.id), duePredicate))
       .returning({ id: dataDrains.id })

-    if (claimed.length === 0) {
-      // Lost the race — another invocation already claimed this drain.
-      continue
-    }
+    if (claimed.length === 0) continue

     try {
-      // concurrencyKey serializes runs of the same drain end-to-end on the
-      // job queue, so even if a manual run-now races with a cron claim the
-      // queue backend won't execute both in parallel.
+      // concurrencyKey serializes runs of the same drain on the job queue, so
+      // a manual run-now racing a cron claim can never execute in parallel.
       await queue.enqueue(
         'run-data-drain',
         { drainId: candidate.id, trigger: 'cron' },
         { concurrencyKey: `data-drain:${candidate.id}` }
       )
       dispatched++
     } catch (error) {
-      // Roll back the claim — otherwise a transient queue outage silently
-      // delays this drain by a full cadence. We only revert if no other
-      // process has advanced lastRunAt past our claim timestamp. Guard the
-      // rollback itself so a transient DB error here doesn't abort the
-      // remaining candidates in the batch.
+      // Roll back the claim so a transient queue outage doesn't delay this
+      // drain by a full cadence. Scoped to our own claim timestamp so it
+      // can't trample a concurrent advance. The rollback itself is guarded
+      // so a DB error here doesn't abort the rest of the batch.
       try {
         await db
           .update(dataDrains)
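
Reviewer note: with the "lost the race" comment gone, the claim and rollback semantics are only implied by the code. The in-memory sketch below models them as compare-and-set operations. It is an illustration under stated assumptions, not the dispatcher's code: the real claim is a conditional `UPDATE ... RETURNING` against the database, and `DrainRow`, `tryClaim`, and `rollbackClaim` are hypothetical names.

```typescript
interface DrainRow {
  id: string
  lastRunAt: Date | null
}

const sameInstant = (a: Date | null, b: Date | null): boolean =>
  a === null || b === null ? a === b : a.getTime() === b.getTime()

/** Claim succeeds only if nobody changed lastRunAt since we read it. */
export function tryClaim(row: DrainRow, seenLastRunAt: Date | null, now: Date): boolean {
  if (!sameInstant(row.lastRunAt, seenLastRunAt)) return false // lost the race
  row.lastRunAt = now
  return true
}

/** Undo our claim, but only while lastRunAt still holds our own claim timestamp. */
export function rollbackClaim(row: DrainRow, claimedAt: Date, previous: Date | null): boolean {
  if (!sameInstant(row.lastRunAt, claimedAt)) return false // someone else advanced it
  row.lastRunAt = previous
  return true
}
```

Scoping the rollback to the claim timestamp is what keeps a failed enqueue from overwriting a concurrent advance made by another invocation.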

apps/sim/lib/data-drains/service.ts

Lines changed: 3 additions & 5 deletions
@@ -88,7 +88,6 @@ export async function runDrain(
       chunkSize: CHUNK_SIZE,
       signal,
     })) {
-      if (chunk.length === 0) continue
       const ndjson = `${chunk.map((row) => JSON.stringify(source.serialize(row))).join('\n')}\n`
       const body = Buffer.from(ndjson, 'utf8')

@@ -177,16 +176,15 @@ export async function runDrain(
             finishedAt,
             rowsExported,
             bytesWritten,
-            cursorAfter: cursorBefore, // cursor not advanced on failure
+            cursorAfter: cursorBefore,
             locators,
             error: message.slice(0, 4000),
           })
           .where(eq(dataDrainRuns.id, runId))
       })
     } catch (statusError) {
-      // Don't let a failed status-update mask the real delivery error —
-      // the reaper will eventually rewrite the row to `failed`. Log so we
-      // can spot DB outages that would otherwise hide behind delivery errors.
+      // Best-effort status write — the reaper repairs stuck rows. Log so DB
+      // outages don't hide behind the original delivery error.
       logger.error('Failed to record data drain failure status', {
         drainId,
         runId,
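
Reviewer note: the first service.ts hunk removes the empty-chunk skip in front of the NDJSON line. The standalone sketch below reproduces that serialization to show what an empty chunk would produce. `toNdjson` is a hypothetical wrapper around the same template-string expression and is not a function in the repository.

```typescript
/** One JSON document per line, with a trailing newline after the last row. */
export function toNdjson<T>(rows: readonly T[], serialize: (row: T) => unknown): Buffer {
  const ndjson = `${rows.map((row) => JSON.stringify(serialize(row))).join('\n')}\n`
  return Buffer.from(ndjson, 'utf8')
}

// Usage: two rows become two newline-terminated JSON lines.
const body = toNdjson([{ id: 1 }, { id: 2 }], (row) => row)
console.log(JSON.stringify(body.toString('utf8'))) // "{\"id\":1}\n{\"id\":2}\n"
```

An empty array serializes to a single `"\n"` byte, so dropping the guard is safe only as long as sources never yield empty chunks, which is what the commit message states.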
