-
Notifications
You must be signed in to change notification settings - Fork 87
Expand file tree
/
Copy pathcli.py
More file actions
executable file
·292 lines (252 loc) · 14 KB
/
cli.py
File metadata and controls
executable file
·292 lines (252 loc) · 14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
import sys
import argparse
import time
import shutil
from .config import app_config
from .lib.logger import logger
from .lib.aws import is_asg_scaled, is_asg_healthy, instance_terminated, get_asg_tag, modify_aws_autoscaling, \
count_all_cluster_instances, save_asg_tags, get_asgs, scale_asg, plan_asgs, terminate_instance_in_asg, delete_asg_tags, plan_asgs_older_nodes
from .lib.k8s import k8s_nodes_count, k8s_nodes_ready, get_k8s_nodes, modify_k8s_autoscaler, get_node_by_instance_id, \
drain_node, delete_node, cordon_node, taint_node
from .lib.exceptions import RollingUpdateException
def validate_cluster_health(asg_name, new_desired_asg_capacity, cluster_name, predictive, health_check_type="regular"):
    """Wait until the ASG and the Kubernetes cluster both pass validation.

    Each attempt sleeps for ``CLUSTER_HEALTH_WAIT`` seconds and then runs, in
    order: ``is_asg_scaled``, ``is_asg_healthy``, ``k8s_nodes_count`` (against
    the value returned by ``count_all_cluster_instances``) and
    ``k8s_nodes_ready``. The first check that fails ends the attempt.

    Args:
        asg_name: name of the auto scaling group being validated.
        new_desired_asg_capacity: capacity passed to ``is_asg_scaled``.
        cluster_name: cluster name passed to ``count_all_cluster_instances``.
        predictive: forwarded as ``predictive=`` to
            ``count_all_cluster_instances``.
        health_check_type: ``"asg"`` selects the "waiting for ASG to scale"
            log wording; any other value selects the generic wording.

    Raises:
        Exception: when no attempt passes within ``CLUSTER_HEALTH_RETRY``
            attempts.
    """
    max_attempts = app_config['CLUSTER_HEALTH_RETRY']
    wait_seconds = app_config['CLUSTER_HEALTH_WAIT']

    if health_check_type == "asg":
        wait_message = f'Waiting for {wait_seconds} seconds for ASG to scale before validating cluster health...'
    else:
        wait_message = f'Waiting for {wait_seconds} seconds before validating cluster health...'

    def first_failure():
        """Return the log message of the first failing check, or None if all pass."""
        # the ASG must have enough instances before instance health is worth checking
        if not is_asg_scaled(asg_name, new_desired_asg_capacity):
            return f'Validation failed for asg {asg_name}. Not enough instances online.'
        if not is_asg_healthy(asg_name):
            return f'Validation failed for asg {asg_name}. Some instances not yet healthy.'
        # the expected k8s node count is derived from the cluster's instances
        expected_node_count = count_all_cluster_instances(cluster_name, predictive=predictive)
        if not k8s_nodes_count(expected_node_count):
            return f"Validation failed for cluster {cluster_name}. Didn't reach expected node count {expected_node_count}."
        if not k8s_nodes_ready():
            return 'Validation failed for cluster. Expected node count reached but nodes are not ready.'
        return None

    attempts_made = 0
    while attempts_made < max_attempts:
        attempts_made += 1
        logger.info(wait_message)
        time.sleep(wait_seconds)

        failure = first_failure()
        if failure is None:
            logger.info('Cluster validation passed. Proceeding with node draining and termination...')
            return
        logger.info(failure)

    logger.info(f'Exiting since ASG healthcheck failed after {max_attempts} attempts')
    raise Exception('ASG healthcheck failed')
def scale_up_asg(cluster_name, asg, count):
    """Scale an ASG up by ``count`` instances and record its state in ASG tags.

    The baseline capacity is the ASG's ``DesiredCapacity`` plus the configured
    ``ASG_OVERSCALE_INSTANCES``. When a desired-state tag already has a value,
    the values stored in the tags are reused and no scaling call is made.
    Otherwise the tags are written and the ASG is scaled to the target, in
    steps of ``BATCH_SIZE`` when that setting is truthy, validating cluster
    health after every step.

    Args:
        cluster_name: cluster name forwarded to ``validate_cluster_health``.
        asg: ASG description; the keys ``MaxSize``, ``DesiredCapacity``,
            ``Tags`` and ``AutoScalingGroupName`` are read.
        count: number of instances to add on top of the baseline capacity.

    Returns:
        A tuple ``(desired_capacity, original_desired_capacity,
        original_max_size)``.
    """
    original_max_size = asg['MaxSize']
    baseline_capacity = asg['DesiredCapacity'] + app_config['ASG_OVERSCALE_INSTANCES']
    target_capacity = baseline_capacity + count
    tags = asg['Tags']
    name = asg['AutoScalingGroupName']

    # remove any stale suspensions from asg that may be present
    modify_aws_autoscaling(name, "resume")

    termination_policy_in_use = app_config['ASG_USE_TERMINATION_POLICY']
    step = app_config['BATCH_SIZE']

    desired_tag = get_asg_tag(tags, app_config["ASG_DESIRED_STATE_TAG"])
    orig_capacity_tag = get_asg_tag(tags, app_config["ASG_ORIG_CAPACITY_TAG"])
    orig_max_tag = get_asg_tag(tags, app_config["ASG_ORIG_MAX_CAPACITY_TAG"])

    def state_from_tags():
        """Return the (desired, original, original max) triple stored in the tags."""
        return (
            int(desired_tag.get('Value')),
            int(orig_capacity_tag.get('Value')),
            int(orig_max_tag.get('Value')),
        )

    # nothing to add: only make sure the tags exist
    if target_capacity == baseline_capacity:
        logger.info(f'Desired and current capacity for {name} are equal. Skipping ASG.')
        if desired_tag.get('Value'):
            logger.info('Found capacity tags on ASG from previous run. Leaving alone.')
            return state_from_tags()
        save_asg_tags(name, app_config["ASG_ORIG_CAPACITY_TAG"], baseline_capacity)
        save_asg_tags(name, app_config["ASG_DESIRED_STATE_TAG"], baseline_capacity)
        save_asg_tags(name, app_config["ASG_ORIG_MAX_CAPACITY_TAG"], original_max_size)
        return baseline_capacity, baseline_capacity, original_max_size

    # True: use ASG's 'DesiredCapacity' to count the instances
    # False: use Instances list to count the instances
    predictive = bool(termination_policy_in_use)

    # a desired-capacity tag from a previous run means: do not scale up again
    if desired_tag.get('Value'):
        logger.info('Found previous desired capacity value tag set on asg from a previous run.')
        logger.info(f'Maintaining previous capacity of {baseline_capacity} to not overscale.')
        # check cluster health before doing anything
        validate_cluster_health(
            name,
            int(desired_tag.get('Value')),
            cluster_name,
            predictive
        )
        return state_from_tags()

    logger.info('No previous capacity value tags set on ASG; setting tags.')
    save_asg_tags(name, app_config["ASG_ORIG_CAPACITY_TAG"], baseline_capacity)
    save_asg_tags(name, app_config["ASG_DESIRED_STATE_TAG"], target_capacity)
    save_asg_tags(name, app_config["ASG_ORIG_MAX_CAPACITY_TAG"], original_max_size)

    previous_capacity = baseline_capacity
    while True:
        if step:
            # advance by one batch, never past the target
            next_capacity = previous_capacity + step
            if next_capacity >= target_capacity:
                next_capacity = target_capacity
        else:
            # no batching configured: go straight to the target
            next_capacity = target_capacity

        # only change the max size if the new capacity is bigger than current max
        new_max_size = next_capacity if next_capacity > original_max_size else original_max_size
        scale_asg(name, previous_capacity, next_capacity, new_max_size)

        # check cluster health before doing anything
        validate_cluster_health(
            name,
            next_capacity,
            cluster_name,
            predictive,
            health_check_type="asg"
        )

        if next_capacity == target_capacity:
            break
        previous_capacity = next_capacity

    logger.info('Proceeding with node draining and termination...')
    return target_capacity, baseline_capacity, original_max_size
def _cordon_or_taint_instances(k8s_nodes, outdated_instances):
    """Call cordon_node (or taint_node when TAINT_NODES is set) for each outdated instance.

    Any failure is logged and terminates the process with exit status 1.
    """
    for outdated in outdated_instances:
        node_name = ""
        try:
            # get the k8s node name instead of instance id
            node_name = get_node_by_instance_id(k8s_nodes, outdated['InstanceId'])
            if not app_config["TAINT_NODES"]:
                cordon_node(node_name)
            else:
                taint_node(node_name)
        except Exception as exception:
            logger.error(f"Encountered an error when adding taint/cordoning node {node_name}")
            logger.error(exception)
            # sys.exit instead of the site-provided exit(), which is not
            # guaranteed to exist in every interpreter configuration
            sys.exit(1)


def _drain_and_terminate_instances(asg_name, k8s_nodes, outdated_instances, desired_asg_capacity,
                                   use_asg_termination_policy):
    """Drain and delete the node of each outdated instance, terminating it when required.

    After each node the desired-state tag on the ASG is updated with the
    decremented capacity. Any failure is re-raised as RollingUpdateException.
    """
    for outdated in outdated_instances:
        # catch any failures so the caller can report that autoscaling needs resuming
        try:
            # get the k8s node name instead of instance id
            node_name = get_node_by_instance_id(k8s_nodes, outdated['InstanceId'])
            desired_asg_capacity -= 1
            drain_node(node_name)
            delete_node(node_name)
            save_asg_tags(asg_name, app_config["ASG_DESIRED_STATE_TAG"], desired_asg_capacity)
            # terminate/detach outdated instances only if ASG termination policy is ignored
            if not use_asg_termination_policy:
                terminate_instance_in_asg(outdated['InstanceId'])
                if not instance_terminated(outdated['InstanceId']):
                    raise Exception('Instance is failing to terminate. Cancelling out.')
            between_nodes_wait = app_config['BETWEEN_NODES_WAIT']
            if between_nodes_wait != 0:
                logger.info(f'Waiting for {between_nodes_wait} seconds before continuing...')
                time.sleep(between_nodes_wait)
        except Exception as drain_exception:
            logger.info(drain_exception)
            # chain the original error so its traceback is not lost
            raise RollingUpdateException("Rolling update on ASG failed", asg_name) from drain_exception


def update_asgs(asgs, cluster_name):
    """Perform the rolling update on every ASG that has outdated instances.

    The order of operations depends on ``RUN_MODE``:

    * 1 and 4: per ASG, scale up, cordon/taint its outdated nodes, then drain.
      Mode 4 plans with ``plan_asgs_older_nodes`` instead of ``plan_asgs``.
    * 2: scale up all ASGs, then cordon/taint all outdated nodes, then drain
      ASG by ASG.
    * 3: cordon/taint all outdated nodes first, then per ASG scale up and
      drain.

    After draining, each ASG is scaled back to the state recorded by
    ``scale_up_asg`` and the bookkeeping tags are removed.

    Args:
        asgs: ASG descriptions passed to the planning function.
        cluster_name: cluster name forwarded to ``scale_up_asg``.

    Raises:
        RollingUpdateException: when draining or terminating a node fails.
        SystemExit: with status 1 when cordoning/tainting a node fails.
    """
    run_mode = app_config['RUN_MODE']
    use_asg_termination_policy = app_config['ASG_USE_TERMINATION_POLICY']

    if run_mode == 4:
        asg_outdated_instance_dict = plan_asgs_older_nodes(asgs)
    else:
        asg_outdated_instance_dict = plan_asgs(asgs)

    # asg name -> (desired capacity, original desired capacity, original max size)
    asg_state_dict = {}

    if run_mode == 2:
        # Scale up all the ASGs with outdated nodes (by the number of outdated nodes)
        for asg_name, asg_tuple in asg_outdated_instance_dict.items():
            outdated_instances, asg = asg_tuple
            outdated_instance_count = len(outdated_instances)
            logger.info(
                f'Setting the scale of ASG {asg_name} based on {outdated_instance_count} outdated instances.')
            asg_state_dict[asg_name] = scale_up_asg(cluster_name, asg, outdated_instance_count)

    k8s_nodes = get_k8s_nodes()

    if (run_mode == 2) or (run_mode == 3):
        # these modes cordon/taint every outdated node up front
        for asg_name, asg_tuple in asg_outdated_instance_dict.items():
            outdated_instances, asg = asg_tuple
            _cordon_or_taint_instances(k8s_nodes, outdated_instances)

    # Drain, Delete and Terminate the outdated nodes and return the ASGs back to their original state
    for asg_name, asg_tuple in asg_outdated_instance_dict.items():
        outdated_instances, asg = asg_tuple
        outdated_instance_count = len(outdated_instances)

        if (run_mode == 1) or (run_mode == 3) or (run_mode == 4):
            logger.info(
                f'Setting the scale of ASG {asg_name} based on {outdated_instance_count} outdated instances.')
            asg_state_dict[asg_name] = scale_up_asg(cluster_name, asg, outdated_instance_count)

        if (run_mode == 1) or (run_mode == 4):
            _cordon_or_taint_instances(k8s_nodes, outdated_instances)

        # NOTE(review): the block nesting was reconstructed from a source that had
        # lost its indentation; only the "suspend" call is treated as guarded by
        # the non-empty check - confirm against the canonical file.
        if outdated_instances:
            # if ASG termination is ignored then suspend 'Launch' and 'ReplaceUnhealthy'
            # for this ASG to avoid instances being spawned during terminate/detach phase
            if not use_asg_termination_policy:
                modify_aws_autoscaling(asg_name, "suspend")

        # start draining and terminating
        desired_asg_capacity = asg_state_dict[asg_name][0]
        _drain_and_terminate_instances(
            asg_name,
            k8s_nodes,
            outdated_instances,
            desired_asg_capacity,
            use_asg_termination_policy,
        )

        # scaling cluster back down
        logger.info("Scaling asg back down to original state")
        asg_desired_capacity, asg_orig_desired_capacity, asg_orig_max_capacity = asg_state_dict[asg_name]
        scale_asg(asg_name, asg_desired_capacity, asg_orig_desired_capacity, asg_orig_max_capacity)

        # resume aws autoscaling only if ASG termination policy is ignored
        if not use_asg_termination_policy:
            modify_aws_autoscaling(asg_name, "resume")

        # remove aws tag
        delete_asg_tags(asg_name, app_config["ASG_DESIRED_STATE_TAG"])
        delete_asg_tags(asg_name, app_config["ASG_ORIG_CAPACITY_TAG"])
        delete_asg_tags(asg_name, app_config["ASG_ORIG_MAX_CAPACITY_TAG"])
        logger.info(f'*** Rolling update of asg {asg_name} is complete! ***')

    logger.info('All asgs processed')
