From acb41bbde285f933be42cc645c41adc2dd0af654 Mon Sep 17 00:00:00 2001 From: Bruno Bornsztein Date: Tue, 23 Jun 2026 09:02:09 -0500 Subject: [PATCH 1/2] feat(notify): push notifications + one-tap unblock (ntfy/Telegram) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a first-class push notifier that fires on task lifecycle events (blocked/needs-input, auth-required, completed, failed) and lets you act from your phone with one tap. - internal/notify: provider-agnostic Notifier with ntfy (JSON publish, structured action buttons) and Telegram (deep-link buttons) providers. OFF by default; settings read live from the DB. - Wires into the existing events.Emitter via a Notifier interface — every mutation path (executor, MCP, CLI, TUI) already routes through it, so no parallel event path. Fires on the emitter's wait group so short-lived CLI/MCP commands flush pushes before exit. - needs-input / auth-required pushes carry a one-tap ntfy "http" action that POSTs to the existing POST /api/tasks/{id}/input endpoint to resume the agent, plus an "Open task" deep link. Reason falls back to the latest taskyou_needs_input question for a meaningful body. - Config via `ty settings` (notify_enabled, notify_ntfy_*, notify_telegram_*, notify_base_url, notify_unblock_reply); token keys are secret-hidden. - Tests: provider unit tests, event-mapping/filtering, reason enrichment, and end-to-end DB → emitter → notifier → HTTP through the real wiring. - Docs: docs/notifications.md + README feature bullet. Co-Authored-By: Claude Opus 4.8 --- README.md | 1 + cmd/task/completion.go | 8 + cmd/task/main.go | 56 +++++- docs/notifications.md | 119 +++++++++++ internal/config/config.go | 37 ++++ internal/events/events.go | 42 +++- internal/events/events_test.go | 45 +++++ internal/executor/executor.go | 9 + internal/notify/integration_test.go | 172 ++++++++++++++++ internal/notify/notify.go | 298 ++++++++++++++++++++++++++++ internal/notify/notify_test.go | 218 ++++++++++++++++++++ internal/notify/ntfy.go | 115 +++++++++++ internal/notify/telegram.go | 104 ++++++++++ 13 files changed, 1214 insertions(+), 10 deletions(-) create mode 100644 docs/notifications.md create mode 100644 internal/notify/integration_test.go create mode 100644 internal/notify/notify.go create mode 100644 internal/notify/notify_test.go create mode 100644 internal/notify/ntfy.go create mode 100644 internal/notify/telegram.go diff --git a/README.md b/README.md index d1e6c58f..e609e5f5 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,7 @@ The same UI is also served in your browser at `http://localhost:8484` whenever ` - **Git Worktrees** - Each task runs in an isolated worktree, no conflicts between parallel tasks - **Pluggable Executors** - Choose between Claude Code, OpenAI Codex, Gemini, Pi, OpenClaw, or OpenCode per task - **Event Hooks** - Run scripts when tasks change state (see [Event Hooks](#event-hooks)) +- **Push Notifications** - Get a push (ntfy or Telegram) when a task needs you, with a one-tap reply to unblock it from your phone (see [docs/notifications.md](docs/notifications.md)) - **Ghost Text Autocomplete** - LLM-powered suggestions for task titles and descriptions as you type - **VS Code-style Fuzzy Search** - Quick task navigation with smart matching (e.g., "dsno" matches "diseno website") - **Markdown Rendering** - Task descriptions render with proper formatting in the detail view diff --git a/cmd/task/completion.go b/cmd/task/completion.go index e0545af9..efcb22c2 100644 --- a/cmd/task/completion.go +++ b/cmd/task/completion.go @@ -117,6 +117,14 @@ func completeSettingKeys(cmd *cobra.Command, args []string, toComplete string) ( "idle_suspend_timeout\tIdle timeout before suspending (e.g. 6h)", "http_api_port\tPort for the daemon-hosted HTTP API (default 8080)", "http_api_disabled\tDisable the daemon-hosted HTTP API (true/false)", + "notify_enabled\tEnable push notifications (true/false)", + "notify_base_url\tExternally reachable HTTP API base for one-tap actions", + "notify_unblock_reply\tCanned reply sent on a one-tap unblock", + "notify_ntfy_server\tntfy server base URL (default https://ntfy.sh)", + "notify_ntfy_topic\tntfy topic to publish to", + "notify_ntfy_token\tntfy access token for protected topics (secret)", + "notify_telegram_token\tTelegram bot token (secret)", + "notify_telegram_chat_id\tTelegram chat ID to deliver to", }, cobra.ShellCompDirectiveNoFileComp } return nil, cobra.ShellCompDirectiveNoFileComp diff --git a/cmd/task/main.go b/cmd/task/main.go index b5fb14a3..c36b56e5 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -32,6 +32,7 @@ import ( "github.com/bborn/workflow/internal/github" "github.com/bborn/workflow/internal/hooks" "github.com/bborn/workflow/internal/mcp" + "github.com/bborn/workflow/internal/notify" "github.com/bborn/workflow/internal/ui" "github.com/bborn/workflow/internal/web" ) @@ -84,6 +85,9 @@ func openTaskDB(path string) (*db.DB, error) { if taskEmitter == nil { taskEmitter = events.New(hooks.DefaultHooksDir()) } + // Bind the push notifier to the database this caller is using so settings + // (and the latest needs-input question) are read from the live handle. + taskEmitter.SetNotifier(notify.New(database)) database.SetEventEmitter(taskEmitter) return database, nil } @@ -2339,6 +2343,25 @@ servers programmatically.`, } fmt.Printf("idle_suspend_timeout: %s\n", idleTimeout) + // Push notifications + notifyEnabled, _ := database.GetSetting(config.SettingNotifyEnabled) + if notifyEnabled == "" { + notifyEnabled = "false (default)" + } + fmt.Printf("notify_enabled: %s\n", notifyEnabled) + if ntfyTopic, _ := database.GetSetting(config.SettingNtfyTopic); ntfyTopic != "" { + fmt.Printf("notify_ntfy_topic: %s\n", ntfyTopic) + } + if ntfyToken, _ := database.GetSetting(config.SettingNtfyToken); ntfyToken != "" { + fmt.Printf("notify_ntfy_token: %s\n", dimStyle.Render("(set — hidden)")) + } + if tgToken, _ := database.GetSetting(config.SettingTelegramToken); tgToken != "" { + fmt.Printf("notify_telegram_token: %s\n", dimStyle.Render("(set — hidden)")) + } + if baseURL, _ := database.GetSetting(config.SettingNotifyBaseURL); baseURL != "" { + fmt.Printf("notify_base_url: %s\n", baseURL) + } + fmt.Println() fmt.Println(dimStyle.Render("Use 'task settings set ' to change settings")) }, @@ -2356,7 +2379,18 @@ Available settings: autocomplete_enabled Enable/disable ghost text autocomplete (true/false) idle_suspend_timeout How long blocked tasks wait before suspending (e.g. 6h, 30m, 24h) http_api_port Port the daemon-hosted HTTP API listens on (default 8080) - http_api_disabled Stop the daemon from hosting the HTTP API (true/false)`, + http_api_disabled Stop the daemon from hosting the HTTP API (true/false) + +Push notifications (off by default; see docs/notifications.md): + notify_enabled Turn push notifications on/off (true/false) + notify_base_url Externally reachable HTTP API base for one-tap actions + (e.g. https://ty.my-tailnet.ts.net:8080) + notify_unblock_reply Canned reply sent on a one-tap unblock (default "continue") + notify_ntfy_server ntfy server base URL (default https://ntfy.sh) + notify_ntfy_topic ntfy topic to publish to (enables the ntfy provider) + notify_ntfy_token ntfy access token for protected topics (secret) + notify_telegram_token Telegram bot token (secret; enables Telegram) + notify_telegram_chat_id Telegram chat ID to deliver to`, Args: cobra.ExactArgs(2), Run: func(cmd *cobra.Command, args []string) { key := args[0] @@ -2389,9 +2423,27 @@ Available settings: fmt.Println(errorStyle.Render("Value must be 'true' or 'false'")) return } + case config.SettingNotifyEnabled: + if value != "true" && value != "false" { + fmt.Println(errorStyle.Render("Value must be 'true' or 'false'")) + return + } + case config.SettingNotifyBaseURL, config.SettingNtfyServer: + if !strings.HasPrefix(value, "http://") && !strings.HasPrefix(value, "https://") { + fmt.Println(errorStyle.Render("Value must be an http:// or https:// URL")) + return + } + case config.SettingNotifyUnblockReply, + config.SettingNtfyTopic, config.SettingNtfyToken, + config.SettingTelegramToken, config.SettingTelegramChatID: + // Free-form strings; no validation beyond being non-empty. + if value == "" { + fmt.Println(errorStyle.Render("Value must not be empty")) + return + } default: fmt.Println(errorStyle.Render("Unknown setting: " + key)) - fmt.Println(dimStyle.Render("Available: anthropic_api_key, autocomplete_enabled, idle_suspend_timeout, http_api_port, http_api_disabled")) + fmt.Println(dimStyle.Render("Available: anthropic_api_key, autocomplete_enabled, idle_suspend_timeout, http_api_port, http_api_disabled, notify_enabled, notify_base_url, notify_unblock_reply, notify_ntfy_server, notify_ntfy_topic, notify_ntfy_token, notify_telegram_token, notify_telegram_chat_id")) return } diff --git a/docs/notifications.md b/docs/notifications.md new file mode 100644 index 00000000..ae5478e6 --- /dev/null +++ b/docs/notifications.md @@ -0,0 +1,119 @@ +# Push notifications & one-tap unblock + +TaskYou can push a notification to your phone when a task needs you — and let +you act on it with one tap, without opening a laptop. Notifications fire on task +lifecycle events and are delivered through [ntfy](https://ntfy.sh) (simplest) +or a Telegram bot. + +**Notifications are OFF by default.** Nothing is sent until you turn them on and +configure a provider. + +## What fires a push + +| Event | When | Push | +|-------|------|------| +| `task.blocked` | A task calls `taskyou_needs_input`, or is otherwise blocked waiting on you | 🔔 "Needs input" + one-tap reply action | +| `task.auth_required` | An executor session needs re-authentication (e.g. expired login) | 🔐 "Auth required" + one-tap reply action | +| `task.completed` | A task finishes (or its PR is up for review) | ✅ "Completed" | +| `task.failed` | A task fails | ❌ "Failed" | + +The body always includes the **task title**, **project**, and a **short +reason** — for needs-input pushes that reason is the actual question the agent +asked. + +These hook into the existing event system (`internal/events`), so every code +path that changes task state — the executor, MCP (`taskyou_needs_input`, +`taskyou_complete`), CLI, and the TUI — produces a push. No parallel path. + +## Quick start (ntfy) + +1. Install the [ntfy app](https://ntfy.sh/app) on your phone and subscribe to a + private, hard-to-guess topic (e.g. `ty-bruno-7f3a9c`). + +2. Configure TaskYou: + + ```sh + ty settings set notify_enabled true + ty settings set notify_ntfy_topic ty-bruno-7f3a9c + ``` + +3. To make the **one-tap reply** button work from your phone, the daemon's HTTP + API must be reachable from the internet (or your VPN/tailnet). Point + TaskYou at that reachable base URL: + + ```sh + ty settings set notify_base_url https://ty.my-tailnet.ts.net:8080 + ``` + + If you don't set `notify_base_url`, action links fall back to + `http://localhost:`, which only works on the same machine. + +That's it. Block a task (or have an agent call `taskyou_needs_input`) and you'll +get a push within a few seconds. + +## One-tap unblock — how it works + +A needs-input / auth-required push carries two action buttons: + +- **Reply "continue"** — an ntfy `http` action that POSTs to the existing web + API, `POST /api/tasks/{id}/input`, with `{"message":"continue"}`. That types + the reply into the agent's session and presses Enter, resuming the task. The + canned reply is configurable: + + ```sh + ty settings set notify_unblock_reply "yes, go ahead" + ``` + +- **Open task** — a `view` action that opens the web UI so you can type a + full custom reply. + +The round trip is: push → tap → `POST /api/tasks/{id}/input` → `tmux +send-keys` into the executor pane → agent resumes. + +## Protected ntfy topics + +If your topic requires auth, set an access token (stored as a secret and hidden +from `ty settings` / the settings API): + +```sh +ty settings set notify_ntfy_token tk_xxxxxxxxxxxxxxxx +``` + +You can also self-host ntfy and point at it: + +```sh +ty settings set notify_ntfy_server https://ntfy.example.com +``` + +## Telegram (optional second provider) + +[Create a bot](https://core.telegram.org/bots#how-do-i-create-a-bot) via +@BotFather, then find your chat ID (e.g. message +[@userinfobot](https://t.me/userinfobot)): + +```sh +ty settings set notify_telegram_token 123456:ABC-DEF... +ty settings set notify_telegram_chat_id 987654321 +``` + +Telegram inline buttons can only navigate (not POST), so Telegram pushes get an +**Open task** deep link to the web UI rather than a true one-tap reply. Use ntfy +for one-tap unblock. + +If both ntfy and Telegram are configured, pushes go to both. + +## All settings + +| Key | Default | Notes | +|-----|---------|-------| +| `notify_enabled` | `false` | Master on/off switch | +| `notify_base_url` | `http://localhost:` | Externally reachable HTTP API base for action links | +| `notify_unblock_reply` | `continue` | Canned reply for the one-tap action | +| `notify_ntfy_server` | `https://ntfy.sh` | ntfy server base URL | +| `notify_ntfy_topic` | — | ntfy topic; setting it enables the ntfy provider | +| `notify_ntfy_token` | — | ntfy access token for protected topics (secret) | +| `notify_telegram_token` | — | Telegram bot token (secret); setting it + chat ID enables Telegram | +| `notify_telegram_chat_id` | — | Telegram chat ID | + +Settings whose names contain `token`/`key`/`secret`/`password` are never shown +by `ty settings` or returned by the settings API. diff --git a/internal/config/config.go b/internal/config/config.go index c8aefc0a..310270fd 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -29,8 +29,45 @@ const ( // SettingHTTPAPIDisabled, when "true", stops the daemon from hosting the // HTTP API (for headless/security-sensitive boxes). The API is on by default. SettingHTTPAPIDisabled = "http_api_disabled" + + // Push notification settings. OFF by default — nothing is sent unless + // SettingNotifyEnabled is "true" AND a provider is configured. See + // internal/notify for the delivery logic. + + // SettingNotifyEnabled, when "true", turns on push notifications for task + // lifecycle events (blocked/needs-input, auth-required, completed, failed). + SettingNotifyEnabled = "notify_enabled" + // SettingNotifyBaseURL is the externally reachable base URL of the daemon + // HTTP API, used to build one-tap action links in notifications (e.g. + // "https://ty.my-tailnet.ts.net:8080"). Falls back to http://localhost:. + SettingNotifyBaseURL = "notify_base_url" + // SettingNotifyUnblockReply is the canned reply sent to a blocked task when + // the user taps the one-tap action button. Defaults to "continue". + SettingNotifyUnblockReply = "notify_unblock_reply" + + // ntfy (https://ntfy.sh) provider. + // SettingNtfyServer is the ntfy server base URL (default https://ntfy.sh). + SettingNtfyServer = "notify_ntfy_server" + // SettingNtfyTopic is the ntfy topic to publish to. A bare topic name or a + // full topic URL. Empty disables the ntfy provider. + SettingNtfyTopic = "notify_ntfy_topic" + // SettingNtfyToken is an optional ntfy access token for protected topics. + // Treated as a secret (hidden from `ty settings` and the settings API). + SettingNtfyToken = "notify_ntfy_token" //nolint:gosec // G101: this is a settings-key name, not a credential + + // Telegram bot provider. + // SettingTelegramToken is the Telegram bot token. Secret. Empty disables it. + SettingTelegramToken = "notify_telegram_token" //nolint:gosec // G101: this is a settings-key name, not a credential + // SettingTelegramChatID is the Telegram chat ID to deliver messages to. + SettingTelegramChatID = "notify_telegram_chat_id" ) +// DefaultNtfyServer is the ntfy server used when SettingNtfyServer is unset. +const DefaultNtfyServer = "https://ntfy.sh" + +// DefaultUnblockReply is the canned reply sent on a one-tap unblock action. +const DefaultUnblockReply = "continue" + // DefaultHTTPAPIPort is the port the daemon-hosted HTTP API binds by default. // Matches the standalone `ty serve` default so existing clients (ty-web, the // ty-chrome extension) keep working without reconfiguration. diff --git a/internal/events/events.go b/internal/events/events.go index 7aa9b3e7..a620a782 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -42,9 +42,17 @@ type Event struct { Timestamp time.Time `json:"timestamp"` } +// Notifier receives every emitted event so it can deliver push notifications. +// It is implemented by internal/notify. Kept as an interface here so the events +// package stays free of provider/config dependencies. +type Notifier interface { + Notify(eventType string, task *db.Task, message string) +} + // Emitter handles event emission via hooks. type Emitter struct { hooksDir string + notifier Notifier wg sync.WaitGroup } @@ -53,21 +61,39 @@ func New(hooksDir string) *Emitter { return &Emitter{hooksDir: hooksDir} } +// SetNotifier attaches a push notifier. Once set, every emitted event is also +// forwarded to it (the notifier itself decides what, if anything, to send). +func (e *Emitter) SetNotifier(n Notifier) { + e.notifier = n +} + // Emit triggers a hook script if it exists for the event type. // Hooks run in a background goroutine — short-lived CLI commands should // call Wait before exiting so the hook actually runs. func (e *Emitter) Emit(event Event) { - if e.hooksDir == "" { - return - } if event.Timestamp.IsZero() { event.Timestamp = time.Now() } - e.wg.Add(1) - go func() { - defer e.wg.Done() - e.runHook(event) - }() + + // Run the matching hook script (if a hooks dir is configured). + if e.hooksDir != "" { + e.wg.Add(1) + go func() { + defer e.wg.Done() + e.runHook(event) + }() + } + + // Fan the event out to the push notifier on the same wait group so + // short-lived CLI/MCP commands flush notifications via Wait before exit. + // This runs independently of hooks — a notifier works even with no hooks dir. + if e.notifier != nil { + e.wg.Add(1) + go func() { + defer e.wg.Done() + e.notifier.Notify(event.Type, event.Task, event.Message) + }() + } } // Wait blocks until all in-flight hooks have completed. diff --git a/internal/events/events_test.go b/internal/events/events_test.go index 16dfc6c5..263423ab 100644 --- a/internal/events/events_test.go +++ b/internal/events/events_test.go @@ -3,6 +3,7 @@ package events import ( "os" "path/filepath" + "sync" "testing" "time" @@ -103,6 +104,50 @@ echo "$WORKTREE_PATH:$WORKTREE_BRANCH:$WORKTREE_PORT" > "` + markerFile + `" } } +// recordingNotifier captures the events forwarded to it. +type recordingNotifier struct { + mu sync.Mutex + events []string +} + +func (r *recordingNotifier) Notify(eventType string, task *db.Task, message string) { + r.mu.Lock() + defer r.mu.Unlock() + r.events = append(r.events, eventType) +} + +func (r *recordingNotifier) seen() []string { + r.mu.Lock() + defer r.mu.Unlock() + return append([]string(nil), r.events...) +} + +func TestEmitterForwardsToNotifier(t *testing.T) { + // No hooks dir: notifier must still receive every emitted event. + emitter := New("") + rec := &recordingNotifier{} + emitter.SetNotifier(rec) + + emitter.EmitTaskBlocked(&db.Task{ID: 1, Title: "Blocked"}, "needs input") + emitter.EmitTaskCompleted(&db.Task{ID: 1, Title: "Done"}) + emitter.Wait() + + seen := rec.seen() + if len(seen) != 2 { + t.Fatalf("notifier saw %d events, want 2: %v", len(seen), seen) + } + if seen[0] != TaskBlocked || seen[1] != TaskCompleted { + t.Errorf("notifier saw %v, want [%s %s]", seen, TaskBlocked, TaskCompleted) + } +} + +func TestEmitterNoNotifier(t *testing.T) { + // No notifier set: emitting must not panic. + emitter := New("") + emitter.Emit(Event{Type: TaskBlocked, TaskID: 1}) + emitter.Wait() +} + func TestEmitterNoHooksDir(t *testing.T) { emitter := New("") // Should not panic diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 5e33d4d8..9f695b9c 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -25,6 +25,7 @@ import ( "github.com/bborn/workflow/internal/events" "github.com/bborn/workflow/internal/github" "github.com/bborn/workflow/internal/hooks" + "github.com/bborn/workflow/internal/notify" ) // TaskEvent represents a change to a task. @@ -165,6 +166,10 @@ func New(database *db.DB, cfg *config.Config) *Executor { executorName: display, } + // Attach the push notifier so lifecycle events (blocked/needs-input, + // auth-required, completed, failed) can deliver pushes. OFF unless configured. + eventsEmitter.SetNotifier(notify.New(database)) + // Register the events emitter with the database for event emission database.SetEventEmitter(eventsEmitter) @@ -203,6 +208,10 @@ func NewWithLogging(database *db.DB, cfg *config.Config, w io.Writer) *Executor executorName: display, } + // Attach the push notifier so lifecycle events (blocked/needs-input, + // auth-required, completed, failed) can deliver pushes. OFF unless configured. + eventsEmitter.SetNotifier(notify.New(database)) + // Register the events emitter with the database for event emission database.SetEventEmitter(eventsEmitter) diff --git a/internal/notify/integration_test.go b/internal/notify/integration_test.go new file mode 100644 index 00000000..c30678d1 --- /dev/null +++ b/internal/notify/integration_test.go @@ -0,0 +1,172 @@ +package notify_test + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "path/filepath" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/bborn/workflow/internal/config" + "github.com/bborn/workflow/internal/db" + "github.com/bborn/workflow/internal/events" + "github.com/bborn/workflow/internal/notify" +) + +// capture records the most recent ntfy publish payload. +type capture struct { + mu sync.Mutex + body map[string]any + hits int +} + +func (c *capture) record(b map[string]any) { + c.mu.Lock() + defer c.mu.Unlock() + c.body = b + c.hits++ +} + +func (c *capture) snapshot() (map[string]any, int) { + c.mu.Lock() + defer c.mu.Unlock() + return c.body, c.hits +} + +// TestEndToEndBlockedDeliversActionablePush exercises the full real path: +// db.UpdateTaskStatus(blocked) → events.Emitter.Emit → notify.Notifier → +// ntfy HTTP publish, asserting the push carries a one-tap action that targets +// the existing POST /api/tasks/{id}/input endpoint. This proves the wiring is +// not a parallel path — it rides the same emitter every mutation already uses. +func TestEndToEndBlockedDeliversActionablePush(t *testing.T) { + cap := &capture{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var payload map[string]any + _ = json.NewDecoder(r.Body).Decode(&payload) + cap.record(payload) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + dbPath := filepath.Join(t.TempDir(), "test.db") + database, err := db.Open(dbPath) + if err != nil { + t.Fatalf("open db: %v", err) + } + defer database.Close() + + if err := database.CreateProject(&db.Project{Name: "webapp", Path: t.TempDir()}); err != nil { + t.Fatalf("create project: %v", err) + } + task := &db.Task{Title: "Fix the login bug", Status: db.StatusProcessing, Type: db.TypeCode, Project: "webapp"} + if err := database.CreateTask(task); err != nil { + t.Fatalf("create task: %v", err) + } + // The agent asks a question via taskyou_needs_input, which logs it. + if err := database.AppendTaskLog(task.ID, "question", "Should I use Postgres or SQLite?"); err != nil { + t.Fatalf("append log: %v", err) + } + + // Configure notifications to point at our fake ntfy server. + settings := map[string]string{ + config.SettingNotifyEnabled: "true", + config.SettingNtfyServer: srv.URL, + config.SettingNtfyTopic: "ty-test", + config.SettingNotifyBaseURL: "https://ty.example.ts.net:8080", + } + for k, v := range settings { + if err := database.SetSetting(k, v); err != nil { + t.Fatalf("set setting %s: %v", k, err) + } + } + + // Wire the real emitter + notifier exactly like the executor/CLI do. + emitter := events.New("") // no hooks dir; notifier still fires + emitter.SetNotifier(notify.New(database)) + database.SetEventEmitter(emitter) + + // Block the task through the normal mutation path. + if err := database.UpdateTaskStatus(task.ID, db.StatusBlocked); err != nil { + t.Fatalf("update status: %v", err) + } + emitter.Wait() // flush async notification + + body, hits := cap.snapshot() + if hits == 0 { + t.Fatal("expected a push to be delivered on task.blocked") + } + if body["topic"] != "ty-test" { + t.Errorf("topic = %v, want ty-test", body["topic"]) + } + if title, _ := body["title"].(string); !strings.Contains(title, "Fix the login bug") { + t.Errorf("title %q missing task title", title) + } + if msg, _ := body["message"].(string); !strings.Contains(msg, "Postgres") || !strings.Contains(msg, "webapp") { + t.Errorf("message %q missing question/project", msg) + } + + actions, _ := body["actions"].([]any) + var found bool + wantURL := "https://ty.example.ts.net:8080/api/tasks/" + strconv.FormatInt(task.ID, 10) + "/input" + for _, a := range actions { + m, _ := a.(map[string]any) + if m["action"] == "http" && m["url"] == wantURL { + found = true + if m["method"] != http.MethodPost { + t.Errorf("action method = %v, want POST", m["method"]) + } + if reply, _ := m["body"].(string); !strings.Contains(reply, "continue") { + t.Errorf("action body %q missing default reply", reply) + } + } + } + if !found { + t.Errorf("no http action targeting %s; actions=%v", wantURL, actions) + } +} + +// TestEndToEndDisabledSendsNothing confirms the default-off behavior end to end. +func TestEndToEndDisabledSendsNothing(t *testing.T) { + cap := &capture{} + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + cap.record(nil) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + dbPath := filepath.Join(t.TempDir(), "test.db") + database, err := db.Open(dbPath) + if err != nil { + t.Fatalf("open db: %v", err) + } + defer database.Close() + if err := database.CreateProject(&db.Project{Name: "p", Path: t.TempDir()}); err != nil { + t.Fatalf("create project: %v", err) + } + task := &db.Task{Title: "T", Status: db.StatusProcessing, Type: db.TypeCode, Project: "p"} + if err := database.CreateTask(task); err != nil { + t.Fatalf("create task: %v", err) + } + // ntfy topic set, but notify_enabled NOT set → nothing should send. + _ = database.SetSetting(config.SettingNtfyServer, srv.URL) + _ = database.SetSetting(config.SettingNtfyTopic, "ty-test") + + emitter := events.New("") + emitter.SetNotifier(notify.New(database)) + database.SetEventEmitter(emitter) + + if err := database.UpdateTaskStatus(task.ID, db.StatusBlocked); err != nil { + t.Fatalf("update status: %v", err) + } + emitter.Wait() + + // Give any erroneous async send a brief window to land. + time.Sleep(50 * time.Millisecond) + if _, hits := cap.snapshot(); hits != 0 { + t.Fatalf("expected no delivery when disabled, got %d", hits) + } +} diff --git a/internal/notify/notify.go b/internal/notify/notify.go new file mode 100644 index 00000000..e9dbe29d --- /dev/null +++ b/internal/notify/notify.go @@ -0,0 +1,298 @@ +// Package notify delivers push notifications for task lifecycle events +// (blocked/needs-input, auth-required, completed, failed) to providers like +// ntfy and Telegram. It plugs into the existing events.Emitter rather than +// adding a parallel event path: events.Emitter calls Notifier.Notify for every +// event it emits, and the Notifier decides what (if anything) to send. +// +// Notifications are OFF by default. Nothing is sent unless notify_enabled is +// "true" and at least one provider is configured. Settings are read live from +// the database on every event so config changes take effect without a restart. +package notify + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + "github.com/bborn/workflow/internal/config" + "github.com/bborn/workflow/internal/db" +) + +// sendTimeout bounds how long a single provider delivery may take. Kept short +// so a slow provider never holds up the executor or a CLI command's flush. +const sendTimeout = 8 * time.Second + +// SettingsStore is the subset of *db.DB the notifier needs. An interface keeps +// the package testable without a real database. +type SettingsStore interface { + GetSetting(key string) (string, error) + GetTaskLogs(taskID int64, limit int) ([]*db.TaskLog, error) +} + +// Action is a one-tap button attached to a notification. +type Action struct { + // Type is the provider-agnostic action kind: "view" opens a URL, "http" + // performs an HTTP request (used for one-tap unblock). + Type string + Label string + URL string + Method string + Body string + Headers map[string]string + // Clear dismisses the notification after the action succeeds (ntfy only). + Clear bool +} + +// Message is a provider-agnostic notification. +type Message struct { + Title string + Body string + Priority int // 1 (min) .. 5 (max); ntfy semantics, mapped per provider + Tags []string + ClickURL string + Actions []Action +} + +// Provider delivers a Message to a specific backend. +type Provider interface { + Name() string + Send(ctx context.Context, msg Message) error +} + +// Notifier maps task lifecycle events to notifications and fans them out to the +// configured providers. It satisfies the events.Notifier interface. +type Notifier struct { + store SettingsStore + client *http.Client + // logf, if set, receives non-fatal delivery errors. nil = silent. + logf func(format string, args ...any) +} + +// New builds a Notifier backed by the given database. +func New(store SettingsStore) *Notifier { + return &Notifier{ + store: store, + client: &http.Client{Timeout: sendTimeout}, + } +} + +// SetLogf installs a logging callback for delivery errors (best-effort). +func (n *Notifier) SetLogf(logf func(format string, args ...any)) { + n.logf = logf +} + +func (n *Notifier) logErr(format string, args ...any) { + if n.logf != nil { + n.logf(format, args...) + } +} + +// setting reads a setting, trimming whitespace and tolerating errors. +func (n *Notifier) setting(key string) string { + v, err := n.store.GetSetting(key) + if err != nil { + return "" + } + return strings.TrimSpace(v) +} + +// Enabled reports whether notifications are switched on. +func (n *Notifier) Enabled() bool { + return strings.EqualFold(n.setting(config.SettingNotifyEnabled), "true") +} + +// Notify is the events.Notifier entry point. It is called for every emitted +// event; non-notifiable types and disabled/unconfigured setups are dropped +// silently. Safe to call from a goroutine. +func (n *Notifier) Notify(eventType string, task *db.Task, message string) { + if !n.Enabled() { + return + } + spec, ok := notifiableEvents[eventType] + if !ok { + return + } + providers := n.providers() + if len(providers) == 0 { + return + } + + msg := n.buildMessage(spec, task, message) + + ctx, cancel := context.WithTimeout(context.Background(), sendTimeout) + defer cancel() + for _, p := range providers { + if err := p.Send(ctx, msg); err != nil { + n.logErr("notify: %s delivery failed: %v", p.Name(), err) + } + } +} + +// providers builds the set of configured providers from settings. +func (n *Notifier) providers() []Provider { + var out []Provider + + if topic := n.setting(config.SettingNtfyTopic); topic != "" { + server := n.setting(config.SettingNtfyServer) + if server == "" { + server = config.DefaultNtfyServer + } + out = append(out, &ntfyProvider{ + client: n.client, + server: server, + topic: topic, + token: n.setting(config.SettingNtfyToken), + }) + } + + if token := n.setting(config.SettingTelegramToken); token != "" { + if chatID := n.setting(config.SettingTelegramChatID); chatID != "" { + out = append(out, &telegramProvider{ + client: n.client, + token: token, + chatID: chatID, + }) + } + } + + return out +} + +// eventSpec describes how a given event type renders as a notification. +type eventSpec struct { + title string + tags []string + priority int + actionable bool // attach a one-tap unblock action +} + +// notifiableEvents is the allow-list of event types that produce a push. Other +// events (created/updated/started/...) are intentionally ignored. +var notifiableEvents = map[string]eventSpec{ + "task.blocked": {title: "Needs input", tags: []string{"bell"}, priority: 4, actionable: true}, + "task.auth_required": {title: "Auth required", tags: []string{"closed_lock_with_key"}, priority: 5, actionable: true}, + "task.completed": {title: "Completed", tags: []string{"white_check_mark"}, priority: 3}, + "task.failed": {title: "Failed", tags: []string{"x"}, priority: 4}, +} + +// buildMessage renders a Message for an event. +func (n *Notifier) buildMessage(spec eventSpec, task *db.Task, message string) Message { + title := spec.title + var taskID int64 + bodyLines := []string{} + + if task != nil { + taskID = task.ID + if task.Title != "" { + title = fmt.Sprintf("%s: %s", spec.title, task.Title) + } + if task.Project != "" { + bodyLines = append(bodyLines, "Project: "+task.Project) + } + } + + reason := n.reasonFor(task, message) + if reason != "" { + bodyLines = append(bodyLines, reason) + } + + msg := Message{ + Title: title, + Body: strings.Join(bodyLines, "\n"), + Priority: spec.priority, + Tags: spec.tags, + } + + if taskID > 0 { + base := n.baseURL() + msg.ClickURL = base + if spec.actionable { + reply := n.setting(config.SettingNotifyUnblockReply) + if reply == "" { + reply = config.DefaultUnblockReply + } + msg.Actions = []Action{ + { + Type: "http", + Label: fmt.Sprintf("Reply %q", reply), + URL: fmt.Sprintf("%s/api/tasks/%d/input", base, taskID), + Method: http.MethodPost, + Body: fmt.Sprintf(`{"message":%q}`, reply), + Headers: map[string]string{ + "Content-Type": "application/json", + }, + Clear: true, + }, + { + Type: "view", + Label: "Open task", + URL: base, + }, + } + } + } + + return msg +} + +// reasonFor produces a short, human-meaningful reason for the notification +// body. Several code paths emit a generic message ("status change", "Task needs +// input"); in those cases we surface the latest "question" log entry instead, +// which carries the actual taskyou_needs_input question or PR-review note. +func (n *Notifier) reasonFor(task *db.Task, message string) string { + msg := strings.TrimSpace(message) + if isGenericReason(msg) && task != nil { + if q := n.latestQuestion(task.ID); q != "" { + return truncate(q, 300) + } + } + return truncate(msg, 300) +} + +func isGenericReason(msg string) bool { + switch strings.ToLower(msg) { + case "", "status change", "task needs input", "task waiting for input": + return true + } + return false +} + +// latestQuestion returns the most recent "question" log entry for a task. +func (n *Notifier) latestQuestion(taskID int64) string { + logs, err := n.store.GetTaskLogs(taskID, 25) + if err != nil { + return "" + } + // GetTaskLogs returns newest-first, so the first question wins. + for _, l := range logs { + if l.LineType == "question" { + return strings.TrimSpace(l.Content) + } + } + return "" +} + +// baseURL returns the externally reachable base URL for action links, with no +// trailing slash. Falls back to http://localhost:. +func (n *Notifier) baseURL() string { + if u := n.setting(config.SettingNotifyBaseURL); u != "" { + return strings.TrimRight(u, "/") + } + port := config.DefaultHTTPAPIPort + if v := n.setting(config.SettingHTTPAPIPort); v != "" { + if _, err := fmt.Sscanf(v, "%d", &port); err != nil { + port = config.DefaultHTTPAPIPort + } + } + return fmt.Sprintf("http://localhost:%d", port) +} + +func truncate(s string, max int) string { + s = strings.TrimSpace(s) + if len(s) <= max { + return s + } + return s[:max-1] + "…" +} diff --git a/internal/notify/notify_test.go b/internal/notify/notify_test.go new file mode 100644 index 00000000..154f1e1c --- /dev/null +++ b/internal/notify/notify_test.go @@ -0,0 +1,218 @@ +package notify + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/bborn/workflow/internal/config" + "github.com/bborn/workflow/internal/db" +) + +// fakeStore is an in-memory SettingsStore for tests. +type fakeStore struct { + settings map[string]string + logs map[int64][]*db.TaskLog +} + +func newFakeStore(settings map[string]string) *fakeStore { + return &fakeStore{settings: settings, logs: map[int64][]*db.TaskLog{}} +} + +func (f *fakeStore) GetSetting(key string) (string, error) { + return f.settings[key], nil +} + +func (f *fakeStore) GetTaskLogs(taskID int64, limit int) ([]*db.TaskLog, error) { + return f.logs[taskID], nil +} + +func sampleTask() *db.Task { + return &db.Task{ID: 42, Title: "Fix the login bug", Project: "webapp", Status: "blocked"} +} + +func TestNotifyDisabledByDefault(t *testing.T) { + // No settings at all → notifications off. + n := New(newFakeStore(map[string]string{})) + if n.Enabled() { + t.Fatal("expected notifications to be disabled by default") + } + + // A topic is configured but notify_enabled is not set: still off, and Notify + // must not panic or attempt delivery. + n = New(newFakeStore(map[string]string{config.SettingNtfyTopic: "mytopic"})) + if n.Enabled() { + t.Fatal("expected notifications disabled when notify_enabled unset") + } + n.Notify("task.blocked", sampleTask(), "needs input") // should be a no-op +} + +func TestNtfyDeliversBlockedWithAction(t *testing.T) { + var got ntfyPayload + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get("Authorization") != "Bearer secret-token" { + t.Errorf("missing/wrong auth header: %q", r.Header.Get("Authorization")) + } + if err := json.NewDecoder(r.Body).Decode(&got); err != nil { + t.Errorf("decode payload: %v", err) + } + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + n := New(newFakeStore(map[string]string{ + config.SettingNotifyEnabled: "true", + config.SettingNtfyServer: srv.URL, + config.SettingNtfyTopic: "ty-bruno", + config.SettingNtfyToken: "secret-token", + config.SettingNotifyBaseURL: "https://ty.example.ts.net:8080", + })) + + n.Notify("task.blocked", sampleTask(), "Which database should I use?") + + if got.Topic != "ty-bruno" { + t.Errorf("topic = %q, want ty-bruno", got.Topic) + } + if !strings.Contains(got.Title, "Fix the login bug") { + t.Errorf("title %q missing task title", got.Title) + } + if !strings.Contains(got.Message, "webapp") { + t.Errorf("body %q missing project", got.Message) + } + if !strings.Contains(got.Message, "Which database") { + t.Errorf("body %q missing reason", got.Message) + } + + // One-tap action must POST to the input endpoint with a JSON reply body. + var httpAction *ntfyAction + for i := range got.Actions { + if got.Actions[i].Action == "http" { + httpAction = &got.Actions[i] + } + } + if httpAction == nil { + t.Fatal("expected an http action for one-tap unblock") + } + wantURL := "https://ty.example.ts.net:8080/api/tasks/42/input" + if httpAction.URL != wantURL { + t.Errorf("action URL = %q, want %q", httpAction.URL, wantURL) + } + if httpAction.Method != http.MethodPost { + t.Errorf("action method = %q, want POST", httpAction.Method) + } + if !strings.Contains(httpAction.Body, "continue") { + t.Errorf("action body %q missing default reply", httpAction.Body) + } +} + +func TestNotifyIgnoresNonNotifiableEvents(t *testing.T) { + called := false + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + n := New(newFakeStore(map[string]string{ + config.SettingNotifyEnabled: "true", + config.SettingNtfyServer: srv.URL, + config.SettingNtfyTopic: "ty-bruno", + })) + + n.Notify("task.created", sampleTask(), "") + n.Notify("task.updated", sampleTask(), "") + if called { + t.Fatal("non-notifiable events must not trigger delivery") + } + + n.Notify("task.completed", sampleTask(), "") + if !called { + t.Fatal("task.completed should trigger delivery") + } +} + +func TestReasonFallsBackToLatestQuestion(t *testing.T) { + store := newFakeStore(map[string]string{ + config.SettingNotifyEnabled: "true", + }) + // Newest-first ordering, like GetTaskLogs. + store.logs[42] = []*db.TaskLog{ + {ID: 3, TaskID: 42, LineType: "output", Content: "some output"}, + {ID: 2, TaskID: 42, LineType: "question", Content: "Should I delete the old table?"}, + {ID: 1, TaskID: 42, LineType: "question", Content: "an older question"}, + } + n := New(store) + + // A generic reason should be replaced by the latest question log. + if got := n.reasonFor(sampleTask(), "status change"); got != "Should I delete the old table?" { + t.Errorf("reason = %q, want the latest question", got) + } + // A specific reason should be preserved as-is. + if got := n.reasonFor(sampleTask(), "OAuth token expired"); got != "OAuth token expired" { + t.Errorf("reason = %q, want the provided message", got) + } +} + +func TestBaseURLFallback(t *testing.T) { + n := New(newFakeStore(map[string]string{})) + if got := n.baseURL(); got != "http://localhost:8080" { + t.Errorf("baseURL = %q, want http://localhost:8080", got) + } + + n = New(newFakeStore(map[string]string{config.SettingHTTPAPIPort: "9000"})) + if got := n.baseURL(); got != "http://localhost:9000" { + t.Errorf("baseURL = %q, want http://localhost:9000", got) + } + + n = New(newFakeStore(map[string]string{config.SettingNotifyBaseURL: "https://x.ts.net:8080/"})) + if got := n.baseURL(); got != "https://x.ts.net:8080" { + t.Errorf("baseURL = %q, want trailing slash trimmed", got) + } +} + +func TestTelegramDeliversWithDeepLink(t *testing.T) { + var got telegramPayload + var path string + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + path = r.URL.Path + _ = json.NewDecoder(r.Body).Decode(&got) + w.WriteHeader(http.StatusOK) + })) + defer srv.Close() + + p := &telegramProvider{client: srv.Client(), token: "bot-token", chatID: "12345", apiBase: srv.URL} + msg := Message{ + Title: "Needs input: Fix the login bug", + Body: "Project: webapp\nWhich database?", + ClickURL: "https://ty.example/", + Actions: []Action{ + {Type: "view", Label: "Open task", URL: "https://ty.example/"}, + // An http action has no URL-navigation equivalent and must be dropped. + {Type: "http", Label: "Reply", URL: "https://ty.example/api/tasks/42/input"}, + }, + } + + if err := p.Send(context.Background(), msg); err != nil { + t.Fatalf("Send returned error: %v", err) + } + + if path != "/botbot-token/sendMessage" { + t.Errorf("path = %q, want /botbot-token/sendMessage", path) + } + if got.ChatID != "12345" { + t.Errorf("chat_id = %q, want 12345", got.ChatID) + } + if !strings.Contains(got.Text, "Fix the login bug") || !strings.Contains(got.Text, "Which database?") { + t.Errorf("text %q missing expected content", got.Text) + } + if got.ReplyMarkup == nil || len(got.ReplyMarkup.InlineKeyboard) != 1 { + t.Fatalf("expected one inline keyboard row, got %+v", got.ReplyMarkup) + } + row := got.ReplyMarkup.InlineKeyboard[0] + if len(row) != 1 || row[0].URL != "https://ty.example/" { + t.Errorf("expected single view button to the web UI, got %+v", row) + } +} diff --git a/internal/notify/ntfy.go b/internal/notify/ntfy.go new file mode 100644 index 00000000..2ab81149 --- /dev/null +++ b/internal/notify/ntfy.go @@ -0,0 +1,115 @@ +package notify + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" +) + +// ntfyProvider publishes to an ntfy server (https://ntfy.sh or self-hosted) +// using the JSON publish API, which lets us attach structured action buttons +// without the escaping pitfalls of the header-based format. +type ntfyProvider struct { + client *http.Client + server string // base URL, e.g. https://ntfy.sh + topic string // bare topic or full topic URL + token string // optional access token for protected topics +} + +func (p *ntfyProvider) Name() string { return "ntfy" } + +// ntfyAction mirrors ntfy's JSON action schema. +type ntfyAction struct { + Action string `json:"action"` // "view" | "http" | "broadcast" + Label string `json:"label"` + URL string `json:"url"` + Method string `json:"method,omitempty"` + Headers map[string]string `json:"headers,omitempty"` + Body string `json:"body,omitempty"` + Clear bool `json:"clear,omitempty"` +} + +type ntfyPayload struct { + Topic string `json:"topic"` + Title string `json:"title,omitempty"` + Message string `json:"message,omitempty"` + Priority int `json:"priority,omitempty"` + Tags []string `json:"tags,omitempty"` + Click string `json:"click,omitempty"` + Actions []ntfyAction `json:"actions,omitempty"` +} + +// resolve splits the configured server/topic into the JSON publish endpoint and +// the bare topic name. A full topic URL in `topic` (e.g. +// "https://ntfy.sh/mytopic") overrides `server`. +func (p *ntfyProvider) resolve() (endpoint, topic string) { + server := strings.TrimRight(p.server, "/") + topic = p.topic + if strings.HasPrefix(topic, "http://") || strings.HasPrefix(topic, "https://") { + trimmed := strings.TrimRight(topic, "/") + if idx := strings.LastIndex(trimmed, "/"); idx != -1 { + server = trimmed[:idx] + topic = trimmed[idx+1:] + } + } + // ntfy's JSON publish endpoint is the server root. + return server, topic +} + +func (p *ntfyProvider) Send(ctx context.Context, msg Message) error { + endpoint, topic := p.resolve() + + payload := ntfyPayload{ + Topic: topic, + Title: msg.Title, + Message: msg.Body, + Priority: msg.Priority, + Tags: msg.Tags, + Click: msg.ClickURL, + } + for _, a := range msg.Actions { + action := "view" + if a.Type == "http" { + action = "http" + } + payload.Actions = append(payload.Actions, ntfyAction{ + Action: action, + Label: a.Label, + URL: a.URL, + Method: a.Method, + Headers: a.Headers, + Body: a.Body, + Clear: a.Clear, + }) + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal ntfy payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("build ntfy request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + if p.token != "" { + req.Header.Set("Authorization", "Bearer "+p.token) + } + + resp, err := p.client.Do(req) + if err != nil { + return fmt.Errorf("post to ntfy: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) + return fmt.Errorf("ntfy returned %d: %s", resp.StatusCode, strings.TrimSpace(string(snippet))) + } + return nil +} diff --git a/internal/notify/telegram.go b/internal/notify/telegram.go new file mode 100644 index 00000000..0ef81bc2 --- /dev/null +++ b/internal/notify/telegram.go @@ -0,0 +1,104 @@ +package notify + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" +) + +// telegramProvider delivers via the Telegram Bot API. Telegram inline buttons +// can only navigate (URL buttons) — they can't POST to our API the way an ntfy +// "http" action can — so actions degrade to deep links that open the web UI. +type telegramProvider struct { + client *http.Client + token string + chatID string + // apiBase is the Telegram Bot API root. Empty defaults to the public API; + // overridable in tests. + apiBase string +} + +func (p *telegramProvider) Name() string { return "telegram" } + +func (p *telegramProvider) base() string { + if p.apiBase != "" { + return strings.TrimRight(p.apiBase, "/") + } + return "https://api.telegram.org" +} + +type telegramButton struct { + Text string `json:"text"` + URL string `json:"url"` +} + +type telegramMarkup struct { + InlineKeyboard [][]telegramButton `json:"inline_keyboard"` +} + +type telegramPayload struct { + ChatID string `json:"chat_id"` + Text string `json:"text"` + ReplyMarkup *telegramMarkup `json:"reply_markup,omitempty"` +} + +func (p *telegramProvider) Send(ctx context.Context, msg Message) error { + var sb strings.Builder + if msg.Title != "" { + sb.WriteString(msg.Title) + } + if msg.Body != "" { + if sb.Len() > 0 { + sb.WriteString("\n") + } + sb.WriteString(msg.Body) + } + + payload := telegramPayload{ + ChatID: p.chatID, + Text: sb.String(), + } + + // Telegram can't POST, so only URL-navigation buttons survive. Always offer + // "Open task" so the user can reply from the web UI. + var buttons []telegramButton + for _, a := range msg.Actions { + if a.Type == "view" && a.URL != "" { + buttons = append(buttons, telegramButton{Text: a.Label, URL: a.URL}) + } + } + if len(buttons) == 0 && msg.ClickURL != "" { + buttons = append(buttons, telegramButton{Text: "Open task", URL: msg.ClickURL}) + } + if len(buttons) > 0 { + payload.ReplyMarkup = &telegramMarkup{InlineKeyboard: [][]telegramButton{buttons}} + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal telegram payload: %w", err) + } + + endpoint := fmt.Sprintf("%s/bot%s/sendMessage", p.base(), p.token) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + return fmt.Errorf("build telegram request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := p.client.Do(req) + if err != nil { + return fmt.Errorf("post to telegram: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + snippet, _ := io.ReadAll(io.LimitReader(resp.Body, 512)) + return fmt.Errorf("telegram returned %d: %s", resp.StatusCode, strings.TrimSpace(string(snippet))) + } + return nil +} From e7601da8043304633f66dfca9963cf7bafd488ea Mon Sep 17 00:00:00 2001 From: Bruno Bornsztein Date: Tue, 23 Jun 2026 18:28:59 -0500 Subject: [PATCH 2/2] fix(notify): read settings synchronously so short-lived CLI commands deliver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Live end-to-end testing surfaced a race: the notifier read its settings from the DB inside the async delivery goroutine, but short-lived CLI commands (e.g. `ty status`) `defer database.Close()` the moment Run returns — before PersistentPostRun flushes the emitter wait group. The goroutine then read a closed DB, saw notifications as disabled, and silently dropped the push. (Daemon/MCP were unaffected: the DB stays open.) - events.Notifier.Notify now returns a delivery closure. The emitter calls Notify synchronously (DB guaranteed open) and runs the returned closure on its wait group; the closure performs only network I/O, touching no DB. - notify.Notifier.Notify reads settings/logs and builds the message up front, returns the sender closure (nil = nothing to send). Adds Deliver() for synchronous callers/tests. - Surface delivery failures: daemon executor wires logf to its logger; openTaskDB attaches a stderr logf gated by TY_NOTIFY_DEBUG. Best-effort pushes were otherwise completely silent on failure. - Update events/notify tests for the new signature. Verified live: blocking a task delivered an ntfy push (title + project + the actual needs-input question), and tapping the http action POSTed to /api/tasks/{id}/input and landed the reply in the agent's tmux pane. Co-Authored-By: Claude Opus 4.8 --- cmd/task/main.go | 8 +++++++- internal/events/events.go | 28 +++++++++++++++++--------- internal/events/events_test.go | 6 ++++-- internal/executor/executor.go | 16 +++++++++++++-- internal/notify/notify.go | 36 +++++++++++++++++++++++----------- internal/notify/notify_test.go | 10 +++++----- 6 files changed, 74 insertions(+), 30 deletions(-) diff --git a/cmd/task/main.go b/cmd/task/main.go index c36b56e5..5324913e 100644 --- a/cmd/task/main.go +++ b/cmd/task/main.go @@ -87,7 +87,13 @@ func openTaskDB(path string) (*db.DB, error) { } // Bind the push notifier to the database this caller is using so settings // (and the latest needs-input question) are read from the live handle. - taskEmitter.SetNotifier(notify.New(database)) + ntf := notify.New(database) + if os.Getenv("TY_NOTIFY_DEBUG") != "" { + ntf.SetLogf(func(format string, args ...any) { + fmt.Fprintf(os.Stderr, "[notify] "+format+"\n", args...) + }) + } + taskEmitter.SetNotifier(ntf) database.SetEventEmitter(taskEmitter) return database, nil } diff --git a/internal/events/events.go b/internal/events/events.go index a620a782..892b050a 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -45,8 +45,15 @@ type Event struct { // Notifier receives every emitted event so it can deliver push notifications. // It is implemented by internal/notify. Kept as an interface here so the events // package stays free of provider/config dependencies. +// +// Notify is invoked synchronously while the caller's database handle is still +// open, so it must read any state it needs (settings, task logs) before +// returning. It returns a delivery closure to run asynchronously — or nil if +// nothing should be sent. Splitting it this way keeps slow network I/O off the +// caller's path while ensuring DB reads never race a deferred db.Close() in +// short-lived CLI/MCP commands. type Notifier interface { - Notify(eventType string, task *db.Task, message string) + Notify(eventType string, task *db.Task, message string) func() } // Emitter handles event emission via hooks. @@ -84,15 +91,18 @@ func (e *Emitter) Emit(event Event) { }() } - // Fan the event out to the push notifier on the same wait group so - // short-lived CLI/MCP commands flush notifications via Wait before exit. - // This runs independently of hooks — a notifier works even with no hooks dir. + // Fan the event out to the push notifier. The notifier reads state + // synchronously here (DB still open) and hands back a delivery closure that + // we run on the same wait group, so short-lived CLI/MCP commands flush + // notifications via Wait before exit. Works even with no hooks dir. if e.notifier != nil { - e.wg.Add(1) - go func() { - defer e.wg.Done() - e.notifier.Notify(event.Type, event.Task, event.Message) - }() + if deliver := e.notifier.Notify(event.Type, event.Task, event.Message); deliver != nil { + e.wg.Add(1) + go func() { + defer e.wg.Done() + deliver() + }() + } } } diff --git a/internal/events/events_test.go b/internal/events/events_test.go index 263423ab..0bf447d4 100644 --- a/internal/events/events_test.go +++ b/internal/events/events_test.go @@ -110,10 +110,12 @@ type recordingNotifier struct { events []string } -func (r *recordingNotifier) Notify(eventType string, task *db.Task, message string) { +func (r *recordingNotifier) Notify(eventType string, task *db.Task, message string) func() { r.mu.Lock() - defer r.mu.Unlock() r.events = append(r.events, eventType) + r.mu.Unlock() + // Return a no-op delivery closure to exercise the emitter's async path. + return func() {} } func (r *recordingNotifier) seen() []string { diff --git a/internal/executor/executor.go b/internal/executor/executor.go index 9f695b9c..ae6cd8a2 100644 --- a/internal/executor/executor.go +++ b/internal/executor/executor.go @@ -168,7 +168,13 @@ func New(database *db.DB, cfg *config.Config) *Executor { // Attach the push notifier so lifecycle events (blocked/needs-input, // auth-required, completed, failed) can deliver pushes. OFF unless configured. - eventsEmitter.SetNotifier(notify.New(database)) + // Surface delivery failures in the executor log — pushes are best-effort, so + // this is the only place a misconfigured topic/token shows up. + ntf := notify.New(database) + ntf.SetLogf(func(format string, args ...any) { + e.logger.Warn(fmt.Sprintf(format, args...)) + }) + eventsEmitter.SetNotifier(ntf) // Register the events emitter with the database for event emission database.SetEventEmitter(eventsEmitter) @@ -210,7 +216,13 @@ func NewWithLogging(database *db.DB, cfg *config.Config, w io.Writer) *Executor // Attach the push notifier so lifecycle events (blocked/needs-input, // auth-required, completed, failed) can deliver pushes. OFF unless configured. - eventsEmitter.SetNotifier(notify.New(database)) + // Surface delivery failures in the executor log — pushes are best-effort, so + // this is the only place a misconfigured topic/token shows up. + ntf := notify.New(database) + ntf.SetLogf(func(format string, args ...any) { + e.logger.Warn(fmt.Sprintf(format, args...)) + }) + eventsEmitter.SetNotifier(ntf) // Register the events emitter with the database for event emission database.SetEventEmitter(eventsEmitter) diff --git a/internal/notify/notify.go b/internal/notify/notify.go index e9dbe29d..650325bb 100644 --- a/internal/notify/notify.go +++ b/internal/notify/notify.go @@ -104,32 +104,46 @@ func (n *Notifier) Enabled() bool { } // Notify is the events.Notifier entry point. It is called for every emitted -// event; non-notifiable types and disabled/unconfigured setups are dropped -// silently. Safe to call from a goroutine. -func (n *Notifier) Notify(eventType string, task *db.Task, message string) { +// event and reads all of its state (settings, task logs) synchronously, while +// the caller's DB handle is guaranteed open. It returns a delivery closure that +// performs the (slow, network-bound) send, or nil when there is nothing to send +// — non-notifiable types, notifications disabled, or no providers configured. +// The returned closure touches no database, so it is safe to run after the +// caller has closed its DB. +func (n *Notifier) Notify(eventType string, task *db.Task, message string) func() { if !n.Enabled() { - return + return nil } spec, ok := notifiableEvents[eventType] if !ok { - return + return nil } providers := n.providers() if len(providers) == 0 { - return + return nil } msg := n.buildMessage(spec, task, message) - ctx, cancel := context.WithTimeout(context.Background(), sendTimeout) - defer cancel() - for _, p := range providers { - if err := p.Send(ctx, msg); err != nil { - n.logErr("notify: %s delivery failed: %v", p.Name(), err) + return func() { + ctx, cancel := context.WithTimeout(context.Background(), sendTimeout) + defer cancel() + for _, p := range providers { + if err := p.Send(ctx, msg); err != nil { + n.logErr("notify: %s delivery failed: %v", p.Name(), err) + } } } } +// Deliver reads state and sends synchronously in one call. It is a convenience +// for callers (and tests) that aren't driving the events.Emitter wait group. +func (n *Notifier) Deliver(eventType string, task *db.Task, message string) { + if deliver := n.Notify(eventType, task, message); deliver != nil { + deliver() + } +} + // providers builds the set of configured providers from settings. func (n *Notifier) providers() []Provider { var out []Provider diff --git a/internal/notify/notify_test.go b/internal/notify/notify_test.go index 154f1e1c..a97049dd 100644 --- a/internal/notify/notify_test.go +++ b/internal/notify/notify_test.go @@ -47,7 +47,7 @@ func TestNotifyDisabledByDefault(t *testing.T) { if n.Enabled() { t.Fatal("expected notifications disabled when notify_enabled unset") } - n.Notify("task.blocked", sampleTask(), "needs input") // should be a no-op + n.Deliver("task.blocked", sampleTask(), "needs input") // should be a no-op } func TestNtfyDeliversBlockedWithAction(t *testing.T) { @@ -71,7 +71,7 @@ func TestNtfyDeliversBlockedWithAction(t *testing.T) { config.SettingNotifyBaseURL: "https://ty.example.ts.net:8080", })) - n.Notify("task.blocked", sampleTask(), "Which database should I use?") + n.Deliver("task.blocked", sampleTask(), "Which database should I use?") if got.Topic != "ty-bruno" { t.Errorf("topic = %q, want ty-bruno", got.Topic) @@ -122,13 +122,13 @@ func TestNotifyIgnoresNonNotifiableEvents(t *testing.T) { config.SettingNtfyTopic: "ty-bruno", })) - n.Notify("task.created", sampleTask(), "") - n.Notify("task.updated", sampleTask(), "") + n.Deliver("task.created", sampleTask(), "") + n.Deliver("task.updated", sampleTask(), "") if called { t.Fatal("non-notifiable events must not trigger delivery") } - n.Notify("task.completed", sampleTask(), "") + n.Deliver("task.completed", sampleTask(), "") if !called { t.Fatal("task.completed should trigger delivery") }