From 0dccd4755a9786c94d5f1e23f6a2cb55156edde9 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 10 Jun 2026 01:49:43 +0800 Subject: [PATCH 1/5] [python] Support schema evolution of nested struct sub-fields Read-time schema evolution previously aligned only top-level columns by field id; sub-fields inside a ROW (and a ROW nested in an ARRAY/MAP) could not evolve: adding one silently created a top-level column, and rename/drop/update-type raised because the schema manager only handled the last path element. - Assign globally-unique ids to nested sub-fields at create time and compute highestFieldId recursively, so nested ids never collide with top-level ones. - Recurse schema changes along the dotted field-name path (transparently through ARRAY/MAP wrappers) for add/rename/drop/update-type/update-nullability/ update-comment, allocating new ids from the persisted highestFieldId. - Validate update-column-type against the cast-support rules. - Align nested sub-fields by field id at read time: reorder, pad missing with NULL, follow renames, and cast changed types, recursing into struct/array/map. Add tests covering nested add/rename/drop/update-type round-trips (append-only and primary-key), ARRAY/MAP<.,ROW> sub-fields, the id model, and the cast rules. --- paimon-python/pypaimon/casting/__init__.py | 16 ++ .../pypaimon/casting/data_type_casts.py | 167 +++++++++++ .../read/reader/data_file_batch_reader.py | 112 +++++--- paimon-python/pypaimon/schema/data_types.py | 93 ++++++- .../pypaimon/schema/schema_manager.py | 262 +++++++++++++----- paimon-python/pypaimon/schema/table_schema.py | 4 +- .../pypaimon/table/special_fields.py | 9 +- .../schema_evolution_nested_read_test.py | 249 +++++++++++++---- 8 files changed, 750 insertions(+), 162 deletions(-) create mode 100644 paimon-python/pypaimon/casting/__init__.py create mode 100644 paimon-python/pypaimon/casting/data_type_casts.py diff --git a/paimon-python/pypaimon/casting/__init__.py b/paimon-python/pypaimon/casting/__init__.py new file mode 100644 index 000000000000..13a83393a912 --- /dev/null +++ b/paimon-python/pypaimon/casting/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/paimon-python/pypaimon/casting/data_type_casts.py b/paimon-python/pypaimon/casting/data_type_casts.py new file mode 100644 index 000000000000..fc51c6eff855 --- /dev/null +++ b/paimon-python/pypaimon/casting/data_type_casts.py @@ -0,0 +1,167 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Type-cast support rules used to validate ``update column type`` schema +changes. + +The rules mirror the engine-wide cast specification so a type change accepted +here is one the read path can also materialize: an *implicit* cast is a safe +widening (e.g. INT -> BIGINT, any numeric -> DECIMAL/DOUBLE), while an +*explicit* cast covers the broader, possibly lossy conversions a user opts into +(e.g. DOUBLE -> INT truncation, anything -> STRING). Read-time execution then +applies the conversion leniently. +""" + +from pypaimon.schema.data_types import (ArrayType, AtomicType, MapType, + MultisetType, RowType, VectorType) + +# ---- Type roots -------------------------------------------------------------- + +CHAR = "CHAR" +VARCHAR = "VARCHAR" +BOOLEAN = "BOOLEAN" +BINARY = "BINARY" +VARBINARY = "VARBINARY" +DECIMAL = "DECIMAL" +TINYINT = "TINYINT" +SMALLINT = "SMALLINT" +INTEGER = "INTEGER" +BIGINT = "BIGINT" +FLOAT = "FLOAT" +DOUBLE = "DOUBLE" +DATE = "DATE" +TIME = "TIME" +TIMESTAMP = "TIMESTAMP" +TIMESTAMP_LTZ = "TIMESTAMP_LTZ" +ARRAY = "ARRAY" +MAP = "MAP" +MULTISET = "MULTISET" +ROW = "ROW" +VECTOR = "VECTOR" +VARIANT = "VARIANT" +BLOB = "BLOB" + +# ---- Families ---------------------------------------------------------------- + +CHARACTER_STRING = {CHAR, VARCHAR} +BINARY_STRING = {BINARY, VARBINARY} +INTEGER_NUMERIC = {TINYINT, SMALLINT, INTEGER, BIGINT} +NUMERIC = INTEGER_NUMERIC | {FLOAT, DOUBLE, DECIMAL} +TIMESTAMP_FAMILY = {TIMESTAMP, TIMESTAMP_LTZ} +TIME_FAMILY = {TIME} +DATETIME = {DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ} +PREDEFINED = { + CHAR, VARCHAR, BOOLEAN, BINARY, VARBINARY, DECIMAL, + TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, + DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, +} +CONSTRUCTED = {ARRAY, MAP, MULTISET, ROW, VECTOR} + + +def _root(data_type) -> str: + if isinstance(data_type, RowType): + return ROW + if isinstance(data_type, ArrayType): + return ARRAY + if isinstance(data_type, MapType): + return MAP + if isinstance(data_type, MultisetType): + return MULTISET + if isinstance(data_type, VectorType): + return VECTOR + if isinstance(data_type, AtomicType): + t = data_type.type.upper() + if t.startswith("DECIMAL") or t.startswith("NUMERIC") or t.startswith("DEC"): + return DECIMAL + if t in ("INT", "INTEGER"): + return INTEGER + if t in (TINYINT, SMALLINT, BIGINT, FLOAT, DOUBLE, BOOLEAN, DATE): + return t + if t == "STRING" or t.startswith("VARCHAR"): + return VARCHAR + if t.startswith("CHAR"): + return CHAR + if t == "BYTES" or t.startswith("VARBINARY"): + return VARBINARY + if t.startswith("BINARY"): + return BINARY + if t == "BLOB": + return BLOB + if t.startswith("TIMESTAMP_LTZ"): + return TIMESTAMP_LTZ + if t.startswith("TIMESTAMP"): + return TIMESTAMP + if t.startswith("TIME"): + return TIME + if t == "VARIANT": + return VARIANT + return None + + +def _build_rules(): + implicit = {} + explicit = {} + # Identity cast for every root. + for root in (PREDEFINED | CONSTRUCTED | {VARIANT, BLOB}): + implicit[root] = {root} + explicit[root] = set() + + def rule(target, implicit_from=None, explicit_from=None): + implicit[target] |= set(implicit_from or set()) + explicit[target] |= set(explicit_from or set()) + + rule(CHAR, {CHAR}, PREDEFINED | CONSTRUCTED) + rule(VARCHAR, CHARACTER_STRING, PREDEFINED | CONSTRUCTED) + rule(BOOLEAN, {BOOLEAN}, CHARACTER_STRING | INTEGER_NUMERIC) + rule(BINARY, {BINARY}, CHARACTER_STRING | {VARBINARY}) + rule(VARBINARY, BINARY_STRING, CHARACTER_STRING | {BINARY}) + rule(DECIMAL, NUMERIC, CHARACTER_STRING | {BOOLEAN, TIMESTAMP, TIMESTAMP_LTZ}) + int_explicit = NUMERIC | CHARACTER_STRING | {BOOLEAN, TIMESTAMP, TIMESTAMP_LTZ} + rule(TINYINT, {TINYINT}, int_explicit) + rule(SMALLINT, {TINYINT, SMALLINT}, int_explicit) + rule(INTEGER, {TINYINT, SMALLINT, INTEGER}, int_explicit) + rule(BIGINT, {TINYINT, SMALLINT, INTEGER, BIGINT}, int_explicit) + rule(FLOAT, {TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DECIMAL}, int_explicit) + rule(DOUBLE, NUMERIC, CHARACTER_STRING | {BOOLEAN, TIMESTAMP, TIMESTAMP_LTZ}) + rule(DATE, {DATE, TIMESTAMP}, TIMESTAMP_FAMILY | CHARACTER_STRING) + rule(TIME, {TIME, TIMESTAMP}, TIME_FAMILY | TIMESTAMP_FAMILY | CHARACTER_STRING) + rule(TIMESTAMP, {TIMESTAMP, TIMESTAMP_LTZ}, DATETIME | CHARACTER_STRING | NUMERIC) + rule(TIMESTAMP_LTZ, {TIMESTAMP_LTZ, TIMESTAMP}, DATETIME | CHARACTER_STRING | NUMERIC) + return implicit, explicit + + +_IMPLICIT_RULES, _EXPLICIT_RULES = _build_rules() + + +def supports_cast(source_type, target_type, allow_explicit: bool = True) -> bool: + """Whether ``source_type`` can be cast to ``target_type`` for a column type + change. ``allow_explicit`` permits the broader (possibly lossy) conversions + in addition to the safe widening ones.""" + source_root = _root(source_type) + target_root = _root(target_type) + if source_root is None or target_root is None: + return False + # A NOT NULL target cannot accept a nullable source unless explicitly allowed. + if source_type.nullable and not target_type.nullable and not allow_explicit: + return False + if source_root == target_root: + return True + if source_root in _IMPLICIT_RULES.get(target_root, set()): + return True + if allow_explicit and source_root in _EXPLICIT_RULES.get(target_root, set()): + return True + return False diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index e3bd51143936..03d8ce6d6827 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -18,13 +18,15 @@ from typing import List, Optional import pyarrow as pa +import pyarrow.compute as pc from pyarrow import RecordBatch from pypaimon.common.file_io import FileIO from pypaimon.read.partition_info import PartitionInfo from pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader -from pypaimon.schema.data_types import DataField, PyarrowFieldParser +from pypaimon.schema.data_types import (ArrayType, DataField, MapType, + PyarrowFieldParser, RowType) from pypaimon.table.special_fields import SpecialFields @@ -57,55 +59,99 @@ def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], p self.file_io = file_io # Per-file field-id normalization: map the physically-read columns # (the file's own field order/names) onto the latest read target by - # field id, padding missing ids with NULL. ``None`` when there is no - # evolution to reconcile (identity) -- the common path stays zero-copy. - self._normalize_positions, self._normalize_names = \ - self._build_normalize_plan(file_data_fields, target_data_fields) + # field id, padding missing ids with NULL and recursing into nested + # ROW / ARRAY / MAP<.,ROW> sub-fields the same way. ``None`` when + # there is no evolution to reconcile -- the common path stays zero-copy. + self._normalize_plan = self._build_normalize_plan(file_data_fields, target_data_fields) @staticmethod def _build_normalize_plan(file_data_fields, target_data_fields): """Build a per-file field-id alignment plan. - Returns ``(positions, names)`` where ``positions[i]`` is the column - index in the physically-read batch carrying ``target_data_fields[i]`` - (matched by field id), or -1 if the file does not contain that id (pad - NULL). ``names[i]`` is the latest target name. Returns ``(None, None)`` - when the plan is the identity (no evolution), so the caller skips - normalization and stays zero-copy. + Returns a list of ``(pos, file_field, target_field)`` -- one per target + field, in target order -- where ``pos`` is the column index in the + physically-read batch carrying ``target_field`` (matched by field id), + or -1 if the file does not contain that id (pad NULL). Returns ``None`` + when the file already matches the target exactly (no evolution), so the + caller stays zero-copy. """ if file_data_fields is None or target_data_fields is None: - return None, None + return None + # Recursive equality covers nested sub-field changes too: any rename / + # add / drop / type change at any depth makes the file != target. + if file_data_fields == target_data_fields: + return None file_id_to_pos = {f.id: i for i, f in enumerate(file_data_fields)} - positions = [] - names = [] - # Identity only when every target maps to the same physical position - # AND already carries the same name -- a rename keeps the position but - # changes the name, which still requires a relabel pass. - identity = len(file_data_fields) == len(target_data_fields) - for i, target in enumerate(target_data_fields): + plan = [] + for target in target_data_fields: pos = file_id_to_pos.get(target.id, -1) - positions.append(pos) - names.append(target.name) - if pos != i or (pos >= 0 and file_data_fields[pos].name != target.name): - identity = False - if identity: - return None, None - return positions, names + file_field = file_data_fields[pos] if pos >= 0 else None + plan.append((pos, file_field, target)) + return plan def _normalize_batch(self, record_batch: RecordBatch) -> RecordBatch: """Reorder/pad the physically-read batch onto the latest read target by - field id, and relabel columns to the latest names. Missing ids become - all-NULL columns; types are reconciled later by _align_batch_to_read_schema.""" - if self._normalize_positions is None: + field id, relabel columns to the latest names, and align nested ROW + sub-fields by id. Missing ids become typed all-NULL columns.""" + if self._normalize_plan is None: return record_batch num_rows = record_batch.num_rows arrays = [] - for pos in self._normalize_positions: + names = [] + for pos, file_field, target_field in self._normalize_plan: + target_pa_type = PyarrowFieldParser.from_paimon_type(target_field.type) if pos < 0: - arrays.append(pa.nulls(num_rows)) + arrays.append(pa.nulls(num_rows, type=target_pa_type)) else: - arrays.append(record_batch.column(pos)) - return pa.RecordBatch.from_arrays(arrays, names=self._normalize_names) + arrays.append(self._align_array_by_id( + record_batch.column(pos), file_field.type, target_field.type)) + names.append(target_field.name) + return pa.RecordBatch.from_arrays(arrays, names=names) + + def _align_array_by_id(self, array, file_type, target_type): + """Return *array* converted to *target_type*, matching ROW sub-fields by + field id (reorder, pad missing with NULL, follow renames, cast changed + types) recursively, transparently through ARRAY/MAP wrappers.""" + if isinstance(target_type, RowType) and isinstance(file_type, RowType): + n = len(array) + file_id_to_pos = {f.id: i for i, f in enumerate(file_type.fields)} + children = [] + pa_fields = [] + for tsub in target_type.fields: + p = file_id_to_pos.get(tsub.id, -1) + if p < 0: + child = pa.nulls(n, type=PyarrowFieldParser.from_paimon_type(tsub.type)) + else: + child = self._align_array_by_id( + array.field(p), file_type.fields[p].type, tsub.type) + children.append(child) + pa_fields.append(pa.field(tsub.name, child.type, nullable=tsub.type.nullable)) + # Preserve the struct's own null mask; child values under a null + # struct are irrelevant. + return pa.StructArray.from_arrays( + children, fields=pa_fields, mask=pc.is_null(array)) + if isinstance(target_type, ArrayType) and isinstance(file_type, ArrayType): + aligned_values = self._align_array_by_id( + array.values, file_type.element, target_type.element) + return pa.ListArray.from_arrays( + array.offsets, aligned_values, mask=pc.is_null(array)) + if isinstance(target_type, MapType) and isinstance(file_type, MapType): + aligned_items = self._align_array_by_id( + array.items, file_type.value, target_type.value) + # MapArray.from_arrays cannot carry a null mask (a null map would + # collapse to an empty one), so rebuild from buffers, reusing the + # original validity/offset buffers and only swapping the value child. + target_pa = PyarrowFieldParser.from_paimon_type(target_type) + entries = pa.StructArray.from_arrays( + [array.keys, aligned_items], + fields=[target_pa.key_field, target_pa.item_field]) + return pa.Array.from_buffers( + target_pa, len(array), array.buffers()[:2], children=[entries]) + # Leaf / non-nested: cast to the target type when it differs. + target_pa_type = PyarrowFieldParser.from_paimon_type(target_type) + if array.type != target_pa_type: + return array.cast(target_pa_type, safe=False) + return array def read_arrow_batch(self, start_idx=None, end_idx=None) -> Optional[RecordBatch]: if isinstance(self.format_reader, FormatBlobReader): diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py index 9530996a9d6d..da128d96da37 100755 --- a/paimon-python/pypaimon/schema/data_types.py +++ b/paimon-python/pypaimon/schema/data_types.py @@ -25,6 +25,15 @@ import pyarrow from pyarrow import types +# Field ids at or above this value are reserved for system fields (sequence +# number, value kind, row id, ...). User field ids stay strictly below it, so +# the highest-user-field-id computation can ignore anything from here up. +SYSTEM_FIELD_ID_START = 2147483647 // 2 + + +def is_system_field_id(field_id: int) -> bool: + return field_id >= SYSTEM_FIELD_ID_START + class AtomicInteger: @@ -370,6 +379,73 @@ def __str__(self) -> str: return "ROW<{}>{}".format(', '.join(field_strs), null_suffix) +def reassign_field_id(data_type: DataType, field_id: "AtomicInteger") -> DataType: + """Return a copy of *data_type* with every nested field id reassigned from + *field_id*, depth-first with children allocated before their parent field. + + Mirrors the canonical id-reassignment used when a column (possibly carrying + a nested ROW/ARRAY/MAP) is added, so nested subfields get globally-unique + ids drawn from the schema's running counter rather than struct-local ones. + """ + if isinstance(data_type, RowType): + new_fields = [] + for field in data_type.fields: + # Visit the nested type first, then allocate this field's id, so the + # ordering matches the rest of the engine ecosystem. + new_type = reassign_field_id(field.type, field_id) + new_id = field_id.increment_and_get() + new_fields.append(DataField( + new_id, field.name, new_type, field.description, field.default_value)) + return RowType(data_type.nullable, new_fields) + if isinstance(data_type, ArrayType): + return ArrayType(data_type.nullable, reassign_field_id(data_type.element, field_id)) + if isinstance(data_type, VectorType): + return VectorType( + data_type.nullable, reassign_field_id(data_type.element, field_id), data_type.length) + if isinstance(data_type, MultisetType): + return MultisetType(data_type.nullable, reassign_field_id(data_type.element, field_id)) + if isinstance(data_type, MapType): + new_key = reassign_field_id(data_type.key, field_id) + new_value = reassign_field_id(data_type.value, field_id) + return MapType(data_type.nullable, new_key, new_value) + return data_type + + +def collect_field_ids(data_type: DataType, field_ids: set): + """Collect all (nested) field ids reachable from *data_type* into *field_ids*, + raising on a duplicate id (a broken schema).""" + if isinstance(data_type, RowType): + for field in data_type.fields: + if field.id in field_ids: + raise ValueError( + "Broken schema, field id {} is duplicated.".format(field.id)) + field_ids.add(field.id) + collect_field_ids(field.type, field_ids) + elif isinstance(data_type, (ArrayType, VectorType, MultisetType)): + collect_field_ids(data_type.element, field_ids) + elif isinstance(data_type, MapType): + collect_field_ids(data_type.key, field_ids) + collect_field_ids(data_type.value, field_ids) + + +def current_highest_field_id(fields: List[DataField]) -> int: + """Highest user field id across *fields*, recursing into nested ROW/ARRAY/MAP. + + System field ids are excluded. Returns -1 for an empty/system-only schema. + The result is persisted as ``highestFieldId``; later schema changes seed + their id counter from the stored value (not from the live fields, since a + dropped field may have carried a higher id than any survivor). + """ + field_ids = set() + for field in fields: + if field.id in field_ids: + raise ValueError("Broken schema, field id {} is duplicated.".format(field.id)) + field_ids.add(field.id) + collect_field_ids(field.type, field_ids) + user_ids = [fid for fid in field_ids if not is_system_field_id(fid)] + return max(user_ids) if user_ids else -1 + + class Keyword(Enum): CHAR = "CHAR" VARCHAR = "VARCHAR" @@ -717,12 +793,21 @@ def to_paimon_field(field_idx: int, pa_field: pyarrow.Field) -> DataField: @staticmethod def to_paimon_schema(pa_schema: pyarrow.Schema) -> List[DataField]: - # Convert PyArrow schema to Paimon fields + # Convert PyArrow schema to Paimon fields, assigning globally-unique ids: + # each top-level field takes the next id, then its (possibly nested) type + # has its subfield ids reassigned from the same running counter. A flat + # schema keeps the plain 0,1,2,... ids; nested subfields get ids that do + # not collide with top-level ones. + field_id = AtomicInteger(-1) fields = [] - for i, pa_field in enumerate(pa_schema): + for pa_field in pa_schema: pa_field: pyarrow.Field - data_field = PyarrowFieldParser.to_paimon_field(i, pa_field) - fields.append(data_field) + top_id = field_id.increment_and_get() + data_type = PyarrowFieldParser.to_paimon_type(pa_field.type, pa_field.nullable) + data_type = reassign_field_id(data_type, field_id) + description = pa_field.metadata.get(b'description', b'').decode('utf-8') \ + if pa_field.metadata and b'description' in pa_field.metadata else None + fields.append(DataField(top_id, pa_field.name, data_type, description)) return fields @staticmethod diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index d01549c71bc5..99ebc1341bd9 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -26,7 +26,9 @@ from pypaimon.schema.column_directive_utils import ( apply_add_column_directive, apply_directives, remove_dropped_directive_options) -from pypaimon.schema.data_types import AtomicInteger, DataField +from pypaimon.casting.data_type_casts import supports_cast +from pypaimon.schema.data_types import (ArrayType, AtomicInteger, DataField, + MapType, RowType, reassign_field_id) from pypaimon.schema.schema import Schema from pypaimon.schema.schema_change import (AddColumn, DropColumn, RemoveOption, RenameColumn, SchemaChange, @@ -44,6 +46,108 @@ def _find_field_index(fields: List[DataField], field_name: str) -> Optional[int] return None +def _extract_row_data_fields(data_type, out_fields: List[DataField]) -> int: + """Collect the immediate sub-fields reachable from *data_type* into + *out_fields* and return the path depth consumed. A ROW contributes its + fields (depth 1); an ARRAY/MAP is transparent and descends into its + element/value (consuming the ``element``/``value`` path token); anything + else contributes nothing (depth 1).""" + if isinstance(data_type, RowType): + out_fields.extend(data_type.fields) + return 1 + if isinstance(data_type, ArrayType): + return _extract_row_data_fields(data_type.element, out_fields) + 1 + if isinstance(data_type, MapType): + return _extract_row_data_fields(data_type.value, out_fields) + 1 + return 1 + + +def _wrap_new_row_type(data_type, nested_fields: List[DataField]): + """Rebuild *data_type* substituting *nested_fields* at its innermost ROW, + preserving any ARRAY/MAP wrappers.""" + if isinstance(data_type, RowType): + return RowType(data_type.nullable, nested_fields) + if isinstance(data_type, ArrayType): + return ArrayType(data_type.nullable, _wrap_new_row_type(data_type.element, nested_fields)) + if isinstance(data_type, MapType): + return MapType( + data_type.nullable, data_type.key, + _wrap_new_row_type(data_type.value, nested_fields)) + return data_type + + +def _get_root_type(data_type, curr_depth: int, max_depth: int): + """Return the type sitting at ``max_depth`` when walking ARRAY/MAP wrappers + from *data_type* (e.g. the INT in ARRAY>>).""" + if curr_depth == max_depth - 1: + return data_type + if isinstance(data_type, ArrayType): + return _get_root_type(data_type.element, curr_depth + 1, max_depth) + if isinstance(data_type, MapType): + return _get_root_type(data_type.value, curr_depth + 1, max_depth) + return data_type + + +def _get_array_map_type_with_target_type_root(source, target, curr_depth: int, max_depth: int): + """Rebuild *source* with *target* substituted at ``max_depth``, keeping the + ARRAY/MAP wrappers around it intact.""" + if curr_depth == max_depth - 1: + return target + if isinstance(source, ArrayType): + return ArrayType( + source.nullable, + _get_array_map_type_with_target_type_root( + source.element, target, curr_depth + 1, max_depth)) + if isinstance(source, MapType): + return MapType( + source.nullable, source.key, + _get_array_map_type_with_target_type_root( + source.value, target, curr_depth + 1, max_depth)) + return target + + +def _update_intermediate_column(new_fields, previous_fields, depth, prev_depth, + field_names, update_last_fn): + """Walk *field_names* into nested ROW (transparently through ARRAY/MAP), + then run *update_last_fn(depth, fields, name)* on the field list that + directly contains the final path element, rebuilding parent types upward.""" + if depth == len(field_names) - 1: + update_last_fn(depth, new_fields, field_names[depth]) + return + if depth >= len(field_names): + # Path descended through ARRAY/MAP past the last ROW; operate on the + # field that owns the wrapper at the previous depth. + update_last_fn(prev_depth, previous_fields, field_names[prev_depth]) + return + for i, field in enumerate(new_fields): + if field.name != field_names[depth]: + continue + nested_fields: List[DataField] = [] + new_depth = depth + _extract_row_data_fields(field.type, nested_fields) + _update_intermediate_column( + nested_fields, new_fields, new_depth, depth, field_names, update_last_fn) + field = new_fields[i] + new_fields[i] = DataField( + field.id, field.name, + _wrap_new_row_type(field.type, nested_fields), + field.description, field.default_value) + return + raise ColumnNotExistException('.'.join(field_names[:depth + 1])) + + +def _modify_nested_column(new_fields, field_names, update_last_fn): + _update_intermediate_column(new_fields, new_fields, 0, 0, field_names, update_last_fn) + + +def _update_nested_column(new_fields, field_names, update_func): + def update_last(depth, fields, field_name): + idx = _find_field_index(fields, field_name) + if idx is None: + raise ColumnNotExistException('.'.join(field_names)) + fields[idx] = update_func(fields[idx], depth) + _modify_nested_column(new_fields, field_names, update_last) + + def _get_rename_mappings(changes: List[SchemaChange]) -> dict: rename_mappings = {} for change in changes: @@ -55,49 +159,55 @@ def _get_rename_mappings(changes: List[SchemaChange]) -> dict: def _handle_update_column_comment( change: UpdateColumnComment, new_fields: List[DataField] ): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] - new_fields[field_index] = DataField( - field.id, field.name, field.type, change.new_comment, field.default_value - ) + def update_func(field: DataField, depth: int) -> DataField: + return DataField( + field.id, field.name, field.type, change.new_comment, field.default_value + ) + _update_nested_column(new_fields, change.field_names, update_func) def _handle_update_column_nullability( change: UpdateColumnNullability, new_fields: List[DataField] ): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] from pypaimon.schema.data_types import DataTypeParser - field_type_dict = field.type.to_dict() - new_type = DataTypeParser.parse_data_type(field_type_dict) - new_type.nullable = change.new_nullability - new_fields[field_index] = DataField( - field.id, field.name, new_type, field.description, field.default_value - ) + field_names = change.field_names + max_depth = len(field_names) + + def update_func(field: DataField, depth: int) -> DataField: + source_root = _get_root_type(field.type, depth, max_depth) + new_root = DataTypeParser.parse_data_type(source_root.to_dict()) + new_root.nullable = change.new_nullability + new_type = _get_array_map_type_with_target_type_root( + field.type, new_root, depth, max_depth) + return DataField( + field.id, field.name, new_type, field.description, field.default_value + ) + _update_nested_column(new_fields, field_names, update_func) def _handle_update_column_type( change: UpdateColumnType, new_fields: List[DataField] ): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - field = new_fields[field_index] from pypaimon.schema.data_types import DataTypeParser - new_type_dict = change.new_data_type.to_dict() - new_type = DataTypeParser.parse_data_type(new_type_dict) - if change.keep_nullability: - new_type.nullable = field.type.nullable - new_fields[field_index] = DataField( - field.id, field.name, new_type, field.description, field.default_value - ) + field_names = change.field_names + max_depth = len(field_names) + + def update_func(field: DataField, depth: int) -> DataField: + source_root = _get_root_type(field.type, depth, max_depth) + target_root = DataTypeParser.parse_data_type(change.new_data_type.to_dict()) + if change.keep_nullability: + target_root.nullable = source_root.nullable + if not supports_cast(source_root, target_root): + raise ValueError( + "Column type {}[{}] cannot be converted to {} without losing information." + .format(field.name, source_root, target_root) + ) + new_type = _get_array_map_type_with_target_type_root( + field.type, target_root, depth, max_depth) + return DataField( + field.id, field.name, new_type, field.description, field.default_value + ) + _update_nested_column(new_fields, field_names, update_func) def _drop_column_validation(schema: 'TableSchema', change: DropColumn): @@ -112,17 +222,20 @@ def _drop_column_validation(schema: 'TableSchema', change: DropColumn): def _handle_drop_column(change: DropColumn, new_fields: List[DataField], new_options: dict): - field_name = change.field_names[-1] - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - if len(change.field_names) == 1: - field = new_fields[field_index] - type_root = _get_type_root(field.type) - remove_dropped_directive_options(field_name, type_root, new_options) - new_fields.pop(field_index) - if not new_fields: - raise ValueError("Cannot drop all fields in table") + field_names = change.field_names + + def update_last(depth, fields, field_name): + field_index = _find_field_index(fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + if len(field_names) == 1: + field = fields[field_index] + type_root = _get_type_root(field.type) + remove_dropped_directive_options(field_name, type_root, new_options) + fields.pop(field_index) + if not fields: + raise ValueError("Cannot drop all fields in table") + _modify_nested_column(new_fields, field_names, update_last) def _get_type_root(data_type) -> str: @@ -274,17 +387,19 @@ def _validate_blob_external_storage_fields(fields: List[DataField], options: dic def _handle_rename_column(change: RenameColumn, new_fields: List[DataField]): - field_name = change.field_names[-1] new_name = change.new_name - field_index = _find_field_index(new_fields, field_name) - if field_index is None: - raise ColumnNotExistException(field_name) - if _find_field_index(new_fields, new_name) is not None: - raise ColumnAlreadyExistException(new_name) - field = new_fields[field_index] - new_fields[field_index] = DataField( - field.id, new_name, field.type, field.description, field.default_value - ) + + def update_last(depth, fields, field_name): + field_index = _find_field_index(fields, field_name) + if field_index is None: + raise ColumnNotExistException(field_name) + if _find_field_index(fields, new_name) is not None: + raise ColumnAlreadyExistException(new_name) + field = fields[field_index] + fields[field_index] = DataField( + field.id, new_name, field.type, field.description, field.default_value + ) + _modify_nested_column(new_fields, change.field_names, update_last) def _apply_move(fields: List[DataField], new_field: Optional[DataField], move): @@ -332,11 +447,11 @@ def _handle_add_column( f"Column {'.'.join(change.field_names)} cannot specify NOT NULL in the table." ) field_id = highest_field_id.increment_and_get() + # Reassign ids of any nested sub-fields the new column carries (a ROW/ARRAY/ + # MAP type) so they draw globally-unique ids from the running counter. + data_type = reassign_field_id(change.data_type, highest_field_id) field_name = change.field_names[-1] - if _find_field_index(new_fields, field_name) is not None: - raise ColumnAlreadyExistException(field_name) - data_type = change.data_type comment = change.comment converted = apply_add_column_directive(comment, field_name, data_type, new_options) if converted is not None: @@ -349,21 +464,26 @@ def _handle_add_column( comment = converted.comment new_field = DataField(field_id, field_name, data_type, comment) - if change.move: - _apply_move(new_fields, new_field, change.move) - elif ( - add_column_before_partition - and partition_keys - and len(change.field_names) == 1 - ): - insert_index = len(new_fields) - for i, field in enumerate(new_fields): - if field.name in partition_keys: - insert_index = i - break - new_fields.insert(insert_index, new_field) - else: - new_fields.append(new_field) + + def update_last(depth, fields, fname): + if _find_field_index(fields, fname) is not None: + raise ColumnAlreadyExistException(fname) + if change.move: + _apply_move(fields, new_field, change.move) + elif ( + add_column_before_partition + and partition_keys + and len(change.field_names) == 1 + ): + insert_index = len(fields) + for i, field in enumerate(fields): + if field.name in partition_keys: + insert_index = i + break + fields.insert(insert_index, new_field) + else: + fields.append(new_field) + _modify_nested_column(new_fields, change.field_names, update_last) class SchemaManager: diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py index 52324ae7f607..d8586b28d139 100644 --- a/paimon-python/pypaimon/schema/table_schema.py +++ b/paimon-python/pypaimon/schema/table_schema.py @@ -23,7 +23,7 @@ from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.file_io import FileIO from pypaimon.common.json_util import json_field -from pypaimon.schema.data_types import DataField +from pypaimon.schema.data_types import DataField, current_highest_field_id from pypaimon.schema.schema import Schema @@ -115,7 +115,7 @@ def from_schema(schema_id: int, schema: Schema) -> "TableSchema": partition_keys: List[str] = schema.partition_keys primary_keys: List[str] = schema.primary_keys options: Dict[str, str] = schema.options - highest_field_id: int = max((field.id for field in fields), default=0) + highest_field_id: int = current_highest_field_id(fields) return TableSchema( TableSchema.CURRENT_VERSION, diff --git a/paimon-python/pypaimon/table/special_fields.py b/paimon-python/pypaimon/table/special_fields.py index 64d2429bef7d..94dbfacc9ad0 100644 --- a/paimon-python/pypaimon/table/special_fields.py +++ b/paimon-python/pypaimon/table/special_fields.py @@ -17,7 +17,7 @@ from typing import List -from ..schema.data_types import AtomicType, DataField +from ..schema.data_types import AtomicType, DataField, SYSTEM_FIELD_ID_START class SpecialFields: @@ -35,11 +35,18 @@ class SpecialFields: '_ROW_ID' } + SYSTEM_FIELD_ID_START = SYSTEM_FIELD_ID_START + @staticmethod def is_system_field(field_name: str) -> bool: """Check if a field is a system field.""" return field_name in SpecialFields.SYSTEM_FIELD_NAMES + @staticmethod + def is_system_field_id(field_id: int) -> bool: + """Check if a field id is reserved for system fields.""" + return field_id >= SYSTEM_FIELD_ID_START + @staticmethod def find_system_fields(read_fields: List[DataField]) -> dict: """Find system fields in read fields and return a mapping of field name to index.""" diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py index c9147b823fc1..8a319518be76 100644 --- a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py +++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py @@ -22,11 +22,10 @@ * Whole-column evolution of a top-level struct/array/map column (add / drop / rename / projection) -- aligned by the column's field id. * Sub-field evolution INSIDE a struct (add/rename/update-type/drop a nested - field via a dotted ``field_names`` path) -- this is NOT implemented: - ``schema_manager`` only operates on the top-level ``field_names[-1]``. - ``SchemaEvolutionNestedGapTest`` locks in the current behaviour with - explicit assertions so the gap is documented and any future fix is - noticed. + field via a dotted ``field_names`` path), including sub-fields of a ROW + nested in an ARRAY/MAP. Sub-fields are aligned by field id, so a rename + follows the data, an added sub-field reads NULL for old rows, a dropped one + is not revived, and a type change is cast at read time. """ import os @@ -37,7 +36,12 @@ import pyarrow as pa from pypaimon import CatalogFactory, Schema -from pypaimon.schema.data_types import AtomicType, PyarrowFieldParser +from pypaimon.casting.data_type_casts import supports_cast +from pypaimon.schema.data_types import (AtomicInteger, AtomicType, DataField, + PyarrowFieldParser, RowType, + collect_field_ids, + current_highest_field_id, + reassign_field_id) from pypaimon.schema.schema_change import SchemaChange @@ -240,18 +244,12 @@ def test_rename_struct_column(self): {'id': 2, 'mv2': {'latest_version': 200, 'latest_value': 'b'}, 'val': 'y'}]) -class SchemaEvolutionNestedGapTest(_NestedBase): - """Sub-field-level evolution inside a struct is NOT implemented. +class SchemaEvolutionNestedSubfieldTest(_NestedBase): + """Sub-field evolution inside a struct, aligned by field id.""" - schema_manager handles only the top-level ``field_names[-1]``; a dotted - path like ``['mv', 'latest_value']`` never recurses into the RowType. - These tests assert the current behaviour (silent top-level mutation, or - ColumnNotExistException) so the gap is documented. - """ - - def _create_struct_table(self, name): + def _create_struct_table(self, name, primary_keys=None, bucket='-1'): s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('val', pa.string())]) - table = self._create(name, s0) + table = self._create(name, s0, primary_keys=primary_keys, bucket=bucket) self._write(table, pa.Table.from_pylist([ {'id': 1, 'mv': {'latest_version': 100, 'latest_value': 'a'}, 'val': 'x'}, ], schema=s0)) @@ -268,49 +266,198 @@ def _top_level_names(self, table_name): 'default.{}'.format(table_name)).table_schema return [f.name for f in schema.fields] - # -- C7: add nested sub-field -> silently adds a TOP-LEVEL column ---- + def test_add_subfield_goes_inside_struct_and_pads_null(self): + table = self._create_struct_table('nsub_add') + self.catalog.alter_table( + 'default.nsub_add', + [SchemaChange.add_column(['mv', 'score'], AtomicType('INT'))], False) + # The sub-field lands inside mv, not as a stray top-level column. + self.assertEqual(self._mv_subfield_names('nsub_add'), + ['latest_version', 'latest_value', 'score']) + self.assertNotIn('score', self._top_level_names('nsub_add')) + + table = self.catalog.get_table('default.nsub_add') + s1 = pa.schema([ + ('id', pa.int64()), + ('mv', pa.struct([('latest_version', pa.int64()), + ('latest_value', pa.string()), + ('score', pa.int32())])), + ('val', pa.string())]) + self._write(table, pa.Table.from_pylist([ + {'id': 2, 'mv': {'latest_version': 200, 'latest_value': 'b', 'score': 7}, + 'val': 'y'}], schema=s1)) + rows = self._read_sorted(table) + # Old row reads NULL for the added sub-field; new row carries it. + self.assertEqual(rows[0]['mv'], + {'latest_version': 100, 'latest_value': 'a', 'score': None}) + self.assertEqual(rows[1]['mv'], + {'latest_version': 200, 'latest_value': 'b', 'score': 7}) + + def test_rename_subfield_follows_field_id(self): + table = self._create_struct_table('nsub_rename') + self.catalog.alter_table( + 'default.nsub_rename', + [SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], False) + self.assertEqual(self._mv_subfield_names('nsub_rename'), + ['latest_version', 'lv']) + table = self.catalog.get_table('default.nsub_rename') + rows = self._read_sorted(table) + # Old data follows the renamed sub-field by id, not by name. + self.assertEqual(rows[0]['mv'], {'latest_version': 100, 'lv': 'a'}) - def test_nested_add_subfield_mutates_top_level(self): - # GAP: add_column(['mv','new_inner']) does NOT add new_inner inside - # mv; it silently appends a top-level column 'new_inner' instead. - self._create_struct_table('gap_add') + def test_update_subfield_type_casts(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) self.catalog.alter_table( - 'default.gap_add', - [SchemaChange.add_column(['mv', 'new_inner'], AtomicType('INT'))], - False) - # mv's sub-fields are unchanged; a stray top-level column appeared. - self.assertEqual(self._mv_subfield_names('gap_add'), - ['latest_version', 'latest_value']) - self.assertIn('new_inner', self._top_level_names('gap_add')) - - # -- C8/C9/C10: rename / update-type / drop nested sub-field --------- - - def test_nested_rename_subfield_raises(self): - # GAP: field_names[-1]='latest_value' is looked up at the TOP level, - # where it does not exist -> ColumnNotExistException (wrapped). - self._create_struct_table('gap_rename') - with self.assertRaises(RuntimeError) as cm: + 'default.nsub_type', + [SchemaChange.update_column_type(['mv', 'v'], AtomicType('BIGINT'))], False) + table = self.catalog.get_table('default.nsub_type') + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('mv').type.field('v').type, pa.int64()) + self.assertEqual(arrow.to_pylist()[0]['mv'], {'v': 10, 's': 'a'}) + + def test_drop_subfield_not_revived(self): + table = self._create_struct_table('nsub_drop') + self.catalog.alter_table( + 'default.nsub_drop', + [SchemaChange.drop_column(['mv', 'latest_value'])], False) + self.assertEqual(self._mv_subfield_names('nsub_drop'), ['latest_version']) + table = self.catalog.get_table('default.nsub_drop') + rows = self._read_sorted(table) + # The dropped sub-field's old data is gone, not revived under its id. + self.assertEqual(rows[0]['mv'], {'latest_version': 100}) + + def test_drop_all_subfields_rejected(self): + self._create_struct_table('nsub_dropall') + self.catalog.alter_table( + 'default.nsub_dropall', + [SchemaChange.drop_column(['mv', 'latest_value'])], False) + with self.assertRaises(RuntimeError): self.catalog.alter_table( - 'default.gap_rename', - [SchemaChange.rename_column(['mv', 'latest_value'], 'lv')], False) - self.assertIn('latest_value', str(cm.exception)) + 'default.nsub_dropall', + [SchemaChange.drop_column(['mv', 'latest_version'])], False) - def test_nested_update_subfield_type_raises(self): - self._create_struct_table('gap_update') + def test_unsupported_subfield_cast_rejected(self): + self._create_struct_table('nsub_badcast') with self.assertRaises(RuntimeError) as cm: self.catalog.alter_table( - 'default.gap_update', + 'default.nsub_badcast', [SchemaChange.update_column_type( - ['mv', 'latest_version'], AtomicType('BIGINT'))], False) - self.assertIn('latest_version', str(cm.exception)) + ['mv', 'latest_version'], AtomicType('DATE'))], False) + self.assertIn('cannot be converted', str(cm.exception)) - def test_nested_drop_subfield_raises(self): - self._create_struct_table('gap_drop') - with self.assertRaises(RuntimeError) as cm: - self.catalog.alter_table( - 'default.gap_drop', - [SchemaChange.drop_column(['mv', 'latest_value'])], False) - self.assertIn('latest_value', str(cm.exception)) + def test_pk_nested_subfield_evolution_merge(self): + s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA)]) + table = self._create('nsub_pk', s0, primary_keys=['id'], bucket='1') + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'latest_version': 1, 'latest_value': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_pk', + [SchemaChange.add_column(['mv', 'score'], AtomicType('INT'))], False) + table = self.catalog.get_table('default.nsub_pk') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('latest_version', pa.int64()), + ('latest_value', pa.string()), + ('score', pa.int32())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'latest_version': 2, 'latest_value': 'b', 'score': 9}}], + schema=s1)) + rows = self._read_sorted(table) + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0]['mv'], + {'latest_version': 2, 'latest_value': 'b', 'score': 9}) + + +class SchemaEvolutionNestedContainerTest(_NestedBase): + """Sub-field evolution of a ROW nested inside an ARRAY / MAP.""" + + def test_array_of_row_add_and_rename_subfield(self): + elem = pa.struct([('a', pa.int64()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(elem))]) + table = self._create('narr', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'arr': [{'a': 1, 'b': 'x'}, {'a': 2, 'b': 'y'}]}], schema=s0)) + # Descend through the array element into the ROW. + self.catalog.alter_table('default.narr', [ + SchemaChange.add_column(['arr', 'element', 'c'], AtomicType('INT')), + SchemaChange.rename_column(['arr', 'element', 'b'], 'bb'), + ], False) + table = self.catalog.get_table('default.narr') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['arr'], + [{'a': 1, 'bb': 'x', 'c': None}, + {'a': 2, 'bb': 'y', 'c': None}]) + + def test_map_of_row_add_subfield(self): + val = pa.struct([('a', pa.int64()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('m', pa.map_(pa.string(), val))]) + table = self._create('nmap', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'm': [('k', {'a': 1, 'b': 'x'})]}], schema=s0)) + # Descend through the map value into the ROW. + self.catalog.alter_table( + 'default.nmap', + [SchemaChange.add_column(['m', 'value', 'c'], AtomicType('INT'))], False) + table = self.catalog.get_table('default.nmap') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['m'], [('k', {'a': 1, 'b': 'x', 'c': None})]) + + +class NestedFieldIdModelTest(unittest.TestCase): + """Globally-unique nested field ids, mirrored from the engine id model.""" + + def test_nested_ids_are_globally_unique(self): + s = pa.schema([('id', pa.int64()), ('mv', _MV_PA), ('x', pa.string())]) + fields = PyarrowFieldParser.to_paimon_schema(s) + ids = set() + for f in fields: + ids.add(f.id) + collect_field_ids(f.type, ids) + # id(0), mv(1), latest_version(2), latest_value(3), x(4) + self.assertEqual(ids, {0, 1, 2, 3, 4}) + self.assertEqual(current_highest_field_id(fields), 4) + + def test_flat_schema_ids_unchanged(self): + fields = PyarrowFieldParser.to_paimon_schema( + pa.schema([('a', pa.int64()), ('b', pa.string()), ('c', pa.int32())])) + self.assertEqual([f.id for f in fields], [0, 1, 2]) + + def test_reassign_field_id_depth_first_order(self): + inner = RowType(True, [DataField(0, 'c', AtomicType('INT'))]) + mid = RowType(True, [DataField(0, 'b', inner)]) + outer = RowType(True, [DataField(0, 'a', mid), + DataField(0, 'd', AtomicType('INT'))]) + result = reassign_field_id(outer, AtomicInteger(2)) + ids = set() + collect_field_ids(result, ids) + self.assertEqual(ids, {3, 4, 5, 6}) + + def test_duplicate_field_id_raises(self): + bad = [ + DataField(0, 'a', AtomicType('INT')), + DataField(1, 'b', RowType(True, [DataField(0, 'c', AtomicType('INT'))])), + ] + with self.assertRaises(ValueError): + current_highest_field_id(bad) + + +class SupportsCastTest(unittest.TestCase): + + def test_supported_casts(self): + for src, dst in [('INT', 'BIGINT'), ('FLOAT', 'DOUBLE'), ('INT', 'STRING'), + ('DOUBLE', 'INT'), ('DECIMAL(10, 4)', 'DECIMAL(10, 2)')]: + self.assertTrue(supports_cast(AtomicType(src), AtomicType(dst)), + '{} -> {}'.format(src, dst)) + + def test_unsupported_casts(self): + for src, dst in [('BIGINT', 'DATE'), ('BOOLEAN', 'DATE')]: + self.assertFalse(supports_cast(AtomicType(src), AtomicType(dst)), + '{} -> {}'.format(src, dst)) if __name__ == '__main__': From b4d46ecc378b1a11b191d3cc08177282f41410d2 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 10 Jun 2026 15:42:02 +0800 Subject: [PATCH 2/5] [python] Align nested-leaf projection with field-id schema evolution Nested-leaf projection on append-only reads pushed the leaf path down by the LATEST name, bypassing the per-file field-id normalization: after a sub-field rename the old file's leaf read NULL, and after a sub-field type change old and new batches carried different types and failed to concatenate. Mirror the merge path instead: widen the projection to the full top-level columns so the field-id normalization applies (rename follows the id, missing sub-fields pad NULL, types are cast), then extract the requested leaf paths back to the user's flat schema - batch-level via NestedLeafBatchReader, or row-level via OuterProjectionRecordReader when a post-read filter is involved. Add regression tests projecting a renamed and a type-changed sub-field across old and new files. --- .../read/reader/nested_leaf_batch_reader.py | 64 +++++++++++++++++++ paimon-python/pypaimon/read/split_read.py | 44 +++++++++++++ paimon-python/pypaimon/read/table_read.py | 17 ++++- .../schema_evolution_nested_read_test.py | 50 +++++++++++++++ 4 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 paimon-python/pypaimon/read/reader/nested_leaf_batch_reader.py diff --git a/paimon-python/pypaimon/read/reader/nested_leaf_batch_reader.py b/paimon-python/pypaimon/read/reader/nested_leaf_batch_reader.py new file mode 100644 index 000000000000..8f042a9229e3 --- /dev/null +++ b/paimon-python/pypaimon/read/reader/nested_leaf_batch_reader.py @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from typing import List, Optional + +import pyarrow as pa +import pyarrow.compute as pc +from pyarrow import RecordBatch + +from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader +from pypaimon.schema.data_types import DataField, PyarrowFieldParser + + +class NestedLeafBatchReader(RecordBatchReader): + """Extract projected nested leaves from batches of full top-level columns. + + The inner reader yields batches carrying the widened top-level columns, + already normalized to the latest schema by field id (renames followed, + missing sub-fields padded NULL, types cast). Each requested name path is + walked through the struct children (a NULL parent propagates to the + leaf), producing the user's flat projected schema. + """ + + def __init__(self, inner: RecordBatchReader, name_paths: List[List[str]], + output_fields: List[DataField]): + if len(name_paths) != len(output_fields): + raise ValueError( + "name_paths length {} does not match output_fields length {}".format( + len(name_paths), len(output_fields))) + self._inner = inner + self._paths = name_paths + self._schema = PyarrowFieldParser.from_paimon_schema(output_fields) + + def read_arrow_batch(self) -> Optional[RecordBatch]: + batch = self._inner.read_arrow_batch() + if batch is None: + return None + arrays = [] + for i, path in enumerate(self._paths): + column = batch.column(path[0]) + for name in path[1:]: + column = pc.struct_field(column, name) + target_type = self._schema.field(i).type + if column.type != target_type: + column = column.cast(target_type, safe=False) + arrays.append(column) + return pa.RecordBatch.from_arrays(arrays, schema=self._schema) + + def close(self) -> None: + self._inner.close() diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py index 543c85893559..1278ea0a2acf 100644 --- a/paimon-python/pypaimon/read/split_read.py +++ b/paimon-python/pypaimon/read/split_read.py @@ -627,6 +627,33 @@ def _genarate_deletion_file_readers(self): class RawFileSplitRead(SplitRead): + def __init__( + self, + table, + predicate: Optional[Predicate], + read_type: List[DataField], + split: Split, + row_tracking_enabled: bool, + outer_extract_name_paths: Optional[List[List[str]]] = None, + outer_flat_read_type: Optional[List[DataField]] = None, + limit: Optional[int] = None): + # Nested-leaf projection is NOT pushed down by name: a leaf path is + # only valid against the latest schema, while each data file stores + # its own (possibly renamed / retyped) sub-fields. Instead the read + # widens to the full top-level columns, which the per-file field-id + # normalization aligns to the latest schema, and the requested leaf + # paths are extracted afterwards (``outer_extract_name_paths``). + super().__init__( + table=table, + predicate=predicate, + read_type=read_type, + split=split, + row_tracking_enabled=row_tracking_enabled, + nested_name_paths=None, + limit=limit) + self.outer_extract_name_paths = outer_extract_name_paths + self.outer_flat_read_type = outer_flat_read_type + def raw_reader_supplier(self, file: DataFileMeta, dv_factory: Optional[Callable] = None) -> Optional[RecordReader]: read_fields = self._get_final_read_data_fields() # Check if this is a SlicedSplit to get shard_file_idx_map @@ -676,10 +703,27 @@ def create_reader(self) -> RecordReader: # if the table is appendonly table, we don't need extra filter, all predicates has pushed down if self.table.is_primary_key_table and self.predicate_for_reader: reader = FilterRecordReader(concat_reader, self.predicate_for_reader) + if self.outer_extract_name_paths: + # Row-level extraction: the filter evaluates rows in the + # widened top-level coordinate space, so extract after it. + from pypaimon.read.reader.outer_projection_record_reader import \ + OuterProjectionRecordReader + reader = OuterProjectionRecordReader( + reader, [f.name for f in self.read_fields], + self.outer_extract_name_paths, + file_io=self.table.file_io, + blob_field_indices=_blob_field_indices(self.read_fields), + vector_field_indices=_vector_field_indices(self.read_fields)) if self.limit is not None: reader = LimitedRecordReader(reader, self.limit) else: reader = concat_reader + if self.outer_extract_name_paths: + from pypaimon.read.reader.nested_leaf_batch_reader import \ + NestedLeafBatchReader + reader = NestedLeafBatchReader( + reader, self.outer_extract_name_paths, + self.outer_flat_read_type) if self.limit is not None: reader = LimitedRecordBatchReader(reader, self.limit) return reader diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py index 57e704b2b550..3f16c70573b8 100644 --- a/paimon-python/pypaimon/read/table_read.py +++ b/paimon-python/pypaimon/read/table_read.py @@ -603,13 +603,26 @@ def _create_split_read(self, split: Split) -> SplitRead: limit=self.limit, ) else: + inner_read_type = self.read_type + outer_extract_name_paths: Optional[List[List[str]]] = None + if self.nested_name_paths and any( + len(p) > 1 for p in self.nested_name_paths): + # Mirror the merge path: read the full top-level columns so + # the per-file field-id normalization applies (a leaf path is + # only valid against the latest schema, not each file's own + # names/types), then extract the requested sub-paths back to + # the user's flat schema. + inner_read_type = self._widen_to_top_level_for_merge() + outer_extract_name_paths = self.nested_name_paths return RawFileSplitRead( table=self.table, predicate=self.predicate, - read_type=self.read_type, + read_type=inner_read_type, split=split, row_tracking_enabled=self.table.options.row_tracking_enabled(), - nested_name_paths=self.nested_name_paths, + outer_extract_name_paths=outer_extract_name_paths, + outer_flat_read_type=( + self.read_type if outer_extract_name_paths else None), limit=self.limit, ) diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py index 8a319518be76..559281b1b6a9 100644 --- a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py +++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py @@ -351,6 +351,56 @@ def test_unsupported_subfield_cast_rejected(self): ['mv', 'latest_version'], AtomicType('DATE'))], False) self.assertIn('cannot be converted', str(cm.exception)) + def test_nested_projection_after_rename_subfield(self): + # Projecting a renamed leaf must follow the field id into old files, + # not look the new name up in the file's physical schema. + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_proj_rename', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_proj_rename', + [SchemaChange.rename_column(['mv', 's'], 'ss')], False) + table = self.catalog.get_table('default.nsub_proj_rename') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('ss', pa.string())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': {'v': 20, 'ss': 'b'}}], schema=s1)) + + rows = self._read_sorted(table, projection=['id', 'mv.ss']) + self.assertEqual(rows, [ + {'id': 1, 'mv_ss': 'a'}, + {'id': 2, 'mv_ss': 'b'}, + ]) + + def test_nested_projection_after_update_subfield_type(self): + # Projecting a type-changed leaf must cast old batches to the latest + # type instead of emitting mixed-type batches. + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int32()), ('s', pa.string())]))]) + table = self._create('nsub_proj_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'v': 10, 's': 'a'}}], schema=s0)) + self.catalog.alter_table( + 'default.nsub_proj_type', + [SchemaChange.update_column_type(['mv', 'v'], AtomicType('BIGINT'))], False) + table = self.catalog.get_table('default.nsub_proj_type') + s1 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('v', pa.int64()), ('s', pa.string())]))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': {'v': 20, 's': 'b'}}], schema=s1)) + + rb = table.new_read_builder().with_projection(['id', 'mv.v']) + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('mv_v').type, pa.int64()) + rows = sorted(arrow.to_pylist(), key=lambda r: r['id']) + self.assertEqual(rows, [ + {'id': 1, 'mv_v': 10}, + {'id': 2, 'mv_v': 20}, + ]) + def test_pk_nested_subfield_evolution_merge(self): s0 = pa.schema([('id', pa.int64()), ('mv', _MV_PA)]) table = self._create('nsub_pk', s0, primary_keys=['id'], bucket='1') From 824faae35a953ec5d75ee53fbc63296dc0d20ff9 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 11 Jun 2026 10:52:26 +0800 Subject: [PATCH 3/5] [python] Materialize constructed-type to STRING casts at read time update_column_type from ROW/ARRAY/MAP to STRING passes validation (the cast rules allow constructed types to character strings), but reading an old file failed with ArrowNotImplementedError because struct/list/map cannot be cast to utf8 directly. Render the string form during per-file alignment instead, matching the engine's cast rules: ROW as '{v1, v2}', ARRAY as '[e1, e2]', MAP as '{k1 -> v1, k2 -> v2}', with sub-values rendered recursively, NULL sub-values as the literal 'null', and NULL containers staying NULL. Add round-trip tests for ROW/ARRAY/MAP to STRING, NULL semantics, and a nested sub-field changed to STRING. --- .../read/reader/data_file_batch_reader.py | 73 ++++++++++++++++- .../schema_evolution_nested_read_test.py | 82 +++++++++++++++++++ 2 files changed, 153 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index 03d8ce6d6827..109e9a8963a4 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -25,11 +25,74 @@ from pypaimon.read.partition_info import PartitionInfo from pypaimon.read.reader.format_blob_reader import FormatBlobReader from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader -from pypaimon.schema.data_types import (ArrayType, DataField, MapType, - PyarrowFieldParser, RowType) +from pypaimon.schema.data_types import (ArrayType, AtomicType, DataField, + MapType, PyarrowFieldParser, RowType) from pypaimon.table.special_fields import SpecialFields +def _is_character_string_type(data_type) -> bool: + if not isinstance(data_type, AtomicType): + return False + t = data_type.type.upper() + return t == 'STRING' or t.startswith('VARCHAR') or t.startswith('CHAR') + + +def _to_string_values(array, data_type) -> list: + """Render *array* as a list of per-row strings (None for NULL rows).""" + if isinstance(data_type, (RowType, ArrayType, MapType)): + return _constructed_to_string_array(array, data_type).to_pylist() + return array.cast(pa.string(), safe=False).to_pylist() + + +def _constructed_to_string_array(array, file_type): + """Render a struct/list/map array in the engine's string form: + ROW -> ``{v1, v2}``, ARRAY -> ``[e1, e2]``, MAP -> ``{k1 -> v1, k2 -> v2}``. + Sub-values are rendered recursively; a NULL sub-value renders as the + literal ``null`` while a NULL container row stays NULL.""" + valid = pc.is_valid(array).to_pylist() + out = [] + if isinstance(file_type, RowType): + children = [ + _to_string_values(array.field(i), sub.type) + for i, sub in enumerate(file_type.fields) + ] + for i in range(len(array)): + if not valid[i]: + out.append(None) + continue + vals = [c[i] if c[i] is not None else 'null' for c in children] + out.append('{' + ', '.join(vals) + '}') + elif isinstance(file_type, ArrayType): + values = _to_string_values(array.values, file_type.element) + offsets = array.offsets.to_pylist() + for i in range(len(array)): + if not valid[i]: + out.append(None) + continue + elems = [v if v is not None else 'null' + for v in values[offsets[i]:offsets[i + 1]]] + out.append('[' + ', '.join(elems) + ']') + elif isinstance(file_type, MapType): + keys = _to_string_values(array.keys, file_type.key) + items = _to_string_values(array.items, file_type.value) + offsets = array.offsets.to_pylist() + for i in range(len(array)): + if not valid[i]: + out.append(None) + continue + entries = [ + '{} -> {}'.format( + keys[j] if keys[j] is not None else 'null', + items[j] if items[j] is not None else 'null') + for j in range(offsets[i], offsets[i + 1]) + ] + out.append('{' + ', '.join(entries) + '}') + else: + raise ValueError( + 'Unsupported constructed type for string rendering: {}'.format(file_type)) + return pa.array(out, type=pa.string()) + + class DataFileBatchReader(RecordBatchReader): """ Reads record batch from files of different formats @@ -147,6 +210,12 @@ def _align_array_by_id(self, array, file_type, target_type): fields=[target_pa.key_field, target_pa.item_field]) return pa.Array.from_buffers( target_pa, len(array), array.buffers()[:2], children=[entries]) + # A constructed type changed to a character string: pyarrow cannot + # cast struct/list/map to utf8 directly, so render the engine's + # string form instead. + if (isinstance(file_type, (RowType, ArrayType, MapType)) + and _is_character_string_type(target_type)): + return _constructed_to_string_array(array, file_type) # Leaf / non-nested: cast to the target type when it differs. target_pa_type = PyarrowFieldParser.from_paimon_type(target_type) if array.type != target_pa_type: diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py index 559281b1b6a9..66f2f1ef8acc 100644 --- a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py +++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py @@ -458,6 +458,88 @@ def test_map_of_row_add_subfield(self): self.assertEqual(rows[0]['m'], [('k', {'a': 1, 'b': 'x', 'c': None})]) +class SchemaEvolutionConstructedToStringTest(_NestedBase): + """update column type from ROW/ARRAY/MAP to STRING: old files must be + materialized as the engine's string rendering at read time.""" + + def test_row_to_string(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('a', pa.int32()), ('b', pa.string())]))]) + table = self._create('c2s_row', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'a': 1, 'b': 'x'}}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_row', + [SchemaChange.update_column_type('mv', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_row') + s1 = pa.schema([('id', pa.int64()), ('mv', pa.string())]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'mv': 's2'}], schema=s1)) + + rows = self._read_sorted(table) + self.assertEqual(rows, [ + {'id': 1, 'mv': '{1, x}'}, + {'id': 2, 'mv': 's2'}, + ]) + + def test_array_to_string(self): + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(pa.int32()))]) + table = self._create('c2s_arr', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'arr': [1, 2, 3]}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_arr', + [SchemaChange.update_column_type('arr', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_arr') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['arr'], '[1, 2, 3]') + + def test_map_to_string(self): + s0 = pa.schema([('id', pa.int64()), + ('m', pa.map_(pa.string(), pa.int32()))]) + table = self._create('c2s_map', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'm': [('k', 7)]}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_map', + [SchemaChange.update_column_type('m', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_map') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['m'], '{k -> 7}') + + def test_row_to_string_null_semantics(self): + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('a', pa.int32()), ('b', pa.string())]))]) + table = self._create('c2s_null', s0) + self._write(table, pa.Table.from_pylist([ + {'id': 1, 'mv': None}, + {'id': 2, 'mv': {'a': None, 'b': 'x'}}, + ], schema=s0)) + self.catalog.alter_table( + 'default.c2s_null', + [SchemaChange.update_column_type('mv', AtomicType('STRING'))], False) + table = self.catalog.get_table('default.c2s_null') + rows = self._read_sorted(table) + # A NULL container stays NULL; a NULL sub-value renders as 'null'. + self.assertIsNone(rows[0]['mv']) + self.assertEqual(rows[1]['mv'], '{null, x}') + + def test_nested_subfield_row_to_string(self): + inner = pa.struct([('a', pa.int32())]) + s0 = pa.schema([('id', pa.int64()), + ('mv', pa.struct([('inner', inner)]))]) + table = self._create('c2s_nested', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'mv': {'inner': {'a': 1}}}], schema=s0)) + self.catalog.alter_table( + 'default.c2s_nested', + [SchemaChange.update_column_type(['mv', 'inner'], AtomicType('STRING'))], + False) + table = self.catalog.get_table('default.c2s_nested') + rows = self._read_sorted(table) + self.assertEqual(rows[0]['mv'], {'inner': '{1}'}) + + class NestedFieldIdModelTest(unittest.TestCase): """Globally-unique nested field ids, mirrored from the engine id model.""" From 2ffdce30b6b64408236c4d993939fc433c9bf4bb Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 11 Jun 2026 13:59:41 +0800 Subject: [PATCH 4/5] [python] Reject vector-to-string casts and validate nested wrapper tokens Two follow-ups on the nested schema-evolution path: - update_column_type from VECTOR (or MULTISET) to STRING passed validation but old files failed on read: there is no string rendering for them. Narrow the cast rule so only ROW/ARRAY/MAP - the constructed types the read path can render - are accepted as string sources. - The nested path walker consumed the ARRAY/MAP wrapper token by position without checking it, so an invalid path like ['arr', 'wrong', 'c'] was accepted and mutated the schema exactly like ['arr', 'element', 'c']. Require 'element' for arrays and 'value' for maps before descending. Add tests for the rejected vector alter (the column still reads), the narrowed cast rules, and wrong wrapper tokens on ARRAY / MAP<.,ROW>. --- .../pypaimon/casting/data_type_casts.py | 9 ++- .../pypaimon/schema/schema_manager.py | 27 +++++-- .../schema_evolution_nested_read_test.py | 70 ++++++++++++++++++- 3 files changed, 96 insertions(+), 10 deletions(-) diff --git a/paimon-python/pypaimon/casting/data_type_casts.py b/paimon-python/pypaimon/casting/data_type_casts.py index fc51c6eff855..0a53726ecde1 100644 --- a/paimon-python/pypaimon/casting/data_type_casts.py +++ b/paimon-python/pypaimon/casting/data_type_casts.py @@ -70,6 +70,11 @@ DATE, TIME, TIMESTAMP, TIMESTAMP_LTZ, } CONSTRUCTED = {ARRAY, MAP, MULTISET, ROW, VECTOR} +# Constructed types the read path can render as a character string +# ('{v1, v2}' / '[e1, e2]' / '{k -> v}'). VECTOR and MULTISET have no string +# rendering, so a type change from them to CHAR/VARCHAR is rejected here +# rather than failing when an old file is read. +STRING_RENDERABLE_CONSTRUCTED = {ARRAY, MAP, ROW} def _root(data_type) -> str: @@ -124,8 +129,8 @@ def rule(target, implicit_from=None, explicit_from=None): implicit[target] |= set(implicit_from or set()) explicit[target] |= set(explicit_from or set()) - rule(CHAR, {CHAR}, PREDEFINED | CONSTRUCTED) - rule(VARCHAR, CHARACTER_STRING, PREDEFINED | CONSTRUCTED) + rule(CHAR, {CHAR}, PREDEFINED | STRING_RENDERABLE_CONSTRUCTED) + rule(VARCHAR, CHARACTER_STRING, PREDEFINED | STRING_RENDERABLE_CONSTRUCTED) rule(BOOLEAN, {BOOLEAN}, CHARACTER_STRING | INTEGER_NUMERIC) rule(BINARY, {BINARY}, CHARACTER_STRING | {VARBINARY}) rule(VARBINARY, BINARY_STRING, CHARACTER_STRING | {BINARY}) diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index 99ebc1341bd9..a975d0813b28 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -46,22 +46,36 @@ def _find_field_index(fields: List[DataField], field_name: str) -> Optional[int] return None -def _extract_row_data_fields(data_type, out_fields: List[DataField]) -> int: +def _extract_row_data_fields(data_type, out_fields: List[DataField], + field_names: List[str], token_pos: int) -> int: """Collect the immediate sub-fields reachable from *data_type* into *out_fields* and return the path depth consumed. A ROW contributes its fields (depth 1); an ARRAY/MAP is transparent and descends into its - element/value (consuming the ``element``/``value`` path token); anything - else contributes nothing (depth 1).""" + element/value, consuming the ``element``/``value`` path token -- the + consumed token is validated so an unknown step cannot silently mutate + the schema; anything else contributes nothing (depth 1).""" if isinstance(data_type, RowType): out_fields.extend(data_type.fields) return 1 if isinstance(data_type, ArrayType): - return _extract_row_data_fields(data_type.element, out_fields) + 1 + _assert_wrapper_token(field_names, token_pos, 'element') + return _extract_row_data_fields( + data_type.element, out_fields, field_names, token_pos + 1) + 1 if isinstance(data_type, MapType): - return _extract_row_data_fields(data_type.value, out_fields) + 1 + _assert_wrapper_token(field_names, token_pos, 'value') + return _extract_row_data_fields( + data_type.value, out_fields, field_names, token_pos + 1) + 1 return 1 +def _assert_wrapper_token(field_names: List[str], token_pos: int, expected: str): + # A path that ends inside the wrappers (token_pos out of range) is the + # update-the-wrapped-type-itself case, handled by the caller's overflow + # branch; only a present-but-wrong token is rejected. + if token_pos < len(field_names) and field_names[token_pos] != expected: + raise ColumnNotExistException('.'.join(field_names)) + + def _wrap_new_row_type(data_type, nested_fields: List[DataField]): """Rebuild *data_type* substituting *nested_fields* at its innermost ROW, preserving any ARRAY/MAP wrappers.""" @@ -123,7 +137,8 @@ def _update_intermediate_column(new_fields, previous_fields, depth, prev_depth, if field.name != field_names[depth]: continue nested_fields: List[DataField] = [] - new_depth = depth + _extract_row_data_fields(field.type, nested_fields) + new_depth = depth + _extract_row_data_fields( + field.type, nested_fields, field_names, depth + 1) _update_intermediate_column( nested_fields, new_fields, new_depth, depth, field_names, update_last_fn) field = new_fields[i] diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py index 66f2f1ef8acc..eb4c0f467d94 100644 --- a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py +++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py @@ -37,9 +37,10 @@ from pypaimon import CatalogFactory, Schema from pypaimon.casting.data_type_casts import supports_cast -from pypaimon.schema.data_types import (AtomicInteger, AtomicType, DataField, +from pypaimon.schema.data_types import (ArrayType, AtomicInteger, AtomicType, + DataField, MapType, MultisetType, PyarrowFieldParser, RowType, - collect_field_ids, + VectorType, collect_field_ids, current_highest_field_id, reassign_field_id) from pypaimon.schema.schema_change import SchemaChange @@ -457,6 +458,40 @@ def test_map_of_row_add_subfield(self): rows = self._read_sorted(table) self.assertEqual(rows[0]['m'], [('k', {'a': 1, 'b': 'x', 'c': None})]) + def test_array_wrapper_token_validated(self): + # The token consumed when descending through an ARRAY must be + # 'element'; an unknown step must not silently mutate the schema. + elem = pa.struct([('a', pa.int64())]) + s0 = pa.schema([('id', pa.int64()), ('arr', pa.list_(elem))]) + self._create('ntok_arr', s0) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.ntok_arr', + [SchemaChange.add_column(['arr', 'wrong', 'c'], AtomicType('INT'))], + False) + self.assertIn('arr.wrong.c', str(cm.exception)) + # The canonical token still works. + self.catalog.alter_table( + 'default.ntok_arr', + [SchemaChange.add_column(['arr', 'element', 'c'], AtomicType('INT'))], + False) + + def test_map_wrapper_token_validated(self): + # The token consumed when descending through a MAP must be 'value'. + val = pa.struct([('a', pa.int64())]) + s0 = pa.schema([('id', pa.int64()), ('m', pa.map_(pa.string(), val))]) + self._create('ntok_map', s0) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.ntok_map', + [SchemaChange.add_column(['m', 'wrong', 'c'], AtomicType('INT'))], + False) + self.assertIn('m.wrong.c', str(cm.exception)) + self.catalog.alter_table( + 'default.ntok_map', + [SchemaChange.add_column(['m', 'value', 'c'], AtomicType('INT'))], + False) + class SchemaEvolutionConstructedToStringTest(_NestedBase): """update column type from ROW/ARRAY/MAP to STRING: old files must be @@ -524,6 +559,24 @@ def test_row_to_string_null_semantics(self): self.assertIsNone(rows[0]['mv']) self.assertEqual(rows[1]['mv'], '{null, x}') + def test_vector_to_string_rejected(self): + # There is no read-time string rendering for vectors, so the type + # change must be rejected at alter time instead of failing on read. + s0 = pa.schema([('id', pa.int64()), + ('embed', pa.list_(pa.float32(), 3))]) + table = self._create('c2s_vec', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'embed': [1.0, 2.0, 3.0]}], schema=s0)) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.c2s_vec', + [SchemaChange.update_column_type('embed', AtomicType('STRING'))], + False) + self.assertIn('cannot be converted', str(cm.exception)) + # The vector column itself still reads fine. + rows = self._read_sorted(table) + self.assertEqual(rows[0]['embed'], [1.0, 2.0, 3.0]) + def test_nested_subfield_row_to_string(self): inner = pa.struct([('a', pa.int32())]) s0 = pa.schema([('id', pa.int64()), @@ -591,6 +644,19 @@ def test_unsupported_casts(self): self.assertFalse(supports_cast(AtomicType(src), AtomicType(dst)), '{} -> {}'.format(src, dst)) + def test_constructed_to_string(self): + # ROW/ARRAY/MAP have a read-time string rendering; vector and + # multiset do not, so their type change must be rejected. + row = RowType(True, [DataField(0, 'a', AtomicType('INT'))]) + arr = ArrayType(True, AtomicType('INT')) + m = MapType(True, AtomicType('STRING'), AtomicType('INT')) + for src in (row, arr, m): + self.assertTrue(supports_cast(src, AtomicType('STRING')), str(src)) + vec = VectorType(True, AtomicType('FLOAT'), 3) + ms = MultisetType(True, AtomicType('INT')) + for src in (vec, ms): + self.assertFalse(supports_cast(src, AtomicType('STRING')), str(src)) + if __name__ == '__main__': unittest.main() From b3c8c257862f98f1e43cbfe08b773595604e0b6d Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Thu, 11 Jun 2026 15:43:38 +0800 Subject: [PATCH 5/5] [python] Harden nested schema evolution: reject reshaping casts, fix sliced arrays, gate null-to-not-null Self-review findings on the nested schema-evolution path: - update_column_type between same-root constructed types (e.g. ROW -> ROW) was accepted: the replacement carried caller-supplied nested ids that corrupt the id model and old rows read all-NULL; a VECTOR length change was accepted but unreadable. Reject non-identical constructed-to-constructed casts - reshaping goes through sub-field / 'element' / 'value' paths, which keep working. - The list/map rebuilds in the alignment and string-rendering paths read offsets/raw buffers directly, which errors on a sliced ListArray and silently misaligns rows on a sliced MapArray; re-materialize sliced inputs first. - Converting a nullable column to NOT NULL was silently accepted; it is now rejected by default and opt-in via 'alter-column-null-to-not-null.disabled' = 'false'. Also add an end-to-end test for the array 'element' type promotion path. --- .../pypaimon/casting/data_type_casts.py | 21 +++- .../read/reader/data_file_batch_reader.py | 15 +++ .../pypaimon/schema/schema_manager.py | 24 ++++- .../schema_evolution_nested_read_test.py | 102 ++++++++++++++++++ 4 files changed, 158 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/casting/data_type_casts.py b/paimon-python/pypaimon/casting/data_type_casts.py index 0a53726ecde1..fe1f6860ffa2 100644 --- a/paimon-python/pypaimon/casting/data_type_casts.py +++ b/paimon-python/pypaimon/casting/data_type_casts.py @@ -26,8 +26,9 @@ applies the conversion leniently. """ -from pypaimon.schema.data_types import (ArrayType, AtomicType, MapType, - MultisetType, RowType, VectorType) +from pypaimon.schema.data_types import (ArrayType, AtomicType, DataTypeParser, + MapType, MultisetType, RowType, + VectorType) # ---- Type roots -------------------------------------------------------------- @@ -164,9 +165,25 @@ def supports_cast(source_type, target_type, allow_explicit: bool = True) -> bool if source_type.nullable and not target_type.nullable and not allow_explicit: return False if source_root == target_root: + if source_root in CONSTRUCTED: + # A constructed type is only castable to an (ignoring outer + # nullability) identical constructed type. Reshaping is done + # through sub-field / 'element' / 'value' paths instead: a whole + # ROW replacement would carry caller-supplied nested field ids + # that corrupt the id model, and there is no runtime conversion + # between differently-shaped constructed values. + return _equals_ignore_nullable(source_type, target_type) return True if source_root in _IMPLICIT_RULES.get(target_root, set()): return True if allow_explicit and source_root in _EXPLICIT_RULES.get(target_root, set()): return True return False + + +def _equals_ignore_nullable(source_type, target_type) -> bool: + source_copy = DataTypeParser.parse_data_type(source_type.to_dict()) + target_copy = DataTypeParser.parse_data_type(target_type.to_dict()) + source_copy.nullable = True + target_copy.nullable = True + return source_copy == target_copy diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py index 109e9a8963a4..0183396fe704 100644 --- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py +++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py @@ -37,6 +37,18 @@ def _is_character_string_type(data_type) -> bool: return t == 'STRING' or t.startswith('VARCHAR') or t.startswith('CHAR') +def _unslice(array): + """Re-materialize a sliced array so offsets/buffers start at zero. + + The list/map rebuilds below read ``offsets``/raw buffers directly; on a + sliced array those still point into the parent storage, which either + errors (list rebuild with a null mask) or silently misaligns rows (map + rebuild from raw buffers).""" + if array.offset == 0: + return array + return pa.concat_arrays([array]) + + def _to_string_values(array, data_type) -> list: """Render *array* as a list of per-row strings (None for NULL rows).""" if isinstance(data_type, (RowType, ArrayType, MapType)): @@ -49,6 +61,7 @@ def _constructed_to_string_array(array, file_type): ROW -> ``{v1, v2}``, ARRAY -> ``[e1, e2]``, MAP -> ``{k1 -> v1, k2 -> v2}``. Sub-values are rendered recursively; a NULL sub-value renders as the literal ``null`` while a NULL container row stays NULL.""" + array = _unslice(array) valid = pc.is_valid(array).to_pylist() out = [] if isinstance(file_type, RowType): @@ -194,11 +207,13 @@ def _align_array_by_id(self, array, file_type, target_type): return pa.StructArray.from_arrays( children, fields=pa_fields, mask=pc.is_null(array)) if isinstance(target_type, ArrayType) and isinstance(file_type, ArrayType): + array = _unslice(array) aligned_values = self._align_array_by_id( array.values, file_type.element, target_type.element) return pa.ListArray.from_arrays( array.offsets, aligned_values, mask=pc.is_null(array)) if isinstance(target_type, MapType) and isinstance(file_type, MapType): + array = _unslice(array) aligned_items = self._align_array_by_id( array.items, file_type.value, target_type.value) # MapArray.from_arrays cannot carry a null mask (a null map would diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py index a975d0813b28..bab26650aa14 100644 --- a/paimon-python/pypaimon/schema/schema_manager.py +++ b/paimon-python/pypaimon/schema/schema_manager.py @@ -181,8 +181,20 @@ def update_func(field: DataField, depth: int) -> DataField: _update_nested_column(new_fields, change.field_names, update_func) +def _assert_nullability_change(old_nullability: bool, new_nullability: bool, + field_name: str, disable_null_to_not_null: bool): + if disable_null_to_not_null and old_nullability and not new_nullability: + raise ValueError( + "Cannot update column type from nullable to non nullable for {}. " + "You can set table configuration option " + "'alter-column-null-to-not-null.disabled' = 'false' " + "to allow converting null columns to not null".format(field_name) + ) + + def _handle_update_column_nullability( - change: UpdateColumnNullability, new_fields: List[DataField] + change: UpdateColumnNullability, new_fields: List[DataField], + disable_null_to_not_null: bool ): from pypaimon.schema.data_types import DataTypeParser field_names = change.field_names @@ -190,6 +202,9 @@ def _handle_update_column_nullability( def update_func(field: DataField, depth: int) -> DataField: source_root = _get_root_type(field.type, depth, max_depth) + _assert_nullability_change( + source_root.nullable, change.new_nullability, + '.'.join(field_names), disable_null_to_not_null) new_root = DataTypeParser.parse_data_type(source_root.to_dict()) new_root.nullable = change.new_nullability new_type = _get_array_map_type_with_target_type_root( @@ -652,6 +667,10 @@ def _generate_table_schema( # Get add_column_before_partition option add_column_before_partition = CoreOptions(Options(old_table_schema.options)).add_column_before_partition() partition_keys = old_table_schema.partition_keys + # Converting a nullable column to NOT NULL is unsafe for existing + # data and is disabled by default; the table option below opts in. + disable_null_to_not_null = str(old_table_schema.options.get( + 'alter-column-null-to-not-null.disabled', 'true')).lower() != 'false' for change in changes: if isinstance(change, SetOption): @@ -688,7 +707,8 @@ def _generate_table_schema( _assert_not_updating_primary_keys( old_table_schema, change.field_names, "change nullability of" ) - _handle_update_column_nullability(change, new_fields) + _handle_update_column_nullability( + change, new_fields, disable_null_to_not_null) elif isinstance(change, UpdateColumnComment): _handle_update_column_comment(change, new_fields) elif isinstance(change, UpdateColumnPosition): diff --git a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py index eb4c0f467d94..c963f6a1578a 100644 --- a/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py +++ b/paimon-python/pypaimon/tests/schema_evolution_nested_read_test.py @@ -343,6 +343,31 @@ def test_drop_all_subfields_rejected(self): 'default.nsub_dropall', [SchemaChange.drop_column(['mv', 'latest_version'])], False) + def test_null_to_not_null_disabled_by_default(self): + # Converting nullable -> NOT NULL is unsafe for existing data and is + # rejected unless the table opts in via + # 'alter-column-null-to-not-null.disabled' = 'false'. + self._create_struct_table('nsub_nullability') + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.nsub_nullability', + [SchemaChange.update_column_nullability( + ['mv', 'latest_value'], False)], False) + self.assertIn('nullable to non nullable', str(cm.exception)) + # Opt-in makes the same change succeed. + self.catalog.alter_table( + 'default.nsub_nullability', + [SchemaChange.set_option( + 'alter-column-null-to-not-null.disabled', 'false')], False) + self.catalog.alter_table( + 'default.nsub_nullability', + [SchemaChange.update_column_nullability( + ['mv', 'latest_value'], False)], False) + schema = self.catalog.get_table('default.nsub_nullability').table_schema + mv = next(f for f in schema.fields if f.name == 'mv') + lv = next(sf for sf in mv.type.fields if sf.name == 'latest_value') + self.assertFalse(lv.type.nullable) + def test_unsupported_subfield_cast_rejected(self): self._create_struct_table('nsub_badcast') with self.assertRaises(RuntimeError) as cm: @@ -476,6 +501,65 @@ def test_array_wrapper_token_validated(self): [SchemaChange.add_column(['arr', 'element', 'c'], AtomicType('INT'))], False) + def test_array_element_type_update(self): + # The canonical path for promoting an array's element type descends + # through the 'element' token; old files are cast at read time. + s0 = pa.schema([('id', pa.int64()), ('a2', pa.list_(pa.int32()))]) + table = self._create('nelem_type', s0) + self._write(table, pa.Table.from_pylist( + [{'id': 1, 'a2': [1, 2]}], schema=s0)) + self.catalog.alter_table( + 'default.nelem_type', + [SchemaChange.update_column_type(['a2', 'element'], AtomicType('BIGINT'))], + False) + table = self.catalog.get_table('default.nelem_type') + s1 = pa.schema([('id', pa.int64()), ('a2', pa.list_(pa.int64()))]) + self._write(table, pa.Table.from_pylist( + [{'id': 2, 'a2': [3]}], schema=s1)) + rb = table.new_read_builder() + splits = rb.new_scan().plan().splits() + arrow = rb.new_read().to_arrow(splits) + self.assertEqual(arrow.schema.field('a2').type, pa.list_(pa.int64())) + rows = sorted(arrow.to_pylist(), key=lambda r: r['id']) + self.assertEqual(rows, [{'id': 1, 'a2': [1, 2]}, {'id': 2, 'a2': [3]}]) + + def test_whole_struct_type_replacement_rejected(self): + # Replacing a whole ROW type would carry caller-supplied nested ids + # that corrupt the id model; it must be rejected at alter time. + elem = pa.struct([('a', pa.int32()), ('b', pa.string())]) + s0 = pa.schema([('id', pa.int64()), ('mv', elem)]) + self._create('nrow_replace', s0) + new_row = _paimon_type(pa.struct([('a', pa.int64()), ('c', pa.string())])) + with self.assertRaises(RuntimeError) as cm: + self.catalog.alter_table( + 'default.nrow_replace', + [SchemaChange.update_column_type('mv', new_row)], False) + self.assertIn('cannot be converted', str(cm.exception)) + + def test_align_handles_sliced_arrays(self): + # The list/map rebuilds read offsets/raw buffers; a sliced input + # must be re-materialized, not read through stale parent offsets. + from pypaimon.read.reader.data_file_batch_reader import \ + DataFileBatchReader + reader = DataFileBatchReader.__new__(DataFileBatchReader) + sliced_list = pa.array( + [[1, 2], [3], [4, 5, 6], None], type=pa.list_(pa.int32())).slice(1, 3) + out = reader._align_array_by_id( + sliced_list, + ArrayType(True, AtomicType('INT')), + ArrayType(True, AtomicType('BIGINT'))) + self.assertEqual(out.to_pylist(), [[3], [4, 5, 6], None]) + self.assertEqual(out.type, pa.list_(pa.int64())) + + sliced_map = pa.array( + [[('a', 1)], [('b', 2)], None], + type=pa.map_(pa.string(), pa.int32())).slice(1, 2) + out = reader._align_array_by_id( + sliced_map, + MapType(True, AtomicType('STRING'), AtomicType('INT')), + MapType(True, AtomicType('STRING'), AtomicType('BIGINT'))) + self.assertEqual(out.to_pylist(), [[('b', 2)], None]) + def test_map_wrapper_token_validated(self): # The token consumed when descending through a MAP must be 'value'. val = pa.struct([('a', pa.int64())]) @@ -657,6 +741,24 @@ def test_constructed_to_string(self): for src in (vec, ms): self.assertFalse(supports_cast(src, AtomicType('STRING')), str(src)) + def test_constructed_to_differently_shaped_constructed_rejected(self): + # Reshaping a constructed type must go through sub-field / + # 'element' / 'value' paths; a whole-type replacement would carry + # caller-supplied nested ids that corrupt the id model. + self.assertFalse(supports_cast( + RowType(True, [DataField(0, 'a', AtomicType('INT'))]), + RowType(True, [DataField(0, 'a', AtomicType('BIGINT'))]))) + self.assertFalse(supports_cast( + ArrayType(True, AtomicType('INT')), + ArrayType(True, AtomicType('BIGINT')))) + self.assertFalse(supports_cast( + VectorType(True, AtomicType('FLOAT'), 3), + VectorType(True, AtomicType('FLOAT'), 5))) + # Only the outer nullability differing is still an identity cast. + self.assertTrue(supports_cast( + RowType(True, [DataField(2, 'a', AtomicType('INT'))]), + RowType(False, [DataField(2, 'a', AtomicType('INT'))]))) + if __name__ == '__main__': unittest.main()