From 702b1dbbbd2fddd76572ed6b8bdabceeebcf30ea Mon Sep 17 00:00:00 2001 From: David Hurley Date: Wed, 8 Apr 2026 11:54:42 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20add=20AsyncPlasmateCrawlerStrategy=20?= =?UTF-8?q?=E2=80=94=20lightweight=20alternative=20to=20Playwright?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #1256 (memory leak in Docker from Chrome) Related to #1874 (token usage tracking) Plasmate (https://github.com/plasmate-labs/plasmate) is an open-source Rust browser engine that replaces Chrome/Playwright for static pages. No browser process, ~64MB RAM vs ~300MB, 10-100x fewer tokens per page. Changes: - crawl4ai/async_plasmate_strategy.py: AsyncPlasmateCrawlerStrategy - Implements AsyncCrawlerStrategy ABC (drop-in replacement) - Supports output_format: text (default), markdown, som, links - Supports --selector, --header, --timeout flags - Optional fallback_to_playwright=True for JS-heavy SPAs - Subprocess runs in asyncio executor — safe for concurrent use - crawl4ai/__init__.py: export AsyncPlasmateCrawlerStrategy - tests/general/test_plasmate_strategy.py: 20 unit tests Install: pip install plasmate Usage: from crawl4ai import AsyncWebCrawler from crawl4ai.async_plasmate_strategy import AsyncPlasmateCrawlerStrategy strategy = AsyncPlasmateCrawlerStrategy( output_format="markdown", fallback_to_playwright=True, # SPA safety net ) async with AsyncWebCrawler(crawler_strategy=strategy) as crawler: result = await crawler.arun("https://docs.python.org/3/") --- crawl4ai/__init__.py | 1 + crawl4ai/async_plasmate_strategy.py | 237 +++++++++++++++++++ tests/general/test_plasmate_strategy.py | 302 ++++++++++++++++++++++++ 3 files changed, 540 insertions(+) create mode 100644 crawl4ai/async_plasmate_strategy.py create mode 100644 tests/general/test_plasmate_strategy.py diff --git a/crawl4ai/__init__.py b/crawl4ai/__init__.py index 03e734deb..271f28a6c 100644 --- a/crawl4ai/__init__.py +++ b/crawl4ai/__init__.py @@ 
-2,6 +2,7 @@ import warnings from .async_webcrawler import AsyncWebCrawler, CacheMode +from .async_plasmate_strategy import AsyncPlasmateCrawlerStrategy # MODIFIED: Add SeedingConfig and VirtualScrollConfig here from .async_configs import BrowserConfig, CrawlerRunConfig, HTTPCrawlerConfig, LLMConfig, ProxyConfig, GeolocationConfig, SeedingConfig, VirtualScrollConfig, LinkPreviewConfig, MatchMode diff --git a/crawl4ai/async_plasmate_strategy.py b/crawl4ai/async_plasmate_strategy.py new file mode 100644 index 000000000..56ba55a90 --- /dev/null +++ b/crawl4ai/async_plasmate_strategy.py @@ -0,0 +1,237 @@ +""" +AsyncPlasmateCrawlerStrategy — lightweight alternative to AsyncPlaywrightCrawlerStrategy. + +Uses Plasmate (https://github.com/plasmate-labs/plasmate) instead of Chrome/Playwright. +Plasmate is an open-source Rust browser engine that outputs Structured Object Model (SOM) +instead of raw HTML, using ~64MB RAM per session vs ~300MB and delivering +10-100x fewer tokens per page — significantly reducing LLM costs. + +Install: pip install plasmate +Docs: https://plasmate.app +""" + +from __future__ import annotations + +import asyncio +import shutil +import subprocess +from typing import Dict, List, Optional + +from .async_crawler_strategy import AsyncCrawlerStrategy +from .async_logger import AsyncLogger +from .models import AsyncCrawlResponse + +_INSTALL_MSG = ( + "plasmate is required for AsyncPlasmateCrawlerStrategy. 
" + "Install it with: pip install plasmate\n" + "Docs: https://plasmate.app" +) + +_VALID_FORMATS = ("text", "markdown", "som", "links") + + +def _find_plasmate() -> Optional[str]: + """Return the resolved path to the plasmate binary, or None.""" + path = shutil.which("plasmate") + if path: + return path + try: + import plasmate as _p # noqa: F401 + return shutil.which("plasmate") + except ImportError: + return None + + +class AsyncPlasmateCrawlerStrategy(AsyncCrawlerStrategy): + """Lightweight crawler strategy using Plasmate instead of Chrome/Playwright. + + Plasmate fetches pages and returns them as Structured Object Model (SOM) + or plain text / markdown — no browser process, no GPU, no 300 MB Chrome. + + This strategy is a drop-in replacement for ``AsyncPlaywrightCrawlerStrategy`` + for static and server-rendered pages. For JavaScript-heavy SPAs that require + a real browser, set ``fallback_to_playwright=True``. + + Attributes: + output_format: Page output format — ``"text"`` (default), ``"markdown"``, + ``"som"`` (full JSON), or ``"links"``. + timeout: Per-request timeout in seconds. Defaults to 30. + selector: Optional ARIA role or CSS id selector to scope extraction + (e.g. ``"main"`` or ``"#article"``). + extra_headers: Optional HTTP headers forwarded with each request. + fallback_to_playwright: If True, retry with Playwright when Plasmate + returns an empty response (handles SPAs automatically). + verbose: Whether to emit log messages. Defaults to True. 
+ + Example — drop-in replacement:: + + import asyncio + from crawl4ai import AsyncWebCrawler + from crawl4ai.async_plasmate_strategy import AsyncPlasmateCrawlerStrategy + + async def main(): + strategy = AsyncPlasmateCrawlerStrategy( + output_format="markdown", + timeout=30, + fallback_to_playwright=True, + ) + async with AsyncWebCrawler(crawler_strategy=strategy) as crawler: + result = await crawler.arun("https://docs.python.org/3/") + print(result.markdown[:500]) + + asyncio.run(main()) + + Example — direct use:: + + strategy = AsyncPlasmateCrawlerStrategy(output_format="text") + async with strategy: + response = await strategy.crawl("https://example.com") + print(response.html) # clean text output, no HTML boilerplate + """ + + def __init__( + self, + output_format: str = "text", + timeout: int = 30, + selector: Optional[str] = None, + extra_headers: Optional[Dict[str, str]] = None, + fallback_to_playwright: bool = False, + verbose: bool = True, + logger: Optional[AsyncLogger] = None, + **kwargs, + ): + if output_format not in _VALID_FORMATS: + raise ValueError( + f"output_format must be one of {_VALID_FORMATS}; got {output_format!r}" + ) + self.output_format = output_format + self.timeout = timeout + self.selector = selector + self.extra_headers = extra_headers or {} + self.fallback_to_playwright = fallback_to_playwright + self.verbose = verbose + self.logger = logger or AsyncLogger(verbose=verbose) + self._plasmate_bin: Optional[str] = None + + # ------------------------------------------------------------------ + # Context manager + # ------------------------------------------------------------------ + + async def __aenter__(self) -> "AsyncPlasmateCrawlerStrategy": + self._plasmate_bin = _find_plasmate() + if self._plasmate_bin is None: + raise ImportError(_INSTALL_MSG) + if self.verbose: + self.logger.info( + f"AsyncPlasmateCrawlerStrategy ready (format={self.output_format}, " + f"timeout={self.timeout}s, fallback={self.fallback_to_playwright})", + 
tag="INIT", + ) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + # No persistent process to clean up — each fetch is a short-lived subprocess. + pass + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _build_cmd(self, url: str) -> List[str]: + """Build the plasmate CLI command for a URL.""" + cmd = [ + self._plasmate_bin, + "fetch", + url, + "--format", self.output_format, + "--timeout", str(self.timeout * 1000), # plasmate uses ms + ] + if self.selector: + cmd += ["--selector", self.selector] + for key, value in self.extra_headers.items(): + cmd += ["--header", f"{key}: {value}"] + return cmd + + async def _fetch(self, url: str) -> tuple[str, int]: + """Run plasmate in a thread-pool executor; returns (content, status_code).""" + loop = asyncio.get_event_loop() + + def _run() -> tuple[str, int]: + try: + result = subprocess.run( + self._build_cmd(url), + capture_output=True, + text=True, + timeout=self.timeout + 5, + ) + if result.returncode != 0: + if self.verbose: + self.logger.warning( + f"plasmate exited {result.returncode} for {url}: " + f"{result.stderr[:200]}", + tag="FETCH", + ) + return "", 500 + return result.stdout.strip(), 200 + except subprocess.TimeoutExpired: + if self.verbose: + self.logger.warning(f"Timeout fetching {url}", tag="FETCH") + return "", 504 + except FileNotFoundError: + raise ImportError(_INSTALL_MSG) + + return await loop.run_in_executor(None, _run) + + async def _playwright_fallback(self, url: str) -> tuple[str, int]: + """Delegate to AsyncPlaywrightCrawlerStrategy and return its raw HTML.""" + if self.verbose: + self.logger.info( + f"Plasmate returned empty — falling back to Playwright for {url}", + tag="FALLBACK", + ) + from .async_crawler_strategy import AsyncPlaywrightCrawlerStrategy + + strategy = AsyncPlaywrightCrawlerStrategy() + async with strategy: + response = await 
strategy.crawl(url) + return response.html, response.status_code + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + async def crawl(self, url: str, **kwargs) -> AsyncCrawlResponse: + """Fetch *url* with Plasmate and return an :class:`AsyncCrawlResponse`. + + The ``html`` field of the response contains the Plasmate output in the + requested format (text / markdown / SOM JSON / links) rather than raw HTML. + Downstream Crawl4AI extraction strategies receive this pre-processed content, + reducing token consumption before any LLM call. + + Args: + url: The URL to fetch. + **kwargs: Ignored (accepted for interface compatibility). + + Returns: + :class:`AsyncCrawlResponse` with ``html`` set to Plasmate output. + """ + if self.verbose: + self.logger.info(f"Fetching: {url}", tag="FETCH") + + content, status_code = await self._fetch(url) + + if not content.strip() and self.fallback_to_playwright: + content, status_code = await self._playwright_fallback(url) + + if self.verbose and content: + self.logger.success( + f"Got {len(content):,} chars from {url} " + f"(format={self.output_format})", + tag="FETCH", + ) + + return AsyncCrawlResponse( + html=content, + response_headers={}, + status_code=status_code, + ) diff --git a/tests/general/test_plasmate_strategy.py b/tests/general/test_plasmate_strategy.py new file mode 100644 index 000000000..e941727a0 --- /dev/null +++ b/tests/general/test_plasmate_strategy.py @@ -0,0 +1,302 @@ +"""Tests for AsyncPlasmateCrawlerStrategy.""" + +import asyncio +import subprocess +from unittest.mock import MagicMock, patch + +import pytest + +from crawl4ai.async_plasmate_strategy import AsyncPlasmateCrawlerStrategy +from crawl4ai.models import AsyncCrawlResponse + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + 
def _completed_process(stdout: str = "extracted content", returncode: int = 0) -> MagicMock:
    """Build a stand-in for subprocess.CompletedProcess with the given output."""
    m = MagicMock()
    m.stdout = stdout
    m.returncode = returncode
    m.stderr = ""
    return m


