diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java new file mode 100644 index 000000000000..58a847b64ca8 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import org.apache.paimon.data.InternalRow; + +import javax.annotation.Nullable; + +/** Index writer for global index that accepts multiple column values per row. */ +public interface GlobalIndexMultiColumnWriter extends GlobalIndexWriter { + + /** + * Write one record's indexed columns at the given relative row id. 
+ * + * @param rowId the record's row id relative to the current shard (0 to rowCnt - 1); a null row + * still advances the row id without indexing a value + * @param row a projected row with only the indexed columns: index field first, then extras, as + * in {@link GlobalIndexerFactory#create(org.apache.paimon.types.DataField, java.util.List, + * org.apache.paimon.options.Options)} + */ + void write(long rowId, @Nullable InternalRow row); +} diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java index 74d223a60467..5eadf0597f6e 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java @@ -37,8 +37,14 @@ GlobalIndexReader createReader( List files, ExecutorService executor); - static GlobalIndexer create(String type, DataField dataField, Options options) { + static GlobalIndexer create(String type, DataField indexField, Options options) { GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type); - return globalIndexerFactory.create(dataField, options); + return globalIndexerFactory.create(indexField, options); + } + + static GlobalIndexer create( + String type, DataField indexField, List extraFields, Options options) { + GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type); + return globalIndexerFactory.create(indexField, extraFields, options); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java index 6eabb6d25360..a5aa6e96098c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java @@ -22,10 +22,36 @@ import org.apache.paimon.options.Options; 
import org.apache.paimon.types.DataField; +import java.util.List; + -/** File index factory to construct {@link FileIndexer}. */ +/** Global index factory to construct {@link GlobalIndexer}. */ public interface GlobalIndexerFactory { String identifier(); - GlobalIndexer create(DataField dataField, Options options); + GlobalIndexer create(DataField indexField, Options options); + + /** + * Whether this index type supports multi-column indexes. A factory that returns {@code true} + * must override {@link #create(DataField, List, Options)} to handle extra columns. + */ + default boolean supportsMultiColumn() { + return false; + } + + /** + * Creates an indexer over a primary column plus optional extra columns. {@code indexField} is + * the primary column; {@code extraFields} holds the remaining columns and is empty for a + * single-column index. + */ + default GlobalIndexer create( + DataField indexField, List extraFields, Options options) { + if (extraFields != null && !extraFields.isEmpty()) { + throw new UnsupportedOperationException( + String.format( + "Index type '%s' does not support multi-column index, got extra columns: %s", + identifier(), extraFields)); + } + return create(indexField, options); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java index 085423efa851..39f7fb2b0e2f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java @@ -24,24 +24,84 @@ import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; import org.apache.paimon.utils.Range; 
+import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** Utils for global index build. */ public class GlobalIndexBuilderUtils { + private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexBuilderUtils.class); + + public static List toIndexFileMetas( + FileIO fileIO, + IndexPathFactory indexPathFactory, + CoreOptions options, + Range range, + int indexFieldId, + String indexType, + List entries) + throws IOException { + return toIndexFileMetas( + fileIO, indexPathFactory, options, range, indexFieldId, null, indexType, entries); + } + + /** + * Builds the index file metas. The first column in {@code fields} is treated as the primary + * index column (e.g. the first column in {@code CREATE ... INDEX ON (a, b, c)}) and is stored + * as {@code indexFieldId}; the remaining columns go into {@code extraFieldIds}. Callers must + * pass {@code fields} in the intended column order. + */ public static List toIndexFileMetas( + FileIO fileIO, + IndexPathFactory indexPathFactory, + CoreOptions options, + Range range, + List fields, + String indexType, + List entries) + throws IOException { + // The first column is the primary index column and is stored as indexFieldId; the + // remaining columns (if any) go into extraFieldIds. + int indexFieldId = fields.get(0).id(); + int[] extraFieldIds = + fields.size() > 1 + ? 
fields.subList(1, fields.size()).stream() + .mapToInt(DataField::id) + .toArray() + : null; + return toIndexFileMetas( + fileIO, + indexPathFactory, + options, + range, + indexFieldId, + extraFieldIds, + indexType, + entries); + } + + private static List toIndexFileMetas( FileIO fileIO, IndexPathFactory indexPathFactory, CoreOptions options, Range range, int indexFieldId, + @Nullable int[] extraFieldIds, String indexType, List entries) throws IOException { @@ -50,7 +110,8 @@ public static List toIndexFileMetas( String fileName = entry.fileName(); long fileSize = fileIO.getFileSize(indexPathFactory.toPath(fileName)); GlobalIndexMeta globalIndexMeta = - new GlobalIndexMeta(range.from, range.to, indexFieldId, null, entry.meta()); + new GlobalIndexMeta( + range.from, range.to, indexFieldId, extraFieldIds, entry.meta()); Path externalPathDir = options.globalIndexExternalPath(); String externalPathString = null; @@ -78,6 +139,82 @@ public static GlobalIndexWriter createIndexWriter( return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table)); } + public static GlobalIndexWriter createIndexWriter( + FileStoreTable table, + String indexType, + DataField indexField, + List extraFields, + Options options) + throws IOException { + GlobalIndexer globalIndexer = + GlobalIndexer.create(indexType, indexField, extraFields, options); + return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table)); + } + + /** + * Find the minimum firstRowId among files whose schema does not contain all index columns. + * Files at or beyond this rowId cannot be indexed because the column was added later via ALTER + * TABLE. 
+ * + * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the columns + */ + public static long findMinNonIndexableRowId( + SchemaManager schemaManager, List entries, List indexColumns) { + Map schemaContainsColumns = new HashMap<>(); + long minRowId = Long.MAX_VALUE; + long minSchemaId = -1; + for (ManifestEntry entry : entries) { + long sid = entry.file().schemaId(); + boolean contains = + schemaContainsColumns.computeIfAbsent( + sid, + id -> schemaManager.schema(id).fieldNames().containsAll(indexColumns)); + if (!contains && entry.file().firstRowId() != null) { + long rowId = entry.file().nonNullFirstRowId(); + if (rowId < minRowId) { + minRowId = rowId; + minSchemaId = sid; + } + } + } + if (minRowId != Long.MAX_VALUE) { + List schemaFields = schemaManager.schema(minSchemaId).fieldNames(); + List missingColumns = new ArrayList<>(); + for (String col : indexColumns) { + if (!schemaFields.contains(col)) { + missingColumns.add(col); + } + } + LOG.info( + "Found non-indexable files: schemaId={} missing columns {}, boundaryRowId={}.", + minSchemaId, + missingColumns, + minRowId); + } + return minRowId; + } + + /** Keep only entries whose firstRowId is strictly less than the given boundary. 
*/ + public static List filterEntriesBefore( + List entries, long boundaryRowId) { + if (boundaryRowId == Long.MAX_VALUE) { + return entries; + } + List result = new ArrayList<>(); + for (ManifestEntry entry : entries) { + if (entry.file().firstRowId() != null + && entry.file().nonNullFirstRowId() < boundaryRowId) { + result.add(entry); + } + } + LOG.info( + "Filtered {} files to {} indexable files (boundaryRowId={}).", + entries.size(), + result.size(), + boundaryRowId); + return result; + } + private static GlobalIndexFileReadWrite createGlobalIndexFileReadWrite(FileStoreTable table) { IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); return new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory); diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java index 975b28183331..13c4adb0533e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java @@ -53,6 +53,7 @@ import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_THREAD_NUM; import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames; import static org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** Scanner for shard-based global indexes. 
*/ @@ -74,29 +75,86 @@ public GlobalIndexScanner( GlobalIndexReadThreadPool.getExecutorService(options.get(GLOBAL_INDEX_THREAD_NUM)); this.indexPathFactory = indexPathFactory; GlobalIndexFileReader indexFileReader = meta -> fileIO.newInputStream(meta.filePath()); - Map>>> indexMetas = new HashMap<>(); + Map indexMetas = new HashMap<>(); + Map> extraIndexMetas = new HashMap<>(); for (IndexFileMeta indexFile : indexFiles) { GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); - int fieldId = meta.indexFieldId(); String indexType = indexFile.indexType(); - indexMetas - .computeIfAbsent(fieldId, k -> new HashMap<>()) - .computeIfAbsent(indexType, k -> new HashMap<>()) - .computeIfAbsent( - new Range(meta.rowRangeStart(), meta.rowRangeEnd()), - k -> new ArrayList<>()) - .add(indexFile); + Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd()); + int indexFieldId = meta.indexFieldId(); + List fieldIds = meta.getIndexedFieldIds(); + IndexMetaFileGroup group = indexMetas.get(indexFieldId); + if (group == null) { + group = new IndexMetaFileGroup(indexFieldId, fieldIds); + indexMetas.put(indexFieldId, group); + if (meta.extraFieldIds() != null) { + for (int extra : meta.extraFieldIds()) { + extraIndexMetas.computeIfAbsent(extra, k -> new ArrayList<>()).add(group); + } + } + } else { + checkArgument( + group.fieldIds.equals(fieldIds), + "Primary field %s owns multiple indexes with different columns %s and %s; " + + "a primary column can own at most one index.", + indexFieldId, + group.fieldIds, + fieldIds); + } + group.addFile(indexType, range, indexFile); } IntFunction> readersFunction = - fieldId -> - createReaders( - indexFileReader, - indexMetas.get(fieldId), - rowType.getField(fieldId)); + fId -> { + IndexMetaFileGroup group = indexMetas.get(fId); + if (group != null) { + return createReaders(indexFileReader, group, rowType); + } + List extraGroups = extraIndexMetas.get(fId); + if (extraGroups == null || extraGroups.isEmpty()) { + return 
Collections.emptyList(); + } + // Union readers from all groups that share this extra column + List allReaders = new ArrayList<>(); + for (IndexMetaFileGroup g : extraGroups) { + allReaders.addAll(createReaders(indexFileReader, g, rowType)); + } + return allReaders; + }; this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, readersFunction); } + /** All index files of one global index (single- or multi-column), grouped for reading. */ + private static class IndexMetaFileGroup { + + private final int indexFieldId; + private final List fieldIds; + private final Map>> metas = new HashMap<>(); + + IndexMetaFileGroup(int indexFieldId, List fieldIds) { + this.indexFieldId = indexFieldId; + this.fieldIds = fieldIds; + } + + void addFile(String indexType, Range range, IndexFileMeta indexFile) { + metas.computeIfAbsent(indexType, k -> new HashMap<>()) + .computeIfAbsent(range, k -> new ArrayList<>()) + .add(indexFile); + } + + /** The primary index column. */ + DataField indexField(RowType rowType) { + return rowType.getField(indexFieldId); + } + + /** The extra columns beyond the primary one; empty for a single-column index. */ + List extraFields(RowType rowType) { + return fieldIds.subList(1, fieldIds.size()).stream() + .map(rowType::getField) + .collect(Collectors.toList()); + } + } + public static Optional create( FileStoreTable table, Collection indexFiles) { if (indexFiles.isEmpty()) { @@ -127,7 +185,19 @@ public static Optional create( if (globalIndex == null) { return false; } - return filterFieldIds.contains(globalIndex.indexFieldId()); + // Collect indexes whose primary column is filtered, and also multi-column + // indexes that have a filtered column as an extra (used as a fallback). 
+ if (filterFieldIds.contains(globalIndex.indexFieldId())) { + return true; + } + if (globalIndex.extraFieldIds() != null) { + for (int id : globalIndex.extraFieldIds()) { + if (filterFieldIds.contains(id)) { + return true; + } + } + } + return false; }; List indexFiles = @@ -143,19 +213,17 @@ public Optional scan(Predicate predicate) { } private Collection createReaders( - GlobalIndexFileReader indexFileReadWrite, - Map>> indexMetas, - DataField dataField) { - if (indexMetas == null) { - return Collections.emptyList(); - } + GlobalIndexFileReader indexFileReadWrite, IndexMetaFileGroup group, RowType rowType) { + DataField indexField = group.indexField(rowType); + List extraFields = group.extraFields(rowType); Set readers = new HashSet<>(); - for (Map.Entry>> entry : indexMetas.entrySet()) { + for (Map.Entry>> entry : group.metas.entrySet()) { String indexType = entry.getKey(); Map> metas = entry.getValue(); GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(indexType); - GlobalIndexer globalIndexer = globalIndexerFactory.create(dataField, options); + GlobalIndexer globalIndexer = + globalIndexerFactory.create(indexField, extraFields, options); List> futures = new ArrayList<>(metas.size()); for (Map.Entry> rangeMetas : metas.entrySet()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java b/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java index c468bbffb3aa..2c9718f70dab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/GlobalIndexMeta.java @@ -27,7 +27,9 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; /** Schema for global index. */ public class GlobalIndexMeta { @@ -78,6 +80,15 @@ public int indexFieldId() { return indexFieldId; } + /** + * Whether this index covers more than one column. 
{@link #indexFieldId} is always the primary + * column; {@link #extraFieldIds} holds the remaining columns and is null/empty for a + * single-column index. + */ + public boolean isMultiColumn() { + return extraFieldIds != null && extraFieldIds.length > 0; + } + @Nullable public int[] extraFieldIds() { return extraFieldIds; @@ -87,4 +98,48 @@ public int[] extraFieldIds() { public byte[] indexMeta() { return indexMeta; } + + /** All indexed field ids in order: the primary {@link #indexFieldId} followed by the rest. */ + public List getIndexedFieldIds() { + List ids = new ArrayList<>(); + ids.add(indexFieldId); + if (extraFieldIds != null) { + for (int id : extraFieldIds) { + ids.add(id); + } + } + return ids; + } + + public List getIndexedFields(RowType rowType) { + List fields = new ArrayList<>(); + for (int id : getIndexedFieldIds()) { + fields.add(rowType.getField(id)); + } + return fields; + } + + /** The primary index column. */ + public DataField getIndexField(RowType rowType) { + return rowType.getField(indexFieldId); + } + + /** The extra columns beyond the primary one; empty for a single-column index. 
*/ + public List getExtraFields(RowType rowType) { + List fields = new ArrayList<>(); + if (extraFieldIds != null) { + for (int id : extraFieldIds) { + fields.add(rowType.getField(id)); + } + } + return fields; + } + + public List getIndexedFieldNames(RowType rowType) { + List names = new ArrayList<>(); + for (int id : getIndexedFieldIds()) { + names.add(rowType.getField(id).name()); + } + return names; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java index 3621483197f7..f99278085550 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java @@ -28,6 +28,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -241,11 +242,13 @@ private void validateRetainedIndexFiles( GlobalIndexMeta addedMeta = added.indexFile().globalIndexMeta(); if (addedMeta == null || retainedMeta.indexFieldId() != addedMeta.indexFieldId() - || !Range.intersect( - retainedMeta.rowRangeStart(), - retainedMeta.rowRangeEnd(), - addedMeta.rowRangeStart(), - addedMeta.rowRangeEnd())) { + || (Arrays.equals( + retainedMeta.extraFieldIds(), addedMeta.extraFieldIds()) + && !Range.intersect( + retainedMeta.rowRangeStart(), + retainedMeta.rowRangeEnd(), + addedMeta.rowRangeStart(), + addedMeta.rowRangeEnd()))) { continue; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java index 66e509de8999..19999bd824c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java @@ -78,10 +78,22 @@ public 
GlobalIndexResult read(List splits) { return GlobalIndexResult.createEmpty(); } - String indexType = splits.get(0).fullTextIndexFiles().get(0).indexType(); - GlobalIndexer globalIndexer = - GlobalIndexerFactoryUtils.load(indexType) - .create(textColumn, table.coreOptions().toConfiguration()); + IndexFileMeta firstFile = splits.get(0).fullTextIndexFiles().get(0); + String indexType = firstFile.indexType(); + GlobalIndexMeta firstMeta = checkNotNull(firstFile.globalIndexMeta()); + GlobalIndexer globalIndexer; + if (firstMeta.isMultiColumn()) { + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create( + firstMeta.getIndexField(table.rowType()), + firstMeta.getExtraFields(table.rowType()), + table.coreOptions().toConfiguration()); + } else { + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(textColumn, table.coreOptions().toConfiguration()); + } IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); int parallelism = table.coreOptions().toConfiguration().get(GLOBAL_INDEX_THREAD_NUM); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java index 2eae2d48779d..a9cfe896baa2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java @@ -87,10 +87,22 @@ public GlobalIndexResult read(List splits) { RoaringNavigableMap64 preFilter = preFilter(splits).orElse(null); - String indexType = splits.get(0).vectorIndexFiles().get(0).indexType(); - GlobalIndexer globalIndexer = - GlobalIndexerFactoryUtils.load(indexType) - .create(vectorColumn, table.coreOptions().toConfiguration()); + IndexFileMeta firstFile = splits.get(0).vectorIndexFiles().get(0); + String indexType = firstFile.indexType(); + GlobalIndexMeta firstMeta = checkNotNull(firstFile.globalIndexMeta()); + GlobalIndexer 
globalIndexer; + if (firstMeta.isMultiColumn()) { + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create( + firstMeta.getIndexField(table.rowType()), + firstMeta.getExtraFields(table.rowType()), + table.coreOptions().toConfiguration()); + } else { + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(vectorColumn, table.coreOptions().toConfiguration()); + } IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); int parallelism = table.coreOptions().toConfiguration().get(GLOBAL_INDEX_THREAD_NUM); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java index d3db6dd13d37..b59363a3264f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java @@ -82,7 +82,18 @@ public Plan scan() { return false; } int fieldId = globalIndex.indexFieldId(); - return vectorColumn.id() == fieldId || filterFieldIds.contains(fieldId); + if (vectorColumn.id() == fieldId || filterFieldIds.contains(fieldId)) { + return true; + } + int[] extras = globalIndex.extraFieldIds(); + if (extras != null) { + for (int extra : extras) { + if (filterFieldIds.contains(extra)) { + return true; + } + } + } + return false; }; List allIndexFiles = @@ -94,7 +105,7 @@ public Plan scan() { Map> vectorByRange = new HashMap<>(); for (IndexFileMeta indexFile : allIndexFiles) { GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); - if (meta.indexFieldId() == vectorColumn.id()) { + if (isPrimaryColumn(meta, vectorColumn.id())) { Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd()); vectorByRange.computeIfAbsent(range, k -> new ArrayList<>()).add(indexFile); } @@ -111,7 +122,7 @@ public Plan scan() { f -> { GlobalIndexMeta globalIndex = checkNotNull(f.globalIndexMeta()); - if 
(globalIndex.indexFieldId() == vectorColumn.id()) { + if (isPrimaryColumn(globalIndex, vectorColumn.id())) { return false; } return range.hasIntersection(globalIndex.rowRange()); @@ -122,4 +133,8 @@ public Plan scan() { return () -> splits; } + + private static boolean isPrimaryColumn(GlobalIndexMeta meta, int fieldId) { + return meta.indexFieldId() == fieldId; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java index 320257ce1057..9ad88e977b3d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java @@ -235,8 +235,16 @@ private InternalRow toRow( String indexFieldName = null; if (globalMeta != null) { try { - indexFieldName = logicalRowType.getField(globalMeta.indexFieldId()).name(); - } catch (RuntimeException ignored) { + indexFieldName = + String.join(",", globalMeta.getIndexedFieldNames(logicalRowType)); + } catch (RuntimeException e) { + // Indexed columns may no longer exist in the current schema (e.g. dropped via + // ALTER TABLE); leave the name empty instead of failing the listing. + LOG.debug( + "Failed to resolve indexed field names for index file {} (primary field {}).", + indexManifestEntry.indexFile().fileName(), + globalMeta.indexFieldId(), + e); } } return GenericRow.of( diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java new file mode 100644 index 000000000000..67852ae925ff --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link GlobalIndexBuilderUtils}. 
*/ +class GlobalIndexBuilderUtilsTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private IndexPathFactory indexPathFactory; + private CoreOptions coreOptions; + + @BeforeEach + void setUp() { + fileIO = new LocalFileIO(); + Path dir = new Path(tempDir.toString()); + indexPathFactory = + new IndexPathFactory() { + @Override + public Path toPath(String fileName) { + return new Path(dir, fileName); + } + + @Override + public Path newPath() { + return new Path(dir, UUID.randomUUID().toString()); + } + + @Override + public boolean isExternalPath() { + return false; + } + }; + coreOptions = new CoreOptions(new Options().toMap()); + } + + // Test: 2 columns (title + vec), primary column title is indexFieldId, rest in extraFieldIds + @Test + void testToIndexFileMetasMultiColumn() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + DataField vecField = new DataField(2, "vec", new ArrayType(new FloatType())); + List fields = Arrays.asList(titleField, vecField); + + List entries = createDummyResultEntries(); + Range range = new Range(0, 99); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isEqualTo(new int[] {2}); + assertThat(metas.get(0).globalIndexMeta().rowRangeStart()).isEqualTo(0); + assertThat(metas.get(0).globalIndexMeta().rowRangeEnd()).isEqualTo(99); + } + + // Test: single column, extraFieldIds should be null (backward compatible with single-column + // path) + @Test + void testToIndexFileMetasSingleColumn() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + List fields = Collections.singletonList(titleField); + + List entries = createDummyResultEntries(); + Range 
range = new Range(0, 49); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isNull(); + } + + // Test: 3 columns (title + vec + id), primary column title is indexFieldId, rest in + // extraFieldIds + @Test + void testToIndexFileMetasThreeColumns() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + DataField vecField = new DataField(2, "vec", new ArrayType(new FloatType())); + DataField idField = new DataField(3, "id", new IntType()); + List fields = Arrays.asList(titleField, vecField, idField); + + List entries = createDummyResultEntries(); + Range range = new Range(0, 199); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isEqualTo(new int[] {2, 3}); + } + + private List createDummyResultEntries() throws IOException { + String fileName = "test-index-" + UUID.randomUUID(); + Path filePath = indexPathFactory.toPath(fileName); + fileIO.newOutputStream(filePath, false).close(); + return Collections.singletonList(new ResultEntry(fileName, 100, null)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java index 8b1122382aae..bdd0c0d49194 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java @@ -100,10 +100,12 @@ private void checkUpdatedColumns() { GlobalIndexMeta globalIndexMeta = entry.indexFile().globalIndexMeta(); if (globalIndexMeta != null) { - String fieldName = - rowType.getField(globalIndexMeta.indexFieldId()) - .name(); - return updatedColumns.contains(fieldName) + List indexedNames = + globalIndexMeta.getIndexedFieldNames(rowType); + boolean overlaps = + indexedNames.stream() + .anyMatch(updatedColumns::contains); + return overlaps && affectedPartitions.contains(entry.partition()); } return false; @@ -116,8 +118,8 @@ private void checkUpdatedColumns() { case THROW_ERROR: Set conflictedColumns = affectedEntries.stream() - .map(file -> file.indexFile().globalIndexMeta().indexFieldId()) - .map(id -> rowType.getField(id).name()) + .map(file -> file.indexFile().globalIndexMeta()) + .flatMap(meta -> meta.getIndexedFieldNames(rowType).stream()) .collect(Collectors.toSet()); throw new RuntimeException( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index 5896503ce09d..af256da8ec72 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -29,7 +29,9 @@ import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; +import org.apache.paimon.globalindex.GlobalIndexMultiColumnWriter; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; import 
org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; @@ -38,7 +40,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.sink.BatchWriteBuilder; @@ -50,6 +51,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Range; import org.apache.flink.streaming.api.datastream.DataStream; @@ -65,7 +67,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -74,6 +75,8 @@ import java.util.stream.Collectors; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter; +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.filterEntriesBefore; +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.findMinNonIndexableRowId; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas; import static org.apache.paimon.io.CompactIncrement.emptyIncrement; import static org.apache.paimon.io.DataIncrement.deleteIndexIncrement; @@ -104,6 +107,7 @@ public static void buildIndexAndExecute( env, table, indexColumn, + Collections.emptyList(), indexType, partitionPredicate, userOptions, @@ -119,12 +123,54 @@ public static void buildIndexAndExecute( Options userOptions, long maxIndexedRowId) throws Exception { + buildIndexAndExecute( + env, + table, + indexColumn, + Collections.emptyList(), + indexType, + partitionPredicate, + userOptions, + maxIndexedRowId); + } + + public static 
void buildIndexAndExecute( + StreamExecutionEnvironment env, + FileStoreTable table, + String indexColumn, + List extraColumns, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions) + throws Exception { + buildIndexAndExecute( + env, + table, + indexColumn, + extraColumns, + indexType, + partitionPredicate, + userOptions, + NO_MAX_INDEXED_ROW_ID); + } + + public static void buildIndexAndExecute( + StreamExecutionEnvironment env, + FileStoreTable table, + String indexColumn, + List extraColumns, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions, + long maxIndexedRowId) + throws Exception { boolean hasIndexToBuild = buildIndex( env, () -> new GenericGlobalIndexBuilder(table), table, indexColumn, + extraColumns, indexType, partitionPredicate, userOptions, @@ -150,12 +196,35 @@ public static boolean buildIndex( indexBuilderSupplier, table, indexColumn, + Collections.emptyList(), indexType, partitionPredicate, userOptions, NO_MAX_INDEXED_ROW_ID); } + public static boolean buildIndex( + StreamExecutionEnvironment env, + Supplier indexBuilderSupplier, + FileStoreTable table, + String indexColumn, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions, + long maxIndexedRowId) + throws Exception { + return buildIndex( + env, + indexBuilderSupplier, + table, + indexColumn, + Collections.emptyList(), + indexType, + partitionPredicate, + userOptions, + maxIndexedRowId); + } + /** * Builds a generic global index topology using a {@link GenericGlobalIndexBuilder} supplier. 
* @@ -167,6 +236,7 @@ public static boolean buildIndex( Supplier indexBuilderSupplier, FileStoreTable table, String indexColumn, + List extraColumns, String indexType, PartitionPredicate partitionPredicate, Options userOptions, @@ -184,6 +254,7 @@ public static boolean buildIndex( env, table, indexColumn, + extraColumns, indexType, userOptions, entries, @@ -204,32 +275,39 @@ private static boolean buildTopology( StreamExecutionEnvironment env, FileStoreTable table, String indexColumn, + List extraColumns, String indexType, Options userOptions, List entries, List deletedIndexEntries, long maxIndexedRowId) throws Exception { + // The primary column followed by the extra columns, in index order. + List indexColumns = new ArrayList<>(1 + extraColumns.size()); + indexColumns.add(indexColumn); + indexColumns.addAll(extraColumns); + long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum(); LOG.info( - "Scanned {} files ({} rows) across {} partitions for {} index on column '{}'" + "Scanned {} files ({} rows) across {} partitions for {} index on columns '{}'" + (maxIndexedRowId >= 0 ? ", maxIndexedRowId={}." 
: "."), entries.size(), totalRowCount, entries.stream().map(ManifestEntry::partition).distinct().count(), indexType, - indexColumn, + indexColumns, maxIndexedRowId); long minNonIndexableRowId = - findMinNonIndexableRowId(table.schemaManager(), entries, indexColumn); + findMinNonIndexableRowId(table.schemaManager(), entries, indexColumns); entries = filterEntriesBefore(entries, minNonIndexableRowId); RowType rowType = table.rowType(); DataField indexField = rowType.getField(indexColumn); - // Project indexColumn + _ROW_ID so we can read the actual row ID from data - List readColumns = new ArrayList<>(); - readColumns.add(indexColumn); + List extraFields = + extraColumns.stream().map(rowType::getField).collect(Collectors.toList()); + // Project indexColumns + _ROW_ID so we can read the actual row ID from data + List readColumns = new ArrayList<>(indexColumns); readColumns.add(SpecialFields.ROW_ID.name()); RowType projectedRowType = SpecialFields.rowTypeWithRowId(rowType).project(readColumns); @@ -278,6 +356,7 @@ private static boolean buildTopology( table, indexType, indexField, + extraFields, projectedRowType, mergedOptions)) .setParallelism(parallelism); @@ -298,49 +377,6 @@ private static boolean buildTopology( return true; } - /** - * Find the minimum firstRowId among files whose schema does not contain the index column. Files - * at or beyond this rowId cannot be indexed because the column was added later via ALTER TABLE. 
- * - * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the column - */ - static long findMinNonIndexableRowId( - SchemaManager schemaManager, List entries, String indexColumn) { - Map schemaContainsColumn = new HashMap<>(); - long minRowId = Long.MAX_VALUE; - for (ManifestEntry entry : entries) { - long sid = entry.file().schemaId(); - boolean contains = - schemaContainsColumn.computeIfAbsent( - sid, id -> schemaManager.schema(id).fieldNames().contains(indexColumn)); - if (!contains && entry.file().firstRowId() != null) { - minRowId = Math.min(minRowId, entry.file().nonNullFirstRowId()); - } - } - return minRowId; - } - - /** Keep only entries whose firstRowId is strictly less than the given boundary. */ - static List filterEntriesBefore( - List entries, long boundaryRowId) { - if (boundaryRowId == Long.MAX_VALUE) { - return entries; - } - List result = new ArrayList<>(); - for (ManifestEntry entry : entries) { - if (entry.file().firstRowId() != null - && entry.file().nonNullFirstRowId() < boundaryRowId) { - result.add(entry); - } - } - LOG.info( - "Filtered {} files at or beyond rowId {}, {} files remain.", - entries.size() - result.size(), - boundaryRowId, - result.size()); - return result; - } - /** * Compute shard tasks for a full build (no rows to skip). 
* @@ -549,24 +585,30 @@ private static class BuildIndexOperator private final FileStoreTable table; private final String indexType; private final DataField indexField; + private final List extraFields; private final RowType projectedRowType; private final Options mergedOptions; private transient TableRead tableRead; - private transient InternalRow.FieldGetter indexFieldGetter; + private transient List indexedFields; + private transient InternalRow.FieldGetter[] indexFieldGetters; private transient int rowIdFieldIndex; + private transient boolean multiColumn; + private transient ProjectedRow writerProjection; BuildIndexOperator( ReadBuilder readBuilder, FileStoreTable table, String indexType, DataField indexField, + List extraFields, RowType projectedRowType, Options mergedOptions) { this.readBuilder = readBuilder; this.table = table; this.indexType = indexType; this.indexField = indexField; + this.extraFields = extraFields; this.projectedRowType = projectedRowType; this.mergedOptions = mergedOptions; } @@ -575,10 +617,27 @@ private static class BuildIndexOperator public void open() throws Exception { super.open(); this.tableRead = readBuilder.newRead(); - this.indexFieldGetter = - InternalRow.createFieldGetter( - indexField.type(), projectedRowType.getFieldIndex(indexField.name())); + // The primary column followed by the extra columns, in index order. Field getters and + // the writer projection both need the full ordered list. 
+ this.indexedFields = new ArrayList<>(1 + extraFields.size()); + indexedFields.add(indexField); + indexedFields.addAll(extraFields); + this.indexFieldGetters = new InternalRow.FieldGetter[indexedFields.size()]; + for (int i = 0; i < indexedFields.size(); i++) { + DataField field = indexedFields.get(i); + indexFieldGetters[i] = + InternalRow.createFieldGetter( + field.type(), projectedRowType.getFieldIndex(field.name())); + } this.rowIdFieldIndex = projectedRowType.getFieldIndex(SpecialFields.ROW_ID.name()); + this.multiColumn = !extraFields.isEmpty(); + if (multiColumn) { + int[] projection = new int[indexedFields.size()]; + for (int i = 0; i < indexedFields.size(); i++) { + projection[i] = projectedRowType.getFieldIndex(indexedFields.get(i).name()); + } + this.writerProjection = ProjectedRow.from(projection); + } } @Override @@ -595,9 +654,8 @@ public void processElement(StreamRecord element) throws Exception { task.split.dataFiles().size()); long startTime = System.currentTimeMillis(); - GlobalIndexSingletonWriter indexWriter = - (GlobalIndexSingletonWriter) - createIndexWriter(table, indexType, indexField, mergedOptions); + GlobalIndexWriter indexWriter = + createIndexWriter(table, indexType, indexField, extraFields, mergedOptions); try { long rowsSeen = 0; @@ -626,8 +684,14 @@ public void processElement(StreamRecord element) throws Exception { } // Only write rows within this shard's range if (currentRowId >= task.shardRange.from) { - Object fieldData = indexFieldGetter.getFieldOrNull(row); - indexWriter.write(fieldData); + if (multiColumn) { + long rowId = currentRowId - task.shardRange.from; + ((GlobalIndexMultiColumnWriter) indexWriter) + .write(rowId, writerProjection.replaceRow(row)); + } else { + Object fieldData = indexFieldGetters[0].getFieldOrNull(row); + ((GlobalIndexSingletonWriter) indexWriter).write(fieldData); + } rowsSeen++; } } @@ -664,7 +728,7 @@ public void processElement(StreamRecord element) throws Exception { table, partition, 
task.shardRange, - indexField, + indexedFields, indexType, resultEntries); output.collect( @@ -688,7 +752,7 @@ private static CommitMessage flushIndex( FileStoreTable table, BinaryRow partition, Range rowRange, - DataField indexField, + List indexFields, String indexType, List resultEntries) throws IOException { @@ -698,14 +762,14 @@ private static CommitMessage flushIndex( table.store().pathFactory().globalIndexFileFactory(), table.coreOptions(), rowRange, - indexField.id(), + indexFields, indexType, resultEntries); return new CommitMessageImpl( partition, 0, null, indexIncrement(indexFileMetas), emptyIncrement()); } - private static void closeWriterQuietly(GlobalIndexSingletonWriter writer) { + private static void closeWriterQuietly(GlobalIndexWriter writer) { if (writer instanceof Closeable) { try { ((Closeable) writer).close(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java index ad62ad8f7654..8d15824918dd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.flink.btree.BTreeIndexTopoBuilder; import org.apache.paimon.flink.globalindex.GenericIndexTopoBuilder; +import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; @@ -32,8 +33,11 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static 
org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -85,11 +89,23 @@ public String[] call( tableId); RowType rowType = table.rowType(); + List indexColumns = + Arrays.stream(indexColumn.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + checkArgument(!indexColumns.isEmpty(), "At least one column required."); checkArgument( - rowType.containsField(indexColumn), - "Column '%s' does not exist in table '%s'.", - indexColumn, - tableId); + indexColumns.size() == new HashSet<>(indexColumns).size(), + "Duplicate index columns are not allowed: %s", + indexColumns); + for (String col : indexColumns) { + checkArgument( + rowType.containsField(col), + "Column '%s' does not exist in table '%s'.", + col, + tableId); + } // Parse partition predicate PartitionPredicate partitionPredicate = parsePartitionPredicate(table, partitions); @@ -99,12 +115,21 @@ public String[] call( // Build global index based on index type indexType = indexType.toLowerCase().trim(); + if (indexColumns.size() > 1) { + // Whether multi-column is supported is decided by each index type's factory; fail fast + // up front instead of failing later in the build job. 
+ checkArgument( + GlobalIndexerFactoryUtils.load(indexType).supportsMultiColumn(), + "Index type '%s' does not support multi-column index, got columns: %s", + indexType, + indexColumns); + } try { if ("btree".equals(indexType)) { BTreeIndexTopoBuilder.buildIndexAndExecute( procedureContext.getExecutionEnvironment(), table, - indexColumn, + indexColumns.get(0), partitionPredicate, userOptions); return new String[] { @@ -114,7 +139,8 @@ public String[] call( GenericIndexTopoBuilder.buildIndexAndExecute( procedureContext.getExecutionEnvironment(), table, - indexColumn, + indexColumns.get(0), + indexColumns.subList(1, indexColumns.size()), indexType, partitionPredicate, userOptions); @@ -122,8 +148,8 @@ public String[] call( } catch (Exception e) { throw new RuntimeException( String.format( - "Failed to create %s index for column '%s' on table '%s'.", - indexType, indexColumn, table.name()), + "Failed to create %s index for columns '%s' on table '%s'.", + indexType, indexColumns, table.name()), e); } return new String[] { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java index a5ab0239c215..92bde693ea6c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java @@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.UUID; @@ -82,13 +83,26 @@ public String[] call( FileStoreTable table = (FileStoreTable) table(tableId); - // Validate column exists + // Parse comma-separated columns (consistent with create procedure) RowType rowType = table.rowType(); - checkArgument( - 
rowType.containsField(indexColumn), - "Column '%s' does not exist in table '%s'.", - indexColumn, - tableId); + List indexColumns = + Arrays.stream(indexColumn.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + checkArgument(!indexColumns.isEmpty(), "At least one column required."); + for (String col : indexColumns) { + checkArgument( + rowType.containsField(col), + "Column '%s' does not exist in table '%s'.", + col, + tableId); + } + final List indexFieldIds = + indexColumns.stream() + .map(col -> rowType.getField(col).id()) + .collect(Collectors.toList()); + final String columnsDesc = String.join(",", indexColumns); // Parse partition predicate PartitionPredicate partitionPredicate = parsePartitionPredicate(table, partitions); @@ -96,9 +110,6 @@ public String[] call( // Normalize index type final String indexTypeLower = indexType.toLowerCase().trim(); - // Get column field ID for final reference in lambda - final int columnId = rowType.getField(indexColumn).id(); - // Get latest snapshot Snapshot snapshot = table.latestSnapshot() @@ -108,12 +119,15 @@ public String[] call( String.format( "Table '%s' has no snapshot.", tableId))); - // Create filter for index entries to delete + // Create filter for index entries to delete — match by primary column + full column set Filter filter = entry -> entry.indexFile().indexType().equals(indexTypeLower) && entry.indexFile().globalIndexMeta() != null - && entry.indexFile().globalIndexMeta().indexFieldId() == columnId + && entry.indexFile() + .globalIndexMeta() + .getIndexedFieldIds() + .equals(indexFieldIds) && (partitionPredicate == null || partitionPredicate.test(entry.partition())); @@ -122,15 +136,15 @@ public String[] call( table.store().newIndexFileHandler().scan(snapshot, filter); LOG.info( - "Found {} {} global index files to delete for column '{}' on table '{}'", + "Found {} {} global index files to delete for columns '{}' on table '{}'", waitToDelete.size(), 
indexTypeLower, - indexColumn, + columnsDesc, table.name()); if (waitToDelete.isEmpty()) { return new String[] { - "No " + indexTypeLower + " global index found for column '" + indexColumn + "'" + "No " + indexTypeLower + " global index found for columns '" + columnsDesc + "'" }; } @@ -165,10 +179,10 @@ public String[] call( } LOG.info( - "Successfully dropped {} {} global index files for column '{}' on table '{}'", + "Successfully dropped {} {} global index files for columns '{}' on table '{}'", waitToDelete.size(), indexTypeLower, - indexColumn, + columnsDesc, table.name()); return new String[] { @@ -176,8 +190,8 @@ public String[] call( + waitToDelete.size() + " " + indexTypeLower - + " global index files for column '" - + indexColumn + + " global index files for columns '" + + columnsDesc + "' on table '" + table.name() + "'" diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java index 0de57077b295..c69b59ad6e3c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.BinaryString; import org.apache.paimon.fs.Path; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.io.PojoDataFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; @@ -472,10 +473,10 @@ void testAppendFilterOldFilesBeforeNewFiles() { entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 200L, 100, 0L)); List result = - GenericIndexTopoBuilder.filterEntriesBefore( + GlobalIndexBuilderUtils.filterEntriesBefore( entries, - 
GenericIndexTopoBuilder.findMinNonIndexableRowId( - schemaManager, entries, "vec")); + GlobalIndexBuilderUtils.findMinNonIndexableRowId( + schemaManager, entries, Collections.singletonList("vec"))); assertThat(result).hasSize(2); assertThat(result.get(0).file().nonNullFirstRowId()).isEqualTo(0L); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java index 5659467d8aa9..a348b5af7eda 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java @@ -299,6 +299,6 @@ public void testDropNonExistentIndex() throws Exception { assertThat(dropResult.get(0).getField(0)) .isInstanceOf(String.class) .asString() - .contains("No btree global index found for column 'name'"); + .contains("No btree global index found for columns 'name'"); } } diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index ad6f5b95011a..e5d7df44be74 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -21,6 +21,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile +import org.apache.paimon.index.GlobalIndexMeta import org.apache.paimon.io.{CompactIncrement, DataIncrement} 
import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable @@ -592,9 +593,9 @@ case class MergeIntoPaimonDataEvolutionTable( if (globalIndexMeta == null) { false } else { - val fieldName = rowType.getField(globalIndexMeta.indexFieldId()).name() + val indexedNames = globalIndexMeta.getIndexedFieldNames(rowType).asScala affectedParts.contains(entry.partition()) && updateColumns.exists( - _.name.equals(fieldName)) + col => indexedNames.contains(col.name)) } } @@ -611,8 +612,7 @@ case class MergeIntoPaimonDataEvolutionTable( case GlobalIndexColumnUpdateAction.THROW_ERROR => val updatedColNames = updateColumns.map(_.name) val conflicted = affectedIndexEntries - .map(_.indexFile().globalIndexMeta().indexFieldId()) - .map(id => rowType.getField(id).name()) + .flatMap(e => e.indexFile().globalIndexMeta().getIndexedFieldNames(rowType).asScala) .toSet throw new RuntimeException( s"""MergeInto: update columns contain globally indexed columns, not supported now. diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java index 1485d14fac1c..ae87dc96a45a 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java @@ -20,23 +20,29 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexMultiColumnWriter; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; import 
org.apache.paimon.io.DataIncrement; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Range; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter; @@ -51,6 +57,7 @@ public class DefaultGlobalIndexBuilder implements Serializable { private final BinaryRow partition; private final RowType readType; private final DataField indexField; + private final List extraFields; private final String indexType; private final Range rowRange; private final Options options; @@ -63,15 +70,48 @@ public DefaultGlobalIndexBuilder( String indexType, Range rowRange, Options options) { + this( + table, + partition, + readType, + indexField, + Collections.emptyList(), + indexType, + rowRange, + options); + } + + public DefaultGlobalIndexBuilder( + FileStoreTable table, + BinaryRow partition, + RowType readType, + DataField indexField, + List extraFields, + String indexType, + Range rowRange, + Options options) { this.table = table; this.partition = partition; this.readType = readType; this.indexField = indexField; + // Copy into a serializable ArrayList: callers may pass a List#subList view (e.g. + // indexFields.subList(1, ...)), which is not Serializable, and this builder is serialized + // and shipped to Spark executors. A null value means no extra columns. + this.extraFields = + extraFields == null ? 
Collections.emptyList() : new ArrayList<>(extraFields); this.indexType = indexType; this.rowRange = rowRange; this.options = options; } + /** The primary index column followed by the extra columns, in index order. */ + private List indexedFields() { + List fields = new ArrayList<>(1 + extraFields.size()); + fields.add(indexField); + fields.addAll(extraFields); + return fields; + } + public FileStoreTable table() { return table; } @@ -89,7 +129,7 @@ public CommitMessage build(CloseableIterator data) throws IOExcepti table.store().pathFactory().globalIndexFileFactory(), table.coreOptions(), rowRange, - indexField.id(), + indexedFields(), indexType, resultEntries); DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas); @@ -99,27 +139,50 @@ public CommitMessage build(CloseableIterator data) throws IOExcepti private List writePaimonRows( CloseableIterator rows, LongCounter rowCounter) throws IOException { - GlobalIndexSingletonWriter indexWriter = - (GlobalIndexSingletonWriter) - createIndexWriter(table, indexType, indexField, options); + GlobalIndexWriter indexWriter = + createIndexWriter(table, indexType, indexField, extraFields, options); + boolean multiColumn = !extraFields.isEmpty(); try { - InternalRow.FieldGetter getter = - InternalRow.createFieldGetter( - indexField.type(), readType.getFieldIndex(indexField.name())); - rows.forEachRemaining( - row -> { - Object indexO = getter.getFieldOrNull(row); - indexWriter.write(indexO); - rowCounter.add(1); - }); + if (multiColumn) { + GlobalIndexMultiColumnWriter multiWriter = + (GlobalIndexMultiColumnWriter) indexWriter; + List indexedFields = indexedFields(); + int[] projection = new int[indexedFields.size()]; + for (int i = 0; i < indexedFields.size(); i++) { + DataField field = indexedFields.get(i); + projection[i] = readType.getFieldIndex(field.name()); + } + ProjectedRow projectedRow = ProjectedRow.from(projection); + int rowIdIndex = readType.getFieldIndex(SpecialFields.ROW_ID.name()); + while 
(rows.hasNext()) { + InternalRow row = rows.next(); + long absRowId = row.getLong(rowIdIndex); + if (absRowId < rowRange.from || absRowId > rowRange.to) { + continue; + } + multiWriter.write(absRowId - rowRange.from, projectedRow.replaceRow(row)); + rowCounter.add(1); + } + } else { + GlobalIndexSingletonWriter singleWriter = (GlobalIndexSingletonWriter) indexWriter; + InternalRow.FieldGetter getter = + InternalRow.createFieldGetter( + indexField.type(), readType.getFieldIndex(indexField.name())); + rows.forEachRemaining( + row -> { + Object indexO = getter.getFieldOrNull(row); + singleWriter.write(indexO); + rowCounter.add(1); + }); + } return indexWriter.finish(); } finally { closeWriterQuietly(indexWriter); } } - private static void closeWriterQuietly(GlobalIndexSingletonWriter writer) { + private static void closeWriterQuietly(GlobalIndexWriter writer) { if (writer instanceof java.io.Closeable) { try { ((java.io.Closeable) writer).close(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java index afd954c39a5d..cf65ed937a95 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java @@ -21,12 +21,14 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.Path; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.globalindex.IndexedSplit; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.reader.RecordReader; +import 
org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageSerializer; @@ -77,6 +79,30 @@ public List buildIndex( DataField indexField, Options options) throws IOException { + return buildIndex( + spark, + relation, + partitionPredicate, + table, + indexType, + readType, + indexField, + Collections.emptyList(), + options); + } + + @Override + public List buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + DataField indexField, + List extraFields, + Options options) + throws IOException { Options tableOptions = table.coreOptions().toConfiguration(); long rowsPerShard = tableOptions @@ -88,6 +114,16 @@ public List buildIndex( List entries = table.store().newScan().withPartitionFilter(partitionPredicate).plan().files(); + List indexFields = new ArrayList<>(); + indexFields.add(indexField); + indexFields.addAll(extraFields); + List indexColumns = + indexFields.stream().map(DataField::name).collect(Collectors.toList()); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + long boundaryRowId = + GlobalIndexBuilderUtils.findMinNonIndexableRowId( + schemaManager, entries, indexColumns); + entries = GlobalIndexBuilderUtils.filterEntriesBefore(entries, boundaryRowId); // generate splits for each partition && shard Map> splits = split(table, entries, rowsPerShard); @@ -107,6 +143,7 @@ public List buildIndex( partition, readType, indexField, + extraFields, indexType, indexedSplit.rowRanges().get(0), options); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java index 50c6ab34e153..d7a47cfdc9ed 100644 --- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java @@ -46,4 +46,32 @@ List buildIndex( DataField indexField, Options options) throws IOException; + + default List buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + DataField indexField, + List extraFields, + Options options) + throws IOException { + if (extraFields != null && !extraFields.isEmpty()) { + throw new UnsupportedOperationException( + String.format( + "Topology builder '%s' does not support multi-column index, got extra columns: %s", + identifier(), extraFields)); + } + return buildIndex( + spark, + relation, + partitionPredicate, + table, + indexType, + readType, + indexField, + options); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java index b447cdbd33f8..bcb9c06cffb5 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.spark.globalindex.GlobalIndexTopologyBuilder; @@ -43,12 +44,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; 
import java.util.Locale; import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -120,6 +123,11 @@ public InternalRow[] call(InternalRow args) { return modifySparkTable( tableIdent, sparkTable -> { + List indexColumns = + Arrays.stream(column.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); try { org.apache.paimon.table.Table t = sparkTable.getTable(); checkArgument( @@ -132,11 +140,24 @@ public InternalRow[] call(InternalRow args) { tableIdent); RowType rowType = table.rowType(); + checkArgument(!indexColumns.isEmpty(), "At least one column required."); checkArgument( - rowType.containsField(column), - "Column '%s' does not exist in table '%s'.", - column, - tableIdent); + indexColumns.size() == new HashSet<>(indexColumns).size(), + "Duplicate index columns are not allowed: %s", + indexColumns); + // No hard cap on the number of index columns: unlike row-store B-tree + // indexes (e.g. MySQL 16, PostgreSQL 32) whose limit comes from composing + // columns into a single key, the global index is built on per-type index + // frameworks. Whether multiple columns are supported, and any practical + // limit, is decided by each index type (single-column types reject + // multi-column via UnsupportedOperationException). 
+ for (String col : indexColumns) { + checkArgument( + rowType.containsField(col), + "Column '%s' does not exist in table '%s'.", + col, + tableIdent); + } DataSourceV2Relation relation = createRelation(tableIdent, sparkTable); PartitionPredicate partitionPredicate = SparkProcedureUtils.convertToPartitionPredicate( @@ -145,13 +166,26 @@ public InternalRow[] call(InternalRow args) { spark(), relation); - DataField indexField = rowType.getField(column); - RowType projectedRowType = - rowType.project(Collections.singletonList(column)); + List indexFields = + indexColumns.stream() + .map(rowType::getField) + .collect(Collectors.toList()); + RowType projectedRowType = rowType.project(indexColumns); RowType readRowType = SpecialFields.rowTypeWithRowId(projectedRowType); Options userOptions = createUserOptions(table, optionString); + if (indexColumns.size() > 1) { + // Whether multi-column is supported is decided by each index type's + // factory; fail fast up front instead of failing later in the build + // job. 
+ checkArgument( + GlobalIndexerFactoryUtils.load(indexType).supportsMultiColumn(), + "Index type '%s' does not support multi-column index, got columns: %s", + indexType, + indexColumns); + } + GlobalIndexTopologyBuilder topoBuilder = GlobalIndexTopologyBuilderUtils.createTopoBuilder(indexType); @@ -163,7 +197,8 @@ public InternalRow[] call(InternalRow args) { table, indexType, readRowType, - indexField, + indexFields.get(0), + indexFields.subList(1, indexFields.size()), userOptions); try (TableCommitImpl commit = @@ -179,8 +214,8 @@ public InternalRow[] call(InternalRow args) { } catch (Exception e) { throw new RuntimeException( String.format( - "Failed to create %s index for column '%s' on table '%s'.", - indexType, column, tableIdent), + "Failed to create %s index for columns '%s' on table '%s'.", + indexType, indexColumns, tableIdent), e); } }); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java index 74e4cc4aea50..bd218eb68ddf 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/DropGlobalIndexProcedure.java @@ -46,6 +46,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Map; @@ -107,6 +108,13 @@ public InternalRow[] call(InternalRow args) { LOG.info("Starting to drop index for table " + tableIdent + " WHERE: " + finalWhere); + List indexColumns = + Arrays.stream(column.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + checkArgument(!indexColumns.isEmpty(), "At least one column required."); + return modifyPaimonTable( tableIdent, t -> { @@ -117,11 +125,17 @@ public 
InternalRow[] call(InternalRow args) { FileStoreTable table = (FileStoreTable) t; RowType rowType = table.rowType(); - checkArgument( - rowType.containsField(column), - "Column '%s' does not exist in table '%s'.", - column, - tableIdent); + for (String col : indexColumns) { + checkArgument( + rowType.containsField(col), + "Column '%s' does not exist in table '%s'.", + col, + tableIdent); + } + List indexFieldIds = + indexColumns.stream() + .map(col -> rowType.getField(col).id()) + .collect(Collectors.toList()); DataSourceV2Relation relation = createRelation(tableIdent); PartitionPredicate partitionPredicate = SparkProcedureUtils.convertToPartitionPredicate( @@ -144,9 +158,9 @@ public InternalRow[] call(InternalRow args) { entry.indexFile().indexType().equals(indexType) && entry.indexFile().globalIndexMeta() != null && entry.indexFile() - .globalIndexMeta() - .indexFieldId() - == rowType.getField(column).id() + .globalIndexMeta() + .getIndexedFieldIds() + .equals(indexFieldIds) && (partitionPredicate == null || partitionPredicate.test( entry.partition())); @@ -192,8 +206,8 @@ public InternalRow[] call(InternalRow args) { } catch (Exception e) { throw new RuntimeException( String.format( - "Failed to drop %s index for column '%s' on table '%s'.", - indexType, column, tableIdent), + "Failed to drop %s index for columns '%s' on table '%s'.", + indexType, String.join(",", indexColumns), tableIdent), e); } }); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index cd1b000a361f..99990637c351 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -21,6 +21,7 @@ 
package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile +import org.apache.paimon.index.GlobalIndexMeta import org.apache.paimon.io.{CompactIncrement, DataIncrement} import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable @@ -594,9 +595,9 @@ case class MergeIntoPaimonDataEvolutionTable( if (globalIndexMeta == null) { false } else { - val fieldName = rowType.getField(globalIndexMeta.indexFieldId()).name() + val indexedNames = globalIndexMeta.getIndexedFieldNames(rowType).asScala affectedParts.contains(entry.partition()) && updateColumns.exists( - _.name.equals(fieldName)) + col => indexedNames.contains(col.name)) } } @@ -613,8 +614,7 @@ case class MergeIntoPaimonDataEvolutionTable( case GlobalIndexColumnUpdateAction.THROW_ERROR => val updatedColNames = updateColumns.map(_.name) val conflicted = affectedIndexEntries - .map(_.indexFile().globalIndexMeta().indexFieldId()) - .map(id => rowType.getField(id).name()) + .flatMap(e => e.indexFile().globalIndexMeta().getIndexedFieldNames(rowType).asScala) .toSet throw new RuntimeException( s"""MergeInto: update columns contain globally indexed columns, not supported now.