-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathregistry.rs
More file actions
382 lines (344 loc) · 14.5 KB
/
registry.rs
File metadata and controls
382 lines (344 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
// registry.rs — USearchRegistry, USearchTableConfig, USearchIndexConfig.
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::common::Result;
use datafusion::error::DataFusionError;
use usearch::{Index, IndexOptions, MetricKind, ScalarKind};
use datafusion::catalog::TableProvider;
use crate::lookup::PointLookupProvider;
// ── USearchIndexConfig ────────────────────────────────────────────────────────
/// Parameters used to build and reload a USearch HNSW index.
///
/// This struct is the single source of truth for index construction options.
/// Persist it alongside your index file so that `load_index` can reconstruct
/// the index with exactly the same parameters — USearch does not embed metadata
/// inside the saved file.
///
/// # Example
///
/// ```rust,ignore
/// use datafusion_vector_search_ext::USearchIndexConfig;
/// use usearch::MetricKind;
///
/// let cfg = USearchIndexConfig::new(768, MetricKind::L2sq);
///
/// // Build and populate the index.
/// let index = cfg.build_index()?;
/// index.reserve(240_000)?;
/// for (key, vec) in rows { index.add(key, vec)?; }
/// index.save("my_table.index")?;
///
/// // Reload from disk — same config guarantees consistent options.
/// let index = cfg.load_index("my_table.index")?;
/// ```
#[derive(Debug, Clone)]
pub struct USearchIndexConfig {
/// Number of dimensions in each vector. Must match the data exactly.
pub dimensions: usize,
/// Distance metric. Must match the SQL UDF used at query time:
/// - `MetricKind::L2sq` → `l2_distance(col, ARRAY[...])`
/// - `MetricKind::Cos` → `cosine_distance(col, ARRAY[...])`
/// - `MetricKind::IP` → `negative_dot_product(col, ARRAY[...])`
pub metric: MetricKind,
/// Graph degree M: number of bidirectional edges per node per layer.
/// Higher M → better recall and faster search, but more memory and slower
/// index build. Default: 16 (canonical HNSW default).
pub connectivity: usize,
/// ef_construction: beam width used when inserting nodes into the graph.
/// Higher values → better graph quality → higher recall at query time,
/// at the cost of slower index build. Must be ≥ 2 × connectivity.
/// Default: 128.
pub expansion_add: usize,
/// Scalar quantization format. `F32` stores full-precision vectors
/// (safest, most accurate). `F16` halves memory with typically < 1%
/// recall loss at high dimensions. Default: `ScalarKind::F32`.
pub quantization: ScalarKind,
}
impl USearchIndexConfig {
/// Create a config for the given dimensions and metric, with all other
/// parameters set to their defaults (M=16, ef_construction=128, F32).
pub fn new(dimensions: usize, metric: MetricKind) -> Self {
Self {
dimensions,
metric,
..Self::default()
}
}
/// Build a new, empty USearch index with these parameters.
///
/// Call `index.reserve(n)` before adding vectors to pre-allocate graph
/// nodes. Adding vectors without reserving causes incremental
/// reallocation and is significantly slower for large datasets.
pub fn build_index(&self) -> Result<Index> {
Index::new(&self.to_index_options())
.map_err(|e| DataFusionError::Execution(format!("USearch Index::new failed: {e}")))
}
/// Load a previously saved index from `path` into memory.
///
/// Uses the same `IndexOptions` as `build_index()`. The options must
/// match those used when the index was originally built — passing wrong
/// dimensions or metric produces silently incorrect results.
pub fn load_index(&self, path: &str) -> Result<Index> {
let index = Index::new(&self.to_index_options())
.map_err(|e| DataFusionError::Execution(format!("USearch Index::new failed: {e}")))?;
index
.load(path)
.map_err(|e| DataFusionError::Execution(format!("USearch index load failed: {e}")))?;
Ok(index)
}
/// Memory-map a previously saved index from `path`.
///
/// Unlike [`load_index`], this does not copy the index into RAM. The OS
/// pages data in on demand, keeping resident memory proportional to the
/// working set rather than the full index size. Prefer this for the
/// reload-from-disk path where the index file is already local.
///
/// The returned [`Index`] is fully functional for search; the backing
/// file must remain on disk for the lifetime of the index.
///
/// [`load_index`]: Self::load_index
pub fn view_index(&self, path: &str) -> Result<Index> {
let index = Index::new(&self.to_index_options())
.map_err(|e| DataFusionError::Execution(format!("USearch Index::new failed: {e}")))?;
index
.view(path)
.map_err(|e| DataFusionError::Execution(format!("USearch index view failed: {e}")))?;
Ok(index)
}
fn to_index_options(&self) -> IndexOptions {
IndexOptions {
dimensions: self.dimensions,
metric: self.metric,
quantization: self.quantization,
connectivity: self.connectivity,
expansion_add: self.expansion_add,
// expansion_search (ef_search) is set at registration time via
// USearchTableConfig, not at index build time.
expansion_search: 0,
..Default::default()
}
}
}
impl Default for USearchIndexConfig {
fn default() -> Self {
Self {
dimensions: 0,
metric: MetricKind::L2sq,
connectivity: 16,
expansion_add: 128,
quantization: ScalarKind::F32,
}
}
}
// ── USearchTableConfig ────────────────────────────────────────────────────────
/// Per-table configuration for query execution behaviour.
///
/// Pass to [`USearchRegistry::add_with_config`] to override the defaults.
/// Use [`USearchRegistry::add`] for the common case — it applies
/// [`USearchTableConfig::default()`] automatically.
///
/// `expansion_search` (ef_search) is applied to the index exactly once,
/// inside [`USearchRegistry::add_with_config`], before any query touches it.
/// **Never call `index.change_expansion_search()` after registration** —
/// the index is shared via `Arc` and mutation after that point is a data race.
#[derive(Debug, Clone)]
pub struct USearchTableConfig {
/// HNSW ef_search: beam width during graph traversal at query time.
/// Higher → better recall, slower queries.
///
/// Queries with `LIMIT k > expansion_search` are handled automatically:
/// USearch uses `max(expansion_search, k)` per call, so they are correct
/// but cost proportionally more.
///
/// Default: 64. Recommended range: 32–200.
pub expansion_search: usize,
/// Selectivity fraction below which the planner bypasses the HNSW index
/// and uses the Parquet-native path: a full scan of the `scan_provider`
/// (all columns including vector), evaluating filters per batch,
/// computing distances inline, and maintaining a top-k heap. This path
/// avoids USearch and the `lookup_provider` entirely.
///
/// At low selectivity, `filtered_search` must explore ~`k/selectivity`
/// graph nodes before finding k passing candidates — eventually slower
/// than a linear scan over the small valid subset.
///
/// The theoretical crossover is `sqrt(k × M / n)` ≈ 2.6% for a 240k-row
/// dataset with k=10 and M=16. The default 0.05 (5%) gives a safety
/// margin that works across a wide range of n, k, and M.
pub brute_force_selectivity_threshold: f64,
}
impl Default for USearchTableConfig {
fn default() -> Self {
Self {
expansion_search: 64,
brute_force_selectivity_threshold: 0.05,
}
}
}
// ── RegisteredTable ───────────────────────────────────────────────────────────
pub struct RegisteredTable {
pub index: Arc<Index>,
/// Scan provider for WHERE evaluation and low-selectivity Parquet-native path.
pub scan_provider: Arc<dyn TableProvider>,
/// Lookup provider for efficient key-based row fetch (e.g. SQLite).
pub lookup_provider: Arc<dyn PointLookupProvider>,
pub key_col: String,
pub metric: MetricKind,
/// Native scalar type of the vector column. Determines which typed search
/// method (`search::<f32>` vs `search::<f64>`) the planner dispatches to.
pub scalar_kind: ScalarKind,
pub schema: SchemaRef,
pub config: USearchTableConfig,
}
// ── USearchRegistry ───────────────────────────────────────────────────────────
pub struct USearchRegistry {
tables: RwLock<HashMap<String, Arc<RegisteredTable>>>,
}
impl std::fmt::Debug for USearchRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let tables = self.tables.read().unwrap();
write!(f, "USearchRegistry({} tables)", tables.len())
}
}
impl USearchRegistry {
pub fn new() -> Self {
Self {
tables: RwLock::new(HashMap::new()),
}
}
/// Register a USearch index with default query configuration.
///
/// Convenience wrapper around [`add_with_config`] that uses
/// [`USearchTableConfig::default()`] (ef_search=64, threshold=5%).
///
/// - `index` — must already be loaded / populated.
/// - `scan_provider` — [`TableProvider`] used for WHERE evaluation and
/// low-selectivity Parquet-native scanning.
/// - `lookup_provider` — [`PointLookupProvider`] for O(k) key-based fetch.
/// [`HashKeyProvider`] is the bundled in-memory implementation.
/// - `key_col` — column in `lookup_provider.schema()` that stores the
/// USearch key (`u64`). Supported Arrow types: `UInt64`, `Int64`,
/// `UInt32`, `Int32`.
/// - `metric` — must match how the index was built. The optimizer rule
/// validates this and refuses to rewrite on mismatch.
/// - `scalar_kind` — native element type of the vector column (`F32` or
/// `F64`). Controls which typed search method the planner dispatches to.
///
/// [`add_with_config`]: USearchRegistry::add_with_config
/// [`HashKeyProvider`]: crate::lookup::HashKeyProvider
#[allow(clippy::too_many_arguments)]
pub fn add(
&self,
name: &str,
index: Arc<Index>,
scan_provider: Arc<dyn TableProvider>,
lookup_provider: Arc<dyn PointLookupProvider>,
key_col: &str,
metric: MetricKind,
scalar_kind: ScalarKind,
) -> Result<()> {
self.add_with_config(
name,
index,
scan_provider,
lookup_provider,
key_col,
metric,
scalar_kind,
USearchTableConfig::default(),
)
}
/// Register a USearch index with explicit query configuration.
///
/// Sets `ef_search` on the index exactly once before storing it.
/// Do not call `index.change_expansion_search()` after this point.
#[allow(clippy::too_many_arguments)]
pub fn add_with_config(
&self,
name: &str,
index: Arc<Index>,
scan_provider: Arc<dyn TableProvider>,
lookup_provider: Arc<dyn PointLookupProvider>,
key_col: &str,
metric: MetricKind,
scalar_kind: ScalarKind,
config: USearchTableConfig,
) -> Result<()> {
// Set ef_search once, here, before any query touches the index.
index.change_expansion_search(config.expansion_search);
let data_schema = lookup_provider.schema();
let _ = data_schema.index_of(key_col).map_err(|_| {
DataFusionError::Execution(format!(
"USearchRegistry: key column '{key_col}' not found in lookup provider schema for table '{name}'"
))
})?;
let _ = scan_provider.schema().index_of(key_col).map_err(|_| {
DataFusionError::Execution(format!(
"USearchRegistry: key column '{key_col}' not found in scan provider schema for table '{name}'"
))
})?;
let mut fields: Vec<Field> = data_schema
.fields()
.iter()
.map(|f| f.as_ref().clone())
.collect();
fields.push(Field::new("_distance", DataType::Float32, true));
let schema = Arc::new(Schema::new(fields));
self.tables
.write()
.expect("USearchRegistry lock poisoned")
.insert(
name.to_string(),
Arc::new(RegisteredTable {
index,
scan_provider,
lookup_provider,
key_col: key_col.to_string(),
metric,
scalar_kind,
schema,
config,
}),
);
Ok(())
}
/// Insert a pre-built [`RegisteredTable`] entry directly.
///
/// Used by `VectorIndexManager` to register entries after the
/// `SessionContext` is already constructed (on-demand cache path).
pub fn insert(&self, name: &str, entry: RegisteredTable) {
self.tables
.write()
.expect("USearchRegistry lock poisoned")
.insert(name.to_string(), Arc::new(entry));
}
/// Look up a registered table by its key.
///
/// Returns an `Arc` so callers don't hold the lock during query execution.
pub fn get(&self, name: &str) -> Option<Arc<RegisteredTable>> {
self.tables
.read()
.expect("USearchRegistry lock poisoned")
.get(name)
.cloned()
}
/// Remove all registry entries whose name starts with `prefix`.
///
/// Use `"conn::schema::table::"` to evict all columns for one table,
/// or `"conn::"` to evict all tables for a connection.
pub fn remove_by_prefix(&self, prefix: &str) {
self.tables
.write()
.expect("USearchRegistry lock poisoned")
.retain(|k, _| !k.starts_with(prefix));
}
pub fn into_arc(self) -> Arc<Self> {
Arc::new(self)
}
}
impl Default for USearchRegistry {
fn default() -> Self {
Self::new()
}
}