Skip to content

Feat/skill evolution project#919

Open
fanqiNO1 wants to merge 1 commit into
modelscope:mainfrom
fanqiNO1:skill_evolution
Open

Feat/skill evolution project#919
fanqiNO1 wants to merge 1 commit into
modelscope:mainfrom
fanqiNO1:skill_evolution

Conversation

@fanqiNO1

@fanqiNO1 fanqiNO1 commented Jul 3, 2026

Copy link
Copy Markdown

Change Summary

Related issue number

Checklist

  • The pull request title is a good summary of the changes - it will be used in the changelog
  • Unit tests for the changes exist
  • Run pre-commit install and pre-commit run --all-files before git commit, and passed lint check.
  • Documentation reflects the changes where applicable

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a skill evolution workflow framework, including configuration files, a SearchQA task implementation, and utility functions. The review identifies critical correctness issues regarding index alignment when rollouts fail, which can be addressed by updating gather_with_semaphore to optionally retain None values. Additionally, high-severity issues were raised concerning shared state mutations from shallow-copying configurations, a persistent trajectories buffer that causes redundant reflections and potential overfitting, and incomplete TODO blocks for rejected updates and macro skill management. Other recommendations include fixing a configuration key mismatch, correcting a signature mismatch in BaseDataset.load_data, and reshuffling training data upon epoch reset.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

)
for data_item in data_batch
]
return await gather_with_semaphore(self.semaphore, coroutines)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Critical Severity Correctness Issue: If any rollout fails, gather_with_semaphore will filter out the None result, causing rollout_results to be shorter than data_batch. This shifts the alignment when zipping them in _evaluate, evaluating the wrong predictions against the wrong gold answers. We should disable filter_none here and handle None values explicitly.

Suggested change
return await gather_with_semaphore(self.semaphore, coroutines)
return await gather_with_semaphore(self.semaphore, coroutines, filter_none=False)

Comment on lines +366 to +370
coroutines = [
evaluator.evaluate(messages, data_item, evaluation_output_dir)
for messages, data_item in zip(rollout_results, data_batch)
]
evaluation_results = await gather_with_semaphore(self.semaphore, coroutines)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Critical Severity Correctness Issue: When rollout_results contains None values (representing failed rollouts), we must handle them explicitly to maintain alignment with data_batch and prevent shifting indices. Failed rollouts should be treated as failed evaluations with a score of 0.0, rather than being silently ignored.

        coroutines = []
        for messages, data_item in zip(rollout_results, data_batch):
            if messages is None:
                async def dummy_eval(item=data_item):
                    return BaseEvaluationResult(messages=[], score=0.0, status="failure")
                coroutines.append(dummy_eval())
            else:
                coroutines.append(evaluator.evaluate(messages, data_item, evaluation_output_dir))
        evaluation_results = await gather_with_semaphore(self.semaphore, coroutines, filter_none=False)

Comment on lines +21 to +26
async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List):
"""Gather results from coroutines with a semaphore to limit concurrency."""
tasks = [worker(semaphore, coroutine) for coroutine in coroutines]
results = await asyncio.gather(*tasks)
filtered_results = [result for result in results if result is not None]
return filtered_results

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

High Severity Correctness/Robustness Issue: gather_with_semaphore currently filters out None values on exception/failure. While this is fine for some tasks, it breaks index alignment in batch processing tasks like _rollout and _evaluate where the length of the output must match the input batch. Adding a filter_none parameter allows callers to opt-out of filtering.

Suggested change
async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List):
"""Gather results from coroutines with a semaphore to limit concurrency."""
tasks = [worker(semaphore, coroutine) for coroutine in coroutines]
results = await asyncio.gather(*tasks)
filtered_results = [result for result in results if result is not None]
return filtered_results
async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List, filter_none: bool = True):
"""Gather results from coroutines with a semaphore to limit concurrency."""
tasks = [worker(semaphore, coroutine) for coroutine in coroutines]
results = await asyncio.gather(*tasks)
if filter_none:
return [result for result in results if result is not None]
return results

Comment on lines +233 to +237
# reject: if current score is worse than last score, we will reject the current skills
# and update the recent rejected update buffer for the corresponding viewed skills
else:
# TODO
pass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

