diff --git a/README.md b/README.md
index 1c0d156..8723389 100644
--- a/README.md
+++ b/README.md
@@ -125,6 +125,13 @@ README.md
 - ExchangeRate API for live currency conversion
 - yfinance for historical market-data retrieval and analytics
 
+### Storage
+
+- DuckDB — local analytical storage for normalized historical market data
+
+>[!Note]
+> See docs/storage.md for details.
+
 ---
 
 ## Planned / Future Tech Stack
@@ -138,40 +145,32 @@ Planned or likely future technologies include:
 - Frankfurter API for historical FX data
 - possible additional market-data APIs later
 
-### Data processing
-
-- possibly Polars later for larger datasets
-
 ### Storage
 
 - PostgreSQL
-- DuckDB
-- Parquet
-- optional cloud storage
 
 ### Visualization and UI
 
 - NiceGUI
+- Django
 
 ### DevOps and deployment
 
 - Docker Compose
-- cloud deployment later
+- Travis CI
 
 ### Cloud and data engineering
 
-- Azure, GCP or AWS depending on project direction
+- Azure
 - scheduled ingestion
-- data quality checks
-- reporting pipelines
+- agentic Workflows
+- Blob Storage
+- scaled analysis
 
 ### AI and agentic workflows
 
 - LLM-assisted summaries
 - RAG over stored reports or notes
-- agentic data checks
-- anomaly monitoring
-- human-in-the-loop signal review
 
 > [!CAUTION]
 > AI and agentic features are future-stage ideas.
diff --git a/argus_probe.duckdb b/argus_probe.duckdb
new file mode 100644
index 0000000..9c9ef34
Binary files /dev/null and b/argus_probe.duckdb differ
diff --git a/docs/roadmap.md b/docs/roadmap.md
index 85706af..c637327 100644
--- a/docs/roadmap.md
+++ b/docs/roadmap.md
@@ -27,92 +27,121 @@ Scope:
 Outcome:
 Sprint 1 established the local ARGUS foundation with package structure, GUI prototype, analytics prototype, tests, documentation, CI, Dependabot and governance files.
 
-### Sprint 2 — Market Analytics & Data Source Expansion
+### Sprint 2 — Reporting & Market Analytics Foundation
 
 **Status:** In progress
 
-Move from simple FX conversion toward broader market analytics.
+Move ARGUS from a simple FX-focused prototype toward a first usable market analytics and reporting tool.
 
-Scope:
+**Scope:**
+
+- Add stronger market analytics metrics:
 
-- Add stronger market metrics:
   - cumulative return
   - strongest / weakest day
   - rolling volatility
-  - performance analytics
-  - risk analytics
-- Extend the current dashboard without adding unnecessary chart noise
-- Add or evaluate new data clients:
-  - Frankfurter for historical FX data
+  - basic performance analytics
+  - basic risk analytics
+- Add or improve real market data support:
+
   - yfinance for broader market data
-- Replace or reduce dependency on the current ExchangeRate API where needed
+  - existing FX conversion remains available where useful
 - Improve pandas-based analysis workflows
-- Add tests for metric calculations and data transformations
-- Document metric definitions, assumptions and chart behavior
+- Introduce local storage for historical market data
+- Add report generation and export
+- Add a first simple prediction feature
+- Introduce NiceGUI as the next GUI direction
+- Extend the current dashboard with real market analytics
+- Add tests for metric calculations, data transformations and storage behavior
+- Improve CI/CD with first deployment or release automation steps
 
-Outcome:
-ARGUS becomes a basic market analytics tool, not only a converter.
+**Outcome:**
+
+ARGUS becomes a basic market analytics and reporting tool.
+Users can fetch market data, store it locally, calculate metrics, generate a first report and view results through a first modern dashboard.
 
-### Sprint 3 — Storage, Web-Ready UI & Data Architecture
+---
+
+### Sprint 3 — Advanced Local Analytics & Product Quality
 
 **Status:** Planned
 
-Prepare ARGUS for persistent data workflows and a stronger product interface.
+Expand the local ARGUS application into a stronger analytics product with better data handling, UI structure, predictions and quality checks.
 
-Scope:
+**Scope:**
 
-- Add local storage layer:
-  - PostgreSQL, DuckDB, SQLite or Parquet depending on use case
-- Store historical market data
-- Separate ingestion, transformation, analytics and presentation layers more clearly
-- Start NiceGUI as the main web-ready UI direction
-- Keep Tkinter as legacy/prototype unless still useful
-- Keep CLI as internal/debug interface only
-- Add clearer architecture documentation
-- Prepare the project for larger data workflows and external contributors
+- Extend the local storage layer
+- Add a first local ETL workflow
+- Improve the NiceGUI dashboard structure and usability
+- Explore how NiceGUI can later interact with a more modern frontend stack such as Django, React or Node.js-based services
+- Keep Tkinter as legacy/prototype unless it is no longer useful
+- Add more metrics, instruments and prediction features
+- Improve report templates and report structure
+- Introduce first LLM-based summaries for generated reports
+- Add first performance tests
+- Introduce Snyk or another dependency/security scanning workflow
+- Improve code quality, test coverage and maintainability
 
-Outcome:
-ARGUS has a clearer data architecture and starts moving from local prototype toward a scalable analytics application.
+**Outcome:**
 
-### Sprint 4 — Cloud, Pipelines & Portfolio-Grade Data Engineering
+ARGUS becomes a more scalable local analytics application.
+It can process more instruments, produce better reports, provide first automated summaries and offer more reliable insight into market data.
 
-**Status:** Future
+---
 
-Turn ARGUS into a stronger end-to-end data engineering project.
+### Sprint 4 — Extended Analysis & Cloud-Ready Foundation
 
-Scope:
+**Status:** Planned
 
-- Docker / Docker Compose
-- Scheduled data ingestion
-- Cloud storage or cloud database
-- CI/CD improvements
-- Data quality checks
-- Basic pipeline orchestration
-- Reporting layer
-- Architecture diagram
-- Deployment documentation
+Prepare ARGUS for deeper analysis, cloud interaction and future portfolio-assistant workflows while keeping the local product usable and transparent.
 
-Target workflow:
+**Scope:**
 
-```text
-API → Ingestion → Storage → Transformation → Analysis → Visualization → CI/CD
-```
+- Add Docker Compose for a more complete local development setup
+- Introduce a first Azure connection, focused on simple storage or artifact exchange
+- Improve the LLM workflow
+- Introduce a first RAG-ready structure for reports, notes, documentation and stored analysis artifacts
+- Add data quality checks
+- Improve caching and efficient storage of market data
+- Add more export options for users
+- Add more metrics and better metadata visualization
+- Improve transparency around data sources, generated reports and analysis assumptions
+- Prepare clear interfaces for future cloud and assistant workflows
 
-### Sprint 5 — AI-Assisted Research & Agentic Monitoring
+**Outcome:**
 
-**Status:** Future vision
+ARGUS becomes ready to interact with a future cloud layer.
+The application can produce clearer, more transparent market analysis and prepares the foundation for retrieval-based workflows, stronger automation and future ARGUS Core integration.
 
-Add AI support only after the data, storage, service and reporting layers are stable.
+---
 
-Scope:
+### Sprint 5 — Cloud Interaction & Agentic Monitoring Foundation
 
-- LLM-assisted report summaries
-- Explanation of unusual movements
-- RAG over stored market notes, reports or documentation
-- Agentic checks for data quality, anomalies and recurring market scans
-- Human-in-the-loop signal review
-- Automated monitoring workflows
+**Status:** Planned
 
-Outcome:
+Start the first cloud-connected ARGUS workflows and introduce the foundation for monitoring, agentic checks and strategy-support features.
+
+**Scope:**
+
+- Add first cloud workflows that extend local analysis
+- Connect local ARGUS workflows with the first cloud-side services
+- Extend RAG over stored market notes, reports, documentation and analysis artifacts
+- Add agentic checks for:
+
+  - data quality
+  - anomalies
+  - recurring market scans
+  - report consistency
+- Add first human-in-the-loop review workflows for signals or strategy ideas
+- Add automated monitoring workflows
+- Prepare the first foundations for:
+
+  - paper trading
+  - backtesting
+  - controlled strategy evaluation
+  - future portfolio-assistant workflows
+
+**Outcome:**
+
-ARGUS starts behaving like its name: a system that continuously watches market data, evaluates it and helps generate useful signals.
+ARGUS and the first cloud-side services begin to interact.
+ARGUS becomes useful not only as an analytics and reporting tool, but also as the first foundation for monitoring, strategy evaluation and controlled market-research workflows.
diff --git a/docs/storage.md b/docs/storage.md
new file mode 100644
index 0000000..27ce106
--- /dev/null
+++ b/docs/storage.md
@@ -0,0 +1,154 @@
+# ARGUS Storage Layer
+
+ARGUS uses DuckDB as the local storage layer for normalized market data.
+
+The storage layer stores ARGUS-internal market data structures and provides reusable historical data for analytics, charts, dashboards and reports.
+
+The storage design follows the direction described in [`docs/research-databases-and-storage.md`](research-databases-and-storage.md).
+
+## Storage Workflow
+
+ARGUS uses a storage-first workflow for historical market data.
+
+```text
+User / GUI / Analytics request
+        ↓
+Market data service
+        ↓
+Check DuckDB storage
+        ↓
+If data exists:
+    read stored data
+    return it for analytics, charts or reports
+
+If data is missing:
+    fetch data from a client/API
+    normalize the response into ARGUS-internal data
+    return the normalized data
+    save the normalized data in DuckDB
+```
+
+DuckDB is used to avoid unnecessary repeated API calls and to make historical market data reusable across analytics, dashboard and reporting workflows.
+
+Fresh API data can be used immediately after normalization and is also persisted so future requests can use the local storage layer first.
+
+## Schema Overview
+
+The first storage schema is based on three related entities:
+
+```text
+data_sources
+instruments
+price_bars
+```
+
+### `data_sources`
+
+Stores where market data came from.
+
+Examples:
+
+```text
+yfinance
+ExchangeRate API
+Frankfurter
+FRED
+```
+
+Each source describes a provider or API that can deliver market, FX or macro data.
+
+### `instruments`
+
+Stores what ARGUS can analyze.
+
+Examples:
+
+```text
+EUR/USD
+AAPL
+SPY
+BTC-USD
+```
+
+An instrument represents the internal ARGUS identity of an asset, currency pair, ETF, index or other market object.
+
+Provider-specific symbols should be normalized before storage. For example:
+
+```text
+yfinance provider symbol: EURUSD=X
+ARGUS instrument symbol: EUR/USD
+```
+
+### `price_bars`
+
+Stores historical time-series values in an OHLCV-ready structure.
+
+A price bar belongs to:
+
+```text
+one data source
+one instrument
+one timestamp
+one timeframe
+```
+
+FX rates are stored as `close` values.
+
+For simple FX data, the remaining OHLCV fields can stay empty. For broader market data, the same structure can store open, high, low, close, adjusted close and volume values.
+
+The combination of source, instrument, timestamp and timeframe identifies a unique stored price bar.
+
+## Internal Models and Storage
+
+ARGUS uses internal domain models before data is stored:
+
+```text
+DataSource
+Instrument
+PriceBar
+```
+
+These models describe the meaning of the data inside ARGUS.
+
+The storage layer translates these internal models into DuckDB tables:
+
+```text
+DataSource -> data_sources
+Instrument -> instruments
+PriceBar -> price_bars
+```
+
+In Python, a `PriceBar` references a `DataSource` and an `Instrument`.
+
+In DuckDB, this relationship is stored through IDs:
+
+```text
+price_bars.source_id -> data_sources.id
+price_bars.instrument_id -> instruments.id
+```
+
+This keeps the database normalized while still allowing ARGUS to work with meaningful internal models in Python.
+
+## Reading Stored Data
+
+Stored price bars can be read by:
+
+```text
+source
+instrument
+start date
+end date
+```
+
+The storage layer joins `price_bars`, `data_sources` and `instruments` so that stored IDs become readable market data again.
+
+Read operations return tabular data that can be used by:
+
+```text
+analytics
+charts
+dashboards
+reports
+```
+
+This allows ARGUS to process stored historical data without depending on raw API response structures.
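Review note on the storage-first workflow described in `docs/storage.md`: the check-then-fetch-then-persist flow could be sketched roughly as below. This is only an illustration under stated assumptions, not the ARGUS implementation. It uses the standard-library `sqlite3` module as a stand-in for DuckDB so that it runs without extra dependencies, collapses the three normalized tables into one flat table, and the names `load_price_bars`, `SCHEMA` and the `fetch` callback are hypothetical.

```python
import sqlite3
from datetime import date
from typing import Callable

# Assumption: one flat table stands in for data_sources / instruments /
# price_bars, and sqlite3 stands in for DuckDB.
SCHEMA = """
CREATE TABLE IF NOT EXISTS price_bars (
    source TEXT NOT NULL,
    symbol TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    timeframe TEXT NOT NULL,
    close REAL NOT NULL,
    UNIQUE (source, symbol, timestamp, timeframe)
);
"""

Bar = tuple[str, float]  # (ISO date string, close value)
Fetcher = Callable[[str, date, date], list[Bar]]  # hypothetical client call


def load_price_bars(
    conn: sqlite3.Connection,
    source: str,
    symbol: str,
    start: date,
    end: date,
    fetch: Fetcher,
) -> list[Bar]:
    """Return stored bars if any exist, otherwise fetch, persist and return."""
    query = """
        SELECT timestamp, close FROM price_bars
        WHERE source = ? AND symbol = ? AND timestamp BETWEEN ? AND ?
        ORDER BY timestamp
    """
    params = (source, symbol, start.isoformat(), end.isoformat())
    stored = conn.execute(query, params).fetchall()
    if stored:
        # Data exists: serve it from local storage, no API call.
        return stored

    # Data is missing: fetch, then persist so later requests hit storage.
    fresh = fetch(symbol, start, end)
    conn.executemany(
        "INSERT OR IGNORE INTO price_bars VALUES (?, ?, ?, '1d', ?)",
        [(source, symbol, ts, close) for ts, close in fresh],
    )
    conn.commit()
    return fresh
```

Note that this sketch treats any stored row inside the requested range as a hit, so a partially covered range would not trigger a refetch. Whether ARGUS should detect gaps is a design decision the diff does not settle.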
diff --git a/pyproject.toml b/pyproject.toml
index e2dc784..a8ac7b1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -9,6 +9,7 @@ dependencies = [
     "numpy",
     "matplotlib",
     "yfinance",
+    "duckdb",
 ]
 
 [project.optional-dependencies]
diff --git a/src/argus/storage/__init__.py b/src/argus/storage/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/argus/storage/database.py b/src/argus/storage/database.py
new file mode 100644
index 0000000..ea7128c
--- /dev/null
+++ b/src/argus/storage/database.py
@@ -0,0 +1,271 @@
+import duckdb
+from datetime import date
+import pandas as pd
+from argus.domain.internal_models import DataSource, PriceBar, Instrument
+
+
+def initialize_database(database_path: str) -> None:
+    """
+    Initialize the DuckDB database schema.
+
+    Creates the required sequences and tables for data sources,
+    instruments, and price bars.
+
+    Args:
+        database_path (str): Path to the DuckDB database file.
+
+    Returns:
+        None
+    """
+    queries = [
+        "CREATE SEQUENCE IF NOT EXISTS data_sources_id_seq;",
+        "CREATE SEQUENCE IF NOT EXISTS instruments_id_seq;",
+        "CREATE SEQUENCE IF NOT EXISTS price_bars_id_seq;",
+        """
+        CREATE TABLE IF NOT EXISTS data_sources (
+            id INTEGER PRIMARY KEY DEFAULT nextval('data_sources_id_seq'),
+            name TEXT NOT NULL UNIQUE,
+            provider_kind TEXT NOT NULL,
+            requires_api_key BOOLEAN NOT NULL
+        );
+        """,
+        """
+        CREATE TABLE IF NOT EXISTS instruments (
+            id INTEGER PRIMARY KEY DEFAULT nextval('instruments_id_seq'),
+            symbol TEXT NOT NULL UNIQUE,
+            name TEXT NOT NULL,
+            asset_class TEXT NOT NULL,
+            currency TEXT,
+            exchange TEXT,
+            base_currency TEXT,
+            quote_currency TEXT
+        );
+        """,
+        """
+        CREATE TABLE IF NOT EXISTS price_bars (
+            id INTEGER PRIMARY KEY DEFAULT nextval('price_bars_id_seq'),
+            source_id INTEGER NOT NULL,
+            instrument_id INTEGER NOT NULL,
+            timestamp DATE NOT NULL,
+            timeframe TEXT NOT NULL,
+            close DOUBLE NOT NULL,
+            open DOUBLE,
+            high DOUBLE,
+            low DOUBLE,
+            adjusted_close DOUBLE,
+            volume DOUBLE,
+            FOREIGN KEY (source_id) REFERENCES data_sources (id),
+            FOREIGN KEY (instrument_id) REFERENCES instruments (id),
+            UNIQUE (source_id, instrument_id, timestamp, timeframe)
+        );
+        """,
+    ]
+
+    connection = duckdb.connect(database_path)
+    try:
+        for query in queries:
+            connection.execute(query)
+    finally:
+        connection.close()
+
+
+def get_or_create_source(connection, source: DataSource) -> int:
+    """
+    Get an existing data source ID or create a new data source.
+
+    Searches for a data source by name. If it already exists, its ID is
+    returned. Otherwise, the data source is inserted and the new ID is
+    returned.
+
+    Args:
+        connection: Active DuckDB connection.
+        source (DataSource): Data source model containing provider metadata.
+
+    Returns:
+        int: Database ID of the existing or newly created data source.
+
+    Raises:
+        ValueError: If the data source could not be inserted or found.
+    """
+    insert_query = """
+    INSERT INTO data_sources (name, provider_kind, requires_api_key)
+    VALUES (?,?,?)
+    ON CONFLICT DO NOTHING;
+    """
+    search_query = """
+    SELECT id FROM data_sources
+    WHERE name=?
+    """
+
+    result = connection.execute(
+        query=search_query,
+        parameters=[source.name],
+    ).fetchone()
+    if result is not None:
+        return result[0]
+
+    connection.execute(
+        query=insert_query,
+        parameters=[source.name, source.provider_kind, source.requires_api_key],
+    )
+
+    result = connection.execute(
+        query=search_query,
+        parameters=[source.name],
+    ).fetchone()
+
+    if result is None:
+        raise ValueError("Data source could not be inserted.")
+
+    return result[0]
+
+
+def get_or_create_instrument(connection, instrument: Instrument) -> int:
+    """
+    Get an existing instrument ID or create a new instrument.
+
+    Searches for an instrument by symbol. If it already exists, its ID is
+    returned. Otherwise, the instrument is inserted and the new ID is
+    returned.
+
+    Args:
+        connection: Active DuckDB connection.
+        instrument (Instrument): Instrument model containing symbol and
+            asset metadata.
+
+    Returns:
+        int: Database ID of the existing or newly created instrument.
+
+    Raises:
+        ValueError: If the instrument could not be inserted or found.
+    """
+    insert_query = """
+    INSERT INTO instruments (
+        symbol,
+        name,
+        asset_class,
+        currency,
+        exchange,
+        base_currency,
+        quote_currency)
+    VALUES (?,?,?,?,?,?,?)
+    ON CONFLICT DO NOTHING;
+    """
+    search_query = """
+    SELECT id FROM instruments
+    WHERE symbol=?
+    """
+
+    result = connection.execute(
+        query=search_query,
+        parameters=[instrument.symbol],
+    ).fetchone()
+    if result is not None:
+        return result[0]
+
+    connection.execute(
+        query=insert_query,
+        parameters=[
+            instrument.symbol,
+            instrument.name,
+            instrument.asset_class,
+            instrument.currency,
+            instrument.exchange,
+            instrument.base_currency,
+            instrument.quote_currency,
+        ],
+    )
+    result = connection.execute(
+        query=search_query,
+        parameters=[instrument.symbol],
+    ).fetchone()
+
+    if result is None:
+        raise ValueError("Instrument could not be inserted.")
+
+    return result[0]
+
+
+def insert_price_bar(db: str, price_bar: PriceBar) -> None:
+    insert_query = """
+    INSERT INTO price_bars (
+        source_id,
+        instrument_id,
+        timestamp,
+        timeframe,
+        close,
+        open,
+        high,
+        low,
+        adjusted_close,
+        volume
+    )
+    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+    ON CONFLICT DO NOTHING;
+    """
+    connection = duckdb.connect(db)
+    try:
+        source_id = get_or_create_source(connection, price_bar.source)
+        instrument_id = get_or_create_instrument(connection, price_bar.instrument)
+        connection.execute(
+            query=insert_query,
+            parameters=[
+                source_id,
+                instrument_id,
+                price_bar.timestamp,
+                price_bar.timeframe,
+                price_bar.close,
+                price_bar.open,
+                price_bar.high,
+                price_bar.low,
+                price_bar.adjusted_close,
+                price_bar.volume,
+            ],
+        )
+    finally:
+        connection.close()
+
+
+def read_price_bars(
+    db: str,
+    source: DataSource,
+    instrument: Instrument,
+    start_date: date,
+    end_date: date,
+) -> pd.DataFrame:
+
+    search_query = """
+    SELECT
+        data_sources.name AS source_name,
+        instruments.symbol AS instrument_symbol,
+        price_bars.timestamp,
+        price_bars.timeframe,
+        price_bars.open,
+        price_bars.high,
+        price_bars.low,
+        price_bars.close,
+        price_bars.adjusted_close,
+        price_bars.volume
+    FROM price_bars
+    JOIN data_sources ON price_bars.source_id = data_sources.id
+    JOIN instruments ON price_bars.instrument_id = instruments.id
+    WHERE data_sources.name = ?
+        AND instruments.symbol = ?
+        AND price_bars.timestamp BETWEEN ? AND ?
+    ORDER BY price_bars.timestamp;
+    """
+
+    connection = duckdb.connect(db)
+    try:
+        result = connection.execute(
+            query=search_query,
+            parameters=[
+                source.name,
+                instrument.symbol,
+                start_date,
+                end_date,
+            ],
+        ).df()
+    finally:
+        connection.close()
+    return result
diff --git a/tests/test_exchangerate_client.py b/tests/test_exchangerate_client.py
index 1f15512..132a8a8 100644
--- a/tests/test_exchangerate_client.py
+++ b/tests/test_exchangerate_client.py
@@ -65,7 +65,6 @@ def test_check_currency_key_error(monkeypatch, capsys):
     test_resp.json.return_value = {
         "result": "success",  # not passing "success" bypases the "conversion_rate" checking
         "error_type": "",
-        # "conversion_rate" fehlt absichtlich
     }
 
     def test_get_resp(url, timeout):
diff --git a/tests/test_storage_database.py b/tests/test_storage_database.py
new file mode 100644
index 0000000..d513008
--- /dev/null
+++ b/tests/test_storage_database.py
@@ -0,0 +1,222 @@
+from datetime import date
+
+import duckdb
+
+from argus.domain.internal_models import DataSource, Instrument, PriceBar
+from argus.storage.database import (
+    initialize_database,
+    insert_price_bar,
+    read_price_bars,
+)
+
+
+def test_initialize_database_creates_required_tables(tmp_path):
+    db = tmp_path / "test.duckdb"
+
+    initialize_database(db)
+    connection = duckdb.connect(db)
+    tables = connection.execute("SHOW TABLES;").fetchall()
+    connection.close()
+    table_names = {row[0] for row in tables}
+
+    assert "data_sources" in table_names
+    assert "instruments" in table_names
+    assert "price_bars" in table_names
+
+
+def test_data_is_inserted(tmp_path):
+    source = DataSource(
+        name="Yahoo", provider_kind="yfinance_api", requires_api_key=False
+    )
+
+    instrument = Instrument(
+        symbol="EUR/USD",
+        name="EUR - USD Rate",
+        asset_class="fx",
+        base_currency="EUR",
+        quote_currency="USD",
+    )
+
+    pricebar = PriceBar(
+        source=source,
+        instrument=instrument,
+        timestamp=date(2026, 1, 1),
+        timeframe="1d",
+        close=1.89,
+    )
+
+    db = tmp_path / "test.duckdb"
+    initialize_database(db)
+    insert_price_bar(db, pricebar)
+    connection = duckdb.connect(db)
+
+    instrument_count = connection.execute(
+        "SELECT COUNT(*) FROM instruments;"
+    ).fetchone()
+
+    source_count = connection.execute("SELECT COUNT(*) FROM data_sources;").fetchone()
+
+    price_bar_count = connection.execute("SELECT COUNT(*) FROM price_bars;").fetchone()
+
+    assert instrument_count is not None
+    assert source_count is not None
+    assert price_bar_count is not None
+    assert instrument_count[0] == 1
+    assert source_count[0] == 1
+    assert price_bar_count[0] == 1
+
+
+def test_fx_has_correct_format(tmp_path):
+    source = DataSource(
+        name="Yahoo", provider_kind="yfinance_api", requires_api_key=False
+    )
+
+    instrument = Instrument(
+        symbol="EUR/USD",
+        name="EUR - USD Rate",
+        asset_class="fx",
+        base_currency="EUR",
+        quote_currency="USD",
+    )
+
+    pricebar = PriceBar(
+        source=source,
+        instrument=instrument,
+        timestamp=date(2026, 1, 1),
+        timeframe="1d",
+        close=1.89,
+    )
+
+    db = tmp_path / "test.duckdb"
+    initialize_database(db)
+    insert_price_bar(db, pricebar)
+    connection = duckdb.connect(db)
+
+    price_bar_fx = connection.execute("SELECT * FROM price_bars;").fetchone()
+    connection.close()
+
+    assert price_bar_fx is not None
+    assert price_bar_fx[0] == 1
+    assert price_bar_fx[1] == 1
+    assert price_bar_fx[2] == 1
+    assert price_bar_fx[3] == date(2026, 1, 1)
+    assert price_bar_fx[4] == "1d"
+    assert price_bar_fx[5] == 1.89
+    assert price_bar_fx[6] is None
+    assert price_bar_fx[7] is None
+    assert price_bar_fx[8] is None
+    assert price_bar_fx[9] is None
+    assert price_bar_fx[10] is None
+
+
+def test_duplicates_are_ignored(tmp_path):
+    source = DataSource(
+        name="Yahoo", provider_kind="yfinance_api", requires_api_key=False
+    )
+
+    instrument = Instrument(
+        symbol="EUR/USD",
+        name="EUR - USD Rate",
+        asset_class="fx",
+        base_currency="EUR",
+        quote_currency="USD",
+    )
+
+    pricebar = PriceBar(
+        source=source,
+        instrument=instrument,
+        timestamp=date(2026, 1, 1),
+        timeframe="1d",
+        close=1.89,
+    )
+
+    db = tmp_path / "test.duckdb"
+    initialize_database(db)
+    insert_price_bar(db, pricebar)
+    insert_price_bar(db, pricebar)
+    connection = duckdb.connect(db)
+    count = connection.execute("SELECT COUNT(*) FROM price_bars;").fetchone()
+
+    assert count is not None
+    assert count[0] == 1
+
+
+def test_read_price_bars_returns_matching_data(tmp_path):
+    source = DataSource(
+        name="Yahoo",
+        provider_kind="yfinance_api",
+        requires_api_key=False,
+    )
+
+    instrument = Instrument(
+        symbol="EUR/USD",
+        name="EUR - USD Rate",
+        asset_class="fx",
+        base_currency="EUR",
+        quote_currency="USD",
+    )
+
+    pricebar = PriceBar(
+        source=source,
+        instrument=instrument,
+        timestamp=date(2026, 1, 1),
+        timeframe="1d",
+        close=1.89,
+    )
+
+    db = tmp_path / "test.duckdb"
+    initialize_database(db)
+    insert_price_bar(db, pricebar)
+
+    result = read_price_bars(
+        db=db,
+        source=source,
+        instrument=instrument,
+        start_date=date(2026, 1, 1),
+        end_date=date(2026, 1, 31),
+    )
+
+    assert result.empty is False
+    assert len(result) == 1
+    assert result.iloc[0]["source_name"] == "Yahoo"
+    assert result.iloc[0]["instrument_symbol"] == "EUR/USD"
+    assert result.iloc[0]["timeframe"] == "1d"
+    assert result.iloc[0]["close"] == 1.89
+
+
+def test_read_price_bars_returns_empty_dataframe_for_missing_range(tmp_path):
+    source = DataSource(
+        name="Yahoo",
+        provider_kind="yfinance_api",
+        requires_api_key=False,
+    )
+
+    instrument = Instrument(
+        symbol="EUR/USD",
+        name="EUR - USD Rate",
+        asset_class="fx",
+        base_currency="EUR",
+        quote_currency="USD",
+    )
+
+    pricebar = PriceBar(
+        source=source,
+        instrument=instrument,
+        timestamp=date(2026, 1, 1),
+        timeframe="1d",
+        close=1.89,
+    )
+
+    db = tmp_path / "test.duckdb"
+    initialize_database(db)
+    insert_price_bar(db, pricebar)
+
+    result = read_price_bars(
+        db=db,
+        source=source,
+        instrument=instrument,
+        start_date=date(2027, 1, 1),
+        end_date=date(2027, 1, 31),
+    )
+
+    assert result.empty is True
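Review note on the tests above: they build `Instrument(symbol="EUR/USD", ...)` directly, while `docs/storage.md` in this change states that provider-specific symbols such as `EURUSD=X` are normalized before storage. The diff does not show where that normalization happens, so the following is only a sketch of what such a step might look like. The function name `normalize_yfinance_fx_symbol` is hypothetical, and the helper assumes six-letter pair tickers with an `=X` suffix; other ticker shapes are deliberately rejected rather than guessed.

```python
def normalize_yfinance_fx_symbol(provider_symbol: str) -> str:
    """Map a yfinance-style FX ticker such as 'EURUSD=X' to 'EUR/USD'.

    Hypothetical helper: symbols without the '=X' suffix are returned
    unchanged, and FX tickers that are not six-letter pairs raise.
    """
    suffix = "=X"
    if not provider_symbol.endswith(suffix):
        # Not an FX ticker under this assumption (e.g. 'AAPL'): leave as is.
        return provider_symbol

    pair = provider_symbol[: -len(suffix)]
    if len(pair) != 6 or not pair.isalpha():
        # Assumption: only BASEQUOTE pairs are supported by this sketch.
        raise ValueError(f"Unsupported FX provider symbol: {provider_symbol!r}")

    pair = pair.upper()
    return f"{pair[:3]}/{pair[3:]}"
```

A test along the lines of `assert normalize_yfinance_fx_symbol("EURUSD=X") == "EUR/USD"` would tie the documented example to the stored `instruments.symbol` value, which is the column the `UNIQUE` constraint relies on.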