Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
eca4113
feat: add ix-tests integration test harness
thlorenz Apr 22, 2026
7cb42b6
feat: implement gRPC ping RPC and add test
thlorenz Apr 22, 2026
d7dca98
feat: add scenario layout, accounts, artifacts, and runner
thlorenz Apr 22, 2026
77ac4a8
chore: add ix-tests grpc-service config files
thlorenz Apr 22, 2026
5973b71
feat: add service process controller with ping readiness probe
thlorenz Apr 22, 2026
38391bd
feat: add gRPC test client and observation log
thlorenz Apr 22, 2026
e43b0b0
feat: add validator driver and keypair-backed accounts
thlorenz Apr 22, 2026
c088b33
feat: add ix checkpoint runner
thlorenz Apr 22, 2026
4b8445c
feat: add ix single basic scenario
thlorenz Apr 22, 2026
88515b3
feat: add ix single load scenario
thlorenz Apr 22, 2026
2dbea64
feat: add ix dual concurrent scenario
thlorenz Apr 22, 2026
48d2c80
feat: add ix dual restart scenario
thlorenz Apr 22, 2026
5431e2b
feat: persist ix failure artifacts
thlorenz Apr 22, 2026
2112eb5
docs: add ix test root workflows
thlorenz Apr 22, 2026
efaa669
Merge branch 'master' into ix-tests
thlorenz Apr 27, 2026
a499959
chore: fixes + log improvs in ix-tests
thlorenz Apr 28, 2026
70f37a7
chore: log update lamports
thlorenz Apr 28, 2026
fdb3fb5
chore: improve b58 conversions
thlorenz Apr 28, 2026
63bbcf0
refactor(ix-tests): satisfy checkpoints from required only
thlorenz Apr 28, 2026
f3e6c4c
refactor: remove allowed field from ClientCheckpoint
thlorenz Apr 28, 2026
654597a
fix(ix-tests): consume matched required entries incrementally
thlorenz Apr 28, 2026
da78bbe
chore: simplify rust
thlorenz Apr 28, 2026
da71800
chore: minor fixes in ix-tests
thlorenz Apr 28, 2026
554d5a8
chore: simplify checkpoint asserts
thlorenz Apr 28, 2026
3df8168
chore: task to reset kafka/ksql state
thlorenz Apr 28, 2026
75fd402
chore: shorter wait times
thlorenz Apr 28, 2026
e26f199
chore: dumping logs on error
thlorenz Apr 28, 2026
2210038
chore: fix geyser plugin launch
thlorenz May 1, 2026
d384b78
feat: add rpc_url to ValidatorConfig
thlorenz May 1, 2026
f7331d2
feat: add ServiceReadiness primitive
thlorenz May 1, 2026
5aec64f
feat: gate Ping on ServiceReadiness
thlorenz May 1, 2026
59ac0aa
feat: add startup preflight probes
thlorenz May 1, 2026
881541a
feat: orchestrate preflight and readiness in App::run
thlorenz May 1, 2026
30cb19b
refactor: distinguish preflight-pending from connection errors
thlorenz May 1, 2026
32475cd
chore: convenience make tasks
thlorenz May 1, 2026
6353c8c
chore: increase checkpoint timeout
thlorenz May 1, 2026
d22a136
refactor: encode ix-test service ownership
thlorenz May 1, 2026
5ed0026
feat: attach to ready external grpc-service
thlorenz May 1, 2026
3200a68
feat: add single triage scenario
thlorenz May 1, 2026
6c835de
feat: wire single triage scenario
thlorenz May 1, 2026
a2adbc2
feat: add triage logging
thlorenz May 1, 2026
8be8b76
feat: scope grpc service configs per run
thlorenz May 3, 2026
6242cda
feat: generate run-scoped grpc service configs
thlorenz May 3, 2026
4d4910b
feat: add grpc-service cooperative shutdown
thlorenz May 3, 2026
69ea2a0
feat: request graceful ix-test service shutdown
thlorenz May 3, 2026
583a05b
feat: gate grpc readiness on kafka assignment
thlorenz May 3, 2026
263a272
feat(grpc-service): add startup delay observability logs
thlorenz May 3, 2026
328592d
feat(kafka-setup): tune local kafka rebalance timing
thlorenz May 3, 2026
20fc740
fix: align grpc probe and bind ports
thlorenz May 5, 2026
45fb673
fix: abort receive task on shutdown
thlorenz May 5, 2026
22ab4d7
fix: toml issues
thlorenz May 5, 2026
1d80a3b
feat: switch ix-tests to random keypairs per run
thlorenz May 5, 2026
0aff553
chore: add tracing::info! logging for generated NamedAccount pubkey m…
thlorenz May 5, 2026
c05a7e3
chore: fix single basic scenario assert
thlorenz May 5, 2026
6c02ca6
fix: accounts need to be rent exempt
thlorenz May 11, 2026
83532bd
chore: parallel airdrops
thlorenz May 11, 2026
ae878d2
fix: report failed tx status
thlorenz May 11, 2026
a0028ba
chore: confirm sig improvements
thlorenz May 11, 2026
aa96827
fix: relax load checkpoint matching
thlorenz May 11, 2026
8b7ee7e
fix: name dual restart balances
thlorenz May 11, 2026
057e4a9
feat: add path-aware plugin config preflight
thlorenz May 12, 2026
2e3f846
feat: run static startup checks on plugin load
thlorenz May 12, 2026
4d2a4ce
feat: add kafka startup readiness check
thlorenz May 12, 2026
3f543dd
feat: split ksql startup restore into prefetch and restore
thlorenz May 12, 2026
a60c0b0
feat: add admin bind startup check
thlorenz May 12, 2026
f3fe5a8
feat: validate local_rpc_url and improve backfill errors
thlorenz May 12, 2026
c38d0c8
docs: document safe-start startup checks
thlorenz May 14, 2026
458884b
fix: complete safe-start verification cleanup
thlorenz May 14, 2026
bcded8a
fix: generalize preflight error actions
thlorenz May 14, 2026
dbf0709
Merge branch 'master' into validator-safe-start
thlorenz May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions geyser-plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,65 @@ admin = "127.0.0.1:3000"

