Skip to content
Open
71 changes: 71 additions & 0 deletions scripts/setup_anthropic_cred.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#!/usr/bin/env python3
"""
Two-step Anthropic OAuth credential setup.

Step 1 (no args): Generate auth URL + save verifier
python scripts/setup_anthropic_cred.py

Step 2 (with code): Exchange code for tokens
python scripts/setup_anthropic_cred.py "CODE_FROM_BROWSER"
"""
import sys
import os
import json
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))

import asyncio
from pathlib import Path
from rotator_library.providers.anthropic_auth_base import (
_generate_pkce, _build_authorize_url, AnthropicAuthBase
)

# Temporary file holding the PKCE verifier between Step 1 and Step 2.
STATE_FILE = Path(__file__).parent / ".anthropic_pkce_state.json"
# Directory (sibling of scripts/) where credential JSON files are written.
OAUTH_DIR = Path(__file__).parent / ".." / "oauth_creds"

async def exchange_code(auth_code: str):
    """Step 2: exchange the pasted authorization code for OAuth tokens.

    Loads the PKCE verifier saved by step1(), performs the token exchange
    via AnthropicAuthBase, and saves the credential as the next unused
    ``anthropic_oauth_N.json`` with owner-only (0o600) permissions.

    The PKCE state file is removed even when the exchange fails — the
    verifier is single-use, so a failed attempt requires rerunning Step 1.

    Exits with status 1 if Step 1 has not been run (no state file).
    """
    import time

    if not STATE_FILE.exists():
        print("Error: PKCE state file not found. Please run Step 1 first.")
        sys.exit(1)
    state = json.loads(STATE_FILE.read_text())
    verifier = state["verifier"]

    try:
        auth = AnthropicAuthBase()
        tokens = await auth._exchange_code(auth_code.strip(), verifier)

        creds = {
            **tokens,
            "email": "anthropic-oauth-user",
            "_proxy_metadata": {
                "email": "anthropic-oauth-user",
                "last_check_timestamp": time.time(),
                "credential_type": "oauth",
            },
        }

        oauth_dir = OAUTH_DIR.resolve()
        oauth_dir.mkdir(parents=True, exist_ok=True)

        # Pick the next unused numeric suffix. max()+1 (not len()+1) so a
        # gap in the numbering — e.g. anthropic_oauth_2.json was deleted
        # while anthropic_oauth_3.json remains — never causes an overwrite.
        existing_nums = [
            int(path.stem.rsplit("_", 1)[-1])
            for path in oauth_dir.glob("anthropic_oauth_*.json")
            if path.stem.rsplit("_", 1)[-1].isdigit()
        ]
        next_num = max(existing_nums, default=0) + 1
        file_path = oauth_dir / f"anthropic_oauth_{next_num}.json"

        # Create the token file with 0o600 from the moment it exists
        # (write_text + chmod leaves a window where it is world-readable
        # under the default umask), and O_EXCL so an existing credential
        # can never be clobbered.
        fd = os.open(file_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
        with os.fdopen(fd, "w") as fh:
            json.dump(creds, fh, indent=2)

        print(f"Credential saved to: {file_path}")
        print(f"Access token prefix: {tokens['access_token'][:20]}...")
    finally:
        # The verifier is single-use; drop it even if the exchange raised.
        STATE_FILE.unlink(missing_ok=True)
Comment on lines +25 to +57
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if token exchange fails (line 33), STATE_FILE with PKCE verifier remains on disk; wrap in try-finally to ensure cleanup

Prompt To Fix With AI
This is a comment left during a code review.
Path: scripts/setup_anthropic_cred.py
Line: 25-57

Comment:
if token exchange fails (line 33), STATE_FILE with PKCE verifier remains on disk; wrap in try-finally to ensure cleanup

How can I resolve this? If you propose a fix, please make it concise.


def step1():
    """Step 1: print the authorization URL and persist the PKCE state.

    Generates a PKCE verifier/challenge pair, saves them to STATE_FILE for
    Step 2, and prints the browser URL plus instructions for the next run.
    """
    verifier, challenge = _generate_pkce()
    url = _build_authorize_url(verifier, challenge)

    # The verifier must stay secret until Step 2 consumes it, so create the
    # state file with 0o600 from the start instead of relying on the process
    # umask (plain write_text typically yields world-readable 0o644).
    STATE_FILE.unlink(missing_ok=True)  # drop any stale state from a prior run
    fd = os.open(STATE_FILE, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o600)
    with os.fdopen(fd, "w") as fh:
        json.dump({"verifier": verifier, "challenge": challenge}, fh)

    print("Open this URL in your browser, authorize, then copy the code:\n")
    print(url)
    print("\nThen run: python scripts/setup_anthropic_cred.py \"PASTE_CODE_HERE\"")

if __name__ == "__main__":
    # An auth-code argument selects Step 2 (token exchange); no argument
    # selects Step 1 (URL generation).
    cli_args = sys.argv[1:]
    if cli_args:
        asyncio.run(exchange_code(cli_args[0]))
    else:
        step1()
3 changes: 2 additions & 1 deletion src/rotator_library/credential_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"qwen_code": Path.home() / ".qwen",
"iflow": Path.home() / ".iflow",
"antigravity": Path.home() / ".antigravity",
# Add other providers like 'claude' here if they have a standard CLI path
"anthropic": Path.home() / ".anthropic",
}

# OAuth providers that support environment variable-based credentials
Expand All @@ -28,6 +28,7 @@
"antigravity": "ANTIGRAVITY",
"qwen_code": "QWEN_CODE",
"iflow": "IFLOW",
"anthropic": "ANTHROPIC_OAUTH",
}


Expand Down
2 changes: 2 additions & 0 deletions src/rotator_library/provider_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@
from .providers.qwen_auth_base import QwenAuthBase
from .providers.iflow_auth_base import IFlowAuthBase
from .providers.antigravity_auth_base import AntigravityAuthBase
from .providers.anthropic_auth_base import AnthropicAuthBase

PROVIDER_MAP = {
"gemini_cli": GeminiAuthBase,
"qwen_code": QwenAuthBase,
"iflow": IFlowAuthBase,
"antigravity": AntigravityAuthBase,
"anthropic": AnthropicAuthBase,
}

def get_provider_auth_class(provider_name: str):
Expand Down
Loading
Loading