Merged
107 changes: 107 additions & 0 deletions docs/docs/pypaimon/daft.md
@@ -225,6 +225,113 @@ Use `pypaimon.daft` when your application is written with Daft DataFrames and
you want Daft to schedule the execution on Ray. Use `pypaimon.ray` instead when
your application directly reads or writes Ray Datasets.

## Reading Blob Columns

Tables with BLOB columns (see [Blob Storage](./blob)) can be read with Daft.
Blob columns are returned as `daft.File` references; the actual bytes are
**not** loaded until you explicitly read them.

:::caution Daft's built-in connector does not support blob lazy loading yet

`daft.read_paimon()` currently reads blob data eagerly during the scan phase,
ignoring column pruning and filter pushdown for blob columns.
Use `from pypaimon.daft import read_paimon` instead for blob lazy loading.
:::

### Lazy loading

When you read a blob table through `pypaimon.daft`, the connector
automatically enables descriptor mode. Blob columns appear as
`daft.File` references in the DataFrame, and no bytes are fetched from storage
at this point. The actual I/O happens only when you call `file.open()` inside
a UDF, and only for the rows that survive earlier filters:

```python
import daft
from daft import col
from pypaimon.daft import read_paimon

df = read_paimon(
    "my_db.image_table",
    catalog_options={"warehouse": "/path/to/warehouse"},
)

# Filter runs BEFORE any blob bytes are read.
df = df.where(col("id") < 100)

# file.length returns the blob size from the descriptor (zero I/O).
@daft.func(return_dtype=daft.DataType.int64())
def file_length(f: daft.File) -> int | None:
    return None if f is None else f.length

result = df.with_column("size", file_length(col("image")))
result.show()
```

To read the actual blob content (e.g., to decode an image), use `file.open()`.
Only rows that pass the filter trigger I/O:

```python
@daft.func
def decode_image(file: daft.File) -> str:
    with file.open() as f:
        data = f.read()
    return f"decoded {len(data)} bytes"

result = df.with_column("info", decode_image(col("image")))
result.show()
```
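
The idea behind descriptor mode can be illustrated without Daft or Paimon. The sketch below is a conceptual model only: `BlobRef` and `make_demo_refs` are hypothetical names invented for this illustration, not part of `pypaimon` or `daft`, and the real descriptor layout may differ. It shows a reference that carries only a location and a length, so filtering on the length costs no I/O and bytes are read only when `open()` is called.

```python
import os
import tempfile
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path


@dataclass(frozen=True)
class BlobRef:
    """Hypothetical stand-in for a blob descriptor: a location, no bytes."""

    path: str
    offset: int
    length: int  # known from metadata, so reading it costs no I/O

    def open(self) -> BytesIO:
        # I/O happens here, and only for the byte range this blob occupies.
        with Path(self.path).open("rb") as f:
            f.seek(self.offset)
            return BytesIO(f.read(self.length))


def make_demo_refs() -> list[BlobRef]:
    """Pack two blobs into one temp file and return descriptors for them."""
    blobs = [b"first-blob", b"second-blob-is-longer"]
    fd, path = tempfile.mkstemp()
    refs, offset = [], 0
    with os.fdopen(fd, "wb") as f:
        for blob in blobs:
            f.write(blob)
            refs.append(BlobRef(path, offset, len(blob)))
            offset += len(blob)
    return refs


refs = make_demo_refs()

# Filtering on metadata touches no blob bytes.
small = [r for r in refs if r.length < 15]

# Only the surviving row pays for a read.
with small[0].open() as stream:
    payload = stream.read()
```

In this model, the filter on `length` plays the role of `df.where(...)` and the final `open()` plays the role of the UDF: the second blob is never read.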

### Parallel blob reading

By default, `@daft.func` processes rows sequentially. To read blobs
concurrently within a single worker, use an async UDF with
`max_concurrency` and `asyncio.to_thread` (because `file.open().read()`
is synchronous blocking I/O):

```python
import asyncio

def _read_blob(file: daft.File | None) -> str | None:
    if file is None:
        return None
    with file.open() as f:
        data = f.read()
    return f"decoded {len(data)} bytes"

@daft.func(max_concurrency=8)
async def decode_image(file: daft.File | None) -> str | None:
    return await asyncio.to_thread(_read_blob, file)

result = df.with_column("info", decode_image(col("image")))
result.show()
```

When running on Ray, the UDF calls are distributed across Ray workers
automatically. Each worker processes its own partition, so partitions are
handled in parallel without any extra code.
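
To see why blocking reads need `asyncio.to_thread` to overlap inside one worker, here is a standard-library sketch that does not use Daft. The names `blocking_read` and `read_all` are hypothetical, and the semaphore is only an analogy for a concurrency cap; it is not a description of how Daft implements `max_concurrency`.

```python
import asyncio
import time


def blocking_read(i: int) -> str:
    # Stand-in for file.open().read(): blocks the calling thread.
    time.sleep(0.05)
    return f"blob-{i}"


async def read_all(n: int, max_concurrency: int = 8) -> list[str]:
    # The semaphore caps how many reads are in flight at once.
    limit = asyncio.Semaphore(max_concurrency)

    async def read_one(i: int) -> str:
        async with limit:
            # to_thread keeps the event loop free while the read blocks.
            return await asyncio.to_thread(blocking_read, i)

    # gather returns results in the same order as its inputs.
    return await asyncio.gather(*(read_one(i) for i in range(n)))


results = asyncio.run(read_all(16))
```

If `blocking_read` were called directly inside the coroutine, each call would block the event loop and the reads would run one after another regardless of the cap.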

### Streaming (chunk) reads

`file.open()` returns a seekable stream that supports `read(size)`, so you can
process large blobs in chunks without loading everything into memory:

```python
@daft.func(return_dtype=daft.DataType.binary())
def first_4k(file: daft.File) -> bytes | None:
    if file is None:
        return None
    with file.open() as f:
        return f.read(4096)

result = df.with_column("header", first_4k(col("image")))
result.show()
```

See [Blob Storage — Streaming for Large Blobs](./blob#streaming-for-large-blobs)
for more details on the underlying `OffsetInputStream` API (`read(size)` /
`seek()` / `tell()`).
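
Chunked processing of a whole blob follows the usual read loop. The sketch below assumes only that the stream exposes `read(size)` and returns empty bytes at end of stream; it uses `io.BytesIO` as a stand-in for the object returned by `file.open()`, and `sha256_in_chunks` is a hypothetical helper name. Memory use stays bounded by the chunk size rather than the blob size.

```python
import hashlib
from io import BytesIO
from typing import BinaryIO


def sha256_in_chunks(stream: BinaryIO, chunk_size: int = 64 * 1024) -> str:
    """Hash a stream without holding more than one chunk in memory."""
    digest = hashlib.sha256()
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:  # empty bytes signals end of stream
            break
        digest.update(chunk)
    return digest.hexdigest()


# BytesIO stands in for the stream returned by file.open().
data = b"x" * 200_000
checksum = sha256_in_chunks(BytesIO(data), chunk_size=4096)
```

Inside a Daft UDF, the same loop could be wrapped around `with file.open() as f:`, provided the stream behaves as described above.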

## Catalog Abstraction

Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` interfaces: