Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions operate/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ def __init__(self):
self.qwen_api_key = (
None # instance variables are backups in case saving to a `.env` fails
)
self.deepseek_api_key = (
None # instance variables are backups in case saving to a `.env` fails
)

def initialize_openai(self):
if self.verbose:
Expand Down Expand Up @@ -92,6 +95,28 @@ def initialize_qwen(self):
client.base_url = "https://dashscope.aliyuncs.com/compatible-mode/v1"
return client

def initialize_deepseek(self):
    """Build an OpenAI-compatible client configured for the DeepSeek API.

    The API key is taken from the cached ``self.deepseek_api_key`` when it is
    set, otherwise from the ``DEEPSEEK_API_KEY`` environment variable. The
    base URL is taken from ``DEEPSEEK_API_BASE_URL`` and falls back to
    ``https://api.deepseek.com`` when that variable is unset or blank.

    Returns:
        The configured ``OpenAI`` client instance.
    """
    if self.verbose:
        print("[Config][initialize_deepseek]")

    api_key = self.deepseek_api_key
    if api_key:
        if self.verbose:
            print("[Config][initialize_deepseek] using cached deepseek_api_key")
    else:
        if self.verbose:
            print(
                "[Config][initialize_deepseek] no cached deepseek_api_key, try to get from env."
            )
        api_key = os.getenv("DEEPSEEK_API_KEY")

    # Use `or` instead of a getenv default: a variable that is present but
    # blank (e.g. `DEEPSEEK_API_BASE_URL=` in a .env file) must also fall
    # back to the default endpoint rather than yield an empty base URL.
    base_url = os.getenv("DEEPSEEK_API_BASE_URL") or "https://api.deepseek.com"

    client = OpenAI(api_key=api_key, base_url=base_url)
    # Re-applied after construction, as in the original code; presumably this
    # guards against the SDK substituting its own environment defaults --
    # TODO confirm against the OpenAI client behaviour.
    client.api_key = api_key
    client.base_url = base_url
    return client

def initialize_google(self):
if self.google_api_key:
if self.verbose:
Expand Down Expand Up @@ -149,6 +174,9 @@ def validation(self, model, voice_mode):
"ANTHROPIC_API_KEY", "Anthropic API key", model == "claude-3"
)
self.require_api_key("QWEN_API_KEY", "Qwen API key", model == "qwen-vl")
self.require_api_key(
"DEEPSEEK_API_KEY", "DeepSeek API key", model == "deepseek-with-ocr"
)

def require_api_key(self, key_name, key_description, is_required):
key_exists = bool(os.environ.get(key_name))
Expand Down Expand Up @@ -177,6 +205,8 @@ def prompt_and_save_api_key(self, key_name, key_description):
self.anthropic_api_key = key_value
elif key_name == "QWEN_API_KEY":
self.qwen_api_key = key_value
elif key_name == "DEEPSEEK_API_KEY":
self.deepseek_api_key = key_value
self.save_api_key_to_env(key_name, key_value)
load_dotenv() # Reload environment variables
# Update the instance attribute with the new key
Expand Down
250 changes: 240 additions & 10 deletions operate/models/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import easyocr
import ollama
import pkg_resources
import pytesseract
from PIL import Image
from ultralytics import YOLO

Expand All @@ -25,7 +26,7 @@
)
from operate.utils.ocr import get_text_coordinates, get_text_element
from operate.utils.screenshot import capture_screen_with_cursor, compress_screenshot
from operate.utils.style import ANSI_BRIGHT_MAGENTA, ANSI_GREEN, ANSI_RED, ANSI_RESET
from operate.utils.style import ANSI_BRIGHT_MAGENTA, ANSI_GREEN, ANSI_RED, ANSI_RESET, ANSI_YELLOW

# Load configuration
config = Config()
Expand Down Expand Up @@ -62,6 +63,9 @@ async def get_next_action(model, messages, objective, session_id):
if model == "claude-3":
operation = await call_claude_3_with_ocr(messages, objective, model)
return operation, None
if model == "deepseek-with-ocr":
operation = await call_deepseek_with_ocr(messages, objective, model)
return operation, None
raise ModelNotRecognizedException(model)


