diff --git a/paimon-python/pypaimon/tests/data_evolution_formats_test.py b/paimon-python/pypaimon/tests/data_evolution_formats_test.py index f89f3290f199..b30560c400af 100644 --- a/paimon-python/pypaimon/tests/data_evolution_formats_test.py +++ b/paimon-python/pypaimon/tests/data_evolution_formats_test.py @@ -46,6 +46,10 @@ def setUpClass(cls): def tearDownClass(cls): shutil.rmtree(cls.tempdir, ignore_errors=True) + @staticmethod + def _file_path(file_meta): + return file_meta.external_path if file_meta.external_path else file_meta.file_path + # ------------------------------------------------------------------ # Parquet-format data evolution # ------------------------------------------------------------------ @@ -236,6 +240,41 @@ def test_blob_write_and_read(self): self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3]) self.assertEqual(actual.column('payload').to_pylist(), blobs) + def test_blob_abort_deletes_uncommitted_files(self): + pa_schema = pa.schema([ + ('id', pa.int32()), + ('payload', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + self.catalog.create_table('default.fmt_blob_abort_cleanup', schema, False) + table = self.catalog.get_table('default.fmt_blob_abort_cleanup') + + writer = table.new_batch_write_builder().new_write() + writer.write_arrow(pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'payload': [b'a', b'b', b'c'], + }, schema=pa_schema)) + commit_messages = writer.prepare_commit() + + all_files = [nf for msg in commit_messages for nf in msg.new_files] + parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')] + blob_files = [f for f in all_files if f.file_name.endswith('.blob')] + self.assertGreater(len(parquet_files), 0) + self.assertGreater(len(blob_files), 0) + for file_meta in all_files: + self.assertTrue(table.file_io.exists(self._file_path(file_meta))) + + writer.abort() + + for file_meta in all_files: + self.assertFalse( + table.file_io.exists(self._file_path(file_meta)), + f"Expected abort to delete {file_meta.file_name}", + ) + def test_blob_column_subset_evolution(self): """Write normal+blob cols in one commit, overwrite normal col in another, merge-read.""" pa_schema = pa.schema([ @@ -563,6 +602,87 @@ def test_vortex_with_row_id_and_filter(self): # Vector (vortex) file format for embedding columns # ------------------------------------------------------------------ + def test_vector_abort_deletes_uncommitted_files(self): + pa_schema = pa.schema([ + ('id', pa.int64()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'vector.file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_vector_abort_cleanup', schema, False) + table = self.catalog.get_table('default.fmt_vector_abort_cleanup') + + writer = table.new_batch_write_builder().new_write() + writer.write_arrow(pa.table({ + 'id': pa.array([1, 2, 3], type=pa.int64()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.1, 0.2, 0.3, + 0.4, 0.5, 0.6, + 0.7, 0.8, 0.9], type=pa.float32()), 3), + })) + commit_messages = writer.prepare_commit() + + all_files = [nf for msg in commit_messages for nf in msg.new_files] + normal_files = [f for f in all_files if not DataFileMeta.is_vector_file(f.file_name)] + vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)] + self.assertGreater(len(normal_files), 0) + self.assertGreater(len(vector_files), 0) + for file_meta in all_files: + self.assertTrue(table.file_io.exists(self._file_path(file_meta))) + + writer.abort() + + for file_meta in all_files: + self.assertFalse( + table.file_io.exists(self._file_path(file_meta)), + f"Expected abort to delete {file_meta.file_name}", + ) + + def test_vector_close_failure_after_prepare_raises(self): + from unittest.mock import patch + + pa_schema = pa.schema([ + ('id', pa.int64()), + ('embed', pa.list_(pa.float32(), 3)), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + 'vector.file.format': 'parquet', + }) + self.catalog.create_table('default.fmt_vector_close_failure', schema, False) + table = self.catalog.get_table('default.fmt_vector_close_failure') + + writer = table.new_batch_write_builder().new_write() + writer.write_arrow(pa.table({ + 'id': pa.array([1, 2, 3], type=pa.int64()), + 'embed': pa.FixedSizeListArray.from_arrays( + pa.array([0.1, 0.2, 0.3, + 0.4, 0.5, 0.6, + 0.7, 0.8, 0.9], type=pa.float32()), 3), + })) + commit_messages = writer.prepare_commit() + + all_files = [nf for msg in commit_messages for nf in msg.new_files] + for file_meta in all_files: + self.assertTrue(table.file_io.exists(self._file_path(file_meta))) + + data_writer = next(iter(writer.file_store_write.data_writers.values())) + with patch.object( + data_writer, '_close_current_writers', + side_effect=RuntimeError("Close error")): + with self.assertRaisesRegex(RuntimeError, "Close error"): + writer.close() + + for file_meta in all_files: + self.assertFalse( + table.file_io.exists(self._file_path(file_meta)), + f"Expected abort to delete {file_meta.file_name}", + ) + @unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11") @unittest.skipUnless( __import__('importlib').util.find_spec('vortex') is not None, diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py b/paimon-python/pypaimon/tests/ray_sink_test.py index a6d761df5add..afc86b571ed6 100644 --- a/paimon-python/pypaimon/tests/ray_sink_test.py +++ b/paimon-python/pypaimon/tests/ray_sink_test.py @@ -65,6 +65,16 @@ def tearDown(self): if os.path.exists(self.temp_dir): shutil.rmtree(self.temp_dir) + @staticmethod + def _data_files_under(table): + table_path = table.file_io.to_filesystem_path(table.table_path) + data_files = [] + for root, _, files in os.walk(table_path): + for file_name in files: + if file_name.endswith(('.parquet', '.blob')) or '.vector.' in file_name: + data_files.append(os.path.join(root, file_name)) + return data_files + def test_init_and_serialization(self): """Test initialization, serialization, and table name.""" datasink = PaimonDatasink(self.table, overwrite=False) @@ -272,7 +282,66 @@ def test_write(self): }) with self.assertRaises(Exception): datasink.write([data_table], ctx) - mock_write.close.assert_called_once() + mock_write.abort.assert_called_once() + mock_write.close.assert_not_called() + + with patch.object(self.table, 'new_batch_write_builder') as mock_builder: + mock_write_builder = Mock() + mock_write_builder.overwrite.return_value = mock_write_builder + mock_write = Mock() + mock_write.prepare_commit.return_value = [Mock(spec=CommitMessage)] + mock_write.close.side_effect = Exception("Close error") + mock_write_builder.new_write.return_value = mock_write + mock_builder.return_value = mock_write_builder + + data_table = pa.table({ + 'id': [1], + 'name': ['Alice'], + 'value': [1.1] + }) + with self.assertRaises(Exception): + datasink.write([data_table], ctx) + mock_write.prepare_commit.assert_called_once() + mock_write.abort.assert_called_once() + + def test_write_does_not_return_prepared_messages_when_dedicated_close_aborts(self): + from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('payload', pa.large_binary()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + table_identifier = "test_db.test_blob_close_failure" + self.catalog.create_table(table_identifier, schema, False) + table = self.catalog.get_table(table_identifier) + + datasink = PaimonDatasink(table, overwrite=False) + datasink.on_write_start() + ctx = Mock(spec=TaskContext) + data_table = pa.Table.from_pydict({ + 'id': [1, 2, 3], + 'payload': [b'a', b'b', b'c'], + }, schema=pa_schema) + + original_close_current_writers = DedicatedFormatWriter._close_current_writers + close_current_calls = {'count': 0} + + def fail_during_close(writer): + close_current_calls['count'] += 1 + if close_current_calls['count'] == 1: + return original_close_current_writers(writer) + raise RuntimeError("Close error") + + with patch.object(DedicatedFormatWriter, '_close_current_writers', fail_during_close): + with self.assertRaisesRegex(RuntimeError, "Close error"): + datasink.write([data_table], ctx) + + self.assertEqual(close_current_calls['count'], 2) + self.assertEqual([], self._data_files_under(table)) def test_on_write_complete(self): from ray.data.datasource.datasink import WriteResult diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py index ee3b96d8d73d..58ceb1c83c37 100644 --- a/paimon-python/pypaimon/write/file_store_write.py +++ b/paimon-python/pypaimon/write/file_store_write.py @@ -251,6 +251,15 @@ def close(self): writer.close() self.data_writers.clear() + def abort(self): + """Abort all data writers and clean up files produced by this write.""" + for writer in self.data_writers.values(): + try: + writer.abort() + except Exception as e: + logger.warning("Failed to abort data writer.", exc_info=e) + self.data_writers.clear() + def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]: buckets = self.max_seq_numbers.get(partition) if buckets is None: diff --git a/paimon-python/pypaimon/write/ray_datasink.py b/paimon-python/pypaimon/write/ray_datasink.py index 6d48906f9fd8..18b7f024f02d 100644 --- a/paimon-python/pypaimon/write/ray_datasink.py +++ b/paimon-python/pypaimon/write/ray_datasink.py @@ -136,11 +136,20 @@ def write( commit_messages = table_write.prepare_commit() commit_messages_list.extend(commit_messages) - finally: - if table_write is not None: - table_write.close() - return commit_messages_list + table_write.close() + table_write = None + return commit_messages_list + except Exception: + if table_write is not None: + try: + table_write.abort() + except Exception as abort_error: + logger.warning( + f"Error aborting worker-side table_write: {abort_error}", + exc_info=abort_error + ) + raise @staticmethod def _extract_write_returns(write_result: Any): diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py index 411ddd9ceb21..91eafa753633 100644 --- a/paimon-python/pypaimon/write/table_write.py +++ b/paimon-python/pypaimon/write/table_write.py @@ -135,6 +135,9 @@ def write_ray( def close(self): self.file_store_write.close() + def abort(self): + self.file_store_write.abort() + def _validate_pyarrow_schema(self, data_schema: pa.Schema): if data_schema == self.table_pyarrow_schema: return diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py b/paimon-python/pypaimon/write/writer/data_vector_writer.py index d06d42593665..809cf5249950 100644 --- a/paimon-python/pypaimon/write/writer/data_vector_writer.py +++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py @@ -143,6 +143,7 @@ def close(self): except Exception as e: logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e) self.abort() + raise finally: self.closed = True self.pending_normal_data = None @@ -151,7 +152,7 @@ def abort(self): if self.vector_writer is not None: self.vector_writer.abort() self.pending_normal_data = None - self.committed_files.clear() + super().abort() def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]: normal_data = ( @@ -187,11 +188,11 @@ def _close_current_writers(self): if self.vector_writer is not None: vector_metas = self.vector_writer.prepare_commit() - self.vector_writer.committed_files.clear() if vector_metas: if normal_meta is not None: self._validate_consistency(normal_meta, vector_metas) self.committed_files.extend(vector_metas) + self.vector_writer.committed_files.clear() self.pending_normal_data = None diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py index 9f10e8a64e22..a433fd9360c4 100644 --- a/paimon-python/pypaimon/write/writer/data_writer.py +++ b/paimon-python/pypaimon/write/writer/data_writer.py @@ -138,8 +138,15 @@ def abort(self): Abort all writers and clean up resources. This method should be called when an error occurs during writing. It deletes any files that were written and cleans up resources. """ - # Delete any files that were written (data + changelog) - for file_meta in self.committed_files + self.committed_changelog_files: + self._delete_committed_files(self.committed_files + self.committed_changelog_files) + + # Clean up resources + self.pending_data = None + self.committed_files.clear() + self.committed_changelog_files.clear() + + def _delete_committed_files(self, file_metas: List[DataFileMeta]): + for file_meta in file_metas: try: path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path if path_to_delete: @@ -151,11 +158,6 @@ def abort(self): path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path logger.warning(f"Failed to delete file {path_to_delete} during abort: {e}") - # Clean up resources - self.pending_data = None - self.committed_files.clear() - self.committed_changelog_files.clear() - @abstractmethod def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch: """Process incoming data (e.g., add system fields, sort). Must be implemented by subclasses.""" diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py index 444e88332bfc..3eaa8e3a2fed 100644 --- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py +++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py @@ -252,6 +252,7 @@ def close(self): except Exception as e: logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e) self.abort() + raise finally: self.closed = True self.pending_normal_data = None @@ -264,7 +265,13 @@ def abort(self): self.vector_writer.abort() if self._external_storage_writer: self._external_storage_writer.abort() + committed_non_blob_files = [ + file_meta for file_meta in self.committed_files + if not DataFileMeta.is_blob_file(file_meta.file_name) + ] + self._delete_committed_files(committed_non_blob_files) self.pending_normal_data = None + self.pending_data = None self.committed_files.clear() def _split_data(self, data: pa.RecordBatch) -> Tuple[ @@ -370,6 +377,7 @@ def _close_current_writers(self): normal_meta = None if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0: normal_meta = self._write_normal_data_to_file(self.pending_normal_data) + self.committed_files.append(normal_meta) blob_metas = [] for blob_column in self.blob_file_column_names: @@ -377,18 +385,15 @@ def _close_current_writers(self): if normal_meta is not None: self._validate_consistency(normal_meta, writer_metas, blob_column) blob_metas.extend(writer_metas) + self.committed_files.extend(blob_metas) vector_metas = [] if self.vector_writer is not None: vector_metas = self.vector_writer.prepare_commit() - self.vector_writer.committed_files.clear() if vector_metas and normal_meta is not None: self._validate_consistency(normal_meta, vector_metas, 'vector') - - if normal_meta is not None: - self.committed_files.append(normal_meta) - self.committed_files.extend(blob_metas) - self.committed_files.extend(vector_metas) + self.committed_files.extend(vector_metas) + self.vector_writer.committed_files.clear() self.pending_normal_data = None