diff --git a/ms_agent/tools/search/sirchmunk_search.py b/ms_agent/tools/search/sirchmunk_search.py index cd8aaacf5..4529a1a6f 100644 --- a/ms_agent/tools/search/sirchmunk_search.py +++ b/ms_agent/tools/search/sirchmunk_search.py @@ -7,11 +7,14 @@ import asyncio import json -from loguru import logger from omegaconf import DictConfig from pathlib import Path from typing import Any, Callable, Dict, List, Optional +from ms_agent.utils.logger import get_logger + +logger = get_logger() + def _paths_from_block(block: Any) -> List[str]: if block is None: diff --git a/projects/skill_evolution/README.md b/projects/skill_evolution/README.md new file mode 100644 index 000000000..e69de29bb diff --git a/projects/skill_evolution/README_zh.md b/projects/skill_evolution/README_zh.md new file mode 100644 index 000000000..e69de29bb diff --git a/projects/skill_evolution/agents/macro_skill_manager.yaml b/projects/skill_evolution/agents/macro_skill_manager.yaml new file mode 100644 index 000000000..e69de29bb diff --git a/projects/skill_evolution/agents/micro_skill_manager.yaml b/projects/skill_evolution/agents/micro_skill_manager.yaml new file mode 100644 index 000000000..3ae19f672 --- /dev/null +++ b/projects/skill_evolution/agents/micro_skill_manager.yaml @@ -0,0 +1,58 @@ +llm: + service: openai + model: qwen3.6-flash + openai_api_key: + openai_base_url: https://dashscope.aliyuncs.com/compatible-mode/v1 + + +generation_config: + max_tokens: 64000 + extra_body: + enable_thinking: true + + +prompt: + system: | + You are a micro skill manager who manages the skills, including creating and editing skills. Your responsibility is to manage and update the skills based on the provided skills update suggestions and current rejected updates. Your output will update the skills, and the updated skills will be evaluated on the validation set to verify whether the skills are improved, so you need to provide accurate, reliable and hallucination-free skills updates. You must follow these guidelines: + + Your Steps: + 1. Read ALL the provided skills update suggestions. + 2. Analyze the suggestions and determine whether to create or edit the skills based on the suggestions. + 3. Understand the recent rejected updates for the corresponding viewed skills, and avoid making the same mistakes that led to the rejection of the previous updates. + 4. You can use `skill_view` to view the skill to be updated, and you can use the tool `skill_manage` to create or edit the skill. You can use the tool `skill_manage` multiple times to update the skill until you are satisfied with the updated skill. + 5. You MUST update the skills via invoking the tool `skill_manage` instead of directly outputting the updated skills in plain text. + 6. You MUST list the current skills via invoking the tool `skill_list` before updating the skills to ensure that you are aware of the current skills and avoid creating duplicate skills. + 7. You CANNOT create or edit any skills that are not mentioned in the provided skills update suggestions. You can only create or edit the skills that are mentioned in the provided skills update suggestions. + 8. You CANNOT delete any skills. + + Your Optimization Goals: + 1. [Priority] Ensure your updated skills are actually improved. + 2. [Secondary] Reduce token usage while ensuring the quality of your updated skills. + + +skills: + auto_discovery: false + enable_manage: true + + +tools: + code_executor: + mcp: false + implementation: python_env + exclude: + - notebook_executor + - python_executor + - reset_executor + - get_executor_info + file_system: + mcp: false + include: + - read_file + - write_file + - edit_file + - grep + - glob + + +max_chat_round: 10 +enable_snapshots: false diff --git a/projects/skill_evolution/agents/reflector.yaml b/projects/skill_evolution/agents/reflector.yaml new file mode 100644 index 000000000..255a3ccca --- /dev/null +++ b/projects/skill_evolution/agents/reflector.yaml @@ -0,0 +1,55 @@ +llm: + service: openai + model: qwen3.6-flash + openai_api_key: + openai_base_url: https://dashscope.aliyuncs.com/compatible-mode/v1 + + +generation_config: + max_tokens: 64000 + extra_body: + enable_thinking: true + + +prompt: + system: | + You are a pattern analyst who identifies COMMON success-or-failure patterns based on the provided trajectories. Your responsibility is to analyze the MOST IMPORTANT COMMON patterns across trajectories and provide suggestions to update the skills viewed in the trajectories. Your output will be used by a micro skill manager agent to update the skills, so you need to provide accurate, reliable, and hallucination-free information. You must follow these guidelines: + + [Definition] + Trajectory: In this guideline, a trajectory is defined as a complete interaction process between the agent and the environment, which include multiple steps, including viewing skills, agent actions, and environment feedbacks. + * Example: A QA trajectory. Agent responds to the question after viewing the skills. + * Example: An embodied trajectory. Agent views the skills, performs actions and get the feedbacks from the environment. + + [Definition] + Pattern: In this guideline, a pattern is defined as WHY the agent succeeds or fails in the trajectory. It is a common reason across multiple trajectories. + * Example: The viewed skills lack some relevant rules or the skills are misleading. + * Example: The agent finds the correct answer but gives a wrong format. + + [Definition] + Suggestion: In this guideline, a suggestion is defined as HOW to update the skills based on the identified COMMON patterns. It is a common solution across multiple trajectories. + * Example: Add some relevant rules to the skills or fix the misleading content in the skills. + * Example: Update the answer format in the skills to avoid wrong format. + + Your Steps: + 1. Read ALL the provided trajectories. + 2. Compare the agent's answer and the gold answer(s), refer to the detailed evaluation results, and understand WHY the agent succeeds or fails in the trajectory. + 3. Identify the MOST IMPORTANT AND RELEVANT COMMON patterns across the trajectories. + 4. Provide suggestions to update the skills viewed in the trajectories. + 5. Sugeestions MUST be generalized and applicable to ALL the trajectories, not just a specific trajectory. You should NOT provide suggestions that are only applicable to a specific trajectory. + 6. Suggestions MUST be actionable and specific, not vague or general. You should NOT provide suggestions that are too general or abstract, such as "improve the skills" or "fix the skills". You should provide specific suggestions that can be directly applied to update the skills. + 7. Suggestions MUST be clear and MUST NOT contain risks of leaking test data or causing skills overfitting. You should NOT provide suggestions that are too specific to the test data or the trajectories, such as "add the answer to question X in the skills" or "fix the skills based on the feedback in trajectory Y". You should provide suggestions that are general and applicable to all possible test data and trajectories. + + Output Example: + Patterns: + 1. [identified pattern 1] + ... + Suggestions: + [suggestion content] + + Your Optimization Goals: + 1. [Priority] Ensure your suggestions are generalized, actionable, and clear. + 2. [Secondary] Reduce token usage while ensuring the quality of your suggestions. + + +max_chat_round: 10 +enable_snapshots: false diff --git a/projects/skill_evolution/agents/rollout.yaml b/projects/skill_evolution/agents/rollout.yaml new file mode 100644 index 000000000..9132b1aa6 --- /dev/null +++ b/projects/skill_evolution/agents/rollout.yaml @@ -0,0 +1,39 @@ +llm: + service: openai + model: qwen3.6-flash + openai_api_key: + openai_base_url: https://dashscope.aliyuncs.com/compatible-mode/v1 + + +generation_config: + max_tokens: 64000 + extra_body: + enable_thinking: false + + +skills: + auto_discovery: false + enable_manage: false + + +tools: + code_executor: + mcp: false + implementation: python_env + exclude: + - notebook_executor + - python_executor + - reset_executor + - get_executor_info + file_system: + mcp: false + include: + - read_file + - write_file + - edit_file + - grep + - glob + + +max_chat_round: 10 +enable_snapshots: false diff --git a/projects/skill_evolution/config.yaml b/projects/skill_evolution/config.yaml new file mode 100644 index 000000000..22687fe30 --- /dev/null +++ b/projects/skill_evolution/config.yaml @@ -0,0 +1,20 @@ +agents: + rollout: + agent_config: agents/rollout.yaml + + reflector: + agent_config: agents/reflector.yaml + + micro_skill_manager: + agent_config: agents/micro_skill_manager.yaml + + macro_skill_manager: + agent_config: agents/macro_skill_manager.yaml + +train: + num_epochs: 1 + batch_size: 40 + max_workers: 10 + reflection_trigger_size: 1 + reflection_group_size: 4 + max_rejected_update_buffer_size: 3 diff --git a/projects/skill_evolution/run.py b/projects/skill_evolution/run.py new file mode 100644 index 000000000..5d42c44ec --- /dev/null +++ b/projects/skill_evolution/run.py @@ -0,0 +1,60 @@ +import asyncio + +from tasks.base import BaseDataset, BaseEvaluator, BaseRolloutEnv +from skill_evolution_workflow import SkillEvolutionWorkflow + + +async def run_workflow( + config_file: str, + init_skills_path: str, + workdir: str, + train_set: BaseDataset, + val_set: BaseDataset, + test_set: BaseDataset, + rollout_env: BaseRolloutEnv, + evaluator: BaseEvaluator, +): + skill_evolution_workflow = SkillEvolutionWorkflow( + config_file=config_file, + init_skills_path=init_skills_path, + workdir=workdir, + ) + await skill_evolution_workflow.run( + train_set=train_set, + val_set=val_set, + test_set=test_set, + rollout_env=rollout_env, + evaluator=evaluator, + ) + # import os + # data_batch = train_set.get_batch(batch_size=10) + # await skill_evolution_workflow._train_step( + # current_skills_path=init_skills_path, + # data_batch=data_batch, + # rollout_env=rollout_env, + # evaluator=evaluator, + # sub_workdir=os.path.join(workdir, "train_step") + # ) + +if __name__ == "__main__": + from tasks.searchqa import SearchQADataset, SearchQAEvaluator, SearchQARolloutEnv + + train_set = SearchQADataset(data_path="../../../data/minimal_searchqa_split/train/items.json", is_train=True) + val_set = SearchQADataset(data_path="../../../data/minimal_searchqa_split/val/items.json", is_train=False) + test_set = SearchQADataset(data_path="../../../data/minimal_searchqa_split/test/items.json", is_train=False) + rollout_env = SearchQARolloutEnv() + evaluator = SearchQAEvaluator() + + config_file = "./config.yaml" + + coroutine = run_workflow( + config_file=config_file, + init_skills_path="../../../results/msagent_searchqa_qwen36flash/init_skills", + workdir="../../../results/msagent_searchqa_qwen36flash/workdir", + train_set=train_set, + val_set=val_set, + test_set=test_set, + rollout_env=rollout_env, + evaluator=evaluator, + ) + asyncio.run(coroutine) diff --git a/projects/skill_evolution/skill_evolution_workflow.py b/projects/skill_evolution/skill_evolution_workflow.py new file mode 100644 index 000000000..e559bc9c1 --- /dev/null +++ b/projects/skill_evolution/skill_evolution_workflow.py @@ -0,0 +1,604 @@ +import asyncio +import json +import os +import re +import shutil +from collections import defaultdict, deque +from typing import Deque, Dict, List + +from omegaconf import DictConfig + +from ms_agent.config import Config +from ms_agent.agent.loader import AgentLoader +from ms_agent.llm import Message +from ms_agent.utils import get_logger + +from tasks.base import ( + BaseDataItem, BaseDataset, BaseEvaluationResult, BaseEvaluator, BaseRolloutEnv +) +from utils import ( + collect_and_log_evaluation_results, + format_evaluation_result, + gather_with_semaphore +) + +logger = get_logger(__name__) + + +class SkillEvolutionWorkflow: + """Workflow for skill evolution. + + Args: + config_file (str): Path to the configuration file. + init_skills_path (str): Path to the initial skills directory. + trust_remote_code (bool): Whether to allow loading of remote code. Defaults to False. + workdir (str): Working directory for the workflow. Defaults to `./output`. + """ + SKILL_VIEW_TOOL_NAME = "skills---skill_view" + + WORKFLOW_NAME = "SkillEvolutionWorkflow" + + def __init__( + self, + config_file: str, + init_skills_path: str, + workdir: str = "./output", + ): + # prepare config + self.config = Config.from_task(config_file) + self.agents_config = self.config.get("agents", DictConfig({})) + self.train_config = self.config.get("train", DictConfig({})) + self.init_skills_path = init_skills_path + self.workdir = workdir + + # prepare detailed training config + self.train_config.num_epochs = self.train_config.get("num_epochs", 1) + self.train_config.batch_size = self.train_config.get("batch_size", 10) + self.train_config.max_workers = self.train_config.get("max_workers", 1) + self.train_config.reflection_trigger_size = self.train_config.get("reflection_trigger_size", 2) + self.train_config.reflection_group_size = self.train_config.get("reflection_group_size", 4) + self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3) + + # prepare runtime + self._prepare_agents() + self.semaphore = asyncio.Semaphore(self.train_config.max_workers) + # {viewed_skills: {status: [trajectories in string]}} + self.trajectories_buffer: Dict[str, Dict[str, List[str]]] = defaultdict(lambda: defaultdict(list)) + # {viewed_skills: deque of rejected updates in string} + self.recent_rejected_update_buffer: Dict[str, Deque[str]] = defaultdict( + lambda: deque(maxlen=self.train_config.rejected_update_buffer_size) + ) + + def _prepare_agents(self): + """Prepare the configurations for different agents used in the workflow.""" + self.rollout_agent_config = self._get_agent_config("rollout") + self.reflector_agent_config = self._get_agent_config("reflector") + self.micro_skill_manager_agent_config = self._get_agent_config("micro_skill_manager") + self.macro_skill_manager_agent_config = self._get_agent_config("macro_skill_manager") + + def _get_agent_config(self, agent_name: str) -> DictConfig: + """Get the configuration for agents. + + Args: + agent_name (str): The name of the agent. + + Returns: + DictConfig: The configuration for agents. + """ + if agent_name not in self.agents_config: + raise ValueError(f"Agent {agent_name} not found in configuration.") + + agent_config = self.agents_config[agent_name].get("agent_config") + if isinstance(agent_config, str): + return Config.from_task(os.path.join(self.config.local_dir, agent_config)) + else: + return agent_config + + async def _build_and_run_rollout_agent( + self, + current_skills_path: str, + data_item: BaseDataItem, + rollout_env: BaseRolloutEnv, + rollout_output_dir: str, + ) -> List[Message]: + """Build and run the rollout agent for a given data item.""" + # inject prompt config and skills config into rollout agent config + rollout_agent_config = self.rollout_agent_config.copy() + rollout_agent_config["prompt"] = { + "system": data_item.system + } + rollout_agent_config["skills"].update({ + "path": current_skills_path, + }) + rollout_agent_config["output_dir"] = rollout_output_dir + # build and run + agent = AgentLoader.build( + config=rollout_agent_config, + tag="rollout_agent", + ) + outputs = await rollout_env.run(agent, data_item) + return outputs + + async def _build_and_run_reflector_agent( + self, + query: str, + reflector_output_dir: str + ) -> List[Message]: + """Build and run the reflector agent for a given query.""" + reflector_agent_config = self.reflector_agent_config.copy() + reflector_agent_config["output_dir"] = reflector_output_dir + # build and run + agent = AgentLoader.build( + config=reflector_agent_config, + tag="reflector_agent", + ) + outputs = await agent.run(query) + return outputs + + async def _build_and_run_skill_manager_agent( + self, + agent_config: DictConfig, + current_skills_path: str, + query: str, + skill_manager_output_dir: str + ) -> List[Message]: + """Build and run the skill manager agent for a given query.""" + # inject skills config into skill manager agent config + skill_manager_agent_config = agent_config.copy() + skill_manager_agent_config["skills"].update({ + "path": current_skills_path, + }) + skill_manager_agent_config["output_dir"] = skill_manager_output_dir + # build and run + agent = AgentLoader.build( + config=skill_manager_agent_config, + tag="skill_manager_agent", + ) + outputs = await agent.run(query) + return outputs + + async def run( + self, + train_set: BaseDataset, + val_set: BaseDataset, + test_set: BaseDataset, + rollout_env: BaseRolloutEnv, + evaluator: BaseEvaluator, + ): + """Run the skill evolution workflow. + + Args: + train_set (BaseDataset): Training dataset. + val_set (BaseDataset): Validation dataset. + test_set (BaseDataset): Test dataset. + rollout_env (BaseRolloutEnv): Environment for conducting rollouts. + evaluator (BaseEvaluator): Evaluator for assessing model performance. + """ + # initial validation + # init skills are copied to workdir/init/skills, rollout results are saved to workdir/init/rollout_results + sub_workdir = os.path.join(self.workdir, "init") + os.makedirs(sub_workdir, exist_ok=True) + current_skills_path = os.path.join(sub_workdir, "skills") + shutil.copytree(self.init_skills_path, current_skills_path, dirs_exist_ok=True) + + current_score = await self._validate_or_test( + current_skills_path=current_skills_path, + val_test_set=val_set, + rollout_env=rollout_env, + evaluator=evaluator, + sub_workdir=sub_workdir, + ) + + best_skills_path, best_score = current_skills_path, current_score + last_skills_path, last_score = current_skills_path, current_score + + # training loop + num_steps = (len(train_set) + self.train_config.batch_size - 1) // self.train_config.batch_size + for epoch in range(1, self.train_config.num_epochs + 1): + for step in range(1, num_steps + 1): + sub_workdir = os.path.join(self.workdir, f"epoch_{epoch:02d}", f"step_{step:04d}") + os.makedirs(sub_workdir, exist_ok=True) + current_skills_path = os.path.join(sub_workdir, "skills") + shutil.copytree(last_skills_path, current_skills_path, dirs_exist_ok=True) + + # a train step consists of rollout, evaluation, reflection and skill management + data_batch = train_set.get_batch(self.train_config.batch_size) + await self._train_step( + current_skills_path=current_skills_path, + data_batch=data_batch, + rollout_env=rollout_env, + evaluator=evaluator, + sub_workdir=os.path.join(sub_workdir, "train_step"), + step_num=step + ) + + current_score = await self._validate_or_test( + current_skills_path=current_skills_path, + val_test_set=val_set, + rollout_env=rollout_env, + evaluator=evaluator, + sub_workdir=os.path.join(sub_workdir, "validation"), + ) + + # accept: update best skills if current score is better + if current_score > best_score: + best_score = current_score + best_skills_path = current_skills_path + logger.info(f"New best skills found at {best_skills_path} with score {best_score:.4f}") + # accept: update last skills if current score is better + if current_score > last_score: + last_score = current_score + last_skills_path = current_skills_path + logger.info(f"Updated last skills at {last_skills_path} with score {last_score:.4f}") + # reject: if current score is worse than last score, we will reject the current skills + # and update the recent rejected update buffer for the corresponding viewed skills + else: + # TODO + pass + + # after all steps in the epoch, we will call marco_skill_manager + # to examine the entire skill set and decide whether to merge or remove skills + sub_workdir = os.path.join(self.workdir, f"epoch_{epoch:02d}", f"step_final") + os.makedirs(sub_workdir, exist_ok=True) + current_skills_path = os.path.join(sub_workdir, "skills") + shutil.copytree(last_skills_path, current_skills_path, dirs_exist_ok=True) + + # build and run macro skill manager agent + # TODO + + current_score = await self._validate_or_test( + current_skills_path=current_skills_path, + val_test_set=val_set, + rollout_env=rollout_env, + evaluator=evaluator, + sub_workdir=sub_workdir, + ) + + # update best skills if current score is better + if current_score > best_score: + best_score = current_score + best_skills_path = current_skills_path + logger.info(f"New best skills found at {best_skills_path} with score {best_score:.4f}") + # force update last skills to the current skills after macro skill management + last_skills_path, last_score = current_skills_path, current_score + + # final test + # 1. init skills + test_init_workdir = os.path.join(self.workdir, "test", "init") + os.makedirs(test_init_workdir, exist_ok=True) + test_init_skills_path = os.path.join(test_init_workdir, "skills") + shutil.copytree(self.init_skills_path, test_init_skills_path, dirs_exist_ok=True) + logger.info(f"Testing with initial skills from {test_init_skills_path}") + + test_init_score = await self._validate_or_test( + current_skills_path=test_init_skills_path, + val_test_set=test_set, + rollout_env=rollout_env, + evaluator=evaluator, + sub_workdir=test_init_workdir, + ) + + # 2. last skills + test_last_workdir = os.path.join(self.workdir, "test", "last") + os.makedirs(test_last_workdir, exist_ok=True) + test_last_skills_path = os.path.join(test_last_workdir, "skills") + shutil.copytree(last_skills_path, test_last_skills_path, dirs_exist_ok=True) + logger.info(f"Testing with last skills from {test_last_skills_path}") + + test_last_score = await self._validate_or_test( + current_skills_path=test_last_skills_path, + val_test_set=test_set, + rollout_env=rollout_env, + evaluator=evaluator, + sub_workdir=test_last_workdir, + ) + + # 3. best skills + test_best_workdir = os.path.join(self.workdir, "test", "best") + os.makedirs(test_best_workdir, exist_ok=True) + test_best_skills_path = os.path.join(test_best_workdir, "skills") + shutil.copytree(best_skills_path, test_best_skills_path, dirs_exist_ok=True) + logger.info(f"Testing with best skills from {test_best_skills_path}") + + test_best_score = await self._validate_or_test( + current_skills_path=test_best_skills_path, + val_test_set=test_set, + rollout_env=rollout_env, + evaluator=evaluator, + sub_workdir=test_best_workdir, + ) + + # 4. log final results + logger.info(f"Test Results:\n" + f"Initial Skills Score: {test_init_score:.4f} (from {test_init_skills_path})\n" + f"Last Skills Score: {test_last_score:.4f} (from {test_last_skills_path})\n" + f"Best Skills Score: {test_best_score:.4f} (from {test_best_skills_path})") + + async def _rollout( + self, + current_skills_path: str, + data_batch: List[BaseDataItem], + rollout_env: BaseRolloutEnv, + rollout_output_dir: str, + ) -> List[List[Message]]: + """Rollout the current skills on a batch of data items. + + Args: + current_skills_path (str): Path to the current skills directory. + data_batch (list[BaseDataItem]): A batch of data items to rollout. + rollout_env (BaseRolloutEnv): Environment for conducting rollouts. + rollout_output_dir (str): Directory to save rollout results. + + Returns: + list: A list of rollout results for each data item in the batch. + """ + os.makedirs(rollout_output_dir, exist_ok=True) + coroutines = [ + self._build_and_run_rollout_agent( + current_skills_path=current_skills_path, + data_item=data_item, + rollout_env=rollout_env, + rollout_output_dir=os.path.join(rollout_output_dir, data_item.id) + ) + for data_item in data_batch + ] + return await gather_with_semaphore(self.semaphore, coroutines) + + async def _evaluate( + self, + data_batch: List[BaseDataItem], + rollout_results: List[List[Message]], + evaluator: BaseEvaluator, + evaluation_output_dir: str + ) -> List[BaseEvaluationResult]: + """Evaluate the rollout results for a batch of data items. + + Args: + data_batch (list[BaseDataItem]): A batch of data items. + rollout_results (list): Rollout results corresponding to the data items. + evaluator (BaseEvaluator): Evaluator for assessing model performance. + evaluation_output_dir (str): Directory to save evaluation results. + + Returns: + list: A list of evaluation results for each data item in the batch. + """ + os.makedirs(evaluation_output_dir, exist_ok=True) + coroutines = [ + evaluator.evaluate(messages, data_item, evaluation_output_dir) + for messages, data_item in zip(rollout_results, data_batch) + ] + evaluation_results = await gather_with_semaphore(self.semaphore, coroutines) + return evaluation_results + + async def _reflect( + self, + viewed_skills_queries: Dict[str, list[str]], + reflector_output_dir: str + ) -> Dict[str, list[str]]: + """Reflect on the viewed skills and their corresponding queries. + + Args: + viewed_skills_queries (dict): A dictionary mapping viewed skills to their corresponding queries. + reflector_output_dir (str): Directory to save reflection results. + + Returns: + dict: A dictionary mapping viewed skills to their corresponding reflection results. + """ + os.makedirs(reflector_output_dir, exist_ok=True) + reflection_results = dict() + for viewed_skills, queries in viewed_skills_queries.items(): + coroutines = [ + self._build_and_run_reflector_agent( + query=query, + reflector_output_dir=os.path.join(reflector_output_dir, viewed_skills, f"group_{i:04d}") + ) + for i, query in enumerate(queries) + ] + results = await gather_with_semaphore(self.semaphore, coroutines) + results = [messages[-1].content for messages in results] + reflection_results[viewed_skills] = results + return reflection_results + + async def _micro_skill_manage( + self, + current_skills_path: str, + micro_skill_manage_queries: Dict[str, str], + micro_skill_manager_output_dir: str + ): + """Perform micro skill management at each step based on the reflection results. + + Args: + micro_skill_manage_queries (dict): A dictionary mapping viewed skills to their corresponding management queries. + micro_skill_manager_output_dir (str): Directory to save micro skill management results. + """ + os.makedirs(micro_skill_manager_output_dir, exist_ok=True) + coroutines = [ + self._build_and_run_skill_manager_agent( + agent_config=self.micro_skill_manager_agent_config, + current_skills_path=current_skills_path, + query=query, + skill_manager_output_dir=os.path.join(micro_skill_manager_output_dir, viewed_skills) + ) + for viewed_skills, query in micro_skill_manage_queries.items() + ] + await gather_with_semaphore(self.semaphore, coroutines) + + def _extract_viewed_skills_from_messages(self, messages: List[Message]) -> str: + """Extract the set of viewed skills from the rollout messages. + + Args: + messages (list[Message]): Rollout messages. + + Returns: + str: A string representation of the viewed skill ids, sorted and joined by underscores. + """ + viewed_skills = set() + for message in messages: + tool_calls = message.tool_calls + if not tool_calls: + continue + for tool_call in tool_calls: + if tool_call.get("tool_name", "") == self.SKILL_VIEW_TOOL_NAME: + try: + skill_id = json.loads(tool_call["arguments"])["skill_id"] + viewed_skills.add(skill_id) + except Exception as e: + logger.warning(f"Failed to extract skill_id from tool_call arguments: " + f"{tool_call['arguments']}. Error: {e}") + continue + # if there is no viewed skill, skills (always=true) work or no skill works + if not viewed_skills: + viewed_skills.add("always-or-no-skill") + + # return a string representation of the viewed skill ids, sorted and joined by underscores + return re.sub(r"[^\w\-]", "_", "_".join(sorted(viewed_skills))) + + def _update_trajectories_buffer( + self, + evaluation_results: List[BaseEvaluationResult] + ): + """Update the trajectories buffer with the latest evaluation results. + + Args: + evaluation_results (list[BaseEvaluationResult]): A list of evaluation results. + """ + for evaluation_result in evaluation_results: + messages = evaluation_result.messages + viewed_skills = self._extract_viewed_skills_from_messages(messages) + status = evaluation_result.status + formatted_trajectories = format_evaluation_result(evaluation_result) + # update the buffer + self.trajectories_buffer[viewed_skills][status].append(formatted_trajectories) + + def _format_viewed_skills_queries(self) -> Dict[str, list[str]]: + viewed_skills_queries = defaultdict(list) + for status in ["success", "failure"]: + for viewed_skills, status_dict in self.trajectories_buffer.items(): + trajectories = status_dict.get(status, []) + if len(trajectories) < self.train_config.reflection_trigger_size: + continue + # group trajectories into chunks of size reflection_group_size + for i in range(0, len(trajectories), self.train_config.reflection_group_size): + end = min(i + self.train_config.reflection_group_size, len(trajectories)) + group_trajectories = trajectories[i:end] + # format the group trajectories into a single query for reflection + query = "Trajectories:\n\n" + "\n\n---\n\n".join(group_trajectories) + viewed_skills_queries[viewed_skills].append(query) + return viewed_skills_queries + + def _format_micro_skill_manage_queries( + self, + reflection_results: Dict[str, list[str]] + ) -> Dict[str, str]: + micro_skill_manage_queries = dict() + for viewed_skills, reflections in reflection_results.items(): + # format the reflections into a single query for micro skill management + query = "Reflections:\n" + "\n\n---\n\n".join(reflections) + # add recent rejected updates to the query + rejected_updates = list(self.recent_rejected_update_buffer[viewed_skills]) + if rejected_updates: + rejected_updates_str = "\n\n---\n\n".join(rejected_updates) + query += f"\n\nRecent Rejected Updates:\n{rejected_updates_str}" + micro_skill_manage_queries[viewed_skills] = query + return micro_skill_manage_queries + + async def _train_step( + self, + current_skills_path: str, + data_batch: list[BaseDataItem], + rollout_env: BaseRolloutEnv, + evaluator: BaseEvaluator, + sub_workdir: str, + step_num: int = 0, + ): + """Perform a single training step consisting of rollout, evaluation, reflection, and skill management. + + Args: + current_skills_path (str): Path to the current skills directory. + data_batch (list[BaseDataItem]): A batch of data items for training. + rollout_env (BaseRolloutEnv): Environment for conducting rollouts. + evaluator (BaseEvaluator): Evaluator for assessing model performance. + sub_workdir (str): Sub-directory within the working directory to save rollout and evaluation results. + step_num (int): The current step number in the training loop. Defaults to 0. + """ + # rollout + rollout_results = await self._rollout( + current_skills_path=current_skills_path, + data_batch=data_batch, + rollout_env=rollout_env, + rollout_output_dir=os.path.join(sub_workdir, "rollout_results") + ) + + # evaluate + evaluation_results = await self._evaluate( + data_batch=data_batch, + rollout_results=rollout_results, + evaluator=evaluator, + evaluation_output_dir=os.path.join(sub_workdir, "evaluation_results") + ) + + # collect and log + collect_and_log_evaluation_results( + evaluation_results=evaluation_results, + context=f"Train Step {step_num:04d}" + ) + + # group, format trajectories and update trajectories buffer + self._update_trajectories_buffer(evaluation_results) + + # build queries for reflection based on the trajectories buffer + viewed_skills_queries = self._format_viewed_skills_queries() + reflection_results = await self._reflect( + viewed_skills_queries=viewed_skills_queries, + reflector_output_dir=os.path.join(sub_workdir, "reflection_results") + ) + + # build and run micro skill manager agent + micro_skill_manage_queries = self._format_micro_skill_manage_queries(reflection_results) + await self._micro_skill_manage( + current_skills_path=current_skills_path, + micro_skill_manage_queries=micro_skill_manage_queries, + micro_skill_manager_output_dir=os.path.join(sub_workdir, "micro_skill_manager_results") + ) + + async def _validate_or_test( + self, + current_skills_path: str, + val_test_set: BaseDataset, + rollout_env: BaseRolloutEnv, + evaluator: BaseEvaluator, + sub_workdir: str, + ) -> float: + """Validate or test the model with the current skills. + + Args: + current_skills_path (str): Path to the current skills directory. + val_test_set (BaseDataset): Validation or test dataset. + rollout_env (BaseRolloutEnv): Environment for conducting rollouts. + evaluator (BaseEvaluator): Evaluator for assessing model performance. + sub_workdir (str): Sub-directory within the working directory to save rollout results. + + Returns: + float: The average evaluation score across the dataset. + """ + data_items = val_test_set.get_batch(len(val_test_set)) + # rollout + rollout_results = await self._rollout( + current_skills_path=current_skills_path, + data_batch=data_items, + rollout_env=rollout_env, + rollout_output_dir=os.path.join(sub_workdir, "rollout_results") + ) + # evaluate + evaluation_results = await self._evaluate( + data_batch=data_items, + rollout_results=rollout_results, + evaluator=evaluator, + evaluation_output_dir=os.path.join(sub_workdir, "evaluation_results") + ) + # collect and log + avg_score = collect_and_log_evaluation_results( + evaluation_results=evaluation_results, + context=f"Validation/Test [{sub_workdir}]" + ) + return avg_score diff --git a/projects/skill_evolution/tasks/base.py b/projects/skill_evolution/tasks/base.py new file mode 100644 index 000000000..9bcafca75 --- /dev/null +++ b/projects/skill_evolution/tasks/base.py @@ -0,0 +1,87 @@ +import random +from abc import abstractmethod, ABC +from dataclasses import asdict, dataclass, field +from typing import Dict, List, Literal, Optional + +from ms_agent.agent import Agent +from ms_agent.llm import Message + + +@dataclass +class BaseDataItem(ABC): + """Base class for data items used in tasks.""" + + id: str + system: Optional[str] = field(default=None, kw_only=True) + query: str + + +@dataclass +class BaseEvaluationResult: + """Base class for evaluation results.""" + + messages: List[Message] + score: float + status: Literal["success", "failure"] + + def to_dict(self) -> Dict: + return asdict(self) + + +class BaseDataset(ABC): + """Base class for datasets used in tasks.""" + + def __init__(self, data_path: str, is_train: bool = True): + self.data_path = data_path + self.is_train = is_train + self.data = self.load_data(self.data_path) + # shuffle if training data + if self.is_train: + random.shuffle(self.data) + self.current_index = 0 + + @abstractmethod + def load_data(self) -> list[BaseDataItem]: + """Load data from the specified path and return a list of BaseDataItem instances.""" + pass + + def __len__(self) -> int: + return len(self.data) + + def __getitem__(self, index: int) -> BaseDataItem: + return self.data[index] + + def get_batch(self, batch_size: int) -> list[BaseDataItem]: + """Get a batch of data items.""" + if self.current_index + batch_size >= len(self.data): + batch = self.data[self.current_index :] + self.current_index = 0 # reset for next epoch + else: + batch = self.data[self.current_index : self.current_index + batch_size] + self.current_index += batch_size + return batch + + +class BaseEvaluator(ABC): + + @abstractmethod + async def evaluate( + self, + messages: List[Message], + data_item: BaseDataItem, + evaluation_output_dir: Optional[str] = None + ) -> BaseEvaluationResult: + """Evaluate based on the interaction messages and the corresponding data item. + + Returns a BaseEvaluationResult instance containing the evaluation metrics. + """ + pass + + +class BaseRolloutEnv(ABC): + """Base class for rollout environments used in tasks.""" + + @abstractmethod + async def run(self, agent: Agent, data_item: BaseDataItem) -> List[Message]: + """Rollout the agent on a single data item and return the interaction messages.""" + pass diff --git a/projects/skill_evolution/tasks/searchqa.py b/projects/skill_evolution/tasks/searchqa.py new file mode 100644 index 000000000..89499efbf --- /dev/null +++ b/projects/skill_evolution/tasks/searchqa.py @@ -0,0 +1,169 @@ +import json +import os +import string +import re +from collections import Counter +from dataclasses import dataclass +from typing import List, Optional + +from ms_agent.agent import Agent +from ms_agent.llm import Message +from ms_agent.utils import logger + +from .base import ( + BaseDataItem, BaseDataset, BaseEvaluationResult, BaseEvaluator, BaseRolloutEnv +) + +logger = logger.get_logger(__name__) + + +@dataclass +class SearchQADataItem(BaseDataItem): + """Data item for SearchQA task.""" + + answers: List[str] + + +@dataclass +class SearchQAEvaluationResult(BaseEvaluationResult): + """Evaluation result for SearchQA task.""" + + f1_score: float + prediction: str + answers: List[str] + + +class SearchQADataset(BaseDataset): + + SYSTEM_PROMPT = ( + "You are an expert question answering assistant.\n\n" + "# Task Format\nYou will receive a CONTEXT containing document passages and a QUESTION.\n" + "Read the context carefully and answer the question based on the information provided.\n\n" + "# Answer Format\nThink step by step, then provide your final answer inside ... tags.\n" + "Keep your answer concise — typically a few words or a short phrase.\n" + "Do not repeat the question. Do not include unnecessary explanation in the answer tags.\n\n" + "Example:\nAbraham Lincoln" + ) + + def load_data(self, data_path: str) -> List[SearchQADataItem]: + """Load SearchQA data from the specified path.""" + # check if data_path is ends with .json + if not data_path.endswith(".json"): + raise ValueError(f"Expected a .json file for SearchQA dataset, got: {data_path}") + + with open(data_path, encoding="utf-8") as f: + data = json.load(f) + + data_items = [] + for item in data: + question, context = item["question"], item["context"] + query = f"## Context\n{context}\n\n## Question\n{question}" + data_item = SearchQADataItem( + id=item["id"], + system=self.SYSTEM_PROMPT, + query=query, + answers=item["answers"] + ) + data_items.append(data_item) + logger.info(f"Loaded {len(data_items)} items from {data_path}") + return data_items + + +class SearchQAEvaluator(BaseEvaluator): + + # adapted from https://github.com/microsoft/SkillOpt/blob/9969a8f393f3b5ece29715e6e5b07deb5be90741/skillopt/envs/searchqa/evaluator.py + + def _normalize_answer(self, answer: str) -> str: + """Normalize answer string (SQuAD convention).""" + answer = answer.lower() + answer = "".join(ch for ch in answer if ch not in string.punctuation) + answer = re.sub(r"\b(a|an|the)\b", " ", answer) + answer = " ".join(answer.split()) + return answer.strip() + + def _extract_answer(self, response: str) -> str: + matches = re.findall(r"(.*?)", response, re.DOTALL | re.IGNORECASE) + if matches: + return matches[-1].strip() + lines = [ln.strip() for ln in response.strip().splitlines() if ln.strip()] + if lines: + return lines[-1] + return response.strip() + + def _exact_match(self, prediction: str, gold_answers: List[str]) -> float: + norm_pred = self._normalize_answer(prediction) + for gold_answer in gold_answers: + if self._normalize_answer(gold_answer) == norm_pred: + return 1.0 + return 0.0 + + def _f1_score(self, prediction: str, gold_answers: List[str]) -> float: + """Token-level F1 (SQuAD-style), max across all gold answers.""" + norm_pred = self._normalize_answer(prediction) + pred_tokens = norm_pred.split() + + if not pred_tokens: + for gold_answer in gold_answers: + if not self._normalize_answer(gold_answer).split(): + return 1.0 + return 0.0 + + best_f1 = 0.0 + for gold_answer in gold_answers: + gold_tokens = self._normalize_answer(gold_answer).split() + if not gold_tokens: + continue + common = Counter(pred_tokens) & Counter(gold_tokens) + n_common = sum(common.values()) + if n_common == 0: + continue + precision = n_common / len(pred_tokens) + recall = n_common / len(gold_tokens) + f1 = 2 * precision * recall / (precision + recall) + best_f1 = max(best_f1, f1) + + return best_f1 + + async def evaluate( + self, + messages: List[Message], + data_item: SearchQADataItem, + evaluation_output_dir: Optional[str] = None + ) -> SearchQAEvaluationResult: + """Evaluate based on the interaction messages and the corresponding data item. + + Returns a BaseEvaluationResult instance containing the evaluation metrics. + """ + response = messages[-1].content if messages else "" + prediction = self._extract_answer(response) + + score = self._exact_match(prediction, data_item.answers) + f1_score = self._f1_score(prediction, data_item.answers) + + evaluation_result = SearchQAEvaluationResult( + messages=messages, + score=score, + status="success" if score > 0 else "failure", + f1_score=f1_score, + prediction=prediction, + answers=data_item.answers + ) + + # save if evaluation_output_dir is provided + if evaluation_output_dir: + os.makedirs(evaluation_output_dir, exist_ok=True) + output_path = os.path.join(evaluation_output_dir, f"{data_item.id}.json") + with open(output_path, "w", encoding="utf-8") as f: + json.dump(evaluation_result.to_dict(), f, ensure_ascii=False, indent=4) + return evaluation_result + + +class SearchQARolloutEnv(BaseRolloutEnv): + """Rollout environment for SearchQA task.""" + + async def run(self, agent: Agent, data_item: SearchQADataItem) -> List[Message]: + """Rollout the agent on a single data item and return the interaction messages.""" + messages = await agent.run("Please check the `question_answering_skill`") + # for next query + messages.append(Message(role="user", content=data_item.query)) + return await agent.run(messages) diff --git a/projects/skill_evolution/utils.py b/projects/skill_evolution/utils.py new file mode 100644 index 000000000..3bc887c68 --- /dev/null +++ b/projects/skill_evolution/utils.py @@ -0,0 +1,66 @@ +import asyncio +from typing import List + +from ms_agent.utils import get_logger + +from tasks.base import BaseEvaluationResult + +logger = get_logger(__name__) + + +async def worker(semaphore: asyncio.Semaphore, coroutine): + """A worker that runs a coroutine with a semaphore to limit concurrency.""" + async with semaphore: + try: + return await coroutine + except Exception as e: + logger.error(f"Error in worker: {e}", exc_info=True) + return None + + +async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List): + """Gather results from coroutines with a semaphore to limit concurrency.""" + tasks = [worker(semaphore, coroutine) for coroutine in coroutines] + results = await asyncio.gather(*tasks) + filtered_results = [result for result in results if result is not None] + return filtered_results + + +def collect_and_log_evaluation_results( + evaluation_results: List[BaseEvaluationResult], + context: str, +) -> float: + """Collect and log the evaluation results. + + Args: + evaluation_results (list[BaseEvaluationResult]): A list of evaluation results. + context (str): Context for logging (e.g., "Train Step", "Validation", "Test"). + + Returns: + float: The average evaluation score across the dataset. + """ + total_score, total_success = 0.0, 0 + for result in evaluation_results: + total_score += result.score + total_success += 1 if result.status == "success" else 0 + num_results = len(evaluation_results) + + avg_score = total_score / num_results if evaluation_results else 0. + success_rate = total_success / num_results if evaluation_results else 0. + logger.info(f"{context} Evaluation results: avg_score={avg_score:.4f}, " + f"success_rate={success_rate:.2%} [{total_success}/{num_results}]") + return avg_score + + +def format_evaluation_result(evaluation_result: BaseEvaluationResult) -> str: + """Format the evaluation result into a string representation. + + Args: + evaluation_result (BaseEvaluationResult): The evaluation result to format. + + Returns: + str: A string representation of the evaluation result. + """ + messages_str = "\n".join(f"{message.role}: {message.content}" for message in evaluation_result.messages) + other_str = "\n".join(f"{key}: {value}" for key, value in evaluation_result.to_dict().items() if key != "messages") + return f"{messages_str}\n\n{other_str}"