High Severity Bug / Functional Gap: The else block for handling rejected skill updates is currently a TODO and does nothing. As a result, the recent_rejected_update_buffer is never populated. This directly breaks the feedback loop for the micro_skill_manager agent, which is explicitly instructed in its system prompt to understand and avoid recent rejected updates to prevent repeating the same mistakes.

Comment on lines +246 to +255
# build and run macro skill manager agent
# TODO

current_score = await self._validate_or_test(
current_skills_path=current_skills_path,
val_test_set=val_set,
rollout_env=rollout_env,
evaluator=evaluator,
sub_workdir=sub_workdir,
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

High Severity Inefficiency: The macro skill manager agent is currently a TODO and does nothing, meaning the skills in current_skills_path remain identical to last_skills_path. However, the workflow still runs a full validation rollout and evaluation (_validate_or_test) on these unchanged skills. This is extremely expensive in terms of LLM tokens and execution time. Consider skipping the validation step or directly setting current_score = last_score if the macro skill manager is not implemented or if no changes were made to the skills.

Comment on lines +106 to +112
rollout_agent_config = self.rollout_agent_config.copy()
rollout_agent_config["prompt"] = {
"system": data_item.system
}
rollout_agent_config["skills"].update({
"path": current_skills_path,
})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

High Severity Bug / Shared State Mutation: self.rollout_agent_config.copy() performs a shallow copy of the DictConfig object. Since the nested "skills" dictionary is not deep-copied, mutating rollout_agent_config["skills"] directly mutates the shared self.rollout_agent_config["skills"] object. This leads to race conditions and incorrect paths being used across concurrent rollout agents. Use copy.deepcopy() to ensure nested configurations are fully independent.

Suggested change
rollout_agent_config = self.rollout_agent_config.copy()
rollout_agent_config["prompt"] = {
"system": data_item.system
}
rollout_agent_config["skills"].update({
"path": current_skills_path,
})
import copy
rollout_agent_config = copy.deepcopy(self.rollout_agent_config)
rollout_agent_config["prompt"] = {
"system": data_item.system
}
rollout_agent_config["skills"].update({
"path": current_skills_path,
})

Comment on lines +147 to +151
skill_manager_agent_config = agent_config.copy()
skill_manager_agent_config["skills"].update({
"path": current_skills_path,
})
skill_manager_agent_config["output_dir"] = skill_manager_output_dir

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

High Severity Bug / Shared State Mutation: agent_config.copy() performs a shallow copy of the DictConfig object. Since the nested "skills" dictionary is not deep-copied, mutating skill_manager_agent_config["skills"] directly mutates the shared agent_config["skills"] object. Use copy.deepcopy() to ensure nested configurations are fully independent.

Suggested change
skill_manager_agent_config = agent_config.copy()
skill_manager_agent_config["skills"].update({
"path": current_skills_path,
})
skill_manager_agent_config["output_dir"] = skill_manager_output_dir
import copy
skill_manager_agent_config = copy.deepcopy(agent_config)
skill_manager_agent_config["skills"].update({
"path": current_skills_path,
})

self.train_config.max_workers = self.train_config.get("max_workers", 1)
self.train_config.reflection_trigger_size = self.train_config.get("reflection_trigger_size", 2)
self.train_config.reflection_group_size = self.train_config.get("reflection_group_size", 4)
self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Medium Severity Config Mismatch: The configuration key defined in config.yaml is max_rejected_update_buffer_size: 3, but the code reads rejected_update_buffer_size. This causes the user-configured value to be ignored and always fall back to the default of 3.

Suggested change
self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3)
self.train_config.rejected_update_buffer_size = self.train_config.get("max_rejected_update_buffer_size", 3)

Comment on lines +43 to +46
@abstractmethod
def load_data(self) -> list[BaseDataItem]:
"""Load data from the specified path and return a list of BaseDataItem instances."""
pass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Medium Severity Signature Mismatch: The abstract method load_data in BaseDataset is defined without any arguments (except self), but in __init__ it is called as self.load_data(self.data_path). The subclass SearchQADataset also defines it as load_data(self, data_path: str). Please update the abstract method signature to accept data_path: str to maintain consistency and satisfy static analysis tools.