# ---------------------------------------------------------------------------
# Initialisation
# ---------------------------------------------------------------------------

def test_init_defaults():
    strategy = AsyncPlasmateCrawlerStrategy()
    assert strategy.output_format == "text"
    assert strategy.timeout == 30
    assert strategy.selector is None
    assert strategy.extra_headers == {}
    assert strategy.fallback_to_playwright is False


def test_init_custom():
    strategy = AsyncPlasmateCrawlerStrategy(
        output_format="markdown",
        timeout=60,
        selector="main",
        extra_headers={"X-Custom": "val"},
        fallback_to_playwright=True,
    )
    assert strategy.output_format == "markdown"
    assert strategy.timeout == 60
    assert strategy.selector == "main"
    assert strategy.extra_headers == {"X-Custom": "val"}
    assert strategy.fallback_to_playwright is True


def test_init_invalid_format():
    with pytest.raises(ValueError, match="output_format"):
        AsyncPlasmateCrawlerStrategy(output_format="html")


# ---------------------------------------------------------------------------
# Command building
# ---------------------------------------------------------------------------

def test_build_cmd_defaults():
    strategy = AsyncPlasmateCrawlerStrategy()
    strategy._plasmate_bin = "/usr/local/bin/plasmate"
    cmd = strategy._build_cmd("https://example.com")
    assert cmd[0] == "/usr/local/bin/plasmate"
    assert "fetch" in cmd
    assert "https://example.com" in cmd
    assert "--format" in cmd
    assert "text" in cmd
    assert "--timeout" in cmd
    assert "30000" in cmd


