Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ turbopuffer = [ "turbopuffer" ]
zvec = [ "zvec" ]
endee = [ "endee==0.1.10" ]
lindorm = [ "opensearch-py" ]
pinot = [ "requests" ]

[project.urls]
Repository = "https://github.com/zilliztech/VectorDBBench"
Expand Down
20 changes: 20 additions & 0 deletions vectordb_bench/backend/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class DB(Enum):
Lindorm = "Lindorm"
VectorChord = "VectorChord"
PolarDB = "PolarDB"
Pinot = "Pinot"

@property
def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915
Expand Down Expand Up @@ -257,6 +258,11 @@ def init_cls(self) -> type[VectorDB]: # noqa: PLR0911, PLR0912, C901, PLR0915

return PolarDB

if self == DB.Pinot:
from .pinot.pinot import Pinot

return Pinot

msg = f"Unknown DB: {self.name}"
raise ValueError(msg)

Expand Down Expand Up @@ -455,6 +461,11 @@ def config_cls(self) -> type[DBConfig]: # noqa: PLR0911, PLR0912, C901, PLR0915

return PolarDBConfig

if self == DB.Pinot:
from .pinot.config import PinotConfig

return PinotConfig

msg = f"Unknown DB: {self.name}"
raise ValueError(msg)

Expand Down Expand Up @@ -631,6 +642,15 @@ def case_config_cls( # noqa: C901, PLR0911, PLR0912, PLR0915

return _vectorchord_case_config.get(index_type)

if self == DB.Pinot:
from .pinot.config import PinotHNSWConfig, PinotIVFFlatConfig, PinotIVFPQConfig

return {
IndexType.HNSW: PinotHNSWConfig,
IndexType.IVFFlat: PinotIVFFlatConfig,
IndexType.IVFPQ: PinotIVFPQConfig,
}.get(index_type, PinotHNSWConfig)

# DB.Pinecone, DB.Redis
return EmptyDBCaseConfig

Expand Down
Empty file.
202 changes: 202 additions & 0 deletions vectordb_bench/backend/clients/pinot/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
from typing import Annotated, TypedDict, Unpack

import click
from pydantic import SecretStr

from ....cli.cli import (
CommonTypedDict,
HNSWFlavor2,
click_parameter_decorators_from_typed_dict,
run,
)
from .. import DB


class PinotTypedDict(TypedDict):
    """Shared CLI options for every Pinot index flavor (connection + ingest tuning)."""

    controller_host: Annotated[
        str,
        click.option("--controller-host", type=str, default="localhost", help="Pinot Controller host"),
    ]
    controller_port: Annotated[
        int,
        click.option("--controller-port", type=int, default=9000, help="Pinot Controller port"),
    ]
    broker_host: Annotated[
        str,
        click.option("--broker-host", type=str, default="localhost", help="Pinot Broker host"),
    ]
    broker_port: Annotated[
        int,
        click.option("--broker-port", type=int, default=8099, help="Pinot Broker port"),
    ]
    # The two options below default to None, so the parsed value may be absent:
    # annotate as `str | None` (was `str`) to match what click actually delivers.
    username: Annotated[
        str | None,
        click.option("--username", type=str, default=None, help="Pinot username (optional)"),
    ]
    password: Annotated[
        str | None,
        click.option("--password", type=str, default=None, help="Pinot password (optional)"),
    ]
    ingest_batch_size: Annotated[
        int,
        click.option(
            "--ingest-batch-size",
            type=int,
            default=100_000,
            show_default=True,
            help=(
                "Rows buffered before flushing one Pinot segment (one ingestFromFile call). "
                "Larger values mean fewer segments and better IVF training / query performance. "
                "Reduce if memory is constrained (100K x 768-dim float32 ~= 300 MB)."
            ),
        ),
    ]


def _pinot_db_config(parameters: dict):
    """Build a PinotConfig from the parsed CLI parameter mapping."""
    from .config import PinotConfig

    # Wrap the plain-text password (if any) in a SecretStr before it reaches config.
    raw_password = parameters.get("password")
    secret = SecretStr(parameters["password"]) if raw_password else None

    return PinotConfig(
        db_label=parameters["db_label"],
        controller_host=parameters["controller_host"],
        controller_port=parameters["controller_port"],
        broker_host=parameters["broker_host"],
        broker_port=parameters["broker_port"],
        username=parameters.get("username"),
        password=secret,
        ingest_batch_size=parameters["ingest_batch_size"],
    )


# NOTE(review): PascalCase function name is unusual for Python but appears
# deliberate — the group name presumably mirrors the DB enum member "Pinot"
# used for CLI registration; confirm against the cli framework's conventions.
@click.group()
def Pinot():
    """Apache Pinot vector search benchmarks."""


# ---------------------------------------------------------------------------
# HNSW
# ---------------------------------------------------------------------------


class PinotHNSWTypedDict(CommonTypedDict, PinotTypedDict, HNSWFlavor2): ...


@Pinot.command("hnsw")
@click_parameter_decorators_from_typed_dict(PinotHNSWTypedDict)
def pinot_hnsw(**parameters: Unpack[PinotHNSWTypedDict]):
    """Benchmark a Pinot HNSW vector index."""
    from .config import PinotHNSWConfig

    # Map CLI flag names onto the case-config field names (ef_runtime -> ef).
    case_config = PinotHNSWConfig(
        m=parameters["m"],
        ef_construction=parameters["ef_construction"],
        ef=parameters["ef_runtime"],
    )
    run(
        db=DB.Pinot,
        db_config=_pinot_db_config(parameters),
        db_case_config=case_config,
        **parameters,
    )


# ---------------------------------------------------------------------------
# IVF_FLAT
# ---------------------------------------------------------------------------


class PinotIVFFlatTypedDict(CommonTypedDict, PinotTypedDict):
    """Option set for the "ivf-flat" command."""

    nlist: Annotated[
        int,
        click.option("--nlist", type=int, default=128, help="Number of Voronoi cells (IVF nlist)"),
    ]
    quantizer: Annotated[
        str,
        click.option(
            "--quantizer",
            type=click.Choice(["FLAT", "SQ8", "SQ4"]),
            default="FLAT",
            help="Quantizer type for IVF_FLAT",
        ),
    ]
    nprobe: Annotated[
        int,
        click.option("--nprobe", type=int, default=8, help="Number of cells to probe at query time"),
    ]
    # Fix: this option defaults to None, so the parsed value may be absent —
    # annotate as `int | None` (was `int`) to match what click actually delivers.
    train_sample_size: Annotated[
        int | None,
        click.option(
            "--train-sample-size",
            type=int,
            default=None,
            help="Training sample size (defaults to max(nlist*50, 1000) if not set)",
        ),
    ]


@Pinot.command("ivf-flat")
@click_parameter_decorators_from_typed_dict(PinotIVFFlatTypedDict)
def pinot_ivf_flat(**parameters: Unpack[PinotIVFFlatTypedDict]):
    """Benchmark a Pinot IVF_FLAT vector index."""
    from .config import PinotIVFFlatConfig

    # train_sample_size is optional — .get() yields None when the flag is unset.
    case_config = PinotIVFFlatConfig(
        nlist=parameters["nlist"],
        quantizer=parameters["quantizer"],
        nprobe=parameters["nprobe"],
        train_sample_size=parameters.get("train_sample_size"),
    )
    run(
        db=DB.Pinot,
        db_config=_pinot_db_config(parameters),
        db_case_config=case_config,
        **parameters,
    )


# ---------------------------------------------------------------------------
# IVF_PQ
# ---------------------------------------------------------------------------


class PinotIVFPQTypedDict(CommonTypedDict, PinotTypedDict):
    """Option set for the "ivf-pq" command."""

    nlist: Annotated[
        int,
        click.option("--nlist", type=int, default=128, help="Number of Voronoi cells (IVF nlist)"),
    ]
    pq_m: Annotated[
        int,
        click.option("--pq-m", type=int, default=8, help="Number of PQ sub-quantizers (must divide dimension)"),
    ]
    # Fix: click.Choice(["4", "6", "8"]) delivers a *string*, which the command
    # handler converts with int(...) — annotate as `str` (was `int`).
    pq_nbits: Annotated[
        str,
        click.option(
            "--pq-nbits",
            type=click.Choice(["4", "6", "8"]),
            default="8",
            help="Bits per PQ code (4, 6, or 8)",
        ),
    ]
    train_sample_size: Annotated[
        int,
        click.option("--train-sample-size", type=int, default=6400, help="Training sample size (must be >= nlist)"),
    ]
    nprobe: Annotated[
        int,
        click.option("--nprobe", type=int, default=8, help="Number of cells to probe at query time"),
    ]


@Pinot.command("ivf-pq")
@click_parameter_decorators_from_typed_dict(PinotIVFPQTypedDict)
def pinot_ivf_pq(**parameters: Unpack[PinotIVFPQTypedDict]):
    """Benchmark a Pinot IVF_PQ vector index."""
    from .config import PinotIVFPQConfig

    # pq_nbits arrives as a string from click.Choice; convert before use.
    case_config = PinotIVFPQConfig(
        nlist=parameters["nlist"],
        pq_m=parameters["pq_m"],
        pq_nbits=int(parameters["pq_nbits"]),
        train_sample_size=parameters["train_sample_size"],
        nprobe=parameters["nprobe"],
    )
    run(
        db=DB.Pinot,
        db_config=_pinot_db_config(parameters),
        db_case_config=case_config,
        **parameters,
    )
94 changes: 94 additions & 0 deletions vectordb_bench/backend/clients/pinot/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from pydantic import BaseModel, SecretStr

from ..api import DBCaseConfig, DBConfig, MetricType


class PinotConfig(DBConfig):
    """Connection and ingest settings for an Apache Pinot cluster.

    The controller endpoint is used for schema/table/ingest operations and the
    broker endpoint for queries; credentials are optional.
    """

    controller_host: str = "localhost"
    controller_port: int = 9000
    broker_host: str = "localhost"
    broker_port: int = 8099
    username: str | None = None
    password: SecretStr | None = None
    # Rows buffered before flushing one Pinot segment (one ingestFromFile call).
    # Larger values → fewer segments → better IVF training & query perf.
    # 100_000 rows x 768-dim float32 ~= 300 MB in-memory.
    ingest_batch_size: int = 100_000

    def to_dict(self) -> dict:
        """Return a plain dict of the settings, with the password unwrapped."""
        plain_password = None
        if self.password:
            plain_password = self.password.get_secret_value()
        return {
            "controller_host": self.controller_host,
            "controller_port": self.controller_port,
            "broker_host": self.broker_host,
            "broker_port": self.broker_port,
            "username": self.username,
            "password": plain_password,
            "ingest_batch_size": self.ingest_batch_size,
        }


class PinotHNSWConfig(BaseModel, DBCaseConfig):
    """HNSW vector index config for Apache Pinot (Lucene-based)."""

    metric_type: MetricType | None = None
    m: int = 16  # mapped to Pinot's "maxCon" (max connections per node)
    ef_construction: int = 100  # mapped to Pinot's "beamWidth"
    ef: int | None = None  # query-time HNSW candidate-list size (default = k)

    def index_param(self) -> dict:
        """Index-creation properties for the Pinot table config."""
        params: dict = {"vectorIndexType": "HNSW"}
        params["maxCon"] = str(self.m)
        params["beamWidth"] = str(self.ef_construction)
        return params

    def search_param(self) -> dict:
        """Query-time knobs; empty when ef is unset so Pinot falls back to k.

        Larger ef → better recall at slightly higher latency, applied via
        vectorSimilarity(col, q, ef).
        """
        if self.ef is None:
            return {}
        return {"ef": self.ef}


class PinotIVFFlatConfig(BaseModel, DBCaseConfig):
    """IVF_FLAT vector index config for Apache Pinot."""

    metric_type: MetricType | None = None
    nlist: int = 128  # number of Voronoi cells (centroids)
    quantizer: str = "FLAT"  # one of FLAT, SQ8, SQ4
    train_sample_size: int | None = None  # None → backend default max(nlist*50, 1000)
    nprobe: int = 8  # cells probed at query time

    def index_param(self) -> dict:
        """Index-creation properties; trainSampleSize only when explicitly set."""
        base: dict = {
            "vectorIndexType": "IVF_FLAT",
            "nlist": str(self.nlist),
            "quantizer": self.quantizer,
        }
        if self.train_sample_size is None:
            return base
        return {**base, "trainSampleSize": str(self.train_sample_size)}

    def search_param(self) -> dict:
        """Query-time knobs (number of IVF cells to probe)."""
        return {"nprobe": self.nprobe}


class PinotIVFPQConfig(BaseModel, DBCaseConfig):
    """IVF_PQ vector index config for Apache Pinot (residual product quantization)."""

    metric_type: MetricType | None = None
    nlist: int = 128  # number of Voronoi cells (centroids)
    pq_m: int = 8  # sub-quantizer count (must divide vectorDimension)
    pq_nbits: int = 8  # bits per sub-quantizer code: 4, 6, or 8
    train_sample_size: int = 6400  # training sample size (must be >= nlist)
    nprobe: int = 8  # cells probed at query time

    def index_param(self) -> dict:
        """Index-creation properties for the Pinot table config (all stringified)."""
        return {
            "vectorIndexType": "IVF_PQ",
            "nlist": f"{self.nlist}",
            "pqM": f"{self.pq_m}",
            "pqNbits": f"{self.pq_nbits}",
            "trainSampleSize": f"{self.train_sample_size}",
        }

    def search_param(self) -> dict:
        """Query-time knobs (number of IVF cells to probe)."""
        return {"nprobe": self.nprobe}
Loading
Loading