Skip to content

Latest commit

 

History

History
341 lines (269 loc) · 11.9 KB

File metadata and controls

341 lines (269 loc) · 11.9 KB

effect-machine

Type-safe state machines for Effect.

Commands

bun run gate          # typecheck + lint + test + build
bun test              # Run tests
bun run typecheck     # tsgo --noEmit (native Go compiler)
bun run lint          # oxlint + effect-language-service diagnostics
bun run lint:effect   # effect LSP diagnostics only
bun run fmt           # oxfmt

Conventions

  • Files: kebab-case (actor.ts, persistent-actor.ts)
  • States/Events: schema-first with State({...}) / Event({...}) - they ARE schemas
  • Empty structs: plain values - State.Idle (not callable)
  • Non-empty: State.Loading({ url }) - constructor requiring args
  • Machine creation: Machine.make({ state, event, initial }) - types inferred
  • Exports: all public API via src/index.ts
  • Namespace pattern: import { Machine } from "effect-machine" then Machine.make, etc.

Fluent Builder

const machine = Machine.make({ state, event, initial })
  .on(State.Idle, Event.Start, () => State.Running)
  .on([State.Draft, State.Review], Event.Cancel, () => State.Cancelled) // multi-state
  .onAny(Event.Reset, () => State.Idle) // wildcard (any state)
  .spawn(State.Running, ({ slots }) => slots.poll())
  .timeout(State.Loading, { duration: Duration.seconds(30), event: Event.Timeout })
  .postpone(State.Connecting, Event.Data)
  .final(State.Done);
  • Builder methods mutate this, return this
  • Builder chain ends naturally — no terminal method needed
  • .onAny() fires when no specific .on() matches for that event

.task()

Async work that emits an event on completion:

// Explicit onSuccess mapping
.task(State.Loading, ({ state }) => fetchData(state.url), {
  onSuccess: (data) => Event.Loaded({ data }),
  onFailure: (cause) => Event.Failed({ error: Cause.pretty(cause) }),
})

// Shorthand — when task returns Event directly, onSuccess can be omitted
.task(State.Loading, ({ state }) => fetchData(state.url).pipe(Effect.map(d => Event.Loaded({ data: d }))), {
  onFailure: (cause) => Event.Failed({ error: Cause.pretty(cause) }),
})

// Multi-state
.task([State.Loading, State.Retrying], ({ state }) => fetchData(state.url), { onSuccess: ... })

State.derive()

Construct state from existing source. Per-variant and union-level:

// Per-variant: target-specific, works cross-state
State.Active.derive(state, { count: state.count + 1 });
State.Shipped.derive(processingState, { trackingId: "TRACK-123" });
State.Idle.derive(anyState); // → { _tag: "Idle" }

// Union-level: dispatches by _tag, preserves specific variant subtype
const updated = MyState.derive(state, { queue: newQueue });
// If state is Streaming, returns Streaming (not union type)
// Partial keys not in target variant are silently dropped

Slots

Unified parameterized slots via Slot.define + Slot.fn. Handlers take only params (no ctx parameter):

const MySlots = Slot.define({
  canRetry: Slot.fn({ max: Schema.Number }, Schema.Boolean),
  fetch: Slot.fn({ url: Schema.String }),
});

const machine = Machine.make({ state, event, slots: MySlots, initial }).on(
  State.X,
  Event.Y,
  ({ slots }) =>
    Effect.gen(function* () {
      if (yield* slots.canRetry({ max: 3 })) {
        yield* slots.fetch({ url: "/api" });
      }
      return State.Z;
    }),
);

// Provide slot implementations at spawn time — handlers take only params
const actor =
  yield *
  Machine.spawn(machine, {
    slots: {
      canRetry: ({ max }) => attempts < max,
      fetch: ({ url }) => Http.get(url),
    },
  });
yield * actor.start;

// When a handler needs machine state, access via service
Machine.spawn(machine, {
  slots: {
    canRetry: ({ max }) => machine.Context.pipe(Effect.map((ctx) => ctx.state.attempts < max)),
  },
});

Running Machines

Simple (no registry):

Machine.spawn returns an unstarted actor. Call actor.start to fork the event loop.

const actor = yield * Machine.spawn(machine);
yield * actor.start; // fork event loop, background effects, spawn effects
yield * actor.stop; // caller responsible

// Scope-aware — use Machine.scoped to bridge ActorScope from Scope:
yield *
  Effect.scoped(
    Machine.scoped(
      Effect.gen(function* () {
        const actor = yield* Machine.spawn(machine);
        yield* actor.start;
        // actor.stop called automatically when scope closes
      }),
    ),
  );

With registry:

system.spawn auto-starts — no actor.start needed.

const system = yield * ActorSystemService;
const actor = yield * system.spawn("my-id", machine);

ActorScope: Machine.spawn and system.spawn detect ActorScope via Effect.serviceOption — if present, attach stop finalizer; if absent, skip. Use Machine.scoped(effect) to bridge Scope.ScopeActorScope. This is explicit opt-in — ambient Scope.Scope does NOT trigger auto-cleanup (prevents bugs where unrelated scopes tear down actors).

Recovery + Durability

Lifecycle hooks for persistence. Replace the old PersistConfig.

const actor =
  yield *
  Machine.spawn(machine, {
    lifecycle: {
      recovery: {
        resolve: ({ actorId, generation, machineInitial }) =>
          storage.get(actorId).pipe(Effect.map(Option.fromNullable)),
      },
      durability: {
        save: ({ actorId, nextState, event }) => storage.set(actorId, nextState),
        shouldSave: (state, prev) => state._tag !== prev._tag,
      },
    },
  });
yield * actor.start;
  • Recovery — resolves initial state per generation. Runs during actor.start. Returns Option<S>: Some overrides initial, None uses machine.initial. generation is 0 for cold start, 1+ for supervision restarts.
  • Durability — saves state after committed transitions. Receives DurabilityCommit with actorId, generation, previousState, nextState, event. shouldSave is a sync predicate to skip uninteresting transitions.
  • hydrate overrides recoveryMachine.spawn(machine, { hydrate: state }) skips recovery.resolve entirely.

Supervision

Actors can automatically restart on defect with Supervision.restart():

import { Supervision } from "effect-machine";

const actor =
  yield *
  Machine.spawn(machine, {
    supervision: Supervision.restart({ maxRestarts: 3, within: "1 minute" }),
  });
yield * actor.start;

// Via system (auto-starts)
const actor =
  yield *
  system.spawn("id", machine, {
    supervision: Supervision.restart(),
  });

// Observe exit reason
const exit = yield * actor.awaitExit; // ActorExit<S>
const exit = yield * actor.watch(other); // ActorExit<unknown>
  • Restart from machine.initial — always clean slate, never last-state
  • Actor ID survives — same identity across restarts
  • Pending requests failcall/ask/sendWait behind crash get ActorStoppedError
  • Children die — both scopes close; children come back only if restart re-runs spawn/background
  • stop/drain are terminal — no restart
  • Final state = no restartawaitExit resolves with ActorExit.Final
  • BudgetSchedule controls timing/count; exhaustion = terminal ActorExit.Defect
  • ClassifiershouldRestart optionally skips restart for specific defect types
  • Entity-machine: cluster-supervised via defectRetryPolicy, NOT local supervision

Child Actors

Spawn children from .spawn()/.background() handlers via self.spawn(id, childMachine):

machine.spawn(State.Active, ({ self }) =>
  Effect.gen(function* () {
    const child = yield* self.spawn("worker-1", workerMachine).pipe(Effect.orDie);
    yield* child.send(WorkerEvent.Start);
    // child auto-stopped when parent exits Active state
  }),
);
  • Children spawned in .spawn() handlers are state-scoped — auto-stopped on state exit
  • Children spawned in .background() handlers live for machine lifetime
  • self.spawn returns Effect<ActorRef, DuplicateActorError, R> — use Effect.orDie in handlers
  • Every ActorRef has actor.system for child access: actor.system.get("worker-1")

ActorRef API

actor.send(event); // fire-and-forget
actor.cast(event); // alias for send
actor.call(event); // request-reply, returns ProcessEventResult
actor.ask(event); // typed reply (event must have Event.reply())
actor.waitFor(State.X); // wait for state (constructor or predicate)
actor.sendAndWait(ev, X); // send + wait for state
actor.awaitFinal; // wait for final state
actor.watch(other); // completes when other actor stops
actor.drain; // process remaining queue, then stop
actor.subscribe(fn); // sync callback, returns unsubscribe
actor.system; // ActorSystem
actor.children; // ReadonlyMap<string, ActorRef>

// Sync helpers (for UI hooks)
actor.sync.send(event);
actor.sync.stop();
actor.sync.snapshot();
actor.sync.matches(tag);
actor.sync.can(event);

ask / reply

Events declare reply schemas via Event.reply(). Handlers use Machine.reply():

const MyEvent = Event({
  GetCount: Event.reply({}, Schema.Number),
  Reset: {},
});

.on(State.Active, Event.GetCount, ({ state }) =>
  Machine.reply(state, state.count),
)

const count = yield* actor.ask(Event.GetCount);  // number

Handler Type Constraints

Method Allowed R Why
.on() / .reenter() never Pure transitions, no services
.spawn() / .background() Scope Finalizers allowed
spawn(..., { slots }) Any R Slot implementations can use services
  • Handlers cannot require arbitrary services — use slots
  • Handlers cannot produce errors — error channel fixed to never
  • Handlers must return machine's state schema — wrong states rejected at compile time

Gotchas

  • Machine.spawn returns an unstarted actor — must call yield* actor.start. system.spawn auto-starts.
  • Never throw in Effect.gen — use yield* Effect.fail()
  • yield* Effect.yieldNow after send() to let effects run
  • simulate()/createTestHarness() don't run spawn effects
  • Same-state transitions skip spawn/finalizers — use .reenter() to force
  • Empty structs: State.Idle not State.Idle()
  • .onAny() only fires when no specific .on() matches
  • self.spawn errors with DuplicateActorError — wrap with Effect.orDie
  • Sync helpers live on actor.sync.*
  • Pending call/ask Deferreds settled with ActorStoppedError on stop
  • ask() only accepts events with Event.reply() — non-reply events are a type error
  • Reply decode failures (schema mismatch) are defects

Cluster / Entity Machines

Wire machines to @effect/cluster for distributed actors:

import { toEntity, EntityMachine } from "effect-machine/cluster";

const OrderEntity = toEntity(orderMachine, { type: "Order" });
const OrderEntityLayer = EntityMachine.layer(OrderEntity, orderMachine, {
  initializeState: (entityId) => OrderState.Pending({ orderId: entityId }),
  persistence: { strategy: "journal" },
});
  • toEntity generates Entity with Send/Ask/GetState/WatchState RPCs
  • EntityMachine.layer wires machine to cluster via shared runtime kernel
  • EntityActorRef: typed client wrapper (send/ask/snapshot/watch/waitFor)

Entity Persistence

Opt-in via EntityMachineOptions.persistence:

  • Snapshot strategy (default): background scheduler + deactivation finalizer
  • Journal strategy: inline event append on each RPC, replay on reactivation
  • PersistenceAdapter service tag resolved from context
  • Journal append failures defect entity — cluster retry restarts from snapshot
  • Hydration: snapshot → journal replay → initializeStatemachine.initial

Cluster Gotchas

  • Entity tests use Entity.makeTestClient + ShardingConfig.layer + Effect.scoped
  • EntityMachine.layer accepts raw Machine
  • Entity RPCs use .tag field (not ._tag) to distinguish request types
  • WatchState test skipped due to effect beta Queue bug

Documentation

  • SKILL.md — AI agent quick reference