Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/v1/config/rl_dapo_math.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
# basic settings
experimental_name = "dapo_math"
total_epochs = 1
train_batch_size = 512
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 512))
prompt_repeat_k = 16
rollout_tp_size = 1
rollout_ep_size = 1
Expand Down
2 changes: 1 addition & 1 deletion examples/v1/config/rl_dapo_math_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
# basic settings
experimental_name = "dapo_math"
total_epochs = 1
train_batch_size = 512
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 512))
prompt_repeat_k = 16
rollout_tp_size = 1
rollout_ep_size = 1
Expand Down
2 changes: 1 addition & 1 deletion examples/v1/config/rl_dapo_math_async_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
# basic settings
experimental_name = "dapo_math"
total_epochs = 1
train_batch_size = 512
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 512))
prompt_repeat_k = 16
rollout_tp_size = 1
rollout_ep_size = 1
Expand Down
2 changes: 1 addition & 1 deletion examples/v1/config/rl_grpo_geo3k_judge.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
total_train_steps = 45 # TODO: total_epoch
evaluate_step = 45
train_optimizer_steps = 4
train_batch_size = 1024
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 1024))
prompt_repeat_k = 5
rollout_tp_size = 1
rollout_ep_size = 1
Expand Down
2 changes: 1 addition & 1 deletion examples/v1/config/rl_grpo_gsm8k_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
total_train_steps = 45
evaluate_step = 45
train_optimizer_steps = 1
train_batch_size = 64 * train_optimizer_steps
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 64 * train_optimizer_steps))
prompt_repeat_k = 5
rollout_tp_size = 1
rollout_ep_size = 1
Expand Down
2 changes: 1 addition & 1 deletion examples/v1/config/rl_grpo_gsm8k_judge.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
total_train_steps = 45
evaluate_step = 45
train_optimizer_steps = 1
train_batch_size = 64 * train_optimizer_steps
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 64 * train_optimizer_steps))
prompt_repeat_k = 5
rollout_tp_size = 1
rollout_ep_size = 1
Expand Down
2 changes: 1 addition & 1 deletion examples/v1/config/rl_grpo_gsm8k_with_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
total_train_steps = 45
evaluate_step = 45
train_optimizer_steps = 1
train_batch_size = 64 * train_optimizer_steps
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 64 * train_optimizer_steps))
prompt_repeat_k = 5
rollout_tp_size = 1
rollout_ep_size = 1
Expand Down
2 changes: 1 addition & 1 deletion examples/v1/config/rl_multi_task_gsm8k_dapo_math.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
total_train_steps = 50
evaluate_step = 5
train_optimizer_steps = 8
train_batch_size = 128
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 128))
gsm8k_task_weight = float(os.environ.get("GSM8K_TASK_WEIGHT", "1.0"))
dapo_task_weight = float(os.environ.get("DAPO_TASK_WEIGHT", "1.0"))
rollout_tp_size = 1
Expand Down
4 changes: 2 additions & 2 deletions examples/v1/config/rl_qwen3p5_vl_35B_grpo_mixdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def _as_list(value):
# basic settings
experimental_name = "grpo_mix_data"
total_epochs = 15
global_batch_size = 256
train_batch_size = int(os.environ.get("TRAIN_BATCH_SIZE", 256))
prompt_repeat_k = 8
rollout_tp_size = 2
rollout_ep_size = 1
Expand Down Expand Up @@ -253,7 +253,7 @@ def _as_list(value):
evaluator_config=EvaluatorConfig(compute_metric_func=None),
load_from=model_path,
total_epochs=total_epochs,
train_batch_size=global_batch_size,
train_batch_size=train_batch_size,
advantage_estimator_config=GRPOAdvantageConfig(eps=1e-8),
enable_evaluate=enable_evaluate,
enable_initial_evaluate=False,
Expand Down
21 changes: 17 additions & 4 deletions examples/v1/scripts/run_rl.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,19 @@ if [ $ACCELERATOR != "GPU" ] && [ $ACCELERATOR != "NPU" ]; then
exit 1
fi
if [ "$ACCELERATOR" = "NPU" ]; then
ACCELERATOR_PER_NODE=${7:-16}
accelerator_per_node=${7:-16}
else
ACCELERATOR_PER_NODE=${7:-8}
if [ -n "${CUDA_VISIBLE_DEVICES:-}" ]; then
IFS=',' read -ra visible_devices <<< "${CUDA_VISIBLE_DEVICES}"
accelerator_per_node=${#visible_devices[@]}
else
accelerator_per_node=${7:-8}
fi
fi
export ACCELERATOR
RAY_ACCELERATOR_ARGS=()
if [ "$ACCELERATOR" = "GPU" ]; then
RAY_ACCELERATOR_ARGS=(--num-gpus="$accelerator_per_node")
fi

ulimit -n 65536 # OSError: [Errno 24] Too many open files
Expand Down Expand Up @@ -87,6 +97,8 @@ fi
# 2. Launch Ray cluster
# 根据 NODE_COUNT 分配 num_cpus, 防止内存OOM
node_count=${NODE_COUNT:-1}
expected_accelerator_count=$((node_count * accelerator_per_node))
export XTUNER_RL_NUM_WORKERS=${XTUNER_RL_NUM_WORKERS:-$expected_accelerator_count}

WORK_DIR=$(realpath "$WORK_DIR")
if [ "$RAY_RANK" -eq 0 ]; then
Expand All @@ -101,6 +113,7 @@ if [ "$RAY_RANK" -eq 0 ]; then
--dashboard-port=$RAY_DASHBOARD_PORT \
--include-dashboard=true \
--disable-usage-stats \
"${RAY_ACCELERATOR_ARGS[@]}" \
--temp-dir="/tmp/ray_log/"
else
while true; do
Expand All @@ -112,12 +125,11 @@ else
sleep 2
fi
done
ray start --address="$RAY_MASTER_ADDR:$RAY_HEAD_PORT" --block --disable-usage-stats
ray start --address="$RAY_MASTER_ADDR:$RAY_HEAD_PORT" --block --disable-usage-stats "${RAY_ACCELERATOR_ARGS[@]}"
fi

while true; do
result=$(ray status | grep ${ACCELERATOR} | cut -d ' ' -f2 | cut -d '/' -f2)
expected_accelerator_count=$((node_count * ${ACCELERATOR_PER_NODE}))
if [ "$result" = "$expected_accelerator_count.0" ]; then
break
else
Expand All @@ -133,4 +145,5 @@ LOG_FILE="${WORK_DIR}/training_log_${current_time}.txt"

python xtuner/v1/train/cli/rl.py \
--config $CONFIG_PATH \
--num-workers $XTUNER_RL_NUM_WORKERS \
2>&1 | tee -a "${WORK_DIR}/training_log_${current_time}.txt"
4 changes: 4 additions & 0 deletions xtuner/v1/rl/trainer/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,10 @@ def fit(self, data_batches: list[WorkerInputItem], rollout_idx: int) -> WorkerLo
if self.rank == 0:
self.logger.info(logger_msg)

only_calc_mismatch_ratio = os.environ.get("ONLY_CALC_MISMATCH_RATIO", "0") == "1"
if only_calc_mismatch_ratio:
return worker_log_item

# compute reference logprobs
ref_logprobs_list: list[torch.Tensor] | None = None
if self._has_ref:
Expand Down
6 changes: 6 additions & 0 deletions xtuner/v1/rl/utils/ray_accelerator_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ def model_post_init(self, __context: Any) -> None:
# NOTE: Ascend 910 has 16 NPUs per node
self.num_accelerators_per_node = 16

def validate_available_resources(self) -> None:
"""Validate resources against the current Ray cluster.

Keep this out of model_post_init so CLI overrides can adjust loaded config objects before resource checks run.
"""
assert ray.is_initialized(), "Ray must be initialized before creating AcceleratorResourcesConfig."

available_resources = ray.available_resources()
Expand Down Expand Up @@ -283,6 +288,7 @@ def build_placement_group(resources_config: AcceleratorResourcesConfig, name="tr
Returns:
PlacementGroup: The created Ray PlacementGroup.
"""
resources_config.validate_available_resources()
bundles = [
{
"CPU": resources_config.num_cpus_per_worker,
Expand Down
2 changes: 1 addition & 1 deletion xtuner/v1/train/cli/rl.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def main(
cfg = Config.fromfile(config)
if work_dir is not None:
cfg.trainer.work_dir = work_dir
if num_workers is not None:
if num_workers is not None and hasattr(cfg.trainer, "resources"):
cfg.trainer.resources.num_workers = num_workers
trainer = cfg.trainer.build()
trainer.fit()
Expand Down
15 changes: 14 additions & 1 deletion xtuner/v1/utils/track_rl_mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ def _get_current_node_id() -> str:
return str(node_id)


def _get_current_node_ray_gpu_resources(current_node_id: str) -> float | None:
for node in ray.nodes():
node_id = _get_actor_value(node, "NodeID", "node_id", "NodeId")
if str(node_id) != current_node_id:
continue
resources = node.get("Resources") or node.get("resources") or {}
return float(resources.get("GPU", 0))
return None


def _get_actor_value(actor_info, *keys):
for key in keys:
value = actor_info.get(key)
Expand Down Expand Up @@ -174,7 +184,10 @@ def monitor_actor_memory(work_dir: str, interval: int = 60):
finally:
pynvml.nvmlShutdown()

print(f"当前节点 GPU 数量: {local_gpus}")
local_ray_gpus = _get_current_node_ray_gpu_resources(current_node_id)
print(f"当前节点物理 GPU 数量(NVML): {local_gpus}")
if local_ray_gpus is not None:
print(f"当前节点 Ray 配置 GPU resource 数量: {local_ray_gpus:g}")
tb_writer_list = [TensorboardWriter(log_dir=f"{work_dir}/tb/{rank}") for rank in range(max(local_gpus, 1))]

count = 0
Expand Down
Loading