[python][ray] Abort worker writes on failure#8124
Conversation
Ray write tasks previously closed the worker-side TableWrite even when write or commit preparation failed. Closing can flush pending data, and prepare_commit can materialize normal, blob, or vector files before a later failure prevents the driver commit. Add abort propagation through TableWrite and FileStoreWrite, and make Ray worker writes abort instead of close on failure. Keep dedicated blob and vector metadata in the parent writer so abort can delete files produced by prepare_commit. Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
|
Thanks for the PR! I agree abort is useful before prepare_commit() succeeds, but I’m worried about aborting after commit messages have already been produced. The main risk here is after prepare_commit() succeeds. RayDatasink.write() will return the prepared commit messages after close(). But DedicatedFormatWriter.close() catches exceptions, calls abort(), and does not re-raise. So if close() fails, abort() may delete files referenced by the prepared commit messages, while the worker still returns those messages to the driver. The driver may then commit a manifest with missing files. We hit an abort-related data-safety issue in production before; see #7232 for context |
Dedicated and vector writers aborted on close failures but swallowed the exception. After Ray workers prepared commit messages, that could let a worker return messages whose files had been deleted by abort. Re-raise close failures after aborting so RayDatasink fails the task instead of returning stale messages. Add regressions for dedicated and vector writer close failures after prepare_commit. Signed-off-by: QuakeWang <wangfuzheng0814@foxmail.com>
|
@XiaoHongbo-Hope Thanks for catching this. You are right: I fixed it by re-raising after abort in |
|
I reviewed this PR as part of a pass over recent open non-draft Paimon PRs. I did not find any clear correctness, compatibility, or regression issue to flag from the current diff. |
Purpose
Ray write tasks previously closed on the worker-side
TableWritewhen write, prepare, or close failed. That is unsafe because close can flush pending data, andprepare_commit()can materialize normal, blob, or vector files before a later failure prevents the driver commit.This PR makes worker-side Ray writes abort on failure and propagates abort through
TableWriteandFileStoreWrite. It also keeps dedicated blob/vector metadata reachable from the parent writer so abort can delete files produced before the failed commit path.Tests
python -m pytest pypaimon/tests/ray_sink_test.py::RaySinkTest::test_write -q