Skip to content

Commit d67034c

Browse files
authored
Merge branch 'main' into simulator-refactor
2 parents c69637e + 8a30f89 commit d67034c

3 files changed

Lines changed: 93 additions & 46 deletions

File tree

crates/autopilot/src/boundary/events/settlement.rs

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
crate::{database::Postgres, domain::settlement},
2+
crate::database::Postgres,
33
alloy::{
44
primitives::Address,
55
rpc::types::{Filter, Log},
@@ -35,16 +35,14 @@ impl AlloyEventRetrieving for GPv2SettlementContract {
3535

3636
pub struct Indexer {
3737
db: Postgres,
38-
start_index: u64,
39-
settlement_observer: settlement::Observer,
38+
start_indexing_block: u64,
4039
}
4140

4241
impl Indexer {
43-
pub fn new(db: Postgres, settlement_observer: settlement::Observer, start_index: u64) -> Self {
42+
pub fn new(db: Postgres, start_indexing_block: u64) -> Self {
4443
Self {
4544
db,
46-
settlement_observer,
47-
start_index,
45+
start_indexing_block,
4846
}
4947
}
5048
}
@@ -57,7 +55,7 @@ impl EventStoring<(GPv2SettlementEvents, Log)> for Indexer {
5755
async fn last_event_block(&self) -> Result<u64> {
5856
super::read_last_block_from_db(&self.db.pool, INDEX_NAME)
5957
.await
60-
.map(|last_block| last_block.max(self.start_index))
58+
.map(|last_block| last_block.max(self.start_indexing_block))
6159
}
6260

6361
async fn persist_last_indexed_block(&mut self, latest_block: u64) -> Result<()> {
@@ -74,21 +72,13 @@ impl EventStoring<(GPv2SettlementEvents, Log)> for Indexer {
7472
crate::database::events::replace_events(&mut transaction, events, from_block).await?;
7573
database::settlements::delete(&mut transaction, from_block).await?;
7674
transaction.commit().await?;
77-
78-
self.settlement_observer
79-
.post_process_outstanding_settlement_transactions()
80-
.await;
8175
Ok(())
8276
}
8377

8478
async fn append_events(&mut self, events: Vec<(GPv2SettlementEvents, Log)>) -> Result<()> {
8579
let mut transaction = self.db.pool.begin().await?;
8680
crate::database::events::append_events(&mut transaction, events).await?;
8781
transaction.commit().await?;
88-
89-
self.settlement_observer
90-
.post_process_outstanding_settlement_transactions()
91-
.await;
9282
Ok(())
9383
}
9484
}

crates/autopilot/src/maintenance.rs

Lines changed: 80 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,19 @@ use {
33
boundary::events::settlement::{GPv2SettlementContract, Indexer},
44
database::{
55
Postgres,
6+
ethflow_events::event_retriever::EthFlowRefundRetriever,
67
onchain_order_events::{
78
OnchainOrderParser,
89
ethflow_events::{EthFlowData, EthFlowDataForDb},
910
event_retriever::CoWSwapOnchainOrdersContract,
1011
},
1112
},
13+
domain::settlement,
1214
event_updater::EventUpdater,
1315
},
1416
anyhow::Result,
1517
ethrpc::block_stream::{BlockInfo, CurrentBlockWatcher, into_stream},
16-
futures::StreamExt,
18+
futures::{FutureExt, StreamExt},
1719
prometheus::{
1820
HistogramVec,
1921
IntCounterVec,
@@ -27,11 +29,13 @@ use {
2729
},
2830
tokio::sync::watch,
2931
tokio_stream::wrappers::WatchStream,
32+
tracing::Instrument,
3033
};
3134

3235
/// Component to sync with the maintenance logic that runs in a background task.
33-
/// This allows us to run the maintenance logic ASAP but still wait for it to
34-
/// finish in a convenient manner.
36+
/// This allows us to run the maintenance logic as soon as we see a new block
37+
/// while still making the autopilot run loop only wait for updates that are
38+
/// essential for building new auctions.
3539
#[derive(Clone)]
3640
pub struct MaintenanceSync {
3741
/// How long the autopilot wants to wait at most.
@@ -75,19 +79,26 @@ pub struct Maintenance {
7579
/// All indexing tasks to keep cow amms up to date.
7680
cow_amm_indexer: Vec<Arc<dyn Maintaining>>,
7781
/// Tasks to index ethflow orders that were submitted onchain.
78-
ethflow_indexer: Vec<EthflowIndexer>,
82+
ethflow_order_indexer: Vec<EthflowOrderIndexer>,
83+
/// Tasks to index ethflow refunds.
84+
ethflow_refund_indexer: Vec<EthflowRefundIndexer>,
85+
/// Component to correctly attribute a settlement to a proposed solution.
86+
settlement_observer: settlement::Observer,
7987
}
8088

8189
impl Maintenance {
8290
pub fn new(
8391
settlement_indexer: EventUpdater<Indexer, GPv2SettlementContract>,
8492
db_cleanup: Postgres,
93+
settlement_observer: settlement::Observer,
8594
) -> Self {
8695
Self {
8796
settlement_indexer,
8897
db_cleanup,
8998
cow_amm_indexer: Default::default(),
90-
ethflow_indexer: Default::default(),
99+
ethflow_order_indexer: Default::default(),
100+
ethflow_refund_indexer: Default::default(),
101+
settlement_observer,
91102
}
92103
}
93104

@@ -108,7 +119,12 @@ impl Maintenance {
108119
.next()
109120
.await
110121
.expect("block stream terminated unexpectedly");
111-
self.index_until_block(block, &sender).await;
122+
self.index_until_block(block, &sender)
123+
.instrument(tracing::info_span!(
124+
"autopilot_maintenance",
125+
block = block.number
126+
))
127+
.await;
112128
}
113129
});
114130

@@ -122,33 +138,45 @@ impl Maintenance {
122138
metrics().last_seen_block.set(block.number);
123139
let start = Instant::now();
124140

125-
if let Err(err) = self.update_inner().await {
126-
tracing::warn!(?err, block = block.number, "failed to run maintenance");
141+
if let Err(err) = self.run_essential_maintenance().await {
142+
tracing::warn!(?err, "failed to run essential maintenance");
127143
metrics().updates.with_label_values(&["error"]).inc();
128144
return;
129145
}
130146

131147
tracing::info!(
132-
block = block.number,
133148
time = ?start.elapsed(),
134-
"successfully ran maintenance task"
149+
"successfully ran essential maintenance tasks"
135150
);
136151
metrics().last_updated_block.set(block.number);
137152
metrics().updates.with_label_values(&["success"]).inc();
138153
if let Err(err) = last_processed_block.send(block.number) {
139154
tracing::warn!(?err, "nobody listening for processed blocks anymore");
140155
}
156+
157+
// only after we informed the run_loop that the essential updates are done we
158+
// kick off the optional maintenance tasks
159+
let start = Instant::now();
160+
if let Err(err) = self.run_optional_maintenance().await {
161+
tracing::warn!(?err, "failed to run optional maintenance");
162+
return;
163+
}
164+
tracing::info!(
165+
time = ?start.elapsed(),
166+
"successfully ran optional maintenance tasks"
167+
);
141168
}
142169

143-
async fn update_inner(&self) -> Result<()> {
144-
let _timer =
145-
observe::metrics::metrics().on_auction_overhead_start("autopilot", "maintenance_total");
170+
/// Runs all the maintenance tasks that are needed to ensure the next
171+
/// auction gets built using the most up-to-date information.
172+
async fn run_essential_maintenance(&self) -> Result<()> {
173+
let _timer = observe::metrics::metrics()
174+
.on_auction_overhead_start("autopilot", "maintenance_essential");
146175
tokio::try_join!(
147176
Self::timed_future(
148177
"settlement_indexer",
149178
self.settlement_indexer.run_maintenance()
150179
),
151-
Self::timed_future("db_cleanup", self.db_cleanup.run_maintenance()),
152180
Self::timed_future(
153181
"cow_amm_indexer",
154182
futures::future::try_join_all(
@@ -158,9 +186,9 @@ impl Maintenance {
158186
),
159187
),
160188
Self::timed_future(
161-
"ethflow_indexer",
189+
"ethflow_order_indexer",
162190
futures::future::try_join_all(
163-
self.ethflow_indexer
191+
self.ethflow_order_indexer
164192
.iter()
165193
.map(|indexer| indexer.run_maintenance()),
166194
),
@@ -170,10 +198,41 @@ impl Maintenance {
170198
Ok(())
171199
}
172200

201+
/// Runs all the maintenance tasks that should run eventually but are not
202+
/// very time sensitive.
203+
async fn run_optional_maintenance(&self) -> Result<()> {
204+
let _timer = observe::metrics::metrics()
205+
.on_auction_overhead_start("autopilot", "maintenance_optional");
206+
tokio::try_join!(
207+
Self::timed_future("db_cleanup", self.db_cleanup.run_maintenance()),
208+
Self::timed_future(
209+
"ethflow_refund_indexer",
210+
futures::future::try_join_all(
211+
self.ethflow_refund_indexer
212+
.iter()
213+
.map(|indexer| indexer.run_maintenance()),
214+
),
215+
),
216+
Self::timed_future(
217+
"settlement_attribution",
218+
self.settlement_observer
219+
.post_process_outstanding_settlement_transactions()
220+
.map(|_| Ok(()))
221+
)
222+
)?;
223+
224+
Ok(())
225+
}
226+
173227
/// Registers all maintenance tasks that are necessary to correctly support
174228
/// ethflow orders.
175-
pub fn add_ethflow_indexer(&mut self, ethflow_indexer: EthflowIndexer) {
176-
self.ethflow_indexer.push(ethflow_indexer);
229+
pub fn add_ethflow_indexing(
230+
&mut self,
231+
order_indexer: EthflowOrderIndexer,
232+
refund_indexer: EthflowRefundIndexer,
233+
) {
234+
self.ethflow_order_indexer.push(order_indexer);
235+
self.ethflow_refund_indexer.push(refund_indexer);
177236
}
178237

179238
/// Registers all maintenance tasks that are necessary to correctly support
@@ -194,9 +253,11 @@ impl Maintenance {
194253
}
195254
}
196255

197-
type EthflowIndexer =
256+
type EthflowOrderIndexer =
198257
EventUpdater<OnchainOrderParser<EthFlowData, EthFlowDataForDb>, CoWSwapOnchainOrdersContract>;
199258

259+
type EthflowRefundIndexer = EventUpdater<Postgres, EthFlowRefundRetriever>;
260+
200261
#[derive(prometheus_metric_storage::MetricStorage)]
201262
#[metric(subsystem = "autopilot_maintenance")]
202263
struct Metrics {

crates/autopilot/src/run.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ use {
4646
baseline_solver::BaseTokens,
4747
code_fetching::CachedCodeFetcher,
4848
http_client::HttpClientFactory,
49-
maintenance::ServiceMaintenance,
5049
order_quoting::{self, OrderQuoter},
5150
price_estimation::factory::{self, PriceEstimatorFactory},
5251
signature_validator,
@@ -426,8 +425,6 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
426425
infra::persistence::Persistence::new(args.s3.into().unwrap(), Arc::new(db_write.clone()))
427426
.instrument(info_span!("persistence_init"))
428427
.await;
429-
let settlement_observer =
430-
crate::domain::settlement::Observer::new(eth.clone(), persistence.clone());
431428
let settlement_contract_start_index = match GPv2Settlement::deployment_block(&chain_id) {
432429
Some(block) => {
433430
tracing::debug!(block, "found settlement contract deployment");
@@ -449,7 +446,6 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
449446
),
450447
boundary::events::settlement::Indexer::new(
451448
db_write.clone(),
452-
settlement_observer,
453449
settlement_contract_start_index,
454450
),
455451
block_retriever.clone(),
@@ -568,8 +564,14 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
568564
// updated in background task
569565
let trusted_tokens =
570566
AutoUpdatingTokenList::from_configuration(market_makable_token_list_configuration).await;
567+
let settlement_observer =
568+
crate::domain::settlement::Observer::new(eth.clone(), persistence.clone());
571569

572-
let mut maintenance = Maintenance::new(settlement_event_indexer, db_write.clone());
570+
let mut maintenance = Maintenance::new(
571+
settlement_event_indexer,
572+
db_write.clone(),
573+
settlement_observer,
574+
);
573575
maintenance.add_cow_amm_indexer(&cow_amm_registry);
574576

575577
if !args.ethflow_contracts.is_empty() {
@@ -626,13 +628,7 @@ pub async fn run(args: Arguments, shutdown_controller: ShutdownController) {
626628
.await
627629
.expect("Should be able to initialize event updater. Database read issues?");
628630

629-
maintenance.add_ethflow_indexer(onchain_order_indexer);
630-
// refunds are not critical for correctness and can therefore be indexed
631-
// sporadically in a background task
632-
let service_maintainer = ServiceMaintenance::new(vec![Arc::new(refund_event_handler)]);
633-
tokio::task::spawn(
634-
service_maintainer.run_maintenance_on_new_block(eth.current_block().clone()),
635-
);
631+
maintenance.add_ethflow_indexing(onchain_order_indexer, refund_event_handler);
636632
}
637633

638634
let run_loop_config = run_loop::Config {

0 commit comments

Comments
 (0)