diff --git a/.golangci.yml b/.golangci.yml index 2676388..a73cf62 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,31 +1,35 @@ +version: "2" +run: + tests: true linters: enable: - - errcheck - - gosimple - - govet - - ineffassign - - staticcheck - - unused - - gofmt - - goimports - misspell - unconvert - unparam - whitespace - -linters-settings: - errcheck: - check-type-assertions: true - check-blank: true - - govet: - check-shadowing: true - -run: - timeout: 5m - tests: true - + settings: + errcheck: + check-type-assertions: true + check-blank: true + govet: + enable: + - shadow + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ issues: - exclude-use-default: false max-issues-per-linter: 0 max-same-issues: 0 +formatters: + enable: + - gofmt + - goimports + exclusions: + generated: lax + paths: + - third_party$ + - builtin$ + - examples$ diff --git a/Taskfile.yml b/Taskfile.yml index faa2d7c..6a567b5 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -41,6 +41,11 @@ tasks: cmds: - golangci-lint run ./... + lint-fix: + desc: Run linter + cmds: + - golangci-lint run ./... --fix + fmt: desc: Format code cmds: diff --git a/cmd/smallflow/main.go b/cmd/smallflow/main.go index facc4d1..007a72d 100644 --- a/cmd/smallflow/main.go +++ b/cmd/smallflow/main.go @@ -1,7 +1,107 @@ package main -import "fmt" +import ( + "context" + "fmt" + orchestrator2 "github.com/morebec/smallflow/internal/application/orchestrator" + "github.com/morebec/smallflow/internal/business/workflowmgmt" + "github.com/morebec/smallflow/internal/integration/postgres" + "time" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/mpostgres" + "github.com/morebec/go-misas/muuid" + "github.com/morebec/go-misas/mx" +) func main() { fmt.Println("Build, run, and observe workflows without the overhead!") + + clock := mx.NewRealTimeClock(time.UTC) + + dbConn, err := mpostgres.OpenConn("postgres://smallflow:smallflow@localhost:5432/postgres?sslmode=disable") + if err != nil { + panic(err) + } + + var eventStore misas.EventStore + eventStore, err = mpostgres.NewEventStore(clock, dbConn) + if err != nil { + panic(err) + } + + mx.EventRegistry.Register(workflowmgmt.WorkflowEnabledEventTypeName, workflowmgmt.WorkflowEnabledEvent{}) + mx.EventRegistry.Register(workflowmgmt.WorkflowDisabledEventTypeName, workflowmgmt.WorkflowDisabledEvent{}) + mx.EventRegistry.Register(workflowmgmt.WorkflowTriggeredEventTypeName, workflowmgmt.WorkflowTriggeredEvent{}) + mx.EventRegistry.Register(workflowmgmt.WorkflowStartedEventTypeName, workflowmgmt.WorkflowStartedEvent{}) + mx.EventRegistry.Register(workflowmgmt.WorkflowEndedEventTypeName, workflowmgmt.WorkflowEndedEvent{}) + mx.EventRegistry.Register(workflowmgmt.StepStartedEventTypeName, workflowmgmt.StepStartedEvent{}) + mx.EventRegistry.Register(workflowmgmt.StepEndedEventTypeName, workflowmgmt.StepEndedEvent{}) + + eventStore = mx.NewEventStoreDeserializerDecorator(eventStore) + + workflowRepo := &postgres.EventStoreWorkflowRepository{ + EventStore: eventStore, + UUIDGenerator: muuid.NewRandomUUIDGenerator(), + } + runRepo := &postgres.EventStoreRunRepository{ + EventStore: eventStore, + UUIDGenerator: muuid.NewRandomUUIDGenerator(), + } + + api := workflowmgmt.NewSubsystem(clock, workflowRepo, runRepo, muuid.NewRandomUUIDGenerator()).API + workflowLeaseRepository, err := postgres.NewWorkflowLeaseRepository(dbConn) + if err != nil { + panic(err) + } + + leaseManager := orchestrator2.WorkflowLeaseManager{ + Clock: clock, + Repository: workflowLeaseRepository, + } + + checkpointStore, err := mpostgres.NewPostgreSQLCheckpointStore(dbConn) + if err != nil { + panic(err) + } + orch := orchestrator2.NewWorkflowOrchestrator( + clock, + api, + leaseManager, + muuid.NewRandomUUIDGenerator(), + eventStore, + checkpointStore, + ) + orch.Start() + defer orch.Stop() + + ctx := context.Background() + + fmt.Println("Enabling workflow...") + if result := api.HandleCommand(ctx, workflowmgmt.EnableWorkflowCommand{ + WorkflowID: "my-workflow", + }); result.Error != nil { + panic(result.Error) + } + + for i := range 1 { + fmt.Printf("Triggering workflow #%d...\n", i+1) + if result := api.HandleCommand(ctx, workflowmgmt.TriggerWorkflowCommand{ + WorkflowID: "my-workflow", + RunID: muuid.NewRandomUUIDGenerator().Generate().String(), + }); result.Error != nil { + panic(result.Error) + } + } + + <-time.After(30 * time.Second) + fmt.Println("Current events in the event store:") + stream, err := eventStore.ReadFromStream(ctx, eventStore.GlobalStreamID(), misas.ReadFromEventStreamOptions{}.FromStart().Forward()) + if err != nil { + panic(err) + } + + for i, event := range stream.Events { + fmt.Printf("Event %d: %T → %+v\n", i, event, event) + } } diff --git a/compose.debug.yaml b/compose.debug.yaml deleted file mode 100644 index e97b22b..0000000 --- a/compose.debug.yaml +++ /dev/null @@ -1,8 +0,0 @@ -services: - smallflow: - image: smallflow - build: - context: . - dockerfile: ./Dockerfile - ports: - - 3000:3000 diff --git a/compose.yaml b/compose.yaml index e97b22b..c297f2d 100644 --- a/compose.yaml +++ b/compose.yaml @@ -1,8 +1,22 @@ services: - smallflow: - image: smallflow - build: - context: . - dockerfile: ./Dockerfile +# smallflow: +# image: smallflow +# build: +# context: . +# dockerfile: ./Dockerfile +# ports: +# - 3000:3000 + + postgres: + image: postgres:18-alpine + environment: + POSTGRES_USER: smallflow + POSTGRES_PASSWORD: smallflow + POSTGRES_DB: smallflow ports: - - 3000:3000 + - "5432:5432" + volumes: + - postgres:/var/lib/postgresql/data + +volumes: + postgres: diff --git a/go.mod b/go.mod index c9c1e0d..0562285 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,19 @@ module github.com/morebec/smallflow go 1.24.8 + +replace github.com/morebec/go-misas => ./../go-misas-back + +require ( + github.com/alitto/pond/v2 v2.5.0 + github.com/morebec/go-misas v0.0.0-00010101000000-000000000000 + github.com/samber/lo v1.49.1 +) + +require ( + github.com/google/uuid v1.6.0 // indirect + github.com/lib/pq v1.10.9 // indirect + go.opentelemetry.io/otel v1.35.0 // indirect + go.opentelemetry.io/otel/trace v1.35.0 // indirect + golang.org/x/text v0.28.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b114515 --- /dev/null +++ b/go.sum @@ -0,0 +1,49 @@ +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= +github.com/alitto/pond/v2 v2.5.0 h1:vPzS5GnvSDRhWQidmj2djHllOmjFExVFbDGCw1jdqDw= +github.com/alitto/pond/v2 v2.5.0/go.mod h1:xkjYEgQ05RSpWdfSd1nM3OVv7TBhLdy7rMp3+2Nq+yE= +github.com/cucumber/gherkin/go/v26 v26.2.0 h1:EgIjePLWiPeslwIWmNQ3XHcypPsWAHoMCz/YEBKP4GI= +github.com/cucumber/gherkin/go/v26 v26.2.0/go.mod h1:t2GAPnB8maCT4lkHL99BDCVNzCh1d7dBhCLt150Nr/0= +github.com/cucumber/godog v0.15.1 h1:rb/6oHDdvVZKS66hrhpjFQFHjthFSrQBCOI1LwshNTI= +github.com/cucumber/godog v0.15.1/go.mod h1:qju+SQDewOljHuq9NSM66s0xEhogx0q30flfxL4WUk8= +github.com/cucumber/messages/go/v21 v21.0.1 h1:wzA0LxwjlWQYZd32VTlAVDTkW6inOFmSM+RuOwHZiMI= +github.com/cucumber/messages/go/v21 v21.0.1/go.mod h1:zheH/2HS9JLVFukdrsPWoPdmUtmYQAQPLk7w5vWsk5s= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gofrs/uuid v4.3.1+incompatible h1:0/KbAdpx3UXAx1kEOWHJeOkpbgRFGHVgv+CFIY7dBJI= +github.com/gofrs/uuid v4.3.1+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= +github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-memdb v1.3.4 h1:XSL3NR682X/cVk2IeV0d70N4DZ9ljI885xAEU8IoK3c= +github.com/hashicorp/go-memdb v1.3.4/go.mod h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samber/lo v1.49.1 h1:4BIFyVfuQSEpluc7Fua+j1NolZHiEHEpaSEKdsH0tew= +github.com/samber/lo v1.49.1/go.mod h1:dO6KHFzUKXgP8LDhU0oI8d2hekjXnGOu0DB8Jecxd6o= +github.com/sergi/go-diff v1.4.0 h1:n/SP9D5ad1fORl+llWyN+D6qoUETXNZARKjyY2/KVCw= +github.com/sergi/go-diff v1.4.0/go.mod h1:A0bzQcvG0E7Rwjx0REVgAGH58e96+X0MeOfepqsbeW4= +github.com/spf13/pflag v1.0.7 h1:vN6T9TfwStFPFM5XzjsvmzZkLuaLX+HS+0SeFLRgU6M= +github.com/spf13/pflag v1.0.7/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yudai/gojsondiff v1.0.0 h1:27cbfqXLVEJ1o8I6v3y9lg8Ydm53EKqHXAOMxEGlCOA= +github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3IfnEUduWvb9is428/nNb5L3U01M= +github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/application/orchestrator/lease.go b/internal/application/orchestrator/lease.go new file mode 100644 index 0000000..c0a5a4f --- /dev/null +++ b/internal/application/orchestrator/lease.go @@ -0,0 +1,73 @@ +package orchestrator + +import ( + "context" + "time" + + "github.com/morebec/go-misas/misas" +) + +const defaultLeaseDuration = 5 * time.Minute + +type WorkflowLease struct { + WorkflowID string + RunID string + expiresAt time.Time +} + +func (wl WorkflowLease) IsExpired(currentTime time.Time) bool { return currentTime.After(wl.expiresAt) } + +type WorkflowLeaseRepository interface { + Add(context.Context, WorkflowLease) error + Update(context.Context, WorkflowLease) error + Remove(ctx context.Context, workflowID string, runID string) error + FindByWorkflowRunID(ctx context.Context, workflowID string, runID string) (*WorkflowLease, error) +} + +type WorkflowLeaseManager struct { + Clock misas.Clock + Repository WorkflowLeaseRepository +} + +func (l WorkflowLeaseManager) TryAcquire(ctx context.Context, workflowID string, runID string) (bool, error) { + lease, err := l.Repository.FindByWorkflowRunID(ctx, workflowID, runID) + if err != nil { + return false, err + } + + if lease != nil && !lease.IsExpired(l.Clock.Now()) { + // Lease is still valid + return false, nil + } + + wl := WorkflowLease{ + WorkflowID: workflowID, + RunID: runID, + expiresAt: l.Clock.Now().Add(defaultLeaseDuration), + } + if err := l.Repository.Add(ctx, wl); err != nil { + return false, err + } + + return true, nil +} + +func (l WorkflowLeaseManager) Release(ctx context.Context, workflowID string, runID string) error { + if err := l.Repository.Remove(ctx, workflowID, runID); err != nil { + return err + } + + return nil +} + +func (l WorkflowLeaseManager) RenewLease(ctx context.Context, workflowID string, runID string) error { + if err := l.Repository.Update(ctx, WorkflowLease{ + WorkflowID: workflowID, + RunID: runID, + expiresAt: l.Clock.Now().Add(defaultLeaseDuration), + }); err != nil { + return err + } + + return nil +} diff --git a/internal/application/orchestrator/orchestrator.go b/internal/application/orchestrator/orchestrator.go new file mode 100644 index 0000000..ebbaed7 --- /dev/null +++ b/internal/application/orchestrator/orchestrator.go @@ -0,0 +1,91 @@ +package orchestrator + +import ( + "context" + "fmt" + "github.com/morebec/smallflow/internal/business/workflowmgmt" + + "github.com/alitto/pond/v2" + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/muuid" + "github.com/morebec/go-misas/mx" +) + +const defaultMaxConcurrentWorkers = 1000 + +type WorkflowOrchestrator struct { + Clock misas.Clock + API misas.BusinessAPI + LeaseManager WorkflowLeaseManager + UUIDGenerator muuid.UUIDGenerator + Pool pond.Pool + + eventProcessor mx.EventProcessor +} + +func NewWorkflowOrchestrator( + clock misas.Clock, + API misas.BusinessAPI, + leaseManager WorkflowLeaseManager, + uuidGenerator muuid.UUIDGenerator, + eventStore misas.EventStore, + checkpointStore misas.CheckpointStore, +) *WorkflowOrchestrator { + wo := &WorkflowOrchestrator{ + Clock: clock, + API: API, + LeaseManager: leaseManager, + UUIDGenerator: uuidGenerator, + Pool: pond.NewPool(defaultMaxConcurrentWorkers), + } + + wo.eventProcessor = *mx.NewEventProcessor(mx.EventProcessorConfig{ + ID: "workflow-orchestrator", + StreamID: eventStore.GlobalStreamID(), + EventStore: eventStore, + CheckpointStore: checkpointStore, + CommitStrategy: misas.CheckpointCommitStrategyAfterProcessing, + Handler: wo, + }) + + return wo +} + +func (m *WorkflowOrchestrator) Start() { + err := m.eventProcessor.Start() + if err != nil { + panic(err) + } +} + +func (m *WorkflowOrchestrator) Stop() { + m.eventProcessor.Stop() +} + +func (m *WorkflowOrchestrator) IsRunning() bool { + return m.eventProcessor.IsRunning() +} + +func (m *WorkflowOrchestrator) HandleEvent(ctx context.Context, event misas.Event) misas.Error { + switch e := event.(type) { + case workflowmgmt.WorkflowTriggeredEvent: + m.runWorkflow(ctx, e) + return nil + } + + return nil +} + +func (m *WorkflowOrchestrator) runWorkflow(_ context.Context, e workflowmgmt.WorkflowTriggeredEvent) { + m.Pool.Submit(func() { + runner := Worker{ + Clock: m.Clock, + API: m.API, + LeaseManager: m.LeaseManager, + InstanceID: m.UUIDGenerator.Generate().String(), + } + if err := runner.Run(context.Background(), e.WorkflowID, e.RunID); err != nil { + fmt.Println("workflow runner error:", err) + } + }) +} diff --git a/internal/application/orchestrator/worker.go b/internal/application/orchestrator/worker.go new file mode 100644 index 0000000..1849d8e --- /dev/null +++ b/internal/application/orchestrator/worker.go @@ -0,0 +1,91 @@ +package orchestrator + +import ( + "context" + "fmt" + "github.com/morebec/smallflow/internal/business/workflowmgmt" + "time" + + "github.com/morebec/go-misas/misas" +) + +const defaultLeaseHeartbeatDuration = time.Second * 30 + +type Worker struct { + Clock misas.Clock + API misas.BusinessAPI + LeaseManager WorkflowLeaseManager + InstanceID string +} + +func (r Worker) acquireLease(ctx context.Context, workflowID, runID string) (context.Context, context.CancelFunc, error) { + acquired, err := r.LeaseManager.TryAcquire(ctx, workflowID, runID) + if err != nil { + return nil, nil, err + } + + if !acquired { + // Could not acquire lease, another runner instance is likely handling this workflow run. + fmt.Printf("Could not acquire lease for workflow run: {workflowID: %s, runID: %s}\n", workflowID, runID) + return nil, nil, nil + } + fmt.Printf("Lease acquired for workflow run: {workflowID: %s, runID: %s}\n", workflowID, runID) + + ctx, cancel := context.WithCancel(ctx) + + ticker := time.NewTicker(defaultLeaseHeartbeatDuration) + go func() { + for { + select { + case <-ticker.C: + if err := r.LeaseManager.RenewLease(ctx, workflowID, runID); err != nil { + fmt.Printf( + "Failed to renew lease for workflow run: {workflowID: %s, runID: %s}: %s\n", + workflowID, + runID, + err, + ) + } + case <-ctx.Done(): + return + } + } + }() + + stopHeartbeat := func() { + ticker.Stop() + cancel() + // Release the lease when stopping the heartbeat + // use context.Background() given we just canceled the original context. + if err := r.LeaseManager.Release(context.Background(), workflowID, runID); err != nil { + fmt.Printf( + "Failed to release lease for workflow run: {workflowID: %s, runID: %s}: %s\n", + workflowID, + runID, + err, + ) + } + fmt.Printf("Lease released for workflow run: {workflowID: %s, runID: %s}\n", workflowID, runID) + } + + return ctx, stopHeartbeat, nil +} + +func (r Worker) Run(ctx context.Context, workflowID, runID string) error { + ctx, releaseLease, err := r.acquireLease(ctx, workflowID, runID) + if err != nil { + return err + } + if releaseLease == nil { + // Lease not acquired, another runner is handling this workflow run. + return nil + } + defer releaseLease() + + result := r.API.HandleCommand(ctx, workflowmgmt.RunWorkflowCommand{ + WorkflowID: workflowID, + RunID: runID, + }) + + return result.Error +} diff --git a/internal/business/workflowmgmt/action.go b/internal/business/workflowmgmt/action.go new file mode 100644 index 0000000..7b0b462 --- /dev/null +++ b/internal/business/workflowmgmt/action.go @@ -0,0 +1,22 @@ +package workflowmgmt + +import "context" + +type ActionID string + +type Action interface { + Run(ctx context.Context) *WorkflowError + ID() ActionID +} + +type ActionFunc struct { + fn func(ctx context.Context) *WorkflowError + id ActionID +} + +func (a *ActionFunc) Run(ctx context.Context) *WorkflowError { return a.fn(ctx) } +func (a *ActionFunc) ID() ActionID { return a.id } + +func NewActionFunc(id ActionID, fn func(ctx context.Context) *WorkflowError) *ActionFunc { + return &ActionFunc{id: id, fn: fn} +} diff --git a/internal/business/workflowmgmt/api.go b/internal/business/workflowmgmt/api.go new file mode 100644 index 0000000..c852a9f --- /dev/null +++ b/internal/business/workflowmgmt/api.go @@ -0,0 +1,228 @@ +package workflowmgmt + +import ( + "fmt" + "time" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/muuid" + "github.com/morebec/go-misas/mx" +) + +func NewSubsystem( + clock misas.Clock, + workflowRepo WorkflowRepository, + runRepository RunRepository, + uidg muuid.UUIDGenerator, +) misas.BusinessSubsystem { + return mx.NewBusinessSubsystemAssembler(). + WithCommandHandler(EnableWorkflowCommandTypeName, mx.NewTypedCommandHandler(EnableWorkflowCommandHandler{ + WorkflowRepository: workflowRepo, + Clock: clock, + })). + WithCommandHandler(DisableWorkflowCommandTypeName, mx.NewTypedCommandHandler(DisableWorkflowCommandHandler{ + WorkflowRepository: workflowRepo, + Clock: clock, + })). + WithCommandHandler(TriggerWorkflowCommandTypeName, mx.NewTypedCommandHandler(TriggerWorkflowCommandHandler{ + WorkflowRepository: workflowRepo, + UUIDGenerator: uidg, + Clock: clock, + })). + WithCommandHandler(RunWorkflowCommandTypeName, mx.NewTypedCommandHandler(RunWorkflowCommandHandler{ + WorkflowRepository: workflowRepo, + RunRepository: runRepository, + Clock: clock, + })). + WithCommandHandler(ResumeWorkflowRunCommandTypeName, mx.NewTypedCommandHandler(ResumeWorkflowRunCommandHandler{ + WorkflowRepository: workflowRepo, + RunRepository: runRepository, + })). + Assemble() +} + +// TriggerWorkflowCommand represents a command to trigger a new run of a +// workflow. +// +// This command is idempotent based on the RunID provided. If a run +// with the same RunID already exists for the given workflow, this command will +// have no effect and will succeed. +// +// If no RunID is provided, a new unique RunID +// will be generated. +// +// The workflow must be enabled for this command to succeed. +// If the workflow is disabled, this command will fail. If the workflow does not +// exist, this command will fail. If the concurrent runs limit has been reached, +// this command will fail. +type TriggerWorkflowCommand struct { + WorkflowID string + RunID string +} + +const TriggerWorkflowCommandTypeName = "TriggerWorkflowCommand" + +func (TriggerWorkflowCommand) TypeName() misas.CommandTypeName { return TriggerWorkflowCommandTypeName } + +type WorkflowTriggeredEvent struct { + WorkflowID string + RunID string + TriggeredAt time.Time +} + +const WorkflowTriggeredEventTypeName = "WorkflowTriggeredEvent" + +func (WorkflowTriggeredEvent) TypeName() misas.EventTypeName { return WorkflowTriggeredEventTypeName } + +// EnableWorkflowCommand represents a command to enable a workflow. Enabling a +// workflow allows new runs to be triggered. If the workflow does not exist, this +// command will fail. +type EnableWorkflowCommand struct { + WorkflowID string +} + +const EnableWorkflowCommandTypeName = "EnableWorkflowCommand" + +func (EnableWorkflowCommand) TypeName() misas.CommandTypeName { return EnableWorkflowCommandTypeName } + +// WorkflowEnabledEvent is emitted when a workflow is successfully enabled. + +type WorkflowEnabledEvent struct { + WorkflowID string + EnabledAt time.Time +} + +const WorkflowEnabledEventTypeName = "WorkflowEnabledEvent" + +func (WorkflowEnabledEvent) TypeName() misas.EventTypeName { return WorkflowEnabledEventTypeName } + +// DisableWorkflowCommand represents a command to disable a workflow. Disabling a +// workflow prevents new runs from being triggered, but does not affect currently +// active runs. +type DisableWorkflowCommand struct { + WorkflowID string +} + +const DisableWorkflowCommandTypeName = "DisableWorkflowCommand" + +func (DisableWorkflowCommand) TypeName() misas.CommandTypeName { return DisableWorkflowCommandTypeName } + +type WorkflowDisabledEvent struct { + WorkflowID string + DisabledAt time.Time + ActiveRuns int +} + +const WorkflowDisabledEventTypeName = "WorkflowDisabledEvent" + +func (WorkflowDisabledEvent) TypeName() misas.EventTypeName { return WorkflowDisabledEventTypeName } + +type WorkflowStartedEvent struct { + WorkflowID string + RunID string + StartedAt time.Time +} + +const WorkflowStartedEventTypeName = "WorkflowStartedEvent" + +func (WorkflowStartedEvent) TypeName() misas.EventTypeName { return WorkflowStartedEventTypeName } + +type WorkflowEndedEvent struct { + WorkflowID string + RunID string + EndedAt time.Time + StartedAt time.Time + Errors map[string]*WorkflowError + Status string +} + +const WorkflowEndedEventTypeName = "WorkflowEndedEvent" + +func (WorkflowEndedEvent) TypeName() misas.EventTypeName { return WorkflowEndedEventTypeName } + +type StepStartedEvent struct { + WorkflowID string + RunID string + StepID string + ActionID string + StartedAt time.Time + IgnoreErrors bool +} + +const StepStartedEventTypeName = "StepStartedEvent" + +func (StepStartedEvent) TypeName() misas.EventTypeName { return StepStartedEventTypeName } + +type StepEndedEvent struct { + WorkflowID string + RunID string + StepID string + ActionID string + EndedAt time.Time + Error *WorkflowError + Status string +} + +const StepEndedEventTypeName = "StepEndedEvent" + +func (StepEndedEvent) TypeName() misas.EventTypeName { return StepEndedEventTypeName } + +type WorkflowError struct { + Kind string // e.g. "user", "system", "internal" + Code string // e.g. "timeout", "network_error", "invalid_input" + Message string // human-readable message + Details map[string]any // additional details, e.g. {"go_error": "error message"} +} + +func (e WorkflowError) AsError() error { + return fmt.Errorf("%s(%s): %s", e.Kind, e.Code, e.Message) +} + +// RunWorkflowCommand runs a workflow synchronously. +// This command is idempotent based on the RunID provided: +// If a run with the same RunID already exists for the given workflow, this command will +// have no effect and will succeed. +// If no RunID is provided, this command will fail. +// This command is intended to be run after a workflow has been triggered. +// +// This command will not fail if the workflow or any of its steps fail, as these +// are expected outcomes of a workflow run. Instead, the errors will be recorded +// in the StepEndedEvent and WorkflowEndedEvent. +// This command will only return if internal errors have occurred, such as +// issues with the data storage. +type RunWorkflowCommand struct { + WorkflowID string + RunID string +} + +const RunWorkflowCommandTypeName = "RunWorkflowCommand" + +func (RunWorkflowCommand) TypeName() misas.CommandTypeName { return RunWorkflowCommandTypeName } + +type ResumeWorkflowRunCommand struct { + WorkflowID string + RunID string +} + +const ResumeWorkflowRunCommandTypeName = "ResumeWorkflowRunCommand" + +func (ResumeWorkflowRunCommand) TypeName() misas.CommandTypeName { + return ResumeWorkflowRunCommandTypeName +} + +type WorkflowRunReport struct { + WorkflowID string + RunID string + StartedAt time.Time + EndedAt time.Time + Errors map[string]*WorkflowError + Status string +} + +const ErrorCodeWorkflowNotFound misas.ErrorCode = "workflow_not_found" + +var ErrWorkflowNotFound = mx.ErrNotFound.WithCode(ErrorCodeWorkflowNotFound).WithMessage("workflow not found") + +const ErrorCodeRunNotFound misas.ErrorCode = "workflow_run_not_found" + +var ErrWorkflowRunNotFound = mx.ErrNotFound.WithCode(ErrorCodeRunNotFound).WithMessage("workflow run not found") diff --git a/internal/business/workflowmgmt/run.go b/internal/business/workflowmgmt/run.go new file mode 100644 index 0000000..9f5f5fd --- /dev/null +++ b/internal/business/workflowmgmt/run.go @@ -0,0 +1,195 @@ +package workflowmgmt + +import ( + "fmt" + "time" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/mx" +) + +type WorkflowStatus string + +const ( + WorkflowStatusFailed WorkflowStatus = "failed" + WorkflowStatusSucceeded WorkflowStatus = "succeeded" +) + +type RunID string + +type Run struct { + ID RunID + WorkflowID WorkflowID + StartedAt time.Time + EndedAt *time.Time + Error *WorkflowError + CurrentStepID StepID + Steps map[StepID]*StepRun + + events []misas.Event + Status WorkflowStatus +} + +func StartRun(workflowID WorkflowID, id RunID, startedAt time.Time) *Run { + run := &Run{} + run.record(WorkflowStartedEvent{ + RunID: string(id), + WorkflowID: string(workflowID), + StartedAt: startedAt, + }) + return run +} + +func (r *Run) StartStep(stepID StepID, id ActionID, ignoreErrors bool, currentTime time.Time) misas.Error { + if r.CurrentStepID == stepID { + // already running, idempotent + return nil + } + + if r.CurrentStepID != "" { + return mx.ErrConflict.WithMessage(fmt.Sprintf( + "workflow error: %s: cannot start step %s: step %s is currently running", + r.WorkflowID, + stepID, + r.CurrentStepID, + )) + } + + r.record(StepStartedEvent{ + RunID: string(r.ID), + WorkflowID: string(r.WorkflowID), + StepID: string(stepID), + ActionID: string(id), + StartedAt: currentTime, + IgnoreErrors: ignoreErrors, + }) + + return nil +} + +func (r *Run) EndStep(stepID StepID, err *WorkflowError, currentTime time.Time) misas.Error { + step := r.Steps[stepID] + if step.EndedAt != nil { + // already ended, idempotent + return nil + } + + if r.CurrentStepID != stepID { + return mx.ErrConflict.WithMessage(fmt.Sprintf( + "workflow error: %s: cannot start step %s: step %s is currently running", + r.WorkflowID, + stepID, + r.CurrentStepID, + )) + } + + status := StepStatusSucceeded + if err != nil && !step.IgnoreError { + status = StepStatusFailed + } + + r.record(StepEndedEvent{ + RunID: string(r.ID), + WorkflowID: string(r.WorkflowID), + StepID: string(stepID), + ActionID: string(step.ActionID), + EndedAt: currentTime, + Error: err, + Status: string(status), + }) + + return nil +} + +func (r *Run) End(currentTime time.Time) { + if r.EndedAt != nil { + // Already ended + return + } + + workflowStatus := WorkflowStatusSucceeded + for _, s := range r.Steps { + if s.Status == StepStatusFailed { + workflowStatus = WorkflowStatusFailed + break + } + } + + r.record(WorkflowEndedEvent{ + RunID: string(r.ID), + WorkflowID: string(r.WorkflowID), + StartedAt: r.StartedAt, + EndedAt: currentTime, + Status: string(workflowStatus), + Errors: r.Errors(), + }) +} + +func (r *Run) Errors() map[string]*WorkflowError { + var stepErrors map[string]*WorkflowError + for _, s := range r.Steps { + if stepErrors == nil { + stepErrors = make(map[string]*WorkflowError) + } + stepErrors[string(s.ID)] = s.Error + } + return stepErrors +} + +func (r *Run) Apply(events []misas.Event) { + for _, event := range events { + switch e := event.(type) { + case WorkflowStartedEvent: + r.ID = RunID(e.RunID) + r.WorkflowID = WorkflowID(e.WorkflowID) + r.Steps = make(map[StepID]*StepRun) + r.StartedAt = e.StartedAt + + case WorkflowEndedEvent: + r.EndedAt = &e.EndedAt + r.Status = WorkflowStatus(e.Status) + + case StepStartedEvent: + r.CurrentStepID = StepID(e.StepID) + r.Steps[StepID(e.StepID)] = &StepRun{ + ID: StepID(e.StepID), + StartedAt: e.StartedAt, + ActionID: ActionID(e.ActionID), + IgnoreError: e.IgnoreErrors, + } + + case StepEndedEvent: + r.CurrentStepID = "" + step := r.Steps[StepID(e.StepID)] + step.EndedAt = &e.EndedAt + step.Error = e.Error + step.Status = StepStatus(e.Status) + } + } +} + +func (r *Run) UncommittedEvents() []misas.Event { return r.events } + +func (r *Run) record(event misas.Event) { + r.events = append(r.events, event) + r.Apply([]misas.Event{event}) +} + +func (r *Run) Commit() { r.events = nil } + +type StepStatus string + +const ( + StepStatusFailed StepStatus = "failed" + StepStatusSucceeded StepStatus = "succeeded" +) + +type StepRun struct { + ID StepID + StartedAt time.Time + EndedAt *time.Time + Error *WorkflowError + ActionID ActionID + Status StepStatus + IgnoreError bool +} diff --git a/internal/business/workflowmgmt/runner.go b/internal/business/workflowmgmt/runner.go new file mode 100644 index 0000000..8da24d0 --- /dev/null +++ b/internal/business/workflowmgmt/runner.go @@ -0,0 +1,129 @@ +package workflowmgmt + +import ( + "context" + "fmt" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/mx" +) + +type WorkflowRunner struct { + RunRepository RunRepository + WorkflowRepository WorkflowRepository + Clock misas.Clock +} + +func (r WorkflowRunner) Run(ctx context.Context, wf *Workflow, rID RunID) (WorkflowRunReport, misas.Error) { + workflowID := wf.ID() + run, err := r.RunRepository.FindByID(ctx, string(workflowID), string(rID)) + if err != nil { + return WorkflowRunReport{}, err + } + if run != nil { + // Run already exists, nothing to do. + return WorkflowRunReport{}, nil + } + + run = StartRun(workflowID, rID, r.Clock.Now()) + if err := r.RunRepository.Add(ctx, run); err != nil { + return WorkflowRunReport{}, err + } + + if err := r.executeRun(ctx, run, wf.Definition().Steps); err != nil { + return WorkflowRunReport{}, err + } + + return newWorkflowRunReport(run), nil +} + +func (r WorkflowRunner) ResumeFromStep(ctx context.Context, wf *Workflow, rID RunID) (WorkflowRunReport, misas.Error) { + workflowID := wf.ID() + run, err := r.RunRepository.FindByID(ctx, string(workflowID), string(rID)) + if err != nil { + return WorkflowRunReport{}, err + } + if run == nil { + // TODO: better error message. + return WorkflowRunReport{}, mx.ErrNotFound.WithMessage(fmt.Sprintf("workflow run not found: %s", rID)) + } + + steps := r.remainingStepsFrom(wf.Definition().Steps, run.CurrentStepID) + if err := r.executeRun(ctx, run, steps); err != nil { + return WorkflowRunReport{}, err + } + + return newWorkflowRunReport(run), nil +} + +func (r WorkflowRunner) runSteps(ctx context.Context, run *Run, steps []StepDefinition) misas.Error { + // If a start step ID is provided, set from to false until we encounter it. + for _, step := range steps { + if err := r.runStep(ctx, run, step); err != nil { + return err + } + } + + return nil +} + +func (r WorkflowRunner) runStep(ctx context.Context, run *Run, step StepDefinition) misas.Error { + if err := run.StartStep(step.ID, step.Action.ID(), step.IgnoreError, r.Clock.Now()); err != nil { + return err + } + if err := r.RunRepository.Save(ctx, run); err != nil { + return err + } + + workflowErr := r.runStepAction(ctx, step) + + if err := run.EndStep(step.ID, workflowErr, r.Clock.Now()); err != nil { + return err + } + if err := r.RunRepository.Save(ctx, run); err != nil { + return err + } + + return nil +} + +func (r WorkflowRunner) runStepAction(ctx context.Context, step StepDefinition) *WorkflowError { + return step.Action.Run(ctx) +} + +func (r WorkflowRunner) executeRun(ctx context.Context, run *Run, steps []StepDefinition) misas.Error { + err := r.runSteps(ctx, run, steps) + + run.End(r.Clock.Now()) + if repoErr := r.RunRepository.Save(ctx, run); repoErr != nil { + err = misas.ErrorGroup{err, repoErr} + } + + return err +} + +func (r WorkflowRunner) remainingStepsFrom(steps []StepDefinition, start StepID) []StepDefinition { + var remaining []StepDefinition + include := false + for _, step := range steps { + if !include && step.ID == start { + include = true + } + if include { + remaining = append(remaining, step) + } + } + + return remaining +} + +func newWorkflowRunReport(run *Run) WorkflowRunReport { + return WorkflowRunReport{ + WorkflowID: string(run.WorkflowID), + RunID: string(run.ID), + StartedAt: run.StartedAt, + EndedAt: *run.EndedAt, + Errors: run.Errors(), + Status: string(run.Status), + } +} diff --git a/internal/business/workflowmgmt/spi.go b/internal/business/workflowmgmt/spi.go new file mode 100644 index 0000000..4321a83 --- /dev/null +++ b/internal/business/workflowmgmt/spi.go @@ -0,0 +1,18 @@ +package workflowmgmt + +import ( + "context" + + "github.com/morebec/go-misas/misas" +) + +type WorkflowRepository interface { + FindByID(ctx context.Context, workflowID string) (*Workflow, misas.Error) + Save(ctx context.Context, wf *Workflow) misas.Error +} + +type RunRepository interface { + FindByID(ctx context.Context, workflowID string, runID string) (*Run, misas.Error) + Add(ctx context.Context, run *Run) misas.Error + Save(ctx context.Context, r *Run) misas.Error +} diff --git a/internal/business/workflowmgmt/util.go b/internal/business/workflowmgmt/util.go new file mode 100644 index 0000000..dec9572 --- /dev/null +++ b/internal/business/workflowmgmt/util.go @@ -0,0 +1,19 @@ +package workflowmgmt + +import "github.com/morebec/go-misas/misas" + +func NewWorkflowNotFoundError(workflowID any) misas.Error { + switch id := workflowID.(type) { + case WorkflowID: + return ErrWorkflowNotFound.WithAppendedMessage(string(id)) + case string: + return ErrWorkflowNotFound.WithAppendedMessage(id) + } + panic("invalid type for workflowID") +} + +func WorkflowNotFoundCommandResult(workflowID any) misas.CommandResult { + return misas.CommandResult{ + Error: NewWorkflowNotFoundError(workflowID), + } +} diff --git a/internal/business/workflowmgmt/workflow.go b/internal/business/workflowmgmt/workflow.go new file mode 100644 index 0000000..a2bffdb --- /dev/null +++ b/internal/business/workflowmgmt/workflow.go @@ -0,0 +1,107 @@ +package workflowmgmt + +import ( + "fmt" + "time" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/mx" +) + +type WorkflowID string + +type Workflow struct { + enabled bool + definition WorkflowDefinition + + runIds map[RunID]struct{} + activeRuns map[RunID]struct{} + + events []misas.Event +} + +func NewWorkflow(d WorkflowDefinition, enabled bool) *Workflow { + return &Workflow{ + definition: d, + enabled: enabled, + runIds: make(map[RunID]struct{}), + activeRuns: make(map[RunID]struct{}), + } +} + +func (w *Workflow) ID() WorkflowID { return w.definition.ID } + +func (w *Workflow) Trigger(id RunID, currentTime time.Time) misas.Error { + if _, exists := w.runIds[id]; exists { + return nil // idempotent + } + + workflowID := w.ID() + if !w.enabled { + return mx.ErrConflict.WithMessage(fmt.Sprintf("workflow is not enabled: %s", workflowID)) + } + + if w.definition.ConcurrencyLimit != ConcurrencyLimitNone && + len(w.activeRuns) >= int(w.definition.ConcurrencyLimit) { + return mx.ErrConflict.WithMessage(fmt.Sprintf("workflow concurrency limit reached: %s", workflowID)) + } + + w.record(WorkflowTriggeredEvent{ + WorkflowID: string(workflowID), + RunID: string(id), + TriggeredAt: currentTime, + }) + + return nil +} + +func (w *Workflow) Enable(now time.Time) { + if w.enabled { + return + } + + w.record(WorkflowEnabledEvent{ + WorkflowID: string(w.ID()), + EnabledAt: now, + }) +} + +func (w *Workflow) Disable(now time.Time) { + if !w.enabled { + return + } + + w.record(WorkflowDisabledEvent{ + WorkflowID: string(w.ID()), + DisabledAt: now, + }) +} + +func (w *Workflow) record(event misas.Event) { + w.Apply([]misas.Event{event}) + w.events = append(w.events, event) +} + +func (w *Workflow) UncommittedEvents() []misas.Event { return w.events } + +func (w *Workflow) Apply(events []misas.Event) { + for _, event := range events { + switch e := event.(type) { + case WorkflowTriggeredEvent: + w.runIds[RunID(e.RunID)] = struct{}{} + w.activeRuns[RunID(e.RunID)] = struct{}{} + case WorkflowEndedEvent: + delete(w.activeRuns, RunID(e.RunID)) + + case WorkflowEnabledEvent: + w.enabled = true + + case WorkflowDisabledEvent: + w.enabled = false + } + } +} + +func (w *Workflow) Definition() WorkflowDefinition { return w.definition } + +func (w *Workflow) Commit() { w.events = nil } diff --git a/internal/business/workflowmgmt/workflow_definition.go b/internal/business/workflowmgmt/workflow_definition.go new file mode 100644 index 0000000..d8223b0 --- /dev/null +++ b/internal/business/workflowmgmt/workflow_definition.go @@ -0,0 +1,21 @@ +package workflowmgmt + +type StepID string + +type StepDefinition struct { + ID StepID + IgnoreError bool + Action Action +} + +type ConcurrencyLimit int + +const ( + ConcurrencyLimitNone ConcurrencyLimit = 0 +) + +type WorkflowDefinition struct { + ID WorkflowID + ConcurrencyLimit ConcurrencyLimit + Steps []StepDefinition +} diff --git a/internal/business/workflowmgmt/workflow_disable.go b/internal/business/workflowmgmt/workflow_disable.go new file mode 100644 index 0000000..b8e4ec4 --- /dev/null +++ b/internal/business/workflowmgmt/workflow_disable.go @@ -0,0 +1,27 @@ +package workflowmgmt + +import ( + "context" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/mx" +) + +type DisableWorkflowCommandHandler struct { + Clock misas.Clock + WorkflowRepository WorkflowRepository +} + +func (h DisableWorkflowCommandHandler) Handle(ctx context.Context, cmd DisableWorkflowCommand) misas.CommandResult { + workflow, err := h.WorkflowRepository.FindByID(ctx, cmd.WorkflowID) + if err != nil { + return mx.CommandResultFromError(err) + } + if workflow == nil { + return WorkflowNotFoundCommandResult(cmd.WorkflowID) + } + + workflow.Disable(h.Clock.Now()) + + return mx.CommandResultFromError(h.WorkflowRepository.Save(ctx, workflow)) +} diff --git a/internal/business/workflowmgmt/workflow_enable.go b/internal/business/workflowmgmt/workflow_enable.go new file mode 100644 index 0000000..5fa9bf5 --- /dev/null +++ b/internal/business/workflowmgmt/workflow_enable.go @@ -0,0 +1,27 @@ +package workflowmgmt + +import ( + "context" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/mx" +) + +type EnableWorkflowCommandHandler struct { + Clock misas.Clock + WorkflowRepository WorkflowRepository +} + +func (h EnableWorkflowCommandHandler) Handle(ctx context.Context, cmd EnableWorkflowCommand) misas.CommandResult { + workflow, err := h.WorkflowRepository.FindByID(ctx, cmd.WorkflowID) + if err != nil { + return mx.CommandResultFromError(err) + } + if workflow == nil { + return WorkflowNotFoundCommandResult(cmd.WorkflowID) + } + + workflow.Enable(h.Clock.Now()) + + return mx.CommandResultFromError(h.WorkflowRepository.Save(ctx, workflow)) +} diff --git a/internal/business/workflowmgmt/workflow_resume.go b/internal/business/workflowmgmt/workflow_resume.go new file mode 100644 index 0000000..217f59c --- /dev/null +++ b/internal/business/workflowmgmt/workflow_resume.go @@ -0,0 +1,31 @@ +package workflowmgmt + +import ( + "context" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/mx" +) + +type ResumeWorkflowRunCommandHandler struct { + WorkflowRepository WorkflowRepository + RunRepository RunRepository + Runner WorkflowRunner +} + +func (h ResumeWorkflowRunCommandHandler) Handle(ctx context.Context, cmd ResumeWorkflowRunCommand) misas.CommandResult { + wf, err := h.WorkflowRepository.FindByID(ctx, cmd.WorkflowID) + if err != nil { + return mx.CommandResultFromError(err) + } + if wf == nil { + return WorkflowNotFoundCommandResult(cmd.WorkflowID) + } + + report, err := h.Runner.ResumeFromStep(ctx, wf, RunID(cmd.RunID)) + if err != nil { + return mx.CommandResultFromError(err) + } + + return misas.CommandResult{Payload: report} +} diff --git a/internal/business/workflowmgmt/workflow_run.go b/internal/business/workflowmgmt/workflow_run.go new file mode 100644 index 0000000..aff5222 --- /dev/null +++ b/internal/business/workflowmgmt/workflow_run.go @@ -0,0 +1,37 @@ +package workflowmgmt + +import ( + "context" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/mx" +) + +type RunWorkflowCommandHandler struct { + Clock misas.Clock + WorkflowRepository WorkflowRepository + RunRepository RunRepository +} + +func (h RunWorkflowCommandHandler) Handle(ctx context.Context, cmd RunWorkflowCommand) misas.CommandResult { + wf, err := h.WorkflowRepository.FindByID(ctx, cmd.WorkflowID) + if err != nil { + return mx.CommandResultFromError(err) + } + if wf == nil { + return WorkflowNotFoundCommandResult(cmd.WorkflowID) + } + + runner := WorkflowRunner{ + RunRepository: h.RunRepository, + WorkflowRepository: h.WorkflowRepository, + Clock: h.Clock, + } + + report, err := runner.Run(ctx, wf, RunID(cmd.RunID)) + if err != nil { + return mx.CommandResultFromError(err) + } + + return misas.CommandResult{Payload: report} +} diff --git a/internal/business/workflowmgmt/workflow_trigger.go b/internal/business/workflowmgmt/workflow_trigger.go new file mode 100644 index 0000000..aa40512 --- /dev/null +++ b/internal/business/workflowmgmt/workflow_trigger.go @@ -0,0 +1,35 @@ +package workflowmgmt + +import ( + "context" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/muuid" + "github.com/morebec/go-misas/mx" +) + +type TriggerWorkflowCommandHandler struct { + WorkflowRepository WorkflowRepository + UUIDGenerator muuid.UUIDGenerator + Clock misas.Clock +} + +func (h TriggerWorkflowCommandHandler) Handle(ctx context.Context, cmd TriggerWorkflowCommand) misas.CommandResult { + wf, err := h.WorkflowRepository.FindByID(ctx, cmd.WorkflowID) + if err != nil { + return mx.CommandResultFromError(err) + } + if wf == nil { + return WorkflowNotFoundCommandResult(cmd.WorkflowID) + } + + if cmd.RunID == "" { + cmd.RunID = h.UUIDGenerator.Generate().String() + } + + if err := wf.Trigger(RunID(cmd.RunID), h.Clock.Now()); err != nil { + return mx.CommandResultFromError(err) + } + + return mx.CommandResultFromError(h.WorkflowRepository.Save(ctx, wf)) +} diff --git a/internal/integration/inmemory/lease_inmemory.go b/internal/integration/inmemory/lease_inmemory.go new file mode 100644 index 0000000..0bd31ad --- /dev/null +++ b/internal/integration/inmemory/lease_inmemory.go @@ -0,0 +1,47 @@ +package adapters + +import ( + "context" + "github.com/morebec/smallflow/internal/application/orchestrator" + "sync" +) + +type InMemoryWorkflowLeaseRepository struct { + mu sync.Mutex + leases map[string]orchestrator.WorkflowLease +} + +func NewInMemoryWorkflowLeaseRepository() *InMemoryWorkflowLeaseRepository { + return &InMemoryWorkflowLeaseRepository{leases: make(map[string]orchestrator.WorkflowLease)} +} + +func (r *InMemoryWorkflowLeaseRepository) Add(_ context.Context, lease orchestrator.WorkflowLease) error { + r.mu.Lock() + defer r.mu.Unlock() + r.leases[lease.WorkflowID+lease.RunID] = lease + return nil +} + +func (r *InMemoryWorkflowLeaseRepository) Update(_ context.Context, lease orchestrator.WorkflowLease) error { + r.mu.Lock() + defer r.mu.Unlock() + r.leases[lease.WorkflowID+lease.RunID] = lease + return nil +} + +func (r *InMemoryWorkflowLeaseRepository) Remove(_ context.Context, workflowID string, runID string) error { + r.mu.Lock() + defer r.mu.Unlock() + delete(r.leases, workflowID+runID) + return nil +} + +func (r *InMemoryWorkflowLeaseRepository) FindByWorkflowRunID(_ context.Context, workflowID string, runID string) (*orchestrator.WorkflowLease, error) { + r.mu.Lock() + defer r.mu.Unlock() + lease, ok := r.leases[workflowID+runID] + if !ok { + return nil, nil + } + return &lease, nil +} diff --git a/internal/integration/postgres/lease_repo.go b/internal/integration/postgres/lease_repo.go new file mode 100644 index 0000000..fbafcf5 --- /dev/null +++ b/internal/integration/postgres/lease_repo.go @@ -0,0 +1,86 @@ +package postgres + +import ( + "context" + "github.com/morebec/smallflow/internal/application/orchestrator" + + "github.com/morebec/go-misas/mpostgres" +) + +type WorkflowLeaseRepository struct { + conn mpostgres.DB + collection mpostgres.Collection +} + +func NewWorkflowLeaseRepository(conn mpostgres.DB) (WorkflowLeaseRepository, error) { + ctx := context.Background() + docStore, err := mpostgres.NewDocumentStore(ctx, conn) + if err != nil { + return WorkflowLeaseRepository{}, err + } + + collection, err := docStore.Collection("workflow_leases") + if err != nil { + return WorkflowLeaseRepository{}, err + } + if err := collection.Create(ctx); err != nil { + return WorkflowLeaseRepository{}, err + } + + return WorkflowLeaseRepository{conn: conn, collection: collection}, nil +} + +func (r WorkflowLeaseRepository) Add(ctx context.Context, lease orchestrator.WorkflowLease) error { + doc, err := mpostgres.NewDocument(r.workflowLeasID(lease.WorkflowID, lease.RunID), lease) + if err != nil { + return err + } + + if err := r.collection.Add(ctx, doc); err != nil { + return err + } + + return nil +} + +func (r WorkflowLeaseRepository) workflowLeasID(workflowID, runID string) string { + return workflowID + "/" + runID +} + +func (r WorkflowLeaseRepository) Update(ctx context.Context, lease orchestrator.WorkflowLease) error { + doc, err := mpostgres.NewDocument(r.workflowLeasID(lease.WorkflowID, lease.RunID), lease) + if err != nil { + return err + } + + if _, err := r.collection.Update(ctx, doc); err != nil { + return err + } + + return nil +} + +func (r WorkflowLeaseRepository) Remove(ctx context.Context, workflowID string, runID string) error { + if _, err := r.collection.RemoveByID(ctx, r.workflowLeasID(workflowID, runID)); err != nil { + return err + } + + return nil +} + +func (r WorkflowLeaseRepository) FindByWorkflowRunID(ctx context.Context, workflowID string, runID string) (*orchestrator.WorkflowLease, error) { + doc, err := r.collection.FindByID(ctx, r.workflowLeasID(workflowID, runID)) + if err != nil { + return nil, err + } + if doc == nil { + return nil, nil + } + + var wl *orchestrator.WorkflowLease + if err := doc.Unmarshal(&wl); err != nil { + return nil, err + } + + return wl, nil +} diff --git a/internal/integration/postgres/run_repo.go b/internal/integration/postgres/run_repo.go new file mode 100644 index 0000000..7710e4d --- /dev/null +++ b/internal/integration/postgres/run_repo.go @@ -0,0 +1,98 @@ +package postgres + +import ( + "context" + "github.com/morebec/smallflow/internal/business/workflowmgmt" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/muuid" + "github.com/morebec/go-misas/mx" + "github.com/samber/lo" +) + +type EventStoreRunRepository struct { + UUIDGenerator muuid.UUIDGenerator + EventStore misas.EventStore +} + +func (r EventStoreRunRepository) Add(ctx context.Context, run *workflowmgmt.Run) misas.Error { + descriptors := lo.Map(run.UncommittedEvents(), func(event misas.Event, _ int) misas.EventDescriptor { + return misas.EventDescriptor{ + ID: r.UUIDGenerator.Generate().String(), + TypeName: event.TypeName(), + Data: event, + } + }) + + err := r.EventStore.AppendToStream( + ctx, + misas.EventStreamID("runs/"+run.ID), + descriptors, + misas.AppendToEventStreamOptions{}. + ExpectNotExist(), + ) + if err != nil { + return mx.NewInternalErrorFrom(err) + } + + run.Commit() + + return nil +} + +func (r EventStoreRunRepository) Save(ctx context.Context, run *workflowmgmt.Run) misas.Error { + descriptors := lo.Map(run.UncommittedEvents(), func(event misas.Event, _ int) misas.EventDescriptor { + return misas.EventDescriptor{ + ID: r.UUIDGenerator.Generate().String(), + TypeName: event.TypeName(), + Data: event, + } + }) + + err := r.EventStore.AppendToStream( + ctx, + misas.EventStreamID("runs/"+run.ID), + descriptors, + misas.AppendToEventStreamOptions{}, //TODO expected versioning + ) + if err != nil { + return mx.NewInternalErrorFrom(err) + } + + run.Commit() + + return nil +} + +func (r EventStoreRunRepository) FindByID(ctx context.Context, workflowID string, runID string) (*workflowmgmt.Run, misas.Error) { + var run *workflowmgmt.Run + + stream, err := r.EventStore.ReadFromStream( + ctx, + misas.EventStreamID("runs/"+runID), + misas.ReadFromEventStreamOptions{}. + FromStart(). + Forward(), + ) + if err != nil { + if misas.ErrorHasCode(err, misas.ErrEventStreamNotFoundErrorCode) { + return nil, nil + } + + return nil, mx.NewInternalErrorFrom(err) + } + + run = &workflowmgmt.Run{ + ID: workflowmgmt.RunID(runID), + WorkflowID: workflowmgmt.WorkflowID(workflowID), + } + + var events []misas.Event + for _, record := range stream.Events { + events = append(events, record.Data) + } + + run.Apply(events) + + return run, nil +} diff --git a/internal/integration/postgres/workflow_repo.go b/internal/integration/postgres/workflow_repo.go new file mode 100644 index 0000000..d12077e --- /dev/null +++ b/internal/integration/postgres/workflow_repo.go @@ -0,0 +1,94 @@ +package postgres + +import ( + "context" + "fmt" + "github.com/morebec/smallflow/internal/business/workflowmgmt" + + "github.com/morebec/go-misas/misas" + "github.com/morebec/go-misas/muuid" + "github.com/morebec/go-misas/mx" + "github.com/samber/lo" +) + +type EventStoreWorkflowRepository struct { + EventStore misas.EventStore + UUIDGenerator muuid.UUIDGenerator +} + +func (r EventStoreWorkflowRepository) FindByID(ctx context.Context, workflowID string) (*workflowmgmt.Workflow, misas.Error) { + stream, err := r.EventStore.ReadFromStream( + ctx, + misas.EventStreamID("workflows/"+workflowID), + misas.ReadFromEventStreamOptions{}. + FromStart(). + Forward(), + ) + if err != nil { + if !misas.ErrorHasCode(err, misas.ErrEventStreamNotFoundErrorCode) { + return nil, mx.NewInternalErrorFrom(err) + } + } + + wf := workflowmgmt.NewWorkflow(workflowmgmt.WorkflowDefinition{ + ID: workflowmgmt.WorkflowID(workflowID), + ConcurrencyLimit: workflowmgmt.ConcurrencyLimitNone, + Steps: []workflowmgmt.StepDefinition{ + { + ID: "step-1", + IgnoreError: false, + Action: workflowmgmt.NewActionFunc("my-action", func(ctx context.Context) *workflowmgmt.WorkflowError { + fmt.Println("Executing my-action") + return nil + }), + }, + { + ID: "step-2", + IgnoreError: false, + Action: workflowmgmt.NewActionFunc("my-action-2", func(ctx context.Context) *workflowmgmt.WorkflowError { + fmt.Println("Executing my-action 2") + return &workflowmgmt.WorkflowError{ + Kind: "internal", + Code: "not_implemented", + Message: "this action is not implemented", + Details: map[string]any{"action_id": "my-action-2"}, + } + //return nil + }), + }, + }, + }, false) + + var events []misas.Event + for _, record := range stream.Events { + events = append(events, record.Data) + } + + wf.Apply(events) + + return wf, nil +} + +func (r EventStoreWorkflowRepository) Save(ctx context.Context, wf *workflowmgmt.Workflow) misas.Error { + descriptors := lo.Map(wf.UncommittedEvents(), func(event misas.Event, _ int) misas.EventDescriptor { + return misas.EventDescriptor{ + ID: r.UUIDGenerator.Generate().String(), + TypeName: event.TypeName(), + Data: event, + } + }) + + err := r.EventStore.AppendToStream( + ctx, + misas.EventStreamID("workflows/"+wf.ID()), + descriptors, + misas.AppendToEventStreamOptions{}, //TODO expected versioning + ) + if err != nil { + return mx.NewInternalErrorFrom(err) + } + + wf.Commit() + + return nil +} diff --git a/specs/business/workflowmgmt.def.hcl b/specs/business/workflowmgmt.def.hcl new file mode 100644 index 0000000..934c368 --- /dev/null +++ b/specs/business/workflowmgmt.def.hcl @@ -0,0 +1,295 @@ +subsystem "workflowmgmt" { + description = "Business subsystem responsible for core business logic related to managing workflows." + type = "business" +} + + +command "workflowmgmt.EnableWorkflow" { + description = "EnableWorkflowCommand represents a command to enable a workflow." + field "WorkflowID" { + description = "ID of the workflow to enable." + type = "identifier" + } +} + +event "workflowmgmt.WorkflowEnabled" { + description = "Event emitted when a workflow is successfully enabled." + field "WorkflowID" { + description = "ID of the workflow that was enabled." + type = "identifier" + } + field "EnabledAt" { + description = "Timestamp when the workflow was enabled." + type = "datetime" + } +} + +command "workflowmgmt.DisableWorkflow" { + description = "DisableWorkflowCommand represents a command to disable a workflow." + field "WorkflowID" { + description = "ID of the workflow to disable." + type = "identifier" + } +} + +event "workflowmgmt.WorkflowDisabled" { + description = "Event emitted when a workflow is successfully disabled." + field "WorkflowID" { + description = "ID of the workflow that was disabled." + type = "identifier" + } + field "DisabledAt" { + description = "Timestamp when the workflow was disabled." + type = "datetime" + } +} + +command "workflowmgmt.TriggerWorkflow" { + description = <