Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions paimon-python/pypaimon/tests/blob_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -3804,6 +3804,134 @@ def test_nested_field_named_blob_not_treated_as_blob(self):
table = self.catalog.get_table('test_db.nested_blob_name_no_error')
self.assertIsNotNone(table)

def test_blob_table_partial_update_non_blob_column(self):
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('picture', pa.large_binary()),
])
opts = {
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
}
s = Schema.from_pyarrow_schema(pa_schema, options=opts)
table_name = 'test_db.blob_de_seq'
self.catalog.create_table(table_name, s, False)

table = self.catalog.get_table(table_name)
wb = table.new_batch_write_builder()
w = wb.new_write()
w.write_arrow(pa.Table.from_pydict(
{'id': [1, 2], 'name': ['a', 'b'], 'picture': [None, None]},
schema=pa_schema,
))
wb.new_commit().commit(w.prepare_commit())
w.close()

from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
from pypaimon.write.table_update_by_row_id import TableUpdateByRowId

table = self.catalog.get_table(table_name)
rb = table.new_read_builder()
rb = rb.with_projection(['name', '_ROW_ID'])
splits = rb.new_scan().plan().splits()
source = rb.new_read().to_arrow(splits)

update_data = pa.table({
'_ROW_ID': source.column('_ROW_ID'),
'name': pa.array(['updated', 'updated'], type=pa.string()),
})
updater = TableUpdateByRowId(
table, '_test_', BATCH_COMMIT_IDENTIFIER,
)
msgs = updater.update_columns(update_data, ['name'])
table.new_batch_write_builder().new_commit().commit(msgs)

table = self.catalog.get_table(table_name)
rb = table.new_read_builder()
splits = rb.new_scan().plan().splits()
result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
self.assertEqual(result['name'], ['updated', 'updated'])

def test_blob_table_partial_update_non_blob_column_with_rolling_files(self):
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
from pypaimon.write.table_update_by_row_id import TableUpdateByRowId

pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('picture', pa.large_binary()),
])
opts = {
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'target-file-size': '1KB',
}
s = Schema.from_pyarrow_schema(pa_schema, options=opts)
table_name = 'test_db.blob_de_seq_rolling'
self.catalog.create_table(table_name, s, False)

write_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
])
table = self.catalog.get_table(table_name)
wb = table.new_batch_write_builder()
w = wb.new_write().with_write_type(['id', 'name'])
for start in (0, 1000):
ids = list(range(start, start + 1000))
w.write_arrow(pa.Table.from_pydict(
{
'id': ids,
'name': [f'name_{i}_' + 'x' * 2048 for i in ids],
},
schema=write_schema,
))
commit_messages = w.prepare_commit()
normal_files = [
f for msg in commit_messages for f in msg.new_files
if not DataFileMeta.is_blob_file(f.file_name)
and not DataFileMeta.is_vector_file(f.file_name)
]
self.assertGreaterEqual(len(normal_files), 2)
for file in normal_files:
self.assertEqual(file.min_sequence_number, 0)
self.assertEqual(file.max_sequence_number, file.row_count - 1)
wb.new_commit().commit(commit_messages)
w.close()

table = self.catalog.get_table(table_name)
rb = table.new_read_builder().with_projection(['id', 'name', '_ROW_ID'])
splits = rb.new_scan().plan().splits()
source = rb.new_read().to_arrow(splits).sort_by('id')

update_data = pa.table({
'_ROW_ID': source.column('_ROW_ID'),
'name': pa.array(['updated'] * source.num_rows, type=pa.string()),
})
updater = TableUpdateByRowId(
table, '_test_', BATCH_COMMIT_IDENTIFIER,
)
msgs = updater.update_columns(update_data, ['name'])
update_normal_files = [
f for msg in msgs for f in msg.new_files
if not DataFileMeta.is_blob_file(f.file_name)
and not DataFileMeta.is_vector_file(f.file_name)
]
self.assertGreaterEqual(len(update_normal_files), 2)
for file in update_normal_files:
self.assertEqual(file.min_sequence_number, 0)
self.assertEqual(file.max_sequence_number, file.row_count - 1)
table.new_batch_write_builder().new_commit().commit(msgs)

table = self.catalog.get_table(table_name)
rb = table.new_read_builder().with_projection(['id', 'name'])
splits = rb.new_scan().plan().splits()
result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
self.assertEqual(result['id'], list(range(2000)))
self.assertEqual(result['name'], ['updated'] * 2000)


class GetBlobTest(unittest.TestCase):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1957,7 +1957,6 @@ def test_self_merge_multi_clause_fall_through(self):
self.assertEqual(out['name'], ['old', 'young', 'senior'])
self.assertEqual(out['age'], [10, 20, 30])

@unittest.skip("blocked by blob DE sequence bug fix, see PR #8147")
@unittest.skipIf(_SKIP_CONDITION, _SKIP_REASON)
def test_self_merge_blob_source_condition(self):
blob_schema = pa.schema([
Expand Down
131 changes: 131 additions & 0 deletions paimon-python/pypaimon/tests/vector_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,137 @@ def test_vector_dedicated_format_write_read_lance(self):

self.assertEqual(result.num_rows, 3)

def test_vector_table_partial_update_non_vector_column(self):
vector_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('embedding', pa.list_(pa.float32(), 4)),
])
opts = {
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'vector.file.format': 'parquet',
}
s = Schema.from_pyarrow_schema(vector_schema, options=opts)
table_name = 'test_db.vector_de_seq'
self.catalog.create_table(table_name, s, False)

table = self.catalog.get_table(table_name)
wb = table.new_batch_write_builder()
w = wb.new_write()
w.write_arrow(pa.Table.from_pydict(
{
'id': [1, 2],
'name': ['a', 'b'],
'embedding': [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]],
},
schema=vector_schema,
))
wb.new_commit().commit(w.prepare_commit())
w.close()

from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
from pypaimon.write.table_update_by_row_id import TableUpdateByRowId

table = self.catalog.get_table(table_name)
rb = table.new_read_builder()
rb = rb.with_projection(['name', '_ROW_ID'])
splits = rb.new_scan().plan().splits()
source = rb.new_read().to_arrow(splits)

update_data = pa.table({
'_ROW_ID': source.column('_ROW_ID'),
'name': pa.array(['updated', 'updated'], type=pa.string()),
})
updater = TableUpdateByRowId(
table, '_test_', BATCH_COMMIT_IDENTIFIER,
)
msgs = updater.update_columns(update_data, ['name'])
table.new_batch_write_builder().new_commit().commit(msgs)

table = self.catalog.get_table(table_name)
rb = table.new_read_builder()
splits = rb.new_scan().plan().splits()
result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
self.assertEqual(result['name'], ['updated', 'updated'])

def test_vector_table_partial_update_non_vector_column_with_rolling_files(self):
from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
from pypaimon.write.table_update_by_row_id import TableUpdateByRowId

vector_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('embedding', pa.list_(pa.float32(), 4)),
])
opts = {
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'vector.file.format': 'parquet',
'target-file-size': '1KB',
}
s = Schema.from_pyarrow_schema(vector_schema, options=opts)
table_name = 'test_db.vector_de_seq_rolling'
self.catalog.create_table(table_name, s, False)

write_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
])
table = self.catalog.get_table(table_name)
wb = table.new_batch_write_builder()
w = wb.new_write().with_write_type(['id', 'name'])
for start in (0, 1000):
ids = list(range(start, start + 1000))
w.write_arrow(pa.Table.from_pydict(
{
'id': ids,
'name': [f'name_{i}_' + 'x' * 2048 for i in ids],
},
schema=write_schema,
))
commit_messages = w.prepare_commit()
normal_files = [
f for msg in commit_messages for f in msg.new_files
if not DataFileMeta.is_vector_file(f.file_name)
]
self.assertGreaterEqual(len(normal_files), 2)
for file in normal_files:
self.assertEqual(file.min_sequence_number, 0)
self.assertEqual(file.max_sequence_number, file.row_count - 1)
wb.new_commit().commit(commit_messages)
w.close()

