Skip to content

Commit 32b0408

Browse files
authored
Bound native price cache with moka (#4154)
# Description The native price cache is backed by an unbounded `Mutex<HashMap<Address, CachedResult>>`. Entries are inserted on every price estimate but never removed. Once at steady state with many unique tokens, the cache grows without bound. The size limit is currently hardcoded at 20k, since on mainnet we have around that number of unique tokens: ```sql SELECT COUNT(*) AS unique_token_count FROM (SELECT sell_token AS token FROM orders UNION SELECT buy_token AS token FROM orders) t; Returns -> 20109 ``` Per cache entry breakdown: - CachedResult value: ~32 bytes - Address key: 20 bytes - Moka internal overhead: ~100-120 bytes - Total per entry: ~170 bytes At 20,000 entries: ~3.4 MB upper bound # Changes - Replace the `Mutex<HashMap<...>>` cache with `moka::sync::Cache`. - Adapt get_cached_price(): - No more MutexGuard parameter. Use data.get(&token) which returns Option<CachedResult> (moka clones the value). Check staleness + is_ready() on the returned value. No `requested_at` to update, since moka tracks access internally. - Remove the `requested_at` field from the `CachedResult` since it is not needed anymore and turned out to be dead code. - Adapt the `insert()` function to use `entry_by_ref().and_upsert_with()`, which gives atomic read-modify-write for `accumulative_errors_count`. ## How to test Existing tests.
1 parent 95b5017 commit 32b0408

3 files changed

Lines changed: 41 additions & 39 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/shared/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ tracing = { workspace = true }
5353
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "time"] }
5454
url = { workspace = true }
5555

56+
moka = { workspace = true, features = ["sync"] }
5657
mockall = { workspace = true, optional = true }
5758

5859
[dev-dependencies]

