Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions paimon-python/pypaimon/tests/data_evolution_formats_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def setUpClass(cls):
def tearDownClass(cls):
shutil.rmtree(cls.tempdir, ignore_errors=True)

@staticmethod
def _file_path(file_meta):
return file_meta.external_path if file_meta.external_path else file_meta.file_path

# ------------------------------------------------------------------
# Parquet-format data evolution
# ------------------------------------------------------------------
Expand Down Expand Up @@ -236,6 +240,41 @@ def test_blob_write_and_read(self):
self.assertEqual(actual.column('id').to_pylist(), [1, 2, 3])
self.assertEqual(actual.column('payload').to_pylist(), blobs)

def test_blob_abort_deletes_uncommitted_files(self):
pa_schema = pa.schema([
('id', pa.int32()),
('payload', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(pa_schema, options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
})
self.catalog.create_table('default.fmt_blob_abort_cleanup', schema, False)
table = self.catalog.get_table('default.fmt_blob_abort_cleanup')

writer = table.new_batch_write_builder().new_write()
writer.write_arrow(pa.Table.from_pydict({
'id': [1, 2, 3],
'payload': [b'a', b'b', b'c'],
}, schema=pa_schema))
commit_messages = writer.prepare_commit()

all_files = [nf for msg in commit_messages for nf in msg.new_files]
parquet_files = [f for f in all_files if f.file_name.endswith('.parquet')]
blob_files = [f for f in all_files if f.file_name.endswith('.blob')]
self.assertGreater(len(parquet_files), 0)
self.assertGreater(len(blob_files), 0)
for file_meta in all_files:
self.assertTrue(table.file_io.exists(self._file_path(file_meta)))

writer.abort()

for file_meta in all_files:
self.assertFalse(
table.file_io.exists(self._file_path(file_meta)),
f"Expected abort to delete {file_meta.file_name}",
)

def test_blob_column_subset_evolution(self):
"""Write normal+blob cols in one commit, overwrite normal col in another, merge-read."""
pa_schema = pa.schema([
Expand Down Expand Up @@ -563,6 +602,87 @@ def test_vortex_with_row_id_and_filter(self):
# Vector (vortex) file format for embedding columns
# ------------------------------------------------------------------

def test_vector_abort_deletes_uncommitted_files(self):
pa_schema = pa.schema([
('id', pa.int64()),
('embed', pa.list_(pa.float32(), 3)),
])
schema = Schema.from_pyarrow_schema(pa_schema, options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'vector.file.format': 'parquet',
})
self.catalog.create_table('default.fmt_vector_abort_cleanup', schema, False)
table = self.catalog.get_table('default.fmt_vector_abort_cleanup')

writer = table.new_batch_write_builder().new_write()
writer.write_arrow(pa.table({
'id': pa.array([1, 2, 3], type=pa.int64()),
'embed': pa.FixedSizeListArray.from_arrays(
pa.array([0.1, 0.2, 0.3,
0.4, 0.5, 0.6,
0.7, 0.8, 0.9], type=pa.float32()), 3),
}))
commit_messages = writer.prepare_commit()

all_files = [nf for msg in commit_messages for nf in msg.new_files]
normal_files = [f for f in all_files if not DataFileMeta.is_vector_file(f.file_name)]
vector_files = [f for f in all_files if DataFileMeta.is_vector_file(f.file_name)]
self.assertGreater(len(normal_files), 0)
self.assertGreater(len(vector_files), 0)
for file_meta in all_files:
self.assertTrue(table.file_io.exists(self._file_path(file_meta)))

writer.abort()

for file_meta in all_files:
self.assertFalse(
table.file_io.exists(self._file_path(file_meta)),
f"Expected abort to delete {file_meta.file_name}",
)

def test_vector_close_failure_after_prepare_raises(self):
from unittest.mock import patch

pa_schema = pa.schema([
('id', pa.int64()),
('embed', pa.list_(pa.float32(), 3)),
])
schema = Schema.from_pyarrow_schema(pa_schema, options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'vector.file.format': 'parquet',
})
self.catalog.create_table('default.fmt_vector_close_failure', schema, False)
table = self.catalog.get_table('default.fmt_vector_close_failure')

writer = table.new_batch_write_builder().new_write()
writer.write_arrow(pa.table({
'id': pa.array([1, 2, 3], type=pa.int64()),
'embed': pa.FixedSizeListArray.from_arrays(
pa.array([0.1, 0.2, 0.3,
0.4, 0.5, 0.6,
0.7, 0.8, 0.9], type=pa.float32()), 3),
}))
commit_messages = writer.prepare_commit()

all_files = [nf for msg in commit_messages for nf in msg.new_files]
for file_meta in all_files:
self.assertTrue(table.file_io.exists(self._file_path(file_meta)))

data_writer = next(iter(writer.file_store_write.data_writers.values()))
with patch.object(
data_writer, '_close_current_writers',
side_effect=RuntimeError("Close error")):
with self.assertRaisesRegex(RuntimeError, "Close error"):
writer.close()

for file_meta in all_files:
self.assertFalse(
table.file_io.exists(self._file_path(file_meta)),
f"Expected abort to delete {file_meta.file_name}",
)

@unittest.skipIf(sys.version_info < (3, 11), "vortex-data requires Python >= 3.11")
@unittest.skipUnless(
__import__('importlib').util.find_spec('vortex') is not None,
Expand Down
71 changes: 70 additions & 1 deletion paimon-python/pypaimon/tests/ray_sink_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ def tearDown(self):
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)

@staticmethod
def _data_files_under(table):
table_path = table.file_io.to_filesystem_path(table.table_path)
data_files = []
for root, _, files in os.walk(table_path):
for file_name in files:
if file_name.endswith(('.parquet', '.blob')) or '.vector.' in file_name:
data_files.append(os.path.join(root, file_name))
return data_files

def test_init_and_serialization(self):
"""Test initialization, serialization, and table name."""
datasink = PaimonDatasink(self.table, overwrite=False)
Expand Down Expand Up @@ -272,7 +282,66 @@ def test_write(self):
})
with self.assertRaises(Exception):
datasink.write([data_table], ctx)
mock_write.close.assert_called_once()
mock_write.abort.assert_called_once()
mock_write.close.assert_not_called()

