-
Notifications
You must be signed in to change notification settings - Fork 158
Expand file tree
/
Copy pathcli.py
More file actions
303 lines (248 loc) · 9.38 KB
/
cli.py
File metadata and controls
303 lines (248 loc) · 9.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
"""
Responsible for managing spec and routing commands to operations.
"""
import contextlib
import json
import os
import pickle
import sys
from json import JSONDecodeError
from sys import version_info
from typing import IO, Any, ContextManager, Dict
import requests
import yaml
from openapi3 import OpenAPI
from linodecli.api_request import do_request, get_all_pages
from linodecli.baked import OpenAPIOperation
from linodecli.configuration import CLIConfig
from linodecli.exit_codes import ExitCodes
from linodecli.output.output_handler import OutputHandler, OutputMode
# HTTP methods looked up (via getattr) on every spec path when baking operations.
METHODS = ("get", "post", "put", "delete")
class CLI:  # pylint: disable=too-many-instance-attributes
    """
    Responsible for loading or baking a spec and handling incoming commands
    """

    def __init__(self, version, base_url, skip_config=False):
        """
        Sets up default state, the output handler and the configuration, then
        attempts to load previously baked operations.

        :param version: The CLI version; reported in the User-Agent.
        :param base_url: The API base URL. Replaced by the baked value when a
                         baked data file that stores one is loaded.
        :param skip_config: Passed through to CLIConfig.
        """
        # Baked operations, keyed by command and then by action.
        self.ops = {}
        # The OpenAPI spec the operations were generated from.
        self.spec = {}
        self.defaults = True  # whether to use default values for arguments
        self.pagination = True
        self.page = 1
        self.page_size = 100
        self.debug_request = False
        self.version = version
        self.base_url = base_url
        self.spec_version = "None"
        self.suppress_warnings = False
        self.output_handler = OutputHandler()
        self.config = CLIConfig(self.base_url, skip_config=skip_config)
        self.load_baked()

    def bake(self, spec_location: str):
        """
        Generates ops and bakes them to a pickle.

        The process exits with ExitCodes.REQUEST_FAILED if the spec cannot be
        loaded. The pickle is written to the file named by _get_data_file(),
        resolved against the current working directory.

        NOTE: load_baked() looks for that file in the directory containing
        this module, so the baked file has to end up there to be picked up.

        :param spec_location: The URL or file path of the OpenAPI spec to parse.
        """
        try:
            spec = self._load_openapi_spec(spec_location)
        except Exception as e:
            print(f"Failed to load spec: {e}")
            sys.exit(ExitCodes.REQUEST_FAILED)

        self.spec = spec
        self.ops = {}

        # Spec extension keys that control how operations are exposed.
        ext = {
            "skip": "linode-cli-skip",
            "action": "linode-cli-action",
            "command": "linode-cli-command",
            "defaults": "linode-cli-allowed-defaults",
        }

        for path in spec.paths.values():
            command = path.extensions.get(ext["command"], "default")

            for m in METHODS:
                operation = getattr(path, m)
                if operation is None or ext["skip"] in operation.extensions:
                    continue

                # Fall back to the operationId when no explicit action is set.
                action = operation.extensions.get(
                    ext["action"], operation.operationId
                )
                if not action:
                    continue

                # When a list of actions is given, the first one is the key.
                if isinstance(action, list):
                    action = action[0]

                self.ops.setdefault(command, {})[action] = OpenAPIOperation(
                    command, operation, m, path.parameters
                )

        # hide the base_url from the spec away
        self.ops["_base_url"] = self.spec.servers[0].url
        self.ops["_spec_version"] = self.spec.info.version
        self.ops["_spec"] = self.spec

        # finish the baking
        data_file = self._get_data_file()
        with open(data_file, "wb") as f:
            pickle.dump(self.ops, f)

    def load_baked(self):
        """
        Loads a baked spec representation from a baked pickle

        The data file is looked up in the directory containing this module.
        The "_base_url", "_spec_version" and "_spec" entries, when present,
        are moved out of the ops dict onto the matching attributes. If the
        file does not exist, a hint is printed and self.ops is set to None.
        """
        data_file = self._get_data_file()
        data_path = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), data_file
        )

        if not os.path.exists(data_path):
            print(
                "No spec baked. Please bake by calling this script as follows:"
            )
            print(" python3 gen_cli.py bake /path/to/spec")
            self.ops = None  # this signals __init__.py to give up
            return

        # SECURITY NOTE: unpickling can execute arbitrary code, so the data
        # file must come from a trusted source (i.e. one produced by bake()).
        with open(data_path, "rb") as f:
            self.ops = pickle.load(f)

        if "_base_url" in self.ops:
            self.base_url = self.ops.pop("_base_url")
        if "_spec_version" in self.ops:
            self.spec_version = self.ops.pop("_spec_version")
        if "_spec" in self.ops:
            self.spec = self.ops.pop("_spec")

    def _get_data_file(self):
        """
        Returns the name of the baked data file this program wants. This is in
        part based on python version (the major version number is used).
        """
        return f"data-{version_info[0]}"

    def handle_command(self, command, action, args):
        """
        Given a command, action, and remaining kwargs, finds and executes the
        action

        If the operation cannot be found, the error is printed to stderr and
        the process exits with ExitCodes.REQUEST_FAILED.
        """
        try:
            operation = self.find_operation(command, action)
        except ValueError as e:
            print(e, file=sys.stderr)
            sys.exit(ExitCodes.REQUEST_FAILED)

        # NOTE(review): every page is fetched when self.pagination is falsy;
        # the flag name reads as inverted -- confirm against the caller.
        if not self.pagination:
            result = get_all_pages(self, operation, args)
        else:
            result = do_request(self, operation, args).json()

        operation.process_response_json(result, self.output_handler)

        # In table mode, tell the user when more pages are available.
        if (
            self.output_handler.mode == OutputMode.table
            and "pages" in result
            and result["pages"] > 1
        ):
            print(
                f"Page {result['page']} of {result['pages']}. "
                "Call with --page [PAGE] to load a different page."
            )

    def configure(self):
        """
        Reconfigure the application
        """
        self.config.configure()

    def call_operation(self, command, action, args=None, filters=None):
        """
        This function is used in plugins to retrieve the result of CLI operations
        in JSON format. This uses the configured user of the CLI.

        :param command: The command the operation belongs to.
        :param action: The exact action name; aliases are not resolved here.
        :param args: The arguments for the operation. Defaults to an empty list.
        :param filters: The X-Filter header to include in the request. This overrides
                        whatever is passed into the command as filters.
        :type filters: dict

        :returns: A tuple of the response status code and the decoded JSON body.
        :raises ValueError: If the command/action pair is unknown.
        """
        if args is None:
            args = []

        if command not in self.ops or action not in self.ops[command]:
            raise ValueError(f"Unknown command/action {command}/{action}")

        operation = self.ops[command][action]

        result = do_request(
            self,
            operation,
            args,
            filter_header=filters,
            skip_error_handling=True,
        )

        return result.status_code, result.json()

    def find_operation(self, command, action):
        """
        Finds the corresponding operation for the given command and action.

        An exact action name takes precedence; otherwise the first operation
        of the command that lists the action among its aliases is returned.

        :raises ValueError: If the command or the action cannot be found.
        """
        if command not in self.ops:
            raise ValueError(f"Command not found: {command}")

        command_dict = self.ops[command]

        if action in command_dict:
            return command_dict[action]

        # Find the matching alias
        for op in command_dict.values():
            if action in op.action_aliases:
                return op

        # Fail if no matching alias was found
        raise ValueError(f"No action {action} for command {command}")

    @property
    def user_agent(self) -> str:
        """
        Returns the User-Agent to use when making API requests.
        """
        return (
            f"linode-cli/{self.version} "
            f"linode-api-docs/{self.spec_version} "
            f"python/{version_info[0]}.{version_info[1]}.{version_info[2]}"
        )

    @staticmethod
    def _load_openapi_spec(spec_location: str) -> "OpenAPI":
        """
        Attempts to load the raw OpenAPI spec (YAML or JSON) at the given location.

        :param spec_location: The location of the OpenAPI spec.
                              This can be a local path or a URL.

        :returns: The OpenAPI object built from the parsed spec.
        """
        with CLI._get_spec_file_reader(spec_location) as f:
            parsed = CLI._parse_spec_file(f)

        return OpenAPI(parsed)

    @staticmethod
    @contextlib.contextmanager
    def _get_spec_file_reader(
        spec_location: str,
    ) -> ContextManager[IO]:
        """
        Returns a reader for an OpenAPI spec file from the given location.

        :param spec_location: The location of the OpenAPI spec.
                              This can be a local path or a URL.

        :returns: A context manager yielding the spec file's reader.
        :raises RuntimeError: If a remote spec does not respond with HTTP 200.
        """
        # Case for local file
        local_path = os.path.expanduser(spec_location)
        if os.path.exists(local_path):
            with open(local_path, "r", encoding="utf-8") as f:
                yield f
            return

        # Case for remote file
        resp = requests.get(spec_location, stream=True, timeout=120)
        try:
            # Checked inside the try so that the streamed connection is
            # released on the failure path as well.
            if resp.status_code != 200:
                raise RuntimeError(f"Failed to GET {spec_location}")

            # We need to access the underlying urllib
            # response here so we can return a reader
            # usable in yaml.safe_load(...) and json.load(...)
            resp.raw.decode_content = True
            yield resp.raw
        finally:
            resp.close()

    @staticmethod
    def _parse_spec_file(reader: IO) -> Dict[str, Any]:
        """
        Parses the given file reader into a dict and returns a dict.

        YAML is attempted first, then JSON.

        :param reader: A reader for a YAML or JSON file.

        :returns: The parsed file.
        :raises ValueError: If the content is neither valid YAML nor valid JSON.
        """
        # Read once up front: a stream can only be consumed a single time, so
        # handing the reader itself to the YAML parser would leave nothing
        # for the JSON fallback to parse.
        content = reader.read()

        errors = []

        try:
            return yaml.safe_load(content)
        except yaml.YAMLError as err:
            errors.append(str(err))

        try:
            return json.loads(content)
        except JSONDecodeError as err:
            errors.append(str(err))

        raise ValueError(f"Failed to parse spec file: {'; '.join(errors)}")