- Type-Safe Data Modeling: Strongly-typed symbols, exchanges, and normalized data structures
- Automatic Redundancy: Configure multiple redundant WebSocket connections for high availability
- Built-in Deduplication: Eliminates duplicate messages across redundant streams
- Exchange-Agnostic Design: Easily add support for new exchanges through the `Parser` trait
- Comprehensive Telemetry: Track network latency, parsing time, and processing metrics via Prometheus
- Async/Await: Built on Tokio for efficient async I/O
- Zero-Copy Parsing: Uses `simd-json` for high-performance JSON parsing
- Graceful Shutdown: Proper cleanup and cancellation support
In benchmarks, the library achieves ~8 μs median latency at 300k msg/sec.
Add Yggdrasil to your `Cargo.toml`:

```toml
[dependencies]
yggdrasil = "0.1.0"
tokio = { version = "1.49", features = ["full"] }
tokio-stream = "0.1"
```

Here's a minimal example that connects to Binance and prints BTC/USDT quotes:

```rust
use tokio_stream::StreamExt;
use yggdrasil::parsers::binance::quotes::BinanceQuotesParser;
use yggdrasil::symbols::BTCUSDT;
use yggdrasil::{DataStream, StreamConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    // Create a parser for Binance BTC/USDT quotes
    let parser = BinanceQuotesParser::<BTCUSDT>::default();

    // Configure the stream with 3 redundant connections
    let config = StreamConfig::new(
        "binance_btcusdt_quotes",
        3,
        "wss://fstream.binance.com/ws/btcusdt@bookTicker",
        None,
        parser,
    )?;

    // Initialize and start the data stream
    let mut data_stream = DataStream::new(config);
    let mut rx = data_stream.run().await;

    // Consume normalized messages
    while let Some(result) = rx.next().await {
        match result {
            Ok(bbo) => println!("BBO: {:?}", bbo),
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    // Gracefully shut down
    data_stream.stop().await;
    Ok(())
}
```

Run the included example:
```shell
cargo run --bin example_print_quotes
```

```text
┌─────────────────────┐
│  User Application   │
│                     │
└──────────┬──────────┘
           │ subscribes to stream
           ▼
┌─────────────────────────────────────────┐
│           DataStream<Parser>            │
│  • Manages worker lifecycle             │
│  • Deduplicates messages                │
│  • Normalizes data                      │
│  • Broadcasts to subscribers            │
└──────────┬──────────────────────────────┘
           │ spawns N redundant workers
           ▼
┌─────────────────────────────────────────┐
│            Worker Tasks (N)             │
│  • Maintain WebSocket connections       │
│  • Parse raw JSON messages              │
│  • Handle reconnection automatically    │
└──────────┬──────────────────────────────┘
           │ connects to
           ▼
┌─────────────────────────────────────────┐
│         Exchange WebSocket APIs         │
│             (Binance, etc.)             │
└─────────────────────────────────────────┘
```
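The redundancy-plus-deduplication flow in the diagram can be modeled with plain threads and channels. This is a simplified, std-only sketch, not Yggdrasil's internals: the library uses Tokio tasks and WebSocket connections, and the integer ids here stand in for whatever `Parser::get_id` extracts from each raw message.

```rust
use std::collections::HashSet;
use std::sync::mpsc;
use std::thread;

/// Spawn `workers` redundant producers that each send the same `updates`
/// messages, then forward only the first copy of each id downstream.
fn run_pipeline(workers: usize, updates: usize) -> Vec<String> {
    let (raw_tx, raw_rx) = mpsc::channel::<(usize, String)>();

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let tx = raw_tx.clone();
            thread::spawn(move || {
                for id in 0..updates {
                    tx.send((id, format!("update {id}"))).unwrap();
                }
            })
        })
        .collect();
    drop(raw_tx); // the receiver stops once every worker has finished

    // Dedup stage: forward each id at most once.
    let mut seen = HashSet::new();
    let mut delivered = Vec::new();
    for (id, payload) in raw_rx {
        if seen.insert(id) {
            delivered.push(payload); // would be broadcast to subscribers
        }
    }
    for h in handles {
        h.join().unwrap();
    }
    delivered
}

fn main() {
    // 3 workers × 5 updates = 15 raw messages, but only 5 unique ones pass.
    let delivered = run_pipeline(3, 5);
    assert_eq!(delivered.len(), 5);
    println!("delivered {} unique updates", delivered.len());
}
```

The dedup stage is the reason redundant connections improve availability without duplicating downstream work: whichever connection delivers an update first wins, and the copies are dropped.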
Yggdrasil is built around five main components:
1. Normalized Data Types (docs/DATA_TYPES.md)
Define unified data structures that represent market data regardless of the source exchange. The library provides `BBO` (Best Bid/Offer), and you can create custom types like `Trade`, `OrderBook`, etc.
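For instance, a custom `Trade` type could mirror the shape of the `BBO` definition shown just below. This is a hypothetical, std-only sketch: the minimal `Symbol`/`Exchange` traits stand in for the library's real ones, a `u64` millisecond timestamp replaces `DateTime<Utc>`, and the field names are assumptions.

```rust
use std::fmt::Debug;

// Simplified stand-ins for Yggdrasil's Symbol/Exchange traits.
pub trait Symbol: Debug {}
pub trait Exchange: Debug {}

#[derive(Debug, Default, Clone, Copy)]
pub struct BTCUSDT;
impl Symbol for BTCUSDT {}

#[derive(Debug, Default, Clone, Copy)]
pub struct Binance;
impl Exchange for Binance {}

/// A normalized trade event, independent of the source exchange.
#[derive(Debug)]
pub struct Trade<S: Symbol, E: Exchange> {
    pub symbol: S,
    pub exchange: E,
    pub timestamp_ms: u64, // exchange event time (Unix milliseconds)
    pub price: f64,
    pub quantity: f64,
    pub is_buyer_maker: bool,
}

fn main() {
    // The symbol and exchange are part of the type, not just the data:
    // this value is a Trade<BTCUSDT, Binance>, distinct from any other pairing.
    let trade = Trade {
        symbol: BTCUSDT,
        exchange: Binance,
        timestamp_ms: 1_700_000_000_000,
        price: 65_000.0,
        quantity: 0.25,
        is_buyer_maker: false,
    };
    println!("{:?}", trade);
}
```

Because `Trade<BTCUSDT, Binance>` and `Trade<ETHUSDT, Binance>` are distinct types, mixing them up becomes a compile-time error rather than a runtime bug.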
```rust
pub struct BBO<S: Symbol, E: Exchange> {
    pub symbol: S,
    pub exchange: E,
    pub timestamp: DateTime<Utc>,
    pub best_bid_price: f64,
    pub best_ask_price: f64,
    // ...
}
```

2. Symbols and Exchanges (docs/SYMBOLS_AND_EXCHANGES.md)
Type-level identifiers that ensure compile-time type safety. Define symbols (trading pairs) and exchanges using simple macros:
```rust
define_symbols!(BTCUSDT, ETHUSDT, SOLUSDT);
define_exchanges!(Binance, Coinbase, Kraken);
```

3. Parsers (docs/PARSERS.md)
Convert exchange-specific raw messages into normalized data types. Implement the `Parser` trait to add support for new exchanges:
```rust
impl<S: Symbol> Parser for BinanceQuotesParser<S> {
    type Raw = RawBinanceQuote;
    type Normalized = BBO<S, exchanges::Binance>;
    type Id = usize;

    fn normalize(&self, raw: Self::Raw) -> Self::Normalized { /* ... */ }
    fn get_id(raw: &Self::Raw) -> Self::Id { /* ... */ }
}
```

4. Stream Configuration (docs/STREAM_CONFIG.md)
Configure WebSocket connections, redundancy levels, and parser selection:
```rust
let config = StreamConfig::new(
    "stream_name", // identifier for logging/metrics
    3,             // number of redundant connections
    "wss://...",   // WebSocket URL
    None,          // optional subscription message
    parser,        // your Parser implementation
)?;
```

5. Data Stream (docs/DATA_STREAM.md)
The main entry point for users. Initialize, run, and consume normalized data:
```rust
let mut data_stream = DataStream::new(config);
let mut rx = data_stream.run().await;

while let Some(Ok(data)) = rx.next().await {
    // Process normalized data
}

data_stream.stop().await;
```

- Data Types Guide: How to define custom normalized data types
- Symbols & Exchanges Guide: Understanding the type system
- Parser Implementation Guide: Adding support for new exchanges
- Stream Configuration Guide: Configuring streams and connections
- Data Stream API Guide: Using the main user API
- Architecture Guide: Understanding internal design (for library extenders)
Yggdrasil automatically tracks and exports metrics via Prometheus at http://127.0.0.1:9000/metrics:
- `message_throughput`: Number of normalized messages processed
- `network_latency`: Time between exchange timestamp and local receipt
- `json_processing_latency`: Time to parse raw JSON
- `channel_passing_latency`: Time to pass messages through internal channels
- `normalizing_latency`: Time to normalize parsed messages
- `total_processing_latency`: End-to-end processing time
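As a rough illustration of what a metric like `network_latency` measures, the sketch below computes the gap between a message's exchange-embedded event timestamp and the local receive time. It is a hedged, std-only model (the `network_latency_ms` helper is hypothetical; the library itself uses `quanta` for high-resolution timing):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Milliseconds elapsed between the exchange's event timestamp and now.
/// Saturates at zero in case of clock skew between exchange and local host.
fn network_latency_ms(exchange_ts_ms: u64) -> u64 {
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before Unix epoch")
        .as_millis() as u64;
    now_ms.saturating_sub(exchange_ts_ms)
}

fn main() {
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64;

    // A message the exchange stamped 5 ms ago:
    let latency = network_latency_ms(now_ms - 5);
    assert!(latency >= 5);
    println!("network latency >= {latency} ms");
}
```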
Yggdrasil is designed for high-frequency trading applications:
- Uses `simd-json` for fast JSON parsing
- Zero-copy message passing where possible
- Pre-allocated buffers to reduce allocations
- Dedicated blocking thread for deduplication (avoids async overhead)
- High-resolution timestamps with `quanta` (minimal overhead)
Run integration tests:
```shell
cargo test
```

Key dependencies:
- `tokio`: Async runtime
- `tokio-tungstenite`: WebSocket client
- `simd-json`: High-performance JSON parsing
- `serde`: Serialization framework
- `chrono`: Date and time handling
- `metrics`: Telemetry framework
- `tracing`: Structured logging
Licensed under the Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0).
Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you shall be licensed under the Apache License, Version 2.0, without any additional terms or conditions.
Contributions are welcome! Here's how you can help:
- Use GitHub Issues to report bugs
- Include minimal reproduction steps
- Provide your Rust version and OS
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Make your changes
- Run tests: `cargo test`
- Run formatter: `cargo fmt`
- Run linter: `cargo clippy`
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to your branch (`git push origin feature/amazing-feature`)
- Open a Pull Request

