-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_scenarios.py
More file actions
142 lines (120 loc) · 4.3 KB
/
test_scenarios.py
File metadata and controls
142 lines (120 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import pathlib
import pytest
import simvue
import time
import contextlib
import random
import tempfile
import threading
from multiprocessing import Process, Manager
from simvue.api.objects.artifact.fetch import Artifact
@pytest.mark.scenario
@pytest.mark.parametrize(
    "file_size", (1, 10, 100)
)
def test_large_file_upload(file_size: int, create_plain_run: tuple[simvue.Run, dict]) -> None:
    """Upload a file of ``file_size`` MB as a run artifact and verify that the
    retrieved copy has exactly the same size as the original.

    On any failure the run is reconnected and marked 'failed' before the
    exception propagates; the temporary source file, the downloaded copy and
    the server-side artifact are cleaned up regardless of outcome.
    """
    run, _ = create_plain_run
    run.update_metadata({"file_size_mb": file_size})
    _file: pathlib.Path | None = None
    _temp_file_name: str | None = None
    try:
        # Create a file of exactly file_size MB without writing the full
        # payload: seek to the final byte and write a single NUL.
        with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as temp_f:
            temp_f.seek(file_size * 1024 * 1024 - 1)
            temp_f.write(b'\0')
            temp_f.flush()
            _temp_file_name = temp_f.name
        _input_file_size = pathlib.Path(_temp_file_name).stat().st_size
        run.save_file(file_path=_temp_file_name, category="output", name="test_large_file_artifact")
        run.close()
        client = simvue.Client()
        with tempfile.TemporaryDirectory() as tempd:
            client.get_artifact_as_file(
                run_id=run.id,
                name="test_large_file_artifact",
                output_dir=tempd
            )
            _file = next(pathlib.Path(tempd).glob("*"))
            # The downloaded artifact must match the uploaded byte count
            assert _file.stat().st_size == _input_file_size
    except Exception:
        # Mark the (possibly already closed) run as failed, then re-raise.
        _run = simvue.Run()
        _run.reconnect(run.id)
        _run.set_status("failed")
        raise  # bare raise preserves the original traceback (unlike 'raise e')
    finally:
        if _file and _file.exists():
            _file.unlink()
        if _temp_file_name and (_src := pathlib.Path(_temp_file_name)).exists():
            _src.unlink()
        # Best-effort server-side cleanup; artifact may not exist on failure.
        with contextlib.suppress(Exception):
            Artifact.from_name("test_large_file_artifact", run_id=run.id).delete()
@pytest.mark.scenario
def test_time_multi_run_create_threshold() -> None:
    """Create and close ten runs, asserting the whole cycle takes under 60s.

    Benchmark runs are tagged and placed in a dedicated folder, which is
    cleaned up (runs first, then the folder itself) before the timing check.
    """
    start = time.time()
    runs: list[simvue.Run] = []
    for i in range(10):
        run = simvue.Run()
        run.init(
            f"test run {i}",
            tags=["test_benchmarking"],
            folder="/simvue_benchmark_testing",
            retention_period="1 hour"
        )
        runs.append(run)
    for run in runs:
        run.close()
    end = time.time()
    client = simvue.Client()
    # delete_runs raises RuntimeError if there is nothing to delete
    with contextlib.suppress(RuntimeError):
        client.delete_runs("/simvue_benchmark_testing")
    client.delete_folder(
        "/simvue_benchmark_testing",
        remove_runs=False,
        allow_missing=True,
        recursive=True,
    )
    # BUG FIX: the original asserted 'start - end < 60.0', which is always
    # negative and therefore could never fail. Elapsed time is end - start.
    assert end - start < 60.0
@pytest.fixture
def run_deleter(request):
    """Yield a dict; a test stores a run identifier under 'ident' and the
    corresponding run is deleted from the server at teardown."""
    ident_dict: dict = {}

    def _teardown() -> None:
        # Deletes whichever run id the test registered under 'ident'.
        simvue.Client().delete_run(ident_dict["ident"])

    request.addfinalizer(_teardown)
    return ident_dict
def upload(name: str, values_per_run: int, shared_dict) -> None:
    """Create a run called *name*, log ``values_per_run`` ascending values of
    the 'increment' metric, then close the run.

    The run's identifier is published through *shared_dict* so the parent
    thread/process can query and later delete the run.
    """
    new_run = simvue.Run()
    new_run.init(name=name, tags=["simvue_client_tests"])
    shared_dict["ident"] = new_run._id
    for step in range(values_per_run):
        new_run.log_metrics({"increment": step})
    new_run.close()
@pytest.mark.scenario
@pytest.mark.parametrize("values_per_run", (1, 2, 100, 1500))
@pytest.mark.parametrize("processing", ("local", "on_thread", "on_process"))
def test_uploaded_data_immediately_accessible(
    values_per_run: int, processing: str, run_deleter
) -> None:
    """Log metrics in-process, in a thread, or in a subprocess, and verify
    every value is retrievable immediately after the run closes."""
    name = f"Test-{random.randint(0, 1000000000)}"
    # A managed dict lets the worker hand the run id back across process
    # boundaries (also works for the thread/local cases).
    manager = Manager()
    shared_dict = manager.dict()
    if processing == "local":
        upload(name, values_per_run, shared_dict)
    else:
        worker_args = (name, values_per_run, shared_dict)
        worker = (
            threading.Thread(target=upload, args=worker_args)
            if processing == "on_thread"
            else Process(target=upload, args=worker_args)
        )
        worker.start()
        worker.join()
    # Register the run for deletion during fixture teardown.
    run_deleter["ident"] = shared_dict["ident"]
    values = simvue.Client().get_metric_values(
        ["increment"], "step", run_ids=[shared_dict["ident"]], max_points=2 * values_per_run, aggregate=False
    )["increment"]
    assert len(values) == values_per_run, "all uploaded values should be returned"
    for step in range(len(values)):
        assert step == int(values[(step, shared_dict["ident"])]), "values should be ascending ints"