From 23b98252fc6ed7876fadeb73a3bbc169e5250a49 Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Thu, 4 Jun 2026 22:23:52 +0800 Subject: [PATCH 1/2] [python][ray] Abort worker writes on failure Ray write tasks previously closed the worker-side TableWrite even when write or commit preparation failed. Closing can flush pending data, and prepare_commit can materialize normal, blob, or vector files before a later failure prevents the driver commit. Add abort propagation through TableWrite and FileStoreWrite, and make Ray worker writes abort instead of close on failure. Keep dedicated blob and vector metadata in the parent writer so abort can delete files produced by prepare_commit. Signed-off-by: QuakeWang --- .../tests/data_evolution_formats_test.py | 78 +++++++++++++++++++ paimon-python/pypaimon/tests/ray_sink_test.py | 22 +++++- .../pypaimon/write/file_store_write.py | 9 +++ paimon-python/pypaimon/write/ray_datasink.py | 17 +++- paimon-python/pypaimon/write/table_write.py | 3 + .../write/writer/data_vector_writer.py | 4 +- .../pypaimon/write/writer/data_writer.py | 14 ++-- .../write/writer/dedicated_format_writer.py | 16 ++-- 8 files changed, 144 insertions(+), 19 deletions(-) diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py b/paimon-python/pypaimon/tests/data_evolution_formats_test.py index f89f3290f199..f548d5b4afee 100644 --- a/paimon-python/pypaimon/tests/data_evolution_formats_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py @@ -46,6 +46,10 @@ def setUpClass(cls): def tearDownClass(cls): shutil.rmtree(cls.tempdir, ignore_errors=True) + @staticmethod + def _file_path(file_meta): + return file_meta.external_path if file_meta.external_path else file_meta.file_path + # ------------------------------------------------------------------ # Parquet-format data evolution # ------------------------------------------------------------------ @@ -236,6 +240,41 @@ def test_blob_write_and_read(self): self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3]) self.assertEqual(actual.column('payload').to_pylist(), blobs) + def test_blob_abort_deletes_uncommitted_files(self): + pa_schema = pa.schema([ + ('id', pa.int32()), + ('payload', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_blob_abort_cleanup', schema, False) + table = self.catalog.get_table('default.fmt_blob_abort_cleanup') + + writer = table.new_batch_write_builder().new_write() + writer.write_arrow(pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'payload': [b'a', b'b', b'c'], + }, schema=pa_schema)) + commit_messages = writer.prepare_commit() + + all_files = [nf for msg in commit_messages for nf in msg.new_files] + parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + self.assertGreater(len(parquet_files), 0) + self.assertGreater(len(blob_files), 0) + for file_meta in all_files: + self.assertTrue(table.file_io.exists(self._file_path(file_meta))) + + writer.abort() + + for file_meta in all_files: + self.assertFalse( + table.file_io.exists(self._file_path(file_meta)), + f"Expected abort to delete {file_meta.file_name}", + ) + def test_blob_column_subset_evolution(self): """Write normal+blob cols in one commit, overwrite normal col in another, merge-read.""" pa_schema = pa.schema([ @@ -563,6 +602,45 @@ def test_vortex_with_row_id_and_filter(self): # Vector (vortex) file format for embedding columns # ------------------------------------------------------------------ + def test_vector_abort_deletes_uncommitted_files(self): + pa_schema = pa.schema([ + ('id', pa.int64()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'vector.file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_vector_abort_cleanup', schema, False) + table = self.catalog.get_table('default.fmt_vector_abort_cleanup') + + writer = table.new_batch_write_builder().new_write() + writer.write_arrow(pa.table({ + 'id': pa.array([1, 2, 3], type=pa.int64()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.1, 0.2, 0.3, + 0.4, 0.5, 0.6, + 0.7, 0.8, 0.9], type=pa.float32()), 3), + })) + commit_messages = writer.prepare_commit() + + all_files = [nf for msg in commit_messages for nf in msg.new_files] + normal_files = [f for f in all_files if not DataFileMeta.is_vector_file(f.file_name)] + vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)] + self.assertGreater(len(normal_files), 0) + self.assertGreater(len(vector_files), 0) + for file_meta in all_files: + self.assertTrue(table.file_io.exists(self._file_path(file_meta))) + + writer.abort() + + for file_meta in all_files: + self.assertFalse( + table.file_io.exists(self._file_path(file_meta)), + f"Expected abort to delete {file_meta.file_name}", + ) + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") @unittest.skipUnless( __import__('importlib').util.find_spec('vortex') is not None, diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py index a6d761df5add..074d9198a54e 100644 --- a/paimon-python/pypaimon/tests/ray_sink_test.py +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -272,7 +272,27 @@ def test_write(self): }) with self.assertRaises(Exception): datasink.write([data_table], ctx) - mock_write.close.assert_called_once() + mock_write.abort.assert_called_once() + mock_write.close.assert_not_called() + + with patch.object(self.table, 'new_batch_write_builder') as mock_builder: + mock_write_builder = Mock() + mock_write_builder.overwrite.return_value = mock_write_builder + mock_write = Mock() + mock_write.prepare_commit.return_value = [Mock(spec=CommitMessage)] + mock_write.close.side_effect = Exception("Close error") + mock_write_builder.new_write.return_value = mock_write + mock_builder.return_value = mock_write_builder + + data_table = pa.table({ + 'id': [1], + 'name': ['Alice'], + 'value': [1.1] + }) + with self.assertRaises(Exception): + datasink.write([data_table], ctx) + mock_write.prepare_commit.assert_called_once() + mock_write.abort.assert_called_once() def test_on_write_complete(self): from ray.data.datasource.datasink import WriteResult diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index c77f88e907be..556dad9ad193 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -245,6 +245,15 @@ def close(self): writer.close() self.data_writers.clear() + def abort(self): + """Abort all data writers and clean up files produced by this write.""" + for writer in self.data_writers.values(): + try: + writer.abort() + except Exception as e: + logger.warning("Failed to abort data writer.", exc_info=e) + self.data_writers.clear() + def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]: buckets = self.max_seq_numbers.get(partition) if buckets is None: diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index 6d48906f9fd8..18b7f024f02d 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -136,11 +136,20 @@ def write( commit_messages = table_write.prepare_commit() commit_messages_list.extend(commit_messages) - finally: - if table_write is not None: - table_write.close() - return commit_messages_list + table_write.close() + table_write = None + return commit_messages_list + except Exception: + if table_write is not None: + try: + table_write.abort() + except Exception as abort_error: + logger.warning( + f"Error aborting worker-side table_write: {abort_error}", + exc_info=abort_error + ) + raise @staticmethod def _extract_write_returns(write_result: Any): diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py index 411ddd9ceb21..91eafa753633 100644 --- a/paimon-python/pypaimon/write/table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -135,6 +135,9 @@ def write_ray( def close(self): self.file_store_write.close() + def abort(self): + self.file_store_write.abort() + def _validate_pyarrow_schema(self, data_schema: pa.Schema): if data_schema == self.table_pyarrow_schema: return diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py b/paimon-python/pypaimon/write/writer/data_vector_writer.py index d06d42593665..558e7c04f19a 100644 --- a/paimon-python/pypaimon/write/writer/data_vector_writer.py +++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py @@ -151,7 +151,7 @@ def abort(self): if self.vector_writer is not None: self.vector_writer.abort() self.pending_normal_data = None - self.committed_files.clear() + super().abort() def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]: normal_data = ( @@ -187,11 +187,11 @@ def _close_current_writers(self): if self.vector_writer is not None: vector_metas = self.vector_writer.prepare_commit() - self.vector_writer.committed_files.clear() if vector_metas: if normal_meta is not None: self._validate_consistency(normal_meta, vector_metas) self.committed_files.extend(vector_metas) + self.vector_writer.committed_files.clear() self.pending_normal_data = None diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 313caa7f6d08..d937bb029485 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -130,8 +130,14 @@ def abort(self): Abort all writers and clean up resources. This method should be called when an error occurs during writing. It deletes any files that were written and cleans up resources. """ - # Delete any files that were written - for file_meta in self.committed_files: + self._delete_committed_files(self.committed_files) + + # Clean up resources + self.pending_data = None + self.committed_files.clear() + + def _delete_committed_files(self, file_metas: List[DataFileMeta]): + for file_meta in file_metas: try: # Use external_path if available (contains full URL scheme), otherwise use file_path path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path @@ -145,10 +151,6 @@ def abort(self): path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path logger.warning(f"Failed to delete file {path_to_delete} during abort: {e}") - # Clean up resources - self.pending_data = None - self.committed_files.clear() - @abstractmethod def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch: """Process incoming data (e.g., add system fields, sort). Must be implemented by subclasses.""" diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py index 01216b36cd4d..92df38c7b140 100644 --- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py +++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py @@ -262,7 +262,13 @@ def abort(self): self.vector_writer.abort() if self._external_storage_writer: self._external_storage_writer.abort() + committed_non_blob_files = [ + file_meta for file_meta in self.committed_files + if not DataFileMeta.is_blob_file(file_meta.file_name) + ] + self._delete_committed_files(committed_non_blob_files) self.pending_normal_data = None + self.pending_data = None self.committed_files.clear() def _split_data(self, data: pa.RecordBatch) -> Tuple[ @@ -368,6 +374,7 @@ def _close_current_writers(self): normal_meta = None if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0: normal_meta = self._write_normal_data_to_file(self.pending_normal_data) + self.committed_files.append(normal_meta) blob_metas = [] for blob_column in self.blob_file_column_names: @@ -375,18 +382,15 @@ def _close_current_writers(self): if normal_meta is not None: self._validate_consistency(normal_meta, writer_metas, blob_column) blob_metas.extend(writer_metas) + self.committed_files.extend(blob_metas) vector_metas = [] if self.vector_writer is not None: vector_metas = self.vector_writer.prepare_commit() - self.vector_writer.committed_files.clear() if vector_metas and normal_meta is not None: self._validate_consistency(normal_meta, vector_metas, 'vector') - - if normal_meta is not None: - self.committed_files.append(normal_meta) - self.committed_files.extend(blob_metas) - self.committed_files.extend(vector_metas) + self.committed_files.extend(vector_metas) + self.vector_writer.committed_files.clear() self.pending_normal_data = None From 843ba4bc9da1c4683299c9db7e437aa52d4161d9 Mon Sep 17 00:00:00 2001 From: QuakeWang Date: Fri, 5 Jun 2026 14:21:18 +0800 Subject: [PATCH 2/2] [python][ray] Propagate writer close failures Dedicated and vector writers aborted on close failures but swallowed the exception. After Ray workers prepared commit messages, that could let a worker return messages whose files had been deleted by abort. Re-raise close failures after aborting so RayDatasink fails the task instead of returning stale messages. Add regressions for dedicated and vector writer close failures after prepare_commit. Signed-off-by: QuakeWang --- .../tests/data_evolution_formats_test.py | 42 ++++++++++++++++ paimon-python/pypaimon/tests/ray_sink_test.py | 49 +++++++++++++++++++ .../write/writer/data_vector_writer.py | 1 + .../write/writer/dedicated_format_writer.py | 1 + 4 files changed, 93 insertions(+) diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py b/paimon-python/pypaimon/tests/data_evolution_formats_test.py index f548d5b4afee..b30560c400af 100644 --- a/paimon-python/pypaimon/tests/data_evolution_formats_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py @@ -641,6 +641,48 @@ def test_vector_abort_deletes_uncommitted_files(self): f"Expected abort to delete {file_meta.file_name}", ) + def test_vector_close_failure_after_prepare_raises(self): + from unittest.mock import patch + + pa_schema = pa.schema([ + ('id', pa.int64()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'vector.file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_vector_close_failure', schema, False) + table = self.catalog.get_table('default.fmt_vector_close_failure') + + writer = table.new_batch_write_builder().new_write() + writer.write_arrow(pa.table({ + 'id': pa.array([1, 2, 3], type=pa.int64()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.1, 0.2, 0.3, + 0.4, 0.5, 0.6, + 0.7, 0.8, 0.9], type=pa.float32()), 3), + })) + commit_messages = writer.prepare_commit() + + all_files = [nf for msg in commit_messages for nf in msg.new_files] + for file_meta in all_files: + self.assertTrue(table.file_io.exists(self._file_path(file_meta))) + + data_writer = next(iter(writer.file_store_write.data_writers.values())) + with patch.object( + data_writer, '_close_current_writers', + side_effect=RuntimeError("Close error")): + with self.assertRaisesRegex(RuntimeError, "Close error"): + writer.close() + + for file_meta in all_files: + self.assertFalse( + table.file_io.exists(self._file_path(file_meta)), + f"Expected abort to delete {file_meta.file_name}", + ) + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") @unittest.skipUnless( __import__('importlib').util.find_spec('vortex') is not None, diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py index 074d9198a54e..afc86b571ed6 100644 --- a/paimon-python/pypaimon/tests/ray_sink_test.py +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -65,6 +65,16 @@ def tearDown(self): if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) + @staticmethod + def _data_files_under(table): + table_path = table.file_io.to_filesystem_path(table.table_path) + data_files = [] + for root, _, files in os.walk(table_path): + for file_name in files: + if file_name.endswith(('.parquet', '.blob')) or '.vector.' in file_name: + data_files.append(os.path.join(root, file_name)) + return data_files + def test_init_and_serialization(self): """Test initialization, serialization, and table name.""" datasink = PaimonDatasink(self.table, overwrite=False) @@ -294,6 +304,45 @@ def test_write(self): mock_write.prepare_commit.assert_called_once() mock_write.abort.assert_called_once() + def test_write_does_not_return_prepared_messages_when_dedicated_close_aborts(self): + from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('payload', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + table_identifier = "test_db.test_blob_close_failure" + self.catalog.create_table(table_identifier, schema, False) + table = self.catalog.get_table(table_identifier) + + datasink = PaimonDatasink(table, overwrite=False) + datasink.on_write_start() + ctx = Mock(spec=TaskContext) + data_table = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'payload': [b'a', b'b', b'c'], + }, schema=pa_schema) + + original_close_current_writers = DedicatedFormatWriter._close_current_writers + close_current_calls = {'count': 0} + + def fail_during_close(writer): + close_current_calls['count'] += 1 + if close_current_calls['count'] == 1: + return original_close_current_writers(writer) + raise RuntimeError("Close error") + + with patch.object(DedicatedFormatWriter, '_close_current_writers', fail_during_close): + with self.assertRaisesRegex(RuntimeError, "Close error"): + datasink.write([data_table], ctx) + + self.assertEqual(close_current_calls['count'], 2) + self.assertEqual([], self._data_files_under(table)) + def test_on_write_complete(self): from ray.data.datasource.datasink import WriteResult diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py b/paimon-python/pypaimon/write/writer/data_vector_writer.py index 558e7c04f19a..809cf5249950 100644 --- a/paimon-python/pypaimon/write/writer/data_vector_writer.py +++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py @@ -143,6 +143,7 @@ def close(self): except Exception as e: logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e) self.abort() + raise finally: self.closed = True self.pending_normal_data = None diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py index 92df38c7b140..5beadd6e4ce7 100644 --- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py +++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py @@ -250,6 +250,7 @@ def close(self): except Exception as e: logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e) self.abort() + raise finally: self.closed = True self.pending_normal_data = None