forked from G123-jp/python_assignment
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathget_raw_data.py
More file actions
82 lines (72 loc) · 2.76 KB
/
get_raw_data.py
File metadata and controls
82 lines (72 loc) · 2.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import datetime
from typing import List, Dict
import requests
import json
import os
import sqlalchemy
from sqlalchemy import inspect
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
import os
from model import FinancialData
ALPHA_VANTAGE_API_KEY = "ALPHA_VANTAGE_API_KEY"
POSTGRES_URL = "POSTGRES_URL"
DEFAULT_TICKERS = ['IBM', 'AAPL']
def fetch_financial_data(req_api_key: str, ticker: str) -> List[Dict]:
"""
Queries the alpha vantage api and return a list of dictionary with keys
- symbol: str
- date: date
- open_price: float
- close_price: float
- volume: float
:param req_api_key: alpha vantage api key
:param ticker: the ticker to query for
:return: a list of dictionaries or if invalid input is provided, a key error may be raised.
"""
resp = requests.get(f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY_ADJUSTED&"
f"symbol={ticker}&apikey={req_api_key}")
resp.raise_for_status()
out_json = json.loads(resp.content.decode('utf8'))
records = []
time_series = out_json['Time Series (Daily)']
for record_date in time_series.keys():
record = time_series[record_date]
record_date = datetime.date.fromisoformat(record_date)
if record_date + datetime.timedelta(days=14) < datetime.datetime.now().date():
continue
records.append({
"symbol": ticker,
"date": record_date,
"open_price": float(record['1. open']),
"close_price": float(record['4. close']),
"volume": float(record['6. volume'])
})
return records
def write_to_db(records: List[Dict]):
"""
Writes the list of records to db
"""
engine = sqlalchemy.create_engine(os.environ[POSTGRES_URL], echo=True)
with Session(engine) as session:
for rec in records:
stmt = insert(FinancialData).values(**rec)
stmt = stmt.on_conflict_do_update(
constraint='unique_symbol_date',
index_where=FinancialData.symbol == rec['symbol'] and FinancialData.date == rec['date'],
set_=rec
)
session.execute(stmt)
session.commit()
if __name__ == '__main__':
if ALPHA_VANTAGE_API_KEY not in os.environ:
raise ValueError(f"Error, env var {ALPHA_VANTAGE_API_KEY} is not set.")
if POSTGRES_URL not in os.environ:
raise ValueError(f"Error, missing {POSTGRES_URL} in env var")
api_key = os.environ[ALPHA_VANTAGE_API_KEY]
try:
for ticker in DEFAULT_TICKERS:
entries = fetch_financial_data(api_key, ticker)
write_to_db(entries)
except KeyError:
raise ValueError("Error, please check your alpha vantage api key and ticker.")