1 change: 1 addition & 0 deletions docs/content/docs/meta.json
@@ -7,6 +7,7 @@
"how-it-works",
"observability",
"ai",
"recipes",
"deploying",
"errors",
"api-reference"
290 changes: 290 additions & 0 deletions docs/content/docs/recipes/data-sync.mdx
@@ -0,0 +1,290 @@
---
title: Data Synchronization
description: Sync data between external systems using durable polling, batch processing, webhooks, and reconciliation patterns.
type: guide
summary: Patterns for reliably syncing data between external systems.
prerequisites:
- /docs/foundations/workflows-and-steps
- /docs/foundations/hooks
related:
- /docs/foundations/common-patterns
- /docs/foundations/idempotency
- /docs/api-reference/workflow/sleep
---

Data synchronization is one of the most common workflow use cases. Whether you are polling a CRM for updated records, pushing batches of data to a warehouse, or reacting to webhook notifications, Workflow DevKit gives you durable building blocks that survive crashes, retries, and cold starts.

This guide covers the core patterns you will need. Each pattern is self-contained, so you can jump to whichever one fits your use case.

## Polling Pattern

The simplest sync strategy is to poll an external system at regular intervals using [`sleep()`](/docs/api-reference/workflow/sleep) in a loop. The workflow stays alive between polls, so you never miss an interval even if your compute restarts.

```typescript title="workflows/crm-poll.ts" lineNumbers
import { sleep } from "workflow";

declare function fetchUpdatedContacts(since: Date): Promise<Contact[]>; // @setup
declare function upsertContacts(contacts: Contact[]): Promise<void>; // @setup
declare type Contact = { id: string; name: string; updatedAt: Date }; // @setup

export async function crmPollingSync() {
  "use workflow";

  let cursor = new Date();

  while (true) {
    // Record the poll time before fetching so changes made while this
    // iteration runs are picked up by the next poll.
    const polledAt = new Date();
    const contacts = await fetchUpdatedContacts(cursor); // [!code highlight]

    if (contacts.length > 0) {
      await upsertContacts(contacts);
    }

    cursor = polledAt;

    await sleep("5 minutes"); // [!code highlight]
  }
}
```

The step functions handle the actual I/O with full Node.js access:

```typescript title="workflows/crm-poll-steps.ts" lineNumbers
declare const db: { contacts: { upsert(opts: any): Promise<void> } }; // @setup

async function fetchUpdatedContacts(since: Date) {
  "use step";

  const response = await fetch(
    `https://api.example-crm.com/contacts?updated_since=${since.toISOString()}`
  );

  if (!response.ok) {
    throw new Error(`CRM API error: ${response.status}`);
  }

  return response.json();
}

async function upsertContacts(contacts: Array<{ id: string; name: string; updatedAt: Date }>) {
  "use step";

  for (const contact of contacts) {
    await db.contacts.upsert({
      where: { externalId: contact.id },
      data: contact,
    });
  }
}
```

<Callout type="info">
Because `sleep()` is durable, the workflow does not consume compute while waiting. The runtime suspends execution and resumes it when the interval elapses.
</Callout>
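
Deriving the cursor from wall-clock time assumes the CRM's clock and the workflow's clock agree. An alternative is to advance the cursor to the newest `updatedAt` value the CRM actually returned. The sketch below assumes every contact carries a reliable `updatedAt` that has already been parsed into a `Date`; `nextCursor` is a hypothetical helper, not part of Workflow DevKit.

```typescript
type CrmContact = { id: string; name: string; updatedAt: Date };

// Hypothetical helper: returns the newest updatedAt in the batch, or the
// current cursor when the batch is empty or contains nothing newer.
function nextCursor(current: Date, contacts: CrmContact[]): Date {
  let latest = current.getTime();
  for (const contact of contacts) {
    latest = Math.max(latest, contact.updatedAt.getTime());
  }
  return new Date(latest);
}

const cursor = new Date("2024-01-01T00:00:00Z");
const batch: CrmContact[] = [
  { id: "a", name: "Ada", updatedAt: new Date("2024-01-01T00:03:00Z") },
  { id: "b", name: "Bob", updatedAt: new Date("2024-01-01T00:01:00Z") },
];

console.log(nextCursor(cursor, batch).toISOString()); // 2024-01-01T00:03:00.000Z
console.log(nextCursor(cursor, []).toISOString()); // 2024-01-01T00:00:00.000Z
```

If the CRM treats `updated_since` as inclusive, the newest record may be fetched again on the next poll. Because the write is an upsert, that repeat is harmless.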

## Batch Sync Pattern

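When you need to sync many records at once, process them in parallel with `Promise.all`. Each record is handled by its own step, so each one retries independently and a transient failure on one record does not hold up the others. Note that `Promise.all` still rejects if any step ultimately fails; use `Promise.allSettled` if the workflow should continue with partial results.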

```typescript title="workflows/batch-sync.ts" lineNumbers
declare function fetchPendingRecords(): Promise<Record[]>; // @setup
declare function syncRecord(record: Record): Promise<void>; // @setup
declare type Record = { id: string; data: string }; // @setup

export async function batchSync() {
  "use workflow";

  const records = await fetchPendingRecords();

  await Promise.all( // [!code highlight]
    records.map((record) => syncRecord(record)) // [!code highlight]
  ); // [!code highlight]
}
```

```typescript title="workflows/batch-sync-steps.ts" lineNumbers
import { getStepMetadata } from "workflow";

async function fetchPendingRecords() {
  "use step";

  const response = await fetch("https://api.example.com/records?status=pending");

  if (!response.ok) {
    throw new Error(`Source API error: ${response.status}`);
  }

  return response.json();
}

async function syncRecord(record: { id: string; data: string }) {
  "use step";

  const { stepId } = getStepMetadata();

  const response = await fetch(`https://api.destination.com/records/${record.id}`, {
    method: "PUT",
    headers: {
      "Content-Type": "application/json",
      "Idempotency-Key": stepId, // [!code highlight]
    },
    body: JSON.stringify(record),
  });

  // fetch() does not reject on HTTP errors; throw so the step retries.
  if (!response.ok) {
    throw new Error(`Destination API error: ${response.status}`);
  }
}
```

Each step retries independently on failure. The `stepId` idempotency key prevents duplicate writes when a step retries, provided the destination API honors the `Idempotency-Key` header. See [Idempotency](/docs/foundations/idempotency) for more on this pattern.
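
If the workflow should keep going when some records fail permanently, `Promise.allSettled` can stand in for `Promise.all`. The sketch below is plain TypeScript that runs outside a workflow. `syncRecordStub` is a hypothetical stand-in for the real `syncRecord` step, and `syncAll` is an illustrative name.

```typescript
type PendingRecord = { id: string; data: string };

// Hypothetical stand-in for the real `syncRecord` step. It fails for one
// id to simulate a record that cannot be synced.
async function syncRecordStub(record: PendingRecord): Promise<void> {
  if (record.id === "bad") {
    throw new Error(`sync failed for ${record.id}`);
  }
}

// Runs every sync and returns the ids that failed, instead of rejecting
// on the first error.
async function syncAll(records: PendingRecord[]): Promise<string[]> {
  const results = await Promise.allSettled(
    records.map((record) => syncRecordStub(record))
  );

  return records
    .filter((_, index) => results[index].status === "rejected")
    .map((record) => record.id);
}

syncAll([
  { id: "a", data: "1" },
  { id: "bad", data: "2" },
  { id: "c", data: "3" },
]).then((failedIds) => console.log(failedIds)); // [ 'bad' ]
```

The returned ids could then be logged, written to a dead-letter table, or retried in a later run.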

## Webhook-Driven Sync

Instead of polling, you can react to changes in real time using [`createWebhook()`](/docs/api-reference/workflow/create-webhook). Register the webhook URL with the external system, and the workflow resumes whenever a notification arrives.

```typescript title="workflows/webhook-sync.ts" lineNumbers
import { createWebhook } from "workflow";

declare function registerWebhook(url: string): Promise<void>; // @setup
declare function processChange(change: Change): Promise<void>; // @setup
declare type Change = { type: string; recordId: string; data: unknown }; // @setup

export async function webhookDrivenSync() {
  "use workflow";

  const webhook = createWebhook<Change>({
    respondWith: Response.json({ received: true }),
  });

  // Register the webhook URL with the external system
  await registerWebhook(webhook.url); // [!code highlight]

  // Process incoming change notifications
  for await (const request of webhook) { // [!code highlight]
    const change = await request.json();
    await processChange(change);
  }
}
```

```typescript title="workflows/webhook-sync-steps.ts" lineNumbers
declare const db: { records: { upsert(opts: any): Promise<void> } }; // @setup

async function registerWebhook(url: string) {
  "use step";

  const response = await fetch("https://api.example.com/webhooks", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ url, events: ["record.updated", "record.created"] }),
  });

  // fetch() does not reject on HTTP errors; throw so the step retries.
  if (!response.ok) {
    throw new Error(`Webhook registration failed: ${response.status}`);
  }
}

async function processChange(change: { type: string; recordId: string; data: unknown }) {
  "use step";

  await db.records.upsert({
    where: { externalId: change.recordId },
    data: change.data,
  });
}
```

The `for await` loop keeps the workflow alive, processing each event as it arrives. The static `respondWith` immediately acknowledges every incoming request so the external system does not time out.

<Callout type="info">
See [Hooks & Webhooks](/docs/foundations/hooks) for the full guide on creating webhooks, handling multiple events, and sending dynamic responses.
</Callout>
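
Webhook payloads come from outside your system, so it can help to validate them before passing them to a step. The sketch below is a hand-rolled type guard for the `Change` shape used in this section. `isChange` is a hypothetical helper, and a schema validation library would work just as well.

```typescript
type Change = { type: string; recordId: string; data: unknown };

// Hypothetical helper: narrows an untrusted JSON value to `Change`.
function isChange(value: unknown): value is Change {
  if (typeof value !== "object" || value === null) {
    return false;
  }

  const candidate = value as { [key: string]: unknown };

  return (
    typeof candidate.type === "string" &&
    typeof candidate.recordId === "string" &&
    "data" in candidate
  );
}

console.log(isChange({ type: "record.updated", recordId: "r1", data: {} })); // true
console.log(isChange({ type: "record.updated" })); // false
console.log(isChange("not an object")); // false
```

Inside the `for await` loop, a payload that fails the guard could be logged and skipped before `processChange` is called.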

## Reconciliation Pattern

When two systems can drift out of sync, periodically reconcile them by fetching both sides, computing a diff, and applying only the changes.

```typescript title="workflows/reconcile.ts" lineNumbers
import { sleep } from "workflow";

declare function fetchLocalRecords(): Promise<SyncRecord[]>; // @setup
declare function fetchRemoteRecords(): Promise<SyncRecord[]>; // @setup
declare function applyChanges(changes: SyncRecord[]): Promise<void>; // @setup
declare type SyncRecord = { id: string; data: string; updatedAt: number }; // @setup

export async function reconciliationSync() {
  "use workflow";

  while (true) {
    // Fetch both sides in parallel
    const [local, remote] = await Promise.all([ // [!code highlight]
      fetchLocalRecords(), // [!code highlight]
      fetchRemoteRecords(), // [!code highlight]
    ]); // [!code highlight]

    // Compute the diff
    const localMap = new Map(local.map((r) => [r.id, r]));
    const changes = remote.filter((r) => {
      const existing = localMap.get(r.id);
      return !existing || existing.updatedAt < r.updatedAt;
    });

    if (changes.length > 0) {
      await applyChanges(changes);
    }

    await sleep("1 hour");
  }
}
```

```typescript title="workflows/reconcile-steps.ts" lineNumbers
declare const db: { records: { findMany(): Promise<any[]>; upsert(opts: any): Promise<void> } }; // @setup

async function fetchLocalRecords() {
  "use step";

  return db.records.findMany();
}

async function fetchRemoteRecords() {
  "use step";

  const response = await fetch("https://api.example.com/records");

  // fetch() does not reject on HTTP errors; throw so the step retries.
  if (!response.ok) {
    throw new Error(`Remote API error: ${response.status}`);
  }

  return response.json();
}

async function applyChanges(changes: Array<{ id: string; data: string; updatedAt: number }>) {
  "use step";

  for (const record of changes) {
    await db.records.upsert({
      where: { externalId: record.id },
      data: record,
    });
  }
}
```

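Diffing logic runs inside the workflow function, which is lightweight and deterministic. Only the actual I/O (fetching records and writing changes) happens in steps. The diff treats the remote system as the source of truth: it picks up records that are missing locally or newer remotely, and it does not detect deletions.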
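
Because the diff is plain synchronous code, it can be pulled into a pure function and unit-tested without running a workflow. A minimal sketch follows; `computeChanges` is a hypothetical name, and the sample records are made up for illustration.

```typescript
type SyncRecord = { id: string; data: string; updatedAt: number };

// Returns the remote records that are missing locally or newer than the
// local copy. Records deleted remotely are not detected.
function computeChanges(local: SyncRecord[], remote: SyncRecord[]): SyncRecord[] {
  const localById = new Map<string, SyncRecord>();
  for (const record of local) {
    localById.set(record.id, record);
  }

  return remote.filter((record) => {
    const existing = localById.get(record.id);
    return !existing || existing.updatedAt < record.updatedAt;
  });
}

const local: SyncRecord[] = [
  { id: "1", data: "a", updatedAt: 100 },
  { id: "2", data: "b", updatedAt: 200 },
];

const remote: SyncRecord[] = [
  { id: "1", data: "a2", updatedAt: 150 }, // newer remotely
  { id: "2", data: "b", updatedAt: 200 }, // unchanged
  { id: "3", data: "c", updatedAt: 50 }, // missing locally
];

console.log(computeChanges(local, remote).map((record) => record.id)); // [ '1', '3' ]
```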

## Idempotent Writes

Any step that writes to an external system should use an idempotency key to prevent duplicates on retry. Use [`getStepMetadata()`](/docs/api-reference/workflow/get-step-metadata) to get a stable identifier for each step invocation.

```typescript title="workflows/idempotent-write.ts" lineNumbers
import { getStepMetadata } from "workflow";

async function createInvoice(customerId: string, amount: number) {
  "use step";

  const { stepId } = getStepMetadata(); // [!code highlight]

  const response = await fetch("https://api.billing.com/invoices", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      "Idempotency-Key": stepId, // [!code highlight]
    },
    body: JSON.stringify({ customerId, amount }),
  });

  // fetch() does not reject on HTTP errors; throw so the step retries.
  if (!response.ok) {
    throw new Error(`Billing API error: ${response.status}`);
  }
}
```

The `stepId` is stable across retries but unique per step invocation, making it the ideal idempotency key. See [Idempotency](/docs/foundations/idempotency) for the full explanation.
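
To see why a key that is stable across retries matters, consider how a receiving API might deduplicate requests. The sketch below is a hypothetical in-memory stand-in for a billing API. `FakeBillingApi` and the `step_123` / `step_456` keys are invented for illustration and do not reflect real `stepId` values.

```typescript
type Invoice = { customerId: string; amount: number };

// Hypothetical stand-in for an API that honors idempotency keys: a
// repeated key returns the stored invoice instead of creating a new one.
class FakeBillingApi {
  private invoicesByKey = new Map<string, Invoice>();

  createInvoice(idempotencyKey: string, invoice: Invoice): Invoice {
    const existing = this.invoicesByKey.get(idempotencyKey);
    if (existing) {
      return existing;
    }
    this.invoicesByKey.set(idempotencyKey, invoice);
    return invoice;
  }

  count(): number {
    return this.invoicesByKey.size;
  }
}

const api = new FakeBillingApi();

// The first attempt and its retry share one key, as a retried step would.
api.createInvoice("step_123", { customerId: "c1", amount: 50 });
api.createInvoice("step_123", { customerId: "c1", amount: 50 });

// A different step invocation uses a different key.
api.createInvoice("step_456", { customerId: "c1", amount: 50 });

console.log(api.count()); // 2
```

The retry does not create a second invoice, while the separate invocation does create its own.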

## Related Documentation

- [Common Patterns](/docs/foundations/common-patterns) - Sequential, parallel, and timeout patterns
- [Hooks & Webhooks](/docs/foundations/hooks) - Receive external events in workflows
- [Idempotency](/docs/foundations/idempotency) - Prevent duplicate side effects
- [Errors & Retrying](/docs/foundations/errors-and-retries) - Control retry behavior with `FatalError` and `RetryableError`
- [`sleep()` API Reference](/docs/api-reference/workflow/sleep) - Durable sleep for polling intervals
- [`createWebhook()` API Reference](/docs/api-reference/workflow/create-webhook) - Create webhook endpoints
- [`getStepMetadata()` API Reference](/docs/api-reference/workflow/get-step-metadata) - Access step metadata for idempotency
17 changes: 17 additions & 0 deletions docs/content/docs/recipes/index.mdx
@@ -0,0 +1,17 @@
---
title: Recipes
description: Real-world workflow patterns for common use cases.
type: overview
summary: Production-ready patterns you can adapt for your own workflows.
related:
- /docs/foundations/common-patterns
- /docs/foundations/workflows-and-steps
---

Recipes are complete, production-ready patterns for common workflow use cases. Each recipe builds on the [Foundations](/docs/foundations) concepts and shows how to combine them to solve real problems.

<Cards>
<Card href="/docs/recipes/data-sync" title="Data Synchronization">
Sync data between external systems using polling, batch processing, webhooks, and reconciliation patterns.
</Card>
</Cards>
4 changes: 4 additions & 0 deletions docs/content/docs/recipes/meta.json
@@ -0,0 +1,4 @@
{
"title": "Recipes",
"pages": ["data-sync"]
}