diff --git a/dlio_benchmark/data_generator/arrow_generator.py b/dlio_benchmark/data_generator/arrow_generator.py
new file mode 100644
index 00000000..3fe1dfe6
--- /dev/null
+++ b/dlio_benchmark/data_generator/arrow_generator.py
@@ -0,0 +1,300 @@
+"""
+   Copyright (c) 2025, UChicago Argonne, LLC
+   All Rights Reserved
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+"""
+import os
+
+import numpy as np
+import pyarrow as pa
+
+from dlio_benchmark.data_generator.data_generator import DataGenerator
+from dlio_benchmark.utils.utility import progress, gen_random_tensor
+
+# All numeric dtypes supported for column generation.
+_NP_TYPE_MAP = {
+    'uint8': np.uint8,
+    'uint16': np.uint16,
+    'uint32': np.uint32,
+    'uint64': np.uint64,
+    'int8': np.int8,
+    'int16': np.int16,
+    'int32': np.int32,
+    'int64': np.int64,
+    'float16': np.float16,
+    'float32': np.float32,
+    'float64': np.float64,
+}
+
+_PA_SCALAR_TYPE_MAP = {
+    'uint8': pa.uint8(),
+    'uint16': pa.uint16(),
+    'uint32': pa.uint32(),
+    'uint64': pa.uint64(),
+    'int8': pa.int8(),
+    'int16': pa.int16(),
+    'int32': pa.int32(),
+    'int64': pa.int64(),
+    'float16': pa.float16(),
+    'float32': pa.float32(),
+    'float64': pa.float64(),
+}
+
+
+class ArrowGenerator(DataGenerator):
+    """
+    Schema-driven Arrow IPC data generator.
+
+    Supports two modes:
+
+    1. **Column-schema mode** (``arrow_columns`` config list is non-empty):
+       Generates multi-column files from a list of column specs, each with a
+       ``name``, ``dtype``, and optional ``size`` (embedding vector length).
+       Supported dtypes: uint8/16/32/64, int8/16/32/64, float16/32/64,
+       string, binary, bool.
+
+    2. **Legacy mode** (``arrow_columns`` empty):
+       Single ``data`` column of fixed-size uint8 lists, matching the original
+       DLIO behaviour for backward compatibility.
+
+    Each file is written via ``pa.ipc.RecordBatchFileWriter`` so that readers
+    can use ``pa.ipc.open_file(pa.memory_map(...))`` for true zero-copy,
+    memory-mapped access — the OS page cache handles I/O without any explicit
+    read() syscalls.
+
+    Key design properties:
+    - **Unique samples**: every row in every batch has distinct data.
+    - **RNG flow-through**: a single ``np.random.Generator`` is initialised
+      once per rank and advanced naturally through all file and batch
+      generations. No seed resets occur between files.
+    - **Near-zero copy**: numeric columns use ``gen_random_tensor`` with
+      ``rng=rng``; once the raw bytes exist they are wrapped in a
+      ``FixedSizeListArray`` via ``pa.FixedSizeListArray.from_arrays()``.
+    """
+
+    def __init__(self):
+        """Read Arrow-specific settings from the shared config object."""
+        super().__init__()
+        self.arrow_columns = getattr(self._args, 'arrow_columns', [])
+        batch = getattr(self._args, 'arrow_generation_batch_size', 0)
+        # NOTE(review): stored here, but generate() builds each file in one
+        # shot and never reads this value — confirm whether batched writes
+        # were intended.
+        self.generation_batch_size = batch if batch > 0 else 1024
+
+    # ── Schema ───────────────────────────────────────────────────────────────
+
+    @staticmethod
+    def _parse_col_spec(col_spec):
+        """Return ``(name, dtype, size)`` for one ``arrow_columns`` entry.
+
+        Specs exposing ``.get`` supply ``name``, ``dtype`` and ``size``
+        (defaults ``'data'``, ``'float32'``, ``1``); any other value is
+        treated as a bare column name holding scalar float32 data.
+        """
+        if hasattr(col_spec, 'get'):
+            return (
+                str(col_spec.get('name', 'data')),
+                str(col_spec.get('dtype', 'float32')),
+                int(col_spec.get('size', 1)),
+            )
+        return str(col_spec), 'float32', 1
+
+    def _build_schema(self, legacy_elem_size=None):
+        """Build PyArrow schema from configured columns.
+
+        When called in legacy mode (``arrow_columns`` is empty or None),
+        ``legacy_elem_size`` must be provided; it is the number of uint8
+        elements per sample (= dim1 * dim2). The schema uses a
+        ``pa.list_(pa.uint8(), legacy_elem_size)`` fixed-size list, which
+        lets PyArrow use the efficient ``FixedSizeListArray`` representation
+        on reads.
+
+        When called in column-schema mode, ``legacy_elem_size`` is ignored.
+        """
+        if not self.arrow_columns:
+            size = legacy_elem_size or 1
+            return pa.schema([('data', pa.list_(pa.uint8(), size))])
+
+        fields = []
+        for col_spec in self.arrow_columns:
+            name, dtype, size = self._parse_col_spec(col_spec)
+            pa_scalar = _PA_SCALAR_TYPE_MAP.get(dtype)
+
+            if pa_scalar is not None:
+                # Numeric: plain scalar when size == 1, else fixed-size list.
+                pa_type = pa_scalar if size == 1 else pa.list_(pa_scalar, size)
+            elif dtype == 'string':
+                pa_type = pa.string()
+            elif dtype == 'binary':
+                pa_type = pa.binary()
+            elif dtype == 'bool':
+                pa_type = pa.bool_()
+            else:
+                # 'list' and any unknown dtype: fixed-size float32 list.
+                pa_type = pa.list_(pa.float32(), size)
+            fields.append(pa.field(name, pa_type))
+
+        return pa.schema(fields)
+
+    # ── Batch generation helpers ──────────────────────────────────────────────
+
+    def _generate_column_data_batch(self, col_spec, batch_size, rng):
+        """Generate one batch of data for a single column.
+
+        All numeric dtypes use ``gen_random_tensor(rng=rng)`` so the RNG
+        state advances naturally — no seed is computed or reset between calls.
+        Non-numeric dtypes draw from the same ``rng`` object.
+
+        Returns ``(name, pa.Array)``.
+        """
+        name, dtype, size = self._parse_col_spec(col_spec)
+
+        np_type = _NP_TYPE_MAP.get(dtype)
+        pa_scalar = _PA_SCALAR_TYPE_MAP.get(dtype)
+
+        # ── Numeric scalar (size == 1) ──────────────────────────────────────
+        if np_type is not None and pa_scalar is not None and size == 1:
+            data = gen_random_tensor(shape=(batch_size,), dtype=np_type, rng=rng)
+            return name, pa.array(data, type=pa_scalar)
+
+        # ── Numeric fixed-size list (size > 1) ─────────────────────────────
+        if np_type is not None and pa_scalar is not None:
+            # Generate as a flat (batch_size * size) array, then wrap as
+            # FixedSizeListArray — zero extra copies after dgen/numpy.
+            data = gen_random_tensor(shape=(batch_size * size,), dtype=np_type, rng=rng)
+            arrow_flat = pa.array(data, type=pa_scalar)
+            return name, pa.FixedSizeListArray.from_arrays(arrow_flat, size)
+
+        if dtype == 'list':
+            data = gen_random_tensor(shape=(batch_size * size,), dtype=np.float32, rng=rng)
+            arrow_flat = pa.array(data, type=pa.float32())
+            return name, pa.FixedSizeListArray.from_arrays(arrow_flat, size)
+
+        # ── Non-numeric types — drawn from the same rng as numeric columns ───
+        if dtype == 'string':
+            # Use integers from rng to build strings so they vary per run seed
+            ints = rng.integers(0, 2**31, size=batch_size)
+            return name, pa.array([f"s_{v}" for v in ints], type=pa.string())
+
+        if dtype == 'binary':
+            # Each sample: size random bytes from rng
+            rows = [rng.bytes(size) for _ in range(batch_size)]
+            return name, pa.array(rows, type=pa.binary())
+
+        if dtype == 'bool':
+            bits = rng.integers(0, 2, size=batch_size, dtype=np.uint8)
+            return name, pa.array(bits.astype(bool), type=pa.bool_())
+
+        # Fallback: float32 fixed-size list
+        data = gen_random_tensor(shape=(batch_size * size,), dtype=np.float32, rng=rng)
+        arrow_flat = pa.array(data, type=pa.float32())
+        return name, pa.FixedSizeListArray.from_arrays(arrow_flat, size)
+
+    def _generate_batch_columns(self, batch_size, rng):
+        """Generate all configured columns for one batch.
+
+        The same ``rng`` object is advanced per column so every column in
+        every batch gets statistically independent, non-repeating data.
+        """
+        columns = {}
+        for col_spec in self.arrow_columns:
+            name, arrow_data = self._generate_column_data_batch(col_spec, batch_size, rng)
+            columns[name] = arrow_data
+        return columns
+
+    def _generate_legacy_batch(self, elem_size, batch_size, rng):
+        """Generate one batch for the legacy single-'data'-column mode.
+
+        Generates ``(batch_size * elem_size)`` bytes in one dgen/numpy call,
+        then wraps the result as a ``FixedSizeListArray`` — no per-row Python
+        loop, no tiling, no copy. Each row is a distinct slice of the data
+        stream so samples within the same file are NOT identical.
+
+        ``elem_size`` = dim1 * dim2 (the flat element count per sample).
+        """
+        flat = gen_random_tensor(shape=(batch_size * elem_size,), dtype=np.uint8, rng=rng)
+        arrow_flat = pa.array(flat, type=pa.uint8())
+        arrow_data = pa.FixedSizeListArray.from_arrays(arrow_flat, elem_size)
+        return {'data': arrow_data}
+
+    # ── Main generation loop ──────────────────────────────────────────────────
+
+    def generate(self):
+        """Generate Arrow IPC files using RNG-flow-through generation.
+
+        Each file's columns are generated in one call covering all
+        ``num_samples`` rows and written as a single table.
+
+        Seeding:
+        - One ``np.random.Generator`` is created per MPI rank, seeded with
+          ``BASE_SEED + my_rank``, and advanced through ALL file and batch
+          generations without any intermediate resets.
+        - This guarantees: (a) cross-file uniqueness — each file starts from a
+          different RNG state; (b) within-file uniqueness — each batch and each
+          sample row continues from where the previous one left off; (c)
+          reproducibility — the same master seed always produces the same files.
+        """
+        super().generate()
+
+        # Single RNG for the entire rank — never reset between files.
+        np.random.seed(self.BASE_SEED + self.my_rank)
+        rng = np.random.default_rng(seed=self.BASE_SEED + self.my_rank)
+
+        dim = self.get_dimension(self.total_files_to_generate)
+        is_local = self.storage.islocalfs()
+
+        for i in range(self.my_rank, int(self.total_files_to_generate), self.comm_size):
+            progress(i + 1, self.total_files_to_generate, "Generating Arrow Data")
+
+            out_path_spec = self.storage.get_uri(self._file_list[i])
+
+            # Compute element size for legacy mode (dim may be list or scalar).
+            dim_raw = dim[2 * i]
+            if isinstance(dim_raw, list):
+                dim1 = int(dim_raw[0])
+                dim2 = int(dim_raw[1]) if len(dim_raw) > 1 else 1
+            else:
+                dim1 = int(dim_raw)
+                dim2 = int(dim[2 * i + 1])
+            elem_size = dim1 * dim2
+
+            # Build schema.
+            schema = self._build_schema(legacy_elem_size=elem_size)
+
+            # Generate all column data upfront for the entire file.
+            if self.arrow_columns:
+                full_columns = self._generate_batch_columns(self.num_samples, rng)
+            else:
+                full_columns = self._generate_legacy_batch(elem_size, self.num_samples, rng)
+
+            table = pa.table(full_columns, schema=schema)
+
+            if is_local:
+                parent_dir = os.path.dirname(out_path_spec)
+                if parent_dir:
+                    os.makedirs(parent_dir, exist_ok=True)
+                # Write as Arrow IPC file (random-access format, memory-mappable).
+                # Context managers close the writer, then the file, even if
+                # write_table raises.
+                with pa.OSFile(out_path_spec, 'wb') as sink:
+                    with pa.ipc.new_file(sink, schema) as writer:
+                        writer.write_table(table)
+            else:
+                sink = pa.BufferOutputStream()
+                with pa.ipc.new_file(sink, schema) as writer:
+                    writer.write_table(table)
+                self.storage.put_data(out_path_spec, sink.getvalue().to_pybytes())
+
+        np.random.seed()
diff --git a/dlio_benchmark/data_generator/generator_factory.py b/dlio_benchmark/data_generator/generator_factory.py
index b654b035..cea7237c 100644
--- a/dlio_benchmark/data_generator/generator_factory.py
+++ b/dlio_benchmark/data_generator/generator_factory.py
@@ -53,5 +53,8 @@ def get_generator(type):
         elif type == FormatType.PARQUET:
             from dlio_benchmark.data_generator.parquet_generator import ParquetGenerator
             return ParquetGenerator()
