# Patch summary (commentary only; the patch proper starts at the first
# "diff --git" header below).
#
# The writers for normal (non-blob, non-vector) data files stop hard-coding
# min/max sequence numbers to -1 and instead take them from the new
# DataWriter._append_file_sequence_range(row_count) helper, which returns
# (0, row_count - 1) when data evolution is enabled and (-1, -1) otherwise.
# New tests cover partial updates of a non-blob / non-vector column by row id.
#
# NOTE(review): this text was recovered from a copy whose newlines had been
# collapsed. Python indentation and blank context lines were reconstructed
# from the hunk line counts -- confirm against the original patch before
# applying it.
#
# File 1/6 - blob_table_test.py: adds two tests that update only 'name' on a
# table with a large_binary 'picture' column; the second sets
# 'target-file-size' to '1KB' and asserts at least two normal files, each with
# sequence range [0, row_count - 1].
diff --git a/paimon-python/pypaimon/tests/blob_table_test.py b/paimon-python/pypaimon/tests/blob_table_test.py
index 7619fcb6de13..e3af63ad521f 100755
--- a/paimon-python/pypaimon/tests/blob_table_test.py
+++ b/paimon-python/pypaimon/tests/blob_table_test.py
@@ -3804,6 +3804,134 @@ def test_nested_field_named_blob_not_treated_as_blob(self):
         table = self.catalog.get_table('test_db.nested_blob_name_no_error')
         self.assertIsNotNone(table)
 
+    def test_blob_table_partial_update_non_blob_column(self):
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+        }
+        s = Schema.from_pyarrow_schema(pa_schema, options=opts)
+        table_name = 'test_db.blob_de_seq'
+        self.catalog.create_table(table_name, s, False)
+
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pydict(
+            {'id': [1, 2], 'name': ['a', 'b'], 'picture': [None, None]},
+            schema=pa_schema,
+        ))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        rb = rb.with_projection(['name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits)
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated', 'updated'], type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['name'], ['updated', 'updated'])
+
+    def test_blob_table_partial_update_non_blob_column_with_rolling_files(self):
+        from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        pa_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('picture', pa.large_binary()),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'target-file-size': '1KB',
+        }
+        s = Schema.from_pyarrow_schema(pa_schema, options=opts)
+        table_name = 'test_db.blob_de_seq_rolling'
+        self.catalog.create_table(table_name, s, False)
+
+        write_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write().with_write_type(['id', 'name'])
+        for start in (0, 1000):
+            ids = list(range(start, start + 1000))
+            w.write_arrow(pa.Table.from_pydict(
+                {
+                    'id': ids,
+                    'name': [f'name_{i}_' + 'x' * 2048 for i in ids],
+                },
+                schema=write_schema,
+            ))
+        commit_messages = w.prepare_commit()
+        normal_files = [
+            f for msg in commit_messages for f in msg.new_files
+            if not DataFileMeta.is_blob_file(f.file_name)
+            and not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(normal_files), 2)
+        for file in normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        wb.new_commit().commit(commit_messages)
+        w.close()
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits).sort_by('id')
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated'] * source.num_rows, type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        update_normal_files = [
+            f for msg in msgs for f in msg.new_files
+            if not DataFileMeta.is_blob_file(f.file_name)
+            and not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(update_normal_files), 2)
+        for file in update_normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name'])
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['id'], list(range(2000)))
+        self.assertEqual(result['name'], ['updated'] * 2000)
+
 
 class GetBlobTest(unittest.TestCase):
 
# File 2/6 - ray_data_evolution_merge_into_test.py: removes the unconditional
# @unittest.skip on test_self_merge_blob_source_condition (its message pointed
# at PR #8147); the conditional @unittest.skipIf stays in place.
diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
index 7e365009db5c..cd2745dc0f77 100644
--- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py
@@ -1957,7 +1957,6 @@ def test_self_merge_multi_clause_fall_through(self):
         self.assertEqual(out['name'], ['old', 'young', 'senior'])
         self.assertEqual(out['age'], [10, 20, 30])
 
