Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 54 additions & 1 deletion docs/docs/pypaimon/daft.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@ under the License.
This requires `daft` to be installed:

```bash
pip install pypaimon[daft]
pip install 'pypaimon[daft]'
```

To execute Daft plans on Ray, install both extras:

```bash
pip install 'pypaimon[daft,ray]'
```

`pypaimon.daft` exposes a top-level `read_paimon` / `write_paimon` API that
Expand Down Expand Up @@ -165,12 +171,59 @@ write_paimon(
)
```

For unpartitioned tables, overwrite replaces the table contents. For
partitioned tables, overwrite follows Paimon's dynamic partition overwrite
semantics by default: only partitions present in the input DataFrame are
replaced, and existing partitions not present in the input are kept.

**Parameters:**
- `df`: the Daft DataFrame to write.
- `table_identifier`: full table name, e.g. `"db_name.table_name"`.
- `catalog_options`: dict of catalog options passed to `CatalogFactory.create()`.
- `mode`: write mode — `"append"` (default) or `"overwrite"`.

## Running Daft on Ray

`pypaimon.daft` works with Daft's Ray runner. Configure the runner before the
first Daft execution in the process:

```python
import daft
import ray
from daft import runners
from pypaimon.daft import read_paimon, write_paimon

ray.init() # use address="auto" to connect to an existing Ray cluster
runners.set_runner_ray()

df = daft.from_pydict({
"id": [1, 2, 3],
"name": ["alice", "bob", "charlie"],
"dt": ["2024-01-01", "2024-01-01", "2024-01-02"],
})

write_paimon(
df,
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
)

result = (
read_paimon(
"database_name.table_name",
catalog_options={"warehouse": "/path/to/warehouse"},
)
.where(daft.col("dt") == "2024-01-01")
.select("id", "name")
)

