diff --git a/geyser-plugin/README.md b/geyser-plugin/README.md index 467584a..3d90e9e 100644 --- a/geyser-plugin/README.md +++ b/geyser-plugin/README.md @@ -123,6 +123,65 @@ admin = "127.0.0.1:3000" `kafka.bootstrap_servers`, `kafka.topic`, `plugin.local_rpc_url`, and `plugin.admin` are required. The `admin` bind address serves `POST /filters/accounts` and, when `plugin.metrics` is `true`, also `GET /metrics`. If `ksql.url` is set, it must be a valid absolute `http` or `https` base URL and startup will fail if the restore query cannot complete. Legacy filter arrays and legacy transaction, slot-status, block, and wrapping options are rejected during config parsing. +## Startup checks + +Startup checks run from `KafkaPlugin::on_load` during normal validator/plugin startup. They run whether the validator is launched through `make geyser-plugin-launch` or directly with `solana-test-validator --geyser-plugin-config ...`; no separate preflight binary or Makefile target is required. + +Startup validates: + +- validator JSON wrapper +- runtime TOML config +- plugin library path +- admin bind address +- ksqlDB startup restore when `ksql.url` is configured +- Kafka bootstrap/topic readiness +- local RPC URL syntax only, not local RPC liveness + +The local RPC endpoint is not required to be reachable during startup checks because it belongs to the validator being launched. 
+ +### Safe-start manual test matrix + +| Scenario | Temporary change | Expected result | +| --- | --- | --- | +| malformed validator JSON | invalid JSON in wrapper copy | Agave rejects the wrapper before plugin load with `FailedToLoadPlugin`; no segfault | +| missing runtime TOML | JSON `config_file` points to missing TOML | `ERROR config startup check failed` | +| malformed runtime TOML | invalid TOML in runtime copy | `ERROR config startup check failed` | +| missing Kafka bootstrap | empty `kafka.bootstrap_servers` | `ERROR config startup check failed` | +| Kafka down | no Kafka on configured bootstrap | `ERROR kafka startup check failed` | +| ksqlDB down | `ksql.url = "http://127.0.0.1:1"` | `ERROR ksql startup check failed` | +| invalid ksql table | `table = "bad-name"` | `ERROR config startup check failed` | +| admin port in use | keep listener on configured port | `ERROR admin startup check failed` | +| malformed local RPC URL | `local_rpc_url = "127.0.0.1:8899"` | `ERROR config startup check failed` | + +Proof command for the original issue: + +```shell +make geyser-plugin-launch +``` + +Expected with no dependencies running: + +- exits non-zero +- prints a Kafka startup check error with action `ensure Kafka is reachable at kafka.bootstrap_servers or update kafka.bootstrap_servers` +- validator/plugin startup exits gracefully +- does not print `Segmentation fault` + +Direct validator path that must receive the same checks: + +```shell +cd geyser-plugin +solana-test-validator --log --reset --geyser-plugin-config plugin-config.json +``` + +Expected failures and messages must match the Makefile path because the checks live in `KafkaPlugin::on_load`. 
+ +Success path: + +```shell +make kafka-ready +make geyser-plugin-launch +``` + ## Whitelist Management Account inclusion is managed through the HTTP API: diff --git a/geyser-plugin/src/config.rs b/geyser-plugin/src/config.rs index d713b7b..9dca5cf 100644 --- a/geyser-plugin/src/config.rs +++ b/geyser-plugin/src/config.rs @@ -77,10 +77,10 @@ pub struct PluginConfig { #[derive(Debug, Deserialize)] #[serde(deny_unknown_fields)] -struct ValidatorConfig { +pub(crate) struct ValidatorConfig { #[allow(dead_code)] - libpath: String, - config_file: PathBuf, + pub(crate) libpath: PathBuf, + pub(crate) config_file: PathBuf, } fn default_shutdown_timeout_ms() -> u64 { @@ -125,6 +125,7 @@ impl Default for KsqlConfig { impl Config { /// Read plugin config from either a validator JSON wrapper or a TOML runtime config. + #[allow(dead_code)] pub fn read_from>(config_path: P) -> PluginResult { let config_path = config_path.as_ref(); let contents = read_to_string(config_path)?; @@ -171,14 +172,14 @@ impl Config { } } - fn fill_defaults(&mut self) { + pub(crate) fn fill_defaults(&mut self) { self.set_default("request.required.acks", "1"); self.set_default("message.timeout.ms", "30000"); self.set_default("compression.type", "lz4"); self.set_default("partitioner", "murmur2_random"); } - fn validate(&self) -> PluginResult<()> { + pub(crate) fn validate(&self) -> PluginResult<()> { if self.kafka.bootstrap_servers.trim().is_empty() { return Err(GeyserPluginError::ConfigFileReadError { msg: "missing required config field `kafka.bootstrap_servers`" @@ -192,10 +193,40 @@ impl Config { }); } - if self.plugin.local_rpc_url.trim().is_empty() { + let trimmed_local_rpc_url = self.plugin.local_rpc_url.trim(); + if trimmed_local_rpc_url.is_empty() { return Err(GeyserPluginError::ConfigFileReadError { - msg: "missing required config field `plugin.local_rpc_url`" - .to_owned(), + msg: + "invalid config field `plugin.local_rpc_url`: URL must not be empty" + .to_owned(), + }); + } + + let 
parsed_local_rpc_url = + Url::parse(trimmed_local_rpc_url).map_err(|error| { + GeyserPluginError::ConfigFileReadError { + msg: format!( + "invalid config field `plugin.local_rpc_url`: {error}" + ), + } + })?; + + match parsed_local_rpc_url.scheme() { + "http" | "https" => {} + scheme => { + return Err(GeyserPluginError::ConfigFileReadError { + msg: format!( + "invalid config field `plugin.local_rpc_url`: unsupported scheme `{scheme}`" + ), + }); + } + } + + if !parsed_local_rpc_url.has_host() { + return Err(GeyserPluginError::ConfigFileReadError { + msg: + "invalid config field `plugin.local_rpc_url`: host is required" + .to_owned(), }); } @@ -238,18 +269,45 @@ impl Config { .to_owned(), }); } + } - if self.ksql.table.trim().is_empty() { - return Err(GeyserPluginError::ConfigFileReadError { - msg: "invalid config field `ksql.table`: table must not be empty".to_owned(), - }); + validate_ksql_identifier(&self.ksql.table).map_err(|error| { + GeyserPluginError::ConfigFileReadError { + msg: format!("invalid config field `ksql.table`: {error}"), } - } + })?; Ok(()) } } +/// Validates that `identifier` is a safe ksqlDB identifier suitable for +/// direct interpolation into a SQL statement. The identifier must start with +/// an ASCII letter or `_` and may otherwise contain only ASCII alphanumeric +/// characters or `_`. 
+pub(crate) fn validate_ksql_identifier( + identifier: &str, +) -> std::io::Result<&str> { + let mut chars = identifier.chars(); + let first = chars.next().ok_or_else(|| { + std::io::Error::other("ksql identifier must not be empty") + })?; + if !(first.is_ascii_alphabetic() || first == '_') { + return Err(std::io::Error::other(format!( + "invalid ksql identifier `{identifier}`: must start with an ASCII letter or `_`" + ))); + } + for c in chars { + if !(c.is_ascii_alphanumeric() || c == '_') { + return Err(std::io::Error::other(format!( + "invalid ksql identifier `{identifier}`: only ASCII alphanumeric characters and `_` are allowed" + ))); + } + } + Ok(identifier) +} + +#[allow(dead_code)] fn read_to_string(path: &Path) -> PluginResult { let mut file = File::open(path)?; let mut contents = String::new(); @@ -257,7 +315,7 @@ fn read_to_string(path: &Path) -> PluginResult { Ok(contents) } -fn resolve_runtime_config_path( +pub(crate) fn resolve_runtime_config_path( wrapper_path: &Path, runtime_path: &Path, ) -> PathBuf { @@ -273,12 +331,45 @@ fn resolve_runtime_config_path( #[cfg(test)] mod tests { - use super::Config; + use super::{Config, validate_ksql_identifier}; use std::{ fs, time::{SystemTime, UNIX_EPOCH}, }; + #[test] + fn test_validates_simple_identifier() { + assert_eq!(validate_ksql_identifier("accounts").unwrap(), "accounts"); + assert_eq!(validate_ksql_identifier("_x").unwrap(), "_x"); + assert_eq!(validate_ksql_identifier("A1_b2").unwrap(), "A1_b2"); + } + + #[test] + fn test_rejects_empty_identifier() { + let error = validate_ksql_identifier("").unwrap_err().to_string(); + assert!(error.contains("must not be empty")); + } + + #[test] + fn test_rejects_identifier_starting_with_digit() { + let error = validate_ksql_identifier("1bad").unwrap_err().to_string(); + assert!(error.contains("must start with an ASCII letter")); + } + + #[test] + fn test_rejects_identifier_with_invalid_characters() { + let error = validate_ksql_identifier("accounts; DROP TABLE 
x") + .unwrap_err() + .to_string(); + assert!(error.contains("only ASCII alphanumeric")); + } + + #[test] + fn test_rejects_identifier_with_quote() { + let error = validate_ksql_identifier("a\"b").unwrap_err().to_string(); + assert!(error.contains("only ASCII alphanumeric")); + } + fn parse_config(toml: &str) -> Result { let mut config: Config = toml::from_str(toml).map_err(|error| error.to_string())?; @@ -606,6 +697,141 @@ admin = "127.0.0.1:8080" ); } + #[test] + fn test_rejects_invalid_ksql_table_identifier() { + let error = parse_config( + r#" +libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" + +[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" + +[ksql] +url = "http://127.0.0.1:8088" +table = "bad-name" + +[plugin] +local_rpc_url = "http://127.0.0.1:8899" +admin = "127.0.0.1:8080" +"#, + ) + .unwrap_err(); + + assert!(error.contains("invalid config field `ksql.table`")); + assert!(error.contains("only ASCII alphanumeric")); + } + + #[test] + fn test_rejects_ksql_table_starting_with_digit() { + let error = parse_config( + r#" +libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" + +[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" + +[ksql] +table = "1bad" + +[plugin] +local_rpc_url = "http://127.0.0.1:8899" +admin = "127.0.0.1:8080" +"#, + ) + .unwrap_err(); + + assert!(error.contains("invalid config field `ksql.table`")); + assert!(error.contains("must start with an ASCII letter")); + } + + #[test] + fn test_rejects_empty_local_rpc_url() { + let error = parse_config( + r#" +libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" + +[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" + +[plugin] +local_rpc_url = " " +admin = "127.0.0.1:8080" +"#, + ) + .unwrap_err(); + + assert!(error.contains("invalid config field `plugin.local_rpc_url`")); + assert!(error.contains("URL must not be empty")); + } + + #[test] + fn 
test_rejects_local_rpc_url_without_scheme() { + let error = parse_config( + r#" +libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" + +[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" + +[plugin] +local_rpc_url = "127.0.0.1:8899" +admin = "127.0.0.1:8080" +"#, + ) + .unwrap_err(); + + assert!(error.contains("invalid config field `plugin.local_rpc_url`")); + assert!(error.contains("relative URL without a base")); + } + + #[test] + fn test_rejects_local_rpc_url_with_unsupported_scheme() { + let error = parse_config( + r#" +libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" + +[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" + +[plugin] +local_rpc_url = "ftp://127.0.0.1:8899" +admin = "127.0.0.1:8080" +"#, + ) + .unwrap_err(); + + assert!(error.contains("invalid config field `plugin.local_rpc_url`")); + assert!(error.contains("unsupported scheme `ftp`")); + } + + #[test] + fn test_rejects_local_rpc_url_without_host() { + let error = parse_config( + r#" +libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" + +[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" + +[plugin] +local_rpc_url = "http://:8899" +admin = "127.0.0.1:8080" +"#, + ) + .unwrap_err(); + + assert!(error.contains("invalid config field `plugin.local_rpc_url`")); + assert!( + error.contains("empty host") || error.contains("host is required") + ); + } + #[test] fn test_passes_through_kafka_client_overrides() { let config = parse_config( diff --git a/geyser-plugin/src/initial_account_backfill/mod.rs b/geyser-plugin/src/initial_account_backfill/mod.rs index 888abb0..7784fee 100644 --- a/geyser-plugin/src/initial_account_backfill/mod.rs +++ b/geyser-plugin/src/initial_account_backfill/mod.rs @@ -55,9 +55,10 @@ impl InitialAccountBackfill { update_account_topic, subscriptions, client: RpcClient::new_with_commitment( - local_rpc_url, + local_rpc_url.clone(), 
CommitmentConfig::confirmed(), ), + local_rpc_url, }); let handle = InitialAccountBackfillHandle { inner: inner.clone(), @@ -96,6 +97,7 @@ impl InitialAccountBackfill { String::new(), CommitmentConfig::confirmed(), ), + local_rpc_url: String::new(), }); Self { handle: InitialAccountBackfillHandle { inner }, @@ -254,11 +256,17 @@ struct InitialAccountBackfillInner { update_account_topic: Arc, subscriptions: AccountSubscriptions, client: RpcClient, + local_rpc_url: String, } impl InitialAccountBackfillInner { async fn process_request(&self, pubkeys: &[[u8; 32]]) { - match rpc::fetch_account_events_for_request(&self.client, pubkeys).await + match rpc::fetch_account_events_for_request( + &self.client, + &self.local_rpc_url, + pubkeys, + ) + .await { Ok(events) => { info!( @@ -399,6 +407,7 @@ mod tests { String::new(), CommitmentConfig::confirmed(), ), + local_rpc_url: String::new(), }), rx, ) diff --git a/geyser-plugin/src/initial_account_backfill/rpc.rs b/geyser-plugin/src/initial_account_backfill/rpc.rs index 2477ecc..a8de068 100644 --- a/geyser-plugin/src/initial_account_backfill/rpc.rs +++ b/geyser-plugin/src/initial_account_backfill/rpc.rs @@ -23,17 +23,22 @@ use { pub(crate) async fn fetch_account_events_for_request( client: &RpcClient, + local_rpc_url: &str, pubkeys: &[[u8; 32]], ) -> io::Result> { let mut events = Vec::with_capacity(pubkeys.len()); for chunk in pubkeys.chunks(INITIAL_BACKFILL_MAX_RPC_KEYS_PER_REQUEST) { - events.extend(fetch_account_events_for_chunk(client, chunk).await?); + events.extend( + fetch_account_events_for_chunk(client, local_rpc_url, chunk) + .await?, + ); } Ok(events) } async fn fetch_account_events_for_chunk( client: &RpcClient, + local_rpc_url: &str, pubkeys: &[[u8; 32]], ) -> io::Result> { let keys = pubkeys @@ -106,9 +111,10 @@ async fn fetch_account_events_for_chunk( .with_label_values(&["failed"]) .inc(); warn!( - "Initial account backfill RPC request failed for {} pubkeys, \ + "Initial account backfill RPC request failed for 
{} pubkeys via {}, \ attempt={}/{}: {error}", pubkeys.len(), + local_rpc_url, attempt, INITIAL_BACKFILL_MAX_ATTEMPTS ); @@ -123,7 +129,24 @@ async fn fetch_account_events_for_chunk( } } - Err(io::Error::other(last_error.unwrap())) + let last_error_message = last_error + .map(|error| error.to_string()) + .unwrap_or_else(|| "unknown error".to_owned()); + Err(io::Error::other(format_exhausted_error( + local_rpc_url, + INITIAL_BACKFILL_MAX_ATTEMPTS, + &last_error_message, + ))) +} + +pub(crate) fn format_exhausted_error( + local_rpc_url: &str, + max_attempts: usize, + last_error_message: &str, +) -> String { + format!( + "initial account backfill RPC failed after {max_attempts} attempts via {local_rpc_url}: {last_error_message}" + ) } pub(crate) const SYSTEM_PROGRAM_ID: Pubkey = @@ -169,3 +192,22 @@ pub(crate) fn map_missing_account( account_age: 0, } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_format_exhausted_error_includes_url_and_attempts() { + let message = format_exhausted_error( + "http://127.0.0.1:8899", + INITIAL_BACKFILL_MAX_ATTEMPTS, + "connection refused", + ); + + assert!(message.starts_with(&format!( + "initial account backfill RPC failed after {INITIAL_BACKFILL_MAX_ATTEMPTS} attempts via http://127.0.0.1:8899: " + ))); + assert!(message.contains("connection refused")); + } +} diff --git a/geyser-plugin/src/ksql.rs b/geyser-plugin/src/ksql.rs index e595a19..99d26dc 100644 --- a/geyser-plugin/src/ksql.rs +++ b/geyser-plugin/src/ksql.rs @@ -32,7 +32,7 @@ impl KsqlPubkeyRestoreClient { } pub(crate) fn fetch_pubkeys(&self) -> io::Result> { - let table = validate_ksql_identifier(&self.table)?; + let table = crate::config::validate_ksql_identifier(&self.table)?; let sql = format!("SELECT PUBKEY FROM {table};"); let query_url = format!("{}/query-stream", self.base_url); debug!( @@ -72,30 +72,6 @@ impl KsqlPubkeyRestoreClient { } } -/// Validates that `identifier` is a safe ksqlDB identifier suitable for -/// direct interpolation into a 
SQL statement. The identifier must start with -/// an ASCII letter or `_` and may otherwise contain only ASCII alphanumeric -/// characters or `_`. -pub(crate) fn validate_ksql_identifier(identifier: &str) -> io::Result<&str> { - let mut chars = identifier.chars(); - let first = chars - .next() - .ok_or_else(|| io::Error::other("ksql identifier must not be empty"))?; - if !(first.is_ascii_alphabetic() || first == '_') { - return Err(io::Error::other(format!( - "invalid ksql identifier `{identifier}`: must start with an ASCII letter or `_`" - ))); - } - for c in chars { - if !(c.is_ascii_alphanumeric() || c == '_') { - return Err(io::Error::other(format!( - "invalid ksql identifier `{identifier}`: only ASCII alphanumeric characters and `_` are allowed" - ))); - } - } - Ok(identifier) -} - pub(crate) fn parse_pubkeys_stream( reader: impl BufRead, ) -> io::Result> { @@ -172,7 +148,7 @@ pub(crate) fn parse_pubkeys_stream( #[cfg(test)] mod tests { - use super::{parse_pubkeys_stream, validate_ksql_identifier}; + use super::parse_pubkeys_stream; fn pubkey(byte: u8) -> [u8; 32] { [byte; 32] @@ -248,37 +224,4 @@ mod tests { assert!(error.contains("expected 32 decoded PUBKEY bytes")); } - - #[test] - fn test_validates_simple_identifier() { - assert_eq!(validate_ksql_identifier("accounts").unwrap(), "accounts"); - assert_eq!(validate_ksql_identifier("_x").unwrap(), "_x"); - assert_eq!(validate_ksql_identifier("A1_b2").unwrap(), "A1_b2"); - } - - #[test] - fn test_rejects_empty_identifier() { - let error = validate_ksql_identifier("").unwrap_err().to_string(); - assert!(error.contains("must not be empty")); - } - - #[test] - fn test_rejects_identifier_starting_with_digit() { - let error = validate_ksql_identifier("1bad").unwrap_err().to_string(); - assert!(error.contains("must start with an ASCII letter")); - } - - #[test] - fn test_rejects_identifier_with_invalid_characters() { - let error = validate_ksql_identifier("accounts; DROP TABLE x") - .unwrap_err() - .to_string(); - 
assert!(error.contains("only ASCII alphanumeric")); - } - - #[test] - fn test_rejects_identifier_with_quote() { - let error = validate_ksql_identifier("a\"b").unwrap_err().to_string(); - assert!(error.contains("only ASCII alphanumeric")); - } } diff --git a/geyser-plugin/src/lib.rs b/geyser-plugin/src/lib.rs index beb1cbc..045952a 100644 --- a/geyser-plugin/src/lib.rs +++ b/geyser-plugin/src/lib.rs @@ -21,6 +21,8 @@ mod initial_account_backfill; mod ksql; mod metrics; mod plugin; +#[allow(dead_code)] +mod preflight; mod publisher; mod server; mod version; diff --git a/geyser-plugin/src/metrics.rs b/geyser-plugin/src/metrics.rs index 7cdcfbf..bd37770 100644 --- a/geyser-plugin/src/metrics.rs +++ b/geyser-plugin/src/metrics.rs @@ -6,7 +6,7 @@ use { producer::{DeliveryResult, ProducerContext}, statistics::Statistics, }, - std::sync::Once, + std::sync::OnceLock, }; lazy_static::lazy_static! { @@ -58,38 +58,44 @@ lazy_static::lazy_static! { ).unwrap(); } -pub fn register_metrics() { - static REGISTER: Once = Once::new(); - REGISTER.call_once(|| { - macro_rules! 
register { - ($collector:ident) => { - REGISTRY - .register(Box::new($collector.clone())) - .expect("collector can't be registered"); - }; - } - register!(VERSION); - register!(UPLOAD_ACCOUNTS_TOTAL); - register!(INITIAL_BACKFILL_REQUESTS_ENQUEUED_TOTAL); - register!(INITIAL_BACKFILL_PUBKEYS_ENQUEUED_TOTAL); - register!(INITIAL_BACKFILL_RPC_ATTEMPTS_TOTAL); - register!(INITIAL_BACKFILL_RPC_FAILURES_TOTAL); - register!(INITIAL_BACKFILL_SNAPSHOTS_TOTAL); - register!(INITIAL_BACKFILL_IN_FLIGHT); - register!(KAFKA_STATS); - - for (key, value) in &[ - ("version", VERSION_INFO.version), - ("solana", VERSION_INFO.solana), - ("git", VERSION_INFO.git), - ("rustc", VERSION_INFO.rustc), - ("buildts", VERSION_INFO.buildts), - ] { - VERSION - .with_label_values(&[key.to_string(), value.to_string()]) - .inc(); - } - }); +pub fn register_metrics() -> Result<(), String> { + static REGISTER_RESULT: OnceLock> = OnceLock::new(); + REGISTER_RESULT + .get_or_init(|| { + register_all_metrics().map_err(|error| error.to_string()) + }) + .clone() +} + +fn register_all_metrics() -> Result<(), prometheus::Error> { + macro_rules! 
register { + ($collector:ident) => { + REGISTRY.register(Box::new($collector.clone()))?; + }; + } + register!(VERSION); + register!(UPLOAD_ACCOUNTS_TOTAL); + register!(INITIAL_BACKFILL_REQUESTS_ENQUEUED_TOTAL); + register!(INITIAL_BACKFILL_PUBKEYS_ENQUEUED_TOTAL); + register!(INITIAL_BACKFILL_RPC_ATTEMPTS_TOTAL); + register!(INITIAL_BACKFILL_RPC_FAILURES_TOTAL); + register!(INITIAL_BACKFILL_SNAPSHOTS_TOTAL); + register!(INITIAL_BACKFILL_IN_FLIGHT); + register!(KAFKA_STATS); + + for (key, value) in &[ + ("version", VERSION_INFO.version), + ("solana", VERSION_INFO.solana), + ("git", VERSION_INFO.git), + ("rustc", VERSION_INFO.rustc), + ("buildts", VERSION_INFO.buildts), + ] { + VERSION + .with_label_values(&[key.to_string(), value.to_string()]) + .inc(); + } + + Ok(()) } #[derive(Debug, Default, Clone, Copy)] diff --git a/geyser-plugin/src/plugin/mod.rs b/geyser-plugin/src/plugin/mod.rs index fa6a980..f3aa774 100644 --- a/geyser-plugin/src/plugin/mod.rs +++ b/geyser-plugin/src/plugin/mod.rs @@ -39,6 +39,7 @@ use { rdkafka::{ClientConfig, config::FromClientConfigAndContext}, std::{ fmt::{Debug, Formatter}, + io, sync::{Arc, Mutex, MutexGuard}, time::Duration, }, @@ -88,17 +89,24 @@ impl GeyserPlugin for KafkaPlugin { self.name(), config_file ); - let config = Config::read_from(config_file)?; - let (version_n, version_s) = get_rdkafka_version(); info!("rd_kafka_version: {:#08x}, {}", version_n, version_s); - let mut producer_config = ClientConfig::new(); - for (key, value) in &config.kafka.client { - producer_config.set(key, value); - } - producer_config - .set("bootstrap.servers", &config.kafka.bootstrap_servers); + let loaded = crate::preflight::run_static_startup_checks(config_file) + .map_err(startup_error_to_plugin_error)?; + let config = loaded.config; + + crate::preflight::check_admin_bind(&config) + .map_err(startup_error_to_plugin_error)?; + + crate::preflight::check_kafka_readiness(&config) + .map_err(startup_error_to_plugin_error)?; + 
crate::preflight::check_ksql_readiness(&config) + .map_err(startup_error_to_plugin_error)?; + + let prefetched_restore = Self::fetch_tracking_from_ksql(&config)?; + + let producer_config = build_producer_config(&config); let producer = rdkafka::producer::ThreadedProducer::from_config_and_context( &producer_config, @@ -120,8 +128,8 @@ impl GeyserPlugin for KafkaPlugin { config.plugin.local_rpc_url.clone(), ) .map_err(|error| PluginError::Custom(Box::new(error)))?; - Self::restore_tracking_from_ksql( - &config, + Self::restore_prefetched_tracking( + prefetched_restore, &self.account_subscriptions, &initial_account_backfill, )?; @@ -218,39 +226,68 @@ impl KafkaPlugin { .expect("update_account_topic is unavailable") } - fn restore_tracking_from_ksql( + fn fetch_tracking_from_ksql( config: &Config, - account_subscriptions: &AccountSubscriptions, - initial_account_backfill: &InitialAccountBackfill, - ) -> PluginResult<()> { + ) -> PluginResult> { let Some(raw_url) = config.ksql.url.as_deref() else { - return Ok(()); + return Ok(None); }; let url = raw_url.trim(); - - let table = &config.ksql.table; + let table = config.ksql.table.clone(); info!("Startup ksql restore enabled, url={}, table={}", url, table); - let client = KsqlPubkeyRestoreClient::new(url, table) - .map_err(|error| PluginError::Custom(Box::new(error)))?; - let pubkeys = client - .fetch_pubkeys() - .map_err(|error| PluginError::Custom(Box::new(error)))?; + let client = KsqlPubkeyRestoreClient::new(url, &table).map_err( + |error| { + error!( + "Startup ksql restore failed before plugin initialization completed: {error}" + ); + PluginError::Custom(Box::new(io::Error::other(format!( + "Startup ksql restore failed before plugin initialization completed: {error}" + )))) + }, + )?; + let pubkeys = client.fetch_pubkeys().map_err(|error| { + error!( + "Startup ksql restore failed before plugin initialization completed: {error}" + ); + PluginError::Custom(Box::new(io::Error::other(format!( + "Startup ksql restore 
failed before plugin initialization completed: {error}" + )))) + })?; let fetched_count = pubkeys.len(); info!( "Fetched {} pubkeys from ksql startup restore", fetched_count ); + Ok(Some(KsqlStartupRestore { + url: url.to_owned(), + table, + pubkeys, + })) + } + + fn restore_prefetched_tracking( + restore: Option, + account_subscriptions: &AccountSubscriptions, + initial_account_backfill: &InitialAccountBackfill, + ) -> PluginResult<()> { + let Some(restore) = restore else { + return Ok(()); + }; + + let fetched_count = restore.pubkeys.len(); let summary = restore_pubkeys_in_chunks( account_subscriptions, initial_account_backfill.handle_ref(), - pubkeys, + restore.pubkeys, ) .map_err(add_accounts_error_to_plugin_error)?; info!( - "Completed startup ksql restore, fetched_count={}, deduplicated_count={}, chunk_count={}, accepted_count={}, newly_added_count={}, retried_backfill_count={}, duplicate_count={}", + "Completed startup ksql restore, url={}, table={}, fetched_count={}, deduplicated_count={}, chunk_count={}, accepted_count={}, newly_added_count={}, retried_backfill_count={}, duplicate_count={}", + restore.url, + restore.table, fetched_count, summary.deduplicated_count, summary.chunk_count, @@ -264,6 +301,29 @@ impl KafkaPlugin { } } +#[derive(Debug, Default)] +struct KsqlStartupRestore { + url: String, + table: String, + pubkeys: Vec<[u8; 32]>, +} + +fn build_producer_config(config: &Config) -> ClientConfig { + let mut producer_config = ClientConfig::new(); + for (key, value) in &config.kafka.client { + producer_config.set(key, value); + } + producer_config.set("bootstrap.servers", &config.kafka.bootstrap_servers); + producer_config +} + +fn startup_error_to_plugin_error( + error: crate::preflight::StartupError, +) -> PluginError { + error!("{error}"); + std::process::exit(1); +} + #[derive(Clone, Debug, Default, PartialEq, Eq)] struct RestoreTrackingSummary { deduplicated_count: usize, @@ -374,19 +434,13 @@ mod tests { } #[test] - fn 
test_restore_tracking_from_ksql_is_noop_when_disabled() { + fn test_fetch_tracking_from_ksql_is_noop_when_disabled() { let config = Config::default(); let subs = AccountSubscriptions::new(); - let initial_account_backfill = - crate::initial_account_backfill::InitialAccountBackfill::default(); - let result = KafkaPlugin::restore_tracking_from_ksql( - &config, - &subs, - &initial_account_backfill, - ); + let result = KafkaPlugin::fetch_tracking_from_ksql(&config); - assert!(result.is_ok()); + assert!(matches!(result, Ok(None))); assert!(!subs.contains_sync(&pk(1))); } diff --git a/geyser-plugin/src/preflight.rs b/geyser-plugin/src/preflight.rs new file mode 100644 index 0000000..2862bbf --- /dev/null +++ b/geyser-plugin/src/preflight.rs @@ -0,0 +1,635 @@ +// Copyright 2022 Blockdaemon Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use { + agave_geyser_plugin_interface::geyser_plugin_interface::GeyserPluginError, + rdkafka::{ + ClientConfig, + producer::{BaseProducer, Producer}, + }, + std::{error::Error, fmt, fs, path::PathBuf}, +}; + +pub(crate) const STARTUP_CHECK_TIMEOUT: std::time::Duration = + std::time::Duration::from_secs(3); + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct StartupError { + pub subsystem: &'static str, + pub field: Option<&'static str>, + pub target: Option, + pub cause: String, + pub action: String, +} + +impl StartupError { + pub fn new( + subsystem: &'static str, + field: Option<&'static str>, + target: Option>, + cause: impl Into, + action: impl Into, + ) -> Self { + Self { + subsystem, + field, + target: target.map(Into::into), + cause: cause.into(), + action: action.into(), + } + } +} + +impl fmt::Display for StartupError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + writeln!(f, "ERROR {} startup check failed", self.subsystem)?; + if let Some(field) = self.field { + writeln!(f, " field: {field}")?; + } + if let Some(target) = &self.target { + writeln!(f, " target: {target}")?; + } + writeln!(f, " cause: {}", self.cause)?; + write!(f, " action: {}", self.action) + } +} + +impl Error for StartupError {} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ValidatorConfigPaths { + pub wrapper_path: PathBuf, + pub libpath: PathBuf, + pub runtime_config_path: PathBuf, +} + +#[derive(Debug)] +pub struct LoadedPluginConfig { + pub paths: Option, + pub config: crate::config::Config, +} + +pub(crate) fn run_static_startup_checks( + config_path: impl AsRef, +) -> Result { + let loaded = load_config_with_paths(config_path)?; + if let Some(paths) = &loaded.paths + && !paths.libpath.exists() + { + return Err(StartupError::new( + "config", + Some("libpath"), + Some(paths.libpath.display().to_string()), + "plugin shared library does not exist", + "build the plugin shared library or update libpath in the validator JSON wrapper", + )); + } + 
Ok(loaded) +} + +pub fn check_ksql_readiness( + config: &crate::config::Config, +) -> Result<(), StartupError> { + let Some(raw_url) = config.ksql.url.as_deref() else { + return Ok(()); + }; + let url = raw_url.trim(); + + crate::config::validate_ksql_identifier(&config.ksql.table).map_err( + |error| { + StartupError::new( + "ksql", + Some("ksql.table"), + Some(config.ksql.table.clone()), + format!("invalid ksql.table identifier: {error}"), + "fix ksql.table to be a valid SQL identifier", + ) + }, + )?; + + let client = crate::ksql::KsqlPubkeyRestoreClient::new( + url, + &config.ksql.table, + ) + .map_err(|error| { + StartupError::new( + "ksql", + Some("ksql.url"), + Some(url.to_owned()), + format!("failed to run startup restore query: {error}"), + "ensure ksqlDB is reachable at ksql.url, fix ksql.url/ksql.table, or remove ksql.url to disable startup restore", + ) + })?; + + client.fetch_pubkeys().map_err(|error| { + StartupError::new( + "ksql", + Some("ksql.url"), + Some(url.to_owned()), + format!("failed to run startup restore query: {error}"), + "ensure ksqlDB is reachable at ksql.url, fix ksql.url/ksql.table, or remove ksql.url to disable startup restore", + ) + })?; + + Ok(()) +} + +pub fn check_admin_bind( + config: &crate::config::Config, +) -> Result<(), StartupError> { + match std::net::TcpListener::bind(config.plugin.admin) { + Ok(listener) => { + drop(listener); + Ok(()) + } + Err(error) => Err(StartupError::new( + "admin", + Some("plugin.admin"), + Some(config.plugin.admin.to_string()), + format!("failed to bind admin HTTP address: {error}"), + "choose a free plugin.admin port or stop the process currently using it", + )), + } +} + +pub fn check_kafka_readiness( + config: &crate::config::Config, +) -> Result<(), StartupError> { + let mut producer_config = ClientConfig::new(); + for (key, value) in &config.kafka.client { + producer_config.set(key, value); + } + producer_config.set("bootstrap.servers", &config.kafka.bootstrap_servers); + + let producer: 
BaseProducer = producer_config.create().map_err(|error| { + StartupError::new( + "kafka", + Some("kafka.client"), + Some(config.kafka.bootstrap_servers.clone()), + format!( + "failed to create kafka producer for readiness check: {error}" + ), + "fix kafka.bootstrap_servers or kafka.client settings", + ) + })?; + + let metadata = match producer + .client() + .fetch_metadata(Some(&config.kafka.topic), STARTUP_CHECK_TIMEOUT) + { + Ok(metadata) => metadata, + Err(error) => { + drop_readiness_producer(producer); + return Err(StartupError::new( + "kafka", + Some("kafka.bootstrap_servers"), + Some(config.kafka.bootstrap_servers.clone()), + format!("failed to fetch kafka metadata: {error}"), + "ensure Kafka is reachable at kafka.bootstrap_servers or update kafka.bootstrap_servers", + )); + } + }; + + drop_readiness_producer(producer); + + validate_topic_metadata(&metadata, &config.kafka.topic).map_err(|cause| { + StartupError::new( + "kafka", + Some("kafka.topic"), + Some(config.kafka.topic.clone()), + cause, + "create the topic or enable broker topic auto-creation before launching the validator", + ) + }) +} + +fn drop_readiness_producer(producer: BaseProducer) { + let _ = producer.flush(STARTUP_CHECK_TIMEOUT); + drop(producer); + std::thread::sleep(std::time::Duration::from_millis(100)); +} + +fn validate_topic_metadata( + metadata: &rdkafka::metadata::Metadata, + topic: &str, +) -> Result<(), String> { + validate_topic_entries( + metadata.topics().iter().map(|topic_metadata| { + ( + topic_metadata.name(), + topic_metadata.error().map(|error| format!("{error:?}")), + ) + }), + topic, + ) +} + +fn validate_topic_entries<'a>( + topic_entries: impl IntoIterator)>, + topic: &str, +) -> Result<(), String> { + let Some((_, error)) = topic_entries + .into_iter() + .find(|(topic_name, _)| *topic_name == topic) + else { + return Err("topic is not present in broker metadata".to_string()); + }; + + if let Some(error) = error { + return Err(format!("topic metadata error: {error}")); 
+ } + + Ok(()) +} + +pub fn load_config_with_paths( + config_path: impl AsRef, +) -> Result { + let config_path = config_path.as_ref(); + let contents = fs::read_to_string(config_path).map_err(|error| { + StartupError::new( + "config", + None, + Some(config_path.display().to_string()), + format!("failed to read config file: {error}"), + "check that the file exists and is readable", + ) + })?; + + match serde_json::from_str::(&contents) { + Ok(wrapper) => { + let libpath = + resolve_wrapper_relative_path(config_path, &wrapper.libpath); + let runtime_config_path = + crate::config::resolve_runtime_config_path( + config_path, + &wrapper.config_file, + ); + if !runtime_config_path.exists() { + return Err(StartupError::new( + "config", + Some("config_file"), + Some(runtime_config_path.display().to_string()), + "runtime TOML config does not exist", + "create the runtime TOML config or update config_file in the validator JSON wrapper", + )); + } + + let config = + read_parse_validate_runtime_config(&runtime_config_path)?; + Ok(LoadedPluginConfig { + paths: Some(ValidatorConfigPaths { + wrapper_path: config_path.to_path_buf(), + libpath, + runtime_config_path, + }), + config, + }) + } + Err(error) => { + let looks_like_json = config_path + .extension() + .and_then(|ext| ext.to_str()) + .is_some_and(|ext| ext.eq_ignore_ascii_case("json")) + || matches!( + contents.trim_start().as_bytes().first(), + Some(b'{') | Some(b'[') + ); + if looks_like_json { + return Err(StartupError::new( + "config", + None, + Some(config_path.display().to_string()), + format!("invalid validator config JSON: {error}"), + "fix the validator JSON wrapper; it must contain libpath and config_file", + )); + } + + let config = parse_validate_runtime_config(&contents, config_path)?; + Ok(LoadedPluginConfig { + paths: None, + config, + }) + } + } +} + +fn resolve_wrapper_relative_path( + wrapper_path: &std::path::Path, + path: &std::path::Path, +) -> PathBuf { + if path.is_absolute() { + path.to_path_buf() 
+ } else { + wrapper_path + .parent() + .unwrap_or_else(|| std::path::Path::new(".")) + .join(path) + } +} + +fn read_parse_validate_runtime_config( + path: &std::path::Path, +) -> Result { + let contents = fs::read_to_string(path).map_err(|error| { + config_error_to_startup_error( + path, + GeyserPluginError::ConfigFileReadError { + msg: format!("failed to read runtime TOML config: {error}"), + }, + ) + })?; + parse_validate_runtime_config(&contents, path) +} + +fn parse_validate_runtime_config( + contents: &str, + path: &std::path::Path, +) -> Result { + let mut config: crate::config::Config = toml::from_str(contents) + .map_err(|error| toml_error_to_startup_error(path, error))?; + config.fill_defaults(); + config + .validate() + .map_err(|error| config_error_to_startup_error(path, error))?; + Ok(config) +} + +fn toml_error_to_startup_error( + path: &std::path::Path, + error: toml::de::Error, +) -> StartupError { + StartupError::new( + "config", + None, + Some(path.display().to_string()), + error.to_string(), + "fix the runtime TOML config", + ) +} + +fn config_error_to_startup_error( + path: &std::path::Path, + error: GeyserPluginError, +) -> StartupError { + let cause = match error { + GeyserPluginError::ConfigFileReadError { msg } => msg, + other => other.to_string(), + }; + StartupError::new( + "config", + None, + Some(path.display().to_string()), + cause, + "fix the runtime TOML config", + ) +} + +#[cfg(test)] +mod tests { + use super::{ + check_admin_bind, load_config_with_paths, run_static_startup_checks, + validate_topic_entries, + }; + use std::{ + fs, + net::TcpListener, + path::PathBuf, + time::{SystemTime, UNIX_EPOCH}, + }; + + fn temp_dir(test_name: &str) -> PathBuf { + std::env::temp_dir().join(format!( + "geyser-plugin-preflight-{test_name}-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos() + )) + } + + fn valid_runtime_config() -> &'static str { + r#" +libpath = "target/release/libsolana_accountsdb_plugin_kafka.so" + 
+[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" + +[plugin] +local_rpc_url = "http://127.0.0.1:8899" +admin = "127.0.0.1:8080" +"# + } + + #[test] + fn malformed_validator_json_reports_invalid_validator_config_json() { + let base = temp_dir("malformed-json"); + fs::create_dir_all(&base).unwrap(); + let wrapper_path = base.join("plugin-config.json"); + fs::write(&wrapper_path, r#"{ "libpath": "plugin.so", "#).unwrap(); + + let error = load_config_with_paths(&wrapper_path).unwrap_err(); + assert_eq!(error.subsystem, "config"); + assert!(error.cause.contains("invalid validator config JSON")); + + fs::remove_dir_all(&base).unwrap(); + } + + #[test] + fn missing_config_file_target_reports_config_file_field() { + let base = temp_dir("missing-config-file"); + fs::create_dir_all(&base).unwrap(); + let wrapper_path = base.join("plugin-config.json"); + fs::write( + &wrapper_path, + r#"{ + "libpath": "plugin.so", + "config_file": "missing.toml" +}"#, + ) + .unwrap(); + + let error = load_config_with_paths(&wrapper_path).unwrap_err(); + assert_eq!(error.subsystem, "config"); + assert_eq!(error.field, Some("config_file")); + assert!(error.target.as_deref().unwrap().ends_with("missing.toml")); + + fs::remove_dir_all(&base).unwrap(); + } + + #[test] + fn malformed_runtime_toml_reports_config_subsystem() { + let base = temp_dir("malformed-runtime-toml"); + fs::create_dir_all(&base).unwrap(); + let runtime_path = base.join("runtime.toml"); + fs::write(&runtime_path, "[kafka\nbootstrap_servers = nope").unwrap(); + let wrapper_path = base.join("plugin-config.json"); + fs::write( + &wrapper_path, + r#"{ + "libpath": "plugin.so", + "config_file": "runtime.toml" +}"#, + ) + .unwrap(); + + let error = load_config_with_paths(&wrapper_path).unwrap_err(); + assert_eq!(error.subsystem, "config"); + assert!(error.action.contains("fix the runtime TOML config")); + + fs::remove_dir_all(&base).unwrap(); + } + + #[test] + fn 
direct_toml_config_loads_successfully() { + let base = temp_dir("direct-toml"); + fs::create_dir_all(&base).unwrap(); + let runtime_path = base.join("runtime.toml"); + fs::write(&runtime_path, valid_runtime_config()).unwrap(); + + let loaded = load_config_with_paths(&runtime_path).unwrap(); + assert!(loaded.paths.is_none()); + assert_eq!(loaded.config.kafka.topic, "solana.testnet.account_updates"); + + fs::remove_dir_all(&base).unwrap(); + } + + #[test] + fn static_startup_checks_report_missing_libpath() { + let base = temp_dir("missing-libpath"); + fs::create_dir_all(&base).unwrap(); + let runtime_path = base.join("runtime.toml"); + fs::write(&runtime_path, valid_runtime_config()).unwrap(); + let wrapper_path = base.join("plugin-config.json"); + fs::write( + &wrapper_path, + r#"{ + "libpath": "missing-plugin.so", + "config_file": "runtime.toml" +}"#, + ) + .unwrap(); + + let error = run_static_startup_checks(&wrapper_path).unwrap_err(); + assert_eq!(error.subsystem, "config"); + assert_eq!(error.field, Some("libpath")); + assert!( + error + .target + .as_deref() + .unwrap() + .ends_with("missing-plugin.so") + ); + assert_eq!(error.cause, "plugin shared library does not exist"); + assert!(error.action.contains("build the plugin shared library")); + + fs::remove_dir_all(&base).unwrap(); + } + + #[test] + fn static_startup_checks_accept_existing_libpath() { + let base = temp_dir("existing-libpath"); + fs::create_dir_all(&base).unwrap(); + let runtime_path = base.join("runtime.toml"); + fs::write(&runtime_path, valid_runtime_config()).unwrap(); + let libpath = base.join("plugin.so"); + fs::write(&libpath, "").unwrap(); + let wrapper_path = base.join("plugin-config.json"); + fs::write( + &wrapper_path, + r#"{ + "libpath": "plugin.so", + "config_file": "runtime.toml" +}"#, + ) + .unwrap(); + + let loaded = run_static_startup_checks(&wrapper_path).unwrap(); + let paths = loaded.paths.unwrap(); + assert_eq!(paths.libpath, libpath); + + fs::remove_dir_all(&base).unwrap(); 
+ } + + #[test] + fn topic_metadata_validation_accepts_present_topic_without_error() { + let result = validate_topic_entries( + [("other", None), ("solana.testnet.account_updates", None)], + "solana.testnet.account_updates", + ); + + assert_eq!(result, Ok(())); + } + + #[test] + fn topic_metadata_validation_reports_missing_topic() { + let error = + validate_topic_entries([("other", None)], "missing").unwrap_err(); + + assert_eq!(error, "topic is not present in broker metadata"); + } + + #[test] + fn topic_metadata_validation_reports_topic_error() { + let error = validate_topic_entries( + [( + "solana.testnet.account_updates", + Some("unknown topic".into()), + )], + "solana.testnet.account_updates", + ) + .unwrap_err(); + + assert_eq!(error, "topic metadata error: unknown topic"); + } + + #[test] + fn check_admin_bind_reports_address_in_use() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let local_addr = listener.local_addr().unwrap(); + + let mut config = crate::config::Config::default(); + config.plugin.admin = local_addr; + + let error = check_admin_bind(&config).unwrap_err(); + assert_eq!(error.subsystem, "admin"); + assert_eq!(error.field, Some("plugin.admin")); + assert_eq!( + error.target.as_deref(), + Some(local_addr.to_string().as_str()) + ); + assert!(error.cause.contains("failed to bind admin HTTP address")); + assert!(error.action.contains("free plugin.admin port")); + + drop(listener); + } + + #[test] + fn check_admin_bind_succeeds_on_free_port() { + let listener = TcpListener::bind("127.0.0.1:0").unwrap(); + let local_addr = listener.local_addr().unwrap(); + drop(listener); + + let mut config = crate::config::Config::default(); + config.plugin.admin = local_addr; + + // It is possible (though unlikely) for the OS to assign the port to a + // different process between the drop and the bind here. Treat both + // outcomes as acceptable for this test of the success path. 
+ if let Err(error) = check_admin_bind(&config) { + panic!("expected admin bind to succeed, got: {error}"); + } + } +} diff --git a/geyser-plugin/src/server/accounts.rs b/geyser-plugin/src/server/accounts.rs index c13878a..5b5e11f 100644 --- a/geyser-plugin/src/server/accounts.rs +++ b/geyser-plugin/src/server/accounts.rs @@ -190,15 +190,29 @@ pub async fn handle_post_accounts( } Err(AddAccountsError::QueueFull(outcome)) => json_response( StatusCode::SERVICE_UNAVAILABLE, - &AccountsResponse::from(outcome), + &BackfillUnavailableResponse { + error: "initial account backfill queue is full; retry the request after the validator RPC is available" + .to_owned(), + accounts: AccountsResponse::from(outcome), + }, ), Err(AddAccountsError::BackfillUnavailable(outcome)) => json_response( StatusCode::INTERNAL_SERVER_ERROR, - &AccountsResponse::from(outcome), + &BackfillUnavailableResponse { + error: "initial account backfill enqueue failed; retry the request after the validator RPC is available" + .to_owned(), + accounts: AccountsResponse::from(outcome), + }, ), } } +#[derive(serde::Serialize)] +struct BackfillUnavailableResponse { + error: String, + accounts: AccountsResponse, +} + impl From for AccountsResponse { fn from(outcome: AddAccountsOutcome) -> Self { Self { diff --git a/geyser-plugin/src/server/mod.rs b/geyser-plugin/src/server/mod.rs index 4eaf21f..bb3f9ad 100644 --- a/geyser-plugin/src/server/mod.rs +++ b/geyser-plugin/src/server/mod.rs @@ -32,12 +32,26 @@ impl HttpService { initial_account_backfill: InitialAccountBackfillHandle, metrics_enabled: bool, ) -> IoResult { - if metrics_enabled { - register_metrics(); + if metrics_enabled && let Err(error) = register_metrics() { + let message = + format!("failed to register Prometheus metrics: {error}"); + error!("{message}"); + return Err(std::io::Error::other(message)); } let runtime = Runtime::new()?; - let listener = runtime.block_on(TcpListener::bind(address))?; + let listener = + runtime + 
.block_on(TcpListener::bind(address)) + .map_err(|error| { + let message = format!( + "Failed to bind admin HTTP API to {address}: {error}. \ + Choose a free plugin.admin port or stop the process \ + currently using it." + ); + error!("{message}"); + std::io::Error::new(error.kind(), message) + })?; runtime.spawn(async move { loop { diff --git a/geyser-plugin/src/server/prom.rs b/geyser-plugin/src/server/prom.rs index 06bb0ae..a27d494 100644 --- a/geyser-plugin/src/server/prom.rs +++ b/geyser-plugin/src/server/prom.rs @@ -10,7 +10,11 @@ pub fn metrics_handler() -> Response> { error!("could not encode custom metrics: {}", error); String::new() }); - Response::builder() - .body(Full::new(Bytes::from(metrics))) - .unwrap() + match Response::builder().body(Full::new(Bytes::from(metrics))) { + Ok(response) => response, + Err(error) => { + error!("failed to build metrics response: {error}"); + Response::new(Full::new(Bytes::new())) + } + } } diff --git a/grpc-service/src/app.rs b/grpc-service/src/app.rs index 9b58d39..1330a6a 100644 --- a/grpc-service/src/app.rs +++ b/grpc-service/src/app.rs @@ -26,6 +26,7 @@ pub struct App< sink: A, status_sink: S, readiness: ServiceReadiness, + shutdown: CancellationToken, } impl @@ -54,6 +55,7 @@ impl ConsoleSink::new(), ConsoleSink::new(), readiness, + shutdown, )) } } @@ -88,6 +90,7 @@ impl sink, ConsoleSink::new(), readiness, + shutdown, ); Ok((app, grpc)) @@ -123,6 +126,7 @@ impl sink, ConsoleSink::new(), readiness, + shutdown, ); Ok((app, grpc)) @@ -139,6 +143,7 @@ impl sink: A, status_sink: S, readiness: ServiceReadiness, + shutdown: CancellationToken, ) -> Self { Self { config, @@ -147,6 +152,7 @@ impl sink, status_sink, readiness, + shutdown, } } @@ -186,6 +192,7 @@ impl ); } + let _shutdown = &self.shutdown; self.account_update_source .run(self.config.pubkey_filter.as_ref(), |message| { let event = AccountEvent::Live(message); @@ -214,6 +221,8 @@ mod tests { use crate::traits::{ AccountSink, AccountUpdateSource, 
SnapshotStore, StatusSink, }; + use tokio_util::sync::CancellationToken; + fn config(pubkey_filter: Option) -> Config { Config { kafka: KafkaConfig { @@ -458,6 +467,7 @@ mod tests { sink.clone(), status_sink.clone(), ServiceReadiness::ready_for_test(), + CancellationToken::new(), ); app.run().await.unwrap(); @@ -483,6 +493,7 @@ mod tests { RecordingSink::new(false, false), status_sink.clone(), ServiceReadiness::ready_for_test(), + CancellationToken::new(), ); app.run().await.unwrap(); @@ -508,6 +519,7 @@ mod tests { RecordingSink::new(false, false), status_sink.clone(), ServiceReadiness::ready_for_test(), + CancellationToken::new(), ); app.run().await.unwrap(); @@ -536,6 +548,7 @@ mod tests { RecordingSink::new(false, false), RecordingStatusSink::new(), ServiceReadiness::ready_for_test(), + CancellationToken::new(), ); let error = app.run().await.unwrap_err(); @@ -557,6 +570,7 @@ mod tests { RecordingSink::new(true, false), RecordingStatusSink::new(), ServiceReadiness::ready_for_test(), + CancellationToken::new(), ); let error = app.run().await.unwrap_err(); @@ -577,6 +591,7 @@ mod tests { RecordingSink::new(false, true), RecordingStatusSink::new(), ServiceReadiness::ready_for_test(), + CancellationToken::new(), ); let error = app.run().await.unwrap_err(); @@ -597,6 +612,7 @@ mod tests { RecordingSink::new(false, false), RecordingStatusSink::new(), ServiceReadiness::ready_for_test(), + CancellationToken::new(), ); let error = app.run().await.unwrap_err();