-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathbuffered_call_cache.rs
More file actions
146 lines (126 loc) · 4.26 KB
/
buffered_call_cache.rs
File metadata and controls
146 lines (126 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use async_trait::async_trait;
use graph::{
cheap_clone::CheapClone,
components::store::EthereumCallCache,
data::store::ethereum::call,
prelude::{BlockPtr, CachedEthereumCall},
slog::{error, Logger},
};
/// A wrapper around an Ethereum call cache that buffers call results in
/// memory for the duration of a block. If `get_call` or `set_call` are
/// called with a different block pointer than the one used in the previous
/// call, the buffer is cleared.
pub struct BufferedCallCache {
call_cache: Arc<dyn EthereumCallCache>,
buffer: Arc<Mutex<HashMap<call::Request, call::Retval>>>,
block: Arc<Mutex<Option<BlockPtr>>>,
}
impl BufferedCallCache {
pub fn new(call_cache: Arc<dyn EthereumCallCache>) -> Self {
Self {
call_cache,
buffer: Arc::new(Mutex::new(HashMap::new())),
block: Arc::new(Mutex::new(None)),
}
}
fn check_block(&self, block: &BlockPtr) {
let mut self_block = self.block.lock().unwrap();
if self_block.as_ref() != Some(block) {
*self_block = Some(block.clone());
self.buffer.lock().unwrap().clear();
}
}
fn get(&self, call: &call::Request) -> Option<call::Response> {
let buffer = self.buffer.lock().unwrap();
buffer.get(call).map(|retval| {
call.cheap_clone()
.response(retval.clone(), call::Source::Memory)
})
}
}
#[async_trait]
impl EthereumCallCache for BufferedCallCache {
async fn get_call(
&self,
call: &call::Request,
block: BlockPtr,
) -> Result<Option<call::Response>, graph::prelude::Error> {
self.check_block(&block);
if let Some(value) = self.get(call) {
return Ok(Some(value));
}
let result = self.call_cache.get_call(call, block).await?;
let mut buffer = self.buffer.lock().unwrap();
if let Some(call::Response {
retval,
req: _,
source: _,
}) = &result
{
buffer.insert(call.cheap_clone(), retval.clone());
}
Ok(result)
}
async fn get_calls(
&self,
reqs: &[call::Request],
block: BlockPtr,
) -> Result<(Vec<call::Response>, Vec<call::Request>), graph::prelude::Error> {
self.check_block(&block);
let mut missing = Vec::new();
let mut resps = Vec::new();
for call in reqs {
match self.get(call) {
Some(resp) => resps.push(resp),
None => missing.push(call.cheap_clone()),
}
}
let (stored, calls) = self.call_cache.get_calls(&missing, block).await?;
{
let mut buffer = self.buffer.lock().unwrap();
for resp in &stored {
buffer.insert(resp.req.cheap_clone(), resp.retval.clone());
}
}
resps.extend(stored);
Ok((resps, calls))
}
async fn get_calls_in_block(
&self,
block: BlockPtr,
) -> Result<Vec<CachedEthereumCall>, graph::prelude::Error> {
self.call_cache.get_calls_in_block(block).await
}
async fn set_call(
self: Arc<Self>,
logger: &Logger,
call: call::Request,
block: BlockPtr,
return_value: call::Retval,
) -> Result<(), graph::prelude::Error> {
self.check_block(&block);
// Enter the call into the in-memory cache immediately so that
// handlers will find it, but add it to the underlying cache in the
// background so we do not have to wait for that as it will be a
// cache backed by the database
{
let mut buffer = self.buffer.lock().unwrap();
buffer.insert(call.cheap_clone(), return_value.clone());
}
let cache = self.call_cache.cheap_clone();
let logger = logger.cheap_clone();
if let Err(e) = cache
.set_call(&logger, call.cheap_clone(), block, return_value)
.await
{
error!(logger, "BufferedCallCache: call cache set error";
"contract_address" => format!("{:?}", call.address),
"error" => e.to_string())
}
Ok(())
}
}