diff --git a/.env.example b/.env.example index 1e10ade..34d2843 100644 --- a/.env.example +++ b/.env.example @@ -41,3 +41,34 @@ SLACK_BOT_TOKEN_SAEZ=xoxb-placeholder SLACK_APP_TOKEN_SAEZ=xapp-placeholder SLACK_BOT_TOKEN_WU=xoxb-placeholder SLACK_APP_TOKEN_WU=xapp-placeholder +SLACK_BOT_TOKEN_WARD=xoxb-placeholder +SLACK_APP_TOKEN_WARD=xapp-placeholder +SLACK_BOT_TOKEN_BRINEY=xoxb-placeholder +SLACK_APP_TOKEN_BRINEY=xapp-placeholder +SLACK_BOT_TOKEN_FORLI=xoxb-placeholder +SLACK_APP_TOKEN_FORLI=xapp-placeholder +SLACK_BOT_TOKEN_DENIZ=xoxb-placeholder +SLACK_APP_TOKEN_DENIZ=xapp-placeholder +SLACK_BOT_TOKEN_GRANTBOT=xoxb-placeholder +SLACK_APP_TOKEN_GRANTBOT=xapp-placeholder + +# Podcast TTS backend: "mistral" (default) or "local" (vLLM-Omni server) +PODCAST_TTS_BACKEND="local" + +# Mistral AI TTS (used when PODCAST_TTS_BACKEND=mistral) +MISTRAL_API_KEY=your-mistral-api-key +MISTRAL_TTS_MODEL=voxtral-mini-tts-latest +MISTRAL_TTS_DEFAULT_VOICE=your-voice-uuid + +# Local vLLM-Omni TTS server (used when PODCAST_TTS_BACKEND=local) +# Start with: vllm serve --port 8010 +LOCAL_TTS_HOST=127.0.0.1 +LOCAL_TTS_PORT=8010 +LOCAL_TTS_MODEL=mistralai/Voxtral-4B-TTS-2603 +LOCAL_TTS_VOICE=default + +# Podcast +PODCAST_BASE_URL=http://localhost:8001 +PODCAST_SEARCH_WINDOW_DAYS=14 +PODCAST_MAX_CANDIDATES=50 +# PODCAST_NORMALIZE_AUDIO=true # uncomment to enable ffmpeg loudnorm post-processing (EBU R128, -16 LUFS) diff --git a/.gitignore b/.gitignore index aad82ec..342842f 100644 --- a/.gitignore +++ b/.gitignore @@ -53,3 +53,9 @@ certbot/ .pytest_cache/ .coverage htmlcov/ + +# Runtime data (state files, generated audio — ephemeral) +data/ + +# Test output artifacts +.labbot-tests/ diff --git a/AGENT.md b/AGENT.md index a94b338..39628fc 100644 --- a/AGENT.md +++ b/AGENT.md @@ -32,6 +32,7 @@ All specs are in `/specs/`: - `profile-ingestion.md` — 9-step pipeline, ORCID → PubMed → PMC → LLM - `admin-dashboard.md` — read-only, server-rendered, impersonation - `agent-system.md` — Slack Bolt, 
Socket Mode, two-phase LLM calls, simulation engine +- `labbot-podcast.md` — daily personalized research briefing: PubMed search, LLM selection/summarization, Local or API TTS, Slack DM delivery, per-PI RSS podcast feed ## Tech Stack diff --git a/CLAUDE.md b/CLAUDE.md index 98f4d23..b95eda9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -42,3 +42,26 @@ docker compose --profile agent run -d --name agent-run agent python -m src.agent ``` **Note:** The agent-run container uses mounted source code but the Python process only loads modules at startup. Code changes require a container restart to take effect. + +## Podcast Pipeline + +The LabBot Podcast pipeline (specs/labbot-podcast.md) runs daily at 9am UTC for each active agent: + +1. Build PubMed queries from lab's public profile +2. Fetch candidates from PubMed + bioRxiv + medRxiv + arXiv (last 14 days, up to 50+10 candidates) +3. Claude Sonnet selects most relevant paper (applying PI's podcast preferences from their private ProfileRevision) +4. Claude Opus writes a ~250-word structured brief +5. TTS audio generated (Mistral or local vLLM-Omni); ffmpeg loudnorm applied if PODCAST_NORMALIZE_AUDIO=true +6. Slack DM sent to PI with text summary + RSS link +7. RSS feed available at `/podcast/{agent_id}/feed.xml` +8. Audio served at `/podcast/{agent_id}/audio/{date}.mp3` + +Preprint IDs use prefixed format: `biorxiv:...`, `medrxiv:...`, `arxiv:...`. The `paper_url` in summaries links to the correct server (not always PubMed). 
+ +```bash +# Run podcast pipeline once for all active agents +docker compose --profile podcast run --rm podcast python -m src.podcast.main + +# Test pipeline for 'su' agent only +docker compose exec app python scripts/test_podcast_su.py +``` diff --git a/Dockerfile b/Dockerfile index c032e95..63a7b94 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,6 +6,7 @@ WORKDIR /app RUN apt-get update && apt-get install -y --no-install-recommends \ gcc \ libpq-dev \ + ffmpeg \ && rm -rf /var/lib/apt/lists/* # Install Python dependencies diff --git a/alembic/versions/0010_add_podcast_episodes.py b/alembic/versions/0010_add_podcast_episodes.py new file mode 100644 index 0000000..adad7d2 --- /dev/null +++ b/alembic/versions/0010_add_podcast_episodes.py @@ -0,0 +1,56 @@ +"""Add podcast_episodes table + +Revision ID: 0010 +Revises: 0009 +Create Date: 2026-04-09 00:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +from alembic import op + +revision: str = "0010" +down_revision: Union[str, None] = "0009" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.create_table( + "podcast_episodes", + sa.Column("id", postgresql.UUID(as_uuid=True), primary_key=True), + sa.Column("agent_id", sa.String(50), nullable=False), + sa.Column("episode_date", sa.Date, nullable=False), + sa.Column("pmid", sa.String(100), nullable=False), + sa.Column("paper_title", sa.String(500), nullable=False), + sa.Column("paper_authors", sa.String(500), nullable=False), + sa.Column("paper_journal", sa.String(255), nullable=False), + sa.Column("paper_year", sa.Integer, nullable=False), + sa.Column("text_summary", sa.Text, nullable=False), + sa.Column("audio_file_path", sa.String(500), nullable=True), + sa.Column("audio_duration_seconds", sa.Integer, nullable=True), + sa.Column("slack_delivered", sa.Boolean, nullable=False, server_default="false"), + 
sa.Column("selection_justification", sa.Text, nullable=False), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + server_default=sa.func.now(), + nullable=False, + ), + ) + op.create_index("ix_podcast_episodes_agent_id", "podcast_episodes", ["agent_id"]) + op.create_index("ix_podcast_episodes_episode_date", "podcast_episodes", ["episode_date"]) + op.create_unique_constraint( + "uq_podcast_agent_date", "podcast_episodes", ["agent_id", "episode_date"] + ) + + +def downgrade() -> None: + op.drop_constraint("uq_podcast_agent_date", "podcast_episodes") + op.drop_index("ix_podcast_episodes_episode_date", table_name="podcast_episodes") + op.drop_index("ix_podcast_episodes_agent_id", table_name="podcast_episodes") + op.drop_table("podcast_episodes") diff --git a/docker-compose.prod.yml b/docker-compose.prod.yml index 44dc726..3c0c371 100644 --- a/docker-compose.prod.yml +++ b/docker-compose.prod.yml @@ -35,6 +35,7 @@ services: volumes: - ./profiles:/app/profiles - ./prompts:/app/prompts + - podcast_data:/app/data depends_on: postgres: condition: service_healthy @@ -83,7 +84,6 @@ services: volumes: - ./profiles:/app/profiles - ./prompts:/app/prompts - - ./data:/app/data depends_on: postgres: condition: service_healthy @@ -108,7 +108,7 @@ services: volumes: - ./profiles:/app/profiles - ./prompts:/app/prompts - - ./data:/app/data + - grantbot_data:/app/data depends_on: postgres: condition: service_healthy @@ -120,6 +120,29 @@ services: awslogs-create-group: "true" awslogs-region: ${AWS_REGION:-us-east-2} + podcast: + build: + context: . 
+ restart: unless-stopped + command: ["python", "-m", "src.podcast.main", "scheduler", "--run-hour", "9"] + env_file: .env + environment: + DATABASE_URL: postgresql+asyncpg://${POSTGRES_USER:-copi}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB:-copi} + volumes: + - ./profiles:/app/profiles + - ./prompts:/app/prompts + - podcast_data:/app/data + depends_on: + postgres: + condition: service_healthy + logging: + driver: awslogs + options: + awslogs-group: /copi/podcast + tag: podcast + awslogs-create-group: "true" + awslogs-region: ${AWS_REGION:-us-east-2} + nginx: image: nginx:1.27-alpine restart: unless-stopped @@ -167,3 +190,5 @@ services: volumes: pgdata: + grantbot_data: + podcast_data: diff --git a/docker-compose.yml b/docker-compose.yml index d686043..71d3fd9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,6 +25,7 @@ services: - .:/app - ./profiles:/app/profiles - ./prompts:/app/prompts + - ./data:/app/data depends_on: postgres: condition: service_healthy @@ -69,5 +70,25 @@ services: postgres: condition: service_healthy + podcast: + build: . + command: python -m src.podcast.main scheduler --run-hour 9 + env_file: .env + environment: + # Override LOCAL_TTS_HOST so the container can reach a vLLM-Omni server + # running on the host machine (127.0.0.1 does not reach the host from inside Docker). + LOCAL_TTS_HOST: host.docker.internal + extra_hosts: + # Ensures host.docker.internal resolves on Linux (Docker Desktop sets it automatically on Mac/Windows). + - "host.docker.internal:host-gateway" + volumes: + - .:/app + - ./profiles:/app/profiles + - ./prompts:/app/prompts + - ./data:/app/data + depends_on: + postgres: + condition: service_healthy + volumes: pgdata: diff --git a/prompts/podcast-select.md b/prompts/podcast-select.md new file mode 100644 index 0000000..121af03 --- /dev/null +++ b/prompts/podcast-select.md @@ -0,0 +1,46 @@ +You are a literature triage assistant for a scientific researcher. 
Your job is to identify the single most relevant and impactful recent paper from a list of candidates, based on the researcher's profile. + +## Researcher Profile + +{profile} + +## PI Podcast Preferences + +{preferences} + +## Task + +Below is a numbered list of recent publications (title + abstract). Select the ONE paper whose findings or outputs could most plausibly accelerate or inform a specific aspect of this researcher's ongoing work. + +Return your answer as JSON: +```json +{"index": <number>, "justification": "<one-sentence justification referencing a specific aspect of the researcher's profile>"} +``` + +If no paper clears the relevance bar, return: +```json +{"index": null, "justification": "No paper is sufficiently relevant to this researcher's current work."} +``` + +## Selection Criteria + +**INCLUDE** a paper if: +- Its findings or methods could directly accelerate a specific ongoing project, technique, or open question in the researcher's profile +- It releases a new tool, dataset, method, or reagent relevant to the researcher's techniques or targets +- It addresses a disease area, model system, or molecular target the researcher actively works on + +**EXCLUDE** a paper if: +- The connection to the researcher's work is only superficial or generic +- It is a review article, editorial, or commentary (no new primary data) +- It is purely clinical or epidemiological with no basic science relevance +- Recency alone makes it interesting — the connection must be specific and actionable + +**NOTE:** Some candidates are preprints (from bioRxiv, medRxiv, or arXiv) and are marked as such in the journal field. Preprints are valid candidates — treat them the same as peer-reviewed papers for selection purposes. + +**PREFER** papers that release a concrete output alongside findings (code, dataset, protocol, reagent, model). These tend to be immediately useful. + +**FOLLOW PI PREFERENCES:** If the PI Podcast Preferences section above contains specific instructions (e.g., topic focus, exclusions, prioritizations), apply them when selecting. 
PI preferences override the general criteria above. + +## Candidate Papers + +{candidates} diff --git a/prompts/podcast-summarize.md b/prompts/podcast-summarize.md new file mode 100644 index 0000000..1a96589 --- /dev/null +++ b/prompts/podcast-summarize.md @@ -0,0 +1,46 @@ +You are a science communicator writing a personalized research brief for a specific PI. Your goal is to help the PI quickly grasp whether and how a new paper is useful to their lab. + +## Researcher Profile + +{profile} + +## PI Podcast Preferences + +{preferences} + +## Paper + +{paper} + +## Task + +Write a structured research brief following the exact format below. Be specific, direct, and concise — like a knowledgeable postdoc briefing their PI. No filler phrases, no generic connections. + +--- + +*Today's Research Brief — {date}* + +*{paper_title}* +{authors} · {journal} · {year} + +*What they found:* +[2–3 sentences on core findings. Include specific results, effect sizes, or key observations. Be concrete — name specific proteins, pathways, organisms, or quantitative outcomes where relevant.] + +*Key output:* +[1–2 sentences on the tool, method, dataset, code, protocol, or reagent released with the paper. ONLY include this section if the paper releases a concrete artifact. If there is no distinct output, omit this section entirely — do not write "N/A" or a placeholder.] + +*Why this matters for your lab:* +[2–3 sentences connecting the paper specifically to this PI's work. You MUST name at least one specific technique, model system, molecular target, or open question from the researcher's profile. Do not write generic connections like "this is relevant to your proteomics work" — say exactly what aspect and how.] 
+ +*Link:* {paper_url} + +--- + +## Rules + +- Total length: approximately 200–280 words +- Tone: collegial and precise, not promotional +- The "Why this matters" section is the most important — make it specific to this researcher, not a general statement about the field +- If the PI Podcast Preferences section contains specific instructions on tone, focus, or framing, follow them +- If the abstract is all you have, base the brief on the abstract. Do not speculate about full-text content you weren't given. +- Do not add any text before or after the brief itself diff --git a/pyproject.toml b/pyproject.toml index d09fa83..6b780d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "boto3>=1.34.0", "typer>=0.12.0", "rich>=13.7.0", + "mutagen>=1.47.0", ] [project.optional-dependencies] diff --git a/scripts/test_podcast_su.py b/scripts/test_podcast_su.py new file mode 100644 index 0000000..600c6e6 --- /dev/null +++ b/scripts/test_podcast_su.py @@ -0,0 +1,140 @@ +"""One-shot test: run the podcast pipeline for agent 'su' only. 
+ +Outputs: + .labbot-tests/su-summary-.txt — generated text summary + .labbot-tests/su-audio-.mp3 — TTS audio (if MISTRAL_API_KEY is set) + +Usage: + DATABASE_URL=postgresql+asyncpg://copi:copi@localhost:5432/copi \ + python scripts/test_podcast_su.py +""" + +import asyncio +import logging +import os +import shutil +from datetime import date +from pathlib import Path + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + +OUTPUT_DIR = Path(".labbot-tests") +AUDIO_DIR = Path("data/podcast_audio") + + +async def run(): + from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine + from sqlalchemy.orm import sessionmaker + + from src.config import get_settings + from src.podcast.pipeline import ( + _generate_summary, + _load_podcast_preferences, + _load_public_profile, + _parse_profile_markdown, + _select_article, + _try_fetch_full_text, + ) + from src.podcast.tts_utils import get_audio_duration_seconds + from src.podcast.pubmed_search import build_queries, fetch_candidates + from src.podcast.state import get_delivered_pmids, record_delivery + + settings = get_settings() + agent_id = "su" + today = date.today() + OUTPUT_DIR.mkdir(exist_ok=True) + + logger.info("=== LabBot Podcast test run for agent: %s ===", agent_id) + + # 1. Load profiles + profile_text = _load_public_profile(agent_id) + if not profile_text: + logger.error("No public profile found for agent: %s", agent_id) + return + logger.info("Loaded profile (%d chars)", len(profile_text)) + + preferences_text = await _load_podcast_preferences(agent_id) + if preferences_text: + logger.info("Loaded podcast preferences (%d chars)", len(preferences_text)) + else: + logger.info("No podcast preferences found for agent: %s", agent_id) + + # 2. 
Build queries and fetch candidates + profile_dict = _parse_profile_markdown(profile_text) + queries = build_queries(profile_dict) + logger.info("Search queries: %s", queries) + + already_delivered = get_delivered_pmids(agent_id) + logger.info("Already delivered PMIDs: %s", already_delivered) + + candidates = await fetch_candidates( + queries, + already_delivered=already_delivered, + days=settings.podcast_search_window_days, + max_total=settings.podcast_max_candidates, + ) + logger.info("Fetched %d candidates", len(candidates)) + if not candidates: + logger.error("No candidate articles found — aborting") + return + + # 3. LLM article selection + selected, justification = await _select_article(profile_text, candidates, agent_id, preferences_text) + if selected is None: + logger.error("No article selected — aborting") + return + pmid = selected.get("pmid", "") + logger.info("Selected PMID: %s", pmid) + logger.info("Justification: %s", justification) + + # 4. Fetch full text + full_text = await _try_fetch_full_text(pmid) + logger.info("Full text fetched: %s", bool(full_text)) + + # 5. Generate text summary + summary = await _generate_summary(profile_text, selected, full_text, agent_id, preferences_text) + if not summary: + logger.error("Summary generation failed — aborting") + return + + summary_path = OUTPUT_DIR / f"su-summary-{today.isoformat()}.txt" + summary_path.write_text(summary, encoding="utf-8") + logger.info("Summary written to %s", summary_path) + print("\n" + "=" * 60) + print("TEXT SUMMARY") + print("=" * 60) + print(summary) + print("=" * 60 + "\n") + + # 6. 
Generate audio — dispatch to backend configured by PODCAST_TTS_BACKEND + if settings.podcast_tts_backend == "local": + from src.podcast.local_tts import generate_audio + logger.info("TTS backend: local vLLM-Omni (%s:%s)", settings.local_tts_host, settings.local_tts_port) + else: + from src.podcast.mistral_tts import generate_audio + logger.info("TTS backend: Mistral AI (%s)", settings.mistral_tts_model) + + audio_src = AUDIO_DIR / agent_id / f"{today.isoformat()}.mp3" + audio_ok = await generate_audio(summary, agent_id, audio_src) + + if audio_ok: + audio_dest = OUTPUT_DIR / f"su-audio-{today.isoformat()}.mp3" + shutil.copy2(audio_src, audio_dest) + duration = get_audio_duration_seconds(audio_src) + logger.info("Audio saved to %s (duration: %ss)", audio_dest, duration) + else: + logger.warning("Audio generation failed (backend: %s)", settings.podcast_tts_backend) + + logger.info("=== Test run complete ===") + logger.info(" PMID: %s", pmid) + logger.info(" Summary: %s", summary_path) + if audio_ok: + logger.info(" Audio: %s", audio_dest) + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/specs/labbot-podcast.md b/specs/labbot-podcast.md new file mode 100644 index 0000000..0cd617a --- /dev/null +++ b/specs/labbot-podcast.md @@ -0,0 +1,353 @@ +# LabBot Podcast Specification + +## Overview + +LabBot Podcast is a daily personalized research briefing service for each PI. It surfaces the single most relevant and impactful recent publication from the scientific literature based on the PI's profile, generates a structured text summary highlighting findings and tools useful to the PI's ongoing work, and produces a short audio episode via Mistral AI TTS. PIs receive the text summary via Slack DM from their lab bot and can subscribe to a per-PI RSS podcast feed to listen to the audio. 
+ +The system runs once per day (alongside GrantBot) and requires no PI interaction to be useful — but PIs can tune it through the same standing-instruction DM mechanism used by the agent system. + +--- + +## Architecture + +### Service Placement + +LabBot Podcast runs as a separate Docker container (`podcast` service), mirroring the GrantBot pattern: +- Long-running scheduler process +- Executes once per calendar day at 9am UTC (1 hour after GrantBot) +- If the container was down at the scheduled time, runs immediately on startup (catch-up) +- State persisted in `data/podcast_state.json` (tracks which articles have been delivered per agent) + +### Dependencies on Existing Systems + +| Existing component | How Podcast uses it | +|---|---| +| `ResearcherProfile` DB model | Source of PI research areas, keywords, techniques, disease areas | +| `profiles/public/{lab}.md` | Supplementary profile text for LLM article selection and summary | +| `src/services/pubmed.py` | Literature search (keyword + MeSH queries) | +| `src/services/llm.py` | Article selection ranking and summary generation (all calls logged to `LlmCallLog`) | +| `AgentRegistry` | Maps agent → PI → Slack bot token for DM delivery | +| Slack bot DM | Text summary delivery to PI | + +### New External Dependency + +**Mistral AI API** — text-to-speech generation. 
+- Configured via `MISTRAL_API_KEY` environment variable +- Voice selection per agent configured in `data/podcast_voices.json` (agent_id → voice_id); falls back to a default voice if not set +- Audio files stored at `data/podcast_audio/{agent_id}/{YYYY-MM-DD}.mp3` + +--- + +## Daily Pipeline + +Each day, for each active agent in `AgentRegistry`, the pipeline executes the following steps sequentially: + +### Step 1: Build Search Queries + +Construct PubMed search terms from the PI's `ResearcherProfile`: +- Extract top research area keywords +- Extract technique and experimental model terms +- Combine into 2–3 PubMed query strings (e.g., `(proteostasis OR unfolded protein response) AND (neurodegeneration OR proteomics)`) +- Limit to publications from the last 14 days (rolling window ensures coverage across weekend/holiday gaps) +- Cap at 50 candidate abstracts per agent + +### Step 2: Fetch Candidate Abstracts + +Use `src/services/pubmed.py` to execute each query and retrieve PMIDs + abstracts. Deduplicate across queries. Skip any PMID already in `podcast_state.json` for this agent (prevents re-delivering the same article). + +### Step 3: LLM Article Selection (Sonnet) + +Single LLM call (Sonnet) with: +- The PI's full public profile (from `profiles/public/{lab}.md`) +- The list of candidate abstracts (title + abstract text, numbered) +- Prompt: `prompts/podcast-select.md` + +The LLM returns the index of the single best article, along with a one-sentence justification of why it is relevant to this PI's ongoing work. If no article meets a minimum relevance threshold (as instructed in the prompt), it returns `null` and the pipeline skips delivery for that agent today. 
+ +### Step 4: Generate Text Summary (Opus) + +One LLM call (Opus) with: +- The PI's full public profile +- The selected article's title, abstract, and full text (fetched via `retrieve_full_text` if available in PMC, otherwise abstract only) +- Prompt: `prompts/podcast-summarize.md` + +Output is a structured text summary (see format below). This is the content delivered to the PI via Slack and used as the TTS input. + +### Step 5: Generate Audio (Mistral AI) + +Pass the text summary to the Mistral AI TTS API: +- Voice: agent-specific or default +- Model: configurable via `MISTRAL_TTS_MODEL` +- Output: MP3 file saved to `data/podcast_audio/{agent_id}/{YYYY-MM-DD}.mp3` +- If Mistral TTS call fails, continue — Slack text delivery still proceeds + +### Step 6: Serve Audio via RSS + +The podcast RSS feed for each agent is served by the FastAPI web app. New episodes are registered in `data/podcast_state.json` with the audio file path, episode title, pub date, and duration (parsed from the MP3 file using `mutagen`). + +### Step 7: Deliver via Slack DM + +Send the text summary as a DM from the agent's Slack bot to its PI, using the same `AgentRegistry.slack_bot_token` used by the agent simulation. Format described below. + +### Step 8: Update State + +Append the delivered PMID and episode metadata to `data/podcast_state.json` for this agent. This prevents re-delivery and powers the RSS feed. + +--- + +## Text Summary Format + +The Opus-generated summary follows a consistent structure. The prompt enforces this layout: + +``` +*Today's Research Brief — {Date}* + +*{Paper Title}* +{Authors} · {Journal} · {Year} + +*What they found:* +2–3 sentences on the core findings — specific results, effect sizes, or observations. + +*Key output:* +1–2 sentences on any tool, method, dataset, or reagent released with the paper (if applicable). Omit this section if the paper has no distinct output. 
+ +*Why this matters for your lab:* +2–3 sentences connecting the paper's findings and outputs specifically to the PI's ongoing research areas, techniques, or open questions. Ground this in the PI's profile — name specific techniques, model systems, or questions from their work. + +*PubMed:* https://pubmed.ncbi.nlm.nih.gov/{PMID}/ +``` + +The Slack DM appends a line at the bottom: +> _Listen to the audio version: {rss_feed_url}_ + +--- + +## RSS Podcast Feed + +### Endpoint + +`GET /podcast/{agent_id}/feed.xml` + +Served by FastAPI from `src/routers/podcast.py`. No authentication required — the URL is obscure-by-default (agent_id is a UUID), not secret. + +### Feed Structure + +Standard RSS 2.0 with iTunes podcast extensions: + +```xml +<rss version="2.0" xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd"> + <channel> + <title>{PI Name} — LabBot Research Briefings</title> + <description>Daily personalized research summaries for {PI Name} at Scripps Research</description> + <link>{base_url}/podcast/{agent_id}/feed.xml</link> + <itunes:author>{PI Name}</itunes:author> + + <item> + <title>{Paper Title} — {Date}</title> + <description>{text summary}</description> + <enclosure url="{base_url}/podcast/{agent_id}/audio/{date}.mp3" type="audio/mpeg" length="{bytes}"/> + <pubDate>{RFC 822 date}</pubDate> + <guid isPermaLink="false">{agent_id}-{YYYY-MM-DD}</guid> + <itunes:duration>{duration}</itunes:duration> + </item> + ... + </channel> +</rss> +``` + +### Audio File Serving + +`GET /podcast/{agent_id}/audio/{date}.mp3` + +Served directly by FastAPI from `data/podcast_audio/{agent_id}/`. Files are read from disk and streamed with `Content-Type: audio/mpeg`. + +--- + +## LLM Prompt Files + +Two new prompt files in `prompts/`: + +### `prompts/podcast-select.md` + +Instructs the LLM to act as a literature triage assistant for a specific PI. 
It receives: +- The PI's public profile (research areas, techniques, open questions, unique capabilities) +- Numbered list of candidate abstracts (title + abstract) + +It must return: +- The number of the most relevant article, or `null` if none clears the relevance bar +- A one-sentence justification referencing a specific aspect of the PI's profile + +Key instructions in the prompt: +- Relevance is defined as: the paper's findings or outputs could plausibly accelerate or inform a specific aspect of the PI's ongoing work +- Recency alone is not sufficient — the connection must be specific +- Prefer papers that release a tool, method, dataset, or reagent alongside findings +- Do not pick review articles or editorials + +### `prompts/podcast-summarize.md` + +Instructs the LLM to act as a science communicator writing for a specific PI. It receives: +- The PI's public profile +- Full paper text (or abstract if full text unavailable) + +It must produce the structured summary described above. 
Key instructions: +- The "Why this matters for your lab" section must name specific techniques, model systems, or open questions from the PI's profile — no generic connections +- Tone is like a knowledgeable postdoc briefing their PI: specific, direct, no filler +- The "Key output" section is only included if the paper releases a concrete artifact (tool, code, dataset, method, reagent); skip it otherwise +- Target length: ~250 words total + +--- + +## Data Model + +### New Table: `PodcastEpisode` + +```python +class PodcastEpisode(Base): + __tablename__ = "podcast_episodes" + + id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4) + agent_id: Mapped[str] = mapped_column(String, nullable=False, index=True) + episode_date: Mapped[date] = mapped_column(Date, nullable=False) + pmid: Mapped[str] = mapped_column(String, nullable=False) + paper_title: Mapped[str] = mapped_column(String, nullable=False) + paper_authors: Mapped[str] = mapped_column(String, nullable=False) + paper_journal: Mapped[str] = mapped_column(String, nullable=False) + paper_year: Mapped[int] = mapped_column(Integer, nullable=False) + text_summary: Mapped[str] = mapped_column(Text, nullable=False) + audio_file_path: Mapped[Optional[str]] = mapped_column(String, nullable=True) # null if TTS failed + audio_duration_seconds: Mapped[Optional[int]] = mapped_column(Integer, nullable=True) + slack_delivered: Mapped[bool] = mapped_column(Boolean, default=False) + selection_justification: Mapped[str] = mapped_column(Text, nullable=False) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + __table_args__ = ( + UniqueConstraint("agent_id", "episode_date", name="uq_podcast_agent_date"), + ) +``` + +The `data/podcast_state.json` file serves as a lightweight startup cache (to avoid a DB query to get delivered PMIDs during query construction), but the DB is the authoritative record for RSS feed generation and admin visibility. 
+ +### Alembic Migration + +Add migration `0010_add_podcast_episodes.py` creating the `podcast_episodes` table. + +--- + +## Configuration + +New environment variables: + +| Variable | Required | Description | +|---|---|---| +| `MISTRAL_API_KEY` | Yes (for audio) | Mistral AI API key | +| `MISTRAL_TTS_MODEL` | No | TTS model ID (default: `voxtral-mini-tts-latest`) | +| `MISTRAL_TTS_DEFAULT_VOICE` | No | Default voice when no per-agent override exists | +| `PODCAST_BASE_URL` | Yes | Public base URL for RSS enclosure links (e.g., `https://copi.science`) | +| `PODCAST_SEARCH_WINDOW_DAYS` | No | Rolling search window in days (default: `14`) | +| `PODCAST_MAX_CANDIDATES` | No | Max PubMed abstracts per agent per day (default: `50`) | + +Per-agent voice overrides: `data/podcast_voices.json` +```json +{ + "su": "voice_id_abc123", + "wiseman": "voice_id_def456" +} +``` + +--- + +## Docker Service + +Add `podcast` service to `docker-compose.yml` and `docker-compose.prod.yml`: + +```yaml +podcast: + build: . + command: python -m src.podcast.main + env_file: .env + volumes: + - ./data:/app/data + depends_on: + - postgres + profiles: + - podcast +``` + +Run with: `docker compose --profile podcast up -d podcast` + +--- + +## Module Structure + +``` +src/podcast/ +├── main.py # Scheduler entry point (APScheduler, same pattern as grantbot.py) +├── pipeline.py # Per-agent pipeline (steps 1–8 above) +├── pubmed_search.py # Query builder from ResearcherProfile +├── mistral_tts.py # Mistral AI TTS client wrapper +├── rss.py # RSS feed builder (reads from DB) +└── state.py # podcast_state.json read/write helpers + +src/routers/podcast.py # FastAPI routes: /podcast/{agent_id}/feed.xml, /podcast/{agent_id}/audio/{date}.mp3 +``` + +The scheduler in `src/podcast/main.py` follows the same catch-up-on-startup pattern as `src/agent/grantbot.py`: +1. On startup, check `data/podcast_state.json` for last run timestamp +2. If last run was before today's 9am UTC, run immediately +3. 
Schedule next run at 9am UTC + +--- + +## Admin Dashboard Integration + +Add a **Podcast** tab to the existing admin dashboard (`src/routers/admin.py` + `templates/admin.html`) showing: +- Table of recent episodes: agent, date, paper title, PMID, Slack delivered (yes/no), audio generated (yes/no) +- Link to each agent's RSS feed +- LLM call counts and token usage for the podcast pipeline (pulled from `LlmCallLog` filtered by `source = "podcast"`) + +The LLM calls from the podcast pipeline should set a `source` tag in `LlmCallLog` (add a `source` column via migration if not already present, or use the existing `extra_metadata` JSONB field). + +--- + +## PI Customization + +PIs can adjust podcast behavior through standing instructions to their lab bot (same DM mechanism as the agent system — see `pi-interaction.md`). The podcast pipeline reads the private profile when building the selection prompt. + +Examples of effective standing instructions: +- "For my daily podcast, focus only on papers that release a new tool or dataset — I don't need summaries of pure wet-lab findings" +- "Prioritize papers from computational biology journals for the podcast" +- "Skip anything about C. elegans — we're not pursuing that direction anymore" + +The bot's private profile rewrite (via `prompts/pi-profile-rewrite.md`) should include a `## Podcast Preferences` section that the podcast pipeline reads when constructing the selection and summarization prompts. 
+ +--- + +## Rollout Phases + +### Phase 1: Text-only delivery +- PubMed search, LLM selection, Opus summarization +- Slack DM delivery +- `PodcastEpisode` DB table and admin visibility +- No audio, no RSS + +### Phase 2: Audio + RSS +- Mistral AI TTS integration +- Audio file storage and streaming endpoint +- RSS feed generation and `/podcast/{agent_id}/feed.xml` endpoint +- Per-agent voice configuration + +### Phase 3: PI customization surface +- Podcast preferences section in private profile +- Pipeline reads preferences when building prompts +- Admin dashboard podcast tab with LLM usage metrics + +--- + +## Out of Scope + +- Real-time or on-demand article requests (this is a daily scheduled briefing only) +- Multi-article episodes (one article per day, selected by the LLM as the single most relevant) +- Full-text audio of the paper itself (summary only) +- Public or shared RSS feeds (each feed is per-PI, addressed by UUID) +- Push notifications or mobile app integration +- Preprint servers (bioRxiv, medRxiv) — PubMed only for Phase 1; preprints are a Phase 2+ addition diff --git a/src/config.py b/src/config.py index 590f4d3..93430b5 100644 --- a/src/config.py +++ b/src/config.py @@ -76,6 +76,26 @@ class Settings(BaseSettings): llm_agent_model_opus: str = "claude-opus-4-6" llm_agent_model_sonnet: str = "claude-sonnet-4-6" + # Mistral AI (podcast TTS) + mistral_api_key: str = "" + mistral_tts_model: str = "voxtral-mini-tts-latest" + mistral_tts_default_voice: str = "" + + # Podcast TTS backend: "mistral" (default) or "local" (vLLM-Omni) + podcast_tts_backend: str = "mistral" + + # Local vLLM-Omni TTS server + local_tts_host: str = "127.0.0.1" + local_tts_port: int = 8010 + local_tts_model: str = "Qwen/Qwen2-Audio-7B-Instruct" + local_tts_voice: str = "default" + + # Podcast + podcast_base_url: str = "" # e.g. 
https://copi.science — for RSS enclosure URLs + podcast_search_window_days: int = 14 + podcast_max_candidates: int = 50 + podcast_normalize_audio: bool = False # set true to run ffmpeg loudnorm after TTS + # Worker worker_poll_interval: int = 5 # seconds diff --git a/src/main.py b/src/main.py index 30f0700..f6a2070 100644 --- a/src/main.py +++ b/src/main.py @@ -13,7 +13,7 @@ from src.config import get_settings from src.database import get_session_factory -from src.routers import admin, agent_page, auth, invite, onboarding, profile +from src.routers import admin, agent_page, auth, invite, onboarding, podcast, profile from src.routers import settings as settings_router logging.basicConfig( @@ -115,6 +115,7 @@ def create_app() -> FastAPI: application.include_router(admin.router, prefix="/admin", tags=["admin"]) application.include_router(invite.router, tags=["invite"]) application.include_router(settings_router.router, prefix="/settings", tags=["settings"]) + application.include_router(podcast.router, prefix="/podcast", tags=["podcast"]) @application.get("/") async def root(request: Request): diff --git a/src/models/__init__.py b/src/models/__init__.py index 2914431..5721460 100644 --- a/src/models/__init__.py +++ b/src/models/__init__.py @@ -8,6 +8,7 @@ from src.models.delegate import AgentDelegate, DelegateInvitation from src.models.email_notification import EmailEngagementTracker, EmailNotification from src.models.job import Job +from src.models.podcast import PodcastEpisode from src.models.profile_revision import ProfileRevision from src.models.profile import ResearcherProfile from src.models.publication import Publication @@ -30,4 +31,5 @@ "EmailNotification", "EmailEngagementTracker", "ProfileRevision", + "PodcastEpisode", ] diff --git a/src/models/podcast.py b/src/models/podcast.py new file mode 100644 index 0000000..2a1f1bb --- /dev/null +++ b/src/models/podcast.py @@ -0,0 +1,40 @@ +"""PodcastEpisode model.""" + +import uuid +from datetime import date, 
datetime + +from sqlalchemy import Boolean, Date, DateTime, Integer, String, Text, UniqueConstraint, func +from sqlalchemy.dialects.postgresql import UUID +from sqlalchemy.orm import Mapped, mapped_column + +from src.database import Base + + +class PodcastEpisode(Base): + __tablename__ = "podcast_episodes" + + id: Mapped[uuid.UUID] = mapped_column( + UUID(as_uuid=True), primary_key=True, default=uuid.uuid4 + ) + agent_id: Mapped[str] = mapped_column(String(50), nullable=False, index=True) + episode_date: Mapped[date] = mapped_column(Date, nullable=False) + pmid: Mapped[str] = mapped_column(String(100), nullable=False) + paper_title: Mapped[str] = mapped_column(String(500), nullable=False) + paper_authors: Mapped[str] = mapped_column(String(500), nullable=False) + paper_journal: Mapped[str] = mapped_column(String(255), nullable=False) + paper_year: Mapped[int] = mapped_column(Integer, nullable=False) + text_summary: Mapped[str] = mapped_column(Text, nullable=False) + audio_file_path: Mapped[str | None] = mapped_column(String(500), nullable=True) + audio_duration_seconds: Mapped[int | None] = mapped_column(Integer, nullable=True) + slack_delivered: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + selection_justification: Mapped[str] = mapped_column(Text, nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), server_default=func.now(), nullable=False + ) + + __table_args__ = ( + UniqueConstraint("agent_id", "episode_date", name="uq_podcast_agent_date"), + ) + + def __repr__(self) -> str: + return f"" diff --git a/src/podcast/__init__.py b/src/podcast/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/podcast/local_tts.py b/src/podcast/local_tts.py new file mode 100644 index 0000000..185d4a3 --- /dev/null +++ b/src/podcast/local_tts.py @@ -0,0 +1,94 @@ +"""Local TTS backend using a vLLM-Omni server. 

vLLM-Omni exposes an OpenAI-compatible /v1/audio/speech endpoint that accepts
the same JSON payload as OpenAI TTS and returns raw audio bytes directly.

Start a vLLM-Omni server with, e.g.:
    vllm serve Qwen/Qwen2-Audio-7B-Instruct --port 8010

Then set in .env:
    PODCAST_TTS_BACKEND=local
    LOCAL_TTS_HOST=127.0.0.1
    LOCAL_TTS_PORT=8010
    LOCAL_TTS_MODEL=Qwen/Qwen2-Audio-7B-Instruct
    LOCAL_TTS_VOICE=default
"""

import json
import logging
from pathlib import Path

import httpx

from src.config import get_settings
from src.podcast.tts_utils import get_audio_duration_seconds, normalize_audio, strip_markdown

logger = logging.getLogger(__name__)

# Optional per-agent voice overrides: JSON mapping agent_id -> voice name.
VOICES_FILE = Path("data/podcast_voices.json")

__all__ = ["generate_audio", "get_audio_duration_seconds"]


def _get_local_tts_url() -> str:
    # Path mirrors OpenAI's TTS API, so the payload built below is portable.
    settings = get_settings()
    return f"http://{settings.local_tts_host}:{settings.local_tts_port}/v1/audio/speech"


def get_voice(agent_id: str) -> str:
    """Return the configured voice for an agent, falling back to default."""
    settings = get_settings()
    if VOICES_FILE.exists():
        try:
            voices = json.loads(VOICES_FILE.read_text(encoding="utf-8"))
            if agent_id in voices:
                return voices[agent_id]
        except Exception as exc:
            # Best-effort: a malformed overrides file must not break audio generation.
            logger.warning("Failed to load podcast_voices.json: %s", exc)
    return settings.local_tts_voice or "default"


async def generate_audio(text: str, agent_id: str, output_path: Path) -> bool:
    """Generate TTS audio via a local vLLM-Omni server and save to output_path.

    The server must expose an OpenAI-compatible /v1/audio/speech endpoint
    that returns raw audio bytes.

    Returns True on success, False on failure.
    """
    settings = get_settings()
    url = _get_local_tts_url()
    voice = get_voice(agent_id)
    # Strip markdown so the TTS voice doesn't read formatting characters aloud.
    clean_text = strip_markdown(text)

    payload = {
        "model": settings.local_tts_model,
        "input": clean_text,
        "voice": voice,
        "response_format": "mp3",
    }
    headers = {"Content-Type": "application/json"}

    logger.info("Local TTS request to %s (model=%s, voice=%s)", url, settings.local_tts_model, voice)

    try:
        # Generous timeout: synthesizing a full summary locally can take minutes.
        async with httpx.AsyncClient(timeout=300) as client:
            resp = await client.post(url, json=payload, headers=headers)
            if not resp.is_success:
                logger.error("Local TTS error %s: %s", resp.status_code, resp.text)
                resp.raise_for_status()

            output_path.parent.mkdir(parents=True, exist_ok=True)
            output_path.write_bytes(resp.content)
            logger.info("Audio saved to %s (%d bytes)", output_path, len(resp.content))
            if settings.podcast_normalize_audio:
                # Optional ffmpeg loudnorm pass (EBU R128) — see tts_utils.
                normalize_audio(output_path)
            return True
    except httpx.ConnectError:
        logger.error(
            "Could not connect to local TTS server at %s — is vLLM-Omni running?", url
        )
        return False
    except Exception as exc:
        logger.error("Local TTS failed for agent %s: %s", agent_id, exc)
        return False
diff --git a/src/podcast/main.py b/src/podcast/main.py
new file mode 100644
index 0000000..06cacee
--- /dev/null
+++ b/src/podcast/main.py
@@ -0,0 +1,161 @@
"""LabBot Podcast — daily personalized research briefings for each PI.

Usage:
    python -m src.podcast.main            # run once immediately
    python -m src.podcast.main scheduler  # long-running daily scheduler

The scheduler runs at 9am UTC daily (1 hour after GrantBot).
+""" + +import asyncio +import logging +import time +from datetime import datetime, timezone + +import typer + +from src.config import get_settings + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(name)s: %(message)s", +) +logger = logging.getLogger(__name__) + +app = typer.Typer(invoke_without_command=True) + +RUN_HOUR_UTC = 9 # run at 9am UTC + + +async def run_podcast(dry_run: bool = False) -> list[str]: + """Run the podcast pipeline for all active agents. + + Returns list of agent_ids that produced episodes. + """ + from sqlalchemy import select + + from src.database import get_session_factory + from src.models.agent_registry import AgentRegistry + from src.podcast.pipeline import run_pipeline_for_agent + + settings = get_settings() + slack_tokens = settings.get_slack_tokens() + + session_factory = get_session_factory() + produced: list[str] = [] + + async with session_factory() as db: + result = await db.execute( + select(AgentRegistry).where(AgentRegistry.status == "active") + ) + agents = result.scalars().all() + + if not agents: + logger.warning("No active agents found in registry — trying all known agents") + # Fall back to config-based agent list when DB isn't seeded + for agent_id, tokens in slack_tokens.items(): + bot_token = tokens.get("bot", "") + if not bot_token or bot_token.startswith("xoxb-placeholder") or not bot_token: + continue + if dry_run: + logger.info("DRY RUN — would run pipeline for agent: %s", agent_id) + continue + try: + ok = await run_pipeline_for_agent( + agent_id=agent_id, + bot_name=f"{agent_id.capitalize()}Bot", + pi_name=agent_id.capitalize(), + bot_token=bot_token, + slack_user_id=None, + db_session=db, + ) + if ok: + produced.append(agent_id) + except Exception as exc: + logger.error("Pipeline failed for agent %s: %s", agent_id, exc, exc_info=True) + await db.commit() + else: + for agent in agents: + agent_id = agent.agent_id + tokens = slack_tokens.get(agent_id, {}) + bot_token = 
agent.slack_bot_token or tokens.get("bot", "") + + if dry_run: + logger.info( + "DRY RUN — would run pipeline for agent: %s (%s)", agent_id, agent.pi_name + ) + continue + + try: + ok = await run_pipeline_for_agent( + agent_id=agent_id, + bot_name=agent.bot_name, + pi_name=agent.pi_name, + bot_token=bot_token, + slack_user_id=agent.slack_user_id, + db_session=db, + ) + if ok: + produced.append(agent_id) + except Exception as exc: + logger.error( + "Pipeline failed for agent %s: %s", agent_id, exc, exc_info=True + ) + + await db.commit() + + logger.info("Podcast run complete: %d episodes produced", len(produced)) + return produced + + +@app.command() +def main( + dry_run: bool = typer.Option(False, "--dry-run", help="Preview without posting or generating audio"), +): + """Run the podcast pipeline once for all active agents.""" + from src.podcast.state import mark_run_complete + + results = asyncio.run(run_podcast(dry_run=dry_run)) + if results: + typer.echo(f"\nProduced {len(results)} episodes:") + for aid in results: + typer.echo(f" {aid}") + else: + typer.echo("No episodes produced.") + if not dry_run: + mark_run_complete() + + +@app.command("scheduler") +def scheduler( + run_hour: int = typer.Option(RUN_HOUR_UTC, "--run-hour", help="UTC hour to run daily (0-23)"), + check_interval: int = typer.Option(900, "--check-interval", help="Seconds between schedule checks"), +): + """Long-running scheduler: runs podcast pipeline once per calendar day. + + If the container starts after the scheduled hour, runs immediately to catch up. 
+ """ + from src.podcast.state import mark_run_complete, should_run_today + + logger.info( + "Podcast scheduler started (run_hour=%d UTC, check every %ds)", run_hour, check_interval + ) + + while True: + now = datetime.now(timezone.utc) + if should_run_today() and now.hour >= run_hour: + logger.info("Running daily podcast pipeline...") + try: + results = asyncio.run(run_podcast()) + mark_run_complete() + logger.info("Daily run complete: %d episodes", len(results)) + except Exception as exc: + logger.error("Daily run failed: %s", exc, exc_info=True) + else: + logger.debug("No run needed (last run: %s, hour: %d)", "?", now.hour) + + time.sleep(check_interval) + + +if __name__ == "__main__": + app() diff --git a/src/podcast/mistral_tts.py b/src/podcast/mistral_tts.py new file mode 100644 index 0000000..eb0c1ec --- /dev/null +++ b/src/podcast/mistral_tts.py @@ -0,0 +1,80 @@ +"""Mistral AI TTS client wrapper.""" + +import base64 +import json +import logging +from pathlib import Path + +import httpx + +from src.config import get_settings +from src.podcast.tts_utils import get_audio_duration_seconds, normalize_audio, strip_markdown + +logger = logging.getLogger(__name__) + +VOICES_FILE = Path("data/podcast_voices.json") +MISTRAL_TTS_URL = "https://api.mistral.ai/v1/audio/speech" + +__all__ = ["generate_audio", "get_audio_duration_seconds"] + + +def get_voice(agent_id: str) -> str: + """Return the configured voice for an agent, falling back to default.""" + settings = get_settings() + if VOICES_FILE.exists(): + try: + voices = json.loads(VOICES_FILE.read_text(encoding="utf-8")) + if agent_id in voices: + return voices[agent_id] + except Exception as exc: + logger.warning("Failed to load podcast_voices.json: %s", exc) + return settings.mistral_tts_default_voice + + +async def generate_audio(text: str, agent_id: str, output_path: Path) -> bool: + """Generate TTS audio via Mistral AI and save to output_path. + + Returns True on success, False on failure. 
+ """ + settings = get_settings() + if not settings.mistral_api_key: + logger.warning("MISTRAL_API_KEY not set — skipping audio generation") + return False + + voice = get_voice(agent_id) + clean_text = strip_markdown(text) + payload = { + "model": settings.mistral_tts_model, + "input": clean_text, + "voice": voice, + } + headers = { + "Authorization": f"Bearer {settings.mistral_api_key}", + "Content-Type": "application/json", + } + + try: + async with httpx.AsyncClient(timeout=120) as client: + resp = await client.post(MISTRAL_TTS_URL, json=payload, headers=headers) + if not resp.is_success: + logger.error("Mistral TTS API error %s: %s", resp.status_code, resp.text) + resp.raise_for_status() + + # Mistral returns {"audio_data": ""} + content_type = resp.headers.get("content-type", "") + if "json" in content_type or resp.content[:1] == b"{": + audio_bytes = base64.b64decode(resp.json()["audio_data"]) + else: + audio_bytes = resp.content + + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(audio_bytes) + logger.info("Audio saved to %s (%d bytes)", output_path, len(audio_bytes)) + if settings.podcast_normalize_audio: + normalize_audio(output_path) + return True + except Exception as exc: + logger.error("Mistral TTS failed for agent %s: %s", agent_id, exc) + return False + + diff --git a/src/podcast/pipeline.py b/src/podcast/pipeline.py new file mode 100644 index 0000000..8db8cc2 --- /dev/null +++ b/src/podcast/pipeline.py @@ -0,0 +1,411 @@ +"""Per-agent podcast pipeline: search → select → summarize → TTS → Slack DM → DB.""" + +import json +import logging +from datetime import date, datetime, timezone +from pathlib import Path +from typing import Any + +from src.config import get_settings + +logger = logging.getLogger(__name__) + +PROFILES_DIR = Path("profiles/public") +AUDIO_DIR = Path("data/podcast_audio") + + +def _load_public_profile(agent_id: str) -> str: + """Load the public profile markdown for an agent.""" + path = PROFILES_DIR / 
f"{agent_id}.md" + if path.exists(): + return path.read_text(encoding="utf-8") + return "" + + +async def _load_podcast_preferences(agent_id: str) -> str: + """Load the Podcast Preferences section from the agent's latest private ProfileRevision in the DB.""" + try: + from sqlalchemy import desc, select + + from src.database import get_session_factory + from src.models.agent_registry import AgentRegistry + from src.models.profile_revision import ProfileRevision + + session_factory = get_session_factory() + async with session_factory() as db: + reg_result = await db.execute( + select(AgentRegistry.id).where(AgentRegistry.agent_id == agent_id) + ) + reg_row = reg_result.first() + if not reg_row: + return "" + + rev_result = await db.execute( + select(ProfileRevision.content) + .where( + ProfileRevision.agent_registry_id == reg_row[0], + ProfileRevision.profile_type == "private", + ) + .order_by(desc(ProfileRevision.created_at)) + .limit(1) + ) + rev_row = rev_result.first() + if not rev_row: + return "" + + return _extract_section_text(rev_row[0], "Podcast Preferences") + except Exception as exc: + logger.warning("Could not load podcast preferences for %s: %s", agent_id, exc) + return "" + + +def _format_candidates_for_prompt(records: list[dict[str, Any]]) -> str: + """Format PubMed records as a numbered list for the selection prompt.""" + lines = [] + for i, rec in enumerate(records, 1): + title = rec.get("title", "No title") + abstract = rec.get("abstract", "No abstract")[:600] + journal = rec.get("journal") or "Unknown journal" + year = rec.get("year") or "Unknown year" + lines.append(f"{i}. [{journal}, {year}] {title}\n {abstract}") + return "\n\n".join(lines) + + +async def _select_article( + profile_text: str, + candidates: list[dict[str, Any]], + agent_id: str, + preferences_text: str = "", +) -> tuple[dict[str, Any], str] | tuple[None, str]: + """Use Sonnet to pick the most relevant article. + + Returns (selected_record, justification) or (None, reason). 
+ """ + from src.services.llm import generate_agent_response + + settings = get_settings() + + prompt_path = Path("prompts/podcast-select.md") + template = prompt_path.read_text(encoding="utf-8") + candidates_text = _format_candidates_for_prompt(candidates) + prompt = ( + template + .replace("{profile}", profile_text) + .replace("{candidates}", candidates_text) + .replace("{preferences}", preferences_text or "No specific preferences set.") + ) + + try: + response = await generate_agent_response( + system_prompt=prompt, + messages=[{"role": "user", "content": "Select the most relevant article."}], + model=settings.llm_agent_model_sonnet, + max_tokens=300, + log_meta={"agent_id": agent_id, "phase": "podcast_select"}, + ) + + # Extract JSON + text = response.strip() + start = text.find("{") + end = text.rfind("}") + 1 + if start >= 0 and end > start: + data = json.loads(text[start:end]) + else: + raise ValueError("No JSON object found in response") + + idx = data.get("index") + justification = data.get("justification", "") + + if idx is None: + logger.info("Agent %s: no relevant article found (%s)", agent_id, justification) + return None, justification + + idx = int(idx) - 1 # convert 1-based to 0-based + if 0 <= idx < len(candidates): + return candidates[idx], justification + else: + logger.warning("Agent %s: LLM returned out-of-range index %d", agent_id, idx + 1) + return None, "Index out of range" + + except Exception as exc: + logger.error("Article selection failed for agent %s: %s", agent_id, exc) + return None, str(exc) + + +async def _generate_summary( + profile_text: str, + record: dict[str, Any], + full_text: str | None, + agent_id: str, + preferences_text: str = "", +) -> str | None: + """Use Opus to generate the structured text summary.""" + from src.services.llm import generate_agent_response + + settings = get_settings() + + prompt_path = Path("prompts/podcast-summarize.md") + template = prompt_path.read_text(encoding="utf-8") + + # Build paper section + 
authors_list = record.get("authors") or [] + if not authors_list: + authors_str = "Authors not available" + elif len(authors_list) > 3: + authors_str = ", ".join(authors_list[:3]) + " et al." + else: + authors_str = ", ".join(authors_list) + + pmid = record.get("pmid", "") + # Preprint records carry a canonical URL; PubMed records use the standard URL + paper_url = record.get("url") or f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" + + paper_section = f"""Title: {record.get('title', '')} +Authors: {authors_str} +Journal: {record.get('journal') or 'Unknown'} +Year: {record.get('year') or 'Unknown'} +URL: {paper_url} + +Abstract: +{record.get('abstract', '')}""" + + if full_text: + paper_section += f"\n\nFull text excerpt:\n{full_text[:3000]}" + + today_str = datetime.now(timezone.utc).strftime("%B %d, %Y") + + prompt = ( + template + .replace("{profile}", profile_text) + .replace("{paper}", paper_section) + .replace("{date}", today_str) + .replace("{paper_title}", record.get("title", "")) + .replace("{authors}", authors_str) + .replace("{journal}", record.get("journal") or "Unknown") + .replace("{year}", str(record.get("year") or "")) + .replace("{paper_url}", paper_url) + .replace("{preferences}", preferences_text or "No specific preferences set.") + ) + + try: + response = await generate_agent_response( + system_prompt=prompt, + messages=[{"role": "user", "content": "Write the research brief."}], + model=settings.llm_agent_model_opus, + max_tokens=600, + log_meta={"agent_id": agent_id, "phase": "podcast_summarize"}, + ) + return response.strip() + except Exception as exc: + logger.error("Summary generation failed for agent %s: %s", agent_id, exc) + return None + + +async def _try_fetch_full_text(pmid: str) -> str | None: + """Attempt to fetch full text from PMC; return None on failure or for non-PubMed IDs.""" + # Preprint IDs are prefixed (e.g. 
"biorxiv:...", "arxiv:...") — PMC doesn't have them + if not pmid.isdigit(): + return None + try: + from src.services.pubmed import fetch_full_text + result = await fetch_full_text(pmid) + if "error" in result: + return None + return result.get("methods") + except Exception: + return None + + +async def _deliver_slack_dm( + agent_id: str, + bot_token: str, + slack_user_id: str, + summary_text: str, + rss_url: str, +) -> bool: + """Send the text summary as a Slack DM from the agent bot to the PI.""" + if not bot_token or bot_token.startswith("xoxb-placeholder"): + logger.info("Agent %s: no valid Slack token, skipping DM delivery", agent_id) + return False + if not slack_user_id: + logger.info("Agent %s: no slack_user_id configured, skipping DM delivery", agent_id) + return False + + try: + from slack_sdk import WebClient + client = WebClient(token=bot_token) + + # Open DM channel + dm_resp = client.conversations_open(users=[slack_user_id]) + channel_id = dm_resp["channel"]["id"] + + # Append RSS link + full_message = summary_text + if rss_url: + full_message += f"\n\n_Listen to the audio version: {rss_url}_" + + client.chat_postMessage(channel=channel_id, text=full_message) + logger.info("Agent %s: Slack DM delivered to %s", agent_id, slack_user_id) + return True + except Exception as exc: + logger.error("Agent %s: Slack DM failed: %s", agent_id, exc) + return False + + +async def run_pipeline_for_agent( + agent_id: str, + bot_name: str, + pi_name: str, + bot_token: str, + slack_user_id: str | None, + db_session, +) -> bool: + """Run the full podcast pipeline for one agent. + + Returns True if an episode was produced and recorded. 
+ """ + from src.models.podcast import PodcastEpisode + from src.podcast.pubmed_search import build_queries, fetch_candidates + from src.podcast.tts_utils import get_audio_duration_seconds + from src.podcast.state import get_delivered_pmids, record_delivery + + settings = get_settings() + today = date.today() + + logger.info("Starting podcast pipeline for agent: %s (%s)", agent_id, pi_name) + + # Step 1: Load profiles + profile_text = _load_public_profile(agent_id) + if not profile_text: + logger.warning("Agent %s: no public profile found, skipping", agent_id) + return False + + preferences_text = await _load_podcast_preferences(agent_id) + if preferences_text: + logger.info("Agent %s: loaded podcast preferences (%d chars)", agent_id, len(preferences_text)) + + # Build a minimal profile dict from markdown for query building + profile_dict = _parse_profile_markdown(profile_text) + + # Step 2: Build queries and fetch candidates + queries = build_queries(profile_dict) + if not queries: + logger.warning("Agent %s: could not build search queries", agent_id) + return False + + already_delivered = get_delivered_pmids(agent_id) + candidates = await fetch_candidates( + queries, + already_delivered=already_delivered, + days=settings.podcast_search_window_days, + max_total=settings.podcast_max_candidates, + ) + + if not candidates: + logger.info("Agent %s: no new candidate articles found", agent_id) + return False + + # Step 3: LLM article selection + selected, justification = await _select_article(profile_text, candidates, agent_id, preferences_text) + if selected is None: + logger.info("Agent %s: no article selected", agent_id) + return False + + pmid = selected.get("pmid", "") + logger.info("Agent %s: selected PMID %s", agent_id, pmid) + + # Step 4: Try to fetch full text + full_text = await _try_fetch_full_text(pmid) + + # Step 5: Generate text summary + summary = await _generate_summary(profile_text, selected, full_text, agent_id, preferences_text) + if not summary: + 
logger.error("Agent %s: summary generation failed", agent_id) + return False + + # Step 6: Generate audio (backend selected by PODCAST_TTS_BACKEND) + audio_path = AUDIO_DIR / agent_id / f"{today.isoformat()}.mp3" + if settings.podcast_tts_backend == "local": + from src.podcast.local_tts import generate_audio + logger.info("Agent %s: using local vLLM-Omni TTS backend", agent_id) + else: + from src.podcast.mistral_tts import generate_audio + logger.info("Agent %s: using Mistral AI TTS backend", agent_id) + audio_ok = await generate_audio(summary, agent_id, audio_path) + audio_file_path = str(audio_path) if audio_ok else None + audio_duration = None + if audio_ok: + audio_duration = get_audio_duration_seconds(audio_path) + + # Step 7: Build RSS URL for DM + base_url = settings.podcast_base_url or settings.base_url + rss_url = f"{base_url}/podcast/{agent_id}/feed.xml" + + # Step 8: Deliver Slack DM + slack_ok = await _deliver_slack_dm( + agent_id=agent_id, + bot_token=bot_token, + slack_user_id=slack_user_id or "", + summary_text=summary, + rss_url=rss_url, + ) + + # Extract metadata from selected record + authors_list = selected.get("authors") or [] + if len(authors_list) > 3: + authors_str = ", ".join(authors_list[:3]) + " et al." 
+ else: + authors_str = ", ".join(authors_list) if authors_list else "Unknown" + + # Step 9: Persist to DB + episode = PodcastEpisode( + agent_id=agent_id, + episode_date=today, + pmid=pmid, + paper_title=selected.get("title") or "", + paper_authors=authors_str, + paper_journal=selected.get("journal") or "", + paper_year=selected.get("year") or 0, + text_summary=summary, + audio_file_path=audio_file_path, + audio_duration_seconds=audio_duration, + slack_delivered=slack_ok, + selection_justification=justification, + ) + db_session.add(episode) + await db_session.flush() + + # Step 10: Update state + record_delivery(agent_id, pmid) + + logger.info( + "Agent %s: episode complete (audio=%s, slack=%s)", agent_id, audio_ok, slack_ok + ) + return True + + +def _parse_profile_markdown(text: str) -> dict[str, Any]: + """Extract structured fields from public profile markdown for query building.""" + from src.agent.grantbot import _extract_list_section + return { + "disease_areas": _extract_list_section(text, "Disease Areas"), + "techniques": _extract_list_section(text, "Key Methods and Technologies"), + "experimental_models": _extract_list_section(text, "Model Systems"), + "keywords": _extract_list_section(text, "Keywords"), + "research_summary": _extract_section_text(text, "Research Summary"), + } + + +def _extract_section_text(text: str, section_name: str) -> str: + """Extract free-form text from a markdown section.""" + lines = [] + in_section = False + for line in text.splitlines(): + if section_name.lower() in line.lower() and line.startswith("##"): + in_section = True + continue + if in_section: + if line.startswith("##"): + break + lines.append(line) + return " ".join(l.strip() for l in lines if l.strip()) diff --git a/src/podcast/preprint_search.py b/src/podcast/preprint_search.py new file mode 100644 index 0000000..b15fa3c --- /dev/null +++ b/src/podcast/preprint_search.py @@ -0,0 +1,289 @@ +"""Preprint server search for the podcast pipeline. 
+ +Supports bioRxiv, medRxiv (via biorxiv.org content API) and arXiv. + +Records returned use the same schema as PubMed records but with: + - pmid: prefixed ID e.g. "biorxiv:2024.04.01.123456", "arxiv:2401.12345" + - url: canonical preprint URL + - journal: " (preprint)" + - source: "biorxiv" | "medrxiv" | "arxiv" +""" + +import logging +import re +import xml.etree.ElementTree as ET +from datetime import datetime, timedelta, timezone +from typing import Any + +import httpx + +logger = logging.getLogger(__name__) + +BIORXIV_API = "https://api.biorxiv.org/details" +ARXIV_API = "https://export.arxiv.org/api/query" +ARXIV_NS = "http://www.w3.org/2005/Atom" + +# arXiv categories relevant to biomedical / computational biology research +ARXIV_CATEGORIES = "cat:q-bio.BM OR cat:q-bio.GN OR cat:q-bio.MN OR cat:q-bio.QM OR cat:cs.LG" + + +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + +def _extract_search_terms(queries: list[str]) -> list[str]: + """Extract individual quoted terms from PubMed query strings.""" + terms: list[str] = [] + for q in queries: + for match in re.findall(r'"([^"]+)"', q): + if match not in terms: + terms.append(match) + # Fall back to bare words if no quoted terms + if not terms: + for q in queries: + for word in q.split(): + w = word.strip('"\'') + if len(w) > 4 and w.upper() not in ("AND", "OR", "NOT") and w not in terms: + terms.append(w) + return terms[:12] + + +def _score_record(title: str, abstract: str, terms: list[str]) -> int: + """Count how many search terms appear in title+abstract (case-insensitive).""" + text = (title + " " + abstract).lower() + return sum(1 for t in terms if t.lower() in text) + + +def _date_range(days: int) -> tuple[str, str]: + now = datetime.now(timezone.utc) + start = now - timedelta(days=days) + return start.strftime("%Y-%m-%d"), now.strftime("%Y-%m-%d") + + +# 
--------------------------------------------------------------------------- +# bioRxiv / medRxiv +# --------------------------------------------------------------------------- + +async def _fetch_biorxiv_server( + server: str, + queries: list[str], + days: int, + max_results: int, +) -> list[dict[str, Any]]: + """Fetch recent preprints from bioRxiv or medRxiv and score against queries.""" + terms = _extract_search_terms(queries) + if not terms: + return [] + + start_date, end_date = _date_range(days) + url = f"{BIORXIV_API}/{server}/{start_date}/{end_date}/0/json" + + try: + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.get(url) + resp.raise_for_status() + data = resp.json() + except Exception as exc: + logger.warning("%s API request failed: %s", server, exc) + return [] + + collection = data.get("collection") or [] + if not isinstance(collection, list): + return [] + + cutoff = datetime.now(timezone.utc) - timedelta(days=days) + scored: list[tuple[int, dict[str, Any]]] = [] + for item in collection: + title = item.get("title") or "" + abstract = item.get("abstract") or "" + if not abstract: + continue + + # The bioRxiv API date-range filter includes revised preprints; filter by + # the item's own date so we only include recently posted/first-version papers. 
+ date_str = item.get("date") or "" + if date_str: + try: + item_date = datetime.strptime(date_str, "%Y-%m-%d").replace(tzinfo=timezone.utc) + if item_date < cutoff: + continue + except ValueError: + pass + + score = _score_record(title, abstract, terms) + if score == 0: + continue + + doi = item.get("doi") or "" + doi_suffix = doi.removeprefix("10.1101/") + record_id = f"{server}:{doi_suffix}" + + # Authors stored as semicolon-separated string + authors_raw = item.get("authors") or "" + authors_list = [a.strip() for a in authors_raw.split(";") if a.strip()] + + year_str = date_str[:4] + year = int(year_str) if year_str.isdigit() else datetime.now(timezone.utc).year + + scored.append((score, { + "pmid": record_id, + "url": f"https://www.{server}.org/content/{doi}v1", + "title": title, + "abstract": abstract, + "journal": f"{server.capitalize()} (preprint)", + "year": year, + "authors": authors_list, + "pub_types": ["Preprint"], + "source": server, + })) + + scored.sort(key=lambda x: x[0], reverse=True) + return [r for _, r in scored[:max_results]] + + +# --------------------------------------------------------------------------- +# arXiv +# --------------------------------------------------------------------------- + +async def _fetch_arxiv( + queries: list[str], + days: int, + max_results: int, +) -> list[dict[str, Any]]: + """Fetch recent preprints from arXiv matching researcher queries.""" + terms = _extract_search_terms(queries) + if not terms: + return [] + + # Build arXiv search: keyword terms in abstract + category filter + term_clause = " OR ".join(f'abs:"{t}"' for t in terms[:6]) + search_query = f"({term_clause}) AND ({ARXIV_CATEGORIES})" + + start_date, _ = _date_range(days) + # arXiv date filter via submittedDate + arxiv_date = start_date.replace("-", "") + "000000" + + params = { + "search_query": search_query, + "start": "0", + "max_results": str(max_results * 2), + "sortBy": "submittedDate", + "sortOrder": "descending", + } + + try: + async with 
httpx.AsyncClient(timeout=30) as client: + resp = await client.get(ARXIV_API, params=params) + resp.raise_for_status() + xml_text = resp.text + except Exception as exc: + logger.warning("arXiv API request failed: %s", exc) + return [] + + try: + root = ET.fromstring(xml_text) + except ET.ParseError as exc: + logger.warning("arXiv XML parse error: %s", exc) + return [] + + records: list[dict[str, Any]] = [] + cutoff = datetime.now(timezone.utc) - timedelta(days=days) + + for entry in root.findall(f"{{{ARXIV_NS}}}entry"): + title_el = entry.find(f"{{{ARXIV_NS}}}title") + summary_el = entry.find(f"{{{ARXIV_NS}}}summary") + id_el = entry.find(f"{{{ARXIV_NS}}}id") + published_el = entry.find(f"{{{ARXIV_NS}}}published") + + title = (title_el.text or "").strip().replace("\n", " ") if title_el is not None else "" + abstract = (summary_el.text or "").strip() if summary_el is not None else "" + arxiv_url = (id_el.text or "").strip() if id_el is not None else "" + published_str = (published_el.text or "").strip() if published_el is not None else "" + + if not abstract or not arxiv_url: + continue + + # Parse submission date and apply cutoff + try: + pub_dt = datetime.fromisoformat(published_str.replace("Z", "+00:00")) + if pub_dt < cutoff: + continue + year = pub_dt.year + except ValueError: + year = datetime.now(timezone.utc).year + + # Extract arxiv ID from URL like http://arxiv.org/abs/2401.12345v1 + arxiv_id = arxiv_url.split("/abs/")[-1].split("v")[0] + + authors_list = [ + (n_el.text or "").strip() + for author in entry.findall(f"{{{ARXIV_NS}}}author") + for n_el in [author.find(f"{{{ARXIV_NS}}}name")] + if n_el is not None and n_el.text + ] + + records.append({ + "pmid": f"arxiv:{arxiv_id}", + "url": f"https://arxiv.org/abs/{arxiv_id}", + "title": title, + "abstract": abstract, + "journal": "arXiv (preprint)", + "year": year, + "authors": authors_list, + "pub_types": ["Preprint"], + "source": "arxiv", + }) + + if len(records) >= max_results: + break + + return records 
+ + +# --------------------------------------------------------------------------- +# Public interface +# --------------------------------------------------------------------------- + +async def fetch_preprint_candidates( + queries: list[str], + already_delivered: set[str], + days: int = 14, + max_total: int = 20, +) -> list[dict[str, Any]]: + """Fetch preprints from bioRxiv, medRxiv, and arXiv. + + Returns records filtered against already_delivered, up to max_total total. + Each record has the same schema as PubMed records with an added 'url' field. + """ + import asyncio + + per_source = max(max_total // 3, 5) + + biorxiv_task = _fetch_biorxiv_server("biorxiv", queries, days, per_source) + medrxiv_task = _fetch_biorxiv_server("medrxiv", queries, days, per_source) + arxiv_task = _fetch_arxiv(queries, days, per_source) + + results = await asyncio.gather(biorxiv_task, medrxiv_task, arxiv_task, return_exceptions=True) + + candidates: list[dict[str, Any]] = [] + seen_ids: set[str] = set() + source_names = ("bioRxiv", "medRxiv", "arXiv") + for name, result in zip(source_names, results): + if isinstance(result, Exception): + logger.warning("Preprint fetch failed for %s: %s", name, result) + continue + for rec in result: + pid = rec["pmid"] + if pid not in already_delivered and pid not in seen_ids: + seen_ids.add(pid) + candidates.append(rec) + + logger.info( + "Preprint candidates: %d total (%s)", + len(candidates), + ", ".join( + f"{name}: {len(r) if not isinstance(r, Exception) else 'err'}" + for name, r in zip(source_names, results) + ), + ) + return candidates[:max_total] diff --git a/src/podcast/pubmed_search.py b/src/podcast/pubmed_search.py new file mode 100644 index 0000000..05ede0f --- /dev/null +++ b/src/podcast/pubmed_search.py @@ -0,0 +1,149 @@ +"""PubMed query builder and search for the podcast pipeline.""" + +import asyncio +import logging +from typing import Any + +import httpx + +from src.config import get_settings +from src.services.pubmed import 
_ncbi_get, fetch_pubmed_records + +logger = logging.getLogger(__name__) + +EUTILS_BASE = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils" + + +def build_queries(profile: dict[str, Any]) -> list[str]: + """Build 2–3 PubMed search query strings from a researcher's profile fields. + + profile keys used: disease_areas, techniques, experimental_models, keywords + """ + disease_areas: list[str] = profile.get("disease_areas") or [] + techniques: list[str] = profile.get("techniques") or [] + experimental_models: list[str] = profile.get("experimental_models") or [] + keywords: list[str] = profile.get("keywords") or [] + + queries: list[str] = [] + + # Query 1: disease areas (most specific to the field) + da_terms = [_simplify_term(t) for t in disease_areas[:6] if t] + da_terms = [t for t in da_terms if t and len(t.split()) <= 5] + if da_terms: + queries.append(" OR ".join(f'"{t}"' for t in da_terms[:4])) + + # Query 2: techniques + experimental models (finds methods papers) + tech_terms = [_simplify_term(t) for t in techniques[:4] if t] + tech_terms = [t for t in tech_terms if t and len(t.split()) <= 4] + if tech_terms: + queries.append(" OR ".join(f'"{t}"' for t in tech_terms[:4])) + + # Query 3: keywords (broad coverage) + kw_terms = [_simplify_term(t) for t in keywords[:8] if t] + kw_terms = [t for t in kw_terms if t and len(t.split()) <= 4] + if kw_terms: + queries.append(" OR ".join(f'"{t}"' for t in kw_terms[:5])) + + # Fallback: use research summary words if nothing else + if not queries: + summary = profile.get("research_summary") or "" + words = [w.strip(".,;:") for w in summary.split() if len(w) > 6][:5] + if words: + queries.append(" OR ".join(f'"{w}"' for w in words)) + + return queries + + +def _simplify_term(term: str) -> str: + """Strip parenthetical qualifiers and trim whitespace.""" + return term.split("(")[0].strip() + + +async def search_recent_pmids( + queries: list[str], + days: int = 14, + max_total: int = 50, +) -> list[str]: + """Run PubMed ESearch 
for each query, return deduplicated list of recent PMIDs.""" + settings = get_settings() + seen: set[str] = set() + pmids: list[str] = [] + + # Date filter: last N days + from datetime import datetime, timedelta, timezone + cutoff = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y/%m/%d") + today = datetime.now(timezone.utc).strftime("%Y/%m/%d") + date_filter = f"{cutoff}:{today}[pdat]" + + for query in queries: + if len(pmids) >= max_total: + break + try: + params = { + "db": "pubmed", + "term": f"({query}) AND {date_filter}", + "retmode": "json", + "retmax": str(max_total), + "sort": "relevance", + } + resp = await _ncbi_get(f"{EUTILS_BASE}/esearch.fcgi", params) + data = resp.json() + ids = data.get("esearchresult", {}).get("idlist", []) + for pid in ids: + if pid not in seen and len(pmids) < max_total: + seen.add(pid) + pmids.append(pid) + logger.debug("Query '%s': %d results", query[:60], len(ids)) + except Exception as exc: + logger.warning("PubMed search failed for query '%s': %s", query[:60], exc) + + logger.info("Found %d candidate PMIDs across %d queries", len(pmids), len(queries)) + return pmids + + +async def fetch_candidates( + queries: list[str], + already_delivered: set[str], + days: int = 14, + max_total: int = 50, +) -> list[dict[str, Any]]: + """Search PubMed and preprint servers, return candidate records excluding already-delivered IDs. + + Returns list of dicts with: pmid, title, abstract, journal, year, pub_types. + Preprint records also include a 'url' and 'source' field. 
+ """ + from src.podcast.preprint_search import fetch_preprint_candidates + + # Fetch PubMed and preprints concurrently + pubmed_pmids_task = search_recent_pmids(queries, days=days, max_total=max_total * 2) + preprint_task = fetch_preprint_candidates( + queries, + already_delivered=already_delivered, + days=days, + max_total=max(max_total // 3, 10), + ) + + pmids_raw, preprint_candidates = await asyncio.gather(pubmed_pmids_task, preprint_task) + + # Filter PubMed results + pmids = [p for p in pmids_raw if p not in already_delivered] + pubmed_records = await fetch_pubmed_records(pmids[:max_total]) if pmids else [] + + # Filter out reviews/editorials and items without abstracts from PubMed + pubmed_candidates = [] + for rec in pubmed_records: + if not rec.get("abstract"): + continue + pub_types = [pt.lower() for pt in (rec.get("pub_types") or [])] + if any(t in pt for t in ("review", "editorial", "comment", "letter") for pt in pub_types): + continue + pubmed_candidates.append(rec) + + candidates = pubmed_candidates + preprint_candidates + logger.info( + "%d total candidates (PubMed: %d, preprints: %d)", + len(candidates), + len(pubmed_candidates), + len(preprint_candidates), + ) + return candidates diff --git a/src/podcast/rss.py b/src/podcast/rss.py new file mode 100644 index 0000000..2195c8f --- /dev/null +++ b/src/podcast/rss.py @@ -0,0 +1,87 @@ +"""RSS feed builder for podcast episodes.""" + +import logging +from datetime import datetime, timezone +from email.utils import format_datetime +from pathlib import Path +from typing import Any + +logger = logging.getLogger(__name__) + +AUDIO_DIR = Path("data/podcast_audio") + + +def build_feed(agent_id: str, pi_name: str, episodes: list[Any], base_url: str) -> str: + """Build an RSS 2.0 feed with iTunes extensions for the given agent's episodes. + + episodes: list of PodcastEpisode ORM objects, newest first. + base_url: public base URL (e.g. 
https://copi.science) + """ + feed_url = f"{base_url}/podcast/{agent_id}/feed.xml" + items_xml = "\n".join(_build_item(ep, agent_id, base_url) for ep in episodes) + + return f""" + + + {_escape(pi_name)} — LabBot Research Briefings + Daily personalized research summaries for {_escape(pi_name)} at Scripps Research. + {_escape(feed_url)} + en-us + + {_escape(pi_name)} + + false +{items_xml} + +""" + + +def _build_item(ep: Any, agent_id: str, base_url: str) -> str: + """Build a single RSS for a PodcastEpisode.""" + date_str = ep.episode_date.isoformat() + pub_date = format_datetime( + datetime(ep.episode_date.year, ep.episode_date.month, ep.episode_date.day, + 9, 0, 0, tzinfo=timezone.utc) + ) + title = _escape(f"{ep.paper_title} — {date_str}") + description = _escape(ep.text_summary) + guid = f"{agent_id}-{date_str}" + pmid_url = f"https://pubmed.ncbi.nlm.nih.gov/{ep.pmid}/" + + enclosure_xml = "" + duration_xml = "" + if ep.audio_file_path: + audio_url = f"{base_url}/podcast/{agent_id}/audio/{date_str}.mp3" + audio_path = Path(ep.audio_file_path) + file_size = audio_path.stat().st_size if audio_path.exists() else 0 + enclosure_xml = ( + f' ' + ) + if ep.audio_duration_seconds: + mins, secs = divmod(ep.audio_duration_seconds, 60) + duration_xml = f" {mins}:{secs:02d}" + + return f""" + {title} + {description} + {_escape(pmid_url)} + {_escape(guid)} + {pub_date} +{enclosure_xml} +{duration_xml} + """ + + +def _escape(text: str) -> str: + """Escape XML special characters.""" + return ( + str(text) + .replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace('"', """) + .replace("'", "'") + ) diff --git a/src/podcast/state.py b/src/podcast/state.py new file mode 100644 index 0000000..67235ce --- /dev/null +++ b/src/podcast/state.py @@ -0,0 +1,60 @@ +"""Podcast state persistence — tracks delivered PMIDs and last run timestamp.""" + +import json +import logging +from datetime import datetime, timezone +from pathlib import Path + +logger = 
logging.getLogger(__name__) + +STATE_FILE = Path("data/podcast_state.json") + + +def _load() -> dict: + if STATE_FILE.exists(): + try: + return json.loads(STATE_FILE.read_text(encoding="utf-8")) + except Exception as exc: + logger.warning("Failed to load podcast state: %s", exc) + return {} + + +def _save(data: dict) -> None: + STATE_FILE.parent.mkdir(parents=True, exist_ok=True) + STATE_FILE.write_text(json.dumps(data, indent=2), encoding="utf-8") + + +def get_delivered_pmids(agent_id: str) -> set[str]: + """Return the set of PMIDs already delivered to this agent.""" + data = _load() + return set(data.get("agents", {}).get(agent_id, {}).get("delivered_pmids", [])) + + +def record_delivery(agent_id: str, pmid: str) -> None: + """Record that a PMID was delivered to this agent.""" + data = _load() + agents = data.setdefault("agents", {}) + agent_data = agents.setdefault(agent_id, {"delivered_pmids": []}) + pmids = agent_data.setdefault("delivered_pmids", []) + if pmid not in pmids: + pmids.append(pmid) + _save(data) + + +def get_last_run_date() -> str | None: + """Return ISO date string of the last completed podcast run, or None.""" + data = _load() + return data.get("last_run_date") + + +def mark_run_complete() -> None: + """Record that the podcast pipeline ran today (UTC).""" + data = _load() + data["last_run_date"] = datetime.now(timezone.utc).strftime("%Y-%m-%d") + _save(data) + + +def should_run_today() -> bool: + """Return True if the podcast pipeline has not run today (UTC).""" + today = datetime.now(timezone.utc).strftime("%Y-%m-%d") + return get_last_run_date() != today diff --git a/src/podcast/tts_utils.py b/src/podcast/tts_utils.py new file mode 100644 index 0000000..7a56bea --- /dev/null +++ b/src/podcast/tts_utils.py @@ -0,0 +1,81 @@ +"""Shared utilities for podcast TTS backends.""" + +import logging +import re +import shutil +import subprocess +import tempfile +from pathlib import Path + +logger = logging.getLogger(__name__) + + +def 
strip_markdown(text: str) -> str: + """Remove markdown formatting so TTS reads clean prose.""" + # Remove bold/italic markers (* and _) + text = re.sub(r"\*+([^*]+)\*+", r"\1", text) + text = re.sub(r"_+([^_]+)_+", r"\1", text) + # Remove inline code + text = re.sub(r"`[^`]+`", "", text) + # Remove URLs but keep surrounding text + text = re.sub(r"https?://\S+", "", text) + return text.strip() + + +def normalize_audio(audio_path: Path) -> bool: + """Normalize audio loudness in-place using ffmpeg loudnorm (EBU R128). + + Targets -16 LUFS integrated loudness, -1.5 dBTP true peak — standard + podcast levels. Writes to a temp file then atomically replaces the original. + + Returns True if normalization succeeded, False if ffmpeg is unavailable or + the command fails (the original file is preserved on failure). + """ + ffmpeg = shutil.which("ffmpeg") + if not ffmpeg: + logger.warning("ffmpeg not found on PATH — skipping audio normalization") + return False + + with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp: + tmp_path = Path(tmp.name) + + try: + result = subprocess.run( + [ + ffmpeg, + "-y", # overwrite tmp if it exists + "-i", str(audio_path), + "-af", "loudnorm=I=-16:TP=-1.5:LRA=11", + "-ar", "44100", + str(tmp_path), + ], + capture_output=True, + timeout=120, + ) + if result.returncode != 0: + logger.error( + "ffmpeg loudnorm failed (exit %d): %s", + result.returncode, + result.stderr.decode(errors="replace")[-500:], + ) + tmp_path.unlink(missing_ok=True) + return False + + tmp_path.replace(audio_path) + logger.info("Audio normalized (loudnorm -16 LUFS) → %s", audio_path) + return True + except Exception as exc: + logger.error("Audio normalization failed: %s", exc) + tmp_path.unlink(missing_ok=True) + return False + + +def get_audio_duration_seconds(audio_path: Path) -> int | None: + """Return audio duration in seconds using mutagen, or None if unavailable.""" + try: + from mutagen.mp3 import MP3 + audio = MP3(str(audio_path)) + return 
int(audio.info.length) + except Exception as exc: + logger.debug("Could not read audio duration from %s: %s", audio_path, exc) + return None diff --git a/src/routers/admin.py b/src/routers/admin.py index befea82..5fb1511 100644 --- a/src/routers/admin.py +++ b/src/routers/admin.py @@ -11,6 +11,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload +from src.config import get_settings from src.database import get_db from src.dependencies import get_admin_user, get_current_user from src.models import ( @@ -19,6 +20,7 @@ AgentRegistry, Job, LlmCallLog, + PodcastEpisode, Publication, ResearcherProfile, SimulationRun, @@ -958,6 +960,48 @@ async def impersonate_user( return response +@router.get("/podcast", response_class=HTMLResponse) +async def admin_podcast( + request: Request, + agent_filter: str | None = None, + db: AsyncSession = Depends(get_db), + current_user: User = Depends(get_admin_user), +): + """Podcast episodes overview.""" + query = select(PodcastEpisode).order_by(PodcastEpisode.episode_date.desc()).limit(200) + result = await db.execute(query) + all_episodes = result.scalars().all() + + # Apply agent filter + episodes = [e for e in all_episodes if not agent_filter or e.agent_id == agent_filter] + + # Summary stats + total = len(all_episodes) + with_audio = sum(1 for e in all_episodes if e.audio_file_path) + slack_delivered = sum(1 for e in all_episodes if e.slack_delivered) + agent_ids = sorted({e.agent_id for e in all_episodes}) + + settings = get_settings() + base_url = settings.podcast_base_url or settings.base_url + + return templates.TemplateResponse( + request, + "admin/podcast.html", + _template_context( + request, + current_user, + active_admin="podcast", + episodes=episodes, + total=total, + with_audio=with_audio, + slack_delivered=slack_delivered, + agent_ids=agent_ids, + agent_filter=agent_filter, + base_url=base_url, + ), + ) + + @router.post("/impersonate/stop") async def stop_impersonating( request: 
Request, diff --git a/src/routers/podcast.py b/src/routers/podcast.py new file mode 100644 index 0000000..3560324 --- /dev/null +++ b/src/routers/podcast.py @@ -0,0 +1,130 @@ +"""Podcast RSS feed, audio serving, and on-demand generation endpoints.""" + +import asyncio +import logging +from pathlib import Path + +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import FileResponse, Response +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.config import get_settings +from src.database import get_db, get_session_factory +from src.models.agent_registry import AgentRegistry +from src.models.podcast import PodcastEpisode +from src.podcast.rss import build_feed + +logger = logging.getLogger(__name__) +router = APIRouter() + +AUDIO_DIR = Path("data/podcast_audio") + + +@router.get("/{agent_id}/feed.xml", response_class=Response) +async def podcast_feed( + agent_id: str, + db: AsyncSession = Depends(get_db), +): + """RSS 2.0 podcast feed for a PI's daily research briefings.""" + # Verify agent exists + agent_result = await db.execute( + select(AgentRegistry).where(AgentRegistry.agent_id == agent_id) + ) + agent = agent_result.scalar_one_or_none() + if not agent: + raise HTTPException(status_code=404, detail="Agent not found") + + # Fetch episodes newest-first + episodes_result = await db.execute( + select(PodcastEpisode) + .where(PodcastEpisode.agent_id == agent_id) + .order_by(PodcastEpisode.episode_date.desc()) + .limit(30) + ) + episodes = episodes_result.scalars().all() + + settings = get_settings() + base_url = settings.podcast_base_url or settings.base_url + + xml = build_feed( + agent_id=agent_id, + pi_name=agent.pi_name, + episodes=episodes, + base_url=base_url, + ) + + return Response(content=xml, media_type="application/rss+xml; charset=utf-8") + + +@router.get("/{agent_id}/audio/{date}.mp3") +async def podcast_audio(agent_id: str, date: str): + """Stream a podcast audio file.""" + # Basic 
validation to prevent path traversal + if "/" in date or ".." in date or not date.replace("-", "").isdigit(): + raise HTTPException(status_code=400, detail="Invalid date format") + + audio_path = AUDIO_DIR / agent_id / f"{date}.mp3" + if not audio_path.exists(): + raise HTTPException(status_code=404, detail="Audio file not found") + + return FileResponse( + path=str(audio_path), + media_type="audio/mpeg", + filename=f"{agent_id}-{date}.mp3", + ) + + +async def _run_pipeline_background(agent_id: str, bot_name: str, pi_name: str, bot_token: str, slack_user_id: str | None) -> None: + """Run the podcast pipeline in a background task with its own DB session.""" + from src.podcast.pipeline import run_pipeline_for_agent + + session_factory = get_session_factory() + try: + async with session_factory() as db: + ok = await run_pipeline_for_agent( + agent_id=agent_id, + bot_name=bot_name, + pi_name=pi_name, + bot_token=bot_token, + slack_user_id=slack_user_id, + db_session=db, + ) + await db.commit() + logger.info("On-demand podcast pipeline for %s: %s", agent_id, "produced" if ok else "no episode") + except Exception as exc: + logger.error("On-demand podcast pipeline failed for %s: %s", agent_id, exc, exc_info=True) + + +@router.post("/{agent_id}/generate") +async def podcast_generate( + agent_id: str, + db: AsyncSession = Depends(get_db), +): + """Trigger on-demand podcast generation for an agent. + + Returns immediately; pipeline runs in the background. + Check the RSS feed or DB for the resulting episode. 
+ """ + agent_result = await db.execute( + select(AgentRegistry).where(AgentRegistry.agent_id == agent_id) + ) + agent = agent_result.scalar_one_or_none() + if not agent: + raise HTTPException(status_code=404, detail="Agent not found") + + settings = get_settings() + slack_tokens = settings.get_slack_tokens() + bot_token = agent.slack_bot_token or slack_tokens.get(agent_id, {}).get("bot", "") + + asyncio.create_task( + _run_pipeline_background( + agent_id=agent_id, + bot_name=agent.bot_name, + pi_name=agent.pi_name, + bot_token=bot_token, + slack_user_id=agent.slack_user_id, + ) + ) + + return {"status": "started", "agent_id": agent_id, "message": f"Podcast pipeline started for {agent.pi_name}. Check the RSS feed shortly."} diff --git a/templates/admin/podcast.html b/templates/admin/podcast.html new file mode 100644 index 0000000..aa9fbc0 --- /dev/null +++ b/templates/admin/podcast.html @@ -0,0 +1,118 @@ +{% extends "base.html" %} +{% block title %}Admin — Podcast — CoPI{% endblock %} + +{% block content %} +
+

Podcast Episodes

+ {{ total }} total episodes +
+ + +
+
+
{{ total }}
+
Total Episodes
+
+
+
{{ with_audio }}
+
With Audio
+
+
+
{{ slack_delivered }}
+
Slack Delivered
+
+
+ + +
+
+ + +
+ {% if agent_filter %} + + + {% endif %} +
+ + +
+ + + + + + + + + + + + + {% for ep in episodes %} + + + + + + + + + {% else %} + + + + {% endfor %} + +
DateAgentPaperPMIDAudioSlack
{{ ep.episode_date }} + {{ ep.agent_id }} + +
{{ ep.paper_title }}
+
{{ ep.paper_journal }} · {{ ep.paper_year }}
+ {% if ep.selection_justification %} +
{{ ep.selection_justification }}
+ {% endif %} +
+ {{ ep.pmid }} + + {% if ep.audio_file_path %} + ✓ + {% if ep.audio_duration_seconds %} + {{ ep.audio_duration_seconds // 60 }}:{{ '%02d' % (ep.audio_duration_seconds % 60) }} + {% endif %} + + {% else %} + + {% endif %} + + {% if ep.slack_delivered %} + + {% else %} + + {% endif %} +
No podcast episodes yet
+
+ + +{% endblock %} diff --git a/templates/base.html b/templates/base.html index 9ca8721..a8fcdbf 100644 --- a/templates/base.html +++ b/templates/base.html @@ -86,6 +86,7 @@ Activity Discussions Agents + Podcast