From d32e241fbd9787483307518b42325e79e512860d Mon Sep 17 00:00:00 2001 From: bobo Date: Mon, 8 Sep 2025 10:46:02 +0800 Subject: [PATCH 01/12] =?UTF-8?q?feat=EF=BC=9A=20=E9=85=8D=E7=BD=AEclient?= =?UTF-8?q?=20=E6=B1=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sunagent_ext/tweet/TwitterClientPool.py | 84 ++++++++++ .../sunagent_ext/tweet/TwitterGetContext.py | 146 ++++++++++++++++++ .../src/sunagent_ext/tweet/__init__.py | 0 3 files changed, 230 insertions(+) create mode 100644 packages/sunagent-ext/src/sunagent_ext/tweet/TwitterClientPool.py create mode 100644 packages/sunagent-ext/src/sunagent_ext/tweet/TwitterGetContext.py create mode 100644 packages/sunagent-ext/src/sunagent_ext/tweet/__init__.py diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterClientPool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterClientPool.py new file mode 100644 index 0000000..460b00f --- /dev/null +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterClientPool.py @@ -0,0 +1,84 @@ +import asyncio +import logging +import time +from typing import List, Optional +import tweepy +from dataclasses import dataclass + +logger = logging.getLogger(__name__) + +RETRY_AFTER_SEC = 15 * 60 # 15 分钟 + +@dataclass +class _PoolItem: + client: tweepy.Client # 你的 client 实例 + dead_at: Optional[float] = None # None 表示 alive + +class TwitterClientPool: + """ + Twitter 客户端专用池:轮询获取、异常熔断、15 min 复活、支持永久摘除 + """ + def __init__(self, clients: List[tweepy.Client], retry_after: float = RETRY_AFTER_SEC): + self._retry_after = retry_after + self._pool: List[_PoolItem] = [_PoolItem(c) for c in clients] + self._lock = asyncio.Lock() + self._not_empty = asyncio.Event() + self._rr_idx = 0 + if self._pool: + self._not_empty.set() + + # -------------------- 对外 API -------------------- + async def acquire(self) -> tweepy.Client : + """轮询获取一个健康 client;池空时阻塞直到有可用实例。""" + while True: + async with self._lock: + now = time.time() + # 1. 复活 + for it in self._pool: + if it.dead_at and now - it.dead_at >= self._retry_after: + it.dead_at = None + logger.info("client %s revived", id(it.client)) + + # 2. 留活的 + alive = [it for it in self._pool if it.dead_at is None] + if not alive: + self._not_empty.clear() + logger.warning("all clients dead, waiting ...") + await asyncio.wait_for(self._not_empty.wait(), timeout=1) + continue + + # 3. 轮询 + idx = self._rr_idx % len(alive) + self._rr_idx += 1 + chosen = alive[idx] + # 移到尾部,公平 RR + self._pool.remove(chosen) + self._pool.append(chosen) + return chosen.client + + def remove(self, client: tweepy.Client ) -> None: + """永久摘除某个 client(不再放回池子)。""" + for it in self._pool: + if it.client is client: + self._pool.remove(it) + logger.info("client %s removed permanently", id(client)) + if not any(it.dead_at is None for it in self._pool): + self._not_empty.clear() + return + + def release(self, client: tweepy.Client , *, failed: bool = False) -> None: + """归还 client;failed=True 表示请求异常,触发 15 min 熔断。""" + for it in self._pool: + if it.client is client: + if failed: + it.dead_at = time.time() + logger.warning("client %s dead, will retry after %s min", + id(client), self._retry_after // 60) + asyncio.create_task(self._notify_maybe_alive()) + return + + # -------------------- 内部 -------------------- + async def _notify_maybe_alive(self) -> None: + async with self._lock: + if any(it.dead_at is None for it in self._pool): + self._not_empty.set() \ No newline at end of file diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterGetContext.py b/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterGetContext.py new file mode 100644 index 0000000..7549c1a --- /dev/null +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterGetContext.py @@ -0,0 +1,146 @@ +import asyncio +import json +import logging +from typing import List, Dict, Optional, Any, cast +from datetime import datetime, timedelta, timezone + +from sunagent_ext.cache_store import CacheStore +from tweepy import Response as TwitterResponse, TwitterServerError, NotFound + +from sunagent_ext.tweet import TwitterClientPool + +logger = logging.getLogger("tweet_get_context") + +# 字段按需裁剪 +TWEET_FIELDS = [ + "id", "created_at", "author_id", "text", "public_metrics", + "referenced_tweets", "conversation_id", "entities", +] +EXPANSIONS = ["author_id", "referenced_tweets.id", "referenced_tweets.id.author_id"] +USER_FIELDS = ["id", "username", "name", "public_metrics"] +MAX_RESULTS = 100 # 每页上限 + +class TweetGetContext: + """ + 只负责「Home 时间线 & Mention 时间线」增量抓取, + 所有网络请求通过外部 TwitterClientPool 完成。 + """ + def __init__( + self, + pool: TwitterClientPool, # 前面实现的池子 + cache: Optional[CacheStore[str]] = None, + max_results: int = MAX_RESULTS, + ) -> None: + self.pool = pool + self.cache = cache + self.max_results = max_results + + # -------------------- 对外 API -------------------- + async def get_home_timeline_with_context( + self, + me_id: str, + hours: int = 24, + since_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """ + 抓取 **Home 时间线** 最近 N 小时推文(不含回复/转推), + 返回 List[Dict](按时间升序)。 + """ + return await self._fetch_timeline( + endpoint="home", + me_id=me_id, + hours=hours, + since_id=since_id, + ) + + async def get_mentions_with_context( + self, + me_id: str, + hours: int = 24, + since_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """ + 抓取 **@我 的 Mention** 最近 N 小时推文, + 返回 List[Dict](按时间升序)。 + """ + return await self._fetch_timeline( + endpoint="mentions", + me_id=me_id, + hours=hours, + since_id=since_id, + ) + + # -------------------- 统一抓取逻辑 -------------------- + async def _fetch_timeline( + self, + endpoint: str, # "home" | "mentions" + me_id: str, + hours: int, + since_id: Optional[str], + ) -> List[Dict[str, Any]]: + since = datetime.now(timezone.utc) - timedelta(hours=hours) + start_time = since.isoformat(timespec="seconds") + + all_tweets: List[Dict[str, Any]] = [] + next_token = None + cache_key = f"{endpoint}:{me_id}" + + # 增量游标 + if not since_id and self.cache: + since_id = self.cache.get(cache_key) + + while True: + cli = await self.pool.acquire() + try: + if endpoint == "home": + resp = cli.get_home_timeline( + tweet_fields=TWEET_FIELDS, + expansions=EXPANSIONS, + user_fields=USER_FIELDS, + exclude=["replies", "retweets"], + start_time=start_time, + since_id=since_id, + max_results=self.max_results, + pagination_token=next_token, + ) + else: # mentions + resp = cli.get_users_mentions( + id=me_id, + tweet_fields=TWEET_FIELDS, + expansions=EXPANSIONS, + user_fields=USER_FIELDS, + start_time=start_time, + since_id=since_id, + max_results=self.max_results, + pagination_token=next_token, + ) + + if resp.data: + users = {str(u.id): u.data for u in resp.includes.get("users", [])} + for tw in resp.data: + d = tw.data + d["author"] = users.get(d["author_id"], {}).get("username", d["author_id"]) + all_tweets.append(d) + + next_token = resp.meta.get("next_token") + if not next_token: + break + + self.pool.release(cli, failed=False) + + except (NotFound, TwitterServerError): + # 404 / 5xx 不再重试 + self.pool.release(cli, failed=False) + break + except Exception as e: + logger.warning("timeline %s error: %s", endpoint, e) + self.pool.release(cli, failed=True) + break + + # 升序(老→新)并缓存 newest_id + all_tweets.sort(key=lambda t: t["id"]) + if all_tweets and self.cache: + newest_id = all_tweets[-1]["id"] + self.cache.set(cache_key, str(newest_id)) + + return all_tweets \ No newline at end of file diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/__init__.py b/packages/sunagent-ext/src/sunagent_ext/tweet/__init__.py new file mode 100644 index 0000000..e69de29 From 164108efa3d245d5b28ed29dbee72011b2ec8a7d Mon Sep 17 00:00:00 2001 From: bobo Date: Mon, 8 Sep 2025 15:18:03 +0800 Subject: [PATCH 02/12] feat: format code --- packages/sunagent-ext/pyproject.toml | 2 +- .../sunagent_ext/tweet/TwitterGetContext.py | 146 ------- ...erClientPool.py => twitter_client_pool.py} | 31 +- .../sunagent_ext/tweet/twitter_get_context.py | 402 ++++++++++++++++++ 4 files changed, 421 insertions(+), 160 deletions(-) delete mode 100644 packages/sunagent-ext/src/sunagent_ext/tweet/TwitterGetContext.py rename packages/sunagent-ext/src/sunagent_ext/tweet/{TwitterClientPool.py => twitter_client_pool.py} (76%) create mode 100644 packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py diff --git a/packages/sunagent-ext/pyproject.toml b/packages/sunagent-ext/pyproject.toml index e7fb11b..b3f9945 100644 --- a/packages/sunagent-ext/pyproject.toml +++ b/packages/sunagent-ext/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "sunagent-ext" -version = "0.0.6b10" +version = "0.0.7b1" license = {file = "LICENSE-CODE"} description = "AutoGen extensions library" readme = "README.md" diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterGetContext.py b/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterGetContext.py deleted file mode 100644 index 7549c1a..0000000 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterGetContext.py +++ /dev/null @@ -1,146 +0,0 @@ -import asyncio -import json -import logging -from typing import List, Dict, Optional, Any, cast -from datetime import datetime, timedelta, timezone - -from sunagent_ext.cache_store import CacheStore -from tweepy import Response as TwitterResponse, TwitterServerError, NotFound - -from sunagent_ext.tweet import TwitterClientPool - -logger = logging.getLogger("tweet_get_context") - -# 字段按需裁剪 -TWEET_FIELDS = [ - "id", "created_at", "author_id", "text", "public_metrics", - "referenced_tweets", "conversation_id", "entities", -] -EXPANSIONS = ["author_id", "referenced_tweets.id", "referenced_tweets.id.author_id"] -USER_FIELDS = ["id", "username", "name", "public_metrics"] -MAX_RESULTS = 100 # 每页上限 - -class TweetGetContext: - """ - 只负责「Home 时间线 & Mention 时间线」增量抓取, - 所有网络请求通过外部 TwitterClientPool 完成。 - """ - def __init__( - self, - pool: TwitterClientPool, # 前面实现的池子 - cache: Optional[CacheStore[str]] = None, - max_results: int = MAX_RESULTS, - ) -> None: - self.pool = pool - self.cache = cache - self.max_results = max_results - - # -------------------- 对外 API -------------------- - async def get_home_timeline_with_context( - self, - me_id: str, - hours: int = 24, - since_id: Optional[str] = None, - ) -> List[Dict[str, Any]]: - """ - 抓取 **Home 时间线** 最近 N 小时推文(不含回复/转推), - 返回 List[Dict](按时间升序)。 - """ - return await self._fetch_timeline( - endpoint="home", - me_id=me_id, - hours=hours, - since_id=since_id, - ) - - async def get_mentions_with_context( - self, - me_id: str, - hours: int = 24, - since_id: Optional[str] = None, - ) -> List[Dict[str, Any]]: - """ - 抓取 **@我 的 Mention** 最近 N 小时推文, - 返回 List[Dict](按时间升序)。 - """ - return await self._fetch_timeline( - endpoint="mentions", - me_id=me_id, - hours=hours, - since_id=since_id, - ) - - # -------------------- 统一抓取逻辑 -------------------- - async def _fetch_timeline( - self, - endpoint: str, # "home" | "mentions" - me_id: str, - hours: int, - since_id: Optional[str], - ) -> List[Dict[str, Any]]: - since = datetime.now(timezone.utc) - timedelta(hours=hours) - start_time = since.isoformat(timespec="seconds") - - all_tweets: List[Dict[str, Any]] = [] - next_token = None - cache_key = f"{endpoint}:{me_id}" - - # 增量游标 - if not since_id and self.cache: - since_id = self.cache.get(cache_key) - - while True: - cli = await self.pool.acquire() - try: - if endpoint == "home": - resp = cli.get_home_timeline( - tweet_fields=TWEET_FIELDS, - expansions=EXPANSIONS, - user_fields=USER_FIELDS, - exclude=["replies", "retweets"], - start_time=start_time, - since_id=since_id, - max_results=self.max_results, - pagination_token=next_token, - ) - else: # mentions - resp = cli.get_users_mentions( - id=me_id, - tweet_fields=TWEET_FIELDS, - expansions=EXPANSIONS, - user_fields=USER_FIELDS, - start_time=start_time, - since_id=since_id, - max_results=self.max_results, - pagination_token=next_token, - ) - - if resp.data: - users = {str(u.id): u.data for u in resp.includes.get("users", [])} - for tw in resp.data: - d = tw.data - d["author"] = users.get(d["author_id"], {}).get("username", d["author_id"]) - all_tweets.append(d) - - next_token = resp.meta.get("next_token") - if not next_token: - break - - self.pool.release(cli, failed=False) - - except (NotFound, TwitterServerError): - # 404 / 5xx 不再重试 - self.pool.release(cli, failed=False) - break - except Exception as e: - logger.warning("timeline %s error: %s", endpoint, e) - self.pool.release(cli, failed=True) - break - - # 升序(老→新)并缓存 newest_id - all_tweets.sort(key=lambda t: t["id"]) - if all_tweets and self.cache: - newest_id = all_tweets[-1]["id"] - self.cache.set(cache_key, str(newest_id)) - - return all_tweets \ No newline at end of file diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterClientPool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py similarity index 76% rename from packages/sunagent-ext/src/sunagent_ext/tweet/TwitterClientPool.py rename to packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py index 460b00f..19e5daa 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/TwitterClientPool.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py @@ -1,24 +1,29 @@ import asyncio import logging import time -from typing import List, Optional -import tweepy from dataclasses import dataclass +from typing import Any, Coroutine, List, Optional + +import tweepy +from tweepy import Client logger = logging.getLogger(__name__) -RETRY_AFTER_SEC = 15 * 60 # 15 分钟 +RETRY_AFTER_SEC = 15 * 60 # 15 分钟 + @dataclass -class _PoolItem: - client: tweepy.Client # 你的 client 实例 +class _PoolItem: # type: ignore[no-any-unimported] + client: tweepy.Client # type: ignore[no-any-unimported] dead_at: Optional[float] = None # None 表示 alive + class TwitterClientPool: """ Twitter 客户端专用池:轮询获取、异常熔断、15 min 复活、支持永久摘除 """ - def __init__(self, clients: List[tweepy.Client], retry_after: float = RETRY_AFTER_SEC): + + def __init__(self, clients: List[Client], retry_after: float = RETRY_AFTER_SEC): # type: ignore[no-any-unimported] self._retry_after = retry_after self._pool: List[_PoolItem] = [_PoolItem(c) for c in clients] self._lock = asyncio.Lock() @@ -28,7 +33,7 @@ def __init__(self, clients: List[tweepy.Client], retry_after: float = RETRY_AFTE self._not_empty.set() # -------------------- 对外 API -------------------- - async def acquire(self) -> tweepy.Client : + async def acquire(self) -> tuple[Client, Any]: # type: ignore[no-any-unimported] """轮询获取一个健康 client;池空时阻塞直到有可用实例。""" while True: async with self._lock: @@ -54,9 +59,10 @@ async def acquire(self) -> tweepy.Client : # 移到尾部,公平 RR self._pool.remove(chosen) self._pool.append(chosen) - return chosen.client + client = chosen.client + return client, client.consumer_key - def remove(self, client: tweepy.Client ) -> None: + def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] """永久摘除某个 client(不再放回池子)。""" for it in self._pool: if it.client is client: @@ -66,14 +72,13 @@ def remove(self, client: tweepy.Client ) -> None: self._not_empty.clear() return - def release(self, client: tweepy.Client , *, failed: bool = False) -> None: + def release(self, client: tweepy.Client, *, failed: bool = False) -> None: # type: ignore[no-any-unimported] """归还 client;failed=True 表示请求异常,触发 15 min 熔断。""" for it in self._pool: if it.client is client: if failed: it.dead_at = time.time() - logger.warning("client %s dead, will retry after %s min", - id(client), self._retry_after // 60) + logger.warning("client %s dead, will retry after %s min", id(client), self._retry_after // 60) asyncio.create_task(self._notify_maybe_alive()) return @@ -81,4 +86,4 @@ def release(self, client: tweepy.Client , *, failed: bool = False) -> None: async def _notify_maybe_alive(self) -> None: async with self._lock: if any(it.dead_at is None for it in self._pool): - self._not_empty.set() \ No newline at end of file + self._not_empty.set() diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py new file mode 100644 index 0000000..f129e3c --- /dev/null +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py @@ -0,0 +1,402 @@ +""" +Twitter 时间线 & Mention 增量抓取 + 对话链拼合 +网络请求通过 TwitterClientPool,Prometheus 埋点带 client_key +""" + +import asyncio +import json +import logging +from datetime import datetime, timedelta, timezone +from typing import Any, Callable, Dict, List, Optional, cast + +from prometheus_client import Counter, Gauge +from tweepy import Media, NotFound, TwitterServerError, User # 保持原类型注解 +from tweepy import Response as TwitterResponse + +from sunagent_ext.tweet.twitter_client_pool import TwitterClientPool + +logger = logging.getLogger("tweet_get_context") + +# ---------- Prometheus 指标 ---------- +read_tweet_success_count = Counter( + "read_tweet_success_count", "Number of successful read tweets", labelnames=["client_key"] +) +read_tweet_failure_count = Counter( + "read_tweet_failure_count", "Number of failed read tweets", labelnames=["client_key"] +) +tweet_monthly_cap = Gauge("tweet_monthly_cap", "0=触顶 1=正常", labelnames=["client_key"]) + +# ---------- 字段 ---------- +TWEET_FIELDS = [ + "id", + "created_at", + "author_id", + "text", + "public_metrics", + "referenced_tweets", + "conversation_id", + "entities", + "display_text_range", + "attachments", + "withheld", + "note_tweet", + "edit_controls", + "edit_history_tweet_ids", + "possibly_sensitive", + "reply_settings", + "source", + "lang", + "geo", + "context_annotations", + "card_uri", + "community_id", + "in_reply_to_user_id", + "media_metadata", +] +EXPANSIONS = [ + "author_id", + "referenced_tweets.id", + "referenced_tweets.id.author_id", + "attachments.media_keys", + "attachments.poll_ids", + "geo.place_id", +] +USER_FIELDS = [ + "id", + "username", + "name", + "public_metrics", + "created_at", + "description", + "entities", + "location", + "pinned_tweet_id", + "profile_image_url", + "protected", + "verified", + "verified_type", + "is_identity_verified", + "affiliation", + "connection_status", + "most_recent_tweet_id", + "parody", + "receives_your_dm", + "subscription", + "subscription_type", + "profile_banner_url", + "withheld", +] +MEDIA_FIELDS = [ + "alt_text", + "duration_ms", + "height", + "media_key", + "preview_image_url", + "public_metrics", + "type", + "url", + "variants", + "width", +] +POLL_FIELDS = ["duration_minutes", "end_datetime", "id", "options", "voting_status"] +PLACE_FIELDS = ["contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type"] +MAX_RESULTS = 100 +MONTHLY_CAP_INFO = "Monthly product cap" + + +# ---------- 主类 ---------- +class TweetGetContext: + def __init__( # type: ignore[no-untyped-def] + self, + pool: TwitterClientPool, # 外部池子 + cache=None, # 可选缓存 + max_results: int = MAX_RESULTS, + block_user_ids: Optional[list[str]] = None, + white_user_ids: Optional[list[str]] = None, + reply_freq_limit: int = 5, + max_depth: int = 5, + ) -> None: + self.pool = pool + self.cache = cache + self.max_depth = max_depth + self.max_results = max_results + self.block_uids = set(block_user_ids or []) + self.white_uids = set(white_user_ids or []) + self.freq_limit = reply_freq_limit + # 用于 mentions_me 判断(可外部注入 me_id) + self.me_id: Optional[str] = None + + # ===================== 对外 API ===================== + async def get_home_timeline_with_context( + self, + me_id: str, + hours: int = 24, + since_id: Optional[str] = None, + filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None, + ) -> list[Dict[str, Any]]: + return await self._fetch_timeline( + endpoint="home", + me_id=me_id, + hours=hours, + since_id=since_id, + filter_func=filter_func or (lambda _: True), + ) + + async def get_mentions_with_context( + self, + me_id: str, + hours: int = 24, + since_id: Optional[str] = None, + filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None, + ) -> list[Dict[str, Any]]: + self.me_id = me_id + return await self._fetch_timeline( + endpoint="mentions", + me_id=me_id, + hours=hours, + since_id=since_id, + filter_func=filter_func or (lambda _: True), + ) + + # ===================== 统一抓取 ===================== + async def _fetch_timeline( + self, + endpoint: str, + me_id: str, + hours: int, + since_id: Optional[str], + filter_func: Callable[[Dict[str, Any]], bool], + ) -> list[Dict[str, Any]]: + since = datetime.now(timezone.utc) - timedelta(hours=hours) + start_time = since.isoformat(timespec="seconds") + next_token = None + all_raw: list[Dict[str, Any]] = [] + cache_key = f"{endpoint}:{me_id}" + if not since_id and self.cache: + since_id = self.cache.get(cache_key) + + while True: + cli, client_key = await self.pool.acquire() + try: + if endpoint == "home": + resp = cli.get_home_timeline( + tweet_fields=TWEET_FIELDS, + expansions=EXPANSIONS, + media_fields=MEDIA_FIELDS, + poll_fields=POLL_FIELDS, + user_fields=USER_FIELDS, + place_fields=PLACE_FIELDS, + exclude=["replies", "retweets"], + start_time=start_time, + since_id=since_id, + max_results=self.max_results, + pagination_token=next_token, + user_auth=True, + ) + else: # mentions + resp = cli.get_users_mentions( + id=me_id, + tweet_fields=TWEET_FIELDS, + expansions=EXPANSIONS, + media_fields=MEDIA_FIELDS, + poll_fields=POLL_FIELDS, + user_fields=USER_FIELDS, + place_fields=PLACE_FIELDS, + start_time=start_time, + since_id=since_id, + max_results=self.max_results, + pagination_token=next_token, + user_auth=True, + ) + + # ③ 成功埋点 + read_tweet_success_count.labels(client_key=client_key).inc(len(resp.data or [])) + + # 交给中间层处理 + tweet_list, next_token = await self.on_twitter_response(resp, filter_func) + all_raw.extend(tweet_list) + if not next_token: + break + self.pool.release(cli, failed=False) + + except (NotFound, TwitterServerError): + self.pool.release(cli, failed=False) + break + except Exception as e: + logger.warning("timeline %s error: %s", endpoint, e) + # ④ 失败埋点 + read_tweet_failure_count.labels(client_key=client_key).inc() + # ⑤ 月额度检测 + if MONTHLY_CAP_INFO in str(e): + tweet_monthly_cap.labels(client_key=client_key).set(0) + self.pool.remove(cli) # 永久踢出 + logger.error("client %s removed due to monthly cap", client_key) + break + else: + tweet_monthly_cap.labels(client_key=client_key).set(1) + self.pool.release(cli, failed=True) + break + + all_raw.sort(key=lambda t: t["id"]) + if all_raw and self.cache: + newest_id = all_raw[-1]["id"] + self.cache.set(cache_key, str(newest_id)) + return all_raw + + # ===================== 中间处理钩子(保留) ===================== + async def on_twitter_response( # type: ignore[no-any-unimported] + self, + response: TwitterResponse, + filter_func: Callable[[Dict[str, Any]], bool], + ) -> tuple[list[Dict[str, Any]], Optional[str]]: + next_token = response.meta.get("next_token") + if response.meta.get("result_count", 0) == 0 or response.data is None: + return [], next_token + + users = self._build_users(response.includes) + medias = self._build_medias(response.includes) + all_tweets = self._get_all_tweets(response, users, medias) + out: list[Dict[str, Any]] = [] + + for tweet in all_tweets: + if not await self._should_keep(tweet, filter_func): + continue + norm = await self._normalize_tweet(tweet) + out.append(norm) + return out, next_token + + async def _should_keep(self, tweet: Dict[str, Any], filter_func: Callable[[Dict[str, Any]], bool]) -> bool: + author_id = str(tweet["author_id"]) + if author_id in self.block_uids: + logger.info("blocked user %s", author_id) + return False + if self.cache and self.cache.get(f"processed:{tweet['id']}"): + logger.info("already processed %s", tweet["id"]) + return False + conv_id = tweet.get("conversation_id", tweet["id"]) + freq = int(self.cache.get(f"freq:{conv_id}") or 0) if self.cache else 0 + if freq >= self.freq_limit and author_id not in self.white_uids: + logger.info(f"skip tweet {tweet['id']} freq {freq}") + return False + return filter_func(tweet) + + async def _normalize_tweet(self, tweet: Dict[str, Any]) -> Dict[str, Any]: + wanted = [ + "id", + "created_at", + "author_id", + "author", + "text", + "public_metrics", + "conversation_id", + "entities", + ] + out = {k: tweet[k] for k in wanted if k in tweet} + out["history"] = await self._build_context(tweet) + out["sampling_quote"] = not tweet.get("referenced_tweets") + return out + + async def _build_context(self, tweet: Dict[str, Any]) -> str: + chain: list[Dict[str, Any]] = [] + await self._recursive_fetch(tweet, chain, depth=0) + lines = [""] + for t in chain: + lines.append(f"{t.get('text', '')}") + lines.append("") + return "\n".join(lines) + + async def _recursive_fetch(self, tweet: Dict[str, Any], chain: list[Dict[str, Any]], depth: int) -> None: + if depth > 5: + chain.append(tweet) + return + parent_id = None + if tweet.get("referenced_tweets"): + ref = tweet["referenced_tweets"][0] + if ref["type"] == "replied_to": + parent_id = ref["id"] + if parent_id: + parent = await self._get_tweet_with_retry(parent_id) + if parent: + await self._recursive_fetch(parent, chain, depth + 1) + chain.append(tweet) + + async def _get_tweet_with_retry(self, tweet_id: str) -> Optional[Dict[str, Any]]: + for attempt in range(3): + cli, client_key = await self.pool.acquire() + try: + resp = cli.get_tweet( + tweet_id, tweet_fields=TWEET_FIELDS, expansions=EXPANSIONS, user_fields=USER_FIELDS, user_auth=True + ) + if not resp.data: + return None + tw: Dict[str, Any] = resp.data.data + users = self._build_users(resp.includes) + self._format_tweet_data(tw, users, self._build_medias(resp.includes)) + self.pool.release(cli, failed=False) + return tw + except (NotFound, TwitterServerError): + self.pool.release(cli, failed=False) + return None + except Exception as e: + logger.warning("get_tweet retry %s: %s", attempt + 1, e) + self.pool.release(cli, failed=True) + if attempt == 2: + return None + await asyncio.sleep(2**attempt) + return None + + # ===================== 原方法签名保持不变 ===================== + def _format_tweet_data(self, tweet: Dict[str, Any], users: Dict[str, User], medias: Dict[str, Media]) -> None: # type: ignore[no-any-unimported] + """标准化推文内容""" + author_id = tweet["author_id"] + user = users[author_id] if author_id in users else None + author = str(user.username) if user and "username" in user else author_id + tweet["author"] = author + tweet["is_robot"] = ( + "Automated" in user["affiliation"]["description"] + if user and "affiliation" in user and "description" in user["affiliation"] + else False + ) + tweet["mentions_me"] = ( + "entities" in tweet + and "mentions" in tweet["entities"] + and self.me_id in list(str(i["id"]) for i in tweet["entities"]["mentions"]) + ) + text = tweet["text"] + if "display_text_range" in tweet: + display_text_range: list[int] = tweet["display_text_range"] + text = text[display_text_range[0] : display_text_range[1]] + tweet["text"] = f"{author}:\n{text}\n\n" + + if ( + "attachments" in tweet + and "media_keys" in tweet["attachments"] + and len(tweet["attachments"]["media_keys"]) > 0 + ): + key = tweet["attachments"]["media_keys"][0] + if key in medias and medias[key].type == "photo": + tweet["image_url"] = medias[key].url + + def _build_users(self, includes: Dict[str, Any]) -> Dict[str, User]: # type: ignore[no-any-unimported] + users: Dict[str, User] = {} # type: ignore[no-any-unimported] + if "users" in includes: + for user in includes["users"]: + users[str(user.id)] = user + return users + + def _build_medias(self, includes: Dict[str, Any]) -> Dict[str, Media]: # type: ignore[no-any-unimported] + medias: Dict[str, Media] = {} # type: ignore[no-any-unimported] + if "media" in includes: + for media in includes["media"]: + medias[str(media.media_key)] = media + return medias + + def _get_all_tweets( # type: ignore[no-any-unimported] + self, response: TwitterResponse, users: Dict[str, User], medias: Dict[str, Media] + ) -> list[Dict[str, Any]]: + all_tweets: list[Dict[str, Any]] = [] + for tweet in response.data: + t = tweet.data + self._format_tweet_data(t, users, medias) + all_tweets.append(t) + return all_tweets From f7ca37561f12957007afb3159b54fb6fe8a2d8dd Mon Sep 17 00:00:00 2001 From: bobo Date: Mon, 8 Sep 2025 15:37:47 +0800 Subject: [PATCH 03/12] feat: close #44 --- .../sunagent_ext/tweet/twitter_client_pool.py | 10 +++++----- .../sunagent_ext/tweet/twitter_get_context.py | 16 ++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py index 19e5daa..496e0e5 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py @@ -13,7 +13,7 @@ @dataclass -class _PoolItem: # type: ignore[no-any-unimported] +class _PoolItem: # type: ignore[no-any-unimported] client: tweepy.Client # type: ignore[no-any-unimported] dead_at: Optional[float] = None # None 表示 alive @@ -23,7 +23,7 @@ class TwitterClientPool: Twitter 客户端专用池:轮询获取、异常熔断、15 min 复活、支持永久摘除 """ - def __init__(self, clients: List[Client], retry_after: float = RETRY_AFTER_SEC): # type: ignore[no-any-unimported] + def __init__(self, clients: List[Client], retry_after: float = RETRY_AFTER_SEC): # type: ignore[no-any-unimported] self._retry_after = retry_after self._pool: List[_PoolItem] = [_PoolItem(c) for c in clients] self._lock = asyncio.Lock() @@ -33,7 +33,7 @@ def __init__(self, clients: List[Client], retry_after: float = RETRY_AFTER_SEC): self._not_empty.set() # -------------------- 对外 API -------------------- - async def acquire(self) -> tuple[Client, Any]: # type: ignore[no-any-unimported] + async def acquire(self) -> tuple[Client, Any]: # type: ignore[no-any-unimported] """轮询获取一个健康 client;池空时阻塞直到有可用实例。""" while True: async with self._lock: @@ -62,7 +62,7 @@ async def acquire(self) -> tuple[Client, Any]: # type: ignore[no-any-unimported] client = chosen.client return client, client.consumer_key - def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] + def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] """永久摘除某个 client(不再放回池子)。""" for it in self._pool: if it.client is client: @@ -72,7 +72,7 @@ def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimporte self._not_empty.clear() return - def release(self, client: tweepy.Client, *, failed: bool = False) -> None: # type: ignore[no-any-unimported] + def release(self, client: tweepy.Client, *, failed: bool = False) -> None: # type: ignore[no-any-unimported] """归还 client;failed=True 表示请求异常,触发 15 min 熔断。""" for it in self._pool: if it.client is client: diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py index f129e3c..4819343 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py @@ -106,7 +106,7 @@ # ---------- 主类 ---------- class TweetGetContext: - def __init__( # type: ignore[no-untyped-def] + def __init__( # type: ignore[no-untyped-def] self, pool: TwitterClientPool, # 外部池子 cache=None, # 可选缓存 @@ -244,7 +244,7 @@ async def _fetch_timeline( return all_raw # ===================== 中间处理钩子(保留) ===================== - async def on_twitter_response( # type: ignore[no-any-unimported] + async def on_twitter_response( # type: ignore[no-any-unimported] self, response: TwitterResponse, filter_func: Callable[[Dict[str, Any]], bool], @@ -346,7 +346,7 @@ async def _get_tweet_with_retry(self, tweet_id: str) -> Optional[Dict[str, Any]] return None # ===================== 原方法签名保持不变 ===================== - def _format_tweet_data(self, tweet: Dict[str, Any], users: Dict[str, User], medias: Dict[str, Media]) -> None: # type: ignore[no-any-unimported] + def _format_tweet_data(self, tweet: Dict[str, Any], users: Dict[str, User], medias: Dict[str, Media]) -> None: # type: ignore[no-any-unimported] """标准化推文内容""" author_id = tweet["author_id"] user = users[author_id] if author_id in users else None @@ -377,21 +377,21 @@ def _format_tweet_data(self, tweet: Dict[str, Any], users: Dict[str, User], medi if key in medias and medias[key].type == "photo": tweet["image_url"] = medias[key].url - def _build_users(self, includes: Dict[str, Any]) -> Dict[str, User]: # type: ignore[no-any-unimported] - users: Dict[str, User] = {} # type: ignore[no-any-unimported] + def _build_users(self, includes: Dict[str, Any]) -> Dict[str, User]: # type: ignore[no-any-unimported] + users: Dict[str, User] = {} # type: ignore[no-any-unimported] if "users" in includes: for user in includes["users"]: users[str(user.id)] = user return users - def _build_medias(self, includes: Dict[str, Any]) -> Dict[str, Media]: # type: ignore[no-any-unimported] - medias: Dict[str, Media] = {} # type: ignore[no-any-unimported] + def _build_medias(self, includes: Dict[str, Any]) -> Dict[str, Media]: # type: ignore[no-any-unimported] + medias: Dict[str, Media] = {} # type: ignore[no-any-unimported] if "media" in includes: for media in includes["media"]: medias[str(media.media_key)] = media return medias - def _get_all_tweets( # type: ignore[no-any-unimported] + def _get_all_tweets( # type: ignore[no-any-unimported] self, response: TwitterResponse, users: Dict[str, User], medias: Dict[str, Media] ) -> list[Dict[str, Any]]: all_tweets: list[Dict[str, Any]] = [] From 232cb2bdff6c95076e07014d608b232290fe6118 Mon Sep 17 00:00:00 2001 From: bobo Date: Mon, 8 Sep 2025 16:30:49 +0800 Subject: [PATCH 04/12] feat: update metric setting --- .../src/sunagent_ext/tweet/twitter_get_context.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py index 4819343..fb6ec46 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py @@ -19,12 +19,12 @@ # ---------- Prometheus 指标 ---------- read_tweet_success_count = Counter( - "read_tweet_success_count", "Number of successful read tweets", labelnames=["client_key"] + "ext_read_tweet_success_count", "Number of successful read tweets", labelnames=["client_key"] ) read_tweet_failure_count = Counter( - "read_tweet_failure_count", "Number of failed read tweets", labelnames=["client_key"] + "ext_read_tweet_failure_count", "Number of failed read tweets", labelnames=["client_key"] ) -tweet_monthly_cap = Gauge("tweet_monthly_cap", "0=触顶 1=正常", labelnames=["client_key"]) +tweet_monthly_cap = Gauge("ext_tweet_monthly_cap", "0=触顶 1=正常", labelnames=["client_key"]) # ---------- 字段 ---------- TWEET_FIELDS = [ From 6b129131004f715d032472f828c888ce6ed205e3 Mon Sep 17 00:00:00 2001 From: bobo Date: Mon, 8 Sep 2025 17:05:17 +0800 Subject: [PATCH 05/12] feat: update metric setting --- .../sunagent_ext/tweet/twitter_get_context.py | 56 +++++++++++++++++-- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py index fb6ec46..fbe56dd 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py @@ -101,6 +101,10 @@ POLL_FIELDS = ["duration_minutes", "end_datetime", "id", "options", "voting_status"] PLACE_FIELDS = ["contained_within", "country", "country_code", "full_name", "geo", "id", "name", "place_type"] MAX_RESULTS = 100 +PROCESS_KEY_PREFIX = "P:" +FREQ_KEY_PREFIX = "F:" +HOME_TIMELINE_ID = "last_home_timeline" +MENTIONS_TIMELINE_ID = "last_mentions_timeline" MONTHLY_CAP_INFO = "Monthly product cap" @@ -115,7 +119,9 @@ def __init__( # type: ignore[no-untyped-def] white_user_ids: Optional[list[str]] = None, reply_freq_limit: int = 5, max_depth: int = 5, + agent_id: str ="", ) -> None: + self.agent_id = agent_id self.pool = pool self.cache = cache self.max_depth = max_depth @@ -171,7 +177,9 @@ async def _fetch_timeline( start_time = since.isoformat(timespec="seconds") next_token = None all_raw: list[Dict[str, Any]] = [] - cache_key = f"{endpoint}:{me_id}" + cache_key = f"{self.agent_id}:{MENTIONS_TIMELINE_ID}" + if endpoint == "home": + cache_key = f"{self.agent_id}:{HOME_TIMELINE_ID}" if not since_id and self.cache: since_id = self.cache.get(cache_key) @@ -266,20 +274,56 @@ async def on_twitter_response( # type: ignore[no-any-unimported] return out, next_token async def _should_keep(self, tweet: Dict[str, Any], filter_func: Callable[[Dict[str, Any]], bool]) -> bool: + is_processed = await self._check_tweet_process(tweet["id"]) + if is_processed: + logger.info("already processed %s", tweet["id"]) + return False author_id = str(tweet["author_id"]) if author_id in self.block_uids: logger.info("blocked user %s", author_id) return False - if self.cache and self.cache.get(f"processed:{tweet['id']}"): - logger.info("already processed %s", tweet["id"]) - return False - conv_id = tweet.get("conversation_id", tweet["id"]) - freq = int(self.cache.get(f"freq:{conv_id}") or 0) if self.cache else 0 + freq = await self._get_freq(tweet) if freq >= self.freq_limit and author_id not in self.white_uids: logger.info(f"skip tweet {tweet['id']} freq {freq}") return False + await self._increase_freq(tweet) return filter_func(tweet) + async def _check_tweet_process(self, tweet_id: str) -> bool: + if self.cache is None: + return False + try: + return self.cache.get(f"{self.agent_id}:{PROCESS_KEY_PREFIX}{tweet_id}") is not None + except Exception: + # regard it as processed if cache not available + return True + + async def _mark_tweet_process(self, tweet_id: str) -> None: + if self.cache is None: + return + try: + self.cache.set(f"{self.agent_id}:{PROCESS_KEY_PREFIX}{tweet_id}", "") + except Exception: + pass + + async def _get_freq(self, tweet: Dict[str, Any]) -> int: + if self.cache is None: + return -1 + try: + freq = self.cache.get(f"{self.agent_id}:{FREQ_KEY_PREFIX}{tweet['conversation_id']}") + return int(freq) if freq else 0 + except Exception: + return 0 + + async def _increase_freq(self, tweet: Dict[str, Any]) -> None: + if self.cache is None: + return + freq = await self._get_freq(tweet) + try: + self.cache.set(f"{self.agent_id}:{FREQ_KEY_PREFIX}{tweet['conversation_id']}", str(freq + 1)) + except Exception: + pass + async def _normalize_tweet(self, tweet: Dict[str, Any]) -> Dict[str, Any]: wanted = [ "id", From 64c59e2c695e7c32621b575573003e656340dec1 Mon Sep 17 00:00:00 2001 From: bobo Date: Mon, 8 Sep 2025 17:08:38 +0800 Subject: [PATCH 06/12] feat: update metric setting --- .../sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py index fbe56dd..7bf4e2d 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py @@ -119,7 +119,7 @@ def __init__( # type: ignore[no-untyped-def] white_user_ids: Optional[list[str]] = None, reply_freq_limit: int = 5, max_depth: int = 5, - agent_id: str ="", + agent_id: str = "", ) -> None: self.agent_id = agent_id self.pool = pool From 62f99d07903df1fdec75d442cd1bef26b1c1d37b Mon Sep 17 00:00:00 2001 From: bobo Date: Mon, 8 Sep 2025 18:46:46 +0800 Subject: [PATCH 07/12] =?UTF-8?q?feat=EF=BC=9A=20add=20lock?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sunagent_ext/tweet/twitter_client_pool.py | 93 +++++++++++-------- .../sunagent_ext/tweet/twitter_get_context.py | 14 +-- 2 files changed, 59 insertions(+), 48 deletions(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py index 496e0e5..edbf542 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py @@ -2,10 +2,9 @@ import logging import time from dataclasses import dataclass -from typing import Any, Coroutine, List, Optional +from typing import List, Optional, Protocol import tweepy -from tweepy import Client logger = logging.getLogger(__name__) @@ -15,75 +14,87 @@ @dataclass class _PoolItem: # type: ignore[no-any-unimported] client: tweepy.Client # type: ignore[no-any-unimported] + client_key: str # 用 consumer_key 当唯一标识 dead_at: Optional[float] = None # None 表示 alive class TwitterClientPool: """ Twitter 客户端专用池:轮询获取、异常熔断、15 min 复活、支持永久摘除 + 全部操作在锁内完成,保证并发安全。 """ - def __init__(self, clients: List[Client], retry_after: float = RETRY_AFTER_SEC): # type: ignore[no-any-unimported] + def __init__(self, clients: list[tweepy.Client], retry_after: float = RETRY_AFTER_SEC) -> None: # type: ignore[no-any-unimported] self._retry_after = retry_after - self._pool: List[_PoolItem] = [_PoolItem(c) for c in clients] + self._pool: list[_PoolItem] = [_PoolItem(c, c.consumer_key) for c in clients] self._lock = asyncio.Lock() self._not_empty = asyncio.Event() + # 轮询指针:永远按「当前池子长度」取模,保证真·RR self._rr_idx = 0 if self._pool: self._not_empty.set() # -------------------- 对外 API -------------------- - async def acquire(self) -> tuple[Client, Any]: # type: ignore[no-any-unimported] - """轮询获取一个健康 client;池空时阻塞直到有可用实例。""" + async def acquire(self) -> tuple[tweepy.Client, str]: # type: ignore[no-any-unimported] + """真·Round-Robin:在完整池子上轮询,跳过 dead 的。""" while True: async with self._lock: now = time.time() # 1. 复活 + revived = False for it in self._pool: if it.dead_at and now - it.dead_at >= self._retry_after: it.dead_at = None - logger.info("client %s revived", id(it.client)) + revived = True + logger.info("client %s revived", it.client_key) + if revived: + self._not_empty.set() - # 2. 留活的 - alive = [it for it in self._pool if it.dead_at is None] - if not alive: - self._not_empty.clear() - logger.warning("all clients dead, waiting ...") - await asyncio.wait_for(self._not_empty.wait(), timeout=1) - continue + # 2. 真·轮询:在完整池子上跳过 dead 的 + for _ in range(len(self._pool)): + idx = self._rr_idx % len(self._pool) + self._rr_idx += 1 + chosen = self._pool[idx] + if chosen.dead_at is None: + # 移到尾部,实现 RR + self._pool.pop(idx) + self._pool.append(chosen) + return chosen.client, chosen.client_key - # 3. 轮询 - idx = self._rr_idx % len(alive) - self._rr_idx += 1 - chosen = alive[idx] - # 移到尾部,公平 RR - self._pool.remove(chosen) - self._pool.append(chosen) - client = chosen.client - return client, client.consumer_key + # 3. 没有 alive 的 + self._not_empty.clear() - def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] + # 释放锁后再等,避免忙等 + await self._not_empty.wait() + + # -------------------- 加锁摘除 -------------------- + async def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] """永久摘除某个 client(不再放回池子)。""" - for it in self._pool: - if it.client is client: - self._pool.remove(it) - logger.info("client %s removed permanently", id(client)) - if not any(it.dead_at is None for it in self._pool): - self._not_empty.clear() - return + async with self._lock: + for idx, it in enumerate(self._pool): + if it.client is client: + self._pool.pop(idx) + logger.info("client %s removed permanently", it.client_key) + if not any(item.dead_at is None for item in self._pool): + self._not_empty.clear() + return - def release(self, client: tweepy.Client, *, failed: bool = False) -> None: # type: ignore[no-any-unimported] - """归还 client;failed=True 表示请求异常,触发 15 min 熔断。""" - for it in self._pool: - if it.client is client: - if failed: - it.dead_at = time.time() - logger.warning("client %s dead, will retry after %s min", id(client), self._retry_after // 60) - asyncio.create_task(self._notify_maybe_alive()) - return + # -------------------- 归还 -------------------- + async def release(self, client: tweepy.Client, *, failed: bool = False) -> None: # type: ignore[no-any-unimported] + async with self._lock: + for it in self._pool: + if it.client is client: + if failed: + it.dead_at = time.time() + logger.warning( + "client %s dead, will retry after %s min", it.client_key, self._retry_after // 60 + ) + # 异步唤醒等待者 + asyncio.create_task(self._notify_maybe_alive()) + return # -------------------- 内部 -------------------- async def _notify_maybe_alive(self) -> None: async with self._lock: - if any(it.dead_at is None for it in self._pool): + if any(item.dead_at is None for item in self._pool): self._not_empty.set() diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py index 7bf4e2d..762e012 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py @@ -225,10 +225,10 @@ async def _fetch_timeline( all_raw.extend(tweet_list) if not next_token: break - self.pool.release(cli, failed=False) + await self.pool.release(cli, failed=False) except (NotFound, TwitterServerError): - self.pool.release(cli, failed=False) + await self.pool.release(cli, failed=False) break except Exception as e: logger.warning("timeline %s error: %s", endpoint, e) @@ -237,12 +237,12 @@ async def _fetch_timeline( # ⑤ 月额度检测 if MONTHLY_CAP_INFO in str(e): tweet_monthly_cap.labels(client_key=client_key).set(0) - self.pool.remove(cli) # 永久踢出 + await self.pool.remove(cli) # 永久踢出 logger.error("client %s removed due to monthly cap", client_key) break else: tweet_monthly_cap.labels(client_key=client_key).set(1) - self.pool.release(cli, failed=True) + await self.pool.release(cli, failed=True) break all_raw.sort(key=lambda t: t["id"]) @@ -376,14 +376,14 @@ async def _get_tweet_with_retry(self, tweet_id: str) -> Optional[Dict[str, Any]] tw: Dict[str, Any] = resp.data.data users = self._build_users(resp.includes) self._format_tweet_data(tw, users, self._build_medias(resp.includes)) - self.pool.release(cli, failed=False) + await self.pool.release(cli, failed=False) return tw except (NotFound, TwitterServerError): - self.pool.release(cli, failed=False) + await self.pool.release(cli, failed=False) return None except Exception as e: logger.warning("get_tweet retry %s: %s", attempt + 1, e) - self.pool.release(cli, failed=True) + await self.pool.release(cli, failed=True) if attempt == 2: return None await asyncio.sleep(2**attempt) From d52bba7a42bc23ef2365f64e9640c90ba5ebbacd Mon Sep 17 00:00:00 2001 From: bobo Date: Tue, 9 Sep 2025 16:11:53 +0800 Subject: [PATCH 08/12] =?UTF-8?q?feat=EF=BC=9A=20fix=20bugs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sunagent_ext/tweet/twitter_client_pool.py | 68 ++++++++++--------- .../sunagent_ext/tweet/twitter_get_context.py | 54 +++++++-------- 2 files changed, 64 insertions(+), 58 deletions(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py index edbf542..7fca007 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py @@ -2,12 +2,12 @@ import logging import time from dataclasses import dataclass -from typing import List, Optional, Protocol +from typing import Any, Coroutine, List, Optional import tweepy +from tweepy import Client logger = logging.getLogger(__name__) - RETRY_AFTER_SEC = 15 * 60 # 15 分钟 @@ -20,8 +20,8 @@ class _PoolItem: # type: ignore[no-any-unimported] class TwitterClientPool: """ - Twitter 客户端专用池:轮询获取、异常熔断、15 min 复活、支持永久摘除 - 全部操作在锁内完成,保证并发安全。 + Twitter 客户端专用池:轮询获取、异常熔断、15 min 复活、支持永久摘除。 + 所有操作在锁内完成,保证并发安全。 """ def __init__(self, clients: list[tweepy.Client], retry_after: float = RETRY_AFTER_SEC) -> None: # type: ignore[no-any-unimported] @@ -29,18 +29,25 @@ def __init__(self, clients: list[tweepy.Client], retry_after: float = RETRY_AFTE self._pool: list[_PoolItem] = [_PoolItem(c, c.consumer_key) for c in clients] self._lock = asyncio.Lock() self._not_empty = asyncio.Event() - # 轮询指针:永远按「当前池子长度」取模,保证真·RR + # 轮询指针:指向下一次应该开始检查的索引 self._rr_idx = 0 - if self._pool: + if any(item.dead_at is None for item in self._pool): self._not_empty.set() - # -------------------- 对外 API -------------------- - async def acquire(self) -> tuple[tweepy.Client, str]: # type: ignore[no-any-unimported] - """真·Round-Robin:在完整池子上轮询,跳过 dead 的。""" + async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported,return] + """ + 以轮询方式获取一个可用的客户端。 + 如果当前没有可用的客户端,将异步等待直到有客户端复活或被添加。 + """ while True: async with self._lock: + # 0. 如果池子已空(所有客户端被永久移除),直接挂起等待 + if not self._pool: + self._not_empty.clear() + # 跳出 with-block 以释放锁,然后等待 + raise RuntimeError("TwitterClientPool: 所有客户端已被永久摘除,请重建池子") + # 1. 检查并复活到期的客户端 now = time.time() - # 1. 复活 revived = False for it in self._pool: if it.dead_at and now - it.dead_at >= self._retry_after: @@ -49,7 +56,6 @@ async def acquire(self) -> tuple[tweepy.Client, str]: # type: ignore[no-any-uni logger.info("client %s revived", it.client_key) if revived: self._not_empty.set() - # 2. 真·轮询:在完整池子上跳过 dead 的 for _ in range(len(self._pool)): idx = self._rr_idx % len(self._pool) @@ -60,10 +66,8 @@ async def acquire(self) -> tuple[tweepy.Client, str]: # type: ignore[no-any-uni self._pool.pop(idx) self._pool.append(chosen) return chosen.client, chosen.client_key - # 3. 没有 alive 的 self._not_empty.clear() - # 释放锁后再等,避免忙等 await self._not_empty.wait() @@ -71,30 +75,32 @@ async def acquire(self) -> tuple[tweepy.Client, str]: # type: ignore[no-any-uni async def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] """永久摘除某个 client(不再放回池子)。""" async with self._lock: - for idx, it in enumerate(self._pool): - if it.client is client: - self._pool.pop(idx) - logger.info("client %s removed permanently", it.client_key) - if not any(item.dead_at is None for item in self._pool): - self._not_empty.clear() - return + # 使用列表推导式过滤掉要移除的客户端,比 pop 更安全 + original_len = len(self._pool) + client_key_to_remove = client.consumer_key + self._pool = [it for it in self._pool if it.client is not client] + if len(self._pool) < original_len: + logger.info("client %s removed permanently", client_key_to_remove) + # 检查移除后是否还有存活的客户端 + if not any(item.dead_at is None for item in self._pool): + self._not_empty.clear() # -------------------- 归还 -------------------- - async def release(self, client: tweepy.Client, *, failed: bool = False) -> None: # type: ignore[no-any-unimported] + async def report_failure(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] + """ + 报告一个客户端操作失败,将其置于熔断状态。 + 这不会将客户端从池中移除,它将在指定时间后自动复活。 + """ async with self._lock: for it in self._pool: if it.client is client: - if failed: + # 只有当它还活着时才标记为死亡,避免重复记录 + if it.dead_at is None: it.dead_at = time.time() logger.warning( "client %s dead, will retry after %s min", it.client_key, self._retry_after // 60 ) - # 异步唤醒等待者 - asyncio.create_task(self._notify_maybe_alive()) - return - - # -------------------- 内部 -------------------- - async def _notify_maybe_alive(self) -> None: - async with self._lock: - if any(item.dead_at is None for item in self._pool): - self._not_empty.set() + # 检查此操作是否导致所有客户端都死亡 + if not any(item.dead_at is None for item in self._pool): + self._not_empty.clear() + return # 找到后即可退出 diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py index 762e012..8958366 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_get_context.py @@ -4,7 +4,6 @@ """ import asyncio -import json import logging from datetime import datetime, timedelta, timezone from typing import Any, Callable, Dict, List, Optional, cast @@ -119,9 +118,7 @@ def __init__( # type: ignore[no-untyped-def] white_user_ids: Optional[list[str]] = None, reply_freq_limit: int = 5, max_depth: int = 5, - agent_id: str = "", ) -> None: - self.agent_id = agent_id self.pool = pool self.cache = cache self.max_depth = max_depth @@ -136,6 +133,7 @@ def __init__( # type: ignore[no-untyped-def] async def get_home_timeline_with_context( self, me_id: str, + agent_id: str, hours: int = 24, since_id: Optional[str] = None, filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None, @@ -145,12 +143,14 @@ async def get_home_timeline_with_context( me_id=me_id, hours=hours, since_id=since_id, + agent_id=agent_id, filter_func=filter_func or (lambda _: True), ) async def get_mentions_with_context( self, me_id: str, + agent_id: str, hours: int = 24, since_id: Optional[str] = None, filter_func: Optional[Callable[[Dict[str, Any]], bool]] = None, @@ -160,6 +160,7 @@ async def get_mentions_with_context( endpoint="mentions", me_id=me_id, hours=hours, + agent_id=agent_id, since_id=since_id, filter_func=filter_func or (lambda _: True), ) @@ -172,14 +173,15 @@ async def _fetch_timeline( hours: int, since_id: Optional[str], filter_func: Callable[[Dict[str, Any]], bool], + agent_id: str, ) -> list[Dict[str, Any]]: since = datetime.now(timezone.utc) - timedelta(hours=hours) start_time = since.isoformat(timespec="seconds") next_token = None all_raw: list[Dict[str, Any]] = [] - cache_key = f"{self.agent_id}:{MENTIONS_TIMELINE_ID}" + cache_key = f"{agent_id}:{MENTIONS_TIMELINE_ID}" if endpoint == "home": - cache_key = f"{self.agent_id}:{HOME_TIMELINE_ID}" + cache_key = f"{agent_id}:{HOME_TIMELINE_ID}" if not since_id and self.cache: since_id = self.cache.get(cache_key) @@ -221,14 +223,11 @@ async def _fetch_timeline( read_tweet_success_count.labels(client_key=client_key).inc(len(resp.data or [])) # 交给中间层处理 - tweet_list, next_token = await self.on_twitter_response(resp, filter_func) + tweet_list, next_token = await self.on_twitter_response(agent_id, resp, filter_func) all_raw.extend(tweet_list) if not next_token: break - await self.pool.release(cli, failed=False) - except (NotFound, TwitterServerError): - await self.pool.release(cli, failed=False) break except Exception as e: logger.warning("timeline %s error: %s", endpoint, e) @@ -242,7 +241,7 @@ async def _fetch_timeline( break else: tweet_monthly_cap.labels(client_key=client_key).set(1) - await self.pool.release(cli, failed=True) + await self.pool.report_failure(cli) break all_raw.sort(key=lambda t: t["id"]) @@ -254,6 +253,7 @@ async def _fetch_timeline( # ===================== 中间处理钩子(保留) ===================== async def on_twitter_response( # type: ignore[no-any-unimported] self, + agent_id: str, response: TwitterResponse, filter_func: Callable[[Dict[str, Any]], bool], ) -> tuple[list[Dict[str, Any]], Optional[str]]: @@ -267,14 +267,16 @@ async def on_twitter_response( # type: ignore[no-any-unimported] out: list[Dict[str, Any]] = [] for tweet in all_tweets: - if not await self._should_keep(tweet, filter_func): + if not await self._should_keep(agent_id, tweet, filter_func): continue norm = await self._normalize_tweet(tweet) out.append(norm) return out, next_token - async def _should_keep(self, tweet: Dict[str, Any], filter_func: Callable[[Dict[str, Any]], bool]) -> bool: - is_processed = await self._check_tweet_process(tweet["id"]) + async def _should_keep( + self, agent_id: str, tweet: Dict[str, Any], filter_func: Callable[[Dict[str, Any]], bool] + ) -> bool: + is_processed = await self._check_tweet_process(tweet["id"], agent_id) if is_processed: logger.info("already processed %s", tweet["id"]) return False @@ -282,45 +284,45 @@ async def _should_keep(self, tweet: Dict[str, Any], filter_func: Callable[[Dict[ if author_id in self.block_uids: logger.info("blocked user %s", author_id) return False - freq = await self._get_freq(tweet) + freq = await self._get_freq(agent_id, tweet) if freq >= self.freq_limit and author_id not in self.white_uids: logger.info(f"skip tweet {tweet['id']} freq {freq}") return False - await self._increase_freq(tweet) + await self._increase_freq(agent_id, tweet) return filter_func(tweet) - async def _check_tweet_process(self, tweet_id: str) -> bool: + async def _check_tweet_process(self, tweet_id: str, agent_id: str) -> bool: if self.cache is None: return False try: - return self.cache.get(f"{self.agent_id}:{PROCESS_KEY_PREFIX}{tweet_id}") is not None + return self.cache.get(f"{agent_id}:{PROCESS_KEY_PREFIX}{tweet_id}") is not None except Exception: # regard it as processed if cache not available return True - async def _mark_tweet_process(self, tweet_id: str) -> None: + async def _mark_tweet_process(self, tweet_id: str, agent_id: str) -> None: if self.cache is None: return try: - self.cache.set(f"{self.agent_id}:{PROCESS_KEY_PREFIX}{tweet_id}", "") + self.cache.set(f"{agent_id}:{PROCESS_KEY_PREFIX}{tweet_id}", "") except Exception: pass - async def _get_freq(self, tweet: Dict[str, Any]) -> int: + async def _get_freq(self, agent_id: str, tweet: Dict[str, Any]) -> int: if self.cache is None: return -1 try: - freq = self.cache.get(f"{self.agent_id}:{FREQ_KEY_PREFIX}{tweet['conversation_id']}") + freq = self.cache.get(f"{agent_id}:{FREQ_KEY_PREFIX}{tweet['conversation_id']}") return int(freq) if freq else 0 except Exception: return 0 - async def _increase_freq(self, tweet: Dict[str, Any]) -> None: + async def _increase_freq(self, agent_id: str, tweet: Dict[str, Any]) -> None: if self.cache is None: return - freq = await self._get_freq(tweet) + freq = await self._get_freq(agent_id, tweet) try: - self.cache.set(f"{self.agent_id}:{FREQ_KEY_PREFIX}{tweet['conversation_id']}", str(freq + 1)) + self.cache.set(f"{agent_id}:{FREQ_KEY_PREFIX}{tweet['conversation_id']}", str(freq + 1)) except Exception: pass @@ -376,14 +378,12 @@ async def _get_tweet_with_retry(self, tweet_id: str) -> Optional[Dict[str, Any]] tw: Dict[str, Any] = resp.data.data users = self._build_users(resp.includes) self._format_tweet_data(tw, users, self._build_medias(resp.includes)) - await self.pool.release(cli, failed=False) return tw except (NotFound, TwitterServerError): - await self.pool.release(cli, failed=False) return None except Exception as e: logger.warning("get_tweet retry %s: %s", attempt + 1, e) - await self.pool.release(cli, failed=True) + await self.pool.report_failure(cli) if attempt == 2: return None await asyncio.sleep(2**attempt) From f152b595072e2d27104f767c04c149416abad596 Mon Sep 17 00:00:00 2001 From: bobo Date: Tue, 9 Sep 2025 17:20:41 +0800 Subject: [PATCH 09/12] feat: update round --- .../sunagent_ext/tweet/twitter_client_pool.py | 44 +++++++++---------- 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py index 7fca007..5077eb5 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py @@ -34,42 +34,38 @@ def __init__(self, clients: list[tweepy.Client], retry_after: float = RETRY_AFTE if any(item.dead_at is None for item in self._pool): self._not_empty.set() - async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported,return] - """ - 以轮询方式获取一个可用的客户端。 - 如果当前没有可用的客户端,将异步等待直到有客户端复活或被添加。 - """ + async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported] while True: async with self._lock: - # 0. 如果池子已空(所有客户端被永久移除),直接挂起等待 + # 0. 处理池子为空的边缘情况 if not self._pool: self._not_empty.clear() - # 跳出 with-block 以释放锁,然后等待 + # 在锁外等待,但先释放锁 raise RuntimeError("TwitterClientPool: 所有客户端已被永久摘除,请重建池子") - # 1. 检查并复活到期的客户端 now = time.time() + # 1. 复活 revived = False for it in self._pool: if it.dead_at and now - it.dead_at >= self._retry_after: it.dead_at = None revived = True logger.info("client %s revived", it.client_key) - if revived: - self._not_empty.set() - # 2. 真·轮询:在完整池子上跳过 dead 的 - for _ in range(len(self._pool)): - idx = self._rr_idx % len(self._pool) - self._rr_idx += 1 - chosen = self._pool[idx] - if chosen.dead_at is None: - # 移到尾部,实现 RR - self._pool.pop(idx) - self._pool.append(chosen) - return chosen.client, chosen.client_key - # 3. 没有 alive 的 - self._not_empty.clear() - # 释放锁后再等,避免忙等 - await self._not_empty.wait() + if revived: + self._not_empty.set() + # 2. 简化的、正确的轮询 + # 从上一次的位置开始,最多检查 N 次 (N=池子大小) + for i in range(len(self._pool)): + # 使用 % 来确保索引总在有效范围内 + idx = (self._rr_idx + i) % len(self._pool) + chosen = self._pool[idx] + if chosen.dead_at is None: + # 找到了一个可用的,更新下一次开始搜索的指针 + self._rr_idx = idx + 1 + return chosen.client, chosen.client_key + # 3. 如果循环走完都没有找到 alive 的 + self._not_empty.clear() + # 4. 释放锁后等待,避免死锁和忙等 + await self._not_empty.wait() # -------------------- 加锁摘除 -------------------- async def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] From 441fbd5401f2847e59c689efe522fa19414b1fdf Mon Sep 17 00:00:00 2001 From: bobo Date: Tue, 9 Sep 2025 18:01:02 +0800 Subject: [PATCH 10/12] feat: update round --- .../sunagent_ext/tweet/twitter_client_pool.py | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py index 5077eb5..55c64f7 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py @@ -34,38 +34,41 @@ def __init__(self, clients: list[tweepy.Client], retry_after: float = RETRY_AFTE if any(item.dead_at is None for item in self._pool): self._not_empty.set() - async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported] + async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported] + """ + 以轮询方式获取一个可用的客户端。 + 如果当前没有可用的客户端,将异步等待直到有客户端复活或被添加。 + """ while True: async with self._lock: - # 0. 处理池子为空的边缘情况 + # 0. 如果池子已空(所有客户端被永久移除),直接挂起等待 if not self._pool: self._not_empty.clear() - # 在锁外等待,但先释放锁 + # 跳出 with-block 以释放锁,然后等待 raise RuntimeError("TwitterClientPool: 所有客户端已被永久摘除,请重建池子") + # 1. 检查并复活到期的客户端 now = time.time() - # 1. 复活 revived = False for it in self._pool: if it.dead_at and now - it.dead_at >= self._retry_after: it.dead_at = None revived = True logger.info("client %s revived", it.client_key) - if revived: - self._not_empty.set() - # 2. 简化的、正确的轮询 - # 从上一次的位置开始,最多检查 N 次 (N=池子大小) - for i in range(len(self._pool)): - # 使用 % 来确保索引总在有效范围内 - idx = (self._rr_idx + i) % len(self._pool) - chosen = self._pool[idx] - if chosen.dead_at is None: - # 找到了一个可用的,更新下一次开始搜索的指针 - self._rr_idx = idx + 1 - return chosen.client, chosen.client_key - # 3. 如果循环走完都没有找到 alive 的 - self._not_empty.clear() - # 4. 释放锁后等待,避免死锁和忙等 - await self._not_empty.wait() + if revived: + self._not_empty.set() + # 2. 健壮的轮询逻辑 + # 从上一个位置开始,遍历整个池子寻找可用的客户端 + for i in range(len(self._pool)): + idx = (self._rr_idx + i) % len(self._pool) + chosen = self._pool[idx] + if chosen.dead_at is None: + # 找到了,更新下一次轮询的起始点 + self._rr_idx = (idx + 1) % len(self._pool) + return chosen.client, chosen.client_key + # 3. 如果没有找到可用的客户端,清空事件,准备等待 + self._not_empty.clear() + # 4. 在锁外等待,避免阻塞其他协程 + await self._not_empty.wait() # -------------------- 加锁摘除 -------------------- async def remove(self, client: tweepy.Client) -> None: # type: ignore[no-any-unimported] From 698a2464c2bc6ab7649e1ecba4d7a891bc37b3e1 Mon Sep 17 00:00:00 2001 From: bobo Date: Tue, 9 Sep 2025 18:02:16 +0800 Subject: [PATCH 11/12] feat: update round --- .../sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py index 55c64f7..1f06f1a 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py @@ -34,7 +34,7 @@ def __init__(self, clients: list[tweepy.Client], retry_after: float = RETRY_AFTE if any(item.dead_at is None for item in self._pool): self._not_empty.set() - async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported] + async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported] """ 以轮询方式获取一个可用的客户端。 如果当前没有可用的客户端,将异步等待直到有客户端复活或被添加。 From 8896845c700724a026d785e3f4f5d5c5c00629b3 Mon Sep 17 00:00:00 2001 From: bobo Date: Tue, 9 Sep 2025 18:13:10 +0800 Subject: [PATCH 12/12] feat: update round --- .../src/sunagent_ext/tweet/twitter_client_pool.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py index 1f06f1a..6421b2d 100644 --- a/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py +++ b/packages/sunagent-ext/src/sunagent_ext/tweet/twitter_client_pool.py @@ -40,6 +40,7 @@ async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported 如果当前没有可用的客户端,将异步等待直到有客户端复活或被添加。 """ while True: + need_wake = True async with self._lock: # 0. 如果池子已空(所有客户端被永久移除),直接挂起等待 if not self._pool: @@ -55,7 +56,9 @@ async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported revived = True logger.info("client %s revived", it.client_key) if revived: - self._not_empty.set() + need_wake = True + else: + need_wake = False # 2. 健壮的轮询逻辑 # 从上一个位置开始,遍历整个池子寻找可用的客户端 for i in range(len(self._pool)): @@ -67,6 +70,8 @@ async def acquire(self) -> tuple[Client, str]: # type: ignore[no-any-unimported return chosen.client, chosen.client_key # 3. 如果没有找到可用的客户端,清空事件,准备等待 self._not_empty.clear() + if need_wake: + self._not_empty.set() # 4. 在锁外等待,避免阻塞其他协程 await self._not_empty.wait()