Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c9abb86
[globalindex] Support multi-column GlobalIndex framework
CrownChu May 22, 2026
4f28a68
[globalindex] Support multi-column in CreateGlobalIndexProcedure
CrownChu May 25, 2026
9547549
[globalindex] Fix multi-column index metadata storage and resolveFiel…
CrownChu May 25, 2026
7d48842
[globalindex] Fix GenericIndexTopoBuilder multi-column null value error
CrownChu May 26, 2026
b9299bd
[globalindex] Extract findMinNonIndexableRowId and filterEntriesBefor…
CrownChu May 26, 2026
497d88b
[globalindex] Fix test to reference GlobalIndexBuilderUtils after met…
CrownChu May 26, 2026
eeb4c84
[globalindex] Fix multi-column writer projection, add BTree validatio…
CrownChu May 27, 2026
136d14f
[globalindex] Fix MERGE INTO crash when table has multi-column global…
CrownChu May 27, 2026
609dbec
[globalindex] Fix FullText/Vector read path mismatch and reject multi…
CrownChu May 27, 2026
e8e571a
[globalindex] Add input validation, Spark schema filtering, null chec…
CrownChu May 27, 2026
65a4b44
[globalindex] Reject duplicate index columns and document why column …
CrownChu May 27, 2026
3500fe2
[globalindex] Address PR review: isMultiColumn helper, overlap detect…
CrownChu May 29, 2026
0dc435a
[globalindex] Extract getIndexedFieldNames to GlobalIndexMeta and fix…
CrownChu May 29, 2026
7e0e57c
[globalindex] Fix compilation error: move indexColumns out of try block
CrownChu Jun 1, 2026
438d69c
[globalindex] Keep building through null values instead of ending the…
CrownChu Jun 2, 2026
cf4ae3b
[globalindex] Let a field participate in multiple multi-column index …
CrownChu Jun 2, 2026
81a96e3
[globalindex] Make indexFieldId the primary column for multi-column i…
CrownChu Jun 2, 2026
d618873
[globalindex] Refine scanner routing, multi-column writer rowId, and …
CrownChu Jun 3, 2026
7764843
[globalindex] Reject unsupported multi-column index types at creation…
CrownChu Jun 4, 2026
8ed8130
[globalindex] Union extra-column readers, fix vector scan filter, and…
CrownChu Jun 8, 2026
ea398f5
[globalindex] Format drop procedure column message and fix ITCase ass…
CrownChu Jun 8, 2026
a477bc0
[globalindex] Refactor multi-column factory create to explicit primar…
CrownChu Jun 8, 2026
485e868
[globalindex] Thread indexField/extraFields through write path and re…
CrownChu Jun 10, 2026
de3bd36
[globalindex] Fix non-serializable subList in Spark global index builder
CrownChu Jun 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.globalindex;

import org.apache.paimon.data.InternalRow;

import javax.annotation.Nullable;

