diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..740d7b3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,31 @@ +name: Backend Quality + +on: + push: + branches: + - main + pull_request: + +jobs: + backend-quality: + runs-on: ubuntu-latest + + steps: + - name: Check out repository + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: "3.12" + + - name: Set up uv + uses: astral-sh/setup-uv@v6 + with: + enable-cache: true + + - name: Install dependencies + run: uv sync --dev + + - name: Run backend quality gate + run: make check diff --git a/.gitignore b/.gitignore index 505a3b1..5edfa31 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,7 @@ wheels/ # Virtual environments .venv +.coverage +.coverage.* +htmlcov/ +coverage.xml diff --git a/Makefile b/Makefile index a15b859..0985ae7 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -.PHONY: help build-backend build-electron package-dmg clean install-deps +.PHONY: help build-backend build-electron package-dmg clean install-deps lint format format-check test test-cov check # 项目配置 PROJECT_NAME = AgentForge @@ -79,4 +79,26 @@ check-dmg: ls -lh "$(DMG_OUTPUT)"; \ else \ echo "DMG文件不存在"; \ - fi \ No newline at end of file + fi + +lint: + @echo "运行 Ruff lint..." + uv run ruff check . + +format: + @echo "运行 Ruff format..." + uv run ruff format . + +format-check: + @echo "检查代码格式..." + uv run ruff format --check . + +test: + @echo "运行 Python 测试..." + uv run pytest + +test-cov: + @echo "运行 Python 测试并检查覆盖率..." + uv run pytest --cov --cov-report=term-missing + +check: lint format-check test-cov diff --git a/channels/agent_utils.py b/channels/agent_utils.py index ce1e52b..87ef3a3 100644 --- a/channels/agent_utils.py +++ b/channels/agent_utils.py @@ -9,7 +9,7 @@ from __future__ import annotations import re -from typing import Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from taskboard import TaskDB diff --git a/channels/dir_utils.py b/channels/dir_utils.py index 8b53576..7fea0cf 100644 --- a/channels/dir_utils.py +++ b/channels/dir_utils.py @@ -10,10 +10,10 @@ from __future__ import annotations +import json import os import re -import json -from typing import Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from taskboard import TaskDB @@ -83,6 +83,7 @@ def extract_working_dir_with_claude(prompt: str) -> Optional[str]: try: import requests # already in pyproject.toml + resp = requests.post( "https://api.anthropic.com/v1/messages", headers={ diff --git a/channels/feishu_channel.py b/channels/feishu_channel.py index 21bd3e0..9e971c6 100644 --- a/channels/feishu_channel.py +++ b/channels/feishu_channel.py @@ -12,9 +12,10 @@ feishu_default_working_dir (working directory for tasks created from bot) """ +import base64 import json import threading -import base64 +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import TYPE_CHECKING, Optional @@ -27,15 +28,16 @@ try: import lark_oapi as lark from lark_oapi.api.im.v1 import ( - CreateMessageRequest, - CreateMessageRequestBody, CreateMessageReactionRequest, CreateMessageReactionRequestBody, + CreateMessageRequest, + CreateMessageRequestBody, + Emoji, GetMessageResourceRequest, ReplyMessageRequest, ReplyMessageRequestBody, - Emoji, ) + FEISHU_AVAILABLE = True except ImportError: FEISHU_AVAILABLE = False @@ -143,6 +145,7 @@ def start(self) -> None: except Exception as e: print(f"[Feishu] ERROR during initialization: {e}") import traceback + traceback.print_exc() def stop(self) -> None: @@ -169,6 +172,7 @@ def _run_ws(self): except Exception as e: print(f"[Feishu] WebSocket error: {e}") import traceback + traceback.print_exc() finally: print("[Feishu] WebSocket thread exiting") @@ -190,8 +194,10 @@ def send(self, msg: OutboundMessage) -> None: print(f"[Feishu] Task {task_id} not found in database") return - is_completed = (msg.type == OutboundMessageType.TASK_COMPLETED) - print(f"[Feishu] Sending notification for task {task_id} ({'completed' if is_completed else 'failed'})") + is_completed = msg.type == OutboundMessageType.TASK_COMPLETED + print( + f"[Feishu] Sending notification for task {task_id} ({'completed' if is_completed else 'failed'})" + ) if is_completed: result_text = (msg.payload.get("result") or task.get("result") or "").strip() @@ -199,7 +205,9 @@ def send(self, msg: OutboundMessage) -> None: result_text = result_text[:10000] + "\n…(truncated)" content = result_text or "Done." else: - error_text = (msg.payload.get("error") or task.get("error") or "Unknown error").strip()[:800] + error_text = (msg.payload.get("error") or task.get("error") or "Unknown error").strip()[ + :800 + ] content = error_text # Try to reply in thread if we have an origin message @@ -251,7 +259,7 @@ def _send_message(self, chat_id: str, content: str) -> Optional[str]: "config": {"wide_screen_mode": True}, "elements": [{"tag": "markdown", "content": content}], } - print(f"[Feishu] Building CreateMessageRequest...") + print("[Feishu] Building CreateMessageRequest...") request = ( CreateMessageRequest.builder() .receive_id_type(receive_id_type) @@ -264,9 +272,11 @@ def _send_message(self, chat_id: str, content: str) -> Optional[str]: ) .build() ) - print(f"[Feishu] Calling im.v1.message.create()...") + print("[Feishu] Calling im.v1.message.create()...") response = self._client.im.v1.message.create(request) - print(f"[Feishu] Response received: success={response.success()}, code={response.code}, msg={response.msg}") + print( + f"[Feishu] Response received: success={response.success()}, code={response.code}, msg={response.msg}" + ) if response.success(): message_id = response.data.message_id print(f"[Feishu] Message sent successfully, message_id: {message_id}") @@ -277,12 +287,15 @@ def _send_message(self, chat_id: str, content: str) -> Optional[str]: except Exception as e: print(f"[Feishu] Error sending message: {e}") import traceback + traceback.print_exc() return None def _reply_message(self, parent_message_id: str, content: str) -> Optional[str]: """Reply to a specific message (thread-style). Returns the sent message_id or None.""" - print(f"[Feishu] _reply_message called, parent_message_id: {parent_message_id}, content length: {len(content)}") + print( + f"[Feishu] _reply_message called, parent_message_id: {parent_message_id}, content length: {len(content)}" + ) if not self._client: print("[Feishu] Client not initialized in _reply_message") return None @@ -303,9 +316,11 @@ def _reply_message(self, parent_message_id: str, content: str) -> Optional[str]: ) .build() ) - print(f"[Feishu] Calling im.v1.message.reply()...") + print("[Feishu] Calling im.v1.message.reply()...") response = self._client.im.v1.message.reply(request) - print(f"[Feishu] Reply response: success={response.success()}, code={response.code}, msg={response.msg}") + print( + f"[Feishu] Reply response: success={response.success()}, code={response.code}, msg={response.msg}" + ) if response.success(): message_id = response.data.message_id print(f"[Feishu] Reply sent successfully, message_id: {message_id}") @@ -316,6 +331,7 @@ def _reply_message(self, parent_message_id: str, content: str) -> Optional[str]: except Exception as e: print(f"[Feishu] Error replying to message: {e}") import traceback + traceback.print_exc() return None @@ -348,8 +364,9 @@ def _get_usage_stats(self) -> str: """Compute current 5-hour block usage via claude-monitor library.""" try: from datetime import datetime, timezone - from claude_monitor.data.analysis import analyze_usage + from claude_monitor.core.plans import Plans + from claude_monitor.data.analysis import analyze_usage from claude_monitor.utils.time_utils import TimezoneHandler data = analyze_usage(hours_back=192, quick_start=False, use_cache=False) @@ -449,7 +466,7 @@ def _extract_forwarded_content(self, message) -> Optional[dict]: "sender_id": content.get("sender_id"), "timestamp": content.get("create_time"), "text": content.get("text", ""), - "images": content.get("images", []) + "images": content.get("images", []), } except json.JSONDecodeError: pass @@ -469,7 +486,9 @@ def _extract_forwarded_content(self, message) -> Optional[dict]: # 如果 lang_body 是列表(简化的 content 格式),直接遍历 # 否则从 lang_body 中获取 content - paragraphs = lang_body if isinstance(lang_body, list) else lang_body.get("content", []) + paragraphs = ( + lang_body if isinstance(lang_body, list) else lang_body.get("content", []) + ) # 遍历内容查找引用 block for para in paragraphs: @@ -485,7 +504,7 @@ def _extract_forwarded_content(self, message) -> Optional[dict]: "sender_name": sender_name, "sender_id": quote_user.get("open_id"), "text": quote_text, - "timestamp": elem.get("create_time") + "timestamp": elem.get("create_time"), } # 嵌套消息(可能是转发) @@ -497,7 +516,7 @@ def _extract_forwarded_content(self, message) -> Optional[dict]: "sender_id": nested.get("sender_id"), "timestamp": nested.get("create_time"), "text": nested.get("text", ""), - "images": nested.get("images", []) + "images": nested.get("images", []), } except (json.JSONDecodeError, AttributeError): @@ -527,8 +546,7 @@ def _format_forwarded_prompt(self, original_content: str, forwarded: dict) -> st parts.append(f"转发自: {sender_name}") if forwarded.get("timestamp"): - from datetime import datetime - ts = datetime.fromtimestamp(forwarded["timestamp"]) + ts = datetime.fromtimestamp(forwarded["timestamp"], tz=timezone(timedelta(hours=8))) parts.append(f"时间: {ts.strftime('%Y-%m-%d %H:%M')}") parts.append("\n--- 转发内容 ---") @@ -578,13 +596,13 @@ def _download_image(self, message_id: str, image_key: str) -> Optional[str]: image_data = response.raw.content # Detect actual image format from magic bytes - if image_data.startswith(b'\xff\xd8\xff'): + if image_data.startswith(b"\xff\xd8\xff"): extension = "jpg" - elif image_data.startswith(b'\x89PNG\r\n\x1a\n'): + elif image_data.startswith(b"\x89PNG\r\n\x1a\n"): extension = "png" - elif image_data.startswith(b'GIF87a') or image_data.startswith(b'GIF89a'): + elif image_data.startswith(b"GIF87a") or image_data.startswith(b"GIF89a"): extension = "gif" - elif image_data.startswith(b'RIFF') and b'WEBP' in image_data[:12]: + elif image_data.startswith(b"RIFF") and b"WEBP" in image_data[:12]: extension = "webp" else: # Default to jpg if unknown @@ -602,6 +620,7 @@ def _download_image(self, message_id: str, image_key: str) -> Optional[str]: except Exception as e: print(f"[Feishu] Error downloading image {image_key}: {e}") import traceback + traceback.print_exc() return None @@ -609,7 +628,7 @@ def _download_image(self, message_id: str, image_key: str) -> Optional[str]: def _on_bot_added(self, data) -> None: """Called when the bot is added to a chat (including P2P when a user follows the bot).""" - print(f"[Feishu] _on_bot_added called") + print("[Feishu] _on_bot_added called") try: event = data.event chat_id = getattr(event, "chat_id", None) @@ -621,6 +640,7 @@ def _on_bot_added(self, data) -> None: except Exception as e: print(f"[Feishu] _on_bot_added error: {e}") import traceback + traceback.print_exc() def _on_message_sync(self, data) -> None: @@ -628,21 +648,22 @@ def _on_message_sync(self, data) -> None: print(f"[Feishu] _on_message_sync called, data type: {type(data)}") try: self._handle_inbound(data) - print(f"[Feishu] Message handled successfully") + print("[Feishu] Message handled successfully") except Exception as e: print(f"[Feishu] Inbound handler error: {e}") import traceback + traceback.print_exc() def _handle_inbound(self, data) -> None: """Parse incoming Feishu message and act on it.""" - print(f"[Feishu] _handle_inbound processing message...") + print("[Feishu] _handle_inbound processing message...") event = data.event message = event.message sender = event.sender if sender.sender_type == "bot": - print(f"[Feishu] Ignoring message from bot (self)") + print("[Feishu] Ignoring message from bot (self)") return # ignore self-messages msg_type = message.message_type @@ -665,9 +686,9 @@ def _handle_inbound(self, data) -> None: else: # Try language-specific keys lang_body = ( - post_body.get("zh_cn") or - post_body.get("en_us") or - next(iter(post_body.values()), {}) + post_body.get("zh_cn") + or post_body.get("en_us") + or next(iter(post_body.values()), {}) ) title_part = lang_body.get("title", "").strip() @@ -703,6 +724,7 @@ def _handle_inbound(self, data) -> None: except (json.JSONDecodeError, AttributeError, StopIteration) as e: print(f"[Feishu] Failed to parse post message: {e}") import traceback + traceback.print_exc() # Don't use raw content as fallback - it's the JSON structure content = "" @@ -733,7 +755,9 @@ def _handle_inbound(self, data) -> None: # ── 检测转发/引用消息 ─────────────────────────────────── forwarded = self._extract_forwarded_content(message) if forwarded: - print(f"[Feishu] Detected forwarded/quoted message from {forwarded.get('sender_name', 'unknown')}") + print( + f"[Feishu] Detected forwarded/quoted message from {forwarded.get('sender_name', 'unknown')}" + ) content = self._format_forwarded_prompt(content, forwarded) # 处理转发中包含的图片(如果有) for img_info in forwarded.get("images", []): @@ -744,11 +768,11 @@ def _handle_inbound(self, data) -> None: image_paths.append(img_path) if not content: - print(f"[Feishu] Empty content after processing, ignoring") + print("[Feishu] Empty content after processing, ignoring") return # Acknowledge with a "get" reaction - print(f"[Feishu] Adding reaction to message...") + print("[Feishu] Adding reaction to message...") self._add_reaction(message.message_id, "OK") sender_id = sender.sender_id.open_id if sender.sender_id else "unknown" @@ -765,6 +789,7 @@ def _handle_inbound(self, data) -> None: # ── command: /dir — switch working directory ────── if content.startswith("/dir ") or content.startswith("/cd "): from channels.dir_utils import handle_dir_command + reply = handle_dir_command(content, "feishu", self.db) if reply: self._send_message(reply_to, reply) @@ -773,6 +798,7 @@ def _handle_inbound(self, data) -> None: # ── command: /agent — switch coding agent ───────── if content.startswith("/agent "): from channels.agent_utils import handle_agent_command + reply = handle_agent_command(content, "feishu", self.db) if reply: self._send_message(reply_to, reply) @@ -786,7 +812,7 @@ def _handle_inbound(self, data) -> None: # ── command: /resume ──────────────── if content.startswith("/resume "): - print(f"[Feishu] Processing /resume command") + print("[Feishu] Processing /resume command") parts = content[8:].strip().split(" ", 1) if len(parts) >= 2 and parts[0].isdigit(): tid = int(parts[0]) @@ -816,20 +842,25 @@ def _handle_inbound(self, data) -> None: print(f"[Feishu] Task {tid} not found or no session_id") else: self._send_message(reply_to, "Usage: `/resume `") - print(f"[Feishu] Invalid /resume syntax") + print("[Feishu] Invalid /resume syntax") return # ── command: /status ─────────────────────────── if content.startswith("/status "): - print(f"[Feishu] Processing /status command") + print("[Feishu] Processing /status command") parts = content[8:].strip().split() if parts and parts[0].isdigit(): tid = int(parts[0]) task = self.db.get_task(tid) if task: s = task["status"] - icon = {"completed": "✅", "failed": "❌", "running": "⏳", - "pending": "🕐", "cancelled": "🚫"}.get(s, "❓") + icon = { + "completed": "✅", + "failed": "❌", + "running": "⏳", + "pending": "🕐", + "cancelled": "🚫", + }.get(s, "❓") self._send_message( reply_to, f"{icon} **Task #{tid}** — {s}\n\n**{task['title']}**", @@ -841,17 +872,26 @@ def _handle_inbound(self, data) -> None: return # ── filter system notifications ─────────────────────────── - if any(keyword in content.lower() for keyword in [ - "notification", "任务完成", "任务失败", "任务状态", - "任务已", "task completed", "task failed", "task status" - ]): - print(f"[Feishu] Ignoring system notification") + if any( + keyword in content.lower() + for keyword in [ + "notification", + "任务完成", + "任务失败", + "任务状态", + "任务已", + "task completed", + "task failed", + "task status", + ] + ): + print("[Feishu] Ignoring system notification") return # ── detect reply in thread → resume task session ───────── # Check parent_id (direct reply) and root_id (any message in thread) - parent_id = getattr(message, 'parent_id', None) or None - root_id = getattr(message, 'root_id', None) or None + parent_id = getattr(message, "parent_id", None) or None + root_id = getattr(message, "root_id", None) or None is_thread_msg = bool(parent_id or root_id) if is_thread_msg: @@ -876,7 +916,9 @@ def _handle_inbound(self, data) -> None: db_task = self.db.get_task_by_feishu_root_msg(msg_id) if db_task: task_id = db_task["id"] - print(f"[Feishu] Recovered task {task_id} from DB via feishu_root_msg_id={msg_id}") + print( + f"[Feishu] Recovered task {task_id} from DB via feishu_root_msg_id={msg_id}" + ) break # Determine which message to reply to in the thread @@ -910,14 +952,15 @@ def _handle_inbound(self, data) -> None: print(f"[Feishu] Task {task_id} cannot be resumed") return else: - print(f"[Feishu] Thread message but no matching task found, creating new task") + print("[Feishu] Thread message but no matching task found, creating new task") else: - print(f"[Feishu] Message is not a reply (no parent_id or root_id)") + print("[Feishu] Message is not a reply (no parent_id or root_id)") # ── default: create a new task from message text ───────── - print(f"[Feishu] Creating new task from message") + print("[Feishu] Creating new task from message") print(f"[Feishu] image_paths to attach: {image_paths}") from channels.dir_utils import resolve_working_dir + working_dir = resolve_working_dir(content, "feishu", self.db) title = content[:60] + ("…" if len(content) > 60 else "") @@ -932,22 +975,26 @@ def _handle_inbound(self, data) -> None: # Detect media type from file extension ext = Path(img_path).suffix.lower() media_type_map = { - ".jpg": "image/jpeg", ".jpeg": "image/jpeg", - ".png": "image/png", ".gif": "image/gif", ".webp": "image/webp" + ".jpg": "image/jpeg", + ".jpeg": "image/jpeg", + ".png": "image/png", + ".gif": "image/gif", + ".webp": "image/webp", } media_type = media_type_map.get(ext, "image/jpeg") - prompt_images.append({ - "name": Path(img_path).name, - "media_type": media_type, - "data": img_data - }) - print(f"[Feishu] Converted {img_path} to prompt_images ({media_type}, {len(img_data)} bytes)") + prompt_images.append( + {"name": Path(img_path).name, "media_type": media_type, "data": img_data} + ) + print( + f"[Feishu] Converted {img_path} to prompt_images ({media_type}, {len(img_data)} bytes)" + ) except Exception as e: print(f"[Feishu] Failed to convert image {img_path} to base64: {e}") - from taskboard import Task, ScheduleType from channels.agent_utils import resolve_agent + from taskboard import ScheduleType, Task + new_task = Task( title=f"[Feishu] {title}", prompt=content, @@ -959,12 +1006,20 @@ def _handle_inbound(self, data) -> None: feishu_root_msg_id=message.message_id, agent=resolve_agent("feishu", self.db), ) - print(f"[Feishu] Task object created with {len(image_paths)} image_paths and {len(prompt_images)} prompt_images") + print( + f"[Feishu] Task object created with {len(image_paths)} image_paths and {len(prompt_images)} prompt_images" + ) task_id = self.scheduler.submit_task(new_task) print(f"[Feishu] Task {task_id} submitted, verifying in DB...") created_task = self.db.get_task(task_id) - print(f"[Feishu] Task {task_id} in DB has image_paths: {created_task.get('image_paths')}, prompt_images: {len(created_task.get('prompt_images', []))} items") - img_info = f" (with {len(image_paths)} image{'s' if len(image_paths) != 1 else ''})" if image_paths else "" + print( + f"[Feishu] Task {task_id} in DB has image_paths: {created_task.get('image_paths')}, prompt_images: {len(created_task.get('prompt_images', []))} items" + ) + img_info = ( + f" (with {len(image_paths)} image{'s' if len(image_paths) != 1 else ''})" + if image_paths + else "" + ) # Reply with a brief running hint and track origin for completion notification self._reply_message(message.message_id, f"Task #{task_id} is running…") @@ -972,5 +1027,7 @@ def _handle_inbound(self, data) -> None: self._task_origin[task_id] = (reply_to, message.message_id, message.message_id) with self._root_msg_lock: self._root_msg_map[message.message_id] = task_id - print(f"[Feishu] Task {task_id} origin tracked: reply_to={reply_to}, root_msg={message.message_id}") + print( + f"[Feishu] Task {task_id} origin tracked: reply_to={reply_to}, root_msg={message.message_id}" + ) print(f"[Feishu] New task {task_id} created and queued with {len(image_paths)} images") diff --git a/channels/slack_channel.py b/channels/slack_channel.py index e1f3ae6..21d3d41 100644 --- a/channels/slack_channel.py +++ b/channels/slack_channel.py @@ -11,7 +11,6 @@ """ import os -import json import threading import traceback from typing import TYPE_CHECKING, Optional @@ -26,13 +25,13 @@ def _require_slack(): try: from slack_sdk import WebClient from slack_sdk.socket_mode import SocketModeClient - from slack_sdk.socket_mode.response import SocketModeResponse from slack_sdk.socket_mode.request import SocketModeRequest + from slack_sdk.socket_mode.response import SocketModeResponse + return WebClient, SocketModeClient, SocketModeResponse, SocketModeRequest except ImportError as e: raise ImportError( - "slack-sdk is required for SlackChannel. " - "Install it with: uv add slack-sdk" + "slack-sdk is required for SlackChannel. Install it with: uv add slack-sdk" ) from e @@ -60,8 +59,14 @@ def _require_slack(): class SlackChannel(Channel): """Slack channel integration using Socket Mode.""" - def __init__(self, bus: MessageBus, db: "TaskDB", scheduler: "TaskScheduler", - bot_token: str = "", app_token: str = ""): + def __init__( + self, + bus: MessageBus, + db: "TaskDB", + scheduler: "TaskScheduler", + bot_token: str = "", + app_token: str = "", + ): super().__init__("slack", bus, db) self.scheduler = scheduler @@ -92,8 +97,10 @@ def __init__(self, bus: MessageBus, db: "TaskDB", scheduler: "TaskScheduler", # Subscribe to outbound bus messages for task notifications bus.subscribe_outbound(self._on_outbound) - print(f"[Slack] Initialized. bot_token={'set' if self.bot_token else 'MISSING'}, " - f"app_token={'set' if self.app_token else 'MISSING'}") + print( + f"[Slack] Initialized. bot_token={'set' if self.bot_token else 'MISSING'}, " + f"app_token={'set' if self.app_token else 'MISSING'}" + ) # ── lifecycle ──────────────────────────────────────────────── @@ -113,7 +120,9 @@ def start(self) -> None: try: resp = self._web_client.auth_test() self._bot_user_id = resp["user_id"] - print(f"[Slack] auth_test OK — bot_user_id={self._bot_user_id}, team={resp.get('team', '?')}, user={resp.get('user', '?')}") + print( + f"[Slack] auth_test OK — bot_user_id={self._bot_user_id}, team={resp.get('team', '?')}, user={resp.get('user', '?')}" + ) except Exception as e: print(f"[Slack] auth_test FAILED: {e}") traceback.print_exc() @@ -150,7 +159,9 @@ def stop(self) -> None: def _handle_socket_request(self, client, req) -> None: from slack_sdk.socket_mode.response import SocketModeResponse - print(f"[Slack] <<< Socket request received: type={req.type}, envelope_id={req.envelope_id[:12]}...") + print( + f"[Slack] <<< Socket request received: type={req.type}, envelope_id={req.envelope_id[:12]}..." + ) # Acknowledge immediately (Slack requires < 3s ack) client.send_socket_mode_response(SocketModeResponse(envelope_id=req.envelope_id)) @@ -164,13 +175,15 @@ def _handle_socket_request(self, client, req) -> None: event = payload.get("event", {}) event_type = event.get("type", "") - print(f"[Slack] Event type={event_type}, " - f"user={event.get('user', '?')}, " - f"channel={event.get('channel', '?')}, " - f"channel_type={event.get('channel_type', '?')}, " - f"bot_id={event.get('bot_id', 'none')}, " - f"subtype={event.get('subtype', 'none')}, " - f"text={repr((event.get('text', '') or '')[:80])}") + print( + f"[Slack] Event type={event_type}, " + f"user={event.get('user', '?')}, " + f"channel={event.get('channel', '?')}, " + f"channel_type={event.get('channel_type', '?')}, " + f"bot_id={event.get('bot_id', 'none')}, " + f"subtype={event.get('subtype', 'none')}, " + f"text={repr((event.get('text', '') or '')[:80])}" + ) # Handle message events try: @@ -190,16 +203,20 @@ def _handle_message_event(self, event: dict) -> None: """Handle DM messages (channel_type == 'im').""" # Only process DMs, not channel messages (those come via app_mention) if event.get("channel_type") != "im": - print(f"[Slack] message event skipped: channel_type={event.get('channel_type')!r} (not 'im')") + print( + f"[Slack] message event skipped: channel_type={event.get('channel_type')!r} (not 'im')" + ) return # Ignore bot messages and message edits if event.get("bot_id") or event.get("subtype"): - print(f"[Slack] message event skipped: bot_id={event.get('bot_id')}, subtype={event.get('subtype')}") + print( + f"[Slack] message event skipped: bot_id={event.get('bot_id')}, subtype={event.get('subtype')}" + ) return user_id = event.get("user", "") if user_id == self._bot_user_id: - print(f"[Slack] message event skipped: message from self (bot)") + print("[Slack] message event skipped: message from self (bot)") return text = event.get("text", "").strip() @@ -213,12 +230,14 @@ def _handle_message_event(self, event: dict) -> None: def _handle_mention_event(self, event: dict) -> None: """Handle @bot mentions in channels.""" if event.get("bot_id") or event.get("subtype"): - print(f"[Slack] mention event skipped: bot_id={event.get('bot_id')}, subtype={event.get('subtype')}") + print( + f"[Slack] mention event skipped: bot_id={event.get('bot_id')}, subtype={event.get('subtype')}" + ) return user_id = event.get("user", "") if user_id == self._bot_user_id: - print(f"[Slack] mention event skipped: mention from self (bot)") + print("[Slack] mention event skipped: mention from self (bot)") return # Strip the mention prefix (<@BOTID> ...) from the text @@ -226,13 +245,15 @@ def _handle_mention_event(self, event: dict) -> None: if self._bot_user_id: mention_prefix = f"<@{self._bot_user_id}>" if text.startswith(mention_prefix): - text = text[len(mention_prefix):].strip() + text = text[len(mention_prefix) :].strip() channel_id = event.get("channel", "") thread_ts = event.get("thread_ts") msg_ts = event.get("ts") - print(f"[Slack] Mention from user={user_id}, channel={channel_id}, text={repr(text[:80])}") + print( + f"[Slack] Mention from user={user_id}, channel={channel_id}, text={repr(text[:80])}" + ) self._handle_user_message(text, channel_id, thread_ts, msg_ts) def _handle_app_home_opened(self, event: dict) -> None: @@ -253,8 +274,9 @@ def _handle_app_home_opened(self, event: dict) -> None: # ── unified message handler ─────────────────────────────────── - def _handle_user_message(self, text: str, channel_id: str, - thread_ts: Optional[str], msg_ts: str) -> None: + def _handle_user_message( + self, text: str, channel_id: str, thread_ts: Optional[str], msg_ts: str + ) -> None: """Handle any user message: commands, resume-by-reply, or create task.""" if not text: self._reply(channel_id, msg_ts, HELP_TEXT) @@ -274,10 +296,12 @@ def _handle_user_message(self, text: str, channel_id: str, self._cmd_resume(args, channel_id, msg_ts) elif cmd in ("/dir", "/cd"): from channels.dir_utils import handle_dir_command + reply = handle_dir_command(text, "slack", self.db) self._reply(channel_id, msg_ts, reply or HELP_TEXT) elif cmd == "/agent": from channels.agent_utils import handle_agent_command + reply = handle_agent_command(text, "slack", self.db) self._reply(channel_id, msg_ts, reply or HELP_TEXT) elif cmd == "/help": @@ -312,16 +336,14 @@ def _handle_user_message(self, text: str, channel_id: str, with self._origin_lock: self._task_origin[task_id] = (channel_id, thread_ts, msg_ts) self._add_reaction(channel_id, msg_ts, "eyes") - self._reply( - channel_id, thread_ts, - ":arrow_forward:" - ) + self._reply(channel_id, thread_ts, ":arrow_forward:") print(f"[Slack] Auto-resuming task {task_id} from thread reply") return else: self._reply( - channel_id, thread_ts, - f":x: Task *#{task_id}* has no saved session to resume." + channel_id, + thread_ts, + f":x: Task *#{task_id}* has no saved session to resume.", ) return @@ -332,11 +354,12 @@ def _handle_user_message(self, text: str, channel_id: str, def _create_task(self, text: str, channel_id: str, thread_ts: str) -> None: """Create a new task from any message.""" - from taskboard import Task, ScheduleType + from taskboard import ScheduleType, Task title = text[:60] + ("…" if len(text) > 60 else "") - from channels.dir_utils import resolve_working_dir from channels.agent_utils import resolve_agent + from channels.dir_utils import resolve_working_dir + task = Task( title=f"[Slack] {title}", prompt=text, @@ -350,8 +373,10 @@ def _create_task(self, text: str, channel_id: str, thread_ts: str) -> None: with self._origin_lock: self._task_origin[task_id] = (channel_id, thread_ts, thread_ts) - print(f"[Slack] Origin set for task #{task_id}: channel={channel_id}, thread_ts={thread_ts}, " - f"self_id={id(self)}, origin_dict_id={id(self._task_origin)}") + print( + f"[Slack] Origin set for task #{task_id}: channel={channel_id}, thread_ts={thread_ts}, " + f"self_id={id(self)}, origin_dict_id={id(self._task_origin)}" + ) # Track thread root ts → task_id so replies in the thread can resume with self._thread_ts_lock: @@ -409,8 +434,9 @@ def _cmd_cancel(self, args: str, channel_id: str, thread_ts: str) -> None: if task["status"] in ("completed", "failed", "cancelled"): self._reply( - channel_id, thread_ts, - f":information_source: Task #{task_id} is already `{task['status']}`." + channel_id, + thread_ts, + f":information_source: Task #{task_id} is already `{task['status']}`.", ) return @@ -431,7 +457,9 @@ def _cmd_resume(self, args: str, channel_id: str, thread_ts: str) -> None: task = self.db.get_task(tid) if not task or not task.get("session_id"): - self._reply(channel_id, thread_ts, f":x: Task #{tid} not found or has no saved session.") + self._reply( + channel_id, thread_ts, f":x: Task #{tid} not found or has no saved session." + ) return self.db.update_task( @@ -457,9 +485,11 @@ def send(self, msg: OutboundMessage) -> None: task_id = msg.task_id with self._origin_lock: origin = self._task_origin.get(task_id) - print(f"[Slack] send() task_id={task_id} (type={type(task_id).__name__}), " - f"origin={origin}, keys={list(self._task_origin.keys())}, " - f"self_id={id(self)}, origin_dict_id={id(self._task_origin)}") + print( + f"[Slack] send() task_id={task_id} (type={type(task_id).__name__}), " + f"origin={origin}, keys={list(self._task_origin.keys())}, " + f"self_id={id(self)}, origin_dict_id={id(self._task_origin)}" + ) # Build notification text if msg.type == OutboundMessageType.TASK_COMPLETED: @@ -488,7 +518,11 @@ def send(self, msg: OutboundMessage) -> None: if dm_ch: channel_id = dm_ch thread_ts = None - status_emoji = ":white_check_mark:" if msg.type == OutboundMessageType.TASK_COMPLETED else ":x:" + status_emoji = ( + ":white_check_mark:" + if msg.type == OutboundMessageType.TASK_COMPLETED + else ":x:" + ) text = f"{status_emoji} *{title}*\n{text}" print(f"[Slack] Falling back to P2P DM with user {dm_user}") else: @@ -497,10 +531,16 @@ def send(self, msg: OutboundMessage) -> None: elif default_channel: channel_id = default_channel thread_ts = None - status_emoji = ":white_check_mark:" if msg.type == OutboundMessageType.TASK_COMPLETED else ":x:" + status_emoji = ( + ":white_check_mark:" + if msg.type == OutboundMessageType.TASK_COMPLETED + else ":x:" + ) text = f"{status_emoji} *{title}*\n{text}" else: - print(f"[Slack] No origin, no known user, no slack_default_channel for task #{task_id}, skipping") + print( + f"[Slack] No origin, no known user, no slack_default_channel for task #{task_id}, skipping" + ) return print(f"[Slack] Sending outbound notification for task #{task_id}: {msg.type.name}") @@ -562,22 +602,26 @@ def _do(): def _reply(self, channel_id: str, thread_ts: Optional[str], text: str) -> None: if not self._web_client: - print(f"[Slack] _reply skipped: no web_client") + print("[Slack] _reply skipped: no web_client") return try: - print(f"[Slack] >>> Sending message to channel={channel_id}, thread={thread_ts}, len={len(text)}") + print( + f"[Slack] >>> Sending message to channel={channel_id}, thread={thread_ts}, len={len(text)}" + ) self._web_client.chat_postMessage( channel=channel_id, thread_ts=thread_ts, text=text, mrkdwn=True, ) - print(f"[Slack] >>> Message sent OK") + print("[Slack] >>> Message sent OK") except Exception as e: print(f"[Slack] >>> chat_postMessage FAILED: {e}") traceback.print_exc() - def _reply_return_ts(self, channel_id: str, thread_ts: Optional[str], text: str) -> Optional[str]: + def _reply_return_ts( + self, channel_id: str, thread_ts: Optional[str], text: str + ) -> Optional[str]: """Send a message and return its ts (for tracking notification threads).""" if not self._web_client: return None diff --git a/channels/telegram_channel.py b/channels/telegram_channel.py index 7f7c9be..d4a1c78 100644 --- a/channels/telegram_channel.py +++ b/channels/telegram_channel.py @@ -22,10 +22,12 @@ import asyncio import os import threading -from typing import Optional, TYPE_CHECKING +from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Optional try: from telegram import Update + from telegram.constants import ParseMode from telegram.ext import ( Application, CommandHandler, @@ -33,7 +35,7 @@ MessageHandler, filters, ) - from telegram.constants import ParseMode + TELEGRAM_AVAILABLE = True except ImportError: TELEGRAM_AVAILABLE = False @@ -71,8 +73,14 @@ class TelegramChannel(Channel): the synchronous HTTP server. """ - def __init__(self, bus: MessageBus, db: "TaskDB", scheduler: "TaskScheduler", - token: str, allowed_users: Optional[list[int]] = None): + def __init__( + self, + bus: MessageBus, + db: "TaskDB", + scheduler: "TaskScheduler", + token: str, + allowed_users: Optional[list[int]] = None, + ): super().__init__("telegram", bus, db) self.scheduler = scheduler self._token = token @@ -167,9 +175,15 @@ def send(self, msg: OutboundMessage) -> None: else: default_chat_id = self.db.get_setting("telegram_default_chat_id", "") if not default_chat_id: - print(f"[Telegram] No origin and no telegram_default_chat_id configured for task #{task_id}, skipping") + print( + f"[Telegram] No origin and no telegram_default_chat_id configured for task #{task_id}, skipping" + ) return - chat_id = int(default_chat_id) if str(default_chat_id).lstrip("-").isdigit() else default_chat_id + chat_id = ( + int(default_chat_id) + if str(default_chat_id).lstrip("-").isdigit() + else default_chat_id + ) status_emoji = "✅" if is_completed else "❌" text = f"{status_emoji} Task #{task_id}: {title}\n{body}" if not is_completed: @@ -184,6 +198,7 @@ async def _send_and_track(): emoji = "👍" if msg.type == OutboundMessageType.TASK_COMPLETED else "👎" try: from telegram import ReactionTypeEmoji + await self._app.bot.set_message_reaction( chat_id=chat_id, message_id=react_target, @@ -193,13 +208,16 @@ async def _send_and_track(): print(f"[Telegram] Failed to set reaction on message {react_target}: {e}") sent = await self._app.bot.send_message( - chat_id=chat_id, text=text, + chat_id=chat_id, + text=text, reply_to_message_id=orig_message_id, ) if sent: with self._notification_lock: self._notification_map[sent.message_id] = task_id - print(f"[Telegram] Notification msg_id={sent.message_id} mapped to task #{task_id}") + print( + f"[Telegram] Notification msg_id={sent.message_id} mapped to task #{task_id}" + ) except Exception as e: print(f"[Telegram] Failed to send notification to {chat_id}: {e}") @@ -227,8 +245,8 @@ async def _start_app(self) -> None: self._app = Application.builder().token(self._token).build() # Slash commands - self._app.add_handler(CommandHandler("start", self._cmd_help)) - self._app.add_handler(CommandHandler("help", self._cmd_help)) + self._app.add_handler(CommandHandler("start", self._cmd_help)) + self._app.add_handler(CommandHandler("help", self._cmd_help)) self._app.add_handler(CommandHandler("status", self._cmd_status)) self._app.add_handler(CommandHandler("cancel", self._cmd_cancel)) self._app.add_handler(CommandHandler("resume", self._cmd_resume)) @@ -307,8 +325,7 @@ def _format_forwarded_text(self, text: str, update: "Update") -> str: # 添加时间戳 if msg.forward_date: - from datetime import datetime - ts = datetime.fromtimestamp(msg.forward_date) + ts = datetime.fromtimestamp(msg.forward_date, tz=timezone(timedelta(hours=8))) parts.append(f"时间: {ts.strftime('%Y-%m-%d %H:%M')}") parts.append("\n--- 转发内容 ---") @@ -316,7 +333,9 @@ def _format_forwarded_text(self, text: str, update: "Update") -> str: return "\n".join(parts) - async def _handle_text_message(self, update: "Update", context: "ContextTypes.DEFAULT_TYPE") -> None: + async def _handle_text_message( + self, update: "Update", context: "ContextTypes.DEFAULT_TYPE" + ) -> None: """Handle any non-command text: resume-by-reply or create task.""" if not self._is_allowed(update.effective_user.id): await update.message.reply_text("⛔ You are not authorised to use this bot.") @@ -328,6 +347,7 @@ async def _handle_text_message(self, update: "Update", context: "ContextTypes.DE # ── /dir command: switch working directory ───────────────── from channels.dir_utils import handle_dir_command + dir_reply = handle_dir_command(text, "telegram", self.db) if dir_reply is not None: await update.message.reply_text(dir_reply) @@ -335,6 +355,7 @@ async def _handle_text_message(self, update: "Update", context: "ContextTypes.DE # ── /agent command: switch coding agent ────────────────── from channels.agent_utils import handle_agent_command + agent_reply = handle_agent_command(text, "telegram", self.db) if agent_reply is not None: await update.message.reply_text(agent_reply) @@ -362,11 +383,16 @@ async def _handle_text_message(self, update: "Update", context: "ContextTypes.DE question=None, ) with self._origin_lock: - self._task_origin[task_id] = (chat_id, update.message.message_id, update.message.message_id) + self._task_origin[task_id] = ( + chat_id, + update.message.message_id, + update.message.message_id, + ) # Add "eyes" reaction and send resuming message try: from telegram import ReactionTypeEmoji + await self._app.bot.set_message_reaction( chat_id=chat_id, message_id=update.message.message_id, @@ -388,7 +414,7 @@ async def _handle_text_message(self, update: "Update", context: "ContextTypes.DE def _create_task(self, text: str, chat_id: int, update: "Update") -> None: """Create a new task from any message text.""" - from taskboard import Task, ScheduleType + from taskboard import ScheduleType, Task msg = update.message @@ -398,9 +424,11 @@ def _create_task(self, text: str, chat_id: int, update: "Update") -> None: title = text[:60] + ("…" if len(text) > 60 else "") from channels.dir_utils import resolve_working_dir + working_dir = resolve_working_dir(text, "telegram", self.db) from channels.agent_utils import resolve_agent + task = Task( title=f"[Telegram] {title_prefix}{title}", prompt=text, @@ -410,7 +438,9 @@ def _create_task(self, text: str, chat_id: int, update: "Update") -> None: agent=resolve_agent("telegram", self.db), ) task_id = self.scheduler.submit_task(task) - print(f"[Telegram] Task #{task_id} created from message{' (forwarded)' if is_forwarded else ''}") + print( + f"[Telegram] Task #{task_id} created from message{' (forwarded)' if is_forwarded else ''}" + ) message_id = update.message.message_id with self._origin_lock: @@ -420,6 +450,7 @@ def _create_task(self, text: str, chat_id: int, update: "Update") -> None: async def _react(): try: from telegram import ReactionTypeEmoji + await self._app.bot.set_message_reaction( chat_id=chat_id, message_id=message_id, @@ -459,8 +490,12 @@ async def _cmd_status(self, update: "Update", context: "ContextTypes.DEFAULT_TYP return status_icon = { - "pending": "🕐", "scheduled": "📅", "running": "⏳", - "completed": "✅", "failed": "❌", "cancelled": "🚫", + "pending": "🕐", + "scheduled": "📅", + "running": "⏳", + "completed": "✅", + "failed": "❌", + "cancelled": "🚫", } icon = status_icon.get(task["status"], "•") lines = [ @@ -491,9 +526,7 @@ async def _cmd_cancel(self, update: "Update", context: "ContextTypes.DEFAULT_TYP await update.message.reply_text(f"❌ Task #{task_id} not found.") return if task["status"] in ("completed", "failed", "cancelled"): - await update.message.reply_text( - f"ℹ️ Task #{task_id} is already {task['status']}." - ) + await update.message.reply_text(f"ℹ️ Task #{task_id} is already {task['status']}.") return self.db.update_task(task_id, status="cancelled") @@ -534,6 +567,7 @@ async def _cmd_resume(self, update: "Update", context: "ContextTypes.DEFAULT_TYP # Add "eyes" reaction to the user's command message try: from telegram import ReactionTypeEmoji + await self._app.bot.set_message_reaction( chat_id=chat_id, message_id=update.message.message_id, @@ -547,6 +581,7 @@ async def _cmd_resume(self, update: "Update", context: "ContextTypes.DEFAULT_TYP # ── helpers ────────────────────────────────────────────────────────────────── + def _escape_md(text: str) -> str: """Escape special MarkdownV2 characters.""" special = r"\_*[]()~`>#+-=|{}.!" @@ -555,8 +590,10 @@ def _escape_md(text: str) -> str: # ── factory helper ─────────────────────────────────────────────────────────── -def create_telegram_channel(db, scheduler, bus=None, - token: str = "", allowed_users_str: str = "") -> Optional["TelegramChannel"]: + +def create_telegram_channel( + db, scheduler, bus=None, token: str = "", allowed_users_str: str = "" +) -> Optional["TelegramChannel"]: """Create a TelegramChannel from explicit params or environment variables.""" token = (token or os.environ.get("TELEGRAM_BOT_TOKEN", "")).strip() if not token: @@ -572,6 +609,7 @@ def create_telegram_channel(db, scheduler, bus=None, if bus is None: from taskboard_bus import MessageBus + bus = MessageBus() return TelegramChannel( diff --git a/docs/rfc-heartbeat.md b/docs/rfc-heartbeat.md new file mode 100644 index 0000000..0b39600 --- /dev/null +++ b/docs/rfc-heartbeat.md @@ -0,0 +1,748 @@ +# RFC: Heartbeat Triggers in AgentForge + +**Status:** Draft +**Date:** 2026-03-18 +**Author:** Codex + +--- + +## Summary + +Introduce a new `heartbeat` resource in AgentForge for low-noise, periodic "check before acting" automation. + +Unlike an existing cron task, a heartbeat does not directly execute a full agent run every time it wakes up. Instead, it performs a lightweight decision step first. Only when that decision says there is actionable work does AgentForge create or resume a real task. + +This RFC proposes: + +- a dedicated heartbeat model separate from `tasks` +- silent idle behavior by default +- structured decision output instead of token or keyword matching +- explicit deduplication and cooldown semantics +- a small first version that coexists cleanly with the current scheduler + +--- + +## Motivation + +AgentForge already supports `immediate`, `delayed`, `scheduled_at`, and `cron` tasks. Those schedules answer one question well: "when should this task run?" + +They do not answer a different question: "when should the system check whether a task is worth creating at all?" + +Today, if a user wants "every 10 minutes, check whether my repo needs review; if yes, create a review task," the closest fit is a cron task. That works, but it has undesirable behavior: + +- every wake-up becomes a real task run +- empty checks pollute task history and output history +- idle cycles create noise in channels and UI +- deduplication has to be reimplemented in prompt logic + +Heartbeat addresses this gap by introducing a higher-level trigger primitive: + +- `cron task`: execute on schedule +- `heartbeat`: inspect on schedule, then decide whether to execute + +This design is inspired by the recent heartbeat redesign in `HKUDS/nanobot`, whose release notes describe moving from fragile token detection to a virtual decision mechanism and making heartbeat truly silent when idle. + +References: + +- [HKUDS/nanobot](https://github.com/HKUDS/nanobot) +- [nanobot v0.1.4.post2 release notes](https://newreleases.io/project/github/HKUDS/nanobot/release/v0.1.4.post2) + +--- + +## Goals + +- Provide a first-class abstraction for periodic inspection that does not always create a task. +- Keep idle heartbeat cycles invisible to the main kanban flow by default. +- Reuse the existing AgentForge scheduler, task execution, and MessageBus architecture. +- Make trigger decisions structured, testable, and debuggable. +- Prevent repeated wake-ups from creating duplicate tasks for the same signal. + +--- + +## Non-Goals + +- Replace the existing cron scheduling system. +- Replace `/api/health` or any process liveness check. +- Build a general-purpose workflow engine in v1. +- Add complex multi-step self-orchestration inside heartbeat itself. +- Expose every idle tick as a full task or full run. + +--- + +## Background + +AgentForge's current scheduler is task-centric. A task is persisted in SQLite, moved through statuses such as `pending`, `scheduled`, `running`, `completed`, and `failed`, and accumulates task runs plus output events. + +That model is correct for real executions, but it is too heavy for a "watcher" abstraction. A watcher needs a lightweight place to store: + +- when it last checked +- what it decided +- why it stayed idle +- which signal it already triggered on + +If heartbeat is modeled as a normal cron task, AgentForge loses the ability to distinguish: + +- "I checked and nothing needed to happen" +- "I executed real work" + +That distinction is the main reason heartbeat should be a separate concept. + +--- + +## Terminology + +### Heartbeat + +A persistent configuration that describes: + +- when to wake up +- what to inspect +- how to decide whether work is needed +- what action to take if the decision is positive + +### Tick + +A single execution of a heartbeat's periodic check. + +### Decision + +The structured output of a tick, describing whether the heartbeat stayed idle, created work, resumed work, notified a user, or failed. + +### Action + +The concrete side effect taken after a non-idle decision, such as creating a task. + +--- + +## Cron vs Heartbeat + +The distinction is central to this RFC. + +### Cron task + +A cron task is a real task whose schedule happens to be recurring. + +Properties: + +- wakes up on schedule +- always executes real agent work +- always creates a task run +- always writes output history +- always participates in normal task lifecycle + +It answers: + +- "When should this task run again?" + +### Heartbeat + +A heartbeat is a trigger resource that may or may not produce a task. + +Properties: + +- wakes up on schedule +- performs a decision step first +- only creates or resumes a real task when needed +- stays silent when idle +- tracks trigger-level history separately from task history + +It answers: + +- "When should the system check whether there is work worth creating?" + +### Practical example + +Requirement: + +"Every 10 minutes, check whether there are new commits in this repo that have not been reviewed. If yes, create a review task. If not, do nothing." + +With cron: + +- a real task runs every 10 minutes +- most runs may be empty +- the board fills with no-op history + +With heartbeat: + +- the heartbeat ticks every 10 minutes +- idle ticks remain internal +- only meaningful changes create real tasks + +--- + +## Design Principles + +### 1. Silent When Idle + +An idle heartbeat should not create a task, a run, or user-facing notification by default. + +It may still update internal metadata such as: + +- `last_tick_at` +- `last_decision` +- `last_error` + +### 2. Decision Before Action + +Heartbeat exists to separate "inspection" from "execution." The system must never assume that a scheduled wake-up implies real work. + +### 3. Structured Over Heuristic + +Decision output must be structured JSON, not a magic token such as `HEARTBEAT_OK` and not regex matching over free-form text. + +This makes behavior: + +- explicit +- testable +- less fragile + +### 4. Low-Noise Observability + +Heartbeat must remain debuggable without polluting the main task board. + +### 5. Reuse Existing Execution Paths + +Once a heartbeat decides that real work is needed, it should hand off to the existing task pipeline rather than inventing a second task execution system. + +### 6. State Lives in Storage, Not Session + +Heartbeat is primarily a trigger mechanism, not a long-running conversation. + +Its persistent state should therefore live in AgentForge's own storage layer, not in an ever-growing model session. The system should remember heartbeat progress through explicit fields such as: + +- `last_tick_at` +- `last_decision` +- `last_triggered_at` +- `last_dedupe_key` +- tick history + +This keeps heartbeat behavior deterministic, debuggable, and inexpensive over long periods of time. + +--- + +## Proposed Model + +### New resource type: `heartbeat` + +Heartbeat is a first-class resource, persisted separately from `tasks`. + +Each heartbeat contains: + +- identity and status +- schedule +- check definition +- action template +- deduplication metadata +- last known results + +### Decision types + +The decision layer should support this enum: + +- `idle` +- `trigger_task` +- `resume_task` +- `notify_only` +- `error` + +For v1, AgentForge only needs to implement: + +- `idle` +- `trigger_task` +- `error` + +The others can be reserved for future expansion. + +### Session strategy + +Heartbeat should support a session policy, but the default must be stateless. + +Recommended model: + +- `stateless`: every decision tick runs in a fresh session +- `sticky`: future option for specialized long-running heartbeat patterns + +V1 recommendation: + +- implement heartbeat decision ticks as `stateless` only + +Rationale: + +- avoids unbounded context growth +- avoids stale or polluted decision context +- keeps trigger behavior reproducible +- ensures deduplication depends on explicit database state rather than hidden model memory + +Important distinction: + +- heartbeat decision state belongs in heartbeat tables +- task execution state may still use normal task `session_id` / resume behavior after a heartbeat creates a real task + +--- + +## Data Model + +### `heartbeats` + +Suggested columns: + +```text +id INTEGER PRIMARY KEY +name TEXT NOT NULL +enabled INTEGER NOT NULL DEFAULT 1 +working_dir TEXT NOT NULL +schedule_type TEXT NOT NULL -- 'cron' | 'interval' +cron_expr TEXT +interval_seconds INTEGER +check_prompt TEXT NOT NULL +action_prompt_template TEXT +default_agent TEXT DEFAULT 'claude' +cooldown_seconds INTEGER DEFAULT 0 +last_tick_at TEXT +last_decision TEXT +last_error TEXT +last_triggered_at TEXT +last_dedupe_key TEXT +created_at TEXT NOT NULL +updated_at TEXT NOT NULL +``` + +Notes: + +- `schedule_type` is intentionally narrower than tasks in v1. +- `action_prompt_template` is only used if the decision says to create a task. +- `last_dedupe_key` is a convenience field for quick visibility, not the source of truth. + +### `heartbeat_ticks` + +Suggested columns: + +```text +id INTEGER PRIMARY KEY +heartbeat_id INTEGER NOT NULL +started_at TEXT NOT NULL +finished_at TEXT +status TEXT NOT NULL -- 'idle' | 'triggered' | 'error' +decision_type TEXT +decision_payload TEXT +task_id INTEGER +error TEXT +FOREIGN KEY (heartbeat_id) REFERENCES heartbeats(id) +FOREIGN KEY (task_id) REFERENCES tasks(id) +``` + +Purpose: + +- preserve lightweight audit history +- keep heartbeat history separate from task runs +- support UI inspection and debugging + +### Optional `heartbeat_dedup` + +This table is optional in v1, but likely useful: + +```text +id INTEGER PRIMARY KEY +heartbeat_id INTEGER NOT NULL +dedupe_key TEXT NOT NULL +task_id INTEGER +triggered_at TEXT NOT NULL +UNIQUE(heartbeat_id, dedupe_key) +``` + +This makes cooldown and repeated-trigger suppression easier to implement correctly. + +--- + +## Decision Contract + +Heartbeat decisions should be returned as JSON. + +### Idle example + +```json +{ + "decision": "idle", + "reason": "No actionable change detected", + "dedupe_key": "repo:/Users/me/project:head:abc123", + "title": "", + "prompt": "", + "metadata": {} +} +``` + +### Trigger example + +```json +{ + "decision": "trigger_task", + "reason": "Found 3 changed files since last reviewed commit", + "dedupe_key": "repo:/Users/me/project:review:abc123", + "title": "Review latest changes in project", + "prompt": "Review the latest changes since commit abc123 and summarize risks, regressions, and missing tests.", + "metadata": { + "source": "heartbeat", + "heartbeat_id": 12 + } +} +``` + +### Error example + +```json +{ + "decision": "error", + "reason": "Git command failed: repository not found", + "dedupe_key": "", + "title": "", + "prompt": "", + "metadata": {} +} +``` + +### Required fields + +- `decision` +- `reason` +- `dedupe_key` +- `title` +- `prompt` +- `metadata` + +### Why structured output matters + +This avoids several classes of failure: + +- ambiguous free-form outputs +- accidental token collisions +- brittle string matching in multiple languages +- hidden prompt regressions when model behavior changes + +--- + +## Scheduler Behavior + +Heartbeat should run alongside the current task scheduler, but not as a normal task. + +### Tick lifecycle + +1. Scheduler finds due heartbeats. +2. For each due heartbeat, AgentForge creates a `heartbeat_tick` record. +3. The heartbeat executes a decision step. +4. AgentForge parses and validates the decision JSON. +5. The system checks cooldown and dedupe constraints. +6. If the decision is `idle`, mark the tick idle and update heartbeat metadata. +7. If the decision is `trigger_task`, create a real task through the existing task submission path. +8. Mark the tick as `triggered` and associate it with the created task. +9. If any step fails, mark the tick as `error`. + +### Session behavior during a tick + +For v1, the decision step of a heartbeat tick should always execute in a fresh session. + +That means: + +- no heartbeat-level `session_id` is persisted or resumed +- decision consistency relies on persisted heartbeat metadata and tick history +- if a tick triggers a real task, that task may then use its own normal session semantics + +This preserves a clean separation: + +- heartbeat checks for work +- tasks carry conversational or execution continuity + +### Concurrency + +As with tasks, a heartbeat should not have overlapping active ticks unless explicitly enabled in the future. + +V1 rule: + +- one active tick per heartbeat at a time + +### Failure behavior + +A failed heartbeat tick should not automatically create a failed task. It should remain a heartbeat-level error unless a future policy says otherwise. + +--- + +## Deduplication and Cooldown + +Heartbeat needs explicit trigger suppression. This is one of the biggest differences from cron. + +### Definitions + +- `dedupe_key`: identity of the external signal that triggered action +- `cooldown_seconds`: minimum time before the same signal may trigger again + +### Rules + +If a decision returns `trigger_task`, AgentForge should suppress that trigger when: + +- the same `heartbeat_id + dedupe_key` was already triggered inside cooldown +- the previously created task for that `dedupe_key` is still `pending`, `scheduled`, `blocked`, or `running` + +### Result + +The heartbeat may still record the tick, but the action is downgraded to `idle` with a reason such as: + +- "Suppressed duplicate signal during cooldown" + +--- + +## Action Dispatch + +When a heartbeat decides to act, it should dispatch through existing code paths. + +### `trigger_task` + +Use the normal task creation path with metadata that records heartbeat provenance. + +Suggested metadata fields: + +- `source = "heartbeat"` +- `heartbeat_id` +- `heartbeat_tick_id` +- `dedupe_key` + +This preserves a clean boundary: + +- heartbeat decides +- task system executes + +### `resume_task` + +Reserved for future versions. The likely use case is reviving a waiting conversational task when a heartbeat detects new context. + +### `notify_only` + +Reserved for future versions. This would route through MessageBus without creating a task. + +--- + +## API Proposal + +Suggested endpoints: + +- `GET /api/heartbeats` +- `POST /api/heartbeats` +- `GET /api/heartbeats/{id}` +- `PUT /api/heartbeats/{id}` +- `DELETE /api/heartbeats/{id}` +- `POST /api/heartbeats/{id}/run-now` +- `POST /api/heartbeats/{id}/pause` +- `POST /api/heartbeats/{id}/resume` +- `GET /api/heartbeats/{id}/ticks` + +### Example create payload + +```json +{ + "name": "Repo review watcher", + "working_dir": "~/projects/myapp", + "schedule_type": "interval", + "interval_seconds": 600, + "check_prompt": "Check whether there are new commits since the last reviewed commit. Return heartbeat decision JSON only.", + "action_prompt_template": "Review the latest code changes and summarize bugs, regressions, and missing tests.", + "default_agent": "claude", + "cooldown_seconds": 1800, + "enabled": true +} +``` + +### Example response + +```json +{ + "id": 12, + "name": "Repo review watcher", + "enabled": true, + "schedule_type": "interval", + "interval_seconds": 600, + "last_tick_at": "2026-03-18T10:30:00", + "last_decision": "idle", + "last_triggered_at": "2026-03-18T09:40:00" +} +``` + +--- + +## UI Proposal + +Heartbeat should not be rendered as ordinary cards in the existing Queue / Running / Done columns. + +Suggested UI treatment: + +- a dedicated `Heartbeats` tab or panel +- compact list view rather than kanban +- one row per heartbeat + +Suggested columns: + +- name +- enabled / paused +- schedule +- next tick +- last decision +- last triggered task +- last error + +Suggested actions: + +- run now +- pause / resume +- edit +- view recent ticks + +### Tick inspection + +The heartbeat detail view should show recent ticks with: + +- time +- decision +- reason +- dedupe key +- linked task, if any +- error, if any + +Idle ticks should be visible in heartbeat details, but not in the main task board by default. + +--- + +## Observability + +Heartbeat should be observable without becoming noisy. + +### Internal visibility + +Store: + +- last tick time +- last decision +- last trigger time +- last error +- recent ticks + +### Logging + +Recommended log levels: + +- `debug`: decision payloads, suppressed duplicates +- `info`: actual task-triggering events +- `warning`: malformed decisions, repeated scheduler anomalies +- `error`: execution failures + +### Notifications + +By default: + +- idle heartbeat ticks do not notify users +- triggered tasks behave like normal tasks and may notify via existing channels + +--- + +## Security and Safety Considerations + +Heartbeat introduces recurring automated inspection and therefore deserves guardrails. + +### Prompt safety + +Heartbeat decision prompts should be constrained to return JSON only. + +### Working directory safety + +Heartbeat should reuse the same working directory safety assumptions already enforced for normal task execution. + +### Runaway automation + +Cooldown and dedupe exist partly as correctness features, but also as safety features to avoid accidental flood behavior. + +### User visibility + +Because heartbeat may create tasks on its own, the UI should always show: + +- what heartbeat created the task +- why it triggered + +--- + +## Alternatives Considered + +### 1. Use cron tasks only + +Rejected because it conflates "check" with "execute" and creates excessive no-op runs. + +### 2. Add a boolean flag to cron tasks, such as `silent_if_noop` + +Rejected because it still overloads the task abstraction. The system would have to infer or special-case whether a task actually "did nothing," which is much less explicit than a dedicated heartbeat resource. + +### 3. Parse a magic token from free-form output + +Rejected because it is fragile, hard to validate, and too easy to regress as prompts evolve. + +### 4. Emit every heartbeat tick as a task card + +Rejected because it undermines the entire goal of low-noise monitoring. + +--- + +## Rollout Plan + +### Phase 1: Minimal viable heartbeat + +Scope: + +- heartbeat CRUD +- schedule types: `cron` and `interval` +- decision types: `idle`, `trigger_task`, `error` +- tick history +- dedupe and cooldown +- basic UI list and tick inspection + +No support yet for: + +- `resume_task` +- `notify_only` +- custom tool APIs for decision logic +- channel-specific routing policies + +### Phase 2: Richer actions + +Potential additions: + +- resume existing task +- direct channel notifications +- richer per-heartbeat policies +- heartbeat-specific templates for common watcher types + +### Phase 3: Advanced policies + +Potential additions: + +- backlog thresholds +- max tasks per day +- dependency-aware heartbeats +- per-channel or per-user routing + +--- + +## Open Questions + +1. Should idle ticks be persisted indefinitely, or should AgentForge retain only the last N ticks per heartbeat? +2. Should heartbeat creation be limited to the desktop UI first, or should chat channels be allowed to create them too? +3. Should `trigger_task` create a brand-new task every time, or optionally update/reuse an existing open task for the same dedupe key? +4. Should v1 expose only stateless heartbeat decisions, or should a sticky-session mode be designed now but left unimplemented? +5. Should heartbeat-created tasks be visually marked in the main board, for example with a `heartbeat` badge? + +--- + +## Recommendation + +Implement heartbeat as a separate first-class resource rather than as a modified cron task. + +This keeps the mental model clean: + +- tasks represent real work +- cron defines when real work runs +- heartbeat defines when the system checks whether real work should exist + +That separation makes AgentForge better suited for proactive automation without turning the board into a history of empty checks. diff --git a/docs/superpowers/plans/2026-03-18-backend-quality-baseline.md b/docs/superpowers/plans/2026-03-18-backend-quality-baseline.md new file mode 100644 index 0000000..4b52a0f --- /dev/null +++ b/docs/superpowers/plans/2026-03-18-backend-quality-baseline.md @@ -0,0 +1,72 @@ +# Backend Quality Baseline Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a sustainable Python backend quality baseline with Ruff linting/formatting, pytest coverage, and GitHub Actions CI gates. + +**Architecture:** Keep the current backend structure intact, add tool configuration centrally in `pyproject.toml`, and introduce a thin testable seam where needed. Build around the existing `tests/` suite instead of replacing it, then expose one local verification entrypoint through `Makefile` and reuse it in CI. + +**Tech Stack:** Python 3.12, uv, Ruff, pytest, pytest-cov, GitHub Actions + +--- + +### Task 1: Baseline Audit And Tracking + +**Files:** +- Modify: `docs/todo.md` +- Create: `docs/superpowers/plans/2026-03-18-backend-quality-baseline.md` + +- [x] **Step 1: Record the work in project tracking** +- [x] **Step 2: Save the implementation plan before code changes** + +### Task 2: Tooling Configuration + +**Files:** +- Modify: `pyproject.toml` +- Modify: `Makefile` + +- [ ] **Step 1: Add a failing lint/test entrypoint expectation** + - Run: `make check` + - Expected: fail because lint targets/config are missing +- [ ] **Step 2: Add dev dependencies and tool config** + - Add `ruff` and `pytest-cov` + - Add `tool.ruff`, `tool.pytest.ini_options`, and coverage config +- [ ] **Step 3: Add Makefile commands** + - Add `lint`, `format`, `format-check`, `test`, `test-cov`, `check` +- [ ] **Step 4: Re-run focused verification** + - Run: `uv run ruff check .` + - Run: `uv run pytest` + +### Task 3: Test-First Coverage Fixes + +**Files:** +- Modify: `tests/conftest.py` +- Create or Modify: `tests/test_helpers.py` +- Create or Modify: targeted backend tests as needed +- Modify: backend Python files only when a failing test proves a seam is needed + +- [ ] **Step 1: Write or tighten one failing test at a time** + - Cover `_get_env()` and one database or scheduling seam +- [ ] **Step 2: Run the focused test and confirm the failure is correct** +- [ ] **Step 3: Make the minimal implementation/config change** +- [ ] **Step 4: Re-run the focused test until green** + +### Task 4: CI Workflow + +**Files:** +- Create: `.github/workflows/ci.yml` + +- [ ] **Step 1: Add CI for `pull_request` and `push` to `main`** +- [ ] **Step 2: Install Python 3.12 and uv** +- [ ] **Step 3: Run `uv sync --dev` and `make check`** +- [ ] **Step 4: Confirm the workflow matches local verification commands** + +### Task 5: Final Verification And Tracking + +**Files:** +- Modify: `docs/todo.md` + +- [ ] **Step 1: Run full project verification** + - Run: `make check` +- [ ] **Step 2: Update tracking entry with completed status and evidence** +- [ ] **Step 3: Summarize changes and any follow-up recommendations** diff --git a/docs/todo.md b/docs/todo.md index 454e395..c1ac1e3 100644 --- a/docs/todo.md +++ b/docs/todo.md @@ -1,5 +1,38 @@ # AgentForge TODO +## In Progress + +- [x] **修复转发消息时间格式测试失败** — 统一 Feishu / Telegram 转发时间按北京时间 `UTC+8` 格式化 + - ✅ 已修复:`channels/feishu_channel.py` 与 `channels/telegram_channel.py` 不再依赖进程本地时区 + - ✅ 验证:相关 3 个失败用例通过;`tests/test_feishu_forwarded_messages.py` 和 `tests/test_telegram_forwarded_messages.py` 全部通过 +- [x] **rebase `codex/backend-quality-and-heartbeat` onto latest `origin/main`** — 解决 PR #6 的文本冲突并保留 heartbeat + backend quality baseline 改动 + - ✅ 已完成:`git rebase origin/main` 成功,`9ae1cdd` 被自动识别为已在上游并 dropped + - ✅ 已确认:`git merge-base HEAD origin/main` 与 `origin/main` 均为 `6aeb72e` + - ✅ 验证:`make check` 通过,`54 passed` +- [x] **检查并处理当前分支的 merge conflicts** — 核对 git merge/rebase 状态、未合并文件以及与 `main` 的分叉点 + - ✅ 已确认:当前无进行中的 merge/rebase,`git diff --name-only --diff-filter=U` 为空 + - ✅ 已确认:`HEAD`、`main`、`merge-base` 均为 `626f16b`,当前工作树 clean +- [x] **建立后端 lint/test/CI 基线** — 为 Python 后端接入 `ruff`、`pytest` 覆盖率和 GitHub Actions 质量门禁 + - ✅ 已完成:`pyproject.toml` 新增 `ruff` / `pytest-cov` 与 lint、pytest、coverage 配置 + - ✅ 已完成:`Makefile` 新增 `lint` / `format` / `format-check` / `test` / `test-cov` / `check` + - ✅ 已完成:新增 `.github/workflows/ci.yml`,在 `pull_request` 和 `main` push 时执行 `make check` + - ✅ 已完成:补充 `tests/test_channel_utils.py` 和 `tests/test_taskboard_bus.py`,提升后端基线覆盖率 + - ✅ 验证:`make check` 通过,`54 passed`,总覆盖率 `20.02%` +- [x] **Install superpowers for Codex** — followed `https://raw.githubusercontent.com/obra/superpowers/refs/heads/main/.codex/INSTALL.md` + - ✅ Cloned `obra/superpowers` to `~/.codex/superpowers` + - ✅ Created `~/.agents/skills/superpowers` symlink to `~/.codex/superpowers/skills` + - ✅ Verified symlink with `ls -la ~/.agents/skills/superpowers` +- [x] **起草 Heartbeat RFC** — 明确 heartbeat 与现有 cron task 的边界、数据模型、调度行为和第一版实现范围 + - ✅ 已完成:新增 `docs/rfc-heartbeat.md`,覆盖目标/非目标、cron vs heartbeat、数据模型、决策协议、调度与去重、API/UI 草案和 rollout plan +- [x] **实现 Heartbeat MVP** — 提供后端调度、decision tick、REST API 和 macOS App 管理界面 + - ✅ 后端:新增 heartbeat schema、ticks、dedupe、scheduler decision tick 和 `/api/heartbeats*` 端点 + - ✅ 前端:新增 Tasks / Heartbeats 双视图、Heartbeat 创建/编辑 modal、run/pause/resume/delete 操作和 tick detail panel + - ✅ 验证:`uv run pytest tests/test_heartbeat.py -q` 通过,renderer `vite build` 通过 +- [x] **增加 Heartbeat Tick 日志观测性** — 支持查看 running/completed heartbeat tick 的实时输出与历史日志 + - ✅ 后端:heartbeat tick 新增 `raw_output` 持久化和 live output cache,提供 `/api/heartbeats/:id/ticks/:tick_id/output` + - ✅ 前端:Heartbeat detail 新增 Tick Log 面板,可查看选中 tick 的实时日志和历史输出 + - ✅ 验证:`uv run pytest tests/test_heartbeat.py -q` 通过,renderer `vite build` 通过 + ## Critical - Security - [x] **SQL 注入防护** — `taskboard.py:319` `update_task` 方法中 kwargs key 直接拼入 SQL,需对列名做白名单校验 diff --git a/pyproject.toml b/pyproject.toml index 22b0bec..20a022f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,4 +21,39 @@ dependencies = [ dev = [ "pyinstaller>=6.18.0", "pytest>=9.0.2", + "pytest-cov>=7.0.0", + "ruff>=0.11.0", ] + +[tool.ruff] +line-length = 100 +target-version = "py312" +extend-exclude = [ + "skills", + "taskboard-electron", +] + +[tool.ruff.lint] +select = ["E", "F", "I", "B"] +ignore = ["E501"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +pythonpath = ["."] +addopts = "-q" + +[tool.coverage.run] +branch = true +source = [ + "taskboard", + "taskboard_bus", + "channels", +] +omit = [ + "tests/*", +] + +[tool.coverage.report] +show_missing = true +skip_covered = false +fail_under = 20 diff --git a/skills/agentforge/scripts/agentforge_api.py b/skills/agentforge/scripts/agentforge_api.py index f405377..1b660f5 100644 --- a/skills/agentforge/scripts/agentforge_api.py +++ b/skills/agentforge/scripts/agentforge_api.py @@ -31,14 +31,13 @@ import argparse import json -import sys import os -import urllib.request +import sys import urllib.error import urllib.parse +import urllib.request from datetime import datetime - API_BASE = "http://127.0.0.1:9712/api" SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) diff --git a/taskboard-electron/src/renderer/App.jsx b/taskboard-electron/src/renderer/App.jsx index 6a32eb0..b0cc8f2 100644 --- a/taskboard-electron/src/renderer/App.jsx +++ b/taskboard-electron/src/renderer/App.jsx @@ -319,6 +319,12 @@ async function fetchTasks() { return res.json(); } +async function fetchHeartbeats() { + const res = await fetch(`${API}/heartbeats`); + if (!res.ok) throw new Error(`HTTP ${res.status}`); + return res.json(); +} + async function createTask(data) { const res = await fetch(`${API}/tasks`, { method: "POST", headers: await csrfHeaders(), @@ -327,6 +333,73 @@ async function createTask(data) { return res.json(); } +async function createHeartbeat(data) { + const res = await fetch(`${API}/heartbeats`, { + method: "POST", headers: await csrfHeaders(), + body: JSON.stringify(data), + }); + const payload = await res.json().catch(() => ({})); + if (!res.ok) throw new Error(payload.error || `HTTP ${res.status}`); + return payload; +} + +async function updateHeartbeat(id, data) { + const res = await fetch(`${API}/heartbeats/${id}`, { + method: "PUT", headers: await csrfHeaders(), + body: JSON.stringify(data), + }); + const payload = await res.json().catch(() => ({})); + if (!res.ok) throw new Error(payload.error || `HTTP ${res.status}`); + return payload; +} + +async function deleteHeartbeat(id) { + const res = await fetch(`${API}/heartbeats/${id}`, { + method: "DELETE", headers: await csrfHeaders(), + }); + if (!res.ok) throw new Error(`HTTP ${res.status}`); +} + +async function runHeartbeatNow(id) { + const res = await fetch(`${API}/heartbeats/${id}/run-now`, { + method: "POST", headers: await csrfHeaders(), + }); + const payload = await res.json().catch(() => ({})); + if (!res.ok) throw new Error(payload.error || `HTTP ${res.status}`); + return payload; +} + +async function pauseHeartbeat(id) { + const res = await fetch(`${API}/heartbeats/${id}/pause`, { + method: "POST", headers: await csrfHeaders(), + }); + const payload = await res.json().catch(() => ({})); + if (!res.ok) throw new Error(payload.error || `HTTP ${res.status}`); + return payload; +} + +async function resumeHeartbeatApi(id) { + const res = await fetch(`${API}/heartbeats/${id}/resume`, { + method: "POST", headers: await csrfHeaders(), + }); + const payload = await res.json().catch(() => ({})); + if (!res.ok) throw new Error(payload.error || `HTTP ${res.status}`); + return payload; +} + +async function fetchHeartbeatTicks(id) { + const res = await fetch(`${API}/heartbeats/${id}/ticks?limit=20`); + if (!res.ok) throw new Error(`HTTP ${res.status}`); + const payload = await res.json(); + return payload.ticks || []; +} + +async function fetchHeartbeatTickOutput(heartbeatId, tickId) { + const res = await fetch(`${API}/heartbeats/${heartbeatId}/ticks/${tickId}/output`); + if (!res.ok) throw new Error(`HTTP ${res.status}`); + return res.json(); +} + async function cancelTask(id) { await fetch(`${API}/tasks/${id}/cancel`, { method: "POST", headers: await csrfHeaders() }); } @@ -648,6 +721,428 @@ function Column({ col, tasks, onAction, onViewDetail }) { ); } +function HeartbeatBadge({ enabled }) { + return ( + + {enabled ? "●" : "◌"} + {enabled ? "Enabled" : "Paused"} + + ); +} + +function HeartbeatModal({ onClose, onSubmit, initialData, defaultAgent, mode = "create" }) { + const savedDir = localStorage.getItem("agentforge_working_dir") || "~/papers"; + const [form, setForm] = useState(() => ({ + name: initialData?.name || "", + working_dir: initialData?.working_dir || savedDir, + schedule_type: initialData?.schedule_type || "interval", + interval_seconds: initialData?.interval_seconds || 600, + cron_expr: initialData?.cron_expr || "", + check_prompt: initialData?.check_prompt || "", + action_prompt_template: initialData?.action_prompt_template || "", + default_agent: initialData?.default_agent || defaultAgent || "claude", + cooldown_seconds: initialData?.cooldown_seconds || 1800, + enabled: initialData?.enabled ?? true, + })); + + const set = (k, v) => setForm(prev => ({ ...prev, [k]: v })); + + const inputStyle = { + width: "100%", padding: "10px 14px", borderRadius: 8, + border: `1px solid ${theme.border}`, background: theme.bg, + color: theme.text, fontSize: 13, outline: "none", boxSizing: "border-box", + fontFamily: "'JetBrains Mono', 'SF Mono', monospace", + }; + const labelStyle = { + fontSize: 11, fontWeight: 600, color: theme.textMuted, + letterSpacing: 0.8, textTransform: "uppercase", marginBottom: 6, display: "block", + }; + + const handleSubmit = () => { + localStorage.setItem("agentforge_working_dir", form.working_dir); + onSubmit({ + ...form, + name: form.name || "Untitled heartbeat", + interval_seconds: form.schedule_type === "interval" ? parseInt(form.interval_seconds) || 600 : null, + cooldown_seconds: parseInt(form.cooldown_seconds) || 0, + cron_expr: form.schedule_type === "cron" ? form.cron_expr : null, + }); + }; + + return ( +
+
e.stopPropagation()} style={{ + background: theme.surface, border: `1px solid ${theme.border}`, + borderRadius: 16, padding: 32, width: 640, maxHeight: "84vh", + overflow: "auto", boxShadow: "0 24px 80px rgba(0,0,0,0.5)", + }}> +

+ {mode === "edit" ? "Edit Heartbeat" : "New Heartbeat"} +

+
+
+ + set("name", e.target.value)} placeholder="Repo review watcher" /> +
+
+ +
+ set("working_dir", e.target.value)} /> + {window.electronAPI?.selectDirectory && ( + + )} +
+
+
+ +
+ {["interval", "cron"].map(t => ( + + ))} +
+
+ {form.schedule_type === "interval" ? ( +
+ + set("interval_seconds", e.target.value)} /> +
+ ) : ( +
+ + set("cron_expr", e.target.value)} placeholder="*/10 * * * *" /> +
+ )} +
+ +