def test_build_cmd_with_selector():
    strategy = AsyncPlasmateCrawlerStrategy(selector="main")
    strategy._plasmate_bin = "/usr/local/bin/plasmate"
    cmd = strategy._build_cmd("https://example.com")
    assert "--selector" in cmd
    assert cmd[cmd.index("--selector") + 1] == "main"


def test_build_cmd_with_headers():
    strategy = AsyncPlasmateCrawlerStrategy(extra_headers={"Authorization": "Bearer tok"})
    strategy._plasmate_bin = "/usr/local/bin/plasmate"
    cmd = strategy._build_cmd("https://example.com")
    assert "--header" in cmd
    assert "Authorization: Bearer tok" in cmd[cmd.index("--header") + 1]


def test_build_cmd_timeout_converted_to_ms():
    strategy = AsyncPlasmateCrawlerStrategy(timeout=45)
    strategy._plasmate_bin = "/usr/local/bin/plasmate"
    cmd = strategy._build_cmd("https://example.com")
    assert "45000" in cmd


# ---------------------------------------------------------------------------
# Context manager
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_aenter_sets_binary():
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"):
        strategy = AsyncPlasmateCrawlerStrategy(verbose=False)
        async with strategy:
            assert strategy._plasmate_bin == "/usr/local/bin/plasmate"


