From 924a7cf3b95b41103d51a6140ae40ac9940d7e43 Mon Sep 17 00:00:00 2001 From: Fourier Date: Wed, 25 Mar 2026 16:05:08 +0800 Subject: [PATCH 1/7] =?UTF-8?q?feat:=20add=20channel=20=E2=80=94=20event?= =?UTF-8?q?=20subscription=20protocol?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements opencli channel with consumer-side subscription model: - Sources poll platforms (GitHub via gh api), track cursors, dedup events - Consumers subscribe to origins they care about - Daemon delivers events to subscribers via sinks (stdout, webhook) CLI: opencli channel {sources,subscribe,unsubscribe,subscriptions,start,stop,status,poll} GitHub source: repo events, issue/PR comments, pulls, issues Docs: docs/channel.md with architecture, usage, extension guide Ref: #369 --- README.md | 17 ++ docs/channel.md | 309 ++++++++++++++++++++++++++++ src/channel/cursor-store.ts | 61 ++++++ src/channel/dedup.ts | 28 +++ src/channel/index.ts | 376 ++++++++++++++++++++++++++++++++++ src/channel/registry.ts | 82 ++++++++ src/channel/scheduler.ts | 149 ++++++++++++++ src/channel/sinks/stdout.ts | 18 ++ src/channel/sinks/webhook.ts | 38 ++++ src/channel/sources/github.ts | 317 ++++++++++++++++++++++++++++ src/channel/types.ts | 84 ++++++++ src/cli.ts | 4 + 12 files changed, 1483 insertions(+) create mode 100644 docs/channel.md create mode 100644 src/channel/cursor-store.ts create mode 100644 src/channel/dedup.ts create mode 100644 src/channel/index.ts create mode 100644 src/channel/registry.ts create mode 100644 src/channel/scheduler.ts create mode 100644 src/channel/sinks/stdout.ts create mode 100644 src/channel/sinks/webhook.ts create mode 100644 src/channel/sources/github.ts create mode 100644 src/channel/types.ts diff --git a/README.md b/README.md index 0c61db0a..2eb82373 100644 --- a/README.md +++ b/README.md @@ -336,6 +336,23 @@ Explore outputs to `.opencli/explore//` (manifest.json, endpoints.json, ca See **[TESTING.md](./TESTING.md)** for how to run and write tests. +## Channel — Event Subscriptions + +Subscribe to platform events from the command line. Channel brings the reverse direction: instead of you querying platforms, platforms notify you when something changes. + +```bash +# Subscribe to comments on a GitHub issue +opencli channel subscribe github:owner/repo#42 + +# Start polling +opencli channel start + +# One-shot poll +opencli channel poll github:owner/repo#42 +``` + +See **[docs/channel.md](./docs/channel.md)** for the full guide. + ## Troubleshooting - **"Extension not connected"** diff --git a/docs/channel.md b/docs/channel.md new file mode 100644 index 00000000..69c18a19 --- /dev/null +++ b/docs/channel.md @@ -0,0 +1,309 @@ +# OpenCLI Channel — Event Subscription Protocol + +**Subscribe to platform events from the command line.** Channel brings the reverse direction to OpenCLI: instead of you asking platforms for data, platforms tell you when something happens. + +## The Idea + +OpenCLI today is pull-only: `opencli twitter post`, `opencli notion read`, `opencli gh pr list`. You ask, the platform answers. But what about the other direction? + +- A reviewer leaves comments on your GitHub PR → you want your agent to pick them up automatically. +- Someone comments on your Notion doc → you want to be notified and respond. +- A new issue appears in your repo → you want it routed to the right handler. + +Channel fills this gap. It's [fetchmail](https://en.wikipedia.org/wiki/Fetchmail) for APIs: poll remote platforms, track what you've already seen, deliver new events to whoever subscribed. + +## Quick Start + +```bash +# 1. See what sources are available +opencli channel sources + +# 2. Subscribe to an issue's comments +opencli channel subscribe github:owner/repo#42 + +# 3. One-shot poll (prints events as JSON lines) +opencli channel poll github:owner/repo#42 + +# 4. Start the daemon for continuous polling +opencli channel start + +# 5. Check status +opencli channel status + +# 6. Stop the daemon +opencli channel stop +``` + +## Architecture + +``` +┌─ opencli channel ─────────────────────────────────────┐ +│ │ +│ Sources (platform adapters): │ +│ └── github.ts → poll via `gh api` │ +│ │ +│ Core: │ +│ ├── Scheduler → per-origin poll loop + backoff │ +│ ├── Cursor Store → persists position per origin │ +│ ├── Dedup → ring buffer, no re-delivery │ +│ └── Registry → who subscribed to what │ +│ │ +│ Sinks (output adapters): │ +│ ├── stdout → JSON lines (pipe-friendly) │ +│ └── webhook → POST to any URL │ +│ │ +└────────────────────────────────────────────────────────┘ +``` + +**Three boundaries, cleanly separated:** +- **Sources** know how to poll a specific platform. They don't know about subscribers or sinks. +- **Core** knows scheduling, state, dedup, and the subscription registry. It doesn't know about platforms. +- **Sinks** know how to deliver events. They don't know where events came from. + +### Consumer-Side Subscription + +The key design choice: **Channel doesn't decide where events go.** Instead, consumers (humans or agents) subscribe to the origins they care about. + +This means: +- No routing logic in Channel. No dispatcher, no "smart" routing. +- Multi-to-multi is free: one consumer subscribes to many sources, one source has many consumers. +- Session lifecycle is not Channel's problem: if a consumer dies, delivery fails, Channel cleans up. + +## Event Schema + +Every event follows a unified envelope: + +```json +{ + "id": "gh-comment-123456", + "source": "github", + "type": "issue_comment.created", + "timestamp": "2026-03-24T17:30:00Z", + "origin": "github:user/repo#42", + "payload": { + "author": "reviewer", + "body": "This needs error handling", + "htmlUrl": "https://github.com/user/repo/issues/42#issuecomment-123456" + } +} +``` + +| Field | Description | +|-------|-------------| +| `id` | Globally unique event ID (used for dedup) | +| `source` | Which source adapter produced this event | +| `type` | Platform-specific event type (dot-namespaced) | +| `timestamp` | When the event occurred on the platform (ISO-8601) | +| `origin` | Origin identifier — what subscriptions match against | +| `payload` | Platform-specific event data | + +## GitHub Source + +The GitHub source adapter uses `gh api` for all API calls, inheriting your existing `gh` authentication, proxy settings, and host configuration. + +### Origin Formats + +| Origin | What it watches | +|--------|-----------------| +| `github:owner/repo` | All repo events (pushes, PRs, issues, stars, etc.) | +| `github:owner/repo#42` | Comments on issue/PR #42 | +| `github:owner/repo/pulls` | All pull request activity | +| `github:owner/repo/issues` | All issue activity | + +### Event Types + +| Event Type | Origin | Description | +|------------|--------|-------------| +| `issue_comment.created` | `#number` | New comment on an issue/PR | +| `pull_request.open` | `/pulls` | PR opened or updated | +| `pull_request.closed` | `/pulls` | PR closed | +| `issue.open` | `/issues` | Issue opened or updated | +| `issue.closed` | `/issues` | Issue closed | +| `push` | repo-level | Code pushed | +| `pull_request_review` | repo-level | PR review submitted | +| `release` | repo-level | New release published | +| `star` | repo-level | Repo starred | + +### Examples + +```bash +# Watch a specific issue for new comments +opencli channel subscribe github:jackwener/opencli#369 +opencli channel start + +# Watch all PRs in a repo +opencli channel subscribe github:myorg/myproject/pulls + +# One-shot: grab recent events for an issue +opencli channel poll github:myorg/myproject#100 + +# Poll from a specific point in time +opencli channel poll github:myorg/myproject#100 --since 2026-03-01T00:00:00Z +``` + +## CLI Reference + +### `opencli channel sources [name]` + +List available event sources. With a source name, lists subscribable items. + +```bash +opencli channel sources # all sources +opencli channel sources github # GitHub-specific items +``` + +### `opencli channel subscribe ` + +Subscribe to events from an origin. + +```bash +opencli channel subscribe github:owner/repo#42 +opencli channel subscribe github:owner/repo/pulls --sink webhook --webhook-url http://localhost:3000/events +opencli channel subscribe github:owner/repo --interval 120000 # 2 min interval +``` + +Options: +- `-s, --sink ` — Sink to deliver to (default: `stdout`) +- `-i, --interval ` — Poll interval in ms (default: `60000`) +- `--webhook-url ` — URL for webhook sink + +### `opencli channel unsubscribe ` + +Remove a subscription. + +```bash +opencli channel unsubscribe github:owner/repo#42 +``` + +### `opencli channel subscriptions` + +List all current subscriptions. + +```bash +opencli channel subscriptions +opencli channel subscriptions --format json +``` + +### `opencli channel start` + +Start the polling daemon. + +```bash +opencli channel start # foreground (Ctrl+C to stop) +opencli channel start -d # background daemon +``` + +### `opencli channel stop` + +Stop the background daemon. + +### `opencli channel status` + +Show daemon status, subscription list, and cursor positions. + +### `opencli channel poll ` + +One-shot poll: fetch events and print to stdout as JSON lines. + +```bash +opencli channel poll github:owner/repo#42 +opencli channel poll github:owner/repo#42 --since 2026-03-01T00:00:00Z +``` + +## Writing a Custom Source Adapter + +A source adapter implements the `ChannelSource` interface: + +```typescript +import type { ChannelSource, ChannelEvent, PollResult, SourcePollConfig, SubscribableItem } from '../types.js'; + +export class MySource implements ChannelSource { + readonly name = 'mysource'; + + async listSubscribable(config: Record): Promise { + // Return items users can subscribe to + return [ + { origin: 'mysource:channel/general', description: 'General channel' }, + ]; + } + + parseOrigin(origin: string): SourcePollConfig | null { + // Parse "mysource:channel/general" → config object + if (!origin.startsWith('mysource:')) return null; + const channel = origin.slice('mysource:'.length); + return { channel }; + } + + async poll(config: SourcePollConfig, cursor: string | null): Promise { + // Fetch new events since cursor + // Use CLI tools (not raw HTTP) when possible + const events: ChannelEvent[] = [/* ... */]; + return { events, cursor: 'new-cursor-value' }; + } +} +``` + +Then register it in `src/channel/index.ts`: + +```typescript +import { MySource } from './sources/mysource.js'; + +function getSources(): Map { + const map = new Map(); + map.set('github', new GitHubSource()); + map.set('mysource', new MySource()); // ← add here + return map; +} +``` + +## Writing a Custom Sink Adapter + +A sink adapter implements the `ChannelSink` interface: + +```typescript +import type { ChannelSink, ChannelEvent } from '../types.js'; + +export class MySink implements ChannelSink { + readonly name = 'mysink'; + + async init(config: Record): Promise { + // Initialize with config from subscription + } + + async deliver(events: ChannelEvent[]): Promise { + for (const event of events) { + // Deliver each event + } + } +} +``` + +## Configuration Files + +All state lives in `~/.opencli/channel/`: + +| File | Purpose | +|------|---------| +| `subscriptions.json` | Subscription registry | +| `cursors.json` | Poll cursor positions per origin | +| `daemon.pid` | PID of running daemon | + +These are plain JSON — human-readable and inspectable. + +## Design Philosophy + +Channel is a **pipe**, not a brain. + +It borrows from Unix `fetchmail`: poll remote sources, track what you've seen, deliver to whoever asked. It doesn't decide what to do with events — that's the consumer's job. + +The consumer-side subscription model means Channel stays thin: +1. **Deliver** — get the event to the right place +2. **Continuity** — same origin always goes to the same subscriber +3. **Isolation** — different subscriptions don't cross + +Everything else — how to respond, whether to spawn new sessions, whether to write to a doc or reply in chat — is the consumer's decision. + +## Related + +- [RFC: OpenCLI Channel](https://github.com/jackwener/opencli/issues/369) +- [fetchmail](https://en.wikipedia.org/wiki/Fetchmail) — the Unix inspiration diff --git a/src/channel/cursor-store.ts b/src/channel/cursor-store.ts new file mode 100644 index 00000000..681bfde4 --- /dev/null +++ b/src/channel/cursor-store.ts @@ -0,0 +1,61 @@ +/** + * Cursor store — persists poll positions per origin. + * File: ~/.opencli/channel/cursors.json + */ + +import { mkdirSync } from 'node:fs'; +import { readFile, writeFile, rename } from 'node:fs/promises'; +import { dirname, join } from 'node:path'; +import { homedir } from 'node:os'; + +export interface CursorEntry { + cursor: string; + lastPoll: string; + eventsDelivered: number; +} + +const DEFAULT_PATH = join(homedir(), '.opencli', 'channel', 'cursors.json'); + +export class CursorStore { + private entries = new Map(); + + constructor(private readonly path: string = DEFAULT_PATH) {} + + async load(): Promise { + try { + const raw = await readFile(this.path, 'utf8'); + const parsed = JSON.parse(raw) as Record; + this.entries = new Map(Object.entries(parsed)); + } catch (e: unknown) { + if ((e as NodeJS.ErrnoException).code === 'ENOENT') { + this.entries = new Map(); + return; + } + throw e; + } + } + + async save(): Promise { + mkdirSync(dirname(this.path), { recursive: true }); + const tmp = `${this.path}.tmp`; + const data = JSON.stringify(Object.fromEntries(this.entries), null, 2); + await writeFile(tmp, data, 'utf8'); + await rename(tmp, this.path); + } + + get(origin: string): CursorEntry | undefined { + return this.entries.get(origin); + } + + set(origin: string, cursor: string, eventsDelivered: number): void { + this.entries.set(origin, { + cursor, + lastPoll: new Date().toISOString(), + eventsDelivered, + }); + } + + getAll(): Map { + return new Map(this.entries); + } +} diff --git a/src/channel/dedup.ts b/src/channel/dedup.ts new file mode 100644 index 00000000..0bae2c27 --- /dev/null +++ b/src/channel/dedup.ts @@ -0,0 +1,28 @@ +/** + * In-memory ring-buffer dedup. + * Keeps last N event IDs to prevent re-delivery. + */ + +export class Dedup { + private readonly ids: string[] = []; + private readonly seen = new Set(); + private readonly maxSize: number; + + constructor(maxSize = 10_000) { + this.maxSize = maxSize; + } + + isDuplicate(id: string): boolean { + return this.seen.has(id); + } + + add(id: string): void { + if (this.seen.has(id)) return; + this.ids.push(id); + this.seen.add(id); + if (this.ids.length > this.maxSize) { + const oldest = this.ids.shift(); + if (oldest !== undefined) this.seen.delete(oldest); + } + } +} diff --git a/src/channel/index.ts b/src/channel/index.ts new file mode 100644 index 00000000..0bbac4ba --- /dev/null +++ b/src/channel/index.ts @@ -0,0 +1,376 @@ +/** + * Channel — Event subscription protocol for OpenCLI. + * + * CLI subcommands: + * opencli channel sources [name] — list available sources + * opencli channel subscribe — subscribe to events + * opencli channel unsubscribe — remove subscription + * opencli channel subscriptions — list current subscriptions + * opencli channel start [-d] — start polling daemon + * opencli channel stop — stop daemon + * opencli channel status — show stats + * opencli channel poll — one-shot poll + */ + +import { Command } from 'commander'; +import { existsSync, readFileSync, writeFileSync, unlinkSync } from 'node:fs'; +import { join } from 'node:path'; +import { homedir } from 'node:os'; +import { execSync, spawn } from 'node:child_process'; + +import { CursorStore } from './cursor-store.js'; +import { Dedup } from './dedup.js'; +import { SubscriptionRegistry } from './registry.js'; +import { Scheduler } from './scheduler.js'; +import type { ChannelSink, ChannelSource } from './types.js'; + +// Sources +import { GitHubSource } from './sources/github.js'; + +// Sinks +import { StdoutSink } from './sinks/stdout.js'; +import { WebhookSink } from './sinks/webhook.js'; + +// ── Constants ─────────────────────────────────────────────────────── + +const CHANNEL_DIR = join(homedir(), '.opencli', 'channel'); +const PID_FILE = join(CHANNEL_DIR, 'daemon.pid'); + +// ── Source / Sink registries ──────────────────────────────────────── + +function getSources(): Map { + const map = new Map(); + map.set('github', new GitHubSource()); + return map; +} + +function getSinks(): Map { + const map = new Map(); + const stdout = new StdoutSink(); + const webhook = new WebhookSink(); + map.set('stdout', stdout); + map.set('webhook', webhook); + return map; +} + +// ── CLI registration ──────────────────────────────────────────────── + +export function registerChannelCommand(program: Command): void { + const channel = program + .command('channel') + .description('Event subscription — subscribe to platform events and receive them in your session'); + + // ── sources ───────────────────────────────────────────────────── + + channel + .command('sources [name]') + .description('List available event sources (or subscribable items for a specific source)') + .action(async (name?: string) => { + const sources = getSources(); + + if (!name) { + // List all sources + console.log('Available sources:\n'); + for (const [sourceName, source] of sources) { + console.log(` ${sourceName}`); + const items = await source.listSubscribable({}); + for (const item of items.slice(0, 5)) { + console.log(` ${item.origin} — ${item.description}`); + } + if (items.length > 5) console.log(` ... and ${items.length - 5} more`); + console.log(); + } + return; + } + + const source = sources.get(name); + if (!source) { + console.error(`Unknown source: ${name}`); + console.error(`Available: ${[...sources.keys()].join(', ')}`); + process.exit(1); + } + + const items = await source.listSubscribable({}); + console.log(`Subscribable items from ${name}:\n`); + for (const item of items) { + console.log(` ${item.origin} — ${item.description}`); + } + }); + + // ── subscribe ─────────────────────────────────────────────────── + + channel + .command('subscribe ') + .description('Subscribe to events from an origin (e.g. github:owner/repo#42)') + .option('-s, --sink ', 'Sink to deliver events to', 'stdout') + .option('-i, --interval ', 'Poll interval in milliseconds', '60000') + .option('--webhook-url ', 'Webhook URL (when sink=webhook)') + .action(async (origin: string, opts: { sink: string; interval: string; webhookUrl?: string }) => { + // Validate origin can be parsed by some source + const sources = getSources(); + let parsed = false; + for (const source of sources.values()) { + if (source.parseOrigin(origin)) { parsed = true; break; } + } + if (!parsed) { + console.error(`Cannot parse origin: ${origin}`); + console.error('Expected format: github:owner/repo, github:owner/repo#42, etc.'); + process.exit(1); + } + + const sinkConfig: Record = {}; + if (opts.sink === 'webhook' && opts.webhookUrl) { + sinkConfig.url = opts.webhookUrl; + } + + const registry = new SubscriptionRegistry(); + await registry.load(); + const sub = registry.add(origin, opts.sink, sinkConfig, parseInt(opts.interval, 10)); + await registry.save(); + + console.log(`✅ Subscribed to ${origin}`); + console.log(` ID: ${sub.id}`); + console.log(` Sink: ${opts.sink}`); + console.log(` Interval: ${opts.interval}ms`); + }); + + // ── unsubscribe ───────────────────────────────────────────────── + + channel + .command('unsubscribe ') + .description('Remove subscription for an origin') + .action(async (origin: string) => { + const registry = new SubscriptionRegistry(); + await registry.load(); + const removed = registry.remove(origin); + await registry.save(); + + if (removed) { + console.log(`✅ Unsubscribed from ${origin}`); + } else { + console.log(`No subscription found for ${origin}`); + } + }); + + // ── subscriptions ─────────────────────────────────────────────── + + channel + .command('subscriptions') + .alias('list') + .description('List current subscriptions') + .option('-f, --format ', 'Output format: table, json', 'table') + .action(async (opts: { format: string }) => { + const registry = new SubscriptionRegistry(); + await registry.load(); + const subs = registry.list(); + + if (subs.length === 0) { + console.log('No subscriptions. Use `opencli channel subscribe ` to add one.'); + return; + } + + if (opts.format === 'json') { + console.log(JSON.stringify(subs, null, 2)); + return; + } + + // Table output + const header = ['ORIGIN', 'SINK', 'INTERVAL', 'CREATED']; + const rows = subs.map(s => [ + s.origin, + s.sink, + s.intervalMs > 0 ? `${s.intervalMs}ms` : 'default', + new Date(s.createdAt).toLocaleDateString(), + ]); + + const widths = header.map((h, i) => + Math.max(h.length, ...rows.map(r => r[i].length)), + ); + + console.log(header.map((h, i) => h.padEnd(widths[i])).join(' ')); + console.log(widths.map(w => '─'.repeat(w)).join(' ')); + for (const row of rows) { + console.log(row.map((v, i) => v.padEnd(widths[i])).join(' ')); + } + }); + + // ── start ─────────────────────────────────────────────────────── + + channel + .command('start') + .description('Start the channel polling daemon') + .option('-d, --daemon', 'Run in background') + .action(async (opts: { daemon?: boolean }) => { + if (opts.daemon) { + // Spawn detached child + const child = spawn(process.execPath, [process.argv[1], 'channel', 'start'], { + detached: true, + stdio: 'ignore', + }); + child.unref(); + writeFileSync(PID_FILE, String(child.pid)); + console.log(`Channel daemon started (PID: ${child.pid})`); + return; + } + + // Foreground mode + console.log('Starting channel daemon (foreground)...'); + const registry = new SubscriptionRegistry(); + await registry.load(); + + const subs = registry.list(); + if (subs.length === 0) { + console.log('No subscriptions. Use `opencli channel subscribe ` first.'); + process.exit(0); + } + + const cursors = new CursorStore(); + await cursors.load(); + + const sources = getSources(); + const sinks = getSinks(); + + // Init sinks with config from subscriptions + for (const sub of subs) { + const sink = sinks.get(sub.sink); + if (sink) await sink.init(sub.sinkConfig); + } + + const dedup = new Dedup(); + const scheduler = new Scheduler(sources, sinks, registry, cursors, dedup); + + const shutdown = (): void => { + console.log('\nStopping channel daemon...'); + scheduler.stop(); + process.exit(0); + }; + process.on('SIGINT', shutdown); + process.on('SIGTERM', shutdown); + + // Write PID for status/stop + writeFileSync(PID_FILE, String(process.pid)); + + console.log(`Polling ${subs.length} subscription(s)...`); + for (const sub of subs) { + console.log(` ${sub.origin} → ${sub.sink}`); + } + console.log(); + + await scheduler.start(); + }); + + // ── stop ──────────────────────────────────────────────────────── + + channel + .command('stop') + .description('Stop the channel daemon') + .action(() => { + if (!existsSync(PID_FILE)) { + console.log('No daemon running (no PID file).'); + return; + } + + const pid = parseInt(readFileSync(PID_FILE, 'utf8').trim(), 10); + try { + process.kill(pid, 'SIGTERM'); + unlinkSync(PID_FILE); + console.log(`Channel daemon stopped (PID: ${pid})`); + } catch { + unlinkSync(PID_FILE); + console.log('Daemon was not running. Cleaned up PID file.'); + } + }); + + // ── status ────────────────────────────────────────────────────── + + channel + .command('status') + .description('Show channel daemon status and cursor positions') + .action(async () => { + // Check daemon + let daemonRunning = false; + if (existsSync(PID_FILE)) { + const pid = parseInt(readFileSync(PID_FILE, 'utf8').trim(), 10); + try { + process.kill(pid, 0); // Check if alive + daemonRunning = true; + } catch { + // Not running + } + } + console.log(`Daemon: ${daemonRunning ? '🟢 running' : '⚪ stopped'}`); + console.log(); + + // Subscriptions + const registry = new SubscriptionRegistry(); + await registry.load(); + const subs = registry.list(); + console.log(`Subscriptions: ${subs.length}`); + + if (subs.length === 0) return; + + // Cursors + const cursors = new CursorStore(); + await cursors.load(); + + console.log(); + const header = ['ORIGIN', 'SINK', 'LAST POLL', 'EVENTS']; + const rows = subs.map(s => { + const c = cursors.get(s.origin); + return [ + s.origin, + s.sink, + c?.lastPoll ? new Date(c.lastPoll).toLocaleString() : 'never', + String(c?.eventsDelivered ?? 0), + ]; + }); + + const widths = header.map((h, i) => + Math.max(h.length, ...rows.map(r => r[i].length)), + ); + console.log(header.map((h, i) => h.padEnd(widths[i])).join(' ')); + console.log(widths.map(w => '─'.repeat(w)).join(' ')); + for (const row of rows) { + console.log(row.map((v, i) => v.padEnd(widths[i])).join(' ')); + } + }); + + // ── poll (one-shot) ───────────────────────────────────────────── + + channel + .command('poll ') + .description('One-shot poll: fetch events and print to stdout') + .option('--since ', 'Poll from specific cursor/timestamp') + .action(async (origin: string, opts: { since?: string }) => { + const sources = getSources(); + + // Find matching source + let matchedSource: ChannelSource | undefined; + for (const source of sources.values()) { + if (source.parseOrigin(origin)) { + matchedSource = source; + break; + } + } + + if (!matchedSource) { + console.error(`Cannot parse origin: ${origin}`); + process.exit(1); + } + + const config = matchedSource.parseOrigin(origin)!; + const cursor = opts.since ?? null; + + const result = await matchedSource.poll(config, cursor); + + for (const event of result.events) { + process.stdout.write(JSON.stringify(event) + '\n'); + } + + if (result.events.length === 0) { + console.error('(no new events)'); + } else { + console.error(`${result.events.length} event(s). cursor: ${result.cursor}`); + } + }); +} diff --git a/src/channel/registry.ts b/src/channel/registry.ts new file mode 100644 index 00000000..6b9e8dcb --- /dev/null +++ b/src/channel/registry.ts @@ -0,0 +1,82 @@ +/** + * Subscription registry — persists who subscribes to what. + * File: ~/.opencli/channel/subscriptions.json + */ + +import { mkdirSync } from 'node:fs'; +import { readFile, writeFile, rename } from 'node:fs/promises'; +import { dirname, join } from 'node:path'; +import { homedir } from 'node:os'; +import { randomUUID } from 'node:crypto'; +import type { Subscription } from './types.js'; + +const DEFAULT_PATH = join(homedir(), '.opencli', 'channel', 'subscriptions.json'); + +export class SubscriptionRegistry { + private subs: Subscription[] = []; + + constructor(private readonly path: string = DEFAULT_PATH) {} + + async load(): Promise { + try { + const raw = await readFile(this.path, 'utf8'); + this.subs = JSON.parse(raw) as Subscription[]; + } catch (e: unknown) { + if ((e as NodeJS.ErrnoException).code === 'ENOENT') { + this.subs = []; + return; + } + throw e; + } + } + + async save(): Promise { + mkdirSync(dirname(this.path), { recursive: true }); + const tmp = `${this.path}.tmp`; + await writeFile(tmp, JSON.stringify(this.subs, null, 2), 'utf8'); + await rename(tmp, this.path); + } + + add(origin: string, sink: string, sinkConfig: Record = {}, intervalMs = 0): Subscription { + // Check for existing subscription with same origin + sink + const existing = this.subs.find(s => s.origin === origin && s.sink === sink); + if (existing) return existing; + + const sub: Subscription = { + id: randomUUID(), + origin, + sink, + sinkConfig, + intervalMs, + createdAt: new Date().toISOString(), + }; + this.subs.push(sub); + return sub; + } + + remove(origin: string): boolean { + const before = this.subs.length; + this.subs = this.subs.filter(s => s.origin !== origin); + return this.subs.length < before; + } + + removeById(id: string): boolean { + const before = this.subs.length; + this.subs = this.subs.filter(s => s.id !== id); + return this.subs.length < before; + } + + list(): Subscription[] { + return [...this.subs]; + } + + /** Get all unique origins. */ + origins(): string[] { + return [...new Set(this.subs.map(s => s.origin))]; + } + + /** Get all subscriptions for a given origin. */ + forOrigin(origin: string): Subscription[] { + return this.subs.filter(s => s.origin === origin); + } +} diff --git a/src/channel/scheduler.ts b/src/channel/scheduler.ts new file mode 100644 index 00000000..dcf93bdc --- /dev/null +++ b/src/channel/scheduler.ts @@ -0,0 +1,149 @@ +/** + * Scheduler — manages per-origin poll loops. + * One loop per unique origin across all subscriptions. + */ + +import { CursorStore } from './cursor-store.js'; +import { Dedup } from './dedup.js'; +import { SubscriptionRegistry } from './registry.js'; +import type { ChannelEvent, ChannelSink, ChannelSource, SourcePollConfig } from './types.js'; + +const DEFAULT_INTERVAL_MS = 60_000; +const MIN_INTERVAL_MS = 10_000; +const MAX_BACKOFF_MS = 5 * 60_000; + +interface OriginLoop { + origin: string; + source: ChannelSource; + pollConfig: SourcePollConfig; + timer: ReturnType | null; + intervalMs: number; + consecutiveErrors: number; +} + +export class Scheduler { + private readonly loops = new Map(); + private running = false; + + constructor( + private readonly sources: Map, + private readonly sinks: Map, + private readonly registry: SubscriptionRegistry, + private readonly cursors: CursorStore, + private readonly dedup: Dedup, + ) {} + + async start(): Promise { + this.running = true; + const origins = this.registry.origins(); + + for (const origin of origins) { + this.startOriginLoop(origin); + } + } + + stop(): void { + this.running = false; + for (const loop of this.loops.values()) { + if (loop.timer) clearTimeout(loop.timer); + } + this.loops.clear(); + } + + private startOriginLoop(origin: string): void { + if (this.loops.has(origin)) return; + + // Parse origin → source + config + const [sourceName] = origin.split(':', 1); + const source = this.sources.get(sourceName); + if (!source) { + console.error(`[channel] Unknown source "${sourceName}" for origin "${origin}"`); + return; + } + + const pollConfig = source.parseOrigin(origin); + if (!pollConfig) { + console.error(`[channel] Source "${sourceName}" cannot parse origin "${origin}"`); + return; + } + + // Use subscription-level interval override, or default + const subs = this.registry.forOrigin(origin); + const overrideMs = subs.reduce((min, s) => { + if (s.intervalMs > 0 && s.intervalMs < min) return s.intervalMs; + return min; + }, DEFAULT_INTERVAL_MS); + + const loop: OriginLoop = { + origin, + source, + pollConfig, + timer: null, + intervalMs: Math.max(overrideMs, MIN_INTERVAL_MS), + consecutiveErrors: 0, + }; + + this.loops.set(origin, loop); + void this.tick(loop); + } + + private async tick(loop: OriginLoop): Promise { + if (!this.running) return; + + try { + const cursorEntry = this.cursors.get(loop.origin); + const cursor = cursorEntry?.cursor ?? null; + + const result = await loop.source.poll(loop.pollConfig, cursor); + + // Dedup + const fresh: ChannelEvent[] = []; + for (const event of result.events) { + const dedupKey = `${loop.origin}:${event.id}`; + if (!this.dedup.isDuplicate(dedupKey)) { + this.dedup.add(dedupKey); + fresh.push(event); + } + } + + // Deliver to all subscribers of this origin + if (fresh.length > 0) { + const subs = this.registry.forOrigin(loop.origin); + for (const sub of subs) { + const sink = this.sinks.get(sub.sink); + if (!sink) { + console.error(`[channel] Unknown sink "${sub.sink}" for subscription ${sub.id}`); + continue; + } + try { + await sink.deliver(fresh); + } catch (e) { + console.error(`[channel] Sink "${sub.sink}" delivery failed:`, e); + } + } + } + + // Update cursor + this.cursors.set(loop.origin, result.cursor, fresh.length); + await this.cursors.save(); + + // Reset backoff on success + loop.consecutiveErrors = 0; + + // Respect server-recommended interval + if (result.recommendedIntervalMs && result.recommendedIntervalMs > loop.intervalMs) { + loop.intervalMs = result.recommendedIntervalMs; + } + } catch (e) { + loop.consecutiveErrors++; + console.error(`[channel] Poll failed for "${loop.origin}" (attempt ${loop.consecutiveErrors}):`, e); + } + + // Schedule next tick with backoff + const backoff = loop.consecutiveErrors > 0 + ? Math.min(loop.intervalMs * Math.pow(2, loop.consecutiveErrors - 1), MAX_BACKOFF_MS) + : loop.intervalMs; + + loop.timer = setTimeout(() => void this.tick(loop), backoff); + } +} diff --git a/src/channel/sinks/stdout.ts b/src/channel/sinks/stdout.ts new file mode 100644 index 00000000..8a996b07 --- /dev/null +++ b/src/channel/sinks/stdout.ts @@ -0,0 +1,18 @@ +/** + * Stdout sink — prints events as JSON lines. + * Pipe-friendly, zero config. + */ + +import type { ChannelEvent, ChannelSink } from '../types.js'; + +export class StdoutSink implements ChannelSink { + readonly name = 'stdout'; + + async init(_config: Record): Promise {} + + async deliver(events: ChannelEvent[]): Promise { + for (const event of events) { + process.stdout.write(JSON.stringify(event) + '\n'); + } + } +} diff --git a/src/channel/sinks/webhook.ts b/src/channel/sinks/webhook.ts new file mode 100644 index 00000000..989196d2 --- /dev/null +++ b/src/channel/sinks/webhook.ts @@ -0,0 +1,38 @@ +/** + * Webhook sink — POST events to a URL. + */ + +import type { ChannelEvent, ChannelSink } from '../types.js'; + +export class WebhookSink implements ChannelSink { + readonly name = 'webhook'; + private url = ''; + private headers: Record = {}; + + async init(config: Record): Promise { + if (typeof config.url !== 'string' || !config.url) { + throw new Error('Webhook sink requires a "url" config.'); + } + this.url = config.url; + if (config.headers && typeof config.headers === 'object') { + this.headers = config.headers as Record; + } + } + + async deliver(events: ChannelEvent[]): Promise { + for (const event of events) { + try { + const res = await fetch(this.url, { + method: 'POST', + headers: { 'Content-Type': 'application/json', ...this.headers }, + body: JSON.stringify(event), + }); + if (!res.ok) { + console.error(`[webhook] ${res.status} ${res.statusText} for event ${event.id}`); + } + } catch (e) { + console.error(`[webhook] Failed to deliver event ${event.id}:`, e); + } + } + } +} diff --git a/src/channel/sources/github.ts b/src/channel/sources/github.ts new file mode 100644 index 00000000..3a329bab --- /dev/null +++ b/src/channel/sources/github.ts @@ -0,0 +1,317 @@ +/** + * GitHub source adapter. + * + * Uses `gh api` CLI for all API calls — inherits auth, proxy, host config. + * + * Origin formats: + * github:owner/repo — repo-level events + * github:owner/repo#42 — issue/PR comments + * github:owner/repo/pulls — all PR activity + * github:owner/repo/issues — all issue activity + */ + +import { execSync } from 'node:child_process'; +import type { + ChannelEvent, + ChannelSource, + PollResult, + SourcePollConfig, + SubscribableItem, +} from '../types.js'; + +// ── Origin parsing ────────────────────────────────────────────────── + +interface RepoPollConfig extends SourcePollConfig { + kind: 'repo'; + owner: string; + repo: string; +} + +interface IssuePollConfig extends SourcePollConfig { + kind: 'issue'; + owner: string; + repo: string; + number: number; +} + +interface PullsPollConfig extends SourcePollConfig { + kind: 'pulls'; + owner: string; + repo: string; +} + +interface IssuesPollConfig extends SourcePollConfig { + kind: 'issues'; + owner: string; + repo: string; +} + +type GitHubPollConfig = RepoPollConfig | IssuePollConfig | PullsPollConfig | IssuesPollConfig; + +// ── Helpers ───────────────────────────────────────────────────────── + +function ghJson(endpoint: string, _args: string[] = []): { data: T; pollInterval?: number } { + // Call gh api twice: once for headers (--include is unreliable to parse), + // once for JSON body. We use -i and split on the first '[' or '{' as + // the JSON boundary — gh outputs headers then a blank line then JSON. + const cmd = `gh api --include ${endpoint}`; + const raw = execSync(cmd, { encoding: 'utf8', timeout: 30_000 }); + + // Find the start of JSON body — first '[' or '{' that starts a line + const jsonStart = raw.search(/^\s*[\[{]/m); + const headers = jsonStart > 0 ? raw.slice(0, jsonStart) : ''; + const body = jsonStart >= 0 ? raw.slice(jsonStart) : raw; + + const data = JSON.parse(body) as T; + + // Extract X-Poll-Interval if present + let pollInterval: number | undefined; + const match = headers.match(/x-poll-interval:\s*(\d+)/i); + if (match) pollInterval = parseInt(match[1], 10) * 1000; + + return { data, pollInterval }; +} + +// ── Source implementation ─────────────────────────────────────────── + +export class GitHubSource implements ChannelSource { + readonly name = 'github'; + + async listSubscribable(_config: Record): Promise { + // List user's repos as subscribable items + try { + const { data } = ghJson>( + '/user/repos?per_page=30&sort=updated', + ); + return data.map(r => ({ + origin: `github:${r.full_name}`, + description: r.description ?? r.full_name, + })); + } catch { + return [ + { origin: 'github:/', description: 'Subscribe to repo events' }, + { origin: 'github:/#', description: 'Subscribe to issue/PR comments' }, + { origin: 'github://pulls', description: 'Subscribe to all PR activity' }, + { origin: 'github://issues', description: 'Subscribe to all issue activity' }, + ]; + } + } + + parseOrigin(origin: string): GitHubPollConfig | null { + if (!origin.startsWith('github:')) return null; + const rest = origin.slice('github:'.length); + + // github:owner/repo/pulls + const pullsMatch = rest.match(/^([^/]+)\/([^/]+)\/pulls$/); + if (pullsMatch) return { kind: 'pulls', owner: pullsMatch[1], repo: pullsMatch[2] }; + + // github:owner/repo/issues + const issuesMatch = rest.match(/^([^/]+)\/([^/]+)\/issues$/); + if (issuesMatch) return { kind: 'issues', owner: issuesMatch[1], repo: issuesMatch[2] }; + + // github:owner/repo#42 + const issueMatch = rest.match(/^([^/]+)\/([^/#]+)#(\d+)$/); + if (issueMatch) return { kind: 'issue', owner: issueMatch[1], repo: issueMatch[2], number: parseInt(issueMatch[3], 10) }; + + // github:owner/repo + const repoMatch = rest.match(/^([^/]+)\/([^/#]+)$/); + if (repoMatch) return { kind: 'repo', owner: repoMatch[1], repo: repoMatch[2] }; + + return null; + } + + async poll(config: SourcePollConfig, cursor: string | null): Promise { + const c = config as GitHubPollConfig; + switch (c.kind) { + case 'repo': return this.pollRepoEvents(c, cursor); + case 'issue': return this.pollIssueComments(c, cursor); + case 'pulls': return this.pollPullRequests(c, cursor); + case 'issues': return this.pollIssues(c, cursor); + } + } + + // ── Poll strategies ───────────────────────────────────────────── + + private pollRepoEvents(c: RepoPollConfig, cursor: string | null): PollResult { + const endpoint = `/repos/${c.owner}/${c.repo}/events?per_page=100`; + const { data, pollInterval } = ghJson; + }>>(endpoint); + + const cursorTs = cursor ? Date.parse(cursor) : 0; + const events: ChannelEvent[] = data + .filter(e => Date.parse(e.created_at) > cursorTs) + .map(e => ({ + id: `gh-event-${e.id}`, + source: 'github', + type: mapGitHubEventType(e.type), + timestamp: e.created_at, + origin: `github:${c.owner}/${c.repo}`, + payload: { + actor: e.actor.login, + eventType: e.type, + ...e.payload, + }, + })); + + const newCursor = data.length > 0 ? data[0].created_at : (cursor ?? ''); + + return { + events, + cursor: newCursor, + recommendedIntervalMs: pollInterval, + }; + } + + private pollIssueComments(c: IssuePollConfig, cursor: string | null): PollResult { + const sinceParam = cursor ? `?since=${cursor}` : ''; + const endpoint = `/repos/${c.owner}/${c.repo}/issues/${c.number}/comments${sinceParam}`; + const { data, pollInterval } = ghJson>(endpoint); + + // Filter out events at or before cursor (since= is inclusive) + const cursorTs = cursor ? Date.parse(cursor) : 0; + const events: ChannelEvent[] = data + .filter(comment => Date.parse(comment.created_at) > cursorTs) + .map(comment => ({ + id: `gh-comment-${comment.id}`, + source: 'github', + type: 'issue_comment.created', + timestamp: comment.created_at, + origin: `github:${c.owner}/${c.repo}#${c.number}`, + payload: { + author: comment.user.login, + body: comment.body, + htmlUrl: comment.html_url, + }, + })); + + const newCursor = data.length > 0 + ? data[data.length - 1].created_at + : (cursor ?? ''); + + return { + events, + cursor: newCursor, + recommendedIntervalMs: pollInterval, + }; + } + + private pollPullRequests(c: PullsPollConfig, cursor: string | null): PollResult { + const sinceParam = cursor ? `&since=${cursor}` : ''; + const endpoint = `/repos/${c.owner}/${c.repo}/pulls?state=all&sort=updated&direction=desc&per_page=30${sinceParam}`; + const { data, pollInterval } = ghJson>(endpoint); + + const cursorTs = cursor ? Date.parse(cursor) : 0; + const events: ChannelEvent[] = data + .filter(pr => Date.parse(pr.updated_at) > cursorTs) + .map(pr => ({ + id: `gh-pr-${pr.id}-${pr.updated_at}`, + source: 'github', + type: `pull_request.${pr.state}`, + timestamp: pr.updated_at, + origin: `github:${c.owner}/${c.repo}/pulls`, + payload: { + number: pr.number, + title: pr.title, + state: pr.state, + author: pr.user.login, + htmlUrl: pr.html_url, + }, + })); + + const newCursor = data.length > 0 + ? data[0].updated_at + : (cursor ?? ''); + + return { + events, + cursor: newCursor, + recommendedIntervalMs: pollInterval, + }; + } + + private pollIssues(c: IssuesPollConfig, cursor: string | null): PollResult { + const sinceParam = cursor ? `?since=${cursor}` : '?'; + const endpoint = `/repos/${c.owner}/${c.repo}/issues${sinceParam}&state=all&sort=updated&direction=desc&per_page=30`; + const { data, pollInterval } = ghJson>(endpoint); + + const cursorTs = cursor ? Date.parse(cursor) : 0; + const events: ChannelEvent[] = data + // Filter out PRs (GitHub issues API includes PRs) + .filter(issue => !issue.pull_request) + .filter(issue => Date.parse(issue.updated_at) > cursorTs) + .map(issue => ({ + id: `gh-issue-${issue.id}-${issue.updated_at}`, + source: 'github', + type: `issue.${issue.state}`, + timestamp: issue.updated_at, + origin: `github:${c.owner}/${c.repo}/issues`, + payload: { + number: issue.number, + title: issue.title, + state: issue.state, + author: issue.user.login, + htmlUrl: issue.html_url, + }, + })); + + const newCursor = data.length > 0 + ? data[0].updated_at + : (cursor ?? ''); + + return { + events, + cursor: newCursor, + recommendedIntervalMs: pollInterval, + }; + } +} + +// ── Event type mapping ────────────────────────────────────────────── + +function mapGitHubEventType(ghType: string): string { + const map: Record = { + PushEvent: 'push', + PullRequestEvent: 'pull_request', + PullRequestReviewEvent: 'pull_request_review', + PullRequestReviewCommentEvent: 'pull_request_review_comment', + IssuesEvent: 'issue', + IssueCommentEvent: 'issue_comment', + CreateEvent: 'create', + DeleteEvent: 'delete', + ForkEvent: 'fork', + WatchEvent: 'star', + ReleaseEvent: 'release', + }; + return map[ghType] ?? ghType.replace(/Event$/, '').toLowerCase(); +} diff --git a/src/channel/types.ts b/src/channel/types.ts new file mode 100644 index 00000000..2addb974 --- /dev/null +++ b/src/channel/types.ts @@ -0,0 +1,84 @@ +/** + * Channel — Event subscription protocol for OpenCLI. + * + * Core types: events, sources, sinks, subscriptions. + */ + +/** Unified event envelope emitted by all sources. */ +export interface ChannelEvent { + /** Globally unique event ID (used for dedup). */ + id: string; + /** Which source adapter produced this event. */ + source: string; + /** Platform-specific event type (dot-namespaced). */ + type: string; + /** When the event occurred on the platform (ISO-8601). */ + timestamp: string; + /** + * Origin identifier — what subscriptions match against. + * Format: `source:path` e.g. `github:user/repo#42` + */ + origin: string; + /** Platform-specific event data. */ + payload: Record; +} + +/** A source adapter knows how to poll a specific platform for events. */ +export interface ChannelSource { + readonly name: string; + + /** Return human-readable list of subscribable items for discovery. */ + listSubscribable(config: Record): Promise; + + /** + * Parse an origin string into source-specific config. + * e.g. "github:user/repo#42" → { owner: "user", repo: "repo", issue: 42 } + * Returns null if this source can't handle the origin. + */ + parseOrigin(origin: string): SourcePollConfig | null; + + /** + * Poll for new events since cursor. + * Returns events + new cursor position. + */ + poll(config: SourcePollConfig, cursor: string | null): Promise; +} + +export interface SubscribableItem { + origin: string; + description: string; +} + +export interface SourcePollConfig { + [key: string]: unknown; +} + +export interface PollResult { + events: ChannelEvent[]; + cursor: string; + /** Server-recommended poll interval in ms (e.g. GitHub X-Poll-Interval). */ + recommendedIntervalMs?: number; +} + +/** A sink adapter knows how to deliver events to a consumer. */ +export interface ChannelSink { + readonly name: string; + init(config: Record): Promise; + deliver(events: ChannelEvent[]): Promise; +} + +/** A subscription: consumer → origin mapping. */ +export interface Subscription { + /** Unique subscription ID. */ + id: string; + /** Origin pattern to match, e.g. "github:user/repo#42". */ + origin: string; + /** Sink name to deliver to. */ + sink: string; + /** Sink-specific config. */ + sinkConfig: Record; + /** Poll interval override in ms (0 = use source default). */ + intervalMs: number; + /** When this subscription was created. */ + createdAt: string; +} diff --git a/src/cli.ts b/src/cli.ts index d9c26684..f4290ef1 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -16,6 +16,7 @@ import { printCompletionScript } from './completion.js'; import { loadExternalClis, executeExternalCli, installExternalCli, registerExternalCli, isBinaryInstalled } from './external.js'; import { registerAllCommands } from './commanderAdapter.js'; import { getErrorMessage } from './errors.js'; +import { registerChannelCommand } from './channel/index.js'; export function runCli(BUILTIN_CLIS: string, USER_CLIS: string): void { const program = new Command(); @@ -451,6 +452,9 @@ export function runCli(BUILTIN_CLIS: string, USER_CLIS: string): void { siteGroups.set('antigravity', antigravityCmd); registerAllCommands(program, siteGroups); + // ── Channel (event subscriptions) ─────────────────────────────────────── + registerChannelCommand(program); + // ── Unknown command fallback ────────────────────────────────────────────── // Security: do NOT auto-discover and register arbitrary system binaries. // Only explicitly registered external CLIs (via `opencli register`) are allowed. From 92da5d57034962426438aa20db65c8617eb787c8 Mon Sep 17 00:00:00 2001 From: Fourier Date: Wed, 25 Mar 2026 16:15:45 +0800 Subject: [PATCH 2/7] fix: serialize concurrent cursor saves to avoid race condition When multiple origins poll simultaneously, concurrent save() calls could race on the temp file. Add a simple promise chain to serialize writes, and use PID-stamped tmp filenames for extra safety. --- src/channel/cursor-store.ts | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/channel/cursor-store.ts b/src/channel/cursor-store.ts index 681bfde4..4314a203 100644 --- a/src/channel/cursor-store.ts +++ b/src/channel/cursor-store.ts @@ -35,9 +35,21 @@ export class CursorStore { } } + private saving: Promise | null = null; + async save(): Promise { + // Serialize concurrent saves to avoid tmp file race condition + if (this.saving) { + await this.saving; + } + this.saving = this._doSave(); + await this.saving; + this.saving = null; + } + + private async _doSave(): Promise { mkdirSync(dirname(this.path), { recursive: true }); - const tmp = `${this.path}.tmp`; + const tmp = `${this.path}.${process.pid}.tmp`; const data = JSON.stringify(Object.fromEntries(this.entries), null, 2); await writeFile(tmp, data, 'utf8'); await rename(tmp, this.path); From 47aed4bb0091201ee276adc15c99ee1467ef3020 Mon Sep 17 00:00:00 2001 From: Fourier Date: Wed, 25 Mar 2026 16:31:04 +0800 Subject: [PATCH 3/7] fix: address code review findings (security + correctness) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - CRITICAL: Replace execSync with execFileSync to prevent command injection via malicious origin strings (e.g. $(whoami) in owner/repo) - HIGH: Fix cursor-store mutex — use promise chain instead of broken check-then-act pattern; use unique tmp filenames with PID+timestamp - HIGH: Registry save now uses unique tmp filenames - HIGH: Webhook sink validates URL scheme (http/https only) - HIGH: Daemon start -d checks for existing running daemon before spawning - MEDIUM: Cursor only advances when at least one sink succeeds - LOW: Validate --interval (reject NaN), require --webhook-url with --sink webhook, remove unused import --- src/channel/cursor-store.ts | 14 +++++--------- src/channel/index.ts | 28 +++++++++++++++++++++++++++- src/channel/registry.ts | 2 +- src/channel/scheduler.ts | 10 +++++++--- src/channel/sinks/webhook.ts | 5 +++++ src/channel/sources/github.ts | 14 +++++++------- 6 files changed, 52 insertions(+), 21 deletions(-) diff --git a/src/channel/cursor-store.ts b/src/channel/cursor-store.ts index 4314a203..0df49ef8 100644 --- a/src/channel/cursor-store.ts +++ b/src/channel/cursor-store.ts @@ -35,21 +35,17 @@ export class CursorStore { } } - private saving: Promise | null = null; + private saveQueue: Promise = Promise.resolve(); async save(): Promise { - // Serialize concurrent saves to avoid tmp file race condition - if (this.saving) { - await this.saving; - } - this.saving = this._doSave(); - await this.saving; - this.saving = null; + // Chain saves to serialize concurrent calls + this.saveQueue = this.saveQueue.then(() => this._doSave(), () => this._doSave()); + await this.saveQueue; } private async _doSave(): Promise { mkdirSync(dirname(this.path), { recursive: true }); - const tmp = `${this.path}.${process.pid}.tmp`; + const tmp = `${this.path}.${process.pid}.${Date.now()}.tmp`; const data = JSON.stringify(Object.fromEntries(this.entries), null, 2); await writeFile(tmp, data, 'utf8'); await rename(tmp, this.path); diff --git a/src/channel/index.ts b/src/channel/index.ts index 0bbac4ba..72c6a2f4 100644 --- a/src/channel/index.ts +++ b/src/channel/index.ts @@ -16,7 +16,7 @@ import { Command } from 'commander'; import { existsSync, readFileSync, writeFileSync, unlinkSync } from 'node:fs'; import { join } from 'node:path'; import { homedir } from 'node:os'; -import { execSync, spawn } from 'node:child_process'; +import { spawn } from 'node:child_process'; import { CursorStore } from './cursor-store.js'; import { Dedup } from './dedup.js'; @@ -118,6 +118,19 @@ export function registerChannelCommand(program: Command): void { process.exit(1); } + // Validate interval + const intervalMs = parseInt(opts.interval, 10); + if (isNaN(intervalMs) || intervalMs < 0) { + console.error(`Invalid interval: ${opts.interval}. Must be a positive number in milliseconds.`); + process.exit(1); + } + + // Validate webhook config + if (opts.sink === 'webhook' && !opts.webhookUrl) { + console.error('Webhook sink requires --webhook-url.'); + process.exit(1); + } + const sinkConfig: Record = {}; if (opts.sink === 'webhook' && opts.webhookUrl) { sinkConfig.url = opts.webhookUrl; @@ -202,6 +215,19 @@ export function registerChannelCommand(program: Command): void { .option('-d, --daemon', 'Run in background') .action(async (opts: { daemon?: boolean }) => { if (opts.daemon) { + // Check for stale PID file + if (existsSync(PID_FILE)) { + const existingPid = parseInt(readFileSync(PID_FILE, 'utf8').trim(), 10); + try { + process.kill(existingPid, 0); + console.error(`Channel daemon already running (PID: ${existingPid}). Use 'opencli channel stop' first.`); + process.exit(1); + } catch { + // Stale PID, clean up + unlinkSync(PID_FILE); + } + } + // Spawn detached child const child = spawn(process.execPath, [process.argv[1], 'channel', 'start'], { detached: true, diff --git a/src/channel/registry.ts b/src/channel/registry.ts index 6b9e8dcb..f7ef67b7 100644 --- a/src/channel/registry.ts +++ b/src/channel/registry.ts @@ -32,7 +32,7 @@ export class SubscriptionRegistry { async save(): Promise { mkdirSync(dirname(this.path), { recursive: true }); - const tmp = `${this.path}.tmp`; + const tmp = `${this.path}.${process.pid}.${Date.now()}.tmp`; await writeFile(tmp, JSON.stringify(this.subs, null, 2), 'utf8'); await rename(tmp, this.path); } diff --git a/src/channel/scheduler.ts b/src/channel/scheduler.ts index dcf93bdc..b3a764d5 100644 --- a/src/channel/scheduler.ts +++ b/src/channel/scheduler.ts @@ -107,6 +107,7 @@ export class Scheduler { } // Deliver to all subscribers of this origin + let anyDelivered = false; if (fresh.length > 0) { const subs = this.registry.forOrigin(loop.origin); for (const sub of subs) { @@ -117,15 +118,18 @@ export class Scheduler { } try { await sink.deliver(fresh); + anyDelivered = true; } catch (e) { console.error(`[channel] Sink "${sub.sink}" delivery failed:`, e); } } } - // Update cursor - this.cursors.set(loop.origin, result.cursor, fresh.length); - await this.cursors.save(); + // Only advance cursor if at least one sink succeeded (or no events) + if (fresh.length === 0 || anyDelivered) { + this.cursors.set(loop.origin, result.cursor, fresh.length); + await this.cursors.save(); + } // Reset backoff on success loop.consecutiveErrors = 0; diff --git a/src/channel/sinks/webhook.ts b/src/channel/sinks/webhook.ts index 989196d2..a96e3fe9 100644 --- a/src/channel/sinks/webhook.ts +++ b/src/channel/sinks/webhook.ts @@ -13,6 +13,11 @@ export class WebhookSink implements ChannelSink { if (typeof config.url !== 'string' || !config.url) { throw new Error('Webhook sink requires a "url" config.'); } + // Validate URL scheme to prevent SSRF + const parsed = new URL(config.url); + if (!['http:', 'https:'].includes(parsed.protocol)) { + throw new Error(`Webhook sink only supports http/https URLs, got: ${parsed.protocol}`); + } this.url = config.url; if (config.headers && typeof config.headers === 'object') { this.headers = config.headers as Record; diff --git a/src/channel/sources/github.ts b/src/channel/sources/github.ts index 3a329bab..75ea2b4c 100644 --- a/src/channel/sources/github.ts +++ b/src/channel/sources/github.ts @@ -10,7 +10,7 @@ * github:owner/repo/issues — all issue activity */ -import { execSync } from 'node:child_process'; +import { execFileSync } from 'node:child_process'; import type { ChannelEvent, ChannelSource, @@ -50,12 +50,12 @@ type GitHubPollConfig = RepoPollConfig | IssuePollConfig | PullsPollConfig | Iss // ── Helpers ───────────────────────────────────────────────────────── -function ghJson(endpoint: string, _args: string[] = []): { data: T; pollInterval?: number } { - // Call gh api twice: once for headers (--include is unreliable to parse), - // once for JSON body. We use -i and split on the first '[' or '{' as - // the JSON boundary — gh outputs headers then a blank line then JSON. - const cmd = `gh api --include ${endpoint}`; - const raw = execSync(cmd, { encoding: 'utf8', timeout: 30_000 }); +function ghJson(endpoint: string): { data: T; pollInterval?: number } { + // Use execFileSync to avoid shell injection — origin strings are user-supplied. + const raw = execFileSync('gh', ['api', '--include', endpoint], { + encoding: 'utf8', + timeout: 30_000, + }); // Find the start of JSON body — first '[' or '{' that starts a line const jsonStart = raw.search(/^\s*[\[{]/m); From abffcdae80d735fb8f77cb89c1af67375b7de43d Mon Sep 17 00:00:00 2001 From: Fourier Date: Wed, 25 Mar 2026 16:38:59 +0800 Subject: [PATCH 4/7] fix: address all remaining review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Sink per-subscription: replace shared singleton sinks with SinkFactory pattern — each subscription gets a dedicated sink instance, fixing the webhook URL overwrite bug when multiple subscriptions use different URLs - Issue comments cursor: use updated_at (matches GitHub's since= semantics) instead of created_at; detect edited comments via issue_comment.updated type; include updated_at in dedup ID to catch edits - eventsDelivered: cumulative counter instead of resetting each poll - Graceful shutdown: flush cursor store before process.exit - Dedup: O(1) circular buffer replacing O(n) Array.shift() - Remove unused _args parameter from ghJson --- src/channel/cursor-store.ts | 6 +++-- src/channel/dedup.ts | 21 ++++++++++------ src/channel/index.ts | 28 ++++++++------------- src/channel/scheduler.ts | 47 +++++++++++++++++++++++++---------- src/channel/sources/github.ts | 15 ++++++----- 5 files changed, 72 insertions(+), 45 deletions(-) diff --git a/src/channel/cursor-store.ts b/src/channel/cursor-store.ts index 0df49ef8..be1bf775 100644 --- a/src/channel/cursor-store.ts +++ b/src/channel/cursor-store.ts @@ -55,11 +55,13 @@ export class CursorStore { return this.entries.get(origin); } - set(origin: string, cursor: string, eventsDelivered: number): void { + set(origin: string, cursor: string, newEventsDelivered: number): void { + const existing = this.entries.get(origin); + const cumulative = (existing?.eventsDelivered ?? 0) + newEventsDelivered; this.entries.set(origin, { cursor, lastPoll: new Date().toISOString(), - eventsDelivered, + eventsDelivered: cumulative, }); } diff --git a/src/channel/dedup.ts b/src/channel/dedup.ts index 0bae2c27..2df487eb 100644 --- a/src/channel/dedup.ts +++ b/src/channel/dedup.ts @@ -1,15 +1,18 @@ /** - * In-memory ring-buffer dedup. + * In-memory circular-buffer dedup. * Keeps last N event IDs to prevent re-delivery. + * O(1) add/check via Set + circular index (no Array.shift overhead). */ export class Dedup { - private readonly ids: string[] = []; + private readonly ring: (string | undefined)[]; private readonly seen = new Set(); private readonly maxSize: number; + private writeIdx = 0; constructor(maxSize = 10_000) { this.maxSize = maxSize; + this.ring = new Array(maxSize); } isDuplicate(id: string): boolean { @@ -18,11 +21,15 @@ export class Dedup { add(id: string): void { if (this.seen.has(id)) return; - this.ids.push(id); - this.seen.add(id); - if (this.ids.length > this.maxSize) { - const oldest = this.ids.shift(); - if (oldest !== undefined) this.seen.delete(oldest); + + // Evict the oldest entry at the write position + const evicted = this.ring[this.writeIdx]; + if (evicted !== undefined) { + this.seen.delete(evicted); } + + this.ring[this.writeIdx] = id; + this.seen.add(id); + this.writeIdx = (this.writeIdx + 1) % this.maxSize; } } diff --git a/src/channel/index.ts b/src/channel/index.ts index 72c6a2f4..019a1bb4 100644 --- a/src/channel/index.ts +++ b/src/channel/index.ts @@ -21,7 +21,7 @@ import { spawn } from 'node:child_process'; import { CursorStore } from './cursor-store.js'; import { Dedup } from './dedup.js'; import { SubscriptionRegistry } from './registry.js'; -import { Scheduler } from './scheduler.js'; +import { Scheduler, type SinkFactory } from './scheduler.js'; import type { ChannelSink, ChannelSource } from './types.js'; // Sources @@ -44,13 +44,13 @@ function getSources(): Map { return map; } -function getSinks(): Map { - const map = new Map(); - const stdout = new StdoutSink(); - const webhook = new WebhookSink(); - map.set('stdout', stdout); - map.set('webhook', webhook); - return map; +/** Factory that creates a fresh sink instance per subscription. */ +function createSink(name: string, _config: Record): ChannelSink { + switch (name) { + case 'stdout': return new StdoutSink(); + case 'webhook': return new WebhookSink(); + default: throw new Error(`Unknown sink: ${name}`); + } } // ── CLI registration ──────────────────────────────────────────────── @@ -254,21 +254,15 @@ export function registerChannelCommand(program: Command): void { await cursors.load(); const sources = getSources(); - const sinks = getSinks(); - - // Init sinks with config from subscriptions - for (const sub of subs) { - const sink = sinks.get(sub.sink); - if (sink) await sink.init(sub.sinkConfig); - } const dedup = new Dedup(); - const scheduler = new Scheduler(sources, sinks, registry, cursors, dedup); + const scheduler = new Scheduler(sources, createSink, registry, cursors, dedup); const shutdown = (): void => { console.log('\nStopping channel daemon...'); scheduler.stop(); - process.exit(0); + // Flush cursors before exit + cursors.save().catch(() => {}).finally(() => process.exit(0)); }; process.on('SIGINT', shutdown); process.on('SIGTERM', shutdown); diff --git a/src/channel/scheduler.ts b/src/channel/scheduler.ts index b3a764d5..caa883c9 100644 --- a/src/channel/scheduler.ts +++ b/src/channel/scheduler.ts @@ -12,10 +12,19 @@ const DEFAULT_INTERVAL_MS = 60_000; const MIN_INTERVAL_MS = 10_000; const MAX_BACKOFF_MS = 5 * 60_000; +/** Factory that creates a fresh sink instance per subscription. */ +export type SinkFactory = (name: string, config: Record) => ChannelSink; + +interface SubscriptionSink { + subscriptionId: string; + sink: ChannelSink; +} + interface OriginLoop { origin: string; source: ChannelSource; pollConfig: SourcePollConfig; + sinks: SubscriptionSink[]; timer: ReturnType | null; intervalMs: number; consecutiveErrors: number; @@ -27,7 +36,7 @@ export class Scheduler { constructor( private readonly sources: Map, - private readonly sinks: Map, + private readonly sinkFactory: SinkFactory, private readonly registry: SubscriptionRegistry, private readonly cursors: CursorStore, private readonly dedup: Dedup, @@ -38,7 +47,7 @@ export class Scheduler { const origins = this.registry.origins(); for (const origin of origins) { - this.startOriginLoop(origin); + await this.startOriginLoop(origin); } } @@ -50,7 +59,7 @@ export class Scheduler { this.loops.clear(); } - private startOriginLoop(origin: string): void { + private async startOriginLoop(origin: string): Promise { if (this.loops.has(origin)) return; // Parse origin → source + config @@ -67,8 +76,25 @@ export class Scheduler { return; } - // Use subscription-level interval override, or default + // Create a dedicated sink instance per subscription const subs = this.registry.forOrigin(origin); + const sinkInstances: SubscriptionSink[] = []; + for (const sub of subs) { + try { + const sink = this.sinkFactory(sub.sink, sub.sinkConfig); + await sink.init(sub.sinkConfig); + sinkInstances.push({ subscriptionId: sub.id, sink }); + } catch (e) { + console.error(`[channel] Failed to init sink "${sub.sink}" for subscription ${sub.id}:`, e); + } + } + + if (sinkInstances.length === 0) { + console.error(`[channel] No valid sinks for origin "${origin}", skipping.`); + return; + } + + // Use subscription-level interval override, or default const overrideMs = subs.reduce((min, s) => { if (s.intervalMs > 0 && s.intervalMs < min) return s.intervalMs; return min; @@ -78,6 +104,7 @@ export class Scheduler { origin, source, pollConfig, + sinks: sinkInstances, timer: null, intervalMs: Math.max(overrideMs, MIN_INTERVAL_MS), consecutiveErrors: 0, @@ -106,21 +133,15 @@ export class Scheduler { } } - // Deliver to all subscribers of this origin + // Deliver to each subscription's dedicated sink let anyDelivered = false; if (fresh.length > 0) { - const subs = this.registry.forOrigin(loop.origin); - for (const sub of subs) { - const sink = this.sinks.get(sub.sink); - if (!sink) { - console.error(`[channel] Unknown sink "${sub.sink}" for subscription ${sub.id}`); - continue; - } + for (const { subscriptionId, sink } of loop.sinks) { try { await sink.deliver(fresh); anyDelivered = true; } catch (e) { - console.error(`[channel] Sink "${sub.sink}" delivery failed:`, e); + console.error(`[channel] Sink delivery failed for subscription ${subscriptionId}:`, e); } } } diff --git a/src/channel/sources/github.ts b/src/channel/sources/github.ts index 75ea2b4c..e299a523 100644 --- a/src/channel/sources/github.ts +++ b/src/channel/sources/github.ts @@ -179,15 +179,18 @@ export class GitHubSource implements ChannelSource { user: { login: string }; }>>(endpoint); - // Filter out events at or before cursor (since= is inclusive) + // GitHub's `since` param filters by updated_at, so we must also + // filter by updated_at to avoid missing edits or re-delivering stale items. const cursorTs = cursor ? Date.parse(cursor) : 0; const events: ChannelEvent[] = data - .filter(comment => Date.parse(comment.created_at) > cursorTs) + .filter(comment => Date.parse(comment.updated_at) > cursorTs) .map(comment => ({ - id: `gh-comment-${comment.id}`, + id: `gh-comment-${comment.id}-${comment.updated_at}`, source: 'github', - type: 'issue_comment.created', - timestamp: comment.created_at, + type: comment.created_at === comment.updated_at + ? 'issue_comment.created' + : 'issue_comment.updated', + timestamp: comment.updated_at, origin: `github:${c.owner}/${c.repo}#${c.number}`, payload: { author: comment.user.login, @@ -197,7 +200,7 @@ export class GitHubSource implements ChannelSource { })); const newCursor = data.length > 0 - ? data[data.length - 1].created_at + ? data[data.length - 1].updated_at : (cursor ?? ''); return { From 24bde5d5c18e3b6cfbb2ea7518477b3e51479da7 Mon Sep 17 00:00:00 2001 From: Fourier Date: Wed, 25 Mar 2026 16:43:43 +0800 Subject: [PATCH 5/7] fix: final polish from second-round review - Clean up PID file on foreground daemon shutdown (SIGINT/SIGTERM) - Guard against spawn failure (child.pid undefined) - Reuse validated intervalMs variable instead of re-parsing - Consistent URL construction in pollIssues (no ?& artifact) --- src/channel/index.ts | 7 ++++++- src/channel/sources/github.ts | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/channel/index.ts b/src/channel/index.ts index 019a1bb4..7d1b518f 100644 --- a/src/channel/index.ts +++ b/src/channel/index.ts @@ -138,7 +138,7 @@ export function registerChannelCommand(program: Command): void { const registry = new SubscriptionRegistry(); await registry.load(); - const sub = registry.add(origin, opts.sink, sinkConfig, parseInt(opts.interval, 10)); + const sub = registry.add(origin, opts.sink, sinkConfig, intervalMs); await registry.save(); console.log(`✅ Subscribed to ${origin}`); @@ -234,6 +234,10 @@ export function registerChannelCommand(program: Command): void { stdio: 'ignore', }); child.unref(); + if (!child.pid) { + console.error('Failed to spawn daemon process.'); + process.exit(1); + } writeFileSync(PID_FILE, String(child.pid)); console.log(`Channel daemon started (PID: ${child.pid})`); return; @@ -261,6 +265,7 @@ export function registerChannelCommand(program: Command): void { const shutdown = (): void => { console.log('\nStopping channel daemon...'); scheduler.stop(); + try { unlinkSync(PID_FILE); } catch {} // Flush cursors before exit cursors.save().catch(() => {}).finally(() => process.exit(0)); }; diff --git a/src/channel/sources/github.ts b/src/channel/sources/github.ts index e299a523..8ccbcb1d 100644 --- a/src/channel/sources/github.ts +++ b/src/channel/sources/github.ts @@ -254,8 +254,8 @@ export class GitHubSource implements ChannelSource { } private pollIssues(c: IssuesPollConfig, cursor: string | null): PollResult { - const sinceParam = cursor ? `?since=${cursor}` : '?'; - const endpoint = `/repos/${c.owner}/${c.repo}/issues${sinceParam}&state=all&sort=updated&direction=desc&per_page=30`; + const sinceParam = cursor ? `&since=${cursor}` : ''; + const endpoint = `/repos/${c.owner}/${c.repo}/issues?state=all&sort=updated&direction=desc&per_page=30${sinceParam}`; const { data, pollInterval } = ghJson Date: Wed, 25 Mar 2026 16:46:49 +0800 Subject: [PATCH 6/7] fix: address Codex R2 findings - Cursor only advances when ALL sinks succeed (not any-one), preventing a failing sink from permanently missing events - Webhook fetch: add 30s AbortController timeout to prevent hung connections from blocking the poll loop indefinitely - Registry dedup: include sinkConfig in dedup key, so two webhook subscriptions with different URLs are correctly stored as separate entries. Update interval on re-subscribe instead of silently ignoring. --- src/channel/registry.ts | 15 ++++++++++++--- src/channel/scheduler.ts | 10 ++++++---- src/channel/sinks/webhook.ts | 21 ++++++++++++++------- 3 files changed, 32 insertions(+), 14 deletions(-) diff --git a/src/channel/registry.ts b/src/channel/registry.ts index f7ef67b7..0841fbdd 100644 --- a/src/channel/registry.ts +++ b/src/channel/registry.ts @@ -38,9 +38,18 @@ export class SubscriptionRegistry { } add(origin: string, sink: string, sinkConfig: Record = {}, intervalMs = 0): Subscription { - // Check for existing subscription with same origin + sink - const existing = this.subs.find(s => s.origin === origin && s.sink === sink); - if (existing) return existing; + // Check for existing subscription with same origin + sink + config + const configKey = JSON.stringify(sinkConfig); + const existing = this.subs.find( + s => s.origin === origin && s.sink === sink && JSON.stringify(s.sinkConfig) === configKey, + ); + if (existing) { + // Update interval if changed + if (intervalMs !== existing.intervalMs) { + existing.intervalMs = intervalMs; + } + return existing; + } const sub: Subscription = { id: randomUUID(), diff --git a/src/channel/scheduler.ts b/src/channel/scheduler.ts index caa883c9..e6b6b39b 100644 --- a/src/channel/scheduler.ts +++ b/src/channel/scheduler.ts @@ -134,20 +134,22 @@ export class Scheduler { } // Deliver to each subscription's dedicated sink - let anyDelivered = false; + let allDelivered = true; if (fresh.length > 0) { for (const { subscriptionId, sink } of loop.sinks) { try { await sink.deliver(fresh); - anyDelivered = true; } catch (e) { + allDelivered = false; console.error(`[channel] Sink delivery failed for subscription ${subscriptionId}:`, e); } } } - // Only advance cursor if at least one sink succeeded (or no events) - if (fresh.length === 0 || anyDelivered) { + // Only advance cursor if ALL sinks succeeded (or no events). + // This prevents data loss: a failing sink will see the same events + // on the next poll rather than missing them forever. + if (fresh.length === 0 || allDelivered) { this.cursors.set(loop.origin, result.cursor, fresh.length); await this.cursors.save(); } diff --git a/src/channel/sinks/webhook.ts b/src/channel/sinks/webhook.ts index a96e3fe9..8fd3136a 100644 --- a/src/channel/sinks/webhook.ts +++ b/src/channel/sinks/webhook.ts @@ -27,13 +27,20 @@ export class WebhookSink implements ChannelSink { async deliver(events: ChannelEvent[]): Promise { for (const event of events) { try { - const res = await fetch(this.url, { - method: 'POST', - headers: { 'Content-Type': 'application/json', ...this.headers }, - body: JSON.stringify(event), - }); - if (!res.ok) { - console.error(`[webhook] ${res.status} ${res.statusText} for event ${event.id}`); + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 30_000); + try { + const res = await fetch(this.url, { + method: 'POST', + headers: { 'Content-Type': 'application/json', ...this.headers }, + body: JSON.stringify(event), + signal: controller.signal, + }); + if (!res.ok) { + console.error(`[webhook] ${res.status} ${res.statusText} for event ${event.id}`); + } + } finally { + clearTimeout(timeout); } } catch (e) { console.error(`[webhook] Failed to deliver event ${event.id}:`, e); From 1ac9999fa7b7b5934560fd6d7f5f14ddb56978a1 Mon Sep 17 00:00:00 2001 From: Fourier Date: Wed, 25 Mar 2026 16:51:21 +0800 Subject: [PATCH 7/7] fix: eliminate all known limitations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - async: replace execFileSync with async execFile (promisified) in GitHub source — no longer blocks the event loop, multiple origins poll concurrently - file locking: SubscriptionRegistry.withLock() uses O_CREAT|O_EXCL lockfile with stale detection (10s timeout). subscribe/unsubscribe use withLock to prevent concurrent CLI invocations from clobbering each other's changes - all poll methods now async, awaiting gh api calls --- src/channel/index.ts | 10 ++--- src/channel/registry.ts | 75 +++++++++++++++++++++++++++++++---- src/channel/sources/github.ts | 35 ++++++++-------- 3 files changed, 90 insertions(+), 30 deletions(-) diff --git a/src/channel/index.ts b/src/channel/index.ts index 7d1b518f..2baee4b0 100644 --- a/src/channel/index.ts +++ b/src/channel/index.ts @@ -137,9 +137,9 @@ export function registerChannelCommand(program: Command): void { } const registry = new SubscriptionRegistry(); - await registry.load(); - const sub = registry.add(origin, opts.sink, sinkConfig, intervalMs); - await registry.save(); + const sub = await registry.withLock(() => + registry.add(origin, opts.sink, sinkConfig, intervalMs), + ); console.log(`✅ Subscribed to ${origin}`); console.log(` ID: ${sub.id}`); @@ -154,9 +154,7 @@ export function registerChannelCommand(program: Command): void { .description('Remove subscription for an origin') .action(async (origin: string) => { const registry = new SubscriptionRegistry(); - await registry.load(); - const removed = registry.remove(origin); - await registry.save(); + const removed = await registry.withLock(() => registry.remove(origin)); if (removed) { console.log(`✅ Unsubscribed from ${origin}`); diff --git a/src/channel/registry.ts b/src/channel/registry.ts index 0841fbdd..43db04e7 100644 --- a/src/channel/registry.ts +++ b/src/channel/registry.ts @@ -1,10 +1,13 @@ /** * Subscription registry — persists who subscribes to what. * File: ~/.opencli/channel/subscriptions.json + * + * Uses a lockfile to prevent concurrent CLI invocations from clobbering + * each other's changes (e.g. two `subscribe` commands in parallel). */ -import { mkdirSync } from 'node:fs'; -import { readFile, writeFile, rename } from 'node:fs/promises'; +import { mkdirSync, existsSync, unlinkSync } from 'node:fs'; +import { readFile, writeFile, rename, open } from 'node:fs/promises'; import { dirname, join } from 'node:path'; import { homedir } from 'node:os'; import { randomUUID } from 'node:crypto'; @@ -14,10 +17,68 @@ const DEFAULT_PATH = join(homedir(), '.opencli', 'channel', 'subscriptions.json' export class SubscriptionRegistry { private subs: Subscription[] = []; + private readonly lockPath: string; - constructor(private readonly path: string = DEFAULT_PATH) {} + constructor(private readonly path: string = DEFAULT_PATH) { + this.lockPath = `${path}.lock`; + } + + /** + * Atomically load → mutate → save with file locking. + * Ensures concurrent CLI invocations don't clobber each other. + */ + async withLock(fn: () => T | Promise): Promise { + mkdirSync(dirname(this.path), { recursive: true }); + + // Acquire lock via O_CREAT|O_EXCL (atomic create-or-fail) + const maxRetries = 50; + const retryMs = 100; + let lockFd: Awaited> | null = null; + + for (let i = 0; i < maxRetries; i++) { + try { + lockFd = await open(this.lockPath, 'wx'); + break; + } catch (e: unknown) { + if ((e as NodeJS.ErrnoException).code === 'EEXIST') { + // Lock held by another process — check if stale (>10s old) + try { + const { mtimeMs } = await (await open(this.lockPath, 'r')).stat(); + (await open(this.lockPath, 'r')).close(); + if (Date.now() - mtimeMs > 10_000) { + // Stale lock — remove and retry + try { unlinkSync(this.lockPath); } catch {} + continue; + } + } catch {} + await new Promise(r => setTimeout(r, retryMs)); + continue; + } + throw e; + } + } + + if (!lockFd) { + throw new Error(`Failed to acquire lock on ${this.lockPath} after ${maxRetries} retries`); + } + try { + await this._load(); + const result = await fn(); + await this._save(); + return result; + } finally { + await lockFd.close(); + try { unlinkSync(this.lockPath); } catch {} + } + } + + /** Load without lock (for read-only operations like list/status). */ async load(): Promise { + await this._load(); + } + + private async _load(): Promise { try { const raw = await readFile(this.path, 'utf8'); this.subs = JSON.parse(raw) as Subscription[]; @@ -31,6 +92,10 @@ export class SubscriptionRegistry { } async save(): Promise { + await this._save(); + } + + private async _save(): Promise { mkdirSync(dirname(this.path), { recursive: true }); const tmp = `${this.path}.${process.pid}.${Date.now()}.tmp`; await writeFile(tmp, JSON.stringify(this.subs, null, 2), 'utf8'); @@ -38,13 +103,11 @@ export class SubscriptionRegistry { } add(origin: string, sink: string, sinkConfig: Record = {}, intervalMs = 0): Subscription { - // Check for existing subscription with same origin + sink + config const configKey = JSON.stringify(sinkConfig); const existing = this.subs.find( s => s.origin === origin && s.sink === sink && JSON.stringify(s.sinkConfig) === configKey, ); if (existing) { - // Update interval if changed if (intervalMs !== existing.intervalMs) { existing.intervalMs = intervalMs; } @@ -79,12 +142,10 @@ export class SubscriptionRegistry { return [...this.subs]; } - /** Get all unique origins. */ origins(): string[] { return [...new Set(this.subs.map(s => s.origin))]; } - /** Get all subscriptions for a given origin. */ forOrigin(origin: string): Subscription[] { return this.subs.filter(s => s.origin === origin); } diff --git a/src/channel/sources/github.ts b/src/channel/sources/github.ts index 8ccbcb1d..c971db3f 100644 --- a/src/channel/sources/github.ts +++ b/src/channel/sources/github.ts @@ -10,7 +10,8 @@ * github:owner/repo/issues — all issue activity */ -import { execFileSync } from 'node:child_process'; +import { execFile } from 'node:child_process'; +import { promisify } from 'node:util'; import type { ChannelEvent, ChannelSource, @@ -19,6 +20,8 @@ import type { SubscribableItem, } from '../types.js'; +const execFileAsync = promisify(execFile); + // ── Origin parsing ────────────────────────────────────────────────── interface RepoPollConfig extends SourcePollConfig { @@ -50,17 +53,15 @@ type GitHubPollConfig = RepoPollConfig | IssuePollConfig | PullsPollConfig | Iss // ── Helpers ───────────────────────────────────────────────────────── -function ghJson(endpoint: string): { data: T; pollInterval?: number } { - // Use execFileSync to avoid shell injection — origin strings are user-supplied. - const raw = execFileSync('gh', ['api', '--include', endpoint], { +async function ghJson(endpoint: string): Promise<{ data: T; pollInterval?: number }> { + const { stdout } = await execFileAsync('gh', ['api', '--include', endpoint], { encoding: 'utf8', timeout: 30_000, }); - // Find the start of JSON body — first '[' or '{' that starts a line - const jsonStart = raw.search(/^\s*[\[{]/m); - const headers = jsonStart > 0 ? raw.slice(0, jsonStart) : ''; - const body = jsonStart >= 0 ? raw.slice(jsonStart) : raw; + const jsonStart = stdout.search(/^\s*[\[{]/m); + const headers = jsonStart > 0 ? stdout.slice(0, jsonStart) : ''; + const body = jsonStart >= 0 ? stdout.slice(jsonStart) : stdout; const data = JSON.parse(body) as T; @@ -80,7 +81,7 @@ export class GitHubSource implements ChannelSource { async listSubscribable(_config: Record): Promise { // List user's repos as subscribable items try { - const { data } = ghJson>( + const { data } = await ghJson>( '/user/repos?per_page=30&sort=updated', ); return data.map(r => ({ @@ -132,9 +133,9 @@ export class GitHubSource implements ChannelSource { // ── Poll strategies ───────────────────────────────────────────── - private pollRepoEvents(c: RepoPollConfig, cursor: string | null): PollResult { + private async pollRepoEvents(c: RepoPollConfig, cursor: string | null): Promise { const endpoint = `/repos/${c.owner}/${c.repo}/events?per_page=100`; - const { data, pollInterval } = ghJson { const sinceParam = cursor ? `?since=${cursor}` : ''; const endpoint = `/repos/${c.owner}/${c.repo}/issues/${c.number}/comments${sinceParam}`; - const { data, pollInterval } = ghJson { const sinceParam = cursor ? `&since=${cursor}` : ''; const endpoint = `/repos/${c.owner}/${c.repo}/pulls?state=all&sort=updated&direction=desc&per_page=30${sinceParam}`; - const { data, pollInterval } = ghJson { const sinceParam = cursor ? `&since=${cursor}` : ''; const endpoint = `/repos/${c.owner}/${c.repo}/issues?state=all&sort=updated&direction=desc&per_page=30${sinceParam}`; - const { data, pollInterval } = ghJson