Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

from pypaimon.common.file_io import FileIO
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.special_fields import SpecialFields


class FormatPyArrowReader(RecordBatchReader):
Expand All @@ -32,16 +34,18 @@ class FormatPyArrowReader(RecordBatchReader):
and filters it based on the provided predicate and projection.
"""

def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_fields: List[str],
def __init__(self, file_io: FileIO, file_format: str, file_path: str,
read_fields: List[DataField],
push_down_predicate: Any, batch_size: int = 1024):
file_path_for_pyarrow = file_io.to_filesystem_path(file_path)
self.dataset = ds.dataset(file_path_for_pyarrow, format=file_format, filesystem=file_io.filesystem)
self.read_fields = read_fields
self._read_field_names = [f.name for f in read_fields]

# Identify which fields exist in the file and which are missing
file_schema_names = set(self.dataset.schema.names)
self.existing_fields = [field for field in read_fields if field in file_schema_names]
self.missing_fields = [field for field in read_fields if field not in file_schema_names]
self.existing_fields = [f.name for f in read_fields if f.name in file_schema_names]
self.missing_fields = [f.name for f in read_fields if f.name not in file_schema_names]

# Only pass existing fields to PyArrow scanner to avoid errors
self.reader = self.dataset.scanner(
Expand All @@ -50,20 +54,33 @@ def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_field
batch_size=batch_size
).to_reader()

self._output_schema = (
PyarrowFieldParser.from_paimon_schema(read_fields) if read_fields else None
)

def read_arrow_batch(self) -> Optional[RecordBatch]:
try:
batch = self.reader.read_next_batch()

if not self.missing_fields:
return batch

# Create columns for missing fields with null values
missing_columns = [pa.nulls(batch.num_rows, type=pa.null()) for _ in self.missing_fields]
def _type_for_missing(name: str) -> pa.DataType:
if self._output_schema is not None:
idx = self._output_schema.get_field_index(name)
if idx >= 0:
return self._output_schema.field(idx).type
return pa.null()

missing_columns = [
pa.nulls(batch.num_rows, type=_type_for_missing(name))
for name in self.missing_fields
]

# Reconstruct the batch with all fields in the correct order
all_columns = []
out_fields = []
for field_name in self.read_fields:
for field_name in self._read_field_names:
if field_name in self.existing_fields:
# Get the column from the existing batch
column_idx = self.existing_fields.index(field_name)
Expand All @@ -72,8 +89,10 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
else:
# Get the column from missing fields
column_idx = self.missing_fields.index(field_name)
col_type = _type_for_missing(field_name)
all_columns.append(missing_columns[column_idx])
out_fields.append(pa.field(field_name, pa.null(), nullable=True))
nullable = not SpecialFields.is_system_field(field_name)
out_fields.append(pa.field(field_name, col_type, nullable=nullable))
# Create a new RecordBatch with all columns
return pa.RecordBatch.from_arrays(all_columns, schema=pa.schema(out_fields))

Expand Down
7 changes: 5 additions & 2 deletions paimon-python/pypaimon/read/split_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,11 @@ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
format_reader = FormatLanceReader(self.table.file_io, file_path, read_file_fields,
read_arrow_predicate, batch_size=batch_size)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
read_file_fields, read_arrow_predicate, batch_size=batch_size)
name_to_field = {f.name: f for f in self.read_fields}
ordered_read_fields = [name_to_field[n] for n in read_file_fields if n in name_to_field]
format_reader = FormatPyArrowReader(
self.table.file_io, file_format, file_path,
ordered_read_fields, read_arrow_predicate, batch_size=batch_size)
else:
raise ValueError(f"Unexpected file format: {file_format}")

Expand Down
130 changes: 125 additions & 5 deletions paimon-python/pypaimon/tests/data_evolution_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from pypaimon import CatalogFactory, Schema
from pypaimon.common.predicate import Predicate
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.read.read_builder import ReadBuilder
from pypaimon.snapshot.snapshot_manager import SnapshotManager
from pypaimon.table.row.offset_row import OffsetRow

Expand Down Expand Up @@ -141,13 +140,63 @@ def test_basic(self):
('f1', pa.int16()),
]))
self.assertEqual(actual_data, expect_data)
self.assertEqual(
len(actual_data.schema), len(expect_data.schema),
'Read output column count must match schema')
self.assertEqual(
actual_data.schema.names, expect_data.schema.names,
'Read output column names must match schema')

def test_partitioned_read_requested_column_missing_in_file(self):
pa_schema = pa.schema([('f0', pa.int32()), ('f1', pa.string()), ('dt', pa.string())])
schema = Schema.from_pyarrow_schema(
pa_schema,
partition_keys=['dt'],
options={'row-tracking.enabled': 'true', 'data-evolution.enabled': 'true'}
)
self.catalog.create_table('default.test_partition_missing_col', schema, False)
table = self.catalog.get_table('default.test_partition_missing_col')
wb = table.new_batch_write_builder()

# assert manifest file meta contains min and max row id
tw1 = wb.new_write()
tc1 = wb.new_commit()
tw1.write_arrow(pa.Table.from_pydict(
{'f0': [1, 2], 'f1': ['a', 'b'], 'dt': ['p1', 'p1']},
schema=pa_schema
))
tc1.commit(tw1.prepare_commit())
tw1.close()
tc1.close()

tw2 = wb.new_write().with_write_type(['f0', 'dt'])
tc2 = wb.new_commit()
# Row key extractor uses table column indices; pass table-ordered data with null for f1
tw2.write_arrow(pa.Table.from_pydict(
{'f0': [3, 4], 'f1': [None, None], 'dt': ['p1', 'p1']},
schema=pa_schema
))
tc2.commit(tw2.prepare_commit())
tw2.close()
tc2.close()

actual = table.new_read_builder().new_read().to_arrow(table.new_read_builder().new_scan().plan().splits())
self.assertEqual(len(actual.schema), 3, 'Must have f0, f1, dt (no silent drop when f1 missing in file)')
self.assertEqual(actual.schema.names, ['f0', 'f1', 'dt'])
self.assertEqual(actual.num_rows, 4)
f1_col = actual.column('f1')
self.assertEqual(f1_col[0].as_py(), 'a')
self.assertEqual(f1_col[1].as_py(), 'b')
self.assertIsNone(f1_col[2].as_py())
self.assertIsNone(f1_col[3].as_py())

# Assert manifest file meta contains min and max row id
manifest_list_manager = ManifestListManager(table)
snapshot_manager = SnapshotManager(table)
manifest = manifest_list_manager.read(snapshot_manager.get_latest_snapshot().delta_manifest_list)[0]
self.assertEqual(0, manifest.min_row_id)
self.assertEqual(1, manifest.max_row_id)
all_manifests = manifest_list_manager.read_all(snapshot_manager.get_latest_snapshot())
first_commit = next((m for m in all_manifests if m.min_row_id == 0 and m.max_row_id == 1), None)
self.assertIsNotNone(first_commit, "Should have a manifest with min_row_id=0, max_row_id=1")
second_commit = next((m for m in all_manifests if m.min_row_id == 2 and m.max_row_id == 3), None)
self.assertIsNotNone(second_commit, "Should have a manifest with min_row_id=2, max_row_id=3")

def test_merge_reader(self):
from pypaimon.read.reader.concat_batch_reader import MergeAllBatchReader
Expand Down Expand Up @@ -280,6 +329,14 @@ def test_with_slice(self):
[2, 1001, 2001],
"with_slice(1, 4) should return id in (2, 1001, 2001). Got ids=%s" % ids,
)
scan_oob = rb.new_scan().with_slice(10, 12)
splits_oob = scan_oob.plan().splits()
result_oob = rb.new_read().to_pandas(splits_oob)
self.assertEqual(
len(result_oob),
0,
"with_slice(10, 12) on 6 rows should return 0 rows (out of bounds), got %d" % len(result_oob),
)

# Out-of-bounds slice: 6 rows total, slice(10, 12) should return 0 rows
scan_oob = rb.new_scan().with_slice(10, 12)
Expand Down Expand Up @@ -439,6 +496,8 @@ def test_multiple_appends(self):
'f2': ['b'] * 100 + ['y'] + ['d'],
}, schema=simple_pa_schema)
self.assertEqual(actual, expect)
self.assertEqual(len(actual.schema), len(expect.schema), 'Merge read output column count must match schema')
self.assertEqual(actual.schema.names, expect.schema.names, 'Merge read output column names must match schema')

def test_disorder_cols_append(self):
simple_pa_schema = pa.schema([
Expand Down Expand Up @@ -1175,6 +1234,7 @@ def test_read_row_tracking_metadata(self):
pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
]))
self.assertEqual(actual_data, expect_data)
self.assertEqual(len(actual_data.schema), len(expect_data.schema), 'Read output column count must match schema')

# write 2
table_write = write_builder.new_write().with_write_type(['f0'])
Expand Down Expand Up @@ -1210,6 +1270,66 @@ def test_read_row_tracking_metadata(self):
pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
]))
self.assertEqual(actual_data, expect_data)
self.assertEqual(len(actual_data.schema), len(expect_data.schema), 'Read output column count must match schema')

def test_with_blob(self):
from pypaimon.table.row.blob import BlobDescriptor

pa_schema = pa.schema([
('id', pa.int32()),
('picture', pa.large_binary()),
])
schema = Schema.from_pyarrow_schema(
pa_schema,
options={
'row-tracking.enabled': 'true',
'data-evolution.enabled': 'true',
'blob-as-descriptor': 'true',
},
)
self.catalog.create_table('default.test_with_blob', schema, False)
table = self.catalog.get_table('default.test_with_blob')

blob_path = os.path.join(self.tempdir, 'blob_ev')
with open(blob_path, 'wb') as f:
f.write(b'x')
descriptor = BlobDescriptor(blob_path, 0, 1)

wb = table.new_batch_write_builder()
tw = wb.new_write()
tc = wb.new_commit()
tw.write_arrow(pa.Table.from_pydict(
{'id': [1], 'picture': [descriptor.serialize()]},
schema=pa_schema,
))
cmts = tw.prepare_commit()
if cmts and cmts[0].new_files:
for nf in cmts[0].new_files:
nf.first_row_id = 0
tc.commit(cmts)
tw.close()
tc.close()

tw = wb.new_write()
tc = wb.new_commit()
tw.write_arrow(pa.Table.from_pydict(
{'id': [2], 'picture': [descriptor.serialize()]},
schema=pa_schema,
))
cmts = tw.prepare_commit()
if cmts and cmts[0].new_files:
for nf in cmts[0].new_files:
nf.first_row_id = 1
tc.commit(cmts)
tw.close()
tc.close()

rb = table.new_read_builder()
rb.with_projection(['id', '_ROW_ID', 'picture', '_SEQUENCE_NUMBER'])
actual = rb.new_read().to_arrow(rb.new_scan().plan().splits())
self.assertEqual(actual.num_rows, 2)
self.assertEqual(actual.column('id').to_pylist(), [1, 2])
self.assertEqual(actual.column('_ROW_ID').to_pylist(), [0, 1])

def test_from_arrays_without_schema(self):
schema = pa.schema([
Expand Down
Loading