Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions services/hackbot-pulse-listener/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# ---- Builder stage: install the service and its dependencies into /opt/venv ----
FROM python:3.14-slim AS builder

# Copy the uv and uvx binaries from the upstream uv image.
# NOTE(review): the `latest` tag is unpinned, so rebuilds may pick up a different
# uv version; consider pinning a specific tag for reproducible builds.
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# Make uv create the project environment at /opt/venv so it can be copied
# into the runtime stage below.
ENV UV_PROJECT_ENVIRONMENT=/opt/venv

WORKDIR /app

# Install external deps without building workspace members.
# Only the root pyproject.toml, uv.lock and VERSION are bind-mounted here, so this
# layer does not see the service sources; the uv cache is kept in a cache mount.
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,source=pyproject.toml,target=pyproject.toml \
--mount=type=bind,source=uv.lock,target=uv.lock \
--mount=type=bind,source=VERSION,target=VERSION \
uv sync --frozen --no-dev --no-install-workspace --package hackbot-pulse-listener

# Bind-mount the whole build context at /app and install the
# hackbot-pulse-listener package itself. --no-editable installs it into the venv
# rather than linking to the sources, which matters because only /opt/venv is
# copied to the runtime stage.
RUN --mount=type=cache,target=/root/.cache/uv \
--mount=type=bind,target=/app,rw \
uv sync --locked --no-dev --no-editable --package hackbot-pulse-listener

# ---- Runtime stage: only the prepared virtual environment is carried over ----
FROM python:3.14-slim AS base

COPY --from=builder /opt/venv /opt/venv
WORKDIR /app

# Unbuffered output and no .pyc files.
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
# Put the venv's executables first on PATH so `python` resolves to /opt/venv/bin.
ENV PATH="/opt/venv/bin:$PATH"

# Run the service as a dedicated user instead of root.
RUN useradd --create-home --shell /bin/bash app
USER app

# Start the listener via the package's __main__ module.
CMD ["python", "-m", "app"]
44 changes: 44 additions & 0 deletions services/hackbot-pulse-listener/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Hackbot Pulse Listener

Listens to Taskcluster `task-failed` pulse messages and, for failed **Firefox build
tasks**, triggers the `build-repair` hackbot agent through the hackbot-api. When a run
finishes (minutes later), it emails the developer who pushed the change with a link to
the hackbot UI and a summary of the analysis and fix.

## How it works

1. Consume `task-failed` messages from `pulse.mozilla.org`.
2. Keep only **build-kind** tasks (`tags.kind == "build"`) on a watched `project`
(`WATCHED_REPOS`, default `try,autoland`). Build tasks don't run tests, so a failure is a
compilation/link error.
3. Fetch the task definition to read `GECKO_HEAD_REV` (the revision is not in the message).
4. Dedupe by revision with an in-memory TTL cache, so only one agent run is triggered per
revision even when many build tasks fail for the same push.
5. `POST /agents/build-repair/runs`, then poll `GET /runs/{run_id}` until terminal and send
the report email.

The dedupe cache and pending-run tracking are in-memory (reset on restart).

## Run locally

```bash
export PULSE_USER=... PULSE_PASSWORD=... # https://pulseguardian.mozilla.org
export HACKBOT_API_URL=https://hackbot-api.../ HACKBOT_API_KEY=...
export HACKBOT_UI_URL=https://hackbot-ui.../
export WATCHED_REPOS=try
export DRY_RUN=true # log intended calls, don't POST
uv run --package hackbot-pulse-listener python -m app
```

Email is sent only when both `SENDGRID_API_KEY` and `NOTIFICATION_SENDER` are set;
otherwise the notification is logged and skipped.

## Test

```bash
uv run --package hackbot-pulse-listener pytest services/hackbot-pulse-listener/tests
```

## Deploy

