diff --git a/.github/workflows/utcase-vector-index.yml b/.github/workflows/utcase-vector-index.yml
new file mode 100644
index 000000000000..fa3da08567c3
--- /dev/null
+++ b/.github/workflows/utcase-vector-index.yml
@@ -0,0 +1,74 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+name: UTCase Vector Index
+
+on:
+ push:
+ paths:
+ - 'paimon-vector/**'
+ pull_request:
+ paths:
+ - 'paimon-vector/**'
+
+env:
+ JDK_VERSION: 8
+ MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=30 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true
+
+concurrency:
+ group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
+ cancel-in-progress: true
+
+jobs:
+ vector_index_test:
+ runs-on: ubuntu-latest
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v6
+
+ - name: Set up JDK ${{ env.JDK_VERSION }}
+ uses: actions/setup-java@v5
+ with:
+ java-version: ${{ env.JDK_VERSION }}
+ distribution: 'temurin'
+
+ - name: Install Rust toolchain
+ run: |
+ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable --profile minimal
+ echo "$HOME/.cargo/bin" >> $GITHUB_PATH
+
+ - name: Clone and build paimon-vector-index native library
+ run: |
+ git clone --depth 1 https://github.com/apache/paimon-vector-index.git /tmp/paimon-vector-index
+ cd /tmp/paimon-vector-index
+ cargo build --release -p paimon-vindex-jni
+
+ - name: Copy native library to resources
+ run: |
+ RESOURCE_DIR=paimon-vector/paimon-vector-jni/src/main/resources/native/linux-amd64
+ mkdir -p ${RESOURCE_DIR}
+ cp /tmp/paimon-vector-index/target/release/libpaimon_vindex_jni.so ${RESOURCE_DIR}/
+
+ - name: Build and test vector index modules
+ timeout-minutes: 30
+ run: |
+ mvn -T 2C -B -ntp clean install -DskipTests
+ mvn -B -ntp verify -pl paimon-vector/paimon-vector-jni,paimon-vector/paimon-vector-index -Dcheckstyle.skip=true -Dspotless.check.skip=true
+ env:  # A step-level MAVEN_OPTS replaces the workflow-level value, so the wagon HTTP flags are repeated here.
+ MAVEN_OPTS: -Xmx4096m -Dmaven.wagon.httpconnectionManager.ttlSeconds=30 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true
diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md
index 31601354f1a8..0dbf8f0c1bc7 100644
--- a/docs/docs/flink/procedures.md
+++ b/docs/docs/flink/procedures.md
@@ -1004,7 +1004,7 @@ All available procedures are listed below.
To create a global index on a table for accelerating queries. Arguments:
table(required): the target table identifier.
index_column(required): the column name to build index on.
-
index_type(required): the type of global index, supported types include 'btree', 'lumina', 'tantivy-fulltext'.
+
index_type(required): the type of global index, supported types include 'btree', 'ivf-flat', 'ivf-pq', 'ivf-hnsw-flat', 'ivf-hnsw-sq', 'tantivy-fulltext'.
partitions(optional): partition filter for selective index creation.
options(optional): additional dynamic options for index creation.
diff --git a/docs/docs/learn-paimon/scenario-guide.mdx b/docs/docs/learn-paimon/scenario-guide.mdx
index 341ceddae99e..4aae116bc079 100644
--- a/docs/docs/learn-paimon/scenario-guide.mdx
+++ b/docs/docs/learn-paimon/scenario-guide.mdx
@@ -44,7 +44,7 @@ configurations that are suited for different scenarios.
| Queue-like ordered streaming | Append Table | `bucket = N, bucket-key = col` |
| Large-scale OLAP with ad-hoc queries | Append Table | Incremental Clustering |
| Store images / videos / documents | Append Table (Blob) | `__BLOB_FIELD` comment, Data Evolution enabled |
-| AI vector search / RAG | Append Table (Vector) | `VECTOR` type, Global Index (DiskANN) |
+| AI vector search / RAG | Append Table (Vector) | `VECTOR` type, Vector Global Index |
| AI feature engineering & column evolution | Append Table | `data-evolution.enabled = true` |
| Python AI pipeline (Ray / PyTorch) | Append Table | PyPaimon SDK |
@@ -456,21 +456,19 @@ Schema schema = Schema.newBuilder()
**Build the vector index and search:**
```sql
--- Build DiskANN vector index
+-- Build IVF-PQ vector index
CALL sys.create_global_index(
table => 'db.doc_embeddings',
index_column => 'embedding',
- index_type => 'lumina',
- options => 'lumina.index.dimension=768'
+ index_type => 'ivf-pq',
+ options => 'ivf-pq.distance.metric=cosine,ivf-pq.nlist=256,ivf-pq.pq.m=16'
);
-- Search for top-5 nearest neighbors
SELECT * FROM vector_search('doc_embeddings', 'embedding', array(0.1f, 0.2f, ...), 5);
```
-The legacy index type `lumina-vector-ann` is still accepted for existing tables and SQL compatibility.
-
-**Why:** The [Global Index](../multimodal-table/global-index) with DiskANN provides high-performance ANN search.
+**Why:** The [Global Index](../multimodal-table/global-index) with vector indexes provides high-performance ANN search.
Vector data is stored in dedicated `.vector.lance` files optimized for dense vectors, while scalar columns stay in
Parquet. You can also build a **BTree Index** on scalar columns for efficient filtering:
@@ -664,7 +662,7 @@ Do you need upsert / update / delete?
│
└── AI / Multimodal scenarios? → Enable Data Evolution
├── Store images / videos / docs? → Blob Table (__BLOB_FIELD comment)
- ├── Vector search / RAG? → VECTOR type + Global Index (DiskANN)
+ ├── Vector search / RAG? → VECTOR type + Vector Global Index
├── Feature engineering? → Data Evolution (MERGE INTO partial columns)
└── Python pipeline? → PyPaimon (Ray / PyTorch / Pandas)
```
diff --git a/docs/docs/multimodal-table/global-index.mdx b/docs/docs/multimodal-table/global-index.mdx
index 8edcfe87e2ad..747ccbaa9ad0 100644
--- a/docs/docs/multimodal-table/global-index.mdx
+++ b/docs/docs/multimodal-table/global-index.mdx
@@ -33,7 +33,7 @@ Global Index is a powerful indexing mechanism for Data Evolution (append) tables
without full-table scans. Paimon supports multiple global index types:
- **BTree Index**: A B-tree based index for scalar column lookups. Supports equality, IN, range predicates, and can be combined across multiple columns with AND/OR logic.
-- **Vector Index**: An approximate nearest neighbor (ANN) index powered by DiskANN for vector similarity search.
+- **Vector Index**: An approximate nearest neighbor (ANN) index powered by Paimon's vector index library for vector similarity search.
- **Full-Text Index**: A full-text search index powered by Tantivy for text retrieval. Supports term matching and relevance scoring.
Global indexes work on top of Data Evolution tables. To use global indexes, your table **must** have:
@@ -87,26 +87,55 @@ SELECT * FROM my_table WHERE name IN ('a200', 'a300');
## Vector Index
-Vector Index provides approximate nearest neighbor (ANN) search based on the DiskANN algorithm. It is suitable for
-vector similarity search scenarios such as recommendation systems, image retrieval, and RAG (Retrieval Augmented
-Generation) applications.
+Vector Index provides approximate nearest neighbor (ANN) search for vector similarity search scenarios such as
+recommendation systems, image retrieval, and RAG (Retrieval Augmented Generation) applications.
+
+Supported vector index types:
+
+| Index Type | Description |
+|---|---|
+| `ivf-flat` | IVF index with flat vector storage. |
+| `ivf-pq` | IVF index with product quantization. |
+| `ivf-hnsw-flat` | IVF index with HNSW flat quantizer. |
+| `ivf-hnsw-sq` | IVF index with HNSW scalar quantizer. |
**Build Vector Index**
```sql
--- Create Lumina vector index on 'embedding' column
+-- Create IVF-PQ vector index on 'embedding' column
CALL sys.create_global_index(
table => 'db.my_table',
index_column => 'embedding',
- index_type => 'lumina',
- options => 'lumina.index.dimension=128'
+ index_type => 'ivf-pq',
+ options => 'ivf-pq.distance.metric=cosine,ivf-pq.nlist=256,ivf-pq.pq.m=16'
);
```
-The legacy index type `lumina-vector-ann` is still accepted for existing tables and SQL compatibility.
+For `ARRAY` vector columns, specify the vector dimension with `<index-type>.dimension` (for example, `ivf-pq.dimension`).
+For `VECTOR` columns, Paimon uses the dimension from the column type.
+
+Supported vector index options:
+
+| Option | Default | Description |
+|---|---|---|
+| `<index-type>.dimension` | `128` | Vector dimension for `ARRAY` columns. Ignored for `VECTOR` columns. |
+| `<index-type>.distance.metric` | `inner_product` | Distance metric. Supported values: `l2`, `cosine`, `inner_product`. |
+| `<index-type>.nlist` | `256` | Number of IVF clusters used during index build. |
+| `<index-type>.pq.m` | `16` | Number of PQ sub-vectors for `ivf-pq`. The vector dimension must be divisible by this value. |
+| `<index-type>.pq.use-opq` | `false` | Whether to enable OPQ for `ivf-pq`. |
+| `<index-type>.hnsw.m` | `20` | HNSW graph out-degree for `ivf-hnsw-flat` and `ivf-hnsw-sq`. |
+| `<index-type>.hnsw.ef-construction` | `150` | HNSW construction search width for `ivf-hnsw-flat` and `ivf-hnsw-sq`. |
+| `<index-type>.hnsw.max-level` | `7` | Maximum HNSW level for `ivf-hnsw-flat` and `ivf-hnsw-sq`. |
**Vector Search**
+Search-time options are passed with each vector search request:
+
+| Option | Default | Description |
+|---|---|---|
+| `ivf.nprobe` | `16` | Number of IVF clusters to probe during search. |
+| `hnsw.ef_search` | `0` | HNSW search width during search. `0` uses the native library default. |
+
@@ -155,6 +184,7 @@ GlobalIndexResult result = table.newVectorSearchBuilder()
.withVector(queryVector)
.withLimit(5)
.withVectorColumn("embedding")
+ .withOption("ivf.nprobe", "16")
.executeLocal();
// Step 2: Read matching rows using the search result
diff --git a/docs/docs/multimodal-table/index.mdx b/docs/docs/multimodal-table/index.mdx
index 4e0f42a38712..f0a4b2975082 100644
--- a/docs/docs/multimodal-table/index.mdx
+++ b/docs/docs/multimodal-table/index.mdx
@@ -37,7 +37,7 @@ Key capabilities:
- **[Data Evolution](./data-evolution)**: Update partial columns without rewriting entire files, enabling efficient schema evolution.
- **[Blob Storage](./blob)**: Store large binary objects (images, videos, audio) in dedicated `.blob` files with efficient column projection.
- **[Vector Storage](./vector)**: Store and manage vector embeddings in dedicated Vortex-format files optimized for vector workloads.
-- **[Global Index](./global-index)**: Build BTree, vector (DiskANN), and full-text (Tantivy) indexes for efficient lookups and similarity search.
+- **[Global Index](./global-index)**: Build BTree, vector, and full-text (Tantivy) indexes for efficient lookups and similarity search.
All multimodal features require the following table properties:
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
index bda768ef96f6..8f1551bff988 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java
@@ -51,20 +51,33 @@ public static void readVectored(VectoredReadable readable, List extends FileRa
if (ranges.isEmpty()) {
return;
}
+ readVectored(readable, ranges, ReadOptions.from(readable));
+ }
+
+ public static void readVectored(
+ VectoredReadable readable, List extends FileRange> ranges, ReadOptions options)
+ throws IOException {
+ if (ranges.isEmpty()) {
+ return;
+ }
+ requireNonNull(readable, "readable is null");
+ requireNonNull(options, "options is null");
List extends FileRange> sortRanges = validateAndSortRanges(ranges);
List combinedRanges =
- mergeSortedRanges(sortRanges, readable.minSeekForVectorReads());
+ mergeSortedRanges(sortRanges, options.minSeekForVectorReads);
- int parallelism = readable.parallelismForVectorReads();
+ int parallelism = options.parallelismForVectorReads;
- if (combinedRanges.size() == 1 && readable instanceof SeekableInputStream) {
+ if (options.sequentialReadFallback
+ && combinedRanges.size() == 1
+ && readable instanceof SeekableInputStream) {
fallbackToReadSequence((SeekableInputStream) readable, sortRanges);
return;
}
BlockingExecutor executor = new BlockingExecutor(IO_THREAD_POOL, parallelism);
- long batchSize = readable.batchSizeForVectorReads();
+ long batchSize = options.batchSizeForVectorReads;
for (CombinedRange combinedRange : combinedRanges) {
if (combinedRange.underlying.size() == 1) {
FileRange fileRange = combinedRange.underlying.get(0);
@@ -76,12 +89,95 @@ public static void readVectored(VectoredReadable readable, List extends FileRa
List> futures =
splitBatches.stream().map(FileRange::getData).collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture>[0]))
- .thenAcceptAsync(
- unused -> copyToFileRanges(combinedRange, futures), IO_THREAD_POOL);
+ .whenCompleteAsync(
+ (unused, throwable) -> {
+ if (throwable == null) {
+ try {
+ copyToFileRanges(combinedRange, futures);
+ } catch (Throwable t) {
+ completeFileRangesExceptionally(combinedRange, t);
+ }
+ } else {
+ completeFileRangesExceptionally(combinedRange, throwable);
+ }
+ },
+ IO_THREAD_POOL);
}
}
}
+ /** Options for vectored reads. */
+ public static class ReadOptions {
+
+ private final int minSeekForVectorReads;
+ private final long batchSizeForVectorReads;
+ private final int parallelismForVectorReads;
+ private final boolean sequentialReadFallback;
+
+ public static ReadOptions from(VectoredReadable readable) {
+ return new ReadOptions(
+ readable.minSeekForVectorReads(),
+ readable.batchSizeForVectorReads(),
+ readable.parallelismForVectorReads(),
+ true);
+ }
+
+ public ReadOptions(
+ int minSeekForVectorReads,
+ long batchSizeForVectorReads,
+ int parallelismForVectorReads,
+ boolean sequentialReadFallback) {
+ checkArgument(
+ minSeekForVectorReads >= 0,
+ "minSeekForVectorReads must be non-negative: %s",
+ minSeekForVectorReads);
+ checkArgument(
+ batchSizeForVectorReads > 0,
+ "batchSizeForVectorReads must be positive: %s",
+ batchSizeForVectorReads);
+ checkArgument(
+ parallelismForVectorReads > 0,
+ "parallelismForVectorReads must be positive: %s",
+ parallelismForVectorReads);
+ this.minSeekForVectorReads = minSeekForVectorReads;
+ this.batchSizeForVectorReads = batchSizeForVectorReads;
+ this.parallelismForVectorReads = parallelismForVectorReads;
+ this.sequentialReadFallback = sequentialReadFallback;
+ }
+
+ public ReadOptions withMinSeekForVectorReads(int minSeekForVectorReads) {
+ return new ReadOptions(
+ minSeekForVectorReads,
+ batchSizeForVectorReads,
+ parallelismForVectorReads,
+ sequentialReadFallback);
+ }
+
+ public ReadOptions withBatchSizeForVectorReads(long batchSizeForVectorReads) {
+ return new ReadOptions(
+ minSeekForVectorReads,
+ batchSizeForVectorReads,
+ parallelismForVectorReads,
+ sequentialReadFallback);
+ }
+
+ public ReadOptions withParallelismForVectorReads(int parallelismForVectorReads) {
+ return new ReadOptions(
+ minSeekForVectorReads,
+ batchSizeForVectorReads,
+ parallelismForVectorReads,
+ sequentialReadFallback);
+ }
+
+ public ReadOptions withSequentialReadFallback(boolean sequentialReadFallback) {
+ return new ReadOptions(
+ minSeekForVectorReads,
+ batchSizeForVectorReads,
+ parallelismForVectorReads,
+ sequentialReadFallback);
+ }
+ }
+
private static void fallbackToReadSequence(
SeekableInputStream in, List extends FileRange> ranges) throws IOException {
for (FileRange range : ranges) {
@@ -126,6 +222,13 @@ private static void copyToFileRanges(
}
}
+ private static void completeFileRangesExceptionally(
+ CombinedRange combinedRange, Throwable throwable) {
+ for (FileRange fileRange : combinedRange.underlying) {
+ fileRange.getData().completeExceptionally(throwable);
+ }
+ }
+
private static void copyMultiBytesToBytes(
List segments, int offset, byte[] bytes, int numBytes) {
int remainSize = numBytes;
diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java
index a3264b08e20c..1cd5476e63e4 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java
@@ -25,9 +25,14 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
class VectoredReadUtilsTest {
@@ -127,4 +132,108 @@ public void testRandom() throws Exception {
}
doTest(ranges);
}
+
+ @Test
+ public void testReadOptionsCanDisableSequentialReadFallback() throws Exception {
+ TestSeekableVectoredReadable readable = new TestSeekableVectoredReadable(2);
+
+ List ranges =
+ Arrays.asList(
+ FileRange.createFileRange(0, 100), FileRange.createFileRange(150, 100));
+ VectoredReadUtils.ReadOptions options =
+ new VectoredReadUtils.ReadOptions(1000, 100, 2, false);
+
+ VectoredReadUtils.readVectored(readable, ranges, options);
+ assertThat(readable.readsStarted.await(5, TimeUnit.SECONDS)).isTrue();
+ readable.finishReads.countDown();
+
+ for (FileRange range : ranges) {
+ assertThat(range.getData().get(5, TimeUnit.SECONDS)).hasSize(range.getLength());
+ }
+ assertThat(readable.reads).hasValue(2);
+ assertThat(readable.sequentialReads).hasValue(0);
+ assertThat(readable.maxActiveReads).hasValue(2);
+ }
+
+ @Test
+ public void testReadOptionsPropagateSplitReadFailure() throws Exception {
+ VectoredReadable readable =
+ new VectoredReadable() {
+ @Override
+ public int pread(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ throw new IOException("failed");
+ }
+ };
+
+ List ranges =
+ Arrays.asList(
+ FileRange.createFileRange(0, 100), FileRange.createFileRange(150, 100));
+ VectoredReadUtils.ReadOptions options =
+ new VectoredReadUtils.ReadOptions(1000, 100, 2, false);
+
+ VectoredReadUtils.readVectored(readable, ranges, options);
+
+ assertThatThrownBy(() -> ranges.get(0).getData().get(5, TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasMessageContaining("failed");
+ assertThatThrownBy(() -> ranges.get(1).getData().get(5, TimeUnit.SECONDS))
+ .isInstanceOf(ExecutionException.class)
+ .hasMessageContaining("failed");
+ }
+
+ private class TestSeekableVectoredReadable extends SeekableInputStream
+ implements VectoredReadable {
+
+ private final CountDownLatch readsStarted;
+ private final CountDownLatch finishReads = new CountDownLatch(1);
+ private final AtomicInteger reads = new AtomicInteger();
+ private final AtomicInteger sequentialReads = new AtomicInteger();
+ private final AtomicInteger activeReads = new AtomicInteger();
+ private final AtomicInteger maxActiveReads = new AtomicInteger();
+
+ private TestSeekableVectoredReadable(int expectedReads) {
+ this.readsStarted = new CountDownLatch(expectedReads);
+ }
+
+ @Override
+ public void seek(long desired) {}
+
+ @Override
+ public long getPos() {
+ return 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throw new IOException("Sequential read should not be used");
+ }
+
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ sequentialReads.incrementAndGet();
+ throw new IOException("Sequential read should not be used");
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public int pread(long position, byte[] buffer, int offset, int length) throws IOException {
+ int active = activeReads.incrementAndGet();
+ maxActiveReads.accumulateAndGet(active, Math::max);
+ readsStarted.countDown();
+ try {
+ finishReads.await();
+ System.arraycopy(bytes, (int) position, buffer, offset, length);
+ reads.incrementAndGet();
+ return length;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(e);
+ } finally {
+ activeReads.decrementAndGet();
+ }
+ }
+ }
}
diff --git a/paimon-vector/paimon-vector-index/pom.xml b/paimon-vector/paimon-vector-index/pom.xml
new file mode 100644
index 000000000000..6744de7c9e4a
--- /dev/null
+++ b/paimon-vector/paimon-vector-index/pom.xml
@@ -0,0 +1,200 @@
+
+
+
+ 4.0.0
+
+
+ paimon-vector
+ org.apache.paimon
+ 1.5-SNAPSHOT
+
+
+ paimon-vector-index
+ Paimon : Vector Index
+
+
+
+ org.apache.paimon
+ paimon-vector-jni
+ ${project.version}
+
+
+
+ org.apache.paimon
+ paimon-common
+ ${project.version}
+ provided
+
+
+
+ org.apache.paimon
+ paimon-shade-jackson-2
+ ${paimon.shade.jackson.version}-${paimon.shade.version}
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ ${junit5.version}
+ test
+
+
+
+ org.apache.paimon
+ paimon-core
+ ${project.version}
+ test
+
+
+
+ org.apache.paimon
+ paimon-core
+ ${project.version}
+ test-jar
+ test
+
+
+
+ org.apache.paimon
+ paimon-common
+ ${project.version}
+ test-jar
+ test
+
+
+
+ org.apache.paimon
+ paimon-format
+ ${project.version}
+ test
+
+
+
+ org.apache.paimon
+ paimon-test-utils
+ ${project.version}
+ test
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs-client
+ ${hadoop.version}
+ test
+
+
+ org.apache.avro
+ avro
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ test
+
+
+ org.apache.avro
+ avro
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ jdk.tools
+ jdk.tools
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+
+
+ org.apache.hadoop
+ hadoop-mapreduce-client-core
+ ${hadoop.version}
+ test
+
+
+ org.apache.avro
+ avro
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+ ch.qos.reload4j
+ reload4j
+
+
+ org.slf4j
+ slf4j-reload4j
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ jdk.tools
+ jdk.tools
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+
+ 1
+ true
+ none
+
+
+
+
+
diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfFlatVectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfFlatVectorGlobalIndexerFactory.java
new file mode 100644
index 000000000000..572c7cf4edb2
--- /dev/null
+++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfFlatVectorGlobalIndexerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector.index;
+
+/** Factory for the {@code ivf-flat} vector index identifier. */
+public class IvfFlatVectorGlobalIndexerFactory extends VectorGlobalIndexerFactory {
+
+ public static final String IDENTIFIER = "ivf-flat";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswFlatVectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswFlatVectorGlobalIndexerFactory.java
new file mode 100644
index 000000000000..159e7af6f1ba
--- /dev/null
+++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswFlatVectorGlobalIndexerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector.index;
+
+/** Factory for the {@code ivf-hnsw-flat} vector index identifier. */
+public class IvfHnswFlatVectorGlobalIndexerFactory extends VectorGlobalIndexerFactory {
+
+ public static final String IDENTIFIER = "ivf-hnsw-flat";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswSqVectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswSqVectorGlobalIndexerFactory.java
new file mode 100644
index 000000000000..51c72cd8f39c
--- /dev/null
+++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswSqVectorGlobalIndexerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector.index;
+
+/** Factory for the {@code ivf-hnsw-sq} vector index identifier. */
+public class IvfHnswSqVectorGlobalIndexerFactory extends VectorGlobalIndexerFactory {
+
+ public static final String IDENTIFIER = "ivf-hnsw-sq";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfPqAlgorithmVectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfPqAlgorithmVectorGlobalIndexerFactory.java
new file mode 100644
index 000000000000..f3932de46ed6
--- /dev/null
+++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfPqAlgorithmVectorGlobalIndexerFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector.index;
+
+/** Factory for the {@code ivf-pq} vector index identifier. */
+public class IvfPqAlgorithmVectorGlobalIndexerFactory extends VectorGlobalIndexerFactory {
+
+ public static final String IDENTIFIER = "ivf-pq";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexReader.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexReader.java
new file mode 100644
index 000000000000..54f8532a2227
--- /dev/null
+++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexReader.java
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.vector.index;
+
+import org.apache.paimon.fs.FileRange;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadUtils;
+import org.apache.paimon.fs.VectoredReadable;
+import org.apache.paimon.globalindex.GlobalIndexIOMeta;
+import org.apache.paimon.globalindex.GlobalIndexReader;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.ScoredGlobalIndexResult;
+import org.apache.paimon.globalindex.io.GlobalIndexFileReader;
+import org.apache.paimon.index.vector.VectorIndexInput;
+import org.apache.paimon.index.vector.VectorIndexMetadata;
+import org.apache.paimon.index.vector.VectorIndexReader;
+import org.apache.paimon.index.vector.VectorSearchResult;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.VectorSearch;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.VectorType;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Vector global index reader using paimon-vector-index.
+ *
+ * <p>Each shard has exactly one vector index file. The reader lazily opens the index and performs
+ * vector similarity search.
+ */
+public class VectorGlobalIndexReader implements GlobalIndexReader {
+
+ // Keys looked up in the search options map, and the values used when a key is absent.
+ private static final String NPROBE_PARAMETER = "ivf.nprobe";
+ private static final String EF_SEARCH_PARAMETER = "hnsw.ef_search";
+ private static final int DEFAULT_NPROBE = 16;
+ private static final int DEFAULT_EF_SEARCH = 0;
+ // Tuning values passed to VectoredReadUtils.ReadOptions when index bytes are read vectored.
+ private static final int VECTOR_INDEX_MIN_SEEK_FOR_VECTOR_READS = 16 * 1024;
+ private static final int VECTOR_INDEX_PARALLELISM_FOR_VECTOR_READS = 32;
+
+ // Assigned once in the constructor; ioMeta is the single element of the ioMetas list.
+ private final GlobalIndexIOMeta ioMeta;
+ private final GlobalIndexFileReader fileReader;
+ private final DataType fieldType;
+ private final ExecutorService executor;
+
+ // Populated lazily by ensureLoaded(); close() resets vectorReader and openStream to null.
+ private volatile VectorIndexMetadata nativeMeta;
+ private volatile VectorIndexReader vectorReader;
+ private SeekableInputStream openStream;
+
+    /**
+     * Creates a reader for the single index file of one shard. The index itself is not opened
+     * here; that happens on the first search.
+     *
+     * @param fileReader used later to open the input stream of the index file
+     * @param ioMetas index file descriptors; must contain exactly one element
+     * @param fieldType type of the indexed field, checked when a search vector is validated
+     * @param executor executor on which vector searches are run
+     * @throws IllegalArgumentException if {@code ioMetas} does not contain exactly one element
+     */
+    public VectorGlobalIndexReader(
+            GlobalIndexFileReader fileReader,
+            List<GlobalIndexIOMeta> ioMetas,
+            DataType fieldType,
+            ExecutorService executor) {
+        checkArgument(ioMetas.size() == 1, "Expected exactly one index file per shard");
+        this.fileReader = fileReader;
+        // The element type must be GlobalIndexIOMeta for this assignment to type-check.
+        this.ioMeta = ioMetas.get(0);
+        this.fieldType = fieldType;
+        this.executor = executor;
+    }
+
+    /**
+     * Runs a vector search asynchronously on this reader's executor, loading the index first if
+     * it has not been opened yet.
+     *
+     * @param vectorSearch the search request
+     * @return a future completing with the scored matches, or with an empty {@link Optional} when
+     *     the search produced no result; an {@link IOException} raised while loading or searching
+     *     is rethrown inside the task as a {@link RuntimeException} that names the field and limit
+     */
+    @Override
+    public CompletableFuture<Optional<ScoredGlobalIndexResult>> visitVectorSearch(
+            VectorSearch vectorSearch) {
+        return CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        ensureLoaded();
+                        // search() returns null for "no matches"; map that to Optional.empty().
+                        return Optional.ofNullable(search(vectorSearch));
+                    } catch (IOException e) {
+                        throw new RuntimeException(
+                                String.format(
+                                        "Failed vector index search: field=%s, limit=%d",
+                                        vectorSearch.fieldName(), vectorSearch.limit()),
+                                e);
+                    }
+                },
+                executor);
+    }
+
+    /**
+     * Executes one vector similarity search against the loaded index.
+     *
+     * <p>Must only be called after {@link #ensureLoaded()} has succeeded, because it dereferences
+     * {@code nativeMeta} and {@code vectorReader}.
+     *
+     * @param vectorSearch query vector, result limit, search options and optional row id filter
+     * @return the scored matches, or {@code null} when the row id filter is empty, the native
+     *     search returns no ids, or every returned id is negative
+     * @throws IOException declared so the caller can handle read failures
+     * @throws IllegalArgumentException if the query vector, the field type, a search option or the
+     *     index metric is invalid
+     */
+    private ScoredGlobalIndexResult search(VectorSearch vectorSearch) throws IOException {
+        validateSearchVector(vectorSearch.vector());
+        // Work on a copy so the native call never sees the caller's array.
+        float[] queryVector = vectorSearch.vector().clone();
+        int limit = vectorSearch.limit();
+        int nprobe = nprobe(vectorSearch.options());
+        int efSearch = efSearch(vectorSearch.options());
+        String metric = nativeMeta.metric();
+
+        RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds();
+        VectorSearchResult result;
+
+        if (includeRowIds != null) {
+            long cardinality = includeRowIds.getLongCardinality();
+            if (cardinality == 0) {
+                // An empty filter can never match anything.
+                return null;
+            }
+            byte[] filterBytes = includeRowIds.serialize();
+            // Never ask for more neighbours than the filter admits; the min fits in an int
+            // because limit is an int.
+            int effectiveK = (int) Math.min(limit, cardinality);
+            result = vectorReader.search(queryVector, effectiveK, nprobe, efSearch, filterBytes);
+        } else {
+            result = vectorReader.search(queryVector, limit, nprobe, efSearch);
+        }
+
+        long[] ids = result.ids();
+        float[] distances = result.distances();
+
+        if (ids.length == 0) {
+            return null;
+        }
+
+        RoaringNavigableMap64 resultBitmap = new RoaringNavigableMap64();
+        Map<Long, Float> id2scores = new HashMap<>(ids.length);
+
+        for (int i = 0; i < ids.length; i++) {
+            long rowId = ids[i];
+            if (rowId < 0) {
+                // Negative ids are skipped; presumably placeholders for missing hits (confirm
+                // against the native reader's contract).
+                continue;
+            }
+            float score = convertDistanceToScore(distances[i], metric);
+            resultBitmap.add(rowId);
+            id2scores.put(rowId, score);
+        }
+
+        if (resultBitmap.isEmpty()) {
+            return null;
+        }
+
+        return ScoredGlobalIndexResult.create(
+                resultBitmap,
+                rowId -> {
+                    Float score = id2scores.get(rowId);
+                    if (score == null) {
+                        throw new IllegalArgumentException(
+                                "No score found for rowId: "
+                                        + rowId
+                                        + ". Only rowIds present in results() are valid.");
+                    }
+                    return score;
+                });
+    }
+
+    /**
+     * Maps a raw distance reported by the index to a score, according to the index metric.
+     *
+     * @param distance distance value returned by the search
+     * @param metric one of {@code l2}, {@code cosine} or {@code inner_product}
+     * @return {@code 1 / (1 + distance)} for {@code l2}, {@code 1 - distance} for {@code cosine},
+     *     and the distance unchanged for {@code inner_product}
+     * @throws IllegalArgumentException if the metric is none of the three names, including null
+     */
+    private static float convertDistanceToScore(float distance, String metric) {
+        final float score;
+        if ("inner_product".equals(metric)) {
+            score = distance;
+        } else if ("cosine".equals(metric)) {
+            score = 1.0f - distance;
+        } else if ("l2".equals(metric)) {
+            score = 1.0f / (1.0f + distance);
+        } else {
+            throw new IllegalArgumentException("Unknown metric: " + metric);
+        }
+        return score;
+    }
+
+    /**
+     * Reads the {@code ivf.nprobe} search option.
+     *
+     * @param parameters search options keyed by option name
+     * @return the parsed option value, or {@code DEFAULT_NPROBE} when the key is absent
+     * @throws IllegalArgumentException if the value is present but not an integer
+     */
+    static int nprobe(Map<String, String> parameters) {
+        return intParameter(parameters, NPROBE_PARAMETER, DEFAULT_NPROBE);
+    }
+
+    /**
+     * Reads the {@code hnsw.ef_search} search option.
+     *
+     * @param parameters search options keyed by option name
+     * @return the parsed option value, or {@code DEFAULT_EF_SEARCH} when the key is absent
+     * @throws IllegalArgumentException if the value is present but not an integer
+     */
+    static int efSearch(Map<String, String> parameters) {
+        return intParameter(parameters, EF_SEARCH_PARAMETER, DEFAULT_EF_SEARCH);
+    }
+
+    /**
+     * Looks up an integer-valued option.
+     *
+     * @param parameters option map; a {@code null} map is treated like an empty one
+     * @param key option name to look up
+     * @param defaultValue value returned when the option is not set
+     * @return the parsed integer, or {@code defaultValue} when the option is absent
+     * @throws IllegalArgumentException if the option is set but cannot be parsed as an integer
+     */
+    private static int intParameter(Map<String, String> parameters, String key, int defaultValue) {
+        // Tolerate a missing options map instead of failing with a NullPointerException.
+        String value = parameters == null ? null : parameters.get(key);
+        if (value == null) {
+            return defaultValue;
+        }
+        try {
+            return Integer.parseInt(value);
+        } catch (NumberFormatException e) {
+            throw new IllegalArgumentException(
+                    "Invalid value for '" + key + "': " + value + ". Must be an integer.", e);
+        }
+    }
+
+    /**
+     * Checks that a query vector can be searched against this index.
+     *
+     * <p>Reads {@code nativeMeta}, so the index must already be loaded.
+     *
+     * @param vector the query vector; must be a non-null {@code float[]}
+     * @throws IllegalArgumentException if the vector is null or not a {@code float[]}, if the
+     *     indexed field is not a {@link VectorType} or {@link ArrayType} with a {@link FloatType}
+     *     element, or if the vector length differs from the index dimension
+     */
+    private void validateSearchVector(Object vector) {
+        if (!(vector instanceof float[])) {
+            // instanceof is false for null, so guard getClass() to report the problem as an
+            // IllegalArgumentException rather than a NullPointerException.
+            throw new IllegalArgumentException(
+                    "Expected float[] vector but got: "
+                            + (vector == null ? "null" : vector.getClass()));
+        }
+        boolean validFieldType = false;
+        if (fieldType instanceof VectorType) {
+            validFieldType = ((VectorType) fieldType).getElementType() instanceof FloatType;
+        } else if (fieldType instanceof ArrayType) {
+            validFieldType = ((ArrayType) fieldType).getElementType() instanceof FloatType;
+        }
+        if (!validFieldType) {
+            throw new IllegalArgumentException(
+                    "Vector index requires VectorType or ArrayType, but field type is: "
+                            + fieldType);
+        }
+        int queryDim = ((float[]) vector).length;
+        if (queryDim != nativeMeta.dimension()) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Query vector dimension mismatch: index expects %d, but got %d",
+                            nativeMeta.dimension(), queryDim));
+        }
+    }
+
+    /**
+     * Opens the index file and creates the native reader on first use; later calls return
+     * immediately once {@code vectorReader} is set.
+     *
+     * <p>{@code vectorReader} is the field tested outside the lock, so it is assigned last. A
+     * thread that observes it as non-null therefore also observes {@code nativeMeta}. If any
+     * step fails, nothing is published and both the partially built reader and the stream are
+     * closed.
+     *
+     * @throws IOException if the index file cannot be opened or read
+     */
+    private void ensureLoaded() throws IOException {
+        if (vectorReader != null) {
+            return;
+        }
+        synchronized (this) {
+            if (vectorReader != null) {
+                return;
+            }
+            SeekableInputStream in = fileReader.getInputStream(ioMeta);
+            VectorIndexReader reader = null;
+            try {
+                reader = new VectorIndexReader(new SeekableStreamVectorIndexInput(in));
+                nativeMeta = reader.metadata();
+                openStream = in;
+                // Publish last: the unsynchronised null check above keys off this field.
+                vectorReader = reader;
+            } catch (Exception e) {
+                if (reader != null) {
+                    try {
+                        reader.close();
+                    } catch (Throwable closeFailure) {
+                        e.addSuppressed(closeFailure);
+                    }
+                }
+                IOUtils.closeQuietly(in);
+                throw e;
+            }
+        }
+    }
+
+    /**
+     * Closes the native reader and then the underlying stream, resetting both fields to
+     * {@code null}.
+     *
+     * <p>Both closes are always attempted. The first failure is rethrown after both attempts; a
+     * second failure is attached to it as a suppressed exception.
+     *
+     * @throws IOException if the first failure was an {@link IOException}
+     * @throws RuntimeException if the first failure was unchecked, or wrapping any other throwable
+     */
+    @Override
+    public void close() throws IOException {
+        Throwable failure = null;
+
+        if (vectorReader != null) {
+            try {
+                vectorReader.close();
+            } catch (Throwable t) {
+                failure = t;
+            } finally {
+                vectorReader = null;
+            }
+        }
+
+        if (openStream != null) {
+            try {
+                openStream.close();
+            } catch (Throwable t) {
+                if (failure != null) {
+                    failure.addSuppressed(t);
+                } else {
+                    failure = t;
+                }
+            } finally {
+                openStream = null;
+            }
+        }
+
+        if (failure == null) {
+            return;
+        }
+        if (failure instanceof IOException) {
+            throw (IOException) failure;
+        }
+        if (failure instanceof RuntimeException) {
+            throw (RuntimeException) failure;
+        }
+        throw new RuntimeException("Failed to close vector global index reader", failure);
+    }
+
+    /**
+     * {@link VectorIndexInput} backed by a {@link SeekableInputStream}.
+     *
+     * <p>Batched positional reads use the vectored read path when the stream is
+     * {@link VectoredReadable} and the requested ranges do not overlap. Otherwise each range is
+     * read with seek + read while holding this object's monitor.
+     */
+    static class SeekableStreamVectorIndexInput implements VectorIndexInput {
+
+        private final SeekableInputStream input;
+
+        SeekableStreamVectorIndexInput(SeekableInputStream input) {
+            this.input = input;
+        }
+
+        /**
+         * Fills every {@code buffers[i]} completely with the bytes starting at
+         * {@code positions[i]}.
+         *
+         * @param positions start offsets in the index file
+         * @param buffers destination arrays; each array's length is the number of bytes to read
+         * @throws IllegalArgumentException if the two arrays differ in length
+         * @throws RuntimeException wrapping any {@link IOException} raised while reading
+         */
+        @Override
+        public void pread(long[] positions, byte[][] buffers) {
+            if (positions.length != buffers.length) {
+                throw new IllegalArgumentException(
+                        "positions length "
+                                + positions.length
+                                + " != buffers length "
+                                + buffers.length);
+            }
+            try {
+                if (input instanceof VectoredReadable
+                        && areRangesNonOverlapping(positions, buffers)) {
+                    preadVectored((VectoredReadable) input, positions, buffers);
+                } else {
+                    // seek + read moves the shared stream position, so serialise this path.
+                    synchronized (this) {
+                        preadSequential(positions, buffers);
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to read vector index", e);
+            }
+        }
+
+        /** Reads all ranges through {@link VectoredReadUtils} and copies them into the buffers. */
+        private void preadVectored(VectoredReadable readable, long[] positions, byte[][] buffers)
+                throws IOException {
+            List<FileRange> ranges = new ArrayList<>(positions.length);
+            for (int i = 0; i < positions.length; i++) {
+                ranges.add(FileRange.createFileRange(positions[i], buffers[i].length));
+            }
+
+            VectoredReadUtils.ReadOptions options =
+                    VectoredReadUtils.ReadOptions.from(readable)
+                            .withMinSeekForVectorReads(VECTOR_INDEX_MIN_SEEK_FOR_VECTOR_READS)
+                            .withParallelismForVectorReads(
+                                    VECTOR_INDEX_PARALLELISM_FOR_VECTOR_READS)
+                            .withSequentialReadFallback(false);
+            VectoredReadUtils.readVectored(readable, ranges, options);
+
+            for (int i = 0; i < ranges.size(); i++) {
+                byte[] bytes = ranges.get(i).getData().join();
+                // Mirror the sequential path, which fails on a short read instead of handing
+                // back a partially filled buffer.
+                if (bytes.length != buffers[i].length) {
+                    throw new IOException(
+                            String.format(
+                                    "Vectored read returned %d bytes for a %d-byte range at"
+                                            + " position %d",
+                                    bytes.length, buffers[i].length, positions[i]));
+                }
+                System.arraycopy(bytes, 0, buffers[i], 0, bytes.length);
+            }
+        }
+
+        /** Reads the ranges one after another with seek followed by a full read. */
+        private void preadSequential(long[] positions, byte[][] buffers) throws IOException {
+            for (int i = 0; i < positions.length; i++) {
+                input.seek(positions[i]);
+                readFully(input, buffers[i]);
+            }
+        }
+
+        /**
+         * Reads from the current stream position until {@code buffer} is full.
+         *
+         * @throws IOException if the stream ends before the buffer is full
+         */
+        private static void readFully(SeekableInputStream input, byte[] buffer) throws IOException {
+            int offset = 0;
+            while (offset < buffer.length) {
+                int read = input.read(buffer, offset, buffer.length - offset);
+                if (read < 0) {
+                    throw new IOException("Unexpected end of vector index file");
+                }
+                offset += read;
+            }
+        }
+
+        /**
+         * Tells whether the byte ranges {@code [positions[i], positions[i] + buffers[i].length)}
+         * are pairwise disjoint. Touching ranges count as disjoint; a range whose end offset
+         * overflows {@code long} makes the result {@code false}.
+         */
+        private static boolean areRangesNonOverlapping(long[] positions, byte[][] buffers) {
+            if (positions.length < 2) {
+                return true;
+            }
+
+            // Visit the ranges in ascending start order without reordering the caller's arrays.
+            List<Integer> indexes = new ArrayList<>(positions.length);
+            for (int i = 0; i < positions.length; i++) {
+                indexes.add(i);
+            }
+            indexes.sort(Comparator.comparingLong(index -> positions[index]));
+
+            boolean hasPrevious = false;
+            long previousEnd = 0;
+            for (int index : indexes) {
+                long offset = positions[index];
+                long end = offset + buffers[index].length;
+                // end < offset means the addition overflowed.
+                if (end < offset || (hasPrevious && offset < previousEnd)) {
+                    return false;
+                }
+                previousEnd = end;
+                hasPrevious = true;
+            }
+            return true;
+        }
+    }
+
+ // =================== unsupported =====================
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitIsNotNull(FieldRef fieldRef) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitIsNull(FieldRef fieldRef) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitStartsWith(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitEndsWith(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitContains(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitLike(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitLessThan(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitGreaterOrEqual(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitNotEqual(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitLessOrEqual(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitEqual(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+    /**
+     * Not supported by the vector index.
+     *
+     * @return an already-completed future holding {@link Optional#empty()}
+     */
+    @Override
+    public CompletableFuture<Optional<GlobalIndexResult>> visitGreaterThan(
+            FieldRef fieldRef, Object literal) {
+        return CompletableFuture.completedFuture(Optional.empty());
+    }
+
+ @Override
+ public CompletableFuture> visitIn(
+ FieldRef fieldRef, List