-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest.py
More file actions
780 lines (620 loc) · 32.9 KB
/
test.py
File metadata and controls
780 lines (620 loc) · 32.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
# test.py - Automated test version of client_claude.py
# Runs with "how are you?" question without user interaction
from __future__ import annotations
from langchain_openai import ChatOpenAI
from langchain_core.tools import BaseTool
from langchain_core.callbacks import CallbackManagerForToolRun, AsyncCallbackManagerForToolRun
from pydantic import BaseModel, Field
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import SystemMessage, HumanMessage
import asyncio
import os
import traceback
import tiktoken
import json
import time
from typing import Union, Optional, List, Dict, Any, Type
from dotenv import load_dotenv
from langchain_anthropic import ChatAnthropic
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
from mcp_use import MCPAgent, MCPClient
# Load environment variables
load_dotenv()
class McpDecisionTool(BaseTool):
"""Tool for deciding whether to use MCP tools"""
name: str = "decide_mcp_usage"
description: str = """You are an assistant for a WooCommerce e-shop that sells clothing and clothing accessories. Your task is to answer user questions concisely and accurately, using MCP tools (e.g., wc_get_product, wc_intelligent_search) whenever the question relates to:Product information (e.g., available sizes, prices, discounts, stock, descriptions, categories, brands).
Product searches (e.g., "what T-shirts do you have?", "are there any discounts?", "which sizes are available?").
E-shop actions (e.g., which products are on sale, etc.).
Rules for using tools:If the question contains words such as "product," "T-shirt," "size," "price," "discount," "available," "category," "brand," "new products," or other e-shop-related terms, USE MCP tools to retrieve accurate data.
If the question is a follow-up (e.g., "what are the sizes?" after a previous product-related query), USE tools and refer to the previous context.
If the question is NOT about products or the e-shop (e.g., general questions like "what time is it?", "how are you?", "what is a REST API?", greetings like "hello," or mathematical questions), DO NOT use tools and respond concisely directly.
If the tools return no relevant data (e.g., no products found), inform the user clearly and concisely, e.g., "No products found, please check the name or availability on the website." Keep responses concise, structured, and focused on the user's question.
"""
class DecisionInput(BaseModel):
reason: str = Field(default="decision", description="Reason for using MCP tools")
args_schema: Type[BaseModel] = DecisionInput
def _run(self, reason: str = "decision", run_manager: Optional[CallbackManagerForToolRun] = None) -> str:
return "Decision made"
async def _arun(self, reason: str = "decision",
run_manager: Optional[AsyncCallbackManagerForToolRun] = None) -> str:
return "Decision made"
class McpFunctionRouter:
"""OpenAI Function Calling Router for decision-making"""
def __init__(self, openai_api_key: str):
self.decision_llm = ChatOpenAI(
model="gpt-4o-mini",
temperature=0,
api_key=openai_api_key
)
self.decision_tool = McpDecisionTool()
self.last_mcp_intent = None # Track the last intent
async def should_use_mcp(self, message: str) -> bool:
"""Decides using OpenAI Function Calling whether to use MCP, with context awareness"""
try:
decision_prompt = f"""You are a routing assistant for WordPress MCP tools. Analyze the user's message and decide if WordPress/WooCommerce tools are needed:
- If the user wants to PERFORM a specific WordPress/WooCommerce action → call the decide_mcp_usage function
- If it is a general question, greeting, mathematical operation, or theoretical discussion → DO NOT call any function
- Consider the previous context: The last intent was '{self.last_mcp_intent}' if set, otherwise assume no context.
User message: "{message}"
CRITERIA FOR CALLING THE FUNCTION:
✅ CALL the function for:
- Creating, editing, deleting WordPress content (posts, pages, products) or questions like "Do you have a product?" in an e-commerce context
- Searching content (e.g., "show posts", "what products are on sale", "do you have...?") - always return specific product names and details
- Managing WordPress/WooCommerce settings
- Specific actions like "add product to cart", "check order status"
- Follow-up queries refining a previous product search (e.g., price filters like "under 100 EURO" after "do you have...?", or "top 5" to list top products) - always refine and return specific products
- Key words (English only): product, discount, category, search, create, update, delete, set, order, top
❌ DO NOT call the function for:
- General questions (e.g., "how does WordPress work?", "what is WooCommerce?")
- Greetings (e.g., "hello, how are you?")
- Mathematical questions (e.g., "what is 5+2?")
- Theoretical discussions or explanations (e.g., "explain REST API")
- Key words: how, what is, explain, why, hello, math"""
decision_agent = create_react_agent(
self.decision_llm,
[self.decision_tool]
)
response = await decision_agent.ainvoke({
"messages": [
SystemMessage(content=decision_prompt),
HumanMessage(content=message)
]
})
# Detect tool usage
used_mcp = False
messages = response.get("messages", [])
for msg in messages:
if hasattr(msg, 'tool_calls') and msg.tool_calls:
for tool_call in msg.tool_calls:
# Handle both dict and object formats
if isinstance(tool_call, dict):
if tool_call.get('function', {}).get('name') == 'decide_mcp_usage':
used_mcp = True
break
elif hasattr(tool_call, 'function') and hasattr(tool_call.function, 'name'):
if tool_call.function.name == 'decide_mcp_usage':
used_mcp = True
break
elif hasattr(tool_call, 'name'):
if tool_call.name == 'decide_mcp_usage':
used_mcp = True
break
if hasattr(msg, 'additional_kwargs') and msg.additional_kwargs:
tool_calls = msg.additional_kwargs.get('tool_calls', [])
for tool_call in tool_calls:
# Handle both dict and object formats
if isinstance(tool_call, dict):
if tool_call.get('function', {}).get('name') == 'decide_mcp_usage':
used_mcp = True
break
elif hasattr(tool_call, 'function') and hasattr(tool_call.function, 'name'):
if tool_call.function.name == 'decide_mcp_usage':
used_mcp = True
break
elif hasattr(tool_call, 'name'):
if tool_call.name == 'decide_mcp_usage':
used_mcp = True
break
# Update last intent if MCP tools are used
if used_mcp:
self.last_mcp_intent = message if not self.last_mcp_intent else f"{self.last_mcp_intent}, {message}"
elif self.last_mcp_intent and any(
keyword in message.lower() for keyword in ['under', 'price', 'euro', 'cost', 'top']):
# Treat price-related or top-related follow-ups as MCP if context exists
used_mcp = True
# Fallback keyword check (English only)
if not used_mcp:
message_lower = message.lower()
mcp_keywords = ['product', 'discount', 'category', 'search', 'create', 'update', 'delete', 'set',
'order', 'top']
general_keywords = ['how', 'what is', 'explain', 'why', 'hello', 'math']
if any(keyword in message_lower for keyword in mcp_keywords):
used_mcp = True
elif any(keyword in message_lower for keyword in general_keywords):
used_mcp = False
print(f"🧠 Decision: Use MCP tools = {used_mcp}")
return used_mcp
except Exception as e:
print(f"⚠️ Decision error: {e}")
traceback.print_exc()
# Fallback decision with English keywords
message_lower = message.lower()
mcp_keywords = ['product', 'discount', 'category', 'search', 'create', 'update', 'delete', 'set', 'order',
'top']
used_mcp = any(keyword in message_lower for keyword in mcp_keywords)
print(f"🧠 Fallback decision: Use MCP tools = {used_mcp}")
return used_mcp
class TokenLimitManager:
"""Token limits manager"""
def __init__(self):
# Load limits from .env
self.max_input_tokens_per_call = int(os.getenv("MAX_INPUT_TOKENS_PER_CALL", "12000"))
self.max_output_tokens_per_call = int(os.getenv("MAX_OUTPUT_TOKENS_PER_CALL", "4000"))
self.max_total_input_tokens_per_interaction = int(os.getenv("MAX_TOTAL_INPUT_TOKENS_PER_INTERACTION", "40000"))
self.max_total_output_tokens_per_interaction = int(
os.getenv("MAX_TOTAL_OUTPUT_TOKENS_PER_INTERACTION", "15000"))
self.max_llm_calls_per_interaction = int(os.getenv("MAX_LLM_CALLS_PER_INTERACTION", "12"))
# Smart truncation settings
self.tool_response_truncation_threshold = int(os.getenv("TOOL_RESPONSE_TRUNCATION_THRESHOLD", "8000"))
self.max_products_in_truncated_response = int(os.getenv("MAX_PRODUCTS_IN_TRUNCATED_RESPONSE", "10"))
self.max_lines_in_truncated_text = int(os.getenv("MAX_LINES_IN_TRUNCATED_TEXT", "50"))
self.chars_per_token_ratio = float(os.getenv("CHARS_PER_TOKEN_RATIO", "3.5"))
# Preemptive cleanup settings
self.preemptive_cleanup_threshold = int(os.getenv("PREEMPTIVE_CLEANUP_THRESHOLD", "5000"))
self.nuclear_cleanup_threshold = int(os.getenv("NUCLEAR_CLEANUP_THRESHOLD", "8000"))
# Memory management settings - more aggressive
self.max_conversation_messages = int(os.getenv("MAX_CONVERSATION_MESSAGES", "4"))
self.memory_cleanup_threshold = float(os.getenv("MEMORY_CLEANUP_THRESHOLD", "0.5")) # 50% of limit
self.aggressive_cleanup_threshold = float(
os.getenv("AGGRESSIVE_CLEANUP_THRESHOLD", "0.8")) # 80% for extra cleanup
# Counters for current interaction
self.current_interaction_input_tokens = 0
self.current_interaction_output_tokens = 0
self.current_interaction_llm_calls = 0
self.encoding = tiktoken.get_encoding("cl100k_base")
print(f"📋 Token Limits set:")
print(f" • Max input tokens/call: {self.max_input_tokens_per_call:,}")
print(f" • Max output tokens/call: {self.max_output_tokens_per_call:,}")
print(f" • Max total input/interaction: {self.max_total_input_tokens_per_interaction:,}")
print(f" • Max total output/interaction: {self.max_total_output_tokens_per_interaction:,}")
print(f" • Max LLM calls/interaction: {self.max_llm_calls_per_interaction}")
def reset_interaction(self):
"""Reset counters for new interaction"""
self.current_interaction_input_tokens = 0
self.current_interaction_output_tokens = 0
self.current_interaction_llm_calls = 0
def can_make_llm_call(self, estimated_input_tokens: int = 0) -> tuple[bool, str]:
"""Check if we can make another LLM call"""
# Check number of LLM calls
if self.current_interaction_llm_calls >= self.max_llm_calls_per_interaction:
return False, f"Reached LLM call limit ({self.max_llm_calls_per_interaction})"
# Check input tokens for call
if estimated_input_tokens > self.max_input_tokens_per_call:
return False, f"Input for call too large ({estimated_input_tokens:,} > {self.max_input_tokens_per_call:,})"
# Check total input tokens for interaction
if (
self.current_interaction_input_tokens + estimated_input_tokens) > self.max_total_input_tokens_per_interaction:
return False, f"Total input tokens too large ({self.current_interaction_input_tokens + estimated_input_tokens:,} > {self.max_total_input_tokens_per_interaction:,})"
return True, "OK"
def can_accept_output(self, output_tokens: int) -> tuple[bool, str]:
"""Check if we can accept output"""
# Check output tokens for call
if output_tokens > self.max_output_tokens_per_call:
return False, f"Output for call too large ({output_tokens:,} > {self.max_output_tokens_per_call:,})"
# Check total output tokens for interaction
if (self.current_interaction_output_tokens + output_tokens) > self.max_total_output_tokens_per_interaction:
return False, f"Total output tokens too large ({self.current_interaction_output_tokens + output_tokens:,} > {self.max_total_output_tokens_per_interaction:,})"
return True, "OK"
def add_llm_call(self, input_tokens: int, output_tokens: int):
"""Add LLM call to counters"""
self.current_interaction_input_tokens += input_tokens
self.current_interaction_output_tokens += output_tokens
self.current_interaction_llm_calls += 1
class AgentTokenCounterCallback(BaseCallbackHandler):
"""Callback handler for token counting in agents"""
def __init__(self, token_counter, token_limit_manager):
self.token_counter = token_counter
self.token_limit_manager = token_limit_manager
self.encoding = tiktoken.get_encoding("cl100k_base")
self.current_step_tokens = {'input': 0, 'output': 0}
self.should_stop = False
self.agent_ref = None
def set_agent_reference(self, agent):
"""Set agent reference for memory management"""
self.agent_ref = agent
def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs) -> None:
"""Capture start of LLM call"""
if prompts:
input_text = ' '.join(prompts)
input_tokens = len(self.encoding.encode(input_text))
self.current_step_tokens['input'] = input_tokens
print(f"🔍 LLM START: Input {input_tokens:,} tokens")
def on_llm_end(self, response: LLMResult, **kwargs) -> None:
"""Capture end of LLM call - FIXED real token data retrieval"""
if self.should_stop:
self.should_stop = False
return
# === AGGRESSIVE SEARCH FOR REAL TOKEN DATA ===
real_input = 0
real_output = 0
found_real_data = False
print(f"🔍 SEARCHING for real token data in all possible places:")
# === NEW: Search in message objects ===
if response.generations:
for i, gen_list in enumerate(response.generations):
for j, gen in enumerate(gen_list):
if hasattr(gen, 'message'):
message = gen.message
# 1. usage_metadata (FIXED - it's a dict!)
if hasattr(message, 'usage_metadata') and message.usage_metadata:
usage = message.usage_metadata
if 'input_tokens' in usage: # ← FIXED!
real_input = usage['input_tokens']
real_output = usage['output_tokens']
print(f" ✅ Found in message.usage_metadata: {real_input:,} + {real_output:,}")
found_real_data = True
break
# 2. response_metadata
if not found_real_data and hasattr(message, 'response_metadata') and message.response_metadata:
metadata = message.response_metadata
if 'usage' in metadata:
usage = metadata['usage']
real_input = usage.get('input_tokens', 0)
real_output = usage.get('output_tokens', 0)
if real_input > 0 or real_output > 0:
print(
f" ✅ Found in message.response_metadata.usage: {real_input:,} + {real_output:,}")
found_real_data = True
break
if found_real_data:
break
if found_real_data:
break
# === IF WE HAVE REAL DATA, USE IT ===
if found_real_data and (real_input > 0 or real_output > 0):
print(f"🎯 USING REAL API tokens: {real_input:,} input + {real_output:,} output")
self.current_step_tokens['input'] = real_input
self.current_step_tokens['output'] = real_output
# Add to limits and counters
self.token_limit_manager.add_llm_call(real_input, real_output)
if hasattr(self.token_counter, 'add_llm_call'):
self.token_counter.add_llm_call(real_input, real_output)
return # ← KEY: End here if we have real data!
# === BACKUP: TIKTOKEN ESTIMATE ===
print("⚠️ Real token data not found, using tiktoken estimate")
if response.generations:
# Count output tokens
output_text = ''
for generation in response.generations:
for gen in generation:
if hasattr(gen, 'text'):
output_text += gen.text
output_tokens = len(self.encoding.encode(output_text))
# Check output limits
can_accept, reason = self.token_limit_manager.can_accept_output(output_tokens)
if not can_accept:
print(f"\n⚠️ Output limited: {reason}")
return
self.current_step_tokens['output'] = output_tokens
# Add to limits and token counter (use original input from on_llm_start)
self.token_limit_manager.add_llm_call(
self.current_step_tokens['input'],
output_tokens
)
if hasattr(self.token_counter, 'add_llm_call'):
self.token_counter.add_llm_call(
self.current_step_tokens['input'],
output_tokens
)
class AdvancedTokenCounter:
def __init__(self, token_limit_manager):
# Claude uses cl100k_base encoding (same as GPT-4)
self.encoding = tiktoken.get_encoding("cl100k_base")
self.token_limit_manager = token_limit_manager
self.reset_session()
def reset_session(self):
"""Reset counters for entire session"""
self.session_input_tokens = 0
self.session_output_tokens = 0
self.conversation_history = []
self.current_interaction = {}
def count_tokens(self, text: str) -> int:
"""Count tokens in text"""
if not text or not isinstance(text, str):
return 0
return len(self.encoding.encode(text))
def start_interaction(self, user_input: str):
"""Start new interaction"""
# Reset token limit manager
self.token_limit_manager.reset_interaction()
self.current_interaction = {
'user_input': user_input,
'input_tokens': self.count_tokens(user_input),
'llm_calls': [],
'final_response': '',
'total_input_tokens': 0,
'total_output_tokens': 0,
'start_time': time.time(),
}
def add_llm_call(self, input_tokens: int, output_tokens: int):
"""Add LLM call to current interaction"""
self.current_interaction['llm_calls'].append({
'input_tokens': input_tokens,
'output_tokens': output_tokens
})
def finish_interaction(self, final_response):
"""Finish interaction and calculate total tokens"""
# Extract text from complex structure
if isinstance(final_response, str):
final_response_text = final_response
else:
final_response_text = str(final_response)
final_response_tokens = self.count_tokens(final_response_text)
self.current_interaction['final_response'] = final_response_text
self.current_interaction['duration'] = time.time() - self.current_interaction['start_time']
# === CORRECT INPUT AND OUTPUT TOKEN CALCULATION ===
# INPUT TOKENS = everything that goes TO Claude
total_input = self.current_interaction['input_tokens'] # User input
# Add LLM input tokens
for llm_call in self.current_interaction['llm_calls']:
total_input += llm_call['input_tokens']
# OUTPUT TOKENS = only what Claude GENERATED
total_output = final_response_tokens # Final response
# Add LLM output tokens
for llm_call in self.current_interaction['llm_calls']:
total_output += llm_call['output_tokens']
# Save results
self.current_interaction['total_input_tokens'] = total_input
self.current_interaction['total_output_tokens'] = total_output
# Add to session counter
self.session_input_tokens += total_input
self.session_output_tokens += total_output
# Save to history
self.conversation_history.append(self.current_interaction.copy())
return total_input, total_output
def print_interaction_tokens(self, input_tokens: int, output_tokens: int):
"""Print detailed tokens for this interaction"""
duration = self.current_interaction.get('duration', 0)
llm_calls = len(self.current_interaction.get('llm_calls', []))
print(f"\n📊 Token Usage (this response):")
print(f" 📥 Input tokens: {input_tokens:,}")
print(f" • User input: {self.current_interaction['input_tokens']:,}")
if self.current_interaction.get('llm_calls'):
llm_input_total = sum(call['input_tokens'] for call in self.current_interaction['llm_calls'])
print(f" • LLM calls input: {llm_input_total:,}")
print(f" 📤 Output tokens: {output_tokens:,}")
if self.current_interaction.get('llm_calls'):
llm_output_total = sum(call['output_tokens'] for call in self.current_interaction['llm_calls'])
print(f" • LLM generated text: {llm_output_total:,}")
print(f" • Final response: {self.count_tokens(self.current_interaction['final_response']):,}")
print(f" 📊 Total tokens: {input_tokens + output_tokens:,}")
print(f" ⏱️ Duration: {duration:.1f}s")
print(f" 🌐 LLM calls: {llm_calls}")
class SmartMCPAgent(MCPAgent):
"""MCPAgent with rate limiting and memory management - simplified for Haiku"""
def __init__(self, *args, token_callback=None, **kwargs):
super().__init__(*args, **kwargs)
self.token_callback = token_callback
self.iteration_count = 0
if token_callback:
token_callback.set_agent_reference(self)
async def initialize(self):
"""Override initialize to setup tool interception after tools are ready"""
await super().initialize()
async def run(self, query: str, max_steps=5, **kwargs):
"""Simplified run with automatic fallback"""
self.iteration_count = 0
print(f"🚀 SmartMCPAgent.run() starting (max {max_steps} steps)")
try:
# Set max_steps for MCPAgent
original_max_steps = self.max_steps
self.max_steps = max_steps
try:
# Run original MCPAgent.run
result = await super(SmartMCPAgent, self).run(query, **kwargs)
return result
except Exception as e:
print(f"❌ Error during agent run: {e}")
return self._create_fallback_response(query)
finally:
# Restore original max_steps
self.max_steps = original_max_steps
except Exception as e:
print(f"⚠️ Agent run failed: {e}")
return self._create_fallback_response(query)
def _create_fallback_response(self, query: str) -> str:
"""Create fallback response"""
if any(keyword in query.lower() for keyword in ['product', 'price', 'search', 'buy']):
return "Sorry, I can't search for specific products at the moment. Please visit the shop website for current information."
else:
return "I'm doing well, thank you for asking! How can I help you today?"
# Global rate limiter with adaptive delay
last_api_call = 0
consecutive_429_errors = 0
async def rate_limited_execution(func, *args, **kwargs):
"""Wrapper for rate limiting any async function with adaptive delay"""
global last_api_call, consecutive_429_errors
# Adaptive delay based on errors
base_delay = float(os.getenv("MIN_DELAY_BETWEEN_CALLS", "1.5"))
adaptive_delay = base_delay + (consecutive_429_errors * 0.5) # Add 0.5s per 429 error
adaptive_delay = min(adaptive_delay, 5.0) # Max 5 seconds
current_time = time.time()
time_since_last = current_time - last_api_call
if time_since_last < adaptive_delay:
wait_time = adaptive_delay - time_since_last
print(f"⏳ Rate limiting: waiting {wait_time:.1f}s (adaptive delay: {adaptive_delay:.1f}s)...")
await asyncio.sleep(wait_time)
last_api_call = time.time()
# Retry logic for 429 errors with exponential backoff
max_retries = 3
base_wait = 2.0
for attempt in range(max_retries):
try:
result = await func(*args, **kwargs)
# Success - reset 429 counter
consecutive_429_errors = max(0, consecutive_429_errors - 1)
return result
except Exception as e:
error_msg = str(e).lower()
if "429" in error_msg or "too many requests" in error_msg:
consecutive_429_errors += 1
wait_time = base_wait * (2 ** attempt)
print(f"⚠️ Rate limit hit (attempt {attempt + 1}/{max_retries})")
print(f"⏳ Waiting {wait_time:.1f}s...")
await asyncio.sleep(wait_time)
continue
else:
# Other error, break
raise e
# If all attempts exhausted
consecutive_429_errors += 2 # Penalty for complete failure
raise Exception(f"Rate limit exceeded after {max_retries} retries")
async def main():
"""Main test function - automatically runs with 'how are you?' question"""
print("🔧 Starting automated test with question: 'how are you?'")
print("="*60)
config = {
"mcpServers": {
"wordpress_server": {
"url": os.getenv("MCP_BASE_URL"),
"headers": {
"Authorization": f"Bearer {os.getenv('JWT_TOKEN')}",
"Content-Type": "application/json"
}
}
}
}
# Create MCP client
client = MCPClient.from_dict(config)
# Create sessions with error handling
print("🔍 Creating MCP sessions...")
try:
sessions = await client.create_all_sessions()
print(f"✅ Created sessions: {list(sessions.keys())}")
# Check available tools
for name, session in sessions.items():
tools = session.connector.tools
print(f"🔧 Server '{name}' has {len(tools)} tools")
except Exception as e:
print(f"❌ Error creating sessions: {e}")
traceback.print_exc()
return
# Create token limit manager
token_limit_manager = TokenLimitManager()
# Create advanced token counter
token_counter = AdvancedTokenCounter(token_limit_manager)
# Create callback for token counting
token_callback = AgentTokenCounterCallback(token_counter, token_limit_manager)
# Use dynamic max_tokens from .env
max_tokens = token_limit_manager.max_output_tokens_per_call
# Create Claude LLM with callback
print("🔍 Creating Claude LLM...")
try:
llm = ChatAnthropic(
model="claude-3-5-haiku-20241022",
api_key=os.getenv("ANTHROPIC_API_KEY"),
temperature=0.1,
max_tokens=max_tokens,
max_retries=2,
callbacks=[token_callback]
)
print(f"✅ Claude LLM created (max_tokens: {max_tokens:,})")
except Exception as e:
print(f"❌ Error creating Claude LLM: {e}")
print("💡 Make sure you have ANTHROPIC_API_KEY set in .env file")
return
# Create MCP agent with smart memory management
print("🔍 Creating Smart MCPAgent...")
try:
agent = SmartMCPAgent(
llm=llm,
client=client,
max_steps=5,
memory_enabled=True,
auto_initialize=False,
token_callback=token_callback
)
print("✅ Smart MCPAgent created")
# Manual initialization with detailed logging
print("🔍 Initializing agent...")
await agent.initialize()
print("✅ Agent initialized")
# Check if agent has _tools attribute
if hasattr(agent, '_tools'):
print(f"✅ Agent has _tools: {len(agent._tools) if agent._tools else 0}")
else:
print("❌ Agent doesn't have _tools attribute!")
return
except Exception as e:
print(f"❌ Error creating/initializing agent: {e}")
traceback.print_exc()
return
print("\n🚀 WordPress Claude Chat Bot Test Started!")
print("🤖 Powered by Claude Haiku with automatic tool detection")
print("📊 Advanced token counting + Memory management")
print("🎯 Test question: 'how are you?'")
print("-" * 60)
# === AUTOMATED TEST EXECUTION ===
try:
# Test question
user_input = "how are you?"
print(f"\n👤 Test Question: {user_input}")
print("🤖 Claude: ", end="", flush=True)
try:
# Check before running
if not hasattr(agent, '_tools') or agent._tools is None:
print("\n❌ Agent doesn't have initialized tools!")
return
# Start interaction
token_counter.start_interaction(user_input)
# Run with max_steps for iteration control
result = await agent.run(
user_input,
manage_connector=False,
max_steps=5 # Limit to 5 iterations
)
print(result)
# Finish interaction
input_tokens, output_tokens = token_counter.finish_interaction(result)
token_counter.print_interaction_tokens(input_tokens, output_tokens)
except Exception as e:
print(f"\n❌ Error during processing: {e}")
print(f"📝 Error type: {type(e).__name__}")
# Fallback: Response without tools
print("\n🔄 Trying response without tools...")
try:
token_counter.start_interaction(user_input)
fallback_response = await llm.ainvoke([
SystemMessage(
content="Answer the user's question briefly and directly, without using tools. If they ask about products, inform them to visit the website directly."
),
HumanMessage(content=user_input)
])
response_text = fallback_response.content if hasattr(fallback_response, 'content') else str(
fallback_response)
print(f"🤖 Claude (fallback): {response_text}")
# Count tokens for fallback
input_tokens = token_counter.count_tokens(user_input)
output_tokens = token_counter.count_tokens(response_text)
token_counter.add_llm_call(input_tokens, output_tokens)
input_tokens, output_tokens = token_counter.finish_interaction(response_text)
token_counter.print_interaction_tokens(input_tokens, output_tokens)
except Exception as fallback_error:
print(f"❌ Fallback also failed: {fallback_error}")
print("💡 Try rephrasing the question or restarting the test.")
except Exception as e:
print(f"\n❌ Unexpected error: {e}")
traceback.print_exc()
finally:
print("\n🧹 Closing connections...")
try:
if client.sessions:
await client.close_all_sessions()
print("✅ Connections closed.")
except Exception as e:
print(f"⚠️ Error closing: {e}")
print("\n✅ Test completed!")
if __name__ == "__main__":
asyncio.run(main())