-
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathenv.py
More file actions
156 lines (124 loc) · 5.01 KB
/
env.py
File metadata and controls
156 lines (124 loc) · 5.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
from alembic import context
from alembic.script import ScriptDirectory
from ddcdatabases import get_postgresql_settings
from logging.config import fileConfig
from sqlalchemy import create_engine, engine_from_config, pool, text
from sqlalchemy.schema import SchemaItem
from src.bot.constants.settings import get_bot_settings
from src.database.models import BotBase
from typing import Any, Literal
from urllib.parse import quote_plus
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
target_metadata = [BotBase.metadata]
_project_settings = get_bot_settings()
_postgres_settings = get_postgresql_settings()
_password = quote_plus(_postgres_settings.password)
def _build_url(database: str) -> str:
return (
f"{_postgres_settings.sync_driver}://"
f"{_postgres_settings.user}:"
f"{_password}@"
f"{_postgres_settings.host}:"
f"{_postgres_settings.port}/"
f"{database}"
f"?sslmode={_postgres_settings.ssl_mode}"
)
def _ensure_database_exists() -> None:
"""Connect to the 'postgres' maintenance DB and create the target database if it doesn't exist."""
db_name = _postgres_settings.database
engine = create_engine(_build_url("postgres"), isolation_level="AUTOCOMMIT")
with engine.connect() as conn:
result = conn.execute(text("SELECT 1 FROM pg_database WHERE datname = :db"), {"db": db_name})
if not result.scalar():
conn.execute(text(f'CREATE DATABASE "{db_name}"'))
engine.dispose()
_ensure_database_exists()
# set_main_option uses %-interpolation, so escape any % in the password
config.set_main_option("sqlalchemy.url", _build_url(_postgres_settings.database).replace("%", "%%"))
_schemas = {s.strip() for s in (_postgres_settings.schema or "public").split(",")}
def _include_object(
obj: SchemaItem,
_name: str | None,
type_: Literal["schema", "table", "column", "index", "unique_constraint", "foreign_key_constraint"],
_reflected: bool,
_compare_to: SchemaItem | None,
) -> bool | None:
"""
Filter to only include objects from our target schemas.
This prevents Alembic from trying to manage tables in other schemas.
"""
if type_ == "table" and hasattr(obj, "schema"):
obj_schema = obj.schema # type: ignore[attr-defined]
if obj_schema is None or obj_schema in _schemas:
return True
return False
return True
def _process_revision_directives(ctx: Any, revision: Any, directives: Any) -> None:
migration_script = directives[0]
head_revision = ScriptDirectory.from_config(ctx.config).get_current_head()
if head_revision is None:
new_rev_id = 1
else:
last_rev_id = int(head_revision.lstrip("0"))
new_rev_id = last_rev_id + 1
migration_script.rev_id = f"{new_rev_id:04}"
_version_table_schema = "public" if len(_schemas) > 1 else next(iter(_schemas))
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well.
By skipping the Engine creation,
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
process_revision_directives=_process_revision_directives,
version_table_schema=_version_table_schema,
version_table=_project_settings.alembic_version_table_name,
include_schemas=True,
include_object=_include_object,
)
with context.begin_transaction():
for s in _schemas:
if s != "public":
context.execute(f"CREATE SCHEMA IF NOT EXISTS {s}")
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario, we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
for s in _schemas:
if s != "public":
connection.execute(text(f"CREATE SCHEMA IF NOT EXISTS {s}"))
connection.commit()
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=_process_revision_directives,
version_table_schema=_version_table_schema,
version_table=_project_settings.alembic_version_table_name,
include_schemas=True,
include_object=_include_object,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()