diff --git a/Cargo.lock b/Cargo.lock index 5f766da..5ad399a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2206,6 +2206,33 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" +[[package]] +name = "ix-tests" +version = "0.1.0" +dependencies = [ + "anyhow", + "bs58", + "futures", + "helius-laserstream", + "nix", + "reqwest 0.12.28", + "serde", + "serde_json", + "solana-keypair 3.1.2", + "solana-pubkey 4.1.0", + "solana-rpc-client", + "solana-signature 3.4.0", + "solana-signer 3.0.0", + "solana-system-interface 3.2.0", + "solana-transaction 3.1.0", + "tokio", + "tokio-stream", + "toml 0.9.12+spec-1.1.0", + "tonic", + "tracing", + "tracing-subscriber", +] + [[package]] name = "jiff" version = "0.2.23" @@ -2416,6 +2443,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tokio-stream", + "tokio-util", "toml 0.9.12+spec-1.1.0", "tonic", "tracing", @@ -2520,6 +2548,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74523f3a35e05aba87a1d978330aef40f67b0304ac79c1c00b294c9830543db6" +dependencies = [ + "bitflags 2.11.1", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -7365,6 +7405,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 9e92def..9cafb59 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,3 @@ [workspace] -members = ["event-proto", "grpc-service", "geyser-plugin"] +members = ["event-proto", "grpc-service", "geyser-plugin", "ix-tests"] resolver = "2" diff --git a/Makefile b/Makefile index 9911f0d..829b2f0 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,9 @@ CLIENT_REST ?= http://127.0.0.1:3030 kafka-ready \ kafka-ui \ kafka-ui-down \ + ix-tests-build \ + ix-tests-run \ + ix-tests-scenario \ grpc-service-run \ grpc-service-build \ grpc-service-client \ @@ -27,6 +30,9 @@ help: @echo " kafka-ready - Start the stack and initialize stream/table/schema" @echo " kafka-ui - Start Redpanda Console" @echo " kafka-ui-down - Stop Redpanda Console" + @echo " ix-tests-build - Build the gRPC service binary and the ix-tests harness" + @echo " ix-tests-run - Run the full local integration suite" + @echo " ix-tests-scenario - Run one integration scenario (SCENARIO=...)" @echo " grpc-service-run - Run the gRPC service" @echo " grpc-service-build - Build the gRPC service package" @echo " grpc-service-client - Run the example gRPC client" @@ -61,6 +67,20 @@ kafka-ui: kafka-ui-down: $(MAKE) -C kafka-setup ui-down +kafka-reset-state: + $(MAKE) -C kafka-setup reset-state + +ix-tests-build: + cargo build -p magigblock-grpc-service + cargo build -p ix-tests + +ix-tests-run: + cargo run -p ix-tests -- --config ix-tests/configs/suite.toml --scenario all + +ix-tests-scenario: + @test -n "$(SCENARIO)" || (echo "Provide SCENARIO=..." >&2; exit 1) + cargo run -p ix-tests -- --config ix-tests/configs/suite.toml --scenario "$(SCENARIO)" + grpc-service-run: $(MAKE) -C grpc-service run diff --git a/README.md b/README.md index 90f8067..dcd674a 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ This repo contains the MagicBlock account update pipeline: - `event-proto/`: shared Rust crate `magigblock-event-proto` - `grpc-service/`: Rust crate `magigblock-grpc-service` - `geyser-plugin/`: Solana Geyser plugin crate +- `ix-tests/`: local end-to-end gRPC integration harness - `kafka-setup/`: minimal Kafka/ksqlDB local environment - `Makefile`: top-level operator entrypoint @@ -36,6 +37,9 @@ cargo test --workspace -- --test-threads=16 - `make kafka-ready` - `make kafka-ui` - `make kafka-ui-down` +- `make ix-tests-build` +- `make ix-tests-run` +- `make ix-tests-scenario SCENARIO=single-basic` - `make grpc-service-run` - `make grpc-service-build` - `make grpc-service-client` @@ -43,3 +47,5 @@ cargo test --workspace -- --test-threads=16 - `make grpc-service-client-remove-sub PUBKEY=` - `make geyser-plugin-build` - `make geyser-plugin-launch` + +The integration suite assumes Kafka/ksqlDB and the validator-with-plugin are already up. Scenarios are isolated and can be run individually. Failure artifacts are written under `target/ix-tests/failures/`. diff --git a/geyser-plugin/Makefile b/geyser-plugin/Makefile index 4fab2f0..4ccc6bd 100644 --- a/geyser-plugin/Makefile +++ b/geyser-plugin/Makefile @@ -1,5 +1,7 @@ .PHONY: help build build-plugin launch init-config clean +MAKEFILE_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) + UNAME_S := $(shell uname -s) ifeq ($(UNAME_S),Linux) PLUGIN_EXT := so @@ -18,14 +20,6 @@ help: @echo " make launch - Launch solana-test-validator with the plugin (depends on build-plugin)" @echo " make clean - Remove compiled artifacts" -init-config: - @echo "Creating $(PLUGIN_CONFIG)..." - @perl -0pe 's#(? "$(PLUGIN_CONFIG)" - @echo "Creating $(VALIDATOR_CONFIG)..." - @perl -0pe 's#(? "$(VALIDATOR_CONFIG)" - @echo "✓ Created $(PLUGIN_CONFIG)" - @echo "✓ Created $(VALIDATOR_CONFIG)" - build-plugin: @echo "Building plugin for $(UNAME_S)..." cargo build --release @@ -46,8 +40,26 @@ launch: build-plugin exit 1; \ fi; \ fi + cd "$(MAKEFILE_DIR)" && \ solana-test-validator --log --reset --geyser-plugin-config "$(VALIDATOR_CONFIG)" +init-config: + @echo "Creating $(VALIDATOR_CONFIG) and $(PLUGIN_CONFIG) for $(UNAME_S) (.$(PLUGIN_EXT))..." + @if [ -f "$(VALIDATOR_CONFIG)" ]; then \ + echo "Error: $(VALIDATOR_CONFIG) already exists, refusing to overwrite"; \ + exit 1; \ + fi + @if [ -f "$(PLUGIN_CONFIG)" ]; then \ + echo "Error: $(PLUGIN_CONFIG) already exists, refusing to overwrite"; \ + exit 1; \ + fi + @sed -e 's|path-to-plugin-library|$(PLUGIN_PATH)|' \ + -e 's|plugin-config.toml|$(PLUGIN_CONFIG)|' \ + ./configs/plugin-config.example.json > "$(VALIDATOR_CONFIG)" + @cp ./configs/plugin-config.example.toml "$(PLUGIN_CONFIG)" + @echo "Created $(VALIDATOR_CONFIG) (libpath: $(PLUGIN_PATH))" + @echo "Created $(PLUGIN_CONFIG)" + clean: cargo clean @echo "Cleaned build artifacts" diff --git a/geyser-plugin/configs/plugin-config.example.json b/geyser-plugin/configs/plugin-config.example.json index 7b83930..748ca13 100644 --- a/geyser-plugin/configs/plugin-config.example.json +++ b/geyser-plugin/configs/plugin-config.example.json @@ -1,4 +1,4 @@ { - "libpath": "../target/release/libsolana_accountsdb_plugin_kafka.so", + "libpath": "path-to-plugin-library", "config_file": "plugin-config.toml" } diff --git a/geyser-plugin/src/account_update_publisher.rs b/geyser-plugin/src/account_update_publisher.rs index 6d08095..a5d5fb2 100644 --- a/geyser-plugin/src/account_update_publisher.rs +++ b/geyser-plugin/src/account_update_publisher.rs @@ -69,8 +69,9 @@ fn publish_raw_account_update( ) -> PluginResult<()> { if let Ok(key) = <[u8; 32]>::try_from(event.pubkey.as_slice()) { debug!( - "Matched account update {} in slot {}", + "Matched account update {} lamports {} in slot {}", Pubkey::new_from_array(key), + event.lamports, event.slot ); } diff --git a/grpc-service/Cargo.toml b/grpc-service/Cargo.toml index a2790b4..45ddcb8 100644 --- a/grpc-service/Cargo.toml +++ b/grpc-service/Cargo.toml @@ -23,6 +23,7 @@ serde_json = "1.0" toml = "0.9.12" tokio = { version = "1.47", features = ["macros", "rt-multi-thread", "signal"] } tokio-stream = { version = "0.1", features = ["net"] } +tokio-util = { version = "0.7", features = ["rt"] } tonic = { version = "0.12", features = ["transport"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/grpc-service/src/app.rs b/grpc-service/src/app.rs index 5318204..9b58d39 100644 --- a/grpc-service/src/app.rs +++ b/grpc-service/src/app.rs @@ -1,13 +1,18 @@ use crate::config::Config; use crate::domain::AccountEvent; use crate::errors::GeykagResult; -use crate::grpc_service::{GrpcService, GrpcServiceHandle, GrpcSink}; +use crate::grpc_service::{ + GrpcService, GrpcServiceHandle, GrpcSink, ServiceReadiness, +}; use crate::kafka::KafkaAccountUpdateStream; use crate::ksql::KsqlAccountSnapshotClient; use crate::output::{ConsoleSink, TeeSink}; +use crate::preflight; use crate::traits::{ AccountSink, AccountUpdateSource, SnapshotStore, StatusSink, }; +use std::time::Duration; +use tokio_util::sync::CancellationToken; pub struct App< P: SnapshotStore, @@ -20,6 +25,7 @@ pub struct App< account_update_source: K, sink: A, status_sink: S, + readiness: ServiceReadiness, } impl @@ -34,14 +40,20 @@ impl pub fn new(config: Config) -> GeykagResult { let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let account_update_source = - KafkaAccountUpdateStream::new(config.kafka.clone()); + let readiness = ServiceReadiness::new(); + let shutdown = CancellationToken::new(); + let account_update_source = KafkaAccountUpdateStream::new( + config.kafka.clone(), + readiness.clone(), + shutdown.clone(), + ); Ok(Self::build( config, snapshot_store, account_update_source, ConsoleSink::new(), ConsoleSink::new(), + readiness, )) } } @@ -55,19 +67,27 @@ impl > { #[allow(dead_code)] - pub fn new_grpc(config: Config) -> GeykagResult<(Self, GrpcServiceHandle)> { + pub fn new_grpc( + config: Config, + shutdown: CancellationToken, + ) -> GeykagResult<(Self, GrpcServiceHandle)> { let grpc = GrpcService::start(&config)?; let sink = grpc.sink(); let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let account_update_source = - KafkaAccountUpdateStream::new(config.kafka.clone()); + let readiness = grpc.readiness(); + let account_update_source = KafkaAccountUpdateStream::new( + config.kafka.clone(), + readiness.clone(), + shutdown.clone(), + ); let app = Self::build( config, snapshot_store, account_update_source, sink, ConsoleSink::new(), + readiness, ); Ok((app, grpc)) @@ -84,19 +104,25 @@ impl { pub fn new_grpc_with_console( config: Config, + shutdown: CancellationToken, ) -> GeykagResult<(Self, GrpcServiceHandle)> { let grpc = GrpcService::start(&config)?; let sink = TeeSink::new(grpc.sink(), ConsoleSink::new()); let snapshot_store = KsqlAccountSnapshotClient::new(config.ksql.clone())?; - let account_update_source = - KafkaAccountUpdateStream::new(config.kafka.clone()); + let readiness = grpc.readiness(); + let account_update_source = KafkaAccountUpdateStream::new( + config.kafka.clone(), + readiness.clone(), + shutdown.clone(), + ); let app = Self::build( config, snapshot_store, account_update_source, sink, ConsoleSink::new(), + readiness, ); Ok((app, grpc)) @@ -112,6 +138,7 @@ impl account_update_source: K, sink: A, status_sink: S, + readiness: ServiceReadiness, ) -> Self { Self { config, @@ -119,6 +146,7 @@ impl account_update_source, sink, status_sink, + readiness, } } @@ -145,6 +173,19 @@ impl } } + // Startup preflight gates client-visible readiness. + if !self.readiness.is_ready() { + preflight::wait_for_dependencies( + &self.config, + Duration::from_secs(60), + ) + .await?; + self.readiness.mark_preflight_ready(); + tracing::info!( + "startup preflight complete; waiting for Kafka consumer assignment before advertising readiness" + ); + } + self.account_update_source .run(self.config.pubkey_filter.as_ref(), |message| { let event = AccountEvent::Live(message); @@ -168,11 +209,11 @@ mod tests { bytes_to_base58, }; use crate::errors::{GeykagError, GeykagResult}; + use crate::grpc_service::ServiceReadiness; use crate::kafka::StreamMessage; use crate::traits::{ AccountSink, AccountUpdateSource, SnapshotStore, StatusSink, }; - fn config(pubkey_filter: Option) -> Config { Config { kafka: KafkaConfig { @@ -189,6 +230,7 @@ mod tests { validator: ValidatorConfig { accounts_filter_url: "http://localhost:3000/filters/accounts" .to_owned(), + rpc_url: "http://localhost:8899".to_owned(), }, grpc: GrpcConfig { bind_host: "127.0.0.1".to_owned(), @@ -415,6 +457,7 @@ mod tests { update_source.clone(), sink.clone(), status_sink.clone(), + ServiceReadiness::ready_for_test(), ); app.run().await.unwrap(); @@ -439,6 +482,7 @@ mod tests { update_source.clone(), RecordingSink::new(false, false), status_sink.clone(), + ServiceReadiness::ready_for_test(), ); app.run().await.unwrap(); @@ -463,6 +507,7 @@ mod tests { update_source.clone(), RecordingSink::new(false, false), status_sink.clone(), + ServiceReadiness::ready_for_test(), ); app.run().await.unwrap(); @@ -490,6 +535,7 @@ mod tests { update_source.clone(), RecordingSink::new(false, false), RecordingStatusSink::new(), + ServiceReadiness::ready_for_test(), ); let error = app.run().await.unwrap_err(); @@ -510,6 +556,7 @@ mod tests { update_source.clone(), RecordingSink::new(true, false), RecordingStatusSink::new(), + ServiceReadiness::ready_for_test(), ); let error = app.run().await.unwrap_err(); @@ -529,6 +576,7 @@ mod tests { update_source.clone(), RecordingSink::new(false, true), RecordingStatusSink::new(), + ServiceReadiness::ready_for_test(), ); let error = app.run().await.unwrap_err(); @@ -548,6 +596,7 @@ mod tests { update_source.clone(), RecordingSink::new(false, false), RecordingStatusSink::new(), + ServiceReadiness::ready_for_test(), ); let error = app.run().await.unwrap_err(); diff --git a/grpc-service/src/config.rs b/grpc-service/src/config.rs index 8780252..a99d6fe 100644 --- a/grpc-service/src/config.rs +++ b/grpc-service/src/config.rs @@ -14,6 +14,7 @@ const DEFAULT_KSQL_URL: &str = "http://localhost:8088"; const DEFAULT_KSQL_TABLE: &str = "ACCOUNTS"; const DEFAULT_VALIDATOR_ACCOUNTS_FILTER_URL: &str = "http://localhost:3000/filters/accounts"; +const DEFAULT_VALIDATOR_RPC_URL: &str = "http://127.0.0.1:8899"; const DEFAULT_AUTO_OFFSET_RESET: &str = "latest"; const DEFAULT_GRPC_BIND_HOST: &str = "0.0.0.0"; const DEFAULT_GRPC_PORT: u16 = 50051; @@ -46,6 +47,8 @@ pub struct KsqlConfig { #[derive(Clone, Debug)] pub struct ValidatorConfig { pub accounts_filter_url: String, + #[allow(dead_code)] + pub rpc_url: String, } #[derive(Clone, Debug)] @@ -97,6 +100,8 @@ struct FileKsqlConfig { struct FileValidatorConfig { #[serde(default)] accounts_filter_url: Option, + #[serde(default)] + rpc_url: Option, } #[derive(Debug, Deserialize)] @@ -128,6 +133,7 @@ impl Config { }); let validator = file.validator.unwrap_or(FileValidatorConfig { accounts_filter_url: None, + rpc_url: None, }); let grpc = file.grpc.unwrap_or(FileGrpcConfig { bind_host: None, @@ -161,6 +167,9 @@ impl Config { .unwrap_or_else(|| { DEFAULT_VALIDATOR_ACCOUNTS_FILTER_URL.to_owned() }), + rpc_url: validator + .rpc_url + .unwrap_or_else(|| DEFAULT_VALIDATOR_RPC_URL.to_owned()), }, grpc: GrpcConfig { bind_host: grpc diff --git a/grpc-service/src/errors.rs b/grpc-service/src/errors.rs index 7736f0d..be9af17 100644 --- a/grpc-service/src/errors.rs +++ b/grpc-service/src/errors.rs @@ -69,6 +69,63 @@ pub enum GeykagError { #[source] source: reqwest::Error, }, + #[allow(dead_code)] + #[error("startup preflight HTTP client build failed")] + PreflightClientBuild { + #[source] + source: reqwest::Error, + }, + #[allow(dead_code)] + #[error("validator plugin admin probe to {url} failed")] + PreflightValidatorPluginRequest { + url: String, + #[source] + source: reqwest::Error, + }, + #[allow(dead_code)] + #[error( + "validator plugin admin probe to {url} returned non-success status" + )] + PreflightValidatorPluginStatus { + url: String, + #[source] + source: reqwest::Error, + }, + #[allow(dead_code)] + #[error("validator RPC probe to {url} failed")] + PreflightValidatorRpcRequest { + url: String, + #[source] + source: reqwest::Error, + }, + #[allow(dead_code)] + #[error("validator RPC probe to {url} returned non-success status")] + PreflightValidatorRpcStatus { + url: String, + #[source] + source: reqwest::Error, + }, + #[allow(dead_code)] + #[error("Kafka broker {broker} metadata probe failed")] + PreflightKafkaMetadata { + broker: String, + #[source] + source: KafkaError, + }, + #[allow(dead_code)] + #[error("Kafka metadata probe task failed")] + PreflightKafkaProbeJoin { + #[source] + source: tokio::task::JoinError, + }, + #[allow(dead_code)] + #[error( + "startup preflight timed out after {elapsed_ms} ms; last failing probe: {probe}" + )] + PreflightTimeout { + probe: &'static str, + elapsed_ms: u128, + }, #[error("failed to parse ksqlDB response line as JSON: {line}")] KsqlJsonLine { line: String, diff --git a/grpc-service/src/grpc_service/init_subs.rs b/grpc-service/src/grpc_service/init_subs.rs index da82962..914a7b1 100644 --- a/grpc-service/src/grpc_service/init_subs.rs +++ b/grpc-service/src/grpc_service/init_subs.rs @@ -1,3 +1,5 @@ +use tracing::{debug, info}; + use crate::errors::{GeykagError, GeykagResult}; use crate::traits::ValidatorSubscriptions; @@ -34,6 +36,12 @@ impl InitSubsClient { .join(","); let body = format!(r#"{{"pubkeys":[{pubkeys_json}]}}"#); + debug!( + url = %self.accounts_filter_url, + pubkey_count = pubkeys.len(), + "validator whitelist HTTP POST starting" + ); + self.http .post(&self.accounts_filter_url) .header(reqwest::header::CONTENT_TYPE, "application/json") @@ -42,8 +50,15 @@ impl InitSubsClient { .await .map_err(|source| GeykagError::InitSubsRequest { source })? .error_for_status() - .map(|_| ()) - .map_err(|source| GeykagError::InitSubsRequestStatus { source }) + .map_err(|source| GeykagError::InitSubsRequestStatus { source })?; + + info!( + url = %self.accounts_filter_url, + pubkey_count = pubkeys.len(), + "validator whitelist HTTP POST completed" + ); + + Ok(()) } } diff --git a/grpc-service/src/grpc_service/mod.rs b/grpc-service/src/grpc_service/mod.rs index f05aeac..77eccb3 100644 --- a/grpc-service/src/grpc_service/mod.rs +++ b/grpc-service/src/grpc_service/mod.rs @@ -1,10 +1,12 @@ mod convert; mod dispatcher; mod init_subs; +mod readiness; mod runtime; mod service; mod sink; mod utils; +pub use readiness::ServiceReadiness; pub use runtime::{GrpcService, GrpcServiceHandle}; pub use sink::GrpcSink; diff --git a/grpc-service/src/grpc_service/readiness.rs b/grpc-service/src/grpc_service/readiness.rs new file mode 100644 index 0000000..48684fa --- /dev/null +++ b/grpc-service/src/grpc_service/readiness.rs @@ -0,0 +1,118 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +#[derive(Debug, Default)] +struct ServiceReadinessInner { + preflight_ready: AtomicBool, + kafka_ready: AtomicBool, +} + +/// Shared startup-readiness state for the gRPC service. +/// +/// The service starts in the "not ready" state. It becomes ready only +/// after startup preflight has verified all required dependencies and +/// the Kafka consumer has received a partition assignment. +#[derive(Clone, Debug, Default)] +#[allow(dead_code)] +pub struct ServiceReadiness { + inner: Arc, +} + +impl ServiceReadiness { + #[allow(dead_code)] + pub fn new() -> Self { + Self { + inner: Arc::new(ServiceReadinessInner::default()), + } + } + + #[cfg(test)] + pub fn ready_for_test() -> Self { + let readiness = Self::new(); + readiness.mark_preflight_ready(); + readiness.mark_kafka_ready(); + readiness + } + + #[allow(dead_code)] + pub fn mark_preflight_ready(&self) { + self.inner.preflight_ready.store(true, Ordering::Release); + } + + #[allow(dead_code)] + pub fn mark_kafka_ready(&self) { + self.inner.kafka_ready.store(true, Ordering::Release); + } + + #[allow(dead_code)] + pub fn mark_kafka_not_ready(&self) { + self.inner.kafka_ready.store(false, Ordering::Release); + } + + #[allow(dead_code)] + pub fn is_ready(&self) -> bool { + self.inner.preflight_ready.load(Ordering::Acquire) + && self.inner.kafka_ready.load(Ordering::Acquire) + } +} + +#[cfg(test)] +mod tests { + use super::ServiceReadiness; + + #[test] + fn test_new_is_not_ready() { + let r = ServiceReadiness::new(); + assert!(!r.is_ready()); + } + + #[test] + fn test_preflight_alone_does_not_make_service_ready() { + let r = ServiceReadiness::new(); + r.mark_preflight_ready(); + assert!(!r.is_ready()); + } + + #[test] + fn test_kafka_alone_does_not_make_service_ready() { + let r = ServiceReadiness::new(); + r.mark_kafka_ready(); + assert!(!r.is_ready()); + } + + #[test] + fn test_service_becomes_ready_only_after_both_flags() { + let r = ServiceReadiness::new(); + r.mark_preflight_ready(); + r.mark_kafka_ready(); + assert!(r.is_ready()); + } + + #[test] + fn test_mark_kafka_not_ready_clears_readiness() { + let r = ServiceReadiness::new(); + r.mark_preflight_ready(); + r.mark_kafka_ready(); + assert!(r.is_ready()); + + r.mark_kafka_not_ready(); + assert!(!r.is_ready()); + } + + #[test] + fn test_clones_share_state() { + let r = ServiceReadiness::new(); + let r2 = r.clone(); + r.mark_preflight_ready(); + r.mark_kafka_ready(); + assert!(r2.is_ready()); + + r2.mark_kafka_not_ready(); + assert!(!r.is_ready()); + } + + #[test] + fn test_ready_for_test_is_ready() { + assert!(ServiceReadiness::ready_for_test().is_ready()); + } +} diff --git a/grpc-service/src/grpc_service/runtime.rs b/grpc-service/src/grpc_service/runtime.rs index 64e0175..57ffbd5 100644 --- a/grpc-service/src/grpc_service/runtime.rs +++ b/grpc-service/src/grpc_service/runtime.rs @@ -16,6 +16,7 @@ use tonic::transport::Server; use super::dispatcher::DispatcherHandle; use super::init_subs::InitSubsClient; +use super::readiness::ServiceReadiness; use super::service::GrpcSubscriptionService; use super::sink::GrpcSink; @@ -72,10 +73,12 @@ impl GrpcService { KsqlAccountSnapshotClient::new(config.ksql.clone())?; let validator_subscriptions = InitSubsClient::new(config.validator.accounts_filter_url.clone())?; + let readiness = ServiceReadiness::new(); let service = GrpcSubscriptionService::new( dispatcher, snapshot_store, validator_subscriptions, + readiness.clone(), ) .into_server(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); @@ -94,6 +97,7 @@ impl GrpcService { Ok(GrpcServiceHandle { sink, + readiness, is_running, shutdown_tx: Some(shutdown_tx), task: Some(task), @@ -105,6 +109,7 @@ impl GrpcService { #[derive(Debug)] pub struct GrpcServiceHandle { sink: GrpcSink, + readiness: ServiceReadiness, is_running: Arc, shutdown_tx: Option>, task: Option>>, @@ -116,6 +121,10 @@ impl GrpcServiceHandle { self.sink.clone() } + pub fn readiness(&self) -> ServiceReadiness { + self.readiness.clone() + } + #[allow(dead_code)] pub fn local_addr(&self) -> SocketAddr { self.local_addr diff --git a/grpc-service/src/grpc_service/service.rs b/grpc-service/src/grpc_service/service.rs index 3942b74..b173107 100644 --- a/grpc-service/src/grpc_service/service.rs +++ b/grpc-service/src/grpc_service/service.rs @@ -18,6 +18,7 @@ use tracing::{debug, info, warn}; use super::convert::to_subscribe_update; use super::dispatcher::{DispatcherHandle, TargetedSendResult}; +use super::readiness::ServiceReadiness; use crate::domain::{AccountEvent, PubkeyFilter}; use crate::traits::{SnapshotStore, ValidatorSubscriptions}; @@ -37,6 +38,7 @@ pub(crate) struct GrpcSubscriptionService< dispatcher: DispatcherHandle, snapshot_store: P, validator_subscriptions: V, + readiness: ServiceReadiness, } impl< @@ -48,11 +50,13 @@ impl< dispatcher: DispatcherHandle, snapshot_store: P, validator_subscriptions: V, + readiness: ServiceReadiness, ) -> Self { Self { dispatcher, snapshot_store, validator_subscriptions, + readiness, } } @@ -101,6 +105,12 @@ async fn bootstrap_new_pubkeys_impl< // will publish one of two Kafka updates: // - the current account update if the account exists // - a MissingAccount update if the account does not exist + debug!( + client_id, + pubkey = %pubkey_b58, + "fetching snapshot bootstrap for pubkey" + ); + let snapshot = match snapshot_store.fetch_one_by_pubkey(&pubkey).await { Ok(snapshot) => snapshot, Err(error) => { @@ -139,7 +149,13 @@ async fn bootstrap_new_pubkeys_impl< }; match dispatcher.send_to_client(client_id, update).await { - Ok(TargetedSendResult::Delivered) => {} + Ok(TargetedSendResult::Delivered) => { + info!( + client_id, + pubkey = %pubkey_b58, + "snapshot bootstrap dispatched" + ); + } Ok(TargetedSendResult::ClientNotFound) => { warn!( client_id, @@ -183,7 +199,8 @@ async fn bootstrap_new_pubkeys_impl< info!( client_id, pubkey_count = pubkeys_to_whitelist.len(), - "whitelisting ksql-missing pubkeys with validator" + pubkeys = ?pubkeys_to_whitelist, + "validator whitelist request starting" ); if let Err(error) = validator_subscriptions @@ -196,6 +213,13 @@ async fn bootstrap_new_pubkeys_impl< error = %error, "failed to whitelist pubkeys with validator" ); + } else { + info!( + client_id, + pubkey_count = pubkeys_to_whitelist.len(), + pubkeys = ?pubkeys_to_whitelist, + "validator whitelist request completed" + ); } } @@ -260,6 +284,15 @@ fn parse_pubkey_list(accounts: &[String]) -> Result, Status> { Ok(set) } +fn normalized_pubkeys(pubkeys: &HashSet<[u8; 32]>) -> Vec { + let mut normalized = pubkeys + .iter() + .map(|pubkey| bs58::encode(pubkey).into_string()) + .collect::>(); + normalized.sort(); + normalized +} + enum FilterOp { Replace(HashSet<[u8; 32]>), Patch { @@ -309,9 +342,15 @@ impl< })?; let initial_filter = parse_accounts_filter(&first_req)?; + let initial_pubkeys = normalized_pubkeys(&initial_filter); info!( filter_size = initial_filter.len(), - "new gRPC subscriber connected" + pubkeys = tracing::field::debug(if initial_pubkeys.is_empty() { + None::<&Vec> + } else { + Some(&initial_pubkeys) + }), + "gRPC subscribe request received" ); // 2. Register with dispatcher using parsed filter @@ -421,9 +460,15 @@ impl< async fn ping( &self, - _request: Request, + request: Request, ) -> Result, Status> { - Err(Status::unimplemented("Ping is not supported")) + if !self.readiness.is_ready() { + debug!("ping rejected: service not ready"); + return Err(Status::unavailable("service not ready")); + } + Ok(Response::new(PongResponse { + count: request.into_inner().count, + })) } async fn get_latest_blockhash( @@ -474,13 +519,18 @@ mod tests { }; use tokio::time::timeout; + use helius_laserstream::grpc::PingRequest; + use helius_laserstream::grpc::geyser_server::Geyser; + use tonic::Request; + use super::{ - FilterOp, bootstrap_new_pubkeys_impl, parse_accounts_filter, - parse_filter_op, parse_pubkey_list, + FilterOp, GrpcSubscriptionService, bootstrap_new_pubkeys_impl, + parse_accounts_filter, parse_filter_op, parse_pubkey_list, }; use crate::domain::{AccountState, PubkeyFilter, bytes_to_base58}; use crate::errors::{GeykagError, GeykagResult}; use crate::grpc_service::dispatcher::DispatcherHandle; + use crate::grpc_service::readiness::ServiceReadiness; use crate::traits::{SnapshotStore, ValidatorSubscriptions}; fn pubkey_bytes(byte: u8) -> [u8; 32] { @@ -925,4 +975,43 @@ mod tests { [pubkey_b58(1), pubkey_b58(2)].into_iter().collect() ); } + + #[tokio::test] + async fn test_ping_returns_ok() { + let dispatcher = DispatcherHandle::spawn(8, 8); + let snapshot_store = MockSnapshotStore::new(HashMap::new()); + let validator = MockValidatorSubscriptions::succeed(); + + let service = GrpcSubscriptionService::new( + dispatcher, + snapshot_store, + validator, + ServiceReadiness::ready_for_test(), + ); + + let response = + service.ping(Request::new(PingRequest { count: 0 })).await; + assert!(response.is_ok()); + } + + #[tokio::test] + async fn test_ping_returns_unavailable_when_not_ready() { + use tonic::Code; + let dispatcher = DispatcherHandle::spawn(8, 8); + let snapshot_store = MockSnapshotStore::new(HashMap::new()); + let validator = MockValidatorSubscriptions::succeed(); + + let service = GrpcSubscriptionService::new( + dispatcher, + snapshot_store, + validator, + ServiceReadiness::new(), + ); + + let response = service + .ping(Request::new(PingRequest { count: 0 })) + .await + .unwrap_err(); + assert_eq!(response.code(), Code::Unavailable); + } } diff --git a/grpc-service/src/kafka.rs b/grpc-service/src/kafka.rs index 6b0ea5d..687a4a7 100644 --- a/grpc-service/src/kafka.rs +++ b/grpc-service/src/kafka.rs @@ -4,21 +4,142 @@ use magigblock_event_proto::{ }; use prost::Message; use rdkafka::Message as KafkaMessage; +use rdkafka::client::ClientContext; use rdkafka::config::ClientConfig; -use rdkafka::consumer::{Consumer, StreamConsumer}; -use tracing::{error, info, warn}; +use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer}; +use rdkafka::topic_partition_list::TopicPartitionList; +use tokio_util::sync::CancellationToken; +use tracing::{debug, error, info, warn}; use crate::config::KafkaConfig; use crate::domain::{AccountUpdate, PubkeyFilter, bytes_to_base58}; use crate::errors::{GeykagError, GeykagResult}; +use crate::grpc_service::ServiceReadiness; use crate::traits::AccountUpdateSource; + +struct ReadinessConsumerContext { + readiness: ServiceReadiness, + group_id: String, +} + +impl ClientContext for ReadinessConsumerContext {} + +impl ConsumerContext for ReadinessConsumerContext { + fn post_rebalance( + &self, + _base_consumer: &rdkafka::consumer::BaseConsumer, + rebalance: &Rebalance<'_>, + ) { + match rebalance { + Rebalance::Assign(assignment) => { + if assignment.count() == 0 { + self.readiness.mark_kafka_not_ready(); + } else { + self.readiness.mark_kafka_ready(); + info!( + group_id = self.group_id, + partitions = ?topic_partition_list(assignment), + "Kafka partitions assigned" + ); + } + } + Rebalance::Revoke(partitions) => { + self.readiness.mark_kafka_not_ready(); + info!( + group_id = self.group_id, + partitions = ?topic_partition_list(partitions), + "Kafka partitions revoked" + ); + } + Rebalance::Error(err) => { + self.readiness.mark_kafka_not_ready(); + warn!( + group_id = self.group_id, + error = %err, + "Kafka consumer lost readiness during rebalance" + ); + } + } + } +} + +fn topic_partition_list(partitions: &TopicPartitionList) -> Vec { + partitions + .elements() + .iter() + .map(|partition| { + format!("{}:{}", partition.topic(), partition.partition()) + }) + .collect() +} + pub struct KafkaAccountUpdateStream { config: KafkaConfig, + readiness: ServiceReadiness, + shutdown: CancellationToken, } impl KafkaAccountUpdateStream { - pub fn new(config: KafkaConfig) -> Self { - Self { config } + pub fn new( + config: KafkaConfig, + readiness: ServiceReadiness, + shutdown: CancellationToken, + ) -> Self { + Self { + config, + readiness, + shutdown, + } + } + + /// Verify that the configured Kafka broker is reachable by + /// requesting cluster metadata. Does not subscribe to the topic and + /// does not consume any messages. + #[allow(dead_code)] + pub async fn probe(&self) -> GeykagResult<()> { + Self::probe_config(&self.config).await + } + + /// Verify that the configured Kafka broker is reachable by + /// requesting cluster metadata. Does not subscribe to the topic and + /// does not consume any messages. + #[allow(dead_code)] + pub async fn probe_config(config: &KafkaConfig) -> GeykagResult<()> { + use rdkafka::consumer::BaseConsumer; + use rdkafka::consumer::Consumer as _; + use std::time::Duration; + + let config = config.clone(); + + tokio::task::spawn_blocking(move || { + let mut client_config = ClientConfig::new(); + for (key, value) in &config.client { + client_config.set(key, value); + } + client_config + .set("bootstrap.servers", &config.bootstrap_servers) + .set("group.id", &config.group_id) + .set("auto.offset.reset", &config.auto_offset_reset) + .set("enable.auto.commit", "false"); + + let consumer: BaseConsumer = + client_config.create().map_err(|source| { + GeykagError::KafkaConsumerCreate { + broker: config.bootstrap_servers.clone(), + source, + } + })?; + + consumer + .fetch_metadata(None, Duration::from_secs(2)) + .map(|_| ()) + .map_err(|source| GeykagError::PreflightKafkaMetadata { + broker: config.bootstrap_servers.clone(), + source, + }) + }) + .await + .map_err(|source| GeykagError::PreflightKafkaProbeJoin { source })? } pub async fn run( @@ -39,14 +160,24 @@ impl KafkaAccountUpdateStream { .set("auto.offset.reset", &self.config.auto_offset_reset) .set("enable.auto.commit", "true"); - let consumer: StreamConsumer = - client_config.create().map_err(|source| { - GeykagError::KafkaConsumerCreate { - broker: self.config.bootstrap_servers.clone(), - source, - } + let consumer: StreamConsumer = client_config + .create_with_context(ReadinessConsumerContext { + readiness: self.readiness.clone(), + group_id: self.config.group_id.clone(), + }) + .map_err(|source| GeykagError::KafkaConsumerCreate { + broker: self.config.bootstrap_servers.clone(), + source, })?; + info!( + broker = self.config.bootstrap_servers, + topic = self.config.topic, + group_id = self.config.group_id, + auto_offset_reset = self.config.auto_offset_reset, + "Kafka consumer created" + ); + consumer .subscribe(&[&self.config.topic]) .map_err(|source| GeykagError::KafkaSubscribe { @@ -55,52 +186,73 @@ impl KafkaAccountUpdateStream { })?; info!( - broker = self.config.bootstrap_servers, topic = self.config.topic, group_id = self.config.group_id, - auto_offset_reset = self.config.auto_offset_reset, - pubkey_filter = - filter.map(PubkeyFilter::as_str).unwrap_or("(none)"), - "listening for Kafka messages" + "Kafka subscribe issued" ); let mut stream = consumer.stream(); - while let Some(message) = stream.next().await { - match message { - Ok(msg) => { - let Some(payload) = msg.payload() else { - warn!( - partition = msg.partition(), - offset = msg.offset(), - "skipping empty payload" - ); - continue; + loop { + tokio::select! { + _ = self.shutdown.cancelled() => { + info!( + group_id = self.config.group_id, + topic = self.config.topic, + "Kafka consumer shutdown requested" + ); + break; + } + message = stream.next() => { + let Some(message) = message else { + break; }; - match decode_account_update(payload) { - Ok(account) => { - if !account.matches_filter(filter) { + match message { + Ok(msg) => { + let Some(payload) = msg.payload() else { + warn!( + partition = msg.partition(), + offset = msg.offset(), + "skipping empty payload" + ); continue; + }; + + match decode_account_update(payload) { + Ok(account) => { + if !account.matches_filter(filter) { + continue; + } + + debug!( + group_id = self.config.group_id, + partition = msg.partition(), + offset = msg.offset(), + pubkey = %account.pubkey_b58, + write_version = account.write_version, + "Kafka message consumed" + ); + + handler(StreamMessage { + account, + partition: msg.partition(), + offset: msg.offset(), + timestamp: format!("{:?}", msg.timestamp()), + })?; + } + Err(err) => { + warn!( + partition = msg.partition(), + offset = msg.offset(), + error = %err, + "failed to decode message payload" + ); + } } - - handler(StreamMessage { - account, - partition: msg.partition(), - offset: msg.offset(), - timestamp: format!("{:?}", msg.timestamp()), - })?; - } - Err(err) => { - warn!( - partition = msg.partition(), - offset = msg.offset(), - error = %err, - "failed to decode message payload" - ); } + Err(err) => error!(error = %err, "Kafka consumer error"), } } - Err(err) => error!(error = %err, "Kafka consumer error"), } } diff --git a/grpc-service/src/main.rs b/grpc-service/src/main.rs index 9d26052..66258be 100644 --- a/grpc-service/src/main.rs +++ b/grpc-service/src/main.rs @@ -6,9 +6,11 @@ mod grpc_service; mod kafka; mod ksql; mod output; +mod preflight; mod traits; use anyhow::Result; +use tokio_util::sync::CancellationToken; use crate::app::App; use crate::config::Config; @@ -16,11 +18,43 @@ use crate::config::Config; #[tokio::main] async fn main() -> Result<()> { init_tracing(); + tracing::info!("grpc-service process starting"); let config = Config::load()?; - let (app, grpc_handle) = App::new_grpc_with_console(config)?; - let result = app.run().await; + let shutdown = CancellationToken::new(); + let (app, grpc_handle) = + App::new_grpc_with_console(config, shutdown.clone())?; + let mut app_task = tokio::spawn(async move { app.run().await }); + + let app_result = tokio::select! { + result = &mut app_task => result?, + _ = shutdown_signal() => { + tracing::info!("shutdown requested"); + shutdown.cancel(); + app_task.await? + } + }; + grpc_handle.shutdown().await?; - Ok(result?) + Ok(app_result?) +} + +async fn shutdown_signal() { + #[cfg(unix)] + { + use tokio::signal::unix::{SignalKind, signal}; + + let mut terminate = signal(SignalKind::terminate()) + .expect("failed to install SIGTERM handler"); + tokio::select! { + _ = tokio::signal::ctrl_c() => {} + _ = terminate.recv() => {} + } + } + + #[cfg(not(unix))] + { + let _ = tokio::signal::ctrl_c().await; + } } fn init_tracing() { diff --git a/grpc-service/src/preflight.rs b/grpc-service/src/preflight.rs new file mode 100644 index 0000000..6111095 --- /dev/null +++ b/grpc-service/src/preflight.rs @@ -0,0 +1,278 @@ +use std::time::{Duration, Instant}; + +use reqwest::Client; +use serde_json::json; +use tracing::{debug, info, warn}; + +use crate::config::Config; +use crate::errors::{GeykagError, GeykagResult}; +use crate::kafka::KafkaAccountUpdateStream; + +const PROBE_INITIAL_BACKOFF: Duration = Duration::from_millis(250); +const PROBE_MAX_BACKOFF: Duration = Duration::from_secs(2); +const REQUEST_TIMEOUT: Duration = Duration::from_secs(2); + +/// Run startup preflight against all required dependencies. Returns +/// `Ok(())` once every probe has succeeded at least once. Returns a +/// `PreflightTimeout` error tagged with the last failing probe if the +/// total elapsed time exceeds `total_timeout`. +#[allow(dead_code)] +pub async fn wait_for_dependencies( + config: &Config, + total_timeout: Duration, +) -> GeykagResult<()> { + let started = Instant::now(); + let deadline = started + total_timeout; + let http = build_http_client()?; + + info!( + plugin = config.validator.accounts_filter_url, + rpc = config.validator.rpc_url, + kafka = config.kafka.bootstrap_servers, + "startup preflight: probing dependencies" + ); + + run_probe_with_retry("validator-plugin-admin", deadline, || async { + probe_validator_plugin_admin( + &http, + &config.validator.accounts_filter_url, + ) + .await + }) + .await?; + + run_probe_with_retry("validator-rpc", deadline, || async { + probe_validator_rpc_health(&http, &config.validator.rpc_url).await + }) + .await?; + + run_probe_with_retry("kafka-metadata", deadline, || async { + KafkaAccountUpdateStream::probe_config(&config.kafka).await + }) + .await?; + + let elapsed_ms = started.elapsed().as_millis(); + info!(elapsed_ms, "startup preflight: all dependencies ready"); + Ok(()) +} + +#[allow(dead_code)] +async fn run_probe_with_retry( + probe: &'static str, + deadline: Instant, + mut attempt: F, +) -> GeykagResult<()> +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + let started = Instant::now(); + let mut backoff = PROBE_INITIAL_BACKOFF; + loop { + debug!(probe, "startup preflight: attempting probe"); + match attempt().await { + Ok(()) => { + let elapsed_ms = started.elapsed().as_millis(); + info!(probe, elapsed_ms, "startup preflight: probe ok"); + return Ok(()); + } + Err(error) => { + if Instant::now() >= deadline { + let elapsed_ms = started.elapsed().as_millis(); + warn!( + probe, + elapsed_ms, + error = %error, + "startup preflight: probe deadline exceeded" + ); + return Err(GeykagError::PreflightTimeout { + probe, + elapsed_ms, + }); + } + debug!( + probe, + backoff_ms = backoff.as_millis() as u64, + error = %error, + "startup preflight: probe failed; will retry" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(PROBE_MAX_BACKOFF); + } + } + } +} + +#[allow(dead_code)] +fn build_http_client() -> GeykagResult { + Client::builder() + .timeout(REQUEST_TIMEOUT) + .build() + .map_err(|source| GeykagError::PreflightClientBuild { source }) +} + +#[allow(dead_code)] +async fn probe_validator_plugin_admin( + http: &Client, + url: &str, +) -> GeykagResult<()> { + let body = r#"{"pubkeys":[]}"#; + let response = http + .post(url) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .body(body) + .send() + .await + .map_err(|source| GeykagError::PreflightValidatorPluginRequest { + url: url.to_owned(), + source, + })?; + + response.error_for_status().map(|_| ()).map_err(|source| { + GeykagError::PreflightValidatorPluginStatus { + url: url.to_owned(), + source, + } + }) +} + +#[allow(dead_code)] +async fn probe_validator_rpc_health( + http: &Client, + url: &str, +) -> GeykagResult<()> { + let body = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "getHealth", + }); + let response = http + .post(url) + .header(reqwest::header::CONTENT_TYPE, "application/json") + .json(&body) + .send() + .await + .map_err(|source| GeykagError::PreflightValidatorRpcRequest { + url: url.to_owned(), + source, + })?; + + response.error_for_status().map(|_| ()).map_err(|source| { + GeykagError::PreflightValidatorRpcStatus { + url: url.to_owned(), + source, + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::{Router, http::StatusCode, routing::post}; + use std::sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }; + + async fn spawn_test_server(router: Router) -> String { + let listener = + tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + tokio::spawn(async move { + axum::serve(listener, router).await.unwrap(); + }); + format!("http://{addr}") + } + + #[tokio::test] + async fn test_probe_validator_plugin_admin_succeeds_on_2xx() { + let app = Router::new() + .route("/filters/accounts", post(|| async { StatusCode::OK })); + let base = spawn_test_server(app).await; + let url = format!("{base}/filters/accounts"); + let http = build_http_client().unwrap(); + + probe_validator_plugin_admin(&http, &url).await.unwrap(); + } + + #[tokio::test] + async fn test_probe_validator_plugin_admin_fails_on_5xx() { + let app = Router::new().route( + "/filters/accounts", + post(|| async { StatusCode::INTERNAL_SERVER_ERROR }), + ); + let base = spawn_test_server(app).await; + let url = format!("{base}/filters/accounts"); + let http = build_http_client().unwrap(); + + let err = probe_validator_plugin_admin(&http, &url).await.unwrap_err(); + assert!(matches!( + err, + GeykagError::PreflightValidatorPluginStatus { .. } + )); + } + + #[tokio::test] + async fn test_probe_validator_rpc_health_succeeds_on_2xx() { + let app = Router::new().route( + "/", + post(|| async { + ( + StatusCode::OK, + [("content-type", "application/json")], + r#"{"jsonrpc":"2.0","result":"ok","id":1}"#, + ) + }), + ); + let base = spawn_test_server(app).await; + let http = build_http_client().unwrap(); + + probe_validator_rpc_health(&http, &base).await.unwrap(); + } + + #[tokio::test] + async fn test_run_probe_with_retry_eventually_succeeds() { + let calls = Arc::new(AtomicUsize::new(0)); + let calls_inner = calls.clone(); + let deadline = Instant::now() + Duration::from_secs(5); + + run_probe_with_retry("test-probe", deadline, move || { + let calls_inner = calls_inner.clone(); + async move { + let n = calls_inner.fetch_add(1, Ordering::SeqCst); + if n < 2 { + Err(GeykagError::PreflightTimeout { + probe: "test", + elapsed_ms: 0, + }) + } else { + Ok(()) + } + } + }) + .await + .unwrap(); + assert_eq!(calls.load(Ordering::SeqCst), 3); + } + + #[tokio::test] + async fn test_run_probe_with_retry_returns_timeout() { + let deadline = Instant::now() + Duration::from_millis(50); + let err = run_probe_with_retry("test-probe", deadline, || async { + Err(GeykagError::PreflightTimeout { + probe: "noop", + elapsed_ms: 0, + }) + }) + .await + .unwrap_err(); + + assert!(matches!( + err, + GeykagError::PreflightTimeout { + probe: "test-probe", + .. + } + )); + } +} diff --git a/ix-tests/Cargo.toml b/ix-tests/Cargo.toml new file mode 100644 index 0000000..4177cb1 --- /dev/null +++ b/ix-tests/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "ix-tests" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0" +bs58 = "0.5" +futures = "0.3" +helius-laserstream = { git = "https://github.com/magicblock-labs/laserstream-sdk", rev = "fe205cb2b85864d1821027d663813d66160285dc" } +nix = { version = "0.30", features = ["signal"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +solana-keypair = "3.1.2" +solana-pubkey = "4.1.0" +solana-rpc-client = { version = "4.0.0-beta.4", default-features = false } +solana-signature = "3.4.0" +solana-signer = "3.0.0" +solana-system-interface = { version = "3.2.0", features = ["bincode"] } +solana-transaction = { version = "3.1.0", features = ["bincode"] } +tokio = { version = "1.47", features = ["macros", "rt-multi-thread", "process", "sync", "time", "fs", "signal"] } +tokio-stream = "0.1" +toml = "0.9.12" +tonic = { version = "0.12", features = ["transport"] } +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] } diff --git a/ix-tests/configs/grpc-service/service-1.toml b/ix-tests/configs/grpc-service/service-1.toml new file mode 100644 index 0000000..d80b3b1 --- /dev/null +++ b/ix-tests/configs/grpc-service/service-1.toml @@ -0,0 +1,22 @@ +[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" +group_id = "ix-tests-service-1" +auto_offset_reset = "latest" + +[kafka.client] +"session.timeout.ms" = "6000" +"heartbeat.interval.ms" = "2000" + +[ksql] +url = "http://localhost:8088" +table = "ACCOUNTS" + +[validator] +accounts_filter_url = "http://localhost:3000/filters/accounts" +rpc_url = "http://127.0.0.1:8899" + +[grpc] +bind_host = "0.0.0.0" +port = 50051 +dispatcher_capacity = 4096 diff --git a/ix-tests/configs/grpc-service/service-2.toml b/ix-tests/configs/grpc-service/service-2.toml new file mode 100644 index 0000000..5d3bad7 --- /dev/null +++ b/ix-tests/configs/grpc-service/service-2.toml @@ -0,0 +1,22 @@ +[kafka] +bootstrap_servers = "localhost:9092" +topic = "solana.testnet.account_updates" +group_id = "ix-tests-service-2" +auto_offset_reset = "latest" + +[kafka.client] +"session.timeout.ms" = "6000" +"heartbeat.interval.ms" = "2000" + +[ksql] +url = "http://localhost:8088" +table = "ACCOUNTS" + +[validator] +accounts_filter_url = "http://localhost:3000/filters/accounts" +rpc_url = "http://127.0.0.1:8899" + +[grpc] +bind_host = "0.0.0.0" +port = 50052 +dispatcher_capacity = 4096 diff --git a/ix-tests/configs/suite.toml b/ix-tests/configs/suite.toml new file mode 100644 index 0000000..a66abbe --- /dev/null +++ b/ix-tests/configs/suite.toml @@ -0,0 +1,6 @@ +service_binary = "target/debug/magigblock-grpc-service" +validator_rpc_url = "http://127.0.0.1:8899" +failure_artifact_root = "target/ix-tests/failures" +service_start_timeout_ms = 10000 +checkpoint_timeout_ms = 8000 +transaction_timeout_ms = 2000 diff --git a/ix-tests/src/accounts.rs b/ix-tests/src/accounts.rs new file mode 100644 index 0000000..7500b63 --- /dev/null +++ b/ix-tests/src/accounts.rs @@ -0,0 +1,89 @@ +use std::collections::HashMap; + +use solana_keypair::Keypair; +use solana_pubkey::Pubkey; +use solana_signer::Signer; + +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +pub enum NamedAccount { + SimpleA, + SimpleB, + SimpleC, + SimpleD, + SharedA, + SharedB, + RestartA, + RestartB, + OwnerData, + Hot00, + Hot01, + Hot02, + Hot03, + Hot04, + Hot05, + Hot06, + Hot07, + Hot08, + Hot09, +} + +const ALL_NAMED_ACCOUNTS: &[NamedAccount] = &[ + NamedAccount::SimpleA, + NamedAccount::SimpleB, + NamedAccount::SimpleC, + NamedAccount::SimpleD, + NamedAccount::SharedA, + NamedAccount::SharedB, + NamedAccount::RestartA, + NamedAccount::RestartB, + NamedAccount::OwnerData, + NamedAccount::Hot00, + NamedAccount::Hot01, + NamedAccount::Hot02, + NamedAccount::Hot03, + NamedAccount::Hot04, + NamedAccount::Hot05, + NamedAccount::Hot06, + NamedAccount::Hot07, + NamedAccount::Hot08, + NamedAccount::Hot09, +]; + +pub struct ScenarioAccounts { + keypairs: HashMap, +} + +#[allow(dead_code)] +impl ScenarioAccounts { + pub fn new() -> Self { + let keypairs: HashMap = ALL_NAMED_ACCOUNTS + .iter() + .map(|account| (*account, Keypair::new())) + .collect(); + + let mapping: Vec = ALL_NAMED_ACCOUNTS + .iter() + .map(|account| { + format!("{:?} → {}", account, keypairs[account].pubkey()) + }) + .collect(); + + tracing::info!(accounts = ?mapping, "generated random ScenarioAccounts pubkeys"); + + Self { keypairs } + } + + pub fn keypair(&self, account: NamedAccount) -> &Keypair { + self.keypairs + .get(&account) + .expect("ScenarioAccounts missing keypair for account") + } + + pub fn pubkey(&self, account: NamedAccount) -> Pubkey { + self.keypair(account).pubkey() + } + + pub fn pubkey_b58(&self, account: NamedAccount) -> String { + self.pubkey(account).to_string() + } +} diff --git a/ix-tests/src/artifacts.rs b/ix-tests/src/artifacts.rs new file mode 100644 index 0000000..0eaeace --- /dev/null +++ b/ix-tests/src/artifacts.rs @@ -0,0 +1,180 @@ +use std::path::PathBuf; +use std::time::{SystemTime, UNIX_EPOCH}; + +use anyhow::Context; +use serde::Serialize; + +use crate::client::TestGrpcClient; +use crate::config::SuiteConfig; +use crate::layout::ServiceInstance; +use crate::scenario::ScenarioName; + +#[allow(dead_code)] +pub struct ServiceLogPaths { + pub stdout: PathBuf, + pub stderr: PathBuf, +} + +#[allow(dead_code)] +pub struct RunArtifacts { + run_dir: PathBuf, + run_id: String, + failure_root: PathBuf, + persist_on_failure: bool, +} + +impl RunArtifacts { + pub fn new( + config: &SuiteConfig, + scenario: ScenarioName, + ) -> anyhow::Result { + let pid = std::process::id(); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let run_dir = PathBuf::from(format!( + "target/ix-tests/tmp/{}-{}", + scenario.as_str(), + pid + )); + let run_id = format!("{}-{}", pid, timestamp); + std::fs::create_dir_all(&run_dir).with_context(|| { + format!("failed to create run dir: {}", run_dir.display()) + })?; + + Ok(Self { + run_dir, + run_id, + failure_root: config.failure_artifact_root.clone(), + persist_on_failure: true, + }) + } + + pub fn run_id(&self) -> &str { + &self.run_id + } + + pub fn generated_service_config_path( + &self, + instance: ServiceInstance, + ) -> PathBuf { + self.run_dir + .join(format!("{}.generated.toml", Self::service_label(instance))) + } + + pub fn service_logs(&self, instance: ServiceInstance) -> ServiceLogPaths { + let label = Self::service_label(instance); + ServiceLogPaths { + stdout: self.run_dir.join(format!("{label}.stdout.log")), + stderr: self.run_dir.join(format!("{label}.stderr.log")), + } + } + + fn service_label(instance: ServiceInstance) -> &'static str { + match instance { + ServiceInstance::One => "service-1", + ServiceInstance::Two => "service-2", + } + } + + pub fn dump_service_logs_at(paths: &ServiceLogPaths) -> anyhow::Result<()> { + for path in [&paths.stdout, &paths.stderr] { + if path.exists() { + let content = + std::fs::read_to_string(path).with_context(|| { + format!( + "failed to read service log: {}", + path.display() + ) + })?; + println!("--- {} ---\n{}", path.display(), content); + } + } + Ok(()) + } + + #[allow(dead_code)] + pub fn dump_service_logs( + &self, + instance: ServiceInstance, + ) -> anyhow::Result<()> { + let paths = self.service_logs(instance); + Self::dump_service_logs_at(&paths) + } + + #[allow(dead_code)] + pub fn client_updates_path(&self, scenario: ScenarioName) -> PathBuf { + self.run_dir + .join(format!("{}-client-updates.json", scenario.as_str())) + } + + pub fn write_client_updates( + &self, + scenario: ScenarioName, + clients: &[TestGrpcClient], + ) -> anyhow::Result<()> { + #[derive(Serialize)] + struct ClientUpdates { + client_id: usize, + service: ServiceInstance, + endpoint: String, + updates: Vec, + } + + let payload = clients + .iter() + .map(|client| ClientUpdates { + client_id: client.id, + service: client.service, + endpoint: client.endpoint.clone(), + updates: client.log().snapshot(), + }) + .collect::>(); + let path = self.client_updates_path(scenario); + let json = serde_json::to_vec_pretty(&payload) + .context("failed to serialize client updates")?; + std::fs::write(&path, json).with_context(|| { + format!("failed to write client updates to {}", path.display()) + }) + } + + #[allow(dead_code)] + pub fn persist_failure(&self) -> anyhow::Result<()> { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let scenario_name = self + .run_dir + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown"); + let dest = self + .failure_root + .join(format!("{scenario_name}-{timestamp}")); + std::fs::create_dir_all(&self.failure_root).with_context(|| { + format!( + "failed to create failure root: {}", + self.failure_root.display() + ) + })?; + std::fs::rename(&self.run_dir, &dest).with_context(|| { + format!( + "failed to move run dir {} to {}", + self.run_dir.display(), + dest.display() + ) + })?; + Ok(()) + } + + pub fn cleanup_success(&self) -> anyhow::Result<()> { + if self.run_dir.exists() { + std::fs::remove_dir_all(&self.run_dir).with_context(|| { + format!("failed to remove run dir: {}", self.run_dir.display()) + })?; + } + Ok(()) + } +} diff --git a/ix-tests/src/cli.rs b/ix-tests/src/cli.rs new file mode 100644 index 0000000..c1564ab --- /dev/null +++ b/ix-tests/src/cli.rs @@ -0,0 +1,35 @@ +use std::path::PathBuf; + +use anyhow::{Context, bail}; + +pub struct Cli { + pub config_path: PathBuf, + pub scenario: String, +} + +impl Cli { + pub fn parse() -> anyhow::Result { + let mut cli = Self { + config_path: PathBuf::from("ix-tests/configs/suite.toml"), + scenario: "all".to_owned(), + }; + + let mut args = std::env::args().skip(1); + while let Some(arg) = args.next() { + match arg.as_str() { + "--config" => { + let path = + args.next().context("missing value for --config")?; + cli.config_path = PathBuf::from(path); + } + "--scenario" => { + cli.scenario = + args.next().context("missing value for --scenario")?; + } + _ => bail!("invalid CLI argument: {arg}"), + } + } + + Ok(cli) + } +} diff --git a/ix-tests/src/client.rs b/ix-tests/src/client.rs new file mode 100644 index 0000000..5901ce8 --- /dev/null +++ b/ix-tests/src/client.rs @@ -0,0 +1,246 @@ +use std::collections::HashMap; +use std::time::{SystemTime, UNIX_EPOCH}; +use tracing::*; + +use anyhow::Context; +use helius_laserstream::grpc::geyser_client::GeyserClient; +use helius_laserstream::grpc::subscribe_update::UpdateOneof; +use helius_laserstream::grpc::{ + SubscribeRequest, SubscribeRequestFilterAccounts, +}; +use solana_keypair::Signature; +use solana_pubkey::Pubkey; +use tokio::sync::mpsc; +use tokio_stream::StreamExt; +use tokio_stream::wrappers::ReceiverStream; + +use crate::layout::ServiceInstance; +use crate::observation::{ClientLog, ObservedUpdate}; + +#[allow(dead_code)] +pub struct TestGrpcClient { + pub id: usize, + pub service: ServiceInstance, + pub endpoint: String, + log: ClientLog, + request_tx: mpsc::Sender, + receive_task: tokio::task::JoinHandle>, +} + +#[allow(dead_code)] +impl TestGrpcClient { + pub async fn connect( + id: usize, + service: ServiceInstance, + endpoint: String, + ) -> anyhow::Result { + let mut client = + GeyserClient::connect(endpoint.clone()).await.with_context( + || format!("client {id}: failed to connect to {endpoint}"), + )?; + + let (tx, rx) = mpsc::channel::(16); + + tx.send(SubscribeRequest::default()) + .await + .context("failed to send initial empty subscribe request")?; + + let response = client + .subscribe(ReceiverStream::new(rx)) + .await + .with_context(|| format!("client {id}: subscribe call failed"))?; + let mut update_stream = response.into_inner(); + + let log = ClientLog::new(); + let log_clone = log.clone(); + + fn pubkey_str(bytes: &[u8]) -> Option { + if bytes.is_empty() { + return Some(String::new()); + } + match Pubkey::try_from(bytes.to_vec()) { + Ok(pubkey) => Some(pubkey.to_string()), + Err(err) => { + error!( + bytes = ?bytes, + err = ?err, + "failed to parse pubkey" + ); + None + } + } + } + fn txn_signature_str(bytes: &[u8]) -> Option { + if bytes.is_empty() { + return Some(String::new()); + } + match Signature::try_from(bytes.to_vec()) { + Ok(signature) => Some(signature.to_string()), + Err(err) => { + error!( + bytes = ?bytes, + err = ?err, + "failed to parse txn signature" + ); + None + } + } + } + + let receive_task = tokio::spawn(async move { + while let Some(item) = update_stream.next().await { + match item { + Ok(update) => { + if let Some(UpdateOneof::Account(account_update)) = + update.update_oneof + && let Some(info) = account_update.account + { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + + let Some(pubkey_b58) = pubkey_str(&info.pubkey) + else { + continue; + }; + let Some(owner_b58) = pubkey_str(&info.owner) + else { + continue; + }; + let txn_signature_b58 = + match info.txn_signature.as_ref() { + Some(signature) => { + let Some(signature_b58) = + txn_signature_str(signature) + else { + continue; + }; + Some(signature_b58) + } + None => None, + }; + + let observed = ObservedUpdate { + client_id: id, + service, + pubkey_b58, + slot: account_update.slot, + lamports: info.lamports, + owner_b58, + executable: info.executable, + rent_epoch: info.rent_epoch, + write_version: info.write_version, + txn_signature_b58, + data: info.data, + received_at_millis: now, + }; + + trace!( + client_id = id, + pubkey = %observed.pubkey_b58, + slot = observed.slot, + lamports = observed.lamports, + owner = %observed.owner_b58, + executable = observed.executable, + rent_epoch = observed.rent_epoch, + write_version = observed.write_version, + txn_signature = ?observed.txn_signature_b58.as_deref(), + data_len = observed.data.len(), + "received account update" + ); + + log_clone.push(observed); + } + } + Err(status) => { + tracing::warn!( + client_id = id, + %status, + "stream error" + ); + break; + } + } + } + Ok(()) + }); + + Ok(Self { + id, + service, + endpoint, + log, + request_tx: tx, + receive_task, + }) + } + + pub async fn replace_subscription( + &self, + pubkeys: &[String], + ) -> anyhow::Result<()> { + let req = SubscribeRequest { + accounts: HashMap::from([( + "replace".to_owned(), + SubscribeRequestFilterAccounts { + account: pubkeys.to_vec(), + ..Default::default() + }, + )]), + ..Default::default() + }; + self.request_tx + .send(req) + .await + .context("failed to send replace subscription request") + } + + pub async fn patch_subscription( + &self, + add: &[String], + remove: &[String], + ) -> anyhow::Result<()> { + let mut accounts = HashMap::new(); + if !add.is_empty() { + accounts.insert( + "add".to_owned(), + SubscribeRequestFilterAccounts { + account: add.to_vec(), + ..Default::default() + }, + ); + } + if !remove.is_empty() { + accounts.insert( + "remove".to_owned(), + SubscribeRequestFilterAccounts { + account: remove.to_vec(), + ..Default::default() + }, + ); + } + let req = SubscribeRequest { + accounts, + ..Default::default() + }; + self.request_tx + .send(req) + .await + .context("failed to send patch subscription request") + } + + pub fn log(&self) -> &ClientLog { + &self.log + } + + pub async fn shutdown(self) -> anyhow::Result<()> { + drop(self.request_tx); + self.receive_task.abort(); + match self.receive_task.await { + Ok(result) => result, + Err(e) if e.is_cancelled() => Ok(()), + Err(e) => Err(e.into()), + } + } +} diff --git a/ix-tests/src/config.rs b/ix-tests/src/config.rs new file mode 100644 index 0000000..6b35f49 --- /dev/null +++ b/ix-tests/src/config.rs @@ -0,0 +1,55 @@ +use std::path::{Path, PathBuf}; + +use serde::Deserialize; + +#[derive(Clone, Debug)] +pub struct SuiteConfig { + pub service_binary: PathBuf, + pub validator_rpc_url: String, + pub failure_artifact_root: PathBuf, + pub service_start_timeout_ms: u64, + pub checkpoint_timeout_ms: u64, + pub transaction_timeout_ms: u64, +} + +#[derive(Debug, Deserialize)] +struct FileSuiteConfig { + #[serde(default)] + service_binary: Option, + #[serde(default)] + validator_rpc_url: Option, + #[serde(default)] + failure_artifact_root: Option, + #[serde(default)] + service_start_timeout_ms: Option, + #[serde(default)] + checkpoint_timeout_ms: Option, + #[serde(default)] + transaction_timeout_ms: Option, +} + +impl SuiteConfig { + pub fn load(path: &Path) -> anyhow::Result { + let contents = std::fs::read_to_string(path)?; + let file: FileSuiteConfig = toml::from_str(&contents)?; + + Ok(SuiteConfig { + service_binary: file.service_binary.unwrap_or_else(|| { + PathBuf::from("target/debug/magigblock-grpc-service") + }), + validator_rpc_url: file + .validator_rpc_url + .unwrap_or_else(|| "http://127.0.0.1:8899".to_owned()), + failure_artifact_root: file + .failure_artifact_root + .unwrap_or_else(|| PathBuf::from("target/ix-tests/failures")), + service_start_timeout_ms: file + .service_start_timeout_ms + .unwrap_or(10_000), + checkpoint_timeout_ms: file.checkpoint_timeout_ms.unwrap_or(20_000), + transaction_timeout_ms: file + .transaction_timeout_ms + .unwrap_or(20_000), + }) + } +} diff --git a/ix-tests/src/context.rs b/ix-tests/src/context.rs new file mode 100644 index 0000000..ca676f6 --- /dev/null +++ b/ix-tests/src/context.rs @@ -0,0 +1,16 @@ +use crate::accounts::ScenarioAccounts; +use crate::artifacts::RunArtifacts; +use crate::config::SuiteConfig; +use crate::expectation::CheckpointRunner; +use crate::service::ServiceController; +use crate::validator::ValidatorDriver; + +#[allow(dead_code)] +pub struct ScenarioContext { + pub suite_config: SuiteConfig, + pub artifacts: RunArtifacts, + pub service_controller: ServiceController, + pub validator: ValidatorDriver, + pub checkpoint_runner: CheckpointRunner, + pub accounts: ScenarioAccounts, +} diff --git a/ix-tests/src/expectation.rs b/ix-tests/src/expectation.rs new file mode 100644 index 0000000..43440e7 --- /dev/null +++ b/ix-tests/src/expectation.rs @@ -0,0 +1,288 @@ +use std::time::{Duration, Instant}; +use tracing::*; + +use anyhow::{Context, bail}; +use tokio::time::sleep; + +use crate::client::TestGrpcClient; +use crate::config::SuiteConfig; +use crate::observation::ObservedUpdate; + +#[derive(Clone, Debug, Default)] +#[allow(dead_code)] +pub struct ExpectedUpdate { + pub pubkey_b58: Option, + pub slot: Option, + pub lamports: Option, + pub owner_b58: Option, + pub executable: Option, + pub rent_epoch: Option, + pub write_version: Option, + pub txn_signature_b58: Option>, + pub data: Option>, +} + +#[derive(Clone, Debug, Default)] +#[allow(dead_code)] +pub struct ClientCheckpoint { + pub client_id: usize, + pub required: Vec, +} + +#[derive(Clone, Debug, Default)] +#[allow(dead_code)] +pub struct CheckpointSpec { + pub name: &'static str, + pub checkpoints: Vec, +} + +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct CheckpointRunner { + timeout: Duration, +} + +#[allow(dead_code)] +impl ExpectedUpdate { + pub fn matches(&self, observed: &ObservedUpdate) -> bool { + let mismatches = self.mismatches(observed); + if !mismatches.is_empty() { + warn!("Mismatches:\n {}", mismatches.join("\n ")); + } + mismatches.is_empty() + } + + fn matches_quietly(&self, observed: &ObservedUpdate) -> bool { + self.mismatches(observed).is_empty() + } + + fn mismatches(&self, observed: &ObservedUpdate) -> Vec { + let mut mismatches = Vec::new(); + if let Some(expected) = &self.pubkey_b58 + && observed.pubkey_b58 != *expected + { + mismatches.push(format!( + "pubkey_b58: expected {}, got {}", + expected, observed.pubkey_b58 + )); + } + if let Some(expected) = self.slot + && observed.slot != expected + { + mismatches.push(format!( + "slot: expected {}, got {}", + expected, observed.slot + )); + } + if let Some(expected) = self.lamports + && observed.lamports != expected + { + mismatches.push(format!( + "lamports: expected {}, got {}", + expected, observed.lamports + )); + } + if let Some(expected) = &self.owner_b58 + && observed.owner_b58 != *expected + { + mismatches.push(format!( + "owner_b58: expected {}, got {}", + expected, observed.owner_b58 + )); + } + + if let Some(expected) = self.executable + && observed.executable != expected + { + mismatches.push(format!( + "executable: expected {}, got {}", + expected, observed.executable + )); + } + + if let Some(expected) = self.rent_epoch + && observed.rent_epoch != expected + { + mismatches.push(format!( + "rent_epoch: expected {}, got {}", + expected, observed.rent_epoch + )); + } + + if let Some(expected) = self.write_version + && observed.write_version != expected + { + mismatches.push(format!( + "write_version: expected {}, got {}", + expected, observed.write_version + )); + } + + if let Some(expected) = &self.txn_signature_b58 + && observed.txn_signature_b58.as_ref() != expected.as_ref() + { + mismatches.push(format!( + "txn_signature_b58: expected {:?}, got {:?}", + expected, observed.txn_signature_b58 + )); + } + + if let Some(expected) = &self.data + && observed.data != *expected + { + mismatches.push(format!( + "data: expected {:?}, got {:?}", + expected, observed.data + )); + } + + mismatches + } +} + +#[allow(dead_code)] +impl CheckpointRunner { + pub fn new(config: &SuiteConfig) -> Self { + Self { + timeout: Duration::from_millis(config.checkpoint_timeout_ms), + } + } + + pub async fn wait_until_satisfied( + &self, + spec: &CheckpointSpec, + clients: &[TestGrpcClient], + ) -> anyhow::Result<()> { + let deadline = Instant::now() + self.timeout; + + // For each spec we take the next (in order of arrival) state from + // the matching client and compare them + for check_point in &spec.checkpoints { + let client = clients + .iter() + .find(|client| client.id == check_point.client_id) + .with_context(|| { + format!( + "checkpoint '{}' references unknown client {}", + spec.name, check_point.client_id + ) + })?; + + let mut matched = vec![false; check_point.required.len()]; + while matched.iter().any(|is_matched| !*is_matched) { + let client_state = client.log().consume_next_update(); + if let Some(observed) = client_state { + let matched_idx = next_required_match( + &check_point.required, + &matched, + &observed, + ); + + if let Some(idx) = matched_idx { + matched[idx] = true; + trace!( + checkpoint = spec.name, + idx, + client_id = check_point.client_id, + "matched expected update: {:#?}", + check_point.required[idx] + ); + } else { + debug!( + checkpoint = spec.name, + client_id = check_point.client_id, + observed = ?observed, + "skipping non-required update while waiting for checkpoint" + ); + } + } else if Instant::now() > deadline { + let missing = matched + .iter() + .enumerate() + .filter_map(|(idx, is_matched)| { + (!*is_matched).then_some(idx) + }) + .collect::>(); + bail!( + "checkpoint '{}' timed out waiting for client {}; missing required update indexes {:?}", + spec.name, + check_point.client_id, + missing + ); + } else { + sleep(Duration::from_millis(50)).await; + } + } + } + Ok(()) + } +} + +fn next_required_match( + required: &[ExpectedUpdate], + matched: &[bool], + observed: &ObservedUpdate, +) -> Option { + let next_for_observed_pubkey = + required.iter().enumerate().find(|(idx, expected)| { + !matched[*idx] + && expected.pubkey_b58.as_deref() + == Some(observed.pubkey_b58.as_str()) + }); + + if let Some((idx, expected)) = next_for_observed_pubkey { + return expected.matches_quietly(observed).then_some(idx); + } + + required.iter().enumerate().find_map(|(idx, expected)| { + (!matched[idx] + && expected.pubkey_b58.is_none() + && expected.matches_quietly(observed)) + .then_some(idx) + }) +} + +#[cfg(test)] +mod tests { + use crate::layout::ServiceInstance; + use crate::observation::ObservedUpdate; + + use super::ExpectedUpdate; + + fn observed_update() -> ObservedUpdate { + ObservedUpdate { + client_id: 7, + service: ServiceInstance::One, + pubkey_b58: "pubkey".to_owned(), + slot: 42, + lamports: 99, + owner_b58: "owner".to_owned(), + executable: false, + rent_epoch: 5, + write_version: 6, + txn_signature_b58: Some("sig".to_owned()), + data: vec![1, 2, 3], + received_at_millis: 123, + } + } + + #[test] + fn matches_ignores_none_fields() { + let expected = ExpectedUpdate { + lamports: Some(99), + ..Default::default() + }; + + assert!(expected.matches(&observed_update())); + } + + #[test] + fn matches_rejects_mismatched_fields() { + let expected = ExpectedUpdate { + lamports: Some(100), + ..Default::default() + }; + + assert!(!expected.matches(&observed_update())); + } +} diff --git a/ix-tests/src/layout.rs b/ix-tests/src/layout.rs new file mode 100644 index 0000000..d2e0da0 --- /dev/null +++ b/ix-tests/src/layout.rs @@ -0,0 +1,43 @@ +use serde::Serialize; + +use crate::scenario::ScenarioName; + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize)] +#[allow(dead_code)] +pub enum ServiceInstance { + One, + Two, +} + +#[allow(dead_code)] +pub struct ScenarioLayout { + pub services: Vec, + pub client_count: usize, +} + +#[allow(dead_code)] +impl ScenarioLayout { + pub fn for_scenario(name: ScenarioName) -> Self { + match name { + ScenarioName::SingleTriage => Self { + services: vec![ServiceInstance::One], + client_count: 1, + }, + ScenarioName::SingleBasic => Self { + services: vec![ServiceInstance::One], + client_count: 4, + }, + ScenarioName::SingleLoad => Self { + services: vec![ServiceInstance::One], + client_count: 100, + }, + ScenarioName::DualConcurrent | ScenarioName::DualRestart => Self { + services: vec![ServiceInstance::One, ServiceInstance::Two], + client_count: 20, + }, + ScenarioName::All => { + unreachable!("All is expanded before layout") + } + } + } +} diff --git a/ix-tests/src/main.rs b/ix-tests/src/main.rs new file mode 100644 index 0000000..4696e6d --- /dev/null +++ b/ix-tests/src/main.rs @@ -0,0 +1,110 @@ +mod accounts; +mod artifacts; +mod cli; +#[allow(dead_code)] +mod client; +mod config; +mod context; +mod expectation; +mod layout; +#[allow(dead_code)] +mod observation; +mod runner; +mod scenario; +mod scenarios; +mod service; +#[allow(dead_code)] +mod validator; + +use tracing::{info, warn}; + +use crate::accounts::ScenarioAccounts; +use crate::context::ScenarioContext; +use crate::expectation::CheckpointRunner; +use crate::service::ServiceController; +use crate::validator::ValidatorDriver; + +fn init_tracing() { + tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "ix_tests=info".into()), + ) + .without_time() + .with_file(true) + .with_line_number(true) + .with_target(false) + .init(); +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + init_tracing(); + + let cli = cli::Cli::parse()?; + let config = config::SuiteConfig::load(&cli.config_path)?; + let requested = scenario::ScenarioName::parse(&cli.scenario)?; + + info!( + config_path = %cli.config_path.display(), + scenario = requested.as_str(), + service_binary = %config.service_binary.display(), + validator_rpc_url = %config.validator_rpc_url, + failure_artifact_root = %config.failure_artifact_root.display(), + service_start_timeout_ms = config.service_start_timeout_ms, + checkpoint_timeout_ms = config.checkpoint_timeout_ms, + transaction_timeout_ms = config.transaction_timeout_ms, + "loaded integration test suite config" + ); + + let scenarios = runner::ordered_scenarios(requested); + let names: Vec<&str> = scenarios.iter().map(|s| s.as_str()).collect(); + info!(scenarios = ?names, "resolved scenario execution order"); + + for scenario in &scenarios { + info!(scenario = scenario.as_str(), "running scenario"); + let artifacts = artifacts::RunArtifacts::new(&config, *scenario)?; + let ctx = ScenarioContext { + suite_config: config.clone(), + artifacts, + service_controller: ServiceController::new(&config), + validator: ValidatorDriver::new(&config), + checkpoint_runner: CheckpointRunner::new(&config), + accounts: ScenarioAccounts::new(), + }; + + match scenarios::run_scenario(*scenario, &ctx).await { + Ok(()) => { + ctx.artifacts.cleanup_success()?; + info!(scenario = scenario.as_str(), "scenario passed"); + } + Err(failure) => { + let scenarios::ScenarioFailure { + error: original_error, + clients, + } = failure; + + if !clients.is_empty() + && let Err(error) = + ctx.artifacts.write_client_updates(*scenario, &clients) + { + warn!(?error, "failed to write client updates artifact"); + } + for service in + [layout::ServiceInstance::One, layout::ServiceInstance::Two] + { + if let Err(error) = ctx.artifacts.dump_service_logs(service) + { + warn!(?error, ?service, "failed to dump service logs"); + } + } + if let Err(error) = ctx.artifacts.persist_failure() { + warn!(?error, "failed to persist failure artifacts"); + } + return Err(original_error); + } + } + } + + Ok(()) +} diff --git a/ix-tests/src/observation.rs b/ix-tests/src/observation.rs new file mode 100644 index 0000000..a44c546 --- /dev/null +++ b/ix-tests/src/observation.rs @@ -0,0 +1,68 @@ +use std::sync::{Arc, Mutex}; + +use serde::Serialize; + +use crate::layout::ServiceInstance; + +#[allow(dead_code)] +#[derive(Clone, Debug, Serialize)] +pub struct ObservedUpdate { + pub client_id: usize, + pub service: ServiceInstance, + pub pubkey_b58: String, + pub slot: u64, + pub lamports: u64, + pub owner_b58: String, + pub executable: bool, + pub rent_epoch: u64, + pub write_version: u64, + pub txn_signature_b58: Option, + pub data: Vec, + pub received_at_millis: u128, +} + +#[derive(Clone)] +pub struct ClientLog { + entries: Arc>>, +} + +#[allow(dead_code)] +impl ClientLog { + pub fn new() -> Self { + Self { + entries: Arc::new(Mutex::new(Vec::new())), + } + } + + pub fn push(&self, update: ObservedUpdate) { + self.entries.lock().unwrap().push(update); + } + + pub fn snapshot(&self) -> Vec { + self.entries.lock().unwrap().clone() + } + + pub fn snapshot_from(&self, start_index: usize) -> Vec { + let guard = self.entries.lock().unwrap(); + if start_index >= guard.len() { + Vec::new() + } else { + guard[start_index..].to_vec() + } + } + + /// Takes the next update in the order it came in + /// Removes and returns it from the log + pub fn consume_next_update(&self) -> Option { + let mut guard = self.entries.lock().unwrap(); + if guard.is_empty() { + None + } else { + Some(guard.remove(0)) + } + } + + pub fn len(&self) -> usize { + self.entries.lock().unwrap().len() + } +} diff --git a/ix-tests/src/runner.rs b/ix-tests/src/runner.rs new file mode 100644 index 0000000..612e0cd --- /dev/null +++ b/ix-tests/src/runner.rs @@ -0,0 +1,14 @@ +use crate::scenario::ScenarioName; + +pub fn ordered_scenarios(requested: ScenarioName) -> Vec { + match requested { + ScenarioName::All => vec![ + ScenarioName::SingleTriage, + ScenarioName::SingleBasic, + ScenarioName::SingleLoad, + ScenarioName::DualConcurrent, + ScenarioName::DualRestart, + ], + concrete => vec![concrete], + } +} diff --git a/ix-tests/src/scenario.rs b/ix-tests/src/scenario.rs new file mode 100644 index 0000000..d9c324b --- /dev/null +++ b/ix-tests/src/scenario.rs @@ -0,0 +1,36 @@ +use anyhow::bail; + +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum ScenarioName { + All, + SingleTriage, + SingleBasic, + SingleLoad, + DualConcurrent, + DualRestart, +} + +impl ScenarioName { + pub fn parse(input: &str) -> anyhow::Result { + match input { + "all" => Ok(Self::All), + "single-triage" => Ok(Self::SingleTriage), + "single-basic" => Ok(Self::SingleBasic), + "single-load" => Ok(Self::SingleLoad), + "dual-concurrent" => Ok(Self::DualConcurrent), + "dual-restart" => Ok(Self::DualRestart), + _ => bail!("unknown scenario: {input}"), + } + } + + pub fn as_str(&self) -> &'static str { + match self { + Self::All => "all", + Self::SingleTriage => "single-triage", + Self::SingleBasic => "single-basic", + Self::SingleLoad => "single-load", + Self::DualConcurrent => "dual-concurrent", + Self::DualRestart => "dual-restart", + } + } +} diff --git a/ix-tests/src/scenarios/dual_concurrent.rs b/ix-tests/src/scenarios/dual_concurrent.rs new file mode 100644 index 0000000..9cc48d8 --- /dev/null +++ b/ix-tests/src/scenarios/dual_concurrent.rs @@ -0,0 +1,191 @@ +use anyhow::Context; + +use crate::accounts::NamedAccount; +use crate::client::TestGrpcClient; +use crate::context::ScenarioContext; +use crate::expectation::{CheckpointSpec, ClientCheckpoint, ExpectedUpdate}; +use crate::layout::ServiceInstance; +use crate::scenarios::ScenarioFailure; +use crate::service::{ServiceHandle, ServiceSpec}; + +pub async fn run(ctx: &ScenarioContext) -> Result<(), ScenarioFailure> { + let spec_one = ServiceSpec::for_instance(ServiceInstance::One); + let spec_two = ServiceSpec::for_instance(ServiceInstance::Two); + let mut services = Vec::new(); + let mut clients = Vec::new(); + + let result = + run_inner(ctx, &spec_one, &spec_two, &mut services, &mut clients).await; + if let Err(error) = result { + return Err(ScenarioFailure { error, clients }); + } + + shutdown_clients(clients) + .await + .map_err(scenario_failure_without_clients)?; + shutdown_services(&ctx.service_controller, services) + .await + .map_err(scenario_failure_without_clients)?; + Ok(()) +} + +async fn run_inner( + ctx: &ScenarioContext, + spec_one: &ServiceSpec, + spec_two: &ServiceSpec, + services: &mut Vec, + clients: &mut Vec, +) -> anyhow::Result<()> { + services.push( + ctx.service_controller + .start(spec_one, &ctx.artifacts) + .await?, + ); + services.push( + ctx.service_controller + .start(spec_two, &ctx.artifacts) + .await?, + ); + + let simple_a = ctx.accounts.pubkey_b58(NamedAccount::SimpleA); + let simple_b = ctx.accounts.pubkey_b58(NamedAccount::SimpleB); + let shared_a = ctx.accounts.pubkey_b58(NamedAccount::SharedA); + + for id in 0..10 { + let client = TestGrpcClient::connect( + id, + ServiceInstance::One, + spec_one.endpoint.clone(), + ) + .await + .with_context(|| { + format!("failed to connect service-one client {id}") + })?; + let subscription = if id < 5 { + vec![simple_a.clone()] + } else { + vec![shared_a.clone()] + }; + client.replace_subscription(&subscription).await?; + clients.push(client); + } + + for id in 10..20 { + let client = TestGrpcClient::connect( + id, + ServiceInstance::Two, + spec_two.endpoint.clone(), + ) + .await + .with_context(|| { + format!("failed to connect service-two client {id}") + })?; + let subscription = if id < 15 { + vec![simple_b.clone()] + } else { + vec![shared_a.clone()] + }; + client.replace_subscription(&subscription).await?; + clients.push(client); + } + + ctx.validator.fund_payer().await?; + + let sigs = ctx + .validator + .airdrops(vec![ + (ctx.accounts.pubkey(NamedAccount::SimpleA), 1_111_111), + (ctx.accounts.pubkey(NamedAccount::SimpleB), 2_222_222), + (ctx.accounts.pubkey(NamedAccount::SharedA), 3_333_333), + ]) + .await?; + let [simple_a_sig, simple_b_sig, shared_a_sig]: [String; 3] = + sigs.try_into().expect("expected three airdrop signatures"); + + let simple_a_expected = ExpectedUpdate { + pubkey_b58: Some(simple_a), + lamports: Some(1_111_111), + txn_signature_b58: Some(Some(simple_a_sig)), + ..Default::default() + }; + let simple_b_expected = ExpectedUpdate { + pubkey_b58: Some(simple_b), + lamports: Some(2_222_222), + txn_signature_b58: Some(Some(simple_b_sig)), + ..Default::default() + }; + let shared_a_expected = ExpectedUpdate { + pubkey_b58: Some(shared_a), + lamports: Some(3_333_333), + txn_signature_b58: Some(Some(shared_a_sig)), + ..Default::default() + }; + + let mut checkpoint_clients = Vec::new(); + for client_id in 0..5 { + checkpoint_clients.push(single_update_checkpoint( + client_id, + simple_a_expected.clone(), + )); + } + for client_id in 5..10 { + checkpoint_clients.push(single_update_checkpoint( + client_id, + shared_a_expected.clone(), + )); + } + for client_id in 10..15 { + checkpoint_clients.push(single_update_checkpoint( + client_id, + simple_b_expected.clone(), + )); + } + for client_id in 15..20 { + checkpoint_clients.push(single_update_checkpoint( + client_id, + shared_a_expected.clone(), + )); + } + + let checkpoint = CheckpointSpec { + name: "dual-concurrent-routing", + checkpoints: checkpoint_clients, + }; + ctx.checkpoint_runner + .wait_until_satisfied(&checkpoint, clients) + .await +} + +fn single_update_checkpoint( + client_id: usize, + expected: ExpectedUpdate, +) -> ClientCheckpoint { + ClientCheckpoint { + client_id, + required: vec![expected], + } +} + +async fn shutdown_clients(clients: Vec) -> anyhow::Result<()> { + for client in clients { + client.shutdown().await?; + } + Ok(()) +} + +async fn shutdown_services( + controller: &crate::service::ServiceController, + services: Vec, +) -> anyhow::Result<()> { + for service in services { + controller.shutdown(service).await?; + } + Ok(()) +} + +fn scenario_failure_without_clients(error: anyhow::Error) -> ScenarioFailure { + ScenarioFailure { + error, + clients: Vec::new(), + } +} diff --git a/ix-tests/src/scenarios/dual_restart.rs b/ix-tests/src/scenarios/dual_restart.rs new file mode 100644 index 0000000..90643c0 --- /dev/null +++ b/ix-tests/src/scenarios/dual_restart.rs @@ -0,0 +1,461 @@ +use anyhow::{Context, bail}; + +use crate::accounts::{NamedAccount, ScenarioAccounts}; +use crate::client::TestGrpcClient; +use crate::context::ScenarioContext; +use crate::expectation::{CheckpointSpec, ClientCheckpoint, ExpectedUpdate}; +use crate::layout::ServiceInstance; +use crate::observation::ClientLog; +use crate::scenarios::ScenarioFailure; +use crate::service::{ServiceHandle, ServiceSpec}; + +const PRE_RESTART_A_AIRDROP_LAMPORTS: u64 = 4_444_444; +const PRE_RESTART_B_AIRDROP_LAMPORTS: u64 = 5_555_555; +const PRE_SHARED_B_AIRDROP_LAMPORTS: u64 = 6_666_666; + +const DURING_RESTART_A_AIRDROP_LAMPORTS: u64 = 7_777_777; +const DURING_SHARED_B_AIRDROP_LAMPORTS: u64 = 8_888_888; + +const POST_RESTART_A_AIRDROP_LAMPORTS: u64 = 9_999_999; +const POST_SHARED_B_AIRDROP_LAMPORTS: u64 = 10_101_010; + +// Account updates report the account's resulting balance, not the +// individual airdrop delta. The first airdrops below target fresh random +// accounts, so their resulting balances are equal to their airdrop amounts. +const PRE_RESTART_A_EXPECTED_BALANCE: u64 = PRE_RESTART_A_AIRDROP_LAMPORTS; +const PRE_RESTART_B_EXPECTED_BALANCE: u64 = PRE_RESTART_B_AIRDROP_LAMPORTS; +const PRE_SHARED_B_EXPECTED_BALANCE: u64 = PRE_SHARED_B_AIRDROP_LAMPORTS; + +// During restart, service one is offline but the validator still applies both +// airdrops. SharedB already has the pre-restart balance when this update is +// emitted, so the expected lamports are cumulative. +const DURING_RESTART_A_EXPECTED_BALANCE: u64 = + PRE_RESTART_A_EXPECTED_BALANCE + DURING_RESTART_A_AIRDROP_LAMPORTS; +const DURING_SHARED_B_EXPECTED_BALANCE: u64 = + PRE_SHARED_B_EXPECTED_BALANCE + DURING_SHARED_B_AIRDROP_LAMPORTS; + +// After service one restarts, live updates again carry full account balances. +// These expectations include all earlier airdrops to the same account. +const POST_RESTART_A_EXPECTED_BALANCE: u64 = + DURING_RESTART_A_EXPECTED_BALANCE + POST_RESTART_A_AIRDROP_LAMPORTS; +const POST_SHARED_B_EXPECTED_BALANCE: u64 = + DURING_SHARED_B_EXPECTED_BALANCE + POST_SHARED_B_AIRDROP_LAMPORTS; + +pub async fn run(ctx: &ScenarioContext) -> Result<(), ScenarioFailure> { + let spec_one = ServiceSpec::for_instance(ServiceInstance::One); + let spec_two = ServiceSpec::for_instance(ServiceInstance::Two); + let mut service_one = Some( + ctx.service_controller + .start(&spec_one, &ctx.artifacts) + .await + .map_err(scenario_failure_without_clients)?, + ); + let mut service_two = Some( + ctx.service_controller + .start(&spec_two, &ctx.artifacts) + .await + .map_err(scenario_failure_without_clients)?, + ); + let mut active_clients = Vec::new(); + + let result = run_inner( + ctx, + &spec_one, + &spec_two, + &mut service_one, + &mut active_clients, + ) + .await; + if let Err(error) = result { + return Err(ScenarioFailure { + error, + clients: active_clients, + }); + } + + shutdown_clients(active_clients) + .await + .map_err(scenario_failure_without_clients)?; + shutdown_service(&ctx.service_controller, &mut service_one) + .await + .map_err(scenario_failure_without_clients)?; + shutdown_service(&ctx.service_controller, &mut service_two) + .await + .map_err(scenario_failure_without_clients)?; + Ok(()) +} + +async fn run_inner( + ctx: &ScenarioContext, + spec_one: &ServiceSpec, + spec_two: &ServiceSpec, + service_one: &mut Option, + active_clients: &mut Vec, +) -> anyhow::Result<()> { + connect_service_one_clients( + &ctx.accounts, + active_clients, + &spec_one.endpoint, + ) + .await?; + connect_service_two_clients( + &ctx.accounts, + active_clients, + &spec_two.endpoint, + ) + .await?; + + ctx.validator.fund_payer().await?; + + let sigs = ctx + .validator + .airdrops(vec![ + ( + ctx.accounts.pubkey(NamedAccount::RestartA), + PRE_RESTART_A_AIRDROP_LAMPORTS, + ), + ( + ctx.accounts.pubkey(NamedAccount::RestartB), + PRE_RESTART_B_AIRDROP_LAMPORTS, + ), + ( + ctx.accounts.pubkey(NamedAccount::SharedB), + PRE_SHARED_B_AIRDROP_LAMPORTS, + ), + ]) + .await?; + let [restart_a_sig, restart_b_sig, shared_b_sig]: [String; 3] = + sigs.try_into().expect("expected three airdrop signatures"); + + let pre_restart = CheckpointSpec { + name: "pre-restart", + checkpoints: vec![ + repeated_checkpoint( + 0..5, + expected_update( + ctx.accounts.pubkey_b58(NamedAccount::RestartA), + PRE_RESTART_A_EXPECTED_BALANCE, + restart_a_sig, + ), + ), + repeated_checkpoint( + 5..10, + expected_update( + ctx.accounts.pubkey_b58(NamedAccount::SharedB), + PRE_SHARED_B_EXPECTED_BALANCE, + shared_b_sig.clone(), + ), + ), + repeated_checkpoint( + 10..15, + expected_update( + ctx.accounts.pubkey_b58(NamedAccount::RestartB), + PRE_RESTART_B_EXPECTED_BALANCE, + restart_b_sig, + ), + ), + repeated_checkpoint( + 15..20, + expected_update( + ctx.accounts.pubkey_b58(NamedAccount::SharedB), + PRE_SHARED_B_EXPECTED_BALANCE, + shared_b_sig, + ), + ), + ] + .into_iter() + .flatten() + .collect(), + }; + ctx.checkpoint_runner + .wait_until_satisfied(&pre_restart, active_clients) + .await?; + + let parked_logs = shutdown_service_one_clients(active_clients).await?; + shutdown_service(&ctx.service_controller, service_one).await?; + + let sigs = ctx + .validator + .airdrops(vec![ + ( + ctx.accounts.pubkey(NamedAccount::RestartA), + DURING_RESTART_A_AIRDROP_LAMPORTS, + ), + ( + ctx.accounts.pubkey(NamedAccount::SharedB), + DURING_SHARED_B_AIRDROP_LAMPORTS, + ), + ]) + .await?; + let [_during_restart_a_sig, during_shared_b_sig]: [String; 2] = + sigs.try_into().expect("expected two airdrop signatures"); + + assert_logs_unchanged(&parked_logs)?; + + // SharedB remains subscribed on service two while service one is down; + // lamports are the cumulative balance (6_666_666 + 8_888_888), not just the second airdrop. + let during_restart = CheckpointSpec { + name: "during-restart", + checkpoints: vec![ + empty_checkpoints(10..15), + repeated_checkpoint( + 15..20, + expected_update( + ctx.accounts.pubkey_b58(NamedAccount::SharedB), + DURING_SHARED_B_EXPECTED_BALANCE, + during_shared_b_sig, + ), + ), + ] + .into_iter() + .flatten() + .collect(), + }; + ctx.checkpoint_runner + .wait_until_satisfied(&during_restart, active_clients) + .await?; + + *service_one = Some( + ctx.service_controller + .start(spec_one, &ctx.artifacts) + .await?, + ); + connect_service_one_clients( + &ctx.accounts, + active_clients, + &spec_one.endpoint, + ) + .await?; + + let sigs = ctx + .validator + .airdrops(vec![ + ( + ctx.accounts.pubkey(NamedAccount::RestartA), + POST_RESTART_A_AIRDROP_LAMPORTS, + ), + ( + ctx.accounts.pubkey(NamedAccount::SharedB), + POST_SHARED_B_AIRDROP_LAMPORTS, + ), + ]) + .await?; + let [post_restart_a_sig, post_shared_b_sig]: [String; 2] = + sigs.try_into().expect("expected two airdrop signatures"); + + // Reconnected live updates still report full balances: RestartA includes all + // three RestartA airdrops and SharedB includes all three SharedB airdrops. + let post_restart = CheckpointSpec { + name: "post-restart", + checkpoints: vec![ + repeated_checkpoint( + 0..5, + expected_update( + ctx.accounts.pubkey_b58(NamedAccount::RestartA), + POST_RESTART_A_EXPECTED_BALANCE, + post_restart_a_sig, + ), + ), + repeated_checkpoint( + 5..10, + expected_update( + ctx.accounts.pubkey_b58(NamedAccount::SharedB), + POST_SHARED_B_EXPECTED_BALANCE, + post_shared_b_sig.clone(), + ), + ), + empty_checkpoints(10..15), + repeated_checkpoint( + 15..20, + expected_update( + ctx.accounts.pubkey_b58(NamedAccount::SharedB), + POST_SHARED_B_EXPECTED_BALANCE, + post_shared_b_sig, + ), + ), + ] + .into_iter() + .flatten() + .collect(), + }; + ctx.checkpoint_runner + .wait_until_satisfied(&post_restart, active_clients) + .await +} + +async fn connect_service_one_clients( + accounts: &ScenarioAccounts, + active_clients: &mut Vec, + endpoint: &str, +) -> anyhow::Result<()> { + for id in 0..10 { + let client = TestGrpcClient::connect( + id, + ServiceInstance::One, + endpoint.to_owned(), + ) + .await + .with_context(|| { + format!("failed to connect service-one client {id}") + })?; + let subscription = if id < 5 { + vec![NamedAccount::RestartA] + } else { + vec![NamedAccount::SharedB] + }; + let pubkeys = subscription + .into_iter() + .map(|account| accounts.pubkey_b58(account)) + .collect::>(); + client.replace_subscription(&pubkeys).await?; + upsert_client(active_clients, client); + } + Ok(()) +} + +async fn connect_service_two_clients( + accounts: &ScenarioAccounts, + active_clients: &mut Vec, + endpoint: &str, +) -> anyhow::Result<()> { + for id in 10..20 { + let client = TestGrpcClient::connect( + id, + ServiceInstance::Two, + endpoint.to_owned(), + ) + .await + .with_context(|| { + format!("failed to connect service-two client {id}") + })?; + let subscription = if id < 15 { + vec![NamedAccount::RestartB] + } else { + vec![NamedAccount::SharedB] + }; + let pubkeys = subscription + .into_iter() + .map(|account| accounts.pubkey_b58(account)) + .collect::>(); + client.replace_subscription(&pubkeys).await?; + upsert_client(active_clients, client); + } + Ok(()) +} + +fn upsert_client( + active_clients: &mut Vec, + client: TestGrpcClient, +) { + let client_id = client.id; + if let Some(position) = + active_clients.iter().position(|c| c.id == client_id) + { + active_clients[position] = client; + } else { + active_clients.push(client); + } +} + +async fn shutdown_service_one_clients( + active_clients: &mut Vec, +) -> anyhow::Result> { + let mut parked = Vec::new(); + let mut remaining = Vec::new(); + + for client in active_clients.drain(..) { + if client.service == ServiceInstance::One { + let log = client.log().clone(); + let len = log.len(); + parked.push(ParkedClientLog { + client_id: client.id, + log, + len, + }); + client.shutdown().await?; + } else { + remaining.push(client); + } + } + + *active_clients = remaining; + Ok(parked) +} + +fn assert_logs_unchanged( + parked_logs: &[ParkedClientLog], +) -> anyhow::Result<()> { + for parked in parked_logs { + if parked.log.len() != parked.len { + bail!( + "service-one client {} received updates after shutdown", + parked.client_id + ); + } + } + Ok(()) +} + +fn expected_update( + pubkey_b58: String, + lamports: u64, + txn_signature_b58: String, +) -> ExpectedUpdate { + ExpectedUpdate { + pubkey_b58: Some(pubkey_b58), + lamports: Some(lamports), + txn_signature_b58: Some(Some(txn_signature_b58)), + ..Default::default() + } +} + +fn repeated_checkpoint( + range: std::ops::Range, + expected: ExpectedUpdate, +) -> Vec { + range + .map(|client_id| ClientCheckpoint { + client_id, + required: vec![expected.clone()], + }) + .collect() +} + +fn empty_checkpoints(range: std::ops::Range) -> Vec { + range + .map(|client_id| ClientCheckpoint { + client_id, + required: Vec::new(), + }) + .collect() +} + +async fn shutdown_clients(clients: Vec) -> anyhow::Result<()> { + for client in clients { + client.shutdown().await?; + } + Ok(()) +} + +async fn shutdown_service( + controller: &crate::service::ServiceController, + service: &mut Option, +) -> anyhow::Result<()> { + if let Some(service) = service.take() { + controller.shutdown(service).await?; + } + Ok(()) +} + +struct ParkedClientLog { + client_id: usize, + log: ClientLog, + len: usize, +} + +fn scenario_failure_without_clients(error: anyhow::Error) -> ScenarioFailure { + ScenarioFailure { + error, + clients: Vec::new(), + } +} diff --git a/ix-tests/src/scenarios/mod.rs b/ix-tests/src/scenarios/mod.rs new file mode 100644 index 0000000..638f23a --- /dev/null +++ b/ix-tests/src/scenarios/mod.rs @@ -0,0 +1,33 @@ +mod dual_concurrent; +mod dual_restart; +mod single_basic; +mod single_load; +mod single_triage; + +use anyhow::anyhow; + +use crate::client::TestGrpcClient; +use crate::context::ScenarioContext; +use crate::scenario::ScenarioName; + +pub struct ScenarioFailure { + pub error: anyhow::Error, + pub clients: Vec, +} + +pub async fn run_scenario( + name: ScenarioName, + ctx: &ScenarioContext, +) -> Result<(), ScenarioFailure> { + match name { + ScenarioName::SingleTriage => single_triage::run(ctx).await, + ScenarioName::SingleBasic => single_basic::run(ctx).await, + ScenarioName::SingleLoad => single_load::run(ctx).await, + ScenarioName::DualConcurrent => dual_concurrent::run(ctx).await, + ScenarioName::DualRestart => dual_restart::run(ctx).await, + ScenarioName::All => Err(ScenarioFailure { + error: anyhow!("scenario dispatch does not accept 'all'"), + clients: Vec::new(), + }), + } +} diff --git a/ix-tests/src/scenarios/single_basic.rs b/ix-tests/src/scenarios/single_basic.rs new file mode 100644 index 0000000..eee12f5 --- /dev/null +++ b/ix-tests/src/scenarios/single_basic.rs @@ -0,0 +1,240 @@ +use anyhow::Context; +use solana_pubkey::Pubkey; +use tracing::debug; + +use crate::accounts::NamedAccount; +use crate::client::TestGrpcClient; +use crate::context::ScenarioContext; +use crate::expectation::{CheckpointSpec, ClientCheckpoint, ExpectedUpdate}; +use crate::layout::ServiceInstance; +use crate::scenarios::ScenarioFailure; +use crate::service::{ServiceHandle, ServiceSpec}; + +const OWNER_DATA_SPACE: u64 = 64; +const SYNTHETIC_OWNER_BYTES: [u8; 32] = [ + 0x31, 0x22, 0x13, 0x04, 0xF5, 0xE6, 0xD7, 0xC8, 0xB9, 0xAA, 0x9B, 0x8C, + 0x7D, 0x6E, 0x5F, 0x40, 0x11, 0x32, 0x53, 0x74, 0x95, 0xB6, 0xD7, 0xF8, + 0x18, 0x29, 0x3A, 0x4B, 0x5C, 0x6D, 0x7E, 0x8F, +]; + +pub async fn run(ctx: &ScenarioContext) -> Result<(), ScenarioFailure> { + let spec = ServiceSpec::for_instance(ServiceInstance::One); + let mut service = Some( + ctx.service_controller + .start(&spec, &ctx.artifacts) + .await + .map_err(scenario_failure_without_clients)?, + ); + let mut clients = Vec::new(); + + let result = run_inner(ctx, &spec.endpoint, &mut clients).await; + if let Err(error) = result { + return Err(ScenarioFailure { error, clients }); + } + + shutdown_clients(clients) + .await + .map_err(scenario_failure_without_clients)?; + shutdown_service(&ctx.service_controller, &mut service) + .await + .map_err(scenario_failure_without_clients)?; + Ok(()) +} + +async fn run_inner( + ctx: &ScenarioContext, + endpoint: &str, + clients: &mut Vec, +) -> anyhow::Result<()> { + for id in 0..4 { + let client = TestGrpcClient::connect( + id, + ServiceInstance::One, + endpoint.to_owned(), + ) + .await + .with_context(|| format!("failed to connect client {id}"))?; + clients.push(client); + } + + let simple_a = ctx.accounts.pubkey(NamedAccount::SimpleA); + debug!("Client 0 subscribing to SimpleA: {simple_a}"); + clients[0] + .replace_subscription(&[simple_a.to_string()]) + .await?; + + let simple_b = ctx.accounts.pubkey(NamedAccount::SimpleB); + debug!("Client 1 subscribing to SimpleB: {simple_b}"); + clients[1] + .replace_subscription(&[simple_b.to_string()]) + .await?; + + let simple_c = ctx.accounts.pubkey(NamedAccount::SimpleC); + debug!("Client 2 subscribing to SimpleC: {simple_c}"); + clients[2] + .replace_subscription(&[simple_c.to_string()]) + .await?; + + let owner_data = ctx.accounts.pubkey(NamedAccount::OwnerData); + debug!("Client 3 subscribing to OwnerData: {owner_data}"); + clients[3] + .replace_subscription(&[owner_data.to_string()]) + .await?; + + // Right after we made the subscriptions we expect to get an _empty_ account update for + // each account + let empty_checkpoint = CheckpointSpec { + name: "initial-empty-accounts", + checkpoints: vec![ + lamport_client_checkpoint(0, simple_a.to_string(), 0, None), + lamport_client_checkpoint(1, simple_b.to_string(), 0, None), + lamport_client_checkpoint(2, simple_c.to_string(), 0, None), + lamport_client_checkpoint(3, owner_data.to_string(), 0, None), + ], + }; + ctx.checkpoint_runner + .wait_until_satisfied(&empty_checkpoint, clients) + .await?; + + debug!("✅ initial empty accounts"); + + // Then we airdrop some lamports to each account and expect to see the updates with the + // correct lamports and signatures + ctx.validator.fund_payer().await?; + + let sigs = ctx + .validator + .airdrops(vec![ + (simple_a, 1_000_000), + (simple_b, 2_000_000), + (simple_c, 3_000_000), + ]) + .await?; + let [simple_a_sig, simple_b_sig, simple_c_sig]: [String; 3] = + sigs.try_into().expect("expected three airdrop signatures"); + + let basic_checkpoint = CheckpointSpec { + name: "basic-lamports", + checkpoints: vec![ + lamport_client_checkpoint( + 0, + simple_a.to_string(), + 1_000_000, + Some(simple_a_sig), + ), + lamport_client_checkpoint( + 1, + simple_b.to_string(), + 2_000_000, + Some(simple_b_sig), + ), + lamport_client_checkpoint( + 2, + simple_c.to_string(), + 3_000_000, + Some(simple_c_sig), + ), + ], + }; + ctx.checkpoint_runner + .wait_until_satisfied(&basic_checkpoint, clients) + .await?; + + debug!("✅ basic lamports updates"); + + let rent_lamports = + ctx.validator.rent_exempt_balance(OWNER_DATA_SPACE).await?; + let owner_data_airdrop_sig = + ctx.validator.airdrop(&owner_data, rent_lamports).await?; + + let owner_data_funding_checkpoint = CheckpointSpec { + name: "owner-data-funding", + checkpoints: vec![ClientCheckpoint { + client_id: 3, + required: vec![ExpectedUpdate { + pubkey_b58: Some(owner_data.to_string()), + lamports: Some(rent_lamports), + txn_signature_b58: Some(Some(owner_data_airdrop_sig)), + ..Default::default() + }], + }], + }; + ctx.checkpoint_runner + .wait_until_satisfied(&owner_data_funding_checkpoint, clients) + .await?; + + let synthetic_owner = Pubkey::new_from_array(SYNTHETIC_OWNER_BYTES); + let owner_data_sig = ctx + .validator + .allocate_and_assign( + ctx.accounts.keypair(NamedAccount::OwnerData), + OWNER_DATA_SPACE, + synthetic_owner, + ) + .await?; + + let owner_data_expected = ExpectedUpdate { + pubkey_b58: Some(owner_data.to_string()), + owner_b58: Some(synthetic_owner.to_string()), + lamports: Some(rent_lamports), + txn_signature_b58: Some(Some(owner_data_sig)), + data: None, + ..Default::default() + }; + let owner_data_checkpoint = CheckpointSpec { + name: "owner-data-change", + checkpoints: vec![ClientCheckpoint { + client_id: 3, + required: vec![owner_data_expected], + }], + }; + ctx.checkpoint_runner + .wait_until_satisfied(&owner_data_checkpoint, clients) + .await?; + + debug!("✅ owner and data updates"); + + Ok(()) +} + +fn lamport_client_checkpoint( + client_id: usize, + pubkey_b58: String, + lamports: u64, + txn_signature_b58: Option, +) -> ClientCheckpoint { + let expected = ExpectedUpdate { + pubkey_b58: Some(pubkey_b58), + lamports: Some(lamports), + txn_signature_b58: Some(txn_signature_b58), + ..Default::default() + }; + ClientCheckpoint { + client_id, + required: vec![expected], + } +} + +async fn shutdown_clients(clients: Vec) -> anyhow::Result<()> { + for client in clients { + client.shutdown().await?; + } + Ok(()) +} + +async fn shutdown_service( + controller: &crate::service::ServiceController, + service: &mut Option, +) -> anyhow::Result<()> { + if let Some(service) = service.take() { + controller.shutdown(service).await?; + } + Ok(()) +} + +fn scenario_failure_without_clients(error: anyhow::Error) -> ScenarioFailure { + ScenarioFailure { + error, + clients: Vec::new(), + } +} diff --git a/ix-tests/src/scenarios/single_load.rs b/ix-tests/src/scenarios/single_load.rs new file mode 100644 index 0000000..31c2022 --- /dev/null +++ b/ix-tests/src/scenarios/single_load.rs @@ -0,0 +1,144 @@ +use anyhow::Context; + +use crate::accounts::NamedAccount; +use crate::client::TestGrpcClient; +use crate::context::ScenarioContext; +use crate::expectation::{CheckpointSpec, ClientCheckpoint, ExpectedUpdate}; +use crate::layout::ServiceInstance; +use crate::scenarios::ScenarioFailure; +use crate::service::{ServiceHandle, ServiceSpec}; + +pub async fn run(ctx: &ScenarioContext) -> Result<(), ScenarioFailure> { + let spec = ServiceSpec::for_instance(ServiceInstance::One); + let mut service = Some( + ctx.service_controller + .start(&spec, &ctx.artifacts) + .await + .map_err(scenario_failure_without_clients)?, + ); + let mut clients = Vec::new(); + + let outcome = run_inner(ctx, &spec.endpoint, &mut clients).await; + if let Err(error) = outcome { + return Err(ScenarioFailure { error, clients }); + } + + shutdown_clients(clients) + .await + .map_err(scenario_failure_without_clients)?; + shutdown_service(&ctx.service_controller, &mut service) + .await + .map_err(scenario_failure_without_clients)?; + Ok(()) +} + +async fn run_inner( + ctx: &ScenarioContext, + endpoint: &str, + clients: &mut Vec, +) -> anyhow::Result<()> { + let shared_a = ctx.accounts.pubkey_b58(NamedAccount::SharedA); + let shared_b = ctx.accounts.pubkey_b58(NamedAccount::SharedB); + + for id in 0..100 { + let client = TestGrpcClient::connect( + id, + ServiceInstance::One, + endpoint.to_owned(), + ) + .await + .with_context(|| format!("failed to connect client {id}"))?; + client + .replace_subscription(&[shared_a.clone(), shared_b.clone()]) + .await + .with_context(|| { + format!("failed to set subscriptions for client {id}") + })?; + clients.push(client); + } + + ctx.validator.fund_payer().await?; + + let rent_exempt_lamports = ctx.validator.rent_exempt_balance(0).await?; + + let airdrop_specs = (1..=25u64) + .map(|index| { + let (account, lamports) = if index % 2 == 1 { + (NamedAccount::SharedA, rent_exempt_lamports + 10_000 + index) + } else { + (NamedAccount::SharedB, rent_exempt_lamports + 20_000 + index) + }; + (account, ctx.accounts.pubkey(account), lamports) + }) + .collect::>(); + let airdrop_requests = airdrop_specs + .iter() + .map(|(_, pubkey, lamports)| (*pubkey, *lamports)) + .collect(); + ctx.validator.airdrops(airdrop_requests).await?; + + let (shared_a_balance, shared_b_balance) = airdrop_specs.iter().fold( + (0, 0), + |(shared_a_balance, shared_b_balance), (account, _pubkey, lamports)| { + match account { + NamedAccount::SharedA => { + (shared_a_balance + lamports, shared_b_balance) + } + NamedAccount::SharedB => { + (shared_a_balance, shared_b_balance + lamports) + } + _ => unreachable!("single-load only airdrops shared accounts"), + } + }, + ); + let expected_updates = vec![ + ExpectedUpdate { + pubkey_b58: Some(ctx.accounts.pubkey_b58(NamedAccount::SharedA)), + lamports: Some(shared_a_balance), + ..Default::default() + }, + ExpectedUpdate { + pubkey_b58: Some(ctx.accounts.pubkey_b58(NamedAccount::SharedB)), + lamports: Some(shared_b_balance), + ..Default::default() + }, + ]; + + let client_specs = (0..100) + .map(|client_id| ClientCheckpoint { + client_id, + required: expected_updates.clone(), + }) + .collect(); + let checkpoint = CheckpointSpec { + name: "single-load-fanout", + checkpoints: client_specs, + }; + ctx.checkpoint_runner + .wait_until_satisfied(&checkpoint, clients) + .await +} + +async fn shutdown_clients(clients: Vec) -> anyhow::Result<()> { + for client in clients { + client.shutdown().await?; + } + Ok(()) +} + +async fn shutdown_service( + controller: &crate::service::ServiceController, + service: &mut Option, +) -> anyhow::Result<()> { + if let Some(service) = service.take() { + controller.shutdown(service).await?; + } + Ok(()) +} + +fn scenario_failure_without_clients(error: anyhow::Error) -> ScenarioFailure { + ScenarioFailure { + error, + clients: Vec::new(), + } +} diff --git a/ix-tests/src/scenarios/single_triage.rs b/ix-tests/src/scenarios/single_triage.rs new file mode 100644 index 0000000..6069b69 --- /dev/null +++ b/ix-tests/src/scenarios/single_triage.rs @@ -0,0 +1,138 @@ +use anyhow::Context; +use solana_keypair::{Keypair, Signer}; +use tracing::info; + +use crate::client::TestGrpcClient; +use crate::context::ScenarioContext; +use crate::expectation::{CheckpointSpec, ClientCheckpoint, ExpectedUpdate}; +use crate::layout::ServiceInstance; +use crate::scenarios::ScenarioFailure; +use crate::service::{ServiceHandle, ServiceSpec}; + +pub async fn run(ctx: &ScenarioContext) -> Result<(), ScenarioFailure> { + let spec = ServiceSpec::for_instance(ServiceInstance::One); + let service = ctx + .service_controller + .start(&spec, &ctx.artifacts) + .await + .map_err(scenario_failure_without_clients)?; + + if service.is_external() { + info!( + endpoint = %service.endpoint, + "single-triage attached to already-running external grpc-service" + ); + } else { + info!( + endpoint = %service.endpoint, + "single-triage launched managed grpc-service" + ); + } + + let mut service = Some(service); + let mut clients = Vec::new(); + + let outcome = run_inner(ctx, &spec.endpoint, &mut clients).await; + if let Err(error) = outcome { + return Err(ScenarioFailure { error, clients }); + } + + shutdown_clients(clients) + .await + .map_err(scenario_failure_without_clients)?; + shutdown_service(&ctx.service_controller, &mut service) + .await + .map_err(scenario_failure_without_clients)?; + Ok(()) +} + +async fn run_inner( + ctx: &ScenarioContext, + endpoint: &str, + clients: &mut Vec, +) -> anyhow::Result<()> { + info!(endpoint = %endpoint, "single-triage targeting endpoint"); + + let client = + TestGrpcClient::connect(0, ServiceInstance::One, endpoint.to_owned()) + .await + .with_context(|| "failed to connect client 0")?; + clients.push(client); + + let random_pubkey = Keypair::new().pubkey(); + info!(pubkey = %random_pubkey, "single-triage generated random pubkey"); + clients[0] + .replace_subscription(&[random_pubkey.to_string()]) + .await?; + + let bootstrap_checkpoint = CheckpointSpec { + name: "single-triage-bootstrap", + checkpoints: vec![ClientCheckpoint { + client_id: 0, + required: vec![ExpectedUpdate { + pubkey_b58: Some(random_pubkey.to_string()), + lamports: Some(0), + txn_signature_b58: Some(None), + ..Default::default() + }], + }], + }; + ctx.checkpoint_runner + .wait_until_satisfied(&bootstrap_checkpoint, clients) + .await?; + info!( + pubkey = %random_pubkey, + "single-triage bootstrap lamports=0 checkpoint passed" + ); + + ctx.validator.fund_payer().await?; + + let airdrop_signature = + ctx.validator.airdrop(&random_pubkey, 1_000_000).await?; + + let airdrop_checkpoint = CheckpointSpec { + name: "single-triage-airdrop", + checkpoints: vec![ClientCheckpoint { + client_id: 0, + required: vec![ExpectedUpdate { + pubkey_b58: Some(random_pubkey.to_string()), + lamports: Some(1_000_000), + txn_signature_b58: Some(Some(airdrop_signature)), + ..Default::default() + }], + }], + }; + ctx.checkpoint_runner + .wait_until_satisfied(&airdrop_checkpoint, clients) + .await?; + info!( + pubkey = %random_pubkey, + "single-triage post-airdrop checkpoint passed" + ); + + Ok(()) +} + +async fn shutdown_clients(clients: Vec) -> anyhow::Result<()> { + for client in clients { + client.shutdown().await?; + } + Ok(()) +} + +async fn shutdown_service( + controller: &crate::service::ServiceController, + service: &mut Option, +) -> anyhow::Result<()> { + if let Some(service) = service.take() { + controller.shutdown(service).await?; + } + Ok(()) +} + +fn scenario_failure_without_clients(error: anyhow::Error) -> ScenarioFailure { + ScenarioFailure { + error, + clients: Vec::new(), + } +} diff --git a/ix-tests/src/service.rs b/ix-tests/src/service.rs new file mode 100644 index 0000000..a44da80 --- /dev/null +++ b/ix-tests/src/service.rs @@ -0,0 +1,487 @@ +use std::net::SocketAddr; +use std::path::{Path, PathBuf}; + +use serde::Deserialize; +use std::time::Duration; + +use anyhow::{Context, bail}; +use helius_laserstream::grpc::PingRequest; +use helius_laserstream::grpc::geyser_client::GeyserClient; +use nix::sys::signal::{self, Signal}; +use nix::unistd::Pid; +use tokio::process::Command; +use tracing::{debug, info, warn}; + +use crate::artifacts::RunArtifacts; +use crate::config::SuiteConfig; +use crate::layout::ServiceInstance; + +const GRACEFUL_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); +const DEFAULT_SERVICE_GRPC_PORT: u16 = 50051; + +/// Describes whether a service was started by this harness or supplied externally. +#[allow(dead_code)] +pub enum ServiceOwnership { + /// Child process spawned by [`ServiceController::start`]. + /// + /// The process is spawned with `.kill_on_drop(true)` and stored in + /// [`ServiceHandle`] as `ServiceOwnership::Owned(child)`. Dropping the + /// handle without passing it to [`ServiceController::shutdown`] therefore + /// kills the child immediately instead of performing the graceful SIGTERM + /// shutdown path. + Owned(tokio::process::Child), + External, +} + +/// Handle that keeps a service process alive for the lifetime of a test. +/// +/// Callers must retain this handle until they are ready to terminate the +/// service. For owned services, call [`ServiceController::shutdown`] with the +/// handle to request graceful termination; otherwise the contained child was +/// created with `.kill_on_drop(true)` and will be killed when the handle (and +/// its `ServiceOwnership::Owned(child)`) is dropped. +#[allow(dead_code)] +pub struct ServiceHandle { + pub instance: ServiceInstance, + pub endpoint: String, + pub ownership: ServiceOwnership, +} + +#[allow(dead_code)] +impl ServiceHandle { + pub fn is_owned(&self) -> bool { + matches!(self.ownership, ServiceOwnership::Owned(_)) + } + + pub fn is_external(&self) -> bool { + matches!(self.ownership, ServiceOwnership::External) + } +} + +pub struct ServiceController { + service_binary: PathBuf, + service_start_timeout: Duration, +} + +#[derive(Debug, Deserialize)] +struct FileServiceConfig { + #[serde(default)] + grpc: Option, +} + +#[derive(Debug, Deserialize)] +struct FileServiceGrpcConfig { + #[serde(default)] + port: Option, +} + +pub struct ServiceSpec { + pub instance: ServiceInstance, + pub config_path: PathBuf, + pub endpoint: String, +} + +impl ServiceSpec { + pub fn for_instance(instance: ServiceInstance) -> Self { + match instance { + ServiceInstance::One => Self { + instance, + config_path: PathBuf::from( + "ix-tests/configs/grpc-service/service-1.toml", + ), + endpoint: "http://127.0.0.1:50051".to_owned(), + }, + ServiceInstance::Two => Self { + instance, + config_path: PathBuf::from( + "ix-tests/configs/grpc-service/service-2.toml", + ), + endpoint: "http://127.0.0.1:50052".to_owned(), + }, + } + } +} + +impl ServiceController { + fn endpoint_port(endpoint: &str) -> anyhow::Result { + let socket_addr = endpoint + .strip_prefix("http://") + .ok_or_else(|| { + anyhow::anyhow!("unsupported endpoint format: {endpoint}") + })? + .parse::() + .with_context(|| { + format!("failed to parse endpoint as host:port: {endpoint}") + })?; + Ok(socket_addr.port()) + } + + fn configured_grpc_port(config_path: &Path) -> anyhow::Result { + let config_text = + std::fs::read_to_string(config_path).with_context(|| { + format!( + "failed to read service config: {}", + config_path.display() + ) + })?; + let file: FileServiceConfig = toml::from_str(&config_text) + .with_context(|| { + format!( + "failed to parse service config: {}", + config_path.display() + ) + })?; + Ok(file + .grpc + .and_then(|grpc| grpc.port) + .unwrap_or(DEFAULT_SERVICE_GRPC_PORT)) + } + + fn validate_spec_matches_config( + &self, + spec: &ServiceSpec, + ) -> anyhow::Result<()> { + let endpoint_port = Self::endpoint_port(&spec.endpoint)?; + let config_port = Self::configured_grpc_port(&spec.config_path)?; + if endpoint_port != config_port { + bail!( + "refusing to continue for {:?}: probe endpoint {} uses port {}, but template {} binds port {}; probe/reuse and spawned-service bind port would diverge", + spec.instance, + spec.endpoint, + endpoint_port, + spec.config_path.display(), + config_port, + ); + } + Ok(()) + } + + pub fn new(config: &SuiteConfig) -> Self { + Self { + service_binary: config.service_binary.clone(), + service_start_timeout: Duration::from_millis( + config.service_start_timeout_ms, + ), + } + } + + fn write_generated_config( + &self, + spec: &ServiceSpec, + artifacts: &RunArtifacts, + ) -> anyhow::Result { + let base_group_id = Self::base_group_id(spec.instance); + let run_scoped_group_id = + Self::run_scoped_group_id(spec.instance, artifacts); + let base_group_id_line = format!("group_id = \"{base_group_id}\""); + let generated_group_id_line = + format!("group_id = \"{run_scoped_group_id}\""); + let config_text = std::fs::read_to_string(&spec.config_path) + .with_context(|| { + format!( + "failed to read service config template: {}", + spec.config_path.display() + ) + })?; + if config_text.matches(&base_group_id_line).count() != 1 { + bail!( + "expected exactly one `{}` entry in {}", + base_group_id_line, + spec.config_path.display() + ); + } + let generated_config_text = + config_text.replace(&base_group_id_line, &generated_group_id_line); + let generated_config_path = + artifacts.generated_service_config_path(spec.instance); + std::fs::write(&generated_config_path, generated_config_text) + .with_context(|| { + format!( + "failed to write generated service config: {}", + generated_config_path.display() + ) + })?; + + Ok(generated_config_path) + } + + fn base_group_id(instance: ServiceInstance) -> &'static str { + match instance { + ServiceInstance::One => "ix-tests-service-1", + ServiceInstance::Two => "ix-tests-service-2", + } + } + + fn run_scoped_group_id( + instance: ServiceInstance, + artifacts: &RunArtifacts, + ) -> String { + format!("{}-{}", Self::base_group_id(instance), artifacts.run_id()) + } + + pub async fn start( + &self, + spec: &ServiceSpec, + artifacts: &RunArtifacts, + ) -> anyhow::Result { + self.validate_spec_matches_config(spec)?; + + if self.probe_ready(&spec.endpoint).await { + info!( + endpoint = %spec.endpoint, + "harness is reusing an already-running external grpc-service" + ); + return Ok(ServiceHandle { + instance: spec.instance, + endpoint: spec.endpoint.clone(), + ownership: ServiceOwnership::External, + }); + } + + let generated_config_path = + self.write_generated_config(spec, artifacts)?; + let log_paths = artifacts.service_logs(spec.instance); + + let stdout_file = std::fs::File::create(&log_paths.stdout) + .with_context(|| { + format!( + "failed to create stdout log: {}", + log_paths.stdout.display() + ) + })?; + let stderr_file = std::fs::File::create(&log_paths.stderr) + .with_context(|| { + format!( + "failed to create stderr log: {}", + log_paths.stderr.display() + ) + })?; + + let run_scoped_group_id = + Self::run_scoped_group_id(spec.instance, artifacts); + + info!( + binary = %self.service_binary.display(), + config = %generated_config_path.display(), + endpoint = %spec.endpoint, + group_id = %run_scoped_group_id, + "starting grpc-service" + ); + + let child = Command::new(&self.service_binary) + .arg("--config") + .arg(&generated_config_path) + .stdout(stdout_file) + .stderr(stderr_file) + .kill_on_drop(true) + .spawn() + .with_context(|| { + format!( + "failed to spawn service binary: {}", + self.service_binary.display() + ) + })?; + + let handle = ServiceHandle { + instance: spec.instance, + endpoint: spec.endpoint.clone(), + ownership: ServiceOwnership::Owned(child), + }; + + self.wait_until_ready(&spec.endpoint, &log_paths).await?; + + Ok(handle) + } + + pub async fn shutdown(&self, service: ServiceHandle) -> anyhow::Result<()> { + match service.ownership { + ServiceOwnership::Owned(mut child) => { + let pid = child.id(); + + if let Some(pid) = pid { + match signal::kill( + Pid::from_raw(pid as i32), + Signal::SIGTERM, + ) { + Ok(()) => { + info!( + endpoint = %service.endpoint, + pid, + "graceful shutdown requested for grpc-service" + ); + + match tokio::time::timeout( + GRACEFUL_SHUTDOWN_TIMEOUT, + child.wait(), + ) + .await + { + Ok(wait_result) => { + let status = wait_result.context( + "failed to wait for child after SIGTERM", + )?; + info!( + endpoint = %service.endpoint, + pid, + status = %status, + "grpc-service shut down gracefully" + ); + return Ok(()); + } + Err(_) => { + warn!( + endpoint = %service.endpoint, + pid, + timeout = ?GRACEFUL_SHUTDOWN_TIMEOUT, + "graceful shutdown timed out; forcing grpc-service kill" + ); + } + } + } + Err(err) => { + warn!( + endpoint = %service.endpoint, + pid, + error = %err, + "failed to request graceful shutdown; forcing grpc-service kill" + ); + } + } + } + + child + .start_kill() + .context("failed to send forced kill to grpc-service")?; + let status = child + .wait() + .await + .context("failed to wait for child after forced kill")?; + warn!( + endpoint = %service.endpoint, + pid = pid.unwrap_or_default(), + status = %status, + "grpc-service shutdown was forced" + ); + } + ServiceOwnership::External => { + info!( + endpoint = %service.endpoint, + "external grpc-service was left running intentionally" + ); + } + } + Ok(()) + } + + async fn probe_ready(&self, endpoint: &str) -> bool { + let Ok(mut client) = GeyserClient::connect(endpoint.to_owned()).await + else { + return false; + }; + + client.ping(PingRequest { count: 1 }).await.is_ok() + } + + async fn wait_until_ready( + &self, + endpoint: &str, + log_paths: &crate::artifacts::ServiceLogPaths, + ) -> anyhow::Result<()> { + let deadline = tokio::time::Instant::now() + self.service_start_timeout; + let mut announced_waiting = false; + + enum ReadinessProbeResult { + Ready, + ConnectError(tonic::transport::Error), + PingError(tonic::Status), + } + + loop { + let attempt_timeout = + deadline.saturating_duration_since(tokio::time::Instant::now()); + let probe = async { + match GeyserClient::connect(endpoint.to_owned()).await { + Ok(mut client) => { + match client.ping(PingRequest { count: 1 }).await { + Ok(_) => ReadinessProbeResult::Ready, + Err(err) => ReadinessProbeResult::PingError(err), + } + } + Err(err) => ReadinessProbeResult::ConnectError(err), + } + }; + + match tokio::time::timeout(attempt_timeout, probe).await { + Ok(ReadinessProbeResult::Ready) => { + info!(endpoint, "grpc-service is ready"); + return Ok(()); + } + Ok(ReadinessProbeResult::PingError(err)) + if err.code() == tonic::Code::Unavailable => + { + if !announced_waiting { + info!( + endpoint, + message = %err.message(), + "grpc-service listening but not yet ready; waiting for startup preflight" + ); + announced_waiting = true; + } else { + debug!( + endpoint, + message = %err.message(), + "grpc-service still preflight-pending" + ); + } + } + Ok(ReadinessProbeResult::PingError(err)) => { + warn!(endpoint, "ping returned non-readiness error: {err}"); + } + Ok(ReadinessProbeResult::ConnectError(err)) => { + debug!( + endpoint, + "grpc-service not yet accepting connections: {err}" + ); + } + Err(_) => { + if !announced_waiting { + info!( + endpoint, + timeout = ?attempt_timeout, + "grpc-service readiness probe timed out; waiting for startup" + ); + announced_waiting = true; + } else { + debug!( + endpoint, + timeout = ?attempt_timeout, + "grpc-service readiness probe still timing out" + ); + } + } + } + + if tokio::time::Instant::now() >= deadline { + if let Err(err) = RunArtifacts::dump_service_logs_at(log_paths) + { + warn!( + endpoint, + error = %err, + "failed to dump service logs after grpc-service readiness timeout" + ); + } + bail!( + "grpc-service at {} did not become ready within {:?}\n\ + stdout: {}\n\ + stderr: {}", + endpoint, + self.service_start_timeout, + log_paths.stdout.display(), + log_paths.stderr.display(), + ); + } + + tokio::time::sleep(Duration::from_millis(200)).await; + } + } +} diff --git a/ix-tests/src/validator.rs b/ix-tests/src/validator.rs new file mode 100644 index 0000000..4f72a74 --- /dev/null +++ b/ix-tests/src/validator.rs @@ -0,0 +1,205 @@ +use std::time::Duration; + +use anyhow::Context; +use futures::future::try_join_all; +use solana_keypair::Keypair; +use solana_pubkey::Pubkey; +use solana_rpc_client::{ + api::config::CommitmentConfig, nonblocking::rpc_client::RpcClient, +}; +use solana_signer::Signer; +use solana_system_interface::instruction as system_instruction; +use solana_transaction::Transaction; +use tracing::info; + +use crate::config::SuiteConfig; + +pub struct ValidatorDriver { + rpc: RpcClient, + payer: Keypair, + transaction_timeout: Duration, +} + +impl ValidatorDriver { + pub fn new(config: &SuiteConfig) -> Self { + let rpc = RpcClient::new_with_commitment( + config.validator_rpc_url.clone(), + CommitmentConfig::confirmed(), + ); + let payer = Keypair::new(); + let transaction_timeout = + Duration::from_millis(config.transaction_timeout_ms); + Self { + rpc, + payer, + transaction_timeout, + } + } + + pub async fn fund_payer(&self) -> anyhow::Result<()> { + let lamports = 10_000_000_000; // 10 SOL + let sig = self + .rpc + .request_airdrop(&self.payer.pubkey(), lamports) + .await + .context("fund_payer: request_airdrop failed")?; + self.confirm_signature(&sig).await?; + info!( + payer = %self.payer.pubkey(), + lamports, + %sig, + "funded payer" + ); + Ok(()) + } + + pub async fn airdrop( + &self, + pubkey: &Pubkey, + lamports: u64, + ) -> anyhow::Result { + let sig = self + .rpc + .request_airdrop(pubkey, lamports) + .await + .with_context(|| { + format!("airdrop to {pubkey} of {lamports} failed") + })?; + self.confirm_signature(&sig).await?; + info!(%pubkey, lamports, %sig, "airdrop confirmed"); + Ok(sig.to_string()) + } + + pub async fn airdrops( + &self, + requests: Vec<(Pubkey, u64)>, + ) -> anyhow::Result> { + try_join_all( + requests + .iter() + .map(|(pubkey, lamports)| self.airdrop(pubkey, *lamports)), + ) + .await + } + + pub async fn transfer( + &self, + from: &Keypair, + to: &Pubkey, + lamports: u64, + ) -> anyhow::Result { + let ix = system_instruction::transfer(&from.pubkey(), to, lamports); + let blockhash = self + .rpc + .get_latest_blockhash() + .await + .context("transfer: get_latest_blockhash failed")?; + let mut tx = Transaction::new_with_payer(&[ix], Some(&from.pubkey())); + tx.sign(&[from], blockhash); + let sig = self + .rpc + .send_and_confirm_transaction(&tx) + .await + .with_context(|| { + format!( + "transfer {} lamports from {} to {} failed", + lamports, + from.pubkey(), + to + ) + })?; + info!(from = %from.pubkey(), %to, lamports, %sig, "transfer confirmed"); + Ok(sig.to_string()) + } + + pub async fn allocate_and_assign( + &self, + target: &Keypair, + space: u64, + new_owner: Pubkey, + ) -> anyhow::Result { + let alloc_ix = system_instruction::allocate(&target.pubkey(), space); + let assign_ix = + system_instruction::assign(&target.pubkey(), &new_owner); + let blockhash = self + .rpc + .get_latest_blockhash() + .await + .context("allocate_and_assign: get_latest_blockhash failed")?; + let mut tx = Transaction::new_with_payer( + &[alloc_ix, assign_ix], + Some(&self.payer.pubkey()), + ); + tx.sign(&[&self.payer, target], blockhash); + let sig = self + .rpc + .send_and_confirm_transaction(&tx) + .await + .with_context(|| { + format!( + "allocate_and_assign for {} (space={}, owner={}) failed", + target.pubkey(), + space, + new_owner + ) + })?; + info!( + target = %target.pubkey(), + space, + new_owner = %new_owner, + %sig, + "allocate_and_assign confirmed" + ); + Ok(sig.to_string()) + } + + pub async fn rent_exempt_balance(&self, space: u64) -> anyhow::Result { + self.rpc + .get_minimum_balance_for_rent_exemption(space as usize) + .await + .with_context(|| { + format!("failed to fetch rent-exempt balance for {space} bytes") + }) + } + + async fn confirm_signature( + &self, + sig: &solana_signature::Signature, + ) -> anyhow::Result<()> { + let deadline = tokio::time::Instant::now() + self.transaction_timeout; + let mut last_status_error = None; + loop { + match self + .rpc + .get_signature_status_with_commitment( + sig, + CommitmentConfig::confirmed(), + ) + .await + { + Ok(Some(Ok(()))) => return Ok(()), + Ok(Some(Err(err))) => { + anyhow::bail!("transaction {sig} failed: {err:?}"); + } + Ok(None) => {} + Err(err) => { + last_status_error = Some(err.to_string()); + } + } + + if tokio::time::Instant::now() >= deadline { + if let Some(error) = last_status_error { + anyhow::bail!( + "transaction {sig} not confirmed within {:?}; last status check error: {error}", + self.transaction_timeout + ); + } + anyhow::bail!( + "transaction {sig} not confirmed within {:?}", + self.transaction_timeout + ); + } + tokio::time::sleep(Duration::from_millis(200)).await; + } + } +} diff --git a/kafka-setup/Makefile b/kafka-setup/Makefile index 2c2bceb..4275266 100644 --- a/kafka-setup/Makefile +++ b/kafka-setup/Makefile @@ -1,6 +1,7 @@ +MAKEFILE_DIR=$(dir $(abspath $(lastword $(MAKEFILE_LIST)))) DC := $(shell if command -v docker-compose >/dev/null 2>&1; then echo docker-compose; else echo docker compose; fi) -.PHONY: help up down setup-stream create-table register-schema ready ui ui-down +.PHONY: help up down setup-stream create-table register-schema ready reset-state ui ui-down help: @echo "Available targets:" @@ -10,6 +11,7 @@ help: @echo " create-table - Create the accounts table (latest update per pubkey)" @echo " register-schema - Register the protobuf schema for Redpanda Console" @echo " ready - Start stack, setup stream/table, and register schema" + @echo " reset-state - Rebuild Kafka + ksqlDB state for an already-running stack" @echo " ui - Start Redpanda Console UI (http://localhost:8080)" @echo " ui-down - Stop Redpanda Console" @@ -20,16 +22,19 @@ down: $(DC) down -v register-schema: - sh/redpanda/01_register-proto-schema.sh + $(MAKEFILE_DIR)sh/redpanda/01_register-proto-schema.sh setup-stream: - sh/ksql/01_setup-streams.sh + $(MAKEFILE_DIR)sh/ksql/01_setup-streams.sh create-table: - sh/ksql/02_create-accounts-table.sh + $(MAKEFILE_DIR)sh/ksql/02_create-accounts-table.sh ready: up setup-stream create-table register-schema +reset-state: + $(MAKEFILE_DIR)sh/reset-state.sh + ui: $(DC) up -d redpanda-console diff --git a/kafka-setup/README.md b/kafka-setup/README.md index 257140a..e540e12 100644 --- a/kafka-setup/README.md +++ b/kafka-setup/README.md @@ -6,6 +6,16 @@ Available workflows: - `make up` - `make ready` +- `make reset-state` - `make down` - `make ui` - `make ui-down` + +`make reset-state` is the narrower option for an already-running +environment. It rebuilds the Kafka topic and the dependent ksqlDB +state without restarting Docker or re-running the broader `make ready` +workflow. + +The local compose stack sets `KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0` +to reduce Kafka consumer cold-start delay during development and +integration tests. diff --git a/kafka-setup/docker-compose.yml b/kafka-setup/docker-compose.yml index df9a5eb..78a2eb6 100644 --- a/kafka-setup/docker-compose.yml +++ b/kafka-setup/docker-compose.yml @@ -14,6 +14,7 @@ services: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 networks: - ksqldb zookeeper: diff --git a/kafka-setup/sh/kafka/01_recreate-account-updates-topic.sh b/kafka-setup/sh/kafka/01_recreate-account-updates-topic.sh new file mode 100755 index 0000000..635f02e --- /dev/null +++ b/kafka-setup/sh/kafka/01_recreate-account-updates-topic.sh @@ -0,0 +1,61 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Delete and recreate the Kafka source topic used by the account updates flow. + +TOPIC="${TOPIC:-solana.testnet.account_updates}" +PARTITIONS="${PARTITIONS:-1}" +REPLICATION_FACTOR="${REPLICATION_FACTOR:-1}" +BOOTSTRAP_SERVER="${BOOTSTRAP_SERVER:-kafka:9092}" + +if command -v docker-compose >/dev/null 2>&1; then + DC="docker-compose" +else + DC="docker compose" +fi + +echo "Using compose command: $DC" +echo "Checking Kafka readiness..." +for i in $(seq 1 60); do + if $DC exec -T kafka kafka-topics --bootstrap-server "$BOOTSTRAP_SERVER" --list >/dev/null 2>&1; then + break + fi + sleep 1 + if [[ $i -eq 60 ]]; then + echo "Kafka not ready after 60 seconds" >&2 + exit 1 + fi +done +echo "Kafka is ready." + +if $DC exec -T kafka kafka-topics --bootstrap-server "$BOOTSTRAP_SERVER" --list | grep -Fxq "$TOPIC"; then + echo "Deleting Kafka topic '$TOPIC'..." + $DC exec -T kafka kafka-topics \ + --bootstrap-server "$BOOTSTRAP_SERVER" \ + --delete \ + --topic "$TOPIC" + + for i in $(seq 1 60); do + if ! $DC exec -T kafka kafka-topics --bootstrap-server "$BOOTSTRAP_SERVER" --list | grep -Fxq "$TOPIC"; then + break + fi + sleep 1 + if [[ $i -eq 60 ]]; then + echo "Kafka topic '$TOPIC' was not deleted after 60 seconds" >&2 + exit 1 + fi + done +else + echo "Kafka topic '$TOPIC' does not exist; skipping delete." +fi + +echo "Recreating Kafka topic '$TOPIC'..." +$DC exec -T kafka kafka-topics \ + --bootstrap-server "$BOOTSTRAP_SERVER" \ + --create \ + --if-not-exists \ + --topic "$TOPIC" \ + --replication-factor "$REPLICATION_FACTOR" \ + --partitions "$PARTITIONS" + +echo "Done recreating Kafka topic '$TOPIC'." diff --git a/kafka-setup/sh/ksql/03_reset-accounts-state.sh b/kafka-setup/sh/ksql/03_reset-accounts-state.sh new file mode 100755 index 0000000..0cf329b --- /dev/null +++ b/kafka-setup/sh/ksql/03_reset-accounts-state.sh @@ -0,0 +1,61 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Tear down the ksqlDB objects that derive account state so they can be rebuilt +# from a fresh source topic. + +STREAM="${STREAM:-account_updates_stream}" +TABLE="${TABLE:-accounts}" +STREAM_UPPER="$(printf '%s' "$STREAM" | tr '[:lower:]' '[:upper:]')" +TABLE_UPPER="$(printf '%s' "$TABLE" | tr '[:lower:]' '[:upper:]')" +KSQL_SERVER_URL="${KSQL_SERVER_URL:-http://ksqldb-server:8088}" + +if command -v docker-compose >/dev/null 2>&1; then + DC="docker-compose" +else + DC="docker compose" +fi + +echo "Using compose command: $DC" +echo "Waiting for ksqlDB server to be ready (via CLI)..." +for i in $(seq 1 60); do + if $DC run --rm ksqldb-cli ksql "${KSQL_SERVER_URL}" -e 'SHOW STREAMS;' >/dev/null 2>&1; then + break + fi + sleep 1 + if [[ $i -eq 60 ]]; then + echo "ksqlDB not ready after 60 seconds" >&2 + exit 1 + fi +done +echo "ksqlDB is ready." + +QUERIES_OUTPUT="$($DC run --rm ksqldb-cli ksql "${KSQL_SERVER_URL}" -e "SHOW QUERIES;")" +ACCOUNT_QUERIES="$(printf '%s\n' "$QUERIES_OUTPUT" | grep -Eo 'CTAS_[^[:space:]|;]*ACCOUNTS[^[:space:]|;]*' | sort -u || true)" +if [[ -n "$ACCOUNT_QUERIES" ]]; then + while IFS= read -r query_name; do + [[ -z "$query_name" ]] && continue + echo "Terminating persistent ACCOUNTS query '${query_name}'..." + $DC run --rm ksqldb-cli ksql "${KSQL_SERVER_URL}" -e "TERMINATE ${query_name};" + done <<< "$ACCOUNT_QUERIES" +else + echo "No persistent ACCOUNTS query found; skipping terminate." +fi + +sleep 2 + +TABLES_OUTPUT="$($DC run --rm ksqldb-cli ksql "${KSQL_SERVER_URL}" -e "SHOW TABLES;")" +if printf '%s\n' "$TABLES_OUTPUT" | awk -v NAME="$TABLE_UPPER" '$1 == NAME { found = 1 } END { exit !found }'; then + echo "Dropping table '${TABLE_UPPER}'..." + $DC run --rm ksqldb-cli ksql "${KSQL_SERVER_URL}" -e "DROP TABLE ${TABLE_UPPER} DELETE TOPIC;" +else + echo "Table '${TABLE_UPPER}' does not exist; skipping drop." +fi + +STREAMS_OUTPUT="$($DC run --rm ksqldb-cli ksql "${KSQL_SERVER_URL}" -e "SHOW STREAMS;")" +if printf '%s\n' "$STREAMS_OUTPUT" | awk -v NAME="$STREAM_UPPER" '$1 == NAME { found = 1 } END { exit !found }'; then + echo "Dropping stream '${STREAM}'..." + $DC run --rm ksqldb-cli ksql "${KSQL_SERVER_URL}" -e "DROP STREAM ${STREAM};" +else + echo "Stream '${STREAM}' does not exist; skipping drop." +fi diff --git a/kafka-setup/sh/reset-state.sh b/kafka-setup/sh/reset-state.sh new file mode 100755 index 0000000..4ea655d --- /dev/null +++ b/kafka-setup/sh/reset-state.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +set -euo pipefail + +# Rebuild Kafka topic state and the dependent ksqlDB stream/table state without +# restarting the Docker stack. + +SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" + +echo "Starting Kafka reset workflow..." + +echo "Resetting ksqlDB state..." +"$SCRIPT_DIR/ksql/03_reset-accounts-state.sh" + +echo "Resetting Kafka source topic..." +"$SCRIPT_DIR/kafka/01_recreate-account-updates-topic.sh" + +echo "Recreating ksqlDB stream and table..." +"$SCRIPT_DIR/ksql/01_setup-streams.sh" +"$SCRIPT_DIR/ksql/02_create-accounts-table.sh" + +echo "Done resetting Kafka and ksqlDB state."