Suggested change
@abstractmethod
def load_data(self) -> list[BaseDataItem]:
"""Load data from the specified path and return a list of BaseDataItem instances."""
pass
@abstractmethod
def load_data(self, data_path: str) -> list[BaseDataItem]:
"""Load data from the specified path and return a list of BaseDataItem instances."""
pass

Comment on lines +56 to +61
if self.current_index + batch_size >= len(self.data):
batch = self.data[self.current_index :]
self.current_index = 0 # reset for next epoch
else:
batch = self.data[self.current_index : self.current_index + batch_size]
self.current_index += batch_size

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Medium Severity ML Best Practice: When self.current_index is reset for the next epoch, the dataset is not reshuffled. Reshuffling training data at the start of each epoch is standard practice in machine learning to prevent ordering bias and improve generalization.

Suggested change
if self.current_index + batch_size >= len(self.data):
batch = self.data[self.current_index :]
self.current_index = 0 # reset for next epoch
else:
batch = self.data[self.current_index : self.current_index + batch_size]
self.current_index += batch_size
if self.current_index + batch_size >= len(self.data):
batch = self.data[self.current_index :]
self.current_index = 0 # reset for next epoch
if self.is_train:
random.shuffle(self.data)
else:
batch = self.data[self.current_index : self.current_index + batch_size]
self.current_index += batch_size

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a skill evolution workflow framework, including agent configurations, task definitions for SearchQA, and workflow orchestration utilities. The review feedback highlights several critical issues: a bug in gather_with_semaphore that filters out exceptions and causes silent data misalignment during evaluation; a potential crash in _extract_answer when handling non-string or null responses; infinite trajectory accumulation in trajectories_buffer leading to redundant reflections; an incomplete feedback loop due to an unimplemented TODO for rejected updates; a configuration key mismatch for the rejected update buffer size; and an abstract method signature mismatch for load_data in the base dataset class.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

Comment on lines +11 to +26
async def worker(semaphore: asyncio.Semaphore, coroutine):
"""A worker that runs a coroutine with a semaphore to limit concurrency."""
async with semaphore:
try:
return await coroutine
except Exception as e:
logger.error(f"Error in worker: {e}", exc_info=True)
return None


async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List):
"""Gather results from coroutines with a semaphore to limit concurrency."""
tasks = [worker(semaphore, coroutine) for coroutine in coroutines]
results = await asyncio.gather(*tasks)
filtered_results = [result for result in results if result is not None]
return filtered_results

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

Scrambled Data Alignment on Exceptions

In gather_with_semaphore, if any coroutine raises an exception, the worker catches it, logs it, and returns None. Then, gather_with_semaphore filters out all None values:

filtered_results = [result for result in results if result is not None]

This shortens the returned list of results. Downstream, _evaluate and other methods zip the results with the original data_batch:

for messages, data_item in zip(rollout_results, data_batch)

If rollout_results is shorter than data_batch, zip will silently truncate the evaluation and misalign the remaining items (e.g., evaluating item_i using the rollout result of item_{i+1}). This leads to silent data corruption and incorrect evaluation metrics.

To fix this, let exceptions propagate so the workflow fails loudly and clearly, preserving correct list length and alignment.

Suggested change
async def worker(semaphore: asyncio.Semaphore, coroutine):
"""A worker that runs a coroutine with a semaphore to limit concurrency."""
async with semaphore:
try:
return await coroutine
except Exception as e:
logger.error(f"Error in worker: {e}", exc_info=True)
return None
async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List):
"""Gather results from coroutines with a semaphore to limit concurrency."""
tasks = [worker(semaphore, coroutine) for coroutine in coroutines]
results = await asyncio.gather(*tasks)
filtered_results = [result for result in results if result is not None]
return filtered_results
async def worker(semaphore: asyncio.Semaphore, coroutine):
"""A worker that runs a coroutine with a semaphore to limit concurrency."""
async with semaphore:
return await coroutine
async def gather_with_semaphore(semaphore: asyncio.Semaphore, coroutines: List):
"""Gather results from coroutines with a semaphore to limit concurrency."""
tasks = [worker(semaphore, coroutine) for coroutine in coroutines]
return await asyncio.gather(*tasks)

