diff --git a/cmd/seed/workflows.go b/cmd/seed/workflows.go index 37e8fae1..e86a0b7f 100644 --- a/cmd/seed/workflows.go +++ b/cmd/seed/workflows.go @@ -254,6 +254,10 @@ func importWorkflowSeedDefinition(tx *gorm.DB, seedDef workflowSeedDefinition) ( } summary.ControlRelationships += relationshipSummary.ControlRelationships + if err := workflows.NewFilterSyncService(tx, nil).SyncFilterForDefinition(defID); err != nil { + return summary, fmt.Errorf("sync workflow filter for seed definition %q: %w", seedDef.Key, err) + } + instanceSummary, err := importWorkflowSeedInstances(tx, seedDef, &defID) if err != nil { return summary, err diff --git a/cmd/seed/workflows_test.go b/cmd/seed/workflows_test.go index 9b55fcd0..436b2153 100644 --- a/cmd/seed/workflows_test.go +++ b/cmd/seed/workflows_test.go @@ -37,6 +37,9 @@ func setupWorkflowSeedTestDB(t *testing.T) *gorm.DB { t.Fatalf("failed to migrate %T: %v", entity, err) } } + if err := db.AutoMigrate(&relational.Control{}, &relational.Filter{}); err != nil { + t.Fatalf("failed to migrate filter entities: %v", err) + } return db } @@ -44,6 +47,7 @@ func setupWorkflowSeedTestDB(t *testing.T) *gorm.DB { func TestImportWorkflowsFromFile(t *testing.T) { db := setupWorkflowSeedTestDB(t) sugar := zap.NewNop().Sugar() + seedWorkflowFilterControl(t, db) summary, err := importWorkflowsFromFile(context.Background(), db, sugar, "testdata/soc2_workflows.sample.json") if err != nil { @@ -58,6 +62,7 @@ func TestImportWorkflowsFromFile(t *testing.T) { assertWorkflowSeedCounts(t, db) assertWorkflowSeedDependencies(t, db) assertWorkflowSeedControlRelationships(t, db) + assertWorkflowSeedFilterControl(t, db) assertWorkflowSeedCronCadence(t, db) secondSummary, err := importWorkflowsFromFile(context.Background(), db, sugar, "testdata/soc2_workflows.sample.json") @@ -71,6 +76,7 @@ func TestImportWorkflowsFromFile(t *testing.T) { t.Fatalf("expected second import to update 2 definitions, got created=%d updated=%d", secondSummary.DefinitionsCreated, secondSummary.DefinitionsUpdated) } assertWorkflowSeedCounts(t, db) + assertWorkflowSeedFilterControl(t, db) } func TestImportWorkflowSeedDefinitionRejectsDuplicateStepNames(t *testing.T) { @@ -475,6 +481,50 @@ func assertWorkflowSeedControlRelationships(t *testing.T, db *gorm.DB) { } } +func seedWorkflowFilterControl(t *testing.T, db *gorm.DB) { + t.Helper() + + catalogID := uuid.MustParse("0f9d8e10-363b-4a8f-ade5-f11c0b2b1202") + control := relational.Control{ + CatalogID: catalogID, + ID: "ctrl-cc5-2-002", + Title: "Technology control scope is defined", + } + if err := db.Create(&control).Error; err != nil { + t.Fatalf("failed to seed workflow filter control: %v", err) + } +} + +func assertWorkflowSeedFilterControl(t *testing.T, db *gorm.DB) { + t.Helper() + + var filters []relational.Filter + if err := db.Preload("Controls"). + Where("name = ?", "Workflow: Technology Controls Governance & Independent Review"). + Find(&filters).Error; err != nil { + t.Fatalf("failed to load workflow filter: %v", err) + } + if len(filters) != 1 { + t.Fatalf("expected one deterministic workflow filter, got %d", len(filters)) + } + + if len(filters[0].Controls) != 1 { + t.Fatalf("expected workflow filter to have one resolved control, got %d", len(filters[0].Controls)) + } + control := filters[0].Controls[0] + if control.CatalogID != uuid.MustParse("0f9d8e10-363b-4a8f-ade5-f11c0b2b1202") || control.ID != "ctrl-cc5-2-002" { + t.Fatalf("expected workflow filter control ctrl-cc5-2-002 in SOC 2 catalog, got catalog=%s control=%s", control.CatalogID, control.ID) + } + + var joinCount int64 + if err := db.Table("filter_controls").Count(&joinCount).Error; err != nil { + t.Fatalf("failed to count filter controls: %v", err) + } + if joinCount != 1 { + t.Fatalf("expected repeated import to keep one filter control association, got %d", joinCount) + } +} + func assertWorkflowSeedCronCadence(t *testing.T, db *gorm.DB) { t.Helper() diff --git a/internal/api/handler/oscal/profile_compliance_workflow_integration_test.go b/internal/api/handler/oscal/profile_compliance_workflow_integration_test.go new file mode 100644 index 00000000..64d77f56 --- /dev/null +++ b/internal/api/handler/oscal/profile_compliance_workflow_integration_test.go @@ -0,0 +1,127 @@ +//go:build integration + +package oscal + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "time" + + "github.com/compliance-framework/api/internal/service/relational" + "github.com/compliance-framework/api/internal/service/relational/workflows" + workflowevidence "github.com/compliance-framework/api/internal/workflow" + "github.com/google/uuid" + "github.com/labstack/echo/v4" + "go.uber.org/zap" +) + +func (suite *ProfileIntegrationSuite) TestComplianceProgressIncludesWorkflowCompletionEvidence() { + suite.Require().NoError(suite.Migrator.Refresh()) + suite.Require().NoError(suite.DB.AutoMigrate( + &workflows.WorkflowDefinition{}, + &workflows.WorkflowInstance{}, + &workflows.RoleAssignment{}, + &workflows.WorkflowExecution{}, + &workflows.WorkflowStepDefinition{}, + &workflows.StepExecution{}, + &workflows.ControlRelationship{}, + )) + + catalogID := uuid.New() + control := relational.Control{CatalogID: catalogID, ID: "ctrl-workflow", Title: "Workflow Control"} + suite.Require().NoError(suite.DB.Create(&control).Error) + + profileID := uuid.New() + profile := relational.Profile{ + UUIDModel: relational.UUIDModel{ID: &profileID}, + Metadata: relational.Metadata{Title: "Workflow Profile"}, + } + suite.Require().NoError(suite.DB.Create(&profile).Error) + suite.Require().NoError(suite.DB.Model(&profile).Association("Controls").Append(&control)) + + definition := workflows.WorkflowDefinition{ + Name: "Workflow Review", + Version: "1.0", + SuggestedCadence: string(workflows.CadenceWeekly), + } + suite.Require().NoError(suite.DB.Create(&definition).Error) + + relationship := workflows.ControlRelationship{ + WorkflowDefinitionID: definition.ID, + ControlID: control.ID, + ControlSource: "Test Catalog", + CatalogID: catalogID.String(), + RelationshipType: "satisfies", + Strength: "primary", + IsActive: true, + } + suite.Require().NoError(suite.DB.Create(&relationship).Error) + suite.Require().NoError(workflows.NewFilterSyncService(suite.DB, zap.NewNop().Sugar()).SyncFilterForDefinition(*definition.ID)) + + sspID := uuid.New() + instance := workflows.WorkflowInstance{ + WorkflowDefinitionID: definition.ID, + Name: "Workflow Instance", + Cadence: string(workflows.CadenceWeekly), + SystemSecurityPlanID: &sspID, + } + suite.Require().NoError(suite.DB.Create(&instance).Error) + + startedAt := time.Now().Add(-time.Hour) + completedAt := time.Now() + execution := workflows.WorkflowExecution{ + WorkflowInstanceID: instance.ID, + Status: workflows.WorkflowStatusCompleted.String(), + TriggeredBy: "manual", + StartedAt: &startedAt, + CompletedAt: &completedAt, + } + suite.Require().NoError(suite.DB.Create(&execution).Error) + suite.Require().NoError(workflowevidence.NewEvidenceIntegration(suite.DB, zap.NewNop().Sugar()).AddExecutionCompletionEvidence(context.Background(), execution.ID)) + + e := echo.New() + req := httptest.NewRequest(http.MethodGet, "/profiles/"+profileID.String()+"/compliance-progress", nil) + rec := httptest.NewRecorder() + ctx := e.NewContext(req, rec) + ctx.SetParamNames("id") + ctx.SetParamValues(profileID.String()) + + suite.Require().NoError(NewProfileHandler(zap.NewNop().Sugar(), suite.DB).ComplianceProgress(ctx)) + suite.Require().Equal(http.StatusOK, rec.Code) + + var response struct { + Data ProfileComplianceProgress `json:"data"` + } + suite.Require().NoError(json.Unmarshal(rec.Body.Bytes(), &response)) + suite.Require().Equal(1, response.Data.Summary.TotalControls) + suite.Require().Equal(1, response.Data.Summary.Satisfied) + suite.Require().Len(response.Data.Controls, 1) + suite.Require().Equal("satisfied", response.Data.Controls[0].ComputedStatus) + + laterStartedAt := completedAt.Add(time.Hour) + nextExecution := workflows.WorkflowExecution{ + WorkflowInstanceID: instance.ID, + Status: workflows.WorkflowStatusPending.String(), + TriggeredBy: "manual", + StartedAt: &laterStartedAt, + } + suite.Require().NoError(suite.DB.Create(&nextExecution).Error) + suite.Require().NoError(workflowevidence.NewEvidenceIntegration(suite.DB, zap.NewNop().Sugar()).AddWorkflowExecutionEvidence(context.Background(), nextExecution.ID, "started")) + + req = httptest.NewRequest(http.MethodGet, "/profiles/"+profileID.String()+"/compliance-progress", nil) + rec = httptest.NewRecorder() + ctx = e.NewContext(req, rec) + ctx.SetParamNames("id") + ctx.SetParamValues(profileID.String()) + + suite.Require().NoError(NewProfileHandler(zap.NewNop().Sugar(), suite.DB).ComplianceProgress(ctx)) + suite.Require().Equal(http.StatusOK, rec.Code) + + suite.Require().NoError(json.Unmarshal(rec.Body.Bytes(), &response)) + suite.Require().Equal(1, response.Data.Summary.TotalControls) + suite.Require().Equal(1, response.Data.Summary.Satisfied) + suite.Require().Len(response.Data.Controls, 1) + suite.Require().Equal("satisfied", response.Data.Controls[0].ComputedStatus) +} diff --git a/internal/api/handler/workflows/common_test.go b/internal/api/handler/workflows/common_test.go index cda1b0fd..bea5662f 100644 --- a/internal/api/handler/workflows/common_test.go +++ b/internal/api/handler/workflows/common_test.go @@ -19,6 +19,8 @@ func setupTestDB(t *testing.T) *gorm.DB { err = db.AutoMigrate( &relational.Metadata{}, &relational.Catalog{}, + &relational.Control{}, + &relational.Filter{}, &relational.User{}, &relational.BackMatterResource{}, &relational.BackMatter{}, diff --git a/internal/api/handler/workflows/control_relationship.go b/internal/api/handler/workflows/control_relationship.go index 91360092..52226a3c 100644 --- a/internal/api/handler/workflows/control_relationship.go +++ b/internal/api/handler/workflows/control_relationship.go @@ -112,7 +112,13 @@ func (h *ControlRelationshipHandler) Create(ctx echo.Context) error { relationship.IsActive = *req.IsActive } - if err := h.service.Create(relationship); err != nil { + err := h.db.Transaction(func(tx *gorm.DB) error { + if err := workflows.NewControlRelationshipService(tx).Create(relationship); err != nil { + return err + } + return workflows.NewFilterSyncService(tx, h.sugar).SyncFilterForDefinition(*relationship.WorkflowDefinitionID) + }) + if err != nil { return h.HandleServiceError(ctx, err, "create", "control relationship") } @@ -225,16 +231,24 @@ func (h *ControlRelationshipHandler) Update(ctx echo.Context) error { updates["strength"] = *req.Strength } - // Use DB directly for partial updates - if len(updates) > 0 { - if err := h.db.Model(&workflows.ControlRelationship{}).Where("id = ?", id).Updates(updates).Error; err != nil { - return h.HandleServiceError(ctx, err, "update", "control relationship") + var relationship *workflows.ControlRelationship + err = h.db.Transaction(func(tx *gorm.DB) error { + if len(updates) > 0 { + if err := tx.Model(&workflows.ControlRelationship{}).Where("id = ?", id).Updates(updates).Error; err != nil { + return err + } } - } - relationship, err := h.service.GetByID(id) + relationshipSvc := workflows.NewControlRelationshipService(tx) + var err error + relationship, err = relationshipSvc.GetByID(id) + if err != nil { + return err + } + return workflows.NewFilterSyncService(tx, h.sugar).SyncFilterForDefinition(*relationship.WorkflowDefinitionID) + }) if err != nil { - return h.HandleServiceError(ctx, err, "get", "control relationship after update") + return h.HandleServiceError(ctx, err, "update", "control relationship") } h.sugar.Infow("Control relationship updated", "id", id) @@ -260,7 +274,18 @@ func (h *ControlRelationshipHandler) Delete(ctx echo.Context) error { return HandleError(err) } - if err := h.service.Delete(id); err != nil { + err = h.db.Transaction(func(tx *gorm.DB) error { + relationshipSvc := workflows.NewControlRelationshipService(tx) + relationship, err := relationshipSvc.GetByID(id) + if err != nil { + return err + } + if err := relationshipSvc.Delete(id); err != nil { + return err + } + return workflows.NewFilterSyncService(tx, h.sugar).SyncFilterForDefinition(*relationship.WorkflowDefinitionID) + }) + if err != nil { return h.HandleServiceError(ctx, err, "delete", "control relationship") } @@ -288,21 +313,26 @@ func (h *ControlRelationshipHandler) Activate(ctx echo.Context) error { return HandleError(err) } - // Check if relationship exists first - _, err = h.service.GetByID(id) + var relationship *workflows.ControlRelationship + err = h.db.Transaction(func(tx *gorm.DB) error { + relationshipSvc := workflows.NewControlRelationshipService(tx) + existing, err := relationshipSvc.GetByID(id) + if err != nil { + return err + } + if err := relationshipSvc.Activate(id); err != nil { + return err + } + relationship, err = relationshipSvc.GetByID(id) + if err != nil { + return err + } + return workflows.NewFilterSyncService(tx, h.sugar).SyncFilterForDefinition(*existing.WorkflowDefinitionID) + }) if err != nil { - return h.HandleServiceError(ctx, err, "get", "control relationship") - } - - if err := h.service.Activate(id); err != nil { return h.HandleServiceError(ctx, err, "activate", "control relationship") } - relationship, err := h.service.GetByID(id) - if err != nil { - return h.HandleServiceError(ctx, err, "get", "control relationship after activation") - } - h.sugar.Infow("Control relationship activated", "id", id) return h.RespondOK(ctx, ControlRelationshipResponse{Data: relationship}) } @@ -327,21 +357,26 @@ func (h *ControlRelationshipHandler) Deactivate(ctx echo.Context) error { return HandleError(err) } - // Check if relationship exists first - _, err = h.service.GetByID(id) + var relationship *workflows.ControlRelationship + err = h.db.Transaction(func(tx *gorm.DB) error { + relationshipSvc := workflows.NewControlRelationshipService(tx) + existing, err := relationshipSvc.GetByID(id) + if err != nil { + return err + } + if err := relationshipSvc.Deactivate(id); err != nil { + return err + } + relationship, err = relationshipSvc.GetByID(id) + if err != nil { + return err + } + return workflows.NewFilterSyncService(tx, h.sugar).SyncFilterForDefinition(*existing.WorkflowDefinitionID) + }) if err != nil { - return h.HandleServiceError(ctx, err, "get", "control relationship") - } - - if err := h.service.Deactivate(id); err != nil { return h.HandleServiceError(ctx, err, "deactivate", "control relationship") } - relationship, err := h.service.GetByID(id) - if err != nil { - return h.HandleServiceError(ctx, err, "get", "control relationship after deactivation") - } - h.sugar.Infow("Control relationship deactivated", "id", id) return h.RespondOK(ctx, ControlRelationshipResponse{Data: relationship}) } diff --git a/internal/api/handler/workflows/workflow_definition.go b/internal/api/handler/workflows/workflow_definition.go index 3a911a7f..e9fc1471 100644 --- a/internal/api/handler/workflows/workflow_definition.go +++ b/internal/api/handler/workflows/workflow_definition.go @@ -9,12 +9,14 @@ import ( type WorkflowDefinitionHandler struct { *BaseHandler + db *gorm.DB service *workflows.WorkflowDefinitionService } func NewWorkflowDefinitionHandler(sugar *zap.SugaredLogger, db *gorm.DB) *WorkflowDefinitionHandler { return &WorkflowDefinitionHandler{ BaseHandler: NewBaseHandler(sugar), + db: db, service: workflows.NewWorkflowDefinitionService(db), } } @@ -79,7 +81,13 @@ func (h *WorkflowDefinitionHandler) Create(ctx echo.Context) error { GracePeriodDays: req.GracePeriodDays, } - if err := h.service.Create(definition); err != nil { + err := h.db.Transaction(func(tx *gorm.DB) error { + if err := workflows.NewWorkflowDefinitionService(tx).Create(definition); err != nil { + return err + } + return workflows.NewFilterSyncService(tx, h.sugar).SyncFilterForDefinition(*definition.ID) + }) + if err != nil { return h.HandleServiceError(ctx, err, "create", "workflow definition") } @@ -185,7 +193,13 @@ func (h *WorkflowDefinitionHandler) Update(ctx echo.Context) error { definition.GracePeriodDays = req.GracePeriodDays } - if err := h.service.Update(id, definition); err != nil { + err = h.db.Transaction(func(tx *gorm.DB) error { + if err := workflows.NewWorkflowDefinitionService(tx).Update(id, definition); err != nil { + return err + } + return workflows.NewFilterSyncService(tx, h.sugar).SyncFilterForDefinition(*id) + }) + if err != nil { return h.HandleServiceError(ctx, err, "update", "workflow definition") } @@ -213,7 +227,13 @@ func (h *WorkflowDefinitionHandler) Delete(ctx echo.Context) error { return HandleError(err) } - if err := h.service.Delete(id); err != nil { + err = h.db.Transaction(func(tx *gorm.DB) error { + if err := workflows.NewFilterSyncService(tx, h.sugar).DeleteFilterForDefinition(*id); err != nil { + return err + } + return workflows.NewWorkflowDefinitionService(tx).Delete(id) + }) + if err != nil { return h.HandleServiceError(ctx, err, "delete", "workflow definition") } diff --git a/internal/api/handler/workflows/workflow_definition_integration_test.go b/internal/api/handler/workflows/workflow_definition_integration_test.go index c791e127..1bc90ee9 100644 --- a/internal/api/handler/workflows/workflow_definition_integration_test.go +++ b/internal/api/handler/workflows/workflow_definition_integration_test.go @@ -10,6 +10,7 @@ import ( "testing" "github.com/compliance-framework/api/internal/api/middleware" + "github.com/compliance-framework/api/internal/service/relational" "github.com/compliance-framework/api/internal/service/relational/workflows" "github.com/google/uuid" "github.com/labstack/echo/v4" @@ -380,6 +381,24 @@ func TestWorkflowDefinitionHandler_Delete(t *testing.T) { require.NoError(t, db.Create(definition).Error) t.Run("Success", func(t *testing.T) { + catalogID := uuid.New() + control := relational.Control{CatalogID: catalogID, ID: "ctrl-1", Title: "Control 1"} + require.NoError(t, db.Create(&control).Error) + relationship := &workflows.ControlRelationship{ + WorkflowDefinitionID: definition.ID, + ControlID: control.ID, + ControlSource: "test catalog", + CatalogID: catalogID.String(), + RelationshipType: "satisfies", + Strength: "primary", + IsActive: true, + } + require.NoError(t, db.Create(relationship).Error) + require.NoError(t, workflows.NewFilterSyncService(db, zap.NewNop().Sugar()).SyncFilterForDefinition(*definition.ID)) + + var filter relational.Filter + require.NoError(t, db.First(&filter, "name = ?", "Workflow: "+definition.Name).Error) + req := httptest.NewRequest(http.MethodDelete, "/workflows/definitions/"+definition.ID.String(), nil) rec := httptest.NewRecorder() c := e.NewContext(req, rec) @@ -394,6 +413,10 @@ func TestWorkflowDefinitionHandler_Delete(t *testing.T) { var count int64 db.Model(&workflows.WorkflowDefinition{}).Where("id = ?", definition.ID).Count(&count) assert.Equal(t, int64(0), count) + db.Model(&relational.Filter{}).Where("id = ?", filter.ID).Count(&count) + assert.Equal(t, int64(0), count) + db.Table("filter_controls").Where("filter_id = ?", filter.ID).Count(&count) + assert.Equal(t, int64(0), count) }) t.Run("NotFound", func(t *testing.T) { diff --git a/internal/service/relational/workflows/constants.go b/internal/service/relational/workflows/constants.go index 4c6a1492..8f5cfae8 100644 --- a/internal/service/relational/workflows/constants.go +++ b/internal/service/relational/workflows/constants.go @@ -1,6 +1,9 @@ package workflows -import "github.com/robfig/cron/v3" +import ( + "github.com/google/uuid" + "github.com/robfig/cron/v3" +) // Field length constraints const ( @@ -27,6 +30,16 @@ const ( CadenceAnnually CadenceType = "annually" ) +const ( + WorkflowEvidencePolicyLabel = "_policy" + WorkflowEvidencePluginLabel = "_plugin" + WorkflowEvidencePluginValue = "workflow" +) + +func WorkflowPolicyValue(definitionID uuid.UUID) string { + return WorkflowEvidencePluginValue + "." + definitionID.String() +} + // IsValid checks if the cadence type is valid func (c CadenceType) IsValid() bool { switch c { diff --git a/internal/service/relational/workflows/filter_service.go b/internal/service/relational/workflows/filter_service.go new file mode 100644 index 00000000..8905a4bc --- /dev/null +++ b/internal/service/relational/workflows/filter_service.go @@ -0,0 +1,218 @@ +package workflows + +import ( + "crypto/sha256" + "encoding/hex" + "errors" + "fmt" + "sort" + "strings" + + "github.com/compliance-framework/api/internal/converters/labelfilter" + "github.com/compliance-framework/api/internal/service/relational" + "github.com/google/uuid" + "go.uber.org/zap" + "gorm.io/datatypes" + "gorm.io/gorm" +) + +type FilterSyncService struct { + db *gorm.DB + logger *zap.SugaredLogger + controlRelationshipSvc *ControlRelationshipService + workflowDefinitionSvc *WorkflowDefinitionService +} + +func NewFilterSyncService(db *gorm.DB, logger *zap.SugaredLogger) *FilterSyncService { + if logger == nil { + logger = zap.NewNop().Sugar() + } + return &FilterSyncService{ + db: db, + logger: logger, + controlRelationshipSvc: NewControlRelationshipService(db), + workflowDefinitionSvc: NewWorkflowDefinitionService(db), + } +} + +func (s *FilterSyncService) SyncFilterForDefinition(definitionID uuid.UUID) error { + definition, err := s.workflowDefinitionSvc.GetByID(&definitionID) + if err != nil { + return fmt.Errorf("failed to load workflow definition: %w", err) + } + + relationships, err := s.controlRelationshipSvc.GetByWorkflowDefinitionID(&definitionID) + if err != nil { + return fmt.Errorf("failed to load control relationships: %w", err) + } + + type relationshipControlRef struct { + catalogID uuid.UUID + controlID string + normalizedControlID string + } + + controlKey := func(catalogID uuid.UUID, normalizedControlID string) string { + return catalogID.String() + ":" + normalizedControlID + } + + controlIDsByCatalog := make(map[uuid.UUID]map[string]struct{}) + controlRefs := make([]relationshipControlRef, 0, len(relationships)) + seenControls := make(map[string]struct{}, len(relationships)) + for _, relationship := range relationships { + if !relationship.IsActive { + continue + } + + if relationship.CatalogID == "" { + s.logger.Warnw("Skipping workflow control relationship with empty catalog ID", + "workflow_definition_id", definitionID, + "control_id", relationship.ControlID, + ) + continue + } + + catalogID, parseErr := uuid.Parse(relationship.CatalogID) + if parseErr != nil { + s.logger.Warnw("Skipping workflow control relationship with invalid catalog ID", + "workflow_definition_id", definitionID, + "catalog_id", relationship.CatalogID, + "control_id", relationship.ControlID, + "error", parseErr, + ) + continue + } + + normalizedControlID := strings.ToUpper(relationship.ControlID) + key := controlKey(catalogID, normalizedControlID) + if _, ok := seenControls[key]; ok { + continue + } + + seenControls[key] = struct{}{} + if _, ok := controlIDsByCatalog[catalogID]; !ok { + controlIDsByCatalog[catalogID] = make(map[string]struct{}) + } + controlIDsByCatalog[catalogID][normalizedControlID] = struct{}{} + controlRefs = append(controlRefs, relationshipControlRef{ + catalogID: catalogID, + controlID: relationship.ControlID, + normalizedControlID: normalizedControlID, + }) + } + + catalogIDs := make([]uuid.UUID, 0, len(controlIDsByCatalog)) + for catalogID := range controlIDsByCatalog { + catalogIDs = append(catalogIDs, catalogID) + } + sort.Slice(catalogIDs, func(i, j int) bool { + return catalogIDs[i].String() < catalogIDs[j].String() + }) + + controlLookup := make(map[string]relational.Control, len(controlRefs)) + for _, catalogID := range catalogIDs { + controlIDs := make([]string, 0, len(controlIDsByCatalog[catalogID])) + for controlID := range controlIDsByCatalog[catalogID] { + controlIDs = append(controlIDs, controlID) + } + sort.Strings(controlIDs) + + var catalogControls []relational.Control + if err := s.db.Where("catalog_id = ? AND UPPER(id) IN ?", catalogID, controlIDs).Find(&catalogControls).Error; err != nil { + return fmt.Errorf("failed to resolve workflow control relationships for catalog %s: %w", catalogID, err) + } + + for _, control := range catalogControls { + key := controlKey(catalogID, strings.ToUpper(control.ID)) + if _, ok := controlLookup[key]; !ok { + controlLookup[key] = control + } + } + } + + controls := make([]relational.Control, 0, len(controlRefs)) + for _, controlRef := range controlRefs { + control, ok := controlLookup[controlKey(controlRef.catalogID, controlRef.normalizedControlID)] + if !ok { + s.logger.Warnw("Skipping workflow control relationship for unresolved control", + "workflow_definition_id", definitionID, + "catalog_id", controlRef.catalogID, + "control_id", controlRef.controlID, + ) + continue + } + + controls = append(controls, control) + } + + filterID := generateWorkflowFilterUUID(definitionID) + filter := relational.Filter{ + UUIDModel: relational.UUIDModel{ID: &filterID}, + Name: "Workflow: " + definition.Name, + Filter: datatypes.NewJSONType(labelfilter.Filter{ + Scope: &labelfilter.Scope{ + Condition: &labelfilter.Condition{ + Label: WorkflowEvidencePolicyLabel, + Operator: "=", + Value: WorkflowPolicyValue(definitionID), + }, + }, + }), + } + + var existing relational.Filter + err = s.db.First(&existing, "id = ?", filterID).Error + switch { + case err == nil: + filter.ID = existing.ID + if err := s.db.Model(&existing).Updates(map[string]interface{}{ + "name": filter.Name, + "filter": filter.Filter, + }).Error; err != nil { + return fmt.Errorf("failed to update workflow filter: %w", err) + } + filter = existing + case errors.Is(err, gorm.ErrRecordNotFound): + if err := s.db.Create(&filter).Error; err != nil { + return fmt.Errorf("failed to create workflow filter: %w", err) + } + default: + return fmt.Errorf("failed to load workflow filter: %w", err) + } + + if err := s.db.Model(&filter).Association("Controls").Replace(controls); err != nil { + return fmt.Errorf("failed to sync workflow filter controls: %w", err) + } + + return nil +} + +func (s *FilterSyncService) DeleteFilterForDefinition(definitionID uuid.UUID) error { + filterID := generateWorkflowFilterUUID(definitionID) + + var filter relational.Filter + err := s.db.First(&filter, "id = ?", filterID).Error + switch { + case err == nil: + case errors.Is(err, gorm.ErrRecordNotFound): + return nil + default: + return fmt.Errorf("failed to load workflow filter: %w", err) + } + + if err := s.db.Model(&filter).Association("Controls").Clear(); err != nil { + return fmt.Errorf("failed to clear workflow filter controls: %w", err) + } + if err := s.db.Delete(&filter).Error; err != nil { + return fmt.Errorf("failed to delete workflow filter: %w", err) + } + + return nil +} + +func generateWorkflowFilterUUID(definitionID uuid.UUID) uuid.UUID { + seed := fmt.Sprintf("workflow-filter:%s:%s", definitionID.String(), "v1") + hash := sha256.Sum256([]byte(seed)) + hashStr := hex.EncodeToString(hash[:16]) + return uuid.MustParse(hashStr[:8] + "-" + hashStr[8:12] + "-" + hashStr[12:16] + "-" + hashStr[16:20] + "-" + hashStr[20:32]) +} diff --git a/internal/service/relational/workflows/filter_service_test.go b/internal/service/relational/workflows/filter_service_test.go new file mode 100644 index 00000000..087a30ea --- /dev/null +++ b/internal/service/relational/workflows/filter_service_test.go @@ -0,0 +1,195 @@ +package workflows + +import ( + "testing" + + "github.com/compliance-framework/api/internal/service/relational" + "github.com/google/uuid" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +func setupFilterSyncTestDB(t *testing.T) *gorm.DB { + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Silent), + }) + require.NoError(t, err) + + for _, entity := range GetWorkflowEntities() { + require.NoError(t, db.AutoMigrate(entity)) + } + require.NoError(t, db.AutoMigrate( + &relational.Control{}, + &relational.Filter{}, + )) + + return db +} + +func TestFilterSyncService_SyncFilterForDefinition(t *testing.T) { + db := setupFilterSyncTestDB(t) + service := NewFilterSyncService(db, zap.NewNop().Sugar()) + + definition := createTestWorkflowDefinition() + require.NoError(t, db.Create(definition).Error) + + catalogID1 := uuid.New() + catalogID2 := uuid.New() + control1 := relational.Control{CatalogID: catalogID1, ID: "ctrl-1", Title: "Control 1"} + control2 := relational.Control{CatalogID: catalogID2, ID: "ctrl-1", Title: "Control 1 in another catalog"} + control3 := relational.Control{CatalogID: catalogID1, ID: "ctrl-2", Title: "Control 2"} + require.NoError(t, db.Create(&control1).Error) + require.NoError(t, db.Create(&control2).Error) + require.NoError(t, db.Create(&control3).Error) + + relationship1 := &ControlRelationship{ + WorkflowDefinitionID: definition.ID, + ControlID: control1.ID, + ControlSource: "test catalog", + CatalogID: catalogID1.String(), + RelationshipType: "satisfies", + Strength: "primary", + IsActive: true, + } + inactiveRelationship := &ControlRelationship{ + WorkflowDefinitionID: definition.ID, + ControlID: control2.ID, + ControlSource: "other catalog", + CatalogID: catalogID2.String(), + RelationshipType: "satisfies", + Strength: "primary", + IsActive: false, + } + require.NoError(t, db.Create(relationship1).Error) + require.NoError(t, db.Create(inactiveRelationship).Error) + require.NoError(t, db.Model(inactiveRelationship).Update("is_active", false).Error) + + require.NoError(t, service.SyncFilterForDefinition(*definition.ID)) + filterID := generateWorkflowFilterUUID(*definition.ID) + + var filter relational.Filter + require.NoError(t, db.Preload("Controls").First(&filter, "id = ?", filterID).Error) + require.Equal(t, "Workflow: "+definition.Name, filter.Name) + require.Len(t, filter.Controls, 1) + require.Equal(t, catalogID1, filter.Controls[0].CatalogID) + require.Equal(t, "ctrl-1", filter.Controls[0].ID) + require.Equal(t, WorkflowEvidencePolicyLabel, filter.Filter.Data().Scope.Label) + require.Equal(t, WorkflowPolicyValue(*definition.ID), filter.Filter.Data().Scope.Value) + + require.NoError(t, service.SyncFilterForDefinition(*definition.ID)) + var filterCount int64 + require.NoError(t, db.Model(&relational.Filter{}).Where("id = ?", filterID).Count(&filterCount).Error) + require.Equal(t, int64(1), filterCount) + + relationship2 := &ControlRelationship{ + WorkflowDefinitionID: definition.ID, + ControlID: control3.ID, + ControlSource: "test catalog", + CatalogID: catalogID1.String(), + RelationshipType: "satisfies", + Strength: "primary", + IsActive: true, + } + require.NoError(t, db.Create(relationship2).Error) + require.NoError(t, service.SyncFilterForDefinition(*definition.ID)) + + require.NoError(t, db.Preload("Controls").First(&filter, "id = ?", filterID).Error) + require.Len(t, filter.Controls, 2) + + require.NoError(t, db.Delete(relationship2).Error) + require.NoError(t, service.SyncFilterForDefinition(*definition.ID)) + require.NoError(t, db.Preload("Controls").First(&filter, "id = ?", filterID).Error) + require.Len(t, filter.Controls, 1) + require.Equal(t, "ctrl-1", filter.Controls[0].ID) + + require.NoError(t, db.Model(relationship1).Update("is_active", false).Error) + require.NoError(t, service.SyncFilterForDefinition(*definition.ID)) + require.NoError(t, db.Preload("Controls").First(&filter, "id = ?", filterID).Error) + require.Empty(t, filter.Controls) +} + +func TestFilterSyncService_SyncFilterForDefinitionMatchesControlIDsCaseInsensitively(t *testing.T) { + db := setupFilterSyncTestDB(t) + service := NewFilterSyncService(db, zap.NewNop().Sugar()) + + definition := createTestWorkflowDefinition() + require.NoError(t, db.Create(definition).Error) + + catalogID := uuid.New() + control := relational.Control{CatalogID: catalogID, ID: "AC-1", Title: "Access Control 1"} + require.NoError(t, db.Create(&control).Error) + + relationships := []*ControlRelationship{ + { + WorkflowDefinitionID: definition.ID, + ControlID: "ac-1", + ControlSource: "test catalog", + CatalogID: catalogID.String(), + RelationshipType: "satisfies", + Strength: "primary", + IsActive: true, + }, + { + WorkflowDefinitionID: definition.ID, + ControlID: "Ac-1", + ControlSource: "test catalog", + CatalogID: catalogID.String(), + RelationshipType: "satisfies", + Strength: "primary", + IsActive: true, + }, + } + for _, relationship := range relationships { + require.NoError(t, db.Create(relationship).Error) + } + + require.NoError(t, service.SyncFilterForDefinition(*definition.ID)) + + var filter relational.Filter + require.NoError(t, db.Preload("Controls").First(&filter, "id = ?", generateWorkflowFilterUUID(*definition.ID)).Error) + require.Len(t, filter.Controls, 1) + require.Equal(t, catalogID, filter.Controls[0].CatalogID) + require.Equal(t, "AC-1", filter.Controls[0].ID) +} + +func TestFilterSyncService_DeleteFilterForDefinition(t *testing.T) { + db := setupFilterSyncTestDB(t) + service := NewFilterSyncService(db, zap.NewNop().Sugar()) + + definition := createTestWorkflowDefinition() + require.NoError(t, db.Create(definition).Error) + + catalogID := uuid.New() + control := relational.Control{CatalogID: catalogID, ID: "ctrl-1", Title: "Control 1"} + require.NoError(t, db.Create(&control).Error) + + relationship := &ControlRelationship{ + WorkflowDefinitionID: definition.ID, + ControlID: control.ID, + ControlSource: "test catalog", + CatalogID: catalogID.String(), + RelationshipType: "satisfies", + Strength: "primary", + IsActive: true, + } + require.NoError(t, db.Create(relationship).Error) + require.NoError(t, service.SyncFilterForDefinition(*definition.ID)) + + filterID := generateWorkflowFilterUUID(*definition.ID) + var joinCount int64 + require.NoError(t, db.Table("filter_controls").Where("filter_id = ?", filterID).Count(&joinCount).Error) + require.Equal(t, int64(1), joinCount) + + require.NoError(t, service.DeleteFilterForDefinition(*definition.ID)) + + var filterCount int64 + require.NoError(t, db.Model(&relational.Filter{}).Where("id = ?", filterID).Count(&filterCount).Error) + require.Equal(t, int64(0), filterCount) + require.NoError(t, db.Table("filter_controls").Where("filter_id = ?", filterID).Count(&joinCount).Error) + require.Equal(t, int64(0), joinCount) + + require.NoError(t, service.DeleteFilterForDefinition(*definition.ID)) +} diff --git a/internal/service/worker/service.go b/internal/service/worker/service.go index 034e56ba..7751fbc7 100644 --- a/internal/service/worker/service.go +++ b/internal/service/worker/service.go @@ -167,8 +167,16 @@ func NewServiceWithDigest( enqueuerProxy, ) + // Determine grace period days for workflow expiry and scheduling, with safe defaults. + gracePeriodDays := config.DefaultWorkflowConfig().GracePeriodDays + overdueCheckEnabled := config.DefaultWorkflowConfig().OverdueCheckEnabled + if digestCfg != nil && digestCfg.Workflow != nil { + gracePeriodDays = digestCfg.Workflow.GracePeriodDays + overdueCheckEnabled = digestCfg.Workflow.OverdueCheckEnabled + } + // Initialize evidence integration and set it on the executor - evidenceIntegration := workflow.NewEvidenceIntegration(db, logger) + evidenceIntegration := workflow.NewEvidenceIntegration(db, logger, gracePeriodDays) executor.SetEvidenceIntegration(evidenceIntegration) // Set evidence integration on step execution service @@ -191,14 +199,6 @@ func NewServiceWithDigest( ) workflowManager.SetEvidenceCreator(evidenceIntegration) - // Determine grace period days for the workflow scheduler, with safe defaults. - gracePeriodDays := config.DefaultWorkflowConfig().GracePeriodDays - overdueCheckEnabled := config.DefaultWorkflowConfig().OverdueCheckEnabled - if digestCfg != nil && digestCfg.Workflow != nil { - gracePeriodDays = digestCfg.Workflow.GracePeriodDays - overdueCheckEnabled = digestCfg.Workflow.OverdueCheckEnabled - } - overdueService := workflow.NewOverdueService( db, workflowExecService, diff --git a/internal/workflow/evidence.go b/internal/workflow/evidence.go index e24b05f9..ceb01f36 100644 --- a/internal/workflow/evidence.go +++ b/internal/workflow/evidence.go @@ -9,10 +9,12 @@ import ( "strings" "time" + "github.com/compliance-framework/api/internal/config" "github.com/compliance-framework/api/internal/service/relational" "github.com/compliance-framework/api/internal/service/relational/workflows" oscalTypes_1_1_3 "github.com/defenseunicorns/go-oscal/src/types/oscal-1-1-3" "github.com/google/uuid" + "github.com/robfig/cron/v3" "go.uber.org/zap" "gorm.io/datatypes" "gorm.io/gorm" @@ -24,28 +26,40 @@ import ( // - 1 evidence stream per workflow instance (accumulates execution completion evidence) // - Streams use label-seeded UUIDs for deterministic identification type EvidenceIntegration struct { - db *gorm.DB - logger *zap.SugaredLogger - workflowExecutionSvc *workflows.WorkflowExecutionService - stepExecutionSvc *workflows.StepExecutionService - workflowInstanceSvc *workflows.WorkflowInstanceService - workflowDefinitionSvc *workflows.WorkflowDefinitionService - stepDefinitionSvc *workflows.WorkflowStepDefinitionService + db *gorm.DB + logger *zap.SugaredLogger + workflowExecutionSvc *workflows.WorkflowExecutionService + stepExecutionSvc *workflows.StepExecutionService + workflowInstanceSvc *workflows.WorkflowInstanceService + workflowDefinitionSvc *workflows.WorkflowDefinitionService + stepDefinitionSvc *workflows.WorkflowStepDefinitionService + defaultGracePeriodDays int } // NewEvidenceIntegration creates a new evidence integration service func NewEvidenceIntegration( db *gorm.DB, logger *zap.SugaredLogger, + defaultGracePeriodDays ...int, ) *EvidenceIntegration { + if logger == nil { + logger = zap.NewNop().Sugar() + } + + gracePeriodDays := config.DefaultWorkflowConfig().GracePeriodDays + if len(defaultGracePeriodDays) > 0 { + gracePeriodDays = defaultGracePeriodDays[0] + } + return &EvidenceIntegration{ - db: db, - logger: logger, - workflowExecutionSvc: workflows.NewWorkflowExecutionService(db), - stepExecutionSvc: workflows.NewStepExecutionService(db, nil), - workflowInstanceSvc: workflows.NewWorkflowInstanceService(db), - workflowDefinitionSvc: workflows.NewWorkflowDefinitionService(db), - stepDefinitionSvc: workflows.NewWorkflowStepDefinitionService(db), + db: db, + logger: logger, + workflowExecutionSvc: workflows.NewWorkflowExecutionService(db), + stepExecutionSvc: workflows.NewStepExecutionService(db, nil), + workflowInstanceSvc: workflows.NewWorkflowInstanceService(db), + workflowDefinitionSvc: workflows.NewWorkflowDefinitionService(db), + stepDefinitionSvc: workflows.NewWorkflowStepDefinitionService(db), + defaultGracePeriodDays: gracePeriodDays, } } @@ -183,7 +197,7 @@ func (e *EvidenceIntegration) GetOrCreateInstanceStream(ctx context.Context, wor return stream, nil } -// AddWorkflowExecutionEvidence adds a workflow execution evidence record to the instance stream +// AddWorkflowExecutionEvidence adds a workflow execution evidence record. func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context, workflowExecutionID *uuid.UUID, status string) error { // Get workflow execution execution, err := e.workflowExecutionSvc.GetByID(workflowExecutionID) @@ -191,18 +205,19 @@ func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context, return fmt.Errorf("failed to get workflow execution: %w", err) } + if status != "started" && status != "completed" { + return fmt.Errorf("unsupported workflow execution evidence status %q; expected started or completed", status) + } + // Started evidence may be emitted right before or right after the transition. if status == "started" && execution.Status != "pending" && execution.Status != "in_progress" { - return fmt.Errorf("workflow execution is not in pending status, status: %s", execution.Status) + return fmt.Errorf("workflow execution is not in pending or in_progress status, status: %s", execution.Status) } if status == "completed" && execution.Status != "in_progress" && execution.Status != "completed" { - return fmt.Errorf("workflow execution is not in status, status: %s", execution.Status) + return fmt.Errorf("workflow execution is not in in_progress or completed status, status: %s", execution.Status) } - - // Get or create instance stream (NOT execution stream) - stream, err := e.GetOrCreateInstanceStream(ctx, execution.WorkflowInstanceID) - if err != nil { - return fmt.Errorf("failed to get instance stream: %w", err) + if execution.StartedAt == nil { + return fmt.Errorf("workflow execution %s evidence requires started_at", status) } // Get workflow definition through the instance @@ -217,27 +232,43 @@ func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context, } var title string var description string + var stream *relational.Evidence + endTimestamp := execution.StartedAt switch status { case "started": + stream, err = e.GetOrCreateExecutionStream(ctx, execution.ID) + if err != nil { + return fmt.Errorf("failed to get execution stream: %w", err) + } title = fmt.Sprintf("Workflow Execution Started: %s", definition.Name) description = fmt.Sprintf("Workflow execution '%s' started at %s", execution.ID.String(), execution.StartedAt.Format(time.RFC3339), ) case "completed": + if execution.CompletedAt == nil { + return fmt.Errorf("workflow execution completed evidence requires completed_at") + } + stream, err = e.GetOrCreateInstanceStream(ctx, execution.WorkflowInstanceID) + if err != nil { + return fmt.Errorf("failed to get instance stream: %w", err) + } title = fmt.Sprintf("Workflow Execution Completed: %s", definition.Name) description = fmt.Sprintf("Workflow execution '%s' completed at %s", execution.ID.String(), - execution.StartedAt.Format(time.RFC3339), + execution.CompletedAt.Format(time.RFC3339), ) + endTimestamp = execution.CompletedAt + default: + return fmt.Errorf("unsupported workflow execution evidence status %q; expected started or completed", status) } // Create evidence record evidence := &relational.Evidence{ - UUID: stream.UUID, // Same stream UUID as the instance stream + UUID: stream.UUID, Title: title, Description: description, Start: *execution.StartedAt, - End: *execution.StartedAt, + End: *endTimestamp, Labels: []relational.Labels{ {Name: "workflow.execution.id", Value: execution.ID.String()}, {Name: "workflow.definition.id", Value: definition.ID.String()}, @@ -249,10 +280,12 @@ func (e *EvidenceIntegration) AddWorkflowExecutionEvidence(ctx context.Context, if status == "completed" { evidence.Status = datatypes.NewJSONType(oscalTypes_1_1_3.ObjectiveStatus{ - State: "satisfied", + State: relational.EvidenceStatusSatisfied, }) + evidence.Labels = append(evidence.Labels, e.buildWorkflowCoverageLabels(*definition.ID)...) + evidence.Expires = e.calculateCompletionEvidenceExpires(execution.CompletedAt, instance, definition) } - if err := e.db.Create(&evidence).Error; err != nil { + if err := e.db.Create(evidence).Error; err != nil { return fmt.Errorf("failed to create workflow execution evidence: %w", err) } @@ -345,6 +378,12 @@ func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context if execution.Status != "completed" { return fmt.Errorf("workflow execution is not completed, status: %s", execution.Status) } + if execution.StartedAt == nil { + return fmt.Errorf("workflow execution completion evidence requires started_at") + } + if execution.CompletedAt == nil { + return fmt.Errorf("workflow execution completion evidence requires completed_at") + } // Get or create instance stream stream, err := e.GetOrCreateInstanceStream(ctx, execution.WorkflowInstanceID) @@ -352,6 +391,16 @@ func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context return fmt.Errorf("failed to get instance stream: %w", err) } + instance, err := e.workflowInstanceSvc.GetByID(execution.WorkflowInstanceID) + if err != nil { + return fmt.Errorf("failed to get workflow instance: %w", err) + } + + definition, err := e.workflowDefinitionSvc.GetByID(instance.WorkflowDefinitionID) + if err != nil { + return fmt.Errorf("failed to get workflow definition: %w", err) + } + // Get step executions for metrics stepExecutions, err := e.stepExecutionSvc.GetByWorkflowExecutionID(workflowExecutionID) if err != nil { @@ -377,6 +426,8 @@ func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context Description: description, Start: *execution.StartedAt, End: *execution.CompletedAt, + Status: datatypes.NewJSONType[oscalTypes_1_1_3.ObjectiveStatus](oscalTypes_1_1_3.ObjectiveStatus{State: relational.EvidenceStatusSatisfied}), + Expires: e.calculateCompletionEvidenceExpires(execution.CompletedAt, instance, definition), } // Generate unique ID for this evidence record @@ -395,6 +446,7 @@ func (e *EvidenceIntegration) AddExecutionCompletionEvidence(ctx context.Context {Name: "workflow.step_count", Value: fmt.Sprintf("%d", len(stepExecutions))}, {Name: "evidence.type", Value: "execution_completion"}, } + labels = append(labels, e.buildWorkflowCoverageLabels(*definition.ID)...) if err := e.db.Model(evidence).Association("Labels").Append(labels); err != nil { return fmt.Errorf("failed to add labels: %w", err) @@ -499,9 +551,19 @@ func (e *EvidenceIntegration) addFailureEvidenceToStream( ) error { var stream *relational.Evidence var err error + var coverageLabels []relational.Labels if executionStream { stream, err = e.GetOrCreateExecutionStream(ctx, execution.ID) } else { + instance, instanceErr := e.workflowInstanceSvc.GetByID(execution.WorkflowInstanceID) + if instanceErr != nil { + return fmt.Errorf("failed to get workflow instance: %w", instanceErr) + } + if instance.WorkflowDefinitionID == nil { + return fmt.Errorf("workflow instance %s has no workflow definition id", execution.WorkflowInstanceID) + } + coverageLabels = e.buildWorkflowCoverageLabels(*instance.WorkflowDefinitionID) + stream, err = e.GetOrCreateInstanceStream(ctx, execution.WorkflowInstanceID) } if err != nil { @@ -514,7 +576,7 @@ func (e *EvidenceIntegration) addFailureEvidenceToStream( Description: description, Start: nowOrValue(execution.StartedAt), End: nowOrValue(execution.FailedAt), - Status: datatypes.NewJSONType[oscalTypes_1_1_3.ObjectiveStatus](oscalTypes_1_1_3.ObjectiveStatus{State: "not-satisfied"}), + Status: datatypes.NewJSONType[oscalTypes_1_1_3.ObjectiveStatus](oscalTypes_1_1_3.ObjectiveStatus{State: relational.EvidenceStatusNotSatisfied}), } id := uuid.New() evidence.ID = &id @@ -531,6 +593,9 @@ func (e *EvidenceIntegration) addFailureEvidenceToStream( {Name: "workflow.completed_steps", Value: fmt.Sprintf("%d", completedCount)}, {Name: "workflow.unresolved_assignees", Value: strings.Join(unresolvedAssignees, ",")}, } + if !executionStream { + labels = append(labels, coverageLabels...) + } return e.db.Model(evidence).Association("Labels").Append(labels) } @@ -549,6 +614,64 @@ func nowOrValue(ts *time.Time) time.Time { return *ts } +func (e *EvidenceIntegration) buildWorkflowCoverageLabels(definitionID uuid.UUID) []relational.Labels { + return []relational.Labels{ + {Name: workflows.WorkflowEvidencePolicyLabel, Value: workflows.WorkflowPolicyValue(definitionID)}, + {Name: workflows.WorkflowEvidencePluginLabel, Value: workflows.WorkflowEvidencePluginValue}, + } +} + +func (e *EvidenceIntegration) calculateCompletionEvidenceExpires(completedAt *time.Time, instance *workflows.WorkflowInstance, definition *workflows.WorkflowDefinition) *time.Time { + if completedAt == nil { + return nil + } + effectiveInstance := instance + if definition != nil && instance != nil { + instanceCopy := *instance + instanceCopy.WorkflowDefinition = definition + effectiveInstance = &instanceCopy + } + + cadence := "" + if effectiveInstance != nil { + cadence = effectiveInstance.Cadence + } + if cadence == "" && definition != nil { + cadence = definition.SuggestedCadence + } + + graceDays := ResolveGraceDays(effectiveInstance, e.defaultGracePeriodDays) + expires := nextCadenceExpiryBase(*completedAt, cadence).AddDate(0, 0, graceDays) + return &expires +} + +func nextCadenceExpiryBase(completedAt time.Time, cadence string) time.Time { + cadenceType := workflows.CadenceType(cadence) + if cadenceType.IsCron() { + parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) + schedule, err := parser.Parse(cadenceType.CronExpression()) + if err != nil { + return completedAt.AddDate(0, 1, 0) + } + return schedule.Next(completedAt) + } + + switch cadenceType { + case workflows.CadenceDaily: + return completedAt.AddDate(0, 0, 1) + case workflows.CadenceWeekly: + return completedAt.AddDate(0, 0, 7) + case workflows.CadenceQuarterly: + return completedAt.AddDate(0, 3, 0) + case workflows.CadenceAnnually: + return completedAt.AddDate(1, 0, 0) + case workflows.CadenceMonthly: + return completedAt.AddDate(0, 1, 0) + default: + return completedAt.AddDate(0, 1, 0) + } +} + // generateExecutionStreamUUID generates a deterministic UUID for an execution stream based on labels func (e *EvidenceIntegration) generateExecutionStreamUUID( definition *workflows.WorkflowDefinition, diff --git a/internal/workflow/evidence_test.go b/internal/workflow/evidence_test.go index 368eb2b9..dc307e6e 100644 --- a/internal/workflow/evidence_test.go +++ b/internal/workflow/evidence_test.go @@ -357,6 +357,10 @@ func TestAddExecutionCompletionEvidence(t *testing.T) { assert.Equal(t, "execution_completion", labelMap["evidence.type"]) assert.Equal(t, execution.ID.String(), labelMap["workflow.execution.id"]) assert.Equal(t, "completed", labelMap["workflow.execution.status"]) + assert.Equal(t, workflows.WorkflowPolicyValue(*definition.ID), labelMap[workflows.WorkflowEvidencePolicyLabel]) + assert.Equal(t, workflows.WorkflowEvidencePluginValue, labelMap[workflows.WorkflowEvidencePluginLabel]) + assert.Equal(t, relational.EvidenceStatusSatisfied, execEvidence.Status.Data().State) + assert.NotNil(t, execEvidence.Expires) // Completion evidence should not have failure reason _, exists := labelMap["workflow.failure_reason"] assert.False(t, exists, "completion evidence should not have failure reason") @@ -378,6 +382,36 @@ func TestAddExecutionCompletionEvidence(t *testing.T) { assert.Contains(t, err.Error(), "not completed") }) + t.Run("RejectCompletedExecutionWithoutStartedAt", func(t *testing.T) { + completedAt := time.Now() + executionWithoutStartedAt := &workflows.WorkflowExecution{ + WorkflowInstanceID: instance.ID, + Status: "completed", + TriggeredBy: "manual", + CompletedAt: &completedAt, + } + require.NoError(t, db.Create(executionWithoutStartedAt).Error) + + err := integration.AddExecutionCompletionEvidence(ctx, executionWithoutStartedAt.ID) + require.Error(t, err) + assert.Contains(t, err.Error(), "requires started_at") + }) + + t.Run("RejectCompletedExecutionWithoutCompletedAt", func(t *testing.T) { + startedAt := time.Now() + executionWithoutCompletedAt := &workflows.WorkflowExecution{ + WorkflowInstanceID: instance.ID, + Status: "completed", + TriggeredBy: "manual", + StartedAt: &startedAt, + } + require.NoError(t, db.Create(executionWithoutCompletedAt).Error) + + err := integration.AddExecutionCompletionEvidence(ctx, executionWithoutCompletedAt.ID) + require.Error(t, err) + assert.Contains(t, err.Error(), "requires completed_at") + }) + t.Run("MultipleExecutionsInSameStream", func(t *testing.T) { // Create another execution for the same instance startTime2 := time.Now() @@ -405,6 +439,62 @@ func TestAddExecutionCompletionEvidence(t *testing.T) { }) } +func TestAddExecutionFailureEvidenceLabelsInstanceStream(t *testing.T) { + db := setupEvidenceTestDB(t) + defer func() { + sqlDB, _ := db.DB() + err := sqlDB.Close() + require.NoError(t, err) + }() + + logger := zap.NewNop().Sugar() + integration := NewEvidenceIntegration(db, logger) + ctx := context.Background() + + definition, instance, execution, _ := createTestWorkflowContext(t, db) + + failedAt := time.Now() + execution.Status = workflows.WorkflowStatusFailed.String() + execution.FailedAt = &failedAt + require.NoError(t, db.Save(execution).Error) + + stepDef := &workflows.WorkflowStepDefinition{ + WorkflowDefinitionID: definition.ID, + Name: "Failed Step", + ResponsibleRole: "engineer", + } + require.NoError(t, db.Create(stepDef).Error) + + stepExecution := &workflows.StepExecution{ + WorkflowExecutionID: execution.ID, + WorkflowStepDefinitionID: stepDef.ID, + Status: workflows.StepStatusFailed.String(), + StartedAt: execution.StartedAt, + FailedAt: &failedAt, + } + require.NoError(t, db.Create(stepExecution).Error) + + require.NoError(t, integration.AddExecutionFailureEvidence(ctx, execution.ID)) + + stream, err := integration.GetOrCreateInstanceStream(ctx, instance.ID) + require.NoError(t, err) + + var evidence relational.Evidence + require.NoError(t, db.Where("uuid = ? AND title = ?", stream.UUID, "Workflow Execution Failed").First(&evidence).Error) + assert.Equal(t, relational.EvidenceStatusNotSatisfied, evidence.Status.Data().State) + + var labels []relational.Labels + require.NoError(t, db.Model(&evidence).Association("Labels").Find(&labels)) + labelMap := make(map[string]string) + for _, label := range labels { + labelMap[label.Name] = label.Value + } + + assert.Equal(t, workflows.WorkflowPolicyValue(*definition.ID), labelMap[workflows.WorkflowEvidencePolicyLabel]) + assert.Equal(t, workflows.WorkflowEvidencePluginValue, labelMap[workflows.WorkflowEvidencePluginLabel]) + assert.Equal(t, "execution_failure", labelMap["evidence.type"]) +} + func TestGenerateStreamUUIDs(t *testing.T) { db := setupEvidenceTestDB(t) defer func() { @@ -535,6 +625,7 @@ func TestAddWorkflowExecutionStartedEvidence(t *testing.T) { assert.Equal(t, fmt.Sprintf("Workflow Execution Started: %s", definition.Name), evidence.Title) assert.Contains(t, evidence.Description, execution.ID.String()) assert.Contains(t, evidence.Description, "started at") + assert.Equal(t, evidenceIntegration.generateExecutionStreamUUID(definition, instance, execution), evidence.UUID) // Verify labels var labels []relational.Labels @@ -552,6 +643,8 @@ func TestAddWorkflowExecutionStartedEvidence(t *testing.T) { assert.Equal(t, definition.Name, labelMap["workflow.definition.name"]) assert.Equal(t, instance.ID.String(), labelMap["workflow.instance.id"]) assert.Equal(t, "workflow_execution_started", labelMap["evidence.type"]) + assert.NotContains(t, labelMap, workflows.WorkflowEvidencePolicyLabel) + assert.NotContains(t, labelMap, workflows.WorkflowEvidencePluginLabel) }) t.Run("RejectInvalidExecutionStatusForStarted", func(t *testing.T) { @@ -565,7 +658,133 @@ func TestAddWorkflowExecutionStartedEvidence(t *testing.T) { // Try to add started evidence for a failed execution (should fail) err = evidenceIntegration.AddWorkflowExecutionEvidence(context.Background(), execution.ID, "started") require.Error(t, err) - assert.Contains(t, err.Error(), "not in pending status") + assert.Contains(t, err.Error(), "not in pending or in_progress status") + }) + + t.Run("RejectUnsupportedStatus", func(t *testing.T) { + // Create workflow context + _, _, execution, _ := createTestWorkflowContext(t, db) + + execution.StartedAt = nil + require.NoError(t, db.Save(execution).Error) + + err := evidenceIntegration.AddWorkflowExecutionEvidence(context.Background(), execution.ID, "paused") + require.Error(t, err) + assert.Contains(t, err.Error(), "unsupported workflow execution evidence status") + }) + + t.Run("RejectStartedEvidenceWithoutStartedAt", func(t *testing.T) { + // Create workflow context + _, _, execution, _ := createTestWorkflowContext(t, db) + + execution.StartedAt = nil + require.NoError(t, db.Save(execution).Error) + + err := evidenceIntegration.AddWorkflowExecutionEvidence(context.Background(), execution.ID, "started") + require.Error(t, err) + assert.Contains(t, err.Error(), "requires started_at") + }) + + t.Run("RejectCompletedEvidenceWithoutCompletedAt", func(t *testing.T) { + // Create workflow context + _, _, execution, _ := createTestWorkflowContext(t, db) + + execution.Status = "completed" + execution.CompletedAt = nil + require.NoError(t, db.Save(execution).Error) + + err := evidenceIntegration.AddWorkflowExecutionEvidence(context.Background(), execution.ID, "completed") + require.Error(t, err) + assert.Contains(t, err.Error(), "requires completed_at") + }) + + t.Run("RejectCompletedEvidenceWithoutStartedAt", func(t *testing.T) { + // Create workflow context + _, _, execution, _ := createTestWorkflowContext(t, db) + + completedAt := time.Now().UTC() + execution.Status = "completed" + execution.StartedAt = nil + execution.CompletedAt = &completedAt + require.NoError(t, db.Save(execution).Error) + + err := evidenceIntegration.AddWorkflowExecutionEvidence(context.Background(), execution.ID, "completed") + require.Error(t, err) + assert.Contains(t, err.Error(), "requires started_at") + }) + + t.Run("CompletedEvidenceUsesCompletedAt", func(t *testing.T) { + // Create workflow context + _, _, execution, _ := createTestWorkflowContext(t, db) + + completedAt := time.Now().Add(10 * time.Minute).UTC() + execution.Status = "completed" + execution.CompletedAt = &completedAt + require.NoError(t, db.Save(execution).Error) + + err := evidenceIntegration.AddWorkflowExecutionEvidence(context.Background(), execution.ID, "completed") + require.NoError(t, err) + + var evidence relational.Evidence + err = db.Where("title LIKE ?", "Workflow Execution Completed: %").Order("id desc").First(&evidence).Error + require.NoError(t, err) + assert.Contains(t, evidence.Description, completedAt.Format(time.RFC3339)) + assert.True(t, evidence.Start.Equal(*execution.StartedAt)) + assert.True(t, evidence.End.Equal(completedAt)) + }) +} + +func TestCalculateCompletionEvidenceExpires(t *testing.T) { + evidenceIntegration := NewEvidenceIntegration(nil, zap.NewNop().Sugar()) + completedAt := time.Date(2026, 1, 15, 10, 30, 0, 0, time.UTC) + zeroGrace := 0 + + tests := []struct { + name string + cadence string + expected time.Time + }{ + { + name: "hourly cron", + cadence: "cron:0 0 * * * *", + expected: time.Date(2026, 1, 15, 11, 0, 0, 0, time.UTC), + }, + { + name: "daily cron", + cadence: "cron:0 0 9 * * *", + expected: time.Date(2026, 1, 16, 9, 0, 0, 0, time.UTC), + }, + { + name: "monthly named cadence", + cadence: string(workflows.CadenceMonthly), + expected: time.Date(2026, 2, 15, 10, 30, 0, 0, time.UTC), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + instance := &workflows.WorkflowInstance{ + Cadence: tt.cadence, + GracePeriodDays: &zeroGrace, + } + + expires := evidenceIntegration.calculateCompletionEvidenceExpires(&completedAt, instance, nil) + + require.NotNil(t, expires) + assert.Equal(t, tt.expected, *expires) + }) + } + + t.Run("uses configured default grace period", func(t *testing.T) { + evidenceIntegration := NewEvidenceIntegration(nil, zap.NewNop().Sugar(), 3) + instance := &workflows.WorkflowInstance{ + Cadence: string(workflows.CadenceDaily), + } + + expires := evidenceIntegration.calculateCompletionEvidenceExpires(&completedAt, instance, nil) + + require.NotNil(t, expires) + assert.Equal(t, time.Date(2026, 1, 19, 10, 30, 0, 0, time.UTC), *expires) }) }