Cloud Run worker pool (no HTTP). See `deploy.sh`.
Empty file.
44 changes: 44 additions & 0 deletions services/hackbot-pulse-listener/app/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import logging
import signal
from concurrent.futures import ThreadPoolExecutor

from app import consumer
from app.config import settings

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def main() -> None:
    """Entry point: wire up Sentry, signals and the pulse consumer, then run it.

    Returns immediately (after logging a warning) when the Pulse credentials
    are not configured. Otherwise blocks in the consumer loop until a SIGTERM
    or SIGINT asks it to stop.
    """
    dsn = settings.sentry_dsn
    if dsn:
        # Imported lazily so the SDK is only loaded when Sentry is configured.
        import sentry_sdk

        sentry_sdk.init(dsn=dsn, environment=settings.environment)

    have_credentials = settings.pulse_user and settings.pulse_password
    if not have_credentials:
        logger.warning("PULSE_USER/PULSE_PASSWORD not set; listener will not start")
        return

    # Thread pool handed to the consumer for the work it submits per triggered run.
    executor = ThreadPoolExecutor(max_workers=settings.poll_max_workers)
    consumer_obj = consumer.build_consumer(executor)

    def shutdown(signum, _frame):
        # Setting the flag lets the consumer loop exit on its own.
        logger.info("Received signal %s; shutting down", signum)
        consumer_obj.should_stop = True

    for sig in (signal.SIGTERM, signal.SIGINT):
        signal.signal(sig, shutdown)

    exchange_names = ", ".join(consumer.EXCHANGES)
    watched = sorted(settings.watched_repos_set)
    logger.info(
        "Listening for build failures on %s; watched repos: %s",
        exchange_names,
        watched,
    )

    try:
        consumer_obj.run()
    finally:
        # Do not block on in-flight pool work when leaving the loop.
        executor.shutdown(wait=False)


if __name__ == "__main__":
main()
32 changes: 32 additions & 0 deletions services/hackbot-pulse-listener/app/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import logging

import httpx

from app.config import settings

logger = logging.getLogger(__name__)

_TIMEOUT = httpx.Timeout(30.0)


def _headers() -> dict[str, str]:
    """Return the HTTP headers carrying the configured hackbot-api key."""
    api_key = settings.hackbot_api_key
    return {"X-API-Key": api_key}


def trigger_run(inputs: dict) -> str | None:
    """Create a run of the configured agent through the hackbot-api.

    Args:
        inputs: JSON-serialisable payload posted as the request body.

    Returns:
        The ``run_id`` from the API response, or ``None`` in dry-run mode
        (the intended call is only logged and no request is made).

    Raises:
        httpx.HTTPStatusError: if the API answers with an error status.
    """
    if settings.dry_run:
        logger.info("[dry-run] would trigger %s run: %s", settings.agent_name, inputs)
        return None

    # Tolerate a base URL configured with a trailing slash; otherwise the
    # request path would start with "//agents/...".
    base_url = settings.hackbot_api_url.rstrip("/")
    url = f"{base_url}/agents/{settings.agent_name}/runs"
    resp = httpx.post(url, json=inputs, headers=_headers(), timeout=_TIMEOUT)
    resp.raise_for_status()
    return resp.json()["run_id"]


def get_run(run_id: str) -> dict:
    """Fetch the current state of a run from the hackbot-api.

    Args:
        run_id: Identifier of the run to look up.

    Returns:
        The decoded JSON body of ``GET /runs/{run_id}``.

    Raises:
        httpx.HTTPStatusError: if the API answers with an error status.
    """
    # Tolerate a base URL configured with a trailing slash; otherwise the
    # request path would start with "//runs/...".
    base_url = settings.hackbot_api_url.rstrip("/")
    url = f"{base_url}/runs/{run_id}"
    resp = httpx.get(url, headers=_headers(), timeout=_TIMEOUT)
    resp.raise_for_status()
    return resp.json()
