This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
Telegram meme recommendation bot (@ffmemesbot). Infinite feed of memes with like/dislike that drives personalized recommendations. North star metric is session length (memes per session), NOT like rate.
```bash
# First-time setup
cp .env.example .env    # Then fill in required values
docker network create ffmemes_network
docker-compose up -d --build
```
```bash
# Development
just up                          # Start all services
just build                       # Rebuild containers
just logs app                    # Tail app logs
just exec bash                   # Shell into app container
docker compose exec app ipython  # Interactive REPL (supports async natively)
```
```bash
# Database
just migrate              # Run alembic migrations
just mm "migration name"  # Create new autogenerated migration
just downgrade -1         # Roll back one migration
just backup               # pg_dump + gzip
just restore <file>       # pg_restore
```
```bash
# Code quality -- MUST run before every commit
ruff check --fix src/ tests/  # fix lint errors
ruff format src/ tests/       # fix formatting (CI checks this!)
# Or equivalently:
just lint                     # runs both ruff fix + format
```
```bash
# Tests (all integration tests, require DB)
docker compose exec app pytest
docker compose exec app pytest tests/recommendations/
docker compose exec app pytest tests/recommendations/test_blender.py
```

- Bot: python-telegram-bot 22.3, webhook mode (polling mode for local dev via `start-polling.py`)
- API: FastAPI + Uvicorn (port 8000 internal, 16000 external)
- DB: PostgreSQL 14 (asyncpg + SQLAlchemy 2.0 raw `Table` objects, NOT declarative ORM)
- Cache/Queue: Redis 6.2 (recommendation queues per user, user info cache, 1h TTL)
- Jobs: Prefect 3.4 (parsing, stats, crossposting crons)
- Describe Memes: OpenRouter FREE vision models only (`src/flows/storage/describe_memes.py`), runs every 15 min, 9/batch (~864/day). NEVER add paid models -- the client rejects models not ending in `:free`, and Redis caps calls at 900 attempts/day. Free-tier 429s/timeouts/invalid model output are normal: cool down that model and retry in later scheduled runs. Accounts need $10+ in lifetime purchases for 1,000 req/day. See `specs/describe-memes.md`. Circuit breaker auto-pauses after 3 failures/hour
- Python: 3.10 (dev), 3.12 (prod)
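The free-model constraint can be enforced mechanically. A minimal sketch, assuming illustrative names throughout -- the real guard lives in `src/flows/storage/describe_memes.py` and the real counter is a Redis key, not an in-memory object:

```python
# Illustrative sketch of the free-model guard and daily call cap.
# DailyCap is an in-memory stand-in for the Redis INCR-with-TTL counter;
# names here are hypothetical, not the production implementation.
DAILY_CAP = 900  # attempts/day checked before any OpenRouter call

def is_free_model(model_id: str) -> bool:
    """OpenRouter free-tier model ids end in ':free' -- reject anything else."""
    return model_id.endswith(":free")

class DailyCap:
    """Stand-in for the Redis daily counter (key expires at midnight)."""
    def __init__(self, cap: int = DAILY_CAP):
        self.cap = cap
        self.used = 0

    def try_acquire(self) -> bool:
        if self.used >= self.cap:
            return False  # budget exhausted until the daily key expires
        self.used += 1
        return True
```

Keeping both checks before the API call means a paid model id or an over-budget day fails fast instead of burning credits.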
| Service | Container | Purpose |
|---|---|---|
| app | app | FastAPI webhook server |
| app_prefect | prefect_worker | Prefect scheduled job worker |
| app_tgbot_polling | app_tgbot_polling | Polling bot (dev alternative to webhooks) |
| app_db | app_db | PostgreSQL (port 65432) |
| app_redis | app_redis | Redis (port 36379) |
- Production: `src/main.py` (FastAPI app with webhook at `/tgbot/webhook`) -- only runs on the production server
- Local testing: `start-polling.py` (polling mode with a separate test bot token -- the production bot cannot run locally)
- Startup scripts: `scripts/start-dev.sh` (uvicorn with reload), `scripts/start-prod.sh`
```
src/
├── main.py              # FastAPI app, lifespan, health check
├── config.py            # Pydantic BaseSettings (env vars)
├── database.py          # All 20 SQLAlchemy Table definitions + fetch_one/fetch_all/execute helpers
├── redis.py             # Redis client, connection pool (128 max), queue/cache operations
├── tgbot/
│   ├── app.py           # Bot setup, handler registration
│   ├── router.py        # FastAPI webhook route with secret validation
│   ├── handlers/        # /start, reactions, uploads, admin, chat, payments, etc.
│   └── senders/         # Meme delivery, keyboards, next_message queue logic
├── recommendations/
│   ├── candidates.py    # 9 candidate engines + CandidatesRetriever (asyncio.gather)
│   ├── blender.py       # Weighted random sampling across engines
│   ├── meme_queue.py    # Redis queue: check_queue, generate_recommendations
│   └── service.py       # Reaction persistence, stats triggers
├── feed_turn/
│   └── planner.py       # Pure maturity-stage decision table (engine plan + mod quota); no DB/Redis/TG. Contract for upcoming Feed Turn refactor; not yet wired into hot path.
├── storage/
│   ├── parsers/         # TG (BeautifulSoup HTML), VK (API), IG (HikerAPI)
│   ├── etl.py           # Raw posts -> processed memes pipeline
│   ├── watermark.py     # Image watermarking (Pillow)
│   └── service.py       # Upload to TG storage chat -> telegram_file_id
├── flows/               # Prefect workflows (ETL, stats, crossposting)
│   ├── events.py        # safe_emit() wrapper for Prefect events
│   └── hooks.py         # on_failure Telegram alert hook
├── stats/               # Stats computation SQL queries
└── crossposting/        # Auto-post to public channels
scripts/
├── serve_flows.py       # Prefect .serve() entry point (23 deployments)
└── setup_automations.py # Creates Prefect automations (run once after deploy)
alembic/versions/        # 32 migration files
```
- No ORM models: uses SQLAlchemy `Table` objects in `src/database.py`, not declarative classes
- Raw SQL: most queries use `text()` with dict-based results via `fetch_one()`/`fetch_all()`
- Async throughout: asyncpg driver, all DB operations are async
- Migrations auto-generated from `Table` definitions in `database.py`
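The raw-SQL-with-dict-results pattern can be illustrated with stdlib `sqlite3` standing in for asyncpg (the real helpers are async and live in `src/database.py`; the `fetch_all` signature below is an assumption for the sketch):

```python
import sqlite3

# sqlite3 stands in for asyncpg here. The point is the shape of the pattern:
# raw SQL, bound parameters, dict-like rows instead of ORM objects.
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row  # rows behave like dicts, as fetch_all() returns
conn.execute("CREATE TABLE meme (id INTEGER PRIMARY KEY, status TEXT)")
conn.execute("INSERT INTO meme (id, status) VALUES (1, 'ok'), (2, 'created')")

def fetch_all(sql: str, params: dict) -> list[dict]:
    """Synchronous stand-in for the async fetch_all() helper."""
    return [dict(row) for row in conn.execute(sql, params)]

rows = fetch_all("SELECT id FROM meme WHERE status = :status", {"status": "ok"})
```

The `:status` placeholder is bound by the driver, which is the same named-parameter style `text()` queries use.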
After rebasing a feature branch with a new migration onto production, always verify a single head before pushing:

```bash
docker compose exec app alembic heads  # MUST return exactly one revision
```

If two heads appear, your migration's `down_revision` is stale (another migration landed on production with the same parent). Re-parent it to the new tip and amend the commit. The CI lint job also enforces this (`alembic heads | grep -c '(head)'`), so a multi-head push fails fast at lint time instead of cascading into a confusing `UndefinedTableError` in the test job.
When `Table()` columns change in `database.py`, the migration must match exactly -- including nullable and unique constraints. Mismatches cause schema drift between local autogenerate and prod.
In-progress PR branches live in `.worktrees/<slug>/`. Check there before running `git checkout <branch>` -- git refuses with "already checked out at <path>". Either `cd .worktrees/<slug>` to work on the branch, or run `git worktree remove .worktrees/<slug>` to free the branch for the main checkout.
```
Sources -> Parsers (hourly cron) -> meme_raw_* tables
  -> [automation: parser completes -> triggers pipeline]
  -> ETL (filter single-media, detect type) -> meme table (status=created)
  -> Download + Watermark + Upload to TG storage -> telegram_file_id
  -> [automation: pipeline completes -> triggers final_pipeline]
  -> Ad filter -> Dedup -> status='ok'
  -> Recommendation (9 engines) -> Blender -> Redis queue -> User
  -> Like/Dislike -> user_meme_reaction -> Stats aggregation (every 15 min)
```
9 candidate engines blended with weighted random sampling. All engines are raw SQL queries in `candidates.py`, fetched concurrently via `asyncio.gather()`.
User maturity stages determine the engine mix:
- Cold start (<30 memes): `best_uploaded_memes` + `fast_dopamine` only
- Growing (30-100): 5 engines blended
- Mature (100+): 5 engines with different weights
- Moderators/Admins: 75% low-sent-quota + 25% regular
Queue refills when length <= 2, generating 5 memes per refill.
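The blending step can be pictured as weighted sampling across per-engine candidate lists until the refill batch is full. A minimal sketch -- engine names, weights, and the function shape are illustrative, not the actual `blender.py` code:

```python
import random

# Hypothetical engine outputs and weights; the real nine engines and their
# per-stage weights live in candidates.py / blender.py.
candidates = {
    "fast_dopamine": [101, 102, 103],
    "best_uploaded_memes": [201, 202],
    "source_affinity": [301],
}
weights = {"fast_dopamine": 0.5, "best_uploaded_memes": 0.3, "source_affinity": 0.2}

def blend(candidates, weights, n=5, rng=random):
    """Draw up to n meme ids, picking an engine by weight each turn."""
    pools = {k: list(v) for k, v in candidates.items() if v}
    picked = []
    while pools and len(picked) < n:
        engines = list(pools)
        engine = rng.choices(engines, weights=[weights[e] for e in engines])[0]
        picked.append(pools[engine].pop(0))
        if not pools[engine]:
            del pools[engine]  # engine exhausted, drop it from the draw
    return picked
```

Sampling an engine per slot (rather than a fixed split) keeps the mix stochastic while still honoring the stage weights on average.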
| Table | Purpose |
|---|---|
| meme | Processed memes (status, type, telegram_file_id, ocr_result) |
| meme_source | Registered sources (TG/VK/IG channels) |
| meme_raw_telegram/vk/ig | Raw parsed posts per platform |
| user / user_tg | User profiles + Telegram metadata |
| user_language | Language preferences (composite key: user_id + language_code) |
| user_meme_reaction | Like/dislike log (PK: user_id, meme_id) |
| meme_stats | Aggregated meme metrics (lr_smoothed, nlikes, sec_to_react) |
| user_stats | User engagement metrics (nlikes, nsessions, time_spent) |
| user_meme_source_stats | Per-user per-source affinity |
| meme_source_stats | Source-level quality metrics |
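The exact `lr_smoothed` formula is not documented here; a common scheme for a smoothed like rate is additive (Bayesian) smoothing toward a prior, sketched below as a plausible shape only -- verify against the stats SQL before relying on it:

```python
def lr_smoothed(nlikes: int, ndislikes: int,
                prior_lr: float = 0.5, strength: int = 10) -> float:
    """Hypothetical additive smoothing -- NOT the verified production formula.
    Pulls low-volume memes toward the prior so a 1-like/0-dislike meme
    does not outrank a 100-like/5-dislike one."""
    return (nlikes + prior_lr * strength) / (nlikes + ndislikes + strength)
```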
- All tests are integration tests requiring a running DB (uses the default `postgres` database, separate from the app's `app` database)
- Tests run alembic upgrade/downgrade automatically via fixtures in `tests/conftest.py`
- Use `pytest-asyncio` for async test support
- Test manually with `docker compose exec app ipython` (supports async without `asyncio.run()`)
For fast iteration on a single test file, run pytest directly on the host. Set up `.env.test` (gitignored -- keeps real test-DB credentials off the public repo) by mirroring the env block in `.github/workflows/ci.yml`. Then:

```bash
set -a; source .env.test; set +a
python3 -m pytest tests/comms/test_publishing.py -x
```

Required vars beyond `DATABASE_URL` / `REDIS_URL`: `CORS_ORIGINS`, `CORS_HEADERS`, plus the TG bot vars (any non-empty string works for tests). If `src.config` raises a pydantic `ValidationError` on import, your `.env.test` is missing a required field -- `.github/workflows/ci.yml` is the source of truth for the full set.
Required in `.env`: `DATABASE_URL`, `REDIS_URL`, `TELEGRAM_BOT_TOKEN`, `TELEGRAM_BOT_USERNAME`, `TELEGRAM_BOT_WEBHOOK_SECRET`, `MEME_STORAGE_TELEGRAM_CHAT_ID`, `UPLOADED_MEMES_REVIEW_CHAT_ID`, `ADMIN_LOGS_CHAT_ID`
Optional: `VK_TOKEN`, `OPENROUTER_API_KEY`, `OPENAI_API_KEY`, `SENTRY_DSN`
IMPORTANT: Always reference credentials via `$ENV_VAR_NAME` in bash commands. Never expand, paste, or hardcode actual secret values inline. Use `source .env` to load variables, then reference by name:

```bash
# CORRECT -- password stays hidden
source .env
psql $ANALYST_DATABASE_URL

# ALSO CORRECT -- .pgpass handles auth automatically
psql -h $PGHOST -p $PGPORT -U $PGUSER -d $PGDATABASE

# WRONG -- never do this (never inline the actual password)
psql "host=... user=... password=ACTUAL_PASSWORD dbname=..."
```

This applies to all secrets: database URLs, API tokens, bot tokens, etc.
- Like rate ~50%, but dislike != bad meme (users often dislike good memes)
- Text-heavy memes problem: users skip (dislike) memes with lots of text because they can't be bothered to read them, not because the meme is bad. In a different mood they would like it. A fast dislike (< 2 sec) on a text meme means "didn't bother reading", not "bad content"
- Hypothesis: sec_to_react distribution is bimodal per meme — quick reactions (gut response) vs slow reactions (read/think). This should inform scoring
- Personalization: source affinity (user x source like rate) -> within-source meme ranking
- Current reco is rule-based SQL, not ML
- Stats (user_stats, meme_stats, lr_smoothed) update every 15 min; engagement_score updates hourly
- Share button under every meme survives Telegram forwards (external link, not callback button)
- Sentry: configured, CLI (`sentry`) is logged in. `Forbidden` errors are filtered (handled by the error handler)
- Logs: server logs available via Coolify
- Prefect automations (13 total, self-hosted):
  - Chain triggers (6): parser -> pipeline -> final_pipeline (per platform)
  - Proactive monitors (4): auto-retrigger stats/parser/pipeline if not completed on time
  - Circuit breakers (3): pause deployment after 3 failures (parsers, describe_memes)
- All flows emit custom events (`ff.parser.*`, `ff.pipeline.*`, `ff.stats.*`) via `safe_emit()`
- Automations managed via `scripts/setup_automations.py` (run once after deploy)
- Resume paused deployments: `prefect deployment resume "<name>"`
After every deploy or when checking system health, verify:
- Sentry: `sentry issue list` -- should have 0 recent events
- Docker: `ssh root@65.108.127.32 "docker ps"` -- app, prefect-runner, prefect-server all Up
- App logs: `ssh root@65.108.127.32 "docker logs <app-container> --since 10m 2>&1"` -- no errors
- Prefect logs: `ssh root@65.108.127.32 "docker logs <prefect-runner> --since 10m 2>&1"` -- flows Completed
- DB health query (run via psql against prod):

```sql
SELECT
  (SELECT count(*) FROM meme WHERE created_at > now() - interval '24 hours') AS new_memes_24h,
  (SELECT round(100.0 * count(*) FILTER (WHERE status = 'ok') / NULLIF(count(*), 0))
     FROM meme WHERE created_at > now() - interval '24 hours') AS ok_pct,
  (SELECT count(DISTINCT user_id) FROM user_meme_reaction
     WHERE reacted_at > now() - interval '24 hours') AS active_users_24h,
  (SELECT count(*) FROM user_meme_reaction
     WHERE reacted_at > now() - interval '24 hours') AS reactions_24h,
  (SELECT max(updated_at) FROM user_stats) AS user_stats_updated,
  (SELECT max(updated_at) FROM meme_stats) AS meme_stats_updated;
```

Expected healthy values: new_memes > 100, ok_pct ~90-96%, active_users > 100, reactions > 5000, stats updated within the last 15 min.
Use the /browse skill from gstack for all web browsing. Never use mcp__claude-in-chrome__* tools.
Available skills: /plan-ceo-review, /plan-eng-review, /review, /ship, /browse, /qa, /setup-browser-cookies, /retro
IMPORTANT: Read `docs/broadcasts.md` before sending any broadcast. Use `send_broadcast()` from `src/broadcasts/service.py` with a unique `broadcast_id` for dedup. Never run inline ad-hoc broadcast code. Language detection uses the `user_language` table (bot preference), NOT `user_tg.language_code` (Telegram app language).
- SQL string interpolation in `candidates.py` -- needs parameterization (injection risk)
- Queue refill threshold=2 is too low for fast users
- No exploration mechanism in recommendations (all engines optimize exploitation)
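The injection risk in the first item is the generic one: interpolated strings can rewrite the query, bound parameters cannot. A stdlib `sqlite3` demo of the failure mode (the real queries go through asyncpg/SQLAlchemy `text()`, but the fix is the same named-parameter style):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE meme (id INTEGER, title TEXT)")
conn.execute("INSERT INTO meme VALUES (1, 'cat'), (2, 'dog')")

user_input = "x' OR '1'='1"  # hostile input

# RISKY -- string interpolation lets the input rewrite the WHERE clause,
# so the condition becomes always-true and every row leaks out
risky = conn.execute(
    f"SELECT id FROM meme WHERE title = '{user_input}'"
).fetchall()

# SAFE -- a bound parameter treats the input as a plain value
safe = conn.execute(
    "SELECT id FROM meme WHERE title = ?", (user_input,)
).fetchall()
```

`risky` returns every meme; `safe` returns nothing, because no title literally equals the hostile string.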
When the user's request matches an available skill, ALWAYS invoke it using the Skill tool as your FIRST action. Do NOT answer directly, do NOT use other tools first. The skill has specialized workflows that produce better results than ad-hoc answers.
Key routing rules:
- Product ideas, "is this worth building", brainstorming → invoke office-hours
- Bugs, errors, "why is this broken", 500 errors → invoke investigate
- Ship, deploy, push, create PR → invoke ship
- QA, test the site, find bugs → invoke qa
- Code review, check my diff → invoke review
- Update docs after shipping → invoke document-release
- Weekly retro → invoke retro
- Design system, brand → invoke design-consultation
- Visual audit, design polish → invoke design-review
- Architecture review → invoke plan-eng-review
- Save progress, checkpoint, resume → invoke checkpoint
- Code quality, health check → invoke health