From 81d2ab6164ea9b366f87ea511bbf0b8d0c509ce2 Mon Sep 17 00:00:00 2001 From: xiami762 Date: Thu, 4 Jun 2026 15:00:40 +0800 Subject: [PATCH 1/3] feat(workflow): unify trigger integrations and simplify setup Make workflow.json the source of truth for trigger definitions and consolidate the workflow integration UI around API publishing plus four trigger entrypoints. This removes legacy trigger resurrection and streamlines the trigger configuration experience. Co-authored-by: Cursor --- docs/workflow-triggers.md | 143 ++ flocks/ingest/kafka/manager.py | 28 +- flocks/ingest/syslog/manager.py | 26 +- flocks/server/app.py | 79 +- flocks/server/routes/workflow.py | 550 ++++- flocks/workflow/models.py | 4 + flocks/workflow/poller_manager.py | 65 +- flocks/workflow/triggers/__init__.py | 39 + flocks/workflow/triggers/compat.py | 140 ++ flocks/workflow/triggers/custom_loader.py | 78 + flocks/workflow/triggers/dispatcher.py | 268 +++ flocks/workflow/triggers/models.py | 210 ++ flocks/workflow/triggers/runtime.py | 432 ++++ .../routes/test_workflow_trigger_routes.py | 345 ++++ tests/workflow/test_trigger_dispatcher.py | 99 + tests/workflow/test_trigger_schedule_cron.py | 35 + webui/src/api/workflow.ts | 140 +- webui/src/locales/en-US/workflow.json | 2 +- webui/src/pages/WorkflowCreate/index.tsx | 2 +- webui/src/pages/WorkflowDetail/FlowCanvas.tsx | 148 +- webui/src/pages/WorkflowDetail/index.tsx | 2 +- .../tabs/IntegrationTab.test.tsx | 386 ++-- .../WorkflowDetail/tabs/IntegrationTab.tsx | 1799 ++++++++--------- webui/src/pages/WorkflowEditor/index.tsx | 1 + 24 files changed, 3798 insertions(+), 1223 deletions(-) create mode 100644 docs/workflow-triggers.md create mode 100644 flocks/workflow/triggers/__init__.py create mode 100644 flocks/workflow/triggers/compat.py create mode 100644 flocks/workflow/triggers/custom_loader.py create mode 100644 flocks/workflow/triggers/dispatcher.py create mode 100644 flocks/workflow/triggers/models.py create mode 100644 flocks/workflow/triggers/runtime.py create mode 100644 tests/server/routes/test_workflow_trigger_routes.py create mode 100644 tests/workflow/test_trigger_dispatcher.py create mode 100644 tests/workflow/test_trigger_schedule_cron.py diff --git a/docs/workflow-triggers.md b/docs/workflow-triggers.md new file mode 100644 index 000000000..70b98a43c --- /dev/null +++ b/docs/workflow-triggers.md @@ -0,0 +1,143 @@ +# Workflow Triggers + +## Overview + +Flocks workflows now support a unified `triggers` definition in `workflow.json`. +This brings webhook, schedule, syslog, kafka, and custom trigger sources under a +single runtime model while keeping the legacy per-feature endpoints compatible. + +At a high level the runtime now has four layers: + +1. `TriggerDefinition`: persisted workflow trigger schema. +2. `TriggerEvent`: normalized event envelope for all trigger sources. +3. `EventDispatcher`: filter + mapping + `_flocks.trigger` envelope injection. +4. `TriggerRuntime`: lifecycle management for legacy adapters and custom adapters. + +## Workflow JSON + +Triggers are stored in the root `workflowJson.triggers` field: + +```json +{ + "start": "n1", + "nodes": [ + { + "id": "n1", + "type": "python", + "code": "result = {'ok': True}" + } + ], + "edges": [], + "triggers": [ + { + "id": "hook-default", + "type": "custom_webhook", + "enabled": true, + "source": { + "path": "/alerts/demo", + "method": "POST" + }, + "mapping": { + "event": "$.body" + } + } + ] +} +``` + +For backward compatibility, older workflows that still keep trigger +configuration under `metadata.triggers` are also supported on read. + +## Event Envelope + +All trigger executions inject a reserved `_flocks.trigger` envelope into the +workflow inputs: + +```json +{ + "_flocks": { + "trigger": { + "id": "hook-default", + "type": "custom_webhook", + "source": "/alerts/demo", + "deliveryId": "webhook-123", + "receivedAt": 1760000000000, + "attempt": 1 + } + } +} +``` + +Legacy poller/syslog/kafka runs now also record `triggerId`, `triggerType`, +`deliveryId`, and `triggerSource` in workflow execution history. + +## API + +Unified trigger routes: + +- `GET /api/workflow/{id}/triggers` +- `POST /api/workflow/{id}/triggers` +- `PUT /api/workflow/{id}/triggers/{trigger_id}` +- `POST /api/workflow/{id}/triggers/{trigger_id}/preview-mapping` +- `POST /api/workflow/{id}/triggers/{trigger_id}/test` +- `GET /api/workflow/{id}/triggers/{trigger_id}/status` +- `GET /api/workflow-trigger-plugins` +- `POST /webhook/workflows/{workflow_id}/{trigger_id}` + +The legacy routes still work: + +- `POST /api/workflow/{id}/syslog-config` +- `POST /api/workflow/{id}/kafka-config` +- `POST /api/workflow/{id}/poller-config` + +Saving through a legacy route now also updates the unified `workflowJson.triggers` +representation so the new Automation / Triggers view stays in sync. + +## Legacy Compatibility + +The trigger runtime wraps the existing managers: + +- `schedule` -> `WorkflowPollerManager` +- `syslog` -> `SyslogManager` +- `kafka` -> `KafkaManager` + +On startup the runtime syncs unified trigger definitions back into the legacy +storage keys used by those managers, then starts them once. + +## Custom Triggers + +Two custom paths are supported: + +1. `custom_webhook`: declarative webhook trigger persisted in workflow JSON. +2. `custom_adapter`: plugin-backed trigger loaded from: + - `~/.flocks/plugins/triggers//` + - `/.flocks/plugins/triggers//` + +Supported manifest filenames: + +- `trigger.json` +- `trigger.yaml` +- `trigger.yml` +- `manifest.json` + +Each plugin directory can expose either: + +- `create_trigger_adapter(trigger_definition)` +- `TriggerAdapter(trigger_definition)` + +The runtime calls `adapter.start(definition, emit)` and expects the adapter to +send normalized events back through the provided `emit()` callback. + +## Frontend + +The workflow detail page now includes an `Automation / Triggers` section that: + +- lists all unified triggers +- shows runtime status +- supports quick webhook creation +- previews mapped workflow inputs +- runs test trigger events + +The workflow canvas also renders configured triggers as virtual upstream nodes +connected to the workflow start node. + diff --git a/flocks/ingest/kafka/manager.py b/flocks/ingest/kafka/manager.py index 4cec529ad..d44abb3c2 100644 --- a/flocks/ingest/kafka/manager.py +++ b/flocks/ingest/kafka/manager.py @@ -23,6 +23,7 @@ import hashlib import json import time +import uuid from dataclasses import dataclass from typing import Any, Dict, List, Optional @@ -571,12 +572,27 @@ async def _trigger_workflow( configured_inputs = _strip_execution_only_comments( configured_inputs if isinstance(configured_inputs, dict) else {} ) - inputs = {**configured_inputs, input_key: message} + delivery_id = f"kafka-{uuid.uuid4().hex}" + inputs = { + **configured_inputs, + input_key: message, + "_flocks": { + "trigger": { + "id": "kafka-default", + "type": "kafka", + "source": "kafka", + "deliveryId": delivery_id, + "receivedAt": int(time.time() * 1000), + "attempt": 1, + } + }, + } input_params = {"_trigger": "kafka", input_key: _summarize_large_value(message)} for key, value in configured_inputs.items(): if key == input_key: continue input_params[key] = _summarize_large_value(value) + input_params["_flocks"] = inputs["_flocks"] exec_data = await create_execution_record( workflow_id, @@ -609,6 +625,11 @@ async def _trigger_workflow( "currentNodeId": result.last_node_id, "currentPhase": status, "currentStepIndex": result.steps, + "triggerId": "kafka-default", + "triggerType": "kafka", + "deliveryId": delivery_id, + "attempt": 1, + "triggerSource": "kafka", }) except Exception as exc: duration = time.time() - start_time @@ -622,6 +643,11 @@ async def _trigger_workflow( "finishedAt": int(time.time() * 1000), "duration": duration, "currentPhase": "error", + "triggerId": "kafka-default", + "triggerType": "kafka", + "deliveryId": delivery_id, + "attempt": 1, + "triggerSource": "kafka", }) finally: try: diff --git a/flocks/ingest/syslog/manager.py b/flocks/ingest/syslog/manager.py index 0ef9aae6b..1a57d763d 100644 --- a/flocks/ingest/syslog/manager.py +++ b/flocks/ingest/syslog/manager.py @@ -4,6 +4,7 @@ import asyncio import time +import uuid from typing import Any, Dict, List from flocks.storage.storage import Storage @@ -451,7 +452,20 @@ async def _trigger_workflow( syslog_msg: dict, input_key: str, ) -> None: - inputs = {input_key: syslog_msg} + delivery_id = f"syslog-{uuid.uuid4().hex}" + inputs = { + input_key: syslog_msg, + "_flocks": { + "trigger": { + "id": "syslog-default", + "type": "syslog", + "source": "syslog", + "deliveryId": delivery_id, + "receivedAt": int(time.time() * 1000), + "attempt": 1, + } + }, + } exec_data = await create_execution_record( workflow_id, @@ -479,6 +493,11 @@ async def _trigger_workflow( "currentNodeId": result.last_node_id, "currentPhase": status, "currentStepIndex": result.steps, + "triggerId": "syslog-default", + "triggerType": "syslog", + "deliveryId": delivery_id, + "attempt": 1, + "triggerSource": "syslog", }) except Exception as exc: duration = time.time() - start_time @@ -492,6 +511,11 @@ async def _trigger_workflow( "finishedAt": int(time.time() * 1000), "duration": duration, "currentPhase": "error", + "triggerId": "syslog-default", + "triggerType": "syslog", + "deliveryId": delivery_id, + "attempt": 1, + "triggerSource": "syslog", }) finally: try: diff --git a/flocks/server/app.py b/flocks/server/app.py index 402a75a9a..dfac5b8de 100644 --- a/flocks/server/app.py +++ b/flocks/server/app.py @@ -393,62 +393,21 @@ async def _start_channel_gateway() -> None: except Exception as e: log.warning("channel.gateway.start_failed", {"error": str(e)}) - # Start syslog listeners for workflows with syslog enabled. - # Use a background task with a short delay so the main startup path is not - # blocked and to break the crash-restart loop where an immediate syslog - # flood would bring the server back down before it is fully ready. + # Start the unified workflow trigger runtime after the server is ready. try: - from flocks.ingest.syslog.manager import default_manager as default_syslog_manager + from flocks.workflow.triggers.runtime import default_runtime as default_trigger_runtime - async def _delayed_syslog_start() -> None: - # Wait for storage and tool registry to be fully initialised before - # resuming syslog listeners. + async def _delayed_trigger_runtime_start() -> None: await asyncio.sleep(3) try: - await default_syslog_manager.start_all() - log.info("syslog.manager.started") + await default_trigger_runtime.start_all() + log.info("workflow.trigger_runtime.started") except Exception as exc: - log.warning("syslog.manager.start_failed", {"error": str(exc)}) + log.warning("workflow.trigger_runtime.start_failed", {"error": str(exc)}) - _schedule_startup_phase(app, log, "syslog.manager.start", _delayed_syslog_start) + _schedule_startup_phase(app, log, "workflow.trigger_runtime.start", _delayed_trigger_runtime_start) except Exception as e: - log.warning("syslog.manager.start_failed", {"error": str(e)}) - - # Start Kafka consumers for workflows with kafka input enabled. - # Mirrors the syslog startup: a short delayed background task keeps the main - # startup path unblocked and avoids a crash-restart loop if a broker is down. - try: - from flocks.ingest.kafka.manager import default_manager as default_kafka_manager - - async def _delayed_kafka_start() -> None: - await asyncio.sleep(3) - try: - await default_kafka_manager.start_all() - log.info("kafka.manager.started") - except Exception as exc: - log.warning("kafka.manager.start_failed", {"error": str(exc)}) - - _schedule_startup_phase(app, log, "kafka.manager.start", _delayed_kafka_start) - except Exception as e: - log.warning("kafka.manager.start_failed", {"error": str(e)}) - - # Start workflow pollers for workflows with poller enabled. - # Mirrors Kafka/syslog startup so persistent slow-path workflows resume - # automatically without delaying server readiness. - try: - from flocks.workflow.poller_manager import default_manager as default_poller_manager - - async def _delayed_poller_start() -> None: - await asyncio.sleep(3) - try: - await default_poller_manager.start_all() - log.info("workflow.poller.started") - except Exception as exc: - log.warning("workflow.poller.start_failed", {"error": str(exc)}) - - _schedule_startup_phase(app, log, "workflow.poller.start", _delayed_poller_start) - except Exception as e: - log.warning("workflow.poller.start_failed", {"error": str(e)}) + log.warning("workflow.trigger_runtime.start_failed", {"error": str(e)}) try: from flocks.updater.updater import recover_upgrade_state @@ -513,23 +472,14 @@ async def _delayed_poller_start() -> None: except Exception as e: log.warning("channel.gateway.stop_failed", {"error": str(e)}) - # Stop syslog listeners - try: - from flocks.ingest.syslog.manager import default_manager as default_syslog_manager - - await default_syslog_manager.stop_all() - log.info("syslog.manager.stopped") - except Exception as e: - log.warning("syslog.manager.stop_failed", {"error": str(e)}) - - # Stop Kafka consumers + # Stop the unified workflow trigger runtime. try: - from flocks.ingest.kafka.manager import default_manager as default_kafka_manager + from flocks.workflow.triggers.runtime import default_runtime as default_trigger_runtime - await default_kafka_manager.stop_all() - log.info("kafka.manager.stopped") + await default_trigger_runtime.stop_all() + log.info("workflow.trigger_runtime.stopped") except Exception as e: - log.warning("kafka.manager.stop_failed", {"error": str(e)}) + log.warning("workflow.trigger_runtime.stop_failed", {"error": str(e)}) # Stop Task Center try: @@ -986,7 +936,7 @@ async def general_exception_handler(request: Request, exc: Exception): # P3: TUI control routes for remote TUI control from flocks.server.routes.tui import router as tui_router # WebUI: Workflow routes -from flocks.server.routes.workflow import router as workflow_router +from flocks.server.routes.workflow import router as workflow_router, webhook_router as workflow_webhook_router # WebUI: Skill & Command routes from flocks.server.routes.skill import router as skill_router from flocks.server.routes.hub import router as hub_router @@ -1036,6 +986,7 @@ async def general_exception_handler(request: Request, exc: Exception): app.include_router(mcp_router, prefix="/api/mcp", tags=["MCP"]) # WebUI: Workflow routes app.include_router(workflow_router, prefix="/api", tags=["Workflow"]) +app.include_router(workflow_webhook_router, tags=["WorkflowWebhook"]) # WebUI: Skill & Command routes app.include_router(skill_router, prefix="/api", tags=["Skill"]) # WebUI: Hub routes diff --git a/flocks/server/routes/workflow.py b/flocks/server/routes/workflow.py index d36bc67af..c4d267ac9 100644 --- a/flocks/server/routes/workflow.py +++ b/flocks/server/routes/workflow.py @@ -15,7 +15,7 @@ from dataclasses import dataclass from pathlib import Path from typing import List, Optional, Any, Dict, Literal -from fastapi import APIRouter, HTTPException, status, Query +from fastapi import APIRouter, HTTPException, Request, status, Query from pydantic import BaseModel, Field, ConfigDict import uuid @@ -57,6 +57,25 @@ from flocks.workflow.io import load_workflow, dump_workflow from flocks.workflow.tool_context import build_workflow_tool_context from flocks.workflow.tools import get_tool_registry +from flocks.workflow.triggers import ( + TriggerDefinition, + TriggerEvent, + build_trigger_event, + preview_trigger_mapping, + set_workflow_json_triggers, + workflow_json_declares_triggers, + workflow_trigger_definitions_from_json, +) +from flocks.workflow.triggers.dispatcher import evaluate_trigger_filter +from flocks.workflow.triggers.runtime import default_runtime as default_trigger_runtime +from flocks.workflow.triggers.compat import ( + kafka_trigger_to_legacy_config, + legacy_kafka_trigger_from_config, + legacy_schedule_trigger_from_config, + legacy_syslog_trigger_from_config, + schedule_trigger_to_legacy_config, + syslog_trigger_to_legacy_config, +) from flocks.config.config import Config from flocks.storage.storage import Storage from flocks.server.routes.event import publish_event @@ -65,6 +84,7 @@ router = APIRouter() +webhook_router = APIRouter() log = Log.create(service="workflow-routes") @@ -153,6 +173,11 @@ class WorkflowExecutionResponse(BaseModel): duration: Optional[float] = Field(None, description="Duration (seconds)") executionLog: List[Dict[str, Any]] = Field(default_factory=list, description="Execution log") errorMessage: Optional[str] = Field(None, description="Error message") + triggerId: Optional[str] = Field(None, description="Trigger ID") + triggerType: Optional[str] = Field(None, description="Trigger type") + deliveryId: Optional[str] = Field(None, description="Trigger delivery ID") + attempt: Optional[int] = Field(None, description="Trigger attempt") + triggerSource: Optional[str] = Field(None, description="Trigger source") currentNodeId: Optional[str] = Field(None, description="Current running node ID") currentNodeType: Optional[str] = Field(None, description="Current running node type") currentPhase: Optional[str] = Field(None, description="Current execution phase") @@ -394,6 +419,151 @@ def _syslog_config_key(workflow_id: str) -> str: return f"{WORKFLOW_SYSLOG_CONFIG_PREFIX}{workflow_id}" +async def _read_legacy_trigger_defs(workflow_id: str) -> List[TriggerDefinition]: + triggers: List[TriggerDefinition] = [] + for key, converter in ( + (_kafka_config_key(workflow_id), legacy_kafka_trigger_from_config), + (f"workflow_poller_config/{workflow_id}", legacy_schedule_trigger_from_config), + (_syslog_config_key(workflow_id), legacy_syslog_trigger_from_config), + ): + try: + config = await Storage.read(key) + except Exception: + config = None + trigger = converter(config) + if trigger is not None: + triggers.append(trigger) + return triggers + + +async def _get_workflow_trigger_defs( + workflow_id: str, + workflow_data: Optional[Dict[str, Any]] = None, +) -> List[TriggerDefinition]: + data = workflow_data or _read_workflow_from_fs(workflow_id) + if not data: + return [] + workflow_json = data.get("workflowJson") or {} + triggers = workflow_trigger_definitions_from_json(workflow_json) + # Once the workflow JSON explicitly declares a trigger list, it becomes the + # single source of truth, even when the list is empty. + if workflow_json_declares_triggers(workflow_json): + return triggers + return await _read_legacy_trigger_defs(workflow_id) + + +def _trigger_to_api_dict(trigger: TriggerDefinition) -> Dict[str, Any]: + return trigger.model_dump(mode="json", by_alias=True, exclude_none=True) + + +def _replace_or_append_trigger( + triggers: List[TriggerDefinition], + trigger: TriggerDefinition, +) -> List[TriggerDefinition]: + updated = [existing for existing in triggers if existing.id != trigger.id] + updated.append(trigger) + return updated + + +def _disable_legacy_trigger_of_type( + workflow_id: str, + trigger_type: str, +) -> tuple[Optional[str], Optional[Dict[str, Any]]]: + now_ms = int(time.time() * 1000) + if trigger_type == "kafka": + return ( + _kafka_config_key(workflow_id), + {"workflowId": workflow_id, "enabled": False, "updatedAt": now_ms}, + ) + if trigger_type == "schedule": + return ( + f"workflow_poller_config/{workflow_id}", + {"workflowId": workflow_id, "enabled": False, "updatedAt": now_ms}, + ) + if trigger_type == "syslog": + return ( + _syslog_config_key(workflow_id), + {"workflowId": workflow_id, "enabled": False, "updatedAt": now_ms}, + ) + return None, None + + +async def _sync_trigger_legacy_state(workflow_id: str, trigger: TriggerDefinition) -> Optional[Dict[str, Any]]: + if trigger.type == "kafka": + config = kafka_trigger_to_legacy_config(workflow_id, trigger) + await Storage.write(_kafka_config_key(workflow_id), config) + from flocks.ingest.kafka.manager import default_manager as _kafka_default_manager + + return await _kafka_default_manager.restart_workflow(workflow_id) + if trigger.type == "schedule": + config = schedule_trigger_to_legacy_config(workflow_id, trigger) + await Storage.write(f"workflow_poller_config/{workflow_id}", config) + from flocks.workflow.poller_manager import default_manager as _poller_default_manager + + return await _poller_default_manager.restart_workflow(workflow_id) + if trigger.type == "syslog": + config = syslog_trigger_to_legacy_config(workflow_id, trigger) + await Storage.write(_syslog_config_key(workflow_id), config) + from flocks.ingest.syslog.manager import default_manager as _syslog_default_manager + + return await _syslog_default_manager.restart_workflow(workflow_id) + return await default_trigger_runtime.get_trigger_status(workflow_id, trigger) + + +async def _remove_legacy_trigger_state(workflow_id: str, trigger: TriggerDefinition) -> None: + """Remove legacy trigger configs so deleted unified triggers do not reappear.""" + if trigger.type == "kafka": + try: + from flocks.ingest.kafka.manager import default_manager as _kafka_default_manager + + await _kafka_default_manager.stop_workflow(workflow_id) + except Exception: + pass + try: + await Storage.remove(_kafka_config_key(workflow_id)) + except Storage.NotFoundError: + pass + return + if trigger.type == "schedule": + try: + from flocks.workflow.poller_manager import default_manager as _poller_default_manager + + await _poller_default_manager.stop_workflow(workflow_id) + except Exception: + pass + try: + await Storage.remove(f"workflow_poller_config/{workflow_id}") + except Storage.NotFoundError: + pass + return + if trigger.type == "syslog": + try: + from flocks.ingest.syslog.manager import default_manager as _syslog_default_manager + + await _syslog_default_manager.stop_workflow(workflow_id) + except Exception: + pass + try: + await Storage.remove(_syslog_config_key(workflow_id)) + except Storage.NotFoundError: + pass + + +async def _persist_workflow_triggers( + workflow_id: str, + workflow_data: Dict[str, Any], + triggers: List[TriggerDefinition], +) -> Dict[str, Any]: + workflow_json = workflow_data.get("workflowJson") or {} + updated_json = set_workflow_json_triggers(workflow_json, triggers) + data = dict(workflow_data) + data["workflowJson"] = updated_json + data["updatedAt"] = int(time.time() * 1000) + is_global = data.get("source") == "global" + _write_workflow_to_fs(workflow_id, updated_json, data, data.get("markdownContent"), global_store=is_global) + return data + + async def _run_workflow_execution_task( *, workflow_id: str, @@ -1124,6 +1294,8 @@ async def workflow_center_releases(workflow_id: str): async def get_workflow_history( workflow_id: str, limit: int = Query(50, ge=1, le=100, description="Max results"), + trigger_id: Optional[str] = Query(None, alias="triggerId"), + trigger_type: Optional[str] = Query(None, alias="triggerType"), ): """ Get workflow execution history @@ -1131,7 +1303,8 @@ async def get_workflow_history( Returns list of recent executions for this workflow. """ try: - if not _read_workflow_from_fs(workflow_id): + data = _read_workflow_from_fs(workflow_id) + if not data: raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") # 单次查询批量读取所有 execution 记录,避免 N 次单独 read 导致超长耗时 @@ -1143,6 +1316,10 @@ async def get_workflow_history( continue if exec_data.get("workflowId") != workflow_id: continue + if trigger_id and exec_data.get("triggerId") != trigger_id: + continue + if trigger_type and exec_data.get("triggerType") != trigger_type: + continue executions.append(WorkflowExecutionResponse(**exec_data)) except Exception as e: log.warning("workflow.history.skip", {"key": _key, "error": str(e)}) @@ -1408,6 +1585,38 @@ def _strip_execution_only_comments(value: Any) -> Any: } +class TriggerEventPayloadRequest(BaseModel): + """Sample event payload for trigger preview/testing.""" + + model_config = ConfigDict(populate_by_name=True) + + body: Any = None + headers: Dict[str, Any] = Field(default_factory=dict) + query: Dict[str, Any] = Field(default_factory=dict) + path_params: Dict[str, Any] = Field(default_factory=dict, alias="pathParams") + + +class TriggerPreviewResponse(BaseModel): + """Preview result for trigger mapping and filtering.""" + + model_config = ConfigDict(populate_by_name=True, by_alias=True) + + triggerId: str + triggerType: str + matched: bool + inputs: Dict[str, Any] = Field(default_factory=dict) + filterError: Optional[str] = None + + +class TriggerSaveResponse(BaseModel): + """Persisted trigger definition with runtime status.""" + + model_config = ConfigDict(populate_by_name=True, by_alias=True) + + trigger: Dict[str, Any] + status: Optional[Dict[str, Any]] = None + + class WorkflowPollerConfigRequest(BaseModel): """Per-workflow background poller configuration.""" @@ -1582,6 +1791,243 @@ async def list_workflow_services(): raise HTTPException(status_code=500, detail=f"Failed to list services: {str(e)}") +def _find_trigger_or_404(triggers: List[TriggerDefinition], trigger_id: str) -> TriggerDefinition: + trigger = next((item for item in triggers if item.id == trigger_id), None) + if trigger is None: + raise HTTPException(status_code=404, detail=f"Trigger not found: {trigger_id}") + return trigger + + +@router.get("/workflow/{workflow_id}/triggers") +async def list_workflow_triggers(workflow_id: str): + """List unified triggers for a workflow with runtime status.""" + data = _read_workflow_from_fs(workflow_id) + if not data: + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + triggers = await _get_workflow_trigger_defs(workflow_id, data) + statuses = { + item.get("triggerId"): item + for item in await default_trigger_runtime.get_workflow_trigger_statuses( + workflow_id, + set_workflow_json_triggers(data.get("workflowJson") or {}, triggers), + ) + } + return [ + { + "trigger": _trigger_to_api_dict(trigger), + "status": statuses.get(trigger.id), + } + for trigger in triggers + ] + + +@router.post("/workflow/{workflow_id}/triggers", response_model=TriggerSaveResponse) +async def create_workflow_trigger(workflow_id: str, trigger: TriggerDefinition): + """Create or replace a unified trigger definition.""" + data = _read_workflow_from_fs(workflow_id) + if not data: + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + existing = await _get_workflow_trigger_defs(workflow_id, data) + updated = _replace_or_append_trigger(existing, trigger) + persisted = await _persist_workflow_triggers(workflow_id, data, updated) + await default_trigger_runtime.restart_workflow(workflow_id, persisted.get("workflowJson") or {}) + status = await default_trigger_runtime.get_trigger_status(workflow_id, trigger) + return TriggerSaveResponse(trigger=_trigger_to_api_dict(trigger), status=status) + + +@router.put("/workflow/{workflow_id}/triggers/{trigger_id}", response_model=TriggerSaveResponse) +async def update_workflow_trigger(workflow_id: str, trigger_id: str, trigger: TriggerDefinition): + """Update a unified trigger definition.""" + data = _read_workflow_from_fs(workflow_id) + if not data: + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + existing = await _get_workflow_trigger_defs(workflow_id, data) + _find_trigger_or_404(existing, trigger_id) + updated_trigger = trigger.model_copy(update={"id": trigger_id}) + updated = _replace_or_append_trigger(existing, updated_trigger) + persisted = await _persist_workflow_triggers(workflow_id, data, updated) + await default_trigger_runtime.restart_workflow(workflow_id, persisted.get("workflowJson") or {}) + status = await default_trigger_runtime.get_trigger_status(workflow_id, updated_trigger) + return TriggerSaveResponse(trigger=_trigger_to_api_dict(updated_trigger), status=status) + + +@router.delete("/workflow/{workflow_id}/triggers/{trigger_id}") +async def delete_workflow_trigger(workflow_id: str, trigger_id: str): + """Delete a unified trigger definition.""" + data = _read_workflow_from_fs(workflow_id) + if not data: + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + existing = await _get_workflow_trigger_defs(workflow_id, data) + trigger = _find_trigger_or_404(existing, trigger_id) + remaining = [item for item in existing if item.id != trigger_id] + persisted = await _persist_workflow_triggers(workflow_id, data, remaining) + await _remove_legacy_trigger_state(workflow_id, trigger) + await default_trigger_runtime.restart_workflow(workflow_id, persisted.get("workflowJson") or {}) + return {"ok": True, "triggerId": trigger_id} + + +@router.get("/workflow/{workflow_id}/triggers/{trigger_id}/status") +async def get_workflow_trigger_status(workflow_id: str, trigger_id: str): + data = _read_workflow_from_fs(workflow_id) + if not data: + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + triggers = await _get_workflow_trigger_defs(workflow_id, data) + trigger = _find_trigger_or_404(triggers, trigger_id) + return await default_trigger_runtime.get_trigger_status(workflow_id, trigger) + + +@router.post("/workflow/{workflow_id}/triggers/{trigger_id}/preview-mapping", response_model=TriggerPreviewResponse) +async def preview_workflow_trigger_mapping( + workflow_id: str, + trigger_id: str, + payload: TriggerEventPayloadRequest, +): + data = _read_workflow_from_fs(workflow_id) + if not data: + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + triggers = await _get_workflow_trigger_defs(workflow_id, data) + trigger = _find_trigger_or_404(triggers, trigger_id) + event = build_trigger_event( + workflow_id=workflow_id, + trigger=trigger, + body=payload.body, + headers=payload.headers, + query=payload.query, + path_params=payload.path_params, + ) + matched, filter_error = evaluate_trigger_filter(trigger, event) + return TriggerPreviewResponse( + triggerId=trigger.id or trigger_id, + triggerType=trigger.type, + matched=matched, + inputs=preview_trigger_mapping(trigger, event), + filterError=filter_error, + ) + + +@router.post("/workflow/{workflow_id}/triggers/{trigger_id}/test") +async def test_workflow_trigger( + workflow_id: str, + trigger_id: str, + payload: TriggerEventPayloadRequest, +): + data = _read_workflow_from_fs(workflow_id) + if not data: + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + workflow_json = data.get("workflowJson") or {} + triggers = await _get_workflow_trigger_defs(workflow_id, data) + trigger = _find_trigger_or_404(triggers, trigger_id) + event = build_trigger_event( + workflow_id=workflow_id, + trigger=trigger, + body=payload.body, + headers=payload.headers, + query=payload.query, + path_params=payload.path_params, + ) + result = await default_trigger_runtime.dispatch_event( + workflow_id=workflow_id, + workflow_json=workflow_json, + trigger=trigger, + event=event, + ) + return { + "ok": True, + "trigger": _trigger_to_api_dict(trigger), + **result, + } + + +@router.get("/workflow-trigger-plugins") +async def list_workflow_trigger_plugins(): + return default_trigger_runtime.list_plugin_specs() + + +def _resolve_trigger_secret(secret_ref: Optional[str]) -> Optional[str]: + if not secret_ref: + return None + try: + from flocks.security import get_secret_manager + + return get_secret_manager().get(secret_ref) + except Exception: + return None + + +def _authorize_webhook_trigger(trigger: TriggerDefinition, headers: Dict[str, str], query: Dict[str, str]) -> None: + auth = trigger.auth + if auth is None or auth.type in {"none", ""}: + return + if auth.type == "api_key": + expected = auth.apiKey or _resolve_trigger_secret(auth.secretRef) + if not expected: + raise HTTPException(status_code=401, detail="Webhook trigger API key is not configured") + header_name = (auth.headerName or "x-api-key").lower() + actual = headers.get(header_name) or query.get(auth.queryParam or "api_key") + if actual != expected: + raise HTTPException(status_code=401, detail="Invalid webhook API key") + return + if auth.type == "hmac": + expected = _resolve_trigger_secret(auth.secretRef) + if not expected: + raise HTTPException(status_code=401, detail="Webhook trigger secret is not configured") + signature = headers.get((auth.headerName or "x-flocks-signature").lower()) + if signature != expected: + raise HTTPException(status_code=401, detail="Invalid webhook signature") + return + raise HTTPException(status_code=400, detail=f"Unsupported webhook auth type: {auth.type}") + + +@webhook_router.post("/webhook/workflows/{workflow_id}/{trigger_id}") +async def invoke_workflow_webhook_trigger( + workflow_id: str, + trigger_id: str, + request: Request, +): + """Invoke a webhook/custom_webhook trigger and dispatch the workflow.""" + data = _read_workflow_from_fs(workflow_id) + if not data: + raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") + workflow_json = data.get("workflowJson") or {} + triggers = await _get_workflow_trigger_defs(workflow_id, data) + trigger = _find_trigger_or_404(triggers, trigger_id) + if trigger.type not in {"webhook", "custom_webhook"}: + raise HTTPException(status_code=400, detail=f"Trigger is not a webhook trigger: {trigger_id}") + + headers = {key.lower(): value for key, value in request.headers.items()} + query = {key: value for key, value in request.query_params.items()} + _authorize_webhook_trigger(trigger, headers, query) + + try: + body = await request.json() + except Exception: + body = (await request.body()).decode("utf-8", errors="replace") + + event = build_trigger_event( + workflow_id=workflow_id, + trigger=trigger, + body=body, + headers=headers, + query=query, + path_params={"workflow_id": workflow_id, "trigger_id": trigger_id}, + raw=body, + source=(trigger.source or {}).get("path") or str(request.url.path), + ) + result = await default_trigger_runtime.dispatch_event( + workflow_id=workflow_id, + workflow_json=workflow_json, + trigger=trigger, + event=event, + ) + return { + "ok": True, + "matched": result.get("matched", True), + "executed": result.get("executed", False), + "inputs": result.get("inputs", {}), + "result": result.get("result"), + } + + @router.post("/workflow/{workflow_id}/kafka-config") async def save_kafka_config(workflow_id: str, req: KafkaConfigRequest): """ @@ -1608,6 +2054,30 @@ async def save_kafka_config(workflow_id: str, req: KafkaConfigRequest): "updatedAt": int(time.time() * 1000), } await Storage.write(_kafka_config_key(workflow_id), config) + unified_trigger = TriggerDefinition.model_validate( + { + "id": "kafka-default", + "type": "kafka", + "enabled": req.enabled, + "source": { + "inputBroker": req.inputBroker or "", + "inputTopic": req.inputTopic or "", + "inputGroupId": req.inputGroupId or "", + "autoOffsetReset": req.autoOffsetReset, + }, + "mapping": { + req.inputKey or "kafka_message": "$.body", + }, + "inputs": _strip_execution_only_comments(req.inputs), + "updatedAt": config["updatedAt"], + } + ) + triggers = await _get_workflow_trigger_defs(workflow_id, data) + await _persist_workflow_triggers( + workflow_id, + data, + _replace_or_append_trigger(triggers, unified_trigger), + ) from flocks.ingest.kafka.manager import default_manager as _kafka_default_manager @@ -1634,6 +2104,13 @@ async def get_kafka_config(workflow_id: str): """ try: config = await Storage.read(_kafka_config_key(workflow_id)) + if config is None: + data = _read_workflow_from_fs(workflow_id) + if data: + triggers = await _get_workflow_trigger_defs(workflow_id, data) + trigger = next((item for item in triggers if item.type == "kafka"), None) + if trigger is not None: + config = kafka_trigger_to_legacy_config(workflow_id, trigger) return config # None / null if not configured except Exception as e: log.error("workflow.kafka_config.get.error", {"id": workflow_id, "error": str(e)}) @@ -1661,7 +2138,8 @@ async def get_kafka_status(workflow_id: str): async def save_workflow_poller_config(workflow_id: str, req: WorkflowPollerConfigRequest): """Save background poller configuration for a workflow.""" try: - if not _read_workflow_from_fs(workflow_id): + data = _read_workflow_from_fs(workflow_id) + if not data: raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") config = { @@ -1674,6 +2152,29 @@ async def save_workflow_poller_config(workflow_id: str, req: WorkflowPollerConfi "updatedAt": int(time.time() * 1000), } await Storage.write(f"workflow_poller_config/{workflow_id}", config) + unified_trigger = TriggerDefinition.model_validate( + { + "id": "schedule-default", + "type": "schedule", + "enabled": req.enabled, + "source": { + "mode": "interval", + "intervalSeconds": req.intervalSeconds, + }, + "runtime": { + "timeoutSeconds": req.timeoutSeconds, + "noOverlap": req.noOverlap, + }, + "inputs": req.inputs, + "updatedAt": config["updatedAt"], + } + ) + triggers = await _get_workflow_trigger_defs(workflow_id, data) + await _persist_workflow_triggers( + workflow_id, + data, + _replace_or_append_trigger(triggers, unified_trigger), + ) from flocks.workflow.poller_manager import default_manager as _poller_default_manager @@ -1696,7 +2197,15 @@ async def save_workflow_poller_config(workflow_id: str, req: WorkflowPollerConfi async def get_workflow_poller_config(workflow_id: str): """Get saved poller configuration for a workflow.""" try: - return await Storage.read(f"workflow_poller_config/{workflow_id}") + config = await Storage.read(f"workflow_poller_config/{workflow_id}") + if config is None: + data = _read_workflow_from_fs(workflow_id) + if data: + triggers = await _get_workflow_trigger_defs(workflow_id, data) + trigger = next((item for item in triggers if item.type == "schedule"), None) + if trigger is not None: + config = schedule_trigger_to_legacy_config(workflow_id, trigger) + return config except Exception as e: log.error("workflow.poller_config.get.error", {"id": workflow_id, "error": str(e)}) raise HTTPException(status_code=500, detail=f"Failed to get poller config: {str(e)}") @@ -1718,7 +2227,8 @@ async def get_workflow_poller_status(workflow_id: str): async def run_workflow_poller_once(workflow_id: str): """Trigger one immediate poller execution for a workflow.""" try: - if not _read_workflow_from_fs(workflow_id): + data = _read_workflow_from_fs(workflow_id) + if not data: raise HTTPException(status_code=404, detail=f"Workflow not found: {workflow_id}") from flocks.workflow.poller_manager import default_manager as _poller_default_manager @@ -1758,6 +2268,29 @@ async def save_syslog_config(workflow_id: str, req: SyslogConfigRequest): "updatedAt": int(time.time() * 1000), } await Storage.write(_syslog_config_key(workflow_id), config) + unified_trigger = TriggerDefinition.model_validate( + { + "id": "syslog-default", + "type": "syslog", + "enabled": req.enabled, + "source": { + "protocol": req.protocol, + "host": req.host, + "port": req.port, + "format": req.msg_format, + }, + "mapping": { + req.input_key or "syslog_message": "$.body", + }, + "updatedAt": config["updatedAt"], + } + ) + triggers = await _get_workflow_trigger_defs(workflow_id, data) + await _persist_workflow_triggers( + workflow_id, + data, + _replace_or_append_trigger(triggers, unified_trigger), + ) from flocks.ingest.syslog.manager import default_manager as _syslog_default_manager @@ -1782,6 +2315,13 @@ async def get_syslog_config(workflow_id: str): """Get saved syslog configuration for a workflow.""" try: config = await Storage.read(_syslog_config_key(workflow_id)) + if config is None: + data = _read_workflow_from_fs(workflow_id) + if data: + triggers = await _get_workflow_trigger_defs(workflow_id, data) + trigger = next((item for item in triggers if item.type == "syslog"), None) + if trigger is not None: + config = syslog_trigger_to_legacy_config(workflow_id, trigger) return config except Exception as e: log.error("workflow.syslog_config.get.error", {"id": workflow_id, "error": str(e)}) diff --git a/flocks/workflow/models.py b/flocks/workflow/models.py index 3fa893126..e20f296e3 100644 --- a/flocks/workflow/models.py +++ b/flocks/workflow/models.py @@ -6,6 +6,7 @@ from pydantic import BaseModel, ConfigDict, Field, model_validator from .errors import WorkflowValidationError +from .triggers.models import TriggerDefinition, normalize_trigger_definitions class Node(BaseModel): @@ -96,12 +97,15 @@ class Workflow(BaseModel): start: str = Field(min_length=1) nodes: List[Node] = Field(default_factory=list) edges: List[Edge] = Field(default_factory=list) + triggers: List[TriggerDefinition] = Field(default_factory=list) metadata: Optional[Dict[str, Any]] = None @model_validator(mode="after") def _validate_graph(self) -> "Workflow": if self.version is not None: self.version = None + if not self.triggers and isinstance(self.metadata, dict) and isinstance(self.metadata.get("triggers"), list): + self.triggers = normalize_trigger_definitions(self.metadata.get("triggers")) node_ids = [n.id for n in self.nodes] if len(node_ids) != len(set(node_ids)): dupes = sorted({x for x in node_ids if node_ids.count(x) > 1}) diff --git a/flocks/workflow/poller_manager.py b/flocks/workflow/poller_manager.py index c25d4cd03..5238c9f20 100644 --- a/flocks/workflow/poller_manager.py +++ b/flocks/workflow/poller_manager.py @@ -10,9 +10,11 @@ import threading import time import uuid -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict +from croniter import croniter + from flocks.storage.storage import Storage from flocks.utils.log import Log from flocks.workflow.execution_store import ( @@ -60,10 +62,12 @@ def _normalize_config(self, workflow_id: str, data: Any) -> Dict[str, Any]: interval_seconds = int(raw.get("intervalSeconds") or DEFAULT_INTERVAL_SECONDS) timeout_seconds = int(raw.get("timeoutSeconds") or DEFAULT_TIMEOUT_SECONDS) inputs = raw.get("inputs") if isinstance(raw.get("inputs"), dict) else {} + cron_expression = str(raw.get("cronExpression") or "").strip() return { "workflowId": workflow_id, "enabled": bool(raw.get("enabled")), "intervalSeconds": max(1, interval_seconds), + "cronExpression": cron_expression or None, "timeoutSeconds": max(1, timeout_seconds), "noOverlap": bool(raw.get("noOverlap", True)), "inputs": dict(inputs), @@ -98,8 +102,19 @@ def _build_inputs(self, config: Dict[str, Any]) -> Dict[str, Any]: inputs = dict(config.get("inputs") or {}) if not str(inputs.get("input_date") or "").strip(): inputs["input_date"] = _today_string() + run_id = f"poller-{_now_ms()}-{uuid.uuid4().hex[:8]}" inputs["_trigger"] = "poller" - inputs["_poller_run_id"] = f"poller-{_now_ms()}-{uuid.uuid4().hex[:8]}" + inputs["_poller_run_id"] = run_id + inputs["_flocks"] = { + "trigger": { + "id": "schedule-default", + "type": "schedule", + "source": "poller", + "deliveryId": run_id, + "receivedAt": _now_ms(), + "attempt": 1, + } + } return inputs def _summarize_outputs(self, outputs: Any) -> Dict[str, Any]: @@ -141,8 +156,19 @@ def _base_status(self, workflow_id: str) -> Dict[str, Any]: "kafkaMessageCount": None, "nextRunAt": None, "lastRunId": None, + "cronExpression": None, } + def _compute_next_run_at_ms(self, config: Dict[str, Any], *, base_ts_s: float | None = None) -> int: + cron_expression = str(config.get("cronExpression") or "").strip() + if cron_expression: + base = datetime.fromtimestamp( + base_ts_s if base_ts_s is not None else time.time(), + tz=timezone.utc, + ) + return int(croniter(cron_expression, base).get_next(float) * 1000) + return _now_ms() + int(config["intervalSeconds"]) * 1000 + def get_status(self, workflow_id: str) -> Dict[str, Any]: status = dict(self._base_status(workflow_id)) status.update(self._status.get(workflow_id) or {}) @@ -258,9 +284,10 @@ async def restart_workflow(self, workflow_id: str) -> Dict[str, Any]: "error": None, "enabled": True, "intervalSeconds": config["intervalSeconds"], + "cronExpression": config.get("cronExpression"), "timeoutSeconds": config["timeoutSeconds"], "noOverlap": config["noOverlap"], - "nextRunAt": _now_ms(), + "nextRunAt": self._compute_next_run_at_ms(config), } task = asyncio.create_task( self._poller_loop(workflow_id, workflow_json, config, abort_event), @@ -307,17 +334,31 @@ async def _poller_loop( config: Dict[str, Any], abort_event: asyncio.Event, ) -> None: - interval_seconds = config["intervalSeconds"] + cron_expression = str(config.get("cronExpression") or "").strip() try: while not abort_event.is_set(): - await self._schedule_run(workflow_id, workflow_json, config) - next_run_at = _now_ms() + interval_seconds * 1000 current = self._status.get(workflow_id) or self._base_status(workflow_id) + if cron_expression: + next_run_at = self._compute_next_run_at_ms(config) + wait_seconds = max(0.0, (next_run_at - _now_ms()) / 1000.0) + current["nextRunAt"] = next_run_at + current["activeRuns"] = self._cleanup_done_runs(workflow_id) + self._status[workflow_id] = current + try: + await asyncio.wait_for(abort_event.wait(), timeout=wait_seconds) + continue + except asyncio.TimeoutError: + pass + await self._schedule_run(workflow_id, workflow_json, config) + continue + + await self._schedule_run(workflow_id, workflow_json, config) + next_run_at = self._compute_next_run_at_ms(config) current["nextRunAt"] = next_run_at current["activeRuns"] = self._cleanup_done_runs(workflow_id) self._status[workflow_id] = current try: - await asyncio.wait_for(abort_event.wait(), timeout=interval_seconds) + await asyncio.wait_for(abort_event.wait(), timeout=config["intervalSeconds"]) except asyncio.TimeoutError: continue except asyncio.CancelledError: @@ -407,6 +448,11 @@ async def _execute_run( "currentNodeId": result.last_node_id, "currentPhase": status_value, "currentStepIndex": result.steps, + "triggerId": "schedule-default", + "triggerType": "schedule", + "deliveryId": inputs.get("_flocks", {}).get("trigger", {}).get("deliveryId"), + "attempt": 1, + "triggerSource": "poller", }) current = self._status.get(workflow_id) or self._base_status(workflow_id) current.update(summary) @@ -432,6 +478,11 @@ async def _execute_run( "errorMessage": str(exc), "executionLog": compact_history_for_storage(exec_data.get("executionLog")), "currentPhase": status_value, + "triggerId": "schedule-default", + "triggerType": "schedule", + "deliveryId": inputs.get("_flocks", {}).get("trigger", {}).get("deliveryId"), + "attempt": 1, + "triggerSource": "poller", }) current = self._status.get(workflow_id) or self._base_status(workflow_id) current["lastRunAt"] = started_at_ms diff --git a/flocks/workflow/triggers/__init__.py b/flocks/workflow/triggers/__init__.py new file mode 100644 index 000000000..a4563a592 --- /dev/null +++ b/flocks/workflow/triggers/__init__.py @@ -0,0 +1,39 @@ +"""Workflow trigger runtime package.""" + +from .dispatcher import EventDispatcher, TriggerDispatchError, build_trigger_event, preview_trigger_mapping +from .models import ( + TriggerAuth, + TriggerConcurrency, + TriggerDefinition, + TriggerEvent, + TriggerEventSource, + TriggerFilter, + TriggerRuntimeStatus, + default_trigger_id, + normalize_trigger_definitions, + set_workflow_json_triggers, + trigger_definitions_to_json, + workflow_json_declares_triggers, + workflow_trigger_definitions_from_json, +) + +__all__ = [ + "EventDispatcher", + "TriggerAuth", + "TriggerConcurrency", + "TriggerDefinition", + "TriggerDispatchError", + "TriggerEvent", + "TriggerEventSource", + "TriggerFilter", + "TriggerRuntimeStatus", + "build_trigger_event", + "default_trigger_id", + "normalize_trigger_definitions", + "preview_trigger_mapping", + "set_workflow_json_triggers", + "trigger_definitions_to_json", + "workflow_json_declares_triggers", + "workflow_trigger_definitions_from_json", +] + diff --git a/flocks/workflow/triggers/compat.py b/flocks/workflow/triggers/compat.py new file mode 100644 index 000000000..e65bcb8c5 --- /dev/null +++ b/flocks/workflow/triggers/compat.py @@ -0,0 +1,140 @@ +"""Compatibility helpers between unified triggers and legacy config storage.""" + +from __future__ import annotations + +from typing import Any, Dict, Optional + +from .models import TriggerDefinition + +LEGACY_POLLER_CONFIG_PREFIX = "workflow_poller_config/" +LEGACY_SYSLOG_CONFIG_PREFIX = "workflow_syslog_config/" +LEGACY_KAFKA_CONFIG_PREFIX = "workflow_kafka_config/" + + +def legacy_schedule_trigger_from_config(config: Optional[Dict[str, Any]]) -> Optional[TriggerDefinition]: + if not isinstance(config, dict): + return None + cron_expression = str(config.get("cronExpression") or "").strip() + return TriggerDefinition.model_validate( + { + "id": "schedule-default", + "type": "schedule", + "enabled": bool(config.get("enabled")), + "source": { + "mode": "cron" if cron_expression else "interval", + "intervalSeconds": int(config.get("intervalSeconds") or 30), + "cron": cron_expression or None, + }, + "runtime": { + "timeoutSeconds": int(config.get("timeoutSeconds") or 7200), + "noOverlap": bool(config.get("noOverlap", True)), + }, + "inputs": dict(config.get("inputs") or {}), + "updatedAt": config.get("updatedAt"), + } + ) + + +def legacy_syslog_trigger_from_config(config: Optional[Dict[str, Any]]) -> Optional[TriggerDefinition]: + if not isinstance(config, dict): + return None + return TriggerDefinition.model_validate( + { + "id": "syslog-default", + "type": "syslog", + "enabled": bool(config.get("enabled")), + "source": { + "protocol": config.get("protocol") or "udp", + "host": config.get("host") or "0.0.0.0", + "port": int(config.get("port") or 5140), + "format": config.get("format") or "auto", + }, + "mapping": { + str(config.get("inputKey") or "syslog_message"): "$.body", + }, + "updatedAt": config.get("updatedAt"), + } + ) + + +def legacy_kafka_trigger_from_config(config: Optional[Dict[str, Any]]) -> Optional[TriggerDefinition]: + if not isinstance(config, dict): + return None + return TriggerDefinition.model_validate( + { + "id": "kafka-default", + "type": "kafka", + "enabled": bool(config.get("enabled")), + "source": { + "inputBroker": config.get("inputBroker") or "", + "inputTopic": config.get("inputTopic") or "", + "inputGroupId": config.get("inputGroupId") or "", + "autoOffsetReset": config.get("autoOffsetReset") or "latest", + }, + "mapping": { + str(config.get("inputKey") or "kafka_message"): "$.body", + }, + "inputs": dict(config.get("inputs") or {}), + "updatedAt": config.get("updatedAt"), + } + ) + + +def schedule_trigger_to_legacy_config(workflow_id: str, trigger: TriggerDefinition) -> Dict[str, Any]: + source = dict(trigger.source or {}) + runtime = dict(trigger.runtime or {}) + cron_expression = str(source.get("cron") or source.get("cronExpression") or "").strip() + return { + "workflowId": workflow_id, + "enabled": trigger.enabled, + "intervalSeconds": int(source.get("intervalSeconds") or 30), + "cronExpression": cron_expression or None, + "timeoutSeconds": int(runtime.get("timeoutSeconds") or 7200), + "noOverlap": bool(runtime.get("noOverlap", True)), + "inputs": dict(trigger.inputs or {}), + "updatedAt": trigger.updatedAt, + } + + +def syslog_trigger_to_legacy_config(workflow_id: str, trigger: TriggerDefinition) -> Dict[str, Any]: + source = dict(trigger.source or {}) + mapping = dict(trigger.mapping or {}) + input_key = next(iter(mapping.keys()), "syslog_message") + return { + "workflowId": workflow_id, + "enabled": trigger.enabled, + "protocol": source.get("protocol") or "udp", + "host": source.get("host") or "0.0.0.0", + "port": int(source.get("port") or 5140), + "format": source.get("format") or "auto", + "inputKey": input_key, + "updatedAt": trigger.updatedAt, + } + + +def kafka_trigger_to_legacy_config(workflow_id: str, trigger: TriggerDefinition) -> Dict[str, Any]: + source = dict(trigger.source or {}) + mapping = dict(trigger.mapping or {}) + input_key = next(iter(mapping.keys()), "kafka_message") + return { + "workflowId": workflow_id, + "enabled": trigger.enabled, + "inputBroker": source.get("inputBroker") or "", + "inputTopic": source.get("inputTopic") or "", + "inputGroupId": source.get("inputGroupId") or "", + "inputKey": input_key, + "autoOffsetReset": source.get("autoOffsetReset") or "latest", + "inputs": dict(trigger.inputs or {}), + "updatedAt": trigger.updatedAt, + } + + +def trigger_to_legacy_config(workflow_id: str, trigger: TriggerDefinition) -> tuple[Optional[str], Optional[Dict[str, Any]]]: + if trigger.type == "schedule": + return f"{LEGACY_POLLER_CONFIG_PREFIX}{workflow_id}", schedule_trigger_to_legacy_config(workflow_id, trigger) + if trigger.type == "syslog": + return f"{LEGACY_SYSLOG_CONFIG_PREFIX}{workflow_id}", syslog_trigger_to_legacy_config(workflow_id, trigger) + if trigger.type == "kafka": + return f"{LEGACY_KAFKA_CONFIG_PREFIX}{workflow_id}", kafka_trigger_to_legacy_config(workflow_id, trigger) + return None, None + diff --git a/flocks/workflow/triggers/custom_loader.py b/flocks/workflow/triggers/custom_loader.py new file mode 100644 index 000000000..fb4603b5f --- /dev/null +++ b/flocks/workflow/triggers/custom_loader.py @@ -0,0 +1,78 @@ +"""Loader for user-defined trigger plugin specs.""" + +from __future__ import annotations + +import importlib.util +import json +from pathlib import Path +from types import ModuleType +from typing import Any, Dict, List, Optional + +from flocks.workflow.fs_store import find_workspace_root + +try: # pragma: no cover - optional dependency fallback + import yaml +except Exception: # pragma: no cover - fallback branch + yaml = None + +PLUGIN_FILENAMES = ("trigger.json", "trigger.yaml", "trigger.yml", "manifest.json") + + +def trigger_plugin_roots() -> List[Path]: + workspace = find_workspace_root() + return [ + Path.home() / ".flocks" / "plugins" / "triggers", + workspace / ".flocks" / "plugins" / "triggers", + ] + + +def _read_plugin_manifest(path: Path) -> Optional[Dict[str, Any]]: + try: + if path.suffix.lower() == ".json": + return json.loads(path.read_text(encoding="utf-8")) + if yaml is None: + return None + return yaml.safe_load(path.read_text(encoding="utf-8")) + except Exception: + return None + + +def list_trigger_plugins() -> List[Dict[str, Any]]: + plugins: Dict[str, Dict[str, Any]] = {} + for root in trigger_plugin_roots(): + if not root.is_dir(): + continue + for entry in sorted(root.iterdir()): + if not entry.is_dir(): + continue + manifest_path = next((entry / filename for filename in PLUGIN_FILENAMES if (entry / filename).is_file()), None) + if manifest_path is None: + continue + manifest = _read_plugin_manifest(manifest_path) + if not isinstance(manifest, dict): + continue + plugin_id = str(manifest.get("id") or entry.name).strip() or entry.name + plugins[plugin_id] = { + "id": plugin_id, + "name": manifest.get("name") or plugin_id, + "description": manifest.get("description"), + "root": str(entry), + "manifestPath": str(manifest_path), + "handlerPath": str(entry / "handler.py"), + "manifest": manifest, + } + return list(plugins.values()) + + +def load_trigger_plugin_module(plugin_spec: Dict[str, Any]) -> Optional[ModuleType]: + handler_path = Path(str(plugin_spec.get("handlerPath") or "")).expanduser() + if not handler_path.is_file(): + return None + module_name = f"flocks_trigger_plugin_{plugin_spec.get('id', handler_path.stem)}" + spec = importlib.util.spec_from_file_location(module_name, handler_path) + if spec is None or spec.loader is None: + return None + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + diff --git a/flocks/workflow/triggers/dispatcher.py b/flocks/workflow/triggers/dispatcher.py new file mode 100644 index 000000000..6243588a6 --- /dev/null +++ b/flocks/workflow/triggers/dispatcher.py @@ -0,0 +1,268 @@ +"""Unified trigger event mapping, filtering, and dispatch helpers.""" + +from __future__ import annotations + +import ast +import time +import uuid +from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple + +from .models import TriggerDefinition, TriggerEvent, TriggerEventSource + +DispatchExecutor = Callable[[Dict[str, Any]], Awaitable[Any]] + + +class TriggerDispatchError(Exception): + """Raised when a trigger event cannot be dispatched.""" + + +class TriggerExpressionEvaluator(ast.NodeVisitor): + """Very small safe evaluator for trigger filter expressions.""" + + def __init__(self, variables: Dict[str, Any]) -> None: + self._variables = variables + + def visit_Expression(self, node: ast.Expression) -> Any: # noqa: N802 + return self.visit(node.body) + + def visit_Constant(self, node: ast.Constant) -> Any: # noqa: N802 + return node.value + + def visit_Name(self, node: ast.Name) -> Any: # noqa: N802 + if node.id not in self._variables: + raise TriggerDispatchError(f"Unknown name in trigger filter: {node.id}") + return self._variables[node.id] + + def visit_List(self, node: ast.List) -> Any: # noqa: N802 + return [self.visit(elt) for elt in node.elts] + + def visit_Tuple(self, node: ast.Tuple) -> Any: # noqa: N802 + return tuple(self.visit(elt) for elt in node.elts) + + def visit_Dict(self, node: ast.Dict) -> Any: # noqa: N802 + return {self.visit(key): self.visit(value) for key, value in zip(node.keys, node.values)} + + def visit_BoolOp(self, node: ast.BoolOp) -> Any: # noqa: N802 + if isinstance(node.op, ast.And): + return all(self.visit(value) for value in node.values) + if isinstance(node.op, ast.Or): + return any(self.visit(value) for value in node.values) + raise TriggerDispatchError("Unsupported boolean operator in trigger filter") + + def visit_UnaryOp(self, node: ast.UnaryOp) -> Any: # noqa: N802 + operand = self.visit(node.operand) + if isinstance(node.op, ast.Not): + return not operand + raise TriggerDispatchError("Unsupported unary operator in trigger filter") + + def visit_Compare(self, node: ast.Compare) -> Any: # noqa: N802 + left = self.visit(node.left) + for operator, comparator_node in zip(node.ops, node.comparators): + right = self.visit(comparator_node) + if isinstance(operator, ast.Eq): + ok = left == right + elif isinstance(operator, ast.NotEq): + ok = left != right + elif isinstance(operator, ast.In): + ok = left in right + elif isinstance(operator, ast.NotIn): + ok = left not in right + elif isinstance(operator, ast.Gt): + ok = left > right + elif isinstance(operator, ast.GtE): + ok = left >= right + elif isinstance(operator, ast.Lt): + ok = left < right + elif isinstance(operator, ast.LtE): + ok = left <= right + else: + raise TriggerDispatchError("Unsupported compare operator in trigger filter") + if not ok: + return False + left = right + return True + + def visit_Attribute(self, node: ast.Attribute) -> Any: # noqa: N802 + value = self.visit(node.value) + if isinstance(value, dict): + return value.get(node.attr) + return getattr(value, node.attr, None) + + def visit_Subscript(self, node: ast.Subscript) -> Any: # noqa: N802 + value = self.visit(node.value) + key = self.visit(node.slice) + try: + return value[key] + except Exception as exc: # pragma: no cover - defensive branch + raise TriggerDispatchError(f"Invalid trigger filter subscript access: {exc}") from exc + + def generic_visit(self, node: ast.AST) -> Any: # noqa: D401 + raise TriggerDispatchError(f"Unsupported syntax in trigger filter: {type(node).__name__}") + + +def _tokenize_path(path: str) -> List[Any]: + tokens: List[Any] = [] + i = 0 + while i < len(path): + ch = path[i] + if ch == ".": + i += 1 + continue + if ch == "[": + end = path.find("]", i) + if end < 0: + raise TriggerDispatchError(f"Invalid mapping path: {path}") + raw = path[i + 1 : end].strip() + if raw.isdigit(): + tokens.append(int(raw)) + else: + tokens.append(raw.strip("'\"")) + i = end + 1 + continue + start = i + while i < len(path) and path[i] not in ".[": + i += 1 + tokens.append(path[start:i]) + return [token for token in tokens if token not in ("$", "")] + + +def lookup_mapping_path(data: Any, path: str) -> Any: + raw = (path or "").strip() + if raw in {"", "$"}: + return data + candidate = raw[2:] if raw.startswith("$.") else raw + value = data + for token in _tokenize_path(candidate): + if isinstance(token, int): + if not isinstance(value, list): + return None + if token < 0 or token >= len(value): + return None + value = value[token] + continue + if isinstance(value, dict): + value = value.get(token) + else: + value = getattr(value, token, None) + if value is None: + return None + return value + + +def build_trigger_event( + *, + workflow_id: str, + trigger: TriggerDefinition, + body: Any = None, + headers: Optional[Dict[str, Any]] = None, + query: Optional[Dict[str, Any]] = None, + path_params: Optional[Dict[str, Any]] = None, + source: Optional[str] = None, + raw: Any = None, + delivery_id: Optional[str] = None, +) -> TriggerEvent: + resolved_source = source + if not resolved_source: + src = trigger.source or {} + if isinstance(src, dict): + resolved_source = ( + src.get("path") + or src.get("topic") + or src.get("event") + or src.get("adapterId") + or trigger.type + ) + return TriggerEvent( + source=TriggerEventSource( + workflowId=workflow_id, + triggerId=trigger.id or "", + triggerType=trigger.type, + source=str(resolved_source or trigger.type), + deliveryId=delivery_id or uuid.uuid4().hex, + receivedAt=int(time.time() * 1000), + ), + body=body, + headers=headers or {}, + query=query or {}, + pathParams=path_params or {}, + payload=body, + raw=raw if raw is not None else body, + ) + + +def event_to_context(event: TriggerEvent) -> Dict[str, Any]: + payload = event.model_dump(mode="json", exclude_none=True) + return { + "event": payload, + "body": payload.get("body"), + "headers": payload.get("headers") or {}, + "query": payload.get("query") or {}, + "pathParams": payload.get("pathParams") or {}, + "payload": payload.get("payload"), + "raw": payload.get("raw"), + } + + +def evaluate_trigger_filter(trigger: TriggerDefinition, event: TriggerEvent) -> Tuple[bool, Optional[str]]: + filter_spec = trigger.filter + if filter_spec is None: + return True, None + expr = (filter_spec.expr or "").strip() + if not expr: + return True, None + ctx = event_to_context(event) + try: + parsed = ast.parse(expr, mode="eval") + matched = bool(TriggerExpressionEvaluator(ctx).visit(parsed)) + except Exception as exc: + return False, str(exc) + return matched, None + + +def preview_trigger_mapping(trigger: TriggerDefinition, event: TriggerEvent) -> Dict[str, Any]: + ctx = event_to_context(event) + mapped: Dict[str, Any] = dict(trigger.inputs or {}) + for dst_key, src_path in (trigger.mapping or {}).items(): + mapped[dst_key] = lookup_mapping_path(ctx, src_path) + mapped["_flocks"] = { + "trigger": { + "id": event.source.triggerId, + "type": event.source.triggerType, + "source": event.source.source, + "deliveryId": event.source.deliveryId, + "receivedAt": event.source.receivedAt, + "attempt": event.source.attempt, + } + } + mapped.setdefault("_trigger", trigger.type) + return mapped + + +class EventDispatcher: + """Dispatch trigger events through filtering and mapping.""" + + async def dispatch( + self, + *, + trigger: TriggerDefinition, + event: TriggerEvent, + executor: DispatchExecutor, + ) -> Dict[str, Any]: + matched, filter_error = evaluate_trigger_filter(trigger, event) + mapped_inputs = preview_trigger_mapping(trigger, event) + if filter_error: + raise TriggerDispatchError(filter_error) + if not matched: + return { + "matched": False, + "inputs": mapped_inputs, + "executed": False, + } + result = await executor(mapped_inputs) + return { + "matched": True, + "inputs": mapped_inputs, + "executed": True, + "result": result, + } + diff --git a/flocks/workflow/triggers/models.py b/flocks/workflow/triggers/models.py new file mode 100644 index 000000000..bd26297c8 --- /dev/null +++ b/flocks/workflow/triggers/models.py @@ -0,0 +1,210 @@ +"""Workflow trigger schema models and compatibility helpers.""" + +from __future__ import annotations + +import re +from typing import Any, Dict, Iterable, List, Literal, Optional + +from pydantic import BaseModel, ConfigDict, Field, model_validator + +TriggerType = Literal[ + "manual", + "schedule", + "webhook", + "syslog", + "kafka", + "internal_event", + "custom_webhook", + "custom_adapter", + "plugin", +] + +_TRIGGER_ID_SANITIZE_RE = re.compile(r"[^a-zA-Z0-9_.-]+") + + +def _sanitize_trigger_id(value: str) -> str: + cleaned = _TRIGGER_ID_SANITIZE_RE.sub("-", (value or "").strip()).strip("-") + return cleaned or "trigger" + + +def default_trigger_id(trigger_type: str, *, source: Optional[Dict[str, Any]] = None) -> str: + base = (trigger_type or "trigger").strip().lower() or "trigger" + src = source or {} + for candidate_key in ("path", "topic", "event", "name", "adapterId", "pluginId"): + candidate = src.get(candidate_key) + if isinstance(candidate, str) and candidate.strip(): + return f"{base}-{_sanitize_trigger_id(candidate)}" + return f"{base}-default" + + +class TriggerAuth(BaseModel): + model_config = ConfigDict(extra="allow") + + type: str = "none" + secretRef: Optional[str] = None + headerName: Optional[str] = None + queryParam: Optional[str] = None + apiKey: Optional[str] = None + + +class TriggerFilter(BaseModel): + model_config = ConfigDict(extra="allow") + + expr: Optional[str] = None + mode: Optional[str] = None + path: Optional[str] = None + equals: Optional[Any] = None + + +class TriggerConcurrency(BaseModel): + model_config = ConfigDict(extra="allow") + + policy: Literal["allow", "no_overlap", "queue", "drop_oldest", "drop_newest"] = "allow" + maxParallel: int = Field(1, ge=1) + queueSize: int = Field(100, ge=1) + + +class TriggerTestSample(BaseModel): + model_config = ConfigDict(extra="allow") + + name: str = Field(min_length=1) + payload: Any = None + headers: Dict[str, Any] = Field(default_factory=dict) + query: Dict[str, Any] = Field(default_factory=dict) + + +class TriggerDefinition(BaseModel): + model_config = ConfigDict(extra="allow") + + id: Optional[str] = None + name: Optional[str] = None + type: TriggerType + enabled: bool = True + description: Optional[str] = None + source: Dict[str, Any] = Field(default_factory=dict) + auth: Optional[TriggerAuth] = None + filter: Optional[TriggerFilter] = None + mapping: Dict[str, str] = Field(default_factory=dict) + inputs: Dict[str, Any] = Field(default_factory=dict) + concurrency: TriggerConcurrency = Field(default_factory=TriggerConcurrency) + runtime: Dict[str, Any] = Field(default_factory=dict) + testSamples: List[TriggerTestSample] = Field(default_factory=list) + updatedAt: Optional[int] = None + + @model_validator(mode="before") + @classmethod + def _normalize_nested_values(cls, value: Any) -> Any: + if not isinstance(value, dict): + return value + normalized = dict(value) + auth = normalized.get("auth") + if isinstance(auth, dict): + normalized["auth"] = TriggerAuth.model_validate(auth) + filter_value = normalized.get("filter") + if isinstance(filter_value, dict): + normalized["filter"] = TriggerFilter.model_validate(filter_value) + concurrency = normalized.get("concurrency") + if isinstance(concurrency, dict): + normalized["concurrency"] = TriggerConcurrency.model_validate(concurrency) + samples = normalized.get("testSamples") + if isinstance(samples, list): + normalized["testSamples"] = [ + TriggerTestSample.model_validate(item) if not isinstance(item, TriggerTestSample) else item + for item in samples + if isinstance(item, (dict, TriggerTestSample)) + ] + return normalized + + @model_validator(mode="after") + def _ensure_id(self) -> "TriggerDefinition": + source = self.source if isinstance(self.source, dict) else {} + self.id = _sanitize_trigger_id(self.id or default_trigger_id(self.type, source=source)) + return self + + +class TriggerEventSource(BaseModel): + model_config = ConfigDict(extra="allow") + + workflowId: str + triggerId: str + triggerType: str + source: Optional[str] = None + deliveryId: Optional[str] = None + receivedAt: Optional[int] = None + attempt: int = 1 + + +class TriggerEvent(BaseModel): + model_config = ConfigDict(extra="allow") + + source: TriggerEventSource + body: Any = None + headers: Dict[str, Any] = Field(default_factory=dict) + query: Dict[str, Any] = Field(default_factory=dict) + pathParams: Dict[str, Any] = Field(default_factory=dict) + payload: Any = None + raw: Any = None + + +class TriggerRuntimeStatus(BaseModel): + model_config = ConfigDict(extra="allow") + + workflowId: str + triggerId: str + triggerType: str + state: str + error: Optional[str] = None + + +def normalize_trigger_definitions(raw_triggers: Optional[Iterable[Any]]) -> List[TriggerDefinition]: + if not raw_triggers: + return [] + deduped: Dict[str, TriggerDefinition] = {} + for raw in raw_triggers: + if raw is None: + continue + trigger = raw if isinstance(raw, TriggerDefinition) else TriggerDefinition.model_validate(raw) + deduped[trigger.id or default_trigger_id(trigger.type)] = trigger + return list(deduped.values()) + + +def workflow_trigger_definitions_from_json(workflow_json: Dict[str, Any]) -> List[TriggerDefinition]: + raw = workflow_json.get("triggers") + if raw is None: + metadata = workflow_json.get("metadata") + if isinstance(metadata, dict): + raw = metadata.get("triggers") + if not isinstance(raw, list): + return [] + return normalize_trigger_definitions(raw) + + +def workflow_json_declares_triggers(workflow_json: Dict[str, Any]) -> bool: + if not isinstance(workflow_json, dict): + return False + if "triggers" in workflow_json: + return isinstance(workflow_json.get("triggers"), list) + metadata = workflow_json.get("metadata") + return isinstance(metadata, dict) and isinstance(metadata.get("triggers"), list) + + +def trigger_definitions_to_json(triggers: Iterable[TriggerDefinition]) -> List[Dict[str, Any]]: + return [ + trigger.model_dump(mode="json", by_alias=True, exclude_none=True) + for trigger in normalize_trigger_definitions(triggers) + ] + + +def set_workflow_json_triggers( + workflow_json: Dict[str, Any], + triggers: Iterable[TriggerDefinition], +) -> Dict[str, Any]: + updated = dict(workflow_json) + updated["triggers"] = trigger_definitions_to_json(triggers) + metadata = updated.get("metadata") + if isinstance(metadata, dict) and "triggers" in metadata: + metadata = dict(metadata) + metadata.pop("triggers", None) + updated["metadata"] = metadata + return updated + diff --git a/flocks/workflow/triggers/runtime.py b/flocks/workflow/triggers/runtime.py new file mode 100644 index 000000000..5fc4c2fee --- /dev/null +++ b/flocks/workflow/triggers/runtime.py @@ -0,0 +1,432 @@ +"""Unified trigger runtime with legacy manager compatibility.""" + +from __future__ import annotations + +import asyncio +import time +from typing import Any, Dict, List, Optional, Tuple + +from flocks.storage.storage import Storage +from flocks.utils.log import Log +from flocks.workflow.execution_store import ( + compact_history_for_storage, + compact_outputs_for_storage, + create_execution_record, + record_execution_result, + resolve_execution_outcome, +) +from flocks.workflow.fs_store import read_workflow_dir, workflow_scan_dirs +from flocks.workflow.runner import run_workflow + +from .compat import ( + LEGACY_KAFKA_CONFIG_PREFIX, + LEGACY_POLLER_CONFIG_PREFIX, + LEGACY_SYSLOG_CONFIG_PREFIX, + kafka_trigger_to_legacy_config, + schedule_trigger_to_legacy_config, + syslog_trigger_to_legacy_config, + trigger_to_legacy_config, +) +from .custom_loader import list_trigger_plugins, load_trigger_plugin_module +from .dispatcher import EventDispatcher, TriggerDispatchError, build_trigger_event +from .models import TriggerDefinition, TriggerEvent, TriggerRuntimeStatus, workflow_trigger_definitions_from_json + +log = Log.create(service="workflow.trigger.runtime") + + +def _now_ms() -> int: + return int(time.time() * 1000) + + +class TriggerRuntime: + """Unified trigger runtime that wraps legacy managers and custom adapters.""" + + def __init__(self) -> None: + self._dispatcher = EventDispatcher() + self._custom_adapter_tasks: Dict[tuple[str, str], asyncio.Task[Any]] = {} + self._custom_adapters: Dict[tuple[str, str], Any] = {} + self._custom_status: Dict[tuple[str, str], Dict[str, Any]] = {} + + def _iter_workflows(self) -> List[Dict[str, Any]]: + merged: Dict[str, Dict[str, Any]] = {} + for root, source in workflow_scan_dirs(): + if not root.is_dir(): + continue + for entry in sorted(root.iterdir()): + if not entry.is_dir(): + continue + data = read_workflow_dir(entry, entry.name, source) + if data is not None: + merged[entry.name] = data + return list(merged.values()) + + async def _sync_legacy_configs_from_workflow(self, workflow_id: str, workflow_json: Dict[str, Any]) -> List[TriggerDefinition]: + triggers = workflow_trigger_definitions_from_json(workflow_json) + if not triggers: + return [] + + by_type = {trigger.type: trigger for trigger in triggers} + for trigger in triggers: + key, value = trigger_to_legacy_config(workflow_id, trigger) + if key and value is not None: + await Storage.write(key, value) + + if "schedule" not in by_type: + await Storage.write( + f"{LEGACY_POLLER_CONFIG_PREFIX}{workflow_id}", + {"workflowId": workflow_id, "enabled": False, "updatedAt": _now_ms()}, + ) + if "syslog" not in by_type: + await Storage.write( + f"{LEGACY_SYSLOG_CONFIG_PREFIX}{workflow_id}", + {"workflowId": workflow_id, "enabled": False, "updatedAt": _now_ms()}, + ) + if "kafka" not in by_type: + await Storage.write( + f"{LEGACY_KAFKA_CONFIG_PREFIX}{workflow_id}", + {"workflowId": workflow_id, "enabled": False, "updatedAt": _now_ms()}, + ) + return triggers + + async def start_all(self) -> None: + for workflow in self._iter_workflows(): + try: + await self._sync_legacy_configs_from_workflow(workflow["id"], workflow.get("workflowJson") or {}) + except Exception as exc: + log.warning("trigger.sync_legacy.failed", {"workflow_id": workflow.get("id"), "error": str(exc)}) + + from flocks.ingest.syslog.manager import default_manager as syslog_manager + from flocks.ingest.kafka.manager import default_manager as kafka_manager + from flocks.workflow.poller_manager import default_manager as poller_manager + + await syslog_manager.start_all() + await kafka_manager.start_all() + await poller_manager.start_all() + + for workflow in self._iter_workflows(): + await self._start_custom_adapters_for_workflow(workflow["id"], workflow.get("workflowJson") or {}) + + async def stop_all(self) -> None: + from flocks.ingest.syslog.manager import default_manager as syslog_manager + from flocks.ingest.kafka.manager import default_manager as kafka_manager + from flocks.workflow.poller_manager import default_manager as poller_manager + + for workflow_id, trigger_id in list(self._custom_adapter_tasks.keys()): + await self._stop_custom_adapter(workflow_id, trigger_id) + + await syslog_manager.stop_all() + await kafka_manager.stop_all() + await poller_manager.stop_all() + + async def restart_workflow( + self, + workflow_id: str, + workflow_json: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + if workflow_json is None: + workflow = next((item for item in self._iter_workflows() if item.get("id") == workflow_id), None) + workflow_json = (workflow or {}).get("workflowJson") or {} + triggers = await self._sync_legacy_configs_from_workflow(workflow_id, workflow_json or {}) + + from flocks.ingest.syslog.manager import default_manager as syslog_manager + from flocks.ingest.kafka.manager import default_manager as kafka_manager + from flocks.workflow.poller_manager import default_manager as poller_manager + + statuses: Dict[str, Any] = {} + by_type = {trigger.type: trigger for trigger in triggers} + + if "syslog" in by_type: + statuses["syslog"] = await syslog_manager.restart_workflow(workflow_id) + else: + await syslog_manager.stop_workflow(workflow_id) + statuses["syslog"] = {"state": "stopped", "error": None} + if "kafka" in by_type: + statuses["kafka"] = await kafka_manager.restart_workflow(workflow_id) + else: + await kafka_manager.stop_workflow(workflow_id) + statuses["kafka"] = {"state": "stopped", "error": None} + if "schedule" in by_type: + statuses["schedule"] = await poller_manager.restart_workflow(workflow_id) + else: + await poller_manager.stop_workflow(workflow_id) + statuses["schedule"] = {"state": "stopped", "error": None} + + await self._start_custom_adapters_for_workflow(workflow_id, workflow_json or {}) + return statuses + + async def _execute_workflow( + self, + *, + workflow_id: str, + workflow_json: Dict[str, Any], + trigger: TriggerDefinition, + mapped_inputs: Dict[str, Any], + ) -> Dict[str, Any]: + exec_data = await create_execution_record( + workflow_id, + input_params=mapped_inputs, + ) + exec_id = exec_data["id"] + started_at = time.time() + try: + result = await asyncio.to_thread( + run_workflow, + workflow=workflow_json, + inputs=mapped_inputs, + trace=False, + ) + status_value, error_message = resolve_execution_outcome(result) + exec_data.update( + { + "status": status_value, + "outputResults": compact_outputs_for_storage(result.outputs), + "finishedAt": _now_ms(), + "duration": time.time() - started_at, + "errorMessage": error_message, + "executionLog": compact_history_for_storage(result.history), + "currentNodeId": result.last_node_id, + "currentPhase": status_value, + "currentStepIndex": result.steps, + "triggerId": trigger.id, + "triggerType": trigger.type, + "deliveryId": mapped_inputs.get("_flocks", {}).get("trigger", {}).get("deliveryId"), + "attempt": mapped_inputs.get("_flocks", {}).get("trigger", {}).get("attempt"), + "triggerSource": mapped_inputs.get("_flocks", {}).get("trigger", {}).get("source"), + } + ) + except Exception as exc: + exec_data.update( + { + "status": "error", + "finishedAt": _now_ms(), + "duration": time.time() - started_at, + "errorMessage": str(exc), + "triggerId": trigger.id, + "triggerType": trigger.type, + "deliveryId": mapped_inputs.get("_flocks", {}).get("trigger", {}).get("deliveryId"), + "attempt": mapped_inputs.get("_flocks", {}).get("trigger", {}).get("attempt"), + "triggerSource": mapped_inputs.get("_flocks", {}).get("trigger", {}).get("source"), + } + ) + await record_execution_result(workflow_id, exec_id, exec_data) + return exec_data + + async def dispatch_event( + self, + *, + workflow_id: str, + workflow_json: Dict[str, Any], + trigger: TriggerDefinition, + event: TriggerEvent, + ) -> Dict[str, Any]: + async def _executor(mapped_inputs: Dict[str, Any]) -> Dict[str, Any]: + return await self._execute_workflow( + workflow_id=workflow_id, + workflow_json=workflow_json, + trigger=trigger, + mapped_inputs=mapped_inputs, + ) + + return await self._dispatcher.dispatch(trigger=trigger, event=event, executor=_executor) + + async def _stop_custom_adapter(self, workflow_id: str, trigger_id: str) -> None: + key = (workflow_id, trigger_id) + adapter = self._custom_adapters.pop(key, None) + task = self._custom_adapter_tasks.pop(key, None) + if adapter is not None and hasattr(adapter, "stop"): + try: + result = adapter.stop() + if asyncio.iscoroutine(result): + await result + except Exception: + pass + if task is not None and not task.done(): + task.cancel() + try: + await task + except Exception: + pass + self._custom_status[key] = { + "workflowId": workflow_id, + "triggerId": trigger_id, + "triggerType": "custom_adapter", + "state": "stopped", + "error": None, + } + + async def _start_custom_adapters_for_workflow(self, workflow_id: str, workflow_json: Dict[str, Any]) -> None: + triggers = workflow_trigger_definitions_from_json(workflow_json) + desired_ids = {trigger.id for trigger in triggers if trigger.type == "custom_adapter" and trigger.enabled} + for active_workflow_id, active_trigger_id in list(self._custom_adapter_tasks.keys()): + if active_workflow_id == workflow_id and active_trigger_id not in desired_ids: + await self._stop_custom_adapter(active_workflow_id, active_trigger_id) + + for trigger in triggers: + if trigger.type != "custom_adapter" or not trigger.enabled: + continue + key = (workflow_id, trigger.id or "") + if key in self._custom_adapter_tasks: + continue + plugin_id = str((trigger.source or {}).get("adapterId") or (trigger.source or {}).get("pluginId") or "").strip() + plugin_spec = next((item for item in list_trigger_plugins() if item.get("id") == plugin_id), None) + if plugin_spec is None: + self._custom_status[key] = { + "workflowId": workflow_id, + "triggerId": trigger.id, + "triggerType": trigger.type, + "state": "failed", + "error": f"custom trigger plugin not found: {plugin_id}", + } + continue + module = load_trigger_plugin_module(plugin_spec) + if module is None: + self._custom_status[key] = { + "workflowId": workflow_id, + "triggerId": trigger.id, + "triggerType": trigger.type, + "state": "failed", + "error": "failed to load custom trigger plugin module", + } + continue + try: + adapter = None + if hasattr(module, "create_trigger_adapter"): + adapter = module.create_trigger_adapter(trigger.model_dump(mode="json")) + elif hasattr(module, "TriggerAdapter"): + adapter = module.TriggerAdapter(trigger.model_dump(mode="json")) + if adapter is None: + raise RuntimeError("plugin must expose create_trigger_adapter() or TriggerAdapter") + except Exception as exc: + self._custom_status[key] = { + "workflowId": workflow_id, + "triggerId": trigger.id, + "triggerType": trigger.type, + "state": "failed", + "error": str(exc), + } + continue + + async def _emit(payload: Any, *, _trigger: TriggerDefinition = trigger) -> Dict[str, Any]: + event = payload if isinstance(payload, TriggerEvent) else build_trigger_event( + workflow_id=workflow_id, + trigger=_trigger, + body=payload, + raw=payload, + ) + try: + result = await self.dispatch_event( + workflow_id=workflow_id, + workflow_json=workflow_json, + trigger=_trigger, + event=event, + ) + self._custom_status[key] = { + "workflowId": workflow_id, + "triggerId": _trigger.id, + "triggerType": _trigger.type, + "state": "running", + "error": None, + "lastDeliveryId": event.source.deliveryId, + "lastMatched": result.get("matched"), + } + return result + except TriggerDispatchError as exc: + self._custom_status[key] = { + "workflowId": workflow_id, + "triggerId": _trigger.id, + "triggerType": _trigger.type, + "state": "failed", + "error": str(exc), + } + raise + + async def _runner() -> None: + self._custom_status[key] = { + "workflowId": workflow_id, + "triggerId": trigger.id, + "triggerType": trigger.type, + "state": "running", + "error": None, + "pluginId": plugin_id, + } + try: + result = adapter.start(trigger.model_dump(mode="json"), _emit) + if asyncio.iscoroutine(result): + await result + except asyncio.CancelledError: + raise + except Exception as exc: + self._custom_status[key] = { + "workflowId": workflow_id, + "triggerId": trigger.id, + "triggerType": trigger.type, + "state": "failed", + "error": str(exc), + "pluginId": plugin_id, + } + + self._custom_adapters[key] = adapter + self._custom_adapter_tasks[key] = asyncio.create_task( + _runner(), + name=f"trigger-custom-{workflow_id}-{trigger.id}", + ) + + async def get_trigger_status(self, workflow_id: str, trigger: TriggerDefinition) -> Dict[str, Any]: + if trigger.type == "syslog": + from flocks.ingest.syslog.manager import default_manager as syslog_manager + + status = syslog_manager.get_listener_status(workflow_id) + return {"workflowId": workflow_id, "triggerId": trigger.id, "triggerType": trigger.type, **status} + if trigger.type == "kafka": + from flocks.ingest.kafka.manager import default_manager as kafka_manager + + status = kafka_manager.get_consumer_status(workflow_id) + return {"workflowId": workflow_id, "triggerId": trigger.id, "triggerType": trigger.type, **status} + if trigger.type == "schedule": + from flocks.workflow.poller_manager import default_manager as poller_manager + + status = poller_manager.get_status(workflow_id) + return {"workflowId": workflow_id, "triggerId": trigger.id, "triggerType": trigger.type, **status} + if trigger.type in {"webhook", "custom_webhook"}: + return { + "workflowId": workflow_id, + "triggerId": trigger.id, + "triggerType": trigger.type, + "state": "ready" if trigger.enabled else "stopped", + "error": None, + "path": (trigger.source or {}).get("path"), + "method": (trigger.source or {}).get("method", "POST"), + } + if trigger.type == "custom_adapter": + return self._custom_status.get( + (workflow_id, trigger.id or ""), + { + "workflowId": workflow_id, + "triggerId": trigger.id, + "triggerType": trigger.type, + "state": "stopped", + "error": None, + }, + ) + return { + "workflowId": workflow_id, + "triggerId": trigger.id, + "triggerType": trigger.type, + "state": "ready" if trigger.enabled else "stopped", + "error": None, + } + + async def get_workflow_trigger_statuses( + self, + workflow_id: str, + workflow_json: Dict[str, Any], + ) -> List[Dict[str, Any]]: + triggers = workflow_trigger_definitions_from_json(workflow_json) + return [await self.get_trigger_status(workflow_id, trigger) for trigger in triggers] + + def list_plugin_specs(self) -> List[Dict[str, Any]]: + return list_trigger_plugins() + + +default_runtime = TriggerRuntime() + diff --git a/tests/server/routes/test_workflow_trigger_routes.py b/tests/server/routes/test_workflow_trigger_routes.py new file mode 100644 index 000000000..e84dc775f --- /dev/null +++ b/tests/server/routes/test_workflow_trigger_routes.py @@ -0,0 +1,345 @@ +from __future__ import annotations + +from types import SimpleNamespace +from typing import Any + +import pytest +from httpx import AsyncClient + +from flocks.server.routes import workflow as workflow_routes + + +@pytest.mark.asyncio +async def test_list_workflow_triggers_returns_unified_status( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + workflow_routes, + "_read_workflow_from_fs", + lambda workflow_id: { + "id": workflow_id, + "workflowJson": { + "start": "n1", + "nodes": [{"id": "n1", "type": "python", "code": "result = {'ok': True}"}], + "edges": [], + "triggers": [ + { + "id": "schedule-default", + "type": "schedule", + "enabled": True, + "source": {"intervalSeconds": 60}, + } + ], + }, + } if workflow_id == "wf-1" else None, + ) + + async def _fake_statuses(_workflow_id: str, _workflow_json: dict[str, Any]) -> list[dict[str, Any]]: + return [ + { + "workflowId": "wf-1", + "triggerId": "schedule-default", + "triggerType": "schedule", + "state": "running", + } + ] + + monkeypatch.setattr( + workflow_routes, + "default_trigger_runtime", + SimpleNamespace(get_workflow_trigger_statuses=_fake_statuses), + ) + + response = await client.get("/api/workflow/wf-1/triggers") + + assert response.status_code == 200, response.text + body = response.json() + assert body[0]["trigger"]["id"] == "schedule-default" + assert body[0]["status"]["state"] == "running" + + +@pytest.mark.asyncio +async def test_list_workflow_triggers_respects_explicit_empty_trigger_list( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + workflow_routes, + "_read_workflow_from_fs", + lambda workflow_id: { + "id": workflow_id, + "workflowJson": { + "start": "n1", + "nodes": [{"id": "n1", "type": "python", "code": "result = {'ok': True}"}], + "edges": [], + "triggers": [], + }, + } if workflow_id == "wf-1" else None, + ) + + async def _fake_legacy_triggers(_workflow_id: str) -> list[Any]: + return [ + workflow_routes.TriggerDefinition.model_validate( + { + "id": "schedule-default", + "type": "schedule", + "enabled": True, + "source": {"intervalSeconds": 30}, + } + ) + ] + + async def _fake_statuses(_workflow_id: str, _workflow_json: dict[str, Any]) -> list[dict[str, Any]]: + return [] + + monkeypatch.setattr(workflow_routes, "_read_legacy_trigger_defs", _fake_legacy_triggers) + monkeypatch.setattr( + workflow_routes, + "default_trigger_runtime", + SimpleNamespace(get_workflow_trigger_statuses=_fake_statuses), + ) + + response = await client.get("/api/workflow/wf-1/triggers") + + assert response.status_code == 200, response.text + assert response.json() == [] + + +@pytest.mark.asyncio +async def test_preview_trigger_mapping_returns_mapped_inputs( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + workflow_routes, + "_read_workflow_from_fs", + lambda workflow_id: { + "id": workflow_id, + "workflowJson": { + "start": "n1", + "nodes": [{"id": "n1", "type": "python", "code": "result = {'ok': True}"}], + "edges": [], + "triggers": [ + { + "id": "hook-default", + "type": "custom_webhook", + "enabled": True, + "mapping": {"alert_data": "$.body.data[0]"}, + "filter": {"expr": "body.data[0].severity == 'high'"}, + } + ], + }, + }, + ) + + response = await client.post( + "/api/workflow/wf-1/triggers/hook-default/preview-mapping", + json={"body": {"data": [{"severity": "high"}]}}, + ) + + assert response.status_code == 200, response.text + body = response.json() + assert body["matched"] is True + assert body["inputs"]["alert_data"] == {"severity": "high"} + assert body["inputs"]["_flocks"]["trigger"]["id"] == "hook-default" + + +@pytest.mark.asyncio +async def test_create_workflow_trigger_persists_and_restarts_runtime( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + stored_payloads: list[dict[str, Any]] = [] + + base_workflow = { + "id": "wf-1", + "name": "demo", + "workflowJson": { + "start": "n1", + "nodes": [{"id": "n1", "type": "python", "code": "result = {'ok': True}"}], + "edges": [], + }, + } + + monkeypatch.setattr( + workflow_routes, + "_read_workflow_from_fs", + lambda workflow_id: base_workflow if workflow_id == "wf-1" else None, + ) + + async def _fake_persist(workflow_id: str, workflow_data: dict[str, Any], triggers: list[Any]) -> dict[str, Any]: + stored_payloads.append( + { + "workflow_id": workflow_id, + "trigger_ids": [trigger.id for trigger in triggers], + } + ) + return { + **workflow_data, + "workflowJson": { + **workflow_data["workflowJson"], + "triggers": [trigger.model_dump(mode="json") for trigger in triggers], + }, + } + + runtime_calls: list[str] = [] + + async def _fake_restart(workflow_id: str, workflow_json: dict[str, Any]) -> dict[str, Any]: + runtime_calls.append(f"restart:{workflow_id}:{len(workflow_json.get('triggers', []))}") + return {} + + async def _fake_status(workflow_id: str, trigger: Any) -> dict[str, Any]: + return {"workflowId": workflow_id, "triggerId": trigger.id, "state": "ready"} + + monkeypatch.setattr(workflow_routes, "_persist_workflow_triggers", _fake_persist) + monkeypatch.setattr( + workflow_routes, + "default_trigger_runtime", + SimpleNamespace(restart_workflow=_fake_restart, get_trigger_status=_fake_status), + ) + + response = await client.post( + "/api/workflow/wf-1/triggers", + json={ + "id": "hook-default", + "type": "custom_webhook", + "enabled": True, + "source": {"path": "/alerts/demo", "method": "POST"}, + "mapping": {"payload": "$.body"}, + }, + ) + + assert response.status_code == 200, response.text + assert stored_payloads[0]["trigger_ids"] == ["hook-default"] + assert runtime_calls == ["restart:wf-1:1"] + assert response.json()["status"]["state"] == "ready" + + +@pytest.mark.asyncio +async def test_delete_workflow_trigger_removes_definition_and_restarts_runtime( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + stored_payloads: list[dict[str, Any]] = [] + + base_workflow = { + "id": "wf-1", + "name": "demo", + "workflowJson": { + "start": "n1", + "nodes": [{"id": "n1", "type": "python", "code": "result = {'ok': True}"}], + "edges": [], + "triggers": [ + { + "id": "hook-default", + "type": "custom_webhook", + "enabled": True, + "source": {"path": "/alerts/demo", "method": "POST"}, + "mapping": {"payload": "$.body"}, + } + ], + }, + } + + monkeypatch.setattr( + workflow_routes, + "_read_workflow_from_fs", + lambda workflow_id: base_workflow if workflow_id == "wf-1" else None, + ) + + async def _fake_persist(workflow_id: str, workflow_data: dict[str, Any], triggers: list[Any]) -> dict[str, Any]: + stored_payloads.append( + { + "workflow_id": workflow_id, + "trigger_ids": [trigger.id for trigger in triggers], + } + ) + return { + **workflow_data, + "workflowJson": { + **workflow_data["workflowJson"], + "triggers": [trigger.model_dump(mode="json") for trigger in triggers], + }, + } + + runtime_calls: list[str] = [] + + async def _fake_restart(workflow_id: str, workflow_json: dict[str, Any]) -> dict[str, Any]: + runtime_calls.append(f"restart:{workflow_id}:{len(workflow_json.get('triggers', []))}") + return {} + + async def _fake_remove_legacy(*_args: Any, **_kwargs: Any) -> None: + return None + + monkeypatch.setattr(workflow_routes, "_persist_workflow_triggers", _fake_persist) + monkeypatch.setattr(workflow_routes, "_remove_legacy_trigger_state", _fake_remove_legacy) + monkeypatch.setattr( + workflow_routes, + "default_trigger_runtime", + SimpleNamespace(restart_workflow=_fake_restart), + ) + + response = await client.delete("/api/workflow/wf-1/triggers/hook-default") + + assert response.status_code == 200, response.text + assert stored_payloads[0]["trigger_ids"] == [] + assert runtime_calls == ["restart:wf-1:0"] + assert response.json() == {"ok": True, "triggerId": "hook-default"} + + +@pytest.mark.asyncio +async def test_webhook_route_authorizes_and_dispatches_trigger( + client: AsyncClient, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr( + workflow_routes, + "_read_workflow_from_fs", + lambda workflow_id: { + "id": workflow_id, + "workflowJson": { + "start": "n1", + "nodes": [{"id": "n1", "type": "python", "code": "result = {'ok': True}"}], + "edges": [], + "triggers": [ + { + "id": "hook-default", + "type": "custom_webhook", + "enabled": True, + "auth": {"type": "api_key", "apiKey": "demo-secret"}, + "mapping": {"payload": "$.body"}, + "source": {"path": "/webhook/workflows/wf-1/hook-default"}, + } + ], + }, + }, + ) + + async def _fake_dispatch_event(**kwargs: Any) -> dict[str, Any]: + event = kwargs["event"] + return { + "matched": True, + "executed": True, + "inputs": {"payload": event.body}, + "result": {"triggerId": kwargs["trigger"].id}, + } + + monkeypatch.setattr( + workflow_routes, + "default_trigger_runtime", + SimpleNamespace(dispatch_event=_fake_dispatch_event), + ) + + response = await client.post( + "/webhook/workflows/wf-1/hook-default", + headers={"x-api-key": "demo-secret"}, + json={"severity": "high"}, + ) + + assert response.status_code == 200, response.text + body = response.json() + assert body["ok"] is True + assert body["executed"] is True + assert body["inputs"]["payload"] == {"severity": "high"} + diff --git a/tests/workflow/test_trigger_dispatcher.py b/tests/workflow/test_trigger_dispatcher.py new file mode 100644 index 000000000..6f887965a --- /dev/null +++ b/tests/workflow/test_trigger_dispatcher.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +import pytest + +from flocks.workflow.triggers.dispatcher import ( + EventDispatcher, + build_trigger_event, + evaluate_trigger_filter, + lookup_mapping_path, + preview_trigger_mapping, +) +from flocks.workflow.triggers.models import TriggerDefinition + + +def test_lookup_mapping_path_supports_nested_access() -> None: + payload = { + "body": { + "data": [ + {"severity": "high", "source": {"ip": "1.1.1.1"}}, + ] + } + } + + assert lookup_mapping_path(payload, "$.body.data[0].severity") == "high" + assert lookup_mapping_path(payload, "$.body.data[0].source.ip") == "1.1.1.1" + assert lookup_mapping_path(payload, "$.body.data[1]") is None + + +def test_preview_trigger_mapping_builds_flocks_envelope() -> None: + trigger = TriggerDefinition.model_validate( + { + "id": "custom-webhook", + "type": "custom_webhook", + "mapping": { + "alert_data": "$.body.data[0]", + }, + "inputs": {"static_value": 7}, + } + ) + event = build_trigger_event( + workflow_id="wf-1", + trigger=trigger, + body={"data": [{"severity": "high"}]}, + ) + + mapped = preview_trigger_mapping(trigger, event) + + assert mapped["static_value"] == 7 + assert mapped["alert_data"] == {"severity": "high"} + assert mapped["_flocks"]["trigger"]["id"] == "custom-webhook" + assert mapped["_flocks"]["trigger"]["type"] == "custom_webhook" + + +def test_trigger_filter_expression_matches_expected_payload() -> None: + trigger = TriggerDefinition.model_validate( + { + "id": "high-only", + "type": "custom_webhook", + "filter": {"expr": "body.data[0].severity in ['high', 'critical']"}, + } + ) + event = build_trigger_event( + workflow_id="wf-1", + trigger=trigger, + body={"data": [{"severity": "high"}]}, + ) + + matched, error = evaluate_trigger_filter(trigger, event) + + assert matched is True + assert error is None + + +@pytest.mark.asyncio +async def test_event_dispatcher_skips_execution_when_filter_does_not_match() -> None: + dispatcher = EventDispatcher() + trigger = TriggerDefinition.model_validate( + { + "id": "critical-only", + "type": "custom_webhook", + "filter": {"expr": "body.severity == 'critical'"}, + "mapping": {"severity": "$.body.severity"}, + } + ) + event = build_trigger_event( + workflow_id="wf-1", + trigger=trigger, + body={"severity": "low"}, + ) + + async def _executor(_inputs: dict[str, object]) -> dict[str, bool]: + raise AssertionError("executor must not run when the filter misses") + + result = await dispatcher.dispatch(trigger=trigger, event=event, executor=_executor) + + assert result["matched"] is False + assert result["executed"] is False + assert result["inputs"]["severity"] == "low" + diff --git a/tests/workflow/test_trigger_schedule_cron.py b/tests/workflow/test_trigger_schedule_cron.py new file mode 100644 index 000000000..396a5f0a9 --- /dev/null +++ b/tests/workflow/test_trigger_schedule_cron.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from flocks.workflow.poller_manager import WorkflowPollerManager + + +def test_poller_config_supports_cron_expression() -> None: + manager = WorkflowPollerManager() + + config = manager._normalize_config( # noqa: SLF001 - focused unit test + "wf-1", + { + "enabled": True, + "cronExpression": "*/5 * * * *", + "timeoutSeconds": 120, + }, + ) + + assert config["enabled"] is True + assert config["cronExpression"] == "*/5 * * * *" + assert config["intervalSeconds"] == 30 + + +def test_poller_next_run_uses_cron_when_present() -> None: + manager = WorkflowPollerManager() + + next_run_at = manager._compute_next_run_at_ms( # noqa: SLF001 - focused unit test + { + "intervalSeconds": 30, + "cronExpression": "*/5 * * * *", + }, + base_ts_s=0, + ) + + assert next_run_at == 300000 + diff --git a/webui/src/api/workflow.ts b/webui/src/api/workflow.ts index 301e368a8..88e01f9f4 100644 --- a/webui/src/api/workflow.ts +++ b/webui/src/api/workflow.ts @@ -65,12 +65,106 @@ export interface WorkflowMetadata { [key: string]: any; } +export type WorkflowTriggerType = + | 'manual' + | 'schedule' + | 'webhook' + | 'syslog' + | 'kafka' + | 'internal_event' + | 'custom_webhook' + | 'custom_adapter' + | 'plugin'; + +export interface WorkflowTriggerAuth { + type?: string; + secretRef?: string; + headerName?: string; + queryParam?: string; + apiKey?: string; + [key: string]: any; +} + +export interface WorkflowTriggerFilter { + expr?: string; + mode?: string; + path?: string; + equals?: unknown; + [key: string]: any; +} + +export interface WorkflowTriggerConcurrency { + policy?: 'allow' | 'no_overlap' | 'queue' | 'drop_oldest' | 'drop_newest'; + maxParallel?: number; + queueSize?: number; + [key: string]: any; +} + +export interface WorkflowTriggerSample { + name: string; + payload?: unknown; + headers?: Record; + query?: Record; + [key: string]: any; +} + +export interface WorkflowTrigger { + id: string; + name?: string; + type: WorkflowTriggerType; + enabled?: boolean; + description?: string; + source?: Record; + auth?: WorkflowTriggerAuth; + filter?: WorkflowTriggerFilter; + mapping?: Record; + inputs?: Record; + concurrency?: WorkflowTriggerConcurrency; + runtime?: Record; + testSamples?: WorkflowTriggerSample[]; + updatedAt?: number; + [key: string]: any; +} + +export interface WorkflowTriggerStatus { + workflowId?: string; + triggerId?: string; + triggerType?: WorkflowTriggerType | string; + state: string; + error?: string | null; + [key: string]: any; +} + +export interface WorkflowTriggerRecord { + trigger: WorkflowTrigger; + status?: WorkflowTriggerStatus; +} + +export interface WorkflowTriggerPreview { + triggerId: string; + triggerType: string; + matched: boolean; + inputs: Record; + filterError?: string | null; +} + +export interface WorkflowTriggerPlugin { + id: string; + name: string; + description?: string; + root?: string; + manifestPath?: string; + handlerPath?: string; + manifest?: Record; +} + export interface WorkflowJSON { version?: string; name?: string; start: string; nodes: WorkflowNode[]; edges: WorkflowEdge[]; + triggers?: WorkflowTrigger[]; metadata?: WorkflowMetadata; } @@ -120,6 +214,11 @@ export interface WorkflowExecution { duration?: number; executionLog: WorkflowExecutionStep[]; errorMessage?: string; + triggerId?: string; + triggerType?: string; + deliveryId?: string; + attempt?: number; + triggerSource?: string; currentNodeId?: string; currentNodeType?: string; currentPhase?: string; @@ -214,6 +313,7 @@ export interface WorkflowPollerStatus { error?: string | null; enabled?: boolean; intervalSeconds?: number; + cronExpression?: string | null; timeoutSeconds?: number; noOverlap?: boolean; activeRuns?: number; @@ -268,7 +368,7 @@ export const workflowAPI = { validate: (id: string) => client.post<{ valid: boolean; issues: any[] }>(`/api/workflow/${id}/validate`), - getHistory: (id: string, params?: { limit?: number }) => + getHistory: (id: string, params?: { limit?: number; triggerId?: string; triggerType?: string }) => client.get(`/api/workflow/${id}/history`, { params }), getExecution: (workflowId: string, execId: string) => @@ -303,6 +403,44 @@ export const workflowAPI = { listServices: () => client.get('/api/workflow-services'), + getTriggers: (id: string) => + client.get(`/api/workflow/${id}/triggers`), + + createTrigger: (id: string, trigger: WorkflowTrigger) => + client.post<{ trigger: WorkflowTrigger; status?: WorkflowTriggerStatus }>( + `/api/workflow/${id}/triggers`, + trigger, + ), + + updateTrigger: (id: string, triggerId: string, trigger: WorkflowTrigger) => + client.put<{ trigger: WorkflowTrigger; status?: WorkflowTriggerStatus }>( + `/api/workflow/${id}/triggers/${triggerId}`, + trigger, + ), + + deleteTrigger: (id: string, triggerId: string) => + client.delete<{ ok: boolean; triggerId: string }>(`/api/workflow/${id}/triggers/${triggerId}`), + + getTriggerStatus: (id: string, triggerId: string) => + client.get(`/api/workflow/${id}/triggers/${triggerId}/status`), + + previewTriggerMapping: ( + id: string, + triggerId: string, + payload: { body?: unknown; headers?: Record; query?: Record; pathParams?: Record }, + ) => + client.post(`/api/workflow/${id}/triggers/${triggerId}/preview-mapping`, payload), + + testTrigger: ( + id: string, + triggerId: string, + payload: { body?: unknown; headers?: Record; query?: Record; pathParams?: Record }, + ) => + client.post>(`/api/workflow/${id}/triggers/${triggerId}/test`, payload), + + listTriggerPlugins: () => + client.get('/api/workflow-trigger-plugins'), + saveKafkaConfig: (id: string, config: { enabled?: boolean; inputBroker?: string; diff --git a/webui/src/locales/en-US/workflow.json b/webui/src/locales/en-US/workflow.json index 1360e35fa..5278335bc 100644 --- a/webui/src/locales/en-US/workflow.json +++ b/webui/src/locales/en-US/workflow.json @@ -69,7 +69,7 @@ "tabOverview": "Overview", "tabChat": "AI Edit", "tabRun": "Run", - "tabIntegration": "Integration", + "tabIntegration": "Integrations", "renderError": "Component render error", "deleteWorkflow": "Delete Workflow", "deleteConfirmTitle": "Delete Workflow", diff --git a/webui/src/pages/WorkflowCreate/index.tsx b/webui/src/pages/WorkflowCreate/index.tsx index 17d9b7aac..fe320805d 100644 --- a/webui/src/pages/WorkflowCreate/index.tsx +++ b/webui/src/pages/WorkflowCreate/index.tsx @@ -7,7 +7,7 @@ import CreateTopBar from './CreateTopBar'; import CreateRightPanel from './CreateRightPanel'; const PANEL_MIN = 240; -const PANEL_RATIO = 0.30; +const PANEL_RATIO = 0.40; const EMPTY_WORKFLOW_JSON: WorkflowJSON = { start: '', diff --git a/webui/src/pages/WorkflowDetail/FlowCanvas.tsx b/webui/src/pages/WorkflowDetail/FlowCanvas.tsx index 79ac0442f..77ee39300 100644 --- a/webui/src/pages/WorkflowDetail/FlowCanvas.tsx +++ b/webui/src/pages/WorkflowDetail/FlowCanvas.tsx @@ -99,6 +99,78 @@ const TYPE_CONFIG: Record = { accentBg: 'bg-orange-50', dot: 'bg-orange-400', }, + schedule: { + bg: 'bg-white', + border: 'border-sky-400', + text: 'text-sky-600', + handleColor: '!bg-sky-400', + accentBg: 'bg-sky-50', + dot: 'bg-sky-400', + }, + webhook: { + bg: 'bg-white', + border: 'border-cyan-400', + text: 'text-cyan-700', + handleColor: '!bg-cyan-400', + accentBg: 'bg-cyan-50', + dot: 'bg-cyan-400', + }, + custom_webhook: { + bg: 'bg-white', + border: 'border-cyan-400', + text: 'text-cyan-700', + handleColor: '!bg-cyan-400', + accentBg: 'bg-cyan-50', + dot: 'bg-cyan-400', + }, + kafka: { + bg: 'bg-white', + border: 'border-indigo-400', + text: 'text-indigo-700', + handleColor: '!bg-indigo-400', + accentBg: 'bg-indigo-50', + dot: 'bg-indigo-400', + }, + syslog: { + bg: 'bg-white', + border: 'border-lime-400', + text: 'text-lime-700', + handleColor: '!bg-lime-400', + accentBg: 'bg-lime-50', + dot: 'bg-lime-400', + }, + internal_event: { + bg: 'bg-white', + border: 'border-blue-400', + text: 'text-blue-700', + handleColor: '!bg-blue-400', + accentBg: 'bg-blue-50', + dot: 'bg-blue-400', + }, + custom_adapter: { + bg: 'bg-white', + border: 'border-fuchsia-400', + text: 'text-fuchsia-700', + handleColor: '!bg-fuchsia-400', + accentBg: 'bg-fuchsia-50', + dot: 'bg-fuchsia-400', + }, + manual: { + bg: 'bg-white', + border: 'border-slate-400', + text: 'text-slate-700', + handleColor: '!bg-slate-400', + accentBg: 'bg-slate-50', + dot: 'bg-slate-400', + }, + plugin: { + bg: 'bg-white', + border: 'border-fuchsia-400', + text: 'text-fuchsia-700', + handleColor: '!bg-fuchsia-400', + accentBg: 'bg-fuchsia-50', + dot: 'bg-fuchsia-400', + }, }; const TYPE_ICONS: Record = { @@ -110,6 +182,15 @@ const TYPE_ICONS: Record = { llm: , http_request: , subworkflow: , + schedule: , + webhook: , + custom_webhook: , + kafka: , + syslog: , + internal_event: , + custom_adapter: , + manual: , + plugin: , }; const TYPE_LABELS: Record = { @@ -121,6 +202,15 @@ const TYPE_LABELS: Record = { llm: 'LLM', http_request: 'HTTP', subworkflow: 'SubWorkflow', + schedule: 'Schedule', + webhook: 'Webhook', + custom_webhook: 'Custom Webhook', + kafka: 'Kafka', + syslog: 'Syslog', + internal_event: 'Internal Event', + custom_adapter: 'Custom Adapter', + manual: 'Manual', + plugin: 'Plugin', }; // ───────────────────────────────────────────── @@ -132,6 +222,7 @@ interface ViewNodeData { nodeType: string; description?: string; isStart?: boolean; + isTrigger?: boolean; onNodeClick?: (nodeId: string) => void; } @@ -150,7 +241,9 @@ const ViewNode = memo(function ViewNode({ data, selected }: NodeProps) { transition-all duration-150 ${selected ? 'shadow-md ring-2 ring-offset-1 ring-red-300' : 'hover:shadow-md'} `} - onClick={() => d.onNodeClick?.(d.label)} + onClick={() => { + if (!d.isTrigger) d.onNodeClick?.(d.label); + }} > )} + {d.isTrigger && ( + + Trigger + + )} {/* Node ID */} @@ -184,11 +282,13 @@ const ViewNode = memo(function ViewNode({ data, selected }: NodeProps) { {/* Click hint */} -
- - {t('detail.flow.details')} - -
+ {!d.isTrigger && ( +
+ + {t('detail.flow.details')} + +
+ )} 0) { + const totalWidth = workflowTriggers.length * triggerNodeWidth + Math.max(0, workflowTriggers.length - 1) * triggerGap; + const startX = startPosition.x - totalWidth / 2 + triggerNodeWidth / 2; + workflowTriggers.forEach((trigger, idx) => { + const triggerNodeId = `trigger:${trigger.id}`; + nodes.push({ + id: triggerNodeId, + type: 'view', + position: { + x: startX + idx * (triggerNodeWidth + triggerGap), + y: startPosition.y - (NODE_H + V_GAP), + }, + data: { + label: trigger.name || trigger.id, + nodeType: trigger.type, + description: trigger.description || JSON.stringify(trigger.source ?? {}), + isTrigger: true, + }, + }); + edges.push({ + id: `e-${triggerNodeId}-${startId}`, + source: triggerNodeId, + target: startId, + type: 'smoothstep', + animated: Boolean(trigger.enabled), + markerEnd: { type: MarkerType.ArrowClosed, width: 16, height: 16 }, + style: { stroke: '#7dd3fc', strokeWidth: 1.5, strokeDasharray: '5 4' }, + }); + }); + } + return { nodes, edges }; } diff --git a/webui/src/pages/WorkflowDetail/index.tsx b/webui/src/pages/WorkflowDetail/index.tsx index 3a9275e4b..ec556a510 100644 --- a/webui/src/pages/WorkflowDetail/index.tsx +++ b/webui/src/pages/WorkflowDetail/index.tsx @@ -15,7 +15,7 @@ import NodeInfoPanel from './NodeInfoPanel'; type CanvasTab = 'flow' | 'md' | 'json'; const PANEL_MIN = 240; -const PANEL_RATIO = 0.30; // 初始占可用宽度的 30% +const PANEL_RATIO = 0.40; // 初始占可用宽度的 40% function getInitialPanelWidth() { // 可用宽度 = 视口宽度 - 侧边导航栏(lg 以上为 256px) diff --git a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.test.tsx b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.test.tsx index 9f8f06545..c61e506d2 100644 --- a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.test.tsx +++ b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.test.tsx @@ -8,17 +8,13 @@ const { workflowAPI } = vi.hoisted(() => ({ getService: vi.fn(), publish: vi.fn(), unpublish: vi.fn(), - getKafkaConfig: vi.fn(), - saveKafkaConfig: vi.fn(), - getKafkaStatus: vi.fn(), - getPollerConfig: vi.fn(), - savePollerConfig: vi.fn(), - getPollerStatus: vi.fn(), - runPollerOnce: vi.fn(), - getSampleInputs: vi.fn(), - getSyslogConfig: vi.fn(), - saveSyslogConfig: vi.fn(), - getSyslogStatus: vi.fn(), + getTriggers: vi.fn(), + createTrigger: vi.fn(), + updateTrigger: vi.fn(), + deleteTrigger: vi.fn(), + previewTriggerMapping: vi.fn(), + testTrigger: vi.fn(), + listTriggerPlugins: vi.fn(), }, })); @@ -43,59 +39,18 @@ vi.mock('react-i18next', () => ({ t: (key: string) => { const translations: Record = { 'detail.run.publishSection': '发布为 API', - 'detail.run.publishDesc': 'desc', + 'detail.run.publishDesc': 'publish desc', 'detail.run.publishAsApi': '发布为 API 服务', - 'detail.run.serviceDriver': '运行方式', + 'detail.run.publishFailed': '发布失败', + 'detail.run.stopFailed': '停止失败', + 'detail.run.stopping': '停止中...', + 'detail.run.stopService': '停止服务', 'detail.run.driverLocal': '本地进程', 'detail.run.driverDocker': 'Docker 容器', - 'detail.run.recommended': '推荐', 'detail.run.driverLocalDesc': 'local desc', 'detail.run.driverDockerDesc': 'docker desc', - 'detail.run.kafkaSection': 'Kafka 配置', - 'detail.run.kafkaExperimental': '实验性', - 'detail.run.kafkaEnabled': '启用消费', - 'detail.run.kafkaInputKey': 'Inputs 键名', - 'detail.run.kafkaInputs': '额外 Inputs JSON', - 'detail.run.kafkaInputsHint': 'kafka inputs hint', - 'detail.run.kafkaInputsJsonError': 'Kafka Inputs 必须是合法的 JSON 对象', - 'detail.run.inputConfig': '输入配置', - 'detail.run.savingConfig': '保存中', - 'detail.run.savedConfig': '已保存', - 'detail.run.saveConfig': '保存配置', - 'detail.run.kafkaHint': 'hint', - 'detail.run.pollerSection': 'Workflow Poller', - 'detail.run.pollerEnabled': '启用轮询服务', - 'detail.run.pollerNoOverlap': '禁止重叠执行', - 'detail.run.pollerInterval': '轮询间隔(秒)', - 'detail.run.pollerTimeout': '执行超时(秒)', - 'detail.run.pollerInputs': 'Inputs JSON', - 'detail.run.pollerInputsJsonError': 'Inputs 必须是合法的 JSON 对象', - 'detail.run.pollerInputsHint': 'poller inputs hint', - 'detail.run.pollerRunOnce': '立即执行一轮', - 'detail.run.pollerRunningOnce': '执行中...', - 'detail.run.pollerRunOnceFailed': '立即执行失败', - 'detail.run.pollerStatus': '轮询状态', - 'detail.run.pollerRunning': '运行中', - 'detail.run.pollerEnabledIdle': '已启用,等待下一轮', - 'detail.run.pollerFailed': '轮询器异常', - 'detail.run.pollerLastRunAt': '上次执行', - 'detail.run.pollerNextRunAt': '下次执行', - 'detail.run.pollerLastStatus': '最近结果', - 'detail.run.pollerLastDuration': '最近耗时', - 'detail.run.pollerSelectedCount': '本轮选中数量', - 'detail.run.pollerActiveRuns': '活跃执行数', - 'detail.run.pollerProcessedMarkCount': 'processed 总数', - 'detail.run.pollerChannelStatus': '通道通知状态', - 'detail.run.pollerHint': 'poller hint', - 'detail.run.syslogSection': 'Syslog', - 'detail.run.syslogExperimental': '实验性', - 'detail.run.syslogEnabled': '启用监听', - 'detail.run.syslogProtocol': '协议', - 'detail.run.syslogHost': '监听地址', - 'detail.run.syslogPort': '端口', - 'detail.run.syslogFormat': '解析格式', - 'detail.run.syslogInputKey': 'Inputs 键名', - 'detail.run.syslogHint': 'syslog hint', + 'detail.run.apiKeyHide': '隐藏', + 'detail.run.apiKeyShow': '显示', }; return translations[key] ?? key; }, @@ -110,6 +65,7 @@ const workflow = { status: 'draft' as const, createdAt: Date.now(), updatedAt: Date.now(), + markdownContent: '', stats: { callCount: 0, successCount: 0, @@ -121,220 +77,202 @@ const workflow = { }, }; -describe('IntegrationTab Kafka config', () => { +describe('IntegrationTab trigger workspace', () => { beforeEach(() => { vi.clearAllMocks(); + vi.stubGlobal('confirm', vi.fn(() => true)); workflowAPI.getService.mockResolvedValue({ data: null }); - workflowAPI.getKafkaConfig.mockResolvedValue({ data: null }); - workflowAPI.getKafkaStatus.mockResolvedValue({ data: { state: 'stopped', error: null } }); - workflowAPI.saveKafkaConfig.mockResolvedValue({ data: { ok: true, consumer: { state: 'stopped', error: null } } }); - workflowAPI.getPollerConfig.mockResolvedValue({ data: null }); - workflowAPI.getPollerStatus.mockResolvedValue({ data: { state: 'stopped', error: null } }); - workflowAPI.savePollerConfig.mockResolvedValue({ data: { ok: true, status: { state: 'running', lastStatus: null } } }); - workflowAPI.runPollerOnce.mockResolvedValue({ data: { ok: true, status: { state: 'stopped', lastStatus: 'success' } } }); - workflowAPI.getSampleInputs.mockResolvedValue({ data: { sampleInputs: {} } }); - workflowAPI.getSyslogConfig.mockResolvedValue({ data: null }); - workflowAPI.getSyslogStatus.mockResolvedValue({ data: { state: 'stopped', error: null } }); + workflowAPI.getTriggers.mockResolvedValue({ data: [] }); + workflowAPI.createTrigger.mockResolvedValue({ data: { trigger: { id: 'hook-created' } } }); + workflowAPI.updateTrigger.mockImplementation(async (_workflowId: string, _triggerId: string, trigger: unknown) => ({ + data: { trigger }, + })); + workflowAPI.deleteTrigger.mockResolvedValue({ data: { ok: true, triggerId: 'hook-1' } }); + workflowAPI.previewTriggerMapping.mockResolvedValue({ + data: { + triggerId: 'hook-1', + triggerType: 'custom_webhook', + matched: true, + inputs: { event: { ok: true } }, + }, + }); + workflowAPI.testTrigger.mockResolvedValue({ + data: { + ok: true, + inputs: { event: { ok: true } }, + }, + }); + workflowAPI.listTriggerPlugins.mockResolvedValue({ data: [] }); }); - it('does not show experimental badges for Kafka and Syslog sections', () => { + it('renders publish section first and unified trigger workspace below', async () => { render(); - expect(screen.queryByText('实验性')).not.toBeInTheDocument(); + expect(await screen.findByText('发布为 API')).toBeInTheDocument(); + expect(await screen.findByText('集成')).toBeInTheDocument(); + expect(screen.queryByText('Kafka 配置')).not.toBeInTheDocument(); + expect(screen.queryByText('Workflow Poller')).not.toBeInTheDocument(); }); - it('saves Kafka consumer config without output fields', async () => { - const user = userEvent.setup(); + it('shows only one empty-state box when there is no trigger', async () => { render(); - await user.click(await screen.findByRole('button', { name: /Kafka 配置/ })); - await user.type(screen.getByPlaceholderText('localhost:9092'), 'localhost:9092'); - await user.type(screen.getByPlaceholderText('workflow-input'), 'workflow-input'); - await user.click(screen.getByLabelText('启用消费')); - await user.click(screen.getByRole('button', { name: '保存配置' })); - - await waitFor(() => { - expect(workflowAPI.saveKafkaConfig).toHaveBeenCalledWith('wf-1', { - enabled: true, - inputBroker: 'localhost:9092', - inputTopic: 'workflow-input', - inputGroupId: '', - inputKey: 'kafka_message', - inputs: {}, - }); - }); - expect(screen.queryByText('输出配置')).not.toBeInTheDocument(); - expect(screen.queryByLabelText('启用输出')).not.toBeInTheDocument(); + expect(await screen.findByText('还没有配置任何 Trigger。可以从上面的快捷按钮开始。')).toBeInTheDocument(); + expect(screen.queryByText('选择或创建一个 Trigger 后,在这里编辑配置。')).not.toBeInTheDocument(); + expect(screen.getByRole('button', { name: 'Schedule' })).toBeEnabled(); + expect(screen.getByRole('button', { name: 'Webhook' })).toBeEnabled(); + expect(screen.getByRole('button', { name: 'Syslog' })).toBeEnabled(); + expect(screen.getByRole('button', { name: 'Kafka' })).toBeEnabled(); + expect(screen.queryByRole('button', { name: /自定义 Trigger/ })).not.toBeInTheDocument(); }); - it('prefills kafka extra inputs from sample inputs without kafka raw payload keys', async () => { - workflowAPI.getSampleInputs.mockResolvedValue({ - data: { - sampleInputs: { - _comment: 'ignore me', - kafka_message: { id: 1 }, - source: 'demo', - kafka_output_enabled: true, + it('renders trigger list in the unified workspace', async () => { + workflowAPI.getTriggers.mockResolvedValue({ + data: [ + { + trigger: { + id: 'schedule-1', + name: 'Daily Scan', + type: 'schedule', + enabled: true, + source: { intervalSeconds: 60 }, + mapping: {}, + inputs: {}, + testSamples: [{ name: 'default', payload: {} }], + }, + status: { state: 'running' }, }, - }, + ], }); render(); - await userEvent.setup().click(await screen.findByRole('button', { name: /Kafka 配置/ })); - const textarea = await screen.findByLabelText('额外 Inputs JSON'); - expect(textarea).toHaveValue(`{ - "source": "demo", - "kafka_output_enabled": true -}`); + expect((await screen.findAllByText('Daily Scan')).length).toBeGreaterThan(0); + expect(screen.getByText('Inputs(JSON)')).toBeInTheDocument(); + expect(screen.queryByText('描述')).not.toBeInTheDocument(); + expect(screen.queryByText('字段映射(JSON)')).not.toBeInTheDocument(); + expect(screen.queryByText('测试事件(JSON)')).not.toBeInTheDocument(); + expect(screen.queryByText('过滤条件')).not.toBeInTheDocument(); }); - it('blocks saving kafka config when extra inputs json is invalid', async () => { - const user = userEvent.setup(); - render(); - - await user.click(await screen.findByRole('button', { name: /Kafka 配置/ })); - const textarea = screen.getByLabelText('额外 Inputs JSON'); - fireEvent.change(textarea, { target: { value: '{"broken": ' } }); - await user.click(screen.getByRole('button', { name: '保存配置' })); - - expect(await screen.findByText('Kafka Inputs 必须是合法的 JSON 对象')).toBeInTheDocument(); - expect(workflowAPI.saveKafkaConfig).not.toHaveBeenCalled(); - }); - - it('strips execution-only comment keys before saving kafka extra inputs', async () => { - const user = userEvent.setup(); - render(); - - await user.click(await screen.findByRole('button', { name: /Kafka 配置/ })); - const textarea = screen.getByLabelText('额外 Inputs JSON'); - fireEvent.change(textarea, { - target: { - value: `{ - "_comment": "remove me", - "kafka_output_enabled": true, - "nested": { - "_comment_nested": "remove too", - "topic": "topic_soc_flocks_result_log" - } -}`, - }, - }); - await user.click(screen.getByRole('button', { name: '保存配置' })); - - await waitFor(() => { - expect(workflowAPI.saveKafkaConfig).toHaveBeenCalledWith('wf-1', { - enabled: false, - inputBroker: '', - inputTopic: '', - inputGroupId: '', - inputKey: 'kafka_message', - inputs: { - kafka_output_enabled: true, - nested: { - topic: 'topic_soc_flocks_result_log', + it('does not render duplicated trigger card when only one trigger exists', async () => { + workflowAPI.getTriggers.mockResolvedValue({ + data: [ + { + trigger: { + id: 'kafka-1', + name: 'Kafka Trigger', + type: 'kafka', + enabled: false, + source: { + inputBroker: 'localhost:9092', + inputTopic: 'wf-1.events', + inputGroupId: 'wf-1-group', + }, + mapping: {}, + inputs: {}, + testSamples: [], }, + status: { state: 'stopped' }, }, - }); - }); - }); - - it('renders poller status badge when runtime is running', async () => { - workflowAPI.getPollerStatus.mockResolvedValue({ - data: { - state: 'running', - lastStatus: 'success', - selectedCount: 12, - activeRuns: 1, - }, + ], }); render(); - await userEvent.setup().click(await screen.findByRole('button', { name: /Workflow Poller/ })); - expect(await screen.findByText('运行中')).toBeInTheDocument(); - expect(screen.getByText(/本轮选中数量: 12/)).toBeInTheDocument(); + expect(await screen.findByText('Kafka Trigger')).toBeInTheDocument(); + expect(screen.getAllByRole('button', { name: '删除' })).toHaveLength(1); }); - it('saves poller config from the integration tab', async () => { + it('creates a webhook trigger from the unified toolbar', async () => { const user = userEvent.setup(); - workflowAPI.getSampleInputs.mockResolvedValue({ - data: { - sampleInputs: { - _comment: 'for display only', - _comment_dispose: 'dispose note', - severity: 'high', - notify: true, - }, - }, - }); + render(); - await user.click(await screen.findByRole('button', { name: /Workflow Poller/ })); - await user.click(screen.getByLabelText('启用轮询服务')); - const intervalInput = screen.getByLabelText('轮询间隔(秒)'); - await user.clear(intervalInput); - await user.type(intervalInput, '45'); - await user.click(screen.getByRole('button', { name: '保存配置' })); + await user.click(await screen.findByRole('button', { name: 'Webhook' })); await waitFor(() => { - expect(workflowAPI.savePollerConfig).toHaveBeenCalledWith('wf-1', { - enabled: true, - intervalSeconds: 45, - timeoutSeconds: 7200, - noOverlap: true, - inputs: { - severity: 'high', - notify: true, - }, - }); + expect(workflowAPI.createTrigger).toHaveBeenCalledWith( + 'wf-1', + expect.objectContaining({ + type: 'custom_webhook', + name: 'Webhook Trigger', + enabled: false, + }), + ); }); }); - it('prefills poller inputs from current workflow sample inputs', async () => { - workflowAPI.getSampleInputs.mockResolvedValue({ - data: { - sampleInputs: { - _comment: 'ignore me', - _comment_cache: 'cache note', - eventType: 'alert', - source: 'demo', + it('saves edited schedule trigger through the unified editor', async () => { + const user = userEvent.setup(); + workflowAPI.getTriggers.mockResolvedValue({ + data: [ + { + trigger: { + id: 'schedule-1', + name: 'Daily Scan', + type: 'schedule', + enabled: true, + source: { mode: 'interval', intervalSeconds: 60 }, + runtime: { timeoutSeconds: 7200, noOverlap: true }, + mapping: {}, + inputs: {}, + testSamples: [{ name: 'default', payload: {} }], + }, + status: { state: 'running' }, }, - }, + ], }); render(); - await userEvent.setup().click(await screen.findByRole('button', { name: /Workflow Poller/ })); - const textarea = await screen.findByLabelText('Inputs JSON'); - expect(textarea).toHaveValue(`{ - "eventType": "alert", - "source": "demo" -}`); - }); - - it('blocks saving poller config when inputs json is invalid', async () => { - const user = userEvent.setup(); - render(); - - await user.click(await screen.findByRole('button', { name: /Workflow Poller/ })); - const textarea = screen.getByLabelText('Inputs JSON'); - fireEvent.change(textarea, { target: { value: '{"broken": ' } }); - await user.click(screen.getByRole('button', { name: '保存配置' })); + const nameInput = await screen.findByDisplayValue('Daily Scan'); + fireEvent.change(nameInput, { target: { value: 'Updated Scan' } }); + await waitFor(() => { + expect(nameInput).toHaveValue('Updated Scan'); + }); + await user.click(screen.getByRole('button', { name: '保存' })); - expect(await screen.findByText('Inputs 必须是合法的 JSON 对象')).toBeInTheDocument(); - expect(workflowAPI.savePollerConfig).not.toHaveBeenCalled(); + await waitFor(() => { + expect(workflowAPI.updateTrigger).toHaveBeenCalledWith( + 'wf-1', + 'schedule-1', + expect.objectContaining({ + id: 'schedule-1', + type: 'schedule', + name: 'Updated Scan', + }), + ); + }); }); - it('runs poller once from the integration tab', async () => { + it('deletes selected trigger from the workspace', async () => { const user = userEvent.setup(); + workflowAPI.getTriggers.mockResolvedValue({ + data: [ + { + trigger: { + id: 'hook-1', + name: 'Webhook Trigger', + type: 'custom_webhook', + enabled: true, + source: { method: 'POST', path: '/demo' }, + auth: { type: 'none' }, + mapping: { event: '$.body' }, + inputs: {}, + testSamples: [{ name: 'default', payload: { example: true } }], + }, + status: { state: 'ready' }, + }, + ], + }); + render(); - await user.click(await screen.findByRole('button', { name: /Workflow Poller/ })); - await user.click(screen.getByRole('button', { name: '立即执行一轮' })); + await user.click(await screen.findByRole('button', { name: '删除' })); await waitFor(() => { - expect(workflowAPI.runPollerOnce).toHaveBeenCalledWith('wf-1'); + expect(workflowAPI.deleteTrigger).toHaveBeenCalledWith('wf-1', 'hook-1'); }); }); }); diff --git a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx index 3658d566c..95e5e41ad 100644 --- a/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx +++ b/webui/src/pages/WorkflowDetail/tabs/IntegrationTab.tsx @@ -1,7 +1,23 @@ -import { useState, useEffect, useCallback } from 'react'; import { - Loader2, Globe, StopCircle, Check, ChevronDown, ChevronRight, - AlertCircle, Wifi, Server, + useState, + useEffect, + useCallback, + type InputHTMLAttributes, + type ReactNode, + type SelectHTMLAttributes, + type TextareaHTMLAttributes, +} from 'react'; +import { + AlertCircle, + CalendarClock, + Check, + ChevronDown, + ChevronRight, + Globe, + Loader2, + Server, + Trash2, + Workflow as WorkflowIcon, } from 'lucide-react'; import { useTranslation } from 'react-i18next'; import { @@ -9,9 +25,10 @@ import { Workflow, WorkflowService, WorkflowServiceDriver, - SyslogListenerStatus, - KafkaConsumerStatus, - WorkflowPollerStatus, + WorkflowTrigger, + WorkflowTriggerPlugin, + WorkflowTriggerRecord, + WorkflowTriggerType, } from '@/api/workflow'; import CopyButton from '@/components/common/CopyButton'; import WorkflowStatusBadge from '@/components/common/WorkflowStatusBadge'; @@ -21,9 +38,10 @@ export interface IntegrationTabProps { workflow: Workflow; } -// ───────────────────────────────────────────── -// 共享 SectionHeader -// ───────────────────────────────────────────── +type JsonObject = Record; + +const DEFAULT_JSON_TEXT = JSON.stringify({}, null, 2); + function SectionHeader({ title, expanded, @@ -33,7 +51,7 @@ function SectionHeader({ title: string; expanded: boolean; onToggle: () => void; - badge?: React.ReactNode; + badge?: ReactNode; }) { return (