`kafka.bootstrap_servers`, `kafka.topic`, `plugin.local_rpc_url`, and `plugin.admin` are required. The `admin` bind address serves `POST /filters/accounts` and, when `plugin.metrics` is `true`, also `GET /metrics`. If `ksql.url` is set, it must be a valid absolute `http` or `https` base URL and startup will fail if the restore query cannot complete. Legacy filter arrays and legacy transaction, slot-status, block, and wrapping options are rejected during config parsing.

## Startup checks

Startup checks run from `KafkaPlugin::on_load` during normal validator/plugin startup. They run whether the validator is launched through `make geyser-plugin-launch` or directly with `solana-test-validator --geyser-plugin-config ...`; no separate preflight binary or Makefile target is required.

Startup validates:

- validator JSON wrapper
- runtime TOML config
- plugin library path
- admin bind address
- ksqlDB startup restore when `ksql.url` is configured
- Kafka bootstrap/topic readiness
- local RPC URL syntax only, not local RPC liveness

The local RPC endpoint is not required to be reachable during startup checks because it belongs to the validator being launched.

### Safe-start manual test matrix

| Scenario | Temporary change | Expected prefix |
| --- | --- | --- |
| malformed validator JSON | invalid JSON in wrapper copy | Agave rejects the wrapper before plugin load with `FailedToLoadPlugin`; no segfault |
| missing runtime TOML | JSON `config_file` points to missing TOML | `ERROR config startup check failed` |
| malformed runtime TOML | invalid TOML in runtime copy | `ERROR config startup check failed` |
| missing Kafka bootstrap | empty `kafka.bootstrap_servers` | `ERROR config startup check failed` |
| Kafka down | no Kafka on configured bootstrap | `ERROR kafka startup check failed` |
| ksqlDB down | `ksql.url = "http://127.0.0.1:1"` | `ERROR ksql startup check failed` |
| invalid ksql table | `table = "bad-name"` | `ERROR config startup check failed` |
| admin port in use | keep listener on configured port | `ERROR admin startup check failed` |
| malformed local RPC URL | `local_rpc_url = "127.0.0.1:8899"` | `ERROR config startup check failed` |

Proof command for the original issue:

```shell
make geyser-plugin-launch
```

Expected with no dependencies running:

- exits non-zero
- prints a Kafka startup check error with action `ensure Kafka is reachable at kafka.bootstrap_servers or update kafka.bootstrap_servers`
- validator/plugin startup exits gracefully
- does not print `Segmentation fault`

Direct validator path that must receive the same checks:

```shell
cd geyser-plugin
solana-test-validator --log --reset --geyser-plugin-config plugin-config.json
```

Expected failures and messages must match the Makefile path because the checks live in `KafkaPlugin::on_load`.

Success path:

```shell
make kafka-ready
make geyser-plugin-launch
```

## Whitelist Management

