diff --git a/.github/workflows/release-deb.yml b/.github/workflows/release-deb.yml new file mode 100644 index 000000000..c825b304a --- /dev/null +++ b/.github/workflows/release-deb.yml @@ -0,0 +1,157 @@ +name: Release Debian Package + +on: + release: + types: [published] + workflow_dispatch: + inputs: + tag: + description: "Release tag to upload assets to (e.g. v0.26.2)." + required: false + type: string + +permissions: + contents: write + +jobs: + build-deb: + strategy: + fail-fast: false + matrix: + include: + - name: linux-x64 + runs-on: ubuntu-24.04 + asset-arch: x86-64 + - name: linux-arm64 + runs-on: ubuntu-24.04-arm + asset-arch: aarch64 + runs-on: ${{ matrix.runs-on }} + env: + RELEASE_TAG: ${{ inputs.tag || github.ref_name }} + steps: + - uses: actions/checkout@v6 + + - name: Install build dependencies + run: sudo apt-get update && sudo apt-get install -y dpkg python3 python3-gi python3-gi-cairo gir1.2-gtk-3.0 gir1.2-ayatanaappindicator3-0.1 gir1.2-webkit2-4.1 + + - name: Install Swift 6.2.1 via swiftly + shell: bash + run: | + set -euo pipefail + + if ! command -v gpg >/dev/null 2>&1; then + sudo apt-get update + sudo apt-get install -y ca-certificates gpg + fi + + SWIFTLY_ARCH="$(uname -m)" + SWIFTLY_HOME_DIR="$HOME/.local/share/swiftly" + SWIFTLY_BIN_DIR="$HOME/.local/bin" + POST_INSTALL_SCRIPT="$(mktemp)" + + mkdir -p "$SWIFTLY_BIN_DIR" + curl -fsSL "https://download.swift.org/swiftly/linux/swiftly-${SWIFTLY_ARCH}.tar.gz" -o /tmp/swiftly.tar.gz + tar -xzf /tmp/swiftly.tar.gz -C /tmp + /tmp/swiftly init --assume-yes --skip-install + + . "$SWIFTLY_HOME_DIR/env.sh" + echo "$SWIFTLY_BIN_DIR" >> "$GITHUB_PATH" + echo "SWIFTLY_HOME_DIR=$SWIFTLY_HOME_DIR" >> "$GITHUB_ENV" + echo "SWIFTLY_BIN_DIR=$SWIFTLY_BIN_DIR" >> "$GITHUB_ENV" + + swiftly install 6.2.1 --use --assume-yes --post-install-file "$POST_INSTALL_SCRIPT" + if [[ -s "$POST_INSTALL_SCRIPT" ]]; then + sudo apt-get update + sudo bash "$POST_INSTALL_SCRIPT" + fi + + hash -r + swift --version + + - name: Build CodexBarCLI (release) + id: build + shell: bash + run: | + set -euo pipefail + swift build -c release --product CodexBarCLI --static-swift-stdlib + BIN_DIR="$(swift build -c release --product CodexBarCLI --static-swift-stdlib --show-bin-path)" + echo "bin_dir=$BIN_DIR" >> "$GITHUB_OUTPUT" + + # Write VERSION file so --version smoke test works + VERSION="${RELEASE_TAG#v}" + printf '%s\n' "$VERSION" > "$BIN_DIR/VERSION" + + - name: Smoke test + timeout-minutes: 5 + shell: bash + run: | + set -euo pipefail + BIN="${{ steps.build.outputs.bin_dir }}/CodexBarCLI" + file "$BIN" + file "$BIN" | grep -q "${{ matrix.asset-arch }}" + + run_with_timeout() { + local out="$1"; shift + "$@" > "$out" & + local pid=$! + for _ in {1..20}; do + if ! kill -0 "$pid" 2>/dev/null; then + wait "$pid"; return $? + fi + sleep 1 + done + kill "$pid" 2>/dev/null || true; wait "$pid" 2>/dev/null || true + echo "$* timed out" >&2; return 124 + } + + run_with_timeout /tmp/help.txt "$BIN" --help + run_with_timeout /tmp/ver.txt "$BIN" --version + grep -Fx "CodexBar ${RELEASE_TAG#v}" /tmp/ver.txt + + - name: Package .deb + id: pkg + shell: bash + run: | + set -euo pipefail + OUT_DIR="$(mktemp -d)" + VERSION="${RELEASE_TAG#v}" + ./Scripts/package_deb.sh \ + --version "$VERSION" \ + --bin "${{ steps.build.outputs.bin_dir }}/CodexBarCLI" \ + --out-dir "$OUT_DIR" + + DEB_ARCH="$(dpkg --print-architecture)" + ASSET="codexbar_${VERSION}_${DEB_ARCH}.deb" + echo "out_dir=$OUT_DIR" >> "$GITHUB_OUTPUT" + echo "asset=$ASSET" >> "$GITHUB_OUTPUT" + echo "deb=$OUT_DIR/$ASSET" >> "$GITHUB_OUTPUT" + + - name: Verify .deb + shell: bash + run: | + set -euo pipefail + dpkg-deb --info "${{ steps.pkg.outputs.deb }}" + dpkg-deb --contents "${{ steps.pkg.outputs.deb }}" + + - name: Upload release assets + if: github.event_name == 'release' || inputs.tag != '' + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + shell: bash + run: | + set -euo pipefail + OUT_DIR="${{ steps.pkg.outputs.out_dir }}" + ASSET="${{ steps.pkg.outputs.asset }}" + gh release upload "${RELEASE_TAG}" \ + "$OUT_DIR/$ASSET" \ + "$OUT_DIR/$ASSET.sha256" \ + --clobber + + - name: Upload workflow artifact (manual runs) + if: github.event_name != 'release' && inputs.tag == '' + uses: actions/upload-artifact@v6 + with: + name: codexbar-deb-${{ matrix.name }} + path: | + ${{ steps.pkg.outputs.out_dir }}/${{ steps.pkg.outputs.asset }} + ${{ steps.pkg.outputs.out_dir }}/${{ steps.pkg.outputs.asset }}.sha256 diff --git a/README.md b/README.md index ef9e29543..fcbd34859 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,20 @@ Download: brew install --cask steipete/tap/codexbar ``` +### Debian / Ubuntu (.deb) +Download `codexbar__amd64.deb` (or `arm64`) from GitHub Releases and install: +```bash +sudo dpkg -i codexbar__amd64.deb +sudo apt-get install -f # pulls in Python/GTK dependencies +``` + +After installing: +- **System tray icon** appears in the GNOME top bar automatically on next login (or run `codexbar-tray` now) +- **Clicking the icon** starts `codexbar serve` and opens the usage dashboard in your browser +- **`codexbar` CLI** is available in the terminal + +Web/browser-backed providers are macOS-only; API-key and OAuth providers work on Linux. + ### CLI Tarballs (macOS/Linux) Homebrew formula (Linux today): ```bash @@ -172,6 +186,14 @@ CLI install: ./bin/install-codexbar-cli.sh ``` +Debian/Ubuntu package (Linux, requires Swift 6.2+ and `dpkg`): +```bash +./Scripts/package_deb.sh # builds CLI + packages tray app as a .deb +sudo dpkg -i codexbar_*.deb +sudo apt-get install -f # install Python/GTK deps +codexbar-tray & # launch tray icon immediately +``` + ## Related - ✂️ [Trimmy](https://github.com/steipete/Trimmy) — “Paste once, run once.” Flatten multi-line shell snippets so they paste and run. - 🧳 [MCPorter](https://mcporter.dev) — TypeScript toolkit + CLI for Model Context Protocol servers. diff --git a/Scripts/package_deb.sh b/Scripts/package_deb.sh new file mode 100755 index 000000000..79e969024 --- /dev/null +++ b/Scripts/package_deb.sh @@ -0,0 +1,290 @@ +#!/usr/bin/env bash +# Builds a .deb package for CodexBar on Linux (x86_64 or aarch64). +# +# What gets installed: +# /usr/bin/CodexBarCLI — statically linked CLI binary +# /usr/bin/codexbar — symlink → CodexBarCLI +# /usr/bin/codexbar-tray — Python tray daemon +# /usr/bin/codexbar-linux-fetcher — Python cookie/sidecar fetcher +# /usr/share/icons/hicolor/512x512/apps/codexbar.png +# /usr/share/applications/codexbar.desktop — app launcher entry +# /etc/xdg/autostart/codexbar-tray.desktop — start tray on login +# +# Usage: +# ./Scripts/package_deb.sh [--version VERSION] [--bin PATH] [--out-dir DIR] +# +# Requires: swift (to build if binary missing), dpkg-deb + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" + +# ── defaults ────────────────────────────────────────────────────────────────── +VERSION="" +BIN_PATH="" +OUT_DIR="$REPO_ROOT" + +# ── argument parsing ────────────────────────────────────────────────────────── +while [[ $# -gt 0 ]]; do + case "$1" in + --version) VERSION="$2"; shift 2 ;; + --bin) BIN_PATH="$2"; shift 2 ;; + --out-dir) OUT_DIR="$2"; shift 2 ;; + *) echo "Unknown argument: $1" >&2; exit 1 ;; + esac +done + +# ── resolve version ─────────────────────────────────────────────────────────── +if [[ -z "$VERSION" ]]; then + if [[ -f "$REPO_ROOT/version.env" ]]; then + # shellcheck source=/dev/null + source "$REPO_ROOT/version.env" + VERSION="${MARKETING_VERSION:-}" + fi +fi +if [[ -z "$VERSION" ]]; then + echo "Could not determine version. Pass --version or ensure version.env exists." >&2 + exit 1 +fi + +# ── resolve architecture ────────────────────────────────────────────────────── +MACHINE="$(uname -m)" +case "$MACHINE" in + x86_64) DEB_ARCH="amd64" ;; + aarch64|arm64) DEB_ARCH="arm64" ;; + *) + echo "Unsupported architecture: $MACHINE" >&2 + exit 1 + ;; +esac + +# ── build CLI binary if needed ──────────────────────────────────────────────── +if [[ -z "$BIN_PATH" ]]; then + BIN_PATH="$REPO_ROOT/.build/release/CodexBarCLI" +fi +if [[ ! -f "$BIN_PATH" ]]; then + echo "Binary not found at $BIN_PATH — building..." + cd "$REPO_ROOT" + swift build -c release --product CodexBarCLI --static-swift-stdlib +fi +if [[ ! -f "$BIN_PATH" ]]; then + echo "Build succeeded but binary not found at $BIN_PATH" >&2 + exit 1 +fi + +# ── check dependencies ──────────────────────────────────────────────────────── +if ! command -v dpkg-deb >/dev/null 2>&1; then + echo "dpkg-deb not found. Install it with: sudo apt-get install -y dpkg" >&2 + exit 1 +fi + +ICON_SRC="$REPO_ROOT/docs/icon.png" +if [[ ! -f "$ICON_SRC" ]]; then + echo "Icon not found at $ICON_SRC" >&2 + exit 1 +fi + +TRAY_SRC="$REPO_ROOT/bin/codexbar-tray" +if [[ ! -f "$TRAY_SRC" ]]; then + echo "Tray script not found at $TRAY_SRC" >&2 + exit 1 +fi + +FETCHER_SRC="$REPO_ROOT/bin/codexbar-linux-fetcher" +if [[ ! -f "$FETCHER_SRC" ]]; then + echo "Linux fetcher script not found at $FETCHER_SRC" >&2 + exit 1 +fi + +# ── assemble package tree ───────────────────────────────────────────────────── +PKG_DIR="$(mktemp -d)" +trap 'rm -rf "$PKG_DIR"' EXIT + +install -d "$PKG_DIR/DEBIAN" +install -d "$PKG_DIR/usr/bin" +install -d "$PKG_DIR/usr/share/icons/hicolor/512x512/apps" +install -d "$PKG_DIR/usr/share/applications" +install -d "$PKG_DIR/etc/xdg/autostart" + +# CLI binary + symlink +install -m 0755 "$BIN_PATH" "$PKG_DIR/usr/bin/CodexBarCLI" +ln -s "CodexBarCLI" "$PKG_DIR/usr/bin/codexbar" + +# tray daemon +install -m 0755 "$TRAY_SRC" "$PKG_DIR/usr/bin/codexbar-tray" +install -m 0755 "$FETCHER_SRC" "$PKG_DIR/usr/bin/codexbar-linux-fetcher" + +# icon +install -m 0644 "$ICON_SRC" "$PKG_DIR/usr/share/icons/hicolor/512x512/apps/codexbar.png" + +# app launcher .desktop — clicking opens the tray (or if already running, brings browser) +cat > "$PKG_DIR/usr/share/applications/codexbar.desktop" <<'DESKTOP' +[Desktop Entry] +Name=CodexBar +Comment=AI coding-provider usage tracker +Exec=codexbar-tray +Icon=codexbar +Terminal=false +Type=Application +Categories=Utility;Development; +Keywords=AI;Claude;Codex;Gemini;Cursor;usage;tokens;limits; +StartupNotify=false +DESKTOP + +# autostart .desktop — starts tray daemon on login +cat > "$PKG_DIR/etc/xdg/autostart/codexbar-tray.desktop" <<'AUTOSTART' +[Desktop Entry] +Name=CodexBar Tray +Comment=AI coding-provider usage tracker (tray daemon) +Exec=codexbar-tray +Icon=codexbar +Terminal=false +Type=Application +Categories=Utility; +StartupNotify=false +X-GNOME-Autostart-enabled=true +AUTOSTART + +# post-install: refresh icon cache + create starter config +cat > "$PKG_DIR/DEBIAN/postinst" <<'POSTINST' +#!/bin/sh +set -e + +# ── icon / desktop caches ───────────────────────────────────────────────────── +if command -v update-icon-caches >/dev/null 2>&1; then + update-icon-caches /usr/share/icons/hicolor 2>/dev/null || true +elif command -v gtk-update-icon-cache >/dev/null 2>&1; then + gtk-update-icon-cache -f -t /usr/share/icons/hicolor 2>/dev/null || true +fi +if command -v update-desktop-database >/dev/null 2>&1; then + update-desktop-database /usr/share/applications 2>/dev/null || true +fi + +# ── create starter config for the installing user ──────────────────────────── +# $SUDO_USER is set when running via sudo; fall back to $USER or root +TARGET_USER="${SUDO_USER:-${USER:-root}}" +if [ "$TARGET_USER" = "root" ]; then + TARGET_HOME="/root" +else + TARGET_HOME="$(getent passwd "$TARGET_USER" 2>/dev/null | cut -d: -f6)" || true + TARGET_HOME="${TARGET_HOME:-/home/$TARGET_USER}" +fi + +CONFIG_DIR="$TARGET_HOME/.codexbar" +CONFIG_FILE="$CONFIG_DIR/config.json" + +mkdir -p "$CONFIG_DIR" +# Only write if no config exists yet (never overwrite user tokens) +if [ ! -f "$CONFIG_FILE" ]; then + cat > "$CONFIG_FILE" <<'CONFIG' +{ + "version": 1, + "providers": [ + { "id": "claude", "enabled": true, "source": "cli" }, + { "id": "codex", "enabled": true }, + { "id": "cursor", "enabled": true } + ] +} +CONFIG + chown -R "$TARGET_USER" "$CONFIG_DIR" 2>/dev/null || true +fi + +# ── print setup instructions ────────────────────────────────────────────────── +cat <<'MSG' + +╔══════════════════════════════════════════════════════════════════════╗ +║ CodexBar installed successfully! ║ +╠══════════════════════════════════════════════════════════════════════╣ +║ ║ +║ CLAUDE — works automatically via `claude` CLI. ║ +║ Install: npm i -g @anthropic-ai/claude-code ║ +║ ║ +║ CODEX — works automatically via `codex` CLI. ║ +║ Install: npm i -g @openai/codex ║ +║ Then run: codex login ║ +║ ║ +║ CURSOR — needs a session token (stored in Chrome's encrypted ║ +║ keyring, not accessible from the terminal). ║ +║ 1. Open cursor.com in Firefox (or Chrome DevTools) ║ +║ 2. Find cookie: WorkosCursorSessionToken ║ +║ 3. Add to ~/.codexbar/config.json: ║ +║ "sessionToken": "" (under id=cursor) ║ +║ ║ +║ To start: codexbar-tray & ║ +║ Dashboard: http://localhost:8081 ║ +╚══════════════════════════════════════════════════════════════════════╝ + +MSG +POSTINST +chmod 0755 "$PKG_DIR/DEBIAN/postinst" + +# post-remove: clean up icon cache +cat > "$PKG_DIR/DEBIAN/postrm" <<'POSTRM' +#!/bin/sh +set -e +if command -v update-icon-caches >/dev/null 2>&1; then + update-icon-caches /usr/share/icons/hicolor 2>/dev/null || true +elif command -v gtk-update-icon-cache >/dev/null 2>&1; then + gtk-update-icon-cache -f -t /usr/share/icons/hicolor 2>/dev/null || true +fi +if command -v update-desktop-database >/dev/null 2>&1; then + update-desktop-database /usr/share/applications 2>/dev/null || true +fi +POSTRM +chmod 0755 "$PKG_DIR/DEBIAN/postrm" + +INSTALLED_SIZE="$(du -sk "$PKG_DIR/usr" "$PKG_DIR/etc" | awk '{s+=$1} END{print s}')" + +# ── DEBIAN/control ──────────────────────────────────────────────────────────── +cat > "$PKG_DIR/DEBIAN/control" < +Installed-Size: $INSTALLED_SIZE +Depends: python3 (>= 3.9), python3-gi, python3-gi-cairo, python3-cryptography, gir1.2-gtk-3.0, gir1.2-ayatanaappindicator3-0.1, gir1.2-webkit2-4.1 | gir1.2-webkit2-4.0, gnome-shell-extension-appindicator, x-terminal-emulator +Section: utils +Priority: optional +Homepage: https://codexbar.app +Description: AI coding-provider usage tracker + CodexBar tracks AI coding-provider limits and usage windows. + . + On Linux this package provides: + - codexbar CLI for API-key and OAuth-based providers + - A native GNOME system tray icon (top bar) via codexbar-tray + - A web dashboard at http://localhost:8080 (opened on click) + . + Web/browser-backed providers are macOS-only. +EOF + +# ── DEBIAN/copyright ────────────────────────────────────────────────────────── +cat > "$PKG_DIR/DEBIAN/copyright" <<'EOF' +Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ +Upstream-Name: codexbar +Upstream-Contact: steipete@gmail.com +Source: https://github.com/steipete/CodexBar + +Files: * +Copyright: Peter Steinberger +License: MIT +EOF + +# ── build .deb ──────────────────────────────────────────────────────────────── +mkdir -p "$OUT_DIR" +DEB_FILE="${OUT_DIR}/codexbar_${VERSION}_${DEB_ARCH}.deb" +dpkg-deb --build --root-owner-group "$PKG_DIR" "$DEB_FILE" + +if command -v sha256sum >/dev/null 2>&1; then + sha256sum "$DEB_FILE" > "${DEB_FILE}.sha256" +else + shasum -a 256 "$DEB_FILE" > "${DEB_FILE}.sha256" +fi + +echo "" +echo "Package: $DEB_FILE" +echo "SHA256: $(awk '{print $1}' "${DEB_FILE}.sha256")" +echo "" +echo "Install with:" +echo " sudo dpkg -i $DEB_FILE" +echo " sudo apt-get install -f # install any missing dependencies" diff --git a/Sources/CodexBarCLI/CLIServeCommand.swift b/Sources/CodexBarCLI/CLIServeCommand.swift index 70aeefc8a..2797b9b1a 100644 --- a/Sources/CodexBarCLI/CLIServeCommand.swift +++ b/Sources/CodexBarCLI/CLIServeCommand.swift @@ -20,6 +20,7 @@ struct ServeOptions: CommanderParsable { } enum CLIServeRoute: Equatable { + case root case health case usage(provider: String?) case cost(provider: String?) @@ -40,6 +41,8 @@ enum CLIServeRouter { let normalizedProvider = provider?.isEmpty == false ? provider : nil switch path { + case "/", "/index.html": + return .root case "/health": return .health case "/usage": @@ -186,6 +189,11 @@ extension CodexBarCLI { } switch route { + case .root: + return CLILocalHTTPResponse( + status: .ok, + body: Data(CLIWebUI.html.utf8), + contentType: "text/html; charset=utf-8") case .health: return Self.serveJSON(ServeHealthPayload(status: "ok")) case let .usage(provider): diff --git a/Sources/CodexBarCLI/CLIWebUI.swift b/Sources/CodexBarCLI/CLIWebUI.swift new file mode 100644 index 000000000..6d43f509f --- /dev/null +++ b/Sources/CodexBarCLI/CLIWebUI.swift @@ -0,0 +1,604 @@ +// swiftlint:disable file_length +enum CLIWebUI { + // Single-file web UI served at / by `codexbar serve`. + // Replicates the macOS popover layout: provider tabs, usage bars, reset countdowns, cost section. + static let html = #""" + + + + + +CodexBar + + + +
+ +
+
+ + + + + + + CodexBar +
+ +
+ +
+
+
+
⚠️
+
Could not connect to CodexBar server.
+
Make sure codexbar serve is running.
+
+ + +
+ + + + +"""# +} +// swiftlint:enable file_length diff --git a/bin/codexbar-linux-fetcher b/bin/codexbar-linux-fetcher new file mode 100755 index 000000000..78a696c4c --- /dev/null +++ b/bin/codexbar-linux-fetcher @@ -0,0 +1,1207 @@ +#!/usr/bin/env python3 +""" +codexbar-linux-fetcher — sidecar that fetches cookie/local-storage-based provider +usage on Linux and returns a JSON array in the same shape as `codexbar serve /usage`. + +Providers handled here (macOS-only in the Swift layer): + codex — chatgpt.com cookie → /backend-api/wham/usage + cursor — cursor.com cookie → /api/usage-summary + /api/auth/me + windsurf — Chrome LevelDB local storage → windsurf.com GetPlanStatus gRPC-Web + perplexity — perplexity.ai cookie → /rest/billing/credits + mistral — admin.mistral.ai cookie → /api/billing/v2/usage + kimi — kimi.com cookie (kimi-auth JWT) → BillingService/GetUsages + abacus — abacus.ai cookie → /api/_getOrganizationComputePoints + minimax — minimax.io cookie → /v1/api/openplatform/coding_plan/remains + +Usage: codexbar-linux-fetcher [--providers p1,p2,...] [--json] +Output: JSON array of provider payloads, written to stdout. + +Dependencies (all stdlib or commonly available): + sqlite3, json, os, re, struct, hmac, hashlib, base64, datetime, urllib + Optional: secretstorage (for Chrome cookie decryption on Linux) +""" + +import argparse +import base64 +import datetime +import fcntl +import hashlib +import hmac +import json +import os +import pty +import re +import select +import shutil +import sqlite3 +import struct +import subprocess +import sys +import tempfile +import termios +import time +import urllib.error +import urllib.parse +import urllib.request +from pathlib import Path +from typing import Optional + +# ── Chrome cookie decryption ────────────────────────────────────────────────── + +def _chrome_decrypt(encrypted: bytes, key: bytes) -> str: + """Decrypt a Chromium v10/v11 AES-CBC-128 encrypted cookie value.""" + if not encrypted: + return "" + if encrypted[:3] in (b"v10", b"v11"): + payload = encrypted[3:] + iv = b" " * 16 + # Try python3-cryptography (system package, widely available) + try: + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + from cryptography.hazmat.backends import default_backend + cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) + dec = cipher.decryptor() + decrypted = dec.update(payload) + dec.finalize() + pad = decrypted[-1] + return decrypted[:-pad].decode("utf-8", errors="replace") + except Exception: + pass + # Fallback: python3-pycryptodome + try: + from Crypto.Cipher import AES + cipher = AES.new(key, AES.MODE_CBC, iv) + decrypted = cipher.decrypt(payload) + pad = decrypted[-1] + return decrypted[:-pad].decode("utf-8", errors="replace") + except Exception: + pass + # unencrypted plain text + try: + return encrypted.decode("utf-8", errors="replace") + except Exception: + return "" + + +def _get_chrome_key() -> Optional[bytes]: + """Derive the AES key for Chrome v10 cookies on Linux (PBKDF2 + 'peanuts' or secretstorage).""" + # On Linux without a keyring, Chrome uses a hardcoded salt/password "peanuts" + password = b"peanuts" + try: + import secretstorage + bus = secretstorage.dbus_init() + col = secretstorage.get_default_collection(bus) + for item in col.get_all_items(): + if "Chrome" in (item.get_label() or "") or "Chromium" in (item.get_label() or ""): + password = item.get_secret() + break + except Exception: + pass + key = hashlib.pbkdf2_hmac("sha1", password, b"saltysalt", 1, dklen=16) + return key + + +# ── Browser profile discovery ───────────────────────────────────────────────── + +_HOME = Path.home() + +CHROME_PROFILES = [ + _HOME / ".config/google-chrome", + _HOME / ".config/chromium", + _HOME / ".config/microsoft-edge", + _HOME / ".config/BraveSoftware/Brave-Browser", + _HOME / ".config/vivaldi", +] + +FIREFOX_PROFILES = [ + _HOME / ".mozilla/firefox", +] + + +def _chromium_profile_dirs() -> list[Path]: + dirs = [] + for base in CHROME_PROFILES: + if not base.exists(): + continue + for entry in base.iterdir(): + if entry.name in ("Default",) or entry.name.startswith("Profile "): + if (entry / "Cookies").exists(): + dirs.append(entry) + return dirs + + +def _read_chrome_cookies(domains: list[str]) -> dict[str, str]: + """Return {name: value} scanning ALL Chrome profiles; prefer the one with the most cookies.""" + key = _get_chrome_key() + best: dict[str, str] = {} + + for profile in _chromium_profile_dirs(): + cookie_db = profile / "Cookies" + if not cookie_db.exists(): + continue + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: + tmp_path = tmp.name + result: dict[str, str] = {} + try: + shutil.copy2(str(cookie_db), tmp_path) + con = sqlite3.connect(tmp_path) + con.row_factory = sqlite3.Row + cur = con.cursor() + # match "example.com", ".example.com", and subdomain variants via LIKE + like_clauses = " OR ".join( + ["host_key = ? OR host_key = ? OR host_key LIKE ?"] * len(domains) + ) + params = [] + for d in domains: + bare = d.lstrip(".") + params.extend([bare, "." + bare, "%." + bare]) + cur.execute( + f"SELECT name, value, encrypted_value FROM cookies WHERE {like_clauses}", + params, + ) + for row in cur.fetchall(): + name = row["name"] + val = row["value"] + if not val and row["encrypted_value"] and key: + val = _chrome_decrypt(bytes(row["encrypted_value"]), key) + if val: + result[name] = val + con.close() + except Exception: + pass + finally: + try: + os.unlink(tmp_path) + except Exception: + pass + # Keep the profile with the most cookies (most likely the active one) + if len(result) > len(best): + best = result + return best + + +def _read_firefox_cookies(domains: list[str]) -> dict[str, str]: + """Return {name: value} from Firefox for any of `domains`.""" + result: dict[str, str] = {} + for base in FIREFOX_PROFILES: + if not base.exists(): + continue + for profile_dir in base.iterdir(): + cookie_db = profile_dir / "cookies.sqlite" + if not cookie_db.exists(): + continue + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: + tmp_path = tmp.name + try: + shutil.copy2(str(cookie_db), tmp_path) + con = sqlite3.connect(tmp_path) + cur = con.cursor() + like_clauses = " OR ".join(["host = ? OR host = ?"] * len(domains)) + params = [] + for d in domains: + params.extend([d, "." + d.lstrip(".")]) + cur.execute( + f"SELECT name, value FROM moz_cookies WHERE {like_clauses}", params + ) + for name, val in cur.fetchall(): + if val: + result[name] = val + con.close() + except Exception: + pass + finally: + try: + os.unlink(tmp_path) + except Exception: + pass + if result: + return result + return result + + +def get_cookies(domains: list[str]) -> dict[str, str]: + """Try Chrome first, then Firefox.""" + c = _read_chrome_cookies(domains) + if c: + return c + return _read_firefox_cookies(domains) + + +def cookie_header(cookies: dict[str, str], names: Optional[list[str]] = None) -> str: + """Format a Cookie: header. If `names` given, only include those.""" + items = cookies.items() if names is None else ( + (n, cookies[n]) for n in names if n in cookies + ) + return "; ".join(f"{n}={v}" for n, v in items) + + +# ── Windsurf LevelDB local-storage reader ───────────────────────────────────── + +def _read_windsurf_localstorage() -> Optional[dict[str, str]]: + """ + Read Windsurf devin session tokens from Chrome LevelDB local storage. + Returns dict with keys: devin_session_token, devin_auth1_token, + devin_account_id, devin_primary_org_id — or None if not found. + """ + TARGET_KEYS = { + "devin_session_token", + "devin_auth1_token", + "devin_account_id", + "devin_primary_org_id", + } + + for profile in _chromium_profile_dirs(): + leveldb_path = profile / "Local Storage" / "leveldb" + if not leveldb_path.exists(): + continue + result: dict[str, str] = {} + try: + # Read .ldb and .log files — LevelDB stores entries as key=value + # Key format for local storage: "\x00https://windsurf.com/\x00KEY" + for fname in sorted(leveldb_path.iterdir()): + if fname.suffix not in (".ldb", ".log"): + continue + try: + data = fname.read_bytes() + for key in TARGET_KEYS: + if key in result: + continue + needle = key.encode() + idx = 0 + while True: + pos = data.find(needle, idx) + if pos == -1: + break + rest = data[pos + len(needle):] + m = re.search(rb'"([^"]{8,})"', rest[:256]) + if m: + val = m.group(1).decode("utf-8", errors="replace") + if len(val) > 8: + result[key] = val + break + idx = pos + 1 + except Exception: + pass + except Exception: + pass + if len(result) >= 2 and "devin_session_token" in result: + return result + return None + + +# ── HTTP helpers ────────────────────────────────────────────────────────────── + +def _req(url: str, *, method: str = "GET", headers: dict = {}, body: Optional[bytes] = None, timeout: int = 10): + req = urllib.request.Request(url, data=body, method=method) + for k, v in headers.items(): + req.add_header(k, v) + try: + with urllib.request.urlopen(req, timeout=timeout) as r: + return r.status, json.loads(r.read()) + except urllib.error.HTTPError as e: + try: + return e.code, json.loads(e.read()) + except Exception: + return e.code, {} + except Exception as e: + return None, {"error": str(e)} + + +def _now_iso() -> str: + return datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + + +# ── Payload builder ─────────────────────────────────────────────────────────── + +def _rate_window(used_pct: float, resets_at: Optional[str], label: Optional[str] = None, + window_minutes: Optional[int] = None) -> dict: + w = { + "usedPercent": round(min(100, max(0, used_pct)), 2), + "resetsAt": resets_at, + } + if window_minutes is not None: + w["windowMinutes"] = window_minutes + return w + + +def _payload(provider: str, *, primary=None, secondary=None, tertiary=None, + extra_windows=None, identity=None, provider_cost=None, + credits=None, error_msg: Optional[str] = None) -> dict: + if error_msg: + return { + "provider": provider, + "error": {"message": error_msg}, + "usage": None, + } + usage: dict = {"updatedAt": _now_iso()} + if identity: + usage["identity"] = identity + if primary: + usage["primary"] = primary + if secondary: + usage["secondary"] = secondary + if tertiary: + usage["tertiary"] = tertiary + if extra_windows: + usage["extraRateWindows"] = extra_windows + if provider_cost: + usage["providerCost"] = provider_cost + p = {"provider": provider, "usage": usage, "error": None} + if credits is not None: + p["credits"] = credits + return p + + +def _unix_to_iso(ts: int) -> str: + return datetime.datetime.utcfromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%SZ") + + +# ── Codex CLI PTY probe (mirrors macOS TTYCommandRunner logic) ──────────────── + +def _codex_cli_binary() -> Optional[str]: + """Find the codex CLI binary, checking common npm global locations.""" + for candidate in ["codex", "/usr/local/bin/codex", "/usr/bin/codex"]: + try: + result = subprocess.run( + ["which", candidate] if "/" not in candidate else ["test", "-x", candidate], + capture_output=True, timeout=3, + ) + if result.returncode == 0: + if "/" not in candidate: + return result.stdout.decode().strip() + return candidate + except Exception: + pass + # fallback: look in npm global bin + try: + result = subprocess.run(["npm", "root", "-g"], capture_output=True, text=True, timeout=5) + npm_root = result.stdout.strip() + candidate = str(Path(npm_root).parent / "bin" / "codex") + if Path(candidate).exists(): + return candidate + except Exception: + pass + return None + + +def _fetch_codex_via_cli(timeout_secs: float = 30.0) -> Optional[dict]: + """ + Spawn the codex CLI in a PTY, send /status, parse the output. + Returns a payload dict on success, None if CLI not found. + Raises ValueError with an error message on auth/parse failure. + """ + binary = _codex_cli_binary() + if not binary: + return None + + ROWS, COLS = 60, 200 + + master, slave = pty.openpty() + ws = struct.pack('HHHH', ROWS, COLS, 0, 0) + fcntl.ioctl(slave, termios.TIOCSWINSZ, ws) + + env = os.environ.copy() + env['TERM'] = 'xterm-256color' + env['COLORTERM'] = 'truecolor' + env['COLUMNS'] = str(COLS) + env['LINES'] = str(ROWS) + env.setdefault('LANG', 'en_US.UTF-8') + + proc = subprocess.Popen( + [binary], stdin=slave, stdout=slave, stderr=slave, + close_fds=True, env=env, cwd=str(Path.home()), + ) + os.close(slave) + + STATUS_MARKERS = [b'Credits:', b'5h limit', b'5-hour limit', b'Weekly limit'] + raw = b'' + sent_status = False + saw_status = False + last_enter = 0.0 + script_sent_at = 0.0 + enter_retries = 0 + resend_retries = 0 + startup_delay = 10.0 # seconds to wait before sending /status + deadline = time.monotonic() + timeout_secs + start = time.monotonic() + + try: + while time.monotonic() < deadline: + r, _, _ = select.select([master], [], [], 0.1) + if r: + try: + raw += os.read(master, 16384) + except OSError: + break + + now = time.monotonic() + elapsed = now - start + + if not sent_status: + if elapsed >= startup_delay: + os.write(master, b'/status\r') + sent_status = True + script_sent_at = now + last_enter = now + else: + if any(m in raw for m in STATUS_MARKERS): + saw_status = True + settle = time.monotonic() + 2.0 + while time.monotonic() < settle: + r, _, _ = select.select([master], [], [], 0.1) + if r: + try: + raw += os.read(master, 16384) + except OSError: + pass + break + + if now - last_enter >= 1.2 and enter_retries < 6: + os.write(master, b'\r') + enter_retries += 1 + last_enter = now + + if now - script_sent_at >= 3.0 and resend_retries < 2: + os.write(master, b'/status\r') + resend_retries += 1 + script_sent_at = now + last_enter = now + finally: + try: + os.write(master, b'/exit\r') + except OSError: + pass + proc.terminate() + try: + proc.wait(timeout=2) + except subprocess.TimeoutExpired: + proc.kill() + try: + os.close(master) + except OSError: + pass + + if not saw_status: + raise ValueError("Codex CLI /status timed out — make sure `codex` is installed and logged in.") + + text = raw.decode('utf-8', errors='replace') + # Strip ANSI escape sequences; treat \r as whitespace (not newline) + clean = re.sub(r'\x1b\[[0-9;?]*[a-zA-Z~]', ' ', text) + clean = re.sub(r'\x1b\][^\x07]*\x07', '', clean) + clean = re.sub(r'\x1b[^[]', '', clean) + clean = re.sub(r'\r', ' ', clean) # carriage returns → space + clean = re.sub(r'[^\S\n]+', ' ', clean) # collapse horizontal whitespace + + return _parse_codex_cli_status(clean) + + +def _parse_reset_date(reset_str: str) -> Optional[str]: + """Parse 'HH:MM on DD Mon' or 'HH:MM on Mon DD' into ISO timestamp.""" + if not reset_str: + return None + s = reset_str.strip().strip('()') + now = datetime.datetime.utcnow() + # Try "HH:MM on DD Mon" e.g. "01:00 on 17 May" + m = re.match(r'(\d{1,2}:\d{2})\s+on\s+(\d{1,2}\s+\w+)', s) + if not m: + m = re.match(r'(\d{1,2}:\d{2})\s+on\s+(\w+\s+\d{1,2})', s) + if m: + try: + time_part, date_part = m.group(1), m.group(2) + for fmt in ('%H:%M %d %b', '%H:%M %b %d'): + try: + dt = datetime.datetime.strptime(f"{time_part} {date_part}", fmt) + dt = dt.replace(year=now.year) + if dt < now: + dt = dt.replace(year=now.year + 1) + return dt.strftime('%Y-%m-%dT%H:%M:%SZ') + except ValueError: + continue + except Exception: + pass + # Try bare "HH:MM" + m = re.match(r'^(\d{1,2}:\d{2})$', s) + if m: + try: + t = datetime.datetime.strptime(m.group(1), '%H:%M') + dt = now.replace(hour=t.hour, minute=t.minute, second=0, microsecond=0) + if dt < now: + dt += datetime.timedelta(days=1) + return dt.strftime('%Y-%m-%dT%H:%M:%SZ') + except Exception: + pass + return None + + +def _parse_codex_cli_status(text: str) -> dict: + """Parse the /status output from codex CLI into a CodexBar payload.""" + # Collapse all whitespace (including newlines) into single spaces so that + # the TUI's cursor-position-split lines merge back into searchable runs. + flat = re.sub(r'\s+', ' ', text) + + credits = None + m = re.search(r'Credits:\s*([0-9][0-9.,]*)', flat) + if m: + try: + credits = float(m.group(1).replace(',', '')) + except ValueError: + pass + + def parse_limit_line(pattern: str): + # Wrap pattern in a group so .{0,200} applies to the whole alternation + line_m = re.search(r'(?:' + pattern + r').{0,200}', flat, re.IGNORECASE) + if not line_m: + return None, None + segment = line_m.group(0) + pct_left_m = re.search(r'(\d+)%\s*left', segment) + reset_m = re.search(r'resets?\s+([\d:]+(?:\s+on\s+[\d\w ]+)?)', segment, re.IGNORECASE) + if not reset_m: + reset_m = re.search(r'\(([^)]+)\)', segment) + pct_left = int(pct_left_m.group(1)) if pct_left_m else None + pct_used = (100 - pct_left) if pct_left is not None else None + reset_str = reset_m.group(1) if reset_m else None + resets_at = _parse_reset_date(reset_str) if reset_str else None + return pct_used, resets_at + + five_used, five_resets = parse_limit_line(r'5h limit:|5-hour limit:') + weekly_used, weekly_resets = parse_limit_line(r'Weekly limit:') + + account_m = re.search(r'Account:\s*(\S+)', flat) + email = account_m.group(1).strip() if account_m else '' + + primary = _rate_window(five_used or 0, five_resets, window_minutes=5*60) if five_used is not None else None + secondary = _rate_window(weekly_used or 0, weekly_resets, window_minutes=7*24*60) if weekly_used is not None else None + + if primary is None and secondary is None and credits is None: + raise ValueError("Could not parse Codex CLI /status output.") + + return _payload( + "codex", + primary=primary, + secondary=secondary, + identity={"displayName": "Codex", "email": email, + "sessionLabel": "5h limit", "weeklyLabel": "Weekly"}, + credits={"remaining": credits} if credits is not None else None, + ) + + +# ── Provider: Codex (OpenAI) ────────────────────────────────────────────────── + +def _load_config() -> dict: + """Load ~/.codexbar/config.json, returning {} on missing/error.""" + config_path = Path.home() / ".codexbar" / "config.json" + try: + return json.loads(config_path.read_text()) + except Exception: + return {} + + +def _provider_config(provider_id: str) -> dict: + """Return the config block for a specific provider, or {}.""" + cfg = _load_config() + for p in cfg.get("providers", []): + if p.get("id") == provider_id: + return p + return {} + + +def _read_codex_cli_token() -> Optional[str]: + """Read the access_token from ~/.codex/auth.json (written by `codex login`).""" + auth_path = Path.home() / ".codex" / "auth.json" + try: + data = json.loads(auth_path.read_text()) + tokens = data.get("tokens") or {} + return tokens.get("access_token") or tokens.get("id_token") or None + except Exception: + return None + + +def _fetch_codex_with_bearer(bearer_token: str) -> dict: + hdrs = { + "Authorization": f"Bearer {bearer_token}", + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36", + "Referer": "https://chatgpt.com/", + } + status, data = _req("https://chatgpt.com/backend-api/wham/usage", headers=hdrs) + if status == 401: + return _payload("codex", error_msg=( + "Codex token expired. Run `codex login` to refresh, or update " + "providers[id=codex].bearerToken in ~/.codexbar/config.json." + )) + if status != 200: + return _payload("codex", error_msg=f"Failed to fetch Codex usage (HTTP {status}).") + + email = data.get("email", "") + rate_limit = data.get("rate_limit") or {} + primary_w = rate_limit.get("primary_window") or data.get("primary_window") or {} + secondary_w = rate_limit.get("secondary_window") or data.get("secondary_window") or {} + + def parse_window(w: dict) -> Optional[dict]: + if not w: + return None + pct = w.get("used_percent", 0) + reset_at = w.get("reset_at") + resets_iso = _unix_to_iso(reset_at) if reset_at else None + limit_secs = w.get("limit_window_seconds", 0) + return _rate_window(pct, resets_iso, window_minutes=limit_secs // 60 if limit_secs else None) + + credits_data = data.get("credits") or {} + try: + balance = float(credits_data.get("balance", "0")) + except (TypeError, ValueError): + balance = 0.0 + + return _payload( + "codex", + primary=parse_window(primary_w), + secondary=parse_window(secondary_w), + identity={"displayName": "Codex", "email": email}, + credits={"remaining": balance} if balance else None, + ) + + +def fetch_codex() -> dict: + pcfg = _provider_config("codex") + + # 1. Explicit bearerToken in ~/.codexbar/config.json — highest precedence. + bearer_token = pcfg.get("bearerToken") or pcfg.get("bearer_token") + if bearer_token and not bearer_token.startswith("PASTE_"): + return _fetch_codex_with_bearer(bearer_token) + + # 2. Access token from `codex login` (~/.codex/auth.json) — automatic, no setup needed. + cli_token = _read_codex_cli_token() + if cli_token: + result = _fetch_codex_with_bearer(cli_token) + # If expired, fall through to cookie path rather than surfacing a confusing error + if result.get("error") is None: + return result + + # 3. Cookie fallback (Firefox / Chrome pre-127). + DOMAINS = ["chatgpt.com", "openai.com"] + cookies = get_cookies(DOMAINS) + if not cookies: + return _payload("codex", error_msg=( + "No Codex auth found. Run `codex login` (npm i -g @openai/codex) " + "or add providers[id=codex].bearerToken to ~/.codexbar/config.json." + )) + ch = cookie_header(cookies) + + hdrs = { + "Cookie": ch, + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36", + "Referer": "https://chatgpt.com/", + } + + # Verify login + status, me = _req("https://chatgpt.com/backend-api/me", headers=hdrs) + if status != 200: + return _payload("codex", error_msg=( + f"Not signed in to chatgpt.com (HTTP {status}). " + "Add bearerToken to ~/.codexbar/config.json (providers[id=codex].bearerToken) " + "or sign in via Firefox." + )) + + email = me.get("email", "") + + # Fetch usage — API returns data under rate_limit.* + status, data = _req("https://chatgpt.com/backend-api/wham/usage", headers=hdrs) + if status != 200: + return _payload("codex", error_msg=f"Failed to fetch Codex usage (HTTP {status}).") + + # Support both legacy top-level shape and current rate_limit-nested shape + rate_limit = data.get("rate_limit") or {} + primary_w = rate_limit.get("primary_window") or data.get("primary_window") or {} + secondary_w = rate_limit.get("secondary_window") or data.get("secondary_window") or {} + + def parse_window(w: dict) -> Optional[dict]: + if not w: + return None + pct = w.get("used_percent", 0) + reset_at = w.get("reset_at") + resets_iso = _unix_to_iso(reset_at) if reset_at else None + limit_secs = w.get("limit_window_seconds", 0) + return _rate_window(pct, resets_iso, window_minutes=limit_secs // 60 if limit_secs else None) + + # Credits from data.credits + credits_data = data.get("credits") or {} + balance_raw = credits_data.get("balance", "0") + try: + balance = float(balance_raw) + except (TypeError, ValueError): + balance = 0.0 + credits = {"remaining": balance} if credits_data else None + + return _payload( + "codex", + primary=parse_window(primary_w), + secondary=parse_window(secondary_w), + identity={"displayName": "Codex", "email": email}, + credits=credits if balance else None, + ) + + +# ── Provider: Cursor ────────────────────────────────────────────────────────── + +def fetch_cursor() -> dict: + # Allow a manually stored WorkOS session token for Chrome app-bound encryption workaround. + pcfg = _provider_config("cursor") + manual_token = pcfg.get("sessionToken") or pcfg.get("session_token") + if manual_token and manual_token.startswith("PASTE_"): + manual_token = None + + if manual_token: + ch = f"WorkosCursorSessionToken={manual_token}" + else: + DOMAINS = ["cursor.com", "www.cursor.com"] + cookies = get_cookies(DOMAINS) + if not cookies: + return _payload("cursor", error_msg=( + "No cursor.com cookies found. Chrome 127+ encrypts cookies with app-bound keys. " + "Add your WorkosCursorSessionToken to ~/.codexbar/config.json under providers[id=cursor].sessionToken, " + "or sign in to cursor.com in Firefox." + )) + ch = cookie_header(cookies) + + base = "https://cursor.com" + hdrs = { + "Cookie": ch, + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/148.0.0.0 Safari/537.36", + "Referer": "https://cursor.com/dashboard", + } + + # Get user info + status, me = _req(f"{base}/api/auth/me", headers=hdrs) + if status != 200: + return _payload("cursor", error_msg=f"Not signed in to cursor.com (HTTP {status}).") + + email = me.get("email", "") + user_id = me.get("id", "") + + # Fetch usage summary (newer endpoint) + status, data = _req(f"{base}/api/usage-summary", headers=hdrs) + if status != 200: + return _payload("cursor", error_msg=f"Failed to fetch Cursor usage (HTTP {status}).") + + # Parse response — mirrors CursorStatusProbe precedence logic. + # Current API nests usage under individualUsage.plan; top-level percent fields are + # present on older accounts but absent on newer ones. + ind = data.get("individualUsage") or {} + plan = ind.get("plan") or {} + total_pct = ( + plan.get("totalPercentUsed") + or data.get("planPercentUsed") + or data.get("total_percent_used") + or 0 + ) + def _first_not_none(*vals): + for v in vals: + if v is not None: + return v + return None + + auto_pct = _first_not_none( + plan.get("autoPercentUsed"), + data.get("autoComposerPercentUsed"), + data.get("auto_composer_percent"), + ) + api_pct = _first_not_none( + plan.get("apiPercentUsed"), + data.get("apiPercentUsed"), + data.get("api_percent_used"), + ) + cycle_end = data.get("billingCycleEnd") or data.get("billing_cycle_end") + if isinstance(cycle_end, str): + # Strip sub-second precision so all JS engines accept new Date(resets_at). + resets_at = re.sub(r"\.\d+Z$", "Z", cycle_end) + elif cycle_end: + resets_at = _unix_to_iso(int(cycle_end)) + else: + resets_at = None + + # On-demand cost + on_demand_usd = data.get("currentUsageUsd") or data.get("on_demand_usage_usd") or 0 + hard_limit = data.get("hardLimitUsd") or data.get("hard_limit_usd") or 0 + provider_cost = None + if on_demand_usd or hard_limit: + provider_cost = { + "used": float(on_demand_usd), + "limit": float(hard_limit), + "currencyCode": "USD", + "period": "monthly", + "resetsAt": resets_at, + } + + primary = _rate_window(total_pct, resets_at) + secondary = _rate_window(api_pct, resets_at) if api_pct is not None else None + tertiary = None + + return _payload( + "cursor", + primary=primary, + secondary=secondary, + tertiary=tertiary, + identity={"displayName": "Cursor", "email": email, "sessionLabel": "Plan", "weeklyLabel": "API"}, + provider_cost=provider_cost, + ) + + +# ── Provider: Windsurf ──────────────────────────────────────────────────────── + +def fetch_windsurf() -> dict: + storage = _read_windsurf_localstorage() + if not storage: + return _payload("windsurf", error_msg="No Windsurf session found in browser local storage. Sign in to windsurf.com in Chrome.") + + session_token = storage.get("devin_session_token", "") + auth1_token = storage.get("devin_auth1_token", "") + account_id = storage.get("devin_account_id", "") + org_id = storage.get("devin_primary_org_id", "") + + if not session_token: + return _payload("windsurf", error_msg="Windsurf session token not found in local storage.") + + url = "https://windsurf.com/_backend/exa.seat_management_pb.SeatManagementService/GetPlanStatus" + body = b"\x00\x00\x00\x00\x00" # empty protobuf message (grpc-web) + hdrs = { + "Content-Type": "application/grpc-web+proto", + "X-Grpc-Web": "1", + "X-Devin-Session-Token": session_token, + "X-Devin-Auth1-Token": auth1_token, + "X-Devin-Account-Id": account_id, + "X-Devin-Primary-Org-Id": org_id, + "Origin": "https://windsurf.com", + "Referer": "https://windsurf.com/", + } + + try: + req = urllib.request.Request(url, data=body, method="POST") + for k, v in hdrs.items(): + req.add_header(k, v) + with urllib.request.urlopen(req, timeout=10) as r: + raw = r.read() + # Response is grpc-web framed binary; parse remaining usage from protobuf + # Field 1 (daily remaining), field 2 (weekly remaining) as varint + # Simple heuristic: scan for the two small integers in the response + # For now, report "connected" with unknown usage until full proto decode + daily_remaining = _parse_windsurf_remaining(raw, field=1) + weekly_remaining = _parse_windsurf_remaining(raw, field=2) + + # Convert "remaining" to "used percent" — Windsurf free plan: 100 daily credits + DAILY_LIMIT = 100 + WEEKLY_LIMIT = 500 + daily_pct = max(0, 100 - (daily_remaining / DAILY_LIMIT * 100)) if daily_remaining is not None else None + weekly_pct = max(0, 100 - (weekly_remaining / WEEKLY_LIMIT * 100)) if weekly_remaining is not None else None + + # Reset times: daily at midnight UTC, weekly next monday + now = datetime.datetime.utcnow() + next_day = (now + datetime.timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) + days_to_monday = (7 - now.weekday()) % 7 or 7 + next_monday = (now + datetime.timedelta(days=days_to_monday)).replace(hour=0, minute=0, second=0, microsecond=0) + + return _payload( + "windsurf", + primary=_rate_window(daily_pct if daily_pct is not None else 0, + next_day.strftime("%Y-%m-%dT%H:%M:%SZ"), + window_minutes=60 * 24), + secondary=_rate_window(weekly_pct if weekly_pct is not None else 0, + next_monday.strftime("%Y-%m-%dT%H:%M:%SZ"), + window_minutes=60 * 24 * 7), + identity={"displayName": "Windsurf", "sessionLabel": "Daily", "weeklyLabel": "Weekly"}, + ) + except Exception as e: + return _payload("windsurf", error_msg=f"Windsurf API error: {e}") + + +def _parse_windsurf_remaining(data: bytes, field: int) -> Optional[int]: + """Very simple protobuf varint reader for a specific field number.""" + try: + i = 0 + # Skip grpc-web 5-byte frame header + if len(data) > 5 and data[0] == 0: + i = 5 + while i < len(data): + tag_byte = data[i]; i += 1 + field_num = tag_byte >> 3 + wire_type = tag_byte & 0x7 + if wire_type == 0: # varint + val = 0; shift = 0 + while i < len(data): + b = data[i]; i += 1 + val |= (b & 0x7F) << shift + if not (b & 0x80): + break + shift += 7 + if field_num == field: + return val + elif wire_type == 2: # length-delimited + ln = data[i]; i += 1 + i += ln + else: + break + except Exception: + pass + return None + + +# ── Provider: Perplexity ────────────────────────────────────────────────────── + +def fetch_perplexity() -> dict: + DOMAINS = ["perplexity.ai", "www.perplexity.ai"] + cookies = get_cookies(DOMAINS) + if not cookies: + return _payload("perplexity", error_msg="No perplexity.ai cookies found. Sign in to perplexity.ai in your browser.") + + ch = cookie_header(cookies) + hdrs = { + "Cookie": ch, + "User-Agent": "Mozilla/5.0", + "Origin": "https://www.perplexity.ai", + "Referer": "https://www.perplexity.ai/account/usage", + } + + status, data = _req( + "https://www.perplexity.ai/rest/billing/credits?version=2.18&source=default", + headers=hdrs, + ) + if status != 200: + return _payload("perplexity", error_msg=f"Not signed in to perplexity.ai (HTTP {status}).") + + # Parse credits response + remaining = data.get("remaining") or data.get("credits_remaining") or 0 + total = data.get("total") or data.get("credits_total") or 0 + reset_at = data.get("reset_at") or data.get("resetsAt") + resets_iso = None + if isinstance(reset_at, (int, float)): + resets_iso = _unix_to_iso(int(reset_at)) + elif isinstance(reset_at, str): + resets_iso = reset_at + + used = total - remaining if total else 0 + pct = (used / total * 100) if total > 0 else 0 + + return _payload( + "perplexity", + primary=_rate_window(pct, resets_iso), + identity={"displayName": "Perplexity"}, + credits={"remaining": float(remaining)}, + ) + + +# ── Provider: Mistral ───────────────────────────────────────────────────────── + +def fetch_mistral() -> dict: + DOMAINS = ["mistral.ai", "admin.mistral.ai", "auth.mistral.ai"] + cookies = get_cookies(DOMAINS) + if not cookies: + return _payload("mistral", error_msg="No mistral.ai cookies found. Sign in to mistral.ai in your browser.") + + # Find ory_session_* cookie + ory_cookie = next((f"{n}={v}" for n, v in cookies.items() if n.startswith("ory_session_")), None) + csrf = cookies.get("csrftoken", "") + if not ory_cookie: + return _payload("mistral", error_msg="Mistral session cookie (ory_session_*) not found. Sign in to console.mistral.ai.") + + now = datetime.datetime.utcnow() + url = f"https://admin.mistral.ai/api/billing/v2/usage?month={now.month}&year={now.year}" + hdrs = { + "Cookie": ory_cookie + (f"; csrftoken={csrf}" if csrf else ""), + "X-CSRFTOKEN": csrf, + "Origin": "https://admin.mistral.ai", + "Referer": "https://admin.mistral.ai/organization/usage", + "User-Agent": "Mozilla/5.0", + } + + status, data = _req(url, headers=hdrs) + if status != 200: + return _payload("mistral", error_msg=f"Not signed in to admin.mistral.ai (HTTP {status}).") + + used = data.get("total_tokens_used") or data.get("used") or 0 + limit = data.get("token_limit") or data.get("limit") or 0 + pct = (used / limit * 100) if limit > 0 else 0 + cost_used = data.get("total_cost_usd") or data.get("cost_usd") or 0 + cost_limit = data.get("cost_limit_usd") or data.get("spending_limit") or 0 + + next_month = (now.replace(day=1) + datetime.timedelta(days=32)).replace(day=1) + resets_iso = next_month.strftime("%Y-%m-%dT00:00:00Z") + + provider_cost = None + if cost_used or cost_limit: + provider_cost = {"used": float(cost_used), "limit": float(cost_limit), "currencyCode": "USD", "period": "monthly", "resetsAt": resets_iso} + + return _payload( + "mistral", + primary=_rate_window(pct, resets_iso), + identity={"displayName": "Mistral"}, + provider_cost=provider_cost, + ) + + +# ── Provider: Kimi ──────────────────────────────────────────────────────────── + +def fetch_kimi() -> dict: + DOMAINS = ["kimi.com", "www.kimi.com"] + cookies = get_cookies(DOMAINS) + if not cookies: + return _payload("kimi", error_msg="No kimi.com cookies found. Sign in to kimi.com in your browser.") + + kimi_auth = cookies.get("kimi-auth", "") + if not kimi_auth: + return _payload("kimi", error_msg="kimi-auth cookie not found. Sign in to kimi.com/code.") + + body = json.dumps({"scope": ["FEATURE_CODING"]}).encode() + hdrs = { + "Authorization": f"Bearer {kimi_auth}", + "Cookie": f"kimi-auth={kimi_auth}", + "Content-Type": "application/json", + "Origin": "https://www.kimi.com", + "Referer": "https://www.kimi.com/code/console", + "connect-protocol-version": "1", + "User-Agent": "Mozilla/5.0", + } + + status, data = _req( + "https://www.kimi.com/apiv2/kimi.gateway.billing.v1.BillingService/GetUsages", + method="POST", + headers=hdrs, + body=body, + ) + if status != 200: + return _payload("kimi", error_msg=f"Failed to fetch Kimi usage (HTTP {status}).") + + # Parse response + usages = data.get("usages") or [] + primary = None + for u in usages: + scope = u.get("scope", "") + if "CODING" in scope or not primary: + used = u.get("used", 0) + total = u.get("total", 0) + pct = (used / total * 100) if total > 0 else 0 + reset_ts = u.get("reset_at") or u.get("resetAt") + resets_iso = _unix_to_iso(int(reset_ts)) if reset_ts else None + primary = _rate_window(pct, resets_iso) + break + + if not primary: + primary = _rate_window(0, None) + + return _payload( + "kimi", + primary=primary, + identity={"displayName": "Kimi"}, + ) + + +# ── Provider: Abacus ────────────────────────────────────────────────────────── + +def fetch_abacus() -> dict: + DOMAINS = ["abacus.ai", "apps.abacus.ai"] + cookies = get_cookies(DOMAINS) + if not cookies: + return _payload("abacus", error_msg="No abacus.ai cookies found. Sign in to apps.abacus.ai in your browser.") + + ch = cookie_header(cookies) + hdrs = { + "Cookie": ch, + "User-Agent": "Mozilla/5.0", + "Origin": "https://apps.abacus.ai", + "Referer": "https://apps.abacus.ai/", + } + + status, data = _req("https://apps.abacus.ai/api/_getOrganizationComputePoints", headers=hdrs) + if status != 200: + return _payload("abacus", error_msg=f"Not signed in to abacus.ai (HTTP {status}).") + + used = data.get("used") or data.get("compute_points_used") or 0 + total = data.get("total") or data.get("compute_points_total") or 0 + pct = (used / total * 100) if total > 0 else 0 + reset_at = data.get("reset_at") or data.get("resetsAt") + resets_iso = _unix_to_iso(int(reset_at)) if isinstance(reset_at, (int, float)) else reset_at + + return _payload( + "abacus", + primary=_rate_window(pct, resets_iso), + identity={"displayName": "Abacus"}, + ) + + +# ── Provider: MiniMax ───────────────────────────────────────────────────────── + +def fetch_minimax() -> dict: + DOMAINS = ["platform.minimax.io", "minimax.io", "platform.minimaxi.com", "minimaxi.com"] + cookies = get_cookies(DOMAINS) + if not cookies: + return _payload("minimax", error_msg="No minimax.io cookies found. Sign in to platform.minimax.io in your browser.") + + ch = cookie_header(cookies) + hdrs = { + "Cookie": ch, + "User-Agent": "Mozilla/5.0", + "Origin": "https://platform.minimax.io", + "Referer": "https://platform.minimax.io/", + } + + status, data = _req( + "https://platform.minimax.io/v1/api/openplatform/coding_plan/remains", + headers=hdrs, + ) + if status != 200: + return _payload("minimax", error_msg=f"Not signed in to minimax.io (HTTP {status}).") + + remaining = data.get("remaining") or data.get("tokens_remaining") or 0 + total = data.get("total") or data.get("tokens_total") or 0 + pct = max(0, 100 - (remaining / total * 100)) if total > 0 else 0 + reset_at = data.get("reset_at") or data.get("resetsAt") + resets_iso = _unix_to_iso(int(reset_at)) if isinstance(reset_at, (int, float)) else reset_at + + return _payload( + "minimax", + primary=_rate_window(pct, resets_iso), + identity={"displayName": "MiniMax"}, + credits={"remaining": float(remaining)}, + ) + + +# ── Dispatcher ──────────────────────────────────────────────────────────────── + +FETCHERS = { + "codex": fetch_codex, + "cursor": fetch_cursor, + "windsurf": fetch_windsurf, + "perplexity": fetch_perplexity, + "mistral": fetch_mistral, + "kimi": fetch_kimi, + "abacus": fetch_abacus, + "minimax": fetch_minimax, +} + +ALL_PROVIDERS = list(FETCHERS.keys()) + + +def run(providers: list[str]) -> list[dict]: + results = [] + for p in providers: + fn = FETCHERS.get(p) + if fn is None: + continue + try: + results.append(fn()) + except Exception as e: + results.append(_payload(p, error_msg=f"Unexpected error: {e}")) + return results + + +def main(): + ap = argparse.ArgumentParser(description="CodexBar Linux cookie fetcher") + ap.add_argument( + "--providers", + default=",".join(ALL_PROVIDERS), + help="Comma-separated list of providers to fetch", + ) + args = ap.parse_args() + providers = [p.strip() for p in args.providers.split(",") if p.strip()] + results = run(providers) + print(json.dumps(results, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/bin/codexbar-tray b/bin/codexbar-tray new file mode 100755 index 000000000..4e3a8598f --- /dev/null +++ b/bin/codexbar-tray @@ -0,0 +1,995 @@ +#!/usr/bin/env python3 +""" +CodexBar system tray daemon for Linux (GNOME/Ubuntu). + +Starts `codexbar serve` in the background, puts an icon in the top bar, +and shows a rich WebKit2 popup panel when clicked. + +Dependencies (installed by the .deb package): + gir1.2-ayatanaappindicator3-0.1 + gir1.2-webkit2-4.1 (or gir1.2-webkit2-4.0) + python3-gi + python3-gi-cairo +""" + +import concurrent.futures +import json +import os +import signal +import socketserver +import http.server +import subprocess +import sys +import threading +import time +import urllib.request +import urllib.error +from pathlib import Path + +import gi +gi.require_version("Gtk", "3.0") +gi.require_version("Gdk", "3.0") +gi.require_version("AyatanaAppIndicator3", "0.1") + +# Try WebKit2 4.1 first, fall back to 4.0 +_webkit_available = False +for _wk_ver in ("4.1", "4.0"): + try: + gi.require_version("WebKit2", _wk_ver) + _webkit_available = True + break + except Exception: + pass + +from gi.repository import Gtk, Gdk, AyatanaAppIndicator3, GLib # noqa: E402 +if _webkit_available: + from gi.repository import WebKit2 # noqa: E402 + +# ── config ──────────────────────────────────────────────────────────────────── +PORT = int(os.environ.get("CODEXBAR_PORT", "8080")) # codexbar serve +UI_PORT = int(os.environ.get("CODEXBAR_UI_PORT", "8081")) # tray web UI +BASE_URL = f"http://127.0.0.1:{PORT}" +UI_URL = f"http://127.0.0.1:{UI_PORT}/" +POPUP_W = 420 +POPUP_H = 520 +REFRESH_SECONDS = 60 +ICON_NAME = "codexbar" +CODEXBAR_BIN = os.environ.get("CODEXBAR_BIN", "codexbar") + +# Path to the Linux cookie fetcher sidecar (same directory as this script) +_SCRIPT_DIR = Path(__file__).resolve().parent +LINUX_FETCHER_BIN = os.environ.get( + "CODEXBAR_LINUX_FETCHER", + str(_SCRIPT_DIR / "codexbar-linux-fetcher"), +) + +# ── state ───────────────────────────────────────────────────────────────────── +_server_proc: "subprocess.Popen | None" = None +_indicator: "AyatanaAppIndicator3.Indicator | None" = None +_popup_win: "Gtk.Window | None" = None +_webview: "WebKit2.WebView | None" = None + +# Cache dir for WebKit2 persistent cookie/data stores +_WK_DATA_DIR = Path.home() / ".config" / "codexbar" / "webkit-data" + + +# ── server management ───────────────────────────────────────────────────────── + +def start_server() -> None: + global _server_proc + if _server_proc and _server_proc.poll() is None: + return + try: + _server_proc = subprocess.Popen( + [CODEXBAR_BIN, "serve", "--port", str(PORT)], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + except FileNotFoundError: + print(f"[codexbar-tray] '{CODEXBAR_BIN}' not found in PATH", file=sys.stderr) + + +def stop_server() -> None: + global _server_proc + if _server_proc and _server_proc.poll() is None: + _server_proc.terminate() + try: + _server_proc.wait(timeout=3) + except subprocess.TimeoutExpired: + _server_proc.kill() + _server_proc = None + + +def wait_for_server(timeout: float = 10.0) -> bool: + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + try: + with urllib.request.urlopen(f"{BASE_URL}/health", timeout=1) as r: + if r.status == 200: + return True + except Exception: + pass + time.sleep(0.5) + return False + + +# ── popup window ────────────────────────────────────────────────────────────── + +def _make_popup() -> "Gtk.Window": + win = Gtk.Window(type=Gtk.WindowType.TOPLEVEL) + win.set_decorated(False) + win.set_resizable(False) + win.set_keep_above(True) + win.set_skip_taskbar_hint(True) + win.set_skip_pager_hint(True) + win.set_default_size(POPUP_W, POPUP_H) + win.set_size_request(POPUP_W, POPUP_H) + + # rounded corners via CSS + css = Gtk.CssProvider() + css.load_from_data(b""" + window { + border-radius: 14px; + background: transparent; + } + """) + win.get_style_context().add_provider(css, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION) + + # drop-shadow effect — use an RGBA visual if compositor is running + screen = win.get_screen() + visual = screen.get_rgba_visual() + if visual: + win.set_visual(visual) + win.set_app_paintable(True) + + if _webkit_available: + wv_settings = WebKit2.Settings() + wv_settings.set_enable_javascript(True) + wv_settings.set_enable_developer_extras(False) + wv_settings.set_hardware_acceleration_policy( + WebKit2.HardwareAccelerationPolicy.NEVER + ) + + wv = WebKit2.WebView.new_with_settings(wv_settings) + wv.set_background_color(Gdk.RGBA(0, 0, 0, 0)) + win.add(wv) + win._webview = wv + else: + # fallback: plain label if WebKit2 is unavailable + lbl = Gtk.Label(label="Install gir1.2-webkit2-4.1\nfor the rich UI") + lbl.set_justify(Gtk.Justification.CENTER) + win.add(lbl) + win._webview = None + + win.connect("focus-out-event", _on_popup_focus_out) + win.connect("key-press-event", _on_popup_key) + return win + + +def _popup_position() -> tuple[int, int]: + """Position popup at the top-right of the screen, below the panel.""" + screen = Gdk.Screen.get_default() + sw = screen.get_width() + margin = 8 + x = sw - POPUP_W - margin + y = 32 + margin # below a ~32px GNOME top bar + return x, y + + +def _on_popup_focus_out(win: "Gtk.Window", _event) -> bool: + win.hide() + return False + + +def _on_popup_key(win: "Gtk.Window", event) -> bool: + if event.keyval == Gdk.KEY_Escape: + win.hide() + return False + + +def toggle_popup(_=None) -> None: + global _popup_win + if _popup_win is None: + _popup_win = _make_popup() + + if _popup_win.get_visible(): + _popup_win.hide() + return + + # (re)load fresh page each time so data is current + if _webkit_available and _popup_win._webview: + _popup_win._webview.load_uri(UI_URL) + + x, y = _popup_position() + _popup_win.move(x, y) + _popup_win.show_all() + _popup_win.present() + _popup_win.grab_focus() + + +# ── tray menu (right-click) ─────────────────────────────────────────────────── + +def _build_menu() -> "Gtk.Menu": + menu = Gtk.Menu() + + open_item = Gtk.MenuItem(label="Open Dashboard") + open_item.connect("activate", lambda _: toggle_popup()) + menu.append(open_item) + + refresh_item = Gtk.MenuItem(label="Refresh Now") + refresh_item.connect("activate", lambda _: _do_reload()) + menu.append(refresh_item) + + menu.append(Gtk.SeparatorMenuItem()) + + quit_item = Gtk.MenuItem(label="Quit CodexBar") + quit_item.connect("activate", on_quit) + menu.append(quit_item) + + menu.show_all() + return menu + + +def _do_reload() -> None: + global _stream_cache, _stream_cache_time + _stream_cache = {} + _stream_cache_time = 0 + if _popup_win and _popup_win.get_visible() and _webkit_available and _popup_win._webview: + GLib.idle_add(_popup_win._webview.reload) + + +# ── actions ─────────────────────────────────────────────────────────────────── + +def on_quit(_=None) -> None: + stop_server() + Gtk.main_quit() + + +# ── embedded web UI server ──────────────────────────────────────────────────── + +_UI_HTML = r""" + + + + +CodexBar + + + +
+
+ + + + + + CodexBar +
+
+ +
+
+
+
+
+
+ + + +""" + + +# ── WebKit2-based cookie fetcher ────────────────────────────────────────────── +# +# Chrome 127+ uses app-bound AES encryption for cookies that cannot be +# decrypted outside the browser process. Instead we use WebKit2's own +# persistent cookie/data store (stored in ~/.config/codexbar/webkit-data/). +# Each provider's hidden WebView shares that store, so once the user signs +# in via the popup it persists for future fetches. +# +# fetch_via_webkit(provider, js_script) → dict (provider payload JSON) +# Runs `js_script` in a hidden WebView and returns the JSON result it posts +# to window.codexbarResult = . + +_wk_fetch_lock = threading.Lock() +_wk_pending: "dict[str, dict | None]" = {} # provider → result or None=pending + + +def _wk_data_manager() -> "WebKit2.WebsiteDataManager | None": + if not _webkit_available: + return None + _WK_DATA_DIR.mkdir(parents=True, exist_ok=True) + base = str(_WK_DATA_DIR) + try: + mgr = WebKit2.WebsiteDataManager( + base_data_directory=base, + base_cache_directory=base + "/cache", + ) + return mgr + except Exception: + return None + + +_wk_data_mgr = None # initialised on first use in GTK main thread + + +def _ensure_wk_data_mgr(): + global _wk_data_mgr + if _wk_data_mgr is None: + _wk_data_mgr = _wk_data_manager() + return _wk_data_mgr + + +def fetch_via_webkit(provider: str, url: str, js_script: str, + timeout_secs: float = 15.0) -> "dict | None": + """ + Load `url` in a hidden WebKit2 view, inject `js_script` after load, + wait for `window._codexbarResult` to be set, return parsed JSON. + Must be called from the GTK main thread (or via GLib.idle_add). + Returns None on timeout/error. + """ + if not _webkit_available: + return None + + result_holder: list = [] + done_event = threading.Event() + + def _run(): + mgr = _ensure_wk_data_mgr() + ctx = WebKit2.WebContext.new_with_website_data_manager(mgr) if mgr else WebKit2.WebContext.get_default() + settings = WebKit2.Settings() + settings.set_enable_javascript(True) + wv = WebKit2.WebView.new_with_context(ctx) + wv.set_settings(settings) + + # Offscreen window (never shown) + win = Gtk.OffscreenWindow() + win.set_default_size(1, 1) + win.add(wv) + win.show_all() + + def _on_load_changed(wv, event): + if event != WebKit2.LoadEvent.FINISHED: + return + full_js = js_script + """ + (function poll(n) { + if (window._codexbarResult !== undefined) { + document.title = '__codexbar__' + JSON.stringify(window._codexbarResult); + } else if (n > 0) { + setTimeout(function(){ poll(n-1); }, 200); + } else { + document.title = '__codexbar__null'; + } + })(50); + """ + wv.run_javascript(full_js, None, None, None) + + def _on_title_changed(wv, _pspec): + title = wv.get_title() or "" + if title.startswith("__codexbar__"): + payload_str = title[len("__codexbar__"):] + try: + result_holder.append(json.loads(payload_str)) + except Exception: + result_holder.append(None) + win.destroy() + done_event.set() + + wv.connect("load-changed", _on_load_changed) + wv.connect("notify::title", _on_title_changed) + wv.load_uri(url) + + GLib.idle_add(_run) + done_event.wait(timeout=timeout_secs) + return result_holder[0] if result_holder else None + + +# ── WebKit2 provider scripts ────────────────────────────────────────────────── + +_CODEX_JS = """ +(async function() { + try { + const r = await fetch('https://chatgpt.com/backend-api/wham/usage', {credentials:'include'}); + if (!r.ok) { window._codexbarResult = {error: 'HTTP ' + r.status}; return; } + window._codexbarResult = await r.json(); + } catch(e) { window._codexbarResult = {error: String(e)}; } +})(); +""" + +_CURSOR_JS = """ +(async function() { + try { + const [sum, me] = await Promise.all([ + fetch('/api/usage-summary', {credentials:'include'}).then(r=>r.json()), + fetch('/api/auth/me', {credentials:'include'}).then(r=>r.json()), + ]); + window._codexbarResult = {summary: sum, me: me}; + } catch(e) { window._codexbarResult = {error: String(e)}; } +})(); +""" + + +def _build_wk_codex_payload(data: dict) -> dict: + import datetime as dt + def _unix(ts): + return dt.datetime.utcfromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%SZ") if ts else None + def _win(w): + if not w: return None + return {"usedPercent": min(100, max(0, w.get("used_percent", 0))), + "resetsAt": _unix(w.get("reset_at")), + "windowMinutes": (w.get("limit_window_seconds") or 0) // 60 or None} + now_iso = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + return { + "provider": "codex", + "error": None, + "usage": { + "updatedAt": now_iso, + "identity": {"displayName": "Codex"}, + "primary": _win(data.get("primary_window")), + "secondary": _win(data.get("secondary_window")), + } + } + + +def _build_wk_cursor_payload(data: dict) -> dict: + import datetime as dt + import re as _re + now_iso = dt.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ") + s = data.get("summary", {}) or {} + me = data.get("me", {}) or {} + # Read nested structure first (current API), fall back to top-level legacy fields + ind = s.get("individualUsage") or {} + plan = ind.get("plan") or {} + total_pct = ( + plan.get("totalPercentUsed") + or s.get("planPercentUsed") + or s.get("total_percent_used") + or 0 + ) + api_pct = plan.get("apiPercentUsed") or s.get("apiPercentUsed") + cycle_end_raw = s.get("billingCycleEnd") or s.get("billing_cycle_end") + # Strip sub-second precision — older WebKit2 builds reject "2025-06-01T00:00:00.000Z" + cycle_end = _re.sub(r"\.\d+Z$", "Z", cycle_end_raw) if isinstance(cycle_end_raw, str) else cycle_end_raw + on_demand = s.get("currentUsageUsd") or 0 + hard_limit = s.get("hardLimitUsd") or 0 + cost = {"used": float(on_demand), "limit": float(hard_limit), + "currencyCode": "USD", "period": "monthly"} if (on_demand or hard_limit) else None + return { + "provider": "cursor", + "error": None, + "usage": { + "updatedAt": now_iso, + "identity": {"displayName": "Cursor", "email": me.get("email", ""), + "sessionLabel": "Plan", "weeklyLabel": "API"}, + "primary": {"usedPercent": total_pct, "resetsAt": cycle_end}, + "secondary": {"usedPercent": api_pct, "resetsAt": cycle_end} if api_pct is not None else None, + "providerCost": cost, + } + } + + +def fetch_wk_provider(provider: str) -> "dict | None": + """Fetch a cookie-based provider via hidden WebKit2 view. Returns payload dict or None.""" + if not _webkit_available: + return None + if provider == "codex": + data = fetch_via_webkit("codex", "https://chatgpt.com/codex", _CODEX_JS) + if data and not data.get("error"): + return _build_wk_codex_payload(data) + return {"provider": "codex", "error": {"message": data.get("error", "Not signed in") if data else "Fetch timeout. Open chatgpt.com/codex in the tray dashboard first."}, "usage": None} + if provider == "cursor": + data = fetch_via_webkit("cursor", "https://cursor.com/dashboard", _CURSOR_JS) + if data and not data.get("error"): + return _build_wk_cursor_payload(data) + return {"provider": "cursor", "error": {"message": data.get("error", "Not signed in") if data else "Fetch timeout. Open cursor.com in the tray dashboard first."}, "usage": None} + return None + + +# ── sidecar fetcher (legacy cookie-DB path + WebKit2 fallback) ──────────────── + +def _run_sidecar(providers: "set[str]") -> list: + """Run codexbar-linux-fetcher for the given providers.""" + fetcher = LINUX_FETCHER_BIN + if not Path(fetcher).exists(): + fetcher = "/usr/bin/codexbar-linux-fetcher" + if not Path(fetcher).exists(): + return [] + try: + result = subprocess.run( + [fetcher, "--providers", ",".join(providers)], + capture_output=True, text=True, timeout=30, + ) + return json.loads(result.stdout) if result.stdout.strip() else [] + except Exception: + return [] + + +# The three providers shown in the UI on Linux +_SHOWN_PROVIDERS = ["claude", "codex", "cursor"] + + +def _fetch_claude_provider() -> "dict | None": + """Fetch claude data from the Swift serve backend.""" + try: + with urllib.request.urlopen(f"{BASE_URL}/usage", timeout=30) as r: + all_providers = json.loads(r.read()) + for p in all_providers: + if p.get("provider") == "claude": + return p + return {"provider": "claude", "error": {"message": "Claude not returned by server"}, "usage": None} + except Exception as e: + return {"provider": "claude", "error": {"message": str(e)}, "usage": None} + + +def _fetch_codex_provider() -> dict: + """Fetch codex via sidecar (cookie DB, no WebKit).""" + results = _run_sidecar({"codex"}) + if results: + return results[0] + return {"provider": "codex", "error": {"message": "Codex fetch failed"}, "usage": None} + + +def _fetch_cursor_provider() -> dict: + """Fetch cursor via sidecar (cookie DB, no WebKit).""" + results = _run_sidecar({"cursor"}) + if results: + return results[0] + return {"provider": "cursor", "error": {"message": "Cursor fetch failed"}, "usage": None} + + +_PROVIDER_FETCHERS = { + "claude": _fetch_claude_provider, + "codex": _fetch_codex_provider, + "cursor": _fetch_cursor_provider, +} + +# Cache for streaming results +_stream_cache: "dict[str, dict]" = {} +_stream_cache_time: float = 0 +_STREAM_TTL = 55 # seconds + + +def _stream_all_providers(write_event): + """ + Fetch claude, codex, cursor in parallel and call write_event(data_dict) + for each result as it arrives. + """ + global _stream_cache, _stream_cache_time + now = time.monotonic() + + # Use cache if fresh enough (avoids hammering APIs on rapid re-opens) + if _stream_cache and now - _stream_cache_time < _STREAM_TTL: + for p in _SHOWN_PROVIDERS: + if p in _stream_cache: + write_event(_stream_cache[p]) + return + + new_cache: dict = {} + + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex: + futs = {ex.submit(fn): pid for pid, fn in _PROVIDER_FETCHERS.items()} + for fut in concurrent.futures.as_completed(futs): + pid = futs[fut] + try: + result = fut.result() + except Exception as e: + result = {"provider": pid, "error": {"message": str(e)}, "usage": None} + if result: + new_cache[pid] = result + write_event(result) + + _stream_cache = new_cache + _stream_cache_time = time.monotonic() + + +class _UIHandler(http.server.BaseHTTPRequestHandler): + def do_GET(self): + if self.path == "/health": + try: + with urllib.request.urlopen(f"{BASE_URL}/health", timeout=5) as r: + body = r.read() + self.send_response(200) + except Exception as e: + body = json.dumps({"error": str(e)}).encode() + self.send_response(502) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(body) + + elif self.path == "/usage-stream": + # SSE endpoint — streams one event per provider as each completes + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("X-Accel-Buffering", "no") + self.end_headers() + + lock = threading.Lock() + + def write_event(data: dict): + line = "data: " + json.dumps(data) + "\n\n" + with lock: + try: + self.wfile.write(line.encode()) + self.wfile.flush() + except Exception: + pass + + try: + _stream_all_providers(write_event) + with lock: + self.wfile.write(b"event: done\ndata: {}\n\n") + self.wfile.flush() + except Exception: + pass + + elif self.path.startswith("/usage"): + # Fetch each provider via its dedicated fetcher (bypasses Swift for codex/cursor) + results: dict[str, dict] = {} + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as ex: + futs = {ex.submit(fn): pid for pid, fn in _PROVIDER_FETCHERS.items()} + for fut in concurrent.futures.as_completed(futs): + pid = futs[fut] + try: + r = fut.result() + except Exception as e: + r = {"provider": pid, "error": {"message": str(e)}, "usage": None} + if r: + results[pid] = r + merged = [results[p] for p in _SHOWN_PROVIDERS if p in results] + body = json.dumps(merged).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(body) + + elif self.path.startswith("/cost"): + try: + with urllib.request.urlopen(f"{BASE_URL}{self.path}", timeout=15) as r: + body = r.read() + self.send_response(200) + self.send_header("Content-Type", "application/json; charset=utf-8") + except Exception as e: + body = json.dumps({"error": str(e)}).encode() + self.send_response(502) + self.send_header("Content-Type", "application/json; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(body) + + else: + body = _UI_HTML.encode() + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.send_header("Content-Length", str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, *_): + pass + + +def start_ui_server() -> None: + socketserver.ThreadingTCPServer.allow_reuse_address = True + with socketserver.ThreadingTCPServer(("127.0.0.1", UI_PORT), _UIHandler) as httpd: + httpd.serve_forever() + + +# ── indicator scroll / click handler ───────────────────────────────────────── +# AyatanaAppIndicator3 doesn't expose a direct left-click signal, so we use a +# secondary "activate" menu item as a workaround: we attach a transparent +# Gtk.Menu that has a single invisible item whose activation opens the popup. +# The user sees no menu — the popup opens instead. +# +# A cleaner approach that works on most GNOME setups: hijack the "scroll-event" +# on the indicator's button via the "secondary-activate" signal. + +def _on_secondary_activate(_indicator) -> None: + GLib.idle_add(toggle_popup) + + +# ── main ────────────────────────────────────────────────────────────────────── + +def main() -> None: + global _indicator + + signal.signal(signal.SIGTERM, lambda *_: on_quit()) + signal.signal(signal.SIGINT, lambda *_: on_quit()) + + start_server() + + _indicator = AyatanaAppIndicator3.Indicator.new( + "codexbar", + ICON_NAME, + AyatanaAppIndicator3.IndicatorCategory.APPLICATION_STATUS, + ) + _indicator.set_status(AyatanaAppIndicator3.IndicatorStatus.ACTIVE) + + # Primary (left) click menu — "Open Dashboard" as first item opens popup + menu = _build_menu() + _indicator.set_menu(menu) + + # secondary-activate fires on middle-click / scroll in some environments + try: + _indicator.connect("secondary-activate", _on_secondary_activate) + except Exception: + pass + + # wait for server then pre-warm (don't block main thread) + def _initial_load(): + wait_for_server(timeout=15) + + threading.Thread(target=_initial_load, daemon=True).start() + + # embedded web UI server + threading.Thread(target=start_ui_server, daemon=True).start() + + Gtk.main() + + +if __name__ == "__main__": + main()