diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md
index d02590d1555d..c553bc2fbd16 100644
--- a/docs/docs/pypaimon/daft.md
+++ b/docs/docs/pypaimon/daft.md
@@ -225,6 +225,113 @@ Use `pypaimon.daft` when your application is written with Daft DataFrames and
 you want Daft to schedule the execution on Ray. Use `pypaimon.ray` instead when
 your application directly reads or writes Ray Datasets.
 
+## Reading Blob Columns
+
+Tables with BLOB columns (see [Blob Storage](./blob)) can be read with Daft.
+Blob columns are returned as `daft.File` references — the actual bytes are
+**not** loaded until you explicitly read them.
+
+:::caution Daft's built-in connector does not support blob lazy loading yet
+
+`daft.read_paimon()` currently reads blob data eagerly during the scan phase,
+ignoring column pruning and filter pushdown for blob columns.
+Use `from pypaimon.daft import read_paimon` instead for blob lazy loading.
+:::
+
+### Lazy loading
+
+When you read a blob table through `pypaimon.daft`, the connector
+automatically enables descriptor mode internally. Blob columns appear as
+`daft.File` references in the DataFrame — no bytes are fetched from storage
+at this point. The actual I/O only happens when you call `file.open()` inside
+a UDF, and only for the rows that survive earlier filters:
+
+```python
+import daft
+from daft import col
+from pypaimon.daft import read_paimon
+
+df = read_paimon(
+    "my_db.image_table",
+    catalog_options={"warehouse": "/path/to/warehouse"},
+)
+
+# Filter runs BEFORE any blob bytes are read.
+df = df.where(col("id") < 100)
+
+# file.length returns the blob size from the descriptor (zero I/O).
+@daft.func(return_dtype=daft.DataType.int64())
+def file_length(f: daft.File) -> int | None:
+    return None if f is None else f.length
+
+result = df.with_column("size", file_length(col("image")))
+result.show()
+```
+
+To read actual blob content (e.g., decode an image), use `file.open()`.
+Only filtered rows trigger I/O:
+
+```python
+@daft.func
+def decode_image(file: daft.File) -> str:
+    with file.open() as f:
+        data = f.read()
+    return f"decoded {len(data)} bytes"
+
+result = df.with_column("info", decode_image(col("image")))
+result.show()
+```
+
+### Parallel blob reading
+
+By default, `@daft.func` processes rows sequentially. To read blobs
+concurrently within a single worker, use an async UDF with
+`max_concurrency` and `asyncio.to_thread` (because `file.open().read()`
+is synchronous blocking I/O):
+
+```python
+import asyncio
+
+def _read_blob(file: daft.File | None) -> str | None:
+    if file is None:
+        return None
+    with file.open() as f:
+        data = f.read()
+    return f"decoded {len(data)} bytes"
+
+@daft.func(max_concurrency=8)
+async def decode_image(file: daft.File | None) -> str | None:
+    return await asyncio.to_thread(_read_blob, file)
+
+result = df.with_column("info", decode_image(col("image")))
+result.show()
+```
+
+When running on Ray, the UDF calls are distributed across Ray workers
+automatically — each worker processes its partition in parallel, giving you
+batch-level concurrency without any extra code.
+
+### Streaming (chunk) reads
+
+`file.open()` returns a seekable stream that supports `read(size)`, so you can
+process large blobs in chunks without loading everything into memory:
+
+```python
+@daft.func(return_dtype=daft.DataType.binary())
+def first_4k(file: daft.File) -> bytes | None:
+    if file is None:
+        return None
+    with file.open() as f:
+        return f.read(4096)
+
+result = df.with_column("header", first_4k(col("image")))
+result.show()
+```
+
+See [Blob Storage — Streaming for Large Blobs](./blob#streaming-for-large-blobs)
+for more details on the underlying `OffsetInputStream` API (`read(size)` /
+`seek()` / `tell()`).
+
 ## Catalog Abstraction
 
 Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` interfaces: