Skip to content

abukva/yggdrasil

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Yggdrasil Banner

Yggdrasil

High-Performance Cryptocurrency Market Data Streaming Library

Docs License Rust

Key Features

  • Type-Safe Data Modeling: Strongly-typed symbols, exchanges, and normalized data structures
  • Automatic Redundancy: Configure multiple redundant WebSocket connections for high availability
  • Built-in Deduplication: Eliminates duplicate messages across redundant streams
  • Exchange-Agnostic Design: Easily add support for new exchanges through the Parser trait
  • Comprehensive Telemetry: Track network latency, parsing time, and processing metrics via Prometheus
  • Async/Await: Built on Tokio for efficient async I/O
  • Zero-Copy Parsing: Uses simd-json for high-performance JSON parsing
  • Graceful Shutdown: Proper cleanup and cancellation support

Performance

When running a benchmark code the library achieves ~ 8μs median latency for 300k msg / sec.

Scaling of latency

Quick Start

Add Yggdrasil to your Cargo.toml:

[dependencies]
yggdrasil = "0.1.0"
tokio = { version = "1.49", features = ["full"] }
tokio-stream = "0.1"

Here's a minimal example that connects to Binance and prints BTC/USDT quotes:

use tokio_stream::StreamExt;
use yggdrasil::parsers::binance::quotes::BinanceQuotesParser;
use yggdrasil::symbols::BTCUSDT;
use yggdrasil::{DataStream, StreamConfig};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    tracing_subscriber::fmt::init();

    // Create a parser for Binance BTC/USDT quotes
    let parser = BinanceQuotesParser::<BTCUSDT>::default();

    // Configure the stream with 3 redundant connections
    let config = StreamConfig::new(
        "binance_btcusdt_quotes",
        3,
        "wss://fstream.binance.com/ws/btcusdt@bookTicker",
        None,
        parser,
    )?;

    // Initialize and start the data stream
    let mut data_stream = DataStream::new(config);
    let mut rx = data_stream.run().await;

    // Consume normalized messages
    while let Some(result) = rx.next().await {
        match result {
            Ok(bbo) => println!("BBO: {:?}", bbo),
            Err(e) => eprintln!("Error: {}", e),
        }
    }

    // Gracefully shutdown
    data_stream.stop().await;
    Ok(())
}

Run the included example:

cargo run --bin example_print_quotes

Architecture

┌─────────────────────┐
│   User Application  │
│                     │
└──────────┬──────────┘
           │ subscribes to stream
           ▼
┌─────────────────────────────────────────┐
│         DataStream<Parser>              │
│  • Manages worker lifecycle             │
│  • Deduplicates messages                │
│  • Normalizes data                      │
│  • Broadcasts to subscribers            │
└──────────┬──────────────────────────────┘
           │ spawns N redundant workers
           ▼
┌─────────────────────────────────────────┐
│          Worker Tasks (N)               │
│  • Maintain WebSocket connections       │
│  • Parse raw JSON messages              │
│  • Handle reconnection automatically    │
└──────────┬──────────────────────────────┘
           │ connects to
           ▼
┌─────────────────────────────────────────┐
│      Exchange WebSocket APIs            │
│         (Binance, etc.)                 │
└─────────────────────────────────────────┘

Core Concepts

Yggdrasil is built around five main components:

1. Normalized Data Types (docs/DATA_TYPES.md)

Define unified data structures that represent market data regardless of the source exchange. The library provides BBO (Best Bid/Offer), and you can create custom types like Trade, OrderBook, etc.

pub struct BBO<S: Symbol, E: Exchange> {
    pub symbol: S,
    pub exchange: E,
    pub timestamp: DateTime<Utc>,
    pub best_bid_price: f64,
    pub best_ask_price: f64,
    // ...
}

2. Symbols and Exchanges (docs/SYMBOLS_AND_EXCHANGES.md)

Type-level identifiers that ensure compile-time type safety. Define symbols (trading pairs) and exchanges using simple macros:

define_symbols!(BTCUSDT, ETHUSDT, SOLUSDT);
define_exchanges!(Binance, Coinbase, Kraken);

3. Parsers (docs/PARSERS.md)

Convert exchange-specific raw messages into normalized data types. Implement the Parser trait to add support for new exchanges:

impl<S: Symbol> Parser for BinanceQuotesParser<S> {
    type Raw = RawBinanceQuote;
    type Normalized = BBO<S, exchanges::Binance>;
    type Id = usize;

    fn normalize(&self, raw: Self::Raw) -> Self::Normalized { /* ... */ }
    fn get_id(raw: &Self::Raw) -> Self::Id { /* ... */ }
}

4. Stream Configuration (docs/STREAM_CONFIG.md)

Configure WebSocket connections, redundancy levels, and parser selection:

let config = StreamConfig::new(
    "stream_name",    // identifier for logging/metrics
    3,                // number of redundant connections
    "wss://...",      // WebSocket URL
    None,             // optional subscription message
    parser,           // your Parser implementation
)?;

5. Data Stream (docs/DATA_STREAM.md)

The main entry point for users. Initialize, run, and consume normalized data:

let mut data_stream = DataStream::new(config);
let mut rx = data_stream.run().await;

while let Some(Ok(data)) = rx.next().await {
    // Process normalized data
}

data_stream.stop().await;

Documentation

Telemetry

Yggdrasil automatically tracks and exports metrics via Prometheus at http://127.0.0.1:9000/metrics:

  • message_throughput: Number of normalized messages processed
  • network_latency: Time between exchange timestamp and local receipt
  • json_processing_latency: Time to parse raw JSON
  • channel_passing_latency: Time to pass messages through internal channels
  • normalizing_latency: Time to normalize parsed messages
  • total_processing_latency: End-to-end processing time

Performance

Yggdrasil is designed for high-frequency trading applications:

  • Uses simd-json for fast JSON parsing
  • Zero-copy message passing where possible
  • Pre-allocated buffers to reduce allocations
  • Dedicated blocking thread for deduplication (avoids async overhead)
  • High-resolution timestamps with quanta (minimal overhead)

Testing

Run integration tests:

cargo test

Dependencies

Key dependencies:

  • tokio: Async runtime
  • tokio-tungstenite: WebSocket client
  • simd-json: High-performance JSON parsing
  • serde: Serialization framework
  • chrono: Date and time handling
  • metrics: Telemetry framework
  • tracing: Structured logging

License

Licensed under the Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0).

Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted for inclusion in the work by you shall be licensed under the Apache License, Version 2.0, without any additional terms or conditions.

Contributing

Contributions are welcome! Here's how you can help:

Reporting Issues

  • Use GitHub Issues to report bugs
  • Include minimal reproduction steps
  • Provide your Rust version and OS

Pull Requests

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Run tests: cargo test
  5. Run formatter: cargo fmt
  6. Run linter: cargo clippy
  7. Commit your changes (git commit -m 'Add amazing feature')
  8. Push to your branch (git push origin feature/amazing-feature)
  9. Open a Pull Request

About

High-performance cryptocurrency market data streaming library

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages