Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ class MarketDataResponse(BaseModel):
market_cap: Optional[float] = None
pe_ratio: Optional[float] = None
timestamp: datetime
provider: Optional[str] = None
feed_latency_ms: Optional[float] = None


class TechnicalIndicatorsResponse(BaseModel):
Expand Down
48 changes: 48 additions & 0 deletions docs/user-guide/websocket-feeds.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# WebSocket Market Data Feeds

TradeGraph now supports low-latency market data by connecting to real-time vendor feeds. The financial analysis agent uses these feeds automatically when `include_market_data=True`, so you can mix fresh tick data with Yahoo Finance fundamentals in a single run.

## Supported Providers

| Provider | Asset Class | Notes |
| --- | --- | --- |
| Polygon | Equities | Requires `POLYGON_API_KEY`. Uses trades channel `T.*`. |
| Alpaca | Equities | Requires `ALPACA_API_KEY` and `ALPACA_API_SECRET`. Supports `ALPACA_DATA_FEED` (`iex` or `sip`). |
| Binance | Crypto | No authentication required. Streams trades via `symbol@trade`. |

Use the environment variables (or settings) to select defaults:

```env
DEFAULT_EQUITY_FEED_PROVIDER=polygon
DEFAULT_CRYPTO_FEED_PROVIDER=binance
POLYGON_API_KEY=...
ALPACA_API_KEY=...
ALPACA_API_SECRET=...
ALPACA_DATA_FEED=iex
WEBSOCKET_TIMEOUT_SECONDS=10
```

You can override feeds per symbol at runtime with `asset_types` and `feed_overrides` fields on the `FinancialAnalysisAgent` input payload.

## Monitoring Latency

Each `MarketData` entry now reports two additional fields:

```
provider: Source of the trade (`polygon`, `alpaca`, `binance`, ...)
feed_latency_ms: Milliseconds between trade timestamp and ingestion
```

This helps dashboards surface stale feeds or connectivity issues.

## Quick CLI Test

The repository ships with `examples/websocket_feed_demo.py` for manual testing.

```bash
python examples/websocket_feed_demo.py AAPL --asset-type equity \
--polygon-api-key $POLYGON_API_KEY
```

