-
Notifications
You must be signed in to change notification settings - Fork 158
Expand file tree
/
Copy pathget-kubeconfig.py
More file actions
186 lines (150 loc) · 5.58 KB
/
get-kubeconfig.py
File metadata and controls
186 lines (150 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
"""
This plugin merges an LKE Cluster's kubeconfig into the kubeconfig file at a desired location.
Usage:
linode-cli get-kubeconfig --id <cluster-id> --kubeconfig /path/to/config/file
"""
import argparse
import base64
import sys
from pathlib import Path
import yaml
from linodecli.exit_codes import ExitCodes
from linodecli.help_formatter import SortingHelpFormatter
from linodecli.plugins import inherit_plugin_args
# Program name handed to argparse when building this plugin's argument parser
PLUGIN_BASE = "linode-cli get-kubeconfig"
def call(args, context):
    """
    The entrypoint for this plugin.

    Fetches the kubeconfig of the LKE cluster selected with --id or --label,
    merges it with the kubeconfig file given by --kubeconfig (if that file
    exists), and either writes the result back to that file or, with
    --dry-run, prints it instead.

    :param args: The command-line arguments passed to this plugin.
    :param context: The plugin context; its `client` is used to call the API.

    Exits with ExitCodes.KUBECONFIG_ERROR if neither --id nor --label is
    provided or if the kubeconfig cannot be retrieved.
    """
    default_kubeconfig = "~/.kube/config"

    parser = inherit_plugin_args(
        argparse.ArgumentParser(
            PLUGIN_BASE, add_help=True, formatter_class=SortingHelpFormatter
        )
    )

    group = parser.add_mutually_exclusive_group()
    parser.add_argument(
        "--kubeconfig",
        metavar="KUBECONFIG",
        nargs="?",
        help="Path to kubeconfig file. If omitted, ~/.kube/config will be used.",
        default=default_kubeconfig,
        # With nargs="?", a bare `--kubeconfig` (flag without a value) yields
        # `const`, which would otherwise be None and crash Path() below.
        const=default_kubeconfig,
    )
    parser.add_argument(
        "--dry-run",
        default=False,
        const=True,
        nargs="?",
        help="Print resulting merged kubeconfig instead of merging.",
    )
    group.add_argument(
        "--label",
        metavar="LABEL",
        nargs="?",
        help="Label for desired cluster. If omitted, the ID must be provided.",
    )
    group.add_argument(
        "--id",
        metavar="ID",
        nargs="?",
        help="ID for desired cluster. If omitted, the Label must be provided.",
    )

    parsed = parser.parse_args(args)

    # If --id was used, fetch the kubeconfig using the provided id
    if parsed.id:
        code, kubeconfig = context.client.call_operation(
            "lke", "kubeconfig-view", args=[parsed.id]
        )

        if code != 200:
            print(f"Error retrieving kubeconfig: {code}", file=sys.stderr)
            sys.exit(ExitCodes.KUBECONFIG_ERROR)

    # If --label was used, fetch the kubeconfig using the provided label
    elif parsed.label:
        kubeconfig = _get_kubeconfig_by_label(parsed.label, context.client)
    else:
        print("Either --label or --id must be used.", file=sys.stderr)
        sys.exit(ExitCodes.KUBECONFIG_ERROR)

    # The API response carries the kubeconfig as base64-encoded YAML
    cluster_config = yaml.safe_load(
        base64.b64decode(kubeconfig["kubeconfig"]).decode()
    )

    # Load the current kubeconfig, if one exists at the target path
    current_config = None
    kubeconfig_path = Path(parsed.kubeconfig).expanduser()
    if kubeconfig_path.exists():
        current_config = _load_config(kubeconfig_path)

    # If there is no current kubeconfig, use the cluster config unchanged.
    # If there is a current kubeconfig, merge it with the cluster's kubeconfig;
    # the current config is passed first so its values are preferred.
    if current_config is not None:
        cluster_config = _merge_dict(current_config, cluster_config)

    if parsed.dry_run:
        print(yaml.dump(cluster_config))
    else:
        _dump_config(kubeconfig_path, cluster_config)
# Fetches the kubeconfig of the lke cluster with the specified label
def _get_kubeconfig_by_label(cluster_label, client):
    """
    Returns the kubeconfig response for the LKE Cluster with the given Label.

    The cluster list is requested with the label passed as `--label`; the
    first returned cluster's ID is then used to request its kubeconfig.

    Prints an error and exits with ExitCodes.KUBECONFIG_ERROR if either
    request does not return 200 or if no cluster is returned.
    """

    def fail(message):
        # Shared error exit: report on stderr, then stop the plugin
        print(message, file=sys.stderr)
        sys.exit(ExitCodes.KUBECONFIG_ERROR)

    list_status, listing = client.call_operation(
        "lke", "clusters-list", args=["--label", cluster_label]
    )
    if list_status != 200:
        fail(f"Error retrieving cluster: {list_status}")

    matches = listing["data"]
    if len(matches) == 0:
        fail(f"Cluster with label {cluster_label} does not exist.")

    # The operation arguments are CLI-style strings, so the ID is stringified
    cluster_id = str(matches[0]["id"])
    view_status, kubeconfig = client.call_operation(
        "lke", "kubeconfig-view", args=[cluster_id]
    )
    if view_status != 200:
        fail(f"Error retrieving kubeconfig: {view_status}")

    return kubeconfig
# Loads a yaml file
def _load_config(filepath):
    """
    Parses the YAML file at the given path and returns the parsed data.

    Prints an error and exits with ExitCodes.KUBECONFIG_ERROR if the file
    parses to an empty (falsy) value.
    """
    with open(filepath, "r", encoding="utf-8") as config_file:
        contents = yaml.safe_load(config_file)

    if contents:
        return contents

    # Nothing usable was parsed out of the file
    print(f"Could not load file at {filepath}", file=sys.stderr)
    sys.exit(ExitCodes.KUBECONFIG_ERROR)
# Dumps data to a yaml file
def _dump_config(filepath, data):
    """
    Writes the given data as YAML to the given path, creating any missing
    parent directories first.

    :param filepath: A pathlib.Path for the file to write.
    :param data: The object to serialize with yaml.dump.
    """
    # parents=True so a target whose directory tree is missing more than one
    # level does not fail with FileNotFoundError.
    filepath.parent.mkdir(parents=True, exist_ok=True)

    with open(filepath, "w", encoding="utf-8") as file_descriptor:
        yaml.dump(data, file_descriptor)
def _merge_dict(dict_1, dict_2):
    """
    Merges two dicts:
    * Lists that are present in both dicts are merged together by their "name" key
    (preferring duplicate values in the first dict)
    * `None` or missing keys in the first dict are set to the second dict's value
    * Other values are preferred from the first dict

    Neither input dict is modified. Keys in the result follow dict_1's order,
    followed by the keys found only in dict_2, in dict_2's order.
    """

    # Return a new dict to prevent any accidental mutations
    result = {}

    for key, dict_1_value in dict_1.items():
        dict_2_value = dict_2.get(key)

        if not dict_2_value:
            # Nothing to take from dict_2 (missing, None, or empty)
            result[key] = dict_1_value
        elif dict_1_value is None:
            # Replace null value in previous config
            result[key] = dict_2_value
        elif isinstance(dict_1_value, list):
            merge_map = {sub["name"]: sub for sub in dict_1_value}

            # Only add entries whose name dict_1 does not already define
            for list_2_item in dict_2_value:
                merge_map.setdefault(list_2_item["name"], list_2_item)

            # Convert back to a list
            result[key] = list(merge_map.values())
        else:
            result[key] = dict_1_value

    # Process keys missing in dict_1. Iterating dict_2 directly (rather than a
    # set difference) keeps the resulting key order deterministic.
    for key, dict_2_value in dict_2.items():
        if key not in dict_1:
            result[key] = dict_2_value

    return result