-    @unittest.skip("blocked by blob DE sequence bug fix, see PR #8147")
     @unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
     def test_self_merge_blob_source_condition(self):
         blob_schema = pa.schema([
# File 3/6 - vector_table_test.py: vector-table counterparts of the two blob
# tests, using a fixed-size float32 list 'embedding' column and
# 'vector.file.format' = 'parquet'.
# NOTE(review): the rolling-files test references DataFileMeta without a local
# import (the blob test imports it locally) -- presumably it is imported at
# module level in this file; verify.
diff --git a/paimon-python/pypaimon/tests/vector_table_test.py b/paimon-python/pypaimon/tests/vector_table_test.py
index 75a97f19a867..0f96455bbde0 100644
--- a/paimon-python/pypaimon/tests/vector_table_test.py
+++ b/paimon-python/pypaimon/tests/vector_table_test.py
@@ -340,6 +340,137 @@ def test_vector_dedicated_format_write_read_lance(self):
 
         self.assertEqual(result.num_rows, 3)
 
+    def test_vector_table_partial_update_non_vector_column(self):
+        vector_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('embedding', pa.list_(pa.float32(), 4)),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+        }
+        s = Schema.from_pyarrow_schema(vector_schema, options=opts)
+        table_name = 'test_db.vector_de_seq'
+        self.catalog.create_table(table_name, s, False)
+
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write()
+        w.write_arrow(pa.Table.from_pydict(
+            {
+                'id': [1, 2],
+                'name': ['a', 'b'],
+                'embedding': [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]],
+            },
+            schema=vector_schema,
+        ))
+        wb.new_commit().commit(w.prepare_commit())
+        w.close()
+
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        rb = rb.with_projection(['name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits)
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated', 'updated'], type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder()
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['name'], ['updated', 'updated'])
+
+    def test_vector_table_partial_update_non_vector_column_with_rolling_files(self):
+        from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
+        from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+        vector_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+            ('embedding', pa.list_(pa.float32(), 4)),
+        ])
+        opts = {
+            'row-tracking.enabled': 'true',
+            'data-evolution.enabled': 'true',
+            'vector.file.format': 'parquet',
+            'target-file-size': '1KB',
+        }
+        s = Schema.from_pyarrow_schema(vector_schema, options=opts)
+        table_name = 'test_db.vector_de_seq_rolling'
+        self.catalog.create_table(table_name, s, False)
+
+        write_schema = pa.schema([
+            ('id', pa.int32()),
+            ('name', pa.string()),
+        ])
+        table = self.catalog.get_table(table_name)
+        wb = table.new_batch_write_builder()
+        w = wb.new_write().with_write_type(['id', 'name'])
+        for start in (0, 1000):
+            ids = list(range(start, start + 1000))
+            w.write_arrow(pa.Table.from_pydict(
+                {
+                    'id': ids,
+                    'name': [f'name_{i}_' + 'x' * 2048 for i in ids],
+                },
+                schema=write_schema,
+            ))
+        commit_messages = w.prepare_commit()
+        normal_files = [
+            f for msg in commit_messages for f in msg.new_files
+            if not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(normal_files), 2)
+        for file in normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        wb.new_commit().commit(commit_messages)
+        w.close()
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name', '_ROW_ID'])
+        splits = rb.new_scan().plan().splits()
+        source = rb.new_read().to_arrow(splits).sort_by('id')
+
+        update_data = pa.table({
+            '_ROW_ID': source.column('_ROW_ID'),
+            'name': pa.array(['updated'] * source.num_rows, type=pa.string()),
+        })
+        updater = TableUpdateByRowId(
+            table, '_test_', BATCH_COMMIT_IDENTIFIER,
+        )
+        msgs = updater.update_columns(update_data, ['name'])
+        update_normal_files = [
+            f for msg in msgs for f in msg.new_files
+            if not DataFileMeta.is_vector_file(f.file_name)
+        ]
+        self.assertGreaterEqual(len(update_normal_files), 2)
+        for file in update_normal_files:
+            self.assertEqual(file.min_sequence_number, 0)
+            self.assertEqual(file.max_sequence_number, file.row_count - 1)
+        table.new_batch_write_builder().new_commit().commit(msgs)
+
+        table = self.catalog.get_table(table_name)
+        rb = table.new_read_builder().with_projection(['id', 'name'])
+        splits = rb.new_scan().plan().splits()
+        result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
+        self.assertEqual(result['id'], list(range(2000)))
+        self.assertEqual(result['name'], ['updated'] * 2000)
+
 
 if __name__ == '__main__':
     unittest.main()