with patch.object(self.table, 'new_batch_write_builder') as mock_builder:
mock_write_builder = Mock()
mock_write_builder.overwrite.return_value = mock_write_builder
mock_write = Mock()
mock_write.prepare_commit.return_value = [Mock(spec=CommitMessage)]
mock_write.close.side_effect = Exception("Close error")
mock_write_builder.new_write.return_value = mock_write
mock_builder.return_value = mock_write_builder

data_table = pa.table({
'id': [1],
'name': ['Alice'],
'value': [1.1]
})
with self.assertRaises(Exception):
datasink.write([data_table], ctx)
mock_write.prepare_commit.assert_called_once()
mock_write.abort.assert_called_once()

def test_write_does_not_return_prepared_messages_when_dedicated_close_aborts(self):
from pypaimon.write.writer.dedicated_format_writer import DedicatedFormatWriter

pa_schema = pa.schema([
('id', pa.int32()),
('payload', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(pa_schema, options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
})
table_identifier = "test_db.test_blob_close_failure"
self.catalog.create_table(table_identifier, schema, False)
table = self.catalog.get_table(table_identifier)

datasink = PaimonDatasink(table, overwrite=False)
datasink.on_write_start()
ctx = Mock(spec=TaskContext)
data_table = pa.Table.from_pydict({
'id': [1, 2, 3],
'payload': [b'a', b'b', b'c'],
}, schema=pa_schema)

original_close_current_writers = DedicatedFormatWriter._close_current_writers
close_current_calls = {'count': 0}

def fail_during_close(writer):
close_current_calls['count'] += 1
if close_current_calls['count'] == 1:
return original_close_current_writers(writer)
raise RuntimeError("Close error")

with patch.object(DedicatedFormatWriter, '_close_current_writers', fail_during_close):
with self.assertRaisesRegex(RuntimeError, "Close error"):
datasink.write([data_table], ctx)

self.assertEqual(close_current_calls['count'], 2)
self.assertEqual([], self._data_files_under(table))

def test_on_write_complete(self):
from ray.data.datasource.datasink import WriteResult
Expand Down
9 changes: 9 additions & 0 deletions paimon-python/pypaimon/write/file_store_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,15 @@ def close(self):
writer.close()
self.data_writers.clear()

def abort(self):
"""Abort all data writers and clean up files produced by this write."""
for writer in self.data_writers.values():
try:
writer.abort()
except Exception as e:
logger.warning("Failed to abort data writer.", exc_info=e)
self.data_writers.clear()

def _seq_number_stats(self, partition: Tuple) -> Dict[int, int]:
buckets = self.max_seq_numbers.get(partition)
if buckets is None:
Expand Down
17 changes: 13 additions & 4 deletions paimon-python/pypaimon/write/ray_datasink.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,20 @@ def write(

commit_messages = table_write.prepare_commit()
commit_messages_list.extend(commit_messages)
finally:
if table_write is not None:
table_write.close()

return commit_messages_list
table_write.close()
table_write = None
return commit_messages_list
except Exception:
if table_write is not None:
try:
table_write.abort()
except Exception as abort_error:
logger.warning(
f"Error aborting worker-side table_write: {abort_error}",
exc_info=abort_error
)
raise

@staticmethod
def _extract_write_returns(write_result: Any):
Expand Down
3 changes: 3 additions & 0 deletions paimon-python/pypaimon/write/table_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ def write_ray(
def close(self):
self.file_store_write.close()

def abort(self):
self.file_store_write.abort()

def _validate_pyarrow_schema(self, data_schema: pa.Schema):
if data_schema == self.table_pyarrow_schema:
return
Expand Down
5 changes: 3 additions & 2 deletions paimon-python/pypaimon/write/writer/data_vector_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ def close(self):
except Exception as e:
logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e)
self.abort()
raise
finally:
self.closed = True
self.pending_normal_data = None
Expand All @@ -151,7 +152,7 @@ def abort(self):
if self.vector_writer is not None:
self.vector_writer.abort()
self.pending_normal_data = None
self.committed_files.clear()
super().abort()

def _split_data(self, data: pa.RecordBatch) -> Tuple[pa.RecordBatch, pa.RecordBatch]:
normal_data = (
Expand Down Expand Up @@ -187,11 +188,11 @@ def _close_current_writers(self):

if self.vector_writer is not None:
vector_metas = self.vector_writer.prepare_commit()
self.vector_writer.committed_files.clear()
if vector_metas:
if normal_meta is not None:
self._validate_consistency(normal_meta, vector_metas)
self.committed_files.extend(vector_metas)
self.vector_writer.committed_files.clear()

self.pending_normal_data = None

Expand Down
14 changes: 8 additions & 6 deletions paimon-python/pypaimon/write/writer/data_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,14 @@ def abort(self):
Abort all writers and clean up resources. This method should be called when an error occurs
during writing. It deletes any files that were written and cleans up resources.
"""
# Delete any files that were written
for file_meta in self.committed_files:
self._delete_committed_files(self.committed_files)

# Clean up resources
self.pending_data = None
self.committed_files.clear()

def _delete_committed_files(self, file_metas: List[DataFileMeta]):
for file_meta in file_metas:
try:
# Use external_path if available (contains full URL scheme), otherwise use file_path
path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path
Expand All @@ -145,10 +151,6 @@ def abort(self):
path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path
logger.warning(f"Failed to delete file {path_to_delete} during abort: {e}")

# Clean up resources
self.pending_data = None
self.committed_files.clear()

@abstractmethod
def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
"""Process incoming data (e.g., add system fields, sort). Must be implemented by subclasses."""
Expand Down
17 changes: 11 additions & 6 deletions paimon-python/pypaimon/write/writer/dedicated_format_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ def close(self):
except Exception as e:
logger.error("Exception occurs when closing writer. Cleaning up.", exc_info=e)
self.abort()
raise
finally:
self.closed = True
self.pending_normal_data = None
Expand All @@ -262,7 +263,13 @@ def abort(self):
self.vector_writer.abort()
if self._external_storage_writer:
self._external_storage_writer.abort()
committed_non_blob_files = [
file_meta for file_meta in self.committed_files
if not DataFileMeta.is_blob_file(file_meta.file_name)
]
self._delete_committed_files(committed_non_blob_files)
self.pending_normal_data = None
self.pending_data = None
self.committed_files.clear()

def _split_data(self, data: pa.RecordBatch) -> Tuple[
Expand Down Expand Up @@ -368,25 +375,23 @@ def _close_current_writers(self):
normal_meta = None
if self.pending_normal_data is not None and self.pending_normal_data.num_rows > 0:
normal_meta = self._write_normal_data_to_file(self.pending_normal_data)
self.committed_files.append(normal_meta)

blob_metas = []
for blob_column in self.blob_file_column_names:
writer_metas = self.blob_writers[blob_column].prepare_commit()
if normal_meta is not None:
self._validate_consistency(normal_meta, writer_metas, blob_column)
blob_metas.extend(writer_metas)
self.committed_files.extend(blob_metas)

vector_metas = []
if self.vector_writer is not None:
vector_metas = self.vector_writer.prepare_commit()
self.vector_writer.committed_files.clear()
if vector_metas and normal_meta is not None:
self._validate_consistency(normal_meta, vector_metas, 'vector')

if normal_meta is not None:
self.committed_files.append(normal_meta)
self.committed_files.extend(blob_metas)
self.committed_files.extend(vector_metas)
self.committed_files.extend(vector_metas)
self.vector_writer.committed_files.clear()

self.pending_normal_data = None

Expand Down
Loading