Skip to content

Commit 2de8dea

Browse files
authored
Merge pull request #63 from NillionNetwork/fix/sync-mint
fix: sync periodically and only mint if budget ran out
2 parents bc2577f + 9ccf0c0 commit 2de8dea

9 files changed

Lines changed: 246 additions & 128 deletions

File tree

crates/blacklight-contract-clients/src/protocol_config.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ sol!(
2727

2828
function nodeVersion() external view returns (string memory);
2929
function setNodeVersion(string calldata newVersion) external;
30+
function rewardPolicy() external view override returns (address);
3031
}
3132
);
3233

@@ -64,6 +65,11 @@ impl<P: Provider + Clone> ProtocolConfigClient<P> {
6465
Ok(self.contract.nodeVersion().call().await?)
6566
}
6667

68+
/// Returns the rewards policy address.
69+
pub async fn rewards_policy_address(&self) -> Result<Address> {
70+
Ok(self.contract.rewardPolicy().call().await?)
71+
}
72+
6773
// ------------------------------------------------------------------------
6874
// Admin Functions (owner only)
6975
// ------------------------------------------------------------------------

keeper/src/args.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ pub struct CliArgs {
3838
#[arg(long, env = "L1_EMISSIONS_CONTROLLER_ADDRESS")]
3939
pub l1_emissions_controller_address: Address,
4040

41+
/// L2 staking operators contract address.
42+
#[arg(long, env = "L2_STAKING_OPERATORS_ADDRESS")]
43+
pub l2_staking_operators_address: Address,
44+
4145
/// Private key for contract interactions
4246
#[arg(long, env = "PRIVATE_KEY")]
4347
pub private_key: String,
@@ -79,6 +83,7 @@ pub struct KeeperConfig {
7983
pub l2_heartbeat_manager_address: Address,
8084
pub l2_jailing_policy_address: Option<Address>,
8185
pub l1_emissions_controller_address: Address,
86+
pub l2_staking_operators_address: Address,
8287
pub private_key: String,
8388
pub l1_bridge_value: U256,
8489
pub lookback_blocks: u64,
@@ -96,6 +101,7 @@ impl KeeperConfig {
96101
let l1_rpc_url = args.l1_rpc_url;
97102
let l2_heartbeat_manager_address = args.l2_heartbeat_manager_address;
98103
let l1_emissions_controller_address = args.l1_emissions_controller_address;
104+
let l2_staking_operators_address = args.l2_staking_operators_address;
99105
let l2_jailing_policy_address = args.l2_jailing_policy_address;
100106
let disable_jailing = args.disable_jailing;
101107
let private_key = args.private_key;
@@ -134,6 +140,7 @@ impl KeeperConfig {
134140
l2_heartbeat_manager_address,
135141
l2_jailing_policy_address,
136142
l1_emissions_controller_address,
143+
l2_staking_operators_address,
137144
private_key,
138145
l1_bridge_value,
139146
lookback_blocks,

keeper/src/clients.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ use alloy::{
55
providers::{DynProvider, Provider, ProviderBuilder, WsConnect},
66
signers::local::PrivateKeySigner,
77
};
8-
use blacklight_contract_clients::HearbeatManager;
8+
use blacklight_contract_clients::{HearbeatManager, StakingOperators};
99

1010
pub type HeartbeatManagerInstance = HearbeatManager::HearbeatManagerInstance<DynProvider>;
11+
pub type StakingOperatorsInstance = StakingOperators::StakingOperatorsInstance<DynProvider>;
1112
pub type JailingPolicyInstance = JailingPolicy::JailingPolicyInstance<DynProvider>;
1213
pub type EmissionsControllerInstance =
1314
EmissionsController::EmissionsControllerInstance<DynProvider>;
@@ -39,6 +40,7 @@ async fn connect_ws(
3940
/// WebSocket-based client for L2 keeper duties (heartbeat rounds + jailing)
4041
pub struct L2KeeperClient {
4142
heartbeat_manager: HeartbeatManagerInstance,
43+
staking_operators: StakingOperatorsInstance,
4244
jailing_policy: Option<JailingPolicyInstance>,
4345
provider: DynProvider,
4446
wallet: EthereumWallet,
@@ -48,17 +50,21 @@ impl L2KeeperClient {
4850
pub async fn new(
4951
rpc_url: String,
5052
heartbeat_manager_address: Address,
53+
staking_operators_address: Address,
5154
jailing_policy_address: Option<Address>,
5255
private_key: String,
5356
) -> anyhow::Result<Self> {
5457
let (provider, wallet) = connect_ws(&rpc_url, &private_key).await?;
5558
let heartbeat_manager =
5659
HeartbeatManagerInstance::new(heartbeat_manager_address, provider.clone());
60+
let staking_operators =
61+
StakingOperatorsInstance::new(staking_operators_address, provider.clone());
5762
let jailing_policy =
5863
jailing_policy_address.map(|addr| JailingPolicyInstance::new(addr, provider.clone()));
5964

6065
Ok(Self {
6166
heartbeat_manager,
67+
staking_operators,
6268
jailing_policy,
6369
provider,
6470
wallet,
@@ -69,6 +75,10 @@ impl L2KeeperClient {
6975
&self.heartbeat_manager
7076
}
7177

78+
pub fn staking_operators(&self) -> &StakingOperatorsInstance {
79+
&self.staking_operators
80+
}
81+
7282
pub fn jailing_policy(&self) -> Option<&JailingPolicyInstance> {
7383
self.jailing_policy.as_ref()
7484
}

keeper/src/l1.rs

Lines changed: 105 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,35 @@
1-
use crate::{args::KeeperConfig, clients::L1EmissionsClient, metrics};
2-
use alloy::{
3-
eips::BlockNumberOrTag,
4-
primitives::{U256, utils::format_ether},
5-
providers::Provider,
1+
use crate::{
2+
args::KeeperConfig,
3+
clients::{L1EmissionsClient, L2KeeperClient, RewardPolicyInstance},
4+
metrics,
65
};
6+
use alloy::{eips::BlockNumberOrTag, primitives::U256, providers::Provider};
77
use anyhow::{Context, Result, bail};
8+
use blacklight_contract_clients::ProtocolConfig::ProtocolConfigInstance;
9+
use std::{sync::Arc, time::Duration};
810
use tokio::time::interval;
9-
use tracing::{debug, error, info};
11+
use tracing::{debug, error, info, warn};
1012

11-
pub struct L1Supervisor {
12-
client: L1EmissionsClient,
13+
pub struct EmissionsSupervisor {
14+
l1_client: L1EmissionsClient,
15+
l2_client: Arc<L2KeeperClient>,
1316
config: KeeperConfig,
1417
}
1518

16-
impl L1Supervisor {
17-
pub async fn new(config: KeeperConfig) -> Result<Self> {
18-
let client = L1EmissionsClient::new(
19+
impl EmissionsSupervisor {
20+
pub async fn new(config: KeeperConfig, l2_client: Arc<L2KeeperClient>) -> Result<Self> {
21+
let l1_client = L1EmissionsClient::new(
1922
config.l1_rpc_url.clone(),
2023
config.l1_emissions_controller_address,
2124
config.private_key.clone(),
2225
)
2326
.await
2427
.context("Failed to create L1 client")?;
25-
Ok(Self { client, config })
28+
Ok(Self {
29+
l1_client,
30+
l2_client,
31+
config,
32+
})
2633
}
2734

2835
pub fn spawn(self) {
@@ -31,6 +38,9 @@ impl L1Supervisor {
3138
}
3239

3340
async fn run(self) {
41+
// Start by publishing this so we don't have to wait for an epoch to export it.
42+
self.publish_balance_metric().await;
43+
3444
let mut ticker = interval(self.config.emissions_interval);
3545
loop {
3646
ticker.tick().await;
@@ -44,57 +54,115 @@ impl L1Supervisor {
4454
}
4555

4656
async fn try_process_emissions(&self) -> anyhow::Result<()> {
47-
let emissions = self.client.emissions();
57+
if !self.is_next_epoch_ready().await? {
58+
debug!("Next epoch is not ready yet");
59+
return Ok(());
60+
}
61+
if !self.is_l2_budget_depleted().await? {
62+
metrics::get().l1.epochs.set_blocked(true);
63+
warn!("Next epoch is ready but budget is not depleted yet");
64+
return Ok(());
65+
}
66+
metrics::get().l1.epochs.set_blocked(false);
67+
68+
let emissions = self.l1_client.emissions();
69+
info!("Epoch ready for minting, making sure spendable budget is 0");
70+
71+
info!("Minting and bridging next emission epoch");
72+
73+
let call = emissions
74+
.mintAndBridgeNextEpoch()
75+
.value(self.config.l1_bridge_value);
76+
match call.send().await {
77+
Ok(pending) => {
78+
let receipt = pending.get_receipt().await?;
79+
let tx_hash = receipt.transaction_hash;
80+
self.publish_balance_metric().await;
81+
82+
info!("Emission minted and bridged on tx {tx_hash}");
83+
Ok(())
84+
}
85+
Err(e) => {
86+
bail!("Failed to mint tokens: {e}")
87+
}
88+
}
89+
}
90+
91+
async fn is_next_epoch_ready(&self) -> anyhow::Result<bool> {
92+
let emissions = self.l1_client.emissions();
4893
let minted_epochs = emissions.mintedEpochs().call().await?;
4994
let total_epochs = emissions.epochs().call().await?;
5095
metrics::get().l1.epochs.set_total(total_epochs);
5196
metrics::get().l1.epochs.set_minted(minted_epochs);
5297

5398
if minted_epochs >= total_epochs {
54-
return Ok(());
99+
return Ok(false);
55100
}
56101

57102
let ready_at = emissions.nextEpochReadyAt().call().await?;
58103
let latest = self
59-
.client
104+
.l1_client
60105
.provider()
61106
.get_block_by_number(BlockNumberOrTag::Latest)
62107
.await?
63108
.ok_or_else(|| anyhow::anyhow!("Missing latest block"))?;
64109
let now = U256::from(latest.header.timestamp);
65110

66111
if now < ready_at {
67-
debug!(
112+
let missing = ready_at.saturating_sub(now);
113+
let missing = match u64::try_from(missing) {
114+
Ok(missing) => Duration::from_secs(missing),
115+
Err(_) => Duration::MAX,
116+
};
117+
info!(
68118
ready_at = ?ready_at,
69119
now = ?now,
70-
"Next emission not ready"
120+
"Next emission will be ready in {missing:?}"
71121
);
72-
return Ok(());
122+
return Ok(false);
73123
}
124+
Ok(true)
125+
}
74126

75-
info!(
76-
minted_epochs = ?minted_epochs,
77-
total_epochs = ?total_epochs,
78-
"Minting and bridging next emission epoch"
79-
);
127+
async fn is_l2_budget_depleted(&self) -> anyhow::Result<bool> {
128+
let protocol_config_address = self
129+
.l2_client
130+
.staking_operators()
131+
.protocolConfig()
132+
.call()
133+
.await
134+
.context("Failed to get protocol config address")?;
135+
let protocol_config =
136+
ProtocolConfigInstance::new(protocol_config_address, self.l2_client.provider());
137+
let reward_policy_address = protocol_config
138+
.rewardPolicy()
139+
.call()
140+
.await
141+
.context("Failed to get reward policy contract address")?;
142+
let reward_policy =
143+
RewardPolicyInstance::new(reward_policy_address, self.l2_client.provider());
144+
let spendable_budget = reward_policy
145+
.spendableBudget()
146+
.call()
147+
.await
148+
.context("Failed to get spendable budget")?;
149+
let remaining = reward_policy
150+
.streamRemaining()
151+
.call()
152+
.await
153+
.context("Failed to get stream remaining")?;
154+
let budget = spendable_budget.saturating_add(remaining);
155+
Ok(budget == U256::ZERO)
156+
}
80157

81-
let call = emissions
82-
.mintAndBridgeNextEpoch()
83-
.value(self.config.l1_bridge_value);
84-
match call.send().await {
85-
Ok(pending) => {
86-
let receipt = pending.get_receipt().await?;
87-
let tx_hash = receipt.transaction_hash;
88-
let balance = self.client.get_balance().await?;
158+
async fn publish_balance_metric(&self) {
159+
match self.l1_client.get_balance().await {
160+
Ok(balance) => {
89161
metrics::get().l1.eth.set_funds(balance);
90-
91-
let balance = format_ether(balance);
92-
info!("Emission minted and bridged on tx {tx_hash}, have {balance} ETH left");
93-
Ok(())
94162
}
95163
Err(e) => {
96-
bail!("Failed to mint tokens: {e}")
164+
error!("Failed to fetch balance: {e}");
97165
}
98-
}
166+
};
99167
}
100168
}

keeper/src/l2/mod.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,26 +36,18 @@ struct RoundInfoView {
3636

3737
#[derive(Debug, Clone)]
3838
struct RewardPolicyCache {
39-
token_address: Option<Address>,
40-
token_decimals: Option<u8>,
4139
last_checked_at: Option<u64>,
4240
last_budget: Option<U256>,
4341
last_remaining: Option<U256>,
44-
last_accounted: Option<U256>,
45-
last_balance: Option<U256>,
4642
last_sync_attempt_at: Option<u64>,
4743
}
4844

4945
impl RewardPolicyCache {
5046
fn new() -> Self {
5147
Self {
52-
token_address: None,
53-
token_decimals: None,
5448
last_checked_at: None,
5549
last_budget: None,
5650
last_remaining: None,
57-
last_accounted: None,
58-
last_balance: None,
5951
last_sync_attempt_at: None,
6052
}
6153
}

0 commit comments

Comments
 (0)