+        elif type == FormatType.ARROW:
+            from dlio_benchmark.data_generator.arrow_generator import ArrowGenerator
+            return ArrowGenerator()
         else:
             raise Exception(str(ErrorCodes.EC1001))
diff --git a/dlio_benchmark/reader/arrow_reader.py b/dlio_benchmark/reader/arrow_reader.py
new file mode 100644
index 00000000..fd1ef225
--- /dev/null
+++ b/dlio_benchmark/reader/arrow_reader.py
@@ -0,0 +1,111 @@
+"""
+Arrow IPC reader using memory-mapped files for zero-copy access.
+
+Opens Arrow IPC files via ``pa.memory_map()`` + ``pa.ipc.open_file()`` so the
+OS page cache handles all I/O — no explicit read() syscalls, no data copies
+into Python heap.
+"""
+import bisect
+
+from dlio_benchmark.common.constants import MODULE_DATA_READER
+from dlio_benchmark.reader.reader_handler import FormatReader
+from dlio_benchmark.utils.utility import Profile, utcnow
+
+dlp = Profile(MODULE_DATA_READER)
+
+
+class ArrowReader(FormatReader):
+    """
+    Memory-mapped Arrow IPC reader.
+
+    Uses ``pa.memory_map`` for true zero-copy reads — the kernel page cache
+    serves data directly into the process address space.
+    """
+    @dlp.log_init
+    def __init__(self, dataset_type, thread_index, epoch):
+        super().__init__(dataset_type, thread_index)
+
+        self.logger.info(
+            f"{utcnow()} ArrowReader thread={thread_index} epoch={epoch}"
+        )
+
+    @dlp.log
+    def open(self, filename):
+        """Memory-map an Arrow IPC file and build cumulative row offsets.
+
+        Returns ``(mmap_file, reader, offsets)``; ``offsets[k]`` is the index
+        of the first row of record batch ``k`` and ``offsets[-1]`` is the
+        total row count.
+        """
+        import pyarrow as pa
+
+        mmap_file = pa.memory_map(filename, 'r')
+        try:
+            reader = pa.ipc.open_file(mmap_file)
+
+            # Build cumulative row offsets from record batches
+            offsets = [0]
+            for i in range(reader.num_record_batches):
+                offsets.append(offsets[-1] + reader.get_batch(i).num_rows)
+        except Exception:
+            # Do not leak the mapping when the file is not valid Arrow IPC.
+            mmap_file.close()
+            raise
+
+        return (mmap_file, reader, offsets)
+
+    @dlp.log
+    def close(self, filename):
+        """Close the memory-mapped file, then delegate to the base class."""
+        if filename in self.open_file_map:
+            entry = self.open_file_map[filename]
+            if entry is not None:
+                entry[0].close()
+        super().close(filename)
+
+    @dlp.log
+    def get_sample(self, filename, sample_index):
+        """Read the record batch containing sample_index via zero-copy memory-mapped access."""
+        mmap_file, reader, offsets = self.open_file_map[filename]
+
+        # Binary search for the batch containing this sample
+        batch_idx = max(0, bisect.bisect_right(offsets, sample_index) - 1)
+        batch_idx = min(batch_idx, reader.num_record_batches - 1)
+
+        batch = reader.get_batch(batch_idx)
+
+        # Touch every page to trigger mmap page faults and ensure full I/O
+        PAGE_SIZE = 4096
+        for col in batch.columns:
+            for buf in col.buffers():
+                if buf is not None and buf.size > 0:
+                    mv = memoryview(buf)
+                    # Touch one byte per page to fault in entire buffer
+                    for offset in range(0, len(mv), PAGE_SIZE):
+                        _ = mv[offset]
+
+        dlp.update(image_size=batch.nbytes)
+
+    def next(self):
+        """Yield batches produced by the base-class iterator unchanged."""
+        for batch in super().next():
+            yield batch
+
+    @dlp.log
+    def read_index(self, image_idx, step):
+        """Record ``step`` with the profiler, then defer to the base class."""
+        dlp.update(step=step)
+        return super().read_index(image_idx, step)
+
+    @dlp.log
+    def finalize(self):
+        """Defer to the base-class finalizer."""
+        return super().finalize()
+
+    def is_index_based(self):
+        """This reader supports index-based access."""
+        return True
+
+    def is_iterator_based(self):
+        """This reader supports iterator-based access."""
+        return True
diff --git a/dlio_benchmark/reader/reader_factory.py b/dlio_benchmark/reader/reader_factory.py
index 536bbe09..6350110c 100644
--- a/dlio_benchmark/reader/reader_factory.py
+++ b/dlio_benchmark/reader/reader_factory.py
@@ -160,6 +160,9 @@ def get_reader(type, dataset_type, thread_index, epoch_number):
             else:
                 from dlio_benchmark.reader.parquet_reader import ParquetReader
                 return ParquetReader(dataset_type, thread_index, epoch_number)
