-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconftest.py
More file actions
215 lines (158 loc) · 5.44 KB
/
conftest.py
File metadata and controls
215 lines (158 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
from __future__ import annotations
from contextlib import AbstractContextManager, suppress
from logging import LogRecord, setLogRecordFactory
from typing import TYPE_CHECKING
from hypothesis import HealthCheck
from pytest import fixture, param, skip
from whenever import PlainDateTime
from utilities.contextlib import enhanced_context_manager
from utilities.pytest import IS_CI, IS_CI_AND_NOT_LINUX, skipif_ci
from utilities.re import ExtractGroupError, extract_group
from utilities.whenever import MINUTE, get_now_local_plain
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterator
from pathlib import Path
from _pytest.fixtures import SubRequest
from redis.asyncio import Redis
from sqlalchemy import Engine, TextClause
from sqlalchemy.ext.asyncio import AsyncEngine
# hypothesis
try:
    # Optional extra: only configure Hypothesis when the helper module is
    # importable; a missing install is silently tolerated.
    from utilities.hypothesis import setup_hypothesis_profiles
except ModuleNotFoundError:
    pass
else:
    # NOTE(review): presumably suppressed because sync and async tests share
    # fixtures/executors here — confirm against utilities.hypothesis.
    setup_hypothesis_profiles(suppress_health_check={HealthCheck.differing_executors})
# fixtures - docker
@fixture
def container() -> str:
    """Name of the docker container the tests expect to be running."""
    name = "postgres"
    return name
# fixtures - logging
@fixture
def set_log_factory() -> AbstractContextManager[None]:
    """Context manager that restores the default ``LogRecord`` factory on exit.

    Tests may install a custom log-record factory inside the ``with`` body;
    the default is put back even if the body raises.
    """

    @enhanced_context_manager
    def _restore_on_exit() -> Iterator[None]:
        try:
            yield
        finally:
            # Always reset, even when the managed body raised.
            setLogRecordFactory(LogRecord)

    return _restore_on_exit()
# fixtures - redis
@fixture
async def test_redis() -> AsyncIterator[Redis]:
    """Yield a redis client on database 15; skipped on CI for non-Linux runners.

    The skip runs before the redis helper is imported, so a missing driver on
    a skipped platform never turns into an import error.
    """
    if IS_CI_AND_NOT_LINUX:
        skip(reason="Skipped for CI/non-Linux")
    from utilities.redis import yield_redis

    async with yield_redis(db=15) as client:
        yield client
# fixtures - sqlalchemy
@fixture(params=[param("sqlite"), param("postgresql", marks=skipif_ci)])
def test_engine(*, request: SubRequest, tmp_path: Path) -> Engine:
    """Parametrized sync engine: on-disk sqlite, or local postgres (non-CI).

    For postgres, stale test tables (per ``_is_to_drop``) are dropped on a
    best-effort basis first; if the server is unreachable the cleanup is
    skipped and the engine is returned as-is.
    """
    from sqlalchemy.exc import OperationalError

    from utilities.sqlalchemy import create_engine

    dialect = request.param
    if dialect == "sqlite":
        return create_engine("sqlite", database=str(tmp_path / "db.sqlite"))
    if dialect == "postgresql":
        engine = create_engine(
            "postgresql+psycopg",
            username="postgres",
            password="password",  # noqa: S106
            host="localhost",
            port=5432,
            database="testing",
        )
        try:
            with engine.begin() as conn:
                names: list[str] = list(
                    conn.execute(_select_tables()).scalars().all()
                )
        except OperationalError:
            pass  # server unreachable; hand the engine back untouched
        else:
            for name in filter(_is_to_drop, names):
                # Each drop in its own transaction; failures are tolerated.
                with engine.begin() as conn, suppress(Exception):
                    _ = conn.execute(_drop_table(name))
        return engine
    msg = f"Unsupported dialect: {dialect}"
    raise NotImplementedError(msg)
@fixture(params=[param("sqlite"), param("postgresql", marks=skipif_ci)])
async def test_async_engine(
    *,
    request: SubRequest,
    test_async_sqlite_engine: AsyncEngine,
    test_async_postgres_engine: AsyncEngine,
) -> AsyncEngine:
    """Parametrized async engine, delegating to the dialect-specific fixtures."""
    dialect = request.param
    if dialect == "sqlite":
        return test_async_sqlite_engine
    if dialect == "postgresql":
        return test_async_postgres_engine
    msg = f"Unsupported dialect: {dialect}"
    raise NotImplementedError(msg)
@fixture
async def test_async_sqlite_engine(*, tmp_path: Path) -> AsyncEngine:
    """Async sqlite engine backed by a per-test on-disk database file."""
    from utilities.sqlalchemy import create_engine

    database = str(tmp_path / "db.sqlite")
    return create_engine("sqlite+aiosqlite", database=database, async_=True)
@fixture
async def test_async_postgres_engine() -> AsyncEngine:
    """Async postgres engine against a local server; skipped on CI.

    Stale test tables (per ``_is_to_drop``) are dropped on a best-effort
    basis before the engine is returned.  If the ``testing`` database does
    not exist, cleanup is skipped and the engine is returned as-is.
    """
    # Skip BEFORE importing asyncpg, mirroring `test_redis`: if the driver
    # is absent on the skipped platform, importing it first would turn the
    # intended skip into an ImportError.
    if IS_CI:
        skip(reason="Skipped for CI")
    from asyncpg.exceptions import InvalidCatalogNameError

    from utilities.sqlalchemy import create_engine

    engine = create_engine(
        "postgresql+asyncpg",
        username="postgres",
        password="password",  # noqa: S106
        host="localhost",
        port=5432,
        database="testing",
    )
    try:
        async with engine.begin() as conn:
            tables: list[str] = list(
                (await conn.execute(_select_tables())).scalars().all()
            )
    except InvalidCatalogNameError:
        ...  # database missing; nothing to clean up
    else:
        for table in filter(_is_to_drop, tables):
            # Each drop in its own transaction; failures are tolerated.
            async with engine.begin() as conn:
                with suppress(Exception):
                    _ = await conn.execute(_drop_table(table))
    return engine
def _is_to_drop(table: str, /) -> bool:
    """Whether a test table is stale enough (at least one minute old) to drop.

    Tables created by the suite carry a ``<YYYYMMDDTHH...>_`` timestamp
    prefix; any name without that prefix is considered droppable.
    """
    now = get_now_local_plain()
    try:
        stamp = extract_group(r"^(\d{8}T\d{2,})_", table)
    except ExtractGroupError:
        # No timestamp prefix: not one of ours to keep.
        return True
    created = PlainDateTime.parse_iso(stamp)
    return now.difference(created, ignore_dst=True) >= MINUTE
def _select_tables() -> TextClause:
    """SQL selecting every table name from the postgres catalog."""
    from sqlalchemy import text

    query = "SELECT tablename FROM pg_tables"
    return text(query)
def _drop_table(table: str, /) -> TextClause:
    """SQL dropping *table* (and its dependents) if it exists."""
    from sqlalchemy import text

    statement = f'DROP TABLE IF EXISTS "{table}" CASCADE'
    return text(statement)
# fixtures - subprocess
@fixture
def git_repo_url() -> str:
    """URL of a public repository used to exercise git subprocess helpers."""
    url = "https://github.com/CogWorksBWSI/GitPracticeRepo"
    return url
@fixture
def ssh_user() -> str:
    """User name for the SSH-based subprocess tests."""
    user = "root"
    return user
@fixture
def ssh_hostname() -> str:
    """Externally-resolvable hostname for the SSH-based subprocess tests."""
    hostname = "proxmox.main"
    return hostname
@fixture
def ssh_hostname_internal() -> str:
    """Internal (short) hostname for the SSH-based subprocess tests."""
    hostname = "proxmox"
    return hostname