Feat/skill evolution project#919
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a skill evolution workflow framework, including configuration files, a SearchQA task implementation, and utility functions. The review identifies critical correctness issues regarding index alignment when rollouts fail, which can be addressed by updating gather_with_semaphore to optionally retain None values. Additionally, high-severity issues were raised concerning shared state mutations from shallow-copying configurations, a persistent trajectories buffer that causes redundant reflections and potential overfitting, and incomplete TODO blocks for rejected updates and macro skill management. Other recommendations include fixing a configuration key mismatch, correcting a signature mismatch in BaseDataset.load_data, and reshuffling training data upon epoch reset.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| ) | ||
| for data_item in data_batch | ||
| ] | ||
| return await gather_with_semaphore(self.semaphore, coroutines) |
There was a problem hiding this comment.
Critical Severity Correctness Issue: If any rollout fails, gather_with_semaphore will filter out the None result, causing rollout_results to be shorter than data_batch. This shifts the alignment when zipping them in _evaluate, evaluating the wrong predictions against the wrong gold answers. We should disable filter_none here and handle None values explicitly.
| return await gather_with_semaphore(self.semaphore, coroutines) | |
| return await gather_with_semaphore(self.semaphore, coroutines, filter_none=False) |
| coroutines = [ | ||
| evaluator.evaluate(messages, data_item, evaluation_output_dir) | ||
| for messages, data_item in zip(rollout_results, data_batch) | ||
| ] | ||
| evaluation_results = await gather_with_semaphore(self.semaphore, coroutines) |
There was a problem hiding this comment.
Critical Severity Correctness Issue: When rollout_results contains None values (representing failed rollouts), we must handle them explicitly to maintain alignment with data_batch and prevent shifting indices. Failed rollouts should be treated as failed evaluations with a score of 0.0, rather than being silently ignored.
coroutines = []
for messages, data_item in zip(rollout_results, data_batch):
if messages is None:
async def dummy_eval(item=data_item):
return BaseEvaluationResult(messages=[], score=0.0, status="failure")
coroutines.append(dummy_eval())
else:
coroutines.append(evaluator.evaluate(messages, data_item, evaluation_output_dir))
evaluation_results = await gather_with_semaphore(self.semaphore, coroutines, filter_none=False)| async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List): | ||
| """Gather results from coroutines with a semaphore to limit concurrency.""" | ||
| tasks = [worker(semaphore, coroutine) for coroutine in coroutines] | ||
| results = await asyncio.gather(*tasks) | ||
| filtered_results = [result for result in results if result is not None] | ||
| return filtered_results |
There was a problem hiding this comment.
High Severity Correctness/Robustness Issue: gather_with_semaphore currently filters out None values on exception/failure. While this is fine for some tasks, it breaks index alignment in batch processing tasks like _rollout and _evaluate where the length of the output must match the input batch. Adding a filter_none parameter allows callers to opt-out of filtering.
| async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List): | |
| """Gather results from coroutines with a semaphore to limit concurrency.""" | |
| tasks = [worker(semaphore, coroutine) for coroutine in coroutines] | |
| results = await asyncio.gather(*tasks) | |
| filtered_results = [result for result in results if result is not None] | |
| return filtered_results | |
| async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List, filter_none: bool = True): | |
| """Gather results from coroutines with a semaphore to limit concurrency.""" | |
| tasks = [worker(semaphore, coroutine) for coroutine in coroutines] | |
| results = await asyncio.gather(*tasks) | |
| if filter_none: | |
| return [result for result in results if result is not None] | |
| return results |
| # reject: if current score is worse than last score, we will reject the current skills | ||
| # and update the recent rejected update buffer for the corresponding viewed skills | ||
| else: | ||
| # TODO | ||
| pass |
There was a problem hiding this comment.
High Severity Bug / Functional Gap: The else block for handling rejected skill updates is currently a TODO and does nothing. As a result, the recent_rejected_update_buffer is never populated. This directly breaks the feedback loop for the micro_skill_manager agent, which is explicitly instructed in its system prompt to understand and avoid recent rejected updates to prevent repeating the same mistakes.
| # build and run macro skill manager agent | ||
| # TODO | ||
|
|
||
| current_score = await self._validate_or_test( | ||
| current_skills_path=current_skills_path, | ||
| val_test_set=val_set, | ||
| rollout_env=rollout_env, | ||
| evaluator=evaluator, | ||
| sub_workdir=sub_workdir, | ||
| ) |
There was a problem hiding this comment.
High Severity Inefficiency: The macro skill manager agent is currently a TODO and does nothing, meaning the skills in current_skills_path remain identical to last_skills_path. However, the workflow still runs a full validation rollout and evaluation (_validate_or_test) on these unchanged skills. This is extremely expensive in terms of LLM tokens and execution time. Consider skipping the validation step or directly setting current_score = last_score if the macro skill manager is not implemented or if no changes were made to the skills.
| rollout_agent_config = self.rollout_agent_config.copy() | ||
| rollout_agent_config["prompt"] = { | ||
| "system": data_item.system | ||
| } | ||
| rollout_agent_config["skills"].update({ | ||
| "path": current_skills_path, | ||
| }) |
There was a problem hiding this comment.
High Severity Bug / Shared State Mutation: self.rollout_agent_config.copy() performs a shallow copy of the DictConfig object. Since the nested "skills" dictionary is not deep-copied, mutating rollout_agent_config["skills"] directly mutates the shared self.rollout_agent_config["skills"] object. This leads to race conditions and incorrect paths being used across concurrent rollout agents. Use copy.deepcopy() to ensure nested configurations are fully independent.
| rollout_agent_config = self.rollout_agent_config.copy() | |
| rollout_agent_config["prompt"] = { | |
| "system": data_item.system | |
| } | |
| rollout_agent_config["skills"].update({ | |
| "path": current_skills_path, | |
| }) | |
| import copy | |
| rollout_agent_config = copy.deepcopy(self.rollout_agent_config) | |
| rollout_agent_config["prompt"] = { | |
| "system": data_item.system | |
| } | |
| rollout_agent_config["skills"].update({ | |
| "path": current_skills_path, | |
| }) |
| skill_manager_agent_config = agent_config.copy() | ||
| skill_manager_agent_config["skills"].update({ | ||
| "path": current_skills_path, | ||
| }) | ||
| skill_manager_agent_config["output_dir"] = skill_manager_output_dir |
There was a problem hiding this comment.
High Severity Bug / Shared State Mutation: agent_config.copy() performs a shallow copy of the DictConfig object. Since the nested "skills" dictionary is not deep-copied, mutating skill_manager_agent_config["skills"] directly mutates the shared agent_config["skills"] object. Use copy.deepcopy() to ensure nested configurations are fully independent.
| skill_manager_agent_config = agent_config.copy() | |
| skill_manager_agent_config["skills"].update({ | |
| "path": current_skills_path, | |
| }) | |
| skill_manager_agent_config["output_dir"] = skill_manager_output_dir | |
| import copy | |
| skill_manager_agent_config = copy.deepcopy(agent_config) | |
| skill_manager_agent_config["skills"].update({ | |
| "path": current_skills_path, | |
| }) |
| self.train_config.max_workers = self.train_config.get("max_workers", 1) | ||
| self.train_config.reflection_trigger_size = self.train_config.get("reflection_trigger_size", 2) | ||
| self.train_config.reflection_group_size = self.train_config.get("reflection_group_size", 4) | ||
| self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3) |
There was a problem hiding this comment.
Medium Severity Config Mismatch: The configuration key defined in config.yaml is max_rejected_update_buffer_size: 3, but the code reads rejected_update_buffer_size. This causes the user-configured value to be ignored and always fall back to the default of 3.
| self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3) | |
| self.train_config.rejected_update_buffer_size = self.train_config.get("max_rejected_update_buffer_size", 3) |
| @abstractmethod | ||
| def load_data(self) -> list[BaseDataItem]: | ||
| """Load data from the specified path and return a list of BaseDataItem instances.""" | ||
| pass |
There was a problem hiding this comment.
Medium Severity Signature Mismatch: The abstract method load_data in BaseDataset is defined without any arguments (except self), but in __init__ it is called as self.load_data(self.data_path). The subclass SearchQADataset also defines it as load_data(self, data_path: str). Please update the abstract method signature to accept data_path: str to maintain consistency and satisfy static analysis tools.
| @abstractmethod | |
| def load_data(self) -> list[BaseDataItem]: | |
| """Load data from the specified path and return a list of BaseDataItem instances.""" | |
| pass | |
| @abstractmethod | |
| def load_data(self, data_path: str) -> list[BaseDataItem]: | |
| """Load data from the specified path and return a list of BaseDataItem instances.""" | |
| pass |
| if self.current_index + batch_size >= len(self.data): | ||
| batch = self.data[self.current_index :] | ||
| self.current_index = 0 # reset for next epoch | ||
| else: | ||
| batch = self.data[self.current_index : self.current_index + batch_size] | ||
| self.current_index += batch_size |
There was a problem hiding this comment.
Medium Severity ML Best Practice: When self.current_index is reset for the next epoch, the dataset is not reshuffled. Reshuffling training data at the start of each epoch is standard practice in machine learning to prevent ordering bias and improve generalization.
| if self.current_index + batch_size >= len(self.data): | |
| batch = self.data[self.current_index :] | |
| self.current_index = 0 # reset for next epoch | |
| else: | |
| batch = self.data[self.current_index : self.current_index + batch_size] | |
| self.current_index += batch_size | |
| if self.current_index + batch_size >= len(self.data): | |
| batch = self.data[self.current_index :] | |
| self.current_index = 0 # reset for next epoch | |
| if self.is_train: | |
| random.shuffle(self.data) | |
| else: | |
| batch = self.data[self.current_index : self.current_index + batch_size] | |
| self.current_index += batch_size |
There was a problem hiding this comment.
Code Review
This pull request introduces a skill evolution workflow framework, including agent configurations, task definitions for SearchQA, and workflow orchestration utilities. The review feedback highlights several critical issues: a bug in gather_with_semaphore that filters out exceptions and causes silent data misalignment during evaluation; a potential crash in _extract_answer when handling non-string or null responses; infinite trajectory accumulation in trajectories_buffer leading to redundant reflections; an incomplete feedback loop due to an unimplemented TODO for rejected updates; a configuration key mismatch for the rejected update buffer size; and an abstract method signature mismatch for load_data in the base dataset class.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| async def worker(semaphore: asyncio.Semaphore, coroutine): | ||
| """A worker that runs a coroutine with a semaphore to limit concurrency.""" | ||
| async with semaphore: | ||
| try: | ||
| return await coroutine | ||
| except Exception as e: | ||
| logger.error(f"Error in worker: {e}", exc_info=True) | ||
| return None | ||
|
|
||
|
|
||
| async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List): | ||
| """Gather results from coroutines with a semaphore to limit concurrency.""" | ||
| tasks = [worker(semaphore, coroutine) for coroutine in coroutines] | ||
| results = await asyncio.gather(*tasks) | ||
| filtered_results = [result for result in results if result is not None] | ||
| return filtered_results |
There was a problem hiding this comment.
Scrambled Data Alignment on Exceptions
In gather_with_semaphore, if any coroutine raises an exception, the worker catches it, logs it, and returns None. Then, gather_with_semaphore filters out all None values:
filtered_results = [result for result in results if result is not None]This shortens the returned list of results. Downstream, _evaluate and other methods zip the results with the original data_batch:
for messages, data_item in zip(rollout_results, data_batch)If rollout_results is shorter than data_batch, zip will silently truncate the evaluation and misalign the remaining items (e.g., evaluating item_i using the rollout result of item_{i+1}). This leads to silent data corruption and incorrect evaluation metrics.
To fix this, let exceptions propagate so the workflow fails loudly and clearly, preserving correct list length and alignment.
| async def worker(semaphore: asyncio.Semaphore, coroutine): | |
| """A worker that runs a coroutine with a semaphore to limit concurrency.""" | |
| async with semaphore: | |
| try: | |
| return await coroutine | |
| except Exception as e: | |
| logger.error(f"Error in worker: {e}", exc_info=True) | |
| return None | |
| async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List): | |
| """Gather results from coroutines with a semaphore to limit concurrency.""" | |
| tasks = [worker(semaphore, coroutine) for coroutine in coroutines] | |
| results = await asyncio.gather(*tasks) | |
| filtered_results = [result for result in results if result is not None] | |
| return filtered_results | |
| async def worker(semaphore: asyncio.Semaphore, coroutine): | |
| """A worker that runs a coroutine with a semaphore to limit concurrency.""" | |
| async with semaphore: | |
| return await coroutine | |
| async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List): | |
| """Gather results from coroutines with a semaphore to limit concurrency.""" | |
| tasks = [worker(semaphore, coroutine) for coroutine in coroutines] | |
| return await asyncio.gather(*tasks) |
| def _extract_answer(self, response: str) -> str: | ||
| matches = re.findall(r"<answer>(.*?)</answer>", response, re.DOTALL | re.IGNORECASE) | ||
| if matches: | ||
| return matches[-1].strip() | ||
| lines = [ln.strip() for ln in response.strip().splitlines() if ln.strip()] | ||
| if lines: | ||
| return lines[-1] | ||
| return response.strip() |
There was a problem hiding this comment.
Potential Crash When response is None
If the assistant message only contains tool_calls and has no text content (which is extremely common when the agent decides to call a tool first), messages[-1].content will be None. Passing None to _extract_answer will cause a TypeError in re.findall or an AttributeError in response.strip().
Add a defensive check to handle None or non-string responses gracefully.
| def _extract_answer(self, response: str) -> str: | |
| matches = re.findall(r"<answer>(.*?)</answer>", response, re.DOTALL | re.IGNORECASE) | |
| if matches: | |
| return matches[-1].strip() | |
| lines = [ln.strip() for ln in response.strip().splitlines() if ln.strip()] | |
| if lines: | |
| return lines[-1] | |
| return response.strip() | |
| def _extract_answer(self, response: Optional[str]) -> str: | |
| if not response or not isinstance(response, str): | |
| return "" | |
| matches = re.findall(r"<answer>(.*?)</answer>", response, re.DOTALL | re.IGNORECASE) | |
| if matches: | |
| return matches[-1].strip() | |
| lines = [ln.strip() for ln in response.strip().splitlines() if ln.strip()] | |
| if lines: | |
| return lines[-1] | |
| return response.strip() |
| def _format_viewed_skills_queries(self) -> Dict[str, list[str]]: | ||
| viewed_skills_queries = defaultdict(list) | ||
| for status in ["success", "failure"]: | ||
| for viewed_skills, status_dict in self.trajectories_buffer.items(): | ||
| trajectories = status_dict.get(status, []) | ||
| if len(trajectories) < self.train_config.reflection_trigger_size: | ||
| continue | ||
| # group trajectories into chunks of size reflection_group_size | ||
| for i in range(0, len(trajectories), self.train_config.reflection_group_size): | ||
| end = min(i + self.train_config.reflection_group_size, len(trajectories)) | ||
| group_trajectories = trajectories[i:end] | ||
| # format the group trajectories into a single query for reflection | ||
| query = "Trajectories:\n\n" + "\n\n---\n\n".join(group_trajectories) | ||
| viewed_skills_queries[viewed_skills].append(query) | ||
| return viewed_skills_queries |
There was a problem hiding this comment.
Infinite Accumulation and Redundant Reflections in trajectories_buffer
self.trajectories_buffer is updated at each training step but is never cleared or pruned after generating reflection queries. As training progresses, the buffer will accumulate all trajectories from previous steps, causing the reflector agent to repeatedly reflect on the same old trajectories. This leads to massive token waste, potential context window overflow, and degraded skill optimization quality.
Clear the processed trajectories from the buffer after generating the reflection queries.
def _format_viewed_skills_queries(self) -> Dict[str, list[str]]:
viewed_skills_queries = defaultdict(list)
for status in ["success", "failure"]:
for viewed_skills, status_dict in self.trajectories_buffer.items():
trajectories = status_dict.get(status, [])
if len(trajectories) < self.train_config.reflection_trigger_size:
continue
# group trajectories into chunks of size reflection_group_size
for i in range(0, len(trajectories), self.train_config.reflection_group_size):
end = min(i + self.train_config.reflection_group_size, len(trajectories))
group_trajectories = trajectories[i:end]
# format the group trajectories into a single query for reflection
query = "Trajectories:\n\n" + "\n\n---\n\n".join(group_trajectories)
viewed_skills_queries[viewed_skills].append(query)
# Clear the processed trajectories to avoid redundant reflections in subsequent steps
status_dict[status] = []
return viewed_skills_queries| # reject: if current score is worse than last score, we will reject the current skills | ||
| # and update the recent rejected update buffer for the corresponding viewed skills | ||
| else: | ||
| # TODO | ||
| pass |
There was a problem hiding this comment.
Incomplete Feedback Loop Due to Missing Rejected Updates Handling
The else block for handling rejected updates is left as a TODO and currently does nothing. Because of this, self.recent_rejected_update_buffer is never populated when an update is rejected. Consequently, the micro skill manager agent will never receive previous rejected updates in its prompt, completely breaking the core feedback loop of avoiding previous mistakes.
| if tool_call.get("tool_name", "") == self.SKILL_VIEW_TOOL_NAME: | ||
| try: | ||
| skill_id = json.loads(tool_call["arguments"])["skill_id"] | ||
| viewed_skills.add(skill_id) | ||
| except Exception as e: | ||
| logger.warning(f"Failed to extract skill_id from tool_call arguments: " | ||
| f"{tool_call['arguments']}. Error: {e}") | ||
| continue |
There was a problem hiding this comment.
Defensive Handling of Tool Call Arguments
tool_call["arguments"] is assumed to always be a JSON-encoded string. However, depending on the LLM client or custom agent implementation, arguments might already be parsed into a Python dictionary. Calling json.loads on a dictionary will raise a TypeError.
Use defensive type checking to handle both string and dictionary formats safely.
| if tool_call.get("tool_name", "") == self.SKILL_VIEW_TOOL_NAME: | |
| try: | |
| skill_id = json.loads(tool_call["arguments"])["skill_id"] | |
| viewed_skills.add(skill_id) | |
| except Exception as e: | |
| logger.warning(f"Failed to extract skill_id from tool_call arguments: " | |
| f"{tool_call['arguments']}. Error: {e}") | |
| continue | |
| if tool_call.get("tool_name", "") == self.SKILL_VIEW_TOOL_NAME: | |
| try: | |
| args = tool_call.get("arguments", {}) | |
| if isinstance(args, str): | |
| args = json.loads(args) | |
| skill_id = args.get("skill_id") | |
| if skill_id: | |
| viewed_skills.add(skill_id) | |
| except Exception as e: | |
| logger.warning(f"Failed to extract skill_id from tool_call arguments: " | |
| f"{tool_call.get('arguments')}. Error: {e}") | |
| continue |
| self.train_config.max_workers = self.train_config.get("max_workers", 1) | ||
| self.train_config.reflection_trigger_size = self.train_config.get("reflection_trigger_size", 2) | ||
| self.train_config.reflection_group_size = self.train_config.get("reflection_group_size", 4) | ||
| self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3) |
There was a problem hiding this comment.
Configuration Key Mismatch
In config.yaml, the key is defined as max_rejected_update_buffer_size: 3, but the workflow code looks for rejected_update_buffer_size. This mismatch causes the configured value to be silently ignored and fallback to the default of 3.
Update the key to max_rejected_update_buffer_size to align with the configuration file.
| self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3) | |
| self.train_config.max_rejected_update_buffer_size = self.train_config.get("max_rejected_update_buffer_size", 3) |
| self.recent_rejected_update_buffer: Dict[str, Deque[str]] = defaultdict( | ||
| lambda: deque(maxlen=self.train_config.rejected_update_buffer_size) | ||
| ) |
There was a problem hiding this comment.
Update Buffer Size Reference to Match Configuration
Update the reference to use max_rejected_update_buffer_size instead of rejected_update_buffer_size to match the configuration file.
| self.recent_rejected_update_buffer: Dict[str, Deque[str]] = defaultdict( | |
| lambda: deque(maxlen=self.train_config.rejected_update_buffer_size) | |
| ) | |
| self.recent_rejected_update_buffer: Dict[str, Deque[str]] = defaultdict( | |
| lambda: deque(maxlen=self.train_config.max_rejected_update_buffer_size) | |
| ) |
| @abstractmethod | ||
| def load_data(self) -> list[BaseDataItem]: | ||
| """Load data from the specified path and return a list of BaseDataItem instances.""" | ||
| pass |
There was a problem hiding this comment.
Abstract Method Signature Mismatch
The abstract method load_data is defined with no arguments: def load_data(self) -> list[BaseDataItem]. However, in __init__ it is called with self.data_path, and the subclass SearchQADataset implements it as def load_data(self, data_path: str). This signature mismatch violates the Liskov Substitution Principle and will cause static type checkers or linters to fail.
Update the abstract method signature to accept data_path.
| @abstractmethod | |
| def load_data(self) -> list[BaseDataItem]: | |
| """Load data from the specified path and return a list of BaseDataItem instances.""" | |
| pass | |
| @abstractmethod | |
| def load_data(self, data_path: str) -> list[BaseDataItem]: | |
| """Load data from the specified path and return a list of BaseDataItem instances.""" | |
| pass |
Change Summary
Related issue number
Checklist
pre-commit installandpre-commit run --all-filesbefore git commit, and passed lint check.