@pytest.mark.asyncio
async def test_aenter_raises_if_binary_missing():
    with patch("shutil.which", return_value=None), \
            patch("builtins.__import__", side_effect=ImportError):
        strategy = AsyncPlasmateCrawlerStrategy(verbose=False)
        with pytest.raises(ImportError, match="plasmate is required"):
            await strategy.__aenter__()


@pytest.mark.asyncio
async def test_aexit_does_not_raise():
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"):
        strategy = AsyncPlasmateCrawlerStrategy(verbose=False)
        async with strategy:
            pass  # __aexit__ should complete cleanly


# ---------------------------------------------------------------------------
# crawl() — success paths
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_crawl_returns_async_crawl_response():
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", return_value=_completed_process("Page text content")):
        strategy = AsyncPlasmateCrawlerStrategy(verbose=False)
        async with strategy:
            response = await strategy.crawl("https://example.com")

    assert isinstance(response, AsyncCrawlResponse)
    assert response.status_code == 200
    assert "Page text content" in response.html


@pytest.mark.asyncio
async def test_crawl_markdown_format():
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", return_value=_completed_process("# Heading\n\nBody")):
        strategy = AsyncPlasmateCrawlerStrategy(output_format="markdown", verbose=False)
        async with strategy:
            response = await strategy.crawl("https://example.com")

    assert "# Heading" in response.html
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_crawl_som_format():
    som = '{"role":"document","children":[{"role":"heading","name":"Title"}]}'
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", return_value=_completed_process(som)):
        strategy = AsyncPlasmateCrawlerStrategy(output_format="som", verbose=False)
        async with strategy:
            response = await strategy.crawl("https://example.com")

    assert "heading" in response.html
    assert response.status_code == 200


# ---------------------------------------------------------------------------
# crawl() — error paths
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_crawl_nonzero_returncode_returns_500():
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", return_value=_completed_process("", returncode=1)):
        strategy = AsyncPlasmateCrawlerStrategy(verbose=False)
        async with strategy:
            response = await strategy.crawl("https://example.com")

    assert response.status_code == 500
    assert response.html == ""


