-
Notifications
You must be signed in to change notification settings - Fork 33
Expand file tree
/
Copy pathsimple.py
More file actions
73 lines (54 loc) · 2.2 KB
/
simple.py
File metadata and controls
73 lines (54 loc) · 2.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio
import logging
import httpx
from cdp_use.client import CDPClient
# Configure logging to see what's happening
logging.basicConfig(
level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
async def main():
# Get WebSocket URL using async HTTPX
async with httpx.AsyncClient() as client:
version_info = await client.get("http://localhost:9222/json/version")
browser_ws_url = version_info.json()["webSocketDebuggerUrl"]
# browser_ws_url = "ws://localhost:3000"
# Connect to Chrome DevTools with type-safe CDP client
async with CDPClient(browser_ws_url) as cdp:
# List all targets (tabs, extensions, etc.) - fully type-safe!
targets_result = await cdp.send.Target.getTargets()
page_targets = [t for t in targets_result["targetInfos"] if t["type"] == "page"]
if not page_targets:
raise RuntimeError("No page targets found.")
target_id = page_targets[0]["targetId"]
print(target_id, targets_result)
# Attach to selected tab
attach_result = await cdp.send.Target.attachToTarget(
params={"targetId": target_id, "flatten": True}
)
session_id = attach_result["sessionId"]
# Enable DOM domain
await cdp.send.DOM.enable(session_id=session_id)
await cdp.send.Page.navigate(
params={"url": "https://www.google.com"}, session_id=session_id
)
# Get full DOM tree
dom_result = await cdp.send.DOM.getDocument(
params={"depth": -1, "pierce": True}, session_id=session_id
)
ax_tree = await cdp.send.Accessibility.getFullAXTree(
params={}, session_id=session_id
)
print(ax_tree)
print("Root node ID:", dom_result["root"]["nodeId"])
# Execute 10 concurrent CDP requests with full type safety
tasks = [
cdp.send.DOM.getDocument(
params={"depth": -1, "pierce": True}, session_id=session_id
)
for _ in range(10)
]
results = await asyncio.gather(*tasks)
print(len(results))
# print(results[-1])
asyncio.run(main())