Comment on lines +84 to +91
def _extract_answer(self, response: str) -> str:
matches = re.findall(r"<answer>(.*?)</answer>", response, re.DOTALL | re.IGNORECASE)
if matches:
return matches[-1].strip()
lines = [ln.strip() for ln in response.strip().splitlines() if ln.strip()]
if lines:
return lines[-1]
return response.strip()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Potential Crash When response is None

If the assistant message only contains tool_calls and has no text content (which is extremely common when the agent decides to call a tool first), messages[-1].content will be None. Passing None to _extract_answer will cause a TypeError in re.findall or an AttributeError in response.strip().

Add a defensive check to handle None or non-string responses gracefully.

Suggested change
def _extract_answer(self, response: str) -> str:
matches = re.findall(r"<answer>(.*?)</answer>", response, re.DOTALL | re.IGNORECASE)
if matches:
return matches[-1].strip()
lines = [ln.strip() for ln in response.strip().splitlines() if ln.strip()]
if lines:
return lines[-1]
return response.strip()
def _extract_answer(self, response: Optional[str]) -> str:
if not response or not isinstance(response, str):
return ""
matches = re.findall(r"<answer>(.*?)</answer>", response, re.DOTALL | re.IGNORECASE)
if matches:
return matches[-1].strip()
lines = [ln.strip() for ln in response.strip().splitlines() if ln.strip()]
if lines:
return lines[-1]
return response.strip()

Comment on lines +473 to +487
def _format_viewed_skills_queries(self) -> Dict[str, list[str]]:
viewed_skills_queries = defaultdict(list)
for status in ["success", "failure"]:
for viewed_skills, status_dict in self.trajectories_buffer.items():
trajectories = status_dict.get(status, [])
if len(trajectories) < self.train_config.reflection_trigger_size:
continue
# group trajectories into chunks of size reflection_group_size
for i in range(0, len(trajectories), self.train_config.reflection_group_size):
end = min(i + self.train_config.reflection_group_size, len(trajectories))
group_trajectories = trajectories[i:end]
# format the group trajectories into a single query for reflection
query = "Trajectories:\n\n" + "\n\n---\n\n".join(group_trajectories)
viewed_skills_queries[viewed_skills].append(query)
return viewed_skills_queries

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Infinite Accumulation and Redundant Reflections in trajectories_buffer

self.trajectories_buffer is updated at each training step but is never cleared or pruned after generating reflection queries. As training progresses, the buffer will accumulate all trajectories from previous steps, causing the reflector agent to repeatedly reflect on the same old trajectories. This leads to massive token waste, potential context window overflow, and degraded skill optimization quality.

Clear the processed trajectories from the buffer after generating the reflection queries.

    def _format_viewed_skills_queries(self) -> Dict[str, list[str]]:
        viewed_skills_queries = defaultdict(list)
        for status in ["success", "failure"]:
            for viewed_skills, status_dict in self.trajectories_buffer.items():
                trajectories = status_dict.get(status, [])
                if len(trajectories) < self.train_config.reflection_trigger_size:
                    continue
                # group trajectories into chunks of size reflection_group_size
                for i in range(0, len(trajectories), self.train_config.reflection_group_size):
                    end = min(i + self.train_config.reflection_group_size, len(trajectories))
                    group_trajectories = trajectories[i:end]
                    # format the group trajectories into a single query for reflection
                    query = "Trajectories:\n\n" + "\n\n---\n\n".join(group_trajectories)
                    viewed_skills_queries[viewed_skills].append(query)
                # Clear the processed trajectories to avoid redundant reflections in subsequent steps
                status_dict[status] = []
        return viewed_skills_queries

Comment on lines +233 to +237
# reject: if current score is worse than last score, we will reject the current skills
# and update the recent rejected update buffer for the corresponding viewed skills
else:
# TODO
pass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Incomplete Feedback Loop Due to Missing Rejected Updates Handling

The else block for handling rejected updates is left as a TODO and currently does nothing. Because of this, self.recent_rejected_update_buffer is never populated when an update is rejected. Consequently, the micro skill manager agent will never receive previous rejected updates in its prompt, completely breaking the core feedback loop of avoiding previous mistakes.

Comment on lines +441 to +448
if tool_call.get("tool_name", "") == self.SKILL_VIEW_TOOL_NAME:
try:
skill_id = json.loads(tool_call["arguments"])["skill_id"]
viewed_skills.add(skill_id)
except Exception as e:
logger.warning(f"Failed to extract skill_id from tool_call arguments: "
f"{tool_call['arguments']}. Error: {e}")
continue

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Defensive Handling of Tool Call Arguments

tool_call["arguments"] is assumed to always be a JSON-encoded string. However, depending on the LLM client or custom agent implementation, arguments might already be parsed into a Python dictionary. Calling json.loads on a dictionary will raise a TypeError.

Use defensive type checking to handle both string and dictionary formats safely.

Suggested change
if tool_call.get("tool_name", "") == self.SKILL_VIEW_TOOL_NAME:
try:
skill_id = json.loads(tool_call["arguments"])["skill_id"]
viewed_skills.add(skill_id)
except Exception as e:
logger.warning(f"Failed to extract skill_id from tool_call arguments: "
f"{tool_call['arguments']}. Error: {e}")
continue
if tool_call.get("tool_name", "") == self.SKILL_VIEW_TOOL_NAME:
try:
args = tool_call.get("arguments", {})
if isinstance(args, str):
args = json.loads(args)
skill_id = args.get("skill_id")
if skill_id:
viewed_skills.add(skill_id)
except Exception as e:
logger.warning(f"Failed to extract skill_id from tool_call arguments: "
f"{tool_call.get('arguments')}. Error: {e}")
continue

self.train_config.max_workers = self.train_config.get("max_workers", 1)
self.train_config.reflection_trigger_size = self.train_config.get("reflection_trigger_size", 2)
self.train_config.reflection_group_size = self.train_config.get("reflection_group_size", 4)
self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Configuration Key Mismatch

In config.yaml, the key is defined as max_rejected_update_buffer_size: 3, but the workflow code looks for rejected_update_buffer_size. This mismatch causes the configured value to be silently ignored and fallback to the default of 3.

Update the key to max_rejected_update_buffer_size to align with the configuration file.

Suggested change
self.train_config.rejected_update_buffer_size = self.train_config.get("rejected_update_buffer_size", 3)
self.train_config.max_rejected_update_buffer_size = self.train_config.get("max_rejected_update_buffer_size", 3)

Comment on lines +68 to +70
self.recent_rejected_update_buffer: Dict[str, Deque[str]] = defaultdict(
lambda: deque(maxlen=self.train_config.rejected_update_buffer_size)
)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Update Buffer Size Reference to Match Configuration

Update the reference to use max_rejected_update_buffer_size instead of rejected_update_buffer_size to match the configuration file.

Suggested change
self.recent_rejected_update_buffer: Dict[str, Deque[str]] = defaultdict(
lambda: deque(maxlen=self.train_config.rejected_update_buffer_size)
)
self.recent_rejected_update_buffer: Dict[str, Deque[str]] = defaultdict(
lambda: deque(maxlen=self.train_config.max_rejected_update_buffer_size)
)

Comment on lines +43 to +46
@abstractmethod
def load_data(self) -> list[BaseDataItem]:
"""Load data from the specified path and return a list of BaseDataItem instances."""
pass

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Abstract Method Signature Mismatch

The abstract method load_data is defined with no arguments: def load_data(self) -> list[BaseDataItem]. However, in __init__ it is called with self.data_path, and the subclass SearchQADataset implements it as def load_data(self, data_path: str). This signature mismatch violates the Liskov Substitution Principle and will cause static type checkers or linters to fail.

Update the abstract method signature to accept data_path.

Suggested change
@abstractmethod
def load_data(self) -> list[BaseDataItem]:
"""Load data from the specified path and return a list of BaseDataItem instances."""
pass
@abstractmethod
def load_data(self, data_path: str) -> list[BaseDataItem]:
"""Load data from the specified path and return a list of BaseDataItem instances."""
pass

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant