Update offline caching dev script #1531
nil0x9 wants to merge 2 commits into InternLM:offline-cache
Conversation
Pull request overview
This PR updates the offline caching workflow by changing how Ray-based caching tasks are launched and how per-task caches are combined, and adds small dataset-side compatibility tweaks for message formats.
Changes:
- Disable CPU affinity binding in `JsonlDataset` tokenization workers (to support the offline caching flow).
- Accept an additional delivery format for FTDP tokenization (`{"messages": ...}`) and validate the normalized type.
- Rework `.dev_scripts/offline_cache.py` to run multiple Ray tasks per node, isolate per-task cache dirs, and merge worker caches afterward.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| `xtuner/v1/datasets/jsonl.py` | Disables CPU affinity setting inside tokenization workers. |
| `xtuner/v1/datasets/ftdp.py` | Adds support for dict payloads with a `messages` key + type validation. |
| `.dev_scripts/offline_cache.py` | Adds multi-task scheduling, per-worker cache directories, and a cache merge step with logging. |
```python
# For offline caching script to work, cpu affinity has to be turned off.
# try:
#     os.sched_setaffinity(os.getpid(), cpu_ids)
# except OSError as e:
#     logger.debug(f"Failed to set CPU affinity: {e}")
```
CPU affinity is now disabled by commenting out the code, which changes runtime behavior for all JsonlDataset tokenization (not just the offline caching script) and can impact performance/CPU scheduling. Consider restoring the affinity logic and gating it behind an explicit flag (e.g., an env var like XTUNER_DISABLE_CPU_AFFINITY=1 or a function argument), and have the offline caching script set that flag instead of modifying core dataset behavior.
```python
# Copy each hash directory and merge metadata
for hash_key, hash_meta in worker_meta.items():
    src_hash_dir = worker_dir / hash_key
    dst_hash_dir = base_cache_dir / hash_key

    if not src_hash_dir.exists():
        logger.warning(f"Hash directory {src_hash_dir} does not exist, skipping")
        continue

    if dst_hash_dir.exists():
        logger.warning(f"Hash directory {dst_hash_dir} already exists, skipping copy")
    else:
        shutil.move(src_hash_dir, dst_hash_dir)
        logger.debug(f"Moved {src_hash_dir} -> {dst_hash_dir}")

    # Merge metadata (assuming no conflicts, otherwise we'd need merge logic)
    if hash_key in merged_meta:
        logger.warning(f"Hash key {hash_key} already exists in merged metadata, skipping")
    else:
        merged_meta[hash_key] = hash_meta
```
merge_worker_caches assumes worker cache metadata has no key conflicts, but .xpuyu-cache-meta.json contains shared keys (e.g., file-hash entries and a top-level tags section). With the current logic, later workers’ entries will be skipped and their cached data can be lost (and tags will only come from the first worker). Please implement a real merge for this meta structure (union lists for offsets/num_tokens, deep-merge tags), and also handle the case where the destination hash dir already exists by merging subdirectories rather than skipping.
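One possible shape for that merge, assuming the meta schema described above (per-hash entries carrying list fields such as `offsets`/`num_tokens`, plus a shared `tags` dict); `merge_meta` is a hypothetical helper, not code from this PR:

```python
def merge_meta(merged, worker_meta):
    """Deep-merge one worker's cache metadata into the accumulated dict.

    Dicts are merged recursively (covers the top-level ``tags`` section),
    lists are unioned preserving order, and conflicting scalars keep the
    first worker's value.
    """
    for key, value in worker_meta.items():
        if key not in merged:
            merged[key] = value
        elif isinstance(value, dict) and isinstance(merged[key], dict):
            merge_meta(merged[key], value)  # recurse into nested sections
        elif isinstance(value, list) and isinstance(merged[key], list):
            # union: append only items not already present
            merged[key] = merged[key] + [v for v in value if v not in merged[key]]
        # otherwise: scalar conflict, keep the existing (first worker's) value
    return merged
```

Destination hash dirs that already exist would then be handled by moving their subdirectories individually rather than skipping the whole entry.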
```python
dataset_config = config.dataset_config
tokenizer_path = config.trainer.tokenizer_path
base_cache_dir = Path(dataset_config[0]["dataset"].cache_dir)
```
base_cache_dir = Path(dataset_config[0]["dataset"].cache_dir) assumes dataset_config is non-empty and that every dataset shares the same non-null cache_dir. If the config is empty or uses multiple cache directories, the merge step will either crash or miss caches. Add validation (e.g., assert non-empty + all cache_dir values are set and identical, or perform per-cache-dir merges).
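A sketch of the suggested validation, assuming each `dataset_config` entry exposes a `cache_dir` attribute as in the snippet above (the helper name is illustrative):

```python
from pathlib import Path


def resolve_base_cache_dir(dataset_config):
    # Make the merge step's assumptions explicit: non-empty config and a
    # single, non-null cache_dir shared by every dataset entry.
    if not dataset_config:
        raise ValueError("dataset_config is empty; nothing to merge")
    cache_dirs = {item["dataset"].cache_dir for item in dataset_config}
    if None in cache_dirs:
        raise ValueError("Every dataset must set cache_dir for offline caching")
    if len(cache_dirs) > 1:
        raise ValueError(f"Expected a single shared cache_dir, got: {cache_dirs}")
    return Path(cache_dirs.pop())
```

Alternatively, the merge could be performed once per distinct cache dir instead of rejecting multi-dir configs.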
```python
worker = ray.remote(
    num_cpus=cpus_per_task // tasks_per_node,
    runtime_env={
        "env_vars": {
            "XTUNER_TOKENIZE_WORKERS": str(cpus_per_task // tasks_per_node),
```
num_cpus=cpus_per_task // tasks_per_node (and the matching XTUNER_TOKENIZE_WORKERS value) can become 0 when tasks_per_node > cpus_per_task, which can lead to oversubscription or unexpected scheduling behavior. Please validate inputs and ensure at least 1 CPU/tokenize worker per Ray task (or compute an appropriate split with a clear error message when the requested layout is impossible).
Suggested change:

```diff
-worker = ray.remote(
-    num_cpus=cpus_per_task // tasks_per_node,
-    runtime_env={
-        "env_vars": {
-            "XTUNER_TOKENIZE_WORKERS": str(cpus_per_task // tasks_per_node),
+num_cpus_per_task = cpus_per_task // tasks_per_node
+if num_cpus_per_task < 1:
+    raise ValueError(
+        f"Invalid resource configuration: cpus_per_task ({cpus_per_task}) "
+        f"must be >= tasks_per_node ({tasks_per_node}) to allocate at least "
+        "1 CPU per Ray task."
+    )
+worker = ray.remote(
+    num_cpus=num_cpus_per_task,
+    runtime_env={
+        "env_vars": {
+            "XTUNER_TOKENIZE_WORKERS": str(num_cpus_per_task),
```
```python
job = start_ray(nnodes=nnodes, cpus_per_task=cpus_per_task, memory_per_task=memory_per_task)
try:
    print(f"ray clsuter job id {job.job_id}")
    logger.info(f"ray clsuter job id {job.job_id}")
```
Typo in log message: "clsuter" -> "cluster".
Suggested change:

```diff
-logger.info(f"ray clsuter job id {job.job_id}")
+logger.info(f"ray cluster job id {job.job_id}")
```
No description provided.