-
Notifications
You must be signed in to change notification settings - Fork 157
Expand file tree
/
Copy pathtest_disk.py
More file actions
205 lines (166 loc) · 4.86 KB
/
test_disk.py
File metadata and controls
205 lines (166 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import json
from tests.integration.helpers import (
BASE_CMDS,
assert_headers_in_lines,
exec_test_command,
get_random_text,
retry_exec_test_command_with_delay,
wait_for_condition,
)
from tests.integration.linodes.fixtures import ( # noqa: F401
linode_instance_disk_tests,
)
from tests.integration.linodes.helpers import (
get_disk_ids,
)
def _get_smallest_disk_id(linode_id):
    """Find the smallest disk on a Linode and return its ID as a string.

    The disk list is fetched by running the ``disks-list`` command with
    ``--json`` through ``exec_test_command``; the entry with the lowest
    ``size`` value (e.g. the swap disk) is selected.
    """
    list_cmd = [*BASE_CMDS["linodes"], "disks-list", linode_id, "--json"]
    raw_output = exec_test_command(list_cmd)

    def by_size(disk):
        return disk["size"]

    tiniest = min(json.loads(raw_output), key=by_size)
    return str(tiniest["id"])
def test_disk_resize_clone_and_create(linode_instance_disk_tests):
    """Resize the smallest disk, clone it, then create a brand-new disk.

    Steps, in order:
      1. Wait until the Linode reports at least two disks.
      2. Pick the smallest disk and wait for its status to be ``ready``.
      3. Resize it to 50 and wait for both ``ready`` status and size ``50``.
      4. Clone the disk and check the text output.
      5. Create a new disk of size 15 with a random label and check the
         text output.
    """
    linode_id = linode_instance_disk_tests
    linodes_cmd = BASE_CMDS["linodes"]
    columns = ("id", "label", "status", "size", "filesystem", "disk_encryption")

    # Ensure disks are available
    def at_least_two_disks():
        return len(get_disk_ids(linode_id=linode_id)) >= 2

    wait_for_condition(10, 120, at_least_two_disks)

    # The smallest disk (swap) is the resize/clone target: the main OS disk
    # holds filesystem data and is too large to shrink to 50 MB.
    target_disk = _get_smallest_disk_id(linode_id)

    def view_column(format_flag):
        # Run disk-view for the target disk and return the single requested
        # column with surrounding whitespace removed.
        output = exec_test_command(
            [
                *linodes_cmd,
                "disk-view",
                linode_id,
                target_disk,
                "--text",
                "--no-headers",
                format_flag,
            ]
        )
        return output.strip()

    def disk_is_ready():
        return view_column("--format=status") == "ready"

    def disk_is_resized():
        return view_column("--format=size") == "50"

    # The disk has to be ready before a resize can be requested
    wait_for_condition(15, 300, disk_is_ready)

    # resize disk
    resize_cmd = [
        *linodes_cmd,
        "disk-resize",
        linode_id,
        target_disk,
        "--size",
        "50",
    ]
    retry_exec_test_command_with_delay(resize_cmd, retries=10, delay=15)

    # Wait for the disk to settle, then confirm the new size was applied
    wait_for_condition(15, 300, disk_is_ready)
    wait_for_condition(15, 300, disk_is_resized)

    # clone disk
    clone_cmd = [*linodes_cmd, "disk-clone", linode_id, target_disk, "--text"]
    clone_output = retry_exec_test_command_with_delay(
        clone_cmd, retries=10, delay=15
    )

    assert_headers_in_lines(list(columns), clone_output.splitlines())
    assert "Copy of" in clone_output
    assert "50" in clone_output

    new_label = get_random_text(5) + "disk"

    # create new disk
    create_cmd = [
        *linodes_cmd,
        "disk-create",
        linode_id,
        "--size",
        "15",
        "--label",
        new_label,
        "--text",
    ]
    create_output = retry_exec_test_command_with_delay(
        create_cmd, retries=3, delay=10
    )

    assert_headers_in_lines(list(columns), create_output.splitlines())
    assert new_label in create_output
    assert "15" in create_output
def test_disk_reset_password(linode_instance_disk_tests):
    """Run ``disk-reset-password`` against the first disk of the Linode.

    The command is issued through ``retry_exec_test_command_with_delay``
    with 3 retries and a delay of 10; its output is not inspected here.
    """
    linode_id = linode_instance_disk_tests
    first_disk = get_disk_ids(linode_id)[0]

    reset_cmd = [
        *BASE_CMDS["linodes"],
        "disk-reset-password",
        linode_id,
        first_disk,
        "--password",
        "ThIsIsRanDomPaSsWoRD",
        "--text",
    ]
    retry_exec_test_command_with_delay(reset_cmd, retries=3, delay=10)
def test_disk_update(linode_instance_disk_tests):
    """Relabel the first disk and check the new label appears in the output.

    A random 5-character prefix plus ``"newdisk"`` is used as the label; the
    ``--text`` output must contain the expected column headers and the label.
    """
    linode_id = linode_instance_disk_tests
    first_disk = get_disk_ids(linode_id)[0]
    new_label = get_random_text(5) + "newdisk"

    update_cmd = [
        *BASE_CMDS["linodes"],
        "disk-update",
        linode_id,
        first_disk,
        "--label",
        new_label,
        "--text",
    ]
    output = retry_exec_test_command_with_delay(
        update_cmd, retries=3, delay=10
    )

    expected_columns = [
        "id",
        "label",
        "status",
        "size",
        "filesystem",
        "disk_encryption",
    ]
    assert_headers_in_lines(expected_columns, output.splitlines())
    assert new_label in output
def test_disks_list(linode_instance_disk_tests):
    """List the Linode's disks and check the text output's column headers."""
    linode_id = linode_instance_disk_tests

    list_cmd = [*BASE_CMDS["linodes"], "disks-list", linode_id, "--text"]
    output = retry_exec_test_command_with_delay(list_cmd)

    expected_columns = [
        "id",
        "label",
        "status",
        "size",
        "filesystem",
        "disk_encryption",
    ]
    assert_headers_in_lines(expected_columns, output.splitlines())