Commit db5d298

feat(enterprise): add data drains for continuous export to S3 / webhook
Data drains let enterprise organizations continuously export Sim data (workflow logs, job logs, audit logs, copilot chats, copilot runs) to customer-controlled S3 buckets or HTTPS webhooks on hourly or daily schedules. Pairs with data retention to satisfy long-term compliance archives. Built around two registries (DrainSource + DrainDestination) so adding new sources or destinations is a single-file change. Cursor-based at-least-once delivery; cursor advances only on full success and rows carry stable ids so consumers can dedupe. Includes SSRF-validated webhooks with DNS pinning, HMAC-SHA256 timestamp signatures, S3 server-side encryption, audit logging on every config and run change, and self-hosted env var gating that mirrors data retention.
1 parent 9eeb1b2 commit db5d298

52 files changed

Lines changed: 19786 additions & 3 deletions

Lines changed: 165 additions & 0 deletions
@@ -0,0 +1,165 @@
---
title: Data Drains
description: Continuously export workflow logs, audit logs, and Mothership data to your own S3 bucket or HTTPS endpoint on a schedule
---

import { FAQ } from '@/components/ui/faq'

Data Drains let organization owners and admins on Enterprise plans continuously export Sim data to a destination they control — a customer-owned S3 bucket or an HTTPS webhook. A drain runs on a schedule, picks up only new rows since its last successful run, and writes them as NDJSON to the destination.

Drains pair naturally with [Data Retention](/enterprise/data-retention): drain into long-term storage first, then let retention safely delete from Sim.

---

## Setup

Go to **Settings → Enterprise → Data Drains** in your workspace, then click **New drain**.

Each drain has four pieces:
1. A **source** — the category of data to export
2. A **destination** — where the data goes
3. A **schedule** — how often it runs
4. A **name** — unique within your organization

---
## Sources

A drain exports exactly one source. To export multiple sources, create multiple drains.

| Source | Description |
|---|---|
| **Workflow logs** | Workflow execution records (one row per execution, only after the run reaches a terminal state). |
| **Job logs** | Background job records (deployed APIs, schedules, webhooks). Only terminal-state rows are exported. |
| **Audit logs** | Organization and workspace audit events — logins, permission changes, resource creation/deletion, drain configuration changes. |
| **Copilot chats** | Mothership chat history. |
| **Copilot runs** | Mothership run records (terminal state only). |

Each row is delivered as a single line of NDJSON. The shape of each row is part of the public schema and stable across versions; every row carries an `id` field that downstream consumers can use to dedupe.
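Since every row is one JSON object per line with a stable `id`, a consumer can drop replayed rows with a simple seen-set. A minimal TypeScript sketch (the `status` field and overall row shape are illustrative; only the `id` field is documented above):

```typescript
// Hypothetical consumer: parse an NDJSON chunk and skip rows whose `id`
// was already processed, since at-least-once delivery allows replays.
type DrainRow = { id: string } & Record<string, unknown>

const seen = new Set<string>()

function dedupeChunk(ndjson: string): DrainRow[] {
  const fresh: DrainRow[] = []
  for (const line of ndjson.split('\n')) {
    if (!line.trim()) continue // skip the blank trailing line
    const row = JSON.parse(line) as DrainRow
    if (seen.has(row.id)) continue // replayed row, already handled
    seen.add(row.id)
    fresh.push(row)
  }
  return fresh
}

const chunk = '{"id":"a","status":"success"}\n{"id":"b","status":"error"}\n'
dedupeChunk(chunk) // two fresh rows
dedupeChunk(chunk) // zero fresh rows: duplicates dropped
```

A real consumer would back the seen-set with durable storage rather than process memory, so dedupe survives restarts.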
---

## Destinations

### Amazon S3 (or any S3-compatible store)

Writes one NDJSON object per delivered chunk to your bucket.

- **Bucket** — the bucket name. Must already exist; Sim does not create buckets.
- **Region** — AWS region (e.g. `us-east-1`).
- **Prefix** *(optional)* — folder path inside the bucket. Trailing slash optional.
- **Access key ID / Secret access key** — IAM credentials with `s3:PutObject` on the bucket. The "Test connection" button performs a real write probe to verify access, then deletes the probe object.
- **Endpoint** *(optional)* — for non-AWS stores like MinIO, Cloudflare R2, or GCS S3-interop. Leave blank for AWS S3.
- **Force path-style** *(optional)* — required for MinIO/Ceph; must be off for AWS S3 and R2.
Object keys are deterministic:

```
{prefix}/{source}/{drainId}/{yyyy}/{mm}/{dd}/{runId}-{seq}.ndjson
```

Objects are written with `AES256` server-side encryption.
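To illustrate what the template produces, here is a hypothetical `buildObjectKey` helper (not part of Sim's code; the source name and ids below are invented):

```typescript
// Illustrative only: reproduces the documented key template with UTC dates.
function buildObjectKey(opts: {
  prefix?: string
  source: string
  drainId: string
  runId: string
  seq: number
  date: Date
}): string {
  const { prefix, source, drainId, runId, seq, date } = opts
  const yyyy = date.getUTCFullYear().toString()
  const mm = String(date.getUTCMonth() + 1).padStart(2, '0')
  const dd = String(date.getUTCDate()).padStart(2, '0')
  // Trailing slash on the prefix is optional, so normalize it.
  const cleanPrefix = prefix ? prefix.replace(/\/+$/, '') + '/' : ''
  return `${cleanPrefix}${source}/${drainId}/${yyyy}/${mm}/${dd}/${runId}-${seq}.ndjson`
}

// e.g. exports/workflow-logs/drn_123/2025/06/01/run_9-0.ndjson
buildObjectKey({
  prefix: 'exports/',
  source: 'workflow-logs',
  drainId: 'drn_123',
  runId: 'run_9',
  seq: 0,
  date: new Date(Date.UTC(2025, 5, 1)),
})
```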
### HTTPS Webhook

POSTs each chunk as NDJSON to your endpoint.

- **URL** — must be HTTPS. Sim resolves the hostname and refuses to deliver to private, loopback, or cloud-metadata IPs. The resolved IP is pinned for the duration of a run to prevent DNS rebinding.
- **Signing secret** — shared secret used for HMAC-SHA256 signing.
- **Bearer token** *(optional)* — sent as `Authorization: Bearer <token>`.
- **Signature header name** *(optional)* — defaults to `X-Sim-Signature`.

Each request includes:

```
Content-Type: application/x-ndjson
User-Agent: Sim-DataDrain/1.0
X-Sim-Timestamp: <unix-seconds>
X-Sim-Signature-Version: v1
X-Sim-Signature: t=<unix-seconds>,v1=<hex(hmac-sha256)>
X-Sim-Drain-Id: <drain id>
X-Sim-Run-Id: <run id>
X-Sim-Source: <source name>
X-Sim-Sequence: <chunk index>
X-Sim-Row-Count: <rows in this chunk>
Idempotency-Key: <runId>-<sequence>
```

The signature is computed as `HMAC-SHA256(secret, "${timestamp}.${body}")` and serialized as `t=<timestamp>,v1=<hex>`. Verify by recomputing over the same string and rejecting timestamps older than ~5 minutes — this defends against captured-request replay attacks.
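A receiver-side verification sketch using Node's `crypto` module, assuming the signature arrives in the default `X-Sim-Signature` header in the `t=...,v1=...` form described above (the helper name and parsing are illustrative, not Sim-provided code):

```typescript
import { createHmac, timingSafeEqual } from 'node:crypto'

function verifySignature(
  secret: string,
  body: string,
  signatureHeader: string, // e.g. "t=1717000000,v1=ab12..."
  toleranceSeconds = 300
): boolean {
  const parts = Object.fromEntries(
    signatureHeader.split(',').map((kv) => kv.split('=') as [string, string])
  )
  const timestamp = Number(parts.t)
  if (!Number.isFinite(timestamp)) return false
  // Reject stale timestamps to block replay of captured requests.
  if (Math.abs(Date.now() / 1000 - timestamp) > toleranceSeconds) return false
  // Recompute the HMAC over the exact `${timestamp}.${body}` string.
  const expected = createHmac('sha256', secret).update(`${timestamp}.${body}`).digest('hex')
  const given = parts.v1 ?? ''
  if (given.length !== expected.length) return false
  // Constant-time comparison avoids leaking match position via timing.
  return timingSafeEqual(Buffer.from(given, 'hex'), Buffer.from(expected, 'hex'))
}
```

Verify against the raw request body bytes before any JSON parsing; re-serialized bodies will not match the signed string.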
Failed deliveries retry up to 3 times with exponential backoff (500ms, 1s, 2s, with ±20% jitter), respecting `Retry-After` on 429/503. Non-retryable 4xx responses fail the run immediately.
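The stated policy can be sketched as follows (illustrative only, not Sim's implementation; `deliver` stands in for the actual HTTP POST):

```typescript
// One initial attempt plus up to three retries with 500ms/1s/2s base delays.
async function deliverWithRetry(deliver: () => Promise<Response>): Promise<Response> {
  const delays = [500, 1000, 2000]
  for (let attempt = 0; ; attempt++) {
    const res = await deliver()
    if (res.ok) return res
    // Only 429/503 (and other 5xx) are worth retrying; other 4xx fail fast.
    const retryable = res.status === 429 || res.status === 503 || res.status >= 500
    if (!retryable || attempt >= delays.length) return res
    // Honor Retry-After (seconds) when the server sends it.
    const retryAfter = Number(res.headers.get('retry-after'))
    const base = retryAfter > 0 ? retryAfter * 1000 : delays[attempt]
    const jitter = base * (0.8 + Math.random() * 0.4) // ±20% jitter
    await new Promise((resolve) => setTimeout(resolve, jitter))
  }
}
```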
---

## Schedule

| Cadence | Drain runs |
|---|---|
| **Hourly** | Once per hour, on the dispatcher tick. |
| **Daily** | Once per day. |

You can also disable a drain with the **Enabled** toggle (runs stop, but the drain's configuration is preserved), or trigger an out-of-schedule run with **Run now** on any drain row.

---
## Delivery semantics

Drains use an **opaque cursor** that advances only on full success. If a delivery fails partway through a run, the cursor is unchanged and the next run replays from the last successful position.

This is **at-least-once delivery**. Combined with the `id` field on every row and the `Idempotency-Key` header on every webhook chunk, downstream systems can dedupe deterministically.

The **last 10 runs** for each drain are visible by expanding its row in the settings page, with status, row count, bytes written, destination locator (`s3://...` or webhook URL), and the error message if the run failed.

---
## Security

- Destination credentials are encrypted at rest using the same key-rotation–aware encryption that protects OAuth tokens.
- Credentials are **never** returned by the Sim API after creation. Updates accept new credentials; omitting them leaves the existing encrypted blob in place.
- Webhook URLs are SSRF-validated: HTTPS-only, no private/loopback/metadata IPs, with the resolved IP pinned to defeat DNS rebinding.
- Every create, update, delete, manual run, and test-connection call is recorded in the [Audit Log](/enterprise/audit-logs).

---
<FAQ items={[
  {
    question: "Who can configure data drains?",
    answer: "Only organization owners and admins can create, edit, run, or delete drains. On Sim Cloud, the organization must be on an Enterprise plan."
  },
  {
    question: "Will drained data be duplicated if a run fails?",
    answer: "The drain cursor only advances on overall success, so a failure replays the same chunks on the next run. Every row has a stable `id` field and every webhook chunk has an `Idempotency-Key` header so receivers can dedupe."
  },
  {
    question: "Can I export multiple sources to the same destination?",
    answer: "Yes — create one drain per source, all pointing at the same bucket or endpoint. S3 destinations namespace by source automatically; webhook receivers can branch on the `X-Sim-Source` header."
  },
  {
    question: "Does deleting a drain delete the data already exported?",
    answer: "No. Deletion only removes the drain's configuration and its run history from Sim. Data already written to your bucket or sent to your webhook is yours and is unaffected."
  },
  {
    question: "What happens if my credentials stop working mid-run?",
    answer: "The run fails, the drain cursor does not advance, and the failed run is recorded with the error. Once you fix the credentials with an Update or by re-creating the drain, the next run replays from where the last successful run left off."
  },
  {
    question: "What format is the data in?",
    answer: "NDJSON — newline-delimited JSON, one row per line. Each chunk is a single S3 object or a single POST body."
  }
]} />

---
## Self-hosted setup

### Environment variables

```bash
DATA_DRAINS_ENABLED=true
NEXT_PUBLIC_DATA_DRAINS_ENABLED=true
```

`NEXT_PUBLIC_DATA_DRAINS_ENABLED` shows the **Settings → Enterprise → Data Drains** page in the UI. `DATA_DRAINS_ENABLED` is reserved for server-side feature gating on self-hosted deployments. Both should be set to `true` together.

Data Drains otherwise rely on the standard Trigger.dev background job infrastructure used elsewhere in Sim — no additional setup is required. The cron dispatcher runs hourly and fans out due drains as background jobs.

apps/docs/content/docs/en/enterprise/index.mdx

Lines changed: 7 additions & 0 deletions
@@ -59,6 +59,12 @@ Configure how long execution logs, soft-deleted resources, and Mothership data a
---

## Data Drains

Continuously export workflow logs, audit logs, and Mothership data to a customer-owned S3 bucket or HTTPS webhook on a schedule. See the [data drains guide](/docs/enterprise/data-drains).

---

<FAQ items={[
  { question: "Who can manage Enterprise features?", answer: "Workspace admins on an Enterprise-entitled workspace. Access Control, SSO, whitelabeling, audit logs, and data retention are all configured per workspace under Settings → Enterprise." },
  { question: "Which SSO providers are supported?", answer: "Sim supports SAML 2.0 and OIDC, which work with virtually any enterprise identity provider including Okta, Azure AD (Entra ID), Google Workspace, ADFS, and OneLogin." },
@@ -79,6 +85,7 @@ Self-hosted deployments enable enterprise features via environment variables ins
| `WHITELABELING_ENABLED`, `NEXT_PUBLIC_WHITELABELING_ENABLED` | Custom branding |
| `AUDIT_LOGS_ENABLED`, `NEXT_PUBLIC_AUDIT_LOGS_ENABLED` | Audit logging |
| `NEXT_PUBLIC_DATA_RETENTION_ENABLED` | Data retention configuration |
| `DATA_DRAINS_ENABLED`, `NEXT_PUBLIC_DATA_DRAINS_ENABLED` | Data drains |
| `CREDENTIAL_SETS_ENABLED`, `NEXT_PUBLIC_CREDENTIAL_SETS_ENABLED` | Polling groups for email triggers |
| `INBOX_ENABLED`, `NEXT_PUBLIC_INBOX_ENABLED` | Sim Mailer inbox |
| `DISABLE_INVITATIONS`, `NEXT_PUBLIC_DISABLE_INVITATIONS` | Disable invitations; manage membership via Admin API |
Lines changed: 9 additions & 1 deletion
@@ -1,5 +1,13 @@
```json
{
  "title": "Enterprise",
  "pages": [
    "index",
    "sso",
    "access-control",
    "whitelabeling",
    "audit-logs",
    "data-retention",
    "data-drains"
  ],
  "defaultOpen": false
}
```
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
```ts
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { dispatchDueDrains } from '@/lib/data-drains/dispatcher'

const logger = createLogger('CronRunDataDrains')

export const GET = withRouteHandler(async (request: NextRequest) => {
  const authError = verifyCronAuth(request, 'Data drain dispatcher')
  if (authError) return authError

  try {
    const result = await dispatchDueDrains()
    logger.info('Data drain dispatcher run complete', result)
    return NextResponse.json({ success: true, ...result })
  } catch (error) {
    logger.error('Data drain dispatcher run failed', { error: toError(error).message })
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
})
```
Lines changed: 173 additions & 0 deletions
@@ -0,0 +1,173 @@
```ts
import { AuditAction, AuditResourceType, recordAudit } from '@sim/audit'
import { db } from '@sim/db'
import { dataDrains } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, ne } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import {
  deleteDataDrainContract,
  getDataDrainContract,
  updateDataDrainContract,
} from '@/lib/api/contracts/data-drains'
import { parseRequest, validationErrorResponse } from '@/lib/api/server'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { authorizeDrainAccess, loadDrain } from '@/lib/data-drains/access'
import { getDestination } from '@/lib/data-drains/destinations/registry'
import { encryptCredentials } from '@/lib/data-drains/encryption'
import { serializeDrain } from '@/lib/data-drains/serializers'

const logger = createLogger('DataDrainAPI')

type RouteContext = { params: Promise<{ id: string; drainId: string }> }

export const GET = withRouteHandler(async (request: NextRequest, context: RouteContext) => {
  const { id: organizationId, drainId } = await context.params
  const access = await authorizeDrainAccess(organizationId, { requireMutating: false })
  if (!access.ok) return access.response

  const parsed = await parseRequest(getDataDrainContract, request, context)
  if (!parsed.success) return parsed.response

  const drain = await loadDrain(organizationId, drainId)
  if (!drain) {
    return NextResponse.json({ error: 'Data drain not found' }, { status: 404 })
  }
  return NextResponse.json({ drain: serializeDrain(drain) })
})

export const PUT = withRouteHandler(async (request: NextRequest, context: RouteContext) => {
  const { id: organizationId, drainId } = await context.params
  const access = await authorizeDrainAccess(organizationId, { requireMutating: true })
  if (!access.ok) return access.response

  const parsed = await parseRequest(updateDataDrainContract, request, context)
  if (!parsed.success) return parsed.response

  const body = parsed.data.body

  const drain = await loadDrain(organizationId, drainId)
  if (!drain) {
    return NextResponse.json({ error: 'Data drain not found' }, { status: 404 })
  }

  if (body.name !== undefined && body.name !== drain.name) {
    const [conflict] = await db
      .select({ id: dataDrains.id })
      .from(dataDrains)
      .where(
        and(
          eq(dataDrains.organizationId, organizationId),
          eq(dataDrains.name, body.name),
          ne(dataDrains.id, drainId)
        )
      )
      .limit(1)
    if (conflict) {
      return NextResponse.json(
        { error: 'A data drain with this name already exists in this organization' },
        { status: 409 }
      )
    }
  }

  if (body.source !== undefined && body.source !== drain.source) {
    return NextResponse.json({ error: 'source cannot be changed after creation' }, { status: 400 })
  }

  const updates: Partial<typeof dataDrains.$inferInsert> = { updatedAt: new Date() }
  if (body.name !== undefined) updates.name = body.name
  if (body.scheduleCadence !== undefined) updates.scheduleCadence = body.scheduleCadence
  if (body.enabled !== undefined) updates.enabled = body.enabled

  if (body.destinationType !== undefined && body.destinationType !== drain.destinationType) {
    return NextResponse.json(
      { error: 'destinationType cannot be changed after creation' },
      { status: 400 }
    )
  }
  if (body.destinationConfig !== undefined || body.destinationCredentials !== undefined) {
    const destination = getDestination(drain.destinationType)
    if (body.destinationConfig !== undefined) {
      const configResult = destination.configSchema.safeParse(body.destinationConfig)
      if (!configResult.success) return validationErrorResponse(configResult.error)
      updates.destinationConfig = configResult.data as Record<string, unknown>
    }
    if (body.destinationCredentials !== undefined) {
      const credentialsResult = destination.credentialsSchema.safeParse(body.destinationCredentials)
      if (!credentialsResult.success) return validationErrorResponse(credentialsResult.error)
      updates.destinationCredentials = await encryptCredentials(credentialsResult.data)
    }
  }

  const [updated] = await db
    .update(dataDrains)
    .set(updates)
    .where(eq(dataDrains.id, drainId))
    .returning()

  logger.info('Data drain updated', { drainId, organizationId })

  recordAudit({
    workspaceId: null,
    actorId: access.session.user.id,
    action: AuditAction.DATA_DRAIN_UPDATED,
    resourceType: AuditResourceType.DATA_DRAIN,
    resourceId: drainId,
    actorName: access.session.user.name ?? undefined,
    actorEmail: access.session.user.email ?? undefined,
    resourceName: updated.name,
    description: `Updated data drain '${updated.name}'`,
    metadata: {
      organizationId,
      changes: {
        name: body.name,
        source: body.source,
        scheduleCadence: body.scheduleCadence,
        enabled: body.enabled,
        destinationConfigChanged: body.destinationConfig !== undefined,
        destinationCredentialsChanged: body.destinationCredentials !== undefined,
      },
    },
    request,
  })

  return NextResponse.json({ drain: serializeDrain(updated) })
})

export const DELETE = withRouteHandler(async (request: NextRequest, context: RouteContext) => {
  const { id: organizationId, drainId } = await context.params
  const access = await authorizeDrainAccess(organizationId, { requireMutating: true })
  if (!access.ok) return access.response

  const parsed = await parseRequest(deleteDataDrainContract, request, context)
  if (!parsed.success) return parsed.response

  const drain = await loadDrain(organizationId, drainId)
  if (!drain) {
    return NextResponse.json({ error: 'Data drain not found' }, { status: 404 })
  }

  await db.delete(dataDrains).where(eq(dataDrains.id, drainId))

  logger.info('Data drain deleted', { drainId, organizationId })

  recordAudit({
    workspaceId: null,
    actorId: access.session.user.id,
    action: AuditAction.DATA_DRAIN_DELETED,
    resourceType: AuditResourceType.DATA_DRAIN,
    resourceId: drainId,
    actorName: access.session.user.name ?? undefined,
    actorEmail: access.session.user.email ?? undefined,
    resourceName: drain.name,
    description: `Deleted data drain '${drain.name}'`,
    metadata: {
      organizationId,
      source: drain.source,
      destinationType: drain.destinationType,
    },
    request,
  })

  return NextResponse.json({ success: true as const })
})
```