Account inclusion is managed through the HTTP API:
Expand Down
256 changes: 241 additions & 15 deletions geyser-plugin/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ pub struct PluginConfig {

#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
struct ValidatorConfig {
pub(crate) struct ValidatorConfig {
#[allow(dead_code)]
libpath: String,
config_file: PathBuf,
pub(crate) libpath: PathBuf,
pub(crate) config_file: PathBuf,
}

fn default_shutdown_timeout_ms() -> u64 {
Expand Down Expand Up @@ -125,6 +125,7 @@ impl Default for KsqlConfig {

impl Config {
/// Read plugin config from either a validator JSON wrapper or a TOML runtime config.
#[allow(dead_code)]
pub fn read_from<P: AsRef<Path>>(config_path: P) -> PluginResult<Self> {
let config_path = config_path.as_ref();
let contents = read_to_string(config_path)?;
Expand Down Expand Up @@ -171,14 +172,14 @@ impl Config {
}
}

fn fill_defaults(&mut self) {
pub(crate) fn fill_defaults(&mut self) {
self.set_default("request.required.acks", "1");
self.set_default("message.timeout.ms", "30000");
self.set_default("compression.type", "lz4");
self.set_default("partitioner", "murmur2_random");
}

fn validate(&self) -> PluginResult<()> {
pub(crate) fn validate(&self) -> PluginResult<()> {
if self.kafka.bootstrap_servers.trim().is_empty() {
return Err(GeyserPluginError::ConfigFileReadError {
msg: "missing required config field `kafka.bootstrap_servers`"
Expand All @@ -192,10 +193,40 @@ impl Config {
});
}

if self.plugin.local_rpc_url.trim().is_empty() {
let trimmed_local_rpc_url = self.plugin.local_rpc_url.trim();
if trimmed_local_rpc_url.is_empty() {
return Err(GeyserPluginError::ConfigFileReadError {
msg: "missing required config field `plugin.local_rpc_url`"
.to_owned(),
msg:
"invalid config field `plugin.local_rpc_url`: URL must not be empty"
.to_owned(),
});
}

let parsed_local_rpc_url =
Url::parse(trimmed_local_rpc_url).map_err(|error| {
GeyserPluginError::ConfigFileReadError {
msg: format!(
"invalid config field `plugin.local_rpc_url`: {error}"
),
}
})?;

match parsed_local_rpc_url.scheme() {
"http" | "https" => {}
scheme => {
return Err(GeyserPluginError::ConfigFileReadError {
msg: format!(
"invalid config field `plugin.local_rpc_url`: unsupported scheme `{scheme}`"
),
});
}
}

if !parsed_local_rpc_url.has_host() {
return Err(GeyserPluginError::ConfigFileReadError {
msg:
"invalid config field `plugin.local_rpc_url`: host is required"
.to_owned(),
});
}

Expand Down Expand Up @@ -238,26 +269,53 @@ impl Config {
.to_owned(),
});
}
}

if self.ksql.table.trim().is_empty() {
return Err(GeyserPluginError::ConfigFileReadError {
msg: "invalid config field `ksql.table`: table must not be empty".to_owned(),
});
validate_ksql_identifier(&self.ksql.table).map_err(|error| {
GeyserPluginError::ConfigFileReadError {
msg: format!("invalid config field `ksql.table`: {error}"),
}
}
})?;

Ok(())
}
}

/// Validates that `identifier` is a safe ksqlDB identifier suitable for
/// direct interpolation into a SQL statement. The identifier must start with
/// an ASCII letter or `_` and may otherwise contain only ASCII alphanumeric
/// characters or `_`.
pub(crate) fn validate_ksql_identifier(
identifier: &str,
) -> std::io::Result<&str> {
let mut chars = identifier.chars();
let first = chars.next().ok_or_else(|| {
std::io::Error::other("ksql identifier must not be empty")
})?;
if !(first.is_ascii_alphabetic() || first == '_') {
return Err(std::io::Error::other(format!(
"invalid ksql identifier `{identifier}`: must start with an ASCII letter or `_`"
)));
}
for c in chars {
if !(c.is_ascii_alphanumeric() || c == '_') {
return Err(std::io::Error::other(format!(
"invalid ksql identifier `{identifier}`: only ASCII alphanumeric characters and `_` are allowed"
)));
}
}
Ok(identifier)
}

#[allow(dead_code)]
fn read_to_string(path: &Path) -> PluginResult<String> {
let mut file = File::open(path)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
Ok(contents)
}

fn resolve_runtime_config_path(
pub(crate) fn resolve_runtime_config_path(
wrapper_path: &Path,
runtime_path: &Path,
) -> PathBuf {
Expand All @@ -273,12 +331,45 @@ fn resolve_runtime_config_path(

#[cfg(test)]
mod tests {
use super::Config;
use super::{Config, validate_ksql_identifier};
use std::{
fs,
time::{SystemTime, UNIX_EPOCH},
};

#[test]
fn test_validates_simple_identifier() {
assert_eq!(validate_ksql_identifier("accounts").unwrap(), "accounts");
assert_eq!(validate_ksql_identifier("_x").unwrap(), "_x");
assert_eq!(validate_ksql_identifier("A1_b2").unwrap(), "A1_b2");
}

#[test]
fn test_rejects_empty_identifier() {
let error = validate_ksql_identifier("").unwrap_err().to_string();
assert!(error.contains("must not be empty"));
}

#[test]
fn test_rejects_identifier_starting_with_digit() {
let error = validate_ksql_identifier("1bad").unwrap_err().to_string();
assert!(error.contains("must start with an ASCII letter"));
}

#[test]
fn test_rejects_identifier_with_invalid_characters() {
let error = validate_ksql_identifier("accounts; DROP TABLE x")
.unwrap_err()
.to_string();
assert!(error.contains("only ASCII alphanumeric"));
}

#[test]
fn test_rejects_identifier_with_quote() {
let error = validate_ksql_identifier("a\"b").unwrap_err().to_string();
assert!(error.contains("only ASCII alphanumeric"));
}

fn parse_config(toml: &str) -> Result<Config, String> {
let mut config: Config =
toml::from_str(toml).map_err(|error| error.to_string())?;
Expand Down Expand Up @@ -606,6 +697,141 @@ admin = "127.0.0.1:8080"
);
}

#[test]
fn test_rejects_invalid_ksql_table_identifier() {
let error = parse_config(
r#"
libpath = "target/release/libsolana_accountsdb_plugin_kafka.so"

[kafka]
bootstrap_servers = "localhost:9092"
topic = "solana.testnet.account_updates"

[ksql]
url = "http://127.0.0.1:8088"
table = "bad-name"

[plugin]
local_rpc_url = "http://127.0.0.1:8899"
admin = "127.0.0.1:8080"
"#,
)
.unwrap_err();

assert!(error.contains("invalid config field `ksql.table`"));
assert!(error.contains("only ASCII alphanumeric"));
}

#[test]
fn test_rejects_ksql_table_starting_with_digit() {
let error = parse_config(
r#"
libpath = "target/release/libsolana_accountsdb_plugin_kafka.so"

[kafka]
bootstrap_servers = "localhost:9092"
topic = "solana.testnet.account_updates"

[ksql]
table = "1bad"

[plugin]
local_rpc_url = "http://127.0.0.1:8899"
admin = "127.0.0.1:8080"
"#,
)
.unwrap_err();

assert!(error.contains("invalid config field `ksql.table`"));
assert!(error.contains("must start with an ASCII letter"));
}

#[test]
fn test_rejects_empty_local_rpc_url() {
let error = parse_config(
r#"
libpath = "target/release/libsolana_accountsdb_plugin_kafka.so"

[kafka]
bootstrap_servers = "localhost:9092"
topic = "solana.testnet.account_updates"

[plugin]
local_rpc_url = " "
admin = "127.0.0.1:8080"
"#,
)
.unwrap_err();

assert!(error.contains("invalid config field `plugin.local_rpc_url`"));
assert!(error.contains("URL must not be empty"));
}

#[test]
fn test_rejects_local_rpc_url_without_scheme() {
let error = parse_config(
r#"
libpath = "target/release/libsolana_accountsdb_plugin_kafka.so"

[kafka]
bootstrap_servers = "localhost:9092"
topic = "solana.testnet.account_updates"

[plugin]
local_rpc_url = "127.0.0.1:8899"
admin = "127.0.0.1:8080"
"#,
)
.unwrap_err();

assert!(error.contains("invalid config field `plugin.local_rpc_url`"));
assert!(error.contains("relative URL without a base"));
}

#[test]
fn test_rejects_local_rpc_url_with_unsupported_scheme() {
let error = parse_config(
r#"
libpath = "target/release/libsolana_accountsdb_plugin_kafka.so"

[kafka]
bootstrap_servers = "localhost:9092"
topic = "solana.testnet.account_updates"

[plugin]
local_rpc_url = "ftp://127.0.0.1:8899"
admin = "127.0.0.1:8080"
"#,
)
.unwrap_err();

assert!(error.contains("invalid config field `plugin.local_rpc_url`"));
assert!(error.contains("unsupported scheme `ftp`"));
}

#[test]
fn test_rejects_local_rpc_url_without_host() {
let error = parse_config(
r#"
libpath = "target/release/libsolana_accountsdb_plugin_kafka.so"

[kafka]
bootstrap_servers = "localhost:9092"
topic = "solana.testnet.account_updates"

[plugin]
local_rpc_url = "http://:8899"
admin = "127.0.0.1:8080"
"#,
)
.unwrap_err();

assert!(error.contains("invalid config field `plugin.local_rpc_url`"));
assert!(
error.contains("empty host") || error.contains("host is required")
);
}

#[test]
fn test_passes_through_kafka_client_overrides() {
let config = parse_config(
Expand Down
Loading