Skip to content
3 changes: 3 additions & 0 deletions paimon-python/pypaimon/ray/ray_paimon.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def read_paimon(
limit: Optional[int] = None,
snapshot_id: Optional[int] = None,
tag_name: Optional[str] = None,
dynamic_options: Optional[Dict[str, str]] = None,
ray_remote_args: Optional[Dict[str, Any]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
Expand All @@ -74,6 +75,7 @@ def read_paimon(
exclusive with ``tag_name``.
tag_name: Optional tag name to time-travel to. Mutually
exclusive with ``snapshot_id``.
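dynamic_options: Optional table options to override at read time. Must
not contain time-travel keys (e.g. ``scan.snapshot-id``,
``scan.tag-name``) when ``snapshot_id`` or ``tag_name`` is set.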
ray_remote_args: Optional kwargs passed to ``ray.remote`` in read tasks.
concurrency: Optional max number of Ray read tasks to run concurrently.
override_num_blocks: Optional override for the number of output blocks.
Expand Down Expand Up @@ -106,6 +108,7 @@ def read_paimon(
limit=limit,
snapshot_id=snapshot_id,
tag_name=tag_name,
dynamic_options=dynamic_options,
)

if not split_provider.splits():
Expand Down
32 changes: 27 additions & 5 deletions paimon-python/pypaimon/read/datasource/split_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,22 +85,42 @@ def __init__(
limit: Optional[int] = None,
snapshot_id: Optional[int] = None,
tag_name: Optional[str] = None,
dynamic_options: Optional[Dict[str, str]] = None,
):
if not table_identifier:
raise ValueError("table_identifier is required")
if catalog_options is None:
raise ValueError("catalog_options is required")
from pypaimon.snapshot.time_travel_util import SCAN_KEYS
scan_keys = set(SCAN_KEYS)

if snapshot_id is not None and tag_name is not None:
raise ValueError(
"snapshot_id and tag_name cannot be set at the same time"
)

if dynamic_options:
dynamic_tt_keys = scan_keys & dynamic_options.keys()
if (snapshot_id is not None or tag_name is not None) and dynamic_tt_keys:
raise ValueError(
"snapshot_id/tag_name and dynamic_options "
"time-travel keys cannot be set at the same time, "
"got: {}".format(", ".join(sorted(dynamic_tt_keys)))
)
if len(dynamic_tt_keys) > 1:
raise ValueError(
"dynamic_options contains multiple time-travel "
"keys which are mutually exclusive: {}".format(
", ".join(sorted(dynamic_tt_keys)))
)
self._table_identifier = table_identifier
self._catalog_options = catalog_options
self._predicate = predicate
self._projection = projection
self._limit = limit
self._snapshot_id = snapshot_id
self._tag_name = tag_name
self._dynamic_options = dynamic_options
self._table_cached = None
self._splits_cached = None
self._read_type_cached = None
Expand All @@ -110,13 +130,15 @@ def _ensure_table(self):
from pypaimon.catalog.catalog_factory import CatalogFactory
catalog = CatalogFactory.create(self._catalog_options)
table = catalog.get_table(self._table_identifier)
travel_options = {}
dynamic_options = {}
if self._snapshot_id is not None:
travel_options["scan.snapshot-id"] = str(self._snapshot_id)
dynamic_options["scan.snapshot-id"] = str(self._snapshot_id)
if self._tag_name is not None:
travel_options["scan.tag-name"] = self._tag_name
if travel_options:
table = table.copy(travel_options)
dynamic_options["scan.tag-name"] = self._tag_name
if self._dynamic_options:
dynamic_options.update(self._dynamic_options)
if dynamic_options:
table = table.copy(dynamic_options)
self._table_cached = table
return self._table_cached

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,8 @@ def test_blob_table_feature_update(self):
('feature', pa.int32()),
])
name = f'default.tbl_{uuid.uuid4().hex[:8]}'
schema = Schema.from_pyarrow_schema(blob_schema, options=self.de_options)
schema = Schema.from_pyarrow_schema(
blob_schema, options=self.de_options)
self.catalog.create_table(name, schema, False)
self._write(
name,
Expand Down Expand Up @@ -586,6 +587,84 @@ def compute_feature(batch):
'num_matched': 2, 'num_inserted': 0, 'num_unchanged': 0,
})

def test_blob_descriptor_resolve_and_merge(self):
    """Resolve blob descriptors read through Ray and merge a feature.

    Seeds a three-row table, reads ``id``/``payload`` with the
    ``blob-as-descriptor`` dynamic option set to ``'true'``, joins the
    result against ids 1 and 3, deserializes each payload as a
    ``BlobDescriptor`` to fetch the blob bytes, and merges
    ``len(bytes) * 100`` back into ``feature``. Rows 1 and 3 are expected
    to be updated while row 2 and every payload stay unchanged.
    """
    from pypaimon.table.row.blob import BlobDescriptor, Blob
    from pypaimon.common.uri_reader import UriReaderFactory

    arrow_schema = pa.schema([
        ('id', pa.int32()),
        ('payload', pa.large_binary()),
        ('feature', pa.int32()),
    ])
    table_name = f'default.tbl_{uuid.uuid4().hex[:8]}'
    self.catalog.create_table(
        table_name,
        Schema.from_pyarrow_schema(arrow_schema, options=self.de_options),
        False,
    )
    seed_columns = {
        'id': pa.array([1, 2, 3], type=pa.int32()),
        'payload': [b'aa', b'bbb', b'cccc'],
        'feature': pa.array([10, 20, 30], type=pa.int32()),
    }
    self._write(
        table_name,
        pa.Table.from_pydict(seed_columns, schema=arrow_schema),
    )

    partitions = _TEST_NUM_PARTITIONS
    wanted_ids = ray.data.from_arrow(
        pa.Table.from_pydict({'id': pa.array([1, 3], type=pa.int32())})
    )

    # Read the payload column as serialized descriptors.
    descriptor_rows = read_paimon(
        table_name,
        self.catalog_options,
        projection=['id', 'payload'],
        dynamic_options={'blob-as-descriptor': 'true'},
    )
    joined = wanted_ids.join(
        descriptor_rows,
        join_type='inner',
        num_partitions=partitions,
        on=['id'],
    )

    reader_factory = UriReaderFactory(self.catalog_options)

    def resolve_and_compute(batch):
        def blob_length(raw):
            # Descriptor bytes -> descriptor -> blob bytes -> size.
            descriptor = BlobDescriptor.deserialize(raw)
            uri_reader = reader_factory.create(descriptor.uri)
            blob = Blob.from_descriptor(uri_reader, descriptor)
            return len(blob.to_data())

        scaled = [
            blob_length(raw) * 100
            for raw in batch['payload'].to_pylist()
        ]
        return pa.Table.from_pydict({
            'id': batch['id'],
            'new_feature': pa.array(scaled, type=pa.int32()),
        })

    feature_updates = joined.map_batches(
        resolve_and_compute, batch_format='pyarrow')
    metrics = merge_into(
        target=table_name,
        source=feature_updates,
        catalog_options=self.catalog_options,
        on=['id'],
        when_matched=[
            WhenMatched(update={'feature': source_col('new_feature')})
        ],
        num_partitions=partitions,
    )

    # Read the table back and compare against the expected end state.
    read_builder = self.catalog.get_table(table_name).new_read_builder()
    planned_splits = read_builder.new_scan().plan().splits()
    result = (
        read_builder.new_read()
        .to_arrow(planned_splits)
        .sort_by('id')
        .to_pydict()
    )
    self.assertEqual(result['id'], [1, 2, 3])
    self.assertEqual(result['feature'], [200, 20, 400])
    self.assertEqual(result['payload'], [b'aa', b'bbb', b'cccc'])
    self.assertEqual(metrics['num_matched'], 2)

def test_combined_writes_single_snapshot(self):
target = self._create_table()
self._write(
Expand Down
38 changes: 38 additions & 0 deletions paimon-python/pypaimon/tests/split_provider_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,44 @@ def test_catalog_provider_time_travel_by_tag_name(self):
rows = tr.to_arrow(provider.splits()).to_pylist()
self.assertEqual([r['id'] for r in rows], [11])

def test_dynamic_options_blob_as_descriptor(self):
    """A dynamic option given to the provider overrides the table option.

    The table is created with ``blob-as-descriptor`` set to ``'false'``;
    a provider built with ``dynamic_options`` setting it to ``'true'``
    must return a table whose options report it as enabled.
    """
    identifier = 'default.split_provider_blob_desc'
    table_options = {
        'blob-as-descriptor': 'false',
        'row-tracking.enabled': 'true',
        'data-evolution.enabled': 'true',
    }
    arrow_schema = pa.schema([
        ('id', pa.int32()),
        ('picture', pa.large_binary()),
    ])
    schema = Schema.from_pyarrow_schema(
        arrow_schema, options=table_options)
    CatalogFactory.create(self.catalog_options).create_table(
        identifier, schema, False)

    provider = CatalogSplitProvider(
        table_identifier=identifier,
        catalog_options=self.catalog_options,
        dynamic_options={'blob-as-descriptor': 'true'},
    )
    self.assertTrue(provider.table().options.blob_as_descriptor())

def test_dynamic_options_rejects_tt_conflict(self):
    """Conflicting time-travel settings raise ``ValueError`` on creation.

    Covers ``snapshot_id`` or ``tag_name`` combined with a time-travel
    key in ``dynamic_options``, and two time-travel keys supplied
    together in ``dynamic_options``.
    """
    base_kwargs = {
        'table_identifier': self.identifier,
        'catalog_options': self.catalog_options,
    }
    conflicting_cases = (
        {
            'snapshot_id': 1,
            'dynamic_options': {'scan.tag-name': 'v1'},
        },
        {
            'tag_name': 'v1',
            'dynamic_options': {'scan.snapshot-id': '1'},
        },
        {
            'dynamic_options': {
                'scan.snapshot-id': '1', 'scan.tag-name': 'v1'},
        },
    )
    for extra_kwargs in conflicting_cases:
        with self.assertRaises(ValueError):
            CatalogSplitProvider(**base_kwargs, **extra_kwargs)

def test_pre_resolved_provider_returns_inputs(self):
"""PreResolvedSplitProvider just hands back what it was given."""
catalog = CatalogFactory.create(self.catalog_options)
Expand Down
Loading