
Update offline caching dev script #1531

Open
nil0x9 wants to merge 2 commits into InternLM:offline-cache from nil0x9:offline-cache

Conversation

nil0x9 (Contributor) commented Mar 4, 2026

No description provided.


Copilot AI left a comment


Pull request overview

This PR updates the offline caching workflow by changing how Ray-based caching tasks are launched and how per-task caches are combined, and adds small dataset-side compatibility tweaks for message formats.

Changes:

  • Disable CPU affinity binding in JsonlDataset tokenization workers (to support the offline caching flow).
  • Accept an additional delivery format for FTDP tokenization ({"messages": ...}) and validate the normalized type.
  • Rework .dev_scripts/offline_cache.py to run multiple Ray tasks per node, isolate per-task cache dirs, and merge worker caches afterward.
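The reworked flow in the last bullet can be sketched roughly as follows. This is a minimal stand-in using `concurrent.futures` in place of Ray (Ray tasks run as separate processes across nodes); the names `cache_one_shard` and `run_all` are illustrative, not the PR's actual functions.

```python
# Sketch of "N tasks per node, isolated per-task cache dirs, merge after":
# each task writes into its own worker_<id> directory so workers never
# collide, and a later merge step folds them back into the base cache dir.
import json
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def cache_one_shard(task_id: int, base_dir: str) -> str:
    """Tokenize one shard, writing into an isolated per-task directory."""
    worker_dir = Path(base_dir) / f"worker_{task_id}"
    worker_dir.mkdir(parents=True, exist_ok=True)
    # ... real code would tokenize and write cache files here ...
    (worker_dir / "meta.json").write_text(json.dumps({"task": task_id}))
    return str(worker_dir)


def run_all(num_tasks: int, base_dir: str) -> list[str]:
    with ThreadPoolExecutor(max_workers=num_tasks) as pool:
        worker_dirs = list(
            pool.map(cache_one_shard, range(num_tasks), [base_dir] * num_tasks)
        )
    # a real implementation would now merge worker_dirs back into base_dir
    return worker_dirs
```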

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.

Reviewed files:

  • xtuner/v1/datasets/jsonl.py: Disables CPU affinity setting inside tokenization workers.
  • xtuner/v1/datasets/ftdp.py: Adds support for dict payloads with a messages key plus type validation.
  • .dev_scripts/offline_cache.py: Adds multi-task scheduling, per-worker cache directories, and a cache merge step with logging.


Comment on lines +73 to +77
# For offline caching script to work, cpu affinity has to be turned off.
# try:
# os.sched_setaffinity(os.getpid(), cpu_ids)
# except OSError as e:
# logger.debug(f"Failed to set CPU affinity: {e}")

Copilot AI Mar 4, 2026


CPU affinity is now disabled by commenting out the code, which changes runtime behavior for all JsonlDataset tokenization (not just the offline caching script) and can impact performance/CPU scheduling. Consider restoring the affinity logic and gating it behind an explicit flag (e.g., an env var like XTUNER_DISABLE_CPU_AFFINITY=1 or a function argument), and have the offline caching script set that flag instead of modifying core dataset behavior.
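The gating the reviewer proposes could look like the sketch below. The env var name `XTUNER_DISABLE_CPU_AFFINITY` and the helper `maybe_set_cpu_affinity` are illustrative suggestions, not existing xtuner APIs.

```python
# Sketch: keep the affinity logic, but let the offline caching script opt
# out via an env var instead of commenting the code out for everyone.
import logging
import os

logger = logging.getLogger(__name__)


def maybe_set_cpu_affinity(cpu_ids):
    """Pin the current process to cpu_ids unless explicitly disabled."""
    if os.environ.get("XTUNER_DISABLE_CPU_AFFINITY") == "1":
        logger.debug("CPU affinity disabled via XTUNER_DISABLE_CPU_AFFINITY")
        return
    try:
        os.sched_setaffinity(os.getpid(), cpu_ids)
    except OSError as e:
        logger.debug(f"Failed to set CPU affinity: {e}")
```

The offline caching script would then export `XTUNER_DISABLE_CPU_AFFINITY=1` in its Ray `runtime_env` rather than changing `JsonlDataset` behavior globally.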

Copilot uses AI. Check for mistakes.
Comment on lines +84 to +103
# Copy each hash directory and merge metadata
for hash_key, hash_meta in worker_meta.items():
    src_hash_dir = worker_dir / hash_key
    dst_hash_dir = base_cache_dir / hash_key

    if not src_hash_dir.exists():
        logger.warning(f"Hash directory {src_hash_dir} does not exist, skipping")
        continue

    if dst_hash_dir.exists():
        logger.warning(f"Hash directory {dst_hash_dir} already exists, skipping copy")
    else:
        shutil.move(src_hash_dir, dst_hash_dir)
        logger.debug(f"Moved {src_hash_dir} -> {dst_hash_dir}")

    # Merge metadata (assuming no conflicts, otherwise we'd need merge logic)
    if hash_key in merged_meta:
        logger.warning(f"Hash key {hash_key} already exists in merged metadata, skipping")
    else:
        merged_meta[hash_key] = hash_meta

Copilot AI Mar 4, 2026


merge_worker_caches assumes worker cache metadata has no key conflicts, but .xpuyu-cache-meta.json contains shared keys (e.g., file-hash entries and a top-level tags section). With the current logic, later workers’ entries will be skipped and their cached data can be lost (and tags will only come from the first worker). Please implement a real merge for this meta structure (union lists for offsets/num_tokens, deep-merge tags), and also handle the case where the destination hash dir already exists by merging subdirectories rather than skipping.
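A real merge along the lines the reviewer asks for could be sketched as below. The field names (`offsets`, `num_tokens`, `tags`) are taken from the review comment and are assumptions about the `.xpuyu-cache-meta.json` schema, not verified against it.

```python
# Sketch of a conflict-aware merge: recurse into dicts (e.g. a top-level
# "tags" section), union lists (e.g. offsets/num_tokens), and keep the
# first writer's value for scalars.
def deep_merge(dst: dict, src: dict) -> dict:
    for key, value in src.items():
        if key not in dst:
            dst[key] = value
        elif isinstance(dst[key], dict) and isinstance(value, dict):
            deep_merge(dst[key], value)
        elif isinstance(dst[key], list) and isinstance(value, list):
            dst[key].extend(v for v in value if v not in dst[key])
        # otherwise keep dst's existing scalar value
    return dst


def merge_worker_meta(merged_meta: dict, worker_meta: dict) -> dict:
    """Fold one worker's metadata into the merged metadata in place."""
    for hash_key, hash_meta in worker_meta.items():
        if hash_key in merged_meta:
            deep_merge(merged_meta[hash_key], hash_meta)
        else:
            merged_meta[hash_key] = hash_meta
    return merged_meta
```

The same union-instead-of-skip idea would apply to the filesystem side: when `dst_hash_dir` already exists, merge its subdirectories rather than skipping the move.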

Comment on lines 158 to 161
dataset_config = config.dataset_config
tokenizer_path = config.trainer.tokenizer_path
base_cache_dir = Path(dataset_config[0]["dataset"].cache_dir)


Copilot AI Mar 4, 2026


base_cache_dir = Path(dataset_config[0]["dataset"].cache_dir) assumes dataset_config is non-empty and that every dataset shares the same non-null cache_dir. If the config is empty or uses multiple cache directories, the merge step will either crash or miss caches. Add validation (e.g., assert non-empty + all cache_dir values are set and identical, or perform per-cache-dir merges).
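The validation the reviewer suggests might look like this. The helper name `resolve_base_cache_dir` is illustrative; the `dataset_config` layout (a list of dicts whose `"dataset"` entries carry a `cache_dir`) is inferred from the snippet above.

```python
# Sketch: validate that dataset_config is non-empty and that all datasets
# share a single, non-null cache_dir before using it as the merge target.
from pathlib import Path


def resolve_base_cache_dir(dataset_config) -> Path:
    if not dataset_config:
        raise ValueError("dataset_config is empty; nothing to cache")
    cache_dirs = {entry["dataset"].cache_dir for entry in dataset_config}
    if None in cache_dirs:
        raise ValueError("Every dataset must set cache_dir for offline caching")
    if len(cache_dirs) > 1:
        raise ValueError(
            f"Datasets use multiple cache dirs {sorted(cache_dirs)}; "
            "expected a single shared cache_dir (or merge per cache dir)"
        )
    return Path(cache_dirs.pop())
```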

Comment on lines +163 to +167
worker = ray.remote(
    num_cpus=cpus_per_task // tasks_per_node,
    runtime_env={
        "env_vars": {
            "XTUNER_TOKENIZE_WORKERS": str(cpus_per_task // tasks_per_node),

Copilot AI Mar 4, 2026


num_cpus=cpus_per_task // tasks_per_node (and the matching XTUNER_TOKENIZE_WORKERS value) can become 0 when tasks_per_node > cpus_per_task, which can lead to oversubscription or unexpected scheduling behavior. Please validate inputs and ensure at least 1 CPU/tokenize worker per Ray task (or compute an appropriate split with a clear error message when the requested layout is impossible).

Suggested change

- worker = ray.remote(
-     num_cpus=cpus_per_task // tasks_per_node,
-     runtime_env={
-         "env_vars": {
-             "XTUNER_TOKENIZE_WORKERS": str(cpus_per_task // tasks_per_node),
+ num_cpus_per_task = cpus_per_task // tasks_per_node
+ if num_cpus_per_task < 1:
+     raise ValueError(
+         f"Invalid resource configuration: cpus_per_task ({cpus_per_task}) "
+         f"must be >= tasks_per_node ({tasks_per_node}) to allocate at least "
+         "1 CPU per Ray task."
+     )
+ worker = ray.remote(
+     num_cpus=num_cpus_per_task,
+     runtime_env={
+         "env_vars": {
+             "XTUNER_TOKENIZE_WORKERS": str(num_cpus_per_task),

job = start_ray(nnodes=nnodes, cpus_per_task=cpus_per_task, memory_per_task=memory_per_task)
try:
    print(f"ray clsuter job id {job.job_id}")
    logger.info(f"ray clsuter job id {job.job_id}")

Copilot AI Mar 4, 2026


Typo in log message: "clsuter" -> "cluster".

Suggested change

- logger.info(f"ray clsuter job id {job.job_id}")
+ logger.info(f"ray cluster job id {job.job_id}")
