From b76c93844f5310b4dde408bc9221ed6b584e8462 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 7 Jun 2026 11:07:10 +0800 Subject: [PATCH 1/4] [python] Add Daft on Ray e2e and docs --- docs/docs/pypaimon/daft.md | 55 ++++- docs/docs/pypaimon/ray-data.md | 3 + .../tests/daft/daft_on_ray_e2e_test.py | 221 ++++++++++++++++++ 3 files changed, 278 insertions(+), 1 deletion(-) create mode 100644 paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py diff --git a/docs/docs/pypaimon/daft.md b/docs/docs/pypaimon/daft.md index ae158c3283e7..08ce5bbfef53 100644 --- a/docs/docs/pypaimon/daft.md +++ b/docs/docs/pypaimon/daft.md @@ -30,7 +30,13 @@ under the License. This requires `daft` to be installed: ```bash -pip install pypaimon[daft] +pip install 'pypaimon[daft]' +``` + +To execute Daft plans on Ray, install both extras: + +```bash +pip install 'pypaimon[daft,ray]' ``` `pypaimon.daft` exposes a top-level `read_paimon` / `write_paimon` API that @@ -165,12 +171,59 @@ write_paimon( ) ``` +For unpartitioned tables, overwrite replaces the table contents. For +partitioned tables, overwrite follows Paimon's dynamic partition overwrite +semantics by default: only partitions present in the input DataFrame are +replaced, and existing partitions not present in the input are kept. + **Parameters:** - `df`: the Daft DataFrame to write. - `table_identifier`: full table name, e.g. `"db_name.table_name"`. - `catalog_options`: kwargs forwarded to `CatalogFactory.create()`. - `mode`: write mode — `"append"` (default) or `"overwrite"`. +## Running Daft on Ray + +`pypaimon.daft` works with Daft's Ray runner. Configure the runner before the +first Daft execution in the process: + +```python +import daft +import ray +from daft import runners +from pypaimon.daft import read_paimon, write_paimon + +ray.init() # use address="auto" to connect to an existing Ray cluster +runners.set_runner_ray() + +df = daft.from_pydict({ + "id": [1, 2, 3], + "name": ["alice", "bob", "charlie"], + "dt": ["2024-01-01", "2024-01-01", "2024-01-02"], +}) + +write_paimon( + df, + "database_name.table_name", + catalog_options={"warehouse": "/path/to/warehouse"}, +) + +result = ( + read_paimon( + "database_name.table_name", + catalog_options={"warehouse": "/path/to/warehouse"}, + ) + .where(daft.col("dt") == "2024-01-01") + .select("id", "name") +) + +result.show() +``` + +Use `pypaimon.daft` when your application is written with Daft DataFrames and +you want Daft to schedule the execution on Ray. Use `pypaimon.ray` instead when +your application directly reads or writes Ray Datasets. + ## Catalog Abstraction Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` interfaces: diff --git a/docs/docs/pypaimon/ray-data.md b/docs/docs/pypaimon/ray-data.md index a6d98f8fcce0..679d7080ad08 100644 --- a/docs/docs/pypaimon/ray-data.md +++ b/docs/docs/pypaimon/ray-data.md @@ -34,6 +34,9 @@ Ray's built-in Iceberg integration. The lower-level `TableRead.to_ray()` and already resolved a `(read_builder, splits)` pair or constructed a `table_write` via the regular pypaimon API. +If your application uses Daft DataFrames and only needs Ray as Daft's execution +backend, see [Running Daft on Ray](./daft.md#running-daft-on-ray). + ## Read ### `read_paimon` (recommended) diff --git a/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py new file mode 100644 index 000000000000..6ba5a9f72d21 --- /dev/null +++ b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py @@ -0,0 +1,221 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +"""End-to-end tests for pypaimon.daft with Daft's Ray runner. + +Daft runners are process-global and cannot be switched after initialization. +Run the Ray-runner scenario in a fresh Python subprocess. +""" + +from __future__ import annotations + +import importlib.util +import os +import subprocess +import sys +import textwrap + +import pytest + +pytestmark = pytest.mark.skipif( + importlib.util.find_spec("daft") is None + or importlib.util.find_spec("ray") is None, + reason="Daft-on-Ray e2e requires both daft and ray", +) + + +def test_daft_on_ray_read_write_e2e(): + python_root = os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "..", "..") + ) + script = textwrap.dedent( + r''' + import os + import shutil + import tempfile + + os.environ.setdefault("RAY_TMPDIR", "/tmp") + + import daft + import pyarrow as pa + import ray + from daft import runners + from pypaimon import CatalogFactory, Schema + from pypaimon.daft import PaimonCatalog, read_paimon, write_paimon + + root = tempfile.mkdtemp(prefix="paimon-daft-on-ray-") + try: + warehouse = os.path.join(root, "warehouse") + catalog_options = {"warehouse": warehouse} + catalog = CatalogFactory.create(catalog_options) + catalog.create_database("test_db", False) + + ray.init( + num_cpus=2, + include_dashboard=False, + ignore_reinit_error=True, + ) + runners.set_runner_ray() + assert runners.get_or_create_runner().name == "ray" + + schema = Schema.from_pyarrow_schema( + pa.schema([ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + pa.field("dt", pa.string()), + ]), + partition_keys=["dt"], + options={"file.format": "parquet"}, + ) + catalog.create_table("test_db.t", schema, False) + + df = daft.from_pydict({ + "id": [3, 1, 2, 4], + "name": ["c", "a", "b", "d"], + "dt": [ + "2026-06-07", + "2026-06-07", + "2026-06-07", + "2026-06-08", + ], + }).into_partitions(2) + + summary = write_paimon( + df, + "test_db.t", + catalog_options=catalog_options, + ).to_arrow() + assert sum(summary.column("rows").to_pylist()) == 4 + + result = ( + read_paimon("test_db.t", catalog_options=catalog_options) + .where(daft.col("dt") == "2026-06-07") + .select("id", "name") + .sort("id") + .to_arrow() + ) + assert result.to_pydict() == { + "id": [1, 2, 3], + "name": ["a", "b", "c"], + } + + write_paimon( + daft.from_pydict({ + "id": [10], + "name": ["z"], + "dt": ["2026-06-07"], + }), + "test_db.t", + catalog_options=catalog_options, + mode="overwrite", + ).to_arrow() + same_partition = ( + read_paimon("test_db.t", catalog_options=catalog_options) + .sort("id") + .to_arrow() + ) + assert same_partition.to_pydict() == { + "id": [4, 10], + "name": ["d", "z"], + "dt": ["2026-06-08", "2026-06-07"], + } + + write_paimon( + daft.from_pydict({ + "id": [20], + "name": ["new"], + "dt": ["2026-06-09"], + }), + "test_db.t", + catalog_options=catalog_options, + mode="overwrite", + ).to_arrow() + new_partition = ( + read_paimon("test_db.t", catalog_options=catalog_options) + .sort("id") + .to_arrow() + ) + assert new_partition.to_pydict() == { + "id": [4, 10, 20], + "name": ["d", "z", "new"], + "dt": ["2026-06-08", "2026-06-07", "2026-06-09"], + } + + unpartitioned_schema = Schema.from_pyarrow_schema( + pa.schema([ + pa.field("id", pa.int64()), + pa.field("name", pa.string()), + ]), + options={"file.format": "parquet"}, + ) + catalog.create_table( + "test_db.unpartitioned", + unpartitioned_schema, + False, + ) + write_paimon( + daft.from_pydict({"id": [1, 2], "name": ["a", "b"]}), + "test_db.unpartitioned", + catalog_options=catalog_options, + ).to_arrow() + write_paimon( + daft.from_pydict({"id": [30], "name": ["only"]}), + "test_db.unpartitioned", + catalog_options=catalog_options, + mode="overwrite", + ).to_arrow() + unpartitioned = read_paimon( + "test_db.unpartitioned", + catalog_options=catalog_options, + ).to_arrow() + assert unpartitioned.to_pydict() == { + "id": [30], + "name": ["only"], + } + + daft_catalog = PaimonCatalog( + CatalogFactory.create(catalog_options), + name="paimon", + ) + wrapper_result = ( + daft_catalog.get_table("test_db.unpartitioned") + .read() + .to_arrow() + ) + assert wrapper_result.to_pydict() == { + "id": [30], + "name": ["only"], + } + finally: + if ray.is_initialized(): + ray.shutdown() + shutil.rmtree(root, ignore_errors=True) + ''' + ) + + env = os.environ.copy() + env["PYTHONPATH"] = os.pathsep.join( + [python_root, env.get("PYTHONPATH", "")] + ).rstrip(os.pathsep) + result = subprocess.run( + [sys.executable, "-c", script], + capture_output=True, + env=env, + text=True, + ) + assert result.returncode == 0, result.stdout + result.stderr From 8df7f78460e580fd2680b2b72c2f017474085e49 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 7 Jun 2026 11:18:10 +0800 Subject: [PATCH 2/4] [python] Add timeout to Daft on Ray e2e --- paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py index 6ba5a9f72d21..2f076582e6fc 100644 --- a/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py +++ b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py @@ -217,5 +217,6 @@ def test_daft_on_ray_read_write_e2e(): capture_output=True, env=env, text=True, + timeout=120, ) assert result.returncode == 0, result.stdout + result.stderr From 36a5fd58688de12a183c8c1697696660306c758f Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 7 Jun 2026 11:26:22 +0800 Subject: [PATCH 3/4] [python] Improve Daft on Ray e2e failure output --- paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py index 2f076582e6fc..5dd1bcf3fd4d 100644 --- a/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py +++ b/paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py @@ -219,4 +219,7 @@ def test_daft_on_ray_read_write_e2e(): text=True, timeout=120, ) - assert result.returncode == 0, result.stdout + result.stderr + assert result.returncode == 0, ( + f"stdout:\n{result.stdout}\n" + f"stderr:\n{result.stderr}" + ) From 4cc957d9cf1894b844b75abbc104c73414ec0f6b Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Sun, 7 Jun 2026 12:04:23 +0800 Subject: [PATCH 4/4] [python] Fix Daft file position test compatibility --- paimon-python/pypaimon/tests/daft/daft_sink_test.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/tests/daft/daft_sink_test.py b/paimon-python/pypaimon/tests/daft/daft_sink_test.py index ad963de4447e..36196c33b357 100644 --- a/paimon-python/pypaimon/tests/daft/daft_sink_test.py +++ b/paimon-python/pypaimon/tests/daft/daft_sink_test.py @@ -483,7 +483,11 @@ def test_write_read_blob_type(self, local_paimon_catalog): assert isinstance(ref, daft.File) assert isinstance(ref.path, str) assert ".blob" in ref.path - assert ref.offset is not None + # Daft 0.7.15 renamed File.offset to File.position. + position = getattr(ref, "position", None) + if position is None: + position = ref.offset + assert position is not None assert ref.length is not None