crates/shared/src/price_estimation/native_price_cache.rs

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use {
1313
rand::Rng,
1414
std::{
1515
collections::{HashMap, HashSet},
16-
sync::{Arc, Mutex, MutexGuard},
16+
sync::Arc,
1717
time::{Duration, Instant},
1818
},
1919
tokio::time,
@@ -94,7 +94,6 @@ type CacheEntry = Result<f64, PriceEstimationError>;
9494
struct CachedResult {
9595
result: CacheEntry,
9696
updated_at: Instant,
97-
requested_at: Instant,
9897
accumulative_errors_count: u32,
9998
}
10099

@@ -112,14 +111,12 @@ impl CachedResult {
112111
Self {
113112
result,
114113
updated_at: now,
115-
requested_at: now,
116114
accumulative_errors_count: u32::from(is_accumulating_error),
117115
}
118116
}
119117

120118
fn update(&mut self, result: CacheEntry) {
121119
let now = Instant::now();
122-
self.requested_at = now;
123120
self.updated_at = now;
124121
self.accumulative_errors_count = match result {
125122
Err(PriceEstimationError::EstimatorInternal(_)) => self.accumulative_errors_count + 1,
@@ -159,8 +156,10 @@ fn should_cache(result: &Result<f64, PriceEstimationError>) -> bool {
159156
#[derive(Clone)]
160157
pub struct Cache(Arc<CacheInner>);
161158

159+
const MAX_CACHE_SIZE: u64 = 20_000;
160+
162161
struct CacheInner {
163-
data: Mutex<HashMap<Address, CachedResult>>,
162+
data: moka::sync::Cache<Address, CachedResult>,
164163
max_age: Duration,
165164
}
166165

@@ -169,26 +168,25 @@ impl Cache {
169168
let mut rng = rand::thread_rng();
170169
let now = std::time::Instant::now();
171170

172-
let data = initial_prices
173-
.into_iter()
174-
.filter_map(|(token, price)| {
171+
let data = moka::sync::Cache::builder()
172+
.max_capacity(MAX_CACHE_SIZE)
173+
.build();
174+
175+
for (token, price) in initial_prices {
176+
if let Some(price) = from_normalized_price(price) {
175177
let updated_at = Self::random_updated_at(max_age, now, &mut rng);
176-
Some((
178+
data.insert(
177179
token,
178180
CachedResult {
179-
result: Ok(from_normalized_price(price)?),
181+
result: Ok(price),
180182
updated_at,
181-
requested_at: now,
182183
accumulative_errors_count: 0,
183184
},
184-
))
185-
})
186-
.collect::<HashMap<_, _>>();
187-
188-
Self(Arc::new(CacheInner {
189-
data: Mutex::new(data),
190-
max_age,
191-
}))
185+
);
186+
}
187+
}
188+
189+
Self(Arc::new(CacheInner { data, max_age }))
192190
}
193191

194192
fn max_age(&self) -> Duration {
@@ -205,19 +203,19 @@ impl Cache {
205203
}
206204

207205
fn len(&self) -> usize {
208-
self.0.data.lock().unwrap().len()
206+
// Should never fire since we are bounded with MAX_CACHE_SIZE
207+
usize::try_from(self.0.data.entry_count()).expect("cache size should fit in a usize")
209208
}
210209

211210
fn get_cached_price(
212211
token: Address,
213212
now: Instant,
214-
cache: &mut MutexGuard<HashMap<Address, CachedResult>>,
213+
cache: &moka::sync::Cache<Address, CachedResult>,
215214
max_age: &Duration,
216215
) -> Option<CachedResult> {
217-
let entry = cache.get_mut(&token)?;
218-
entry.requested_at = now;
216+
let entry = cache.get(&token)?;
219217
let is_recent = now.saturating_duration_since(entry.updated_at) < *max_age;
220-
(is_recent && entry.is_ready()).then_some(entry.clone())
218+
(is_recent && entry.is_ready()).then_some(entry)
221219
}
222220

223221
/// Only returns prices that are currently cached.
@@ -226,10 +224,9 @@ impl Cache {
226224
tokens: &[Address],
227225
) -> HashMap<Address, Result<f64, PriceEstimationError>> {
228226
let now = Instant::now();
229-
let mut cache = self.0.data.lock().unwrap();
230227
let mut results = HashMap::default();
231228
for token in tokens {
232-
let cached = Self::get_cached_price(*token, now, &mut cache, &self.0.max_age);
229+
let cached = Self::get_cached_price(*token, now, &self.0.data, &self.0.max_age);
233230
let label = if cached.is_some() { "hits" } else { "misses" };
234231
CacheMetrics::get()
235232
.native_price_cache_access
@@ -243,11 +240,17 @@ impl Cache {
243240
}
244241

245242
fn insert(&self, token: Address, result: CacheEntry) {
246-
let mut cache = self.0.data.lock().unwrap();
247-
cache
248-
.entry(token)
249-
.and_modify(|value| value.update(result.clone()))
250-
.or_insert_with(|| CachedResult::new(result));
243+
self.0
244+
.data
245+
.entry_by_ref(&token)
246+
.and_upsert_with(|maybe_entry| match maybe_entry {
247+
Some(entry) => {
248+
let mut cached = entry.into_value();
249+
cached.update(result);
250+
cached
251+
}
252+
None => CachedResult::new(result),
253+
});
251254
}
252255
}
253256

@@ -311,11 +314,10 @@ impl CachingNativePriceEstimator {
311314
let estimates = tokens.into_iter().map(move |token| async move {
312315
// check if the price is cached by now
313316
let now = Instant::now();
317+
if let Some(cached) =
318+
Cache::get_cached_price(token, now, &self.0.cache.0.data, &max_age)
314319
{
315-
let mut cache = self.0.cache.0.data.lock().unwrap();
316-
if let Some(cached) = Cache::get_cached_price(token, now, &mut cache, &max_age) {
317-
return (token, cached.result);
318-
}
320+
return (token, cached.result);
319321
}
320322

321323
let approximation = self
@@ -390,8 +392,7 @@ impl NativePriceEstimating for CachingNativePriceEstimator {
390392
async move {
391393
let cached = {
392394
let now = Instant::now();
393-
let mut cache = self.0.cache.0.data.lock().unwrap();
394-
Cache::get_cached_price(token, now, &mut cache, &self.0.cache.0.max_age)
395+
Cache::get_cached_price(token, now, &self.0.cache.0.data, &self.0.cache.0.max_age)
395396
};
396397

397398
let label = if cached.is_some() { "hits" } else { "misses" };
@@ -565,8 +566,7 @@ mod tests {
565566
{
566567
// Check that `updated_at` timestamps are initialized with
567568
// reasonable values.
568-
let data = estimator.cache().0.data.lock().unwrap();
569-
for value in data.values() {
569+
for (_, value) in &estimator.cache().0.data {
570570
let elapsed = value.updated_at.elapsed();
571571
assert!(elapsed >= min_age && elapsed <= max_age);
572572
}

0 commit comments

Comments
 (0)