diff --git a/.github/workflows/metrics-contract.yml b/.github/workflows/metrics-contract.yml new file mode 100644 index 0000000000..01c2a34bf7 --- /dev/null +++ b/.github/workflows/metrics-contract.yml @@ -0,0 +1,49 @@ +name: metrics-contract + +permissions: + contents: read + +on: + workflow_dispatch: + push: + branches: [main] + paths: + - "forester/metrics-contract.json" + - "forester/src/metrics.rs" + - "forester/tests/metrics_contract_test.rs" + - ".github/workflows/metrics-contract.yml" + pull_request: + branches: [main] + paths: + - "forester/metrics-contract.json" + - "forester/src/metrics.rs" + - "forester/tests/metrics_contract_test.rs" + - ".github/workflows/metrics-contract.yml" + types: [opened, synchronize, reopened, ready_for_review] + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + check: + name: Verify metrics contract + runs-on: ubuntu-latest + timeout-minutes: 20 + steps: + - uses: actions/checkout@v6 + with: + submodules: true + + - name: Install system dependencies + run: | + sudo apt-get update + sudo apt-get install -y libudev-dev pkg-config build-essential protobuf-compiler + + - name: Setup Rust toolchain + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + cache-workspaces: . + + - name: Run metrics contract test + run: cargo test --test metrics_contract_test -p forester diff --git a/external/photon b/external/photon index a252934447..01cae51ca5 160000 --- a/external/photon +++ b/external/photon @@ -1 +1 @@ -Subproject commit a2529344477d889b6c23c121f915a91f3678d861 +Subproject commit 01cae51ca58935eb46571094ce7abd0ea252c344 diff --git a/forester-utils/src/rpc_pool.rs b/forester-utils/src/rpc_pool.rs index 145f4942ca..f5a8d07625 100644 --- a/forester-utils/src/rpc_pool.rs +++ b/forester-utils/src/rpc_pool.rs @@ -1,4 +1,11 @@ -use std::{cmp::min, time::Duration}; +use std::{ + cmp::min, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, + time::Duration, +}; use async_trait::async_trait; use bb8::{Pool, PooledConnection}; @@ -6,7 +13,7 @@ use light_client::rpc::{LightClientConfig, Rpc, RpcError}; use solana_sdk::commitment_config::CommitmentConfig; use thiserror::Error; use tokio::time::sleep; -use tracing::{error, trace, warn}; +use tracing::{error, info, trace, warn}; use crate::rate_limiter::RateLimiter; @@ -24,10 +31,74 @@ pub enum PoolError { BuilderMissingField(String), } +/// Shared health state for tracking primary RPC health across all pooled connections. +/// When consecutive `is_valid()` failures cross the threshold, the pool flips to +/// "fallback mode" — new connections try the fallback URL first, and stale primary +/// connections are eagerly dropped by `has_broken()`. +pub struct HealthState { + use_fallback: AtomicBool, + consecutive_failures: AtomicU64, + failure_threshold: u64, + primary_url: String, +} + +impl HealthState { + pub fn new(failure_threshold: u64, primary_url: String) -> Self { + Self { + use_fallback: AtomicBool::new(false), + consecutive_failures: AtomicU64::new(0), + failure_threshold, + primary_url, + } + } + + pub fn record_failure(&self) { + let prev = self.consecutive_failures.fetch_add(1, Ordering::Relaxed); + if prev + 1 >= self.failure_threshold && !self.use_fallback.swap(true, Ordering::Release) { + warn!( + "Primary RPC health check failed {} consecutive times, switching to fallback mode", + prev + 1 + ); + } + } + + pub fn record_success(&self) { + self.consecutive_failures.store(0, Ordering::Relaxed); + } + + pub fn is_fallback_active(&self) -> bool { + self.use_fallback.load(Ordering::Acquire) + } + + pub fn recover_primary(&self) { + if self.use_fallback.swap(false, Ordering::Release) { + self.consecutive_failures.store(0, Ordering::Relaxed); + info!("Primary RPC recovered, switching back from fallback mode"); + } + } +} + +impl std::fmt::Debug for HealthState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("HealthState") + .field("use_fallback", &self.use_fallback.load(Ordering::Relaxed)) + .field( + "consecutive_failures", + &self.consecutive_failures.load(Ordering::Relaxed), + ) + .field("failure_threshold", &self.failure_threshold) + .finish() + } +} + pub struct SolanaConnectionManager { url: String, photon_url: Option, + fallback_rpc_url: Option, + fallback_photon_url: Option, commitment: CommitmentConfig, + health_state: Arc, + has_fallback: bool, // TODO: implement Rpc for SolanaConnectionManager and rate limit requests. _rpc_rate_limiter: Option, _send_tx_rate_limiter: Option, @@ -35,17 +106,26 @@ pub struct SolanaConnectionManager { } impl SolanaConnectionManager { + #[allow(clippy::too_many_arguments)] pub fn new( url: String, photon_url: Option, + fallback_rpc_url: Option, + fallback_photon_url: Option, commitment: CommitmentConfig, rpc_rate_limiter: Option, send_tx_rate_limiter: Option, + health_state: Arc, ) -> Self { + let has_fallback = fallback_rpc_url.is_some(); Self { url, photon_url, + fallback_rpc_url, + fallback_photon_url, commitment, + health_state, + has_fallback, _rpc_rate_limiter: rpc_rate_limiter, _send_tx_rate_limiter: send_tx_rate_limiter, _phantom: std::marker::PhantomData, @@ -59,22 +139,89 @@ impl bb8::ManageConnection for SolanaConnectionManager { type Error = PoolError; async fn connect(&self) -> Result { + // When in fallback mode, try fallback URL first; otherwise try primary first. + let (first_url, first_photon, second_url, second_photon) = + if self.health_state.is_fallback_active() { + ( + self.fallback_rpc_url.as_deref().unwrap_or(&self.url), + self.fallback_photon_url + .clone() + .or_else(|| self.photon_url.clone()), + Some(self.url.as_str()), + self.photon_url.clone(), + ) + } else { + ( + self.url.as_str(), + self.photon_url.clone(), + self.fallback_rpc_url.as_deref(), + self.fallback_photon_url + .clone() + .or_else(|| self.photon_url.clone()), + ) + }; + let config = LightClientConfig { - url: self.url.to_string(), - photon_url: self.photon_url.clone(), + url: first_url.to_string(), + photon_url: first_photon, commitment_config: Some(self.commitment), fetch_active_tree: false, }; - Ok(R::new(config).await?) + match R::new(config).await { + Ok(conn) => Ok(conn), + Err(first_err) => { + if let Some(second) = second_url { + warn!( + "RPC {} failed ({}), trying alternate: {}", + first_url, first_err, second + ); + let fallback_config = LightClientConfig { + url: second.to_string(), + photon_url: second_photon, + commitment_config: Some(self.commitment), + fetch_active_tree: false, + }; + R::new(fallback_config).await.map_err(|second_err| { + error!( + "Both RPC endpoints failed: first={}, second={}", + first_err, second_err + ); + PoolError::ClientCreation(format!( + "first: {}, second: {}", + first_err, second_err + )) + }) + } else { + Err(PoolError::ClientCreation(first_err.to_string())) + } + } + } } async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { - conn.health().await.map_err(PoolError::RpcRequest) + conn.health().await.map_err(|e| { + // Only track failures for fallback switching when a fallback URL exists. + if self.has_fallback { + self.health_state.record_failure(); + } + PoolError::RpcRequest(e) + })?; + // Reset consecutive failure count on success. Note: this does NOT reset + // use_fallback — that is handled by the recovery probe to avoid flapping. + if self.has_fallback { + self.health_state.record_success(); + } + Ok(()) } - fn has_broken(&self, _conn: &mut Self::Connection) -> bool { - false + fn has_broken(&self, conn: &mut Self::Connection) -> bool { + if !self.has_fallback || !self.health_state.is_fallback_active() { + return false; + } + // In fallback mode: connections still pointing to the primary URL are stale. + // Tell bb8 to drop them so new connections go through connect() → fallback. + conn.get_url() == self.health_state.primary_url } } @@ -84,12 +231,20 @@ pub struct SolanaRpcPool { max_retries: u32, initial_retry_delay: Duration, max_retry_delay: Duration, + health_state: Arc, + has_fallback: bool, + primary_url: String, + primary_photon_url: Option, + commitment: CommitmentConfig, + primary_probe_interval: Duration, } #[derive(Debug)] pub struct SolanaRpcPoolBuilder { url: Option, photon_url: Option, + fallback_rpc_url: Option, + fallback_photon_url: Option, commitment: Option, max_size: u32, @@ -98,6 +253,8 @@ pub struct SolanaRpcPoolBuilder { max_retries: u32, initial_retry_delay_ms: u64, max_retry_delay_ms: u64, + failure_threshold: u64, + primary_probe_interval_secs: u64, rpc_rate_limiter: Option, send_tx_rate_limiter: Option, @@ -115,6 +272,8 @@ impl SolanaRpcPoolBuilder { Self { url: None, photon_url: None, + fallback_rpc_url: None, + fallback_photon_url: None, commitment: None, max_size: 50, connection_timeout_secs: 15, @@ -122,6 +281,8 @@ impl SolanaRpcPoolBuilder { max_retries: 3, initial_retry_delay_ms: 1000, max_retry_delay_ms: 16000, + failure_threshold: 3, + primary_probe_interval_secs: 30, rpc_rate_limiter: None, send_tx_rate_limiter: None, _phantom: std::marker::PhantomData, @@ -138,6 +299,16 @@ impl SolanaRpcPoolBuilder { self } + pub fn fallback_rpc_url(mut self, url: Option) -> Self { + self.fallback_rpc_url = url; + self + } + + pub fn fallback_photon_url(mut self, url: Option) -> Self { + self.fallback_photon_url = url; + self + } + pub fn commitment(mut self, commitment: CommitmentConfig) -> Self { self.commitment = Some(commitment); self @@ -183,6 +354,16 @@ impl SolanaRpcPoolBuilder { self } + pub fn failure_threshold(mut self, threshold: u64) -> Self { + self.failure_threshold = threshold; + self + } + + pub fn primary_probe_interval_secs(mut self, secs: u64) -> Self { + self.primary_probe_interval_secs = secs; + self + } + pub async fn build(self) -> Result, PoolError> { let url = self .url @@ -191,12 +372,18 @@ impl SolanaRpcPoolBuilder { .commitment .ok_or_else(|| PoolError::BuilderMissingField("commitment".to_string()))?; + let has_fallback = self.fallback_rpc_url.is_some(); + let health_state = Arc::new(HealthState::new(self.failure_threshold, url.clone())); + let manager = SolanaConnectionManager::new( - url, - self.photon_url, + url.clone(), + self.photon_url.clone(), + self.fallback_rpc_url, + self.fallback_photon_url, commitment, self.rpc_rate_limiter, self.send_tx_rate_limiter, + Arc::clone(&health_state), ); let pool = Pool::builder() @@ -212,11 +399,64 @@ impl SolanaRpcPoolBuilder { max_retries: self.max_retries, initial_retry_delay: Duration::from_millis(self.initial_retry_delay_ms), max_retry_delay: Duration::from_millis(self.max_retry_delay_ms), + health_state, + has_fallback, + primary_url: url, + primary_photon_url: self.photon_url, + commitment, + primary_probe_interval: Duration::from_secs(self.primary_probe_interval_secs), }) } } impl SolanaRpcPool { + /// Spawns a background task that periodically probes the primary RPC URL + /// when in fallback mode. When the primary becomes healthy again, switches + /// back automatically. Returns None if no fallback URL is configured + /// (fallback mode can never activate). + pub fn spawn_primary_recovery_probe(self: &Arc) -> Option> { + // Only meaningful if a fallback URL is configured; without one, the + // health state never flips to fallback mode so there is nothing to recover from. + if !self.has_fallback { + return None; + } + let health_state = Arc::clone(&self.health_state); + + let primary_url = self.primary_url.clone(); + let primary_photon_url = self.primary_photon_url.clone(); + let commitment = self.commitment; + let interval = self.primary_probe_interval; + + Some(tokio::spawn(async move { + loop { + sleep(interval).await; + if !health_state.is_fallback_active() { + continue; + } + // Try connecting to the primary URL. + let config = LightClientConfig { + url: primary_url.clone(), + photon_url: primary_photon_url.clone(), + commitment_config: Some(commitment), + fetch_active_tree: false, + }; + match R::new(config).await { + Ok(conn) => match conn.health().await { + Ok(()) => { + health_state.recover_primary(); + } + Err(e) => { + trace!("Primary RPC probe health check failed: {}", e); + } + }, + Err(e) => { + trace!("Primary RPC probe connection failed: {}", e); + } + } + } + })) + } + pub async fn get_connection( &self, ) -> Result>, PoolError> { diff --git a/forester/.env.example b/forester/.env.example index 81c1197919..aedeaeba16 100644 --- a/forester/.env.example +++ b/forester/.env.example @@ -22,6 +22,12 @@ export PAGERDUTY_ROUTING_KEY="your-pagerduty-key-here" export PAYER='[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64]' export DERIVATION_PUBKEY='[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32]' +# Fallback RPC (automatic failover if primary goes down) +# export FALLBACK_RPC_URL="https://api.devnet.solana.com" +# export FALLBACK_INDEXER_URL="http://localhost:8785" +# export RPC_POOL_FAILURE_THRESHOLD=3 # Health check failures before switching (default: 3) +# export RPC_POOL_PRIMARY_PROBE_INTERVAL_SECS=30 # Seconds between primary recovery probes (default: 30) + # Performance Tuning - RPC Pool export RPC_POOL_SIZE=100 export RPC_POOL_CONNECTION_TIMEOUT_SECS=15 # Connection timeout (default: 15) @@ -52,4 +58,4 @@ export SLOT_UPDATE_INTERVAL_SECONDS=10 # Slot update interval (default: 1 export TREE_DISCOVERY_INTERVAL_SECONDS=1 # Tree discovery interval (default: 1) # Processor Mode -export PROCESSOR_MODE=v1 # Options: v1, v2, mixed (default: mixed) +export PROCESSOR_MODE="all" # Options: v1, v2, all (default: all) diff --git a/forester/.gitignore b/forester/.gitignore index 71ed41a6c3..3b35c33407 100644 --- a/forester/.gitignore +++ b/forester/.gitignore @@ -6,5 +6,6 @@ logs .env.mainnet *.json !package.json +!metrics-contract.json spawn.sh spawn_devnet.sh diff --git a/forester/README.md b/forester/README.md index 9fdf82d14e..b0c6d0c458 100644 --- a/forester/README.md +++ b/forester/README.md @@ -2,9 +2,7 @@ ## Description -Forester is a service for nullifying the state and address merkle trees. -It subscribes to the nullifier queue and nullifies merkle tree leaves. - +Forester is a service that processes queued Merkle tree updates for Light Protocol's ZK Compression system on Solana. It reads pending nullifications and address insertions from queue accounts and submits batched transactions with ZK proofs to update the on-chain Merkle trees. ## Quick Start @@ -17,9 +15,9 @@ cp .env.example .env 3. Start the forester: ```bash -cargo run start +cargo run -- start # or with environment file -source .env && cargo run start +source .env && cargo run -- start ``` ## Commands @@ -28,179 +26,172 @@ source .env && cargo run start forester ``` -Available commands: -- `start` - Start the Forester service -- `status` - Check the status of various components -- `health` - Perform health checks on the system -- `help` - Print help information +| Command | Description | +|---------|-------------| +| `start` | Start the Forester service | +| `status` | Check queue and protocol status | +| `health` | Run health checks (balance, registration) | +| `dashboard` | Run a standalone API server (no processing) | ## Configuration -All configuration can be provided via command-line arguments or environment variables. Environment variables take the format `FORESTER_`. - -### Required Configuration - -| Option | Environment Variable | Description | -|--------|---------------------|-------------| -| `--rpc-url` | `FORESTER_RPC_URL` | Solana RPC endpoint URL | -| `--ws-rpc-url` | `FORESTER_WS_RPC_URL` | WebSocket RPC endpoint URL | -| `--indexer-url` | `FORESTER_INDEXER_URL` | Photon indexer URL | -| `--prover-url` | `FORESTER_PROVER_URL` | Light Protocol prover service URL | -| `--payer` | `FORESTER_PAYER` | Keypair for transaction signing (JSON array format) | -| `--derivation` | `FORESTER_DERIVATION_PUBKEY` | Derivation public key (JSON array format) | - -### Performance Configuration +All configuration is provided via CLI arguments or environment variables. There is **no prefix** on env vars (e.g. `RPC_URL`, not `FORESTER_RPC_URL`). -#### RPC Pool Settings -Optimize connection pooling for better throughput: +### Required -| Option | Default | Description | +| Option | Env Var | Description | |--------|---------|-------------| -| `--rpc-pool-size` | 10 | Number of RPC connections to maintain | -| `--rpc-pool-connection-timeout-secs` | 15 | Connection timeout in seconds | -| `--rpc-pool-idle-timeout-secs` | 300 | Idle connection timeout | +| `--rpc-url` | `RPC_URL` | Solana RPC endpoint | +| `--indexer-url` | `INDEXER_URL` | Photon indexer URL (supports `?api-key=KEY`) | +| `--payer` | `PAYER` | Keypair for signing (JSON byte array) | +| `--derivation` | `DERIVATION_PUBKEY` | Derivation pubkey (JSON byte array, 32 bytes) | -#### Transaction V1 Processing -Control transaction batching and concurrency: +### Optional Services -| Option | Default | Description | +| Option | Env Var | Description | |--------|---------|-------------| -| `--max-concurrent-sends` | 50 | Maximum concurrent transaction sends | -| `--legacy-ixs-per-tx` | 1 | Instructions per transaction (max 1 for address nullification) | -| `--transaction-max-concurrent-batches` | 20 | Maximum concurrent transaction batches | -| `--cu-limit` | 1000000 | Compute unit limit per transaction | -| `--enable-priority-fees` | false | Enable dynamic priority fee calculation | -| `--enable-compressible` | false | Enable compressible account tracking and compression (requires `--ws-rpc-url`) | +| `--ws-rpc-url` | `WS_RPC_URL` | WebSocket RPC (required for `--enable-compressible`) | +| `--prover-url` | `PROVER_URL` | ZK prover service URL | +| `--prover-append-url` | `PROVER_APPEND_URL` | Prover URL for append ops (falls back to `--prover-url`) | +| `--prover-update-url` | `PROVER_UPDATE_URL` | Prover URL for update ops (falls back to `--prover-url`) | +| `--prover-address-append-url` | `PROVER_ADDRESS_APPEND_URL` | Prover URL for address-append ops (falls back to `--prover-url`) | +| `--prover-api-key` | `PROVER_API_KEY` | API key for the prover service | +| `--photon-grpc-url` | `PHOTON_GRPC_URL` | Photon gRPC endpoint | + +### Resilience & Fallback + +When a fallback RPC URL is configured, the pool automatically switches to it if the primary becomes unhealthy, and switches back when the primary recovers. + +| Option | Env Var | Default | Description | +|--------|---------|---------|-------------| +| `--fallback-rpc-url` | `FALLBACK_RPC_URL` | | Fallback Solana RPC endpoint | +| `--fallback-indexer-url` | `FALLBACK_INDEXER_URL` | | Fallback Photon indexer URL | +| `--rpc-pool-failure-threshold` | `RPC_POOL_FAILURE_THRESHOLD` | 3 | Consecutive health check failures before switching to fallback | +| `--rpc-pool-primary-probe-interval-secs` | `RPC_POOL_PRIMARY_PROBE_INTERVAL_SECS` | 30 | Seconds between probes to check if primary has recovered | + +**How it works:** +1. The connection pool validates each connection via Solana `getHealth` before use +2. After `failure_threshold` consecutive failures, all new connections route to the fallback URL +3. Existing primary connections are eagerly dropped so they get replaced with fallback connections +4. A background probe checks the primary every `primary_probe_interval_secs` seconds and auto-recovers + +### RPC Pool + +| Option | Env Var | Default | Description | +|--------|---------|---------|-------------| +| `--rpc-pool-size` | `RPC_POOL_SIZE` | 100 | Number of pooled RPC connections | +| `--rpc-pool-connection-timeout-secs` | `RPC_POOL_CONNECTION_TIMEOUT_SECS` | 15 | Connection timeout | +| `--rpc-pool-idle-timeout-secs` | `RPC_POOL_IDLE_TIMEOUT_SECS` | 300 | Idle connection timeout | +| `--rpc-pool-max-retries` | `RPC_POOL_MAX_RETRIES` | 100 | Max retries to get a connection from the pool | +| `--rpc-pool-initial-retry-delay-ms` | `RPC_POOL_INITIAL_RETRY_DELAY_MS` | 1000 | Initial backoff delay | +| `--rpc-pool-max-retry-delay-ms` | `RPC_POOL_MAX_RETRY_DELAY_MS` | 16000 | Max backoff delay | + +### Processing + +| Option | Env Var | Default | Description | +|--------|---------|---------|-------------| +| `--processor-mode` | `PROCESSOR_MODE` | `all` | `v1`, `v2`, or `all` | +| `--queue-polling-mode` | `QUEUE_POLLING_MODE` | `indexer` | `indexer` or `onchain` | +| `--tree-id` | `TREE_IDS` | | Process only these tree pubkeys (comma-separated) | +| `--group-authority` | `GROUP_AUTHORITY` | | Only process trees owned by this authority | +| `--max-concurrent-sends` | `MAX_CONCURRENT_SENDS` | 50 | Concurrent transaction sends per batch | +| `--transaction-max-concurrent-batches` | `TRANSACTION_MAX_CONCURRENT_BATCHES` | 20 | Concurrent transaction batches | +| `--max-batches-per-tree` | `MAX_BATCHES_PER_TREE` | 4 | Max ZKP batches per tree per iteration | +| `--legacy-ixs-per-tx` | `LEGACY_IXS_PER_TX` | 1 | Instructions per V1 transaction | +| `--cu-limit` | `CU_LIMIT` | 1000000 | Compute unit limit per transaction | +| `--enable-priority-fees` | `ENABLE_PRIORITY_FEES` | false | Enable dynamic priority fees | +| `--lookup-table-address` | `LOOKUP_TABLE_ADDRESS` | | Address lookup table for versioned transactions | +| `--helius-rpc` | `HELIUS_RPC` | false | Use Helius `getProgramAccountsV2` | + +### Compressible Accounts + +| Option | Env Var | Default | Description | +|--------|---------|---------|-------------| +| `--enable-compressible` | `ENABLE_COMPRESSIBLE` | false | Enable compressible account tracking (requires `--ws-rpc-url`) | +| `--light-pda-program` | `LIGHT_PDA_PROGRAMS` | | PDA programs to track (`program_id:discriminator_base58`, comma-separated) | + +### Caching & Confirmation + +| Option | Env Var | Default | Description | +|--------|---------|---------|-------------| +| `--tx-cache-ttl-seconds` | `TX_CACHE_TTL_SECONDS` | 180 | Transaction deduplication cache TTL | +| `--ops-cache-ttl-seconds` | `OPS_CACHE_TTL_SECONDS` | 180 | Operations cache TTL | +| `--confirmation-max-attempts` | `CONFIRMATION_MAX_ATTEMPTS` | 60 | Max tx confirmation polling attempts | +| `--confirmation-poll-interval-ms` | `CONFIRMATION_POLL_INTERVAL_MS` | 500 | Confirmation polling interval | + +### Monitoring + +| Option | Env Var | Description | +|--------|---------|-------------| +| `--push-gateway-url` | `PUSH_GATEWAY_URL` | Prometheus Pushgateway URL (enables metrics) | +| `--pagerduty-routing-key` | `PAGERDUTY_ROUTING_KEY` | PagerDuty integration key | +| `--prometheus-url` | `PROMETHEUS_URL` | Prometheus server URL for dashboard queries | +| `--api-server-port` | `API_SERVER_PORT` | HTTP API server port (default: 8080) | +| `--api-server-public-bind` | `API_SERVER_PUBLIC_BIND` | Bind to 0.0.0.0 instead of 127.0.0.1 | -#### Example +### Example ```bash -cargo run start \ +cargo run -- start \ --rpc-url "$RPC_URL" \ - --ws-rpc-url "$WS_RPC_URL" \ --indexer-url "$INDEXER_URL" \ --prover-url "$PROVER_URL" \ - --payer "$FORESTER_KEYPAIR" \ - --derivation "$FORESTER_DERIVATION" \ + --payer "$PAYER" \ + --derivation "$DERIVATION_PUBKEY" \ + --fallback-rpc-url "$FALLBACK_RPC_URL" \ + --fallback-indexer-url "$FALLBACK_INDEXER_URL" \ --rpc-pool-size 100 \ - --max-concurrent-sends 500 \ + --processor-mode v2 \ + --max-concurrent-sends 200 \ --cu-limit 400000 \ --enable-priority-fees true ``` - -### Prover V2 Endpoints - -```bash ---prover-append-url "http://prover/append" ---prover-update-url "http://prover/update" ---prover-address-append-url "http://prover/address-append" ---prover-api-key "your-api-key" -``` - -#### Cache Settings -Control caching behavior: +## Status & Health ```bash ---tx-cache-ttl-seconds 180 # Transaction deduplication cache ---ops-cache-ttl-seconds 180 # Operations cache -``` - -### Environment File +# Check queue and protocol status +forester status --rpc-url [--full] [--protocol-config] [--queue] -See `.env.example` for a complete list of configuration options with example values. - -## Checking Status - -To check the status of Forester: - -```bash -forester status [OPTIONS] --rpc-url +# Health checks +forester health --rpc-url --payer --derivation \ + [--check-balance] [--check-registration] [--min-balance 0.01] ``` -### Status Options: +## Dashboard -- `--full` - Run comprehensive status checks including compressed token program tests -- `--protocol-config` - Check protocol configuration -- `--queue` - Check queue status -- `--push-gateway-url` - Monitoring push gateway URL [env: FORESTER_PUSH_GATEWAY_URL] -- `--pagerduty-routing-key` - PagerDuty integration key [env: FORESTER_PAGERDUTY_ROUTING_KEY] - -## Environment Variables - -All configuration options can be set using environment variables with the `FORESTER_` prefix. For example: +Run a standalone API server without forester processing: ```bash -export FORESTER_RPC_URL="your-rpc-url-here" +forester dashboard --rpc-url \ + [--port 8080] \ + [--prometheus-url http://prometheus:9090] \ + [--forester-api-url http://forester-a:8080,http://forester-b:8080] ``` -### Test Environment Variables - -The following environment variables are used for running the e2e_v2 tests: - -#### Test Mode - -- `TEST_MODE` - Specifies whether to run tests on local validator or devnet (values: `local` or `devnet`, default: `devnet`) - -#### Test Feature Flags - -Control which test scenarios to run (all default to `true`): +## Environment File -- `TEST_V1_STATE` - Enable/disable V1 state tree testing (`true`/`false`) -- `TEST_V2_STATE` - Enable/disable V2 state tree testing (`true`/`false`) -- `TEST_V1_ADDRESS` - Enable/disable V1 address tree testing (`true`/`false`) -- `TEST_V2_ADDRESS` - Enable/disable V2 address tree testing (`true`/`false`) +See `.env.example` for a complete example configuration. -#### Required for Devnet mode: +## Testing -- `PHOTON_RPC_URL` - Photon RPC endpoint URL -- `PHOTON_WSS_RPC_URL` - Photon WebSocket RPC endpoint URL -- `PHOTON_INDEXER_URL` - Photon indexer endpoint URL -- `PHOTON_PROVER_URL` - Photon prover endpoint URL -#### Required for both modes: +See the [main CLAUDE.md](../CLAUDE.md) for test commands. Key forester-specific tests: -- `FORESTER_KEYPAIR` - Keypair for testing (supports both base58 format and byte array format like `[1,2,3,...]`) - -#### Example configurations: - -**Local validator mode with all tests:** -```bash -export TEST_MODE="local" -export FORESTER_KEYPAIR="your-base58-encoded-keypair" -# OR using byte array format: -# export FORESTER_KEYPAIR="[1,2,3,...]" -``` - -**Local validator mode with only V1 tests:** ```bash -export TEST_MODE="local" -export TEST_V1_STATE="true" -export TEST_V2_STATE="false" -export TEST_V1_ADDRESS="true" -export TEST_V2_ADDRESS="false" -export FORESTER_KEYPAIR="your-base58-encoded-keypair" -``` +# E2E test (requires local validator) +TEST_MODE=local cargo test --package forester e2e_test -- --nocapture -**Devnet mode with only V2 tests:** -```bash -export TEST_MODE="devnet" -export TEST_V1_STATE="false" -export TEST_V2_STATE="true" -export TEST_V1_ADDRESS="false" -export TEST_V2_ADDRESS="true" -export PHOTON_RPC_URL="https://devnet.helius-rpc.com/?api-key=your-key" -export PHOTON_WSS_RPC_URL="wss://devnet.helius-rpc.com/?api-key=your-key" -export PHOTON_INDEXER_URL="https://devnet.helius-rpc.com?api-key=your-key" -export PHOTON_PROVER_URL="https://devnet.helius-rpc.com" -export FORESTER_KEYPAIR="your-base58-encoded-keypair" +# Metrics contract test +cargo test -p forester --test metrics_contract_test -- --nocapture ``` -When running in local mode, the test will: -- Spawn a local validator -- Start a local prover service -- Use predefined local URLs (localhost:8899 for RPC, localhost:8784 for indexer, etc.) +### Test Environment Variables -The test will automatically: -- Skip minting tokens for disabled test types -- Skip executing transactions for disabled test types -- Skip root verification for disabled test types +| Variable | Description | +|----------|-------------| +| `TEST_MODE` | `local` or `devnet` (default: `devnet`) | +| `TEST_V1_STATE` | Enable V1 state tree tests (default: `true`) | +| `TEST_V2_STATE` | Enable V2 state tree tests (default: `true`) | +| `TEST_V1_ADDRESS` | Enable V1 address tree tests (default: `true`) | +| `TEST_V2_ADDRESS` | Enable V2 address tree tests (default: `true`) | +| `FORESTER_KEYPAIR` | Test keypair (base58 or byte array format) | diff --git a/forester/dashboard/.gitignore b/forester/dashboard/.gitignore index ded1a3f3a0..7f0f8a4dde 100644 --- a/forester/dashboard/.gitignore +++ b/forester/dashboard/.gitignore @@ -2,3 +2,4 @@ node_modules/ .next/ out/ .env.local +*.tsbuildinfo diff --git a/forester/dashboard/src/app/page.tsx b/forester/dashboard/src/app/page.tsx index a34384f919..798fcabe4f 100644 --- a/forester/dashboard/src/app/page.tsx +++ b/forester/dashboard/src/app/page.tsx @@ -66,6 +66,7 @@ export default function Dashboard() { warnings.push("No foresters registered for the active epoch"); } if ( + status.registration_is_open && status.slots_until_next_registration < 1000 && status.registration_epoch_foresters.length === 0 ) { diff --git a/forester/dashboard/src/components/EpochCard.tsx b/forester/dashboard/src/components/EpochCard.tsx index 58fd848583..e67e74dcec 100644 --- a/forester/dashboard/src/components/EpochCard.tsx +++ b/forester/dashboard/src/components/EpochCard.tsx @@ -42,6 +42,7 @@ function useCountdown(slots: number): string { export function EpochCard({ status }: EpochCardProps) { const registrationClosingSoon = + status.registration_is_open && status.slots_until_next_registration < 5000 && status.registration_epoch_foresters.length === 0; @@ -73,7 +74,7 @@ export function EpochCard({ status }: EpochCardProps) {
- + Registration Epoch {status.current_registration_epoch} @@ -81,9 +82,15 @@ export function EpochCard({ status }: EpochCardProps) {
- {status.registration_epoch_foresters.length} forester{status.registration_epoch_foresters.length !== 1 ? "s" : ""} registered - {registrationClosingSoon && ( - — closing soon, no foresters! + {status.registration_is_open ? ( + <> + {status.registration_epoch_foresters.length} forester{status.registration_epoch_foresters.length !== 1 ? "s" : ""} registered + {registrationClosingSoon && ( + — closing soon, no foresters! + )} + + ) : ( + Registration opens in {regCountdown} )}
diff --git a/forester/dashboard/src/components/TreeTable.tsx b/forester/dashboard/src/components/TreeTable.tsx index 2a82faaa8f..0e2d7cf006 100644 --- a/forester/dashboard/src/components/TreeTable.tsx +++ b/forester/dashboard/src/components/TreeTable.tsx @@ -124,7 +124,7 @@ export function TreeTable({ Address Fullness Index / Cap - Pending + Queue Forester Schedule @@ -177,22 +177,21 @@ export function TreeTable({ {tree.v2_queue_info ? ( - - {tree.tree_type === "StateV2" ? ( - <> - I:{tree.v2_queue_info.input_pending_batches}{" "} - O:{tree.v2_queue_info.output_pending_batches} - - ) : ( - tree.v2_queue_info.input_pending_batches - )} - + ) : ( - - {tree.queue_length != null - ? formatNumber(tree.queue_length) - : "-"} - +
+ + {tree.queue_length != null + ? formatNumber(tree.queue_length) + : "-"} + + {tree.queue_length != null && tree.queue_capacity != null && tree.queue_capacity > 0 && ( + + / {formatNumber(tree.queue_capacity)} + {" "}({formatPercentage(tree.queue_length / tree.queue_capacity * 100)}) + + )} +
)} @@ -241,6 +240,48 @@ export function TreeTable({ import { Fragment } from "react"; +function V2QueueCell({ tree }: { tree: TreeStatus }) { + const info = tree.v2_queue_info!; + const isState = tree.tree_type === "StateV2"; + + if (isState) { + return ( +
+ + I:{info.input_pending_batches} + + {info.input_total_zkp_batches > 0 && ( + + /{info.input_total_zkp_batches} + + )} + {" "} + + O:{info.output_pending_batches} + + {info.output_total_zkp_batches > 0 && ( + + /{info.output_total_zkp_batches} + + )} +
+ ); + } + + return ( +
+ + {info.input_pending_batches} + + {info.input_total_zkp_batches > 0 && ( + + /{info.input_total_zkp_batches} + + )} +
+ ); +} + // Distinct colors for up to 8 foresters; cycles if more const FORESTER_COLORS = [ "bg-emerald-400", diff --git a/forester/dashboard/src/types/forester.ts b/forester/dashboard/src/types/forester.ts index fc094aec26..0c929391be 100644 --- a/forester/dashboard/src/types/forester.ts +++ b/forester/dashboard/src/types/forester.ts @@ -15,10 +15,13 @@ export interface BatchInfo { export interface V2QueueInfo { next_index: number; pending_batch_index: number; + batch_size: number; zkp_batch_size: number; batches: BatchInfo[]; input_pending_batches: number; output_pending_batches: number; + input_total_zkp_batches: number; + output_total_zkp_batches: number; input_items_in_current_zkp_batch: number; output_items_in_current_zkp_batch: number; } @@ -34,6 +37,7 @@ export interface TreeStatus { threshold: number; is_rolledover: boolean; queue_length: number | null; + queue_capacity: number | null; v2_queue_info: V2QueueInfo | null; assigned_forester: string | null; schedule: (number | null)[]; @@ -59,6 +63,7 @@ export interface ForesterStatus { active_phase_length: number; active_epoch_progress_percentage: number; hours_until_next_epoch: number; + registration_is_open: boolean; slots_until_next_registration: number; hours_until_next_registration: number; active_epoch_foresters: ForesterInfo[]; diff --git a/forester/metrics-contract.json b/forester/metrics-contract.json new file mode 100644 index 0000000000..e8f4e312d9 --- /dev/null +++ b/forester/metrics-contract.json @@ -0,0 +1,18 @@ +{ + "description": "Forester metrics contract — single source of truth for metric names. Both the Rust forester crate and the k8s monitoring repo validate against this file.", + "metrics": [ + { "name": "queue_length", "labels": ["tree_type", "tree_pubkey"] }, + { "name": "queue_capacity", "labels": ["tree_type", "tree_pubkey"] }, + { "name": "forester_last_run_timestamp", "labels": [] }, + { "name": "forester_transactions_processed_total", "labels": ["epoch"] }, + { "name": "forester_transaction_timestamp", "labels": ["epoch"] }, + { "name": "forester_transaction_rate", "labels": ["epoch"] }, + { "name": "forester_sol_balance", "labels": ["pubkey"] }, + { "name": "registered_foresters", "labels": ["epoch", "authority"] }, + { "name": "forester_transactions_failed_total", "labels": ["reason"] }, + { "name": "forester_epoch_detected", "labels": [] }, + { "name": "forester_epoch_registered", "labels": [] }, + { "name": "forester_indexer_response_time_seconds", "labels": ["operation", "tree_type"] }, + { "name": "forester_indexer_proof_count", "labels": ["tree_type", "tree_pubkey", "metric"] } + ] +} diff --git a/forester/src/api_server.rs b/forester/src/api_server.rs index 4ea4b091b4..e1032f9501 100644 --- a/forester/src/api_server.rs +++ b/forester/src/api_server.rs @@ -1173,6 +1173,13 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { .and(warp::get()) .map(move || warp::reply::json(&*metrics_rx_clone.borrow())); + // --- Prometheus text metrics route (scrape endpoint) --- + let prometheus_route = warp::path!("metrics").and(warp::get()).and_then(|| async { + crate::metrics::metrics_handler() + .await + .map_err(|_| warp::reject::reject()) + }); + // --- Compressible route (reads latest snapshot from watch channel) --- let compressible_rx_clone = compressible_rx.clone(); let compressible_route = warp::path("compressible") @@ -1182,6 +1189,7 @@ pub fn spawn_api_server(config: ApiServerConfig) -> ApiServerHandle { let routes = health_route .or(status_route) .or(metrics_route) + .or(prometheus_route) .or(compressible_route) .with(cors); diff --git a/forester/src/cli.rs b/forester/src/cli.rs index 41da7cf56b..01e69525ad 100644 --- a/forester/src/cli.rs +++ b/forester/src/cli.rs @@ -171,6 +171,24 @@ pub struct StartArgs { #[arg(long, env = "RPC_POOL_MAX_RETRY_DELAY_MS", default_value = "16000")] pub rpc_pool_max_retry_delay_ms: u64, + #[arg( + long, + env = "RPC_POOL_FAILURE_THRESHOLD", + default_value = "3", + value_parser = clap::value_parser!(u64).range(1..), + help = "Consecutive health check failures before switching to fallback RPC" + )] + pub rpc_pool_failure_threshold: u64, + + #[arg( + long, + env = "RPC_POOL_PRIMARY_PROBE_INTERVAL_SECS", + default_value = "30", + value_parser = clap::value_parser!(u64).range(1..), + help = "Interval in seconds between probes to check if primary RPC has recovered" + )] + pub rpc_pool_primary_probe_interval_secs: u64, + #[arg(long, env = "SLOT_UPDATE_INTERVAL_SECONDS", default_value = "10")] pub slot_update_interval_seconds: u64, @@ -291,6 +309,20 @@ pub struct StartArgs { help = "Prometheus server URL for querying metrics (e.g. http://prometheus:9090)" )] pub prometheus_url: Option, + + #[arg( + long, + env = "FALLBACK_RPC_URL", + help = "Fallback RPC URL, used when primary RPC is unreachable for new pool connections" + )] + pub fallback_rpc_url: Option, + + #[arg( + long, + env = "FALLBACK_INDEXER_URL", + help = "Fallback Photon indexer URL, used when primary indexer is unreachable" + )] + pub fallback_indexer_url: Option, } #[derive(Parser, Clone, Debug)] diff --git a/forester/src/config.rs b/forester/src/config.rs index 42eee8d2ee..dfdc0f569d 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -50,6 +50,8 @@ pub struct ExternalServicesConfig { pub rpc_rate_limit: Option, pub photon_rate_limit: Option, pub send_tx_rate_limit: Option, + pub fallback_rpc_url: Option, + pub fallback_indexer_url: Option, } #[derive(Debug, Clone, Copy)] @@ -153,6 +155,8 @@ pub struct RpcPoolConfig { pub max_retries: u32, pub initial_retry_delay_ms: u64, pub max_retry_delay_ms: u64, + pub failure_threshold: u64, + pub primary_probe_interval_secs: u64, } impl Default for QueueConfig { @@ -265,6 +269,8 @@ impl ForesterConfig { rpc_rate_limit: args.rpc_rate_limit, photon_rate_limit: args.photon_rate_limit, send_tx_rate_limit: args.send_tx_rate_limit, + fallback_rpc_url: args.fallback_rpc_url.clone(), + fallback_indexer_url: args.fallback_indexer_url.clone(), }, retry_config: RetryConfig { max_retries: args.max_retries, @@ -342,6 +348,8 @@ impl ForesterConfig { max_retries: args.rpc_pool_max_retries, initial_retry_delay_ms: args.rpc_pool_initial_retry_delay_ms, max_retry_delay_ms: args.rpc_pool_max_retry_delay_ms, + failure_threshold: args.rpc_pool_failure_threshold, + primary_probe_interval_secs: args.rpc_pool_primary_probe_interval_secs, }, registry_pubkey: Pubkey::from_str(®istry_pubkey).map_err(|e| { ConfigError::InvalidArguments { @@ -423,6 +431,8 @@ impl ForesterConfig { rpc_rate_limit: None, photon_rate_limit: None, send_tx_rate_limit: None, + fallback_rpc_url: None, + fallback_indexer_url: None, }, retry_config: RetryConfig::default(), queue_config: QueueConfig::default(), @@ -441,6 +451,8 @@ impl ForesterConfig { max_retries: 10, initial_retry_delay_ms: 1000, max_retry_delay_ms: 16000, + failure_threshold: 3, + primary_probe_interval_secs: 30, }, registry_pubkey: Pubkey::default(), payer_keypair: Keypair::new(), diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 6a18e2a059..77a30bdc61 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -54,7 +54,10 @@ use crate::{ ChannelError, ForesterError, InitializationError, RegistrationError, WorkReportError, }, logging::{should_emit_rate_limited_warning, ServiceHeartbeat}, - metrics::{push_metrics, queue_metric_update, update_forester_sol_balance}, + metrics::{ + push_metrics, queue_metric_update, update_epoch_detected, update_epoch_registered, + update_forester_sol_balance, + }, pagerduty::send_pagerduty_alert, processor::{ tx_cache::ProcessedHashCache, @@ -737,158 +740,92 @@ impl EpochManager { event = "epoch_monitor_new_epoch_detected", run_id = %self.run_id, epoch = current_epoch, - "New epoch detected" + "New epoch detected; sending for processing" ); - let phases = get_epoch_phases(&self.protocol_config, current_epoch); - if slot < phases.registration.end { - debug!( - event = "epoch_monitor_send_current_epoch", + if let Err(e) = tx.send(current_epoch).await { + error!( + event = "epoch_monitor_send_current_epoch_failed", run_id = %self.run_id, epoch = current_epoch, - "Sending current epoch for processing" + error = ?e, + "Failed to send current epoch for processing; channel closed" ); - if let Err(e) = tx.send(current_epoch).await { - error!( - event = "epoch_monitor_send_current_epoch_failed", - run_id = %self.run_id, - epoch = current_epoch, - error = ?e, - "Failed to send current epoch for processing; channel closed" - ); - return Err(anyhow!("Epoch channel closed: {}", e)); - } - last_epoch = Some(current_epoch); + return Err(anyhow!("Epoch channel closed: {}", e)); } + last_epoch = Some(current_epoch); } - // Find the next epoch we can register for (scan forward if needed) - let mut target_epoch = current_epoch + 1; + // Find the next epoch to process + let target_epoch = current_epoch + 1; if last_epoch.is_none_or(|last| target_epoch > last) { - // Scan forward to find an epoch whose registration is still open - // This handles the case where we missed multiple epochs - loop { - let target_phases = get_epoch_phases(&self.protocol_config, target_epoch); - - // If registration hasn't started yet, wait for it - if slot < target_phases.registration.start { - let mut rpc = match self.rpc_pool.get_connection().await { - Ok(rpc) => rpc, - Err(e) => { - warn!( - event = "epoch_monitor_wait_rpc_connection_failed", - run_id = %self.run_id, - target_epoch, - error = ?e, - "Failed to get RPC connection while waiting for registration slot" - ); - tokio::time::sleep(Duration::from_secs(1)).await; - break; - } - }; - - const REGISTRATION_BUFFER_SLOTS: u64 = 30; - let wait_target = target_phases - .registration - .start - .saturating_sub(REGISTRATION_BUFFER_SLOTS); - let slots_to_wait = wait_target.saturating_sub(slot); - - debug!( - event = "epoch_monitor_wait_for_registration", - run_id = %self.run_id, - target_epoch, - current_slot = slot, - wait_target_slot = wait_target, - registration_start_slot = target_phases.registration.start, - slots_to_wait, - "Waiting for target epoch registration phase" - ); + let target_phases = get_epoch_phases(&self.protocol_config, target_epoch); - if let Err(e) = - wait_until_slot_reached(&mut *rpc, &self.slot_tracker, wait_target) - .await - { - error!( - event = "epoch_monitor_wait_for_registration_failed", + // If registration hasn't started yet, wait for it + if slot < target_phases.registration.start { + let mut rpc = match self.rpc_pool.get_connection().await { + Ok(rpc) => rpc, + Err(e) => { + warn!( + event = "epoch_monitor_wait_rpc_connection_failed", run_id = %self.run_id, target_epoch, error = ?e, - "Error waiting for registration phase" - ); - break; - } - - let current_slot = self.slot_tracker.estimated_current_slot(); - if current_slot >= target_phases.registration.end { - debug!( - event = "epoch_monitor_registration_ended_while_waiting", - run_id = %self.run_id, - target_epoch, - current_slot, - registration_end_slot = target_phases.registration.end, - "Target epoch registration ended while waiting; trying next epoch" + "Failed to get RPC connection while waiting for registration slot" ); - target_epoch += 1; + tokio::time::sleep(Duration::from_secs(1)).await; continue; } + }; - debug!( - event = "epoch_monitor_send_target_epoch_after_wait", - run_id = %self.run_id, - target_epoch, - current_slot, - registration_end_slot = target_phases.registration.end, - "Target epoch registration phase ready; sending for processing" - ); - if let Err(e) = tx.send(target_epoch).await { - error!( - event = "epoch_monitor_send_target_epoch_failed", - run_id = %self.run_id, - target_epoch, - error = ?e, - "Failed to send target epoch for processing" - ); - break; - } - last_epoch = Some(target_epoch); - break; - } + const REGISTRATION_BUFFER_SLOTS: u64 = 30; + let wait_target = target_phases + .registration + .start + .saturating_sub(REGISTRATION_BUFFER_SLOTS); + let slots_to_wait = wait_target.saturating_sub(slot); - // If we're within the registration window, send it - if slot < target_phases.registration.end { - debug!( - event = "epoch_monitor_send_target_epoch_window_open", + debug!( + event = "epoch_monitor_wait_for_registration", + run_id = %self.run_id, + target_epoch, + current_slot = slot, + wait_target_slot = wait_target, + registration_start_slot = target_phases.registration.start, + slots_to_wait, + "Waiting for target epoch registration phase" + ); + + if let Err(e) = + wait_until_slot_reached(&mut *rpc, &self.slot_tracker, wait_target).await + { + error!( + event = "epoch_monitor_wait_for_registration_failed", run_id = %self.run_id, target_epoch, - slot, - registration_end_slot = target_phases.registration.end, - "Target epoch registration window is open; sending for processing" + error = ?e, + "Error waiting for registration phase" ); - if let Err(e) = tx.send(target_epoch).await { - error!( - event = "epoch_monitor_send_target_epoch_failed", - run_id = %self.run_id, - target_epoch, - error = ?e, - "Failed to send target epoch for processing" - ); - break; - } - last_epoch = Some(target_epoch); - break; + continue; } + } - // Registration already ended, try next epoch - debug!( - event = "epoch_monitor_target_epoch_registration_closed", + debug!( + event = "epoch_monitor_send_target_epoch", + run_id = %self.run_id, + target_epoch, + "Sending target epoch for processing" + ); + if let Err(e) = tx.send(target_epoch).await { + error!( + event = "epoch_monitor_send_target_epoch_failed", run_id = %self.run_id, target_epoch, - slot, - registration_end_slot = target_phases.registration.end, - "Target epoch registration already ended; checking next epoch" + error = ?e, + "Failed to send target epoch for processing; channel closed" ); - target_epoch += 1; + return Err(anyhow!("Epoch channel closed: {}", e)); } + last_epoch = Some(target_epoch); continue; // Re-check state after processing } else { // we've already sent the next epoch, wait a bit before checking again @@ -966,70 +903,17 @@ impl EpochManager { } } - // Only process current epoch if we can still register or are already registered - // If registration has ended and we haven't registered, skip it to avoid errors - if slot < current_phases.registration.end { - debug!( - "Processing current epoch: {} (registration still open)", - current_epoch + // Always process the current epoch (registration is allowed at any time) + debug!("Processing current epoch: {}", current_epoch); + if let Err(e) = tx.send(current_epoch).await { + error!( + event = "initial_epoch_send_current_failed", + run_id = %self.run_id, + epoch = current_epoch, + error = ?e, + "Failed to send current epoch for processing" ); - if let Err(e) = tx.send(current_epoch).await { - error!( - event = "initial_epoch_send_current_failed", - run_id = %self.run_id, - epoch = current_epoch, - error = ?e, - "Failed to send current epoch for processing" - ); - return Ok(()); // Channel closed, exit gracefully - } - } else { - // Check if we're already registered for this epoch - let forester_epoch_pda_pubkey = get_forester_epoch_pda_from_authority( - &self.config.derivation_pubkey, - current_epoch, - ) - .0; - match self.rpc_pool.get_connection().await { - Ok(rpc) => { - if let Ok(Some(_)) = rpc - .get_anchor_account::(&forester_epoch_pda_pubkey) - .await - { - debug!( - "Processing current epoch: {} (already registered)", - current_epoch - ); - if let Err(e) = tx.send(current_epoch).await { - error!( - event = "initial_epoch_send_current_registered_failed", - run_id = %self.run_id, - epoch = current_epoch, - error = ?e, - "Failed to send current epoch for processing" - ); - return Ok(()); // Channel closed, exit gracefully - } - } else { - info!( - event = "skip_current_epoch_registration_closed", - run_id = %self.run_id, - epoch = current_epoch, - registration_end_slot = current_phases.registration.end, - current_slot = slot, - "Skipping current epoch because registration has ended" - ); - } - } - Err(e) => { - warn!( - event = "registration_check_rpc_failed", - run_id = %self.run_id, - error = ?e, - "Failed to get RPC connection to check registration, skipping" - ); - } - } + return Ok(()); // Channel closed, exit gracefully } debug!("Finished processing current and previous epochs"); @@ -1063,6 +947,7 @@ impl EpochManager { }); let phases = get_epoch_phases(&self.protocol_config, epoch); + update_epoch_detected(epoch); // Attempt to recover registration info debug!("Recovering registration info for epoch {}", epoch); @@ -1123,6 +1008,7 @@ impl EpochManager { } }; debug!("Recovered registration info for epoch {}", epoch); + update_epoch_registered(epoch); // Wait for the active phase registration_info = self.wait_for_active_phase(®istration_info).await?; @@ -1190,16 +1076,6 @@ impl EpochManager { let slot = rpc.get_slot().await.map_err(ForesterError::Rpc)?; let phases = get_epoch_phases(&self.protocol_config, epoch); - // Check if it's already too late to register - if slot >= phases.registration.end { - return Err(RegistrationError::RegistrationPhaseEnded { - epoch, - current_slot: slot, - registration_end: phases.registration.end, - } - .into()); - } - if slot < phases.registration.start { let slots_to_wait = phases.registration.start.saturating_sub(slot); info!( @@ -1219,30 +1095,6 @@ impl EpochManager { match self.register_for_epoch(epoch).await { Ok(registration_info) => return Ok(registration_info), Err(e) => { - if let Some(RegistrationError::RegistrationPhaseEnded { - epoch: ended_epoch, - current_slot, - registration_end, - }) = e.downcast_ref::() - { - warn!( - event = "registration_attempt_non_retryable", - run_id = %self.run_id, - epoch, - attempt = attempt + 1, - max_attempts = max_retries, - error = ?e, - "Registration phase ended; stopping retries for this epoch" - ); - return Err(ForesterError::Registration( - RegistrationError::RegistrationPhaseEnded { - epoch: *ended_epoch, - current_slot: *current_slot, - registration_end: *registration_end, - }, - )); - } - warn!( event = "registration_attempt_failed", run_id = %self.run_id, @@ -1308,7 +1160,7 @@ impl EpochManager { let slot = rpc.get_slot().await?; let phases = get_epoch_phases(&self.protocol_config, epoch); - if slot >= phases.registration.start && slot < phases.registration.end { + if slot >= phases.registration.start { let forester_epoch_pda_pubkey = get_forester_epoch_pda_from_authority(&self.config.derivation_pubkey, epoch).0; let existing_registration = rpc @@ -1391,7 +1243,7 @@ impl EpochManager { }; debug!("Registered: {:?}", registration_info); Ok(registration_info) - } else if slot < phases.registration.start { + } else { warn!( event = "registration_too_early", run_id = %self.run_id, @@ -1406,21 +1258,6 @@ impl EpochManager { registration_start: phases.registration.start, } .into()) - } else { - warn!( - event = "registration_too_late", - run_id = %self.run_id, - epoch, - current_slot = slot, - registration_end_slot = phases.registration.end, - "Too late to register for epoch" - ); - Err(RegistrationError::RegistrationPhaseEnded { - epoch, - current_slot: slot, - registration_end: phases.registration.end, - } - .into()) } } @@ -4329,6 +4166,8 @@ mod tests { send_tx_rate_limit: None, prover_polling_interval: None, prover_max_wait_time: None, + fallback_rpc_url: None, + fallback_indexer_url: None, }, retry_config: RetryConfig::default(), queue_config: Default::default(), diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index 9c4feb44f6..d133fb1af9 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -55,6 +55,7 @@ pub struct ForesterStatus { pub active_phase_length: u64, pub active_epoch_progress_percentage: f64, pub hours_until_next_epoch: u64, + pub registration_is_open: bool, pub slots_until_next_registration: u64, pub hours_until_next_registration: u64, pub active_epoch_foresters: Vec, @@ -107,6 +108,7 @@ pub struct TreeStatus { pub threshold: u64, pub is_rolledover: bool, pub queue_length: Option, + pub queue_capacity: Option, pub v2_queue_info: Option, /// Currently assigned forester for this tree (in current light slot) pub assigned_forester: Option, @@ -154,22 +156,48 @@ pub async fn get_forester_status_with_options( .context("No ProtocolConfigPda found in registry program accounts")?; let current_active_epoch = protocol_config_pda.config.get_current_active_epoch(slot)?; - let current_registration_epoch = protocol_config_pda.config.get_latest_register_epoch(slot)?; + let latest_register_epoch = protocol_config_pda.config.get_latest_register_epoch(slot)?; let active_epoch_progress = protocol_config_pda .config .get_current_active_epoch_progress(slot); let active_phase_length = protocol_config_pda.config.active_phase_length; + let registration_phase_length = protocol_config_pda.config.registration_phase_length; let active_epoch_progress_percentage = active_epoch_progress as f64 / active_phase_length as f64 * 100f64; let hours_until_next_epoch = active_phase_length.saturating_sub(active_epoch_progress) * 460 / 1000 / 3600; - let slots_until_next_registration = protocol_config_pda + // Determine if registration is currently open + let registration_is_open = protocol_config_pda .config - .registration_phase_length - .saturating_sub(active_epoch_progress); + .is_registration_phase(slot) + .is_ok(); + + // If registration is closed, show the next epoch as the registration target + let current_registration_epoch = if registration_is_open { + latest_register_epoch + } else { + latest_register_epoch + 1 + }; + + let slots_until_next_registration = if registration_is_open { + // Slots until current registration closes + let epoch_progress = protocol_config_pda + .config + .get_latest_register_epoch_progress(slot) + .unwrap_or(0); + registration_phase_length.saturating_sub(epoch_progress) + } else { + // Slots until next epoch's registration opens + active_phase_length.saturating_sub( + protocol_config_pda + .config + .get_latest_register_epoch_progress(slot) + .unwrap_or(0), + ) + }; let hours_until_next_registration = slots_until_next_registration * 460 / 1000 / 3600; // Collect forester authorities for both epochs @@ -230,6 +258,7 @@ pub async fn get_forester_status_with_options( active_phase_length, active_epoch_progress_percentage, hours_until_next_epoch, + registration_is_open, slots_until_next_registration, hours_until_next_registration, active_epoch_foresters, @@ -448,6 +477,7 @@ pub async fn get_forester_status_with_options( active_phase_length, active_epoch_progress_percentage, hours_until_next_epoch, + registration_is_open, slots_until_next_registration, hours_until_next_registration, active_epoch_foresters, @@ -609,164 +639,188 @@ fn parse_tree_status( let mut merkle_account = merkle_account.ok_or_else(|| anyhow::anyhow!("Merkle tree account not found"))?; - let (fullness_percentage, next_index, capacity, height, threshold, queue_length, v2_queue_info) = - match tree.tree_type { - TreeType::StateV1 => { - let tree_account = StateMerkleTreeAccount::deserialize( - &mut &merkle_account.data[8..], - ) + let ( + fullness_percentage, + next_index, + capacity, + height, + threshold, + queue_length, + queue_capacity, + v2_queue_info, + ) = match tree.tree_type { + TreeType::StateV1 => { + let tree_account = StateMerkleTreeAccount::deserialize(&mut &merkle_account.data[8..]) .map_err(|e| anyhow::anyhow!("Failed to deserialize StateV1 metadata: {}", e))?; - let height = STATE_MERKLE_TREE_HEIGHT; - let capacity = 1u64 << height; - let threshold_val = capacity - .saturating_mul(tree_account.metadata.rollover_metadata.rollover_threshold) - / 100; - - let merkle_tree = parse_concurrent_merkle_tree_from_bytes::< - StateMerkleTreeAccount, - Poseidon, - 26, - >(&merkle_account.data) + let height = STATE_MERKLE_TREE_HEIGHT; + let capacity = 1u64 << height; + let threshold_val = capacity + .saturating_mul(tree_account.metadata.rollover_metadata.rollover_threshold) + / 100; + + let merkle_tree = + parse_concurrent_merkle_tree_from_bytes::( + &merkle_account.data, + ) .map_err(|e| anyhow::anyhow!("Failed to parse StateV1 tree: {:?}", e))?; - let next_index = merkle_tree.next_index() as u64; - let fullness = next_index as f64 / capacity as f64 * 100.0; + let next_index = merkle_tree.next_index() as u64; + let fullness = next_index as f64 / capacity as f64 * 100.0; - let queue_len = queue_account.and_then(|acc| { + let (queue_len, queue_cap) = queue_account + .map(|acc| { unsafe { parse_hash_set_from_bytes::(&acc.data) } .ok() .map(|hs| { - hs.iter() + let len = hs + .iter() .filter(|(_, cell)| cell.sequence_number.is_none()) - .count() as u64 + .count() as u64; + let cap = hs.get_capacity() as u64; + (len, cap) }) - }); - - ( - fullness, - next_index, - capacity, - height as u32, - threshold_val, - queue_len, - None, - ) - } - TreeType::AddressV1 => { - let height = ADDRESS_MERKLE_TREE_HEIGHT; - let capacity = 1u64 << height; - - let threshold_val = queue_account - .as_ref() - .and_then(|acc| QueueAccount::deserialize(&mut &acc.data[8..]).ok()) - .map(|q| { - capacity.saturating_mul(q.metadata.rollover_metadata.rollover_threshold) - / 100 - }) - .unwrap_or(0); - - let merkle_tree = parse_indexed_merkle_tree_from_bytes::< - AddressMerkleTreeAccount, - Poseidon, - usize, - 26, - 16, - >(&merkle_account.data) - .map_err(|e| anyhow::anyhow!("Failed to parse AddressV1 tree: {:?}", e))?; - - let next_index = merkle_tree - .next_index() - .saturating_sub(INDEXED_MERKLE_TREE_V1_INITIAL_LEAVES) - as u64; - let fullness = next_index as f64 / capacity as f64 * 100.0; - - let queue_len = queue_account.and_then(|acc| { + .unwrap_or((0, 0)) + }) + .map(|(l, c)| (Some(l), Some(c))) + .unwrap_or((None, None)); + + ( + fullness, + next_index, + capacity, + height as u32, + threshold_val, + queue_len, + queue_cap, + None, + ) + } + TreeType::AddressV1 => { + let height = ADDRESS_MERKLE_TREE_HEIGHT; + let capacity = 1u64 << height; + + let threshold_val = queue_account + .as_ref() + .and_then(|acc| QueueAccount::deserialize(&mut &acc.data[8..]).ok()) + .map(|q| { + capacity.saturating_mul(q.metadata.rollover_metadata.rollover_threshold) / 100 + }) + .unwrap_or(0); + + let merkle_tree = parse_indexed_merkle_tree_from_bytes::< + AddressMerkleTreeAccount, + Poseidon, + usize, + 26, + 16, + >(&merkle_account.data) + .map_err(|e| anyhow::anyhow!("Failed to parse AddressV1 tree: {:?}", e))?; + + let next_index = merkle_tree + .next_index() + .saturating_sub(INDEXED_MERKLE_TREE_V1_INITIAL_LEAVES) + as u64; + let fullness = next_index as f64 / capacity as f64 * 100.0; + + let (queue_len, queue_cap) = queue_account + .map(|acc| { unsafe { parse_hash_set_from_bytes::(&acc.data) } .ok() .map(|hs| { - hs.iter() + let len = hs + .iter() .filter(|(_, cell)| cell.sequence_number.is_none()) - .count() as u64 + .count() as u64; + let cap = hs.get_capacity() as u64; + (len, cap) }) - }); - - ( - fullness, - next_index, - capacity, - height as u32, - threshold_val, - queue_len, - None, - ) - } - TreeType::StateV2 => { - let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes( - &mut merkle_account.data, - &tree.merkle_tree.into(), - ) - .map_err(|e| anyhow::anyhow!("Failed to parse StateV2 tree: {:?}", e))?; - - let height = merkle_tree.height as u64; - let capacity = 1u64 << height; - let threshold_val = (1u64 << height) - * merkle_tree.metadata.rollover_metadata.rollover_threshold - / 100; - let next_index = merkle_tree.next_index; - let fullness = next_index as f64 / capacity as f64 * 100.0; - - let v2_info = queue_account.and_then(|mut acc| { - parse_state_v2_queue_info(&merkle_tree, &mut acc.data).ok() - }); - let queue_len = v2_info.as_ref().map(|i| { - (i.input_pending_batches + i.output_pending_batches) * i.zkp_batch_size - }); - - ( - fullness, - next_index, - capacity, - height as u32, - threshold_val, - queue_len, - v2_info, - ) - } - TreeType::AddressV2 => { - let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes( - &mut merkle_account.data, - &tree.merkle_tree.into(), - ) - .map_err(|e| anyhow::anyhow!("Failed to parse AddressV2 tree: {:?}", e))?; - - let height = merkle_tree.height as u64; - let capacity = 1u64 << height; - let threshold_val = - capacity * merkle_tree.metadata.rollover_metadata.rollover_threshold / 100; - let fullness = merkle_tree.next_index as f64 / capacity as f64 * 100.0; - - let v2_info = parse_address_v2_queue_info(&merkle_tree); - let queue_len = Some(v2_info.input_pending_batches * v2_info.zkp_batch_size); - - ( - fullness, - merkle_tree.next_index, - capacity, - height as u32, - threshold_val, - queue_len, - Some(v2_info), - ) - } - TreeType::Unknown => { - warn!( - "Encountered unknown tree type for merkle_tree={}, queue={}", - tree.merkle_tree, tree.queue - ); - (0.0, 0, 0, 0, 0, None, None) - } - }; + .unwrap_or((0, 0)) + }) + .map(|(l, c)| (Some(l), Some(c))) + .unwrap_or((None, None)); + + ( + fullness, + next_index, + capacity, + height as u32, + threshold_val, + queue_len, + queue_cap, + None, + ) + } + TreeType::StateV2 => { + let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes( + &mut merkle_account.data, + &tree.merkle_tree.into(), + ) + .map_err(|e| anyhow::anyhow!("Failed to parse StateV2 tree: {:?}", e))?; + + let height = merkle_tree.height as u64; + let capacity = 1u64 << height; + let threshold_val = + (1u64 << height) * merkle_tree.metadata.rollover_metadata.rollover_threshold / 100; + let next_index = merkle_tree.next_index; + let fullness = next_index as f64 / capacity as f64 * 100.0; + + let v2_info = queue_account + .and_then(|mut acc| parse_state_v2_queue_info(&merkle_tree, &mut acc.data).ok()); + let queue_len = v2_info + .as_ref() + .map(|i| (i.input_pending_batches + i.output_pending_batches) * i.zkp_batch_size); + let queue_cap = v2_info + .as_ref() + .map(|i| i.batches.len() as u64 * i.batch_size); + + ( + fullness, + next_index, + capacity, + height as u32, + threshold_val, + queue_len, + queue_cap, + v2_info, + ) + } + TreeType::AddressV2 => { + let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes( + &mut merkle_account.data, + &tree.merkle_tree.into(), + ) + .map_err(|e| anyhow::anyhow!("Failed to parse AddressV2 tree: {:?}", e))?; + + let height = merkle_tree.height as u64; + let capacity = 1u64 << height; + let threshold_val = + capacity * merkle_tree.metadata.rollover_metadata.rollover_threshold / 100; + let fullness = merkle_tree.next_index as f64 / capacity as f64 * 100.0; + + let v2_info = parse_address_v2_queue_info(&merkle_tree); + let queue_len = Some(v2_info.input_pending_batches * v2_info.zkp_batch_size); + let queue_cap = Some(v2_info.batches.len() as u64 * v2_info.batch_size); + + ( + fullness, + merkle_tree.next_index, + capacity, + height as u32, + threshold_val, + queue_len, + queue_cap, + Some(v2_info), + ) + } + TreeType::Unknown => { + warn!( + "Encountered unknown tree type for merkle_tree={}, queue={}", + tree.merkle_tree, tree.queue + ); + (0.0, 0, 0, 0, 0, None, None, None) + } + }; Ok(TreeStatus { tree_type: tree.tree_type.to_string(), @@ -779,6 +833,7 @@ fn parse_tree_status( threshold, is_rolledover: tree.is_rolledover, queue_length, + queue_capacity, v2_queue_info, assigned_forester: None, schedule: Vec::new(), diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 7d6125ea4b..3892eb64fb 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -40,7 +40,7 @@ use tracing::debug; use crate::{ epoch_manager::{generate_run_id, run_service, WorkReport}, - metrics::QUEUE_LENGTH, + metrics::{QUEUE_CAPACITY, QUEUE_LENGTH}, processor::tx_cache::ProcessedHashCache, queue_helpers::{ fetch_queue_item_data, print_address_v2_queue_info, print_state_v2_input_queue_info, @@ -72,66 +72,74 @@ pub async fn run_queue_info( for tree_data in trees { match tree_data.tree_type { TreeType::StateV1 => { - let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0) - .await? - .len(); + let qdata = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0).await?; + let labels = [&*queue_type.to_string(), &tree_data.merkle_tree.to_string()]; QUEUE_LENGTH - .with_label_values(&[ - &*queue_type.to_string(), - &tree_data.merkle_tree.to_string(), - ]) - .set(queue_length as i64); + .with_label_values(&labels) + .set(qdata.total_pending as i64); + QUEUE_CAPACITY + .with_label_values(&labels) + .set(qdata.capacity as i64); println!( - "{:?} queue {} length: {}", - queue_type, tree_data.queue, queue_length + "{:?} queue {} length: {} capacity: {}", + queue_type, tree_data.queue, qdata.total_pending, qdata.capacity ); } TreeType::AddressV1 => { - let queue_length = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0) - .await? - .len(); + let qdata = fetch_queue_item_data(&mut rpc, &tree_data.queue, 0).await?; + let labels = [&*queue_type.to_string(), &tree_data.merkle_tree.to_string()]; QUEUE_LENGTH - .with_label_values(&[ - &*queue_type.to_string(), - &tree_data.merkle_tree.to_string(), - ]) - .set(queue_length as i64); + .with_label_values(&labels) + .set(qdata.total_pending as i64); + QUEUE_CAPACITY + .with_label_values(&labels) + .set(qdata.capacity as i64); println!( - "{:?} queue {} length: {}", - queue_type, tree_data.queue, queue_length + "{:?} queue {} length: {} capacity: {}", + queue_type, tree_data.queue, qdata.total_pending, qdata.capacity ); } TreeType::StateV2 => { println!("\n=== StateV2 {} ===", tree_data.merkle_tree); println!("\n1. APPEND OPERATIONS:"); - let append_unprocessed = - print_state_v2_output_queue_info(&mut rpc, &tree_data.queue).await?; + let append = print_state_v2_output_queue_info(&mut rpc, &tree_data.queue).await?; println!("\n2. NULLIFY OPERATIONS:"); - let nullify_unprocessed = + let nullify = print_state_v2_input_queue_info(&mut rpc, &tree_data.merkle_tree).await?; println!("===========================================\n"); + let append_labels = ["StateV2.Append", &tree_data.queue.to_string()]; QUEUE_LENGTH - .with_label_values(&["StateV2.Append", &tree_data.queue.to_string()]) - .set(append_unprocessed as i64); + .with_label_values(&append_labels) + .set(append.length as i64); + QUEUE_CAPACITY + .with_label_values(&append_labels) + .set(append.capacity as i64); + let nullify_labels = ["StateV2.Nullify", &tree_data.merkle_tree.to_string()]; QUEUE_LENGTH - .with_label_values(&["StateV2.Nullify", &tree_data.merkle_tree.to_string()]) - .set(nullify_unprocessed as i64); + .with_label_values(&nullify_labels) + .set(nullify.length as i64); + QUEUE_CAPACITY + .with_label_values(&nullify_labels) + .set(nullify.capacity as i64); } TreeType::AddressV2 => { println!("\n=== AddressV2 {} ===", tree_data.merkle_tree); - let queue_length = - print_address_v2_queue_info(&mut rpc, &tree_data.merkle_tree).await?; + let qdata = print_address_v2_queue_info(&mut rpc, &tree_data.merkle_tree).await?; println!("===========================================\n"); + let labels = ["AddressV2", &tree_data.merkle_tree.to_string()]; QUEUE_LENGTH - .with_label_values(&["AddressV2", &tree_data.merkle_tree.to_string()]) - .set(queue_length as i64); + .with_label_values(&labels) + .set(qdata.length as i64); + QUEUE_CAPACITY + .with_label_values(&labels) + .set(qdata.capacity as i64); } TreeType::Unknown => { // Virtual tree type for compression, no queue to monitor @@ -398,13 +406,17 @@ pub async fn run_pipeline_with_run_id( let mut builder = SolanaRpcPoolBuilder::::default() .url(config.external_services.rpc_url.to_string()) .photon_url(config.external_services.indexer_url.clone()) + .fallback_rpc_url(config.external_services.fallback_rpc_url.clone()) + .fallback_photon_url(config.external_services.fallback_indexer_url.clone()) .commitment(CommitmentConfig::processed()) .max_size(config.rpc_pool_config.max_size) .connection_timeout_secs(config.rpc_pool_config.connection_timeout_secs) .idle_timeout_secs(config.rpc_pool_config.idle_timeout_secs) .max_retries(config.rpc_pool_config.max_retries) .initial_retry_delay_ms(config.rpc_pool_config.initial_retry_delay_ms) - .max_retry_delay_ms(config.rpc_pool_config.max_retry_delay_ms); + .max_retry_delay_ms(config.rpc_pool_config.max_retry_delay_ms) + .failure_threshold(config.rpc_pool_config.failure_threshold) + .primary_probe_interval_secs(config.rpc_pool_config.primary_probe_interval_secs); if let Some(limiter) = rpc_rate_limiter { builder = builder.rpc_rate_limiter(limiter); @@ -467,6 +479,8 @@ pub async fn run_pipeline_with_run_id( mint_tracker, } = tracker_handles; + let recovery_probe_handle = arc_pool.spawn_primary_recovery_probe(); + debug!("Starting Forester pipeline"); let result = run_service( config, @@ -484,9 +498,13 @@ pub async fn run_pipeline_with_run_id( ) .await; - // Stop the SlotTracker task to prevent panic during shutdown + // Stop background tasks to prevent panics during shutdown tracing::debug!("Stopping SlotTracker task"); slot_tracker_handle.abort(); + if let Some(handle) = recovery_probe_handle { + tracing::debug!("Stopping primary RPC recovery probe"); + handle.abort(); + } result } diff --git a/forester/src/metrics.rs b/forester/src/metrics.rs index 356ca54480..a7be12dd83 100644 --- a/forester/src/metrics.rs +++ b/forester/src/metrics.rs @@ -12,134 +12,151 @@ use tracing::{debug, error, log::trace}; use crate::Result; +macro_rules! define_metrics { + ( + $( + metric($metric_name:literal, [$($label:literal),*]) + static ref $IDENT:ident : $ty:ty = $init:expr; + )* + ) => { + lazy_static! { + $( + pub static ref $IDENT: $ty = ($init) + .unwrap_or_else(|e| { + error!("Failed to create metric {}: {:?}", stringify!($IDENT), e); + std::process::exit(1); + }); + )* + } + + /// (name, labels) for every metric — used by the contract test. + pub const METRIC_DESCRIPTORS: &[(&str, &[&str])] = &[ + $(($metric_name, &[$($label),*]),)* + ]; + + pub fn register_metrics() { + INIT.call_once(|| { + $( + if let Err(e) = REGISTRY.register(Box::new($IDENT.clone())) { + error!("Failed to register metric {}: {:?}", stringify!($IDENT), e); + } + )* + }); + } + }; +} + lazy_static! { pub static ref REGISTRY: Registry = Registry::new(); - pub static ref QUEUE_LENGTH: IntGaugeVec = IntGaugeVec::new( +} + +static INIT: Once = Once::new(); + +define_metrics! { + metric("queue_length", ["tree_type", "tree_pubkey"]) + static ref QUEUE_LENGTH: IntGaugeVec = IntGaugeVec::new( prometheus::opts!("queue_length", "Length of the queue"), &["tree_type", "tree_pubkey"] - ) - .unwrap_or_else(|e| { - error!("Failed to create metric QUEUE_LENGTH: {:?}", e); - std::process::exit(1); - }); - pub static ref LAST_RUN_TIMESTAMP: IntGauge = IntGauge::new( + ); + + metric("queue_capacity", ["tree_type", "tree_pubkey"]) + static ref QUEUE_CAPACITY: IntGaugeVec = IntGaugeVec::new( + prometheus::opts!("queue_capacity", "Maximum capacity of the queue"), + &["tree_type", "tree_pubkey"] + ); + + metric("forester_last_run_timestamp", []) + static ref LAST_RUN_TIMESTAMP: IntGauge = IntGauge::new( "forester_last_run_timestamp", "Timestamp of the last Forester run" - ) - .unwrap_or_else(|e| { - error!("Failed to create metric LAST_RUN_TIMESTAMP: {:?}", e); - std::process::exit(1); - }); - pub static ref TRANSACTIONS_PROCESSED: IntCounterVec = IntCounterVec::new( + ); + + metric("forester_transactions_processed_total", ["epoch"]) + static ref TRANSACTIONS_PROCESSED: IntCounterVec = IntCounterVec::new( prometheus::opts!( "forester_transactions_processed_total", "Total number of transactions processed" ), &["epoch"] - ) - .unwrap_or_else(|e| { - error!("Failed to create metric TRANSACTIONS_PROCESSED: {:?}", e); - std::process::exit(1); - }); - pub static ref TRANSACTION_TIMESTAMP: GaugeVec = GaugeVec::new( + ); + + metric("forester_transaction_timestamp", ["epoch"]) + static ref TRANSACTION_TIMESTAMP: GaugeVec = GaugeVec::new( prometheus::opts!( "forester_transaction_timestamp", "Timestamp of the last processed transaction" ), &["epoch"] - ) - .unwrap_or_else(|e| { - error!("Failed to create metric TRANSACTION_TIMESTAMP: {:?}", e); - std::process::exit(1); - }); - pub static ref TRANSACTION_RATE: GaugeVec = GaugeVec::new( + ); + + metric("forester_transaction_rate", ["epoch"]) + static ref TRANSACTION_RATE: GaugeVec = GaugeVec::new( prometheus::opts!( "forester_transaction_rate", "Rate of transactions processed per second" ), &["epoch"] - ) - .unwrap_or_else(|e| { - error!("Failed to create metric TRANSACTION_RATE: {:?}", e); - std::process::exit(1); - }); - pub static ref FORESTER_SOL_BALANCE: GaugeVec = GaugeVec::new( + ); + + metric("forester_sol_balance", ["pubkey"]) + static ref FORESTER_SOL_BALANCE: GaugeVec = GaugeVec::new( prometheus::opts!( "forester_sol_balance", "Current SOL balance of the forester" ), &["pubkey"] - ) - .unwrap_or_else(|e| { - error!("Failed to create metric FORESTER_SOL_BALANCE: {:?}", e); - std::process::exit(1); - }); - pub static ref REGISTERED_FORESTERS: GaugeVec = GaugeVec::new( + ); + + metric("registered_foresters", ["epoch", "authority"]) + static ref REGISTERED_FORESTERS: GaugeVec = GaugeVec::new( prometheus::opts!("registered_foresters", "Foresters registered per epoch"), &["epoch", "authority"] - ) - .unwrap_or_else(|e| { - error!("Failed to create metric REGISTERED_FORESTERS: {:?}", e); - std::process::exit(1); - }); - pub static ref INDEXER_RESPONSE_TIME: HistogramVec = HistogramVec::new( + ); + + metric("forester_transactions_failed_total", ["reason"]) + static ref TRANSACTIONS_FAILED: IntCounterVec = IntCounterVec::new( + prometheus::opts!( + "forester_transactions_failed_total", + "Total number of failed transactions" + ), + &["reason"] + ); + + metric("forester_epoch_detected", []) + static ref EPOCH_DETECTED: IntGauge = IntGauge::new( + "forester_epoch_detected", + "Latest epoch the forester has detected and attempted to register for" + ); + + metric("forester_epoch_registered", []) + static ref EPOCH_REGISTERED: IntGauge = IntGauge::new( + "forester_epoch_registered", + "Latest epoch the forester has successfully registered for" + ); + + metric("forester_indexer_response_time_seconds", ["operation", "tree_type"]) + static ref INDEXER_RESPONSE_TIME: HistogramVec = HistogramVec::new( prometheus::HistogramOpts::new( "forester_indexer_response_time_seconds", "Response time for indexer proof requests in seconds" ) .buckets(vec![0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0]), &["operation", "tree_type"] - ) - .unwrap_or_else(|e| { - error!("Failed to create metric INDEXER_RESPONSE_TIME: {:?}", e); - std::process::exit(1); - }); - pub static ref INDEXER_PROOF_COUNT: IntCounterVec = IntCounterVec::new( + ); + + metric("forester_indexer_proof_count", ["tree_type", "tree_pubkey", "metric"]) + static ref INDEXER_PROOF_COUNT: IntCounterVec = IntCounterVec::new( prometheus::opts!( "forester_indexer_proof_count", "Number of proofs requested vs received from indexer" ), - &["tree_type", "metric"] - ) - .unwrap_or_else(|e| { - error!("Failed to create metric INDEXER_PROOF_COUNT: {:?}", e); - std::process::exit(1); - }); - static ref METRIC_UPDATES: std::sync::Mutex> = - std::sync::Mutex::new(Vec::new()); + &["tree_type", "tree_pubkey", "metric"] + ); } -static INIT: Once = Once::new(); -pub fn register_metrics() { - INIT.call_once(|| { - if let Err(e) = REGISTRY.register(Box::new(QUEUE_LENGTH.clone())) { - error!("Failed to register metric QUEUE_LENGTH: {:?}", e); - } - if let Err(e) = REGISTRY.register(Box::new(LAST_RUN_TIMESTAMP.clone())) { - error!("Failed to register metric LAST_RUN_TIMESTAMP: {:?}", e); - } - if let Err(e) = REGISTRY.register(Box::new(TRANSACTIONS_PROCESSED.clone())) { - error!("Failed to register metric TRANSACTIONS_PROCESSED: {:?}", e); - } - if let Err(e) = REGISTRY.register(Box::new(TRANSACTION_TIMESTAMP.clone())) { - error!("Failed to register metric TRANSACTION_TIMESTAMP: {:?}", e); - } - if let Err(e) = REGISTRY.register(Box::new(TRANSACTION_RATE.clone())) { - error!("Failed to register metric TRANSACTION_RATE: {:?}", e); - } - if let Err(e) = REGISTRY.register(Box::new(FORESTER_SOL_BALANCE.clone())) { - error!("Failed to register metric FORESTER_SOL_BALANCE: {:?}", e); - } - if let Err(e) = REGISTRY.register(Box::new(REGISTERED_FORESTERS.clone())) { - error!("Failed to register metric REGISTERED_FORESTERS: {:?}", e); - } - if let Err(e) = REGISTRY.register(Box::new(INDEXER_RESPONSE_TIME.clone())) { - error!("Failed to register metric INDEXER_RESPONSE_TIME: {:?}", e); - } - if let Err(e) = REGISTRY.register(Box::new(INDEXER_PROOF_COUNT.clone())) { - error!("Failed to register metric INDEXER_PROOF_COUNT: {:?}", e); - } - }); +lazy_static! { + static ref METRIC_UPDATES: std::sync::Mutex> = + std::sync::Mutex::new(Vec::new()); } pub fn update_last_run_timestamp() { @@ -210,6 +227,22 @@ pub fn update_registered_foresters(epoch: u64, authority: &str) { .set(1.0); } +pub fn increment_transactions_failed(reason: &str, count: u64) { + TRANSACTIONS_FAILED + .with_label_values(&[reason]) + .inc_by(count); +} + +pub fn update_epoch_detected(epoch: u64) { + EPOCH_DETECTED.set(epoch as i64); + debug!("Updated epoch detected: {}", epoch); +} + +pub fn update_epoch_registered(epoch: u64) { + EPOCH_REGISTERED.set(epoch as i64); + debug!("Updated epoch registered: {}", epoch); +} + pub fn update_indexer_response_time(operation: &str, tree_type: &str, duration_secs: f64) { // Ensure metrics are registered before updating (idempotent via Once) register_metrics(); @@ -222,14 +255,19 @@ pub fn update_indexer_response_time(operation: &str, tree_type: &str, duration_s ); } -pub fn update_indexer_proof_count(tree_type: &str, requested: u64, received: u64) { +pub fn update_indexer_proof_count( + tree_type: &str, + tree_pubkey: &str, + requested: u64, + received: u64, +) { // Ensure metrics are registered before updating (idempotent via Once) register_metrics(); INDEXER_PROOF_COUNT - .with_label_values(&[tree_type, "requested"]) + .with_label_values(&[tree_type, tree_pubkey, "requested"]) .inc_by(requested); INDEXER_PROOF_COUNT - .with_label_values(&[tree_type, "received"]) + .with_label_values(&[tree_type, tree_pubkey, "received"]) .inc_by(received); } @@ -392,6 +430,10 @@ pub async fn query_prometheus_metrics( pub async fn metrics_handler() -> Result { use prometheus::Encoder; + + // Flush queued metric updates so HTTP scrapes see them + process_queued_metrics(); + let encoder = TextEncoder::new(); let mut buffer = Vec::new(); diff --git a/forester/src/processor/v1/helpers.rs b/forester/src/processor/v1/helpers.rs index 5999fc0dc5..cac9e53e3e 100644 --- a/forester/src/processor/v1/helpers.rs +++ b/forester/src/processor/v1/helpers.rs @@ -214,7 +214,16 @@ pub async fn fetch_proofs_and_create_instructions( "AddressV1", total_duration.as_secs_f64(), ); - update_indexer_proof_count("AddressV1", total_addresses as u64, all_proofs.len() as u64); + let tree_pubkey_str = address_items + .first() + .map(|item| item.tree_account.merkle_tree.to_string()) + .unwrap_or_default(); + update_indexer_proof_count( + "AddressV1", + &tree_pubkey_str, + total_addresses as u64, + all_proofs.len() as u64, + ); all_proofs } else { @@ -281,8 +290,13 @@ pub async fn fetch_proofs_and_create_instructions( "StateV1", duration.as_secs_f64(), ); + let state_tree_pubkey_str = state_items + .first() + .map(|item| item.tree_account.merkle_tree.to_string()) + .unwrap_or_default(); update_indexer_proof_count( "StateV1", + &state_tree_pubkey_str, total_states as u64, proofs_received as u64, ); diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs index 3de6fb36c0..9fb015e09e 100644 --- a/forester/src/processor/v1/send_transaction.rs +++ b/forester/src/processor/v1/send_transaction.rs @@ -21,13 +21,14 @@ use solana_sdk::{ transaction::Transaction, }; use tokio::time::Instant; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; const WORK_ITEM_BATCH_SIZE: usize = 100; use crate::{ epoch_manager::WorkItem, errors::ForesterError, + metrics::increment_transactions_failed, processor::v1::{ config::SendBatchedTransactionsConfig, helpers::request_priority_fee_estimate, tx_builder::TransactionBuilder, @@ -101,6 +102,12 @@ pub async fn send_batched_transactions BLOCKHASH_REFRESH_INTERVAL { + match pool.get_connection().await { + Ok(mut rpc) => match rpc.get_latest_blockhash().await { + Ok((new_hash, new_height)) => { + recent_blockhash = new_hash; + last_valid_block_height = new_height + 150; + last_blockhash_refresh = Instant::now(); + debug!(tree = %tree_accounts.merkle_tree, "Refreshed blockhash"); + } + Err(e) => { + warn!(tree = %tree_accounts.merkle_tree, "Failed to refresh blockhash: {:?}", e); + } + }, + Err(e) => { + warn!(tree = %tree_accounts.merkle_tree, "Failed to get RPC for blockhash refresh: {:?}", e); + } + } + } + trace!(tree = %tree_accounts.merkle_tree, "Processing chunk of size {}", work_chunk.len()); let build_start_time = Instant::now(); @@ -118,8 +145,8 @@ pub async fn send_batched_transactions( error: format!("Fetch queue data failed for {}: {}", tree_id_str, e), } })? + .items }; if queue_item_data.is_empty() { @@ -339,6 +367,7 @@ async fn execute_transaction_chunk_sending( trace!(tx.signature = %sig, "Transaction confirmed sent"); } TransactionSendResult::Failure(err, sig_opt) => { + increment_transactions_failed("send_failed", 1); if let Some(sig) = sig_opt { error!(tx.signature = %sig, error = ?err, "Transaction failed to send"); } else { diff --git a/forester/src/processor/v2/common.rs b/forester/src/processor/v2/common.rs index 9917e5a9eb..4ecb5f2626 100644 --- a/forester/src/processor/v2/common.rs +++ b/forester/src/processor/v2/common.rs @@ -19,8 +19,8 @@ use tracing::{debug, error, info, warn}; use super::{errors::V2Error, proof_worker::ProofJob}; use crate::{ - errors::ForesterError, processor::tx_cache::ProcessedHashCache, slot_tracker::SlotTracker, - Result, + errors::ForesterError, metrics::increment_transactions_failed, + processor::tx_cache::ProcessedHashCache, slot_tracker::SlotTracker, Result, }; const SLOTS_STOP_THRESHOLD: u64 = 3; @@ -196,6 +196,7 @@ pub(crate) async fn send_transaction_batch( if let Some(Some(status)) = statuses.first() { if let Some(err) = &status.err { + increment_transactions_failed("execution_failed", 1); error!( "transaction {} failed for tree {}: {:?}", signature, context.merkle_tree, err @@ -233,6 +234,7 @@ pub(crate) async fn send_transaction_batch( tokio::time::sleep(poll_interval).await; } + increment_transactions_failed("timeout", 1); warn!( "Transaction {} timed out waiting for confirmation for tree {}", signature, context.merkle_tree diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs index 8a1b008677..ba80afc166 100644 --- a/forester/src/queue_helpers.rs +++ b/forester/src/queue_helpers.rs @@ -14,10 +14,13 @@ use crate::Result; pub struct V2QueueInfo { pub next_index: u64, pub pending_batch_index: u64, + pub batch_size: u64, pub zkp_batch_size: u64, pub batches: Vec, pub input_pending_batches: u64, pub output_pending_batches: u64, + pub input_total_zkp_batches: u64, + pub output_total_zkp_batches: u64, pub input_items_in_current_zkp_batch: u64, pub output_items_in_current_zkp_batch: u64, } @@ -36,6 +39,7 @@ pub struct BatchInfo { pub struct ParsedBatchData { pub batch_infos: Vec, pub total_pending_batches: u64, + pub batch_size: u64, pub zkp_batch_size: u64, pub items_in_current_zkp_batch: u64, } @@ -50,7 +54,10 @@ pub fn parse_batch_metadata( let mut batch_infos = Vec::with_capacity(batches.len()); let mut items_in_current_zkp_batch = 0u64; + let mut batch_size = 0u64; + for (batch_idx, batch) in batches.iter().enumerate() { + batch_size = batch.batch_size; zkp_batch_size = batch.zkp_batch_size; let num_inserted = batch.get_num_inserted_zkps(); let current_index = batch.get_current_zkp_batch_index(); @@ -75,6 +82,7 @@ pub fn parse_batch_metadata( ParsedBatchData { batch_infos, total_pending_batches, + batch_size, zkp_batch_size, items_in_current_zkp_batch, } @@ -92,13 +100,29 @@ pub fn parse_state_v2_queue_info( let output_parsed = parse_batch_metadata(&output_queue.batch_metadata.batches); let input_parsed = parse_batch_metadata(&merkle_tree.queue_batches.batches); + let input_total_zkp_batches = if input_parsed.zkp_batch_size > 0 { + input_parsed.batch_infos.len() as u64 + * (input_parsed.batch_size / input_parsed.zkp_batch_size) + } else { + 0 + }; + let output_total_zkp_batches = if output_parsed.zkp_batch_size > 0 { + output_parsed.batch_infos.len() as u64 + * (output_parsed.batch_size / output_parsed.zkp_batch_size) + } else { + 0 + }; + Ok(V2QueueInfo { next_index, pending_batch_index: output_queue.batch_metadata.pending_batch_index, + batch_size: output_parsed.batch_size, zkp_batch_size: output_parsed.zkp_batch_size, batches: output_parsed.batch_infos, input_pending_batches: input_parsed.total_pending_batches, output_pending_batches: output_parsed.total_pending_batches, + input_total_zkp_batches, + output_total_zkp_batches, input_items_in_current_zkp_batch: input_parsed.items_in_current_zkp_batch, output_items_in_current_zkp_batch: output_parsed.items_in_current_zkp_batch, }) @@ -108,29 +132,53 @@ pub fn parse_address_v2_queue_info(merkle_tree: &BatchedMerkleTreeAccount) -> V2 let next_index = merkle_tree.queue_batches.next_index; let parsed = parse_batch_metadata(&merkle_tree.queue_batches.batches); + let input_total_zkp_batches = if parsed.zkp_batch_size > 0 { + parsed.batch_infos.len() as u64 * (parsed.batch_size / parsed.zkp_batch_size) + } else { + 0 + }; + V2QueueInfo { next_index, pending_batch_index: merkle_tree.queue_batches.pending_batch_index, + batch_size: parsed.batch_size, zkp_batch_size: parsed.zkp_batch_size, batches: parsed.batch_infos, input_pending_batches: parsed.total_pending_batches, output_pending_batches: 0, + input_total_zkp_batches, + output_total_zkp_batches: 0, input_items_in_current_zkp_batch: parsed.items_in_current_zkp_batch, output_items_in_current_zkp_batch: 0, } } +/// Queue length and capacity (used by metrics). +#[derive(Debug, Clone)] +pub struct QueueLengthAndCapacity { + pub length: u64, + pub capacity: u64, +} + #[derive(Debug, Clone)] pub struct QueueItemData { pub hash: [u8; 32], pub index: usize, } +/// Result of fetching V1 queue data, including items and capacity. +#[derive(Debug, Clone)] +pub struct QueueFetchResult { + pub items: Vec, + pub capacity: u64, + pub total_pending: u64, +} + pub async fn fetch_queue_item_data( rpc: &mut R, queue_pubkey: &Pubkey, start_index: u16, -) -> Result> { +) -> Result { trace!("Fetching queue data for {:?}", queue_pubkey); let account = rpc.get_account(*queue_pubkey).await?; let mut account = match account { @@ -140,7 +188,11 @@ pub async fn fetch_queue_item_data( "Queue account {} not found - may have been deleted or not yet created", queue_pubkey ); - return Ok(Vec::new()); + return Ok(QueueFetchResult { + items: Vec::new(), + capacity: 0, + total_pending: 0, + }); } }; let offset = 8 + size_of::(); @@ -151,11 +203,16 @@ pub async fn fetch_queue_item_data( account.data.len(), offset ); - return Ok(Vec::new()); + return Ok(QueueFetchResult { + items: Vec::new(), + capacity: 0, + total_pending: 0, + }); } let queue: HashSet = unsafe { HashSet::from_bytes_copy(&mut account.data[offset..])? }; let end_index = queue.get_capacity(); + let capacity = end_index as u64; let all_items: Vec<(usize, [u8; 32], bool)> = queue .iter() @@ -186,13 +243,17 @@ pub async fn fetch_queue_item_data( filtered_queue.len() ); - Ok(filtered_queue) + Ok(QueueFetchResult { + items: filtered_queue, + capacity, + total_pending: total_pending as u64, + }) } pub async fn print_state_v2_output_queue_info( rpc: &mut R, output_queue_pubkey: &Pubkey, -) -> Result { +) -> Result { if let Some(mut account) = rpc.get_account(*output_queue_pubkey).await? { let output_queue = BatchedQueueAccount::output_from_bytes(account.data.as_mut_slice())?; let metadata = output_queue.get_metadata(); @@ -225,6 +286,9 @@ pub async fn print_state_v2_output_queue_info( )); } + // Capacity in items = num_batches * batch_size + let capacity = parsed.batch_infos.len() as u64 * parsed.batch_size; + println!("StateV2 {} APPEND:", output_queue_pubkey); println!(" next_index (total ever added): {}", next_index); println!( @@ -238,8 +302,8 @@ pub async fn print_state_v2_output_queue_info( ); println!(" zkp_batch_size: {}", zkp_batch_size); println!( - " SUMMARY: {} items added, {} items processed, {} items pending", - next_index, total_completed_operations, total_unprocessed + " SUMMARY: {} items added, {} items processed, {} items pending (capacity: {})", + next_index, total_completed_operations, total_unprocessed, capacity ); for detail in batch_details { println!(" {}", detail); @@ -249,7 +313,10 @@ pub async fn print_state_v2_output_queue_info( parsed.total_pending_batches ); - Ok(total_unprocessed as usize) + Ok(QueueLengthAndCapacity { + length: total_unprocessed, + capacity, + }) } else { Err(anyhow::anyhow!("account not found")) } @@ -258,7 +325,7 @@ pub async fn print_state_v2_output_queue_info( pub async fn print_state_v2_input_queue_info( rpc: &mut R, merkle_tree_pubkey: &Pubkey, -) -> Result { +) -> Result { if let Some(mut account) = rpc.get_account(*merkle_tree_pubkey).await? { let merkle_tree = BatchedMerkleTreeAccount::state_from_bytes( account.data.as_mut_slice(), @@ -292,6 +359,9 @@ pub async fn print_state_v2_input_queue_info( total_unprocessed += pending_operations_in_batch; } + // Capacity in items = num_batches * batch_size + let capacity = parsed.batch_infos.len() as u64 * parsed.batch_size; + println!("StateV2 {} NULLIFY:", merkle_tree_pubkey); println!(" next_index (total ever added): {}", next_index); println!( @@ -305,8 +375,8 @@ pub async fn print_state_v2_input_queue_info( ); println!(" zkp_batch_size: {}", parsed.zkp_batch_size); println!( - " SUMMARY: {} items added, {} items processed, {} items pending", - next_index, total_completed_operations, total_unprocessed + " SUMMARY: {} items added, {} items processed, {} items pending (capacity: {})", + next_index, total_completed_operations, total_unprocessed, capacity ); for detail in batch_details { println!(" {}", detail); @@ -316,7 +386,10 @@ pub async fn print_state_v2_input_queue_info( total_unprocessed / parsed.zkp_batch_size ); - Ok(total_unprocessed as usize) + Ok(QueueLengthAndCapacity { + length: total_unprocessed, + capacity, + }) } else { Err(anyhow::anyhow!("account not found")) } @@ -325,7 +398,7 @@ pub async fn print_state_v2_input_queue_info( pub async fn print_address_v2_queue_info( rpc: &mut R, merkle_tree_pubkey: &Pubkey, -) -> Result { +) -> Result { if let Some(mut account) = rpc.get_account(*merkle_tree_pubkey).await? { let merkle_tree = BatchedMerkleTreeAccount::address_from_bytes( account.data.as_mut_slice(), @@ -351,9 +424,15 @@ pub async fn print_address_v2_queue_info( total_unprocessed += batch_info.pending; } + // For AddressV2, total_unprocessed is in zkp batches. + // Convert to items for consistency with other queue types. + let length_items = total_unprocessed * parsed.zkp_batch_size; + // Capacity in items = num_batches * batch_size + let capacity = parsed.batch_infos.len() as u64 * parsed.batch_size; + println!("AddressV2 {}:", merkle_tree_pubkey); println!(" next_index (total ever added): {}", next_index); - println!(" total_unprocessed_items: {}", total_unprocessed); + println!(" total_unprocessed_items: {}", length_items); println!( " pending_batch_index: {}", merkle_tree.queue_batches.pending_batch_index @@ -365,7 +444,10 @@ pub async fn print_address_v2_queue_info( println!(" Total pending ADDRESS operations: {}", total_unprocessed); - Ok(total_unprocessed as usize) + Ok(QueueLengthAndCapacity { + length: length_items, + capacity, + }) } else { Err(anyhow::anyhow!("account not found")) } @@ -402,13 +484,22 @@ pub async fn get_state_v2_output_queue_info( let parsed = parse_batch_metadata(&queue.batch_metadata.batches); + let output_total_zkp_batches = if parsed.zkp_batch_size > 0 { + parsed.batch_infos.len() as u64 * (parsed.batch_size / parsed.zkp_batch_size) + } else { + 0 + }; + Ok(V2QueueInfo { next_index, pending_batch_index: queue.batch_metadata.pending_batch_index, + batch_size: parsed.batch_size, zkp_batch_size: parsed.zkp_batch_size, batches: parsed.batch_infos, input_pending_batches: 0, output_pending_batches: parsed.total_pending_batches, + input_total_zkp_batches: 0, + output_total_zkp_batches, input_items_in_current_zkp_batch: 0, output_items_in_current_zkp_batch: parsed.items_in_current_zkp_batch, }) diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index 855b765908..1727ed108b 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -213,6 +213,8 @@ async fn e2e_test() { rpc_rate_limit: None, photon_rate_limit: None, send_tx_rate_limit: None, + fallback_rpc_url: None, + fallback_indexer_url: None, }, lookup_table_address: None, retry_config: Default::default(), @@ -243,6 +245,8 @@ async fn e2e_test() { max_retries: 10, initial_retry_delay_ms: 1000, max_retry_delay_ms: 16000, + failure_threshold: 3, + primary_probe_interval_secs: 30, }, registry_pubkey: light_registry::ID, payer_keypair: env.protocol.forester.insecure_clone(), diff --git a/forester/tests/legacy/priority_fee_test.rs b/forester/tests/legacy/priority_fee_test.rs index 875d20c5fc..ab7d3e944c 100644 --- a/forester/tests/legacy/priority_fee_test.rs +++ b/forester/tests/legacy/priority_fee_test.rs @@ -1,5 +1,5 @@ use forester::{ - cli::StartArgs, + cli::{ProcessorMode, StartArgs}, processor::v1::{ config::CapConfig, helpers::{get_capped_priority_fee, request_priority_fee_estimate}, @@ -69,6 +69,32 @@ async fn test_priority_fee_request() { rpc_rate_limit: None, photon_rate_limit: None, send_tx_rate_limit: None, + processor_mode: ProcessorMode::All, + queue_polling_mode: Default::default(), + tree_ids: vec![], + enable_compressible: false, + lookup_table_address: None, + api_server_port: 8080, + api_server_public_bind: false, + group_authority: None, + light_pda_programs: vec![], + helius_rpc: false, + prometheus_url: None, + prover_append_url: None, + prover_update_url: None, + prover_address_append_url: None, + prover_api_key: None, + prover_polling_interval_ms: None, + prover_max_wait_time_secs: None, + photon_grpc_url: None, + max_concurrent_sends: 50, + max_batches_per_tree: 4, + confirmation_max_attempts: 30, + confirmation_poll_interval_ms: 1000, + fallback_rpc_url: None, + fallback_indexer_url: None, + rpc_pool_failure_threshold: 3, + rpc_pool_primary_probe_interval_secs: 30, }; let config = ForesterConfig::new_for_start(&args).expect("Failed to create config"); diff --git a/forester/tests/legacy/test_utils.rs b/forester/tests/legacy/test_utils.rs index 47425cabbc..d535665d71 100644 --- a/forester/tests/legacy/test_utils.rs +++ b/forester/tests/legacy/test_utils.rs @@ -87,6 +87,8 @@ pub fn forester_config() -> ForesterConfig { rpc_rate_limit: None, photon_rate_limit: None, send_tx_rate_limit: None, + fallback_rpc_url: None, + fallback_indexer_url: None, }, retry_config: Default::default(), queue_config: Default::default(), @@ -111,6 +113,8 @@ pub fn forester_config() -> ForesterConfig { max_retries: 10, initial_retry_delay_ms: 1000, max_retry_delay_ms: 16000, + failure_threshold: 3, + primary_probe_interval_secs: 30, }, registry_pubkey: light_registry::ID, payer_keypair: test_accounts.protocol.forester.insecure_clone(), diff --git a/forester/tests/metrics_contract_test.rs b/forester/tests/metrics_contract_test.rs new file mode 100644 index 0000000000..45eb9d013b --- /dev/null +++ b/forester/tests/metrics_contract_test.rs @@ -0,0 +1,66 @@ +//! Integration test: ensures metrics-contract.json stays in sync with src/metrics.rs. +//! +//! Run with: cargo test -p forester --test metrics_contract_test + +use std::collections::HashSet; + +#[test] +fn metrics_contract_matches_code() { + let manifest_dir = env!("CARGO_MANIFEST_DIR"); + + // --- read contract --- + let contract_path = format!("{}/metrics-contract.json", manifest_dir); + let contract_text = std::fs::read_to_string(&contract_path) + .unwrap_or_else(|e| panic!("Cannot read {}: {}", contract_path, e)); + let contract: serde_json::Value = serde_json::from_str(&contract_text) + .unwrap_or_else(|e| panic!("Bad JSON in {}: {}", contract_path, e)); + + let contract_set: HashSet<(String, Vec)> = contract["metrics"] + .as_array() + .expect("metrics must be an array") + .iter() + .map(|m| { + let name = m["name"] + .as_str() + .expect("metric.name must be a string") + .to_string(); + let labels: Vec = m["labels"] + .as_array() + .expect("metric.labels must be an array") + .iter() + .map(|l| l.as_str().expect("label must be a string").to_string()) + .collect(); + (name, labels) + }) + .collect(); + + // --- read code descriptors --- + let code_set: HashSet<(String, Vec)> = forester::metrics::METRIC_DESCRIPTORS + .iter() + .map(|(name, labels)| { + ( + name.to_string(), + labels.iter().map(|l| l.to_string()).collect(), + ) + }) + .collect(); + + // --- compare --- + let in_contract_not_code: Vec<_> = contract_set.difference(&code_set).collect(); + let in_code_not_contract: Vec<_> = code_set.difference(&contract_set).collect(); + + let mut errors = Vec::new(); + if !in_contract_not_code.is_empty() { + errors.push(format!( + "In contract but not in code: {:?}", + in_contract_not_code + )); + } + if !in_code_not_contract.is_empty() { + errors.push(format!( + "In code but not in contract: {:?}", + in_code_not_contract + )); + } + assert!(errors.is_empty(), "\n{}\n", errors.join("\n")); +} diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index dddb76a737..e2d3c16286 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -84,6 +84,10 @@ async fn test_priority_fee_request() { light_pda_programs: vec![], helius_rpc: false, prometheus_url: None, + fallback_rpc_url: None, + fallback_indexer_url: None, + rpc_pool_failure_threshold: 3, + rpc_pool_primary_probe_interval_secs: 30, }; let config = ForesterConfig::new_for_start(&args).expect("Failed to create config"); diff --git a/forester/tests/test_utils.rs b/forester/tests/test_utils.rs index 1a1d3f18f6..4225503a19 100644 --- a/forester/tests/test_utils.rs +++ b/forester/tests/test_utils.rs @@ -97,6 +97,8 @@ pub fn forester_config() -> ForesterConfig { rpc_rate_limit: None, photon_rate_limit: None, send_tx_rate_limit: None, + fallback_rpc_url: None, + fallback_indexer_url: None, }, retry_config: Default::default(), queue_config: Default::default(), @@ -124,6 +126,8 @@ pub fn forester_config() -> ForesterConfig { max_retries: 10, initial_retry_delay_ms: 1000, max_retry_delay_ms: 16000, + failure_threshold: 3, + primary_probe_interval_secs: 30, }, registry_pubkey: light_registry::ID, payer_keypair: test_accounts.protocol.forester.insecure_clone(), diff --git a/scripts/deploy/deploy.sh b/scripts/deploy/deploy.sh index 58deaeb383..daaedd4a86 100755 --- a/scripts/deploy/deploy.sh +++ b/scripts/deploy/deploy.sh @@ -4,10 +4,8 @@ # Array of program names libraries=("account_compression" "light_compressed_token" "light_system_program_pinocchio" "light_registry") - BUFFER_KEYPAIR_PATH="target/buffer" - create_buffer_account() { local max_retries=5 local attempt=1