def main(args=None):
    """Command-line entry point for the rolling update.

    Parses the arguments, checks that ``kubectl`` is on the PATH, and then
    either prints a plan (``--plan`` or the ``DRY_RUN`` setting) or performs
    the update via ``update_asgs``. When ``K8S_AUTOSCALER_ENABLED`` is set the
    Kubernetes autoscaler is paused before the update and resumed after a
    successful one; after a failure it is left paused and a log message says
    it needs resuming manually.

    Args:
        args: list of argument strings; ``None`` lets argparse read
            ``sys.argv``.

    Raises:
        SystemExit: status 1 when ``kubectl`` is missing or the update fails;
            argparse's own exit on invalid arguments or ``--help``.
    """
    parser = argparse.ArgumentParser(description='Rolling update on cluster')
    parser.add_argument('--cluster_name', '-c', required=True,
                        help='the cluster name to perform rolling update on')
    parser.add_argument('--plan', '-p', action='store_const', const=True,
                        help='perform a dry run to see which instances are out of date')
    args = parser.parse_args(args)

    # check kubectl is installed
    kctl = shutil.which('kubectl')
    if not kctl:
        logger.info('kubectl is required to be installed before proceeding')
        # sys.exit rather than quit(): quit is an interactive helper injected by
        # the site module and is not guaranteed to be available
        sys.exit(1)

    filtered_asgs = get_asgs(args.cluster_name)
    run_mode = app_config['RUN_MODE']

    if args.plan or app_config['DRY_RUN']:
        if run_mode == 4:
            # perform a dry run on mode 4 for older nodes
            plan_asgs_older_nodes(filtered_asgs)
        else:
            # perform a dry run on main mode
            plan_asgs(filtered_asgs)
    else:
        # perform real update
        if app_config['K8S_AUTOSCALER_ENABLED']:
            # pause k8s autoscaler
            modify_k8s_autoscaler("pause")
        try:
            update_asgs(filtered_asgs, args.cluster_name)
            if app_config['K8S_AUTOSCALER_ENABLED']:
                # resume autoscaler after asg updated
                modify_k8s_autoscaler("resume")
            logger.info('*** Rolling update of all asg is complete! ***')
        except Exception as e:
            logger.error(e)
            logger.error('*** Rolling update of ASG has failed. Exiting ***')
            logger.error('AWS Auto Scaling Group processes will need resuming manually')
            if app_config['K8S_AUTOSCALER_ENABLED']:
                logger.error('Kubernetes Cluster Autoscaler will need resuming manually')
            sys.exit(1)