-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathdisk_tools.py
More file actions
415 lines (333 loc) · 13.6 KB
/
disk_tools.py
File metadata and controls
415 lines (333 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
#
# Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2023-2025 Huawei Technologies Co., Ltd.
# SPDX-License-Identifier: BSD-3-Clause
#
import posixpath
import re
import time
from enum import Enum
from typing import List
from core.test_run import TestRun
from test_tools.dd import Dd
from test_tools.fs_tools import readlink, parse_ls_output, ls, check_if_directory_exists, \
create_directory, wipefs, is_mounted
from test_tools.udev import Udev
from type_def.size import Size, Unit
SECTOR_SIZE = 512
class PartitionTable(Enum):
msdos = 0
gpt = 1
class PartitionType(Enum):
efi = 0
primary = 1
extended = 2
logical = 3
lvm = 4
msr = 5
swap = 6
standard = 7
unknown = 8
def create_partition_table(device, partition_table_type: PartitionTable = PartitionTable.gpt):
TestRun.LOGGER.info(
f"Creating partition table ({partition_table_type.name}) for device: {device.path}")
cmd = f'parted --script {device.path} mklabel {partition_table_type.name}'
TestRun.executor.run_expect_success(cmd)
device.partition_table = partition_table_type
TestRun.LOGGER.info(
f"Successfully created {partition_table_type.name} "
f"partition table on device: {device.path}")
def get_partition_path(parent_dev, number):
# TODO: change this to be less specific hw dependent (kernel)
if "dev/cas" not in parent_dev:
id_separator = '-part'
else:
id_separator = 'p' # "cas1-1p1"
return f'{parent_dev}{id_separator}{number}'
def discover_partition(device) -> list:
cmd = (
f"lsblk -ln -o NAME,TYPE {device.path} " + r"""| awk '$2 == "part" { print "/dev/" $1 }'"""
)
output = TestRun.executor.run(cmd).stdout
partition_list = [line for line in output.split("\n") if line]
if not partition_list:
return []
device_name = re.search(r"^(/dev/\S+)(?=\d+)", partition_list[0]).group()
if "nvme" in partition_list[0]:
device_name = device_name[:-1]
# needs to be placed here (circular dependency)
from storage_devices.partition import Partition
partitions_on_device_list = []
cmd = f"fdisk -l --bytes {device_name}" + r"| grep -E '^/dev/[a-z]+[0-9]+'"
output = TestRun.executor.run(cmd).stdout.split("\n")
for partition in partition_list:
disk_params = [
x for x in next(param for param in output if partition in param).split(" ") if x
]
partition_number = int(re.search(r"(\d+)(?!.*\d)", disk_params[0]).group())
part_number = int(partition_number)
begin = Size(int(disk_params[1]), Unit.Byte)
end = Size(
(int(disk_params[2])),
Unit.Byte,
)
partitions_on_device_list.append(
Partition(
parent_dev=device,
type=PartitionType.logical,
number=part_number,
begin=begin,
end=end,
)
)
return partitions_on_device_list
def remove_parition(device, part_number):
TestRun.LOGGER.info(f"Removing part {part_number} from {device.path}")
cmd = f'parted --script {device.path} rm {part_number}'
output = TestRun.executor.run(cmd)
if output.exit_code != 0:
TestRun.executor.run_expect_success("partprobe")
def create_partition(
device,
part_size,
part_number,
part_type: PartitionType = PartitionType.primary,
unit=Unit.MebiByte,
aligned: bool = True):
TestRun.LOGGER.info(
f"Creating {part_type.name} partition on device: {device.path}")
begin = get_first_partition_offset(device, aligned)
for part in device.partitions:
begin += part.size
if part.type == PartitionType.logical:
begin += Size(1, Unit.MebiByte if not aligned else device.block_size)
if part_type == PartitionType.logical:
begin += Size(1, Unit.MebiByte if not aligned else device.block_size)
if part_size != Size.zero():
end = (begin + part_size)
end_cmd = f'{end.get_value(unit)}{unit_to_string(unit)}'
else:
end_cmd = '100%'
cmd = f'parted --script {device.path} mkpart ' \
f'{part_type.name} ' \
f'{begin.get_value(unit)}{unit_to_string(unit)} ' \
f'{end_cmd}'
output = TestRun.executor.run(cmd)
if output.exit_code != 0:
TestRun.executor.run_expect_success("partprobe")
TestRun.executor.run_expect_success("udevadm settle")
if not check_partition_after_create(
size=part_size,
part_number=part_number,
parent_dev_path=device.path,
part_type=part_type,
aligned=aligned):
raise Exception("Could not create partition!")
if part_type != PartitionType.extended:
from storage_devices.partition import Partition
new_part = Partition(device,
part_type,
part_number,
begin,
end if type(end) is Size else device.size)
dd = Dd().input("/dev/zero") \
.output(new_part.path) \
.count(1) \
.block_size(Size(1, Unit.Blocks4096)) \
.oflag("direct")
dd.run()
device.partitions.append(new_part)
TestRun.LOGGER.info(f"Successfully created {part_type.name} partition on {device.path}")
def available_disk_size(device):
dev = f"/dev/{device.get_device_id()}"
# get number of device's sectors
disk_sectors = int(TestRun.executor.run(f"fdisk -l {dev} | grep {dev} | grep sectors "
f"| awk '{{print $7 }}' ").stdout)
# get last occupied sector
last_occupied_sector = int(TestRun.executor.run(f"fdisk -l {dev} | grep {dev} "
f"| awk '{{print $3 }}' | tail -1").stdout)
available_disk_sectors = disk_sectors - last_occupied_sector
return Size(available_disk_sectors, Unit(get_block_size(device)))
def create_partitions(device, sizes: [], partition_table_type=PartitionTable.gpt):
create_partition_table(device, partition_table_type)
partition_type = PartitionType.primary
partition_number_offset = 0
msdos_part_max_size = Size(2, Unit.TeraByte)
for s in sizes:
size = Size(
s.get_value(device.block_size) - 1, device.block_size)
if partition_table_type == PartitionTable.msdos and \
len(sizes) > 4 and len(device.partitions) == 3:
if available_disk_size(device) > msdos_part_max_size:
part_size = msdos_part_max_size
else:
part_size = Size.zero()
create_partition(device,
part_size,
4,
PartitionType.extended)
partition_type = PartitionType.logical
partition_number_offset = 1
partition_number = len(device.partitions) + 1 + partition_number_offset
create_partition(device=device,
part_size=size,
part_number=partition_number,
part_type=partition_type,
unit=device.block_size,
aligned=True)
def get_block_size(device):
try:
block_size = float(TestRun.executor.run(
f"cat {get_sysfs_path(device)}/queue/hw_sector_size").stdout)
except ValueError:
block_size = Unit.Blocks512.value
return block_size
def get_size(device):
output = TestRun.executor.run_expect_success(f"cat {get_sysfs_path(device)}/size")
blocks_count = int(output.stdout)
return blocks_count * SECTOR_SIZE
def get_sysfs_path(device):
sysfs_path = f"/sys/class/block/{device}"
if TestRun.executor.run(f"test -d {sysfs_path}").exit_code != 0:
sysfs_path = f"/sys/block/{device}"
return sysfs_path
def get_pci_address(device):
pci_address = TestRun.executor.run(f"cat /sys/block/{device}/device/address").stdout
return pci_address
def check_partition_after_create(size: Size, part_number: int, parent_dev_path: str,
part_type: PartitionType, aligned: bool):
partition_path = get_partition_path(parent_dev_path, part_number)
if "dev/cas" not in partition_path:
cmd = f"find {partition_path} -type l"
else:
cmd = f"find {partition_path}"
output = TestRun.executor.run_expect_success(cmd).stdout
if partition_path not in output:
TestRun.LOGGER.info(
"Partition created, but could not find it in system, trying 'hdparm -z'")
TestRun.executor.run_expect_success(f"hdparm -z {parent_dev_path}")
output_after_hdparm = TestRun.executor.run_expect_success(
f"parted --script {parent_dev_path} print").stdout
TestRun.LOGGER.info(output_after_hdparm)
counter = 0
while partition_path not in output and counter < 10:
time.sleep(2)
output = TestRun.executor.run(cmd).stdout
counter += 1
if len(output.split('\n')) > 1 or partition_path not in output:
return False
if aligned and part_type != PartitionType.extended \
and size.get_value(Unit.Byte) % Unit.Blocks4096.value != 0:
TestRun.LOGGER.warning(
f"Partition {partition_path} is not 4k aligned: {size.get_value(Unit.KibiByte)}KiB")
partition_size = get_size(readlink(partition_path).split('/')[-1])
if part_type == PartitionType.extended or \
partition_size == size.get_value(Unit.Byte):
return True
TestRun.LOGGER.warning(
f"Partition size {partition_size} does not match expected {size.get_value(Unit.Byte)} size."
)
return True
def get_first_partition_offset(device, aligned: bool):
if aligned:
return Size(1, Unit.MebiByte)
# 33 sectors are reserved for the backup GPT
return Size(34, Unit(device.blocksize)) \
if device.partition_table == PartitionTable.gpt else Size(1, device.blocksize)
def remove_partitions(device):
if is_mounted(device.path):
device.unmount()
for partition in device.partitions:
unmount(partition)
TestRun.LOGGER.info(f"Removing partitions from device: {device.path} "
f"({device.get_device_id()}).")
wipefs(device)
Udev.trigger()
Udev.settle()
output = TestRun.executor.run(f"ls {device.path}* -1")
if len(output.stdout.split('\n')) > 1:
TestRun.LOGGER.error(f"Could not remove partitions from device {device.path}")
return False
return True
def mount(device, mount_point, options: [str] = None):
if not check_if_directory_exists(mount_point):
create_directory(mount_point, True)
TestRun.LOGGER.info(f"Mounting device {device.path} ({device.get_device_id()}) "
f"to {mount_point}.")
cmd = f"mount {device.path} {mount_point}"
if options:
cmd = f"{cmd} -o {','.join(options)}"
output = TestRun.executor.run(cmd)
if output.exit_code != 0:
raise Exception(f"Failed to mount {device.path} to {mount_point}")
device.mount_point = mount_point
def unmount(device):
TestRun.LOGGER.info(f"Unmounting device {device.path} ({device.get_device_id()}).")
if device.mount_point is not None:
output = TestRun.executor.run(f"umount {device.mount_point}")
if output.exit_code != 0:
TestRun.LOGGER.error("Could not unmount device.")
return False
return True
else:
TestRun.LOGGER.info("Device is not mounted.")
return True
def unit_to_string(unit):
unit_string = {
Unit.Byte: 'B',
Unit.Blocks512: 's',
Unit.Blocks4096: 's',
Unit.KibiByte: 'KiB',
Unit.MebiByte: 'MiB',
Unit.GibiByte: 'GiB',
Unit.TebiByte: 'TiB',
Unit.KiloByte: 'kB',
Unit.MegaByte: 'MB',
Unit.GigaByte: 'GB',
Unit.TeraByte: 'TB'
}
return unit_string.get(unit, "Invalid unit.")
def check_if_device_supports_trim(device):
if device.get_device_id().startswith("nvme"):
return True
command_output = TestRun.executor.run(f'hdparm -I {device.path} | grep "TRIM supported"')
if command_output.exit_code == 0:
return True
command_output = TestRun.executor.run(
f"lsblk -dn {device.path} -o DISC-MAX | grep -o \'[0-9]\\+\'"
)
return int(command_output.stdout) > 0
def _is_by_id_path(path: str):
"""check if given path already is proper by-id path"""
dev_by_id_dir = "/dev/disk/by-id"
by_id_paths = parse_ls_output(ls(dev_by_id_dir), dev_by_id_dir)
return path in [posixpath.join(dev_by_id_dir, id_path.full_path) for id_path in by_id_paths]
def _is_dev_path_whitelisted(path: str):
"""check if given path is whitelisted"""
whitelisted_paths = [
r"/dev/ram\d+",
r"/nullb\d+",
r"/dev/drbd\d+",
r"cas\d+-\d+",
r"/dev/dm-\d+"
]
for whitelisted_path in whitelisted_paths:
if re.search(whitelisted_path, path) is not None:
return True
return False
def validate_dev_path(path: str):
if not posixpath.isabs(path):
raise ValueError(f'Given path "{path}" is not absolute.')
if _is_dev_path_whitelisted(path):
return path
if _is_by_id_path(path):
return path
raise ValueError(f'By-id device link {path} is broken.')
def get_block_device_names_list(exclude_list: List[int] = None) -> List[str]:
cmd = "lsblk -lo NAME"
if exclude_list is not None:
cmd += f" -e {','.join(str(type_id) for type_id in exclude_list)}"
devices = TestRun.executor.run_expect_success(cmd).stdout
devices_list = devices.splitlines()
devices_list.sort()
return devices_list