diff --git a/diskann-disk/src/build/builder/build.rs b/diskann-disk/src/build/builder/build.rs index dd69640bb..7921df186 100644 --- a/diskann-disk/src/build/builder/build.rs +++ b/diskann-disk/src/build/builder/build.rs @@ -31,7 +31,6 @@ use diskann_providers::{ use diskann_utils::io::{read_bin, write_bin}; use diskann_utils::views::MatrixView; use tokio::task::JoinSet; -use tracing::{debug, info}; use crate::{ build::{ @@ -61,6 +60,8 @@ use crate::{ }, DiskIndexBuildParameters, QuantizationType, }; +use diskann::tracked_debug; +use diskann::tracked_info; /// Disk index builder that composes with DiskIndexBuilderCore. pub struct DiskIndexBuilder<'a, Data, StorageProvider> @@ -175,7 +176,7 @@ where storage_provider: &StorageProvider, checkpoint_record_manager: &mut dyn CheckpointManager, ) -> ANNResult { - info!( + tracked_info!( "Training quantizer for {} quantized builds.", build_quantization_type.to_string() ); @@ -193,7 +194,7 @@ where ) }, || { - info!( + tracked_info!( "Skipping quantizer training, instead loading from already trained quantizer saved in the file system.", ); BuildQuantizer::load( @@ -210,7 +211,7 @@ where runtime.block_on(async { match self.build_internal().await { Err(err) if err.kind() == ANNErrorKind::BuildInterrupted => { - info!( + tracked_info!( "Index build was interrupted by continuation_checker, progress saved for resumption" ); Ok(()) // Return success for controlled interruptions @@ -225,7 +226,7 @@ where let pool = create_thread_pool(self.index_configuration.num_threads)?; - info!( + tracked_info!( "Starting index build: R={} L={} Indexing RAM budget={} T={}", self.index_configuration.config.pruned_degree(), self.index_configuration.config.l_build(), @@ -252,7 +253,7 @@ where let storage_provider = self.core.storage_provider; - info!( + tracked_info!( "Compressing data into {} bytes per vector for disk search", num_chunks.get() ); @@ -266,7 +267,7 @@ where let offset = match checkpoint_context.get_resumption_point()? { Some(offset) => offset, None => { - info!("Skip the DataCompression"); + tracked_info!("Skip the DataCompression"); return Ok(()); } }; @@ -362,9 +363,10 @@ where let offset = match checkpoint_context.get_resumption_point()? { Some(offset) => offset, None => { - info!( + tracked_info!( "[Stage:{:?}] Skip build_shard_index for shard {} - no valid checkpoint exists", - stage, shard_id + stage, + shard_id ); return Ok(()); } @@ -383,11 +385,12 @@ where &shard_ids_file, &shard_base_file, )?; - info!("[Stage:{:?}] Generate data for shard {}", stage, shard_id); + tracked_info!("[Stage:{:?}] Generate data for shard {}", stage, shard_id); } else { - info!( + tracked_info!( "[Stage:{:?}] Resume shard {} build with existing data", - stage, shard_id + stage, + shard_id ); } @@ -440,7 +443,7 @@ where let offset = match checkpoint_context.get_resumption_point()? { Some(offset) => offset, None => { - info!( + tracked_info!( "[Stage:{:?}] Skip in-memory index build - no valid checkpoint exists", stage ); @@ -558,18 +561,18 @@ where #[cfg(debug_assertions)] /// Log statistics about the build process async fn log_build_stats(index: &Arc>) -> ANNResult<()> { - debug!( + tracked_debug!( "Number of points reachable in the graph: {}", index.count_reachable_nodes().await? ); let (full_vector, quant_vector) = index.counts_for_get_vector(); let capacity = index.capacity(); - debug!( + tracked_debug!( "Number of get vector calls per insert: {}", full_vector as f32 / capacity as f32 ); - debug!( + tracked_debug!( "Number of get quantized vector calls per insert: {}", quant_vector as f32 / capacity as f32 ); @@ -593,7 +596,7 @@ where index.set_start_point(medoid.as_slice())?; - debug!("Set start point to medoid ID: {}", medoid_id); + tracked_debug!("Set start point to medoid ID: {}", medoid_id); Ok(medoid_id) } @@ -627,7 +630,7 @@ where match progress { Progress::Processed(num_points) => { - info!( + tracked_info!( "Linked #{} points. Start #{}, end #{} ", num_points, offset, @@ -635,7 +638,7 @@ where ); } Progress::Completed => { - info!("Linked all points. Num points: #{}", total_points); + tracked_info!("Linked all points. Num points: #{}", total_points); } } @@ -655,7 +658,7 @@ where T: VectorRepr, Iter: Iterator, ()))> + Send + 'static, { - debug!("Processing chunk from #{} to #{}", start, end); + tracked_debug!("Processing chunk from #{} to #{}", start, end); let partitions = async_tools::PartitionIter::new(end - start, num_tasks); @@ -690,7 +693,7 @@ where res.map_err(|_| ANNError::log_index_error("A spawned insert task failed"))??; } - debug!("Completed chunk #{} to #{}", start, end); + tracked_debug!("Completed chunk #{} to #{}", start, end); Ok(()) } @@ -853,7 +856,7 @@ where for file in files.iter() { if self.storage_provider.exists(file) { - debug!("Deleting temporary file: {}", file); + tracked_debug!("Deleting temporary file: {}", file); self.storage_provider.delete(file)?; } } @@ -885,7 +888,7 @@ impl StartPoint { ANNError::log_invalid_file_format(format!("Start point ID file {} is empty", path)) })?; - debug!("Loaded start point ID {} from {}", *start_point_id, path); + tracked_debug!("Loaded start point ID {} from {}", *start_point_id, path); Ok(Self(*start_point_id)) } @@ -897,7 +900,7 @@ impl StartPoint { MatrixView::row_vector(std::slice::from_ref(&self.0)), &mut storage_provider.create_for_write(path)?, )?; - debug!("Saved start point ID {} to {}", self.0, path); + tracked_debug!("Saved start point ID {} to {}", self.0, path); Ok(()) } diff --git a/diskann-disk/src/build/builder/core.rs b/diskann-disk/src/build/builder/core.rs index efb9bf697..2cf799e1a 100644 --- a/diskann-disk/src/build/builder/core.rs +++ b/diskann-disk/src/build/builder/core.rs @@ -17,7 +17,6 @@ use diskann_providers::{ }; use diskann_utils::io::read_bin; use rand::{seq::SliceRandom, Rng}; -use tracing::info; use crate::{ build::chunking::{ @@ -31,6 +30,7 @@ use crate::{ utils::partition_with_ram_budget, DiskIndexBuildParameters, QuantizationType, }; +use diskann::tracked_info; /// Overhead factor for RAM estimation during index build (10% buffer). const OVERHEAD_FACTOR: f64 = 1.1f64; @@ -146,7 +146,7 @@ where let storage_provider = self.storage_provider; let shard_ids = read_bin::(&mut storage_provider.open_reader(shard_ids_file)?)?; let shard_size = shard_ids.nrows(); - info!("Loaded {} shard ids from {}", shard_size, shard_ids_file); + tracked_info!("Loaded {} shard ids from {}", shard_size, shard_ids_file); let max_id = shard_ids.as_slice().iter().max().copied().unwrap_or(0); let sampling_rate = shard_ids.as_slice().len() as f64 / (max_id + 1) as f64; @@ -177,9 +177,10 @@ where Ok(()) })?; - info!( + tracked_info!( "Written file: {} with {} points", - shard_base_file, num_written + shard_base_file, + num_written ); shard_base_cached_writer.flush()?; @@ -215,19 +216,19 @@ where // find max node id let num_nodes: u32 = *id_maps.iter().flatten().max().unwrap_or(&0) + 1; let num_elements: u32 = id_maps.iter().map(|idmap| idmap.len() as u32).sum(); - info!("# nodes: {}, max degree: {}", num_nodes, max_degree); + tracked_info!("# nodes: {}, max degree: {}", num_nodes, max_degree); // compute inverse map: node -> shards let mut node_shard: Vec<(u32, u32)> = Vec::with_capacity(num_elements as usize); for (shard, id_map) in id_maps.iter().enumerate() { - info!("Creating inverse map -- shard #{}", shard); + tracked_info!("Creating inverse map -- shard #{}", shard); node_shard.extend(id_map.iter().map(|node_id| (*node_id, shard as u32))); } node_shard.sort_unstable_by(|left, right| { left.0.cmp(&right.0).then_with(|| left.1.cmp(&right.1)) }); - info!("Finished computing node -> shards map"); + tracked_info!("Finished computing node -> shards map"); // create cached vamana readers let mut vamana_readers = Vec::new(); @@ -270,9 +271,10 @@ where // write max_degree to merged_vamana_index let output_width: u32 = max_degree; - info!( + tracked_info!( "Max input width: {}, output width: {}", - max_input_width, output_width + max_input_width, + output_width ); merged_vamana_cached_writer.write(&output_width.to_le_bytes())?; @@ -300,7 +302,7 @@ where // Hence the final index will also not have any frozen points. merged_vamana_cached_writer.write(&vamana_index_frozen.to_le_bytes())?; - info!("Starting merge"); + tracked_info!("Starting merge"); let mut nbr_set = vec![false; num_nodes as usize]; let mut final_nbrs: Vec = Vec::new(); @@ -334,9 +336,10 @@ where let num_nbrs = vamana_readers[shard_id as usize].read_u32()?; if num_nbrs == 0 { - info!( + tracked_info!( "WARNING: shard #{}, node_id {} has 0 nbrs", - shard_id, node_id + shard_id, + node_id ); } else { let mut nbrs_bytes = vec![0u8; num_nbrs as usize * mem::size_of::()]; @@ -373,11 +376,11 @@ where nbr_set.clear(); final_nbrs.clear(); - info!("Expected size: {}", merged_index_size); + tracked_info!("Expected size: {}", merged_index_size); merged_vamana_cached_writer.reset()?; merged_vamana_cached_writer.write(&merged_index_size.to_le_bytes())?; - info!("Finished merge"); + tracked_info!("Finished merge"); Ok(()) } @@ -445,21 +448,21 @@ pub(crate) fn determine_build_strategy( build_quantization_type, ); - info!( + tracked_info!( "Estimated index RAM usage: {} GB, index_build_ram_limit={} GB", estimated_index_ram_in_bytes / BYTES_IN_GB, index_build_ram_limit_in_bytes / BYTES_IN_GB ); if estimated_index_ram_in_bytes >= index_build_ram_limit_in_bytes { - info!( + tracked_info!( "Insufficient memory budget for index build in one shot, index_build_ram_limit={} GB estimated_index_ram={} GB", index_build_ram_limit_in_bytes / BYTES_IN_GB, estimated_index_ram_in_bytes / BYTES_IN_GB, ); IndexBuildStrategy::Merged } else { - info!( + tracked_info!( "Full index fits in RAM budget, should consume at most {} GBs, so building in one shot", estimated_index_ram_in_bytes / BYTES_IN_GB ); @@ -562,7 +565,7 @@ impl<'a> MergedVamanaIndexWorkflow<'a> { ) { p += 1; } - info!("Found {} existing partitions from previous run", p); + tracked_info!("Found {} existing partitions from previous run", p); Ok(p) }, ) diff --git a/diskann-disk/src/build/builder/quantizer.rs b/diskann-disk/src/build/builder/quantizer.rs index c3eac75fc..67b26cecf 100644 --- a/diskann-disk/src/build/builder/quantizer.rs +++ b/diskann-disk/src/build/builder/quantizer.rs @@ -17,9 +17,9 @@ use diskann_providers::{ }; use diskann_quantization::scalar::train::ScalarQuantizationParameters; use diskann_utils::views::MatrixView; -use tracing::info; use crate::QuantizationType; +use diskann::tracked_info; /// Quantizer types used specifically for async disk index building. #[derive(Clone)] @@ -111,7 +111,7 @@ impl BuildQuantizer { MatrixView::try_from(&train_data_vector, train_size, train_dim).bridge_err()?, ); - info!("Now quantizer is trained and saving to file"); + tracked_info!("Now quantizer is trained and saving to file"); let sq_storage = SQStorage::new(index_path_prefix); sq_storage.save_quantizer(&quantizer, storage_provider)?; diff --git a/diskann-disk/src/build/chunking/checkpoint/checkpoint_record.rs b/diskann-disk/src/build/chunking/checkpoint/checkpoint_record.rs index b2e7eab43..c0edd4fe0 100644 --- a/diskann-disk/src/build/chunking/checkpoint/checkpoint_record.rs +++ b/diskann-disk/src/build/chunking/checkpoint/checkpoint_record.rs @@ -5,9 +5,9 @@ use diskann::ANNResult; use serde::{Deserialize, Serialize}; -use tracing::info; use super::WorkStage; +use diskann::tracked_info; /// Represents a checkpoint record in the index build process. /// The checkpoint record can be marked as in-valid to indicate that the exising intermediate data should be discarded. @@ -45,15 +45,17 @@ impl CheckpointRecord { pub fn get_resumption_point(&self, stage: WorkStage) -> Option { if self.stage == stage { - info!( + tracked_info!( "The resumption point is at {} for stage {:?}", - self.progress, stage + self.progress, + stage ); Some(if self.is_valid { self.progress } else { 0 }) } else { - info!( + tracked_info!( "Failed to get resumption point for {:?} since the current stage is {:?}.", - stage, self.stage + stage, + self.stage ); None } @@ -63,9 +65,10 @@ impl CheckpointRecord { // This method is used in each individual step of the index build process // ..t o update the checkpoint record. pub fn advance_work_type(&self, next_stage: WorkStage) -> ANNResult { - info!( + tracked_info!( "Advancing work type from {:?} to {:?}.", - self.stage, next_stage + self.stage, + next_stage ); Ok(CheckpointRecord { stage: next_stage, @@ -85,7 +88,7 @@ impl CheckpointRecord { // Update the progress of the current work type. pub fn update_progress(&self, progress: usize) -> CheckpointRecord { - info!("Updating progress to {:?}={}", self.stage, progress); + tracked_info!("Updating progress to {:?}={}", self.stage, progress); CheckpointRecord { stage: self.stage, is_valid: true, diff --git a/diskann-disk/src/build/chunking/checkpoint/checkpoint_record_manager.rs b/diskann-disk/src/build/chunking/checkpoint/checkpoint_record_manager.rs index f0912a78e..afdccabe2 100644 --- a/diskann-disk/src/build/chunking/checkpoint/checkpoint_record_manager.rs +++ b/diskann-disk/src/build/chunking/checkpoint/checkpoint_record_manager.rs @@ -4,9 +4,9 @@ */ use diskann::ANNResult; -use tracing::info; use super::{Progress, WorkStage}; +use diskann::tracked_info; /// This trait provides functionalities to get and set checkpoint records /// ..for tracking the progress and state in a chunkable index build process. @@ -73,7 +73,7 @@ where Ok(result) } None => { - info!("[Stage:{:?}] Skip stage - invalid checkpoint", stage); + tracked_info!("[Stage:{:?}] Skip stage - invalid checkpoint", stage); skip_handler() } } diff --git a/diskann-disk/src/build/chunking/continuation/utils.rs b/diskann-disk/src/build/chunking/continuation/utils.rs index 0d92e416e..fb0fd191f 100644 --- a/diskann-disk/src/build/chunking/continuation/utils.rs +++ b/diskann-disk/src/build/chunking/continuation/utils.rs @@ -5,10 +5,9 @@ use std::{error::Error, thread::sleep}; -use tracing::info; - use super::continuation_tracker::{ContinuationGrant, ContinuationTrackerTrait}; use crate::build::chunking::checkpoint::Progress; +use diskann::tracked_info; /// This takes an operation with an iterator of oprands, /// and processes the oprands using the operation in a loop, @@ -30,19 +29,19 @@ where loop { match continuation_checker.get_continuation_grant() { ContinuationGrant::Continue => { - info!("Continue processing."); + tracked_info!("Continue processing."); action(param)?; break; } ContinuationGrant::Yield(duration) => { - info!( + tracked_info!( "Continuation checker asks to yield for {} ms.", duration.as_millis() ); sleep(duration); } ContinuationGrant::Stop => { - info!("Continuation checker asks to stop. Breaking the loop."); + tracked_info!("Continuation checker asks to stop. Breaking the loop."); return Ok(Progress::Processed(idx)); } } @@ -71,19 +70,19 @@ where loop { match continuation_checker.get_continuation_grant() { ContinuationGrant::Continue => { - info!("Continue processing."); + tracked_info!("Continue processing."); action(param).await?; break; } ContinuationGrant::Yield(duration) => { - info!( + tracked_info!( "Continuation checker asks to yield for {} ms.", duration.as_millis() ); sleep(duration); } ContinuationGrant::Stop => { - info!("Continuation checker asks to stop. Breaking the loop."); + tracked_info!("Continuation checker asks to stop. Breaking the loop."); return Ok(Progress::Processed(idx)); } } diff --git a/diskann-disk/src/search/provider/disk_provider.rs b/diskann-disk/src/search/provider/disk_provider.rs index 33938caea..4c25c3c3b 100644 --- a/diskann-disk/src/search/provider/disk_provider.rs +++ b/diskann-disk/src/search/provider/disk_provider.rs @@ -46,7 +46,6 @@ use crate::search::pq::{quantizer_preprocess, PQData, PQScratch}; use diskann_vector::{distance::Metric, DistanceFunction, PreprocessedDistanceFunction}; use futures_util::future; use tokio::runtime::Runtime; -use tracing::debug; use crate::{ data_model::{CachingStrategy, GraphHeader}, @@ -59,6 +58,7 @@ use crate::{ utils::AlignedFileReaderFactory, utils::QueryStatistics, }; +use diskann::tracked_debug; /////////////////// // Disk Provider // @@ -135,7 +135,7 @@ where where P: StorageReadProvider, { - debug!( + tracked_debug!( "DiskProvider::load_with() called with file: {:?}", get_disk_index_file(ctx.quant_load_context.metadata.prefix()) ); @@ -870,7 +870,7 @@ where ) .build()?; - debug!("Creating DiskIndexSearcher with index_config: {:?}", config); + tracked_debug!("Creating DiskIndexSearcher with index_config: {:?}", config); let graph_header = vertex_provider_factory.get_header()?; let pq_data = disk_index_reader.get_pq_data(); diff --git a/diskann-disk/src/search/provider/disk_vertex_provider_factory.rs b/diskann-disk/src/search/provider/disk_vertex_provider_factory.rs index e5567c373..d17f8d510 100644 --- a/diskann-disk/src/search/provider/disk_vertex_provider_factory.rs +++ b/diskann-disk/src/search/provider/disk_vertex_provider_factory.rs @@ -11,7 +11,6 @@ use diskann_quantization::{ num::PowerOfTwo, }; use hashbrown::HashSet; -use tracing::info; use crate::{ data_model::{Cache, CachingStrategy, GraphHeader}, @@ -27,6 +26,7 @@ use crate::{ AlignedRead, }, }; +use diskann::tracked_info; const DEFAULT_DISK_SECTOR_LEN: usize = 4096; const BEAM_WIDTH_FOR_BFS: usize = 32; @@ -142,9 +142,10 @@ impl, ReaderFactory: AlignedReaderFactor let graph_metadata = graph_metadata.metadata(); if num_nodes_to_cache > graph_metadata.num_pts as usize { - info!( + tracked_info!( "Reducing nodes to cache from: {} to: {} (total no. of nodes)", - num_nodes_to_cache, graph_metadata.num_pts + num_nodes_to_cache, + graph_metadata.num_pts ); num_nodes_to_cache = graph_metadata.num_pts as usize; } @@ -159,7 +160,7 @@ impl, ReaderFactory: AlignedReaderFactor CachingStrategy::None => {} } - info!("Cache setup took: {} ms", timer.elapsed().as_millis()); + tracked_info!("Cache setup took: {} ms", timer.elapsed().as_millis()); Ok(()) } @@ -169,7 +170,7 @@ impl, ReaderFactory: AlignedReaderFactor num_nodes_to_cache: usize, dimension: usize, ) -> ANNResult> { - info!("Building cache with {} nodes via BFS.", num_nodes_to_cache); + tracked_info!("Building cache with {} nodes via BFS.", num_nodes_to_cache); let mut cache = Cache::new(dimension, num_nodes_to_cache)?; let mut vertex_provider = self.create_disk_vertex_provider(BEAM_WIDTH_FOR_BFS, &self.get_header()?)?; diff --git a/diskann-disk/src/storage/cached_reader.rs b/diskann-disk/src/storage/cached_reader.rs index 3326d7607..07488896c 100644 --- a/diskann-disk/src/storage/cached_reader.rs +++ b/diskann-disk/src/storage/cached_reader.rs @@ -4,9 +4,9 @@ */ use std::io::{Read, Seek}; +use diskann::tracked_info; use diskann::{ANNError, ANNResult}; use diskann_providers::storage::StorageReadProvider; -use tracing::info; /// Sequential cached reads with a generic storage provider with read access. pub struct CachedReader @@ -38,16 +38,18 @@ where cache_size: u64, storage_provider: &Storage, ) -> std::io::Result { - info!("Opening: {}", filename); + tracked_info!("Opening: {}", filename); let mut reader = storage_provider.open_reader(filename)?; let size = storage_provider.get_length(filename)?; let cache_size = cache_size.min(size); let mut cache_buf = vec![0; cache_size as usize]; reader.read_exact(&mut cache_buf)?; - info!( + tracked_info!( "Opened: {}, size: {}, cache_size: {}", - filename, size, cache_size + filename, + size, + cache_size ); Ok(Self { diff --git a/diskann-disk/src/storage/cached_writer.rs b/diskann-disk/src/storage/cached_writer.rs index 50d28a766..b141de888 100644 --- a/diskann-disk/src/storage/cached_writer.rs +++ b/diskann-disk/src/storage/cached_writer.rs @@ -4,8 +4,8 @@ */ use std::io::{Seek, SeekFrom, Write}; +use diskann::tracked_info; use diskann_providers::storage::StorageWriteProvider; -use tracing::info; /// Sequential cached writes with a generic storage provider with write access. pub struct CachedWriter @@ -37,7 +37,7 @@ where return Err(std::io::Error::other("Cache size must be greater than 0")); } - info!("Opened: {}, cache_size: {}", filename, cache_size); + tracked_info!("Opened: {}, cache_size: {}", filename, cache_size); Ok(Self { writer, cache_size, @@ -54,7 +54,7 @@ where } self.writer.flush()?; - info!("Finished writing {}B", self.fsize); + tracked_info!("Finished writing {}B", self.fsize); Ok(()) } diff --git a/diskann-disk/src/storage/disk_index_reader.rs b/diskann-disk/src/storage/disk_index_reader.rs index 60fbecf2b..a75831824 100644 --- a/diskann-disk/src/storage/disk_index_reader.rs +++ b/diskann-disk/src/storage/disk_index_reader.rs @@ -9,7 +9,7 @@ use diskann_providers::storage::StorageReadProvider; use diskann_providers::{storage::PQStorage, utils::load_metadata_from_file}; use crate::search::pq::PQData; -use tracing::info; +use diskann::tracked_info; /// This struct is used by the DiskIndexSearcher to read the index data from storage. Noted that the index data here is different from index graph, /// It includes the PQ data, pivot table, and the warmup query data. @@ -46,7 +46,7 @@ impl DiskIndexReader { pq_pivot_table.get_num_chunks(), storage_provider, )?; - info!( + tracked_info!( "Loaded PQ centroids and in-memory compressed vectors. #points:{} #pq_chunks: {}", metadata.npoints(), pq_pivot_table.get_num_chunks() diff --git a/diskann-disk/src/storage/disk_index_writer.rs b/diskann-disk/src/storage/disk_index_writer.rs index f86313833..4c8de2134 100644 --- a/diskann-disk/src/storage/disk_index_writer.rs +++ b/diskann-disk/src/storage/disk_index_writer.rs @@ -14,12 +14,12 @@ use diskann_providers::{ storage::{get_mem_index_file, path_utility::*}, utils::{save_bytes, READ_WRITE_BLOCK_SIZE}, }; -use tracing::info; use crate::{ data_model::{GraphHeader, GraphMetadata}, storage::{CachedReader, CachedWriter}, }; +use diskann::tracked_info; // Struct DiskIndexWriterState maintains the state of the process of creating a disk // layout using index and associated data. By moving the state to this struct, we @@ -172,7 +172,7 @@ impl DiskIndexWriter { // Create cached reader + writer let actual_file_size = storage_provider.get_length(mem_index_file.as_str())?; - info!("Vamana index file size={}", actual_file_size); + tracked_info!("Vamana index file size={}", actual_file_size); state.muti_shard_index_reader = Some(storage_provider.open_reader(mem_index_file.as_str())?); @@ -180,7 +180,7 @@ impl DiskIndexWriter { if let Some(vamana_reader) = state.muti_shard_index_reader.as_mut() { let index_file_size = vamana_reader.read_u64::()?; if index_file_size != actual_file_size { - info!( + tracked_info!( "Vamana Index file size does not match expected size per meta-data. file size from file: {}, actual file size: {}", index_file_size, actual_file_size ); @@ -415,11 +415,11 @@ impl DiskIndexWriter { let num_nodes_per_block = (block_size as u64) / state.node_len; // 0 if node_len > block_size - info!("block_size: {}B", block_size); - info!("medoid: {}B", state.medoid); - info!("node_len: {}B", state.node_len); - info!("num_nodes_per_sector: {}B", num_nodes_per_block); - info!( + tracked_info!("block_size: {}B", block_size); + tracked_info!("medoid: {}B", state.medoid); + tracked_info!("node_len: {}B", state.node_len); + tracked_info!("num_nodes_per_sector: {}B", num_nodes_per_block); + tracked_info!( "associated_data_length: {}B", state.associated_data_length * mem::size_of::() ); @@ -429,10 +429,10 @@ impl DiskIndexWriter { state.num_pts.div_ceil(num_nodes_per_block) } else { let num_block_per_node = state.node_len.div_ceil(block_size as u64); - info!("num_sector_per_node: {}B", num_block_per_node); + tracked_info!("num_sector_per_node: {}B", num_block_per_node); state.num_pts * num_block_per_node }; - info!("num_blocks: {}B", num_blocks); + tracked_info!("num_blocks: {}B", num_blocks); let disk_layout_file = self.disk_index_file(); { @@ -454,7 +454,7 @@ impl DiskIndexWriter { // Write multiple nodes per sector for sector in 0..num_blocks { if sector % 100_000 == 0 { - info!("Sector #{} written", sector); + tracked_info!("Sector #{} written", sector); } block_buf.fill(0); @@ -485,7 +485,7 @@ impl DiskIndexWriter { for node_idx in 0..state.num_pts { if (node_idx * num_block_per_node).is_multiple_of(100_000) { - info!("Sector #{} written", node_idx * num_block_per_node); + tracked_info!("Sector #{} written", node_idx * num_block_per_node); } self.read_neighbors::(&mut state, &mut multi_block_buf)?; diff --git a/diskann-disk/src/storage/quant/generator.rs b/diskann-disk/src/storage/quant/generator.rs index c14bf998a..0ccc3f02a 100644 --- a/diskann-disk/src/storage/quant/generator.rs +++ b/diskann-disk/src/storage/quant/generator.rs @@ -15,7 +15,6 @@ use diskann_providers::utils::{ }; use diskann_utils::{io::Metadata, views}; use rayon::iter::IndexedParallelIterator; -use tracing::info; use crate::{ build::chunking::{ @@ -24,6 +23,7 @@ use crate::{ }, storage::quant::compressor::{CompressionStage, QuantCompressor}, }; +use diskann::tracked_info; /// [`GeneratorContext`] defines parameters for vector quantization checkpoint state /// @@ -121,7 +121,7 @@ where storage_provider.delete(compressed_path)?; } - info!("Generating quantized data for {}", compressed_path); + tracked_info!("Generating quantized data for {}", compressed_path); let data_reader = &mut storage_provider.open_reader(&self.data_path)?; @@ -148,7 +148,7 @@ where let num_blocks = num_remaining / block_size + !num_remaining.is_multiple_of(block_size) as usize; - info!( + tracked_info!( "Compressing with block size {}, num_remaining {}, num_blocks {}, offset {}, num_points {}", block_size, num_remaining, num_blocks, offset, num_points ); @@ -215,7 +215,7 @@ where )? .map(|processed| processed * block_size + offset); - info!( + tracked_info!( "Quant data generation took {} seconds", timer.elapsed().as_secs_f64() ); diff --git a/diskann-disk/src/storage/quant/pq/pq_generation.rs b/diskann-disk/src/storage/quant/pq/pq_generation.rs index ccd2c30a7..a481fc10d 100644 --- a/diskann-disk/src/storage/quant/pq/pq_generation.rs +++ b/diskann-disk/src/storage/quant/pq/pq_generation.rs @@ -18,9 +18,9 @@ use diskann_providers::{ use diskann_quantization::{product::TransposedTable, CompressInto}; use diskann_utils::views::MatrixBase; use diskann_vector::distance::Metric; -use tracing::info; use crate::storage::quant::compressor::{CompressionStage, QuantCompressor}; +use diskann::tracked_info; pub struct PQGenerationContext<'a, Storage> where @@ -109,7 +109,7 @@ where pool, )?; - info!( + tracked_info!( "PQ pivot generation took {} seconds", timer.elapsed().as_secs_f64() ); diff --git a/diskann-disk/src/utils/aligned_file_reader/storage_provider_aligned_file_reader.rs b/diskann-disk/src/utils/aligned_file_reader/storage_provider_aligned_file_reader.rs index 631934a3f..1fc408c50 100644 --- a/diskann-disk/src/utils/aligned_file_reader/storage_provider_aligned_file_reader.rs +++ b/diskann-disk/src/utils/aligned_file_reader/storage_provider_aligned_file_reader.rs @@ -7,10 +7,10 @@ use std::io::Read; use diskann::ANNResult; use diskann_providers::storage::StorageReadProvider; -use tracing::info; use super::traits::AlignedFileReader; use crate::utils::aligned_file_reader::{AlignedRead, A1}; +use diskann::tracked_info; pub struct StorageProviderAlignedFileReader { data: Vec, @@ -21,7 +21,7 @@ impl StorageProviderAlignedFileReader { storage_provider: &impl StorageReadProvider, file_name: &str, ) -> ANNResult { - info!("Loading data from {}", file_name); + tracked_info!("Loading data from {}", file_name); let file_length = storage_provider.get_length(file_name)?; let mut data = vec![0u8; file_length as usize]; diff --git a/diskann-disk/src/utils/aligned_file_reader/windows_aligned_file_reader.rs b/diskann-disk/src/utils/aligned_file_reader/windows_aligned_file_reader.rs index 543a06d48..601d51e3a 100644 --- a/diskann-disk/src/utils/aligned_file_reader/windows_aligned_file_reader.rs +++ b/diskann-disk/src/utils/aligned_file_reader/windows_aligned_file_reader.rs @@ -12,6 +12,7 @@ use diskann_platform::{ use super::traits::AlignedFileReader; use crate::utils::aligned_file_reader::{AlignedRead, A512}; +use diskann::tracked_debug; pub const MAX_IO_CONCURRENCY: usize = 128; pub const IO_COMPLETION_TIMEOUT: DWORD = u32::MAX; // Infinite timeout. @@ -32,7 +33,7 @@ pub struct WindowsAlignedFileReader { impl WindowsAlignedFileReader { pub fn new(fname: &str) -> ANNResult { let mut io_context = IOContext::new(); - tracing::debug!("Creating file handle for {}", fname); + tracked_debug!("Creating file handle for {}", fname); match unsafe { FileHandle::new(fname, AccessMode::Read, ShareMode::Read) } { Ok(file_handle) => io_context.file_handle = file_handle, Err(err) => { diff --git a/diskann-disk/src/utils/partition.rs b/diskann-disk/src/utils/partition.rs index 0e9a05b0f..3ddaf6d84 100644 --- a/diskann-disk/src/utils/partition.rs +++ b/diskann-disk/src/utils/partition.rs @@ -8,12 +8,12 @@ use diskann_providers::utils::{gen_random_slice, RayonThreadPoolRef, READ_WRITE_ use crate::utils::{compute_closest_centers, k_meanspp_selecting_pivots, run_lloyds}; use rand::Rng; -use tracing::info; use crate::{ disk_index_build_parameter::BYTES_IN_GB, storage::{CachedReader, CachedWriter, DiskIndexWriter}, }; +use diskann::tracked_info; /// Block size for reading/processing large files and matrices in blocks const BLOCK_SIZE_LARGE_FILE: u32 = 10_000; @@ -48,7 +48,7 @@ where &ram_estimator, )?; - info!("Saving shard data into clusters, with only ids"); + tracked_info!("Saving shard data into clusters, with only ids"); shard_data_into_clusters_only_ids::( dataset_file, @@ -85,11 +85,11 @@ where let (train_data_float, num_train, train_dim) = gen_random_slice::(dataset_file, sampling_rate, storage_provider, rng)?; - info!("Loaded {} points for train, dim: {}", num_train, train_dim); + tracked_info!("Loaded {} points for train, dim: {}", num_train, train_dim); let (test_data_float, num_test, test_dim) = gen_random_slice::(dataset_file, sampling_rate, storage_provider, rng)?; - info!("Loaded {} points for test, dim: {}", num_test, test_dim); + tracked_info!("Loaded {} points for test, dim: {}", num_test, test_dim); // Calculate total points accounting for sampling rate let total_points = (num_train as f64 / sampling_rate) as u64; @@ -114,7 +114,7 @@ where pivot_data = vec![0.0; num_parts * train_dim]; // Process Global k-means for kmeans_partitioning Step - info!("Processing global k-means (kmeans_partitioning Step)"); + tracked_info!("Processing global k-means (kmeans_partitioning Step)"); k_meanspp_selecting_pivots( &train_data_float, num_train, @@ -163,7 +163,7 @@ where } } - info!( + tracked_info!( "Partition RAM estimates (GB): {}", partition_stats .iter() @@ -172,7 +172,7 @@ where .join(", ") ); - info!( + tracked_info!( "With {} parts, max estimated RAM usage: {:.2} GB, budget given is {:.2} GB", num_parts, max_ram_usage_in_bytes / BYTES_IN_GB, @@ -182,7 +182,7 @@ where fit_in_ram = false; num_parts += 2; } else { - info!( + tracked_info!( "Found optimal partition count: [parts={}, initial={}, max_ram={:.2}GB, budget={:.2}GB]", num_parts, initial_num_parts, @@ -217,7 +217,7 @@ where partition_count += 1; } - info!( + tracked_info!( "Estimated initial partition count: {} (total points: {}, dimension: {}, k_base: {}, total_ram_estimate: {:.2} GB, ram_budget: {:.2} GB)", partition_count, total_points, @@ -340,16 +340,19 @@ where for i in 0..num_parts { let cur_shard_count = shard_counts[i] as u32; - info!(" shard_{} with npts : {} ", i, cur_shard_count); + tracked_info!(" shard_{} with npts : {} ", i, cur_shard_count); total_count += cur_shard_count; shard_idmap_cached_writers[i].reset()?; shard_idmap_cached_writers[i].write(&cur_shard_count.to_le_bytes())?; shard_idmap_cached_writers[i].flush()?; } - info!( + tracked_info!( "Partitioned {} with replication factor {} to get {} points across {} shards", - num_points, k_base, total_count, num_parts + num_points, + k_base, + total_count, + num_parts ); Ok(()) @@ -411,7 +414,7 @@ fn estimate_cluster_sizes( let cur_shard_count = shard_counts[i] as u32; cluster_sizes.push(cur_shard_count); }); - info!("Estimated cluster sizes: {:?}", cluster_sizes); + tracked_info!("Estimated cluster sizes: {:?}", cluster_sizes); Ok(()) } diff --git a/diskann/src/tracing.rs b/diskann/src/tracing.rs index f1465d0c5..6b1780af2 100644 --- a/diskann/src/tracing.rs +++ b/diskann/src/tracing.rs @@ -48,6 +48,23 @@ macro_rules! tracked_warn { }}; } +#[cfg(feature = "tracing")] +#[macro_export] +macro_rules! tracked_info { + ($($arg:tt)+) => {{ + let location = std::panic::Location::caller(); + ::tracing::info!(diskann.file = location.file(), diskann.line = location.line(), $($arg)+); + }}; +} + +#[cfg(not(feature = "tracing"))] +#[macro_export] +macro_rules! tracked_info { + ($($arg:tt)+) => {{ + $crate::used!($($arg)+); + }}; +} + #[cfg(feature = "tracing")] #[macro_export] macro_rules! tracked_debug {