Expand Down Expand Up @@ -218,6 +222,11 @@ async def call_qwen_vl_with_ocr(messages, objective, model):
text_element_index = get_text_element(
result, text_to_click, screenshot_filename
)
if text_element_index is None:
print(
f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_YELLOW} Text '{text_to_click}' not found, skipping click. {ANSI_RESET}"
)
continue
coordinates = get_text_coordinates(
result, text_element_index, screenshot_filename
)
Expand Down Expand Up @@ -259,6 +268,208 @@ async def call_qwen_vl_with_ocr(messages, objective, model):
traceback.print_exc()
return gpt_4_fallback(messages, objective, model)

# Shared EasyOCR reader, built lazily on first request and then reused so the
# reader is not re-initialized on every loop.
_easyocr_reader = None


def _get_easyocr_reader():
    """Return the module-wide EasyOCR reader, creating it on first use."""
    global _easyocr_reader
    if _easyocr_reader is not None:
        return _easyocr_reader
    _easyocr_reader = easyocr.Reader(["en"])
    return _easyocr_reader


def _last_assistant_operations(messages):
    """Return the operation dicts from the most recent assistant message.

    Best-effort: any message that cannot be interpreted as a JSON array of
    operation objects yields an empty list instead of raising.
    """
    for message in reversed(messages):
        if not isinstance(message, dict) or message.get("role") != "assistant":
            continue
        raw = message.get("content")
        if not isinstance(raw, str):
            return []
        try:
            # raw_decode parses the leading JSON value and ignores trailing
            # text, so the "[System: ...]" note this module may append to an
            # assistant message does not break parsing.
            parsed, _ = json.JSONDecoder().raw_decode(raw.lstrip())
        except json.JSONDecodeError:
            return []
        if not isinstance(parsed, list):
            return []
        return [op for op in parsed if isinstance(op, dict)]
    return []


def _operation_keys(operation):
    """Return the ``keys`` of an operation as a list of strings."""
    keys = operation.get("keys") or []
    if isinstance(keys, str):
        keys = [keys]
    if not isinstance(keys, (list, tuple)):
        return []
    return [str(key) for key in keys]


def _pressed_enter(operations):
    """Return True when any operation's keys include Enter (case-insensitive)."""
    return any(
        key.lower() == "enter"
        for operation in operations
        for key in _operation_keys(operation)
    )


def _describe_last_actions(operations):
    """Summarise press/write operations as a prompt fragment ('' when none)."""
    actions = []
    for operation in operations:
        kind = operation.get("operation")
        if kind == "press":
            actions.extend(_operation_keys(operation))
        elif kind == "write":
            actions.append(f"typed '{operation.get('content','')}'")
    if not actions:
        return ""
    return f" Your last actions: {', '.join(actions)}. "


def _extract_screen_text(screenshot_filename, min_confidence=20):
    """Run Tesseract on the screenshot and collect confident text elements.

    Returns:
        A ``(elements, (width, height))`` tuple. Each element is a dict with
        ``index``, ``text`` and the pixel ``x``/``y`` of the centre of the
        text's bounding box.
    """
    import pytesseract
    from PIL import Image as PILImage

    # Context manager so the screenshot file handle is released promptly.
    with PILImage.open(screenshot_filename) as img:
        image_size = img.size
        ocr_data = pytesseract.image_to_data(
            img, output_type=pytesseract.Output.DICT
        )

    elements = []
    rows = zip(
        ocr_data["text"],
        ocr_data["conf"],
        ocr_data["left"],
        ocr_data["top"],
        ocr_data["width"],
        ocr_data["height"],
    )
    for text, conf, left, top, width, height in rows:
        label = str(text).strip()
        if not label:
            continue
        try:
            # Confidence may arrive as an int, a float, or a float-formatted
            # string such as "96.06"; int() alone rejects the latter.
            confidence = int(float(conf))
        except (TypeError, ValueError):
            continue
        if confidence <= min_confidence:
            continue
        elements.append(
            {
                "index": len(elements),
                "text": label,
                "x": left + width // 2,
                "y": top + height // 2,
            }
        )
    return elements, image_size


def _best_text_match(text_to_click, elements):
    """Return ``(element, score)`` for the element most similar to the text.

    ``element`` is None when ``elements`` is empty or nothing scores above 0.
    """
    from difflib import SequenceMatcher

    needle = str(text_to_click).lower().strip()
    best_score = 0.0
    best_elem = None
    for elem in elements:
        score = SequenceMatcher(None, needle, elem["text"].lower()).ratio()
        if score > best_score:
            best_score = score
            best_elem = elem
    return best_elem, best_score