table = self.catalog.get_table(table_name)
rb = table.new_read_builder().with_projection(['id', 'name', '_ROW_ID'])
splits = rb.new_scan().plan().splits()
source = rb.new_read().to_arrow(splits).sort_by('id')

update_data = pa.table({
'_ROW_ID': source.column('_ROW_ID'),
'name': pa.array(['updated'] * source.num_rows, type=pa.string()),
})
updater = TableUpdateByRowId(
table, '_test_', BATCH_COMMIT_IDENTIFIER,
)
msgs = updater.update_columns(update_data, ['name'])
update_normal_files = [
f for msg in msgs for f in msg.new_files
if not DataFileMeta.is_vector_file(f.file_name)
]
self.assertGreaterEqual(len(update_normal_files), 2)
for file in update_normal_files:
self.assertEqual(file.min_sequence_number, 0)
self.assertEqual(file.max_sequence_number, file.row_count - 1)
table.new_batch_write_builder().new_commit().commit(msgs)

table = self.catalog.get_table(table_name)
rb = table.new_read_builder().with_projection(['id', 'name'])
splits = rb.new_scan().plan().splits()
result = rb.new_read().to_arrow(splits).sort_by('id').to_pydict()
self.assertEqual(result['id'], list(range(2000)))
self.assertEqual(result['name'], ['updated'] * 2000)


if __name__ == '__main__':
unittest.main()
6 changes: 4 additions & 2 deletions paimon-python/pypaimon/write/writer/data_vector_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]:
stats_columns = self.normal_columns if metadata_stats_enabled else []
value_stats = self._collect_value_stats(data, stats_columns)

min_seq, max_seq = self._append_file_sequence_range(data.num_rows)

return DataFileMeta.create(
file_name=file_name,
file_size=self.file_io.get_file_size(file_path),
Expand All @@ -234,8 +236,8 @@ def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]:
max_key=GenericRow([], []),
key_stats=SimpleStats.empty_stats(),
value_stats=value_stats,
min_sequence_number=-1,
max_sequence_number=-1,
min_sequence_number=min_seq,
max_sequence_number=max_seq,
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],
Expand Down
10 changes: 10 additions & 0 deletions paimon-python/pypaimon/write/writer/data_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,16 @@ def _process_data(self, data: pa.RecordBatch) -> pa.RecordBatch:
def _merge_data(self, existing_data: pa.RecordBatch, new_data: pa.RecordBatch) -> pa.RecordBatch:
"""Merge existing data with new data. Must be implemented by subclasses."""

def _append_file_sequence_range(self, row_count: int) -> Tuple[int, int]:
if row_count <= 0:
raise ValueError("row_count must be positive")

if self.options.data_evolution_enabled(False):
# Row-tracking commit stamps this sentinel range with the snapshot id.
return 0, row_count - 1

return -1, -1

def _check_and_roll_if_needed(self):
while self.pending_data is not None:
current_size = self.pending_data.nbytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
stats_columns = self.normal_columns if metadata_stats_enabled else []
value_stats = self._collect_value_stats(data, stats_columns)

self.sequence_generator.start = self.sequence_generator.current
min_seq, max_seq = self._append_file_sequence_range(data.num_rows)

return DataFileMeta.create(
file_name=file_name,
Expand All @@ -445,8 +445,8 @@ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
max_key=GenericRow([], []),
key_stats=SimpleStats.empty_stats(),
value_stats=value_stats,
min_sequence_number=-1,
max_sequence_number=-1,
min_sequence_number=min_seq,
max_sequence_number=max_seq,
schema_id=self.table.table_schema.id,
level=0,
extra_files=[],
Expand Down
Loading