diff --git a/.github/workflows/utcase-vector-index.yml b/.github/workflows/utcase-vector-index.yml new file mode 100644 index 000000000000..fa3da08567c3 --- /dev/null +++ b/.github/workflows/utcase-vector-index.yml @@ -0,0 +1,74 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +name: UTCase Vector Index + +on: + push: + paths: + - 'paimon-vector/**' + pull_request: + paths: + - 'paimon-vector/**' + +env: + JDK_VERSION: 8 + MAVEN_OPTS: -Dmaven.wagon.httpconnectionManager.ttlSeconds=30 -Dmaven.wagon.http.retryHandler.requestSentEnabled=true + +concurrency: + group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }} + cancel-in-progress: true + +jobs: + vector_index_test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Set up JDK ${{ env.JDK_VERSION }} + uses: actions/setup-java@v5 + with: + java-version: ${{ env.JDK_VERSION }} + distribution: 'temurin' + + - name: Install Rust toolchain + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable --profile minimal + echo "$HOME/.cargo/bin" >> $GITHUB_PATH + + - name: Clone and build paimon-vector-index native library + run: | + git clone --depth 1 https://github.com/apache/paimon-vector-index.git /tmp/paimon-vector-index + cd /tmp/paimon-vector-index + cargo build --release -p paimon-vindex-jni + + - name: Copy native library to resources + run: | + RESOURCE_DIR=paimon-vector/paimon-vector-jni/src/main/resources/native/linux-amd64 + mkdir -p ${RESOURCE_DIR} + cp /tmp/paimon-vector-index/target/release/libpaimon_vindex_jni.so ${RESOURCE_DIR}/ + + - name: Build and test vector index modules + timeout-minutes: 30 + run: | + mvn -T 2C -B -ntp clean install -DskipTests + mvn -B -ntp verify -pl paimon-vector/paimon-vector-jni,paimon-vector/paimon-vector-index -Dcheckstyle.skip=true -Dspotless.check.skip=true + env: + MAVEN_OPTS: -Xmx4096m diff --git a/docs/docs/flink/procedures.md b/docs/docs/flink/procedures.md index 31601354f1a8..0dbf8f0c1bc7 100644 --- a/docs/docs/flink/procedures.md +++ b/docs/docs/flink/procedures.md @@ -1004,7 +1004,7 @@ All available procedures are listed below. To create a global index on a table for accelerating queries. Arguments:
  • table(required): the target table identifier.
  • index_column(required): the column name to build index on.
  • -
  • index_type(required): the type of global index, supported types include 'btree', 'lumina', 'tantivy-fulltext'.
  • +
  • index_type(required): the type of global index, supported types include 'btree', 'ivf-flat', 'ivf-pq', 'ivf-hnsw-flat', 'ivf-hnsw-sq', 'tantivy-fulltext'.
  • partitions(optional): partition filter for selective index creation.
  • options(optional): additional dynamic options for index creation.
  • diff --git a/docs/docs/learn-paimon/scenario-guide.mdx b/docs/docs/learn-paimon/scenario-guide.mdx index 341ceddae99e..4aae116bc079 100644 --- a/docs/docs/learn-paimon/scenario-guide.mdx +++ b/docs/docs/learn-paimon/scenario-guide.mdx @@ -44,7 +44,7 @@ configurations that are suited for different scenarios. | Queue-like ordered streaming | Append Table | `bucket = N, bucket-key = col` | | Large-scale OLAP with ad-hoc queries | Append Table | Incremental Clustering | | Store images / videos / documents | Append Table (Blob) | `__BLOB_FIELD` comment, Data Evolution enabled | -| AI vector search / RAG | Append Table (Vector) | `VECTOR` type, Global Index (DiskANN) | +| AI vector search / RAG | Append Table (Vector) | `VECTOR` type, Vector Global Index | | AI feature engineering & column evolution | Append Table | `data-evolution.enabled = true` | | Python AI pipeline (Ray / PyTorch) | Append Table | PyPaimon SDK | @@ -456,21 +456,19 @@ Schema schema = Schema.newBuilder() **Build the vector index and search:** ```sql --- Build DiskANN vector index +-- Build IVF-PQ vector index CALL sys.create_global_index( table => 'db.doc_embeddings', index_column => 'embedding', - index_type => 'lumina', - options => 'lumina.index.dimension=768' + index_type => 'ivf-pq', + options => 'vector.distance.metric=cosine,vector.nlist=256,vector.pq.m=16' ); -- Search for top-5 nearest neighbors SELECT * FROM vector_search('doc_embeddings', 'embedding', array(0.1f, 0.2f, ...), 5); ``` -The legacy index type `lumina-vector-ann` is still accepted for existing tables and SQL compatibility. - -**Why:** The [Global Index](../multimodal-table/global-index) with DiskANN provides high-performance ANN search. +**Why:** The [Global Index](../multimodal-table/global-index) with vector indexes provides high-performance ANN search. Vector data is stored in dedicated `.vector.lance` files optimized for dense vectors, while scalar columns stay in Parquet. You can also build a **BTree Index** on scalar columns for efficient filtering: @@ -664,7 +662,7 @@ Do you need upsert / update / delete? │ └── AI / Multimodal scenarios? → Enable Data Evolution ├── Store images / videos / docs? → Blob Table (__BLOB_FIELD comment) - ├── Vector search / RAG? → VECTOR type + Global Index (DiskANN) + ├── Vector search / RAG? → VECTOR type + Vector Global Index ├── Feature engineering? → Data Evolution (MERGE INTO partial columns) └── Python pipeline? → PyPaimon (Ray / PyTorch / Pandas) ``` diff --git a/docs/docs/multimodal-table/global-index.mdx b/docs/docs/multimodal-table/global-index.mdx index 8edcfe87e2ad..747ccbaa9ad0 100644 --- a/docs/docs/multimodal-table/global-index.mdx +++ b/docs/docs/multimodal-table/global-index.mdx @@ -33,7 +33,7 @@ Global Index is a powerful indexing mechanism for Data Evolution (append) tables without full-table scans. Paimon supports multiple global index types: - **BTree Index**: A B-tree based index for scalar column lookups. Supports equality, IN, range predicates, and can be combined across multiple columns with AND/OR logic. -- **Vector Index**: An approximate nearest neighbor (ANN) index powered by DiskANN for vector similarity search. +- **Vector Index**: An approximate nearest neighbor (ANN) index powered by Paimon's vector index library for vector similarity search. - **Full-Text Index**: A full-text search index powered by Tantivy for text retrieval. Supports term matching and relevance scoring. Global indexes work on top of Data Evolution tables. To use global indexes, your table **must** have: @@ -87,26 +87,55 @@ SELECT * FROM my_table WHERE name IN ('a200', 'a300'); ## Vector Index -Vector Index provides approximate nearest neighbor (ANN) search based on the DiskANN algorithm. It is suitable for -vector similarity search scenarios such as recommendation systems, image retrieval, and RAG (Retrieval Augmented -Generation) applications. +Vector Index provides approximate nearest neighbor (ANN) search for vector similarity search scenarios such as +recommendation systems, image retrieval, and RAG (Retrieval Augmented Generation) applications. + +Supported vector index types: + +| Index Type | Description | +|---|---| +| `ivf-flat` | IVF index with flat vector storage. | +| `ivf-pq` | IVF index with product quantization. | +| `ivf-hnsw-flat` | IVF index with HNSW flat quantizer. | +| `ivf-hnsw-sq` | IVF index with HNSW scalar quantizer. | **Build Vector Index** ```sql --- Create Lumina vector index on 'embedding' column +-- Create IVF-PQ vector index on 'embedding' column CALL sys.create_global_index( table => 'db.my_table', index_column => 'embedding', - index_type => 'lumina', - options => 'lumina.index.dimension=128' + index_type => 'ivf-pq', + options => 'ivf-pq.distance.metric=cosine,ivf-pq.nlist=256,ivf-pq.pq.m=16' ); ``` -The legacy index type `lumina-vector-ann` is still accepted for existing tables and SQL compatibility. +For `ARRAY` vector columns, specify the vector dimension with `.dimension`. +For `VECTOR` columns, Paimon uses the dimension from the column type. + +Supported vector index options: + +| Option | Default | Description | +|---|---|---| +| `.dimension` | `128` | Vector dimension for `ARRAY` columns. Ignored for `VECTOR` columns. | +| `.distance.metric` | `inner_product` | Distance metric. Supported values: `l2`, `cosine`, `inner_product`. | +| `.nlist` | `256` | Number of IVF clusters used during index build. | +| `.pq.m` | `16` | Number of PQ sub-vectors for `ivf-pq`. The vector dimension must be divisible by this value. | +| `.pq.use-opq` | `false` | Whether to enable OPQ for `ivf-pq`. | +| `.hnsw.m` | `20` | HNSW graph out-degree for `ivf-hnsw-flat` and `ivf-hnsw-sq`. | +| `.hnsw.ef-construction` | `150` | HNSW construction search width for `ivf-hnsw-flat` and `ivf-hnsw-sq`. | +| `.hnsw.max-level` | `7` | Maximum HNSW level for `ivf-hnsw-flat` and `ivf-hnsw-sq`. | **Vector Search** +Search-time options are passed with each vector search request: + +| Option | Default | Description | +|---|---|---| +| `ivf.nprobe` | `16` | Number of IVF clusters to probe during search. | +| `hnsw.ef_search` | `0` | HNSW search width during search. `0` uses the native library default. | + @@ -155,6 +184,7 @@ GlobalIndexResult result = table.newVectorSearchBuilder() .withVector(queryVector) .withLimit(5) .withVectorColumn("embedding") + .withOption("ivf.nprobe", "16") .executeLocal(); // Step 2: Read matching rows using the search result diff --git a/docs/docs/multimodal-table/index.mdx b/docs/docs/multimodal-table/index.mdx index 4e0f42a38712..f0a4b2975082 100644 --- a/docs/docs/multimodal-table/index.mdx +++ b/docs/docs/multimodal-table/index.mdx @@ -37,7 +37,7 @@ Key capabilities: - **[Data Evolution](./data-evolution)**: Update partial columns without rewriting entire files, enabling efficient schema evolution. - **[Blob Storage](./blob)**: Store large binary objects (images, videos, audio) in dedicated `.blob` files with efficient column projection. - **[Vector Storage](./vector)**: Store and manage vector embeddings in dedicated Vortex-format files optimized for vector workloads. -- **[Global Index](./global-index)**: Build BTree, vector (DiskANN), and full-text (Tantivy) indexes for efficient lookups and similarity search. +- **[Global Index](./global-index)**: Build BTree, vector, and full-text (Tantivy) indexes for efficient lookups and similarity search. All multimodal features require the following table properties: diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java index bda768ef96f6..8f1551bff988 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/VectoredReadUtils.java @@ -51,20 +51,33 @@ public static void readVectored(VectoredReadable readable, List ranges, ReadOptions options) + throws IOException { + if (ranges.isEmpty()) { + return; + } + requireNonNull(readable, "readable is null"); + requireNonNull(options, "options is null"); List sortRanges = validateAndSortRanges(ranges); List combinedRanges = - mergeSortedRanges(sortRanges, readable.minSeekForVectorReads()); + mergeSortedRanges(sortRanges, options.minSeekForVectorReads); - int parallelism = readable.parallelismForVectorReads(); + int parallelism = options.parallelismForVectorReads; - if (combinedRanges.size() == 1 && readable instanceof SeekableInputStream) { + if (options.sequentialReadFallback + && combinedRanges.size() == 1 + && readable instanceof SeekableInputStream) { fallbackToReadSequence((SeekableInputStream) readable, sortRanges); return; } BlockingExecutor executor = new BlockingExecutor(IO_THREAD_POOL, parallelism); - long batchSize = readable.batchSizeForVectorReads(); + long batchSize = options.batchSizeForVectorReads; for (CombinedRange combinedRange : combinedRanges) { if (combinedRange.underlying.size() == 1) { FileRange fileRange = combinedRange.underlying.get(0); @@ -76,12 +89,95 @@ public static void readVectored(VectoredReadable readable, List> futures = splitBatches.stream().map(FileRange::getData).collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) - .thenAcceptAsync( - unused -> copyToFileRanges(combinedRange, futures), IO_THREAD_POOL); + .whenCompleteAsync( + (unused, throwable) -> { + if (throwable == null) { + try { + copyToFileRanges(combinedRange, futures); + } catch (Throwable t) { + completeFileRangesExceptionally(combinedRange, t); + } + } else { + completeFileRangesExceptionally(combinedRange, throwable); + } + }, + IO_THREAD_POOL); } } } + /** Options for vectored reads. */ + public static class ReadOptions { + + private final int minSeekForVectorReads; + private final long batchSizeForVectorReads; + private final int parallelismForVectorReads; + private final boolean sequentialReadFallback; + + public static ReadOptions from(VectoredReadable readable) { + return new ReadOptions( + readable.minSeekForVectorReads(), + readable.batchSizeForVectorReads(), + readable.parallelismForVectorReads(), + true); + } + + public ReadOptions( + int minSeekForVectorReads, + long batchSizeForVectorReads, + int parallelismForVectorReads, + boolean sequentialReadFallback) { + checkArgument( + minSeekForVectorReads >= 0, + "minSeekForVectorReads must be non-negative: %s", + minSeekForVectorReads); + checkArgument( + batchSizeForVectorReads > 0, + "batchSizeForVectorReads must be positive: %s", + batchSizeForVectorReads); + checkArgument( + parallelismForVectorReads > 0, + "parallelismForVectorReads must be positive: %s", + parallelismForVectorReads); + this.minSeekForVectorReads = minSeekForVectorReads; + this.batchSizeForVectorReads = batchSizeForVectorReads; + this.parallelismForVectorReads = parallelismForVectorReads; + this.sequentialReadFallback = sequentialReadFallback; + } + + public ReadOptions withMinSeekForVectorReads(int minSeekForVectorReads) { + return new ReadOptions( + minSeekForVectorReads, + batchSizeForVectorReads, + parallelismForVectorReads, + sequentialReadFallback); + } + + public ReadOptions withBatchSizeForVectorReads(long batchSizeForVectorReads) { + return new ReadOptions( + minSeekForVectorReads, + batchSizeForVectorReads, + parallelismForVectorReads, + sequentialReadFallback); + } + + public ReadOptions withParallelismForVectorReads(int parallelismForVectorReads) { + return new ReadOptions( + minSeekForVectorReads, + batchSizeForVectorReads, + parallelismForVectorReads, + sequentialReadFallback); + } + + public ReadOptions withSequentialReadFallback(boolean sequentialReadFallback) { + return new ReadOptions( + minSeekForVectorReads, + batchSizeForVectorReads, + parallelismForVectorReads, + sequentialReadFallback); + } + } + private static void fallbackToReadSequence( SeekableInputStream in, List ranges) throws IOException { for (FileRange range : ranges) { @@ -126,6 +222,13 @@ private static void copyToFileRanges( } } + private static void completeFileRangesExceptionally( + CombinedRange combinedRange, Throwable throwable) { + for (FileRange fileRange : combinedRange.underlying) { + fileRange.getData().completeExceptionally(throwable); + } + } + private static void copyMultiBytesToBytes( List segments, int offset, byte[] bytes, int numBytes) { int remainSize = numBytes; diff --git a/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java index a3264b08e20c..1cd5476e63e4 100644 --- a/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/fs/VectoredReadUtilsTest.java @@ -25,9 +25,14 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; class VectoredReadUtilsTest { @@ -127,4 +132,108 @@ public void testRandom() throws Exception { } doTest(ranges); } + + @Test + public void testReadOptionsCanDisableSequentialReadFallback() throws Exception { + TestSeekableVectoredReadable readable = new TestSeekableVectoredReadable(2); + + List ranges = + Arrays.asList( + FileRange.createFileRange(0, 100), FileRange.createFileRange(150, 100)); + VectoredReadUtils.ReadOptions options = + new VectoredReadUtils.ReadOptions(1000, 100, 2, false); + + VectoredReadUtils.readVectored(readable, ranges, options); + assertThat(readable.readsStarted.await(5, TimeUnit.SECONDS)).isTrue(); + readable.finishReads.countDown(); + + for (FileRange range : ranges) { + assertThat(range.getData().get(5, TimeUnit.SECONDS)).hasSize(range.getLength()); + } + assertThat(readable.reads).hasValue(2); + assertThat(readable.sequentialReads).hasValue(0); + assertThat(readable.maxActiveReads).hasValue(2); + } + + @Test + public void testReadOptionsPropagateSplitReadFailure() throws Exception { + VectoredReadable readable = + new VectoredReadable() { + @Override + public int pread(long position, byte[] buffer, int offset, int length) + throws IOException { + throw new IOException("failed"); + } + }; + + List ranges = + Arrays.asList( + FileRange.createFileRange(0, 100), FileRange.createFileRange(150, 100)); + VectoredReadUtils.ReadOptions options = + new VectoredReadUtils.ReadOptions(1000, 100, 2, false); + + VectoredReadUtils.readVectored(readable, ranges, options); + + assertThatThrownBy(() -> ranges.get(0).getData().get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasMessageContaining("failed"); + assertThatThrownBy(() -> ranges.get(1).getData().get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasMessageContaining("failed"); + } + + private class TestSeekableVectoredReadable extends SeekableInputStream + implements VectoredReadable { + + private final CountDownLatch readsStarted; + private final CountDownLatch finishReads = new CountDownLatch(1); + private final AtomicInteger reads = new AtomicInteger(); + private final AtomicInteger sequentialReads = new AtomicInteger(); + private final AtomicInteger activeReads = new AtomicInteger(); + private final AtomicInteger maxActiveReads = new AtomicInteger(); + + private TestSeekableVectoredReadable(int expectedReads) { + this.readsStarted = new CountDownLatch(expectedReads); + } + + @Override + public void seek(long desired) {} + + @Override + public long getPos() { + return 0; + } + + @Override + public int read() throws IOException { + throw new IOException("Sequential read should not be used"); + } + + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + sequentialReads.incrementAndGet(); + throw new IOException("Sequential read should not be used"); + } + + @Override + public void close() {} + + @Override + public int pread(long position, byte[] buffer, int offset, int length) throws IOException { + int active = activeReads.incrementAndGet(); + maxActiveReads.accumulateAndGet(active, Math::max); + readsStarted.countDown(); + try { + finishReads.await(); + System.arraycopy(bytes, (int) position, buffer, offset, length); + reads.incrementAndGet(); + return length; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } finally { + activeReads.decrementAndGet(); + } + } + } } diff --git a/paimon-vector/paimon-vector-index/pom.xml b/paimon-vector/paimon-vector-index/pom.xml new file mode 100644 index 000000000000..6744de7c9e4a --- /dev/null +++ b/paimon-vector/paimon-vector-index/pom.xml @@ -0,0 +1,200 @@ + + + + 4.0.0 + + + paimon-vector + org.apache.paimon + 1.5-SNAPSHOT + + + paimon-vector-index + Paimon : Vector Index + + + + org.apache.paimon + paimon-vector-jni + ${project.version} + + + + org.apache.paimon + paimon-common + ${project.version} + provided + + + + org.apache.paimon + paimon-shade-jackson-2 + ${paimon.shade.jackson.version}-${paimon.shade.version} + + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + org.apache.paimon + paimon-core + ${project.version} + test + + + + org.apache.paimon + paimon-core + ${project.version} + test-jar + test + + + + org.apache.paimon + paimon-common + ${project.version} + test-jar + test + + + + org.apache.paimon + paimon-format + ${project.version} + test + + + + org.apache.paimon + paimon-test-utils + ${project.version} + test + + + + org.apache.hadoop + hadoop-hdfs-client + ${hadoop.version} + test + + + org.apache.avro + avro + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + test + + + org.apache.avro + avro + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + jdk.tools + jdk.tools + + + com.google.protobuf + protobuf-java + + + + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + test + + + org.apache.avro + avro + + + com.google.protobuf + protobuf-java + + + ch.qos.reload4j + reload4j + + + org.slf4j + slf4j-reload4j + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + jdk.tools + jdk.tools + + + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + 1 + true + none + + + + + diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfFlatVectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfFlatVectorGlobalIndexerFactory.java new file mode 100644 index 000000000000..572c7cf4edb2 --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfFlatVectorGlobalIndexerFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +/** Factory for the {@code ivf-flat} vector index identifier. */ +public class IvfFlatVectorGlobalIndexerFactory extends VectorGlobalIndexerFactory { + + public static final String IDENTIFIER = "ivf-flat"; + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswFlatVectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswFlatVectorGlobalIndexerFactory.java new file mode 100644 index 000000000000..159e7af6f1ba --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswFlatVectorGlobalIndexerFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +/** Factory for the {@code ivf-hnsw-flat} vector index identifier. */ +public class IvfHnswFlatVectorGlobalIndexerFactory extends VectorGlobalIndexerFactory { + + public static final String IDENTIFIER = "ivf-hnsw-flat"; + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswSqVectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswSqVectorGlobalIndexerFactory.java new file mode 100644 index 000000000000..51c72cd8f39c --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfHnswSqVectorGlobalIndexerFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +/** Factory for the {@code ivf-hnsw-sq} vector index identifier. */ +public class IvfHnswSqVectorGlobalIndexerFactory extends VectorGlobalIndexerFactory { + + public static final String IDENTIFIER = "ivf-hnsw-sq"; + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfPqAlgorithmVectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfPqAlgorithmVectorGlobalIndexerFactory.java new file mode 100644 index 000000000000..f3932de46ed6 --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/IvfPqAlgorithmVectorGlobalIndexerFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +/** Factory for the {@code ivf-pq} vector index identifier. */ +public class IvfPqAlgorithmVectorGlobalIndexerFactory extends VectorGlobalIndexerFactory { + + public static final String IDENTIFIER = "ivf-pq"; + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexReader.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexReader.java new file mode 100644 index 000000000000..54f8532a2227 --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexReader.java @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +import org.apache.paimon.fs.FileRange; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.VectoredReadUtils; +import org.apache.paimon.fs.VectoredReadable; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.index.vector.VectorIndexInput; +import org.apache.paimon.index.vector.VectorIndexMetadata; +import org.apache.paimon.index.vector.VectorIndexReader; +import org.apache.paimon.index.vector.VectorSearchResult; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.VectorSearch; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.VectorType; +import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; + +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * Vector global index reader using paimon-vector-index. + * + *

    Each shard has exactly one vector index file. The reader lazily opens the index and performs + * vector similarity search. + */ +public class VectorGlobalIndexReader implements GlobalIndexReader { + + private static final String NPROBE_PARAMETER = "ivf.nprobe"; + private static final String EF_SEARCH_PARAMETER = "hnsw.ef_search"; + private static final int DEFAULT_NPROBE = 16; + private static final int DEFAULT_EF_SEARCH = 0; + private static final int VECTOR_INDEX_MIN_SEEK_FOR_VECTOR_READS = 16 * 1024; + private static final int VECTOR_INDEX_PARALLELISM_FOR_VECTOR_READS = 32; + + private final GlobalIndexIOMeta ioMeta; + private final GlobalIndexFileReader fileReader; + private final DataType fieldType; + private final ExecutorService executor; + + private volatile VectorIndexMetadata nativeMeta; + private volatile VectorIndexReader vectorReader; + private SeekableInputStream openStream; + + public VectorGlobalIndexReader( + GlobalIndexFileReader fileReader, + List ioMetas, + DataType fieldType, + ExecutorService executor) { + checkArgument(ioMetas.size() == 1, "Expected exactly one index file per shard"); + this.executor = executor; + this.fileReader = fileReader; + this.ioMeta = ioMetas.get(0); + this.fieldType = fieldType; + } + + @Override + public CompletableFuture> visitVectorSearch( + VectorSearch vectorSearch) { + return CompletableFuture.supplyAsync( + () -> { + try { + ensureLoaded(); + return Optional.ofNullable(search(vectorSearch)); + } catch (IOException e) { + throw new RuntimeException( + String.format( + "Failed vector index search: field=%s, limit=%d", + vectorSearch.fieldName(), vectorSearch.limit()), + e); + } + }, + executor); + } + + private ScoredGlobalIndexResult search(VectorSearch vectorSearch) throws IOException { + validateSearchVector(vectorSearch.vector()); + float[] queryVector = vectorSearch.vector().clone(); + int limit = vectorSearch.limit(); + int nprobe = nprobe(vectorSearch.options()); + int efSearch = efSearch(vectorSearch.options()); + String metric = nativeMeta.metric(); + + RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds(); + VectorSearchResult result; + + if (includeRowIds != null) { + long cardinality = includeRowIds.getLongCardinality(); + if (cardinality == 0) { + return null; + } + byte[] filterBytes = includeRowIds.serialize(); + int effectiveK = (int) Math.min(limit, cardinality); + result = vectorReader.search(queryVector, effectiveK, nprobe, efSearch, filterBytes); + } else { + result = vectorReader.search(queryVector, limit, nprobe, efSearch); + } + + long[] ids = result.ids(); + float[] distances = result.distances(); + + if (ids.length == 0) { + return null; + } + + RoaringNavigableMap64 resultBitmap = new RoaringNavigableMap64(); + HashMap id2scores = new HashMap<>(ids.length); + + for (int i = 0; i < ids.length; i++) { + long rowId = ids[i]; + if (rowId < 0) { + continue; + } + float score = convertDistanceToScore(distances[i], metric); + resultBitmap.add(rowId); + id2scores.put(rowId, score); + } + + if (resultBitmap.isEmpty()) { + return null; + } + + return ScoredGlobalIndexResult.create( + resultBitmap, + rowId -> { + Float score = id2scores.get(rowId); + if (score == null) { + throw new IllegalArgumentException( + "No score found for rowId: " + + rowId + + ". Only rowIds present in results() are valid."); + } + return score; + }); + } + + private static float convertDistanceToScore(float distance, String metric) { + if ("l2".equals(metric)) { + return 1.0f / (1.0f + distance); + } else if ("cosine".equals(metric)) { + return 1.0f - distance; + } else if ("inner_product".equals(metric)) { + return distance; + } + throw new IllegalArgumentException("Unknown metric: " + metric); + } + + static int nprobe(Map parameters) { + return intParameter(parameters, NPROBE_PARAMETER, DEFAULT_NPROBE); + } + + static int efSearch(Map parameters) { + return intParameter(parameters, EF_SEARCH_PARAMETER, DEFAULT_EF_SEARCH); + } + + private static int intParameter(Map parameters, String key, int defaultValue) { + String value = parameters.get(key); + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Invalid value for '" + key + "': " + value + ". Must be an integer.", e); + } + } + + private void validateSearchVector(Object vector) { + if (!(vector instanceof float[])) { + throw new IllegalArgumentException( + "Expected float[] vector but got: " + vector.getClass()); + } + boolean validFieldType = false; + if (fieldType instanceof VectorType) { + validFieldType = ((VectorType) fieldType).getElementType() instanceof FloatType; + } else if (fieldType instanceof ArrayType) { + validFieldType = ((ArrayType) fieldType).getElementType() instanceof FloatType; + } + if (!validFieldType) { + throw new IllegalArgumentException( + "Vector index requires VectorType or ArrayType, but field type is: " + + fieldType); + } + int queryDim = ((float[]) vector).length; + if (queryDim != nativeMeta.dimension()) { + throw new IllegalArgumentException( + String.format( + "Query vector dimension mismatch: index expects %d, but got %d", + nativeMeta.dimension(), queryDim)); + } + } + + private void ensureLoaded() throws IOException { + if (vectorReader == null) { + synchronized (this) { + if (vectorReader == null) { + SeekableInputStream in = fileReader.getInputStream(ioMeta); + try { + vectorReader = + new VectorIndexReader(new SeekableStreamVectorIndexInput(in)); + nativeMeta = vectorReader.metadata(); + openStream = in; + } catch (Exception e) { + IOUtils.closeQuietly(in); + throw e; + } + } + } + } + } + + @Override + public void close() throws IOException { + Throwable firstException = null; + + if (vectorReader != null) { + try { + vectorReader.close(); + } catch (Throwable t) { + firstException = t; + } + vectorReader = null; + } + + if (openStream != null) { + try { + openStream.close(); + } catch (Throwable t) { + if (firstException == null) { + firstException = t; + } else { + firstException.addSuppressed(t); + } + } + openStream = null; + } + + if (firstException != null) { + if (firstException instanceof IOException) { + throw (IOException) firstException; + } else if (firstException instanceof RuntimeException) { + throw (RuntimeException) firstException; + } else { + throw new RuntimeException( + "Failed to close vector global index reader", firstException); + } + } + } + + static class SeekableStreamVectorIndexInput implements VectorIndexInput { + + private final SeekableInputStream input; + + SeekableStreamVectorIndexInput(SeekableInputStream input) { + this.input = input; + } + + @Override + public void pread(long[] positions, byte[][] buffers) { + if (positions.length != buffers.length) { + throw new IllegalArgumentException( + "positions length " + + positions.length + + " != buffers length " + + buffers.length); + } + try { + if (input instanceof VectoredReadable + && areRangesNonOverlapping(positions, buffers)) { + preadVectored((VectoredReadable) input, positions, buffers); + } else { + synchronized (this) { + preadSequential(positions, buffers); + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to read vector index", e); + } + } + + private void preadVectored(VectoredReadable readable, long[] positions, byte[][] buffers) + throws IOException { + List ranges = new ArrayList<>(positions.length); + for (int i = 0; i < positions.length; i++) { + ranges.add(FileRange.createFileRange(positions[i], buffers[i].length)); + } + + VectoredReadUtils.ReadOptions options = + VectoredReadUtils.ReadOptions.from(readable) + .withMinSeekForVectorReads(VECTOR_INDEX_MIN_SEEK_FOR_VECTOR_READS) + .withParallelismForVectorReads( + VECTOR_INDEX_PARALLELISM_FOR_VECTOR_READS) + .withSequentialReadFallback(false); + VectoredReadUtils.readVectored(readable, ranges, options); + + for (int i = 0; i < ranges.size(); i++) { + byte[] bytes = ranges.get(i).getData().join(); + System.arraycopy(bytes, 0, buffers[i], 0, bytes.length); + } + } + + private void preadSequential(long[] positions, byte[][] buffers) throws IOException { + for (int i = 0; i < positions.length; i++) { + input.seek(positions[i]); + readFully(input, buffers[i]); + } + } + + private static void readFully(SeekableInputStream input, byte[] buffer) throws IOException { + int offset = 0; + while (offset < buffer.length) { + int read = input.read(buffer, offset, buffer.length - offset); + if (read < 0) { + throw new IOException("Unexpected end of vector index file"); + } + offset += read; + } + } + + private static boolean areRangesNonOverlapping(long[] positions, byte[][] buffers) { + if (positions.length < 2) { + return true; + } + + List indexes = new ArrayList<>(positions.length); + for (int i = 0; i < positions.length; i++) { + indexes.add(i); + } + indexes.sort(Comparator.comparingLong(index -> positions[index])); + + boolean hasPrevious = false; + long previousEnd = 0; + for (int index : indexes) { + long offset = positions[index]; + long end = offset + buffers[index].length; + if (end < offset || (hasPrevious && offset < previousEnd)) { + return false; + } + previousEnd = end; + hasPrevious = true; + } + return true; + } + } + + // =================== unsupported ===================== + + @Override + public CompletableFuture> visitIsNotNull(FieldRef fieldRef) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitIsNull(FieldRef fieldRef) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitStartsWith( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitEndsWith( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitContains( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitLike( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitLessThan( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitGreaterOrEqual( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitNotEqual( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitLessOrEqual( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitEqual( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitGreaterThan( + FieldRef fieldRef, Object literal) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitIn( + FieldRef fieldRef, List literals) { + return CompletableFuture.completedFuture(Optional.empty()); + } + + @Override + public CompletableFuture> visitNotIn( + FieldRef fieldRef, List literals) { + return CompletableFuture.completedFuture(Optional.empty()); + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexWriter.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexWriter.java new file mode 100644 index 000000000000..19582009581d --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexWriter.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.index.vector.VectorIndexWriter; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.VectorType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Vector global index writer using paimon-vector-index. + * + *

    Vectors are spilled to a temporary file on disk as they arrive via {@link #write(Object)}, + * keeping Java heap usage constant (~8 MB buffer). During index build, vectors are read back for + * training and batch insertion. + * + *

    Thread safety: This class is not thread-safe. + */ +public class VectorGlobalIndexWriter implements GlobalIndexSingletonWriter, Closeable { + + private static final String FILE_NAME_PREFIX = "vector"; + + private static final Logger LOG = LoggerFactory.getLogger(VectorGlobalIndexWriter.class); + + private static final int IO_BUFFER_SIZE = 8 * 1024 * 1024; + private static final int ADD_BATCH_SIZE = 10000; + + private final GlobalIndexFileWriter fileWriter; + private final String identifier; + private final Map nativeOptions; + private final int dim; + + private File tempVectorFile; + private FileChannel writeChannel; + private ByteBuffer writeBuf; + + private final int recordSizeInBytes; + private final float[] vectorBuf; + private long count; + private boolean closed; + + private long logicalRowId; + + public VectorGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, + DataType fieldType, + Map options, + String identifier) { + this.fileWriter = fileWriter; + this.identifier = identifier; + validateFieldType(fieldType); + this.nativeOptions = options; + this.dim = Integer.parseInt(options.get("dimension")); + this.count = 0; + this.closed = false; + this.recordSizeInBytes = checkedRecordSize(dim, IO_BUFFER_SIZE); + this.vectorBuf = new float[dim]; + + try { + this.tempVectorFile = File.createTempFile("paimon-vector-index-vectors-", ".bin"); + this.tempVectorFile.deleteOnExit(); + @SuppressWarnings("resource") + RandomAccessFile raf = new RandomAccessFile(tempVectorFile, "rw"); + this.writeChannel = raf.getChannel(); + this.writeBuf = ByteBuffer.allocateDirect(IO_BUFFER_SIZE); + this.writeBuf.order(ByteOrder.nativeOrder()); + } catch (IOException e) { + throw new RuntimeException("Failed to create temp vector file", e); + } + } + + private void validateFieldType(DataType dataType) { + if (dataType instanceof VectorType) { + DataType elementType = ((VectorType) dataType).getElementType(); + if (!(elementType instanceof FloatType)) { + throw new IllegalArgumentException( + "Vector index requires float vector, but got: " + elementType); + } + return; + } + if (dataType instanceof ArrayType) { + DataType elementType = ((ArrayType) dataType).getElementType(); + if (!(elementType instanceof FloatType)) { + throw new IllegalArgumentException( + "Vector index requires float array, but got: " + elementType); + } + return; + } + throw new IllegalArgumentException( + "Vector index requires VectorType or ArrayType, but got: " + dataType); + } + + @Override + public void write(Object fieldData) { + if (fieldData == null) { + logicalRowId++; + return; + } + + float[] src = materializeAndValidate(fieldData); + + if (writeBuf.remaining() < recordSizeInBytes) { + flushWriteBuffer(); + } + writeBuf.putLong(logicalRowId); + for (int i = 0; i < dim; i++) { + writeBuf.putFloat(src[i]); + } + logicalRowId++; + count++; + } + + private float[] materializeAndValidate(Object fieldData) { + if (fieldData instanceof float[]) { + float[] vector = (float[]) fieldData; + checkDimension(vector.length); + for (int i = 0; i < dim; i++) { + checkFinite(vector[i], i); + } + return vector; + } else if (fieldData instanceof InternalVector) { + InternalVector vector = (InternalVector) fieldData; + checkDimension(vector.size()); + for (int i = 0; i < dim; i++) { + float v = vector.getFloat(i); + checkFinite(v, i); + vectorBuf[i] = v; + } + return vectorBuf; + } else if (fieldData instanceof InternalArray) { + InternalArray array = (InternalArray) fieldData; + checkDimension(array.size()); + for (int i = 0; i < dim; i++) { + if (array.isNullAt(i)) { + throw new IllegalArgumentException("Vector element at index " + i + " is null"); + } + float v = array.getFloat(i); + checkFinite(v, i); + vectorBuf[i] = v; + } + return vectorBuf; + } else { + throw new RuntimeException( + "Unsupported vector type: " + fieldData.getClass().getName()); + } + } + + private void flushWriteBuffer() { + try { + writeBuf.flip(); + while (writeBuf.hasRemaining()) { + writeChannel.write(writeBuf); + } + writeBuf.clear(); + } catch (IOException e) { + throw new RuntimeException("Failed to flush vector buffer to disk", e); + } + } + + @Override + public List finish() { + try { + if (count == 0) { + writeChannel.close(); + writeChannel = null; + writeBuf = null; + return Collections.emptyList(); + } + flushWriteBuffer(); + writeChannel.close(); + writeChannel = null; + writeBuf = null; + return Collections.singletonList(buildIndex()); + } catch (IOException e) { + throw new RuntimeException("Failed to write vector global index", e); + } finally { + if (tempVectorFile != null) { + tempVectorFile.delete(); + tempVectorFile = null; + } + } + } + + private ResultEntry buildIndex() throws IOException { + LOG.info("{} vector index build started: {} vectors, dim={}", identifier, count, dim); + long buildStart = System.currentTimeMillis(); + + try (VectorIndexWriter writer = new VectorIndexWriter(nativeOptions)) { + + // Phase 1: Train + long phaseStart = System.currentTimeMillis(); + LOG.info("{} train phase started", identifier); + trainFromTempFile(writer); + LOG.info( + "{} train phase done in {} ms", + identifier, + System.currentTimeMillis() - phaseStart); + + // Phase 2: Add all vectors in batches + phaseStart = System.currentTimeMillis(); + LOG.info("{} add phase started", identifier); + addVectorsFromTempFile(writer); + LOG.info( + "{} add phase done in {} ms", + identifier, + System.currentTimeMillis() - phaseStart); + + // Phase 3: Write index + phaseStart = System.currentTimeMillis(); + LOG.info("{} write phase started", identifier); + String fileName = fileWriter.newFileName(fileNamePrefix()); + try (PositionOutputStream out = fileWriter.newOutputStream(fileName)) { + writer.writeIndex(out); + out.flush(); + } + LOG.info( + "{} write phase done in {} ms", + identifier, + System.currentTimeMillis() - phaseStart); + + LOG.info( + "{} vector index build completed in {} ms", + identifier, + System.currentTimeMillis() - buildStart); + + VectorIndexMeta meta = new VectorIndexMeta(); + return new ResultEntry(fileName, logicalRowId, meta.serialize()); + } + } + + private String fileNamePrefix() { + return FILE_NAME_PREFIX + "-" + identifier; + } + + private void trainFromTempFile(VectorIndexWriter writer) throws IOException { + int trainCount = (int) count; + float[] trainData = new float[trainCount * dim]; + + try (RandomAccessFile raf = new RandomAccessFile(tempVectorFile, "r"); + FileChannel channel = raf.getChannel()) { + ByteBuffer readBuf = ByteBuffer.allocateDirect(IO_BUFFER_SIZE); + readBuf.order(ByteOrder.nativeOrder()); + readBuf.limit(0); + + for (int i = 0; i < trainCount; i++) { + ensureAvailable(readBuf, channel, recordSizeInBytes); + readBuf.getLong(); // skip rowId + for (int d = 0; d < dim; d++) { + trainData[i * dim + d] = readBuf.getFloat(); + } + } + } + + writer.train(trainData, trainCount); + } + + private void addVectorsFromTempFile(VectorIndexWriter writer) throws IOException { + long[] batchIds = new long[ADD_BATCH_SIZE]; + float[] batchVectors = new float[ADD_BATCH_SIZE * dim]; + + try (RandomAccessFile raf = new RandomAccessFile(tempVectorFile, "r"); + FileChannel channel = raf.getChannel()) { + ByteBuffer readBuf = ByteBuffer.allocateDirect(IO_BUFFER_SIZE); + readBuf.order(ByteOrder.nativeOrder()); + readBuf.limit(0); + + long remaining = count; + int lastLoggedPercent = -1; + + while (remaining > 0) { + int thisBatch = (int) Math.min(ADD_BATCH_SIZE, remaining); + for (int i = 0; i < thisBatch; i++) { + ensureAvailable(readBuf, channel, recordSizeInBytes); + batchIds[i] = readBuf.getLong(); + for (int d = 0; d < dim; d++) { + batchVectors[i * dim + d] = readBuf.getFloat(); + } + } + if (thisBatch == ADD_BATCH_SIZE) { + writer.addVectors(batchIds, batchVectors, thisBatch); + } else { + writer.addVectors( + Arrays.copyOf(batchIds, thisBatch), + Arrays.copyOf(batchVectors, thisBatch * dim), + thisBatch); + } + remaining -= thisBatch; + + int percent = (int) ((count - remaining) * 100 / count); + if (percent / 10 > lastLoggedPercent / 10) { + LOG.info( + "{} add progress: {}/{} vectors ({}%)", + identifier, count - remaining, count, percent); + lastLoggedPercent = percent; + } + } + } + } + + private static void ensureAvailable(ByteBuffer readBuf, FileChannel channel, int minBytes) + throws IOException { + int zeroReadCount = 0; + while (readBuf.remaining() < minBytes) { + readBuf.compact(); + int bytesRead = channel.read(readBuf); + readBuf.flip(); + if (bytesRead == -1) { + throw new IOException("Unexpected end of temp file"); + } + if (bytesRead == 0) { + if (++zeroReadCount > 100) { + throw new IOException( + "Unable to read from temp file: repeated zero-byte reads"); + } + } else { + zeroReadCount = 0; + } + } + } + + private void checkDimension(int actualDim) { + if (actualDim != dim) { + throw new IllegalArgumentException( + String.format( + "Vector dimension mismatch: expected %d, but got %d", dim, actualDim)); + } + } + + private void checkFinite(float value, int elementIndex) { + if (!Float.isFinite(value)) { + throw new IllegalArgumentException( + String.format( + "Vector element at rowId=%d, index=%d is %s", + logicalRowId, elementIndex, Float.toString(value))); + } + } + + private static int checkedRecordSize(int dim, int bufferCapacity) { + long recordSize = Long.BYTES + (long) dim * Float.BYTES; + if (recordSize > bufferCapacity || recordSize > Integer.MAX_VALUE) { + throw new IllegalStateException( + "Vector record size " + + recordSize + + " exceeds buffer capacity " + + bufferCapacity); + } + return (int) recordSize; + } + + @Override + public void close() { + if (!closed) { + closed = true; + try { + if (writeChannel != null) { + writeChannel.close(); + } + } catch (IOException ignored) { + } + writeBuf = null; + if (tempVectorFile != null) { + tempVectorFile.delete(); + tempVectorFile = null; + } + } + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexer.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexer.java new file mode 100644 index 000000000000..a3f3bf51fb9a --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.GlobalIndexer; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.types.DataType; + +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; + +/** Vector global indexer backed by paimon-vector-index. */ +public class VectorGlobalIndexer implements GlobalIndexer { + + private final DataType fieldType; + private final Map options; + private final String identifier; + + public VectorGlobalIndexer(DataType fieldType, Map options, String identifier) { + this.fieldType = fieldType; + this.options = Objects.requireNonNull(options, "options must not be null"); + this.identifier = Objects.requireNonNull(identifier, "identifier must not be null"); + } + + @Override + public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) { + return new VectorGlobalIndexWriter(fileWriter, fieldType, options, identifier); + } + + @Override + public GlobalIndexReader createReader( + GlobalIndexFileReader fileReader, + List files, + ExecutorService executor) { + return new VectorGlobalIndexReader(fileReader, files, fieldType, executor); + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexerFactory.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexerFactory.java new file mode 100644 index 000000000000..354c967b56ad --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorGlobalIndexerFactory.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +import org.apache.paimon.globalindex.GlobalIndexer; +import org.apache.paimon.globalindex.GlobalIndexerFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.VectorType; + +import java.util.LinkedHashMap; +import java.util.Map; + +/** Factory for creating vector indexes backed by paimon-vector-index. */ +public abstract class VectorGlobalIndexerFactory implements GlobalIndexerFactory { + + private static final int DEFAULT_DIMENSION = 128; + + @Override + public GlobalIndexer create(DataField field, Options options) { + String identifier = identifier(); + return new VectorGlobalIndexer( + field.type(), nativeOptions(field.type(), options, identifier), identifier); + } + + static Map nativeOptions( + DataType fieldType, Options tableOptions, String identifier) { + Map nativeOptions = new LinkedHashMap<>(); + String optionPrefix = identifier + "."; + for (Map.Entry entry : tableOptions.toMap().entrySet()) { + String optionKey = entry.getKey(); + if (optionKey.startsWith(optionPrefix)) { + String nativeKey = nativeOptionKey(optionKey.substring(optionPrefix.length())); + if (nativeKey != null) { + nativeOptions.put(nativeKey, entry.getValue()); + } + } + } + nativeOptions.put("index.type", identifier.replace('-', '_')); + nativeOptions.put( + "dimension", String.valueOf(dimension(fieldType, nativeOptions, identifier))); + return nativeOptions; + } + + private static String nativeOptionKey(String optionKey) { + switch (optionKey) { + case "index.dimension": + case "dimension": + return "dimension"; + case "distance.metric": + case "metric": + return "metric"; + case "nlist": + case "pq.m": + case "hnsw.m": + case "hnsw.ef-construction": + case "hnsw.max-level": + return optionKey; + case "pq.use-opq": + case "use-opq": + return "use-opq"; + default: + return null; + } + } + + private static int dimension( + DataType fieldType, Map nativeOptions, String identifier) { + if (fieldType instanceof VectorType) { + return ((VectorType) fieldType).getLength(); + } + String dimension = nativeOptions.get("dimension"); + int value = dimension == null ? DEFAULT_DIMENSION : Integer.parseInt(dimension); + if (value <= 0) { + throw new IllegalArgumentException( + "Invalid value for '" + + identifier + + ".dimension': " + + value + + ". Must be a positive integer."); + } + return value; + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorIndexMeta.java b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorIndexMeta.java new file mode 100644 index 000000000000..ea18e7efebc9 --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/java/org/apache/paimon/vector/index/VectorIndexMeta.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +/** + * Metadata for a vector index file. + * + *

    Serialized as an empty JSON {@code Map}. Search-time parameters are passed + * through {@link org.apache.paimon.predicate.VectorSearch#options()}. + */ +public class VectorIndexMeta implements Serializable { + + private static final long serialVersionUID = 1L; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final TypeReference> MAP_TYPE_REF = + new TypeReference>() {}; + + VectorIndexMeta() {} + + public byte[] serialize() throws IOException { + return OBJECT_MAPPER.writeValueAsBytes(Collections.emptyMap()); + } + + public static VectorIndexMeta deserialize(byte[] data) throws IOException { + Map ignored = OBJECT_MAPPER.readValue(data, MAP_TYPE_REF); + return new VectorIndexMeta(); + } +} diff --git a/paimon-vector/paimon-vector-index/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-vector/paimon-vector-index/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory new file mode 100644 index 000000000000..a11570704a1f --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.paimon.vector.index.IvfFlatVectorGlobalIndexerFactory +org.apache.paimon.vector.index.IvfPqAlgorithmVectorGlobalIndexerFactory +org.apache.paimon.vector.index.IvfHnswFlatVectorGlobalIndexerFactory +org.apache.paimon.vector.index.IvfHnswSqVectorGlobalIndexerFactory diff --git a/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/SeekableStreamVectorIndexInputTest.java b/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/SeekableStreamVectorIndexInputTest.java new file mode 100644 index 000000000000..ddb29eab6e97 --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/SeekableStreamVectorIndexInputTest.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.VectoredReadable; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link VectorGlobalIndexReader.SeekableStreamVectorIndexInput}. */ +public class SeekableStreamVectorIndexInputTest { + + @Test + public void testVectoredReadableInputUsesParallelPositionReads() throws Exception { + byte[] data = data(128 * 1024); + TestVectoredSeekableInputStream input = new TestVectoredSeekableInputStream(data, 2); + VectorGlobalIndexReader.SeekableStreamVectorIndexInput indexInput = + new VectorGlobalIndexReader.SeekableStreamVectorIndexInput(input); + + byte[][] buffers = new byte[][] {new byte[64], new byte[64]}; + indexInput.pread(new long[] {0, 32 * 1024}, buffers); + + assertThat(buffers[0]).isEqualTo(slice(data, 0, 64)); + assertThat(buffers[1]).isEqualTo(slice(data, 32 * 1024, 64)); + assertThat(input.positionReads).hasValue(2); + assertThat(input.sequentialReads).hasValue(0); + assertThat(input.maxActiveReads).hasValue(2); + } + + @Test + public void testFallbackToSequentialReadWhenRangesOverlap() { + byte[] data = data(1024); + TestVectoredSeekableInputStream input = new TestVectoredSeekableInputStream(data, 0); + VectorGlobalIndexReader.SeekableStreamVectorIndexInput indexInput = + new VectorGlobalIndexReader.SeekableStreamVectorIndexInput(input); + + byte[][] buffers = new byte[][] {new byte[64], new byte[64]}; + indexInput.pread(new long[] {0, 32}, buffers); + + assertThat(buffers[0]).isEqualTo(slice(data, 0, 64)); + assertThat(buffers[1]).isEqualTo(slice(data, 32, 64)); + assertThat(input.positionReads).hasValue(0); + assertThat(input.sequentialReads).hasValue(2); + } + + private static byte[] data(int length) { + byte[] data = new byte[length]; + for (int i = 0; i < length; i++) { + data[i] = (byte) i; + } + return data; + } + + private static byte[] slice(byte[] data, int offset, int length) { + byte[] expected = new byte[length]; + System.arraycopy(data, offset, expected, 0, length); + return expected; + } + + private static class TestVectoredSeekableInputStream extends SeekableInputStream + implements VectoredReadable { + + private final byte[] data; + private final CountDownLatch readsStarted; + private final CountDownLatch finishReads = new CountDownLatch(1); + private final AtomicInteger activeReads = new AtomicInteger(); + private final AtomicInteger positionReads = new AtomicInteger(); + private final AtomicInteger sequentialReads = new AtomicInteger(); + private final AtomicInteger maxActiveReads = new AtomicInteger(); + + private int position; + + private TestVectoredSeekableInputStream(byte[] data, int expectedPositionReads) { + this.data = data; + this.readsStarted = new CountDownLatch(expectedPositionReads); + if (expectedPositionReads == 0) { + finishReads.countDown(); + } + } + + @Override + public void seek(long desired) { + position = (int) desired; + } + + @Override + public long getPos() { + return position; + } + + @Override + public int read() { + return data[position++]; + } + + @Override + public int read(byte[] buffer, int offset, int length) { + System.arraycopy(data, position, buffer, offset, length); + position += length; + sequentialReads.incrementAndGet(); + return length; + } + + @Override + public void close() {} + + @Override + public int pread(long position, byte[] buffer, int offset, int length) throws IOException { + int active = activeReads.incrementAndGet(); + maxActiveReads.accumulateAndGet(active, Math::max); + readsStarted.countDown(); + try { + if (!readsStarted.await(5, TimeUnit.SECONDS)) { + throw new IOException("Timed out waiting for parallel vector index reads"); + } + finishReads.countDown(); + System.arraycopy(data, (int) position, buffer, offset, length); + positionReads.incrementAndGet(); + return length; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } finally { + activeReads.decrementAndGet(); + } + } + } +} diff --git a/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/VectorGlobalIndexTest.java b/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/VectorGlobalIndexTest.java new file mode 100644 index 000000000000..73f8a4b28f49 --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/VectorGlobalIndexTest.java @@ -0,0 +1,413 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.index.vector.NativeLoader; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.VectorSearch; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.VectorType; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assumptions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link VectorGlobalIndexWriter} and {@link VectorGlobalIndexReader}. */ +public class VectorGlobalIndexTest { + + @TempDir java.nio.file.Path tempDir; + + private static final String IVF_PQ_IDENTIFIER = + IvfPqAlgorithmVectorGlobalIndexerFactory.IDENTIFIER; + private FileIO fileIO; + private Path indexPath; + private DataType vectorType; + private final String fieldName = "vec"; + private ExecutorService executor; + + private static boolean isNativeAvailable() { + try { + NativeLoader.loadJni(); + return true; + } catch (Throwable t) { + return false; + } + } + + @BeforeEach + public void setup() { + fileIO = new LocalFileIO(); + indexPath = new Path(tempDir.toString()); + vectorType = new ArrayType(new FloatType()); + executor = Executors.newCachedThreadPool(); + } + + @AfterEach + public void cleanup() throws IOException { + if (executor != null) { + executor.shutdownNow(); + } + if (fileIO != null) { + fileIO.delete(indexPath, true); + } + } + + // =================== Tests that do NOT need native library ===================== + + @Test + public void testDimensionMismatch() { + Options options = createDefaultOptions(64); + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = createIvfPqWriter(fileWriter, vectorType, options); + + float[] wrongDimVector = new float[32]; + assertThatThrownBy(() -> writer.write(wrongDimVector)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("dimension mismatch"); + } + + @Test + public void testVectorTypeRejectsNonFloatElement() { + DataType intVecType = new VectorType(2, new IntType()); + Options options = createDefaultOptions(2); + options.setInteger("ivf-pq.pq.m", 1); + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + + assertThatThrownBy(() -> createIvfPqWriter(fileWriter, intVecType, options)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("float"); + } + + @Test + public void testNanInVectorRejected() { + Options options = createDefaultOptions(2); + options.setInteger("ivf-pq.pq.m", 1); + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = createIvfPqWriter(fileWriter, vectorType, options); + + assertThatThrownBy(() -> writer.write(new float[] {1.0f, Float.NaN})) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("rowId=0") + .hasMessageContaining("index=1") + .hasMessageContaining("NaN"); + } + + @Test + public void testInfinityInVectorRejected() { + Options options = createDefaultOptions(2); + options.setInteger("ivf-pq.pq.m", 1); + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = createIvfPqWriter(fileWriter, vectorType, options); + + writer.write(null); // row 0 - null, advances logicalRowId + assertThatThrownBy(() -> writer.write(new float[] {Float.POSITIVE_INFINITY, 0.0f})) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("rowId=1") + .hasMessageContaining("index=0") + .hasMessageContaining("Infinity"); + } + + @Test + public void testAllNullReturnsEmpty() { + Options options = createDefaultOptions(2); + options.setInteger("ivf-pq.pq.m", 1); + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = createIvfPqWriter(fileWriter, vectorType, options); + + writer.write(null); + writer.write(null); + writer.write(null); + + List results = writer.finish(); + assertThat(results).isEmpty(); + } + + @Test + public void testMetaSerializationIsEmptyMap() throws IOException { + VectorIndexMeta meta = new VectorIndexMeta(); + byte[] serialized = meta.serialize(); + VectorIndexMeta deserialized = VectorIndexMeta.deserialize(serialized); + + assertThat(new String(serialized, StandardCharsets.UTF_8)).isEqualTo("{}"); + assertThat(new String(deserialized.serialize(), StandardCharsets.UTF_8)).isEqualTo("{}"); + } + + @Test + public void testVectorSearchParameterParsing() { + Map parameters = new HashMap<>(); + parameters.put("ivf.nprobe", "24"); + parameters.put("hnsw.ef_search", "80"); + parameters.put("ignored", "bad"); + + assertThat(VectorGlobalIndexReader.nprobe(parameters)).isEqualTo(24); + assertThat(VectorGlobalIndexReader.efSearch(parameters)).isEqualTo(80); + assertThat(VectorGlobalIndexReader.nprobe(Collections.emptyMap())).isEqualTo(16); + assertThat(VectorGlobalIndexReader.efSearch(Collections.emptyMap())).isEqualTo(0); + } + + @Test + public void testVectorSearchParameterRangeValidationDelegatedToNative() { + assertThat(VectorGlobalIndexReader.nprobe(Collections.singletonMap("ivf.nprobe", "0"))) + .isEqualTo(0); + assertThat( + VectorGlobalIndexReader.efSearch( + Collections.singletonMap("hnsw.ef_search", "-1"))) + .isEqualTo(-1); + } + + // =================== Tests that NEED native library ===================== + + @Test + public void testFloatVectorEndToEnd() throws IOException { + Assumptions.assumeTrue(isNativeAvailable(), "Vector index native library not available"); + + int dimension = 2; + Options options = createDefaultOptions(dimension); + options.setInteger("ivf-pq.nlist", 2); + options.setInteger("ivf-pq.pq.m", 1); + + float[][] vectors = + new float[][] { + new float[] {1.0f, 0.0f}, + new float[] {0.95f, 0.1f}, + new float[] {0.1f, 0.95f}, + new float[] {0.98f, 0.05f}, + new float[] {0.0f, 1.0f}, + new float[] {0.05f, 0.98f} + }; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = createIvfPqWriter(fileWriter, vectorType, options); + Arrays.stream(vectors).forEach(writer::write); + List results = writer.finish(); + List metas = toIOMetas(results, indexPath); + + GlobalIndexFileReader fileReader = createFileReader(indexPath); + try (VectorGlobalIndexReader reader = + new VectorGlobalIndexReader(fileReader, metas, vectorType, executor)) { + VectorSearch vectorSearch = new VectorSearch(vectors[0], 3, fieldName); + ScoredGlobalIndexResult result = reader.visitVectorSearch(vectorSearch).join().get(); + assertThat(result.results().getLongCardinality()).isEqualTo(3); + assertThat(result.results().contains(0L)).isTrue(); + float score = result.scoreGetter().score(0L); + assertThat(score).isNotNaN(); + } + } + + @Test + public void testSearchWithRoaringFilter() throws IOException { + Assumptions.assumeTrue(isNativeAvailable(), "Vector index native library not available"); + + int dimension = 2; + Options options = createDefaultOptions(dimension); + options.setInteger("ivf-pq.nlist", 2); + options.setInteger("ivf-pq.pq.m", 1); + + float[][] vectors = + new float[][] { + new float[] {1.0f, 0.0f}, + new float[] {0.95f, 0.1f}, + new float[] {0.9f, 0.2f}, + new float[] {-1.0f, 0.0f}, + new float[] {-0.95f, 0.1f}, + new float[] {-0.9f, 0.2f} + }; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = createIvfPqWriter(fileWriter, vectorType, options); + Arrays.stream(vectors).forEach(writer::write); + List results = writer.finish(); + List metas = toIOMetas(results, indexPath); + + GlobalIndexFileReader fileReader = createFileReader(indexPath); + try (VectorGlobalIndexReader reader = + new VectorGlobalIndexReader(fileReader, metas, vectorType, executor)) { + + // Filter to rows {1, 4} only + RoaringNavigableMap64 filter = new RoaringNavigableMap64(); + filter.add(1L); + filter.add(4L); + VectorSearch search = + new VectorSearch(vectors[0], 6, fieldName).withIncludeRowIds(filter); + ScoredGlobalIndexResult result = reader.visitVectorSearch(search).join().get(); + assertThat(result.results().contains(1L)).isTrue(); + assertThat(result.results().contains(4L)).isTrue(); + assertThat(result.results().getLongCardinality()).isEqualTo(2); + } + } + + @Test + public void testNullVectorSkipWithCorrectIds() throws IOException { + Assumptions.assumeTrue(isNativeAvailable(), "Vector index native library not available"); + + int dimension = 2; + Options options = createDefaultOptions(dimension); + options.setInteger("ivf-pq.nlist", 2); + options.setInteger("ivf-pq.pq.m", 1); + + float[][] vectors = + new float[][] { + new float[] {1.0f, 0.0f}, + new float[] {0.1f, 0.95f}, + new float[] {0.0f, 1.0f} + }; + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = createIvfPqWriter(fileWriter, vectorType, options); + + writer.write(vectors[0]); // row 0 + writer.write(null); // row 1 - null + writer.write(vectors[1]); // row 2 + writer.write(null); // row 3 - null + writer.write(null); // row 4 - null + writer.write(vectors[2]); // row 5 + + List results = writer.finish(); + assertThat(results).hasSize(1); + assertThat(results.get(0).rowCount()).isEqualTo(6); + + List metas = toIOMetas(results, indexPath); + GlobalIndexFileReader fileReader = createFileReader(indexPath); + try (VectorGlobalIndexReader reader = + new VectorGlobalIndexReader(fileReader, metas, vectorType, executor)) { + VectorSearch vectorSearch = new VectorSearch(vectors[0], 3, fieldName); + ScoredGlobalIndexResult result = reader.visitVectorSearch(vectorSearch).join().get(); + assertThat(result.results().getLongCardinality()).isEqualTo(3); + assertThat(result.results().contains(0L)).isTrue(); + assertThat(result.results().contains(2L)).isTrue(); + assertThat(result.results().contains(5L)).isTrue(); + assertThat(result.results().contains(1L)).isFalse(); + assertThat(result.results().contains(3L)).isFalse(); + assertThat(result.results().contains(4L)).isFalse(); + } + } + + @Test + public void testViaIndexer() throws IOException { + Assumptions.assumeTrue(isNativeAvailable(), "Vector index native library not available"); + + int dimension = 2; + Options options = createDefaultOptions(dimension); + options.setInteger("ivf-pq.nlist", 2); + options.setInteger("ivf-pq.pq.m", 1); + + float[][] vectors = + new float[][] { + new float[] {1.0f, 0.0f}, + new float[] {0.0f, 1.0f}, + new float[] {0.7f, 0.7f} + }; + + VectorGlobalIndexer indexer = + new VectorGlobalIndexer( + vectorType, + VectorGlobalIndexerFactory.nativeOptions( + vectorType, options, IVF_PQ_IDENTIFIER), + IVF_PQ_IDENTIFIER); + + GlobalIndexFileWriter fileWriter = createFileWriter(indexPath); + VectorGlobalIndexWriter writer = (VectorGlobalIndexWriter) indexer.createWriter(fileWriter); + Arrays.stream(vectors).forEach(writer::write); + List results = writer.finish(); + List metas = toIOMetas(results, indexPath); + + GlobalIndexFileReader fileReader = createFileReader(indexPath); + try (VectorGlobalIndexReader reader = + (VectorGlobalIndexReader) indexer.createReader(fileReader, metas, executor)) { + VectorSearch vectorSearch = new VectorSearch(vectors[0], 2, fieldName); + ScoredGlobalIndexResult result = reader.visitVectorSearch(vectorSearch).join().get(); + assertThat(result.results().getLongCardinality()).isEqualTo(2); + assertThat(result.results().contains(0L)).isTrue(); + } + } + + // =================== Helpers ===================== + + private VectorGlobalIndexWriter createIvfPqWriter( + GlobalIndexFileWriter fileWriter, DataType fieldType, Options options) { + return new VectorGlobalIndexWriter( + fileWriter, + fieldType, + VectorGlobalIndexerFactory.nativeOptions(fieldType, options, IVF_PQ_IDENTIFIER), + IVF_PQ_IDENTIFIER); + } + + private Options createDefaultOptions(int dimension) { + Options options = new Options(); + options.setInteger("ivf-pq.dimension", dimension); + options.setString("ivf-pq.metric", "l2"); + return options; + } + + private GlobalIndexFileWriter createFileWriter(Path path) { + return new GlobalIndexFileWriter() { + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + + @Override + public PositionOutputStream newOutputStream(String fileName) throws IOException { + return fileIO.newOutputStream(new Path(path, fileName), false); + } + }; + } + + private GlobalIndexFileReader createFileReader(Path path) { + return meta -> fileIO.newInputStream(new Path(path, meta.filePath())); + } + + private List toIOMetas(List results, Path path) + throws IOException { + assertThat(results).hasSize(1); + ResultEntry result = results.get(0); + Path filePath = new Path(path, result.fileName()); + return Collections.singletonList( + new GlobalIndexIOMeta(filePath, fileIO.getFileSize(filePath), result.meta())); + } +} diff --git a/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/VectorGlobalIndexerFactoryTest.java b/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/VectorGlobalIndexerFactoryTest.java new file mode 100644 index 000000000000..1caf082f2c4d --- /dev/null +++ b/paimon-vector/paimon-vector-index/src/test/java/org/apache/paimon/vector/index/VectorGlobalIndexerFactoryTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.vector.index; + +import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.VectorType; + +import org.junit.jupiter.api.Test; + +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for vector global indexer factory SPI registration. */ +public class VectorGlobalIndexerFactoryTest { + + @Test + public void testIdentifier() { + assertThat(new IvfFlatVectorGlobalIndexerFactory().identifier()).isEqualTo("ivf-flat"); + assertThat(new IvfPqAlgorithmVectorGlobalIndexerFactory().identifier()).isEqualTo("ivf-pq"); + assertThat(new IvfHnswFlatVectorGlobalIndexerFactory().identifier()) + .isEqualTo("ivf-hnsw-flat"); + assertThat(new IvfHnswSqVectorGlobalIndexerFactory().identifier()).isEqualTo("ivf-hnsw-sq"); + } + + @Test + public void testLoadByIdentifier() { + assertThat(GlobalIndexerFactoryUtils.load("ivf-flat")) + .isExactlyInstanceOf(IvfFlatVectorGlobalIndexerFactory.class); + assertThat(GlobalIndexerFactoryUtils.load("ivf-pq")) + .isExactlyInstanceOf(IvfPqAlgorithmVectorGlobalIndexerFactory.class); + assertThat(GlobalIndexerFactoryUtils.load("ivf-hnsw-flat")) + .isExactlyInstanceOf(IvfHnswFlatVectorGlobalIndexerFactory.class); + assertThat(GlobalIndexerFactoryUtils.load("ivf-hnsw-sq")) + .isExactlyInstanceOf(IvfHnswSqVectorGlobalIndexerFactory.class); + } + + @Test + public void testNativeOptionsOnlyUsesIdentifierPrefix() { + Options options = new Options(); + options.setString("bucket", "4"); + options.setString("vector.file.format", "vortex"); + options.setString("vector.nlist", "64"); + options.setString("ivf-flat.dimension", "32"); + options.setString("ivf-flat.distance.metric", "cosine"); + options.setString("ivf-flat.nlist", "128"); + options.setString("ivf-pq.nlist", "256"); + + Map nativeOptions = + VectorGlobalIndexerFactory.nativeOptions( + new ArrayType(new FloatType()), + options, + IvfFlatVectorGlobalIndexerFactory.IDENTIFIER); + + assertThat(nativeOptions) + .containsEntry("index.type", "ivf_flat") + .containsEntry("dimension", "32") + .containsEntry("metric", "cosine") + .containsEntry("nlist", "128") + .doesNotContainEntry("nlist", "64") + .doesNotContainEntry("nlist", "256") + .doesNotContainKey("bucket") + .doesNotContainKey("vector.file.format"); + } + + @Test + public void testNativeOptionsUsesVectorTypeDimension() { + Options options = new Options(); + options.setString("ivf-flat.dimension", "32"); + + Map nativeOptions = + VectorGlobalIndexerFactory.nativeOptions( + new VectorType(8, new FloatType()), + options, + IvfFlatVectorGlobalIndexerFactory.IDENTIFIER); + + assertThat(nativeOptions).containsEntry("dimension", "8"); + } + + @Test + public void testInvalidDimension() { + Options options = new Options(); + options.setString("ivf-flat.dimension", "0"); + + assertThatThrownBy( + () -> + VectorGlobalIndexerFactory.nativeOptions( + new ArrayType(new FloatType()), + options, + IvfFlatVectorGlobalIndexerFactory.IDENTIFIER)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("ivf-flat.dimension") + .hasMessageContaining("positive integer"); + } +} diff --git a/paimon-vector/paimon-vector-jni/pom.xml b/paimon-vector/paimon-vector-jni/pom.xml new file mode 100644 index 000000000000..23d00d043a34 --- /dev/null +++ b/paimon-vector/paimon-vector-jni/pom.xml @@ -0,0 +1,69 @@ + + + + 4.0.0 + + + paimon-vector + org.apache.paimon + 1.5-SNAPSHOT + + + paimon-vector-jni + Paimon : Vector Index JNI + + + 1.8 + true + true + true + + + + + org.apache.paimon + paimon-shade-guava-30 + ${paimon.shade.guava.version}-${paimon.shade.version} + + + + org.junit.jupiter + junit-jupiter + ${junit5.version} + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + 1 + true + none + + + + + diff --git a/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/NativeLoader.java b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/NativeLoader.java new file mode 100644 index 000000000000..e667bbcb8fc7 --- /dev/null +++ b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/NativeLoader.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.index.vector; + +import org.apache.paimon.shade.guava30.com.google.common.io.ByteStreams; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Locale; + +/** Utility class for loading the native vector index JNI library. */ +public final class NativeLoader { + private static boolean loaded = false; + + private NativeLoader() {} + + public static synchronized void loadJni() { + if (loaded) { + return; + } + + String osName = System.getProperty("os.name").toLowerCase(Locale.ROOT); + String osArch = System.getProperty("os.arch").toLowerCase(Locale.ROOT); + String libName = "libpaimon_vindex_jni"; + + String libExt; + String osShortName; + if (osName.contains("win")) { + osShortName = "win"; + libExt = ".dll"; + libName += libExt; + } else if (osName.contains("mac")) { + osShortName = "darwin"; + libExt = ".dylib"; + libName += libExt; + } else if (osName.contains("nix") || osName.contains("nux")) { + osShortName = "linux"; + libExt = ".so"; + libName += libExt; + } else { + throw new UnsupportedOperationException("Unsupported OS: " + osName); + } + + String libPath = "/native/" + osShortName + "-" + osArch + "/" + libName; + try (InputStream in = NativeLoader.class.getResourceAsStream(libPath)) { + if (in == null) { + throw new FileNotFoundException("Library not found: " + libPath); + } + File tempFile = File.createTempFile("libpaimon_vindex_jni", libExt); + tempFile.deleteOnExit(); + + try (OutputStream out = new FileOutputStream(tempFile)) { + ByteStreams.copy(in, out); + } + libName = tempFile.getAbsolutePath(); + } catch (IOException e) { + throw new RuntimeException("Failed to load library: " + e.getMessage(), e); + } + + System.load(libName); + loaded = true; + } +} diff --git a/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexInput.java b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexInput.java new file mode 100644 index 000000000000..dca4430181c4 --- /dev/null +++ b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexInput.java @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.paimon.index.vector; + +public interface VectorIndexInput { + + void pread(long[] positions, byte[][] buffers); +} diff --git a/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexMetadata.java b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexMetadata.java new file mode 100644 index 000000000000..4ffd89a4f3de --- /dev/null +++ b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexMetadata.java @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.paimon.index.vector; + +public final class VectorIndexMetadata { + + private final String indexType; + private final int dimension; + private final int nlist; + private final String metric; + private final long totalVectors; + private final int pqM; + private final int hnswM; + private final int hnswEfConstruction; + private final int hnswMaxLevel; + + public VectorIndexMetadata( + String indexType, + int dimension, + int nlist, + String metric, + long totalVectors, + int pqM, + int hnswM, + int efConstruction, + int maxLevel) { + if (indexType == null) { + throw new NullPointerException("indexType"); + } + if (metric == null) { + throw new NullPointerException("metric"); + } + this.indexType = indexType; + this.dimension = dimension; + this.nlist = nlist; + this.metric = metric; + this.totalVectors = totalVectors; + this.pqM = pqM; + this.hnswM = hnswM; + this.hnswEfConstruction = efConstruction; + this.hnswMaxLevel = maxLevel; + } + + public String indexType() { + return indexType; + } + + public int dimension() { + return dimension; + } + + public int nlist() { + return nlist; + } + + public String metric() { + return metric; + } + + public long totalVectors() { + return totalVectors; + } + + public int pqM() { + return pqM; + } + + public int hnswM() { + return hnswM; + } + + public int hnswEfConstruction() { + return hnswEfConstruction; + } + + public int hnswMaxLevel() { + return hnswMaxLevel; + } +} diff --git a/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexNative.java b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexNative.java new file mode 100644 index 000000000000..b6a5bbf0a75c --- /dev/null +++ b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexNative.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.paimon.index.vector; + +final class VectorIndexNative { + + static { + NativeLoader.loadJni(); + } + + private VectorIndexNative() {} + + static native long createWriter(String[] optionKeys, String[] optionValues); + + static native int writerDimension(long ptr); + + static native void train(long ptr, float[] data, int n); + + static native void addVectors(long ptr, long[] ids, float[] data, int n); + + static native void writeIndex(long ptr, Object streamOutput); + + static native void freeWriter(long ptr); + + static native long openReader(Object streamInput); + + static native VectorIndexMetadata metadata(long ptr); + + static native VectorSearchResult search(long ptr, float[] query, int k, int nprobe, int efSearch); + + static native VectorSearchResult searchWithRoaringFilter( + long ptr, float[] query, int k, int nprobe, int efSearch, byte[] roaringFilter); + + static native VectorSearchBatchResult searchBatch( + long ptr, float[] queries, int queryCount, int k, int nprobe, int efSearch); + + static native VectorSearchBatchResult searchBatchWithRoaringFilter( + long ptr, + float[] queries, + int queryCount, + int k, + int nprobe, + int efSearch, + byte[] roaringFilter); + + static native void freeReader(long ptr); +} diff --git a/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexReader.java b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexReader.java new file mode 100644 index 000000000000..34eefc7dcada --- /dev/null +++ b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexReader.java @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.paimon.index.vector; + +public final class VectorIndexReader implements AutoCloseable { + + private final Object nativeHandleLock = new Object(); + private long nativePtr; + private Thread nativeHandleOwner; + private VectorIndexMetadata metadata; + + public VectorIndexReader(VectorIndexInput input) { + if (input == null) { + throw new NullPointerException("input"); + } + this.nativePtr = VectorIndexNative.openReader(input); + } + + private VectorIndexReader(long nativePtr) { + this.nativePtr = nativePtr; + } + + static VectorIndexReader fromNativePointerForTesting(long nativePtr) { + return new VectorIndexReader(nativePtr); + } + + public VectorIndexMetadata metadata() { + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + requireOpen(); + if (metadata == null) { + metadata = VectorIndexNative.metadata(nativePtr); + } + return metadata; + } finally { + exitNativeHandle(); + } + } + } + + public String indexType() { + return metadata().indexType(); + } + + public int dimension() { + return metadata().dimension(); + } + + public long totalVectors() { + return metadata().totalVectors(); + } + + public VectorSearchResult search(float[] query, int topK, int nprobe) { + return search(query, topK, nprobe, 0); + } + + public VectorSearchResult search(float[] query, int topK, int nprobe, int efSearch) { + validateQuery(query); + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + return VectorIndexNative.search(requireOpen(), query, topK, nprobe, efSearch); + } finally { + exitNativeHandle(); + } + } + } + + public VectorSearchResult search(float[] query, int topK, int nprobe, byte[] roaringFilter) { + return search(query, topK, nprobe, 0, roaringFilter); + } + + public VectorSearchResult search( + float[] query, int topK, int nprobe, int efSearch, byte[] roaringFilter) { + validateQuery(query); + if (roaringFilter == null) { + throw new NullPointerException("roaringFilter"); + } + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + return VectorIndexNative.searchWithRoaringFilter( + requireOpen(), query, topK, nprobe, efSearch, roaringFilter); + } finally { + exitNativeHandle(); + } + } + } + + public VectorSearchBatchResult searchBatch( + float[] queries, int queryCount, int topK, int nprobe) { + return searchBatch(queries, queryCount, topK, nprobe, 0); + } + + public VectorSearchBatchResult searchBatch( + float[] queries, int queryCount, int topK, int nprobe, int efSearch) { + if (queries == null) { + throw new NullPointerException("queries"); + } + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + return VectorIndexNative.searchBatch( + requireOpen(), queries, queryCount, topK, nprobe, efSearch); + } finally { + exitNativeHandle(); + } + } + } + + public VectorSearchBatchResult searchBatch( + float[] queries, int queryCount, int topK, int nprobe, byte[] roaringFilter) { + return searchBatch(queries, queryCount, topK, nprobe, 0, roaringFilter); + } + + public VectorSearchBatchResult searchBatch( + float[] queries, + int queryCount, + int topK, + int nprobe, + int efSearch, + byte[] roaringFilter) { + if (queries == null) { + throw new NullPointerException("queries"); + } + if (roaringFilter == null) { + throw new NullPointerException("roaringFilter"); + } + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + return VectorIndexNative.searchBatchWithRoaringFilter( + requireOpen(), queries, queryCount, topK, nprobe, efSearch, roaringFilter); + } finally { + exitNativeHandle(); + } + } + } + + @Override + public void close() { + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + long ptr = nativePtr; + nativePtr = 0L; + if (ptr != 0L) { + VectorIndexNative.freeReader(ptr); + } + } finally { + exitNativeHandle(); + } + } + } + + private void validateQuery(float[] query) { + if (query == null) { + throw new NullPointerException("query"); + } + } + + private long requireOpen() { + if (nativePtr == 0L) { + throw new IllegalStateException("VectorIndexReader is closed"); + } + return nativePtr; + } + + private void enterNativeHandle() { + Thread current = Thread.currentThread(); + if (nativeHandleOwner == current) { + throw new IllegalStateException("VectorIndexReader native handle is already in use"); + } + nativeHandleOwner = current; + } + + private void exitNativeHandle() { + nativeHandleOwner = null; + } +} diff --git a/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexWriter.java b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexWriter.java new file mode 100644 index 000000000000..0dda2a3eba18 --- /dev/null +++ b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorIndexWriter.java @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.paimon.index.vector; + +import java.util.Map; + +public final class VectorIndexWriter implements AutoCloseable { + + private final Object nativeHandleLock = new Object(); + private long nativePtr; + private Thread nativeHandleOwner; + + public VectorIndexWriter(Map options) { + String[] keys = new String[options.size()]; + String[] values = new String[options.size()]; + int index = 0; + for (Map.Entry entry : options.entrySet()) { + keys[index] = entry.getKey(); + values[index] = entry.getValue(); + index++; + } + this.nativePtr = VectorIndexNative.createWriter(keys, values); + } + + private VectorIndexWriter(long nativePtr) { + this.nativePtr = nativePtr; + } + + static VectorIndexWriter fromNativePointerForTesting(long nativePtr) { + return new VectorIndexWriter(nativePtr); + } + + public int dimension() { + return VectorIndexNative.writerDimension(requireOpen()); + } + + public void train(float[] data, int vectorCount) { + if (data == null) { + throw new NullPointerException("data"); + } + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + VectorIndexNative.train(requireOpen(), data, vectorCount); + } finally { + exitNativeHandle(); + } + } + } + + public void addVectors(long[] ids, float[] data, int vectorCount) { + if (ids == null) { + throw new NullPointerException("ids"); + } + if (data == null) { + throw new NullPointerException("data"); + } + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + VectorIndexNative.addVectors(requireOpen(), ids, data, vectorCount); + } finally { + exitNativeHandle(); + } + } + } + + public void writeIndex(Object output) { + if (output == null) { + throw new NullPointerException("output"); + } + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + VectorIndexNative.writeIndex(requireOpen(), output); + } finally { + exitNativeHandle(); + } + } + } + + @Override + public void close() { + synchronized (nativeHandleLock) { + enterNativeHandle(); + try { + long ptr = nativePtr; + nativePtr = 0L; + if (ptr != 0L) { + VectorIndexNative.freeWriter(ptr); + } + } finally { + exitNativeHandle(); + } + } + } + + private long requireOpen() { + if (nativePtr == 0L) { + throw new IllegalStateException("VectorIndexWriter is closed"); + } + return nativePtr; + } + + private void enterNativeHandle() { + Thread current = Thread.currentThread(); + if (nativeHandleOwner == current) { + throw new IllegalStateException("VectorIndexWriter native handle is already in use"); + } + nativeHandleOwner = current; + } + + private void exitNativeHandle() { + nativeHandleOwner = null; + } +} diff --git a/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorSearchBatchResult.java b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorSearchBatchResult.java new file mode 100644 index 000000000000..12952e932e7f --- /dev/null +++ b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorSearchBatchResult.java @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.paimon.index.vector; + +import java.util.Arrays; + +public final class VectorSearchBatchResult { + + private final long[] ids; + private final float[] distances; + private final int queryCount; + private final int topK; + + public VectorSearchBatchResult(long[] ids, float[] distances, int queryCount, int topK) { + if (ids == null) { + throw new NullPointerException("ids"); + } + if (distances == null) { + throw new NullPointerException("distances"); + } + if (queryCount < 0) { + throw new IllegalArgumentException("queryCount must be >= 0"); + } + if (topK < 0) { + throw new IllegalArgumentException("topK must be >= 0"); + } + int expectedLength = checkedResultLength(queryCount, topK); + if (ids.length != expectedLength) { + throw new IllegalArgumentException( + "ids length " + ids.length + " != queryCount * topK " + expectedLength); + } + if (distances.length != expectedLength) { + throw new IllegalArgumentException( + "distances length " + + distances.length + + " != queryCount * topK " + + expectedLength); + } + this.ids = ids.clone(); + this.distances = distances.clone(); + this.queryCount = queryCount; + this.topK = topK; + } + + public int queryCount() { + return queryCount; + } + + public int topK() { + return topK; + } + + public long[] ids() { + return ids.clone(); + } + + public float[] distances() { + return distances.clone(); + } + + public long[] idsForQuery(int queryIndex) { + checkQueryIndex(queryIndex); + return Arrays.copyOfRange(ids, queryIndex * topK, (queryIndex + 1) * topK); + } + + public float[] distancesForQuery(int queryIndex) { + checkQueryIndex(queryIndex); + return Arrays.copyOfRange(distances, queryIndex * topK, (queryIndex + 1) * topK); + } + + private void checkQueryIndex(int queryIndex) { + if (queryIndex < 0 || queryIndex >= queryCount) { + throw new IndexOutOfBoundsException( + "queryIndex " + queryIndex + " out of range [0, " + queryCount + ')'); + } + } + + private static int checkedResultLength(int queryCount, int topK) { + long length = (long) queryCount * (long) topK; + if (length > Integer.MAX_VALUE) { + throw new IllegalArgumentException("queryCount * topK overflows int"); + } + return (int) length; + } +} diff --git a/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorSearchResult.java b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorSearchResult.java new file mode 100644 index 000000000000..870aa49ce177 --- /dev/null +++ b/paimon-vector/paimon-vector-jni/src/main/java/org/apache/paimon/index/vector/VectorSearchResult.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.paimon.index.vector; + +import java.util.Arrays; + +public final class VectorSearchResult { + + private final long[] ids; + private final float[] distances; + + public VectorSearchResult(long[] ids, float[] distances) { + if (ids == null) { + throw new NullPointerException("ids"); + } + if (distances == null) { + throw new NullPointerException("distances"); + } + if (ids.length != distances.length) { + throw new IllegalArgumentException( + "ids length " + ids.length + " != distances length " + distances.length); + } + this.ids = ids.clone(); + this.distances = distances.clone(); + } + + public int size() { + return ids.length; + } + + public long[] ids() { + return ids.clone(); + } + + public float[] distances() { + return distances.clone(); + } + + @Override + public String toString() { + return "VectorSearchResult{ids=" + + Arrays.toString(ids) + + ", distances=" + + Arrays.toString(distances) + + '}'; + } +} diff --git a/paimon-vector/pom.xml b/paimon-vector/pom.xml new file mode 100644 index 000000000000..5cbb01a35a4e --- /dev/null +++ b/paimon-vector/pom.xml @@ -0,0 +1,39 @@ + + + + 4.0.0 + + + paimon-parent + org.apache.paimon + 1.5-SNAPSHOT + + + paimon-vector + Paimon : Vector Index + pom + + + paimon-vector-jni + paimon-vector-index + + diff --git a/pom.xml b/pom.xml index d2c02ae4d60c..a1a6a5e054f4 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,7 @@ under the License. paimon-vortex paimon-mosaic paimon-tantivy + paimon-vector