51 changes: 51 additions & 0 deletions services/hackbot-pulse-listener/app/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Service configuration, loaded from the environment and an optional ``.env`` file."""

    # --- Pulse connection (credentials from https://pulseguardian.mozilla.org) ---
    pulse_user: str = ""
    pulse_password: str = ""
    taskcluster_root_url: str = "https://firefox-ci-tc.services.mozilla.com"

    # --- hackbot-api / hackbot UI endpoints and the agent to trigger ---
    hackbot_api_url: str = ""
    hackbot_api_key: str = ""
    hackbot_ui_url: str = ""
    agent_name: str = "build-repair"

    # --- Failure filtering and agent inputs ---
    # ``watched_repos`` holds Taskcluster ``project`` tags, separated by commas.
    watched_repos: str = "try,autoland"
    run_try_push: bool = False
    model: str | None = None
    max_turns: int | None = None

    # --- In-memory dedupe keyed by git revision ---
    dedupe_ttl_seconds: int = 6 * 60 * 60
    dedupe_max_size: int = 4096

    # --- Polling the API until a run completes ---
    poll_interval_seconds: int = 60
    run_max_age_minutes: int = 12 * 60
    poll_max_workers: int = 8

    # --- Email notifications via SendGrid ---
    sendgrid_api_key: str | None = None
    notification_sender: str | None = None

    # --- Miscellaneous runtime switches ---
    dry_run: bool = False
    environment: str = "development"
    sentry_dsn: str | None = None

    model_config = {
        "env_file": ".env",
        "env_file_encoding": "utf-8",
        "extra": "ignore",
    }

    @property
    def watched_repos_set(self) -> set[str]:
        """Return ``watched_repos`` as a set of stripped, non-empty names."""
        stripped = (entry.strip() for entry in self.watched_repos.split(","))
        return {name for name in stripped if name}


settings = Settings()
118 changes: 118 additions & 0 deletions services/hackbot-pulse-listener/app/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import logging
from concurrent.futures import Executor

from cachetools import TTLCache
from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

from app import client, taskcluster, worker
from app.config import settings

logger = logging.getLogger(__name__)

# AMQP URL template for pulse.mozilla.org (port 5671, ``ssl=1``). The two
# placeholders are filled with the Pulse user and password in ``build_consumer``.
CONNECTION_URL = "amqp://{}:{}@pulse.mozilla.org:5671/?ssl=1"

# Pulse exchanges to bind a queue to; one queue is created per entry.
EXCHANGES = ("exchange/taskcluster-queue/v1/task-failed",)

# In-memory dedupe of git revisions already handed to the agent. Only the
# single consumer thread touches it, so no lock is needed.
# Size and entry lifetime come from ``settings.dedupe_max_size`` and
# ``settings.dedupe_ttl_seconds``; the cache is created once at import time.
_seen: TTLCache = TTLCache(
maxsize=settings.dedupe_max_size, ttl=settings.dedupe_ttl_seconds
)


def process(body: dict, executor: Executor) -> str | None:
    """Handle a single Taskcluster failure message.

    Messages are ignored unless they describe a ``build``-kind task on a
    watched project whose revision has not been seen yet. For the remaining
    ones an agent run is triggered and a poll-and-notify job is submitted to
    ``executor``.

    Args:
        body: Decoded pulse message payload.
        executor: Pool that runs ``worker.poll_and_notify`` for the new run.

    Returns:
        The id of the triggered run, or ``None`` when nothing was triggered
        (filtered out, duplicate, trigger failure, or dry-run mode).
    """
    task_section = body.get("task") or {}
    tags = task_section.get("tags") or {}

    # Filter: build tasks only, and only on projects we watch.
    if tags.get("kind") != "build":
        return None
    project = tags.get("project")
    if project not in settings.watched_repos_set:
        return None

    task_id = body["status"]["taskId"]
    task_name = tags.get("label") or task_id
    # Presumably the address of whoever pushed the change; passed through to the worker.
    developer_email = tags.get("createdForUser")

    revision = taskcluster.get_revision(task_id)
    if not revision:
        logger.warning("No GECKO_HEAD_REV for task %s; skipping", task_id)
        return None

    if revision in _seen:
        logger.info("Revision %s already processed; skipping", revision)
        return None
    # Mark the revision before triggering so later failures of the same push are skipped.
    _seen[revision] = True

    inputs: dict = {
        "git_commit": revision,
        "failure_tasks": {task_name: task_id},
        "run_try_push": settings.run_try_push,
    }
    # Optional overrides are only sent when configured.
    if settings.model:
        inputs["model"] = settings.model
    if settings.max_turns is not None:
        inputs["max_turns"] = settings.max_turns

    try:
        run_id = client.trigger_run(inputs)
    except Exception:
        logger.exception("Failed to trigger build-repair run for %s", revision)
        # Forget the revision so a later failure message can retry the trigger.
        _seen.pop(revision, None)
        return None

    logger.info("Triggered build-repair run %s for %s@%s", run_id, project, revision)
    if run_id is None:
        # No run id came back, so there is nothing to poll.
        return run_id

    executor.submit(worker.poll_and_notify, run_id, revision, project, developer_email)
    return run_id


def make_handler(executor: Executor):
    """Return a kombu message callback bound to ``executor``.

    The callback passes the message body to ``process``, logs any exception
    instead of propagating it, and always acknowledges the message.
    """

    def on_message(body, message):
        try:
            try:
                process(body, executor)
            except Exception:
                logger.exception("Error handling pulse message")
        finally:
            # Ack unconditionally so a failing message is not left unacknowledged.
            message.ack()

    return on_message


def _build_queues(user: str) -> list[Queue]:
    """Create one queue per entry in ``EXCHANGES`` for the given Pulse user.

    Each queue is named ``queue/<user>/build-repair-<last exchange path segment>``
    and bound to its exchange with the catch-all routing key ``#``.
    """

    def queue_for(exchange_name: str) -> Queue:
        # Last path component of the exchange name, e.g. "task-failed".
        _, _, suffix = exchange_name.rpartition("/")
        bound_exchange = Exchange(exchange_name, type="topic", no_declare=True)
        return Queue(
            name=f"queue/{user}/build-repair-{suffix}",
            exchange=bound_exchange,
            routing_key="#",
            durable=True,
            auto_delete=True,
        )

    return [queue_for(exchange_name) for exchange_name in EXCHANGES]


class BuildFailureConsumer(ConsumerMixin):
    """``ConsumerMixin`` subclass that feeds every message from its queues to one callback."""

    def __init__(self, connection, queues, on_message):
        """Store the connection, the queues to consume and the message callback."""
        self.connection = connection
        self.queues = queues
        self.on_message = on_message

    def get_consumers(self, Consumer, channel):
        """Return a single consumer covering all queues, calling ``on_message``."""
        queue_consumer = Consumer(queues=self.queues, callbacks=[self.on_message])
        return [queue_consumer]


def build_consumer(executor: Executor) -> BuildFailureConsumer:
    """Build the pulse consumer from the configured credentials.

    Args:
        executor: Pool passed to the message handler for follow-up work.

    Returns:
        A ``BuildFailureConsumer`` connected to Pulse, with one queue per
        entry in ``EXCHANGES``.
    """
    from urllib.parse import quote

    user = settings.pulse_user
    # Percent-encode the credentials: a raw "@", "/", ":", "#" or "%" in the
    # user name or password would otherwise corrupt the AMQP URL.
    url = CONNECTION_URL.format(
        quote(user, safe=""), quote(settings.pulse_password, safe="")
    )
    connection = Connection(url)
    # Queue names use the raw (unencoded) user name.
    return BuildFailureConsumer(
        connection, _build_queues(user), make_handler(executor)
    )
Loading