async def call_deepseek_with_ocr(messages, objective, model):
    """Ask DeepSeek for the next operations using Tesseract OCR text only.

    Captures a screenshot, extracts the visible text with Tesseract, sends the
    text listing (no image) to the DeepSeek chat API, and resolves every
    ``click`` operation in the reply to normalised screen coordinates by
    fuzzy-matching its ``text`` against the OCR results.

    Args:
        messages: Conversation history; the new user prompt and the assistant
            reply are appended to it in place.
        objective: The user's objective, forwarded to ``confirm_system_prompt``.
        model: Model identifier, forwarded to ``confirm_system_prompt`` and
            used in error output.

    Returns:
        A list of operation dicts. Click operations whose text could not be
        matched are omitted. An empty list is returned on any error.
    """
    if config.verbose:
        print("[call_deepseek_with_ocr]")

    try:
        # Smarter delay: extra wait after enter/navigation, base otherwise.
        # The previous operations are looked up by role, because after a
        # completed round the last message is the assistant reply itself.
        previous_operations = _last_assistant_operations(messages)
        wait_time = 4 if _pressed_enter(previous_operations) else 2
        time.sleep(wait_time)

        client = config.initialize_deepseek()

        confirm_system_prompt(messages, objective, model)
        screenshots_dir = "screenshots"
        os.makedirs(screenshots_dir, exist_ok=True)

        screenshot_filename = os.path.join(screenshots_dir, "screenshot.png")
        capture_screen_with_cursor(screenshot_filename)

        # Use Tesseract for text extraction with bounding boxes
        ocr_text_elements, (img_w, img_h) = _extract_screen_text(
            screenshot_filename
        )
        ocr_text_list = "".join(
            f"[{e['index']}] \"{e['text']}\"\n" for e in ocr_text_elements
        )

        # Show what Tesseract sees on screen
        print(f"\n{ANSI_GREEN}═══ SCREEN TEXT ═══{ANSI_RESET}")
        if ocr_text_elements:
            for e in ocr_text_elements:
                print(f" [{e['index']}] \"{e['text']}\"")
        else:
            print(f" {ANSI_YELLOW}(no text detected){ANSI_RESET}")
        print(f"{ANSI_GREEN}═══════════════════{ANSI_RESET}\n")

        if len(messages) == 1:
            user_prompt = get_user_first_message_prompt()
        else:
            user_prompt = get_user_prompt()

        if not ocr_text_elements:
            # Tell the model what it did last so it can reason without text
            last_actions = _describe_last_actions(previous_operations)
            screen_context = (
                f"SCREEN STATUS: No readable text detected on screen.{last_actions}"
                f"If you just tried to launch an app, it may already be open - try Alt+Tab or check the panel/dock. "
                f"If you're in terminal, you can run commands directly (e.g. type 'vivaldi' and press Enter). "
                f"Keyboard shortcuts (Win/Super, Alt+Tab, Ctrl+T) are reliable even with no visible text."
            )
        else:
            screen_context = (
                f"Available text elements on screen ({len(ocr_text_elements)} found):\n"
                f"{ocr_text_list}\n"
                f"When you need to click, reference the exact text string from this list."
            )

        text_only_prompt = (
            f"{user_prompt}\n\n"
            f"{screen_context}\n\n"
            f"**REMEMBER** Only output valid JSON array. Do not append any other text."
        )

        messages.append({"role": "user", "content": text_only_prompt})

        model_name = os.getenv("DEEPSEEK_MODEL_NAME", "deepseek-v4-pro")
        response = client.chat.completions.create(
            model=model_name,
            messages=messages,
            extra_body={"thinking": {"type": "enabled"}},
        )
        reply = response.choices[0].message

        # Show DeepSeek's reasoning/thought process (first 500 characters)
        reasoning = getattr(reply, "reasoning_content", None)
        if reasoning:
            print(f"\n{ANSI_BRIGHT_MAGENTA}[DeepSeek Reasoning]{ANSI_RESET}")
            print(reasoning[:500])
            print(f"{ANSI_BRIGHT_MAGENTA}[End Reasoning]{ANSI_RESET}\n")

        content_str = clean_json(reply.content)
        content = json.loads(content_str)

        processed_content = []
        skipped_clicks = False

        for operation in content:
            if operation.get("operation") != "click":
                processed_content.append(operation)
                continue

            text_to_click = operation.get("text")
            if not text_to_click or text_to_click == "nothing to click":
                continue

            if config.verbose:
                print(
                    "[call_deepseek_with_ocr][click] text_to_click",
                    text_to_click,
                )

            # Find coordinates from Tesseract data via fuzzy match
            best_elem, best_score = _best_text_match(
                text_to_click, ocr_text_elements
            )

            if best_score >= 0.5 and best_elem:
                # Coordinates are expressed as fractions of the screenshot size
                operation["x"] = round(best_elem["x"] / img_w, 3)
                operation["y"] = round(best_elem["y"] / img_h, 3)
                if config.verbose:
                    print(
                        "[call_deepseek_with_ocr][click] matched:",
                        best_elem["text"],
                        "at",
                        operation["x"],
                        operation["y"],
                    )
                processed_content.append(operation)
            else:
                print(
                    f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_YELLOW} Text '{text_to_click}' not found on screen (best match: {best_score:.2f}), skipping click. {ANSI_RESET}"
                )
                skipped_clicks = True

        # If clicks were skipped, tell the model to use keyboard navigation instead
        if skipped_clicks:
            assistant_message = {
                "role": "assistant",
                "content": content_str
                + "\n[System: Some text elements were not found to click. Use keyboard shortcuts (Tab, Enter) or press operations instead of relying on clicks.]",
            }
        else:
            assistant_message = {"role": "assistant", "content": content_str}

        messages.append(assistant_message)

        return processed_content

    except Exception as e:
        # Top-level boundary for this model call: report and yield no operations.
        print(
            f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_BRIGHT_MAGENTA}[{model}] Error: {e} {ANSI_RESET}"
        )
        if config.verbose:
            traceback.print_exc()
        return []