+        elif type == FormatType.ARROW:
+            from dlio_benchmark.reader.arrow_reader import ArrowReader
+            return ArrowReader(dataset_type, thread_index, epoch_number)
         else:
             raise Exception("Loading data of %s format is not supported without framework data loader" %type)
 
diff --git a/dlio_benchmark/utils/config.py b/dlio_benchmark/utils/config.py
index 4df35438..9c24a11f 100644
--- a/dlio_benchmark/utils/config.py
+++ b/dlio_benchmark/utils/config.py
@@ -188,6 +188,10 @@ class ConfigArguments:
     parquet_partition_by: Optional[str] = None
     parquet_generation_batch_size: int = 0
 
+    ## dataset: arrow-only
+    arrow_columns: ClassVar[List[Dict[str, Any]]] = []
+    arrow_generation_batch_size: int = 0
+
     ## dataset: hdf5-only
     num_dset_per_record: int = 1
     chunk_dims: ClassVar[List[int]] = []
@@ -1147,6 +1151,15 @@ def LoadConfig(args, config):
             if 'generation_batch_size' in pq_cfg:
                 args.parquet_generation_batch_size = int(pq_cfg['generation_batch_size'])
 
+        # arrow only config
+        if 'arrow' in config['dataset']:
+            ar_cfg = config['dataset']['arrow']
+            if 'columns' in ar_cfg:
+                cols = ar_cfg['columns']
+                args.arrow_columns = [dict(c) if hasattr(c, 'items') else c for c in cols]
+            if 'generation_batch_size' in ar_cfg:
+                args.arrow_generation_batch_size = int(ar_cfg['generation_batch_size'])
+
         # hdf5 only config
         if 'hdf5' in config['dataset']:
             if 'chunk_dims' in config['dataset']['hdf5']: