diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b54b0a..2475016 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - **Optional MCP container image:** Added `Containerfile.mcp` plus GHCR publishing for explicit MCP tags (`mcp`, `X.Y.Z-mcp`, `vX.Y.Z-mcp`) alongside the default API image. - **Manual publish workflow controls:** Added `workflow_dispatch` inputs for version override plus selective publish toggles for PyPI, GHCR (Podman), and GitHub Releases. - **Executable branding assets:** Added packaged application icons for Windows (`.ico`) and macOS (`.icns`) release builds. +- **TOTP 2FA support in session token generation:** The `perplexity-webui-scraper token` CLI wizard now handles Perplexity accounts with TOTP-based two-factor authentication. After email OTP verification, the CLI detects the TOTP challenge redirect, prompts for the authenticator app code, and completes the login flow automatically. ### Changed diff --git a/README.md b/README.md index 413192c..852a9fc 100644 --- a/README.md +++ b/README.md @@ -100,11 +100,11 @@ for model in MODELS.list_all(): ## Available CLI -| Command | Extra | Description | -| -------------------------------- | ----- | --------------------------------------------------------- | -| `perplexity-webui-scraper token` | `cli` | Interactive email auth wizard to generate a session token | -| `perplexity-webui-scraper mcp` | `mcp` | Start the MCP server | -| `perplexity-webui-scraper api` | `api` | Start the OpenAI-compatible REST API server | +| Command | Extra | Description | +| -------------------------------- | ----- | ----------------------------------------------------------------------------- | +| `perplexity-webui-scraper token` | `cli` | Interactive email auth wizard to generate a session token (supports TOTP 2FA) | +| `perplexity-webui-scraper mcp` | `mcp` | Start the MCP server | +| `perplexity-webui-scraper api` | `api` | Start the OpenAI-compatible REST API server | ## OpenAI-Compatible API diff --git a/src/perplexity_webui_scraper/_internal/constants.py b/src/perplexity_webui_scraper/_internal/constants.py index 829e857..6a26b72 100644 --- a/src/perplexity_webui_scraper/_internal/constants.py +++ b/src/perplexity_webui_scraper/_internal/constants.py @@ -43,6 +43,9 @@ ENDPOINT_AUTH_OTP_REDIRECT: Final[str] = "/api/auth/otp-redirect-link" """Endpoint to convert an OTP code into a redirect URL.""" +ENDPOINT_AUTH_TOTP_CHALLENGE_VERIFY: Final[str] = "/api/auth/totp/challenge-verify" +"""Endpoint to verify a TOTP code during 2FA challenge.""" + # --------------------------------------------------------------------------- # Authentication # --------------------------------------------------------------------------- diff --git a/src/perplexity_webui_scraper/cli/commands/get_session_token.py b/src/perplexity_webui_scraper/cli/commands/get_session_token.py index ceede29..e008ccb 100644 --- a/src/perplexity_webui_scraper/cli/commands/get_session_token.py +++ b/src/perplexity_webui_scraper/cli/commands/get_session_token.py @@ -5,7 +5,9 @@ from __future__ import annotations +from contextlib import suppress from typing import Annotated +from urllib.parse import parse_qs, urlparse from curl_cffi.requests import Session from pyperclip import PyperclipException, copy @@ -16,9 +18,11 @@ from perplexity_webui_scraper._internal.constants import ( API_BASE_URL, + API_VERSION, ENDPOINT_AUTH_CSRF, ENDPOINT_AUTH_OTP_REDIRECT, ENDPOINT_AUTH_SIGNIN, + ENDPOINT_AUTH_TOTP_CHALLENGE_VERIFY, SESSION_COOKIE_NAME, ) @@ -51,6 +55,179 @@ def _show_exit_message() -> None: console.input() +def _prompt_email(provided: str | None) -> str: + """Prompt for or validate the user's email address.""" + if not provided: + console.print("\n[bold cyan]Step 1: Email Verification[/bold cyan]") + email = Prompt.ask(" Enter your Perplexity email", console=console) + else: + console.print(f"\n[bold cyan]Step 1: Email Verification[/bold cyan] (using [white]{provided}[/white])") + email = provided + + email = email.strip() + + if not email or "@" not in email: + raise ValueError("Invalid email address.") + + return email + + +def _fetch_csrf(session: Session) -> str: + """Obtain a CSRF token from Perplexity's auth endpoint.""" + with console.status("[bold green]Initializing secure connection...", spinner="dots"): + session.get(API_BASE_URL) + csrf_response = session.get(f"{API_BASE_URL}{ENDPOINT_AUTH_CSRF}") + csrf_response.raise_for_status() + csrf_token: str = csrf_response.json().get("csrfToken", "") + + if not csrf_token: + raise ValueError("Failed to obtain CSRF token.") + + return csrf_token + + +def _send_otp(session: Session, email: str, csrf_token: str) -> None: + """Send an OTP email to the given address.""" + with console.status("[bold green]Sending verification code...", spinner="dots"): + response = session.post( + f"{API_BASE_URL}{ENDPOINT_AUTH_SIGNIN}?version={API_VERSION}&source=default", + json={ + "email": email, + "csrfToken": csrf_token, + "useNumericOtp": "true", + "json": "true", + "callbackUrl": f"{API_BASE_URL}/?login-source=floatingSignup", + }, + ) + response.raise_for_status() + + +def _resolve_redirect_url(session: Session, email: str, otp_code: str) -> str: + """Convert an OTP code or magic link into a redirect URL.""" + if otp_code.startswith("http"): + return otp_code + + otp_response = session.post( + f"{API_BASE_URL}{ENDPOINT_AUTH_OTP_REDIRECT}", + json={ + "email": email, + "otp": otp_code, + "redirectUrl": f"{API_BASE_URL}/?login-source=floatingSignup", + "emailLoginMethod": "web-otp", + }, + ) + otp_response.raise_for_status() + + redirect_path = otp_response.json().get("redirect", "") + + if not redirect_path: + raise ValueError("No redirect URL received.") + + return f"{API_BASE_URL}{redirect_path}" if redirect_path.startswith("/") else redirect_path + + +def _follow_callback(session: Session, redirect_url: str) -> str | None: + """Follow the callback and return a TOTP challenge token if 2FA is required.""" + callback_resp = session.get(redirect_url, allow_redirects=False) + + if callback_resp.status_code not in (301, 302, 307, 308): + return None + + location = callback_resp.headers.get("Location", "") + + if "error=" in location: + raise ValueError("Verification failed. The OTP code may be invalid or expired.") + + if "/auth/totp-challenge" in location: + parsed = urlparse(location if location.startswith("http") else f"{API_BASE_URL}{location}") + challenge_token = parse_qs(parsed.query).get("token", [""])[0] + + if not challenge_token: + raise ValueError("TOTP challenge token not found in redirect.") + + return challenge_token + + # Normal flow — follow the redirect + follow_url = location if location.startswith("http") else f"{API_BASE_URL}{location}" + session.get(follow_url) + + return None + + +def _verify_totp(session: Session, challenge_token: str) -> None: + """Prompt for and verify a TOTP code, then follow any post-verification redirect.""" + console.print("\n[bold cyan]Step 3: Two-Factor Authentication[/bold cyan]") + console.print(" Your account has TOTP enabled. Enter the code from your authenticator app.") + + verify_url = f"{API_BASE_URL}{ENDPOINT_AUTH_TOTP_CHALLENGE_VERIFY}?version={API_VERSION}&source=default" + + while True: + totp_code = Prompt.ask(" Enter TOTP code", console=console).strip() + + if not totp_code or not totp_code.isdigit() or len(totp_code) != 6: + console.print("[red] Invalid format. TOTP code must be a 6-digit number.[/red]") + continue + + with console.status("[bold green]Verifying TOTP...", spinner="dots"): + totp_verify_response = session.post( + verify_url, + json={"token": challenge_token, "code": totp_code}, + ) + + try: + totp_verify_response.raise_for_status() + except Exception: + with suppress(Exception): + totp_data = totp_verify_response.json() + + if "error" in totp_data: + console.print(f"[red] ❌ {totp_data.get('error')}[/red]") + continue + + raise + + if totp_verify_response.status_code in (301, 302, 307, 308): + next_location = totp_verify_response.headers.get("Location", "") + + if next_location: + next_url = next_location if next_location.startswith("http") else f"{API_BASE_URL}{next_location}" + session.get(next_url) + + break + + totp_data = totp_verify_response.json() + + if "error" in totp_data: + console.print(f"[red] ❌ {totp_data.get('error')}[/red]") + continue + + next_redirect = totp_data.get("redirect", "") + + if next_redirect: + next_url = f"{API_BASE_URL}{next_redirect}" if next_redirect.startswith("/") else next_redirect + session.get(next_url) + + break + + +def _extract_and_present_token(session: Session) -> None: + """Extract the session cookie, display the token, and offer clipboard copy.""" + session_token = session.cookies.get(SESSION_COOKIE_NAME) + + if not session_token: + raise ValueError("Authentication successful, but token not found.") + + console.print("\n[bold green]✅ Token generated successfully![/bold green]") + console.print(f"\n[bold white]Your session token:[/bold white]\n[green]{session_token}[/green]\n") + + if Confirm.ask("Copy token to clipboard?", default=False, console=console): + try: + copy(session_token) + console.print("[dim]Token copied to clipboard.[/dim]") + except PyperclipException as error: + console.print(f"[red]Could not copy to clipboard: {error}[/red]") + + def run( email: Annotated[str | None, typer.Argument(help="Your Perplexity account email.")] = None, ) -> None: @@ -60,103 +237,56 @@ def run( displays the extracted session token, and offers to copy it to the clipboard. The screen is cleared on exit for security. """ - with console.screen(): + with console.screen(hide_cursor=False): try: _show_header() - if not email: - console.print("\n[bold cyan]Step 1: Email Verification[/bold cyan]") - email = Prompt.ask(" Enter your Perplexity email", console=console) - else: - console.print(f"\n[bold cyan]Step 1: Email Verification[/bold cyan] (using [white]{email}[/white])") - - email = email.strip() - - if not email or "@" not in email: - raise ValueError("Invalid email address.") + email = _prompt_email(email) with Session(impersonate="chrome", headers=_DEFAULT_HEADERS) as session: - # Step 1: Obtain CSRF token - with console.status("[bold green]Initializing secure connection...", spinner="dots"): - session.get(API_BASE_URL) - csrf_response = session.get(f"{API_BASE_URL}{ENDPOINT_AUTH_CSRF}") - csrf_response.raise_for_status() - csrf_token: str = csrf_response.json().get("csrfToken", "") - - if not csrf_token: - raise ValueError("Failed to obtain CSRF token.") - - # Step 2: Send OTP email - with console.status("[bold green]Sending verification code...", spinner="dots"): - signin_response = session.post( - f"{API_BASE_URL}{ENDPOINT_AUTH_SIGNIN}?version=2.18&source=default", - json={ - "email": email, - "csrfToken": csrf_token, - "useNumericOtp": "true", - "json": "true", - "callbackUrl": f"{API_BASE_URL}/?login-source=floatingSignup", - }, - ) - signin_response.raise_for_status() + csrf_token = _fetch_csrf(session) + _send_otp(session, email, csrf_token) console.print("\n[bold cyan]Step 2: Verification[/bold cyan]") console.print(" Check your email for a [bold]6-digit code[/bold] or [bold]magic link[/bold].") - # Step 3: Prompt user for OTP code - otp_code = Prompt.ask(" Enter code or paste link", console=console).strip() - - if not otp_code: - raise ValueError("OTP code cannot be empty.") - - # Step 4: Convert OTP to redirect URL - with console.status("[bold green]Validating...", spinner="dots"): - if otp_code.startswith("http"): - redirect_url = otp_code - else: - otp_response = session.post( - f"{API_BASE_URL}{ENDPOINT_AUTH_OTP_REDIRECT}", - json={ - "email": email, - "otp": otp_code, - "redirectUrl": f"{API_BASE_URL}/?login-source=floatingSignup", - "emailLoginMethod": "web-otp", - }, - ) - otp_response.raise_for_status() - - redirect_path = otp_response.json().get("redirect", "") - if not redirect_path: - raise ValueError("No redirect URL received.") - - redirect_url = ( - f"{API_BASE_URL}{redirect_path}" if redirect_path.startswith("/") else redirect_path - ) - - # Step 5: Follow redirect to set session cookie - session.get(redirect_url) - - # Step 6: Extract session cookie - session_token = session.cookies.get(SESSION_COOKIE_NAME) - - if not session_token: - raise ValueError("Authentication successful, but token not found.") - - console.print("\n[bold green]✅ Token generated successfully![/bold green]") - console.print(f"\n[bold white]Your session token:[/bold white]\n[green]{session_token}[/green]\n") - - if Confirm.ask("Copy token to clipboard?", default=False, console=console): - try: - copy(session_token) - console.print("[dim]Token copied to clipboard.[/dim]") - except PyperclipException as error: - console.print(f"[red]Could not copy to clipboard: {error}[/red]") + while True: + otp_code = Prompt.ask(" Enter code or paste link", console=console).strip() - _show_exit_message() + if not otp_code: + console.print("[red] OTP code cannot be empty.[/red]") + continue + + if not otp_code.startswith("http") and (not otp_code.isdigit() or len(otp_code) != 6): + console.print("[red] Invalid format. Enter a 6-digit code or a valid magic link.[/red]") + continue + try: + with console.status("[bold green]Validating...", spinner="dots"): + redirect_url = _resolve_redirect_url(session, email, otp_code) + challenge_token = _follow_callback(session, redirect_url) + break + except ValueError as e: + if "Verification failed" in str(e): + console.print(f"[red] ❌ {e}[/red]") + console.print( + "[yellow] The previous request was invalidated. Resending a new code...[/yellow]" + ) + _send_otp(session, email, csrf_token) + console.print(" Check your email for the [bold]new[/bold] 6-digit code.") + continue + + raise + + if challenge_token: + _verify_totp(session, challenge_token) + + _extract_and_present_token(session) + _show_exit_message() except KeyboardInterrupt: raise typer.Exit(code=0) from None except Exception as error: console.print(f"\n[bold red]⛔ Error:[/bold red] {error}") console.input("[dim]Press ENTER to exit...[/dim]") + raise typer.Exit(code=1) from error diff --git a/uv.lock b/uv.lock index dcd81fd..89ab947 100644 --- a/uv.lock +++ b/uv.lock @@ -678,14 +678,14 @@ wheels = [ [[package]] name = "joserfc" -version = "1.6.5" +version = "1.6.7" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "cryptography" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/3b/dc/5f768c2e391e9afabe5d18e3221346deb5fb6338565f1ccc9e7c6d7befdd/joserfc-1.6.5.tar.gz", hash = "sha256:1482a7db78fb4602e44ed89e51b599d052e091288c7c532c5b694e20149dec48", size = 231881, upload-time = "2026-05-06T04:58:13.408Z" } +sdist = { url = "https://files.pythonhosted.org/packages/1b/cb/52e479f20804904f5df20ac4539d292dcecd1287aaa33cba1d1def1d9d8e/joserfc-1.6.7.tar.gz", hash = "sha256:6999fe89457069ecacd8cc797c88a805f83054dd883333fa0409f74b46479fd7", size = 232158, upload-time = "2026-05-23T01:46:44.069Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/54/3b/ad1cb22e75c963b1f07c8a2329bf47227ce7e4361df5eb2fb101b2ce33ef/joserfc-1.6.5-py3-none-any.whl", hash = "sha256:e9878a0f8243fe7b95e11fdda81374ca9f7a689e302751579d3dfdeec559675e", size = 70464, upload-time = "2026-05-06T04:58:11.668Z" }, + { url = "https://files.pythonhosted.org/packages/c5/e4/bcf6718b5662894c6831f46296b73cd4b1a2e90c20b6d437e20c4997388c/joserfc-1.6.7-py3-none-any.whl", hash = "sha256:9e51e4a64840aa1734a058258e80a4480e2ff2d5686e480e7c92c954a92fbe05", size = 70603, upload-time = "2026-05-23T01:46:42.129Z" }, ] [[package]]