Production setup and operations for the workflow engine.
# 1. Build
cargo build --release
# 2. Start SpacetimeDB
spacetime start
# 3. Publish
spacetime publish workflow-engine --project-path . --clear-database
# 4. Verify
spacetime logs workflow-engine | grep "initialized"

# Linux/macOS
curl -fsSL https://install.spacetimedb.com | bash
# Via Cargo
cargo install spacetimedb-cli
# Verify
spacetime version

# Default (localhost:3000)
spacetime start
# Custom address
spacetime start --listen-addr 0.0.0.0:3000
# With persistent data directory
spacetime start --data-dir ~/spacetime-data

# Development build
cargo build

# Release build
cargo build --release

# Release build with audit logging
cargo build --release --features audit_log

This enables the workflow_event table for debugging.
# Publish, clearing any existing database
spacetime publish workflow-engine --project-path . --clear-database

# Publish without clearing
spacetime publish workflow-engine --project-path .

# Publish to a remote server
spacetime publish workflow-engine \
--project-path . \
--server https://your-server.spacetimedb.com

spacetime sql workflow-engine "SELECT COUNT(*) FROM workflow"
spacetime sql workflow-engine "SELECT COUNT(*) FROM workflow_timer"

# Recent logs
spacetime logs workflow-engine --num-lines 50
# Follow logs
spacetime logs workflow-engine --follow

Expected output on startup:

Initializing SpacetimeDB Workflow Engine
Registering workflow type: npc_patrol
Registering workflow type: combat
Registering workflow type: buff
Registering workflow type: respawn
Registering workflow type: production
Workflow registry initialized
// Minimal TypeScript client wired to the workflow engine module.
import { SpacetimeDBClient } from '@clockworklabs/spacetimedb-sdk';
const client = new SpacetimeDBClient();
// Connect to a local SpacetimeDB instance and the published module name.
await client.connect('ws://localhost:3000', 'workflow-engine');
// Subscribe to workflows for an entity
client.subscribe('SELECT * FROM workflow WHERE entity_id = 42');
// Listen for changes
client.on('workflow', (workflows) => {
for (const wf of workflows) {
console.log(`${wf.workflow_type}: ${wf.current_step}`);
}
});
// Start a workflow
// NOTE(review): args appear to be (workflow_type, entity_id, correlation_id,
// JSON-encoded payload) — confirm against the workflow_start reducer.
await client.call('workflow_start', ['buff', 42, null, JSON.stringify(30)]);
// Send a signal
await client.call('workflow_signal', [workflowId, 'dispel', '[]']);

use spacetimedb_sdk::*;
// Equivalent Rust client using the SpacetimeDB SDK.
#[tokio::main]
async fn main() -> Result<()> {
// Connect to the local instance and the published module.
let conn = connect("ws://localhost:3000", "workflow-engine", None).await?;
// Subscribe
subscribe(&["SELECT * FROM workflow WHERE entity_id = 42"]).await?;
// Start workflow
// Arguments mirror the TypeScript example: workflow type, entity id,
// no correlation id, and a JSON-encoded payload.
conn.call_reducer("workflow_start", &(
"buff",
Some(42u64),
None::<String>,
serde_json::to_vec(&30u32)?,
)).await?;
Ok(())
}

# Count active workflows
spacetime sql workflow-engine "
SELECT status, COUNT(*) as count
FROM workflow
GROUP BY status
"
# Check for stuck workflows (active with no timers)
spacetime sql workflow-engine "
SELECT w.id, w.workflow_type, w.current_step
FROM workflow w
LEFT JOIN workflow_timer t ON w.id = t.workflow_id AND NOT t.cancelled
WHERE w.status IN ('Running', 'Suspended')
AND t.scheduled_id IS NULL
"
# Check timer backlog
spacetime sql workflow-engine "
SELECT COUNT(*) as pending_timers
FROM workflow_timer
WHERE NOT cancelled
"

Create a reducer for periodic cleanup:
#[reducer]
/// Deletes terminal workflows whose last update is older than `days_old` days.
///
/// Returns `Err` (instead of panicking, as the previous `unwrap()` did) when
/// the retention window cannot be subtracted from the current timestamp.
pub fn cleanup_old_workflows(ctx: &ReducerContext, days_old: u64) -> Result<(), String> {
    // Translate the retention window into a cutoff timestamp.
    // saturating_mul avoids a u64 overflow panic for absurd inputs, and
    // ok_or turns an out-of-range duration into an error the caller sees.
    let cutoff = ctx.timestamp
        .checked_sub_duration(Duration::from_secs(days_old.saturating_mul(86400)))
        .ok_or("retention window exceeds representable timestamp range")?;
    // Only finished workflows are eligible; active ones are never deleted here.
    for workflow in ctx.db.workflow().iter() {
        if workflow.is_terminal() && workflow.updated_at < cutoff {
            ctx.db.workflow().id().delete(&workflow.id);
        }
    }
    Ok(())
}

