Merged
74 changes: 74 additions & 0 deletions .github/workflows/utcase-vector-index.yml
@@ -0,0 +1,74 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

name: UTCase Vector Index

on:
  push:
    paths:
      - 'paimon-vector/**'
  pull_request:
    paths:
      - 'paimon-vector/**'

env:
  JDK_VERSION: 8
  MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=30 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true

concurrency:
  group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
  cancel-in-progress: true

jobs:
  vector_index_test:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v6

      - name: Set up JDK ${{ env.JDK_VERSION }}
        uses: actions/setup-java@v5
        with:
          java-version: ${{ env.JDK_VERSION }}
          distribution: 'temurin'

      - name: Install Rust toolchain
        run: |
          curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable --profile minimal
          echo "$HOME/.cargo/bin" >> $GITHUB_PATH

      - name: Clone and build paimon-vector-index native library
        run: |
          git clone --depth 1 https://github.com/apache/paimon-vector-index.git /tmp/paimon-vector-index
          cd /tmp/paimon-vector-index
          cargo build --release -p paimon-vindex-jni

      - name: Copy native library to resources
        run: |
          RESOURCE_DIR=paimon-vector/paimon-vector-jni/src/main/resources/native/linux-amd64
          mkdir -p ${RESOURCE_DIR}
          cp /tmp/paimon-vector-index/target/release/libpaimon_vindex_jni.so ${RESOURCE_DIR}/

      - name: Build and test vector index modules
        timeout-minutes: 30
        run: |
          mvn -T 2C -B -ntp clean install -DskipTests
          mvn -B -ntp verify -pl paimon-vector/paimon-vector-jni,paimon-vector/paimon-vector-index -Dcheckstyle.skip=true -Dspotless.check.skip=true
        env:
          MAVEN_OPTS: -Xmx4096m
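
The copy step above places the shared library under `native/linux-amd64` in the module resources. As a rough illustration of how a loader might map the running platform onto that layout, here is a minimal sketch. The class `NativePathSketch` and the method `resourcePath` are hypothetical and are not taken from `paimon-vector-jni`; only the Linux path layout comes from the workflow, and the macOS branch is purely an assumption.

```java
import java.util.Locale;

public class NativePathSketch {

    /**
     * Maps os.name / os.arch style values to a classpath resource path.
     * Hypothetical helper: the real loader may resolve paths differently.
     */
    public static String resourcePath(String osName, String osArch, String libName) {
        String os = osName.toLowerCase(Locale.ROOT);
        String arch = osArch.toLowerCase(Locale.ROOT);
        if (arch.equals("x86_64")) {
            // Normalize to the directory name used by the workflow.
            arch = "amd64";
        }
        if (os.startsWith("linux")) {
            // Layout taken from the workflow: native/linux-amd64/libpaimon_vindex_jni.so
            return "native/linux-" + arch + "/lib" + libName + ".so";
        }
        if (os.startsWith("mac")) {
            // Assumed layout, not present in the workflow.
            return "native/macos-" + arch + "/lib" + libName + ".dylib";
        }
        throw new UnsupportedOperationException(
                "Unsupported platform: " + osName + "/" + osArch);
    }

    public static void main(String[] args) {
        // Fixed inputs so the example prints the same path on any machine.
        System.out.println(resourcePath("Linux", "amd64", "paimon_vindex_jni"));
    }
}
```

On a typical x86-64 Linux JVM, `os.arch` reports `amd64`, which is why the directory name in the workflow lines up with the value a loader would see at runtime.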
2 changes: 1 addition & 1 deletion docs/docs/flink/procedures.md
@@ -1004,7 +1004,7 @@ All available procedures are listed below.
To create a global index on a table for accelerating queries. Arguments:
<li>table(required): the target table identifier.</li>
<li>index_column(required): the column name to build index on.</li>
<li>index_type(required): the type of global index, supported types include 'btree', 'lumina', 'tantivy-fulltext'.</li>
<li>index_type(required): the type of global index, supported types include 'btree', 'ivf-flat', 'ivf-pq', 'ivf-hnsw-flat', 'ivf-hnsw-sq', 'tantivy-fulltext'.</li>
<li>partitions(optional): partition filter for selective index creation.</li>
<li>options(optional): additional dynamic options for index creation.</li>
</td>
14 changes: 6 additions & 8 deletions docs/docs/learn-paimon/scenario-guide.mdx
@@ -44,7 +44,7 @@ configurations that are suited for different scenarios.
| Queue-like ordered streaming | Append Table | `bucket = N, bucket-key = col` |
| Large-scale OLAP with ad-hoc queries | Append Table | Incremental Clustering |
| Store images / videos / documents | Append Table (Blob) | `__BLOB_FIELD` comment, Data Evolution enabled |
| AI vector search / RAG | Append Table (Vector) | `VECTOR` type, Global Index (DiskANN) |
| AI vector search / RAG | Append Table (Vector) | `VECTOR` type, Vector Global Index |
| AI feature engineering & column evolution | Append Table | `data-evolution.enabled = true` |
| Python AI pipeline (Ray / PyTorch) | Append Table | PyPaimon SDK |

@@ -456,21 +456,19 @@ Schema schema = Schema.newBuilder()
**Build the vector index and search:**

```sql
-- Build DiskANN vector index
-- Build IVF-PQ vector index
CALL sys.create_global_index(
table => 'db.doc_embeddings',
index_column => 'embedding',
index_type => 'lumina',
options => 'lumina.index.dimension=768'
index_type => 'ivf-pq',
options => 'vector.distance.metric=cosine,vector.nlist=256,vector.pq.m=16'
);

-- Search for top-5 nearest neighbors
SELECT * FROM vector_search('doc_embeddings', 'embedding', array(0.1f, 0.2f, ...), 5);
```

The legacy index type `lumina-vector-ann` is still accepted for existing tables and SQL compatibility.

**Why:** The [Global Index](../multimodal-table/global-index) with DiskANN provides high-performance ANN search.
**Why:** The [Global Index](../multimodal-table/global-index) with vector indexes provides high-performance ANN search.
Vector data is stored in dedicated `.vector.lance` files optimized for dense vectors, while scalar columns stay in
Parquet. You can also build a **BTree Index** on scalar columns for efficient filtering:

@@ -664,7 +662,7 @@ Do you need upsert / update / delete?
└── AI / Multimodal scenarios? → Enable Data Evolution
    ├── Store images / videos / docs? → Blob Table (__BLOB_FIELD comment)
    ├── Vector search / RAG? → VECTOR type + Global Index (DiskANN)
    ├── Vector search / RAG? → VECTOR type + Vector Global Index
    ├── Feature engineering? → Data Evolution (MERGE INTO partial columns)
    └── Python pipeline? → PyPaimon (Ray / PyTorch / Pandas)
```
46 changes: 38 additions & 8 deletions docs/docs/multimodal-table/global-index.mdx
@@ -33,7 +33,7 @@ Global Index is a powerful indexing mechanism for Data Evolution (append) tables
without full-table scans. Paimon supports multiple global index types:

- **BTree Index**: A B-tree based index for scalar column lookups. Supports equality, IN, range predicates, and can be combined across multiple columns with AND/OR logic.
- **Vector Index**: An approximate nearest neighbor (ANN) index powered by DiskANN for vector similarity search.
- **Vector Index**: An approximate nearest neighbor (ANN) index powered by Paimon's vector index library for vector similarity search.
- **Full-Text Index**: A full-text search index powered by Tantivy for text retrieval. Supports term matching and relevance scoring.

Global indexes work on top of Data Evolution tables. To use global indexes, your table **must** have:
@@ -87,26 +87,55 @@ SELECT * FROM my_table WHERE name IN ('a200', 'a300');

## Vector Index

Vector Index provides approximate nearest neighbor (ANN) search based on the DiskANN algorithm. It is suitable for
vector similarity search scenarios such as recommendation systems, image retrieval, and RAG (Retrieval Augmented
Generation) applications.
Vector Index provides approximate nearest neighbor (ANN) search for vector similarity search scenarios such as
recommendation systems, image retrieval, and RAG (Retrieval Augmented Generation) applications.

Supported vector index types:

| Index Type | Description |
|---|---|
| `ivf-flat` | IVF index with flat vector storage. |
| `ivf-pq` | IVF index with product quantization. |
| `ivf-hnsw-flat` | IVF index with HNSW flat quantizer. |
| `ivf-hnsw-sq` | IVF index with HNSW scalar quantizer. |

**Build Vector Index**

```sql
-- Create Lumina vector index on 'embedding' column
-- Create IVF-PQ vector index on 'embedding' column
CALL sys.create_global_index(
table => 'db.my_table',
index_column => 'embedding',
index_type => 'lumina',
options => 'lumina.index.dimension=128'
index_type => 'ivf-pq',
options => 'ivf-pq.distance.metric=cosine,ivf-pq.nlist=256,ivf-pq.pq.m=16'
);
```

The legacy index type `lumina-vector-ann` is still accepted for existing tables and SQL compatibility.
For `ARRAY<FLOAT>` vector columns, specify the vector dimension with `<index-type>.dimension`.
For `VECTOR<FLOAT>` columns, Paimon uses the dimension from the column type.

Supported vector index options:

| Option | Default | Description |
|---|---|---|
| `<index-type>.dimension` | `128` | Vector dimension for `ARRAY<FLOAT>` columns. Ignored for `VECTOR<FLOAT>` columns. |
| `<index-type>.distance.metric` | `inner_product` | Distance metric. Supported values: `l2`, `cosine`, `inner_product`. |
| `<index-type>.nlist` | `256` | Number of IVF clusters used during index build. |
| `<index-type>.pq.m` | `16` | Number of PQ sub-vectors for `ivf-pq`. The vector dimension must be divisible by this value. |
| `<index-type>.pq.use-opq` | `false` | Whether to enable OPQ for `ivf-pq`. |
| `<index-type>.hnsw.m` | `20` | HNSW graph out-degree for `ivf-hnsw-flat` and `ivf-hnsw-sq`. |
| `<index-type>.hnsw.ef-construction` | `150` | HNSW construction search width for `ivf-hnsw-flat` and `ivf-hnsw-sq`. |
| `<index-type>.hnsw.max-level` | `7` | Maximum HNSW level for `ivf-hnsw-flat` and `ivf-hnsw-sq`. |
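
The divisibility rule for `<index-type>.pq.m` can be checked on the client before the procedure is called. The following is a minimal sketch, assuming a hypothetical helper class `IvfPqOptionsSketch` that is not part of Paimon; it only assembles the comma-separated `options` string from the option keys listed in the table above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class IvfPqOptionsSketch {

    // Metric names as listed in the options table.
    private static final List<String> METRICS = Arrays.asList("l2", "cosine", "inner_product");

    /** Builds an options string for an ivf-pq index after validating the inputs. */
    public static String build(int dimension, String metric, int nlist, int pqM) {
        if (dimension <= 0 || nlist <= 0 || pqM <= 0) {
            throw new IllegalArgumentException("dimension, nlist and pq.m must be positive");
        }
        if (!METRICS.contains(metric)) {
            throw new IllegalArgumentException("Unsupported distance metric: " + metric);
        }
        if (dimension % pqM != 0) {
            // Each PQ sub-vector covers dimension / pq.m components, so the split must be exact.
            throw new IllegalArgumentException(
                    "dimension " + dimension + " is not divisible by pq.m " + pqM);
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("ivf-pq.dimension", String.valueOf(dimension));
        options.put("ivf-pq.distance.metric", metric);
        options.put("ivf-pq.nlist", String.valueOf(nlist));
        options.put("ivf-pq.pq.m", String.valueOf(pqM));
        return options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // prints ivf-pq.dimension=128,ivf-pq.distance.metric=cosine,ivf-pq.nlist=256,ivf-pq.pq.m=16
        System.out.println(build(128, "cosine", 256, 16));
    }
}
```

With a 128-dimensional vector and `pq.m=16`, each sub-vector spans 8 components. A value such as `dimension=100` with `pq.m=16` is rejected by this sketch because 100 is not a multiple of 16.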

**Vector Search**

Search-time options are passed with each vector search request:

| Option | Default | Description |
|---|---|---|
| `ivf.nprobe` | `16` | Number of IVF clusters to probe during search. |
| `hnsw.ef_search` | `0` | HNSW search width during search. `0` uses the native library default. |
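
To make the effect of `ivf.nprobe` concrete, here is a toy sketch of IVF probing in plain Java. It is not Paimon's native implementation: the class `IvfProbeSketch`, its hard-coded data, and the brute-force scan inside each cluster are all assumptions chosen for illustration. It shows only the general idea that a search ranks the cluster centroids by distance to the query and scans the vectors of the `nprobe` closest clusters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IvfProbeSketch {

    // Toy index: two cluster centroids and five 2-d vectors (ids are array positions).
    public static final float[][] CENTROIDS = {{0f, 0f}, {10f, 10f}};
    public static final float[][] VECTORS = {{0f, 1f}, {2f, 0f}, {10f, 9f}, {6f, 6f}, {4f, 4f}};
    // Inverted lists: the vector ids assigned to each centroid at build time.
    public static final List<List<Integer>> LISTS =
            Arrays.asList(Arrays.asList(0, 1, 4), Arrays.asList(2, 3));

    static double squaredL2(float[] a, float[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    /** Returns the ids of the topK closest vectors found in the nprobe nearest clusters. */
    public static List<Integer> search(float[] query, int nprobe, int topK) {
        // Rank clusters by the distance between their centroid and the query.
        List<Integer> clusters = new ArrayList<>();
        for (int c = 0; c < CENTROIDS.length; c++) {
            clusters.add(c);
        }
        clusters.sort((a, b) ->
                Double.compare(squaredL2(CENTROIDS[a], query), squaredL2(CENTROIDS[b], query)));

        // Collect candidates only from the nprobe closest clusters.
        List<Integer> candidates = new ArrayList<>();
        for (int i = 0; i < Math.min(nprobe, clusters.size()); i++) {
            candidates.addAll(LISTS.get(clusters.get(i)));
        }

        // Exact ranking within the candidate set.
        candidates.sort((a, b) ->
                Double.compare(squaredL2(VECTORS[a], query), squaredL2(VECTORS[b], query)));
        return new ArrayList<>(candidates.subList(0, Math.min(topK, candidates.size())));
    }

    public static void main(String[] args) {
        float[] query = {4.9f, 4.9f};
        System.out.println(search(query, 1, 2)); // prints [4, 1]
        System.out.println(search(query, 2, 2)); // prints [4, 3]
    }
}
```

With `nprobe = 1` the search misses vector 3, which is the second-closest vector overall but lives in the other cluster. Probing both clusters recovers it at the cost of scanning more candidates, which is the recall versus latency trade-off that a larger `ivf.nprobe` buys.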

<Tabs groupId="vector-search">

<TabItem value="spark-sql" label="Spark SQL">
@@ -155,6 +184,7 @@ GlobalIndexResult result = table.newVectorSearchBuilder()
.withVector(queryVector)
.withLimit(5)
.withVectorColumn("embedding")
.withOption("ivf.nprobe", "16")
.executeLocal();

// Step 2: Read matching rows using the search result
2 changes: 1 addition & 1 deletion docs/docs/multimodal-table/index.mdx
@@ -37,7 +37,7 @@ Key capabilities:
- **[Data Evolution](./data-evolution)**: Update partial columns without rewriting entire files, enabling efficient schema evolution.
- **[Blob Storage](./blob)**: Store large binary objects (images, videos, audio) in dedicated `.blob` files with efficient column projection.
- **[Vector Storage](./vector)**: Store and manage vector embeddings in dedicated Vortex-format files optimized for vector workloads.
- **[Global Index](./global-index)**: Build BTree, vector (DiskANN), and full-text (Tantivy) indexes for efficient lookups and similarity search.
- **[Global Index](./global-index)**: Build BTree, vector, and full-text (Tantivy) indexes for efficient lookups and similarity search.

All multimodal features require the following table properties:

@@ -51,20 +51,33 @@ public static void readVectored(VectoredReadable readable, List<? extends FileRa
if (ranges.isEmpty()) {
return;
}
readVectored(readable, ranges, ReadOptions.from(readable));
}

public static void readVectored(
VectoredReadable readable, List<? extends FileRange> ranges, ReadOptions options)
throws IOException {
if (ranges.isEmpty()) {
return;
}
requireNonNull(readable, "readable is null");
requireNonNull(options, "options is null");

List<? extends FileRange> sortRanges = validateAndSortRanges(ranges);
List<CombinedRange> combinedRanges =
mergeSortedRanges(sortRanges, readable.minSeekForVectorReads());
mergeSortedRanges(sortRanges, options.minSeekForVectorReads);

int parallelism = readable.parallelismForVectorReads();
int parallelism = options.parallelismForVectorReads;

if (combinedRanges.size() == 1 && readable instanceof SeekableInputStream) {
if (options.sequentialReadFallback
&& combinedRanges.size() == 1
&& readable instanceof SeekableInputStream) {
fallbackToReadSequence((SeekableInputStream) readable, sortRanges);
return;
}

BlockingExecutor executor = new BlockingExecutor(IO_THREAD_POOL, parallelism);
long batchSize = readable.batchSizeForVectorReads();
long batchSize = options.batchSizeForVectorReads;
for (CombinedRange combinedRange : combinedRanges) {
if (combinedRange.underlying.size() == 1) {
FileRange fileRange = combinedRange.underlying.get(0);
@@ -76,12 +89,95 @@ public static void readVectored(VectoredReadable readable, List<? extends FileRa
List<CompletableFuture<byte[]>> futures =
splitBatches.stream().map(FileRange::getData).collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenAcceptAsync(
unused -> copyToFileRanges(combinedRange, futures), IO_THREAD_POOL);
.whenCompleteAsync(
(unused, throwable) -> {
if (throwable == null) {
try {
copyToFileRanges(combinedRange, futures);
} catch (Throwable t) {
completeFileRangesExceptionally(combinedRange, t);
}
} else {
completeFileRangesExceptionally(combinedRange, throwable);
}
},
IO_THREAD_POOL);
}
}
}

/** Options for vectored reads. */
public static class ReadOptions {

private final int minSeekForVectorReads;
private final long batchSizeForVectorReads;
private final int parallelismForVectorReads;
private final boolean sequentialReadFallback;

public static ReadOptions from(VectoredReadable readable) {
return new ReadOptions(
readable.minSeekForVectorReads(),
readable.batchSizeForVectorReads(),
readable.parallelismForVectorReads(),
true);
}

public ReadOptions(
int minSeekForVectorReads,
long batchSizeForVectorReads,
int parallelismForVectorReads,
boolean sequentialReadFallback) {
checkArgument(
minSeekForVectorReads >= 0,
"minSeekForVectorReads must be non-negative: %s",
minSeekForVectorReads);
checkArgument(
batchSizeForVectorReads > 0,
"batchSizeForVectorReads must be positive: %s",
batchSizeForVectorReads);
checkArgument(
parallelismForVectorReads > 0,
"parallelismForVectorReads must be positive: %s",
parallelismForVectorReads);
this.minSeekForVectorReads = minSeekForVectorReads;
this.batchSizeForVectorReads = batchSizeForVectorReads;
this.parallelismForVectorReads = parallelismForVectorReads;
this.sequentialReadFallback = sequentialReadFallback;
}

public ReadOptions withMinSeekForVectorReads(int minSeekForVectorReads) {
return new ReadOptions(
minSeekForVectorReads,
batchSizeForVectorReads,
parallelismForVectorReads,
sequentialReadFallback);
}

public ReadOptions withBatchSizeForVectorReads(long batchSizeForVectorReads) {
return new ReadOptions(
minSeekForVectorReads,
batchSizeForVectorReads,
parallelismForVectorReads,
sequentialReadFallback);
}

public ReadOptions withParallelismForVectorReads(int parallelismForVectorReads) {
return new ReadOptions(
minSeekForVectorReads,
batchSizeForVectorReads,
parallelismForVectorReads,
sequentialReadFallback);
}

public ReadOptions withSequentialReadFallback(boolean sequentialReadFallback) {
return new ReadOptions(
minSeekForVectorReads,
batchSizeForVectorReads,
parallelismForVectorReads,
sequentialReadFallback);
}
}
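
For readers unfamiliar with the `minSeekForVectorReads` setting carried by `ReadOptions`, the sketch below shows one plausible reading of it. It assumes that `mergeSortedRanges` coalesces offset-sorted ranges whose gap is smaller than the minimum seek, as vectored-read implementations commonly do; the diff does not show that method, so this is an assumption. `RangeMergeSketch` and its nested `Range` type are hypothetical and do not mirror Paimon's `FileRange` or `CombinedRange`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RangeMergeSketch {

    /** A byte range covering [offset, offset + length). Hypothetical stand-in type. */
    public static final class Range {
        final long offset;
        final long length;

        public Range(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        long end() {
            return offset + length;
        }

        @Override
        public String toString() {
            return "[" + offset + "," + end() + ")";
        }
    }

    /** Merges offset-sorted ranges whose gap is smaller than minSeek (assumed semantics). */
    public static List<Range> merge(List<Range> sorted, long minSeek) {
        List<Range> merged = new ArrayList<>();
        Range current = null;
        for (Range next : sorted) {
            if (current != null && next.offset - current.end() < minSeek) {
                // Reading through the small gap is cheaper than issuing another seek.
                long end = Math.max(current.end(), next.end());
                current = new Range(current.offset, end - current.offset);
            } else {
                if (current != null) {
                    merged.add(current);
                }
                current = next;
            }
        }
        if (current != null) {
            merged.add(current);
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Range> ranges =
                Arrays.asList(new Range(0, 100), new Range(110, 50), new Range(5000, 100));
        System.out.println(merge(ranges, 64)); // prints [[0,160), [5000,5100)]
        System.out.println(merge(ranges, 0));  // prints [[0,100), [110,160), [5000,5100)]
    }
}
```

Under this reading, a caller that passes `withMinSeekForVectorReads(0)` would keep every requested range separate, while a larger value trades extra bytes read for fewer seeks.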

private static void fallbackToReadSequence(
SeekableInputStream in, List<? extends FileRange> ranges) throws IOException {
for (FileRange range : ranges) {
@@ -126,6 +222,13 @@ private static void copyToFileRanges(
}
}

private static void completeFileRangesExceptionally(
CombinedRange combinedRange, Throwable throwable) {
for (FileRange fileRange : combinedRange.underlying) {
fileRange.getData().completeExceptionally(throwable);
}
}

private static void copyMultiBytesToBytes(
List<byte[]> segments, int offset, byte[] bytes, int numBytes) {
int remainSize = numBytes;
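
The move from `thenAcceptAsync` to `whenCompleteAsync` in `readVectored` changes what happens when a batch read fails. A `thenAccept`-style callback is skipped when the upstream future completes exceptionally, so a future that is completed only inside that callback would stay pending. The sketch below reproduces this with plain `CompletableFuture` objects. `FailurePropagationSketch` and its methods are hypothetical and stand in for `copyToFileRanges` and `completeFileRangesExceptionally`; they are not the Paimon code.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FailurePropagationSketch {

    /** Old style: the callback only runs on success, so a failure never reaches the target. */
    public static CompletableFuture<byte[]> withThenAccept(CompletableFuture<byte[]> batch) {
        CompletableFuture<byte[]> target = new CompletableFuture<>();
        batch.thenAcceptAsync(target::complete);
        return target;
    }

    /** New style: the callback always runs and forwards either the value or the failure. */
    public static CompletableFuture<byte[]> withWhenComplete(CompletableFuture<byte[]> batch) {
        CompletableFuture<byte[]> target = new CompletableFuture<>();
        batch.whenCompleteAsync(
                (bytes, throwable) -> {
                    if (throwable == null) {
                        target.complete(bytes);
                    } else {
                        target.completeExceptionally(throwable);
                    }
                });
        return target;
    }

    /** Describes how a future ends within a short timeout. */
    public static String outcome(CompletableFuture<byte[]> future) {
        try {
            future.get(200, TimeUnit.MILLISECONDS);
            return "completed";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "still pending";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<byte[]> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IOException("read failed"));

        System.out.println(outcome(withThenAccept(failed)));   // prints still pending
        System.out.println(outcome(withWhenComplete(failed))); // prints failed: read failed
    }
}
```

In the first case a caller blocked on the target future would wait indefinitely, which is the situation the new error branch avoids by completing every underlying range exceptionally.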