result.show()
```

Use `pypaimon.daft` when your application is written with Daft DataFrames and
you want Daft to schedule the execution on Ray. Use `pypaimon.ray` instead when
your application directly reads or writes Ray Datasets.

## Catalog Abstraction

Paimon catalogs can integrate with Daft's unified `Catalog` / `Table` interfaces:
Expand Down
3 changes: 3 additions & 0 deletions docs/docs/pypaimon/ray-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ Ray's built-in Iceberg integration. The lower-level `TableRead.to_ray()` and
already resolved a `(read_builder, splits)` pair or constructed a
`table_write` via the regular pypaimon API.

If your application uses Daft DataFrames and only needs Ray as Daft's execution
backend, see [Running Daft on Ray](./daft.md#running-daft-on-ray).

## Read

### `read_paimon` (recommended)
Expand Down
225 changes: 225 additions & 0 deletions paimon-python/pypaimon/tests/daft/daft_on_ray_e2e_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

"""End-to-end tests for pypaimon.daft with Daft's Ray runner.

Daft runners are process-global and cannot be switched after initialization,
so the Ray-runner scenario is executed in a fresh Python subprocess.
"""

from __future__ import annotations

import importlib.util
import os
import subprocess
import sys
import textwrap

import pytest

# Module-level marker: skip when either optional dependency cannot be found.
pytestmark = pytest.mark.skipif(
    any(
        importlib.util.find_spec(module_name) is None
        for module_name in ("daft", "ray")
    ),
    reason="Daft-on-Ray e2e requires both daft and ray",
)


def test_daft_on_ray_read_write_e2e():
    """Run the pypaimon.daft read/write scenario under Daft's Ray runner.

    The scenario is passed as source text to a child interpreter
    (``sys.executable -c``), because the Ray runner has to be selected in a
    fresh process. The child script:

    * creates a warehouse in a temporary directory and a ``dt``-partitioned
      table, appends four rows, and reads one partition back with a filter
      and projection;
    * overwrites an existing partition, then overwrites with a partition
      that did not exist yet, checking that untouched partitions are kept;
    * overwrites an unpartitioned table and checks only the new row is left;
    * reads the same table through the ``PaimonCatalog`` wrapper.

    The test passes when the child exits with status 0. Otherwise the
    child's captured stdout and stderr are shown in the assertion message.
    """
    # Three directories above this file; prepended to the child's PYTHONPATH.
    this_dir = os.path.dirname(__file__)
    package_root = os.path.abspath(os.path.join(this_dir, "..", "..", ".."))

    # Copy the parent environment and put package_root first on PYTHONPATH.
    # The rstrip drops the trailing separator when PYTHONPATH was unset.
    inherited_path = os.environ.get("PYTHONPATH", "")
    child_env = dict(os.environ)
    child_env["PYTHONPATH"] = (
        f"{package_root}{os.pathsep}{inherited_path}".rstrip(os.pathsep)
    )

    driver_source = textwrap.dedent(
        r'''
        import os
        import shutil
        import tempfile

        os.environ.setdefault("RAY_TMPDIR", "/tmp")

        import daft
        import pyarrow as pa
        import ray
        from daft import runners
        from pypaimon import CatalogFactory, Schema
        from pypaimon.daft import PaimonCatalog, read_paimon, write_paimon

        root = tempfile.mkdtemp(prefix="paimon-daft-on-ray-")
        try:
            warehouse = os.path.join(root, "warehouse")
            catalog_options = {"warehouse": warehouse}
            catalog = CatalogFactory.create(catalog_options)
            catalog.create_database("test_db", False)

            ray.init(
                num_cpus=2,
                include_dashboard=False,
                ignore_reinit_error=True,
            )
            runners.set_runner_ray()
            assert runners.get_or_create_runner().name == "ray"

            schema = Schema.from_pyarrow_schema(
                pa.schema([
                    pa.field("id", pa.int64()),
                    pa.field("name", pa.string()),
                    pa.field("dt", pa.string()),
                ]),
                partition_keys=["dt"],
                options={"file.format": "parquet"},
            )
            catalog.create_table("test_db.t", schema, False)

            df = daft.from_pydict({
                "id": [3, 1, 2, 4],
                "name": ["c", "a", "b", "d"],
                "dt": [
                    "2026-06-07",
                    "2026-06-07",
                    "2026-06-07",
                    "2026-06-08",
                ],
            }).into_partitions(2)

            summary = write_paimon(
                df,
                "test_db.t",
                catalog_options=catalog_options,
            ).to_arrow()
            assert sum(summary.column("rows").to_pylist()) == 4

            result = (
                read_paimon("test_db.t", catalog_options=catalog_options)
                .where(daft.col("dt") == "2026-06-07")
                .select("id", "name")
                .sort("id")
                .to_arrow()
            )
            assert result.to_pydict() == {
                "id": [1, 2, 3],
                "name": ["a", "b", "c"],
            }

            write_paimon(
                daft.from_pydict({
                    "id": [10],
                    "name": ["z"],
                    "dt": ["2026-06-07"],
                }),
                "test_db.t",
                catalog_options=catalog_options,
                mode="overwrite",
            ).to_arrow()
            same_partition = (
                read_paimon("test_db.t", catalog_options=catalog_options)
                .sort("id")
                .to_arrow()
            )
            assert same_partition.to_pydict() == {
                "id": [4, 10],
                "name": ["d", "z"],
                "dt": ["2026-06-08", "2026-06-07"],
            }

            write_paimon(
                daft.from_pydict({
                    "id": [20],
                    "name": ["new"],
                    "dt": ["2026-06-09"],
                }),
                "test_db.t",
                catalog_options=catalog_options,
                mode="overwrite",
            ).to_arrow()
            new_partition = (
                read_paimon("test_db.t", catalog_options=catalog_options)
                .sort("id")
                .to_arrow()
            )
            assert new_partition.to_pydict() == {
                "id": [4, 10, 20],
                "name": ["d", "z", "new"],
                "dt": ["2026-06-08", "2026-06-07", "2026-06-09"],
            }

            unpartitioned_schema = Schema.from_pyarrow_schema(
                pa.schema([
                    pa.field("id", pa.int64()),
                    pa.field("name", pa.string()),
                ]),
                options={"file.format": "parquet"},
            )
            catalog.create_table(
                "test_db.unpartitioned",
                unpartitioned_schema,
                False,
            )
            write_paimon(
                daft.from_pydict({"id": [1, 2], "name": ["a", "b"]}),
                "test_db.unpartitioned",
                catalog_options=catalog_options,
            ).to_arrow()
            write_paimon(
                daft.from_pydict({"id": [30], "name": ["only"]}),
                "test_db.unpartitioned",
                catalog_options=catalog_options,
                mode="overwrite",
            ).to_arrow()
            unpartitioned = read_paimon(
                "test_db.unpartitioned",
                catalog_options=catalog_options,
            ).to_arrow()
            assert unpartitioned.to_pydict() == {
                "id": [30],
                "name": ["only"],
            }

            daft_catalog = PaimonCatalog(
                CatalogFactory.create(catalog_options),
                name="paimon",
            )
            wrapper_result = (
                daft_catalog.get_table("test_db.unpartitioned")
                .read()
                .to_arrow()
            )
            assert wrapper_result.to_pydict() == {
                "id": [30],
                "name": ["only"],
            }
        finally:
            if ray.is_initialized():
                ray.shutdown()
            shutil.rmtree(root, ignore_errors=True)
        '''
    )

    # Run the scenario in its own interpreter, capturing output as text so it
    # can be reported if the child fails.
    child_command = [sys.executable, "-c", driver_source]
    completed = subprocess.run(
        child_command,
        env=child_env,
        capture_output=True,
        text=True,
        timeout=120,
    )
    assert completed.returncode == 0, (
        f"stdout:\n{completed.stdout}\n"
        f"stderr:\n{completed.stderr}"
    )
6 changes: 5 additions & 1 deletion paimon-python/pypaimon/tests/daft/daft_sink_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,11 @@ def test_write_read_blob_type(self, local_paimon_catalog):
assert isinstance(ref, daft.File)
assert isinstance(ref.path, str)
assert ".blob" in ref.path
assert ref.offset is not None
# Daft 0.7.15 renamed File.offset to File.position.
position = getattr(ref, "position", None)
if position is None:
position = ref.offset
assert position is not None
assert ref.length is not None


Expand Down
Loading