Call periodically:
spacetime call workflow-engine cleanup_old_workflows '[7]' # Clean workflows older than 7 days

If timers aren't firing (debugging), trigger manually:
#[reducer]
/// Debug helper: immediately fires every live (non-cancelled) timer.
pub fn manual_timer_tick(ctx: &ReducerContext) -> Result<(), String> {
    // Snapshot the live timers into a Vec first so the table is not
    // borrowed while workflow_timer_fire mutates it; then fire each one,
    // stopping at the first error.
    let live_timers: Vec<WorkflowTimer> = ctx.db.workflow_timer()
        .iter()
        .filter(|entry| !entry.cancelled)
        .collect();
    live_timers
        .into_iter()
        .try_for_each(|live_timer| workflow_timer_fire(ctx, live_timer))
}

| Metric | Query |
|---|---|
| Active workflows | SELECT COUNT(*) FROM workflow WHERE status IN ('Running', 'Suspended') |
| Pending timers | SELECT COUNT(*) FROM workflow_timer WHERE NOT cancelled |
| Failed workflows | SELECT COUNT(*) FROM workflow WHERE status = 'Failed' |
| Workflows by type | SELECT workflow_type, COUNT(*) FROM workflow GROUP BY workflow_type |
| Condition | Query | Threshold |
|---|---|---|
| Stuck workflows | Active with no timers | > 0 |
| Timer backlog | Pending timers | Growing over time |
| Failure rate | Failed / Total | > 1% |
| Long-running | Updated > 1 hour ago, still active | > 0 |
-- View recent events
SELECT * FROM workflow_event
ORDER BY timestamp DESC
LIMIT 100;
-- Events for specific workflow
SELECT * FROM workflow_event
WHERE workflow_id = 1
ORDER BY timestamp;

The engine creates btree indexes on:
- workflow.entity_id
- workflow.correlation_id
- workflow.workflow_type
- workflow.status
- workflow_timer.workflow_id
| Practice | Why |
|---|---|
| Keep state small | Large blobs slow serialization |
| Limit timers per workflow | Each timer is a table row |
| Use correlation IDs | Group related workflows for batch queries |
| Clean up completed workflows | Reduce table size |
| Batch workflow creation | Single reducer call for multiple workflows |
| Size | Recommendation |
|---|---|
| < 1 KB | Ideal |
| 1-10 KB | Acceptable |
| 10-100 KB | Consider splitting |
| > 100 KB | Refactor needed |
Prevent manual timer invocation:
#[reducer]
/// Fires a scheduled workflow timer; `timer` carries the row being fired.
pub fn workflow_timer_fire(ctx: &ReducerContext, timer: WorkflowTimer) -> Result<(), String> {
// Only allow scheduler to call this
// NOTE(review): this relies on scheduled invocations having the module's
// own identity as sender, so any other sender is a manual caller — confirm
// against SpacetimeDB's scheduled-reducer semantics.
if ctx.sender != ctx.identity {
return Err("Only scheduler can invoke this reducer".to_string());
}
// ... implementation
}

Add ownership checks:
#[reducer]
/// Cancels a workflow by id, recording `reason`; errors if the id is unknown.
pub fn workflow_cancel(ctx: &ReducerContext, workflow_id: u64, reason: String) -> Result<(), String> {
// Look up the workflow; fail fast with an error the caller can surface.
let workflow = ctx.db.workflow().id().find(&workflow_id)
.ok_or("Workflow not found")?;
// Verify ownership (implement your own logic)
// verify_ownership(ctx.sender, workflow.entity_id)?;
// ... cancel logic
}

spacetime sql workflow-engine "SELECT * FROM workflow" > workflows.json
spacetime sql workflow-engine "SELECT * FROM workflow_timer" > timers.json

The workflow engine is designed to survive restarts:
- Workflows: State is in tables, not memory
- Timers: Stored in the workflow_timer table with scheduled_at
- Scheduled reducers: SpacetimeDB resumes timer firing automatically
No manual recovery needed.
See Troubleshooting for common issues.
| Issue | Solution |
|---|---|
| Timers not firing | Check workflow_timer table has rows with cancelled = false |
| Workflow stuck | Check if any pending timers exist for that workflow |
| Handler errors | Check workflow.error_message for failed workflows |
| Unknown workflow type | Verify type is registered in workflows/mod.rs |
- Build with --release
- Enable audit logging if needed
- Set up monitoring queries
- Configure cleanup job
- Test fail-over (restart SpacetimeDB)
- Load test with expected workflow count
- Document custom workflow types
- Set up alerting for failures