/** Index writer for global index that accepts multiple column values per row. */
public interface GlobalIndexMultiColumnWriter extends GlobalIndexWriter {

/**
* Write one record's indexed columns at the given relative row id.
*
* @param rowId the record's row id relative to the current shard (0 to rowCnt - 1); a null row
* still advances the row id without indexing a value
* @param row a projected row containing only the indexed columns, whose layout matches the
* fields order passed to {@link GlobalIndexerFactory#create(java.util.List,
* org.apache.paimon.options.Options)}
*/
void write(long rowId, @Nullable InternalRow row);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@ GlobalIndexReader createReader(
List<GlobalIndexIOMeta> files,
ExecutorService executor);

static GlobalIndexer create(String type, DataField dataField, Options options) {
static GlobalIndexer create(String type, DataField indexField, Options options) {
GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type);
return globalIndexerFactory.create(dataField, options);
return globalIndexerFactory.create(indexField, options);
}

static GlobalIndexer create(
String type, DataField indexField, List<DataField> extraFields, Options options) {
GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type);
return globalIndexerFactory.create(indexField, extraFields, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,36 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;

import java.util.List;

/** File index factory to construct {@link FileIndexer}. */
public interface GlobalIndexerFactory {

String identifier();

GlobalIndexer create(DataField dataField, Options options);
GlobalIndexer create(DataField indexField, Options options);

/**
* Whether this index type supports multi-column indexes. A factory that returns {@code true}
* must override {@link #create(DataField, List, Options)} to handle extra columns.
*/
default boolean supportsMultiColumn() {
return false;
}

/**
* Creates an indexer over a primary column plus optional extra columns. {@code indexField} is
* the primary column; {@code extraFields} holds the remaining columns and is empty for a
* single-column index.
*/
default GlobalIndexer create(
DataField indexField, List<DataField> extraFields, Options options) {
if (extraFields != null && !extraFields.isEmpty()) {
throw new UnsupportedOperationException(
String.format(
"Index type '%s' does not support multi-column index, got extra columns: %s",
identifier(), extraFields));
}
return create(indexField, options);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,84 @@
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexPathFactory;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.Range;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

High risk — MERGE path crash: MULTI_COLUMN_INDEX_FIELD_ID = -1 breaks existing code that calls rowType.getField(globalIndexMeta.indexFieldId()) without guarding against -1:

  1. MergeIntoUpdateChecker.java:104 (Flink): scans index manifest entries and does rowType.getField(globalIndexMeta.indexFieldId()) — will throw when encountering a multi-column index.
  2. MergeIntoPaimonDataEvolutionTable.scala:514 (Spark): same pattern — rowType.getField(globalIndexMeta.indexFieldId()).name().

Once a table has a multi-column global index, any MERGE INTO that touches indexed columns will crash with "Cannot find field by field id: -1".

Fix: these callers need to handle MULTI_COLUMN_INDEX_FIELD_ID by reading extraFieldIds() to get the actual column list.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix:

Added getIndexedFieldNames helper in both Flink and Spark paths:

  • When indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID (-1): resolve column names from extraFieldIds()
  • Otherwise: use the original single-column logic (rowType.getField(indexFieldId) + optional extraFieldIds)

Both the index filter (which entries are affected) and the error reporting (conflicted column names) now correctly handle multi-column indexes.

Affected files:

  • paimon-flink/.../dataevolution/MergeIntoUpdateChecker.java
  • paimon-spark/paimon-spark-common/.../MergeIntoPaimonDataEvolutionTable.scala
  • paimon-spark/paimon-spark-4.0/.../MergeIntoPaimonDataEvolutionTable.scala

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Risk — IndexManifestFileHandler overlap detection false positive with multi-column indexes

IndexManifestFileHandler.java:243 uses indexFieldId to determine whether two index files belong to the same field:

retainedMeta.indexFieldId() != addedMeta.indexFieldId()

With MULTI_COLUMN_INDEX_FIELD_ID = -1, all multi-column indexes share the same sentinel value. Two indexes on different column sets (e.g. [title, vec] vs [content, embedding]) will both have indexFieldId == -1, so the handler treats them as "same field". If their row ranges overlap, it throws IllegalStateException and rejects the commit — even though they are logically independent indexes.

Suggestion: add extraFieldIds comparison (e.g. Arrays.equals) to the overlap check, or compare indexType as well.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this!

Fixed in IndexManifestFileHandler.validateRetainedIndexFiles().
Split the overlap detection into two branches:

  • Single-column: keep original indexFieldId comparison
  • Multi-column (indexFieldId == -1): use Arrays.equals(extraFieldIds) to distinguish different column groups
    This way two multi-column indexes on different column sets (e.g. [title, vec] vs [content, embedding]) won't trigger false positive overlap errors.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor — TableIndexesTable shows null for multi-column index field name

TableIndexesTable.java:238 does logicalRowType.getField(globalMeta.indexFieldId()) which throws when indexFieldId == -1. The exception is caught, but index_field_name silently displays null to users.

Suggestion: when indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID, resolve names from extraFieldIds() and join them with commas.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! Fixed in TableIndexesTable.toRow(). When indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID, resolve field names from extraFieldIds() and join with commas (e.g. "title,vec"). Single-column path unchanged.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion — add isMultiColumn() helper to GlobalIndexMeta

The sentinel check indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID is now scattered across many modules (MergeIntoUpdateChecker, MergeIntoPaimonDataEvolutionTable x2, FullTextReadImpl, VectorReadImpl, GlobalIndexScanner, GlobalIndexBuilderUtils, etc.). This is fragile — new code that touches indexFieldId() can easily forget the guard and crash on -1.

Consider adding a convenience method to GlobalIndexMeta:

public boolean isMultiColumn() {
    return indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID;
}

Then all call sites replace meta.indexFieldId() == MULTI_COLUMN_INDEX_FIELD_ID with meta.isMultiColumn(), which is more readable and harder to miss.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added GlobalIndexMeta.isMultiColumn() and replaced all sentinel checks across the following classes:

  • GlobalIndexScanner
  • FullTextReadImpl
  • VectorReadImpl
  • MergeIntoUpdateChecker
  • TableIndexesTable


/** Utils for global index build. */
public class GlobalIndexBuilderUtils {

private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexBuilderUtils.class);

public static List<IndexFileMeta> toIndexFileMetas(
FileIO fileIO,
IndexPathFactory indexPathFactory,
CoreOptions options,
Range range,
int indexFieldId,
String indexType,
List<ResultEntry> entries)
throws IOException {
return toIndexFileMetas(
fileIO, indexPathFactory, options, range, indexFieldId, null, indexType, entries);
}

/**
* Builds the index file metas. The first column in {@code fields} is treated as the primary
* index column (e.g. the first column in {@code CREATE ... INDEX ON (a, b, c)}) and is stored
* as {@code indexFieldId}; the remaining columns go into {@code extraFieldIds}. Callers must
* pass {@code fields} in the intended column order.
*/
public static List<IndexFileMeta> toIndexFileMetas(
FileIO fileIO,
IndexPathFactory indexPathFactory,
CoreOptions options,
Range range,
List<DataField> fields,
String indexType,
List<ResultEntry> entries)
throws IOException {
// The first column is the primary index column and is stored as indexFieldId; the
// remaining columns (if any) go into extraFieldIds.
int indexFieldId = fields.get(0).id();
int[] extraFieldIds =
fields.size() > 1
? fields.subList(1, fields.size()).stream()
.mapToInt(DataField::id)
.toArray()
: null;
return toIndexFileMetas(
fileIO,
indexPathFactory,
options,
range,
indexFieldId,
extraFieldIds,
indexType,
entries);
}

private static List<IndexFileMeta> toIndexFileMetas(
FileIO fileIO,
IndexPathFactory indexPathFactory,
CoreOptions options,
Range range,
int indexFieldId,
@Nullable int[] extraFieldIds,
String indexType,
List<ResultEntry> entries)
throws IOException {
Expand All @@ -50,7 +110,8 @@ public static List<IndexFileMeta> toIndexFileMetas(
String fileName = entry.fileName();
long fileSize = fileIO.getFileSize(indexPathFactory.toPath(fileName));
GlobalIndexMeta globalIndexMeta =
new GlobalIndexMeta(range.from, range.to, indexFieldId, null, entry.meta());
new GlobalIndexMeta(
range.from, range.to, indexFieldId, extraFieldIds, entry.meta());

Path externalPathDir = options.globalIndexExternalPath();
String externalPathString = null;
Expand Down Expand Up @@ -78,6 +139,82 @@ public static GlobalIndexWriter createIndexWriter(
return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table));
}

public static GlobalIndexWriter createIndexWriter(
FileStoreTable table,
String indexType,
DataField indexField,
List<DataField> extraFields,
Options options)
throws IOException {
GlobalIndexer globalIndexer =
GlobalIndexer.create(indexType, indexField, extraFields, options);
return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table));
}

/**
* Find the minimum firstRowId among files whose schema does not contain all index columns.
* Files at or beyond this rowId cannot be indexed because the column was added later via ALTER
* TABLE.
*
* @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the columns
*/
public static long findMinNonIndexableRowId(
SchemaManager schemaManager, List<ManifestEntry> entries, List<String> indexColumns) {
Map<Long, Boolean> schemaContainsColumns = new HashMap<>();
long minRowId = Long.MAX_VALUE;
long minSchemaId = -1;
for (ManifestEntry entry : entries) {
long sid = entry.file().schemaId();
boolean contains =
schemaContainsColumns.computeIfAbsent(
sid,
id -> schemaManager.schema(id).fieldNames().containsAll(indexColumns));
if (!contains && entry.file().firstRowId() != null) {
long rowId = entry.file().nonNullFirstRowId();
if (rowId < minRowId) {
minRowId = rowId;
minSchemaId = sid;
}
}
}
if (minRowId != Long.MAX_VALUE) {
List<String> schemaFields = schemaManager.schema(minSchemaId).fieldNames();
List<String> missingColumns = new ArrayList<>();
for (String col : indexColumns) {
if (!schemaFields.contains(col)) {
missingColumns.add(col);
}
}
LOG.info(
"Found non-indexable files: schemaId={} missing columns {}, boundaryRowId={}.",
minSchemaId,
missingColumns,
minRowId);
}
return minRowId;
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: the old filterEntriesBefore in GenericIndexTopoBuilder had a LOG.info("Filtered {} files ...") line for observability. This was lost during extraction since GlobalIndexBuilderUtils has no logger. Consider adding one — this log is useful for debugging index build issues in production.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add log done

/** Keep only entries whose firstRowId is strictly less than the given boundary. */
public static List<ManifestEntry> filterEntriesBefore(
List<ManifestEntry> entries, long boundaryRowId) {
if (boundaryRowId == Long.MAX_VALUE) {
return entries;
}
List<ManifestEntry> result = new ArrayList<>();
for (ManifestEntry entry : entries) {
if (entry.file().firstRowId() != null
&& entry.file().nonNullFirstRowId() < boundaryRowId) {
result.add(entry);
}
}
LOG.info(
"Filtered {} files to {} indexable files (boundaryRowId={}).",
entries.size(),
result.size(),
boundaryRowId);
return result;
}

private static GlobalIndexFileReadWrite createGlobalIndexFileReadWrite(FileStoreTable table) {
IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory();
return new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory);
Expand Down
Loading