-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathfunction_app.py
More file actions
188 lines (143 loc) · 6.31 KB
/
function_app.py
File metadata and controls
188 lines (143 loc) · 6.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import requests
import logging
import os
import socket
import azure.functions as func
from azure.identity import ManagedIdentityCredential
# version - 0.11

# Function app object; the timer-triggered functions below register
# themselves on it through its decorators.
app = func.FunctionApp()

# Base URL of the backend API, read from the API_URL environment variable.
# os.getenv returns None when the variable is unset; each trigger checks for
# that before building a request URL.
API_BASE_URL = os.getenv("API_URL")

# Client ID used to build the token scope for the backend API.
API_CLIENT_ID = os.getenv("API_CLIENT_ID")

# NOTE: when API_CLIENT_ID is unset this evaluates to the literal string
# "api://None", because the f-string formats None.
API_APP_URI = f"api://{API_CLIENT_ID}"

# Credential shared by every invocation; get_access_token() asks it for
# tokens scoped to API_APP_URI.
credential = ManagedIdentityCredential()
def get_access_token():
    """Return a bearer token string for the backend API, or None on failure.

    The token is requested from the module-level managed-identity credential
    for the scope "<API_APP_URI>/.default".

    Returns:
        The access token string, or None when API_CLIENT_ID is not configured
        or the credential raises while acquiring the token. Failures are
        logged rather than raised so callers can simply skip their work.
    """
    if not API_CLIENT_ID:
        # Without a client ID the scope would be "api://None/.default", which
        # fails later with an opaque identity error. Report the real cause,
        # in the same way the triggers report a missing API_URL.
        logging.error("API_CLIENT_ID not set in environment variables.")
        return None

    try:
        token = credential.get_token(f"{API_APP_URI}/.default")
        return token.token
    except Exception as e:
        # Best-effort boundary: log with traceback and signal failure via None.
        logging.exception("Error obtaining access token: %s", e)
        return None
def get_headers():
    """Build the HTTP headers used to authenticate against the backend API.

    Returns:
        A dict holding a single 'Authorization' bearer header, or None (after
        logging an error) when get_access_token() yields no token.
    """
    bearer = get_access_token()
    if bearer:
        return {'Authorization': f'Bearer {bearer}'}

    logging.error("Could not obtain access token.")
    return None
# ===============================
# VM Management Tasks
# ===============================
@app.function_name(name="ReturnReleasedVMs")
@app.timer_trigger(schedule="0 * * * * *",  # Every minute
                   arg_name="mytimer", run_on_startup=True)
def trigger_return_released_vms(mytimer: func.TimerRequest) -> None:
    """Ask the backend API to check for released VMs and return them.

    Sends an authenticated POST to "<API_BASE_URL>/vms/released" and logs the
    outcome. Nothing is raised to the Functions host: configuration problems,
    token failures, non-200 responses and request errors are all logged.

    NOTE(review): run_on_startup=True also fires this function whenever the
    host starts; confirm that is intended outside local development.

    Args:
        mytimer: Timer context supplied by the Functions runtime; only its
            past_due flag is read.
    """
    logging.info('Running scheduled check for released VMs to return.')
    if mytimer.past_due:
        logging.info('The timer is past due!')

    try:
        if not API_BASE_URL:
            logging.error("API_URL not set in environment variables.")
            return

        headers = get_headers()
        if headers is None:
            return

        check_and_return_url = f"{API_BASE_URL}/vms/released"

        # (connect, read) timeout in seconds. requests waits forever by
        # default, so a stalled API would otherwise hold this invocation open.
        # TODO(review): tune if the endpoint legitimately takes longer.
        response = requests.post(
            check_and_return_url, headers=headers, timeout=(10, 60)
        )

        if response.status_code == 200:
            logging.info("Check and return logic triggered successfully.")
        else:
            logging.error(
                f"Failed to trigger check and return logic. "
                f"Status code: {response.status_code}. Response: {response.text}"
            )
    except Exception as e:
        # Top-level boundary for the trigger: log with traceback, never raise.
        logging.exception(
            "Error executing time-triggered check and return logic: %s", e
        )
def _is_tcp_port_open(ip_address, port=22, timeout=3):
    """Return True if a TCP connection to ip_address:port opens within timeout seconds."""
    try:
        with socket.create_connection((ip_address, port), timeout=timeout):
            return True
    except OSError:
        # socket.timeout and socket.error are both OSError (sub)classes, so
        # this covers refused connections, timeouts and resolution failures.
        return False


@app.function_name(name="TestVMConnectivity")
@app.timer_trigger(schedule="0 0 * * * *",  # Every hour at the top of the hour
                   arg_name="mytimer", run_on_startup=True)
