-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathipctool_test.py
More file actions
executable file
·126 lines (94 loc) · 3.67 KB
/
ipctool_test.py
File metadata and controls
executable file
·126 lines (94 loc) · 3.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3
import pytest
from subprocess import Popen, PIPE
import os
import yaml
from telnet import Telnet
import tasmota
def pytest_generate_tests(metafunc):
    """Parametrize the ``test_case`` fixture from a per-test YAML file.

    For a test function ``test_foo`` that requests ``test_case``, the cases are
    loaded from ``test_foo.yaml`` in the same directory as the test module.
    Every case is a mapping; an optional ``marks`` list names pytest markers
    to apply, and an optional ``id`` overrides the id that is otherwise
    generated from the case's ``hostname`` value.

    Args:
        metafunc: The pytest ``Metafunc`` object for the function being collected.

    Raises:
        ValueError: If the YAML file contains no test cases.
        KeyError: If a case has neither ``id`` nor ``hostname``.
    """

    def generate_id(input_data, level):
        """Return a readable id fragment for ``input_data``, or None if unsupported."""
        level += 1

        # Choose how it will look
        INDENTS = {
            # level: (levelmark, addition_indent)
            1: ("_", ["", ""]),
            2: ("-", ["[", "]"]),
        }
        COMMON_INDENT = ("-", ["[", "]"])
        levelmark, additional_indent = INDENTS.get(level, COMMON_INDENT)

        # Deeper than 3 levels: use the data type name as the id
        if level > 3:
            return (
                additional_indent[0] + type(input_data).__name__ + additional_indent[1]
            )

        # Return trivial data types
        if isinstance(input_data, (str, bool, float, int)):
            return str(input_data)

        # Parse collection types
        if isinstance(input_data, (list, set, tuple)):
            parts = []
            for input_value in input_data:
                part = generate_id(input_value, level=level)
                # Unsupported element types yield None, which str.join rejects;
                # fall back to the element's type name instead of crashing.
                if part is None:
                    part = type(input_value).__name__
                parts.append(part)
            return additional_indent[0] + levelmark.join(parts) + additional_indent[1]

        # Convert dictionary keys to string (keys may be non-str, e.g. ints)
        if isinstance(input_data, dict):
            return "{" + levelmark.join(str(key) for key in input_data) + "}"

        # Unsupported type: an id of None lets pytest generate its own id
        return None

    if "test_case" not in metafunc.fixturenames:
        return None

    dir_path = os.path.dirname(os.path.abspath(metafunc.module.__file__))
    file_path = os.path.join(dir_path, metafunc.function.__name__ + ".yaml")

    # NOTE: yaml.full_load can build more than plain data types; it is only
    # acceptable here because the file is a fixture shipped with the tests.
    with open(file_path, encoding="utf-8") as f:
        raw_test_cases = yaml.full_load(f)

    if not raw_test_cases:
        raise ValueError("Test cases not loaded")

    test_cases = []
    # Traverse raw data
    for test_case in raw_test_cases:
        # Search list of markers
        marks = [getattr(pytest.mark, name) for name in test_case.get("marks", [])]
        # Get the specified id, or generate one only when it is missing, so a
        # case with an explicit id does not also need a "hostname" key.
        if "id" in test_case:
            case_id = test_case["id"]
        else:
            case_id = generate_id(test_case["hostname"], level=0)
        # Add test to ready cases
        test_cases.append(pytest.param(test_case, marks=marks, id=case_id))

    return metafunc.parametrize("test_case", test_cases)
def do_test(test_case):
    """Run the ipctool build on a target host and compare its output.

    The binary name is ``ipctool-<SHA>``, where ``SHA`` comes from the
    environment. Over telnet the binary is run on the target through the
    ``/tmp/uget`` helper (uploaded first if absent); over ssh it is downloaded
    with ``wget`` into ``/tmp``, run, and removed.

    Args:
        test_case: Mapping with ``hostname`` and ``output`` (the expected list
            of output lines). Optional keys: ``connection`` (``"ssh"`` by
            default, or ``"telnet"``) and, for telnet, ``proxy`` and
            ``proxy_type``.

    Raises:
        KeyError: If ``hostname``/``output`` is missing or ``SHA`` is not set
            in the environment.
        ValueError: If ``connection`` is neither ``"ssh"`` nor ``"telnet"``.
        AssertionError: If the produced output differs from ``output``.
    """
    host = test_case["hostname"]
    expected = test_case["output"]
    connection = test_case.get("connection", "ssh")
    # An unknown connection type used to match no branch and let the test
    # pass without checking anything; reject it explicitly instead.
    if connection not in ("telnet", "ssh"):
        raise ValueError("Unsupported connection type: {!r}".format(connection))

    commit_sha = os.environ["SHA"]
    binary = "ipctool-{}".format(commit_sha)
    durl = "openipc.s3-eu-west-1.amazonaws.com/{}".format(binary)

    if connection == "telnet":
        UGET = "/tmp/uget"
        proxy = test_case.get("proxy")
        proxy_type = test_case.get("proxy_type", None)

        t = Telnet(host, debug=False, proxy_type=proxy_type, proxy=proxy)
        try:
            t.login()
            if not t.file_exists(UGET):
                t.upload_uget()
            output = t.run_command(UGET + " run " + durl).splitlines()
        finally:
            # Always release the session, even when login or the command fails
            t.close()
    else:
        run_cmd = "cd /tmp; rm -f {0}; wget -q http://{1}; chmod +x {0}; ./{0}; rm {0}".format(
            binary, durl
        )
        with Popen(["ssh", "root@{}".format(host), run_cmd], stdout=PIPE) as proc:
            output = [i.decode("utf-8") for i in proc.stdout.read().splitlines()]

    assert output == expected
def test_zftlab(test_case):
    """Run every case parametrized for this test through ``do_test``."""
    # Pass the argument explicitly: ``**locals()`` would forward any later
    # added parameter or local variable as an unexpected keyword.
    do_test(test_case=test_case)
def test_dlab(test_case):
    """Run every case parametrized for this test through ``do_test``."""
    # Pass the argument explicitly: ``**locals()`` would forward any later
    # added parameter or local variable as an unexpected keyword.
    do_test(test_case=test_case)
@pytest.fixture(scope='session')
def tasmota_50803B():
    """Session-scoped fixture that holds a ``tasmota.updown`` context open.

    The context manager is entered once before the first test that requests
    this fixture and exited when the session-scoped fixture is torn down.
    Nothing is yielded to the tests; they depend on it only for its side
    effect.
    """
    # Emit an empty line; presumably keeps later console output on a fresh line.
    print()
    # NOTE(review): presumably switches the device on for the session and off
    # afterwards, with ``warmup`` as a delay — confirm against the tasmota module.
    with tasmota.updown("tasmota_50803B.dlab", "10.216.128.5", warmup=250) as resource:
        yield
        # NOTE(review): assumed to run before the context manager exits,
        # mirroring the print() before entry — confirm the intended placement.
        print()
def test_sw(test_case, tasmota_50803B):
    """Run the parametrized case while the ``tasmota_50803B`` fixture is active."""
    # The fixture is requested only for its setup/teardown; its value is unused.
    do_test(test_case=test_case)