Merged
4 changes: 2 additions & 2 deletions catalog/plugins.json
@@ -45,8 +45,8 @@
"specifier": ">=3.10"
},
"data_designer": {
"requirement": "data-designer>=0.5.7",
"specifier": ">=0.5.7",
"requirement": "data-designer>=0.6.1",
"specifier": ">=0.6.1",
"marker": null
}
},
4 changes: 2 additions & 2 deletions docs/catalog-schema.md
@@ -32,8 +32,8 @@ The top-level document must contain `schema_version` and `packages`:
"specifier": ">=3.10"
},
"data_designer": {
"requirement": "data-designer>=0.5.7",
"specifier": ">=0.5.7",
"requirement": "data-designer>=0.6.1",
"specifier": ">=0.6.1",
"marker": null
}
},
4 changes: 2 additions & 2 deletions docs/catalogs.md
@@ -66,8 +66,8 @@ after installation.
"specifier": ">=3.10"
},
"data_designer": {
"requirement": "data-designer>=0.5.7",
"specifier": ">=0.5.7",
"requirement": "data-designer>=0.6.1",
"specifier": ">=0.6.1",
"marker": null
}
},
41 changes: 34 additions & 7 deletions plugins/data-designer-retrieval-sdg/README.md
@@ -20,19 +20,32 @@ via `[project.entry-points."data_designer.plugins"]`:
Both are registered automatically through Python entry points when the
package is installed (see [Installation](#installation)).

## Native async (`DATA_DESIGNER_ASYNC_ENGINE=1`)
## Native async and resumable generation

`embedding-dedup` implements `agenerate()` directly on top of
`model.agenerate_text_embeddings`, so the column participates in
DataDesigner's async cell-level scheduler whenever the env var is set:
DataDesigner's async cell-level scheduler.

The `generate` command uses DataDesigner's native resumable generation.
Use a stable `--artifact-path`, `--dataset-name`, and `--buffer-size`, then
resume an interrupted run with `--resume always`:

```bash
export DATA_DESIGNER_ASYNC_ENGINE=1
data-designer-retrieval-sdg generate ...
data-designer-retrieval-sdg generate \
--input-dir ./my_documents \
--output-dir ./generated_output \
--dataset-name my_retrieval_run \
--buffer-size 200 \
--resume always
```

The async engine requires Python 3.11+; without the env var the package
runs on Python 3.10+ via the framework's sync bridge.
Use `--resume if_possible` to resume only when the saved config matches and
otherwise start a fresh run.

`--buffer-size` controls DataDesigner's checkpoint/write granularity and must
match across resumed runs. In DataDesigner 0.6.1, `create()` still profiles the
completed dataset before returning, so `--buffer-size` is not a hard cap on
final peak memory for very large runs.

## Installation

@@ -91,16 +104,30 @@ uv run data-designer-retrieval-sdg generate --help
data-designer-retrieval-sdg generate \
--input-dir ./my_documents \
--output-dir ./generated_output \
--dataset-name my_retrieval_run \
--buffer-size 200 \
--resume if_possible \
--num-pairs 7
```

Generation writes DataDesigner artifacts under `--artifact-path` and exports a
single JSONL file to `--output-dir`.
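
Because `generate` now exports one JSONL file, downstream code can read it one line at a time. The following is a minimal sketch using only the standard library. `read_jsonl`, `demo`, and the `id` key are hypothetical placeholders for illustration, not part of this package, and the real export schema is not shown here.

```python
import json
import tempfile
from pathlib import Path


def read_jsonl(path: Path) -> list[dict]:
    """Load one JSON object per non-empty line of a JSONL file."""
    records = []
    with path.open(encoding="utf-8") as handle:
        for line in handle:
            line = line.strip()
            if line:  # tolerate blank lines between records
                records.append(json.loads(line))
    return records


def demo() -> list[dict]:
    """Round-trip two placeholder records through a temporary JSONL file."""
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "my_retrieval_run.jsonl"
        # Placeholder records; the actual columns depend on the pipeline.
        path.write_text('{"id": 1}\n\n{"id": 2}\n', encoding="utf-8")
        return read_jsonl(path)
```

For large exports, iterating over the file and yielding records lazily would avoid holding the whole list in memory.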

### Convert to training format

```bash
data-designer-retrieval-sdg convert ./generated_output \
data-designer-retrieval-sdg convert ./generated_output/my_retrieval_run.jsonl \
--corpus-id my_corpus
```

Legacy `generated_batch*.json` directories remain supported by `convert`, but
`generate` no longer writes per-batch JSON files. The old manual restart flags
`--batch-size`, `--start-batch-index`, and `--end-batch-index` were removed
because DataDesigner now owns checkpointing through `--buffer-size` and
`--resume`. For very large corpora, keep each input partition small enough for
DataDesigner's final profiling step, which materializes the completed dataset,
until DataDesigner exposes a no-materialize create/export path.

### Use as a library

```python
2 changes: 1 addition & 1 deletion plugins/data-designer-retrieval-sdg/pyproject.toml
@@ -7,7 +7,7 @@ version = "0.1.0"
description = "Retriever SDG toolkit: registers the embedding-dedup column generator and document-chunker seed reader, plus a multi-step QA generation pipeline, CLI, and Automodel-compatible data conversion"
requires-python = ">=3.10"
dependencies = [
"data-designer>=0.5.7",
"data-designer>=0.6.1",
"nltk>=3.9.2",
"pyyaml>=6.0",
"pyarrow>=14.0",
Original file line number Diff line number Diff line change
@@ -8,12 +8,9 @@
- ``generate`` -- run the full SDG pipeline on a directory of text files
- ``convert`` -- convert raw SDG output to Automodel-compatible formats

The ``generate`` subcommand drives a per-batch loop so each batch's output
is checkpointed to its own JSON file (resumable across crashes). The
batching wraps DataDesigner's native ``IndexRange`` selection strategy
applied to a :class:`DocumentChunkerSeedSource`; the framework owns
discovery, chunking, and async cell scheduling (when
``DATA_DESIGNER_ASYNC_ENGINE=1`` is set).
The ``generate`` subcommand runs the full pipeline through DataDesigner's
native resumable generation support. The framework owns discovery, chunking,
checkpointing, and async cell scheduling.
"""

from __future__ import annotations
@@ -26,6 +23,7 @@
import data_designer.config as dd
from data_designer.engine.resources.seed_reader import SeedReaderError
from data_designer.engine.secret_resolver import PlaintextResolver
from data_designer.engine.storage.artifact_storage import ResumeMode
from data_designer.interface import DataDesigner
from data_designer.logging import LoggerConfig, LoggingConfig, OutputConfig, configure_logging

@@ -67,6 +65,55 @@ def _count_seed_records(seed_source: DocumentChunkerSeedSource) -> int:
return reader.get_seed_dataset_size()


def _path_is_relative_to(path: Path, root: Path) -> bool:
"""Return whether *path* is contained by *root* after resolution."""
try:
path.relative_to(root)
except ValueError:
return False
return True


def _validate_dataset_name(dataset_name: str, artifact_path: Path) -> str:
"""Validate a DataDesigner dataset name before it is used as an artifact path segment.

Args:
dataset_name: Requested dataset name.
artifact_path: DataDesigner artifact root.

Returns:
The validated dataset name.

Raises:
ValueError: If the dataset name is empty, unsafe, or escapes the artifact root.
"""
if not dataset_name:
raise ValueError("--dataset-name must not be empty")
if dataset_name in {".", ".."}:
raise ValueError("--dataset-name must be a real path segment, not '.' or '..'")
if any(ord(char) < 32 or ord(char) == 127 for char in dataset_name):
raise ValueError("--dataset-name must not contain control characters")
if any(separator in dataset_name for separator in ("/", "\\")):
raise ValueError("--dataset-name must be a single path segment without path separators")

dataset_path = Path(dataset_name)
if dataset_path.is_absolute() or len(dataset_path.parts) != 1:
raise ValueError("--dataset-name must be a single relative path segment")

artifact_root = artifact_path.resolve()
resolved_dataset_path = (artifact_root / dataset_name).resolve()
if resolved_dataset_path == artifact_root or not _path_is_relative_to(resolved_dataset_path, artifact_root):
raise ValueError("--dataset-name must resolve under --artifact-path")

return dataset_name


def _resolve_dataset_name(input_dir: Path, artifact_path: Path, dataset_name: str | None) -> str:
"""Return the explicit or default dataset name after safety validation."""
resolved_name = dataset_name if dataset_name is not None else input_dir.name or "retrieval_sdg"
return _validate_dataset_name(resolved_name, artifact_path)
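
The checks in `_validate_dataset_name` can be exercised in isolation. The sketch below is a condensed, boolean-returning restatement of those checks for illustration only. `is_safe_dataset_name` is a hypothetical helper, not a function in this PR, and it uses `Path.parents` membership in place of the `relative_to` try/except.

```python
from pathlib import Path


def is_safe_dataset_name(name: str, artifact_root: Path) -> bool:
    """Return True if *name* is a single safe path segment under *artifact_root*."""
    # Reject empty names and the special directory entries.
    if not name or name in {".", ".."}:
        return False
    # Reject ASCII control characters (including DEL).
    if any(ord(ch) < 32 or ord(ch) == 127 for ch in name):
        return False
    # Reject both POSIX and Windows path separators.
    if "/" in name or "\\" in name:
        return False
    # The resolved path must sit strictly below the resolved root.
    root = artifact_root.resolve()
    candidate = (root / name).resolve()
    return candidate != root and root in candidate.parents
```

A name such as `my_retrieval_run` passes, while `../escape`, `..`, an empty string, or a name containing a newline is rejected before any path is built from it.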


def _add_generate_parser(subparsers: argparse._SubParsersAction) -> None:
"""Register the ``generate`` subcommand."""
p = subparsers.add_parser(
@@ -98,9 +145,15 @@ def _add_generate_parser(subparsers: argparse._SubParsersAction) -> None:
p.add_argument("--similarity-threshold", type=float, default=0.9, help="Cosine threshold for QA-pair dedup")
p.add_argument("--preview", action="store_true", help="Preview without full generation")
p.add_argument("--artifact-path", type=Path, default=Path("./artifacts"), help="DD artifact path")
p.add_argument("--batch-size", type=int, default=200, help="Records per batch")
p.add_argument("--start-batch-index", type=int, default=0, help="Batch index to start from")
p.add_argument("--end-batch-index", type=int, default=-1, help="Batch index to end at (exclusive)")
p.add_argument("--dataset-name", default=None, help="Stable DD dataset name for artifacts and resume")
p.add_argument("--buffer-size", type=int, default=200, help="DataDesigner checkpoint buffer size")
p.add_argument(
"--resume",
"-r",
choices=[mode.value for mode in ResumeMode],
default=ResumeMode.NEVER.value,
help="Resume behavior for interrupted generation runs",
)

g = p.add_argument_group("multi-document bundling")
g.add_argument("--multi-doc", action="store_true", help="Enable multi-doc bundling")
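
The `--resume` flag in this hunk derives its `choices` from an enum and converts the parsed string back into an enum member later. A self-contained sketch of that pattern follows. The `ResumeMode` class here is a local stand-in, not the DataDesigner class, and its string values are assumptions inferred from the README flags (`always`, `if_possible`) and the `NEVER` default. `build_parser` and `parse_resume` are hypothetical names.

```python
import argparse
from enum import Enum


class ResumeMode(str, Enum):
    """Local stand-in for the framework enum; values are assumed."""

    NEVER = "never"
    ALWAYS = "always"
    IF_POSSIBLE = "if_possible"


def build_parser() -> argparse.ArgumentParser:
    """Build a parser whose --resume choices track the enum members."""
    parser = argparse.ArgumentParser(prog="generate-sketch")
    parser.add_argument(
        "--resume",
        "-r",
        choices=[mode.value for mode in ResumeMode],
        default=ResumeMode.NEVER.value,
        help="Resume behavior for interrupted generation runs",
    )
    return parser


def parse_resume(argv: list[str]) -> ResumeMode:
    """Parse argv and convert the string choice back into an enum member."""
    args = build_parser().parse_args(argv)
    return ResumeMode(args.resume)
```

Deriving `choices` from the enum keeps the CLI in step with the enum: a new member becomes a valid flag value without touching the parser, and argparse rejects anything else with exit code 2.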
@@ -158,6 +211,12 @@ def _run_generate(args: argparse.Namespace) -> None:
row_type = "bundles" if args.multi_doc else "text files"
print(f"Discovered {total_records} {row_type} under {args.input_dir}")

try:
args.dataset_name = _resolve_dataset_name(args.input_dir, args.artifact_path, args.dataset_name)
except ValueError as exc:
print(f"Error: {exc}", file=sys.stderr)
sys.exit(2)

model_providers, custom_providers = build_model_providers(
custom_provider_endpoint=args.custom_provider_endpoint,
custom_provider_name=args.custom_provider_name,
@@ -167,34 +226,22 @@ def _run_generate(args: argparse.Namespace) -> None:
)

data_designer = DataDesigner(artifact_path=args.artifact_path, model_providers=model_providers)
data_designer.set_run_config(dd.RunConfig(disable_early_shutdown=True))
data_designer.set_run_config(dd.RunConfig(disable_early_shutdown=True, buffer_size=args.buffer_size))

args.output_dir.mkdir(parents=True, exist_ok=True)

num_batches = (total_records + args.batch_size - 1) // args.batch_size
actual_end_batch = num_batches if args.end_batch_index == -1 else min(args.end_batch_index, num_batches)

pipeline_kwargs = _pipeline_kwargs(args)
_print_model_config(args, custom_providers)

if args.preview:
_run_preview(data_designer, seed_source, total_records, args, pipeline_kwargs)
return

_run_batches(
data_designer,
seed_source,
total_records,
num_batches,
args.start_batch_index,
actual_end_batch,
args,
pipeline_kwargs,
)
_run_create(data_designer, seed_source, total_records, args, pipeline_kwargs)


def _pipeline_kwargs(args: argparse.Namespace) -> dict:
"""Collect pipeline-builder keyword arguments shared between preview and batch runs."""
"""Collect pipeline-builder keyword arguments shared between preview and create runs."""
return {
"max_artifacts_per_type": args.max_artifacts_per_type,
"num_pairs": args.num_pairs,
@@ -238,7 +285,7 @@ def _run_preview(
config_builder = build_qa_generation_pipeline(
seed_source=seed_source,
start_index=0,
end_index=min(args.batch_size - 1, total_records - 1),
end_index=min(args.buffer_size - 1, total_records - 1),
**pipeline_kwargs,
)
print("\nPreviewing generation...")
@@ -249,51 +296,40 @@ def _run_preview(
logger.warning("Preview error: %s", e)


def _run_batches(
def _run_create(
data_designer: DataDesigner,
seed_source: DocumentChunkerSeedSource,
total_records: int,
num_batches: int,
start_batch: int,
end_batch: int,
args: argparse.Namespace,
pipeline_kwargs: dict,
) -> None:
"""Process the pipeline in batches, writing one JSON per batch."""
"""Run full generation once and export the resulting dataset as JSONL."""
print(f"\nTotal records: {total_records}")
print(f"Batch size: {args.batch_size}")
print(f"Total batches: {num_batches}")
print(f"Starting from batch index: {start_batch}")
print(f"Ending at batch index: {end_batch} (exclusive)")

for batch_idx in range(start_batch, end_batch):
start_idx = batch_idx * args.batch_size
end_idx = min(start_idx + args.batch_size - 1, total_records - 1)
num_in_batch = end_idx - start_idx + 1

print(f"\n{'=' * 60}")
print(f"Processing batch {batch_idx}/{num_batches - 1} (records {start_idx}-{end_idx})")
print(f"{'=' * 60}")

config_builder = build_qa_generation_pipeline(
seed_source=seed_source,
start_index=start_idx,
end_index=end_idx,
**pipeline_kwargs,
)
print(f"Buffer size: {args.buffer_size}")
print(f"Resume mode: {args.resume}")

input_basename = args.input_dir.name
dataset_name = f"{input_basename}_batch{batch_idx}_{start_idx}_{end_idx}"
result = data_designer.create(config_builder, num_records=num_in_batch, dataset_name=dataset_name)
generated_df = result.load_dataset()
config_builder = build_qa_generation_pipeline(
seed_source=seed_source,
start_index=0,
end_index=total_records - 1,
**pipeline_kwargs,
)

dataset_name = _resolve_dataset_name(args.input_dir, args.artifact_path, args.dataset_name)
print(f"Dataset name: {dataset_name}")
print("\nGenerating dataset...")
result = data_designer.create(
config_builder,
num_records=total_records,
dataset_name=dataset_name,
resume=ResumeMode(args.resume),
)

output_filename = f"generated_batch{batch_idx}_{start_idx}_{end_idx}.json"
generated_df.to_json(args.output_dir / output_filename, orient="records", indent=2)
print(f"Saved {output_filename} ({len(generated_df)} records)")
output_path = args.output_dir / f"{result.artifact_storage.resolved_dataset_name}.jsonl"
result.export(output_path, format="jsonl")

print(f"\n{'=' * 60}")
print(f"Generation complete! All batches saved to {args.output_dir}")
print(f"Total batches processed: {end_batch - start_batch}")
print(f"\nGeneration complete! Artifacts saved to {result.artifact_storage.base_dataset_path}")
print(f"Exported JSONL to {output_path}")


def _add_convert_parser(subparsers: argparse._SubParsersAction) -> None:
@@ -304,7 +340,7 @@ def _add_convert_parser(subparsers: argparse._SubParsersAction) -> None:
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

p.add_argument("input_path", help="Path to JSON file or directory of batch files")
p.add_argument("input_path", help="Path to generated JSONL/JSON/parquet file or output directory")
p.add_argument("--corpus-id", required=True, help="Corpus identifier")
p.add_argument("--output-dir", default=None, help="Output directory")
p.add_argument("--eval-only", action="store_true", help="BEIR eval only (no train/val)")