forked from QuantConnect/lean-cli
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjson_module.py
More file actions
379 lines (332 loc) · 20.1 KB
/
json_module.py
File metadata and controls
379 lines (332 loc) · 20.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean CLI v1.0. Copyright 2021 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from enum import Enum
from typing import Any, Dict, List, Type
from click import get_current_context
from click.core import ParameterSource
from lean.components.util.auth0_helper import get_authorization
from lean.components.util.logger import Logger
from lean.constants import MODULE_TYPE, MODULE_PLATFORM, MODULE_CLI_PLATFORM
from lean.container import container
from lean.models.configuration import BrokerageEnvConfiguration, Configuration, InternalInputUserInput, \
PathParameterUserInput, AuthConfiguration, ChoiceUserInput
from copy import copy
from abc import ABC
_logged_messages = set()
class JsonModule(ABC):
"""The JsonModule class is the base class extended for all json modules."""
def __init__(self, json_module_data: Dict[str, Any], module_type: str, platform: str) -> None:
self._module_type: str = module_type
self._platform: str = platform
self._product_id: int = json_module_data["product-id"] if "product-id" in json_module_data else 0
self._id: str = json_module_data["id"]
self._display_name: str = json_module_data["display-id"]
self._specifications_url: str = json_module_data["specifications"] if "specifications" in json_module_data else None
self._installs: bool = json_module_data["installs"] if ("installs" in json_module_data
and platform == MODULE_CLI_PLATFORM) else False
self._lean_configs: List[Configuration] = []
for config in json_module_data["configurations"]:
self._lean_configs.append(Configuration.factory(config))
self._lean_configs = self.sort_configs()
self._is_module_installed: bool = False
self._initial_cash_balance: LiveInitialStateInput = LiveInitialStateInput(
json_module_data["live-cash-balance-state"]) \
if "live-cash-balance-state" in json_module_data \
else None
self._initial_holdings: LiveInitialStateInput = LiveInitialStateInput(json_module_data["live-holdings-state"]) \
if "live-holdings-state" in json_module_data \
else False
self._minimum_seat = json_module_data["minimum-seat"] if "minimum-seat" in json_module_data else None
def get_id(self):
return self._id
def sort_configs(self) -> List[Configuration]:
sorted_configs = []
filter_configs = []
brokerage_configs = []
for config in self._lean_configs:
if isinstance(config, BrokerageEnvConfiguration):
brokerage_configs.append(config)
else:
if config.has_filter_dependency:
filter_configs.append(config)
else:
sorted_configs.append(config)
return brokerage_configs + sorted_configs + filter_configs
def get_name(self) -> str:
"""Returns the user-friendly name which users can identify this object by.
:return: the user-friendly name to display to users
"""
return self._display_name
def _check_if_config_passes_filters(self, config: Configuration, all_for_platform_type: bool) -> bool:
for condition in config._filter._conditions:
if condition._dependent_config_id == MODULE_TYPE:
target_value = self._module_type
elif condition._dependent_config_id == MODULE_PLATFORM:
target_value = self._platform
else:
if all_for_platform_type:
# skip, we want all configurations that match type and platform, for help
continue
target_value = self.get_config_value_from_name(condition._dependent_config_id)
if not target_value:
return False
elif isinstance(target_value, dict):
return all(condition.check(value) for value in target_value.values())
elif not condition.check(target_value):
return False
return True
def get_config_value_from_name(self, target_name: str) -> str:
[idx] = [i for i in range(len(self._lean_configs))
if self._lean_configs[i]._id == target_name]
return self._lean_configs[idx]._value
def is_value_in_config(self, searched_value: str) -> bool:
searched_value = searched_value.lower()
for i in range(len(self._lean_configs)):
value = self._lean_configs[i]._value
if isinstance(value, str):
value = value.lower()
if isinstance(value, list):
value = [x.lower() for x in value]
if searched_value in value:
return True
return False
def get_settings(self) -> Dict[str, str]:
settings: Dict[str, str] = {"id": self._id}
# we build these after the rest, because they might depend on their values
for config in self._lean_configs:
if type(config) is InternalInputUserInput:
if config._is_conditional:
for option in config._value_options:
if option._condition.check(self.get_config_value_from_name(option._condition._dependent_config_id)):
config._value = option._value
break
if not config._value:
options_to_log = set([(opt._condition._dependent_config_id,
self.get_config_value_from_name(opt._condition._dependent_config_id))
for opt in config._value_options])
raise ValueError(
f'No condition matched among present options for "{config._id}". '
f'Please review ' +
', '.join([f'"{x[0]}"' for x in options_to_log]) +
f' given value{"s" if len(options_to_log) > 1 else ""} ' +
', '.join([f'"{x[1]}"' for x in options_to_log]))
for configuration in self._lean_configs:
if not self._check_if_config_passes_filters(configuration, all_for_platform_type=False):
continue
if isinstance(configuration, AuthConfiguration) and isinstance(configuration._value, dict):
for key, value in configuration._value.items():
settings[key] = str(value)
else:
# Replace escaped newline characters and backslashes in the configuration value.
# When reading the JSON configuration through Python, newline characters ('\n') and backslashes ('\')
# may be escaped, causing issues in scenarios where these characters are expected to be interpreted
# literally (e.g., file paths, multi-line strings). This replace operation ensures that:
# 1. Escaped newline characters ('\\n') are correctly interpreted as actual newlines ('\n').
# 2. Backslashes ('\\') in file paths are converted to forward slashes ('/'), making paths
# more consistent across different operating systems.
settings[configuration._id] = str(configuration._value).replace("\\n", "\n").replace("\\", "/")
return settings
def get_all_input_configs(self, filters: List[Type[Configuration]] = []) -> List[Configuration]:
return [copy(config) for config in self._lean_configs if config._is_required_from_user
if not isinstance(config, tuple(filters))
and self._check_if_config_passes_filters(config, all_for_platform_type=True)]
def convert_lean_key_to_variable(self, lean_key: str) -> str:
"""Replaces hyphens with underscore to follow python naming convention.
:param lean_key: string that uses hyphnes as separator. Used in lean config
"""
return lean_key.replace('-', '_')
def convert_variable_to_lean_key(self, variable_key: str) -> str:
"""Replaces underscore with hyphens to follow lean config naming convention.
:param variable_key: string that uses underscore as separator as per python convention.
"""
return variable_key.replace('_', '-')
def get_user_name(self, lean_config: Dict[str, Any], configuration, user_provided_options: Dict[str, Any], require_user_name: bool) -> str:
"""Retrieve the user name, prompting the user if required and not already set.
:param lean_config: The Lean config dict to read defaults from.
:param configuration: The AuthConfiguration instance.
:param user_provided_options: Options passed as command-line arguments.
:param require_user_name: Flag to determine if prompting is necessary.
:return: The user name, or None if not required.
"""
if not require_user_name:
return None
from click import prompt
user_name_key = configuration._id.replace("-oauth-token", "") + "-user-name"
user_name_variable = self.convert_lean_key_to_variable(user_name_key)
if user_name_variable in user_provided_options and user_provided_options[user_name_variable]:
return user_provided_options[user_name_variable]
if lean_config and lean_config.get(user_name_key):
return lean_config[user_name_key]
user_name = prompt("Please enter your Login ID to proceed with Auth0 authentication",
show_default=False)
if lean_config is not None:
lean_config[user_name_key] = user_name
return user_name
def get_project_id(self, default_project_id: int, require_project_id: bool) -> int:
"""Retrieve the project ID, prompting the user if required and default is invalid.
:param default_project_id: The default project ID to use.
:param require_project_id: Flag to determine if prompting is necessary.
:return: A valid project ID.
"""
from click import prompt
project_id: int = default_project_id
if require_project_id and project_id <= 0:
project_id = prompt("Please enter any cloud project ID to proceed with Auth0 authentication",
-1, show_default=False)
return project_id
def config_build(self,
lean_config: Dict[str, Any],
logger: Logger,
interactive: bool,
properties: Dict[str, Any] = {},
hide_input: bool = False,
environment_name: str = None,
no_browser: bool = False) -> 'JsonModule':
"""Builds a new instance of this class, prompting the user for input when necessary.
:param lean_config: the Lean configuration dict to read defaults from
:param logger: the logger to use
:param interactive: true if running in interactive mode
:param properties: the properties that passed as options
:param hide_input: whether to hide secrets inputs
:param environment_name: the target environment name
:param no_browser: whether to disable opening the browser
:return: self
"""
logger.debug(f'Configuring {self._display_name}')
# filter properties that were not passed as command line arguments,
# so that we prompt the user for them only when they don't have a value in the Lean config
context = get_current_context()
user_provided_options = {k: v for k, v in properties.items()
if context.get_parameter_source(k) == ParameterSource.COMMANDLINE}
missing_options = []
for configuration in self._lean_configs:
if not self._check_if_config_passes_filters(configuration, all_for_platform_type=False):
continue
if not configuration._is_required_from_user:
continue
# Let's log messages for internal input configurations as well
if configuration._log_message is not None:
log_message = configuration._log_message.strip()
if log_message and log_message not in _logged_messages:
logger.info(log_message)
# make sure we log these messages once, we could use the same module for different functionalities
_logged_messages.add(log_message)
if type(configuration) is InternalInputUserInput:
continue
if isinstance(configuration, ChoiceUserInput) and len(configuration._choices) == 0:
logger.debug(f"skipping configuration '{configuration._id}': no choices available.")
continue
elif isinstance(configuration, AuthConfiguration):
lean_config["project-id"] = self.get_project_id(lean_config["project-id"],
configuration.require_project_id)
logger.debug(f'project_id: {lean_config["project-id"]}')
user_name = self.get_user_name(lean_config, configuration, user_provided_options,
configuration.require_user_name)
logger.debug(f'user_name: {user_name}')
auth_authorizations = get_authorization(container.api_client.auth0, self._display_name.lower(),
logger, lean_config["project-id"], no_browser=no_browser,
user_name=user_name)
logger.debug(f'auth: {auth_authorizations}')
configuration._value = auth_authorizations.get_authorization_config_without_account()
for inner_config in self._lean_configs:
if any(condition._dependent_config_id == configuration._id for condition in
inner_config._filter._conditions):
api_account_ids = auth_authorizations.get_account_ids()
config_dash = inner_config._id.replace('-', '_')
inner_config._choices = api_account_ids
if user_provided_options and config_dash in user_provided_options:
user_provide_account_id = user_provided_options[config_dash]
if (api_account_ids and len(api_account_ids) > 0 and
not any(account_id.lower() == user_provide_account_id.lower()
for account_id in api_account_ids)):
raise ValueError(f"The provided account id '{user_provide_account_id}' is not valid, "
f"available: {api_account_ids}")
existing_account = lean_config.get(inner_config._id)
if existing_account and (existing_account not in api_account_ids
or len(api_account_ids) > 1):
# Clear stale or ambiguous account so the user is prompted
# to select from the current API choices
lean_config.pop(inner_config._id)
break
continue
property_name = self.convert_lean_key_to_variable(configuration._id)
# Only ask for user input if the config wasn't given as an option
if property_name in user_provided_options and user_provided_options[property_name]:
user_choice = user_provided_options[property_name]
logger.debug(
f'JsonModule({self._display_name}): user provided \'{user_choice}\' for \'{property_name}\'')
else:
logger.debug(f'JsonModule({self._display_name}): Configuration not provided \'{configuration._id}\'')
user_choice = self.get_default(lean_config, configuration._id, environment_name, logger)
# There's no value in the lean config, let's use the module default value instead and prompt the user
# NOTE: using "not" instead of "is None" because the default value can be false,
# in which case we still want to prompt the user.
if not user_choice:
if interactive:
default_value = configuration._input_default
user_choice = configuration.ask_user_for_input(default_value, logger, hide_input=hide_input)
if not isinstance(configuration, BrokerageEnvConfiguration):
self._save_property({f"{configuration._id}": user_choice})
else:
if configuration._input_default != None and configuration._optional:
# if optional and we have a default input value and the user didn't provider it we use it
user_choice = configuration._input_default
else:
missing_options.append(f"--{configuration._id}")
configuration._value = user_choice
if len(missing_options) > 0:
raise RuntimeError(f"""You are missing the following option{"s" if len(missing_options) > 1 else ""}: {', '
.join(missing_options)}""".strip())
return self
def get_paths_to_mount(self) -> Dict[str, str]:
return {config._id: config._value
for config in self._lean_configs
if (isinstance(config, PathParameterUserInput)
and self._check_if_config_passes_filters(config, all_for_platform_type=False))}
def ensure_module_installed(self, organization_id: str, module_version: str) -> None:
"""
Ensures that the specified module is installed. If the module is not installed, it will be installed.
Args:
organization_id (str): The ID of the organization where the module should be installed.
module_version (str): The version of the module to install. If not provided,
the latest version will be installed.
Returns:
None
"""
if not self._is_module_installed and self._installs:
container.logger.debug(f"JsonModule.ensure_module_installed(): installing module {self}: {self._product_id}")
container.module_manager.install_module(self._product_id, organization_id, module_version)
self._is_module_installed = True
def get_default(self, lean_config: Dict[str, Any], key: str, environment_name: str, logger: Logger):
user_choice = None
if lean_config is not None:
if (environment_name and "environments" in lean_config and environment_name in lean_config["environments"]
and key in lean_config["environments"][environment_name]):
user_choice = lean_config["environments"][environment_name][key]
logger.debug(f'JsonModule({self._display_name}): found \'{user_choice}\' for \'{key}\', in environment')
elif key in lean_config:
user_choice = lean_config[key]
logger.debug(f'JsonModule({self._display_name}): found \'{user_choice}\' for \'{key}\'')
return user_choice
def __repr__(self):
return self.get_name()
def _save_property(self, settings: Dict[str, Any]):
from lean.container import container
container.lean_config_manager.set_properties(settings)
@property
def specifications_url(self):
return self._specifications_url
class LiveInitialStateInput(str, Enum):
Required = "required"
Optional = "optional"
NotSupported = "not-supported"