diff --git a/examples/v1/config/rl_dapo_math.py b/examples/v1/config/rl_dapo_math.py index 85fc22283..7c4369b1b 100644 --- a/examples/v1/config/rl_dapo_math.py +++ b/examples/v1/config/rl_dapo_math.py @@ -28,7 +28,7 @@ # basic settings experimental_name = "dapo_math" total_epochs = 1 -train_batch_size = 512 +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 512)) prompt_repeat_k = 16 rollout_tp_size = 1 rollout_ep_size = 1 diff --git a/examples/v1/config/rl_dapo_math_async.py b/examples/v1/config/rl_dapo_math_async.py index f62b3712f..129763ef7 100644 --- a/examples/v1/config/rl_dapo_math_async.py +++ b/examples/v1/config/rl_dapo_math_async.py @@ -28,7 +28,7 @@ # basic settings experimental_name = "dapo_math" total_epochs = 1 -train_batch_size = 512 +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 512)) prompt_repeat_k = 16 rollout_tp_size = 1 rollout_ep_size = 1 diff --git a/examples/v1/config/rl_dapo_math_async_filter.py b/examples/v1/config/rl_dapo_math_async_filter.py index d43ee9e6a..6b99b9a1b 100644 --- a/examples/v1/config/rl_dapo_math_async_filter.py +++ b/examples/v1/config/rl_dapo_math_async_filter.py @@ -28,7 +28,7 @@ # basic settings experimental_name = "dapo_math" total_epochs = 1 -train_batch_size = 512 +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 512)) prompt_repeat_k = 16 rollout_tp_size = 1 rollout_ep_size = 1 diff --git a/examples/v1/config/rl_grpo_geo3k_judge.py b/examples/v1/config/rl_grpo_geo3k_judge.py index 5f37d6d93..9c01d901a 100644 --- a/examples/v1/config/rl_grpo_geo3k_judge.py +++ b/examples/v1/config/rl_grpo_geo3k_judge.py @@ -37,7 +37,7 @@ total_train_steps = 45 # TODO: total_epoch evaluate_step = 45 train_optimizer_steps = 4 -train_batch_size = 1024 +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 1024)) prompt_repeat_k = 5 rollout_tp_size = 1 rollout_ep_size = 1 diff --git a/examples/v1/config/rl_grpo_gsm8k_async.py b/examples/v1/config/rl_grpo_gsm8k_async.py index b804932c3..37f76ad9b 100644 --- a/examples/v1/config/rl_grpo_gsm8k_async.py +++ b/examples/v1/config/rl_grpo_gsm8k_async.py @@ -37,7 +37,7 @@ total_train_steps = 45 evaluate_step = 45 train_optimizer_steps = 1 -train_batch_size = 64 * train_optimizer_steps +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 64 * train_optimizer_steps)) prompt_repeat_k = 5 rollout_tp_size = 1 rollout_ep_size = 1 diff --git a/examples/v1/config/rl_grpo_gsm8k_judge.py b/examples/v1/config/rl_grpo_gsm8k_judge.py index 021344f6b..c7c3255f5 100644 --- a/examples/v1/config/rl_grpo_gsm8k_judge.py +++ b/examples/v1/config/rl_grpo_gsm8k_judge.py @@ -37,7 +37,7 @@ total_train_steps = 45 evaluate_step = 45 train_optimizer_steps = 1 -train_batch_size = 64 * train_optimizer_steps +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 64 * train_optimizer_steps)) prompt_repeat_k = 5 rollout_tp_size = 1 rollout_ep_size = 1 diff --git a/examples/v1/config/rl_grpo_gsm8k_with_tool.py b/examples/v1/config/rl_grpo_gsm8k_with_tool.py index 779716442..f535a76a4 100644 --- a/examples/v1/config/rl_grpo_gsm8k_with_tool.py +++ b/examples/v1/config/rl_grpo_gsm8k_with_tool.py @@ -37,7 +37,7 @@ total_train_steps = 45 evaluate_step = 45 train_optimizer_steps = 1 -train_batch_size = 64 * train_optimizer_steps +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 64 * train_optimizer_steps)) prompt_repeat_k = 5 rollout_tp_size = 1 rollout_ep_size = 1 diff --git a/examples/v1/config/rl_multi_task_gsm8k_dapo_math.py b/examples/v1/config/rl_multi_task_gsm8k_dapo_math.py index dc4eee36d..70fc2d183 100644 --- a/examples/v1/config/rl_multi_task_gsm8k_dapo_math.py +++ b/examples/v1/config/rl_multi_task_gsm8k_dapo_math.py @@ -58,7 +58,7 @@ total_train_steps = 50 evaluate_step = 5 train_optimizer_steps = 8 -train_batch_size = 128 +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 128)) gsm8k_task_weight = float(os.environ.get("GSM8K_TASK_WEIGHT", "1.0")) dapo_task_weight = float(os.environ.get("DAPO_TASK_WEIGHT", "1.0")) rollout_tp_size = 1 diff --git a/examples/v1/config/rl_qwen3p5_vl_35B_grpo_mixdata.py b/examples/v1/config/rl_qwen3p5_vl_35B_grpo_mixdata.py index 1ed66bbb2..a55596218 100644 --- a/examples/v1/config/rl_qwen3p5_vl_35B_grpo_mixdata.py +++ b/examples/v1/config/rl_qwen3p5_vl_35B_grpo_mixdata.py @@ -40,7 +40,7 @@ def _as_list(value): # basic settings experimental_name = "grpo_mix_data" total_epochs = 15 -global_batch_size = 256 +train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 256)) prompt_repeat_k = 8 rollout_tp_size = 2 rollout_ep_size = 1 @@ -253,7 +253,7 @@ def _as_list(value): evaluator_config=EvaluatorConfig(compute_metric_func=None), load_from=model_path, total_epochs=total_epochs, - train_batch_size=global_batch_size, + train_batch_size=train_batch_size, advantage_estimator_config=GRPOAdvantageConfig(eps=1e-8), enable_evaluate=enable_evaluate, enable_initial_evaluate=False, diff --git a/examples/v1/scripts/run_rl.sh b/examples/v1/scripts/run_rl.sh index 53219cbe9..1df15d578 100644 --- a/examples/v1/scripts/run_rl.sh +++ b/examples/v1/scripts/run_rl.sh @@ -18,9 +18,19 @@ if [ $ACCELERATOR != "GPU" ] && [ $ACCELERATOR != "NPU" ]; then exit 1 fi if [ "$ACCELERATOR" = "NPU" ]; then - ACCELERATOR_PER_NODE=${7:-16} + accelerator_per_node=${7:-16} else - ACCELERATOR_PER_NODE=${7:-8} + if [ -n "${CUDA_VISIBLE_DEVICES:-}" ]; then + IFS=',' read -ra visible_devices <<< "${CUDA_VISIBLE_DEVICES}" + accelerator_per_node=${#visible_devices[@]} + else + accelerator_per_node=${7:-8} + fi +fi +export ACCELERATOR +RAY_ACCELERATOR_ARGS=() +if [ "$ACCELERATOR" = "GPU" ]; then + RAY_ACCELERATOR_ARGS=(--num-gpus="$accelerator_per_node") fi ulimit -n 65536 # OSError: [Errno 24] Too many open files @@ -87,6 +97,8 @@ fi # 2. Launch Ray cluster # 根据 NODE_COUNT 分配 num_cpus, 防止内存OOM node_count=${NODE_COUNT:-1} +expected_accelerator_count=$((node_count * accelerator_per_node)) +export XTUNER_RL_NUM_WORKERS=${XTUNER_RL_NUM_WORKERS:-$expected_accelerator_count} WORK_DIR=$(realpath "$WORK_DIR") if [ "$RAY_RANK" -eq 0 ]; then @@ -101,6 +113,7 @@ if [ "$RAY_RANK" -eq 0 ]; then --dashboard-port=$RAY_DASHBOARD_PORT \ --include-dashboard=true \ --disable-usage-stats \ + "${RAY_ACCELERATOR_ARGS[@]}" \ --temp-dir="/tmp/ray_log/" else while true; do @@ -112,12 +125,11 @@ else sleep 2 fi done - ray start --address="$RAY_MASTER_ADDR:$RAY_HEAD_PORT" --block --disable-usage-stats + ray start --address="$RAY_MASTER_ADDR:$RAY_HEAD_PORT" --block --disable-usage-stats "${RAY_ACCELERATOR_ARGS[@]}" fi while true; do result=$(ray status | grep ${ACCELERATOR} | cut -d ' ' -f2 | cut -d '/' -f2) - expected_accelerator_count=$((node_count * ${ACCELERATOR_PER_NODE})) if [ "$result" = "$expected_accelerator_count.0" ]; then break else @@ -133,4 +145,5 @@ LOG_FILE="${WORK_DIR}/training_log_${current_time}.txt" python xtuner/v1/train/cli/rl.py \ --config $CONFIG_PATH \ + --num-workers $XTUNER_RL_NUM_WORKERS \ 2>&1 | tee -a "${WORK_DIR}/training_log_${current_time}.txt" diff --git a/xtuner/v1/rl/trainer/worker.py b/xtuner/v1/rl/trainer/worker.py index bbfbd2a8c..c073610a3 100644 --- a/xtuner/v1/rl/trainer/worker.py +++ b/xtuner/v1/rl/trainer/worker.py @@ -642,6 +642,10 @@ def fit(self, data_batches: list[WorkerInputItem], rollout_idx: int) -> WorkerLo if self.rank == 0: self.logger.info(logger_msg) + only_calc_mismatch_ratio = os.environ.get("ONLY_CALC_MISMATCH_RATIO", "0") == "1" + if only_calc_mismatch_ratio: + return worker_log_item + # compute reference logprobs ref_logprobs_list: list[torch.Tensor] | None = None if self._has_ref: diff --git a/xtuner/v1/rl/utils/ray_accelerator_worker.py b/xtuner/v1/rl/utils/ray_accelerator_worker.py index 2bfd9e9e5..a7d52d981 100644 --- a/xtuner/v1/rl/utils/ray_accelerator_worker.py +++ b/xtuner/v1/rl/utils/ray_accelerator_worker.py @@ -91,6 +91,11 @@ def model_post_init(self, __context: Any) -> None: # NOTE: Ascend 910 has 16 NPUs per node self.num_accelerators_per_node = 16 + def validate_available_resources(self) -> None: + """Validate resources against the current Ray cluster. + + Keep this out of model_post_init so CLI overrides can adjust loaded config objects before resource checks run. + """ assert ray.is_initialized(), "Ray must be initialized before creating AcceleratorResourcesConfig." available_resources = ray.available_resources() @@ -283,6 +288,7 @@ def build_placement_group(resources_config: AcceleratorResourcesConfig, name="tr Returns: PlacementGroup: The created Ray PlacementGroup. """ + resources_config.validate_available_resources() bundles = [ { "CPU": resources_config.num_cpus_per_worker, diff --git a/xtuner/v1/train/cli/rl.py b/xtuner/v1/train/cli/rl.py index 9b7dfdfd8..5fd57843e 100644 --- a/xtuner/v1/train/cli/rl.py +++ b/xtuner/v1/train/cli/rl.py @@ -46,7 +46,7 @@ def main( cfg = Config.fromfile(config) if work_dir is not None: cfg.trainer.work_dir = work_dir - if num_workers is not None: + if num_workers is not None and hasattr(cfg.trainer, "resources"): cfg.trainer.resources.num_workers = num_workers trainer = cfg.trainer.build() trainer.fit() diff --git a/xtuner/v1/utils/track_rl_mem.py b/xtuner/v1/utils/track_rl_mem.py index 230a906fb..d09edb83a 100644 --- a/xtuner/v1/utils/track_rl_mem.py +++ b/xtuner/v1/utils/track_rl_mem.py @@ -24,6 +24,16 @@ def _get_current_node_id() -> str: return str(node_id) +def _get_current_node_ray_gpu_resources(current_node_id: str) -> float | None: + for node in ray.nodes(): + node_id = _get_actor_value(node, "NodeID", "node_id", "NodeId") + if str(node_id) != current_node_id: + continue + resources = node.get("Resources") or node.get("resources") or {} + return float(resources.get("GPU", 0)) + return None + + def _get_actor_value(actor_info, *keys): for key in keys: value = actor_info.get(key) @@ -174,7 +184,10 @@ def monitor_actor_memory(work_dir: str, interval: int = 60): finally: pynvml.nvmlShutdown() - print(f"当前节点 GPU 数量: {local_gpus}") + local_ray_gpus = _get_current_node_ray_gpu_resources(current_node_id) + print(f"当前节点物理 GPU 数量(NVML): {local_gpus}") + if local_ray_gpus is not None: + print(f"当前节点 Ray 配置 GPU resource 数量: {local_ray_gpus:g}") tb_writer_list = [TensorboardWriter(log_dir=f"{work_dir}/tb/{rank}") for rank in range(max(local_gpus, 1))] count = 0