diff --git a/xtuner/v1/rl/agent_loop/__init__.py b/xtuner/v1/rl/agent_loop/__init__.py index da2802fb0..331dedd47 100644 --- a/xtuner/v1/rl/agent_loop/__init__.py +++ b/xtuner/v1/rl/agent_loop/__init__.py @@ -8,11 +8,14 @@ RouterAgentLoop, get_agent_loop_rollout_ctl, ) +from .localhost_agent_loop.agent_in_localhost_loop import AgentInLocalhostLoop, AgentInLocalhostLoopConfig from .sandbox_agent_loop.agent_in_sandbox_loop import AgentInSandboxLoop, AgentInSandboxLoopConfig from .single_turn_agent_loop import SingleTurnAgentLoop, SingleTurnAgentLoopConfig __all__ = [ + "AgentInLocalhostLoop", + "AgentInLocalhostLoopConfig", "AgentInSandboxLoop", "AgentInSandboxLoopConfig", "AgentLoopConfig", diff --git a/xtuner/v1/rl/agent_loop/localhost_agent_loop/__init__.py b/xtuner/v1/rl/agent_loop/localhost_agent_loop/__init__.py new file mode 100644 index 000000000..c6ce999c8 --- /dev/null +++ b/xtuner/v1/rl/agent_loop/localhost_agent_loop/__init__.py @@ -0,0 +1,34 @@ +"""Public surface for the localhost_agent_loop runner.""" + +from xtuner.v1.rl.agent_loop.localhost_agent_loop.agent_in_localhost_loop import ( + AgentInLocalhostLoop, + AgentInLocalhostLoopConfig, +) +from xtuner.v1.rl.agent_loop.localhost_agent_loop.compose import LocalhostComposeStage +from xtuner.v1.rl.agent_loop.localhost_agent_loop.judger import LocalhostJudgerStage +from xtuner.v1.rl.agent_loop.localhost_agent_loop.runner import LocalhostRunner +from xtuner.v1.rl.agent_loop.localhost_agent_loop.schemas import LocalhostAgentSpec +from xtuner.v1.rl.agent_loop.localhost_agent_loop.stage import LocalhostStage +from xtuner.v1.rl.agent_loop.sandbox_agent_loop.schemas import ( + AgentRolloutItem, + RolloutError, + RolloutStatus, + StageRecord, + StageStatus, +) + + +__all__ = [ + "AgentInLocalhostLoop", + "AgentInLocalhostLoopConfig", + "AgentRolloutItem", + "LocalhostAgentSpec", + "LocalhostComposeStage", + "LocalhostJudgerStage", + "LocalhostRunner", + "LocalhostStage", + "RolloutError", + "RolloutStatus", + "StageRecord", + "StageStatus", +] diff --git a/xtuner/v1/rl/agent_loop/localhost_agent_loop/agent_in_localhost_loop.py b/xtuner/v1/rl/agent_loop/localhost_agent_loop/agent_in_localhost_loop.py new file mode 100644 index 000000000..a82229e8f --- /dev/null +++ b/xtuner/v1/rl/agent_loop/localhost_agent_loop/agent_in_localhost_loop.py @@ -0,0 +1,156 @@ +from __future__ import annotations + +import asyncio +import copy +import importlib +import traceback +import uuid +from typing import Any + +from lagent.utils import create_object + +from xtuner.v1.data_proto.rl_data import RolloutState, SampleParams, Status +from xtuner.v1.rl.agent_loop.sandbox_agent_loop.schemas import ( + AgentRolloutItem, + RolloutStatus, +) +from xtuner.v1.rl.judger import Judger +from xtuner.v1.rl.rollout import RolloutController +from xtuner.v1.rl.rollout.trace_store import get_store +from xtuner.v1.rl.utils import create_task + +from ..agent_loop import AgentLoop, AgentLoopConfig + + +def _import_from_path(path: str) -> Any: + module_name, _, attr = path.rpartition(".") + if not module_name or not attr: + raise ValueError(f"Invalid import path: {path!r}. Expected 'module.attr'.") + module = importlib.import_module(module_name) + return getattr(module, attr) + + +def _resolve_runner(pipeline: Any) -> Any: + if isinstance(pipeline, str): + pipeline = _import_from_path(pipeline) + if isinstance(pipeline, dict): + return create_object(copy.deepcopy(pipeline)) + return pipeline + + +class AgentInLocalhostLoopConfig(AgentLoopConfig): + """Run a localhost agent runner from ``RolloutState.extra_fields``.""" + + max_concurrent_samples: int | None = None + + def build_local( + self, + rollout_controller: RolloutController | None = None, + judger: Judger | None = None, + logger=None, + ) -> AgentInLocalhostLoop: + return AgentInLocalhostLoop( + rollout_ctl=rollout_controller, + sample_params=self.sample_params, + hf_checkpoint=self.hf_checkpoint, + judger=judger, + logger=logger, + max_concurrent_samples=self.max_concurrent_samples, + ) + + +class AgentInLocalhostLoop(AgentLoop): + """AgentLoop adapter for localhost_agent_loop runners.""" + + def __init__( + self, + rollout_ctl: RolloutController | None = None, + sample_params: SampleParams | None = None, + hf_checkpoint: str | None = None, + judger: Judger | None = None, + logger=None, + max_concurrent_samples: int | None = None, + ): + super().__init__(rollout_ctl, sample_params, hf_checkpoint, judger, logger) + self.max_concurrent_samples = max_concurrent_samples + self._sample_semaphore = asyncio.Semaphore(max_concurrent_samples) if max_concurrent_samples else None + + async def generate_group(self, rollout_state: list[RolloutState], **kwargs) -> list[RolloutState]: + async def generate_one(state: RolloutState) -> RolloutState: + if self._sample_semaphore is None: + return await self.generate_sample(state, **kwargs) + async with self._sample_semaphore: + return await self.generate_sample(state, **kwargs) + + tasks = [] + for state in rollout_state: + state.sample_params = self.sample_params + tasks.append(create_task(generate_one(state))) + return await asyncio.gather(*tasks) + + async def generate_sample(self, rollout_state: RolloutState, **kwargs) -> RolloutState: + try: + item = self._rollout_item(rollout_state) + if rollout_state.uid is None: + rollout_state.uid = uuid.uuid4().int + item.uid = rollout_state.uid + item.group_id = rollout_state.message_uid + result = await self._run_item(item) + await self._fill_rollout_state(rollout_state, result) + return rollout_state + except Exception as exc: + rollout_state.status = Status.FAILED + rollout_state.finish_reason = "error" + rollout_state.error_msg = f"{type(exc).__name__}: {exc}" + self.logger.error(f"[AgentInLocalhostLoop] failed: {exc}\n{traceback.format_exc()}") + return rollout_state + + def _rollout_item(self, rollout_state: RolloutState) -> AgentRolloutItem: + raw_item = rollout_state.extra_fields["rollout_item"] + if isinstance(raw_item, AgentRolloutItem): + return raw_item.model_copy(deep=True) + return AgentRolloutItem.model_validate(raw_item).model_copy(deep=True) + + async def _run_item(self, item: AgentRolloutItem) -> AgentRolloutItem: + runner = _resolve_runner(item.pipeline) + if runner is None: + raise ValueError("AgentRolloutItem.pipeline is required.") + return await runner.run(item) + + async def _fill_rollout_state(self, rollout_state: RolloutState, item: AgentRolloutItem) -> None: + trace = item.artifacts.get("messages") + if not isinstance(trace, list) or not trace: + raise ValueError("Agent artifacts must contain at least one trainable messages trace.") + segment = trace[-1] + if not isinstance(segment, dict) or "messages" not in segment or "tools" not in segment: + raise ValueError("Agent messages trace segment must contain messages and tools.") + messages = segment["messages"] + if not isinstance(messages, list): + raise TypeError("Agent messages trace segment.messages must be a list.") + text = self.tokenizer.apply_chat_template( + messages, + tools=segment["tools"], + tokenize=False, + add_generation_prompt=False, + ) + prompt_text = text[:-1] if text.endswith("\n") else text + data = await get_store().export_training_trace.remote(str(rollout_state.uid), prompt_text) + + rollout_state.input_ids = data["input_ids"] + rollout_state.labels = data["labels"] + rollout_state.response_ids = [ + token_id for token_id, label in zip(data["input_ids"][1:], data["labels"][1:]) if label != -100 + ] + rollout_state.logprobs = data["logprobs"] + rollout_state.routed_experts = data["routed_experts"] + rollout_state.response = str(item.artifacts.get("response") or "") + rollout_state.finish_reason = "stop" if item.status == RolloutStatus.COMPLETED else "error" + rollout_state.status = Status.COMPLETED if item.status == RolloutStatus.COMPLETED else Status.FAILED + rollout_state.reward = {"score": item.reward} + rollout_state.extra_fields["raw_prompt"] = prompt_text + rollout_state.extra_fields["agent_artifacts"] = item.artifacts + if item.error is not None: + rollout_state.error_msg = f"{item.error.stage}/{item.error.category}: {item.error.message}" + + +__all__ = ["AgentInLocalhostLoop", "AgentInLocalhostLoopConfig"] diff --git a/xtuner/v1/rl/agent_loop/localhost_agent_loop/compose.py b/xtuner/v1/rl/agent_loop/localhost_agent_loop/compose.py new file mode 100644 index 000000000..c09d30590 --- /dev/null +++ b/xtuner/v1/rl/agent_loop/localhost_agent_loop/compose.py @@ -0,0 +1,72 @@ +"""Composable localhost stages.""" + +from __future__ import annotations + +import time +from typing import Any + +from lagent.utils import create_object + +from xtuner.v1.rl.agent_loop.sandbox_agent_loop.schemas import ( + AgentRolloutItem, + RolloutError, + StageRecord, + StageStatus, +) + + +class LocalhostComposeStage: + """Compose multiple local validation stages behind ``run(item, record) -> + float``.""" + + def __init__( + self, + stages: list[Any], + *, + name: str = "validate", + weight: float = 1.0, + ): + if not stages: + raise ValueError("LocalhostComposeStage.stages is empty") + self.name = name + self.stages = [create_object(stage) for stage in stages] + self.weight = weight + + async def run(self, item: AgentRolloutItem, record: StageRecord) -> float: + record.status = StageStatus.RUNNING + record.started_at = record.started_at or time.monotonic() + try: + weighted_score = 0.0 + total_weight = 0.0 + for stage in self.stages: + name = getattr(stage, "name", stage.__class__.__name__) + child_record = item.judgers.setdefault(name, StageRecord()) + score = float(await stage.run(item, child_record)) + stage_weight = max(float(getattr(stage, "weight", 1.0)), 0.0) + weighted_score += score * stage_weight + total_weight += stage_weight + record.score = weighted_score / total_weight if total_weight > 0 else 0.0 + record.status = StageStatus.COMPLETED + return record.score + except Exception as exc: + record.status = StageStatus.FAILED + child_error = next( + (child.error for child in item.judgers.values() if child.error is not None), + None, + ) + record.error = ( + record.error + or child_error + or RolloutError( + stage=self.name, + category="validate_failed", + type=type(exc).__name__, + message=str(exc), + ) + ) + raise + finally: + record.finished_at = time.monotonic() + + +__all__ = ["LocalhostComposeStage"] diff --git a/xtuner/v1/rl/agent_loop/localhost_agent_loop/judger.py b/xtuner/v1/rl/agent_loop/localhost_agent_loop/judger.py new file mode 100644 index 000000000..c0abb337a --- /dev/null +++ b/xtuner/v1/rl/agent_loop/localhost_agent_loop/judger.py @@ -0,0 +1,117 @@ +"""Localhost judger stages.""" + +from __future__ import annotations + +import time +from copy import deepcopy +from typing import Any + +from lagent.utils import create_object + +from xtuner.v1.data_proto.rl_data import RolloutState, Status +from xtuner.v1.rl.agent_loop.sandbox_agent_loop.schemas import ( + AgentRolloutItem, + RolloutError, + StageRecord, + StageStatus, +) +from xtuner.v1.rl.judger.native import Judger + + +class LocalhostJudgerStage: + """Run one local validation stage. + + Public stage interface is ``run(item, record) -> float``. ``RolloutState`` + is only the internal shape needed to reuse xtuner judgers. + """ + + def __init__( + self, + *, + name: str, + judger_config: Any | None = None, + judger: Any | None = None, + reward_key: str = "score", + weight: float = 1.0, + ): + if judger_config is None: + judger_config = judger + if judger_config is None: + raise ValueError("LocalhostJudgerStage requires judger_config") + self.name = name + self._judger_config = judger_config + self._judger: Judger | Any | None = None + self.reward_key = reward_key + self.weight = weight + + async def run(self, item: AgentRolloutItem, record: StageRecord) -> float: + record.status = StageStatus.RUNNING + record.started_at = record.started_at or time.monotonic() + try: + reward_model = dict(item.reward_model or {}) + + messages_artifact = item.artifacts.get("messages") + if messages_artifact is not None: + if not isinstance(messages_artifact, list) or not messages_artifact: + raise ValueError("Agent messages artifact must be a non-empty list.") + segment = messages_artifact[-1] + if not isinstance(segment, dict) or "messages" not in segment or "tools" not in segment: + raise ValueError("Agent messages trace segment must contain messages and tools.") + messages = segment["messages"] + if not isinstance(messages, list): + raise TypeError("Agent messages trace segment.messages must be a list.") + tool_turns = sum( + 1 + for message in messages + if isinstance(message, dict) + and isinstance(message.get("tool_calls"), list) + and message["tool_calls"] + ) + reward_model.setdefault("agent_trace", messages) + reward_model.setdefault("num_turns", tool_turns) + + response = str(item.artifacts.get("response") or "") + rollout_state = RolloutState( + message=[{"role": "user", "content": item.instruction}], + response=response, + reward_model=reward_model, + status=Status.COMPLETED if item.infer.status == StageStatus.COMPLETED else Status.FAILED, + ) + judged = await self.build().judge(rollout_state) + reward_payload = judged.reward or {} + if self.reward_key not in reward_payload: + raise KeyError(f"judger reward payload has no {self.reward_key!r}: {reward_payload!r}") + record.metadata["reward"] = reward_payload + record.score = float(reward_payload[self.reward_key]) + record.status = StageStatus.COMPLETED + return record.score + except Exception as exc: + record.status = StageStatus.FAILED + record.error = record.error or RolloutError( + stage=self.name, + category="judger", + type=type(exc).__name__, + message=str(exc), + ) + raise + finally: + record.finished_at = time.monotonic() + + def build(self) -> Judger | Any: + if self._judger is None: + if isinstance(self._judger_config, dict): + config = create_object(deepcopy(self._judger_config)) + else: + config = self._judger_config + if hasattr(config, "build"): + self._judger = config.build() + elif hasattr(config, "judge"): + self._judger = config + else: + raise TypeError( + f"judger_config must build a Judger or be a Judger-like object, got {type(config).__name__}" + ) + return self._judger + + +__all__ = ["LocalhostJudgerStage"] diff --git a/xtuner/v1/rl/agent_loop/localhost_agent_loop/local_run.py b/xtuner/v1/rl/agent_loop/localhost_agent_loop/local_run.py new file mode 100644 index 000000000..0db42d215 --- /dev/null +++ b/xtuner/v1/rl/agent_loop/localhost_agent_loop/local_run.py @@ -0,0 +1,168 @@ +from __future__ import annotations + +import argparse +import asyncio +import importlib.util +import json +import os +import sys +import traceback +from copy import deepcopy +from pathlib import Path +from typing import Any + +from lagent.utils import create_object + +from xtuner.v1.data_proto.rl_data import RolloutState +from xtuner.v1.rl.agent_loop.localhost_agent_loop import AgentInLocalhostLoop, AgentRolloutItem + + +def _load_config(path: Path) -> Any: + spec = importlib.util.spec_from_file_location("local_run_config", path) + if spec is None or spec.loader is None: + raise RuntimeError(f"cannot load {path}") + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +async def _run_runner(dataset: Any, item: AgentRolloutItem) -> dict[str, Any]: + runner_cfg = item.pipeline or dataset.pipeline + runner = create_object(deepcopy(runner_cfg)) if isinstance(runner_cfg, dict) else runner_cfg + result = await runner.run(item) + dumped = result.model_dump(mode="json", exclude={"artifacts", "pipeline"}) + dumped["artifacts"] = _serialize_artifacts(result.artifacts) + return dumped + + +async def _run_agentloop(dataset: Any, item: AgentRolloutItem, agent_loop: AgentInLocalhostLoop) -> dict[str, Any]: + item = item.model_copy(update={"pipeline": item.pipeline or dataset.pipeline}, deep=True) + content = _instruction_text(item) + prompt_ids = agent_loop.tokenizer.encode(content, add_special_tokens=False) + + rollout_state = RolloutState( + message=[{"role": "user", "content": content}], + prompt_ids=prompt_ids, + num_tokens=len(prompt_ids), + data_source={item.data_source: 1.0}, + reward_model=item.reward_model or {"style": item.data_source}, + uid=item.uid, + message_uid=item.group_id, + extra_fields={"rollout_item": item}, + ) + result = await agent_loop.generate_sample(rollout_state) + return { + "id": item.id, + "status": result.status.value, + "reward": result.reward["score"] if result.reward and "score" in result.reward else None, + "error": result.error_msg, + "finish_reason": result.finish_reason, + "response": result.response, + "response_ids_len": len(result.response_ids or []), + "prompt_ids_len": len(result.prompt_ids or []), + "input_ids_len": len(result.input_ids or []), + "agent_artifacts": _serialize_artifacts(result.extra_fields.get("agent_artifacts", {})), + } + + +def _instruction_text(item: AgentRolloutItem) -> str: + if item.task_root is not None: + path = Path(item.task_root) / item.instruction + if path.exists(): + return path.read_text(encoding="utf-8") + return item.instruction + + +def _serialize_artifacts(artifacts: dict[str, Any]) -> dict[str, Any]: + out: dict[str, Any] = {} + for key, value in artifacts.items(): + if isinstance(value, (bytes, bytearray)): + out[key] = f"<{len(value)} bytes>" + else: + out[key] = value + return out + + +async def main_async(args: argparse.Namespace) -> int: + cfg = _load_config(Path(args.config)) + dataset = cfg.build_dataset(args.jsonl_path) + + if args.mode == "agentloop": + import ray + + ray.init(address=args.ray_address) + + if args.tasks: + wanted = set(args.tasks) + pairs = [(tid, item) for tid, item in dataset.iter_tasks() if tid in wanted] + else: + pairs = list(dataset.iter_tasks()) + if args.limit: + pairs = pairs[: args.limit] + if not pairs: + print("no tasks to run", file=sys.stderr) + return 1 + + print( + f"running {len(pairs)} task(s) mode={args.mode} concurrency={args.concurrency}", + file=sys.stderr, + ) + sem = asyncio.Semaphore(max(1, args.concurrency)) + agent_loop = None + if args.mode == "agentloop": + if not args.hf_checkpoint: + raise ValueError("--hf-checkpoint is required in agentloop mode.") + agent_loop = AgentInLocalhostLoop(hf_checkpoint=args.hf_checkpoint) + + async def guarded(idx: int, tid: str, item: AgentRolloutItem) -> dict[str, Any]: + async with sem: + item = item.model_copy(update={"group_id": 0, "uid": idx}) + try: + if args.mode == "agentloop": + assert agent_loop is not None + return await _run_agentloop(dataset, item, agent_loop) + return await _run_runner(dataset, item) + except Exception as exc: + tb = traceback.format_exc() + print(f"[{item.id}] uncaught: {type(exc).__name__}: {exc}\n{tb}", file=sys.stderr) + return {"id": item.id, "error": f"{type(exc).__name__}: {exc}", "traceback": tb} + + out_fp = open(args.output, "w", encoding="utf-8") if args.output else None + try: + coros = [guarded(i, tid, item) for i, (tid, item) in enumerate(pairs)] + for coro in asyncio.as_completed(coros): + result = await coro + line = json.dumps(result, ensure_ascii=False) + if out_fp is not None: + out_fp.write(line + "\n") + out_fp.flush() + print(json.dumps({k: result.get(k) for k in ("id", "status", "reward", "error")}, ensure_ascii=False)) + finally: + if out_fp is not None: + out_fp.close() + return 0 + + +def main(default_config: Path, argv: list[str] | None = None) -> int: + parser = argparse.ArgumentParser(description="Run localhost_agent_loop tasks for this recipe.") + parser.add_argument("--jsonl-path", required=True, help="Path to JSONL data file.") + parser.add_argument("--config", default=str(default_config)) + parser.add_argument("--tasks", nargs="*", help="Specific task ids to run; default: all from dataset") + parser.add_argument("--limit", type=int, default=0, help="Limit total tasks (0=all)") + parser.add_argument("--concurrency", type=int, default=4) + parser.add_argument("--output", help="Optional JSONL path to dump full per-sample results") + parser.add_argument("--mode", choices=("runner", "agentloop"), default="runner") + parser.add_argument( + "--hf-checkpoint", + default=os.environ.get("HF_CHECKPOINT") or os.environ.get("QWEN3P5_VL_MODEL_PATH"), + help="Tokenizer/processor checkpoint used by agentloop mode.", + ) + parser.add_argument( + "--ray-address", + default=os.environ.get("RAY_ADDRESS", "auto"), + help="Ray address used by agentloop mode.", + ) + return asyncio.run(main_async(parser.parse_args(argv))) + + +__all__ = ["main", "main_async"] diff --git a/xtuner/v1/rl/agent_loop/localhost_agent_loop/runner.py b/xtuner/v1/rl/agent_loop/localhost_agent_loop/runner.py new file mode 100644 index 000000000..80dad0b96 --- /dev/null +++ b/xtuner/v1/rl/agent_loop/localhost_agent_loop/runner.py @@ -0,0 +1,123 @@ +"""LocalhostRunner: orchestrates local infer and optional validation.""" + +from __future__ import annotations + +import time +import traceback +from typing import Any + +from lagent.utils import create_object + +from xtuner.v1.rl.agent_loop.localhost_agent_loop.stage import LocalhostStage +from xtuner.v1.rl.agent_loop.sandbox_agent_loop.schemas import ( + AgentRolloutItem, + RolloutError, + RolloutStatus, + StageRecord, + StageStatus, +) +from xtuner.v1.utils import get_logger + + +class LocalhostRunner: + """Drive one rollout sample on a lagent agent running on this machine.""" + + def __init__( + self, + infer: LocalhostStage | dict[str, Any], + validate: Any | dict[str, Any] | None = None, + ): + self.infer: LocalhostStage = create_object(infer) if isinstance(infer, dict) else infer + self.validate = create_object(validate) if isinstance(validate, dict) else validate + + async def run(self, item: AgentRolloutItem) -> AgentRolloutItem: + if not item.instruction: + raise ValueError("AgentRolloutItem.instruction is required by LocalhostRunner.run") + item.status = RolloutStatus.RUNNING + tid = item.id + t_validate: float | None = None + + try: + infer_result = await self.infer.run(item, item.infer) + if not infer_result.ok: + return self._fail(item, item.infer.error) + + if self.validate is not None: + t0 = time.monotonic() + validate_name = getattr(self.validate, "name", "validate") + validate_record = item.judgers.setdefault( + validate_name, + StageRecord(), + ) + try: + score = float(await self.validate.run(item, validate_record)) + except Exception: + return self._fail( + item, + validate_record.error + or _first_judger_error(item) + or RolloutError( + stage=validate_name, + category="validate_failed", + type=type(self.validate).__name__, + message="validate failed", + ), + ) + t_validate = time.monotonic() - t0 + item.reward = score + + item.status = RolloutStatus.COMPLETED + return item + except Exception as exc: + promoted = ( + item.infer.error + or _first_judger_error(item) + or RolloutError( + stage="runner", + category="runner_exception", + type=type(exc).__name__, + message=str(exc), + ) + ) + get_logger().error(f"[{tid}] traceback:\n{traceback.format_exc()}") + return self._fail(item, promoted) + finally: + self._log_final(tid, item, t_validate) + + # -- internals -- + + def _fail(self, item: AgentRolloutItem, error: RolloutError | None) -> AgentRolloutItem: + item.status = RolloutStatus.FAILED + if item.error is None: + item.error = error + if item.infer.status == StageStatus.RUNNING and item.infer.error is None: + item.infer.status = StageStatus.FAILED + item.infer.error = error + if error is not None: + get_logger().error(f"[{item.id}] failed: {error.category}: {error.message}") + else: + get_logger().error(f"[{item.id}] failed: unknown error") + return item + + def _log_final(self, tid: str, item: AgentRolloutItem, t_validate: float | None) -> None: + agent_name = item.infer.agent.name if item.infer.agent is not None else "?" + parts = [f"status={item.status.value}", f"agent={agent_name}"] + if item.reward is not None: + parts.append(f"reward={item.reward:.4f}") + if item.infer.started_at and item.infer.finished_at: + parts.append(f"t_infer={item.infer.finished_at - item.infer.started_at:.1f}s") + if t_validate is not None: + parts.append(f"t_validate={t_validate:.1f}s") + if item.status == RolloutStatus.FAILED and item.error: + parts.append(f"error={item.error.stage or '?'}/{item.error.category}") + get_logger().info(f"[{tid}] done {' '.join(parts)}") + + +def _first_judger_error(item: AgentRolloutItem) -> RolloutError | None: + for record in item.judgers.values(): + if record.error is not None: + return record.error + return None + + +__all__ = ["LocalhostRunner"] diff --git a/xtuner/v1/rl/agent_loop/localhost_agent_loop/schemas.py b/xtuner/v1/rl/agent_loop/localhost_agent_loop/schemas.py new file mode 100644 index 000000000..70515e4f5 --- /dev/null +++ b/xtuner/v1/rl/agent_loop/localhost_agent_loop/schemas.py @@ -0,0 +1,39 @@ +"""Schemas specific to the localhost_agent_loop runner. + +Shared types (``AgentRolloutItem``, ``RolloutError``, ``StageStatus`` ...) are +re-exported from :mod:`sandbox_agent_loop.schemas` so localhost and sandbox +runners agree on item shape; the only local addition is +:class:`LocalhostAgentSpec`. +""" + +from __future__ import annotations + +from pydantic import BaseModel, ConfigDict + + +class LocalhostAgentSpec(BaseModel): + """One local agent variant. + + ``config`` holds the agent definition. Two forms accepted: + + - **dotted-path string** (preferred, mirrors sandbox ``AgentSpec.config="config.py"``): + ``"recipe.math_ci_eval.infer.agents.python.config"`` resolves at run + time to ``.agent_config`` via ``importlib``. Use + ``":"`` to point at a different attribute name. + - **inline dict**: the lagent ``FunctionCallAgent`` config dict + directly. Convenient for tests; verbose for production pipelines. + + The runner resolves ``config`` to a dict and ``create_object``s it once + per rollout (so module-level side effects in the agent module fire only + at first import). Nothing about the variant leaks outside the + resolved dict. + """ + + model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True) + + name: str + config: str | dict + weight: float = 1.0 + + +__all__ = ["LocalhostAgentSpec"] diff --git a/xtuner/v1/rl/agent_loop/localhost_agent_loop/stage.py b/xtuner/v1/rl/agent_loop/localhost_agent_loop/stage.py new file mode 100644 index 000000000..15ac13e44 --- /dev/null +++ b/xtuner/v1/rl/agent_loop/localhost_agent_loop/stage.py @@ -0,0 +1,136 @@ +"""LocalhostStage: run a local lagent agent.""" + +from __future__ import annotations + +import importlib +import inspect +import time +from copy import deepcopy +from typing import Any + +from lagent.utils import create_object + +from xtuner.v1.rl.agent_loop.localhost_agent_loop.schemas import LocalhostAgentSpec +from xtuner.v1.rl.agent_loop.sandbox_agent_loop.schemas import ( + AgentRolloutItem, + RolloutError, + SelectedAgentRecord, + StageRecord, + StageResult, + StageStatus, +) +from xtuner.v1.utils import get_logger + + +class LocalhostStage: + """Run one local infer stage. + + Local infer does not need a sandbox-style command layer. The stage owns agent selection, creation, execution, and + artifact recording directly. + """ + + def __init__( + self, + *, + agents: list[LocalhostAgentSpec | dict[str, Any]], + name: str = "infer", + ): + if not agents: + raise ValueError("LocalhostStage.agents is empty") + self.agents = [create_object(agent) if isinstance(agent, dict) else agent for agent in agents] + self.name = name + + async def run(self, item: AgentRolloutItem, record: StageRecord) -> StageResult: + record.status = StageStatus.RUNNING + record.started_at = record.started_at or time.monotonic() + agent = None + try: + spec = self._pick_agent(item, record) + agent = create_object(deepcopy(_resolve_agent_config(spec.config))) + output = await agent(item.instruction) + content = output.content if hasattr(output, "content") else output + item.artifacts["response"] = content if isinstance(content, str) else str(content) + item.artifacts["messages"] = agent.get_messages() + result = StageResult(stdout=item.artifacts["response"], return_code=0) + record.entry_result = result + record.status = StageStatus.COMPLETED + return result + except Exception as exc: + record.status = StageStatus.FAILED + record.error = record.error or RolloutError( + stage=self.name, + category="agent_exception", + type=type(exc).__name__, + message=str(exc), + ) + result = StageResult(return_code=None, error=str(exc), stderr=str(exc)) + record.entry_result = result + return result + finally: + record.finished_at = time.monotonic() + if agent is not None: + await _close_agent(agent) + + def _pick_agent(self, item: AgentRolloutItem, record: StageRecord) -> LocalhostAgentSpec: + group_id = item.group_id or 0 + weights = [max(agent.weight, 0.0) for agent in self.agents] + total = sum(weights) + if total <= 0: + chosen = self.agents[group_id % len(self.agents)] + else: + target = (group_id * 2654435761 % 2**32) / 2**32 * total + running = 0.0 + chosen = self.agents[-1] + for spec, weight in zip(self.agents, weights): + running += weight + if target < running: + chosen = spec + break + + record.agent = SelectedAgentRecord( + name=chosen.name, + config=chosen.config if isinstance(chosen.config, str) else "", + weight=chosen.weight, + template_root="", + ) + return chosen + + +def _resolve_agent_config(config: str | dict[str, Any]) -> dict[str, Any]: + if isinstance(config, dict): + return config + if isinstance(config, str): + if ":" in config: + module_path, attr = config.split(":", 1) + else: + module_path, attr = config, "agent_config" + module = importlib.import_module(module_path) + try: + value = getattr(module, attr) + except AttributeError as exc: + raise AttributeError( + f"LocalhostAgentSpec.config: module {module_path!r} has no attribute {attr!r}" + ) from exc + if not isinstance(value, dict): + raise TypeError( + f"LocalhostAgentSpec.config: {module_path}:{attr} resolved to {type(value).__name__}, expected dict" + ) + return value + raise TypeError(f"LocalhostAgentSpec.config must be str or dict, got {type(config).__name__}") + + +async def _close_agent(agent: Any) -> None: + for name in ("aclose", "close"): + method = getattr(agent, name, None) + if method is None: + continue + try: + result = method() + if inspect.isawaitable(result): + await result + except Exception as exc: + get_logger().warning("localhost agent close failed for %s: %s", type(agent).__name__, exc) + return + + +__all__ = ["LocalhostStage"] diff --git a/xtuner/v1/rl/agent_loop/sandbox_agent_loop/agent_in_sandbox_loop.py b/xtuner/v1/rl/agent_loop/sandbox_agent_loop/agent_in_sandbox_loop.py index a4db6e80c..43520b4b1 100644 --- a/xtuner/v1/rl/agent_loop/sandbox_agent_loop/agent_in_sandbox_loop.py +++ b/xtuner/v1/rl/agent_loop/sandbox_agent_loop/agent_in_sandbox_loop.py @@ -1,12 +1,13 @@ from __future__ import annotations -import copy import asyncio +import copy import importlib import json import traceback -from typing import Any import uuid +from typing import Any + from lagent.utils import create_object from xtuner.v1.data_proto.rl_data import RolloutState, Status @@ -14,9 +15,9 @@ from xtuner.v1.rl.rollout import RolloutController from xtuner.v1.rl.utils import create_task +from ...rollout.trace_store import get_store from ..agent_loop import AgentLoop, AgentLoopConfig from .schemas import AgentRolloutItem, RolloutStatus -from ...rollout.trace_store import get_store def _import_from_path(path: str) -> Any: @@ -52,9 +53,12 @@ class AgentInSandboxLoopConfig(AgentLoopConfig): transcript back into the standard ``RolloutState`` fields consumed by the replay buffer/trainer. """ + max_concurrent_samples: int | None = None - def build_local(self, rollout_controller: RolloutController | None = None, judger: Judger | None = None, logger=None) -> "AgentInSandboxLoop": + def build_local( + self, rollout_controller: RolloutController | None = None, judger: Judger | None = None, logger=None + ) -> AgentInSandboxLoop: return AgentInSandboxLoop( rollout_ctl=rollout_controller, hf_checkpoint=self.hf_checkpoint, @@ -116,30 +120,39 @@ async def _run_item(self, item: AgentRolloutItem) -> AgentRolloutItem: raise ValueError("AgentRolloutItem.pipeline is required.") return await runner.run(item) - async def _fill_rollout_state(self, rollout_state: RolloutState, item: AgentRolloutItem) -> None: artifacts = item.artifacts - message=json.loads(artifacts["message"]) - messages = message['policy_agent.messages'] - tools = message.get("tools", None) + trace = json.loads(artifacts["message"]) + if not isinstance(trace, list) or not trace: + raise ValueError("Agent artifacts must contain at least one trainable messages trace.") + segment = trace[-1] + if not isinstance(segment, dict) or "messages" not in segment or "tools" not in segment: + raise ValueError("Agent messages trace segment must contain messages and tools.") + messages = segment["messages"] + if not isinstance(messages, list): + raise TypeError("Agent messages trace segment.messages must be a list.") session_id = rollout_state.uid - + trace_store = get_store() - text = self.tokenizer.apply_chat_template(messages, tools=tools, tokenize=False, add_generation_prompt=False) - data = await trace_store.export_training_trace.remote(str(session_id), text[:-1]) # '\n' - - rollout_state.input_ids = data['input_ids'] - rollout_state.labels = data['labels'] + text = self.tokenizer.apply_chat_template( + messages, + tools=segment["tools"], + tokenize=False, + add_generation_prompt=False, + ) + prompt_text = text[:-1] if text.endswith("\n") else text + data = await trace_store.export_training_trace.remote(str(session_id), prompt_text) + + rollout_state.input_ids = data["input_ids"] + rollout_state.labels = data["labels"] # Agentic training consumes input_ids/labels directly. response_ids is # filled here only so rollout throughput logging can print rollout_tgs. rollout_state.response_ids = [ - token_id - for token_id, label in zip(data["input_ids"][1:], data["labels"][1:]) - if label != -100 + token_id for token_id, label in zip(data["input_ids"][1:], data["labels"][1:]) if label != -100 ] - rollout_state.logprobs = data['logprobs'] - rollout_state.routed_experts = data['routed_experts'] - rollout_state.finish_reason = 'stop' if item.status == RolloutStatus.COMPLETED else 'error' + rollout_state.logprobs = data["logprobs"] + rollout_state.routed_experts = data["routed_experts"] + rollout_state.finish_reason = "stop" if item.status == RolloutStatus.COMPLETED else "error" rollout_state.status = item.status rollout_state.reward = {"score": item.reward} if item.error is not None: diff --git a/xtuner/v1/rl/agent_loop/sandbox_agent_loop/schemas.py b/xtuner/v1/rl/agent_loop/sandbox_agent_loop/schemas.py index d4cd081a8..f179e1316 100644 --- a/xtuner/v1/rl/agent_loop/sandbox_agent_loop/schemas.py +++ b/xtuner/v1/rl/agent_loop/sandbox_agent_loop/schemas.py @@ -207,6 +207,9 @@ class AgentRolloutItem(BaseModel): artifacts: dict[str, Any] = Field(default_factory=dict) # Opaque dataset/business metadata. Runner core does not branch on keys here. metadata: dict[str, Any] = Field(default_factory=dict) + # Judger input prepared by the dataset. Host/sandbox runner core should + # pass it through instead of inferring task-specific keys from metadata. + reward_model: dict[str, Any] = Field(default_factory=dict) error: RolloutError | None = None