# File 4/6 - data_vector_writer.py: _write_normal_data_to_file now passes the
# range from _append_file_sequence_range(data.num_rows) to DataFileMeta.create
# instead of the literal -1 / -1.
diff --git a/paimon-python/pypaimon/write/writer/data_vector_writer.py b/paimon-python/pypaimon/write/writer/data_vector_writer.py
index d06d42593665..e63a916ed49a 100644
--- a/paimon-python/pypaimon/write/writer/data_vector_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_vector_writer.py
@@ -226,6 +226,8 @@ def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]:
         stats_columns = self.normal_columns if metadata_stats_enabled else []
         value_stats = self._collect_value_stats(data, stats_columns)
 
+        min_seq, max_seq = self._append_file_sequence_range(data.num_rows)
+
         return DataFileMeta.create(
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
@@ -234,8 +236,8 @@ def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]:
             max_key=GenericRow([], []),
             key_stats=SimpleStats.empty_stats(),
             value_stats=value_stats,
-            min_sequence_number=-1,
-            max_sequence_number=-1,
+            min_sequence_number=min_seq,
+            max_sequence_number=max_seq,
             schema_id=self.table.table_schema.id,
             level=0,
             extra_files=[],
# File 5/6 - data_writer.py: adds _append_file_sequence_range(row_count). It
# raises ValueError for row_count <= 0, returns (0, row_count - 1) when
# self.options.data_evolution_enabled(False) is truthy, and (-1, -1) otherwise.
# NOTE(review): the new annotation uses Tuple; its import is not part of this
# hunk -- presumably already imported in data_writer.py; verify.
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 9f10e8a64e22..e237bf9f3742 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -164,6 +164,16 @@ def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
     def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
         """Merge existing data with new data. Must be implemented by subclasses."""
 
+    def _append_file_sequence_range(self, row_count: int) -> Tuple[int, int]:
+        if row_count <= 0:
+            raise ValueError("row_count must be positive")
+
+        if self.options.data_evolution_enabled(False):
+            # Row-tracking commit stamps this sentinel range with the snapshot id.
+            return 0, row_count - 1
+
+        return -1, -1
+
     def _check_and_roll_if_needed(self):
         while self.pending_data is not None:
             current_size = self.pending_data.nbytes
# File 6/6 - dedicated_format_writer.py: _create_data_file_meta drops the
# 'self.sequence_generator.start = self.sequence_generator.current' assignment
# and uses _append_file_sequence_range(data.num_rows) for the min/max sequence
# numbers instead of -1 / -1.
diff --git a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
index 444e88332bfc..c223c1c62b14 100644
--- a/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
+++ b/paimon-python/pypaimon/write/writer/dedicated_format_writer.py
@@ -435,7 +435,7 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
         stats_columns = self.normal_columns if metadata_stats_enabled else []
         value_stats = self._collect_value_stats(data, stats_columns)
 
-        self.sequence_generator.start = self.sequence_generator.current
+        min_seq, max_seq = self._append_file_sequence_range(data.num_rows)
 
         return DataFileMeta.create(
             file_name=file_name,
@@ -445,8 +445,8 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
             max_key=GenericRow([], []),
             key_stats=SimpleStats.empty_stats(),
             value_stats=value_stats,
-            min_sequence_number=-1,
-            max_sequence_number=-1,
+            min_sequence_number=min_seq,
+            max_sequence_number=max_seq,
             schema_id=self.table.table_schema.id,
             level=0,
             extra_files=[],