From 08c1a18c883bfb111017573a38ec9fdd5f02813f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 11:57:02 +0800 Subject: [PATCH 01/12] Add Reading Blob Columns section to Daft documentation --- docs/docs/pypaimon/daft.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index d02590d1555d..ada90aba8828 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -225,6 +225,37 @@ Use `pypaimon.daft` when your application is written with Daft DataFrames and you want Daft to schedule the execution on Ray. Use `pypaimon.ray` instead when your application directly reads or writes Ray Datasets. +## Reading Blob Columns + +Tables with BLOB columns (see [Blob Storage](./blob)) can be read with Daft. +Blob columns are returned as `daft.File` references — the actual bytes are +**not** loaded until you explicitly read them. Use `daft.func` to process blob +data in parallel: + +```python +import daft +from pypaimon.daft import read_paimon + +df = read_paimon( + "my_db.image_table", + catalog_options={"warehouse": "/path/to/warehouse"}, +) + +# Blob columns appear as File references (lazy, no I/O yet). +# Use @daft.func to read and process blob data in parallel. +@daft.func +def image_size(file: daft.File) -> int: + with file.open() as f: + return len(f.read()) + +result = df.with_column("size", image_size(df["image"])) +result.show() +``` + +When running on Ray, the UDF calls are distributed across Ray workers +automatically — each worker processes its partition in parallel, giving you +batch-level concurrency without any extra code. + ## Catalog Abstraction Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` interfaces: From 81432d0ca75a9f4695fb6e8988aa22feb5a4e5e5 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 12:11:31 +0800 Subject: [PATCH 02/12] Expand blob reading docs: lazy loading, parallel reads, migration from daft built-in --- docs/docs/pypaimon/daft.md | 52 ++++++++++++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index ada90aba8828..8fa9492a73ff 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -229,8 +229,23 @@ your application directly reads or writes Ray Datasets. Tables with BLOB columns (see [Blob Storage](./blob)) can be read with Daft. Blob columns are returned as `daft.File` references — the actual bytes are -**not** loaded until you explicitly read them. Use `daft.func` to process blob -data in parallel: +**not** loaded until you explicitly read them. + +:::caution Use `pypaimon.daft`, not Daft's built-in connector + +Daft's built-in `daft.read_paimon()` does **not** support blob lazy loading — +it reads blob data eagerly during the scan phase, ignoring column pruning and +filter pushdown for blob columns. Always use `from pypaimon.daft import +read_paimon` for tables with BLOB columns. +::: + +### Lazy loading + +When you read a blob table through `pypaimon.daft`, the connector +automatically enables descriptor mode internally. Blob columns appear as +`daft.File` references in the DataFrame — no bytes are fetched from storage +at this point. The actual I/O only happens when you call `file.open()` inside +a UDF, and only for the rows that survive earlier filters: ```python import daft @@ -241,14 +256,41 @@ df = read_paimon( catalog_options={"warehouse": "/path/to/warehouse"}, ) -# Blob columns appear as File references (lazy, no I/O yet). -# Use @daft.func to read and process blob data in parallel. +# Filter runs BEFORE any blob bytes are read. +df = df.where(daft.col("obs_index") < 2) + +# Only filtered rows trigger blob I/O inside the UDF. @daft.func def image_size(file: daft.File) -> int: with file.open() as f: return len(f.read()) -result = df.with_column("size", image_size(df["image"])) +result = df.with_column("real_size", image_size(df["image_jpeg"])) +result.show() +``` + +### Parallel blob reading + +By default, `@daft.func` processes rows sequentially. To read blobs +concurrently, use an async UDF with `max_concurrency`: + +```python +import asyncio +import daft +from pypaimon.daft import read_paimon + +df = read_paimon( + "my_db.image_table", + catalog_options={"warehouse": "/path/to/warehouse"}, +) + +@daft.func(max_concurrency=8) +async def image_size(file: daft.File) -> int: + await asyncio.sleep(0) + with file.open() as f: + return len(f.read()) + +result = df.with_column("real_size", image_size(df["image_jpeg"])) result.show() ``` From 9f80f18dbeda9a642200c1d374bd2e01be58e3c8 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 12:18:08 +0800 Subject: [PATCH 03/12] Soften caution wording for Daft built-in connector blob limitation --- docs/docs/pypaimon/daft.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index 8fa9492a73ff..c72bc3a91cbd 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -231,12 +231,11 @@ Tables with BLOB columns (see [Blob Storage](./blob)) can be read with Daft. Blob columns are returned as `daft.File` references — the actual bytes are **not** loaded until you explicitly read them. -:::caution Use `pypaimon.daft`, not Daft's built-in connector +:::caution Daft's built-in connector does not support blob lazy loading yet -Daft's built-in `daft.read_paimon()` does **not** support blob lazy loading — -it reads blob data eagerly during the scan phase, ignoring column pruning and -filter pushdown for blob columns. Always use `from pypaimon.daft import -read_paimon` for tables with BLOB columns. +`daft.read_paimon()` currently reads blob data eagerly during the scan phase, +ignoring column pruning and filter pushdown for blob columns. +Use `from pypaimon.daft import read_paimon` instead for blob lazy loading. ::: ### Lazy loading From bc8f4589044338e32719166e505ca2f12dc90562 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 12:20:25 +0800 Subject: [PATCH 04/12] Use generic column names in blob reading examples --- docs/docs/pypaimon/daft.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index c72bc3a91cbd..0235f45a6049 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -256,7 +256,7 @@ df = read_paimon( ) # Filter runs BEFORE any blob bytes are read. -df = df.where(daft.col("obs_index") < 2) +df = df.where(daft.col("id") < 100) # Only filtered rows trigger blob I/O inside the UDF. @daft.func @@ -264,7 +264,7 @@ def image_size(file: daft.File) -> int: with file.open() as f: return len(f.read()) -result = df.with_column("real_size", image_size(df["image_jpeg"])) +result = df.with_column("size", image_size(df["image"])) result.show() ``` @@ -289,7 +289,7 @@ async def image_size(file: daft.File) -> int: with file.open() as f: return len(f.read()) -result = df.with_column("real_size", image_size(df["image_jpeg"])) +result = df.with_column("size", image_size(df["image"])) result.show() ``` From b1969cdd616616072ae55e76a126303717198d56 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 12:32:19 +0800 Subject: [PATCH 05/12] Add file.length example for getting blob size without I/O --- docs/docs/pypaimon/daft.md | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index 0235f45a6049..64014f51939d 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -248,6 +248,7 @@ a UDF, and only for the rows that survive earlier filters: ```python import daft +from daft import col from pypaimon.daft import read_paimon df = read_paimon( @@ -256,7 +257,7 @@ df = read_paimon( ) # Filter runs BEFORE any blob bytes are read. -df = df.where(daft.col("id") < 100) +df = df.where(col("id") < 100) # Only filtered rows trigger blob I/O inside the UDF. @daft.func @@ -264,10 +265,29 @@ def image_size(file: daft.File) -> int: with file.open() as f: return len(f.read()) -result = df.with_column("size", image_size(df["image"])) +result = df.with_column("size", image_size(col("image"))) +result.show() +``` + +To get the blob size **without** reading the actual bytes, use `file.length` +which is available directly from the descriptor: + +```python +@daft.func(return_dtype=daft.DataType.int64()) +def file_length(f: daft.File) -> int | None: + return None if f is None else f.length + +result = ( + df.where(col("id") < 100) + .with_column("size", file_length(col("image"))) +) result.show() ``` +Note: `col("image").length()` does not work on `File` type columns. Use a +UDF with `file.length` instead — this reads the size from the blob descriptor +with zero I/O. + ### Parallel blob reading By default, `@daft.func` processes rows sequentially. To read blobs From 3a8ae5b5bb93e4ad82fbb96b57a17123359b88f9 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 12:34:58 +0800 Subject: [PATCH 06/12] Reorganize blob examples: file.length first, file.open for content --- docs/docs/pypaimon/daft.md | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index 64014f51939d..dc36f5258f55 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -259,35 +259,32 @@ df = read_paimon( # Filter runs BEFORE any blob bytes are read. df = df.where(col("id") < 100) -# Only filtered rows trigger blob I/O inside the UDF. -@daft.func -def image_size(file: daft.File) -> int: - with file.open() as f: - return len(f.read()) +# file.length returns the blob size from the descriptor (zero I/O). +@daft.func(return_dtype=daft.DataType.int64()) +def file_length(f: daft.File) -> int | None: + return None if f is None else f.length -result = df.with_column("size", image_size(col("image"))) +result = df.with_column("size", file_length(col("image"))) result.show() ``` -To get the blob size **without** reading the actual bytes, use `file.length` -which is available directly from the descriptor: +Note: `col("image").length()` does not work on `File` type columns — use +`file.length` as shown above. + +To read actual blob content (e.g., decode an image), use `file.open()`. +Only filtered rows trigger I/O: ```python -@daft.func(return_dtype=daft.DataType.int64()) -def file_length(f: daft.File) -> int | None: - return None if f is None else f.length +@daft.func +def decode_image(file: daft.File) -> str: + with file.open() as f: + data = f.read() + return f"decoded {len(data)} bytes" -result = ( - df.where(col("id") < 100) - .with_column("size", file_length(col("image"))) -) +result = df.with_column("info", decode_image(col("image"))) result.show() ``` -Note: `col("image").length()` does not work on `File` type columns. Use a -UDF with `file.length` instead — this reads the size from the blob descriptor -with zero I/O. - ### Parallel blob reading By default, `@daft.func` processes rows sequentially. To read blobs From e3e4e3aa14282e52897aea4128116c440705f4fc Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 13:03:48 +0800 Subject: [PATCH 07/12] Remove unnecessary note about col.length() --- docs/docs/pypaimon/daft.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index dc36f5258f55..f3664de8018f 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -268,9 +268,6 @@ result = df.with_column("size", file_length(col("image"))) result.show() ``` -Note: `col("image").length()` does not work on `File` type columns — use -`file.length` as shown above. - To read actual blob content (e.g., decode an image), use `file.open()`. Only filtered rows trigger I/O: From 50070374cddc0f0cd5db9abf2c6148d1737010cf Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 13:07:09 +0800 Subject: [PATCH 08/12] Simplify parallel blob reading example --- docs/docs/pypaimon/daft.md | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index f3664de8018f..bc641a2e0186 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -288,22 +288,13 @@ By default, `@daft.func` processes rows sequentially. To read blobs concurrently, use an async UDF with `max_concurrency`: ```python -import asyncio -import daft -from pypaimon.daft import read_paimon - -df = read_paimon( - "my_db.image_table", - catalog_options={"warehouse": "/path/to/warehouse"}, -) - @daft.func(max_concurrency=8) -async def image_size(file: daft.File) -> int: - await asyncio.sleep(0) +async def decode_image(file: daft.File) -> str: with file.open() as f: - return len(f.read()) + data = f.read() + return f"decoded {len(data)} bytes" -result = df.with_column("size", image_size(df["image"])) +result = df.with_column("info", decode_image(col("image"))) result.show() ``` From 565acb9a8e9b397beee6c9566aaad89e909d8fdc Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 13:49:14 +0800 Subject: [PATCH 09/12] Add streaming (chunk) reads section to Daft blob documentation --- docs/docs/pypaimon/daft.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index bc641a2e0186..cd3ecc51b0e4 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -302,6 +302,26 @@ When running on Ray, the UDF calls are distributed across Ray workers automatically — each worker processes its partition in parallel, giving you batch-level concurrency without any extra code. +### Streaming (chunk) reads + +`file.open()` returns a seekable stream that supports `read(size)`, so you can +process large blobs in chunks without loading everything into memory: + +```python +@daft.func(return_dtype=daft.DataType.binary()) +def first_4k(file: daft.File) -> bytes | None: + if file is None: + return None + with file.open() as f: + return f.read(4096) + +result = df.with_column("header", first_4k(col("image"))) +result.show() +``` + +See [Blob Storage — Streaming Read](./blob#streaming-read) for more details on +the underlying `OffsetInputStream` API (`read(size)` / `seek()` / `tell()`). + ## Catalog Abstraction Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` interfaces: From 1b5e8743413aa9035d9bee0efada59351393c8ed Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 16:10:11 +0800 Subject: [PATCH 10/12] Fix parallel blob reading example: use asyncio.to_thread for sync I/O --- docs/docs/pypaimon/daft.md | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index cd3ecc51b0e4..cf57dcd5cdc5 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -285,15 +285,22 @@ result.show() ### Parallel blob reading By default, `@daft.func` processes rows sequentially. To read blobs -concurrently, use an async UDF with `max_concurrency`: +concurrently within a single worker, use an async UDF with +`max_concurrency` and `asyncio.to_thread` (because `file.open().read()` +is synchronous blocking I/O): ```python -@daft.func(max_concurrency=8) -async def decode_image(file: daft.File) -> str: +import asyncio + +def _read_blob(file: daft.File) -> str: with file.open() as f: data = f.read() return f"decoded {len(data)} bytes" +@daft.func(max_concurrency=8) +async def decode_image(file: daft.File) -> str: + return await asyncio.to_thread(_read_blob, file) + result = df.with_column("info", decode_image(col("image"))) result.show() ``` From 06ec1c6f9961de802bf4591496d6274120465e9b Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 16:47:05 +0800 Subject: [PATCH 11/12] Handle nullable blob in parallel reading example --- docs/docs/pypaimon/daft.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index cf57dcd5cdc5..99540b8b5717 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -292,13 +292,15 @@ is synchronous blocking I/O): ```python import asyncio -def _read_blob(file: daft.File) -> str: +def _read_blob(file: daft.File | None) -> str | None: + if file is None: + return None with file.open() as f: data = f.read() return f"decoded {len(data)} bytes" @daft.func(max_concurrency=8) -async def decode_image(file: daft.File) -> str: +async def decode_image(file: daft.File | None) -> str | None: return await asyncio.to_thread(_read_blob, file) result = df.with_column("info", decode_image(col("image"))) From 0378a72e9dc23f3646082a5d357d3839e4b45377 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Wed, 10 Jun 2026 21:27:26 +0800 Subject: [PATCH 12/12] Fix broken anchor: blob#streaming-read -> blob#streaming-for-large-blobs --- docs/docs/pypaimon/daft.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index 99540b8b5717..c553bc2fbd16 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -328,8 +328,9 @@ result = df.with_column("header", first_4k(col("image"))) result.show() ``` -See [Blob Storage — Streaming Read](./blob#streaming-read) for more details on -the underlying `OffsetInputStream` API (`read(size)` / `seek()` / `tell()`). +See [Blob Storage — Streaming for Large Blobs](./blob#streaming-for-large-blobs) +for more details on the underlying `OffsetInputStream` API (`read(size)` / +`seek()` / `tell()`). ## Catalog Abstraction