diff --git a/paimon-python/pypaimon/ray/ray_paimon.py b/paimon-python/pypaimon/ray/ray_paimon.py index 0e796dd7c223..d0e7706c8347 100644 --- a/paimon-python/pypaimon/ray/ray_paimon.py +++ b/paimon-python/pypaimon/ray/ray_paimon.py @@ -56,6 +56,7 @@ def read_paimon( limit: Optional[int] = None, snapshot_id: Optional[int] = None, tag_name: Optional[str] = None, + dynamic_options: Optional[Dict[str, str]] = None, ray_remote_args: Optional[Dict[str, Any]] = None, concurrency: Optional[int] = None, override_num_blocks: Optional[int] = None, @@ -74,6 +75,7 @@ def read_paimon( exclusive with ``tag_name``. tag_name: Optional tag name to time-travel to. Mutually exclusive with ``snapshot_id``. + dynamic_options: Optional dynamic options to override at read time. ray_remote_args: Optional kwargs passed to ``ray.remote`` in read tasks. concurrency: Optional max number of Ray read tasks to run concurrently. override_num_blocks: Optional override for the number of output blocks. @@ -106,6 +108,7 @@ def read_paimon( limit=limit, snapshot_id=snapshot_id, tag_name=tag_name, + dynamic_options=dynamic_options, ) if not split_provider.splits(): diff --git a/paimon-python/pypaimon/read/datasource/split_provider.py b/paimon-python/pypaimon/read/datasource/split_provider.py index e981eebb96e4..430eeb7cac1e 100644 --- a/paimon-python/pypaimon/read/datasource/split_provider.py +++ b/paimon-python/pypaimon/read/datasource/split_provider.py @@ -85,15 +85,34 @@ def __init__( limit: Optional[int] = None, snapshot_id: Optional[int] = None, tag_name: Optional[str] = None, + dynamic_options: Optional[Dict[str, str]] = None, ): if not table_identifier: raise ValueError("table_identifier is required") if catalog_options is None: raise ValueError("catalog_options is required") + from pypaimon.snapshot.time_travel_util import SCAN_KEYS + scan_keys = set(SCAN_KEYS) + if snapshot_id is not None and tag_name is not None: raise ValueError( "snapshot_id and tag_name cannot be set at the same time" ) + + if dynamic_options: + dynamic_tt_keys = scan_keys & dynamic_options.keys() + if (snapshot_id is not None or tag_name is not None) and dynamic_tt_keys: + raise ValueError( + "snapshot_id/tag_name and dynamic_options " + "time-travel keys cannot be set at the same time, " + "got: {}".format(", ".join(sorted(dynamic_tt_keys))) + ) + if len(dynamic_tt_keys) > 1: + raise ValueError( + "dynamic_options contains multiple time-travel " + "keys which are mutually exclusive: {}".format( + ", ".join(sorted(dynamic_tt_keys))) + ) self._table_identifier = table_identifier self._catalog_options = catalog_options self._predicate = predicate @@ -101,6 +120,7 @@ def __init__( self._limit = limit self._snapshot_id = snapshot_id self._tag_name = tag_name + self._dynamic_options = dynamic_options self._table_cached = None self._splits_cached = None self._read_type_cached = None @@ -110,13 +130,15 @@ def _ensure_table(self): from pypaimon.catalog.catalog_factory import CatalogFactory catalog = CatalogFactory.create(self._catalog_options) table = catalog.get_table(self._table_identifier) - travel_options = {} + dynamic_options = {} if self._snapshot_id is not None: - travel_options["scan.snapshot-id"] = str(self._snapshot_id) + dynamic_options["scan.snapshot-id"] = str(self._snapshot_id) if self._tag_name is not None: - travel_options["scan.tag-name"] = self._tag_name - if travel_options: - table = table.copy(travel_options) + dynamic_options["scan.tag-name"] = self._tag_name + if self._dynamic_options: + dynamic_options.update(self._dynamic_options) + if dynamic_options: + table = table.copy(dynamic_options) self._table_cached = table return self._table_cached diff --git a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py index b70ef40a58e9..4782b1dc9342 100644 --- a/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py +++ b/paimon-python/pypaimon/tests/ray_data_evolution_merge_into_test.py @@ -523,7 +523,8 @@ def test_blob_table_feature_update(self): ('feature', pa.int32()), ]) name = f'default.tbl_{uuid.uuid4().hex[:8]}' - schema = Schema.from_pyarrow_schema(blob_schema, options=self.de_options) + schema = Schema.from_pyarrow_schema( + blob_schema, options=self.de_options) self.catalog.create_table(name, schema, False) self._write( name, @@ -586,6 +587,84 @@ def compute_feature(batch): 'num_matched': 2, 'num_inserted': 0, 'num_unchanged': 0, }) + def test_blob_descriptor_resolve_and_merge(self): + from pypaimon.table.row.blob import BlobDescriptor, Blob + from pypaimon.common.uri_reader import UriReaderFactory + + blob_schema = pa.schema([ + ('id', pa.int32()), + ('payload', pa.large_binary()), + ('feature', pa.int32()), + ]) + name = f'default.tbl_{uuid.uuid4().hex[:8]}' + schema = Schema.from_pyarrow_schema( + blob_schema, options=self.de_options) + self.catalog.create_table(name, schema, False) + self._write( + name, + pa.Table.from_pydict( + { + 'id': pa.array([1, 2, 3], type=pa.int32()), + 'payload': [b'aa', b'bbb', b'cccc'], + 'feature': pa.array([10, 20, 30], type=pa.int32()), + }, + schema=blob_schema, + ), + ) + + num_partitions = _TEST_NUM_PARTITIONS + input_ids = ray.data.from_arrow(pa.Table.from_pydict({ + 'id': pa.array([1, 3], type=pa.int32()), + })) + + target_rows = read_paimon( + name, + self.catalog_options, + projection=['id', 'payload'], + dynamic_options={'blob-as-descriptor': 'true'}, + ) + + matched = input_ids.join( + target_rows, join_type='inner', + num_partitions=num_partitions, on=['id'], + ) + + uri_factory = UriReaderFactory(self.catalog_options) + + def resolve_and_compute(batch): + features = [] + for desc_bytes in batch['payload'].to_pylist(): + desc = BlobDescriptor.deserialize(desc_bytes) + reader = uri_factory.create(desc.uri) + data = Blob.from_descriptor(reader, desc).to_data() + features.append(len(data) * 100) + return pa.Table.from_pydict({ + 'id': batch['id'], + 'new_feature': pa.array(features, type=pa.int32()), + }) + + updates = matched.map_batches( + resolve_and_compute, batch_format='pyarrow') + metrics = merge_into( + target=name, + source=updates, + catalog_options=self.catalog_options, + on=['id'], + when_matched=[ + WhenMatched(update={'feature': source_col('new_feature')}) + ], + num_partitions=num_partitions, + ) + + table = self.catalog.get_table(name) + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + out = rb.new_read().to_arrow(splits).sort_by('id').to_pydict() + self.assertEqual(out['id'], [1, 2, 3]) + self.assertEqual(out['feature'], [200, 20, 400]) + self.assertEqual(out['payload'], [b'aa', b'bbb', b'cccc']) + self.assertEqual(metrics['num_matched'], 2) + def test_combined_writes_single_snapshot(self): target = self._create_table() self._write( diff --git a/paimon-python/pypaimon/tests/split_provider_test.py b/paimon-python/pypaimon/tests/split_provider_test.py index 19525f8e5eef..61e14005f5fc 100644 --- a/paimon-python/pypaimon/tests/split_provider_test.py +++ b/paimon-python/pypaimon/tests/split_provider_test.py @@ -227,6 +227,44 @@ def test_catalog_provider_time_travel_by_tag_name(self): rows = tr.to_arrow(provider.splits()).to_pylist() self.assertEqual([r['id'] for r in rows], [11]) + def test_dynamic_options_blob_as_descriptor(self): + pa_schema = pa.schema([ + ('id', pa.int32()), + ('picture', pa.large_binary()), + ]) + identifier = 'default.split_provider_blob_desc' + schema = Schema.from_pyarrow_schema(pa_schema, options={ + 'blob-as-descriptor': 'false', + 'row-tracking.enabled': 'true', + 'data-evolution.enabled': 'true', + }) + catalog = CatalogFactory.create(self.catalog_options) + catalog.create_table(identifier, schema, False) + + provider = CatalogSplitProvider( + table_identifier=identifier, + catalog_options=self.catalog_options, + dynamic_options={'blob-as-descriptor': 'true'}, + ) + table = provider.table() + self.assertTrue(table.options.blob_as_descriptor()) + + def test_dynamic_options_rejects_tt_conflict(self): + args = dict(table_identifier=self.identifier, + catalog_options=self.catalog_options) + with self.assertRaises(ValueError): + CatalogSplitProvider( + **args, snapshot_id=1, + dynamic_options={'scan.tag-name': 'v1'}) + with self.assertRaises(ValueError): + CatalogSplitProvider( + **args, tag_name='v1', + dynamic_options={'scan.snapshot-id': '1'}) + with self.assertRaises(ValueError): + CatalogSplitProvider( + **args, dynamic_options={ + 'scan.snapshot-id': '1', 'scan.tag-name': 'v1'}) + def test_pre_resolved_provider_returns_inputs(self): """PreResolvedSplitProvider just hands back what it was given.""" catalog = CatalogFactory.create(self.catalog_options)