Specify `--provider alpaca` or `--asset-type crypto` to route to other feeds. The script prints the decoded trade payload along with the calculated latency so you can verify credentials before running full analyses.
```
103 changes: 103 additions & 0 deletions examples/websocket_feed_demo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
"""Quick script to test streaming market data feeds."""

import argparse
import asyncio
import os
from pprint import pprint

from tradegraph_financial_advisor.services.market_data import (
MarketDataFeedConfig,
MarketDataWebSocketClient,
)


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Test WebSocket market feeds")
parser.add_argument("symbol", help="Ticker symbol, e.g., AAPL or BTCUSDT")
parser.add_argument(
"--asset-type",
default="equity",
choices=["equity", "crypto"],
help="Asset class for routing to the right feed",
)
parser.add_argument(
"--provider",
default=None,
help="Override provider (binance, polygon, alpaca)",
)
parser.add_argument(
"--equity-provider",
default=os.getenv("DEFAULT_EQUITY_FEED_PROVIDER", "polygon"),
help="Default provider for equities",
)
parser.add_argument(
"--crypto-provider",
default=os.getenv("DEFAULT_CRYPTO_FEED_PROVIDER", "binance"),
help="Default provider for crypto symbols",
)
parser.add_argument(
"--polygon-api-key",
default=os.getenv("POLYGON_API_KEY"),
help="Polygon API key",
)
parser.add_argument(
"--alpaca-api-key",
default=os.getenv("ALPACA_API_KEY"),
help="Alpaca API key",
)
parser.add_argument(
"--alpaca-api-secret",
default=os.getenv("ALPACA_API_SECRET"),
help="Alpaca API secret",
)
parser.add_argument(
"--alpaca-feed",
default=os.getenv("ALPACA_DATA_FEED", "iex"),
help="Alpaca feed (iex or sip)",
)
parser.add_argument(
"--timeout",
type=int,
default=int(os.getenv("WEBSOCKET_TIMEOUT_SECONDS", "10")),
help="Seconds to wait for a trade message",
)
return parser.parse_args()


def build_config(args: argparse.Namespace) -> MarketDataFeedConfig:
return MarketDataFeedConfig(
equity_provider=args.equity_provider,
crypto_provider=args.crypto_provider,
polygon_api_key=args.polygon_api_key,
alpaca_api_key=args.alpaca_api_key,
alpaca_api_secret=args.alpaca_api_secret,
alpaca_feed=args.alpaca_feed,
timeout_seconds=args.timeout,
)


async def main() -> None:
args = parse_args()
config = build_config(args)

async with MarketDataWebSocketClient(config=config) as client:
trade = await client.get_realtime_trade(
args.symbol,
asset_type=args.asset_type,
provider_override=args.provider,
)

pprint(
{
"symbol": trade.symbol,
"price": trade.price,
"size": trade.size,
"provider": trade.provider,
"timestamp": trade.timestamp.isoformat(),
"latency_ms": trade.latency_ms,
}
)


if __name__ == "__main__":
asyncio.run(main())
3 changes: 2 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ nav:
- Python API: user-guide/python-api.md
- Web Interface: user-guide/web-interface.md
- Analysis Types: user-guide/analysis-types.md
- WebSocket Feeds: user-guide/websocket-feeds.md
- API Reference:
- Core Classes: api-reference/core.md
- Agents: api-reference/agents.md
Expand Down Expand Up @@ -113,4 +114,4 @@ extra:
- icon: fontawesome/brands/python
link: https://pypi.org/project/tradegraph-financial-advisor
version:
provider: mike
provider: mike
69 changes: 54 additions & 15 deletions src/tradegraph_financial_advisor/agents/financial_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
from .base_agent import BaseAgent
from ..models.financial_data import CompanyFinancials, MarketData, TechnicalIndicators
from ..config.settings import settings
from ..services.market_data import (
MarketDataFeedConfig,
MarketDataWebSocketClient,
WebSocketFeedError,
)


class FinancialAnalysisAgent(BaseAgent):
Expand All @@ -18,23 +23,42 @@ def __init__(self, **kwargs):
**kwargs,
)
self.session: Optional[aiohttp.ClientSession] = None
self.market_data_stream: Optional[MarketDataWebSocketClient] = None

async def start(self) -> None:
await super().start()
self.session = aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=settings.analysis_timeout_seconds)
)
feed_config = MarketDataFeedConfig(
equity_provider=settings.default_equity_feed_provider,
crypto_provider=settings.default_crypto_feed_provider,
polygon_api_key=settings.polygon_api_key,
alpaca_api_key=settings.alpaca_api_key,
alpaca_api_secret=settings.alpaca_api_secret,
alpaca_feed=settings.alpaca_data_feed,
timeout_seconds=settings.websocket_timeout_seconds,
)
self.market_data_stream = MarketDataWebSocketClient(
session=self.session,
config=feed_config,
)

async def stop(self) -> None:
if self.session:
await self.session.close()
if self.market_data_stream:
await self.market_data_stream.aclose()
self.market_data_stream = None
await super().stop()

async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
symbols = input_data.get("symbols", [])
include_financials = input_data.get("include_financials", True)
include_technical = input_data.get("include_technical", True)
include_market_data = input_data.get("include_market_data", True)
asset_types = input_data.get("asset_types", {}) # symbol -> equity/crypto
feed_overrides = input_data.get("feed_overrides", {}) # symbol -> provider name

logger.info(f"Analyzing financial data for symbols: {symbols}")

Expand All @@ -45,7 +69,11 @@ async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
symbol_data = {}

if include_market_data:
market_data = await self._get_market_data(symbol)
asset_type = asset_types.get(symbol, "equity")
feed_override = feed_overrides.get(symbol)
market_data = await self._get_market_data(
symbol, asset_type=asset_type, feed_override=feed_override
)
symbol_data["market_data"] = (
market_data.dict() if market_data else None
)
Expand Down Expand Up @@ -73,32 +101,43 @@ async def execute(self, input_data: Dict[str, Any]) -> Dict[str, Any]:
"analysis_timestamp": datetime.now().isoformat(),
}

async def _get_market_data(self, symbol: str) -> Optional[MarketData]:
async def _get_market_data(
self, symbol: str, asset_type: str = "equity", feed_override: Optional[str] = None
) -> Optional[MarketData]:
try:
ticker = yf.Ticker(symbol)
info = ticker.info
history = ticker.history(period="1d")
if not self.market_data_stream:
raise RuntimeError("Market data stream is not initialized. Did you call start()?")

if history.empty:
return None
trade = await self.market_data_stream.get_realtime_trade(
symbol, asset_type=asset_type, provider_override=feed_override
)

latest = history.iloc[-1]
ticker = yf.Ticker(symbol)
info = ticker.info
previous_close = info.get("previousClose")
change = trade.price - previous_close if previous_close else 0.0
change_percent = (
(change / previous_close) * 100 if previous_close else 0.0
)

market_data = MarketData(
symbol=symbol,
current_price=float(latest["Close"]),
change=float(latest["Close"] - latest["Open"]),
change_percent=float(
(latest["Close"] - latest["Open"]) / latest["Open"] * 100
),
volume=int(latest["Volume"]),
current_price=float(trade.price),
change=float(change),
change_percent=float(change_percent),
volume=int(trade.size or 0),
market_cap=info.get("marketCap"),
pe_ratio=info.get("trailingPE"),
timestamp=datetime.now(),
timestamp=trade.timestamp,
provider=trade.provider,
feed_latency_ms=trade.latency_ms,
)

return market_data

except WebSocketFeedError as e:
logger.error(f"WebSocket feed error for {symbol}: {str(e)}")
return None
except Exception as e:
logger.error(f"Error fetching market data for {symbol}: {str(e)}")
return None
Expand Down
11 changes: 11 additions & 0 deletions src/tradegraph_financial_advisor/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,21 @@ class Settings(BaseSettings):
openai_api_key: str = Field("", env="OPENAI_API_KEY")
alpha_vantage_api_key: Optional[str] = Field(None, env="ALPHA_VANTAGE_API_KEY")
financial_data_api_key: Optional[str] = Field(None, env="FINANCIAL_DATA_API_KEY")
polygon_api_key: Optional[str] = Field(None, env="POLYGON_API_KEY")
alpaca_api_key: Optional[str] = Field(None, env="ALPACA_API_KEY")
alpaca_api_secret: Optional[str] = Field(None, env="ALPACA_API_SECRET")

log_level: str = Field("INFO", env="LOG_LEVEL")
max_concurrent_agents: int = Field(5, env="MAX_CONCURRENT_AGENTS")
analysis_timeout_seconds: int = Field(30, env="ANALYSIS_TIMEOUT_SECONDS")
websocket_timeout_seconds: int = Field(10, env="WEBSOCKET_TIMEOUT_SECONDS")
default_equity_feed_provider: str = Field(
"polygon", env="DEFAULT_EQUITY_FEED_PROVIDER"
)
default_crypto_feed_provider: str = Field(
"binance", env="DEFAULT_CRYPTO_FEED_PROVIDER"
)
alpaca_data_feed: str = Field("iex", env="ALPACA_DATA_FEED")

news_sources: List[str] = Field(
default_factory=lambda: [
Expand Down
2 changes: 2 additions & 0 deletions src/tradegraph_financial_advisor/models/financial_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class MarketData(BaseModel):
market_cap: Optional[float] = None
pe_ratio: Optional[float] = None
timestamp: datetime
provider: Optional[str] = None
feed_latency_ms: Optional[float] = None


class TechnicalIndicators(BaseModel):
Expand Down
Loading
Loading