diff --git a/paimon-python/pypaimon/casting/__init__.py b/paimon-python/pypaimon/casting/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/paimon-python/pypaimon/casting/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/casting/data_type_casts.py b/paimon-python/pypaimon/casting/data_type_casts.py new file mode 100644 index 000000000000..fe1f6860ffa2 --- /dev/null +++ b/paimon-python/pypaimon/casting/data_type_casts.py @@ -0,0 +1,189 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Type-cast support rules used to validate ``update column type`` schema +changes. + +The rules mirror the engine-wide cast specification so a type change accepted +here is one the read path can also materialize: an *implicit* cast is a safe +widening (e.g. INT -> BIGINT, any numeric -> DECIMAL/DOUBLE), while an +*explicit* cast covers the broader, possibly lossy conversions a user opts into +(e.g. DOUBLE -> INT truncation, anything -> STRING). Read-time execution then +applies the conversion leniently. +""" + +from pypaimon.schema.data_types import (ArrayType, AtomicType, DataTypeParser, + MapType, MultisetType, RowType, + VectorType) + +# ---- Type roots -------------------------------------------------------------- + +CHAR = "CHAR" +VARCHAR = "VARCHAR" +BOOLEAN = "BOOLEAN" +BINARY = "BINARY" +VARBINARY = "VARBINARY" +DECIMAL = "DECIMAL" +TINYINT = "TINYINT" +SMALLINT = "SMALLINT" +INTEGER = "INTEGER" +BIGINT = "BIGINT" +FLOAT = "FLOAT" +DOUBLE = "DOUBLE" +DATE = "DATE" +TIME = "TIME" +TIMESTAMP = "TIMESTAMP" +TIMESTAMP_LTZ = "TIMESTAMP_LTZ" +ARRAY = "ARRAY" +MAP = "MAP" +MULTISET = "MULTISET" +ROW = "ROW" +VECTOR = "VECTOR" +VARIANT = "VARIANT" +BLOB = "BLOB" + +# ---- Families ---------------------------------------------------------------- + +CHARACTER_STRING = {CHAR, VARCHAR} +BINARY_STRING = {BINARY, VARBINARY} +INTEGER_NUMERIC = {TINYINT, SMALLINT, INTEGER, BIGINT} +NUMERIC = INTEGER_NUMERIC | {FLOAT, DOUBLE, DECIMAL} +TIMESTAMP_FAMILY = {TIMESTAMP, TIMESTAMP_LTZ} +TIME_FAMILY = {TIME} +DATETIME = {DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ} +PREDEFINED = { + CHAR, VARCHAR, BOOLEAN, BINARY, VARBINARY, DECIMAL, + TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, + DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, +} +CONSTRUCTED = {ARRAY, MAP, MULTISET, ROW, VECTOR} +# Constructed types the read path can render as a character string +# ('{v1, v2}' / '[e1, e2]' / '{k 
-> v}'). VECTOR and MULTISET have no string +# rendering, so a type change from them to CHAR/VARCHAR is rejected here +# rather than failing when an old file is read. +STRING_RENDERABLE_CONSTRUCTED = {ARRAY, MAP, ROW} + + +def _root(data_type) -> str: + if isinstance(data_type, RowType): + return ROW + if isinstance(data_type, ArrayType): + return ARRAY + if isinstance(data_type, MapType): + return MAP + if isinstance(data_type, MultisetType): + return MULTISET + if isinstance(data_type, VectorType): + return VECTOR + if isinstance(data_type, AtomicType): + t = data_type.type.upper() + if t.startswith("DECIMAL") or t.startswith("NUMERIC") or t.startswith("DEC"): + return DECIMAL + if t in ("INT", "INTEGER"): + return INTEGER + if t in (TINYINT, SMALLINT, BIGINT, FLOAT, DOUBLE, BOOLEAN, DATE): + return t + if t == "STRING" or t.startswith("VARCHAR"): + return VARCHAR + if t.startswith("CHAR"): + return CHAR + if t == "BYTES" or t.startswith("VARBINARY"): + return VARBINARY + if t.startswith("BINARY"): + return BINARY + if t == "BLOB": + return BLOB + if t.startswith("TIMESTAMP_LTZ"): + return TIMESTAMP_LTZ + if t.startswith("TIMESTAMP"): + return TIMESTAMP + if t.startswith("TIME"): + return TIME + if t == "VARIANT": + return VARIANT + return None + + +def _build_rules(): + implicit = {} + explicit = {} + # Identity cast for every root. 
+ for root in (PREDEFINED | CONSTRUCTED | {VARIANT, BLOB}): + implicit[root] = {root} + explicit[root] = set() + + def rule(target, implicit_from=None, explicit_from=None): + implicit[target] |= set(implicit_from or set()) + explicit[target] |= set(explicit_from or set()) + + rule(CHAR, {CHAR}, PREDEFINED | STRING_RENDERABLE_CONSTRUCTED) + rule(VARCHAR, CHARACTER_STRING, PREDEFINED | STRING_RENDERABLE_CONSTRUCTED) + rule(BOOLEAN, {BOOLEAN}, CHARACTER_STRING | INTEGER_NUMERIC) + rule(BINARY, {BINARY}, CHARACTER_STRING | {VARBINARY}) + rule(VARBINARY, BINARY_STRING, CHARACTER_STRING | {BINARY}) + rule(DECIMAL, NUMERIC, CHARACTER_STRING | {BOOLEAN, TIMESTAMP, TIMESTAMP_LTZ}) + int_explicit = NUMERIC | CHARACTER_STRING | {BOOLEAN, TIMESTAMP, TIMESTAMP_LTZ} + rule(TINYINT, {TINYINT}, int_explicit) + rule(SMALLINT, {TINYINT, SMALLINT}, int_explicit) + rule(INTEGER, {TINYINT, SMALLINT, INTEGER}, int_explicit) + rule(BIGINT, {TINYINT, SMALLINT, INTEGER, BIGINT}, int_explicit) + rule(FLOAT, {TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DECIMAL}, int_explicit) + rule(DOUBLE, NUMERIC, CHARACTER_STRING | {BOOLEAN, TIMESTAMP, TIMESTAMP_LTZ}) + rule(DATE, {DATE, TIMESTAMP}, TIMESTAMP_FAMILY | CHARACTER_STRING) + rule(TIME, {TIME, TIMESTAMP}, TIME_FAMILY | TIMESTAMP_FAMILY | CHARACTER_STRING) + rule(TIMESTAMP, {TIMESTAMP, TIMESTAMP_LTZ}, DATETIME | CHARACTER_STRING | NUMERIC) + rule(TIMESTAMP_LTZ, {TIMESTAMP_LTZ, TIMESTAMP}, DATETIME | CHARACTER_STRING | NUMERIC) + return implicit, explicit + + +_IMPLICIT_RULES, _EXPLICIT_RULES = _build_rules() + + +def supports_cast(source_type, target_type, allow_explicit: bool = True) -> bool: + """Whether ``source_type`` can be cast to ``target_type`` for a column type + change. 
``allow_explicit`` permits the broader (possibly lossy) conversions + in addition to the safe widening ones.""" + source_root = _root(source_type) + target_root = _root(target_type) + if source_root is None or target_root is None: + return False + # A NOT NULL target cannot accept a nullable source unless explicitly allowed. + if source_type.nullable and not target_type.nullable and not allow_explicit: + return False + if source_root == target_root: + if source_root in CONSTRUCTED: + # A constructed type is only castable to an (ignoring outer + # nullability) identical constructed type. Reshaping is done + # through sub-field / 'element' / 'value' paths instead: a whole + # ROW replacement would carry caller-supplied nested field ids + # that corrupt the id model, and there is no runtime conversion + # between differently-shaped constructed values. + return _equals_ignore_nullable(source_type, target_type) + return True + if source_root in _IMPLICIT_RULES.get(target_root, set()): + return True + if allow_explicit and source_root in _EXPLICIT_RULES.get(target_root, set()): + return True + return False + + +def _equals_ignore_nullable(source_type, target_type) -> bool: + source_copy = DataTypeParser.parse_data_type(source_type.to_dict()) + target_copy = DataTypeParser.parse_data_type(target_type.to_dict()) + source_copy.nullable = True + target_copy.nullable = True + return source_copy == target_copy diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index e3bd51143936..0183396fe704 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -18,16 +18,94 @@ from typing import List, Optional import pyarrow as pa +import pyarrow.compute as pc from pyarrow import RecordBatch from pypaimon.common.file_io import FileIO from pypaimon.read.partition_info import PartitionInfo from 
pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader -from pypaimon.schema.data_types import DataField, PyarrowFieldParser +from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, + MapType, PyarrowFieldParser, RowType) from pypaimon.table.special_fields import SpecialFields +def _is_character_string_type(data_type) -> bool: + if not isinstance(data_type, AtomicType): + return False + t = data_type.type.upper() + return t == 'STRING' or t.startswith('VARCHAR') or t.startswith('CHAR') + + +def _unslice(array): + """Re-materialize a sliced array so offsets/buffers start at zero. + + The list/map rebuilds below read ``offsets``/raw buffers directly; on a + sliced array those still point into the parent storage, which either + errors (list rebuild with a null mask) or silently misaligns rows (map + rebuild from raw buffers).""" + if array.offset == 0: + return array + return pa.concat_arrays([array]) + + +def _to_string_values(array, data_type) -> list: + """Render *array* as a list of per-row strings (None for NULL rows).""" + if isinstance(data_type, (RowType, ArrayType, MapType)): + return _constructed_to_string_array(array, data_type).to_pylist() + return array.cast(pa.string(), safe=False).to_pylist() + + +def _constructed_to_string_array(array, file_type): + """Render a struct/list/map array in the engine's string form: + ROW -> ``{v1, v2}``, ARRAY -> ``[e1, e2]``, MAP -> ``{k1 -> v1, k2 -> v2}``. 
+ Sub-values are rendered recursively; a NULL sub-value renders as the + literal ``null`` while a NULL container row stays NULL.""" + array = _unslice(array) + valid = pc.is_valid(array).to_pylist() + out = [] + if isinstance(file_type, RowType): + children = [ + _to_string_values(array.field(i), sub.type) + for i, sub in enumerate(file_type.fields) + ] + for i in range(len(array)): + if not valid[i]: + out.append(None) + continue + vals = [c[i] if c[i] is not None else 'null' for c in children] + out.append('{' + ', '.join(vals) + '}') + elif isinstance(file_type, ArrayType): + values = _to_string_values(array.values, file_type.element) + offsets = array.offsets.to_pylist() + for i in range(len(array)): + if not valid[i]: + out.append(None) + continue + elems = [v if v is not None else 'null' + for v in values[offsets[i]:offsets[i + 1]]] + out.append('[' + ', '.join(elems) + ']') + elif isinstance(file_type, MapType): + keys = _to_string_values(array.keys, file_type.key) + items = _to_string_values(array.items, file_type.value) + offsets = array.offsets.to_pylist() + for i in range(len(array)): + if not valid[i]: + out.append(None) + continue + entries = [ + '{} -> {}'.format( + keys[j] if keys[j] is not None else 'null', + items[j] if items[j] is not None else 'null') + for j in range(offsets[i], offsets[i + 1]) + ] + out.append('{' + ', '.join(entries) + '}') + else: + raise ValueError( + 'Unsupported constructed type for string rendering: {}'.format(file_type)) + return pa.array(out, type=pa.string()) + + class DataFileBatchReader(RecordBatchReader): """ Reads record batch from files of different formats @@ -57,55 +135,107 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p self.file_io = file_io # Per-file field-id normalization: map the physically-read columns # (the file's own field order/names) onto the latest read target by - # field id, padding missing ids with NULL. 
``None`` when there is no - # evolution to reconcile (identity) -- the common path stays zero-copy. - self._normalize_positions, self._normalize_names = \ - self._build_normalize_plan(file_data_fields, target_data_fields) + # field id, padding missing ids with NULL and recursing into nested + # ROW / ARRAY / MAP<.,ROW> sub-fields the same way. ``None`` when + # there is no evolution to reconcile -- the common path stays zero-copy. + self._normalize_plan = self._build_normalize_plan(file_data_fields, target_data_fields) @staticmethod def _build_normalize_plan(file_data_fields, target_data_fields): """Build a per-file field-id alignment plan. - Returns ``(positions, names)`` where ``positions[i]`` is the column - index in the physically-read batch carrying ``target_data_fields[i]`` - (matched by field id), or -1 if the file does not contain that id (pad - NULL). ``names[i]`` is the latest target name. Returns ``(None, None)`` - when the plan is the identity (no evolution), so the caller skips - normalization and stays zero-copy. + Returns a list of ``(pos, file_field, target_field)`` -- one per target + field, in target order -- where ``pos`` is the column index in the + physically-read batch carrying ``target_field`` (matched by field id), + or -1 if the file does not contain that id (pad NULL). Returns ``None`` + when the file already matches the target exactly (no evolution), so the + caller stays zero-copy. """ if file_data_fields is None or target_data_fields is None: - return None, None + return None + # Recursive equality covers nested sub-field changes too: any rename / + # add / drop / type change at any depth makes the file != target. 
+ if file_data_fields == target_data_fields: + return None file_id_to_pos = {f.id: i for i, f in enumerate(file_data_fields)} - positions = [] - names = [] - # Identity only when every target maps to the same physical position - # AND already carries the same name -- a rename keeps the position but - # changes the name, which still requires a relabel pass. - identity = len(file_data_fields) == len(target_data_fields) - for i, target in enumerate(target_data_fields): + plan = [] + for target in target_data_fields: pos = file_id_to_pos.get(target.id, -1) - positions.append(pos) - names.append(target.name) - if pos != i or (pos >= 0 and file_data_fields[pos].name != target.name): - identity = False - if identity: - return None, None - return positions, names + file_field = file_data_fields[pos] if pos >= 0 else None + plan.append((pos, file_field, target)) + return plan def _normalize_batch(self, record_batch: RecordBatch) -> RecordBatch: """Reorder/pad the physically-read batch onto the latest read target by - field id, and relabel columns to the latest names. Missing ids become - all-NULL columns; types are reconciled later by _align_batch_to_read_schema.""" - if self._normalize_positions is None: + field id, relabel columns to the latest names, and align nested ROW + sub-fields by id. 
Missing ids become typed all-NULL columns.""" + if self._normalize_plan is None: return record_batch num_rows = record_batch.num_rows arrays = [] - for pos in self._normalize_positions: + names = [] + for pos, file_field, target_field in self._normalize_plan: + target_pa_type = PyarrowFieldParser.from_paimon_type(target_field.type) if pos < 0: - arrays.append(pa.nulls(num_rows)) + arrays.append(pa.nulls(num_rows, type=target_pa_type)) else: - arrays.append(record_batch.column(pos)) - return pa.RecordBatch.from_arrays(arrays, names=self._normalize_names) + arrays.append(self._align_array_by_id( + record_batch.column(pos), file_field.type, target_field.type)) + names.append(target_field.name) + return pa.RecordBatch.from_arrays(arrays, names=names) + + def _align_array_by_id(self, array, file_type, target_type): + """Return *array* converted to *target_type*, matching ROW sub-fields by + field id (reorder, pad missing with NULL, follow renames, cast changed + types) recursively, transparently through ARRAY/MAP wrappers.""" + if isinstance(target_type, RowType) and isinstance(file_type, RowType): + n = len(array) + file_id_to_pos = {f.id: i for i, f in enumerate(file_type.fields)} + children = [] + pa_fields = [] + for tsub in target_type.fields: + p = file_id_to_pos.get(tsub.id, -1) + if p < 0: + child = pa.nulls(n, type=PyarrowFieldParser.from_paimon_type(tsub.type)) + else: + child = self._align_array_by_id( + array.field(p), file_type.fields[p].type, tsub.type) + children.append(child) + pa_fields.append(pa.field(tsub.name, child.type, nullable=tsub.type.nullable)) + # Preserve the struct's own null mask; child values under a null + # struct are irrelevant. 
+ return pa.StructArray.from_arrays( + children, fields=pa_fields, mask=pc.is_null(array)) + if isinstance(target_type, ArrayType) and isinstance(file_type, ArrayType): + array = _unslice(array) + aligned_values = self._align_array_by_id( + array.values, file_type.element, target_type.element) + return pa.ListArray.from_arrays( + array.offsets, aligned_values, mask=pc.is_null(array)) + if isinstance(target_type, MapType) and isinstance(file_type, MapType): + array = _unslice(array) + aligned_items = self._align_array_by_id( + array.items, file_type.value, target_type.value) + # MapArray.from_arrays cannot carry a null mask (a null map would + # collapse to an empty one), so rebuild from buffers, reusing the + # original validity/offset buffers and only swapping the value child. + target_pa = PyarrowFieldParser.from_paimon_type(target_type) + entries = pa.StructArray.from_arrays( + [array.keys, aligned_items], + fields=[target_pa.key_field, target_pa.item_field]) + return pa.Array.from_buffers( + target_pa, len(array), array.buffers()[:2], children=[entries]) + # A constructed type changed to a character string: pyarrow cannot + # cast struct/list/map to utf8 directly, so render the engine's + # string form instead. + if (isinstance(file_type, (RowType, ArrayType, MapType)) + and _is_character_string_type(target_type)): + return _constructed_to_string_array(array, file_type) + # Leaf / non-nested: cast to the target type when it differs. 
+ target_pa_type = PyarrowFieldParser.from_paimon_type(target_type) + if array.type != target_pa_type: + return array.cast(target_pa_type, safe=False) + return array def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]: if isinstance(self.format_reader, FormatBlobReader): diff --git a/paimon-python/pypaimon/read/reader/nested_leaf_batch_reader.py b/paimon-python/pypaimon/read/reader/nested_leaf_batch_reader.py new file mode 100644 index 000000000000..8f042a9229e3 --- /dev/null +++ b/paimon-python/pypaimon/read/reader/nested_leaf_batch_reader.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import List, Optional + +import pyarrow as pa +import pyarrow.compute as pc +from pyarrow import RecordBatch + +from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader +from pypaimon.schema.data_types import DataField, PyarrowFieldParser + + +class NestedLeafBatchReader(RecordBatchReader): + """Extract projected nested leaves from batches of full top-level columns. + + The inner reader yields batches carrying the widened top-level columns, + already normalized to the latest schema by field id (renames followed, + missing sub-fields padded NULL, types cast). 
Each requested name path is + walked through the struct children (a NULL parent propagates to the + leaf), producing the user's flat projected schema. + """ + + def __init__(self, inner: RecordBatchReader, name_paths: List[List[str]], + output_fields: List[DataField]): + if len(name_paths) != len(output_fields): + raise ValueError( + "name_paths length {} does not match output_fields length {}".format( + len(name_paths), len(output_fields))) + self._inner = inner + self._paths = name_paths + self._schema = PyarrowFieldParser.from_paimon_schema(output_fields) + + def read_arrow_batch(self) -> Optional[RecordBatch]: + batch = self._inner.read_arrow_batch() + if batch is None: + return None + arrays = [] + for i, path in enumerate(self._paths): + column = batch.column(path[0]) + for name in path[1:]: + column = pc.struct_field(column, name) + target_type = self._schema.field(i).type + if column.type != target_type: + column = column.cast(target_type, safe=False) + arrays.append(column) + return pa.RecordBatch.from_arrays(arrays, schema=self._schema) + + def close(self) -> None: + self._inner.close() diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 543c85893559..1278ea0a2acf 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -627,6 +627,33 @@ def _genarate_deletion_file_readers(self): class RawFileSplitRead(SplitRead): + def __init__( + self, + table, + predicate: Optional[Predicate], + read_type: List[DataField], + split: Split, + row_tracking_enabled: bool, + outer_extract_name_paths: Optional[List[List[str]]] = None, + outer_flat_read_type: Optional[List[DataField]] = None, + limit: Optional[int] = None): + # Nested-leaf projection is NOT pushed down by name: a leaf path is + # only valid against the latest schema, while each data file stores + # its own (possibly renamed / retyped) sub-fields. 
Instead the read + # widens to the full top-level columns, which the per-file field-id + # normalization aligns to the latest schema, and the requested leaf + # paths are extracted afterwards (``outer_extract_name_paths``). + super().__init__( + table=table, + predicate=predicate, + read_type=read_type, + split=split, + row_tracking_enabled=row_tracking_enabled, + nested_name_paths=None, + limit=limit) + self.outer_extract_name_paths = outer_extract_name_paths + self.outer_flat_read_type = outer_flat_read_type + def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]: read_fields = self._get_final_read_data_fields() # Check if this is a SlicedSplit to get shard_file_idx_map @@ -676,10 +703,27 @@ def create_reader(self) -> RecordReader: # if the table is appendonly table, we don't need extra filter, all predicates has pushed down if self.table.is_primary_key_table and self.predicate_for_reader: reader = FilterRecordReader(concat_reader, self.predicate_for_reader) + if self.outer_extract_name_paths: + # Row-level extraction: the filter evaluates rows in the + # widened top-level coordinate space, so extract after it. 
+ from pypaimon.read.reader.outer_projection_record_reader import \ + OuterProjectionRecordReader + reader = OuterProjectionRecordReader( + reader, [f.name for f in self.read_fields], + self.outer_extract_name_paths, + file_io=self.table.file_io, + blob_field_indices=_blob_field_indices(self.read_fields), + vector_field_indices=_vector_field_indices(self.read_fields)) if self.limit is not None: reader = LimitedRecordReader(reader, self.limit) else: reader = concat_reader + if self.outer_extract_name_paths: + from pypaimon.read.reader.nested_leaf_batch_reader import \ + NestedLeafBatchReader + reader = NestedLeafBatchReader( + reader, self.outer_extract_name_paths, + self.outer_flat_read_type) if self.limit is not None: reader = LimitedRecordBatchReader(reader, self.limit) return reader diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 57e704b2b550..3f16c70573b8 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -603,13 +603,26 @@ def _create_split_read(self, split: Split) -> SplitRead: limit=self.limit, ) else: + inner_read_type = self.read_type + outer_extract_name_paths: Optional[List[List[str]]] = None + if self.nested_name_paths and any( + len(p) > 1 for p in self.nested_name_paths): + # Mirror the merge path: read the full top-level columns so + # the per-file field-id normalization applies (a leaf path is + # only valid against the latest schema, not each file's own + # names/types), then extract the requested sub-paths back to + # the user's flat schema. 
+ inner_read_type = self._widen_to_top_level_for_merge() + outer_extract_name_paths = self.nested_name_paths return RawFileSplitRead( table=self.table, predicate=self.predicate, - read_type=self.read_type, + read_type=inner_read_type, split=split, row_tracking_enabled=self.table.options.row_tracking_enabled(), - nested_name_paths=self.nested_name_paths, + outer_extract_name_paths=outer_extract_name_paths, + outer_flat_read_type=( + self.read_type if outer_extract_name_paths else None), limit=self.limit, ) diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py index 9530996a9d6d..da128d96da37 100755 --- a/paimon-python/pypaimon/schema/data_types.py +++ b/paimon-python/pypaimon/schema/data_types.py @@ -25,6 +25,15 @@ import pyarrow from pyarrow import types +# Field ids at or above this value are reserved for system fields (sequence +# number, value kind, row id, ...). User field ids stay strictly below it, so +# the highest-user-field-id computation can ignore anything from here up. +SYSTEM_FIELD_ID_START = 2147483647 // 2 + + +def is_system_field_id(field_id: int) -> bool: + return field_id >= SYSTEM_FIELD_ID_START + class AtomicInteger: @@ -370,6 +379,73 @@ def __str__(self) -> str: return "ROW<{}>{}".format(', '.join(field_strs), null_suffix) +def reassign_field_id(data_type: DataType, field_id: "AtomicInteger") -> DataType: + """Return a copy of *data_type* with every nested field id reassigned from + *field_id*, depth-first with children allocated before their parent field. + + Mirrors the canonical id-reassignment used when a column (possibly carrying + a nested ROW/ARRAY/MAP) is added, so nested subfields get globally-unique + ids drawn from the schema's running counter rather than struct-local ones. 
+ """ + if isinstance(data_type, RowType): + new_fields = [] + for field in data_type.fields: + # Visit the nested type first, then allocate this field's id, so the + # ordering matches the rest of the engine ecosystem. + new_type = reassign_field_id(field.type, field_id) + new_id = field_id.increment_and_get() + new_fields.append(DataField( + new_id, field.name, new_type, field.description, field.default_value)) + return RowType(data_type.nullable, new_fields) + if isinstance(data_type, ArrayType): + return ArrayType(data_type.nullable, reassign_field_id(data_type.element, field_id)) + if isinstance(data_type, VectorType): + return VectorType( + data_type.nullable, reassign_field_id(data_type.element, field_id), data_type.length) + if isinstance(data_type, MultisetType): + return MultisetType(data_type.nullable, reassign_field_id(data_type.element, field_id)) + if isinstance(data_type, MapType): + new_key = reassign_field_id(data_type.key, field_id) + new_value = reassign_field_id(data_type.value, field_id) + return MapType(data_type.nullable, new_key, new_value) + return data_type + + +def collect_field_ids(data_type: DataType, field_ids: set): + """Collect all (nested) field ids reachable from *data_type* into *field_ids*, + raising on a duplicate id (a broken schema).""" + if isinstance(data_type, RowType): + for field in data_type.fields: + if field.id in field_ids: + raise ValueError( + "Broken schema, field id {} is duplicated.".format(field.id)) + field_ids.add(field.id) + collect_field_ids(field.type, field_ids) + elif isinstance(data_type, (ArrayType, VectorType, MultisetType)): + collect_field_ids(data_type.element, field_ids) + elif isinstance(data_type, MapType): + collect_field_ids(data_type.key, field_ids) + collect_field_ids(data_type.value, field_ids) + + +def current_highest_field_id(fields: List[DataField]) -> int: + """Highest user field id across *fields*, recursing into nested ROW/ARRAY/MAP. + + System field ids are excluded. 
Returns -1 for an empty/system-only schema. + The result is persisted as ``highestFieldId``; later schema changes seed + their id counter from the stored value (not from the live fields, since a + dropped field may have carried a higher id than any survivor). + """ + field_ids = set() + for field in fields: + if field.id in field_ids: + raise ValueError("Broken schema, field id {} is duplicated.".format(field.id)) + field_ids.add(field.id) + collect_field_ids(field.type, field_ids) + user_ids = [fid for fid in field_ids if not is_system_field_id(fid)] + return max(user_ids) if user_ids else -1 + + class Keyword(Enum): CHAR = "CHAR" VARCHAR = "VARCHAR" @@ -717,12 +793,21 @@ def to_paimon_field(field_idx: int, pa_field: pyarrow.Field) -> DataField: @staticmethod def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]: - # Convert PyArrow schema to Paimon fields + # Convert PyArrow schema to Paimon fields, assigning globally-unique ids: + # each top-level field takes the next id, then its (possibly nested) type + # has its subfield ids reassigned from the same running counter. A flat + # schema keeps the plain 0,1,2,... ids; nested subfields get ids that do + # not collide with top-level ones. 
+ field_id = AtomicInteger(-1) fields = [] - for i, pa_field in enumerate(pa_schema): + for pa_field in pa_schema: pa_field: pyarrow.Field - data_field = PyarrowFieldParser.to_paimon_field(i, pa_field) - fields.append(data_field) + top_id = field_id.increment_and_get() + data_type = PyarrowFieldParser.to_paimon_type(pa_field.type, pa_field.nullable) + data_type = reassign_field_id(data_type, field_id) + description = pa_field.metadata.get(b'description', b'').decode('utf-8') \ + if pa_field.metadata and b'description' in pa_field.metadata else None + fields.append(DataField(top_id, pa_field.name, data_type, description)) return fields @staticmethod diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index d01549c71bc5..bab26650aa14 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -26,7 +26,9 @@ from pypaimon.schema.column_directive_utils import ( apply_add_column_directive, apply_directives, remove_dropped_directive_options) -from pypaimon.schema.data_types import AtomicInteger, DataField +from pypaimon.casting.data_type_casts import supports_cast +from pypaimon.schema.data_types import (ArrayType, AtomicInteger, DataField, + MapType, RowType, reassign_field_id) from pypaimon.schema.schema import Schema from pypaimon.schema.schema_change import (AddColumn, DropColumn, RemoveOption, RenameColumn, SchemaChange, @@ -44,6 +46,123 @@ def _find_field_index(fields: List[DataField], field_name: str) -> Optional[int] return None +def _extract_row_data_fields(data_type, out_fields: List[DataField], + field_names: List[str], token_pos: int) -> int: + """Collect the immediate sub-fields reachable from *data_type* into + *out_fields* and return the path depth consumed. 
A ROW contributes its + fields (depth 1); an ARRAY/MAP is transparent and descends into its + element/value, consuming the ``element``/``value`` path token -- the + consumed token is validated so an unknown step cannot silently mutate + the schema; anything else contributes nothing (depth 1).""" + if isinstance(data_type, RowType): + out_fields.extend(data_type.fields) + return 1 + if isinstance(data_type, ArrayType): + _assert_wrapper_token(field_names, token_pos, 'element') + return _extract_row_data_fields( + data_type.element, out_fields, field_names, token_pos + 1) + 1 + if isinstance(data_type, MapType): + _assert_wrapper_token(field_names, token_pos, 'value') + return _extract_row_data_fields( + data_type.value, out_fields, field_names, token_pos + 1) + 1 + return 1 + + +def _assert_wrapper_token(field_names: List[str], token_pos: int, expected: str): + # A path that ends inside the wrappers (token_pos out of range) is the + # update-the-wrapped-type-itself case, handled by the caller's overflow + # branch; only a present-but-wrong token is rejected. + if token_pos < len(field_names) and field_names[token_pos] != expected: + raise ColumnNotExistException('.'.join(field_names)) + + +def _wrap_new_row_type(data_type, nested_fields: List[DataField]): + """Rebuild *data_type* substituting *nested_fields* at its innermost ROW, + preserving any ARRAY/MAP wrappers.""" + if isinstance(data_type, RowType): + return RowType(data_type.nullable, nested_fields) + if isinstance(data_type, ArrayType): + return ArrayType(data_type.nullable, _wrap_new_row_type(data_type.element, nested_fields)) + if isinstance(data_type, MapType): + return MapType( + data_type.nullable, data_type.key, + _wrap_new_row_type(data_type.value, nested_fields)) + return data_type + + +def _get_root_type(data_type, curr_depth: int, max_depth: int): + """Return the type sitting at ``max_depth`` when walking ARRAY/MAP wrappers + from *data_type* (e.g. 
the INT in ARRAY<MAP<STRING, ARRAY<INT>>>).""" + if curr_depth == max_depth - 1: + return data_type + if isinstance(data_type, ArrayType): + return _get_root_type(data_type.element, curr_depth + 1, max_depth) + if isinstance(data_type, MapType): + return _get_root_type(data_type.value, curr_depth + 1, max_depth) + return data_type + + +def _get_array_map_type_with_target_type_root(source, target, curr_depth: int, max_depth: int): + """Rebuild *source* with *target* substituted at ``max_depth``, keeping the + ARRAY/MAP wrappers around it intact.""" + if curr_depth == max_depth - 1: + return target + if isinstance(source, ArrayType): + return ArrayType( + source.nullable, + _get_array_map_type_with_target_type_root( + source.element, target, curr_depth + 1, max_depth)) + if isinstance(source, MapType): + return MapType( + source.nullable, source.key, + _get_array_map_type_with_target_type_root( + source.value, target, curr_depth + 1, max_depth)) + return target + + +def _update_intermediate_column(new_fields, previous_fields, depth, prev_depth, + field_names, update_last_fn): + """Walk *field_names* into nested ROW (transparently through ARRAY/MAP), + then run *update_last_fn(depth, fields, name)* on the field list that + directly contains the final path element, rebuilding parent types upward.""" + if depth == len(field_names) - 1: + update_last_fn(depth, new_fields, field_names[depth]) + return + if depth >= len(field_names): + # Path descended through ARRAY/MAP past the last ROW; operate on the + # field that owns the wrapper at the previous depth. 
+ update_last_fn(prev_depth, previous_fields, field_names[prev_depth]) + return + for i, field in enumerate(new_fields): + if field.name != field_names[depth]: + continue + nested_fields: List[DataField] = [] + new_depth = depth + _extract_row_data_fields( + field.type, nested_fields, field_names, depth + 1) + _update_intermediate_column( + nested_fields, new_fields, new_depth, depth, field_names, update_last_fn) + field = new_fields[i] + new_fields[i] = DataField( + field.id, field.name, + _wrap_new_row_type(field.type, nested_fields), + field.description, field.default_value) + return + raise ColumnNotExistException('.'.join(field_names[:depth + 1])) + + +def _modify_nested_column(new_fields, field_names, update_last_fn): + _update_intermediate_column(new_fields, new_fields, 0, 0, field_names, update_last_fn) + + +def _update_nested_column(new_fields, field_names, update_func): + def update_last(depth, fields, field_name): + idx = _find_field_index(fields, field_name) + if idx is None: + raise ColumnNotExistException('.'.join(field_names)) + fields[idx] = update_func(fields[idx], depth) + _modify_nested_column(new_fields, field_names, update_last) + + def _get_rename_mappings(changes: List[SchemaChange]) -> dict: rename_mappings = {} for change in changes: @@ -55,49 +174,70 @@ def _get_rename_mappings(changes: List[SchemaChange]) -> dict: def _handle_update_column_comment( change: UpdateColumnComment, new_fields: List[DataField] ): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] - new_fields[field_index] = DataField( - field.id, field.name, field.type, change.new_comment, field.default_value - ) + def update_func(field: DataField, depth: int) -> DataField: + return DataField( + field.id, field.name, field.type, change.new_comment, field.default_value + ) + _update_nested_column(new_fields, change.field_names, 
update_func) + + +def _assert_nullability_change(old_nullability: bool, new_nullability: bool, + field_name: str, disable_null_to_not_null: bool): + if disable_null_to_not_null and old_nullability and not new_nullability: + raise ValueError( + "Cannot update column type from nullable to non nullable for {}. " + "You can set table configuration option " + "'alter-column-null-to-not-null.disabled' = 'false' " + "to allow converting null columns to not null".format(field_name) + ) def _handle_update_column_nullability( - change: UpdateColumnNullability, new_fields: List[DataField] + change: UpdateColumnNullability, new_fields: List[DataField], + disable_null_to_not_null: bool ): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] from pypaimon.schema.data_types import DataTypeParser - field_type_dict = field.type.to_dict() - new_type = DataTypeParser.parse_data_type(field_type_dict) - new_type.nullable = change.new_nullability - new_fields[field_index] = DataField( - field.id, field.name, new_type, field.description, field.default_value - ) + field_names = change.field_names + max_depth = len(field_names) + + def update_func(field: DataField, depth: int) -> DataField: + source_root = _get_root_type(field.type, depth, max_depth) + _assert_nullability_change( + source_root.nullable, change.new_nullability, + '.'.join(field_names), disable_null_to_not_null) + new_root = DataTypeParser.parse_data_type(source_root.to_dict()) + new_root.nullable = change.new_nullability + new_type = _get_array_map_type_with_target_type_root( + field.type, new_root, depth, max_depth) + return DataField( + field.id, field.name, new_type, field.description, field.default_value + ) + _update_nested_column(new_fields, field_names, update_func) def _handle_update_column_type( change: UpdateColumnType, new_fields: List[DataField] ): - field_name = 
change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] from pypaimon.schema.data_types import DataTypeParser - new_type_dict = change.new_data_type.to_dict() - new_type = DataTypeParser.parse_data_type(new_type_dict) - if change.keep_nullability: - new_type.nullable = field.type.nullable - new_fields[field_index] = DataField( - field.id, field.name, new_type, field.description, field.default_value - ) + field_names = change.field_names + max_depth = len(field_names) + + def update_func(field: DataField, depth: int) -> DataField: + source_root = _get_root_type(field.type, depth, max_depth) + target_root = DataTypeParser.parse_data_type(change.new_data_type.to_dict()) + if change.keep_nullability: + target_root.nullable = source_root.nullable + if not supports_cast(source_root, target_root): + raise ValueError( + "Column type {}[{}] cannot be converted to {} without losing information." 
+ .format(field.name, source_root, target_root) + ) + new_type = _get_array_map_type_with_target_type_root( + field.type, target_root, depth, max_depth) + return DataField( + field.id, field.name, new_type, field.description, field.default_value + ) + _update_nested_column(new_fields, field_names, update_func) def _drop_column_validation(schema: 'TableSchema', change: DropColumn): @@ -112,17 +252,20 @@ def _drop_column_validation(schema: 'TableSchema', change: DropColumn): def _handle_drop_column(change: DropColumn, new_fields: List[DataField], new_options: dict): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - if len(change.field_names) == 1: - field = new_fields[field_index] - type_root = _get_type_root(field.type) - remove_dropped_directive_options(field_name, type_root, new_options) - new_fields.pop(field_index) - if not new_fields: - raise ValueError("Cannot drop all fields in table") + field_names = change.field_names + + def update_last(depth, fields, field_name): + field_index = _find_field_index(fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + if len(field_names) == 1: + field = fields[field_index] + type_root = _get_type_root(field.type) + remove_dropped_directive_options(field_name, type_root, new_options) + fields.pop(field_index) + if not fields: + raise ValueError("Cannot drop all fields in table") + _modify_nested_column(new_fields, field_names, update_last) def _get_type_root(data_type) -> str: @@ -274,17 +417,19 @@ def _validate_blob_external_storage_fields(fields: List[DataField], options: dic def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]): - field_name = change.field_names[-1] new_name = change.new_name - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - if 
_find_field_index(new_fields, new_name) is not None: - raise ColumnAlreadyExistException(new_name) - field = new_fields[field_index] - new_fields[field_index] = DataField( - field.id, new_name, field.type, field.description, field.default_value - ) + + def update_last(depth, fields, field_name): + field_index = _find_field_index(fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + if _find_field_index(fields, new_name) is not None: + raise ColumnAlreadyExistException(new_name) + field = fields[field_index] + fields[field_index] = DataField( + field.id, new_name, field.type, field.description, field.default_value + ) + _modify_nested_column(new_fields, change.field_names, update_last) def _apply_move(fields: List[DataField], new_field: Optional[DataField], move): @@ -332,11 +477,11 @@ def _handle_add_column( f"Column {'.'.join(change.field_names)} cannot specify NOT NULL in the table." ) field_id = highest_field_id.increment_and_get() + # Reassign ids of any nested sub-fields the new column carries (a ROW/ARRAY/ + # MAP type) so they draw globally-unique ids from the running counter. 
+ data_type = reassign_field_id(change.data_type, highest_field_id) field_name = change.field_names[-1] - if _find_field_index(new_fields, field_name) is not None: - raise ColumnAlreadyExistException(field_name) - data_type = change.data_type comment = change.comment converted = apply_add_column_directive(comment, field_name, data_type, new_options) if converted is not None: @@ -349,21 +494,26 @@ def _handle_add_column( comment = converted.comment new_field = DataField(field_id, field_name, data_type, comment) - if change.move: - _apply_move(new_fields, new_field, change.move) - elif ( - add_column_before_partition - and partition_keys - and len(change.field_names) == 1 - ): - insert_index = len(new_fields) - for i, field in enumerate(new_fields): - if field.name in partition_keys: - insert_index = i - break - new_fields.insert(insert_index, new_field) - else: - new_fields.append(new_field) + + def update_last(depth, fields, fname): + if _find_field_index(fields, fname) is not None: + raise ColumnAlreadyExistException(fname) + if change.move: + _apply_move(fields, new_field, change.move) + elif ( + add_column_before_partition + and partition_keys + and len(change.field_names) == 1 + ): + insert_index = len(fields) + for i, field in enumerate(fields): + if field.name in partition_keys: + insert_index = i + break + fields.insert(insert_index, new_field) + else: + fields.append(new_field) + _modify_nested_column(new_fields, change.field_names, update_last) class SchemaManager: @@ -517,6 +667,10 @@ def _generate_table_schema( # Get add_column_before_partition option add_column_before_partition = CoreOptions(Options(old_table_schema.options)).add_column_before_partition() partition_keys = old_table_schema.partition_keys + # Converting a nullable column to NOT NULL is unsafe for existing + # data and is disabled by default; the table option below opts in. 
+ disable_null_to_not_null = str(old_table_schema.options.get( + 'alter-column-null-to-not-null.disabled', 'true')).lower() != 'false' for change in changes: if isinstance(change, SetOption): @@ -553,7 +707,8 @@ def _generate_table_schema( _assert_not_updating_primary_keys( old_table_schema, change.field_names, "change nullability of" ) - _handle_update_column_nullability(change, new_fields) + _handle_update_column_nullability( + change, new_fields, disable_null_to_not_null) elif isinstance(change, UpdateColumnComment): _handle_update_column_comment(change, new_fields) elif isinstance(change, UpdateColumnPosition): diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py index 52324ae7f607..d8586b28d139 100644 --- a/paimon-python/pypaimon/schema/table_schema.py +++ b/paimon-python/pypaimon/schema/table_schema.py @@ -23,7 +23,7 @@ from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.file_io import FileIO from pypaimon.common.json_util import json_field -from pypaimon.schema.data_types import DataField +from pypaimon.schema.data_types import DataField, current_highest_field_id from pypaimon.schema.schema import Schema @@ -115,7 +115,7 @@ def from_schema(schema_id: int, schema: Schema) -> "TableSchema": partition_keys: List[str] = schema.partition_keys primary_keys: List[str] = schema.primary_keys options: Dict[str, str] = schema.options - highest_field_id: int = max((field.id for field in fields), default=0) + highest_field_id: int = current_highest_field_id(fields) return TableSchema( TableSchema.CURRENT_VERSION, diff --git a/paimon-python/pypaimon/table/special_fields.py b/paimon-python/pypaimon/table/special_fields.py index 64d2429bef7d..94dbfacc9ad0 100644 --- a/paimon-python/pypaimon/table/special_fields.py +++ b/paimon-python/pypaimon/table/special_fields.py @@ -17,7 +17,7 @@ from typing import List -from ..schema.data_types import AtomicType, DataField +from ..schema.data_types 
import AtomicType, DataField, SYSTEM_FIELD_ID_START class SpecialFields: @@ -35,11 +35,18 @@ class SpecialFields: '_ROW_ID' } + SYSTEM_FIELD_ID_START = SYSTEM_FIELD_ID_START + @staticmethod def is_system_field(field_name: str) -> bool: """Check if a field is a system field.""" return field_name in SpecialFields.SYSTEM_FIELD_NAMES + @staticmethod + def is_system_field_id(field_id: int) -> bool: + """Check if a field id is reserved for system fields.""" + return field_id >= SYSTEM_FIELD_ID_START + @staticmethod def find_system_fields(read_fields: List[DataField]) -> dict: """Find system fields in read fields and return a mapping of field name to index.""" diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py index c9147b823fc1..c963f6a1578a 100644 --- a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py +++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py @@ -22,11 +22,10 @@ * Whole-column evolution of a top-level struct/array/map column (add / drop / rename / projection) -- aligned by the column's field id. * Sub-field evolution INSIDE a struct (add/rename/update-type/drop a nested - field via a dotted ``field_names`` path) -- this is NOT implemented: - ``schema_manager`` only operates on the top-level ``field_names[-1]``. - ``SchemaEvolutionNestedGapTest`` locks in the current behaviour with - explicit assertions so the gap is documented and any future fix is - noticed. + field via a dotted ``field_names`` path), including sub-fields of a ROW + nested in an ARRAY/MAP. Sub-fields are aligned by field id, so a rename + follows the data, an added sub-field reads NULL for old rows, a dropped one + is not revived, and a type change is cast at read time. 
""" import os @@ -37,7 +36,13 @@ import pyarrow as pa from pypaimon import CatalogFactory, Schema -from pypaimon.schema.data_types import AtomicType, PyarrowFieldParser +from pypaimon.casting.data_type_casts import supports_cast +from pypaimon.schema.data_types import (ArrayType, AtomicInteger, AtomicType, + DataField, MapType, MultisetType, + PyarrowFieldParser, RowType, + VectorType, collect_field_ids, + current_highest_field_id, + reassign_field_id) from pypaimon.schema.schema_change import SchemaChange @@ -240,18 +245,12 @@ def test_rename_struct_column(self): {'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'}]) -class SchemaEvolutionNestedGapTest(_NestedBase): - """Sub-field-level evolution inside a struct is NOT implemented. +class SchemaEvolutionNestedSubfieldTest(_NestedBase): + """Sub-field evolution inside a struct, aligned by field id.""" - schema_manager handles only the top-level ``field_names[-1]``; a dotted - path like ``['mv', 'latest_value']`` never recurses into the RowType. - These tests assert the current behaviour (silent top-level mutation, or - ColumnNotExistException) so the gap is documented. 
- """ - - def _create_struct_table(self, name): + def _create_struct_table(self, name, primary_keys=None, bucket='-1'): s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())]) - table = self._create(name, s0) + table = self._create(name, s0, primary_keys=primary_keys, bucket=bucket) self._write(table, pa.Table.from_pylist([ {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'}, ], schema=s0)) @@ -268,49 +267,497 @@ def _top_level_names(self, table_name): 'default.{}'.format(table_name)).table_schema return [f.name for f in schema.fields] - # -- C7: add nested sub-field -> silently adds a TOP-LEVEL column ---- + def test_add_subfield_goes_inside_struct_and_pads_null(self): + table = self._create_struct_table('nsub_add') + self.catalog.alter_table( + 'default.nsub_add', + [SchemaChange.add_column(['mv', 'score'], AtomicType('INT'))], False) + # The sub-field lands inside mv, not as a stray top-level column. + self.assertEqual(self._mv_subfield_names('nsub_add'), + ['latest_version', 'latest_value', 'score']) + self.assertNotIn('score', self._top_level_names('nsub_add')) + + table = self.catalog.get_table('default.nsub_add') + s1 = pa.schema([ + ('id', pa.int64()), + ('mv', pa.struct([('latest_version', pa.int64()), + ('latest_value', pa.string()), + ('score', pa.int32())])), + ('val', pa.string())]) + self._write(table, pa.Table.from_pylist([ + {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b', 'score': 7}, + 'val': 'y'}], schema=s1)) + rows = self._read_sorted(table) + # Old row reads NULL for the added sub-field; new row carries it. 
+ self.assertEqual(rows[0]['mv'], + {'latest_version': 100, 'latest_value': 'a', 'score': None}) + self.assertEqual(rows[1]['mv'], + {'latest_version': 200, 'latest_value': 'b', 'score': 7}) + + def test_rename_subfield_follows_field_id(self): + table = self._create_struct_table('nsub_rename') + self.catalog.alter_table( + 'default.nsub_rename', + [SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], False) + self.assertEqual(self._mv_subfield_names('nsub_rename'), + ['latest_version', 'lv']) + table = self.catalog.get_table('default.nsub_rename') + rows = self._read_sorted(table) + # Old data follows the renamed sub-field by id, not by name. + self.assertEqual(rows[0]['mv'], {'latest_version': 100, 'lv': 'a'}) - def test_nested_add_subfield_mutates_top_level(self): - # GAP: add_column(['mv','new_inner']) does NOT add new_inner inside - # mv; it silently appends a top-level column 'new_inner' instead. - self._create_struct_table('gap_add') + def test_update_subfield_type_casts(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) self.catalog.alter_table( - 'default.gap_add', - [SchemaChange.add_column(['mv', 'new_inner'], AtomicType('INT'))], - False) - # mv's sub-fields are unchanged; a stray top-level column appeared. 
- self.assertEqual(self._mv_subfield_names('gap_add'), - ['latest_version', 'latest_value']) - self.assertIn('new_inner', self._top_level_names('gap_add')) + 'default.nsub_type', + [SchemaChange.update_column_type(['mv', 'v'], AtomicType('BIGINT'))], False) + table = self.catalog.get_table('default.nsub_type') + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('mv').type.field('v').type, pa.int64()) + self.assertEqual(arrow.to_pylist()[0]['mv'], {'v': 10, 's': 'a'}) - # -- C8/C9/C10: rename / update-type / drop nested sub-field --------- + def test_drop_subfield_not_revived(self): + table = self._create_struct_table('nsub_drop') + self.catalog.alter_table( + 'default.nsub_drop', + [SchemaChange.drop_column(['mv', 'latest_value'])], False) + self.assertEqual(self._mv_subfield_names('nsub_drop'), ['latest_version']) + table = self.catalog.get_table('default.nsub_drop') + rows = self._read_sorted(table) + # The dropped sub-field's old data is gone, not revived under its id. + self.assertEqual(rows[0]['mv'], {'latest_version': 100}) - def test_nested_rename_subfield_raises(self): - # GAP: field_names[-1]='latest_value' is looked up at the TOP level, - # where it does not exist -> ColumnNotExistException (wrapped). - self._create_struct_table('gap_rename') + def test_drop_all_subfields_rejected(self): + self._create_struct_table('nsub_dropall') + self.catalog.alter_table( + 'default.nsub_dropall', + [SchemaChange.drop_column(['mv', 'latest_value'])], False) + with self.assertRaises(RuntimeError): + self.catalog.alter_table( + 'default.nsub_dropall', + [SchemaChange.drop_column(['mv', 'latest_version'])], False) + + def test_null_to_not_null_disabled_by_default(self): + # Converting nullable -> NOT NULL is unsafe for existing data and is + # rejected unless the table opts in via + # 'alter-column-null-to-not-null.disabled' = 'false'. 
+ self._create_struct_table('nsub_nullability') with self.assertRaises(RuntimeError) as cm: self.catalog.alter_table( - 'default.gap_rename', - [SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], False) - self.assertIn('latest_value', str(cm.exception)) + 'default.nsub_nullability', + [SchemaChange.update_column_nullability( + ['mv', 'latest_value'], False)], False) + self.assertIn('nullable to non nullable', str(cm.exception)) + # Opt-in makes the same change succeed. + self.catalog.alter_table( + 'default.nsub_nullability', + [SchemaChange.set_option( + 'alter-column-null-to-not-null.disabled', 'false')], False) + self.catalog.alter_table( + 'default.nsub_nullability', + [SchemaChange.update_column_nullability( + ['mv', 'latest_value'], False)], False) + schema = self.catalog.get_table('default.nsub_nullability').table_schema + mv = next(f for f in schema.fields if f.name == 'mv') + lv = next(sf for sf in mv.type.fields if sf.name == 'latest_value') + self.assertFalse(lv.type.nullable) - def test_nested_update_subfield_type_raises(self): - self._create_struct_table('gap_update') + def test_unsupported_subfield_cast_rejected(self): + self._create_struct_table('nsub_badcast') with self.assertRaises(RuntimeError) as cm: self.catalog.alter_table( - 'default.gap_update', + 'default.nsub_badcast', [SchemaChange.update_column_type( - ['mv', 'latest_version'], AtomicType('BIGINT'))], False) - self.assertIn('latest_version', str(cm.exception)) + ['mv', 'latest_version'], AtomicType('DATE'))], False) + self.assertIn('cannot be converted', str(cm.exception)) + + def test_nested_projection_after_rename_subfield(self): + # Projecting a renamed leaf must follow the field id into old files, + # not look the new name up in the file's physical schema. 
+ s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_proj_rename', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_proj_rename', + [SchemaChange.rename_column(['mv', 's'], 'ss')], False) + table = self.catalog.get_table('default.nsub_proj_rename') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('ss', pa.string())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': {'v': 20, 'ss': 'b'}}], schema=s1)) + + rows = self._read_sorted(table, projection=['id', 'mv.ss']) + self.assertEqual(rows, [ + {'id': 1, 'mv_ss': 'a'}, + {'id': 2, 'mv_ss': 'b'}, + ]) + + def test_nested_projection_after_update_subfield_type(self): + # Projecting a type-changed leaf must cast old batches to the latest + # type instead of emitting mixed-type batches. + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_proj_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_proj_type', + [SchemaChange.update_column_type(['mv', 'v'], AtomicType('BIGINT'))], False) + table = self.catalog.get_table('default.nsub_proj_type') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int64()), ('s', pa.string())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': {'v': 20, 's': 'b'}}], schema=s1)) + + rb = table.new_read_builder().with_projection(['id', 'mv.v']) + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('mv_v').type, pa.int64()) + rows = sorted(arrow.to_pylist(), key=lambda r: r['id']) + self.assertEqual(rows, [ + {'id': 1, 'mv_v': 10}, + {'id': 2, 'mv_v': 20}, + ]) + + def test_pk_nested_subfield_evolution_merge(self): 
+ s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA)]) + table = self._create('nsub_pk', s0, primary_keys=['id'], bucket='1') + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'latest_version': 1, 'latest_value': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_pk', + [SchemaChange.add_column(['mv', 'score'], AtomicType('INT'))], False) + table = self.catalog.get_table('default.nsub_pk') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('latest_version', pa.int64()), + ('latest_value', pa.string()), + ('score', pa.int32())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'latest_version': 2, 'latest_value': 'b', 'score': 9}}], + schema=s1)) + rows = self._read_sorted(table) + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]['mv'], + {'latest_version': 2, 'latest_value': 'b', 'score': 9}) + + +class SchemaEvolutionNestedContainerTest(_NestedBase): + """Sub-field evolution of a ROW nested inside an ARRAY / MAP.""" + + def test_array_of_row_add_and_rename_subfield(self): + elem = pa.struct([('a', pa.int64()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(elem))]) + table = self._create('narr', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'arr': [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]}], schema=s0)) + # Descend through the array element into the ROW. 
+ self.catalog.alter_table('default.narr', [ + SchemaChange.add_column(['arr', 'element', 'c'], AtomicType('INT')), + SchemaChange.rename_column(['arr', 'element', 'b'], 'bb'), + ], False) + table = self.catalog.get_table('default.narr') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['arr'], + [{'a': 1, 'bb': 'x', 'c': None}, + {'a': 2, 'bb': 'y', 'c': None}]) + + def test_map_of_row_add_subfield(self): + val = pa.struct([('a', pa.int64()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('m', pa.map_(pa.string(), val))]) + table = self._create('nmap', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'm': [('k', {'a': 1, 'b': 'x'})]}], schema=s0)) + # Descend through the map value into the ROW. + self.catalog.alter_table( + 'default.nmap', + [SchemaChange.add_column(['m', 'value', 'c'], AtomicType('INT'))], False) + table = self.catalog.get_table('default.nmap') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['m'], [('k', {'a': 1, 'b': 'x', 'c': None})]) + + def test_array_wrapper_token_validated(self): + # The token consumed when descending through an ARRAY must be + # 'element'; an unknown step must not silently mutate the schema. + elem = pa.struct([('a', pa.int64())]) + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(elem))]) + self._create('ntok_arr', s0) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.ntok_arr', + [SchemaChange.add_column(['arr', 'wrong', 'c'], AtomicType('INT'))], + False) + self.assertIn('arr.wrong.c', str(cm.exception)) + # The canonical token still works. + self.catalog.alter_table( + 'default.ntok_arr', + [SchemaChange.add_column(['arr', 'element', 'c'], AtomicType('INT'))], + False) - def test_nested_drop_subfield_raises(self): - self._create_struct_table('gap_drop') + def test_array_element_type_update(self): + # The canonical path for promoting an array's element type descends + # through the 'element' token; old files are cast at read time. 
+ s0 = pa.schema([('id', pa.int64()), ('a2', pa.list_(pa.int32()))]) + table = self._create('nelem_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'a2': [1, 2]}], schema=s0)) + self.catalog.alter_table( + 'default.nelem_type', + [SchemaChange.update_column_type(['a2', 'element'], AtomicType('BIGINT'))], + False) + table = self.catalog.get_table('default.nelem_type') + s1 = pa.schema([('id', pa.int64()), ('a2', pa.list_(pa.int64()))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'a2': [3]}], schema=s1)) + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('a2').type, pa.list_(pa.int64())) + rows = sorted(arrow.to_pylist(), key=lambda r: r['id']) + self.assertEqual(rows, [{'id': 1, 'a2': [1, 2]}, {'id': 2, 'a2': [3]}]) + + def test_whole_struct_type_replacement_rejected(self): + # Replacing a whole ROW type would carry caller-supplied nested ids + # that corrupt the id model; it must be rejected at alter time. + elem = pa.struct([('a', pa.int32()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('mv', elem)]) + self._create('nrow_replace', s0) + new_row = _paimon_type(pa.struct([('a', pa.int64()), ('c', pa.string())])) with self.assertRaises(RuntimeError) as cm: self.catalog.alter_table( - 'default.gap_drop', - [SchemaChange.drop_column(['mv', 'latest_value'])], False) - self.assertIn('latest_value', str(cm.exception)) + 'default.nrow_replace', + [SchemaChange.update_column_type('mv', new_row)], False) + self.assertIn('cannot be converted', str(cm.exception)) + + def test_align_handles_sliced_arrays(self): + # The list/map rebuilds read offsets/raw buffers; a sliced input + # must be re-materialized, not read through stale parent offsets. 
+ from pypaimon.read.reader.data_file_batch_reader import \ + DataFileBatchReader + reader = DataFileBatchReader.__new__(DataFileBatchReader) + sliced_list = pa.array( + [[1, 2], [3], [4, 5, 6], None], type=pa.list_(pa.int32())).slice(1, 3) + out = reader._align_array_by_id( + sliced_list, + ArrayType(True, AtomicType('INT')), + ArrayType(True, AtomicType('BIGINT'))) + self.assertEqual(out.to_pylist(), [[3], [4, 5, 6], None]) + self.assertEqual(out.type, pa.list_(pa.int64())) + + sliced_map = pa.array( + [[('a', 1)], [('b', 2)], None], + type=pa.map_(pa.string(), pa.int32())).slice(1, 2) + out = reader._align_array_by_id( + sliced_map, + MapType(True, AtomicType('STRING'), AtomicType('INT')), + MapType(True, AtomicType('STRING'), AtomicType('BIGINT'))) + self.assertEqual(out.to_pylist(), [[('b', 2)], None]) + + def test_map_wrapper_token_validated(self): + # The token consumed when descending through a MAP must be 'value'. + val = pa.struct([('a', pa.int64())]) + s0 = pa.schema([('id', pa.int64()), ('m', pa.map_(pa.string(), val))]) + self._create('ntok_map', s0) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.ntok_map', + [SchemaChange.add_column(['m', 'wrong', 'c'], AtomicType('INT'))], + False) + self.assertIn('m.wrong.c', str(cm.exception)) + self.catalog.alter_table( + 'default.ntok_map', + [SchemaChange.add_column(['m', 'value', 'c'], AtomicType('INT'))], + False) + + +class SchemaEvolutionConstructedToStringTest(_NestedBase): + """update column type from ROW/ARRAY/MAP to STRING: old files must be + materialized as the engine's string rendering at read time.""" + + def test_row_to_string(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('a', pa.int32()), ('b', pa.string())]))]) + table = self._create('c2s_row', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'a': 1, 'b': 'x'}}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_row', + [SchemaChange.update_column_type('mv', 
AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_row') + s1 = pa.schema([('id', pa.int64()), ('mv', pa.string())]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': 's2'}], schema=s1)) + + rows = self._read_sorted(table) + self.assertEqual(rows, [ + {'id': 1, 'mv': '{1, x}'}, + {'id': 2, 'mv': 's2'}, + ]) + + def test_array_to_string(self): + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(pa.int32()))]) + table = self._create('c2s_arr', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'arr': [1, 2, 3]}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_arr', + [SchemaChange.update_column_type('arr', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_arr') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['arr'], '[1, 2, 3]') + + def test_map_to_string(self): + s0 = pa.schema([('id', pa.int64()), + ('m', pa.map_(pa.string(), pa.int32()))]) + table = self._create('c2s_map', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'm': [('k', 7)]}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_map', + [SchemaChange.update_column_type('m', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_map') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['m'], '{k -> 7}') + + def test_row_to_string_null_semantics(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('a', pa.int32()), ('b', pa.string())]))]) + table = self._create('c2s_null', s0) + self._write(table, pa.Table.from_pylist([ + {'id': 1, 'mv': None}, + {'id': 2, 'mv': {'a': None, 'b': 'x'}}, + ], schema=s0)) + self.catalog.alter_table( + 'default.c2s_null', + [SchemaChange.update_column_type('mv', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_null') + rows = self._read_sorted(table) + # A NULL container stays NULL; a NULL sub-value renders as 'null'. 
+ self.assertIsNone(rows[0]['mv']) + self.assertEqual(rows[1]['mv'], '{null, x}') + + def test_vector_to_string_rejected(self): + # There is no read-time string rendering for vectors, so the type + # change must be rejected at alter time instead of failing on read. + s0 = pa.schema([('id', pa.int64()), + ('embed', pa.list_(pa.float32(), 3))]) + table = self._create('c2s_vec', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'embed': [1.0, 2.0, 3.0]}], schema=s0)) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.c2s_vec', + [SchemaChange.update_column_type('embed', AtomicType('STRING'))], + False) + self.assertIn('cannot be converted', str(cm.exception)) + # The vector column itself still reads fine. + rows = self._read_sorted(table) + self.assertEqual(rows[0]['embed'], [1.0, 2.0, 3.0]) + + def test_nested_subfield_row_to_string(self): + inner = pa.struct([('a', pa.int32())]) + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('inner', inner)]))]) + table = self._create('c2s_nested', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'inner': {'a': 1}}}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_nested', + [SchemaChange.update_column_type(['mv', 'inner'], AtomicType('STRING'))], + False) + table = self.catalog.get_table('default.c2s_nested') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['mv'], {'inner': '{1}'}) + + +class NestedFieldIdModelTest(unittest.TestCase): + """Globally-unique nested field ids, mirrored from the engine id model.""" + + def test_nested_ids_are_globally_unique(self): + s = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('x', pa.string())]) + fields = PyarrowFieldParser.to_paimon_schema(s) + ids = set() + for f in fields: + ids.add(f.id) + collect_field_ids(f.type, ids) + # id(0), mv(1), latest_version(2), latest_value(3), x(4) + self.assertEqual(ids, {0, 1, 2, 3, 4}) + self.assertEqual(current_highest_field_id(fields), 4) + + def 
test_flat_schema_ids_unchanged(self): + fields = PyarrowFieldParser.to_paimon_schema( + pa.schema([('a', pa.int64()), ('b', pa.string()), ('c', pa.int32())])) + self.assertEqual([f.id for f in fields], [0, 1, 2]) + + def test_reassign_field_id_depth_first_order(self): + inner = RowType(True, [DataField(0, 'c', AtomicType('INT'))]) + mid = RowType(True, [DataField(0, 'b', inner)]) + outer = RowType(True, [DataField(0, 'a', mid), + DataField(0, 'd', AtomicType('INT'))]) + result = reassign_field_id(outer, AtomicInteger(2)) + ids = set() + collect_field_ids(result, ids) + self.assertEqual(ids, {3, 4, 5, 6}) + + def test_duplicate_field_id_raises(self): + bad = [ + DataField(0, 'a', AtomicType('INT')), + DataField(1, 'b', RowType(True, [DataField(0, 'c', AtomicType('INT'))])), + ] + with self.assertRaises(ValueError): + current_highest_field_id(bad) + + +class SupportsCastTest(unittest.TestCase): + + def test_supported_casts(self): + for src, dst in [('INT', 'BIGINT'), ('FLOAT', 'DOUBLE'), ('INT', 'STRING'), + ('DOUBLE', 'INT'), ('DECIMAL(10, 4)', 'DECIMAL(10, 2)')]: + self.assertTrue(supports_cast(AtomicType(src), AtomicType(dst)), + '{} -> {}'.format(src, dst)) + + def test_unsupported_casts(self): + for src, dst in [('BIGINT', 'DATE'), ('BOOLEAN', 'DATE')]: + self.assertFalse(supports_cast(AtomicType(src), AtomicType(dst)), + '{} -> {}'.format(src, dst)) + + def test_constructed_to_string(self): + # ROW/ARRAY/MAP have a read-time string rendering; vector and + # multiset do not, so their type change must be rejected. 
+ row = RowType(True, [DataField(0, 'a', AtomicType('INT'))]) + arr = ArrayType(True, AtomicType('INT')) + m = MapType(True, AtomicType('STRING'), AtomicType('INT')) + for src in (row, arr, m): + self.assertTrue(supports_cast(src, AtomicType('STRING')), str(src)) + vec = VectorType(True, AtomicType('FLOAT'), 3) + ms = MultisetType(True, AtomicType('INT')) + for src in (vec, ms): + self.assertFalse(supports_cast(src, AtomicType('STRING')), str(src)) + + def test_constructed_to_differently_shaped_constructed_rejected(self): + # Reshaping a constructed type must go through sub-field / + # 'element' / 'value' paths; a whole-type replacement would carry + # caller-supplied nested ids that corrupt the id model. + self.assertFalse(supports_cast( + RowType(True, [DataField(0, 'a', AtomicType('INT'))]), + RowType(True, [DataField(0, 'a', AtomicType('BIGINT'))]))) + self.assertFalse(supports_cast( + ArrayType(True, AtomicType('INT')), + ArrayType(True, AtomicType('BIGINT')))) + self.assertFalse(supports_cast( + VectorType(True, AtomicType('FLOAT'), 3), + VectorType(True, AtomicType('FLOAT'), 5))) + # Only the outer nullability differing is still an identity cast. + self.assertTrue(supports_cast( + RowType(True, [DataField(2, 'a', AtomicType('INT'))]), + RowType(False, [DataField(2, 'a', AtomicType('INT'))]))) if __name__ == '__main__':