@pytest.mark.asyncio
async def test_crawl_timeout_returns_504():
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", side_effect=subprocess.TimeoutExpired(cmd="plasmate", timeout=30)):
        strategy = AsyncPlasmateCrawlerStrategy(verbose=False)
        async with strategy:
            response = await strategy.crawl("https://example.com")

    assert response.status_code == 504
    assert response.html == ""


# ---------------------------------------------------------------------------
# Playwright fallback
# ---------------------------------------------------------------------------

class _StubPlaywrightStrategy:
    """Minimal async-context stand-in for AsyncPlaywrightCrawlerStrategy."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        return None

    async def crawl(self, url, **kwargs):
        return AsyncCrawlResponse(
            html="Playwright fallback",
            response_headers={},
            status_code=200,
        )


@pytest.mark.asyncio
async def test_fallback_triggered_on_empty_response():
    """An empty Plasmate result should transparently retry via Playwright.

    The strategy imports AsyncPlaywrightCrawlerStrategy *inside*
    _playwright_fallback() via ``from .async_crawler_strategy import ...``,
    so the patch target must be the defining module
    (crawl4ai.async_crawler_strategy) — patching the attribute on
    crawl4ai.async_plasmate_strategy would have no effect.
    """
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", return_value=_completed_process("")), \
            patch(
                "crawl4ai.async_crawler_strategy.AsyncPlaywrightCrawlerStrategy",
                return_value=_StubPlaywrightStrategy(),
            ):
        strategy = AsyncPlasmateCrawlerStrategy(fallback_to_playwright=True, verbose=False)
        async with strategy:
            response = await strategy.crawl("https://spa-example.com")

    assert "Playwright fallback" in response.html
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_no_fallback_when_content_present():
    # Patch at the defining module — see test_fallback_triggered_on_empty_response.
    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", return_value=_completed_process("Real content")), \
            patch("crawl4ai.async_crawler_strategy.AsyncPlaywrightCrawlerStrategy") as mock_pw:
        strategy = AsyncPlasmateCrawlerStrategy(fallback_to_playwright=True, verbose=False)
        async with strategy:
            response = await strategy.crawl("https://example.com")

    mock_pw.assert_not_called()
    assert "Real content" in response.html


# ---------------------------------------------------------------------------
# Async concurrency
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_crawl_multiple_urls_concurrently():
    """Ensure multiple crawl() calls can run concurrently (each is a short-lived subprocess)."""
    call_count = {"n": 0}

    def counting_run(*args, **kwargs):
        call_count["n"] += 1
        return _completed_process(f"content {call_count['n']}")

    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", side_effect=counting_run):
        strategy = AsyncPlasmateCrawlerStrategy(verbose=False)
        async with strategy:
            urls = [f"https://example.com/{i}" for i in range(5)]
            responses = await asyncio.gather(*[strategy.crawl(u) for u in urls])

    assert len(responses) == 5
    assert all(r.status_code == 200 for r in responses)


# ---------------------------------------------------------------------------
# Integration with AsyncWebCrawler (mocked)
# ---------------------------------------------------------------------------

@pytest.mark.asyncio
async def test_drop_in_with_async_web_crawler():
    """Verify strategy is accepted by AsyncWebCrawler without errors."""
    from crawl4ai import AsyncWebCrawler

    with patch("shutil.which", return_value="/usr/local/bin/plasmate"), \
            patch("subprocess.run", return_value=_completed_process("crawled content")):

        strategy = AsyncPlasmateCrawlerStrategy(verbose=False)

        # Patch the webcrawler's __aenter__/__aexit__ to avoid browser
        # initialisation; the crawler is only constructed, never entered.
        with patch.object(AsyncWebCrawler, "__aenter__", return_value=MagicMock()), \
                patch.object(AsyncWebCrawler, "__aexit__", return_value=None):
            crawler = AsyncWebCrawler(crawler_strategy=strategy)
            assert crawler.crawler_strategy is strategy