11from opensearchpy import NotFoundError , OpenSearch
22
33from benchmark .dataset import Dataset
4- from engine .base_client import IncompatibilityError
54from engine .base_client .configure import BaseConfigurator
65from engine .base_client .distances import Distance
76from engine .clients .opensearch .config import (
109 OPENSEARCH_PORT ,
1110 OPENSEARCH_USER ,
1211)
12+ from engine .clients .opensearch .utils import get_index_thread_qty
1313
1414
1515class OpenSearchConfigurator (BaseConfigurator ):
@@ -40,28 +40,36 @@ def __init__(self, host, collection_params: dict, connection_params: dict):
4040 )
4141
4242 def clean (self ):
43- try :
43+ is_index_available = self .client .indices .exists (index = OPENSEARCH_INDEX ,
44+ params = {
45+ "timeout" : 300 ,
46+ })
47+ if (is_index_available ):
48+ print (f"Deleting index: { OPENSEARCH_INDEX } , as it is already present" )
4449 self .client .indices .delete (
4550 index = OPENSEARCH_INDEX ,
4651 params = {
4752 "timeout" : 300 ,
4853 },
4954 )
50- except NotFoundError :
51- pass
55+
5256
5357 def recreate (self , dataset : Dataset , collection_params ):
54- if dataset .config .distance == Distance .DOT :
55- raise IncompatibilityError
56- if dataset .config .vector_size > 1024 :
57- raise IncompatibilityError
58+ self ._update_cluster_settings ()
59+ distance = self .DISTANCE_MAPPING [dataset .config .distance ]
60+ if dataset .config .distance == Distance .COSINE :
61+ distance = self .DISTANCE_MAPPING [Distance .DOT ]
62+ print (f"Using distance type: { distance } as dataset distance is : { dataset .config .distance } " )
5863
5964 self .client .indices .create (
6065 index = OPENSEARCH_INDEX ,
6166 body = {
6267 "settings" : {
6368 "index" : {
6469 "knn" : True ,
70+ "refresh_interval" : - 1 ,
71+ "number_of_replicas" : 0 ,
72+ "number_of_shards" : 1
6573 }
6674 },
6775 "mappings" : {
@@ -72,18 +80,13 @@ def recreate(self, dataset: Dataset, collection_params):
7280 "method" : {
7381 ** {
7482 "name" : "hnsw" ,
75- "engine" : "lucene" ,
76- "space_type" : self .DISTANCE_MAPPING [
77- dataset .config .distance
78- ],
79- "parameters" : {
80- "m" : 16 ,
81- "ef_construction" : 100 ,
82- },
83+ "engine" : "faiss" ,
84+ "space_type" : distance ,
85+ ** collection_params .get ("method" )
8386 },
84- ** collection_params .get ("method" ),
8587 },
8688 },
89+ # this doesn't work for nmslib; we need to see what to do here, maybe remove them
8790 ** self ._prepare_fields_config (dataset ),
8891 }
8992 },
@@ -94,6 +97,16 @@ def recreate(self, dataset: Dataset, collection_params):
9497 cluster_manager_timeout = "5m" ,
9598 )
9699
100+ def _update_cluster_settings (self ):
101+ index_thread_qty = get_index_thread_qty (self .client )
102+ cluster_settings_body = {
103+ "persistent" : {
104+ "knn.memory.circuit_breaker.limit" : "75%" , # use a higher limit so that vector search latencies stay good even on a small cluster
105+ "knn.algo_param.index_thread_qty" : index_thread_qty
106+ }
107+ }
108+ self .client .cluster .put_settings (cluster_settings_body )
109+
97110 def _prepare_fields_config (self , dataset : Dataset ):
98111 return {
99112 field_name : {
@@ -104,3 +117,9 @@ def _prepare_fields_config(self, dataset: Dataset):
104117 }
105118 for field_name , field_type in dataset .config .schema .items ()
106119 }
120+
121+ def execution_params (self , distance , vector_size ) -> dict :
122+ # normalize the vectors when the distance is cosine similarity.
123+ if distance == Distance .COSINE :
124+ return {"normalize" : "true" }
125+ return {}
0 commit comments