diff --git a/bench/batch_store.py b/bench/batch_store.py new file mode 100644 index 00000000..7b84370d --- /dev/null +++ b/bench/batch_store.py @@ -0,0 +1,170 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +import argparse +import random +import statistics +import time + +import blosc2 + + +URLPATH = "bench_batch_store.b2b" +NBATCHES = 10_000 +OBJECTS_PER_BATCH = 100 +TOTAL_OBJECTS = NBATCHES * OBJECTS_PER_BATCH +BLOCKSIZE_MAX = 32 +N_RANDOM_READS = 1_000 + + +def make_rgb(batch_index: int, item_index: int) -> dict[str, int]: + global_index = batch_index * OBJECTS_PER_BATCH + item_index + return { + "red": batch_index, + "green": item_index, + "blue": global_index, + } + + +def make_batch(batch_index: int) -> list[dict[str, int]]: + return [make_rgb(batch_index, item_index) for item_index in range(OBJECTS_PER_BATCH)] + + +def expected_entry(batch_index: int, item_index: int) -> dict[str, int]: + return { + "red": batch_index, + "green": item_index, + "blue": batch_index * OBJECTS_PER_BATCH + item_index, + } + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Benchmark BatchStore single-entry reads.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("--codec", type=str, default="ZSTD", choices=[codec.name for codec in blosc2.Codec]) + parser.add_argument("--clevel", type=int, default=5) + parser.add_argument("--serializer", type=str, default="msgpack", choices=["msgpack", "arrow"]) + parser.add_argument("--use-dict", action="store_true", help="Enable dictionaries for ZSTD/LZ4/LZ4HC codecs.") + parser.add_argument("--in-mem", action="store_true", help="Keep the BatchStore purely in memory.") + return parser + + +def 
build_store( + codec: blosc2.Codec, clevel: int, use_dict: bool, serializer: str, in_mem: bool +) -> blosc2.BatchStore | None: + if in_mem: + storage = blosc2.Storage(mode="w") + store = blosc2.BatchStore( + storage=storage, + max_blocksize=BLOCKSIZE_MAX, + serializer=serializer, + cparams={ + "codec": codec, + "clevel": clevel, + "use_dict": use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC), + }, + ) + for batch_index in range(NBATCHES): + store.append(make_batch(batch_index)) + return store + + blosc2.remove_urlpath(URLPATH) + storage = blosc2.Storage(urlpath=URLPATH, mode="w", contiguous=True) + cparams = { + "codec": codec, + "clevel": clevel, + "use_dict": use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC), + } + with blosc2.BatchStore( + storage=storage, max_blocksize=BLOCKSIZE_MAX, serializer=serializer, cparams=cparams + ) as store: + for batch_index in range(NBATCHES): + store.append(make_batch(batch_index)) + return None + + +def measure_random_reads(store: blosc2.BatchStore) -> tuple[list[tuple[int, int, int, dict[str, int]]], list[int]]: + rng = random.Random(2024) + samples: list[tuple[int, int, int, dict[str, int]]] = [] + timings_ns: list[int] = [] + + for _ in range(N_RANDOM_READS): + batch_index = rng.randrange(len(store)) + item_index = rng.randrange(OBJECTS_PER_BATCH) + t0 = time.perf_counter_ns() + value = store[batch_index][item_index] + timings_ns.append(time.perf_counter_ns() - t0) + if value != expected_entry(batch_index, item_index): + raise RuntimeError(f"Value mismatch at batch={batch_index}, item={item_index}") + samples.append((timings_ns[-1], batch_index, item_index, value)) + + return samples, timings_ns + + +def main() -> None: + parser = build_parser() + args = parser.parse_args() + codec = blosc2.Codec[args.codec] + use_dict = args.use_dict and codec in (blosc2.Codec.ZSTD, blosc2.Codec.LZ4, blosc2.Codec.LZ4HC) + + mode_label = "in-memory" if args.in_mem else "persistent" + 
article = "an" if args.in_mem else "a" + print(f"Building {article} {mode_label} BatchStore with 1,000,000 RGB dicts and timing 1,000 random scalar reads...") + print(f" codec: {codec.name}") + print(f" clevel: {args.clevel}") + print(f" serializer: {args.serializer}") + print(f" use_dict: {use_dict}") + print(f" in_mem: {args.in_mem}") + t0 = time.perf_counter() + store = build_store( + codec=codec, clevel=args.clevel, use_dict=use_dict, serializer=args.serializer, in_mem=args.in_mem + ) + build_time_s = time.perf_counter() - t0 + if args.in_mem: + assert store is not None + read_store = store + else: + read_store = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, max_blocksize=BLOCKSIZE_MAX) + samples, timings_ns = measure_random_reads(read_store) + t0 = time.perf_counter() + checksum = 0 + nitems = 0 + for item in read_store.iter_items(): + checksum += item["blue"] + nitems += 1 + iter_time_s = time.perf_counter() - t0 + + print() + print("BatchStore benchmark") + print(f" build time: {build_time_s:.3f} s") + print(f" batches: {len(read_store)}") + print(f" items: {TOTAL_OBJECTS}") + print(f" max_blocksize: {read_store.max_blocksize}") + print() + print(read_store.info) + print(f"Random scalar reads: {N_RANDOM_READS}") + print(f" mean: {statistics.fmean(timings_ns) / 1_000:.2f} us") + print(f" max: {max(timings_ns) / 1_000:.2f} us") + print(f" min: {min(timings_ns) / 1_000:.2f} us") + print(f"Item iteration via iter_items(): {iter_time_s:.3f} s") + print(f" per item: {iter_time_s * 1_000_000 / nitems:.2f} us") + print(f" checksum: {checksum}") + print("Sample reads:") + for timing_ns, batch_index, item_index, value in samples[:5]: + print(f" {timing_ns / 1_000:.2f} us -> read_store[{batch_index}][{item_index}] = {value}") + if args.in_mem: + print("BatchStore kept in memory") + else: + print(f"BatchStore file at: {read_store.urlpath}") + + +if __name__ == "__main__": + main() diff --git a/doc/getting_started/tutorials/12.batchstore.ipynb 
b/doc/getting_started/tutorials/12.batchstore.ipynb new file mode 100644 index 00000000..52bcc660 --- /dev/null +++ b/doc/getting_started/tutorials/12.batchstore.ipynb @@ -0,0 +1,495 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "2c822501cae3b91d", + "metadata": {}, + "source": [ + "# Working with BatchStore\n", + "\n", + "A `BatchStore` is a batch-oriented container for variable-length Python items backed by a single `SChunk`. Each batch is stored in one compressed chunk, and each chunk may contain one or more internal variable-length blocks.\n", + "\n", + "This makes `BatchStore` a good fit when data arrives naturally in batches and you want efficient batch append/update operations together with occasional item-level access inside each batch." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "be8591f8f86952e8", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.190550Z", + "start_time": "2026-03-20T10:24:10.014859Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.329739Z", + "iopub.status.busy": "2026-03-20T10:23:51.329437Z", + "iopub.status.idle": "2026-03-20T10:23:51.556056Z", + "shell.execute_reply": "2026-03-20T10:23:51.555614Z" + } + }, + "outputs": [], + "source": [ + "import blosc2\n", + "\n", + "\n", + "def show(label, value):\n", + " print(f\"{label}: {value}\")\n", + "\n", + "\n", + "urlpath = \"batchstore_tutorial.b2b\"\n", + "copy_path = \"batchstore_tutorial_copy.b2b\"\n", + "blosc2.remove_urlpath(urlpath)\n", + "blosc2.remove_urlpath(copy_path)" + ] + }, + { + "cell_type": "markdown", + "id": "dda38c56e3e63ec1", + "metadata": {}, + "source": [ + "## Creating and populating a BatchStore\n", + "\n", + "A `BatchStore` is indexed by batch. Batches can be appended one by one with `append()` or in bulk with `extend()`. Here we set a small `max_blocksize` just so the internal block structure is easy to observe in `.info`." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "f8c8a2b7692e7228", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.211954Z", + "start_time": "2026-03-20T10:24:10.191296Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.557338Z", + "iopub.status.busy": "2026-03-20T10:23:51.557245Z", + "iopub.status.idle": "2026-03-20T10:23:51.564920Z", + "shell.execute_reply": "2026-03-20T10:23:51.564578Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Batches: [[{'name': 'alpha', 'count': 1}, {'name': 'beta', 'count': 2}, {'name': 'gamma', 'count': 3}], [{'name': 'delta', 'count': 4}, {'name': 'epsilon', 'count': 5}], [{'name': 'zeta', 'count': 6}], [{'name': 'eta', 'count': 7}, {'name': 'theta', 'count': 8}], [{'name': 'iota', 'count': 9}, {'name': 'kappa', 'count': 10}, {'name': 'lambda', 'count': 11}]]\n", + "Number of batches: 5\n" + ] + } + ], + "source": [ + "store = blosc2.BatchStore(urlpath=urlpath, mode=\"w\", contiguous=True, max_blocksize=2)\n", + "store.append(\n", + " [\n", + " {\"name\": \"alpha\", \"count\": 1},\n", + " {\"name\": \"beta\", \"count\": 2},\n", + " {\"name\": \"gamma\", \"count\": 3},\n", + " ]\n", + ")\n", + "store.append(\n", + " [\n", + " {\"name\": \"delta\", \"count\": 4},\n", + " {\"name\": \"epsilon\", \"count\": 5},\n", + " ]\n", + ")\n", + "store.extend(\n", + " [\n", + " [{\"name\": \"zeta\", \"count\": 6}],\n", + " [{\"name\": \"eta\", \"count\": 7}, {\"name\": \"theta\", \"count\": 8}],\n", + " [\n", + " {\"name\": \"iota\", \"count\": 9},\n", + " {\"name\": \"kappa\", \"count\": 10},\n", + " {\"name\": \"lambda\", \"count\": 11},\n", + " ],\n", + " ]\n", + ")\n", + "\n", + "show(\"Batches\", [batch[:] for batch in store])\n", + "show(\"Number of batches\", len(store))" + ] + }, + { + "cell_type": "markdown", + "id": "f57fc5cf2cbaa9ba", + "metadata": {}, + "source": [ + "## Batch and item access\n", + "\n", + "Indexing the store returns 
a batch. Indexing a batch returns an item inside that batch. Flat item-wise traversal is available through `iter_items()`." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "20861d3e348f9df1", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.229980Z", + "start_time": "2026-03-20T10:24:10.213198Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.566000Z", + "iopub.status.busy": "2026-03-20T10:23:51.565919Z", + "iopub.status.idle": "2026-03-20T10:23:51.569765Z", + "shell.execute_reply": "2026-03-20T10:23:51.569439Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "First batch: [{'name': 'alpha', 'count': 1}, {'name': 'beta', 'count': 2}, {'name': 'gamma', 'count': 3}]\n", + "Second item in first batch: {'name': 'beta', 'count': 2}\n", + "Slice of second batch: [{'name': 'delta', 'count': 4}]\n", + "All items: [{'name': 'alpha', 'count': 1}, {'name': 'beta', 'count': 2}, {'name': 'gamma', 'count': 3}, {'name': 'delta', 'count': 4}, {'name': 'epsilon', 'count': 5}, {'name': 'zeta', 'count': 6}, {'name': 'eta', 'count': 7}, {'name': 'theta', 'count': 8}, {'name': 'iota', 'count': 9}, {'name': 'kappa', 'count': 10}, {'name': 'lambda', 'count': 11}]\n" + ] + } + ], + "source": [ + "show(\"First batch\", store[0][:])\n", + "show(\"Second item in first batch\", store[0][1])\n", + "show(\"Slice of second batch\", store[1][:1])\n", + "show(\"All items\", list(store.iter_items()))" + ] + }, + { + "cell_type": "markdown", + "id": "eba42acee73bffe3", + "metadata": {}, + "source": [ + "## Updating, inserting, and deleting batches\n", + "\n", + "Mutation is batch-oriented too: you overwrite, insert, delete, and pop whole batches." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "df556f6da8adc369", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.259055Z", + "start_time": "2026-03-20T10:24:10.231589Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.570823Z", + "iopub.status.busy": "2026-03-20T10:23:51.570763Z", + "iopub.status.idle": "2026-03-20T10:23:51.577607Z", + "shell.execute_reply": "2026-03-20T10:23:51.577269Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Popped batch: [{'name': 'zeta', 'count': 6}]\n", + "After updates: [[{'name': 'alpha*', 'count': 10}, {'name': 'beta*', 'count': 20}], [{'name': 'delta*', 'count': 40}, {'name': 'epsilon*', 'count': 50}], [{'name': 'between-a', 'count': 99}, {'name': 'between-b', 'count': 100}], [{'name': 'eta', 'count': 7}, {'name': 'theta', 'count': 8}], [{'name': 'iota', 'count': 9}, {'name': 'kappa', 'count': 10}, {'name': 'lambda', 'count': 11}]]\n" + ] + } + ], + "source": [ + "store[1] = [\n", + " {\"name\": \"delta*\", \"count\": 40},\n", + " {\"name\": \"epsilon*\", \"count\": 50},\n", + "]\n", + "store.insert(2, [{\"name\": \"between-a\", \"count\": 99}, {\"name\": \"between-b\", \"count\": 100}])\n", + "removed = store.pop(3)\n", + "del store[0]\n", + "store.insert(0, [{\"name\": \"alpha*\", \"count\": 10}, {\"name\": \"beta*\", \"count\": 20}])\n", + "\n", + "show(\"Popped batch\", removed)\n", + "show(\"After updates\", [batch[:] for batch in store])" + ] + }, + { + "cell_type": "markdown", + "id": "e48791c431156e56", + "metadata": {}, + "source": [ + "## Iteration and summary info\n", + "\n", + "Iterating a `BatchStore` yields batches. The `.info` summary reports both batch-level and internal block-level statistics." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "b32d72a68d83673e", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.300526Z", + "start_time": "2026-03-20T10:24:10.259712Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.578504Z", + "iopub.status.busy": "2026-03-20T10:23:51.578433Z", + "iopub.status.idle": "2026-03-20T10:23:51.581563Z", + "shell.execute_reply": "2026-03-20T10:23:51.581191Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Batches via iteration: [[{'name': 'alpha*', 'count': 10}, {'name': 'beta*', 'count': 20}], [{'name': 'delta*', 'count': 40}, {'name': 'epsilon*', 'count': 50}], [{'name': 'between-a', 'count': 99}, {'name': 'between-b', 'count': 100}], [{'name': 'eta', 'count': 7}, {'name': 'theta', 'count': 8}], [{'name': 'iota', 'count': 9}, {'name': 'kappa', 'count': 10}, {'name': 'lambda', 'count': 11}]]\n", + "type : BatchStore\n", + "serializer : msgpack\n", + "nbatches : 5 (items per batch: mean=2.20, max=3, min=2)\n", + "nblocks : 6 (items per block: mean=1.83, max=2, min=1)\n", + "nitems : 11\n", + "nbytes : 226 (226 B)\n", + "cbytes : 680 (680 B)\n", + "cratio : 0.33\n", + "cparams : CParams(codec=, codec_meta=0, clevel=5, use_dict=False, typesize=1,\n", + " : nthreads=12, blocksize=0, splitmode=,\n", + " : filters=[, , ,\n", + " : , , ], filters_meta=[0,\n", + " : 0, 0, 0, 0, 0], tuner=)\n", + "dparams : DParams(nthreads=12)\n", + "\n" + ] + } + ], + "source": [ + "show(\"Batches via iteration\", [batch[:] for batch in store])\n", + "print(store.info)" + ] + }, + { + "cell_type": "markdown", + "id": "1d6abe8fe87d3663", + "metadata": {}, + "source": [ + "## Copying and changing storage settings\n", + "\n", + "Like other Blosc2 containers, `BatchStore.copy()` can write a new persistent store while changing storage or compression settings." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "45f878b8f4414a3b", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.334099Z", + "start_time": "2026-03-20T10:24:10.301619Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.582437Z", + "iopub.status.busy": "2026-03-20T10:23:51.582372Z", + "iopub.status.idle": "2026-03-20T10:23:51.590494Z", + "shell.execute_reply": "2026-03-20T10:23:51.590186Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Copied batches: [[{'name': 'alpha*', 'count': 10}, {'name': 'beta*', 'count': 20}], [{'name': 'delta*', 'count': 40}, {'name': 'epsilon*', 'count': 50}], [{'name': 'between-a', 'count': 99}, {'name': 'between-b', 'count': 100}], [{'name': 'eta', 'count': 7}, {'name': 'theta', 'count': 8}], [{'name': 'iota', 'count': 9}, {'name': 'kappa', 'count': 10}, {'name': 'lambda', 'count': 11}]]\n", + "Copy serializer: msgpack\n", + "Copy codec: Codec.LZ4\n" + ] + } + ], + "source": [ + "store_copy = store.copy(\n", + " urlpath=copy_path,\n", + " contiguous=False,\n", + " cparams={\"codec\": blosc2.Codec.LZ4, \"clevel\": 5},\n", + ")\n", + "\n", + "show(\"Copied batches\", [batch[:] for batch in store_copy])\n", + "show(\"Copy serializer\", store_copy.serializer)\n", + "show(\"Copy codec\", store_copy.cparams.codec)" + ] + }, + { + "cell_type": "markdown", + "id": "19c51a629db1209", + "metadata": {}, + "source": [ + "## Round-tripping through cframes and reopening from disk\n", + "\n", + "Tagged persistent stores automatically reopen as `BatchStore`, and a serialized cframe buffer does too." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "fd4957093f509bd4", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.359063Z", + "start_time": "2026-03-20T10:24:10.343012Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.591475Z", + "iopub.status.busy": "2026-03-20T10:23:51.591415Z", + "iopub.status.idle": "2026-03-20T10:23:51.594839Z", + "shell.execute_reply": "2026-03-20T10:23:51.594553Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "from_cframe type: BatchStore\n", + "from_cframe batches: [[{'name': 'alpha*', 'count': 10}, {'name': 'beta*', 'count': 20}], [{'name': 'delta*', 'count': 40}, {'name': 'epsilon*', 'count': 50}], [{'name': 'between-a', 'count': 99}, {'name': 'between-b', 'count': 100}], [{'name': 'eta', 'count': 7}, {'name': 'theta', 'count': 8}], [{'name': 'iota', 'count': 9}, {'name': 'kappa', 'count': 10}, {'name': 'lambda', 'count': 11}]]\n", + "Reopened type: BatchStore\n", + "Reopened batches: [[{'name': 'alpha*', 'count': 10}, {'name': 'beta*', 'count': 20}], [{'name': 'delta*', 'count': 40}, {'name': 'epsilon*', 'count': 50}], [{'name': 'between-a', 'count': 99}, {'name': 'between-b', 'count': 100}], [{'name': 'eta', 'count': 7}, {'name': 'theta', 'count': 8}], [{'name': 'iota', 'count': 9}, {'name': 'kappa', 'count': 10}, {'name': 'lambda', 'count': 11}]]\n" + ] + } + ], + "source": [ + "cframe = store.to_cframe()\n", + "restored = blosc2.from_cframe(cframe)\n", + "show(\"from_cframe type\", type(restored).__name__)\n", + "show(\"from_cframe batches\", [batch[:] for batch in restored])\n", + "\n", + "reopened = blosc2.open(urlpath, mode=\"r\", mmap_mode=\"r\")\n", + "show(\"Reopened type\", type(reopened).__name__)\n", + "show(\"Reopened batches\", [batch[:] for batch in reopened])" + ] + }, + { + "cell_type": "markdown", + "id": "dc362a1cab78d016", + "metadata": {}, + "source": [ + "## Clearing and reusing a store\n", + "\n", + "Calling 
`clear()` resets the backing storage so the container remains ready for new batches." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "2214b2be1bfb5bc7", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.386442Z", + "start_time": "2026-03-20T10:24:10.365740Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.595854Z", + "iopub.status.busy": "2026-03-20T10:23:51.595778Z", + "iopub.status.idle": "2026-03-20T10:23:51.601478Z", + "shell.execute_reply": "2026-03-20T10:23:51.601232Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "After clear + extend: [[{'name': 'fresh', 'count': 1}], [{'name': 'again', 'count': 2}, {'name': 'done', 'count': 3}]]\n" + ] + } + ], + "source": [ + "scratch = store.copy()\n", + "scratch.clear()\n", + "scratch.extend(\n", + " [\n", + " [{\"name\": \"fresh\", \"count\": 1}],\n", + " [{\"name\": \"again\", \"count\": 2}, {\"name\": \"done\", \"count\": 3}],\n", + " ]\n", + ")\n", + "show(\"After clear + extend\", [batch[:] for batch in scratch])" + ] + }, + { + "cell_type": "markdown", + "id": "8d8f9df58a46c4c1", + "metadata": {}, + "source": [ + "## Flat item access with `.items`\n", + "\n", + "The main `BatchStore` API remains batch-oriented, but the `.items` accessor offers a read-only flat view across all items. Integer indexing returns one item and slicing returns a Python list." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "4f5c4e5a1b8f92d4", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.403443Z", + "start_time": "2026-03-20T10:24:10.387808Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.602502Z", + "iopub.status.busy": "2026-03-20T10:23:51.602451Z", + "iopub.status.idle": "2026-03-20T10:23:51.606267Z", + "shell.execute_reply": "2026-03-20T10:23:51.605893Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Flat item 0: {'name': 'alpha*', 'count': 10}\n", + "Flat item 6: {'name': 'eta', 'count': 7}\n", + "Flat slice 3:8: [{'name': 'epsilon*', 'count': 50}, {'name': 'between-a', 'count': 99}, {'name': 'between-b', 'count': 100}, {'name': 'eta', 'count': 7}, {'name': 'theta', 'count': 8}]\n" + ] + } + ], + "source": [ + "show(\"Flat item 0\", store.items[0])\n", + "show(\"Flat item 6\", store.items[6])\n", + "show(\"Flat slice 3:8\", store.items[3:8])" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "2a355a3fc8673692", + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-20T10:24:10.420064Z", + "start_time": "2026-03-20T10:24:10.403926Z" + }, + "execution": { + "iopub.execute_input": "2026-03-20T10:23:51.607247Z", + "iopub.status.busy": "2026-03-20T10:23:51.607185Z", + "iopub.status.idle": "2026-03-20T10:23:51.608877Z", + "shell.execute_reply": "2026-03-20T10:23:51.608598Z" + } + }, + "outputs": [], + "source": [ + "blosc2.remove_urlpath(urlpath)\n", + "blosc2.remove_urlpath(copy_path)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.13.5" + } + }, + "nbformat": 4, + 
"nbformat_minor": 5 +} diff --git a/doc/reference/batch_store.rst b/doc/reference/batch_store.rst new file mode 100644 index 00000000..6b1edc8d --- /dev/null +++ b/doc/reference/batch_store.rst @@ -0,0 +1,93 @@ +.. _BatchStore: + +BatchStore +========== + +Overview +-------- +BatchStore is a batch-oriented container for variable-length Python items +backed by a single Blosc2 ``SChunk``. + +Each batch is stored in one compressed chunk: + +- batches contain one or more Python items +- each chunk may contain one or more internal variable-length blocks +- the store itself is indexed by batch +- item-wise traversal is available via :meth:`BatchStore.iter_items` + +BatchStore is a good fit when data arrives naturally in batches and you want: + +- efficient batch append/update operations +- persistent ``.b2b`` stores +- item-level reads inside a batch +- compact summary information about batches and internal blocks via ``.info`` + +Serializer support +------------------ + +BatchStore currently supports two serializers: + +- ``"msgpack"``: the default and general-purpose choice for Python items +- ``"arrow"``: optional and requires ``pyarrow``; mainly useful when data is + already Arrow-shaped before ingestion + +Quick example +------------- + +.. code-block:: python + + import blosc2 + + store = blosc2.BatchStore(urlpath="example_batch_store.b2b", mode="w", contiguous=True) + store.append([{"red": 1, "green": 2, "blue": 3}, {"red": 4, "green": 5, "blue": 6}]) + store.append([{"red": 7, "green": 8, "blue": 9}]) + + print(store[0]) # first batch + print(store[0][1]) # second item in first batch + print(list(store.iter_items())) + + reopened = blosc2.open("example_batch_store.b2b", mode="r") + print(type(reopened).__name__) + print(reopened.info) + +.. note:: + BatchStore is batch-oriented by design. ``store[i]`` returns a batch, not a + single item. Use :meth:`BatchStore.iter_items` for flat item-wise traversal. + +.. currentmodule:: blosc2 + +.. 
autoclass:: BatchStore + + Constructors + ------------ + .. automethod:: __init__ + + Batch Interface + --------------- + .. automethod:: __getitem__ + .. automethod:: __setitem__ + .. automethod:: __delitem__ + .. automethod:: __len__ + .. automethod:: __iter__ + .. automethod:: iter_items + + Mutation + -------- + .. automethod:: append + .. automethod:: extend + .. automethod:: insert + .. automethod:: pop + .. automethod:: delete + .. automethod:: clear + .. automethod:: copy + + Context Manager + --------------- + .. automethod:: __enter__ + .. automethod:: __exit__ + + Public Members + -------------- + .. automethod:: to_cframe + +.. autoclass:: Batch diff --git a/doc/reference/classes.rst b/doc/reference/classes.rst index 84af533c..83733b2f 100644 --- a/doc/reference/classes.rst +++ b/doc/reference/classes.rst @@ -16,6 +16,7 @@ Main Classes DictStore TreeStore EmbedStore + BatchStore VLArray Proxy ProxySource @@ -34,6 +35,7 @@ Main Classes dict_store tree_store embed_store + batch_store vlarray proxy proxysource diff --git a/examples/batch_store.py b/examples/batch_store.py new file mode 100644 index 00000000..9a809fec --- /dev/null +++ b/examples/batch_store.py @@ -0,0 +1,73 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +import random + +import blosc2 + +URLPATH = "example_batch_store.b2b" +NBATCHES = 100 +OBJECTS_PER_BATCH = 100 +BLOCKSIZE_MAX = 32 +N_RANDOM_SAMPLES = 5 + + +def make_rgb(batch_index: int, item_index: int) -> dict[str, int]: + global_index = batch_index * OBJECTS_PER_BATCH + item_index + return { + "red": batch_index, + "green": item_index, + "blue": global_index, + } + + +def make_batch(batch_index: int) -> list[dict[str, int]]: + return [make_rgb(batch_index, item_index) for item_index in range(OBJECTS_PER_BATCH)] + + +def main() -> None: + # Start clean so the example is reproducible when run multiple times. + blosc2.remove_urlpath(URLPATH) + + storage = blosc2.Storage(urlpath=URLPATH, mode="w", contiguous=True) + with blosc2.BatchStore(storage=storage, max_blocksize=BLOCKSIZE_MAX) as store: + for batch_index in range(NBATCHES): + store.append(make_batch(batch_index)) + + total_objects = sum(len(batch) for batch in store) + print("Created BatchStore") + print(f" batches: {len(store)}") + print(f" objects: {total_objects}") + print(f" max_blocksize: {store.max_blocksize}") + + # Reopen with the same max_blocksize hint so scalar reads can use the + # VL-block path instead of decoding the entire batch. 
+ reopened = blosc2.BatchStore(urlpath=URLPATH, mode="r", contiguous=True, max_blocksize=BLOCKSIZE_MAX) + + print() + print(reopened.info) + + sample_rng = random.Random(2024) + print("Random scalar reads:") + for _ in range(N_RANDOM_SAMPLES): + batch_index = sample_rng.randrange(len(reopened)) + item_index = sample_rng.randrange(OBJECTS_PER_BATCH) + value = reopened[batch_index][item_index] + print(f" reopened[{batch_index}][{item_index}] -> {value}") + + print() + print("Flat item reads via .items:") + print(f" reopened.items[0] -> {reopened.items[0]}") + print(f" reopened.items[150:153] -> {reopened.items[150:153]}") + + print(f"BatchStore file at: {reopened.urlpath}") + + +if __name__ == "__main__": + main() diff --git a/pyproject.toml b/pyproject.toml index 81c8d52a..c25612d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,6 +40,12 @@ dependencies = [ "requests", ] version = "4.1.3.dev0" + +[project.optional-dependencies] +recommended = [ + "pyarrow", +] + [project.entry-points."array_api"] blosc2 = "blosc2" diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index e0e8f9ac..e32b2f48 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -37,6 +37,11 @@ from .version import __array_api_version__, __version__ +_PACKAGE_DIR = str(Path(__file__).resolve().parent) +if _PACKAGE_DIR in __path__: + __path__.remove(_PACKAGE_DIR) +__path__.insert(0, _PACKAGE_DIR) + def _configure_libtcc_runtime_path(): """Best-effort configuration so miniexpr can find bundled libtcc at runtime.""" @@ -530,6 +535,7 @@ def _raise(exc): from .embed_store import EmbedStore, estore_from_cframe from .dict_store import DictStore from .tree_store import TreeStore +from .batch_store import Batch, BatchStore from .vlarray import VLArray, vlarray_from_cframe from .c2array import c2context, C2Array, URLPath @@ -714,6 +720,8 @@ def _raise(exc): # Classes "C2Array", "CParams", + "Batch", + "BatchStore", # Enums "Codec", "DParams", diff --git a/src/blosc2/batch_store.py 
b/src/blosc2/batch_store.py new file mode 100644 index 00000000..984f043e --- /dev/null +++ b/src/blosc2/batch_store.py @@ -0,0 +1,937 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +from __future__ import annotations + +import copy +import pathlib +import statistics +from collections.abc import Iterator, Sequence +from dataclasses import asdict +from functools import lru_cache +from typing import Any + +import numpy as np + +import blosc2 +from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb +from blosc2.info import InfoReporter, format_nbytes_info + +_BATCHSTORE_META = {"version": 1, "serializer": "msgpack", "max_blocksize": None, "arrow_schema": None} +_SUPPORTED_SERIALIZERS = {"msgpack", "arrow"} +_BATCHSTORE_VLMETA_KEY = "_batch_store_metadata" + + +def _check_serialized_size(buffer: bytes) -> None: + if len(buffer) > blosc2.MAX_BUFFERSIZE: + raise ValueError(f"Serialized objects cannot be larger than {blosc2.MAX_BUFFERSIZE} bytes") + + +class Batch(Sequence[Any]): + """A lazy sequence representing one batch in a :class:`BatchStore`. + + ``Batch`` provides sequence-style access to the items stored in a single + batch. Integer indexing can use block-local reads when possible, while + slicing materializes the full batch into Python items. + + Batch instances are normally obtained via :class:`BatchStore` indexing or + iteration rather than constructed directly. 
+ """ + + def __init__(self, parent: BatchStore, nbatch: int, lazybatch: bytes) -> None: + self._parent = parent + self._nbatch = nbatch + self._lazybatch = lazybatch + self._items: list[Any] | None = None + self._cached_block_index: int | None = None + self._cached_block: list[Any] | None = None + self._nbytes, self._cbytes, self._nblocks = blosc2.get_cbuffer_sizes(lazybatch) + + def _normalize_index(self, index: int) -> int: + if not isinstance(index, int): + raise TypeError("Batch indices must be integers") + if index < 0: + index += len(self) + if index < 0 or index >= len(self): + raise IndexError("Batch index out of range") + return index + + def _decode_items(self) -> list[Any]: + if self._items is None: + blocks = self._parent._decode_blocks(self._nbatch) + self._items = [item for block in blocks for item in block] + return self._items + + def _get_block(self, block_index: int) -> list[Any]: + if self._cached_block_index == block_index and self._cached_block is not None: + return self._cached_block + block = self._parent._deserialize_block(self._parent.schunk.get_vlblock(self._nbatch, block_index)) + self._cached_block_index = block_index + self._cached_block = block + return block + + def __getitem__(self, index: int | slice) -> Any | list[Any]: + if isinstance(index, slice): + items = self._decode_items() + return items[index] + if index < 0: + items = self._decode_items() + index = self._normalize_index(index) + return items[index] + max_blocksize = self._parent.max_blocksize + if max_blocksize is not None: + block_index, item_index = divmod(index, max_blocksize) + if block_index >= self._nblocks: + raise IndexError("Batch index out of range") + block = self._get_block(block_index) + try: + return block[item_index] + except IndexError as exc: + raise IndexError("Batch index out of range") from exc + items = self._decode_items() + index = self._normalize_index(index) + return items[index] + + def __len__(self) -> int: + batch_length = 
class BatchStoreItems(Sequence[Any]):
    """A read-only flat view over the items stored in a :class:`BatchStore`.

    Indexing and ``len()`` are forwarded to the parent store's flat-item
    helpers. Iteration is forwarded to the parent's ``iter_items`` so that a
    full traversal walks batch by batch instead of performing one independent
    flat lookup per item.

    Parameters
    ----------
    parent : BatchStore
        The store whose items are exposed through this view.
    """

    def __init__(self, parent: BatchStore) -> None:
        self._parent = parent

    def __getitem__(self, index: int | slice) -> Any | list[Any]:
        """Return the item at flat position ``index`` (or a list for a slice)."""
        return self._parent._get_flat_item(index)

    def __len__(self) -> int:
        """Return the total number of items across all batches."""
        return self._parent._get_total_item_count()

    def __iter__(self) -> Iterator[Any]:
        """Iterate over every item in storage order."""
        # The Sequence mixin would call __getitem__ once per position; the
        # parent's batch-wise traversal yields the same items without that.
        return self._parent.iter_items()
``"msgpack"`` is the default and is + the general-purpose choice for Python items. ``"arrow"`` is optional and + requires ``pyarrow``. + _from_schunk : blosc2.SChunk, optional + Internal hook used when reopening an already-tagged BatchStore. + **kwargs + Storage, compression, and decompression arguments accepted by the + constructor. + """ + + @staticmethod + def _set_typesize_one(cparams: blosc2.CParams | dict | None) -> blosc2.CParams | dict: + if cparams is None: + cparams = blosc2.CParams() + elif isinstance(cparams, blosc2.CParams): + cparams = copy.deepcopy(cparams) + else: + cparams = dict(cparams) + + if isinstance(cparams, blosc2.CParams): + cparams.typesize = 1 + else: + cparams["typesize"] = 1 + return cparams + + @staticmethod + def _coerce_storage(storage: blosc2.Storage | dict | None, kwargs: dict[str, Any]) -> blosc2.Storage: + if storage is not None: + storage_keys = set(blosc2.Storage.__annotations__) + storage_kwargs = storage_keys.intersection(kwargs) + if storage_kwargs: + unexpected = ", ".join(sorted(storage_kwargs)) + raise AttributeError( + f"Cannot pass both `storage` and other kwargs already included in Storage: {unexpected}" + ) + if isinstance(storage, blosc2.Storage): + return copy.deepcopy(storage) + return blosc2.Storage(**storage) + + storage_kwargs = { + name: kwargs.pop(name) for name in list(blosc2.Storage.__annotations__) if name in kwargs + } + return blosc2.Storage(**storage_kwargs) + + @staticmethod + def _validate_storage(storage: blosc2.Storage) -> None: + if storage.mmap_mode not in (None, "r"): + raise ValueError("For BatchStore containers, mmap_mode must be None or 'r'") + if storage.mmap_mode == "r" and storage.mode != "r": + raise ValueError("For BatchStore containers, mmap_mode='r' requires mode='r'") + + def _attach_schunk(self, schunk: blosc2.SChunk) -> None: + self.schunk = schunk + self.mode = schunk.mode + self.mmap_mode = getattr(schunk, "mmap_mode", None) + try: + batchstore_meta = self.schunk.meta["batchstore"] 
def _make_storage(self) -> blosc2.Storage:
    """Build a ``blosc2.Storage`` describing the currently attached super-chunk.

    The fixed-length metadata entries are copied one by one into a plain dict,
    and the contiguity flag, url path, open mode and mmap mode are taken from
    the current store.

    Returns
    -------
    blosc2.Storage
        A new storage description mirroring this store.
    """
    source_meta = self.meta
    meta = {}
    for name in source_meta:
        meta[name] = source_meta[name]
    storage_args = {
        "contiguous": self.schunk.contiguous,
        "urlpath": self.urlpath,
        "mode": self.mode,
        "mmap_mode": self.mmap_mode,
        "meta": meta,
    }
    return blosc2.Storage(**storage_args)
def _validate_tag(self) -> None:
    """Verify that the attached super-chunk is a usable BatchStore.

    Raises
    ------
    ValueError
        If the super-chunk has no ``"batchstore"`` metadata entry, or if the
        serializer recorded for the store is not a supported one.
    ImportError
        Propagated from ``_require_pyarrow`` when the serializer is
        ``"arrow"`` and pyarrow cannot be imported.
    """
    tagged = "batchstore" in self.schunk.meta
    if not tagged:
        raise ValueError("The supplied SChunk is not tagged as a BatchStore")
    serializer = self._serializer
    if serializer in _SUPPORTED_SERIALIZERS:
        if serializer == "arrow":
            # Fail early if the optional dependency is missing.
            self._require_pyarrow()
        return
    raise ValueError(f"Unsupported BatchStore serializer in metadata: {serializer!r}")
+ except ImportError as exc: + raise ImportError("BatchStore serializer='arrow' requires pyarrow") from exc + return pa, pa_ipc + + def _check_writable(self) -> None: + if self.mode == "r": + raise ValueError("Cannot modify a BatchStore opened in read-only mode") + + def _normalize_index(self, index: int) -> int: + if not isinstance(index, int): + raise TypeError("BatchStore indices must be integers") + if index < 0: + index += len(self) + if index < 0 or index >= len(self): + raise IndexError("BatchStore index out of range") + return index + + def _normalize_insert_index(self, index: int) -> int: + if not isinstance(index, int): + raise TypeError("BatchStore indices must be integers") + if index < 0: + index += len(self) + if index < 0: + return 0 + if index > len(self): + return len(self) + return index + + def _slice_indices(self, index: slice) -> list[int]: + return list(range(*index.indices(len(self)))) + + def _copy_meta(self) -> dict[str, Any]: + return {name: self.meta[name] for name in self.meta} + + def _load_batch_lengths(self) -> list[int] | None: + try: + metadata = self.schunk.vlmeta[_BATCHSTORE_VLMETA_KEY] + except KeyError: + return None + batch_lengths = metadata.get("batch_lengths") + if not isinstance(batch_lengths, list): + return None + return [int(length) for length in batch_lengths] + + def _persist_batch_lengths(self) -> None: + if self._batch_lengths is None: + return + if len(self._batch_lengths) == 0: + if _BATCHSTORE_VLMETA_KEY in self.vlmeta: + del self.vlmeta[_BATCHSTORE_VLMETA_KEY] + return + self.schunk.vlmeta[_BATCHSTORE_VLMETA_KEY] = {"batch_lengths": list(self._batch_lengths)} + + def _get_batch_lengths(self) -> list[int] | None: + return self._batch_lengths + + def _ensure_batch_lengths(self) -> list[int]: + if self._batch_lengths is None: + self._batch_lengths = [] + return self._batch_lengths + + def _load_or_compute_batch_lengths(self) -> list[int]: + if self._batch_lengths is None: + self._batch_lengths = 
[len(self._get_batch(i)) for i in range(len(self))] + if self.mode != "r": + self._persist_batch_lengths() + return self._batch_lengths + + def _batch_length(self, index: int) -> int | None: + if self._batch_lengths is None: + return None + return self._batch_lengths[index] + + def _invalidate_item_cache(self) -> None: + self._item_prefix_sums = None + + def _get_item_prefix_sums(self) -> np.ndarray: + if self._item_prefix_sums is None: + batch_lengths = np.asarray(self._load_or_compute_batch_lengths(), dtype=np.int64) + prefix_sums = np.empty(len(batch_lengths) + 1, dtype=np.int64) + prefix_sums[0] = 0 + prefix_sums[1:] = np.cumsum(batch_lengths, dtype=np.int64) + self._item_prefix_sums = prefix_sums + return self._item_prefix_sums + + def _get_total_item_count(self) -> int: + return int(self._get_item_prefix_sums()[-1]) + + def _get_flat_item(self, index: int | slice) -> Any | list[Any]: + if isinstance(index, slice): + return [self._get_flat_item(i) for i in range(*index.indices(self._get_total_item_count()))] + if not isinstance(index, int): + raise TypeError("BatchStore item indices must be integers") + nitems = self._get_total_item_count() + if index < 0: + index += nitems + if index < 0 or index >= nitems: + raise IndexError("BatchStore item index out of range") + + prefix_sums = self._get_item_prefix_sums() + batch_index = int(np.searchsorted(prefix_sums, index, side="right") - 1) + item_index = int(index - prefix_sums[batch_index]) + return self[batch_index][item_index] + + def _block_sizes_from_batch_length(self, batch_length: int, nblocks: int) -> list[int]: + if self._max_blocksize is None or nblocks <= 0: + return [] + full_blocks, remainder = divmod(batch_length, self._max_blocksize) + block_sizes = [self._max_blocksize] * full_blocks + if remainder: + block_sizes.append(remainder) + if not block_sizes and batch_length > 0: + block_sizes.append(batch_length) + if len(block_sizes) != nblocks: + return [] + return block_sizes + + def 
_get_block_sizes(self, batch_sizes: list[int]) -> list[int] | None: + if self._max_blocksize is None: + return None + block_sizes: list[int] = [] + for index, batch_length in enumerate(batch_sizes): + lazychunk = self.schunk.get_lazychunk(index) + _, _, nblocks = blosc2.get_cbuffer_sizes(lazychunk) + sizes = self._block_sizes_from_batch_length(batch_length, nblocks) + if not sizes: + return None + block_sizes.extend(sizes) + return block_sizes + + def _total_nblocks(self) -> int: + total = 0 + for index in range(len(self)): + lazychunk = self.schunk.get_lazychunk(index) + _, _, nblocks = blosc2.get_cbuffer_sizes(lazychunk) + total += nblocks + return total + + def _user_vlmeta_items(self) -> dict[str, Any]: + return {key: value for key, value in self.vlmeta.getall().items() if key != _BATCHSTORE_VLMETA_KEY} + + def _normalize_msgpack_batch(self, value: object) -> list[Any]: + if isinstance(value, (str, bytes, bytearray, memoryview)): + raise TypeError("BatchStore entries must be sequences of Python objects") + if not isinstance(value, Sequence): + raise TypeError("BatchStore entries must be sequences of Python objects") + values = list(value) + if len(values) == 0: + raise ValueError("BatchStore entries cannot be empty") + return values + + def _normalize_arrow_batch(self, value: object): + pa, _ = self._require_pyarrow() + if isinstance(value, pa.ChunkedArray): + value = value.combine_chunks() + elif isinstance(value, pa.RecordBatch): + if value.num_columns != 1: + raise TypeError("Arrow RecordBatch inputs for BatchStore must have exactly one column") + value = value.column(0) + elif not isinstance(value, pa.Array): + if isinstance(value, (str, bytes, bytearray, memoryview)): + raise TypeError("BatchStore entries must be Arrow arrays or sequences of Python objects") + if not isinstance(value, Sequence): + raise TypeError("BatchStore entries must be Arrow arrays or sequences of Python objects") + value = pa.array(list(value)) + if len(value) == 0: + raise 
ValueError("BatchStore entries cannot be empty") + self._ensure_arrow_schema(value) + return value + + def _ensure_arrow_schema(self, batch) -> None: + if self._serializer != "arrow": + return + pa, _ = self._require_pyarrow() + schema = pa.schema([pa.field("values", batch.type)]) + if self._arrow_schema is None: + self._arrow_schema = schema.serialize().to_pybytes() + self._arrow_schema_obj = schema + return + existing_schema = self._get_arrow_schema() + if not existing_schema.equals(schema): + raise TypeError("All Arrow batches in a BatchStore must share the same schema") + + def _get_arrow_schema(self): + if self._serializer != "arrow": + return None + if self._arrow_schema is None: + raise RuntimeError("Arrow schema is not initialized") + if self._arrow_schema_obj is None: + pa, pa_ipc = self._require_pyarrow() + self._arrow_schema_obj = pa_ipc.read_schema(pa.BufferReader(self._arrow_schema)) + return self._arrow_schema_obj + + def _normalize_batch(self, value: object) -> Any: + if self._serializer == "arrow": + return self._normalize_arrow_batch(value) + return self._normalize_msgpack_batch(value) + + def _batch_len(self, batch: Any) -> int: + return len(batch) + + def _payload_sizes_for_batch(self, batch: Any) -> list[int]: + if self._serializer == "arrow": + total_size = batch.get_total_buffer_size() + avg_size = max(1, total_size // max(1, len(batch))) + return [avg_size] * len(batch) + return [len(msgpack_packb(item)) for item in batch] + + def _ensure_layout_for_batch(self, batch: Any) -> None: + layout_changed = False + if self._max_blocksize is None: + payload_sizes = self._payload_sizes_for_batch(batch) + self._max_blocksize = self._guess_blocksize(payload_sizes) + layout_changed = True + if self._serializer == "arrow" and self._arrow_schema is not None: + layout_changed = layout_changed or len(self) == 0 + if layout_changed: + self._persist_layout_metadata() + + def _persist_layout_metadata(self) -> None: + if len(self) > 0: + return + batch_lengths = 
None if self._batch_lengths is None else list(self._batch_lengths) + user_vlmeta = self._user_vlmeta_items() if len(self.vlmeta) > 0 else {} + storage = self._make_storage() + fixed_meta = dict(storage.meta or {}) + fixed_meta["batchstore"] = { + **dict(fixed_meta.get("batchstore", {})), + "max_blocksize": self._max_blocksize, + "serializer": self._serializer, + "arrow_schema": self._arrow_schema, + } + storage.meta = fixed_meta + schunk = blosc2.SChunk( + chunksize=-1, + data=None, + cparams=copy.deepcopy(self.cparams), + dparams=copy.deepcopy(self.dparams), + storage=storage, + ) + self._attach_schunk(schunk) + for key, value in user_vlmeta.items(): + self.vlmeta[key] = value + if batch_lengths is not None and self._batch_lengths is None: + self._batch_lengths = batch_lengths + + def _guess_blocksize(self, payload_sizes: list[int]) -> int: + if not payload_sizes: + raise ValueError("BatchStore entries cannot be empty") + clevel = self.cparams.clevel + if clevel == 9: + return len(payload_sizes) + if 0 < clevel < 5: + budget = blosc2.cpu_info.get("l1_data_cache_size") + elif 5 <= clevel < 9: + budget = blosc2.cpu_info.get("l2_cache_size") + else: + return len(payload_sizes) + if not isinstance(budget, int) or budget <= 0: + return len(payload_sizes) + total = 0 + count = 0 + for payload_size in payload_sizes: + if count > 0 and total + payload_size > budget: + break + total += payload_size + count += 1 + if count == 0: + count = 1 + return min(count, len(payload_sizes)) + + def _serialize_batch(self, value: object) -> Any: + batch = self._normalize_batch(value) + self._ensure_layout_for_batch(batch) + return batch + + def _serialize_msgpack_block(self, items: list[Any]) -> bytes: + payload = msgpack_packb(items) + _check_serialized_size(payload) + return payload + + def _serialize_arrow_block(self, items) -> bytes: + pa, _ = self._require_pyarrow() + batch = pa.record_batch([items], schema=self._get_arrow_schema()) + payload = batch.serialize().to_pybytes() + 
_check_serialized_size(payload) + return payload + + def _serialize_block(self, items: Any) -> bytes: + if self._serializer == "arrow": + return self._serialize_arrow_block(items) + return self._serialize_msgpack_block(items) + + def _deserialize_msgpack_block(self, payload: bytes) -> list[Any]: + return msgpack_unpackb(payload) + + def _deserialize_arrow_block(self, payload: bytes) -> list[Any]: + pa, pa_ipc = self._require_pyarrow() + batch = pa_ipc.read_record_batch(pa.BufferReader(payload), self._get_arrow_schema()) + return batch.column(0).to_pylist() + + def _deserialize_block(self, payload: bytes) -> list[Any]: + if self._serializer == "arrow": + return self._deserialize_arrow_block(payload) + return self._deserialize_msgpack_block(payload) + + def _vl_cparams_kwargs(self) -> dict[str, Any]: + return asdict(self.schunk.cparams) + + def _vl_dparams_kwargs(self) -> dict[str, Any]: + return asdict(self.schunk.dparams) + + def _compress_batch(self, batch: Any) -> bytes: + if self._max_blocksize is None: + raise RuntimeError("BatchStore max_blocksize is not initialized") + blocks = [ + self._serialize_block(batch[i : i + self._max_blocksize]) + for i in range(0, self._batch_len(batch), self._max_blocksize) + ] + return blosc2.blosc2_ext.vlcompress(blocks, **self._vl_cparams_kwargs()) + + def _decode_blocks(self, nbatch: int) -> list[list[Any]]: + block_payloads = blosc2.blosc2_ext.vldecompress( + self.schunk.get_chunk(nbatch), **self._vl_dparams_kwargs() + ) + return [self._deserialize_block(payload) for payload in block_payloads] + + def _get_batch(self, index: int) -> Batch: + return Batch(self, index, self.schunk.get_lazychunk(index)) + + def append(self, value: object) -> int: + """Append one batch and return the new number of batches.""" + self._check_writable() + batch = self._serialize_batch(value) + batch_payload = self._compress_batch(batch) + length = self._batch_len(batch) + new_len = self.schunk.append_chunk(batch_payload) + 
def delete(self, index: int | slice) -> int:
    """Delete one batch, or every batch selected by a slice.

    Parameters
    ----------
    index : int or slice
        Position of the batch to remove, or a slice selecting several
        batches (any step, including negative ones).

    Returns
    -------
    int
        The number of batches left in the store.

    Raises
    ------
    ValueError
        If the store is read-only (raised by ``_check_writable``).
    TypeError, IndexError
        For an invalid integer ``index`` (raised by ``_normalize_index``).
    """
    self._check_writable()
    if isinstance(index, slice):
        # Always delete from the highest position down: removing a chunk
        # shifts every later one, so ascending order (which is what simply
        # reversing a negative-step slice would give) hits the wrong chunks.
        for idx in sorted(self._slice_indices(index), reverse=True):
            self.schunk.delete_chunk(idx)
            if self._batch_lengths is not None:
                del self._batch_lengths[idx]
        self._persist_batch_lengths()
        self._invalidate_item_cache()
        return len(self)
    index = self._normalize_index(index)
    new_len = self.schunk.delete_chunk(index)
    if self._batch_lengths is not None:
        del self._batch_lengths[index]
    self._persist_batch_lengths()
    self._invalidate_item_cache()
    return new_len
self._persist_batch_lengths() + self._invalidate_item_cache() + + def clear(self) -> None: + """Remove all entries from the container.""" + self._check_writable() + storage = self._make_storage() + if storage.urlpath is not None: + blosc2.remove_urlpath(storage.urlpath) + schunk = blosc2.SChunk( + chunksize=-1, + data=None, + cparams=copy.deepcopy(self.cparams), + dparams=copy.deepcopy(self.dparams), + storage=storage, + ) + self._attach_schunk(schunk) + self._batch_lengths = [] + self._persist_batch_lengths() + self._invalidate_item_cache() + + def __getitem__(self, index: int | slice) -> Batch | list[Batch]: + """Return one batch or a list of batches.""" + if isinstance(index, slice): + return [self[i] for i in self._slice_indices(index)] + index = self._normalize_index(index) + return self._get_batch(index) + + def __setitem__(self, index: int | slice, value: object) -> None: + if isinstance(index, slice): + self._check_writable() + indices = self._slice_indices(index) + values = list(value) + step = 1 if index.step is None else index.step + if step == 1: + start = self._normalize_insert_index(0 if index.start is None else index.start) + for idx in reversed(indices): + self.schunk.delete_chunk(idx) + if self._batch_lengths is not None: + del self._batch_lengths[idx] + for offset, item in enumerate(values): + batch = self._serialize_batch(item) + batch_payload = self._compress_batch(batch) + self.schunk.insert_chunk(start + offset, batch_payload) + self._ensure_batch_lengths().insert(start + offset, self._batch_len(batch)) + self._persist_batch_lengths() + self._invalidate_item_cache() + return + if len(values) != len(indices): + raise ValueError( + f"attempt to assign sequence of size {len(values)} to extended slice of size {len(indices)}" + ) + for idx, item in zip(indices, values, strict=True): + batch = self._serialize_batch(item) + batch_payload = self._compress_batch(batch) + self.schunk.update_chunk(idx, batch_payload) + if self._batch_lengths is not None: 
def iter_items(self) -> Iterator[Any]:
    """Lazily yield every item of every batch, in storage order.

    Yields
    ------
    Any
        The items of the first batch, then those of the second, and so on.
    """
    for batch in self:
        for item in batch:
            yield item
def to_cframe(self) -> bytes:
    """Serialize the full store to a Blosc2 cframe buffer.

    Returns
    -------
    bytes
        Whatever the underlying super-chunk's ``to_cframe()`` produces.
    """
    cframe = self.schunk.to_cframe()
    return cframe
fixed_meta + kwargs["storage"] = storage + else: + kwargs["meta"] = self._copy_meta() + kwargs["contiguous"] = kwargs.get("contiguous", self.schunk.contiguous) + if "urlpath" in kwargs and "mode" not in kwargs: + kwargs["mode"] = "w" + + out = BatchStore(**kwargs) + for key, value in user_vlmeta.items(): + out.vlmeta[key] = value + out.extend(self) + return out + + def __enter__(self) -> BatchStore: + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> bool: + return False + + def __repr__(self) -> str: + return f"BatchStore(len={len(self)}, urlpath={self.urlpath!r})" + + @property + def serializer(self) -> str: + """Serializer name used for batch payloads.""" + return self._serializer diff --git a/src/blosc2/blosc2_ext.pyx b/src/blosc2/blosc2_ext.pyx index 604be0bd..7b6791fa 100644 --- a/src/blosc2/blosc2_ext.pyx +++ b/src/blosc2/blosc2_ext.pyx @@ -8,6 +8,7 @@ #cython: language_level=3 import os +import dataclasses import ast import atexit import pathlib @@ -279,9 +280,17 @@ cdef extern from "blosc2.h": blosc2_context * context, const void * src, int32_t srcsize, void * dest, int32_t destsize) nogil + int blosc2_vlcompress_ctx( + blosc2_context * context, const void * const * srcs, const int32_t * srcsizes, + int32_t nblocks, void * dest, int32_t destsize) nogil + int blosc2_decompress_ctx(blosc2_context * context, const void * src, int32_t srcsize, void * dest, int32_t destsize) nogil + int blosc2_vldecompress_ctx(blosc2_context* context, const void* src, + int32_t srcsize, void** dests, + int32_t* destsizes, int32_t maxblocks) + int blosc2_getitem_ctx(blosc2_context* context, const void* src, int32_t srcsize, int start, int nitems, void* dest, int32_t destsize) nogil @@ -381,6 +390,8 @@ cdef extern from "blosc2.h": c_bool *needs_free) nogil int blosc2_schunk_get_lazychunk(blosc2_schunk *schunk, int64_t nchunk, uint8_t ** chunk, c_bool *needs_free) nogil + int blosc2_schunk_get_vlblock(blosc2_schunk *schunk, int64_t nchunk, int32_t nblock, + uint8_t 
**dest, int32_t *destsize) int blosc2_schunk_get_slice_buffer(blosc2_schunk *schunk, int64_t start, int64_t stop, void *buffer) int blosc2_schunk_set_slice_buffer(blosc2_schunk *schunk, int64_t start, int64_t stop, void *buffer) int blosc2_schunk_get_cparams(blosc2_schunk *schunk, blosc2_cparams** cparams) @@ -407,6 +418,9 @@ cdef extern from "blosc2.h": uint8_t **content, int32_t *content_len) int blosc2_vlmeta_delete(blosc2_schunk *schunk, const char *name) int blosc2_vlmeta_get_names(blosc2_schunk *schunk, char **names) + int blosc2_vldecompress_block_ctx(blosc2_context* context, const void* src, + int32_t srcsize, int32_t nblock, uint8_t** dest, + int32_t* destsize) int blosc1_get_blocksize() @@ -1095,6 +1109,7 @@ def compress2(src, **kwargs): return dest[:size] cdef create_dparams_from_kwargs(blosc2_dparams *dparams, kwargs, blosc2_cparams* cparams=NULL): + memcpy(dparams, &BLOSC2_DPARAMS_DEFAULTS, sizeof(BLOSC2_DPARAMS_DEFAULTS)) dparams.nthreads = kwargs.get('nthreads', blosc2.nthreads) dparams.schunk = NULL dparams.postfilter = NULL @@ -1154,6 +1169,158 @@ def decompress2(src, dst=None, **kwargs): raise ValueError("Error while decompressing, check the src data and/or the dparams") +def vlcompress(srcs, **kwargs): + cdef blosc2_cparams cparams + create_cparams_from_kwargs(&cparams, kwargs) + + cdef Py_ssize_t nblocks = len(srcs) + if nblocks <= 0: + raise ValueError("At least one block is required") + + cdef blosc2_context *cctx = NULL + cdef Py_buffer *buffers = calloc(nblocks, sizeof(Py_buffer)) + cdef const void **src_ptrs = malloc(nblocks * sizeof(void *)) + cdef int32_t *srcsizes = malloc(nblocks * sizeof(int32_t)) + cdef Py_ssize_t acquired = 0 + cdef Py_ssize_t i + cdef int64_t total_nbytes = 0 + cdef int32_t len_dest + cdef int size + cdef Py_ssize_t release_i + cdef void *_dest + if buffers == NULL or src_ptrs == NULL or srcsizes == NULL: + free(buffers) + free(src_ptrs) + free(srcsizes) + raise MemoryError() + + try: + for i in range(nblocks): + 
PyObject_GetBuffer(srcs[i], &buffers[i], PyBUF_SIMPLE) + acquired += 1 + if buffers[i].len <= 0: + raise ValueError("Each VL block must have at least one byte") + src_ptrs[i] = buffers[i].buf + srcsizes[i] = buffers[i].len + total_nbytes += buffers[i].len + + # VL blocks can carry enough per-block framing that the simple + # total_nbytes + global_overhead estimate is too small for many tiny + # buffers. Budget one max-overhead chunk per block as a conservative + # upper bound for the temporary destination. + len_dest = (total_nbytes + BLOSC2_MAX_OVERHEAD * (nblocks + 1) + 64) + dest = PyBytes_FromStringAndSize(NULL, len_dest) + if dest is None: + raise MemoryError() + _dest = dest + cctx = blosc2_create_cctx(cparams) + if cctx == NULL: + raise RuntimeError("Could not create the compression context") + if RELEASEGIL: + with nogil: + size = blosc2_vlcompress_ctx(cctx, src_ptrs, srcsizes, nblocks, _dest, len_dest) + else: + size = blosc2_vlcompress_ctx(cctx, src_ptrs, srcsizes, nblocks, _dest, len_dest) + finally: + if cctx != NULL: + blosc2_free_ctx(cctx) + for release_i in range(acquired): + PyBuffer_Release(&buffers[release_i]) + free(buffers) + free(src_ptrs) + free(srcsizes) + + if size < 0: + raise RuntimeError("Could not compress the data") + elif size == 0: + del dest + raise RuntimeError("The result could not fit ") + return dest[:size] + + +def vldecompress(src, **kwargs): + cdef blosc2_dparams dparams + create_dparams_from_kwargs(&dparams, kwargs) + + cdef blosc2_context *dctx = blosc2_create_dctx(dparams) + if dctx == NULL: + raise RuntimeError("Could not create decompression context") + + cdef const uint8_t[:] typed_view_src + mem_view_src = memoryview(src) + typed_view_src = mem_view_src.cast('B') + _check_comp_length('src', typed_view_src.nbytes) + cdef int32_t nbytes + cdef int32_t cbytes + cdef int32_t nblocks + blosc2_cbuffer_sizes(&typed_view_src[0], &nbytes, &cbytes, &nblocks) + if nblocks <= 0: + blosc2_free_ctx(dctx) + raise ValueError("Chunk 
does not contain VL blocks") + + cdef void **dests = calloc(nblocks, sizeof(void *)) + cdef int32_t *destsizes = malloc(nblocks * sizeof(int32_t)) + cdef int32_t rc + cdef int32_t i + cdef list out = [] + if dests == NULL or destsizes == NULL: + blosc2_free_ctx(dctx) + free(dests) + free(destsizes) + raise MemoryError() + + try: + rc = blosc2_vldecompress_ctx(dctx, &typed_view_src[0], cbytes, dests, destsizes, nblocks) + if rc < 0: + raise RuntimeError("Could not decompress the data") + for i in range(rc): + out.append(PyBytes_FromStringAndSize(dests[i], destsizes[i])) + free(dests[i]) + dests[i] = NULL + return out + finally: + for i in range(nblocks): + if dests[i] != NULL: + free(dests[i]) + free(dests) + free(destsizes) + blosc2_free_ctx(dctx) + + +def vldecompress_block(src, int32_t nblock, **kwargs): + cdef blosc2_dparams dparams + create_dparams_from_kwargs(&dparams, kwargs) + + cdef blosc2_context *dctx = blosc2_create_dctx(dparams) + if dctx == NULL: + raise RuntimeError("Could not create decompression context") + + cdef const uint8_t[:] typed_view_src + mem_view_src = memoryview(src) + typed_view_src = mem_view_src.cast('B') + _check_comp_length('src', typed_view_src.nbytes) + + cdef uint8_t *dest = NULL + cdef int32_t destsize = 0 + cdef int32_t rc + try: + rc = blosc2_vldecompress_block_ctx( + dctx, + &typed_view_src[0], + typed_view_src.nbytes, + nblock, + &dest, + &destsize, + ) + if rc < 0: + raise RuntimeError("Could not decompress the block") + return PyBytes_FromStringAndSize(dest, destsize) + finally: + if dest != NULL: + free(dest) + blosc2_free_ctx(dctx) + + cdef create_storage(blosc2_storage *storage, kwargs): contiguous = kwargs.get('contiguous', blosc2.storage_dflts['contiguous']) storage.contiguous = contiguous @@ -1559,6 +1726,16 @@ cdef class SChunk: free(chunk) return ret_chunk + def get_vlblock(self, nchunk, nblock): + cdef uint8_t *block + cdef int32_t destsize + cbytes = blosc2_schunk_get_vlblock(self.schunk, nchunk, nblock, &block, 
&destsize) + if cbytes < 0: + raise RuntimeError("Error while getting the vlblock") + ret_block = PyBytes_FromStringAndSize(block, destsize) + free(block) + return ret_block + def delete_chunk(self, nchunk): rc = blosc2_schunk_delete_chunk(self.schunk, nchunk) if rc < 0: @@ -2444,7 +2621,8 @@ def open(urlpath, mode, offset, **kwargs): if mode != "w" and kwargs is not None: check_schunk_params(schunk, kwargs) cparams = kwargs.get("cparams") - # For reading with the default number of threads + # nthreads is not stored in the frame; apply the live global when the caller + # did not supply an explicit cparams — symmetric with the DParams default below. dparams = kwargs.get("dparams", blosc2.DParams()) if is_ndarray: @@ -2452,6 +2630,8 @@ def open(urlpath, mode, offset, **kwargs): _array=PyCapsule_New(array, "b2nd_array_t*", NULL)) if cparams is not None: res.schunk.cparams = cparams if isinstance(cparams, blosc2.CParams) else blosc2.CParams(**cparams) + else: + res.schunk.cparams = dataclasses.replace(res.schunk.cparams, nthreads=blosc2.nthreads) if dparams is not None: res.schunk.dparams = dparams if isinstance(dparams, blosc2.DParams) else blosc2.DParams(**dparams) res.schunk.mode = mode @@ -2460,6 +2640,8 @@ def open(urlpath, mode, offset, **kwargs): mode=mode, **kwargs) if cparams is not None: res.cparams = cparams if isinstance(cparams, blosc2.CParams) else blosc2.CParams(**cparams) + else: + res.cparams = dataclasses.replace(res.cparams, nthreads=blosc2.nthreads) if dparams is not None: res.dparams = dparams if isinstance(dparams, blosc2.DParams) else blosc2.DParams(**dparams) diff --git a/src/blosc2/c2array.py b/src/blosc2/c2array.py index e8556ba4..11f7f6cb 100644 --- a/src/blosc2/c2array.py +++ b/src/blosc2/c2array.py @@ -18,7 +18,7 @@ import requests import blosc2 -from blosc2.info import InfoReporter +from blosc2.info import InfoReporter, format_nbytes_info _subscriber_data = { "urlbase": os.environ.get("BLOSC_C2URLBASE"), @@ -424,8 +424,8 @@ def 
info_items(self) -> list: items += [("chunks", self.chunks)] items += [("blocks", self.blocks)] items += [("dtype", self.dtype)] - items += [("nbytes", self.nbytes)] - items += [("cbytes", self.cbytes)] + items += [("nbytes", format_nbytes_info(self.nbytes))] + items += [("cbytes", format_nbytes_info(self.cbytes))] items += [("cratio", f"{self.cratio:.2f}")] items += [("cparams", self.cparams)] # items += [("dparams", self.dparams)] diff --git a/src/blosc2/core.py b/src/blosc2/core.py index 5526a7f2..d574a21e 100644 --- a/src/blosc2/core.py +++ b/src/blosc2/core.py @@ -1918,9 +1918,9 @@ def ndarray_from_cframe(cframe: bytes | str, copy: bool = False) -> blosc2.NDArr def from_cframe( cframe: bytes | str, copy: bool = True -) -> blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.VLArray: - """Create a :ref:`EmbedStore `, :ref:`NDArray `, :ref:`SChunk ` - or :ref:`VLArray ` instance +) -> blosc2.EmbedStore | blosc2.NDArray | blosc2.SChunk | blosc2.BatchStore | blosc2.VLArray: + """Create a :ref:`EmbedStore `, :ref:`NDArray `, :ref:`SChunk `, + :ref:`BatchStore ` or :ref:`VLArray ` instance from a contiguous frame buffer. Parameters @@ -1937,8 +1937,8 @@ def from_cframe( Returns ------- - out: :ref:`EmbedStore `, :ref:`NDArray `, :ref:`SChunk ` - or :ref:`VLArray ` + out: :ref:`EmbedStore `, :ref:`NDArray `, :ref:`SChunk `, + :ref:`BatchStore ` or :ref:`VLArray ` A new instance of the appropriate type containing the data passed. 
See Also @@ -1952,6 +1952,8 @@ def from_cframe( # Check the metalayer to determine the type if "b2embed" in schunk.meta: return blosc2.estore_from_cframe(cframe, copy=copy) + if "batchstore" in schunk.meta: + return blosc2.BatchStore(_from_schunk=schunk_from_cframe(cframe, copy=copy)) if "vlarray" in schunk.meta: return blosc2.vlarray_from_cframe(cframe, copy=copy) if "b2nd" in schunk.meta: diff --git a/src/blosc2/dict_store.py b/src/blosc2/dict_store.py index 1cb6dd3c..6fe9e7ee 100644 --- a/src/blosc2/dict_store.py +++ b/src/blosc2/dict_store.py @@ -10,6 +10,7 @@ import os import shutil import tempfile +import warnings import zipfile from typing import TYPE_CHECKING, Any @@ -36,6 +37,8 @@ class DictStore: are stored as .b2nd files. - blosc2.SChunk: super-chunks. When persisted externally they are stored as .b2f files. + - blosc2.BatchStore: batched variable-length containers. When persisted + externally they are stored as .b2b files. - blosc2.C2Array: columnar containers. These are always kept inside the embedded store (never externalized). - numpy.ndarray: converted to blosc2.NDArray on assignment. @@ -82,7 +85,7 @@ class DictStore: >>> schunk.append_data(b"abcd") 4 >>> dstore["/dir1/schunk1"] = schunk # externalized as .b2f if above threshold - >>> dstore.to_b2z() # persist to the zip file; external files are copied in + >>> dstore.to_b2z(filename="my_dstore.b2z") # persist to the zip file; external files are copied in >>> print(sorted(dstore.keys())) ['/dir1/node3', '/dir1/schunk1', '/node1', '/node2'] >>> print(dstore["/node1"][:])) @@ -91,7 +94,10 @@ class DictStore: Notes ----- - External persistence uses the following file extensions: - .b2nd for NDArray and .b2f for SChunk. + .b2nd for NDArray, .b2f for SChunk, and .b2b for BatchStore. + These suffixes are a naming convention for newly written leaves; when + reopening an existing store, leaf typing is resolved from object + metadata instead of trusting the suffix alone. 
""" def __init__( @@ -110,7 +116,7 @@ def __init__( """ See :class:`DictStore` for full documentation of parameters. """ - self.localpath = localpath if isinstance(localpath, (str, bytes)) else str(localpath) + self.localpath = localpath if isinstance(localpath, str | bytes) else str(localpath) if not self.localpath.endswith((".b2z", ".b2d")): raise ValueError(f"localpath must have a .b2z or .b2d extension; you passed: {self.localpath}") if mode not in ("r", "w", "a"): @@ -180,10 +186,7 @@ def _init_read_mode(self, dparams: blosc2.DParams | None = None): mmap_mode=self.mmap_mode, dparams=dparams, ) - for filepath in self.offsets: - if filepath.endswith((".b2nd", ".b2f")): - key = "/" + filepath[: -5 if filepath.endswith(".b2nd") else -4] - self.map_tree[key] = filepath + self._update_map_tree_from_offsets() else: # .b2d if not os.path.isdir(self.localpath): raise FileNotFoundError(f"Directory {self.localpath} does not exist for reading.") @@ -199,6 +202,90 @@ def _init_read_mode(self, dparams: blosc2.DParams | None = None): self._estore = EmbedStore(_from_schunk=schunk) self.storage.meta = self._estore.storage.meta + @staticmethod + def _logical_key_from_relpath(rel_path: str) -> str: + """Map an external leaf path to its logical tree key.""" + rel_path = rel_path.replace(os.sep, "/") + key = os.path.splitext(rel_path)[0] + if not key.startswith("/"): + key = "/" + key + return key + + @staticmethod + def _expected_ext_from_kind(kind: str) -> str: + """Return the canonical write-time suffix for a supported external leaf kind.""" + if kind == "ndarray": + return ".b2nd" + if kind == "batchstore": + return ".b2b" + return ".b2f" + + @classmethod + def _opened_external_kind( + cls, + opened: blosc2.NDArray | SChunk | blosc2.VLArray | blosc2.BatchStore | C2Array | Any, + rel_path: str, + ) -> str | None: + """Return the supported external leaf kind for an already opened object.""" + processed = _process_opened_object(opened) + if isinstance(processed, 
blosc2.BatchStore): + kind = "batchstore" + elif isinstance(processed, blosc2.VLArray): + kind = "vlarray" + elif isinstance(processed, blosc2.NDArray): + kind = "ndarray" + elif isinstance(processed, SChunk): + kind = "schunk" + else: + warnings.warn( + f"Ignoring unsupported Blosc2 object at '{rel_path}' during DictStore discovery: " + f"{type(processed).__name__}", + UserWarning, + stacklevel=2, + ) + return None + + expected_ext = cls._expected_ext_from_kind(kind) + found_ext = os.path.splitext(rel_path)[1] + if found_ext != expected_ext: + warnings.warn( + f"External leaf '{rel_path}' uses extension '{found_ext}' but metadata resolves to " + f"{type(processed).__name__}; expected '{expected_ext}'.", + UserWarning, + stacklevel=2, + ) + return kind + + def _probe_external_leaf_path(self, rel_path: str) -> bool: + """Return whether a working-dir file is a supported external leaf.""" + urlpath = os.path.join(self.working_dir, rel_path) + try: + opened = blosc2.blosc2_ext.open( + urlpath, + mode="r", + offset=0, + mmap_mode=self.mmap_mode, + dparams=self.dparams, + ) + except Exception: + return False + return self._opened_external_kind(opened, rel_path) is not None + + def _probe_external_leaf_offset(self, filepath: str) -> bool: + """Return whether a zip member is a supported external leaf.""" + offset = self.offsets[filepath]["offset"] + try: + opened = blosc2.blosc2_ext.open( + self.b2z_path, + mode="r", + offset=offset, + mmap_mode=self.mmap_mode, + dparams=self.dparams, + ) + except Exception: + return False + return self._opened_external_kind(opened, filepath) is not None + def _init_write_append_mode( self, cparams: blosc2.CParams | None, @@ -224,24 +311,23 @@ def _init_write_append_mode( self._update_map_tree() def _update_map_tree(self): - # Build map_tree from .b2nd and .b2f files in working dir + # Build map_tree from supported external leaves in working dir. 
for root, _, files in os.walk(self.working_dir): for file in files: filepath = os.path.join(root, file) - if filepath.endswith((".b2nd", ".b2f")): - # Convert filename to key: remove extension and ensure starts with / - rel_path = os.path.relpath(filepath, self.working_dir) - # Normalize path separators to forward slashes for cross-platform consistency - rel_path = rel_path.replace(os.sep, "/") - if rel_path.endswith(".b2nd"): - key = rel_path[:-5] - elif rel_path.endswith(".b2f"): - key = rel_path[:-4] - else: - continue - if not key.startswith("/"): - key = "/" + key - self.map_tree[key] = rel_path + if os.path.abspath(filepath) == os.path.abspath(self.estore_path): + continue + rel_path = os.path.relpath(filepath, self.working_dir).replace(os.sep, "/") + if self._probe_external_leaf_path(rel_path): + self.map_tree[self._logical_key_from_relpath(rel_path)] = rel_path + + def _update_map_tree_from_offsets(self): + """Build map_tree from supported external leaves in a zip store.""" + for filepath in self.offsets: + if filepath == "embed.b2e": + continue + if self._probe_external_leaf_offset(filepath): + self.map_tree[self._logical_key_from_relpath(filepath)] = filepath @property def estore(self) -> EmbedStore: @@ -249,24 +335,28 @@ def estore(self) -> EmbedStore: return self._estore @staticmethod - def _value_nbytes(value: blosc2.Array | SChunk | blosc2.VLArray) -> int: - if isinstance(value, blosc2.VLArray): + def _value_nbytes(value: blosc2.Array | SChunk | blosc2.VLArray | blosc2.BatchStore) -> int: + if isinstance(value, blosc2.VLArray | blosc2.BatchStore): return value.schunk.nbytes return value.nbytes @staticmethod - def _is_external_value(value: blosc2.Array | SChunk | blosc2.VLArray) -> bool: - return isinstance(value, (blosc2.NDArray, SChunk, blosc2.VLArray)) and bool( + def _is_external_value(value: blosc2.Array | SChunk | blosc2.VLArray | blosc2.BatchStore) -> bool: + return isinstance(value, blosc2.NDArray | SChunk | blosc2.VLArray | blosc2.BatchStore) 
and bool( getattr(value, "urlpath", None) ) @staticmethod - def _external_ext(value: blosc2.Array | SChunk | blosc2.VLArray) -> str: + def _external_ext(value: blosc2.Array | SChunk | blosc2.VLArray | blosc2.BatchStore) -> str: if isinstance(value, blosc2.NDArray): return ".b2nd" + if isinstance(value, blosc2.BatchStore): + return ".b2b" return ".b2f" - def __setitem__(self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray) -> None: + def __setitem__( + self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray | blosc2.BatchStore + ) -> None: """Add a node to the DictStore.""" if isinstance(value, np.ndarray): value = blosc2.asarray(value, cparams=self.cparams, dparams=self.dparams) @@ -292,7 +382,7 @@ def __setitem__(self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray) - if hasattr(value, "save"): value.save(urlpath=dest_path) else: - # SChunk and VLArray can both be persisted via their cframe. + # SChunk, VLArray and BatchStore can all be persisted via their cframe. 
with open(dest_path, "wb") as f: f.write(value.to_cframe()) else: @@ -310,7 +400,9 @@ def __setitem__(self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray) - value = blosc2.from_cframe(value.to_cframe()) self._estore[key] = value - def __getitem__(self, key: str) -> blosc2.NDArray | SChunk | blosc2.VLArray | C2Array: + def __getitem__( + self, key: str + ) -> blosc2.NDArray | SChunk | blosc2.VLArray | blosc2.BatchStore | C2Array: """Retrieve a node from the DictStore.""" # Check map_tree first if key in self.map_tree: @@ -340,7 +432,9 @@ def __getitem__(self, key: str) -> blosc2.NDArray | SChunk | blosc2.VLArray | C2 # Fall back to EmbedStore return self._estore[key] - def get(self, key: str, default: Any = None) -> blosc2.NDArray | SChunk | blosc2.VLArray | C2Array | Any: + def get( + self, key: str, default: Any = None + ) -> blosc2.NDArray | SChunk | blosc2.VLArray | blosc2.BatchStore | C2Array | Any: """Retrieve a node, or default if not found.""" try: return self[key] @@ -393,12 +487,14 @@ def values(self) -> Iterator[blosc2.NDArray | SChunk | C2Array]: if self.is_zip_store: if filepath in self.offsets: offset = self.offsets[filepath]["offset"] - yield blosc2.blosc2_ext.open( - self.b2z_path, - mode="r", - offset=offset, - mmap_mode=self.mmap_mode, - dparams=self.dparams, + yield _process_opened_object( + blosc2.blosc2_ext.open( + self.b2z_path, + mode="r", + offset=offset, + mmap_mode=self.mmap_mode, + dparams=self.dparams, + ) ) else: urlpath = os.path.join(self.working_dir, filepath) @@ -425,12 +521,14 @@ def items(self) -> Iterator[tuple[str, blosc2.NDArray | SChunk | C2Array]]: offset = self.offsets[filepath]["offset"] yield ( key, - blosc2.blosc2_ext.open( - self.b2z_path, - mode="r", - offset=offset, - mmap_mode=self.mmap_mode, - dparams=self.dparams, + _process_opened_object( + blosc2.blosc2_ext.open( + self.b2z_path, + mode="r", + offset=offset, + mmap_mode=self.mmap_mode, + dparams=self.dparams, + ) ), ) else: @@ -457,14 +555,19 @@ def 
to_b2z(self, overwrite=False, filename=None) -> os.PathLike[Any] | str: If True, overwrite the existing b2z file if it exists. Default is False. filename : str, optional If provided, use this filename instead of the default b2z file path. + Keyword use is recommended for clarity. Returns ------- filename : str The absolute path to the created b2z file. """ - if self.mode == "r": - raise ValueError("Cannot call to_b2z() on a DictStore opened in read mode.") + if isinstance(overwrite, str | os.PathLike) and filename is None: + filename = overwrite + overwrite = False + + if self.mode == "r" and self.is_zip_store: + raise ValueError("Cannot call to_b2z() on a .b2z DictStore opened in read mode.") b2z_path = self.b2z_path if filename is None else filename if not b2z_path.endswith(".b2z"): @@ -484,7 +587,7 @@ def to_b2z(self, overwrite=False, filename=None) -> os.PathLike[Any] | str: # Sort filepaths by file size from largest to smallest filepaths.sort(key=os.path.getsize, reverse=True) - with zipfile.ZipFile(self.b2z_path, "w", zipfile.ZIP_STORED) as zf: + with zipfile.ZipFile(b2z_path, "w", zipfile.ZIP_STORED) as zf: # Write all files (except estore_path) first (sorted by size) for filepath in filepaths: arcname = os.path.relpath(filepath, self.working_dir) @@ -493,7 +596,7 @@ def to_b2z(self, overwrite=False, filename=None) -> os.PathLike[Any] | str: if os.path.exists(self.estore_path): arcname = os.path.relpath(self.estore_path, self.working_dir) zf.write(self.estore_path, arcname) - return os.path.abspath(self.b2z_path) + return os.path.abspath(b2z_path) def _get_zip_offsets(self) -> dict[str, dict[str, int]]: """Get offset and length of all files in the zip archive.""" diff --git a/src/blosc2/embed_store.py b/src/blosc2/embed_store.py index b03d892e..2497cad2 100644 --- a/src/blosc2/embed_store.py +++ b/src/blosc2/embed_store.py @@ -173,7 +173,9 @@ def _ensure_capacity(self, needed_bytes: int) -> None: new_size = max(required_size, int(self._store.shape[0] * 1.5)) 
self._store.resize((new_size,)) - def __setitem__(self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray) -> None: + def __setitem__( + self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray | blosc2.BatchStore + ) -> None: """Add a node to the embed store.""" if self.mode == "r": raise ValueError("Cannot set items in read-only mode.") @@ -196,7 +198,7 @@ def __setitem__(self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray) - self._embed_map[key] = {"offset": offset, "length": data_len} self._save_metadata() - def __getitem__(self, key: str) -> blosc2.NDArray | SChunk | blosc2.VLArray: + def __getitem__(self, key: str) -> blosc2.NDArray | SChunk | blosc2.VLArray | blosc2.BatchStore: """Retrieve a node from the embed store.""" if key not in self._embed_map: raise KeyError(f"Key '{key}' not found in the embed store.") @@ -212,7 +214,9 @@ def __getitem__(self, key: str) -> blosc2.NDArray | SChunk | blosc2.VLArray: # Use from_cframe so we can deserialize either an NDArray or an SChunk return blosc2.from_cframe(serialized_data, copy=True) - def get(self, key: str, default: Any = None) -> blosc2.NDArray | SChunk | blosc2.VLArray | Any: + def get( + self, key: str, default: Any = None + ) -> blosc2.NDArray | SChunk | blosc2.VLArray | blosc2.BatchStore | Any: """Retrieve a node, or default if not found.""" return self[key] if key in self._embed_map else default @@ -239,12 +243,12 @@ def keys(self) -> KeysView[str]: """Return all keys.""" return self._embed_map.keys() - def values(self) -> Iterator[blosc2.NDArray | SChunk | blosc2.VLArray]: + def values(self) -> Iterator[blosc2.NDArray | SChunk | blosc2.VLArray | blosc2.BatchStore]: """Iterate over all values.""" for key in self._embed_map: yield self[key] - def items(self) -> Iterator[tuple[str, blosc2.NDArray | SChunk | blosc2.VLArray]]: + def items(self) -> Iterator[tuple[str, blosc2.NDArray | SChunk | blosc2.VLArray | blosc2.BatchStore]]: """Iterate over (key, value) pairs.""" for key in 
self._embed_map: yield key, self[key] diff --git a/src/blosc2/info.py b/src/blosc2/info.py index 4ac629da..ef1e3011 100644 --- a/src/blosc2/info.py +++ b/src/blosc2/info.py @@ -10,6 +10,22 @@ from textwrap import TextWrapper +def format_nbytes_human(nbytes: int) -> str: + units = ("B", "KiB", "MiB", "GiB", "TiB", "PiB") + value = float(nbytes) + for unit in units: + if value < 1024.0 or unit == units[-1]: + if unit == "B": + return f"{nbytes} B" + return f"{value:.2f} {unit}" + value /= 1024.0 + return None + + +def format_nbytes_info(nbytes: int) -> str: + return f"{nbytes} ({format_nbytes_human(nbytes)})" + + def info_text_report_(items: list) -> str: with io.StringIO() as buf: print(items, file=buf) diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index bc396622..4c35cef6 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -29,7 +29,7 @@ import blosc2 from blosc2 import SpecialValue, blosc2_ext, compute_chunks_blocks -from blosc2.info import InfoReporter +from blosc2.info import InfoReporter, format_nbytes_info from blosc2.schunk import SChunk from .linalg import matmul @@ -3838,8 +3838,8 @@ def info_items(self) -> list: items += [("chunks", self.chunks)] items += [("blocks", self.blocks)] items += [("dtype", self.dtype)] - items += [("nbytes", self.nbytes)] - items += [("cbytes", self.cbytes)] + items += [("nbytes", format_nbytes_info(self.nbytes))] + items += [("cbytes", format_nbytes_info(self.cbytes))] items += [("cratio", f"{self.cratio:.2f}")] items += [("cparams", self.cparams)] items += [("dparams", self.dparams)] diff --git a/src/blosc2/schunk.py b/src/blosc2/schunk.py index 5421ae41..55b4acdf 100644 --- a/src/blosc2/schunk.py +++ b/src/blosc2/schunk.py @@ -20,7 +20,7 @@ import blosc2 from blosc2 import SpecialValue, blosc2_ext from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb -from blosc2.info import InfoReporter +from blosc2.info import InfoReporter, format_nbytes_info class vlmeta(MutableMapping, 
blosc2_ext.vlmeta): @@ -491,8 +491,8 @@ def info_items(self) -> list: items += [("chunksize", self.chunksize)] items += [("blocksize", self.blocksize)] items += [("typesize", self.typesize)] - items += [("nbytes", self.nbytes)] - items += [("cbytes", self.cbytes)] + items += [("nbytes", format_nbytes_info(self.nbytes))] + items += [("cbytes", format_nbytes_info(self.cbytes))] items += [("cratio", f"{self.cratio:.2f}")] items += [("cparams", self.cparams)] items += [("dparams", self.dparams)] @@ -674,6 +674,10 @@ def get_chunk(self, nchunk: int) -> bytes: """ return super().get_chunk(nchunk) + def get_vlblock(self, nchunk: int, nblock: int) -> bytes: + """Return the decompressed payload of one VL block from a chunk.""" + return super().get_vlblock(nchunk, nblock) + def delete_chunk(self, nchunk: int) -> int: """Delete the specified chunk from the SChunk. @@ -1621,6 +1625,11 @@ def _process_opened_object(res): return VLArray(_from_schunk=getattr(res, "schunk", res)) + if "batchstore" in meta: + from blosc2.batch_store import BatchStore + + return BatchStore(_from_schunk=getattr(res, "schunk", res)) + if isinstance(res, blosc2.NDArray) and "LazyArray" in res.schunk.meta: return blosc2._open_lazyarray(res) else: @@ -1632,6 +1641,7 @@ def open( ) -> ( blosc2.SChunk | blosc2.NDArray + | blosc2.BatchStore | blosc2.VLArray | blosc2.C2Array | blosc2.LazyArray diff --git a/src/blosc2/storage.py b/src/blosc2/storage.py index 43835118..0015aea9 100644 --- a/src/blosc2/storage.py +++ b/src/blosc2/storage.py @@ -46,7 +46,9 @@ class CParams: (maximum compression). Default is 1. use_dict: bool Whether to use dictionaries when compressing - (only for :py:obj:`blosc2.Codec.ZSTD `). Default is `False`. + (supported for :py:obj:`blosc2.Codec.ZSTD `, + :py:obj:`blosc2.Codec.LZ4 `, and + :py:obj:`blosc2.Codec.LZ4HC `). Default is `False`. typesize: int The data type size, ranging from 1 to 255. Default is 8. 
nthreads: int diff --git a/src/blosc2/tree_store.py b/src/blosc2/tree_store.py index a96c11a4..9be6672f 100644 --- a/src/blosc2/tree_store.py +++ b/src/blosc2/tree_store.py @@ -226,7 +226,9 @@ def _validate_key(self, key: str) -> str: return key - def __setitem__(self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray) -> None: + def __setitem__( + self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray | blosc2.BatchStore + ) -> None: """Add a node with hierarchical key validation. Parameters @@ -268,7 +270,9 @@ def __setitem__(self, key: str, value: blosc2.Array | SChunk | blosc2.VLArray) - full_key = self._translate_key_to_full(key) super().__setitem__(full_key, value) - def __getitem__(self, key: str) -> NDArray | C2Array | SChunk | blosc2.VLArray | TreeStore: + def __getitem__( + self, key: str + ) -> NDArray | C2Array | SChunk | blosc2.VLArray | blosc2.BatchStore | TreeStore: """Retrieve a node or subtree view. If the key points to a subtree (intermediate path with children), @@ -282,7 +286,7 @@ def __getitem__(self, key: str) -> NDArray | C2Array | SChunk | blosc2.VLArray | Returns ------- - out : blosc2.NDArray or blosc2.C2Array or blosc2.SChunk or blosc2.VLArray or TreeStore + out : blosc2.NDArray or blosc2.C2Array or blosc2.SChunk or blosc2.VLArray or blosc2.BatchStore or TreeStore The stored array/chunk if key is a leaf node, or a TreeStore subtree view if key is an intermediate path with children. @@ -664,8 +668,15 @@ def _persist_vlmeta(self) -> None: """ if hasattr(self, "_vlmeta_key"): vlmeta_key = self._vlmeta_key - # Only embedded case is expected; handle it safely. 
- if hasattr(self, "_estore") and vlmeta_key in self._estore: + if vlmeta_key in self.map_tree: + filepath = self.map_tree[vlmeta_key] + dest_path = os.path.join(self.working_dir, filepath) + parent_dir = os.path.dirname(dest_path) + if parent_dir and not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) + with open(dest_path, "wb") as f: + f.write(self._vlmeta.to_cframe()) + elif hasattr(self, "_estore") and vlmeta_key in self._estore: # Replace the stored snapshot with contextlib.suppress(KeyError): del self._estore[vlmeta_key] diff --git a/src/blosc2/vlarray.py b/src/blosc2/vlarray.py index d7d885e2..18c737a6 100644 --- a/src/blosc2/vlarray.py +++ b/src/blosc2/vlarray.py @@ -13,6 +13,7 @@ import blosc2 from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb +from blosc2.info import InfoReporter, format_nbytes_info if TYPE_CHECKING: from collections.abc import Iterator @@ -32,17 +33,27 @@ class VLArray: @staticmethod def _set_typesize_one(cparams: blosc2.CParams | dict | None) -> blosc2.CParams | dict: + auto_use_dict = cparams is None if cparams is None: cparams = blosc2.CParams() elif isinstance(cparams, blosc2.CParams): cparams = copy.deepcopy(cparams) else: cparams = dict(cparams) + auto_use_dict = "use_dict" not in cparams if isinstance(cparams, blosc2.CParams): cparams.typesize = 1 + if auto_use_dict and cparams.codec == blosc2.Codec.ZSTD and cparams.clevel > 0: + # VLArray stores many small serialized payloads, where Zstd dicts help materially. + cparams.use_dict = True else: cparams["typesize"] = 1 + codec = cparams.get("codec", blosc2.Codec.ZSTD) + clevel = cparams.get("clevel", 5) + if auto_use_dict and codec == blosc2.Codec.ZSTD and clevel > 0: + # VLArray stores many small serialized payloads, where Zstd dicts help materially. 
+ cparams["use_dict"] = True return cparams @staticmethod @@ -73,7 +84,6 @@ def _validate_storage(storage: blosc2.Storage) -> None: def _attach_schunk(self, schunk: SChunk) -> None: self.schunk = schunk - self.urlpath = schunk.urlpath self.mode = schunk.mode self.mmap_mode = getattr(schunk, "mmap_mode", None) self._validate_tag() @@ -174,6 +184,15 @@ def _slice_indices(self, index: slice) -> list[int]: def _copy_meta(self) -> dict[str, Any]: return {name: self.meta[name] for name in self.meta} + def _item_size_stats(self) -> tuple[list[int], list[int]]: + item_nbytes = [] + chunk_cbytes = [] + for i in range(len(self)): + nbytes, cbytes, _ = blosc2.get_cbuffer_sizes(self.schunk.get_lazychunk(i)) + item_nbytes.append(nbytes) + chunk_cbytes.append(cbytes) + return item_nbytes, chunk_cbytes + def _serialize(self, value: Any) -> bytes: payload = msgpack_packb(value) _check_serialized_size(payload) @@ -301,6 +320,57 @@ def dparams(self): def chunksize(self) -> int: return self.schunk.chunksize + @property + def typesize(self) -> int: + return self.schunk.typesize + + @property + def nbytes(self) -> int: + return self.schunk.nbytes + + @property + def cbytes(self) -> int: + return self.schunk.cbytes + + @property + def cratio(self) -> float: + return self.schunk.cratio + + @property + def urlpath(self) -> str | None: + return self.schunk.urlpath + + @property + def contiguous(self) -> bool: + return self.schunk.contiguous + + @property + def info(self) -> InfoReporter: + """Print information about this VLArray.""" + return InfoReporter(self) + + @property + def info_items(self) -> list: + """A list of tuples with summary information about this VLArray.""" + item_nbytes, chunk_cbytes = self._item_size_stats() + avg_item_nbytes = sum(item_nbytes) / len(item_nbytes) if item_nbytes else 0.0 + avg_chunk_cbytes = sum(chunk_cbytes) / len(chunk_cbytes) if chunk_cbytes else 0.0 + return [ + ("type", f"{self.__class__.__name__}"), + ("entries", len(self)), + ("item_nbytes_min", 
min(item_nbytes) if item_nbytes else 0), + ("item_nbytes_max", max(item_nbytes) if item_nbytes else 0), + ("item_nbytes_avg", f"{avg_item_nbytes:.2f}"), + ("chunk_cbytes_min", min(chunk_cbytes) if chunk_cbytes else 0), + ("chunk_cbytes_max", max(chunk_cbytes) if chunk_cbytes else 0), + ("chunk_cbytes_avg", f"{avg_chunk_cbytes:.2f}"), + ("nbytes", format_nbytes_info(self.nbytes)), + ("cbytes", format_nbytes_info(self.cbytes)), + ("cratio", f"{self.cratio:.2f}"), + ("cparams", self.cparams), + ("dparams", self.dparams), + ] + def to_cframe(self) -> bytes: return self.schunk.to_cframe() diff --git a/tests/ndarray/test_ndarray.py b/tests/ndarray/test_ndarray.py index 8c9b45f7..b557bb65 100644 --- a/tests/ndarray/test_ndarray.py +++ b/tests/ndarray/test_ndarray.py @@ -103,6 +103,18 @@ def test_asarray(a): np.testing.assert_allclose(a, b[:]) +def test_ndarray_info_has_human_sizes(): + array = blosc2.asarray(np.arange(16, dtype=np.int32)) + + items = dict(array.info_items) + assert "(" in items["nbytes"] + assert "(" in items["cbytes"] + + text = repr(array.info) + assert "nbytes" in text + assert "cbytes" in text + + @pytest.mark.parametrize( ("shape", "newshape", "chunks", "blocks"), [ diff --git a/tests/ndarray/test_setitem.py b/tests/ndarray/test_setitem.py index 02a0a336..bde27317 100644 --- a/tests/ndarray/test_setitem.py +++ b/tests/ndarray/test_setitem.py @@ -66,7 +66,8 @@ def test_setitem_torch_proxy(shape, chunks, blocks, slices, dtype): dtype_ = {np.float32: torch.float32, np.int32: torch.int32, np.float64: torch.float64}[dtype] val = torch.ones(slice_shape, dtype=dtype_) a[slices] = val - nparray[slices] = val + # Make the expected assignment explicit so NumPy does not rely on torch.__array__(). 
+ nparray[slices] = val.numpy() np.testing.assert_almost_equal(a[...], nparray) diff --git a/tests/test_batch_store.py b/tests/test_batch_store.py new file mode 100644 index 00000000..9ae83de5 --- /dev/null +++ b/tests/test_batch_store.py @@ -0,0 +1,722 @@ +####################################################################### +# Copyright (c) 2019-present, Blosc Development Team +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause +####################################################################### + +import pytest + +import blosc2 +from blosc2._msgpack_utils import msgpack_packb, msgpack_unpackb + +BATCHES = [ + [b"bytes\x00payload", "plain text", 42], + [{"nested": [1, 2]}, None, {"tail": True}], + [(1, 2, "three"), 3.5, True], +] + + +def _make_payload(seed, size): + base = bytes((seed + i) % 251 for i in range(251)) + reps = size // len(base) + 1 + return (base * reps)[:size] + + +def _storage(contiguous, urlpath, mode="w"): + return blosc2.Storage(contiguous=contiguous, urlpath=urlpath, mode=mode) + + +@pytest.mark.parametrize( + ("contiguous", "urlpath"), + [ + (False, None), + (True, None), + (True, "test_batchstore.b2b"), + (False, "test_batchstore_s.b2b"), + ], +) +def test_batchstore_roundtrip(contiguous, urlpath): + blosc2.remove_urlpath(urlpath) + + barray = blosc2.BatchStore(storage=_storage(contiguous, urlpath)) + assert barray.meta["batchstore"]["serializer"] == "msgpack" + + for i, batch in enumerate(BATCHES, start=1): + assert barray.append(batch) == i + + assert len(barray) == len(BATCHES) + assert barray.max_blocksize is not None + assert 1 <= barray.max_blocksize <= len(BATCHES[0]) + assert [batch[:] for batch in barray] == BATCHES + assert barray.append([1, 2]) == len(BATCHES) + 1 + assert [batch[:] for batch in barray][-1] == [1, 2] + + batch0 = barray[0] + assert isinstance(batch0, blosc2.Batch) + assert len(batch0) == len(BATCHES[0]) + assert batch0[1] == BATCHES[0][1] + assert batch0[:] == BATCHES[0] + assert 
isinstance(batch0.lazybatch, bytes) + assert batch0.nbytes > 0 + assert batch0.cbytes > 0 + assert batch0.cratio > 0 + + expected = list(BATCHES) + expected.append([1, 2]) + expected[1] = ["updated", {"tuple": (7, 8)}, 99] + expected[-1] = ["tiny", False, "x"] + barray[1] = expected[1] + barray[-1] = expected[-1] + assert barray.insert(0, ["head", 0, "x"]) == len(expected) + 1 + expected.insert(0, ["head", 0, "x"]) + assert barray.insert(-1, ["between", {"k": 5}, None]) == len(expected) + 1 + expected.insert(-1, ["between", {"k": 5}, None]) + assert barray.insert(999, ["tail", 1, 2]) == len(expected) + 1 + expected.insert(999, ["tail", 1, 2]) + assert barray.delete(2) == len(expected) - 1 + del expected[2] + del barray[-2] + del expected[-2] + assert [batch[:] for batch in barray] == expected + + if urlpath is not None: + reopened = blosc2.open(urlpath, mode="r") + assert isinstance(reopened, blosc2.BatchStore) + assert reopened.max_blocksize == barray.max_blocksize + assert [batch[:] for batch in reopened] == expected + with pytest.raises(ValueError): + reopened.append(["nope"]) + with pytest.raises(ValueError): + reopened[0] = ["nope"] + with pytest.raises(ValueError): + reopened.insert(0, ["nope"]) + with pytest.raises(ValueError): + reopened.delete(0) + with pytest.raises(ValueError): + del reopened[0] + with pytest.raises(ValueError): + reopened.extend([["nope"]]) + with pytest.raises(ValueError): + reopened.pop() + with pytest.raises(ValueError): + reopened.clear() + + reopened_rw = blosc2.open(urlpath, mode="a") + reopened_rw[0] = ["changed", "batch", 0] + expected[0] = ["changed", "batch", 0] + assert [batch[:] for batch in reopened_rw] == expected + + if contiguous: + reopened_mmap = blosc2.open(urlpath, mode="r", mmap_mode="r") + assert isinstance(reopened_mmap, blosc2.BatchStore) + assert [batch[:] for batch in reopened_mmap] == expected + + blosc2.remove_urlpath(urlpath) + + +def test_batchstore_arrow_ipc_roundtrip(): + pa = 
pytest.importorskip("pyarrow") + urlpath = "test_batchstore_arrow_ipc.b2b" + blosc2.remove_urlpath(urlpath) + + barray = blosc2.BatchStore(storage=_storage(True, urlpath), serializer="arrow") + assert barray.serializer == "arrow" + assert barray.meta["batchstore"]["serializer"] == "arrow" + + batch1 = pa.array([[1, 2], None, [3]]) + batch2 = pa.array([[4], [5, 6]]) + barray.append(batch1) + barray.append(batch2) + + assert barray[0][:] == [[1, 2], None, [3]] + assert barray[1][:] == [[4], [5, 6]] + assert barray.meta["batchstore"]["arrow_schema"] is not None + + reopened = blosc2.open(urlpath, mode="r") + assert isinstance(reopened, blosc2.BatchStore) + assert reopened.serializer == "arrow" + assert reopened.meta["batchstore"]["serializer"] == "arrow" + assert reopened[0][:] == [[1, 2], None, [3]] + assert reopened[1][:] == [[4], [5, 6]] + + blosc2.remove_urlpath(urlpath) + + +def test_batchstore_inferred_layout_preserves_user_vlmeta(): + barray = blosc2.BatchStore() + barray.vlmeta["user"] = {"x": 1} + + barray.append([1, 2, 3]) + + assert barray.vlmeta["user"] == {"x": 1} + + +def test_batchstore_arrow_layout_persistence_preserves_user_vlmeta(): + pa = pytest.importorskip("pyarrow") + + barray = blosc2.BatchStore(serializer="arrow") + barray.vlmeta["user"] = {"x": 1} + + barray.append(pa.array([[1], [2, 3]])) + + assert barray.vlmeta["user"] == {"x": 1} + + +def test_batchstore_from_cframe(): + barray = blosc2.BatchStore() + barray.extend(BATCHES) + barray.insert(1, ["inserted", True, None]) + del barray[3] + expected = list(BATCHES) + expected.insert(1, ["inserted", True, None]) + del expected[3] + + restored = blosc2.from_cframe(barray.to_cframe()) + assert isinstance(restored, blosc2.BatchStore) + assert [batch[:] for batch in restored] == expected + + restored2 = blosc2.from_cframe(barray.to_cframe()) + assert isinstance(restored2, blosc2.BatchStore) + assert [batch[:] for batch in restored2] == expected + + +def test_batchstore_info(): + barray = 
blosc2.BatchStore() + barray.extend(BATCHES) + + assert barray.typesize == 1 + assert barray.contiguous == barray.schunk.contiguous + assert barray.urlpath == barray.schunk.urlpath + + items = dict(barray.info_items) + assert items["type"] == "BatchStore" + assert items["serializer"] == "msgpack" + assert items["nbatches"].startswith(f"{len(BATCHES)} (items per batch: mean=") + assert items["nblocks"].startswith(str(len(BATCHES))) + assert items["nitems"] == sum(len(batch) for batch in BATCHES) + assert "urlpath" not in items + assert "contiguous" not in items + assert "typesize" not in items + assert "(" in items["nbytes"] + assert "(" in items["cbytes"] + assert "B)" in items["nbytes"] or "KiB)" in items["nbytes"] or "MiB)" in items["nbytes"] + + text = repr(barray.info) + assert "type" in text + assert "serializer" in text + assert "BatchStore" in text + assert "items per batch" in text + assert "items per block" in text + + +def test_batchstore_info_uses_persisted_batch_lengths(): + barray = blosc2.BatchStore() + barray.extend(BATCHES) + + assert barray.vlmeta["_batch_store_metadata"]["batch_lengths"] == [len(batch) for batch in BATCHES] + + def fail_decode(*args, **kwargs): + raise AssertionError( + "info() should not deserialize batches when batch_lengths metadata is available" + ) + + original_decode_blocks = barray._decode_blocks + barray._decode_blocks = fail_decode + try: + items = dict(barray.info_items) + finally: + barray._decode_blocks = original_decode_blocks + + assert items["nitems"] == sum(len(batch) for batch in BATCHES) + assert "items per batch: mean=" in items["nbatches"] + + +def test_batchstore_info_reports_exact_block_stats_from_lazy_chunks(): + barray = blosc2.BatchStore(max_blocksize=2) + barray.extend([[1, 2, 3, 4, 5], [6, 7], [8]]) + + items = dict(barray.info_items) + assert items["nblocks"] == "5 (items per block: mean=1.60, max=2, min=1)" + + +def test_batchstore_pop_keeps_batch_lengths_metadata_in_sync(): + barray = 
blosc2.BatchStore(max_blocksize=2) + barray.extend([[1, 2, 3], [4, 5], [6]]) + + removed = barray.pop(1) + + assert removed == [4, 5] + assert [batch[:] for batch in barray] == [[1, 2, 3], [6]] + assert barray.vlmeta["_batch_store_metadata"]["batch_lengths"] == [3, 1] + items = dict(barray.info_items) + assert items["nbatches"].startswith("2 (items per batch: mean=2.00") + + +def test_batchstore_clear_keeps_empty_store_vlmeta_readable(): + urlpath = "test_batchstore_clear_empty_vlmeta.b2b" + blosc2.remove_urlpath(urlpath) + + barray = blosc2.BatchStore(urlpath=urlpath, mode="w", contiguous=True) + barray.append([1, 2, 3]) + barray.clear() + + assert barray.vlmeta.getall() == {} + + reopened = blosc2.open(urlpath, mode="r") + assert reopened.vlmeta.getall() == {} + + blosc2.remove_urlpath(urlpath) + + +def test_batchstore_delete_last_keeps_empty_store_vlmeta_readable(): + urlpath = "test_batchstore_delete_last_empty_vlmeta.b2b" + blosc2.remove_urlpath(urlpath) + + barray = blosc2.BatchStore(urlpath=urlpath, mode="w", contiguous=True) + barray.append([1, 2, 3]) + barray.delete(0) + + assert barray.vlmeta.getall() == {} + + reopened = blosc2.open(urlpath, mode="r") + assert reopened.vlmeta.getall() == {} + + blosc2.remove_urlpath(urlpath) + + +def test_batchstore_zstd_does_not_use_dict_by_default(): + barray = blosc2.BatchStore() + assert barray.cparams.codec == blosc2.Codec.ZSTD + assert barray.cparams.use_dict is False + + +def test_batchstore_explicit_max_blocksize(): + barray = blosc2.BatchStore(max_blocksize=2) + assert barray.max_blocksize == 2 + barray.append([1, 2, 3]) + barray.append([4]) + assert [batch[:] for batch in barray] == [[1, 2, 3], [4]] + + +def test_batchstore_get_vlblock_and_scalar_access(): + urlpath = "test_batchstore_vlblock.b2b" + blosc2.remove_urlpath(urlpath) + + batch = [0, 1, 2, 3, 4] + barray = blosc2.BatchStore(storage=_storage(True, urlpath), max_blocksize=2) + barray.append(batch) + + assert barray.max_blocksize == 2 + assert 
msgpack_unpackb(barray.schunk.get_vlblock(0, 0)) == batch[:2] + assert msgpack_unpackb(barray.schunk.get_vlblock(0, 1)) == batch[2:4] + assert msgpack_unpackb(barray.schunk.get_vlblock(0, 2)) == batch[4:] + + assert barray[0][0] == 0 + assert barray[0][2] == 2 + assert barray[0][4] == 4 + + reopened = blosc2.open(urlpath, mode="r") + assert isinstance(reopened, blosc2.BatchStore) + assert reopened.max_blocksize == 2 + assert reopened[0][0] == 0 + assert reopened[0][2] == 2 + assert reopened[0][4] == 4 + assert msgpack_unpackb(reopened.schunk.get_vlblock(0, 1)) == batch[2:4] + + blosc2.remove_urlpath(urlpath) + + +def test_batchstore_scalar_reads_cache_vlblocks(): + barray = blosc2.BatchStore(max_blocksize=2) + barray.append([0, 1, 2, 3, 4]) + + batch = barray[0] + original_get_vlblock = barray.schunk.get_vlblock + calls = [] + + def wrapped_get_vlblock(nchunk, nblock): + calls.append((nchunk, nblock)) + return original_get_vlblock(nchunk, nblock) + + barray.schunk.get_vlblock = wrapped_get_vlblock + try: + assert batch[0] == 0 + assert batch[1] == 1 + assert batch[0] == 0 + assert batch[2] == 2 + assert batch[3] == 3 + assert calls == [(0, 0), (0, 1)] + finally: + barray.schunk.get_vlblock = original_get_vlblock + + +def test_batchstore_iter_items(): + barray = blosc2.BatchStore(max_blocksize=2) + batches = [[1, 2, 3], [4], [5, 6]] + barray.extend(batches) + + assert [batch[:] for batch in barray] == batches + assert list(barray.iter_items()) == [1, 2, 3, 4, 5, 6] + + +def test_batchstore_respects_explicit_use_dict_and_non_zstd(): + barray = blosc2.BatchStore(cparams={"codec": blosc2.Codec.LZ4, "clevel": 5}) + assert barray.cparams.codec == blosc2.Codec.LZ4 + assert barray.cparams.use_dict is False + + barray = blosc2.BatchStore(cparams={"codec": blosc2.Codec.LZ4HC, "clevel": 1, "use_dict": True}) + assert barray.cparams.codec == blosc2.Codec.LZ4HC + assert barray.cparams.use_dict is True + + barray = blosc2.BatchStore(cparams={"codec": blosc2.Codec.ZSTD, "clevel": 
0}) + assert barray.cparams.codec == blosc2.Codec.ZSTD + assert barray.cparams.use_dict is False + + barray = blosc2.BatchStore(cparams={"codec": blosc2.Codec.ZSTD, "clevel": 5, "use_dict": False}) + assert barray.cparams.use_dict is False + + barray = blosc2.BatchStore(cparams=blosc2.CParams(codec=blosc2.Codec.ZSTD, clevel=5, use_dict=False)) + assert barray.cparams.use_dict is False + + +def test_batchstore_guess_max_blocksize_uses_l2_for_clevel_5(monkeypatch): + monkeypatch.setitem(blosc2.cpu_info, "l1_data_cache_size", 100) + monkeypatch.setitem(blosc2.cpu_info, "l2_cache_size", 1000) + barray = blosc2.BatchStore(cparams={"clevel": 5}) + assert barray._guess_blocksize([30, 30, 30, 30]) == 4 + + +def test_batchstore_guess_max_blocksize_uses_l2_for_mid_clevel(monkeypatch): + monkeypatch.setitem(blosc2.cpu_info, "l1_data_cache_size", 100) + monkeypatch.setitem(blosc2.cpu_info, "l2_cache_size", 150) + barray = blosc2.BatchStore(cparams={"clevel": 6}) + assert barray._guess_blocksize([60, 60, 60, 60]) == 2 + + +def test_batchstore_guess_max_blocksize_uses_full_batch_for_clevel_9(monkeypatch): + monkeypatch.setitem(blosc2.cpu_info, "l1_data_cache_size", 1) + monkeypatch.setitem(blosc2.cpu_info, "l2_cache_size", 1) + barray = blosc2.BatchStore(cparams={"clevel": 9}) + assert barray._guess_blocksize([100, 100, 100, 100]) == 4 + + +def test_vlcompress_small_blocks_roundtrip(): + values = [ + {"value": None}, + {"value": []}, + {"value": []}, + {"value": ["en:salt"]}, + {"value": []}, + {"value": ["en:sugar", "en:flour"]}, + {"value": None}, + {"value": []}, + {"value": ["en:water", "en:yeast", "en:oil"]}, + {"value": []}, + {"value": []}, + {"value": ["en:acid", "en:color", "en:preservative", "en:spice"]}, + {"value": None}, + {"value": []}, + {"value": ["en:a", "en:b", "en:c", "en:d", "en:e", "en:f"]}, + {"value": []}, + {"value": []}, + {"value": None}, + {"value": ["en:x"]}, + {"value": []}, + ] + payloads = [msgpack_packb(value) for value in values] + + 
batch_payload = blosc2.blosc2_ext.vlcompress( + payloads, + codec=blosc2.Codec.ZSTD, + clevel=5, + typesize=1, + nthreads=1, + ) + out = blosc2.blosc2_ext.vldecompress(batch_payload, nthreads=1) + + assert out == payloads + + +def test_batchstore_constructor_kwargs(): + urlpath = "test_batchstore_kwargs.b2b" + blosc2.remove_urlpath(urlpath) + + barray = blosc2.BatchStore(urlpath=urlpath, mode="w", contiguous=True) + barray.extend(BATCHES) + + reopened = blosc2.BatchStore(urlpath=urlpath, mode="r", contiguous=True, mmap_mode="r") + assert [batch[:] for batch in reopened] == BATCHES + + blosc2.remove_urlpath(urlpath) + + +@pytest.mark.parametrize( + ("contiguous", "urlpath"), + [ + (False, None), + (True, None), + (True, "test_batchstore_list_ops.b2b"), + (False, "test_batchstore_list_ops_s.b2b"), + ], +) +def test_batchstore_list_like_ops(contiguous, urlpath): + blosc2.remove_urlpath(urlpath) + + barray = blosc2.BatchStore(storage=_storage(contiguous, urlpath)) + barray.extend([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) + assert [batch[:] for batch in barray] == [[1, 2, 3], [4, 5, 6], [7, 8, 9]] + assert barray.pop() == [7, 8, 9] + assert barray.pop(0) == [1, 2, 3] + assert [batch[:] for batch in barray] == [[4, 5, 6]] + + barray.clear() + assert len(barray) == 0 + assert [batch[:] for batch in barray] == [] + + barray.extend([["a", "b", "c"], ["d", "e", "f"]]) + assert [batch[:] for batch in barray] == [["a", "b", "c"], ["d", "e", "f"]] + + if urlpath is not None: + reopened = blosc2.open(urlpath, mode="r") + assert [batch[:] for batch in reopened] == [["a", "b", "c"], ["d", "e", "f"]] + + blosc2.remove_urlpath(urlpath) + + +@pytest.mark.parametrize( + ("contiguous", "urlpath"), + [ + (False, None), + (True, None), + (True, "test_batchstore_slices.b2b"), + (False, "test_batchstore_slices_s.b2b"), + ], +) +def test_batchstore_slices(contiguous, urlpath): + blosc2.remove_urlpath(urlpath) + + expected = [[i, i + 100, i + 200] for i in range(8)] + barray = 
blosc2.BatchStore(storage=_storage(contiguous, urlpath)) + barray.extend(expected) + + assert [batch[:] for batch in barray[1:6:2]] == expected[1:6:2] + assert [batch[:] for batch in barray[::-2]] == expected[::-2] + + barray[2:5] = [["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i"]] + expected[2:5] = [["a", "b", "c"], ["d", "e", "f"], ["g", "h", "i"]] + assert [batch[:] for batch in barray] == expected + + barray[1:6:2] = [[100, 101, 102], [103, 104, 105], [106, 107, 108]] + expected[1:6:2] = [[100, 101, 102], [103, 104, 105], [106, 107, 108]] + assert [batch[:] for batch in barray] == expected + + del barray[::3] + del expected[::3] + assert [batch[:] for batch in barray] == expected + + if urlpath is not None: + reopened = blosc2.open(urlpath, mode="r") + assert [batch[:] for batch in reopened[::2]] == expected[::2] + with pytest.raises(ValueError): + reopened[1:3] = [[9]] + with pytest.raises(ValueError): + del reopened[::2] + + blosc2.remove_urlpath(urlpath) + + +def test_batchstore_slice_errors(): + barray = blosc2.BatchStore() + barray.extend([[0], [1], [2], [3]]) + + with pytest.raises(ValueError, match="extended slice"): + barray[::2] = [[9]] + with pytest.raises(TypeError): + barray[1:2] = 3 + with pytest.raises(ValueError): + _ = barray[::0] + + +@pytest.mark.parametrize( + ("contiguous", "urlpath"), + [ + (False, None), + (True, None), + (True, "test_batchstore_items.b2b"), + (False, "test_batchstore_items_s.b2b"), + ], +) +def test_batchstore_items_accessor(contiguous, urlpath): + blosc2.remove_urlpath(urlpath) + + batches = [["a", "b"], [10, 11, 12], [{"x": 1}], [None, True]] + flat = [item for batch in batches for item in batch] + barray = blosc2.BatchStore(storage=_storage(contiguous, urlpath), max_blocksize=2) + barray.extend(batches) + + assert len(barray.items) == len(flat) + assert barray.items[0] == flat[0] + assert barray.items[3] == flat[3] + assert barray.items[-1] == flat[-1] + assert barray.items[1:6] == flat[1:6] + assert 
barray.items[::-2] == flat[::-2] + + barray.append(["tail0", "tail1"]) + flat.extend(["tail0", "tail1"]) + assert len(barray.items) == len(flat) + assert barray.items[-2:] == flat[-2:] + + barray.insert(1, ["mid0", "mid1"]) + flat[2:2] = ["mid0", "mid1"] + assert barray.items[:] == flat + + barray[2] = ["replaced"] + batch_start = len(batches[0]) + 2 + flat[batch_start : batch_start + 3] = ["replaced"] + assert barray.items[:] == flat + + del barray[0] + del flat[:2] + assert barray.items[:] == flat + + with pytest.raises(IndexError, match="item index out of range"): + _ = barray.items[len(flat)] + with pytest.raises(TypeError, match="item indices must be integers"): + _ = barray.items[1.5] + with pytest.raises(ValueError): + _ = barray.items[::0] + + if urlpath is not None: + reopened = blosc2.open(urlpath, mode="r") + assert reopened.items[:] == flat + assert reopened.items[2] == flat[2] + + blosc2.remove_urlpath(urlpath) + + +def test_batchstore_copy(): + urlpath = "test_batchstore_copy.b2b" + copy_path = "test_batchstore_copy_out.b2b" + blosc2.remove_urlpath(urlpath) + blosc2.remove_urlpath(copy_path) + + original = blosc2.BatchStore(urlpath=urlpath, mode="w", contiguous=True) + original.extend(BATCHES) + original.insert(1, ["copy", True, 123]) + + copied = original.copy( + urlpath=copy_path, contiguous=False, cparams={"codec": blosc2.Codec.LZ4, "clevel": 5} + ) + assert [batch[:] for batch in copied] == [batch[:] for batch in original] + assert copied.urlpath == copy_path + assert copied.schunk.contiguous is False + assert copied.cparams.codec == blosc2.Codec.LZ4 + assert copied.cparams.clevel == 5 + + inmem = original.copy() + assert [batch[:] for batch in inmem] == [batch[:] for batch in original] + assert inmem.urlpath is None + + with pytest.raises(ValueError, match="meta should not be passed to copy"): + original.copy(meta={}) + + blosc2.remove_urlpath(urlpath) + blosc2.remove_urlpath(copy_path) + + +def 
test_batchstore_copy_with_storage_preserves_user_metadata(): + urlpath = "test_batchstore_copy_storage.b2b" + copy_path = "test_batchstore_copy_storage_out.b2b" + blosc2.remove_urlpath(urlpath) + blosc2.remove_urlpath(copy_path) + + original = blosc2.BatchStore(urlpath=urlpath, mode="w", contiguous=True, meta={"user_meta": {"a": 1}}) + original.vlmeta["user_vlmeta"] = {"b": 2} + original.extend(BATCHES) + + copied = original.copy(storage=blosc2.Storage(contiguous=False, urlpath=copy_path, mode="w")) + + assert [batch[:] for batch in copied] == [batch[:] for batch in original] + assert copied.meta["user_meta"] == {"a": 1} + assert copied.vlmeta["user_vlmeta"] == {"b": 2} + + blosc2.remove_urlpath(urlpath) + blosc2.remove_urlpath(copy_path) + + +@pytest.mark.parametrize(("contiguous", "nthreads"), [(False, 2), (True, 4)]) +def test_batchstore_multithreaded_inner_vl(contiguous, nthreads): + batches = [] + for batch_id in range(24): + batch = [] + for obj_id, size in enumerate( + (13, 1024 + batch_id * 17, 70_000 + batch_id * 13, 250_000 + batch_id * 101) + ): + batch.append( + { + "batch": batch_id, + "obj": obj_id, + "size": size, + "payload": _make_payload(batch_id + obj_id, size), + } + ) + batches.append(batch) + + barray = blosc2.BatchStore( + storage=blosc2.Storage(contiguous=contiguous), + cparams=blosc2.CParams(typesize=1, nthreads=nthreads, codec=blosc2.Codec.ZSTD, clevel=5), + dparams=blosc2.DParams(nthreads=nthreads), + ) + barray.extend(batches) + + assert [batch[:] for batch in barray] == batches + assert [barray[i][:] for i in range(len(barray))] == batches + + +def test_batchstore_validation_errors(): + barray = blosc2.BatchStore() + + with pytest.raises(TypeError): + barray.append("value") + with pytest.raises(ValueError): + barray.append([]) + with pytest.raises(TypeError): + barray.insert("0", ["bad"]) + with pytest.raises(IndexError): + barray.delete(3) + with pytest.raises(IndexError): + blosc2.BatchStore().pop() + barray.extend([[1, 2, 3]]) + 
assert barray.append([2, 3]) == 2 + assert [batch[:] for batch in barray] == [[1, 2, 3], [2, 3]] + with pytest.raises(NotImplementedError): + barray.pop(slice(0, 1)) + + +def test_batchstore_in_embed_store(): + estore = blosc2.EmbedStore() + barray = blosc2.BatchStore() + barray.extend(BATCHES) + + estore["/batch"] = barray + restored = estore["/batch"] + assert isinstance(restored, blosc2.BatchStore) + assert [batch[:] for batch in restored] == BATCHES + + +def test_batchstore_in_dict_store(): + path = "test_batchstore_store.b2z" + blosc2.remove_urlpath(path) + + with blosc2.DictStore(path, mode="w", threshold=1) as dstore: + barray = blosc2.BatchStore() + barray.extend(BATCHES) + dstore["/batch"] = barray + + with blosc2.DictStore(path, mode="r") as dstore: + restored = dstore["/batch"] + assert isinstance(restored, blosc2.BatchStore) + assert [batch[:] for batch in restored] == BATCHES + + blosc2.remove_urlpath(path) diff --git a/tests/test_dict_store.py b/tests/test_dict_store.py index 74122424..337ace30 100644 --- a/tests/test_dict_store.py +++ b/tests/test_dict_store.py @@ -16,6 +16,22 @@ from blosc2.dict_store import DictStore +def _rename_store_member(store_path, old_name, new_name): + """Rename an external leaf inside a .b2d/.b2z store without changing its contents.""" + if str(store_path).endswith(".b2d"): + old_path = os.path.join(store_path, old_name.replace("/", os.sep)) + new_path = os.path.join(store_path, new_name.replace("/", os.sep)) + os.rename(old_path, new_path) + return + + tmp_zip = f"{store_path}.tmp" + with zipfile.ZipFile(store_path, "r") as src, zipfile.ZipFile(tmp_zip, "w", zipfile.ZIP_STORED) as dst: + for info in src.infolist(): + arcname = new_name if info.filename == old_name else info.filename + dst.writestr(arcname, src.read(info.filename), compress_type=zipfile.ZIP_STORED) + os.replace(tmp_zip, store_path) + + @pytest.fixture(params=["b2d", "b2z"]) def populated_dict_store(request): """Create and populate a DictStore for tests. 
@@ -98,6 +114,74 @@ def test_to_b2z_and_reopen(populated_dict_store): assert np.all(dstore_read["/nodeB"][:] == np.arange(6)) +def test_to_b2z_from_readonly_b2d(): + b2d_path = "test_to_b2z_from_readonly.b2d" + b2z_path = "test_to_b2z_from_readonly.b2z" + + if os.path.exists(b2d_path): + shutil.rmtree(b2d_path) + if os.path.exists(b2z_path): + os.remove(b2z_path) + + with DictStore(b2d_path, mode="w") as dstore: + dstore["/nodeA"] = np.arange(5) + dstore["/nodeB"] = np.arange(6) + + with DictStore(b2d_path, mode="r") as dstore: + packed = dstore.to_b2z(filename=b2z_path) + assert packed.endswith(b2z_path) + + with DictStore(b2z_path, mode="r") as dstore: + assert np.all(dstore["/nodeA"][:] == np.arange(5)) + assert np.all(dstore["/nodeB"][:] == np.arange(6)) + + shutil.rmtree(b2d_path) + os.remove(b2z_path) + + +def test_to_b2z_accepts_positional_filename(): + b2d_path = "test_to_b2z_positional_filename.b2d" + b2z_path = "test_to_b2z_positional_filename.b2z" + + if os.path.exists(b2d_path): + shutil.rmtree(b2d_path) + if os.path.exists(b2z_path): + os.remove(b2z_path) + + with DictStore(b2d_path, mode="w") as dstore: + dstore["/nodeA"] = np.arange(5) + + with DictStore(b2d_path, mode="r") as dstore: + packed = dstore.to_b2z(b2z_path) + assert packed.endswith(b2z_path) + + with DictStore(b2z_path, mode="r") as dstore: + assert np.all(dstore["/nodeA"][:] == np.arange(5)) + + shutil.rmtree(b2d_path) + os.remove(b2z_path) + + +def test_to_b2z_from_readonly_b2z_raises(): + b2z_path = "test_to_b2z_readonly_zip.b2z" + out_path = "test_to_b2z_readonly_zip_out.b2z" + + for path in (b2z_path, out_path): + if os.path.exists(path): + os.remove(path) + + with DictStore(b2z_path, mode="w") as dstore: + dstore["/nodeA"] = np.arange(5) + + with ( + DictStore(b2z_path, mode="r") as dstore, + pytest.raises(ValueError, match=r"\.b2z DictStore opened in read mode"), + ): + dstore.to_b2z(filename=out_path) + + os.remove(b2z_path) + + def test_map_tree_precedence(populated_dict_store): 
dstore, path = populated_dict_store # Create external file and add to dstore @@ -266,6 +350,78 @@ def test_external_vlarray_file_and_reopen(tmp_path): assert value.vlmeta["description"] == "External VLArray" +@pytest.mark.parametrize("storage_type", ["b2d", "b2z"]) +def test_metadata_discovery_reopens_renamed_external_ndarray(storage_type, tmp_path): + path = tmp_path / f"test_renamed_ndarray.{storage_type}" + ext_path = tmp_path / "renamed_array_source.b2nd" + + with DictStore(str(path), mode="w", threshold=None) as dstore: + arr_external = blosc2.arange(5, urlpath=str(ext_path), mode="w") + arr_external.vlmeta["description"] = "Renamed NDArray" + dstore["/dir1/node3"] = arr_external + + old_name = "dir1/node3.b2nd" + new_name = "dir1/node3.weird" + _rename_store_member(str(path), old_name, new_name) + + with pytest.warns(UserWarning, match=r"node3\.weird'.*NDArray.*expected '\.b2nd'"): + dstore_read = DictStore(str(path), mode="r") + with dstore_read: + assert dstore_read.map_tree["/dir1/node3"] == new_name + node3 = dstore_read["/dir1/node3"] + assert isinstance(node3, blosc2.NDArray) + assert np.array_equal(node3[:], np.arange(5)) + assert node3.vlmeta["description"] == "Renamed NDArray" + + +@pytest.mark.parametrize("storage_type", ["b2d", "b2z"]) +def test_metadata_discovery_reopens_renamed_external_vlarray(storage_type, tmp_path): + path = tmp_path / f"test_renamed_vlarray.{storage_type}" + ext_path = tmp_path / "renamed_vlarray_source.b2frame" + values = ["alpha", {"nested": True}, None, (1, 2, 3)] + + vlarray = blosc2.VLArray(urlpath=str(ext_path), mode="w", contiguous=True) + vlarray.extend(values) + vlarray.vlmeta["description"] = "Renamed VLArray" + + with DictStore(str(path), mode="w", threshold=None) as dstore: + dstore["/dir1/vlarray_ext"] = vlarray + + old_name = "dir1/vlarray_ext.b2f" + new_name = "dir1/vlarray_ext.renamed" + _rename_store_member(str(path), old_name, new_name) + + with pytest.warns(UserWarning, 
match=r"vlarray_ext\.renamed'.*VLArray.*expected '\.b2f'"): + dstore_read = DictStore(str(path), mode="r") + with dstore_read: + assert dstore_read.map_tree["/dir1/vlarray_ext"] == new_name + value = dstore_read["/dir1/vlarray_ext"] + assert isinstance(value, blosc2.VLArray) + assert list(value) == values + assert value.vlmeta["description"] == "Renamed VLArray" + + +def test_metadata_discovery_warns_and_skips_unsupported_blosc2_leaf(tmp_path): + path = tmp_path / "test_unsupported_lazyexpr.b2d" + + with DictStore(str(path), mode="w") as dstore: + dstore["/embedded"] = np.arange(3) + + a = blosc2.asarray(np.arange(5), urlpath=str(tmp_path / "a.b2nd"), mode="w") + b = blosc2.asarray(np.arange(5), urlpath=str(tmp_path / "b.b2nd"), mode="w") + expr = a + b + expr_path = path / "unsupported_lazyexpr.b2nd" + expr.save(str(expr_path)) + + with pytest.warns( + UserWarning, match=r"Ignoring unsupported Blosc2 object.*unsupported_lazyexpr\.b2nd.*LazyExpr" + ): + dstore_read = DictStore(str(path), mode="r") + with dstore_read: + assert "/unsupported_lazyexpr" not in dstore_read + assert "/embedded" in dstore_read + + def _digest_value(value): """Return a bytes digest of a stored value.""" if isinstance(value, blosc2.SChunk): diff --git a/tests/test_schunk.py b/tests/test_schunk.py index 539e495f..db7c087e 100644 --- a/tests/test_schunk.py +++ b/tests/test_schunk.py @@ -186,6 +186,19 @@ def test_schunk(contiguous, urlpath, mode, mmap_mode, nbytes, cparams, dparams, blosc2.remove_urlpath(urlpath) +def test_schunk_info_has_human_sizes(): + schunk = blosc2.SChunk(chunksize=32) + schunk.append_data(b"a" * 32) + + items = dict(schunk.info_items) + assert "(" in items["nbytes"] + assert "(" in items["cbytes"] + + text = repr(schunk.info) + assert "nbytes" in text + assert "cbytes" in text + + @pytest.mark.parametrize( ("urlpath", "contiguous", "mode", "mmap_mode"), [ diff --git a/tests/test_tree_store.py b/tests/test_tree_store.py index 5da45f64..780d6cf5 100644 --- 
a/tests/test_tree_store.py +++ b/tests/test_tree_store.py @@ -7,6 +7,7 @@ import os import shutil +import zipfile import numpy as np import pytest @@ -15,6 +16,22 @@ from blosc2.tree_store import TreeStore +def _rename_store_member(store_path, old_name, new_name): + """Rename an external leaf inside a .b2d/.b2z store without changing its contents.""" + if str(store_path).endswith(".b2d"): + old_path = os.path.join(store_path, old_name.replace("/", os.sep)) + new_path = os.path.join(store_path, new_name.replace("/", os.sep)) + os.rename(old_path, new_path) + return + + tmp_zip = f"{store_path}.tmp" + with zipfile.ZipFile(store_path, "r") as src, zipfile.ZipFile(tmp_zip, "w", zipfile.ZIP_STORED) as dst: + for info in src.infolist(): + arcname = new_name if info.filename == old_name else info.filename + dst.writestr(arcname, src.read(info.filename), compress_type=zipfile.ZIP_STORED) + os.replace(tmp_zip, store_path) + + @pytest.fixture(params=["b2d", "b2z"]) def populated_tree_store(request): """A fixture that creates and populates a TreeStore.""" @@ -654,6 +671,56 @@ def test_external_vlarray_support(): os.remove("test_vlarray_external.b2z") +def test_external_batchstore_support(tmp_path): + store_path = tmp_path / "test_batchstore_external.b2d" + + with TreeStore(str(store_path), mode="w", threshold=0) as tstore: + bstore = blosc2.BatchStore(max_blocksize=2) + bstore.extend([[{"id": 1}, {"id": 2}], [{"id": 3}]]) + tstore["/data/batchstore"] = bstore + + batchstore_path = store_path / "data" / "batchstore.b2b" + assert batchstore_path.exists() + + with TreeStore(str(store_path), mode="r") as tstore: + retrieved = tstore["/data/batchstore"] + assert isinstance(retrieved, blosc2.BatchStore) + assert [batch[:] for batch in retrieved] == [[{"id": 1}, {"id": 2}], [{"id": 3}]] + + +@pytest.mark.parametrize("storage_type", ["b2d", "b2z"]) +def test_metadata_discovery_reopens_renamed_batchstore_leaf(storage_type, tmp_path): + store_path = tmp_path / 
f"test_batchstore_renamed.{storage_type}" + + with TreeStore(str(store_path), mode="w", threshold=0) as tstore: + bstore = blosc2.BatchStore(max_blocksize=2) + bstore.extend([[{"id": 1}, {"id": 2}], [{"id": 3}]]) + tstore["/data/batchstore"] = bstore + + old_name = "data/batchstore.b2b" + new_name = "data/batchstore.odd" + _rename_store_member(str(store_path), old_name, new_name) + + with pytest.warns(UserWarning, match=r"batchstore\.odd'.*BatchStore.*expected '\.b2b'"): + tstore = TreeStore(str(store_path), mode="r") + with tstore: + assert tstore.map_tree["/data/batchstore"] == new_name + retrieved = tstore["/data/batchstore"] + assert isinstance(retrieved, blosc2.BatchStore) + assert [batch[:] for batch in retrieved] == [[{"id": 1}, {"id": 2}], [{"id": 3}]] + + +def test_treestore_vlmeta_externalized_b2d(tmp_path): + store_path = tmp_path / "test_vlmeta_externalized.b2d" + + with TreeStore(str(store_path), mode="w", threshold=0) as tstore: + tstore["/data"] = np.array([1, 2, 3]) + tstore.vlmeta["schema_manifest"] = {"version": 1, "fields": {"a": {"kind": "fixed"}}} + + with TreeStore(str(store_path), mode="r") as tstore: + assert tstore.vlmeta["schema_manifest"] == {"version": 1, "fields": {"a": {"kind": "fixed"}}} + + def test_walk_topdown_argument_ordering(): """Ensure walk supports topdown argument mimicking os.walk order semantics.""" with TreeStore("test_walk_topdown.b2z", mode="w") as tstore: diff --git a/tests/test_vlarray.py b/tests/test_vlarray.py index 3a25fa78..0c1f01f3 100644 --- a/tests/test_vlarray.py +++ b/tests/test_vlarray.py @@ -117,6 +117,59 @@ def test_vlarray_from_cframe(): assert list(restored2) == expected +def test_vlarray_info(): + vlarray = blosc2.VLArray() + vlarray.extend(VALUES) + + assert vlarray.typesize == 1 + assert vlarray.contiguous == vlarray.schunk.contiguous + assert vlarray.urlpath == vlarray.schunk.urlpath + + items = dict(vlarray.info_items) + assert items["type"] == "VLArray" + assert items["entries"] == len(VALUES) + 
assert items["item_nbytes_min"] > 0 + assert items["item_nbytes_max"] >= items["item_nbytes_min"] + assert items["chunk_cbytes_min"] > 0 + assert items["chunk_cbytes_max"] >= items["chunk_cbytes_min"] + assert "urlpath" not in items + assert "contiguous" not in items + assert "typesize" not in items + assert "(" in items["nbytes"] + assert "(" in items["cbytes"] + + text = repr(vlarray.info) + assert "type" in text + assert "VLArray" in text + assert "item_nbytes_avg" in text + + +def test_vlarray_zstd_uses_dict_by_default(): + vlarray = blosc2.VLArray() + assert vlarray.cparams.codec == blosc2.Codec.ZSTD + assert vlarray.cparams.use_dict is True + + +def test_vlarray_respects_explicit_use_dict_and_non_zstd(): + vlarray = blosc2.VLArray(cparams={"codec": blosc2.Codec.LZ4, "clevel": 5}) + assert vlarray.cparams.codec == blosc2.Codec.LZ4 + assert vlarray.cparams.use_dict is False + + vlarray = blosc2.VLArray(cparams={"codec": blosc2.Codec.LZ4HC, "clevel": 1, "use_dict": True}) + assert vlarray.cparams.codec == blosc2.Codec.LZ4HC + assert vlarray.cparams.use_dict is True + + vlarray = blosc2.VLArray(cparams={"codec": blosc2.Codec.ZSTD, "clevel": 0}) + assert vlarray.cparams.codec == blosc2.Codec.ZSTD + assert vlarray.cparams.use_dict is False + + vlarray = blosc2.VLArray(cparams={"codec": blosc2.Codec.ZSTD, "clevel": 5, "use_dict": False}) + assert vlarray.cparams.use_dict is False + + vlarray = blosc2.VLArray(cparams=blosc2.CParams(codec=blosc2.Codec.ZSTD, clevel=5, use_dict=False)) + assert vlarray.cparams.use_dict is False + + def test_vlarray_constructor_kwargs(): urlpath = "test_vlarray_kwargs.b2frame" blosc2.remove_urlpath(urlpath)