Mechanoid supports event sourcing for durable FSMs:
- Events are persisted after the transition action succeeds
- State is reconstructed by replaying events
- Snapshots reduce recovery time
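
To make the replay idea concrete, here is a minimal stand-alone sketch in plain Scala 3. It does not use Mechanoid's types: `State`, `Event`, `transition`, and `replay` are hypothetical names chosen for illustration, and the transition table simply mirrors the order example that follows.

```scala
// Stand-alone sketch; these types are illustrative, not Mechanoid's API.
enum State:
  case Pending, Paid, Shipped

enum Event:
  case Pay, Ship

// Pure transition function (hypothetical): maps a state and an event to the next state.
def transition(state: State, event: Event): State =
  (state, event) match
    case (State.Pending, Event.Pay) => State.Paid
    case (State.Paid, Event.Ship)   => State.Shipped
    case (other, _)                 => other // events that do not apply leave the state unchanged

// Folding the persisted events over the initial state yields the current state.
def replay(initial: State, events: Seq[Event]): State =
  events.foldLeft(initial)(transition)

@main def replayDemo(): Unit =
  val log = Seq(Event.Pay, Event.Ship)
  println(replay(State.Pending, log)) // prints "Shipped"
```

Because the current state is a pure function of the initial state and the event log, persisting the events is enough to rebuild the FSM after a restart.
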
import mechanoid.*
import zio.*
enum OrderState derives Finite:
  case Pending, Paid, Shipped
enum OrderEvent derives Finite:
  case Pay, Ship
import OrderState.*, OrderEvent.*
val machine = Machine(assembly[OrderState, OrderEvent](
  Pending via Pay to Paid,
  Paid via Ship to Shipped,
))
type OrderId = String
val orderId: OrderId = "order-1"
val eventStoreLayer: zio.ULayer[EventStore[OrderId, OrderState, OrderEvent]] =
  InMemoryEventStore.layer[OrderId, OrderState, OrderEvent]

val program = ZIO.scoped {
  for
    fsm <- FSMRuntime(orderId, machine, Pending)
    _   <- fsm.send(Pay)  // Event persisted
    _   <- fsm.send(Ship) // Event persisted
  yield ()
}.provide(
  eventStoreLayer,
  TimeoutStrategy.fiber[OrderId],
  LockingStrategy.optimistic[OrderId]
)

Implement EventStore[Id, S, E] for your storage backend:
import zio.stream.ZStream
trait EventStore[Id, S, E]:
  def append(instanceId: Id, event: E, expectedSeqNr: Long): ZIO[Any, MechanoidError, Long]
  def loadEvents(instanceId: Id): ZStream[Any, MechanoidError, StoredEvent[Id, E]]
  def loadEventsFrom(instanceId: Id, fromSeqNr: Long): ZStream[Any, MechanoidError, StoredEvent[Id, E]]
  def loadSnapshot(instanceId: Id): ZIO[Any, MechanoidError, Option[FSMSnapshot[Id, S]]]
  def saveSnapshot(snapshot: FSMSnapshot[Id, S]): ZIO[Any, MechanoidError, Unit]
  def highestSequenceNr(instanceId: Id): ZIO[Any, MechanoidError, Long]

Critical: append must implement optimistic locking. It must atomically check that expectedSeqNr matches the current highest sequence number and only then store the event and increment the sequence number. This prevents lost updates when several writers operate on the same instance concurrently.
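
The check-then-append requirement can be sketched with a single-instance, in-memory log in plain Scala 3. This is an illustration only, not Mechanoid's `InMemoryEventStore`: `InMemoryLog` and `Conflict` are hypothetical names, the result is a plain `Either` instead of a ZIO effect, and returning the new sequence number from `append` is an assumption.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, simplified stand-ins; not Mechanoid's actual types.
final case class Conflict(expectedSeqNr: Long, actualSeqNr: Long)

final class InMemoryLog[E]:
  private val events = AtomicReference(Vector.empty[E])

  def highestSequenceNr: Long = events.get.size.toLong

  // Appends only if expectedSeqNr equals the current highest sequence number.
  // The compare-and-set makes the check and the write a single atomic step:
  // if another writer got in first, the swap fails and a conflict is reported.
  def append(event: E, expectedSeqNr: Long): Either[Conflict, Long] =
    val current = events.get
    val actual  = current.size.toLong
    if actual != expectedSeqNr then Left(Conflict(expectedSeqNr, actual))
    else if events.compareAndSet(current, current :+ event) then Right(actual + 1)
    else Left(Conflict(expectedSeqNr, events.get.size.toLong))

@main def appendDemo(): Unit =
  val log = InMemoryLog[String]()
  println(log.append("Pay", expectedSeqNr = 0))  // prints "Right(1)"
  println(log.append("Ship", expectedSeqNr = 0)) // prints "Left(Conflict(0,1))": stale writer rejected
```

A database-backed store might obtain the same guarantee differently, for example from a uniqueness constraint on the instance id and sequence number pair, so that a stale insert fails instead of overwriting.
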
Snapshots capture point-in-time state to speed up recovery:
val program = ZIO.scoped {
  for
    fsm <- FSMRuntime(orderId, machine, Pending)
    // Manual snapshot (you control when)
    _ <- fsm.saveSnapshot
    // Example strategies:
    // After every N events
    seqNr <- fsm.lastSequenceNr
    _ <- ZIO.when(seqNr % 100 == 0)(fsm.saveSnapshot)
    // On specific states
    state <- fsm.currentState
    _ <- ZIO.when(state == Shipped)(fsm.saveSnapshot)
  yield ()
}.provide(
  eventStoreLayer,
  TimeoutStrategy.fiber[OrderId],
  LockingStrategy.optimistic[OrderId]
)

On startup, FSMRuntime (when provided with an EventStore):
- Loads the latest snapshot (if any)
- Replays only events after the snapshot's sequence number
- Resumes normal operation
Recovery time is proportional to events since the last snapshot, not total events.
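
The recovery steps above can be sketched as a pure function in plain Scala 3. This is an assumption-laden illustration rather than the runtime's actual code: `Stored`, `Snapshot`, and `recover` are hypothetical names, sequence numbers are assumed to start at 1, and states and events are plain strings to keep the example short.

```scala
// Hypothetical, simplified stand-ins; not Mechanoid's actual types.
final case class Stored[E](seqNr: Long, event: E)
final case class Snapshot[S](state: S, seqNr: Long)

// Returns the recovered state together with the last applied sequence number.
def recover[S, E](
    initial: S,
    snapshot: Option[Snapshot[S]],
    log: Seq[Stored[E]],
    transition: (S, E) => S
): (S, Long) =
  // Start from the snapshot if there is one, otherwise from the initial state.
  val (startState, startSeqNr) = snapshot match
    case Some(s) => (s.state, s.seqNr)
    case None    => (initial, 0L)
  // Replay only the events recorded after the snapshot.
  log.filter(_.seqNr > startSeqNr).foldLeft((startState, startSeqNr)) {
    case ((state, _), stored) => (transition(state, stored.event), stored.seqNr)
  }

@main def recoverDemo(): Unit =
  val log = Seq(Stored(1L, "Pay"), Stored(2L, "Ship"))
  val next: (String, String) => String =
    (_, event) => if event == "Pay" then "Paid" else "Shipped"
  // The snapshot covers event 1, so only event 2 is replayed.
  println(recover("Pending", Some(Snapshot("Paid", 1L)), log, next)) // prints "(Shipped,2)"
```

With a snapshot at sequence number 1, only one of the two stored events is replayed, which is why recovery cost tracks the number of events since the last snapshot.
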
The persistence layer uses optimistic locking to detect concurrent modifications:
// If another process modified the FSM between read and write:
val error = SequenceConflictError(instanceId = orderId, expectedSeqNr = 5, actualSeqNr = 6)

This error indicates a concurrent modification. The caller should reload state and retry.
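
A reload-and-retry loop might look like the following plain Scala 3 sketch. It is not part of Mechanoid: `retryOnConflict`, `Conflict`, and the two callbacks are hypothetical, and a ZIO application would more likely express the same policy with ZIO's own retry operators.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for a sequence conflict; not Mechanoid's error type.
final case class Conflict(expectedSeqNr: Long, actualSeqNr: Long)

// Runs `attempt` with a freshly loaded sequence number, reloading and retrying
// after each conflict until it succeeds or the retry budget is used up.
@tailrec
def retryOnConflict[A](maxRetries: Int)(loadSeqNr: () => Long)(
    attempt: Long => Either[Conflict, A]
): Either[Conflict, A] =
  attempt(loadSeqNr()) match
    case Left(_) if maxRetries > 0 => retryOnConflict(maxRetries - 1)(loadSeqNr)(attempt)
    case result                    => result

@main def retryDemo(): Unit =
  var stored = 6L  // another writer has already advanced the log to 6
  var stale  = true // the first read returns an outdated sequence number
  val result = retryOnConflict[Long](maxRetries = 3)(() =>
    if stale then { stale = false; 5L } else stored
  ) { expected =>
    if expected == stored then { stored += 1; Right(stored) }
    else Left(Conflict(expected, stored))
  }
  println(result) // prints "Right(7)": the second attempt used the reloaded number
```

Bounding the number of retries keeps a heavily contended instance from looping forever; the final conflict is returned to the caller once the budget is exhausted.
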