def test_vm_connectivity(mytimer: func.TimerRequest) -> None:
    """Probe port 22 on every VM known to the API and report the result.

    Fetches the VM list from "<API_BASE_URL>/vms", attempts a TCP connection
    to port 22 on each VM's 'IPAddress', and posts 'Reachable' or
    'Unreachable' as "networkstatus" to
    "<API_BASE_URL>/vms/<VMID>/update-attributes".

    Records without an IP address or without a VMID are skipped with a
    warning. A failed update request for one VM is logged and does not stop
    the remaining VMs from being processed. Nothing is raised to the host.

    NOTE(review): run_on_startup=True also fires this function whenever the
    host starts; confirm that is intended outside local development.

    Args:
        mytimer: Timer context supplied by the Functions runtime; only its
            past_due flag is read.
    """
    logging.info('TestVMConnectivity function started.')
    if mytimer.past_due:
        logging.info('The timer is past due!')

    try:
        if not API_BASE_URL:
            logging.error("API_URL not set in environment variables.")
            return

        headers = get_headers()
        if headers is None:
            return

        # (connect, read) timeout in seconds. requests waits forever by
        # default, so a stalled API would otherwise hold this invocation open.
        # TODO(review): tune if the endpoints legitimately take longer.
        request_timeout = (10, 60)

        # Fetch all VMs from the API.
        get_all_vms_url = f"{API_BASE_URL}/vms"
        response = requests.get(
            get_all_vms_url, headers=headers, timeout=request_timeout
        )
        if response.status_code != 200:
            logging.error(f"Failed to retrieve VMs. Status code: {response.status_code}")
            return

        vms = response.json()

        for vm in vms:
            vm_id = vm.get('VMID')
            ip_address = vm.get('IPAddress')

            if not ip_address:
                logging.warning(f"No IP address found for VMID: {vm_id}")
                continue

            if vm_id is None:
                # Without an ID the update URL would be ".../vms/None/...".
                logging.warning(
                    f"No VMID found for VM with IP address: {ip_address}; skipping."
                )
                continue

            # Test connectivity by opening a TCP connection to port 22.
            try:
                alive = _is_tcp_port_open(ip_address, port=22, timeout=3)
                network_status = 'Reachable' if alive else 'Unreachable'
                logging.info(f"VMID: {vm_id}, IP Address: {ip_address}, Network Status: {network_status}")
            except Exception as e:
                # Anything unexpected (e.g. a malformed address value) is
                # treated as unreachable rather than aborting the run.
                logging.error(f"Error testing connectivity for VMID: {vm_id}: {str(e)}")
                network_status = 'Unreachable'

            # Prepare the data for updating VM attributes
            update_data = {
                "powerstate": None,  # No change to PowerState
                "networkstatus": network_status,  # Update NetworkStatus
                "vmstatus": None,  # No change to VmStatus
            }

            update_vm_url = f"{API_BASE_URL}/vms/{vm_id}/update-attributes"

            try:
                response = requests.post(
                    update_vm_url,
                    headers=headers,
                    json=update_data,
                    timeout=request_timeout,
                )
            except requests.RequestException as e:
                # Keep going: one failed update should not skip the other VMs.
                logging.error(f"Failed to update network status for VMID: {vm_id}: {e}")
                continue

            if response.status_code == 200:
                logging.info(f"Updated network status for VMID: {vm_id} to {network_status}.")
            else:
                logging.error(f"Failed to update network status for VMID: {vm_id}. Status code: {response.status_code}")

        logging.info('TestVMConnectivity function completed.')
    except Exception as e:
        # Top-level boundary for the trigger: log with traceback, never raise.
        logging.exception("Error executing TestVMConnectivity function: %s", e)
# ===============================
# Scaling Tasks
# ===============================
# Scaling logic triggered every 5 minutes
@app.function_name(name="ScalingVMs")
@app.timer_trigger(schedule="0 */5 * * * *",  # Every 5 minutes
                   arg_name="mytimer",
                   run_on_startup=True)
def time_triggered_scaling(mytimer: func.TimerRequest) -> None:
    """Ask the backend API to run its scaling logic.

    Sends an authenticated POST to "<API_BASE_URL>/scaling/trigger" and logs
    the outcome. Nothing is raised to the Functions host: configuration
    problems, token failures, non-200 responses and request errors are all
    logged.

    NOTE(review): run_on_startup=True also fires this function whenever the
    host starts; confirm that is intended outside local development.

    Args:
        mytimer: Timer context supplied by the Functions runtime; only its
            past_due flag is read.
    """
    logging.info('Time-triggered scaling logic execution started.')
    if mytimer.past_due:
        logging.info('The timer is past due!')

    try:
        if not API_BASE_URL:
            logging.error("API_URL not set in environment variables.")
            return

        headers = get_headers()
        if headers is None:
            return

        scaling_api_url = f"{API_BASE_URL}/scaling/trigger"

        # (connect, read) timeout in seconds. requests waits forever by
        # default, so a stalled API would otherwise hold this invocation open.
        # TODO(review): tune if the endpoint legitimately takes longer.
        response = requests.post(
            scaling_api_url, headers=headers, timeout=(10, 60)
        )

        if response.status_code == 200:
            logging.info("Scaling logic triggered successfully.")
        else:
            logging.error(
                f"Failed to trigger scaling logic. "
                f"Status code: {response.status_code}. Response: {response.text}"
            )
    except Exception as e:
        # Top-level boundary for the trigger: log with traceback, never raise.
        logging.exception(
            "Error executing time-triggered scaling logic: %s", e
        )