Skip to content

Commit 2d5553a

Browse files
committed
feat(workflow): implement saga pattern with compensation steps and child workflows
- Added StepWithCompensation and StepWithResultAndCompensation methods to support compensation for successful steps. - Introduced RunChild and SpawnChild methods for executing child workflows with checkpointing. - Implemented FanOut to run multiple child workflows in parallel and collect results. - Enhanced event handling with WaitForAll, WaitForAny, and WaitForMatch methods for reactive patterns. - Updated Store interface to include ListChildRuns and DeleteCheckpointsAfter methods for better management of workflow state. - Added versioning support for workflows with NewWorkflowV and tests for versioned workflows.
1 parent 4cc3492 commit 2d5553a

87 files changed

Lines changed: 12751 additions & 183 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

_examples/kitchen-sink/main.go

Lines changed: 378 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
// Package main demonstrates a comprehensive Dispatch setup with jobs, workflows,
2+
// cron scheduling, saga compensations, child workflows, and a DWP WebSocket
3+
// server. Run it and connect via WebSocket, HTTP RPC, or the Go/TS/Python client.
4+
//
5+
// Usage:
6+
//
7+
// go run .
8+
//
9+
// Then in another terminal:
10+
//
11+
// # Enqueue a job via HTTP RPC
12+
// curl -X POST http://localhost:8080/dwp/rpc \
13+
// -H "Content-Type: application/json" \
14+
// -d '{
15+
// "id": "req-1",
16+
// "type": "request",
17+
// "method": "job.enqueue",
18+
// "token": "demo-token",
19+
// "data": {"name":"send-email","payload":{"to":"user@example.com","subject":"Hello"}}
20+
// }'
21+
//
22+
// # Start a workflow via HTTP RPC
23+
// curl -X POST http://localhost:8080/dwp/rpc \
24+
// -H "Content-Type: application/json" \
25+
// -d '{
26+
// "id": "req-2",
27+
// "type": "request",
28+
// "method": "workflow.start",
29+
// "token": "demo-token",
30+
// "data": {"name":"order-pipeline","input":{"order_id":"ORD-001","items":["widget","gadget"]}}
31+
// }'
32+
//
33+
// # Get stats
34+
// curl -X POST http://localhost:8080/dwp/rpc \
35+
// -H "Content-Type: application/json" \
36+
// -d '{"id":"req-3","type":"request","method":"stats","token":"demo-token"}'
37+
package main
38+
39+
import (
40+
"context"
41+
"fmt"
42+
"log/slog"
43+
"os"
44+
"os/signal"
45+
"syscall"
46+
"time"
47+
48+
"github.com/xraph/dispatch"
49+
"github.com/xraph/dispatch/cron"
50+
"github.com/xraph/dispatch/dwp"
51+
"github.com/xraph/dispatch/engine"
52+
"github.com/xraph/dispatch/job"
53+
"github.com/xraph/dispatch/store/memory"
54+
"github.com/xraph/dispatch/workflow"
55+
56+
"github.com/xraph/forge"
57+
)
58+
59+
func main() {
60+
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
61+
62+
// ──────────────────────────────────────────────────
63+
// 1. Create the Dispatch engine
64+
// ──────────────────────────────────────────────────
65+
66+
s := memory.New()
67+
d, err := dispatch.New(
68+
dispatch.WithStore(s),
69+
dispatch.WithConcurrency(4),
70+
dispatch.WithQueues([]string{"default", "email", "images"}),
71+
dispatch.WithLogger(logger),
72+
)
73+
if err != nil {
74+
logger.Error("failed to create dispatcher", slog.String("error", err.Error()))
75+
os.Exit(1)
76+
}
77+
78+
eng, err := engine.Build(d,
79+
engine.WithStreamBroker(),
80+
)
81+
if err != nil {
82+
logger.Error("failed to build engine", slog.String("error", err.Error()))
83+
os.Exit(1)
84+
}
85+
86+
// ──────────────────────────────────────────────────
87+
// 2. Register jobs
88+
// ──────────────────────────────────────────────────
89+
90+
// A simple email-sending job.
91+
engine.Register(eng, job.NewDefinition("send-email", func(ctx context.Context, p struct {
92+
To string `json:"to"`
93+
Subject string `json:"subject"`
94+
}) error {
95+
logger.Info("sending email", slog.String("to", p.To), slog.String("subject", p.Subject))
96+
time.Sleep(100 * time.Millisecond) // Simulate I/O.
97+
logger.Info("email sent", slog.String("to", p.To))
98+
return nil
99+
}))
100+
101+
// An image processing job with retry semantics.
102+
engine.Register(eng, job.NewDefinition("process-image", func(ctx context.Context, p struct {
103+
URL string `json:"url"`
104+
Width int `json:"width"`
105+
Height int `json:"height"`
106+
}) error {
107+
logger.Info("processing image",
108+
slog.String("url", p.URL),
109+
slog.Int("width", p.Width),
110+
slog.Int("height", p.Height),
111+
)
112+
time.Sleep(200 * time.Millisecond) // Simulate processing.
113+
logger.Info("image processed", slog.String("url", p.URL))
114+
return nil
115+
}))
116+
117+
// A payment charging job (used by saga workflow).
118+
engine.Register(eng, job.NewDefinition("charge-payment", func(_ context.Context, p struct {
119+
OrderID string `json:"order_id"`
120+
Amount float64 `json:"amount"`
121+
}) error {
122+
logger.Info("charging payment",
123+
slog.String("order_id", p.OrderID),
124+
slog.Float64("amount", p.Amount),
125+
)
126+
return nil
127+
}))
128+
129+
// ──────────────────────────────────────────────────
130+
// 3. Register workflows
131+
// ──────────────────────────────────────────────────
132+
133+
// (a) Multi-step order pipeline with StepWithResult and parallel steps.
134+
engine.RegisterWorkflow(eng, workflow.NewWorkflow("order-pipeline",
135+
func(wf *workflow.Workflow, input struct {
136+
OrderID string `json:"order_id"`
137+
Items []string `json:"items"`
138+
}) error {
139+
// Step 1: Validate the order.
140+
if err := wf.Step("validate", func(ctx context.Context) error {
141+
logger.Info("validating order", slog.String("order_id", input.OrderID))
142+
if len(input.Items) == 0 {
143+
return fmt.Errorf("order has no items")
144+
}
145+
return nil
146+
}); err != nil {
147+
return err
148+
}
149+
150+
// Step 2: Calculate total (with typed result).
151+
total, err := workflow.StepWithResult[float64](wf, "calculate-total",
152+
func(ctx context.Context) (float64, error) {
153+
price := float64(len(input.Items)) * 19.99
154+
logger.Info("calculated total",
155+
slog.String("order_id", input.OrderID),
156+
slog.Float64("total", price),
157+
)
158+
return price, nil
159+
})
160+
if err != nil {
161+
return err
162+
}
163+
164+
// Step 3: Process payment.
165+
if err := wf.Step("process-payment", func(ctx context.Context) error {
166+
logger.Info("processing payment",
167+
slog.String("order_id", input.OrderID),
168+
slog.Float64("amount", total),
169+
)
170+
return nil
171+
}); err != nil {
172+
return err
173+
}
174+
175+
// Step 4: Send confirmation.
176+
return wf.Step("send-confirmation", func(ctx context.Context) error {
177+
logger.Info("sending order confirmation",
178+
slog.String("order_id", input.OrderID),
179+
)
180+
return nil
181+
})
182+
},
183+
))
184+
185+
// (b) Saga workflow with compensation (rollback on failure).
186+
engine.RegisterWorkflow(eng, workflow.NewWorkflow("booking-saga",
187+
func(wf *workflow.Workflow, input struct {
188+
TripID string `json:"trip_id"`
189+
}) error {
190+
// Step 1: Reserve hotel (with compensation).
191+
if err := wf.StepWithCompensation(
192+
"reserve-hotel",
193+
func(ctx context.Context) error {
194+
logger.Info("reserving hotel", slog.String("trip_id", input.TripID))
195+
return nil
196+
},
197+
func(ctx context.Context) error {
198+
logger.Info("cancelling hotel reservation", slog.String("trip_id", input.TripID))
199+
return nil
200+
},
201+
); err != nil {
202+
return err
203+
}
204+
205+
// Step 2: Reserve flight (with compensation).
206+
if err := wf.StepWithCompensation(
207+
"reserve-flight",
208+
func(ctx context.Context) error {
209+
logger.Info("reserving flight", slog.String("trip_id", input.TripID))
210+
return nil
211+
},
212+
func(ctx context.Context) error {
213+
logger.Info("cancelling flight reservation", slog.String("trip_id", input.TripID))
214+
return nil
215+
},
216+
); err != nil {
217+
return err
218+
}
219+
220+
// Step 3: Reserve car (with compensation).
221+
return wf.StepWithCompensation(
222+
"reserve-car",
223+
func(ctx context.Context) error {
224+
logger.Info("reserving rental car", slog.String("trip_id", input.TripID))
225+
return nil
226+
},
227+
func(ctx context.Context) error {
228+
logger.Info("cancelling car reservation", slog.String("trip_id", input.TripID))
229+
return nil
230+
},
231+
)
232+
},
233+
))
234+
235+
// (c) Child workflow for processing individual items.
236+
engine.RegisterWorkflow(eng, workflow.NewWorkflow("process-item",
237+
func(wf *workflow.Workflow, input struct {
238+
ItemID string `json:"item_id"`
239+
}) error {
240+
return wf.Step("process", func(ctx context.Context) error {
241+
logger.Info("processing item", slog.String("item_id", input.ItemID))
242+
time.Sleep(50 * time.Millisecond)
243+
return nil
244+
})
245+
},
246+
))
247+
248+
// (d) Parent workflow that spawns child workflows.
249+
engine.RegisterWorkflow(eng, workflow.NewWorkflow("batch-process",
250+
func(wf *workflow.Workflow, input struct {
251+
ItemIDs []string `json:"item_ids"`
252+
}) error {
253+
// Spawn child workflows for each item (fire-and-forget).
254+
for _, itemID := range input.ItemIDs {
255+
runID, err := workflow.SpawnChild(wf, "process-item", struct {
256+
ItemID string `json:"item_id"`
257+
}{ItemID: itemID})
258+
if err != nil {
259+
return fmt.Errorf("spawn child for %s: %w", itemID, err)
260+
}
261+
logger.Info("spawned child workflow",
262+
slog.String("item_id", itemID),
263+
slog.String("child_run_id", runID.String()),
264+
)
265+
}
266+
267+
return wf.Step("finalize", func(ctx context.Context) error {
268+
logger.Info("batch processing finalized",
269+
slog.Int("total_items", len(input.ItemIDs)),
270+
)
271+
return nil
272+
})
273+
},
274+
))
275+
276+
// ──────────────────────────────────────────────────
277+
// 4. Register cron schedules
278+
// ──────────────────────────────────────────────────
279+
280+
ctx := context.Background()
281+
282+
if err := engine.RegisterCron(ctx, eng, &cron.Definition[struct{}]{
283+
Name: "daily-cleanup",
284+
Schedule: "0 2 * * *", // Every day at 2 AM.
285+
JobName: "process-image",
286+
Queue: "images",
287+
Payload: struct{}{},
288+
}); err != nil {
289+
logger.Error("failed to register cron", slog.String("error", err.Error()))
290+
}
291+
292+
// ──────────────────────────────────────────────────
293+
// 5. Set up DWP server
294+
// ──────────────────────────────────────────────────
295+
296+
broker := eng.StreamBroker()
297+
handler := dwp.NewHandler(eng, broker, logger)
298+
dwpServer := dwp.NewServer(broker, handler,
299+
dwp.WithAuth(dwp.NewAPIKeyAuthenticator(
300+
dwp.APIKeyEntry{
301+
Token: "demo-token",
302+
Identity: dwp.Identity{
303+
Subject: "demo-user",
304+
AppID: "kitchen-sink",
305+
OrgID: "demo-org",
306+
Scopes: []string{dwp.ScopeAll},
307+
},
308+
},
309+
)),
310+
dwp.WithLogger(logger),
311+
)
312+
313+
// ──────────────────────────────────────────────────
314+
// 6. Create Forge app and register routes
315+
// ──────────────────────────────────────────────────
316+
317+
app := forge.New(
318+
forge.WithAppName("dispatch-kitchen-sink"),
319+
forge.WithAppVersion("0.1.0"),
320+
forge.WithHTTPAddress(":8080"),
321+
)
322+
323+
// Register DWP routes (WebSocket, SSE, HTTP RPC).
324+
dwpServer.RegisterRoutes(app.Router())
325+
326+
// ──────────────────────────────────────────────────
327+
// 7. Start and run
328+
// ──────────────────────────────────────────────────
329+
330+
// Start the engine (begins processing jobs and cron ticks).
331+
if err := eng.Start(ctx); err != nil {
332+
logger.Error("failed to start engine", slog.String("error", err.Error()))
333+
os.Exit(1)
334+
}
335+
336+
logger.Info("Dispatch kitchen-sink example running",
337+
slog.String("dwp_ws", "ws://localhost:8080/dwp"),
338+
slog.String("dwp_rpc", "http://localhost:8080/dwp/rpc"),
339+
slog.String("dwp_sse", "http://localhost:8080/dwp/sse"),
340+
)
341+
342+
// Enqueue a demo job to show it works.
343+
demoJob, _ := engine.Enqueue(ctx, eng, "send-email", struct {
344+
To string `json:"to"`
345+
Subject string `json:"subject"`
346+
}{
347+
To: "hello@example.com",
348+
Subject: "Welcome to Dispatch!",
349+
})
350+
if demoJob != nil {
351+
logger.Info("demo job enqueued", slog.String("job_id", demoJob.ID.String()))
352+
}
353+
354+
// Start a demo workflow.
355+
demoRun, _ := engine.StartWorkflow(ctx, eng, "order-pipeline", struct {
356+
OrderID string `json:"order_id"`
357+
Items []string `json:"items"`
358+
}{
359+
OrderID: "DEMO-001",
360+
Items: []string{"widget", "gadget"},
361+
})
362+
if demoRun != nil {
363+
logger.Info("demo workflow started", slog.String("run_id", demoRun.ID.String()))
364+
}
365+
366+
// Wait for shutdown signal.
367+
quit := make(chan os.Signal, 1)
368+
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
369+
<-quit
370+
371+
logger.Info("shutting down...")
372+
shutdownCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
373+
defer cancel()
374+
if err := eng.Stop(shutdownCtx); err != nil {
375+
logger.Error("engine shutdown error", slog.String("error", err.Error()))
376+
}
377+
logger.Info("goodbye")
378+
}

0 commit comments

Comments
 (0)