forked from QuantConnect/lean-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlive_utils.py
More file actions
243 lines (202 loc) · 11.2 KB
/
live_utils.py
File metadata and controls
243 lines (202 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from click import prompt, confirm
from pathlib import Path
from typing import Any, Dict, List, Optional
from lean.components.api.api_client import APIClient
from lean.components.util.logger import Logger
from lean.models.errors import RequestFailedError
from lean.models.json_module import LiveInitialStateInput, JsonModule
from collections import UserDict
class InsensitiveCaseDict(UserDict):
def __getitem__(self, key: Any) -> Any:
if type(key) is str:
return super().__getitem__(key.lower())
return super().__getitem__(key)
def __setitem__(self, key: Any, item: Any) -> Any:
if type(key) is str:
self.data[key.lower()] = item
return
self.data[key] = item
def _get_last_portfolio(api_client: APIClient, project_id: str, project_name: Path) -> List[Dict[str, Any]]:
from os import listdir, path
from json import loads
from datetime import datetime, timezone
cloud_last_time = datetime.min.replace(tzinfo=timezone.utc)
if project_id:
try:
cloud_deployment = api_client.get("live/read", {"projectId": project_id})
if cloud_deployment["success"] and cloud_deployment["status"] != "Undefined":
if cloud_deployment["stopped"] is not None:
cloud_last_time = datetime.strptime(cloud_deployment["stopped"], "%Y-%m-%d %H:%M:%S")
else:
cloud_last_time = datetime.strptime(cloud_deployment["launched"], "%Y-%m-%d %H:%M:%S")
except RequestFailedError as e:
if "No live deployment found" not in str(e):
raise
cloud_last_time = datetime(cloud_last_time.year, cloud_last_time.month,
cloud_last_time.day, cloud_last_time.hour,
cloud_last_time.minute,
cloud_last_time.second,
tzinfo=timezone.utc)
local_last_time = datetime.min.replace(tzinfo=timezone.utc)
live_deployment_path = f"{project_name}/live"
if path.isdir(live_deployment_path):
local_deployment_time = [datetime.strptime(subdir, "%Y-%m-%d_%H-%M-%S").astimezone().astimezone(timezone.utc) for subdir in listdir(live_deployment_path)]
if local_deployment_time:
local_last_time = sorted(local_deployment_time, reverse = True)[0]
if cloud_last_time > local_last_time:
last_state = api_client.get("live/portfolio/read", {"projectId": project_id})
previous_portfolio_state = last_state["portfolio"]
elif cloud_last_time < local_last_time:
from lean.container import container
output_directory = container.output_config_manager.get_latest_output_directory("live")
if not output_directory:
return None
previous_state_file = get_latest_result_json_file(output_directory, True)
if not previous_state_file:
return None
previous_portfolio_state = {x.lower(): y for x, y in loads(open(previous_state_file, "r", encoding="utf-8").read()).items()}
else:
return None
return previous_portfolio_state
def get_last_portfolio_cash_holdings(api_client: APIClient, brokerage_instance: JsonModule, project_id: int, project: str):
"""Interactively obtain the portfolio state from the latest live deployment (both cloud/local)
:param api_client: the api instance
:param brokerage_instance: the brokerage
:param project_id: the cloud id of the project
:param project: the name of the project
:return: the options of initial cash/holdings setting, and the latest portfolio cash/holdings from the last deployment
"""
from lean.container import container
last_cash = {}
last_holdings = {}
container.logger.debug(f'brokerage_instance: {brokerage_instance}')
cash_balance_option = brokerage_instance._initial_cash_balance
holdings_option = brokerage_instance._initial_holdings
container.logger.debug(f'cash_balance_option: {cash_balance_option}')
container.logger.debug(f'holdings_option: {holdings_option}')
if cash_balance_option != LiveInitialStateInput.NotSupported or holdings_option != LiveInitialStateInput.NotSupported:
last_portfolio = _get_last_portfolio(api_client, project_id, project)
if last_portfolio is not None:
if "cash" in last_portfolio:
for key, value in last_portfolio["cash"].items():
last_cash[key] = InsensitiveCaseDict(value)
if "holdings" in last_portfolio:
for key, value in last_portfolio["holdings"].items():
new_dic = InsensitiveCaseDict(value)
new_dic["averagePrice"] = new_dic.get("averagePrice", new_dic.get("a", 0))
new_dic["quantity"] = new_dic.get("quantity", new_dic.get("q", 0))
new_dic["symbol"] = InsensitiveCaseDict(
new_dic.get("symbol", { "ID": key, "Value": key.split(' ')[0]}))
last_holdings[key] = new_dic
else:
last_cash = None
last_holdings = None
return cash_balance_option, holdings_option, last_cash, last_holdings
def _configure_initial_cash_interactively(logger: Logger, cash_input_option: LiveInitialStateInput, previous_cash_state: Dict[str, Any]) -> List[Dict[str, float]]:
cash_list = []
previous_cash_balance = []
if previous_cash_state:
for cash_state in previous_cash_state.values():
currency = cash_state["Symbol"]
amount = cash_state["Amount"]
previous_cash_balance.append({"currency": currency, "amount": amount})
if cash_input_option == LiveInitialStateInput.Required or confirm("Do you want to set the initial cash balance?", default=False):
if confirm(f"Do you want to use the last cash balance? {previous_cash_balance}", default=False):
return previous_cash_balance
continue_adding = True
while continue_adding:
logger.info("Setting initial cash balance...")
currency = prompt("Currency")
amount = prompt("Amount", type=float)
cash_list.append({"currency": currency, "amount": amount})
logger.info(f"Cash balance: {cash_list}")
if not confirm("Do you want to add more currency?", default=False):
continue_adding = False
return cash_list
else:
return []
def configure_initial_cash_balance(logger: Logger, cash_input_option: LiveInitialStateInput, live_cash_balance: str, previous_cash_state: Dict[str, Any])\
-> List[Dict[str, float]]:
"""Interactively configures the intial cash balance.
:param logger: the logger to use
:param cash_input_option: if the initial cash balance setting is optional/required
:param live_cash_balance: the initial cash balance option input
:param previous_cash_state: the dictionary containing cash balance in previous portfolio state
:return: the list of dictionary containing intial currency and amount information
"""
cash_list = []
if live_cash_balance or cash_input_option != LiveInitialStateInput.Required:
for cash_pair in [x for x in live_cash_balance.split(",") if x]:
currency, amount = cash_pair.split(":")
cash_list.append({"currency": currency, "amount": float(amount)})
return cash_list
else:
return _configure_initial_cash_interactively(logger, cash_input_option, previous_cash_state)
def _configure_initial_holdings_interactively(logger: Logger, holdings_option: LiveInitialStateInput, previous_holdings: Dict[str, Any]) -> List[Dict[str, float]]:
holdings = []
last_holdings = []
if previous_holdings:
for holding in previous_holdings.values():
symbol = holding["Symbol"]
quantity = int(holding["Quantity"])
avg_price = float(holding["AveragePrice"])
last_holdings.append({"symbol": symbol["Value"], "symbolId": symbol["ID"], "quantity": quantity, "averagePrice": avg_price})
if holdings_option == LiveInitialStateInput.Required or confirm("Do you want to set the initial portfolio holdings?", default=False):
if confirm(f"Do you want to use the last portfolio holdings? {last_holdings}", default=False):
return last_holdings
continue_adding = True
while continue_adding:
logger.info("Setting custom initial portfolio holdings...")
symbol = prompt("Symbol")
symbol_id = prompt("Symbol ID")
quantity = prompt("Quantity", type=int)
avg_price = prompt("Average Price", type=float)
holdings.append({"symbol": symbol, "symbolId": symbol_id, "quantity": quantity, "averagePrice": avg_price})
logger.info(f"Portfolio Holdings: {holdings}")
if not confirm("Do you want to add more holdings?", default=False):
continue_adding = False
return holdings
else:
return []
def configure_initial_holdings(logger: Logger, holdings_option: LiveInitialStateInput, live_holdings: str, previous_holdings: Dict[str, Any])\
-> List[Dict[str, float]]:
"""Interactively configures the intial portfolio holdings.
:param logger: the logger to use
:param holdings_option: if the initial portfolio holdings setting is optional/required
:param live_holdings: the initial portfolio holdings option input
:param previous_holdings: the dictionary containing portfolio holdings in previous portfolio state
:return: the list of dictionary containing intial symbol, symbol id, quantity, and average price information
"""
holdings = []
if live_holdings or holdings_option != LiveInitialStateInput.Required:
for holding in [x for x in live_holdings.split(",") if x]:
symbol, symbol_id, quantity, avg_price = holding.split(":")
holdings.append({"symbol": symbol, "symbolId": symbol_id, "quantity": int(quantity), "averagePrice": float(avg_price)})
return holdings
else:
return _configure_initial_holdings_interactively(logger, holdings_option, previous_holdings)
def get_latest_result_json_file(output_directory: Path, is_live_trading: bool = False) -> Optional[Path]:
from lean.container import container
output_config_manager = container.output_config_manager
output_id = output_config_manager.get_output_id(output_directory)
if output_id is None:
return None
prefix = ""
if is_live_trading:
prefix = "L-"
result_file = output_directory / f"{prefix}{output_id}.json"
if not result_file.exists():
return None
return result_file