def call_gemini_pro_vision(messages, objective):
"""
Get the next action for Self-Operating Computer using Gemini Pro Vision
Expand Down Expand Up @@ -382,6 +593,11 @@ async def call_gpt_4o_with_ocr(messages, objective, model):
text_element_index = get_text_element(
result, text_to_click, screenshot_filename
)
if text_element_index is None:
print(
f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_YELLOW} Text '{text_to_click}' not found, skipping click. {ANSI_RESET}"
)
continue
coordinates = get_text_coordinates(
result, text_element_index, screenshot_filename
)
Expand Down Expand Up @@ -490,6 +706,11 @@ async def call_gpt_4_1_with_ocr(messages, objective, model):
text_element_index = get_text_element(
result, text_to_click, screenshot_filename
)
if text_element_index is None:
print(
f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_YELLOW} Text '{text_to_click}' not found, skipping click. {ANSI_RESET}"
)
continue
coordinates = get_text_coordinates(
result, text_element_index, screenshot_filename
)
Expand Down Expand Up @@ -601,6 +822,11 @@ async def call_o1_with_ocr(messages, objective, model):
text_element_index = get_text_element(
result, text_to_click, screenshot_filename
)
if text_element_index is None:
print(
f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_YELLOW} Text '{text_to_click}' not found, skipping click. {ANSI_RESET}"
)
continue
coordinates = get_text_coordinates(
result, text_element_index, screenshot_filename
)
Expand Down Expand Up @@ -1077,17 +1303,21 @@ def get_last_assistant_message(messages):
def gpt_4_fallback(messages, objective, model):
    """Retry the request through the GPT-4o path after another model failed.

    Replaces the first entry of ``messages`` with the system prompt built for
    ``"gpt-4o"`` and delegates to ``call_gpt_4o``.

    Args:
        messages: Conversation history; ``messages[0]`` is overwritten in place.
        objective: The user's objective, used to build the system prompt.
        model: The model that originally failed (not used here).

    Returns:
        Whatever ``call_gpt_4o(messages)`` returns.

    Raises:
        Exception: Any error from the fallback is reported and re-raised.
    """
    if config.verbose:
        print("[gpt_4_fallback]")

    try:
        system_prompt = get_system_prompt("gpt-4o", objective)
        new_system_message = {"role": "system", "content": system_prompt}
        # Swap in the GPT-4o system prompt as the first message
        messages[0] = new_system_message

        if config.verbose:
            print("[gpt_4_fallback][updated]")
            print("[gpt_4_fallback][updated] len(messages)", len(messages))

        return call_gpt_4o(messages)
    except Exception as e:
        print(
            f"{ANSI_GREEN}[Self-Operating Computer]{ANSI_RED}[Error] GPT-4 fallback also failed: {e} {ANSI_RESET}"
        )
        raise


def confirm_system_prompt(messages, objective, model):
Expand Down
Loading