-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathdatabase_ops.py
More file actions
85 lines (68 loc) · 3.11 KB
/
database_ops.py
File metadata and controls
85 lines (68 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import sqlalchemy
from . import backend_types_sql as bts
def create_db_engine_and_migrate_db(
database_uri: str,
**kwargs,
) -> sqlalchemy.Engine:
db_engine = create_db_engine(database_uri=database_uri, **kwargs)
bts._TableBase.metadata.create_all(db_engine)
migrate_db(db_engine=db_engine)
return db_engine
def initialize_and_migrate_db(db_engine: sqlalchemy.Engine):
bts._TableBase.metadata.create_all(db_engine)
migrate_db(db_engine=db_engine)
def create_db_engine(
database_uri: str,
**kwargs,
) -> sqlalchemy.Engine:
if database_uri.startswith("mysql://"):
try:
import MySQLdb
except ImportError:
# Using PyMySQL instead of missing MySQLdb
database_uri = database_uri.replace("mysql://", "mysql+pymysql://")
create_engine_kwargs = {}
if database_uri == "sqlite://":
create_engine_kwargs["poolclass"] = sqlalchemy.pool.StaticPool
if database_uri.startswith("sqlite://"):
# FastApi claims it's needed and safe: https://fastapi.tiangolo.com/tutorial/sql-databases/#create-an-engine
create_engine_kwargs.setdefault("connect_args", {})["check_same_thread"] = False
# https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#using-a-memory-database-in-multiple-threads
if create_engine_kwargs.get("poolclass") != sqlalchemy.pool.StaticPool:
# Preventing the "MySQL server has gone away" error:
# https://docs.sqlalchemy.org/en/20/faq/connections.html#mysql-server-has-gone-away
create_engine_kwargs["pool_recycle"] = 3600
create_engine_kwargs["pool_pre_ping"] = True
if kwargs:
create_engine_kwargs.update(kwargs)
db_engine = sqlalchemy.create_engine(
url=database_uri,
**create_engine_kwargs,
)
return db_engine
def migrate_db(db_engine: sqlalchemy.Engine):
# # Example:
# sqlalchemy.Index(
# "ix_pipeline_run_created_by_created_at_desc",
# bts.PipelineRun.created_by,
# bts.PipelineRun.created_at.desc(),
# ).create(db_engine, checkfirst=True)
# index1 = sqlalchemy.Index(
# "ix_execution_node_container_execution_cache_key",
# bts.ExecutionNode.container_execution_cache_key,
# )
# index1.create(db_engine, checkfirst=True)
# SqlAlchemy's Index constructor is broken and adds indexes to the table definition (even if they are duplicate)
# See https://github.com/sqlalchemy/sqlalchemy/issues/12965
# See https://github.com/sqlalchemy/sqlalchemy/discussions/12420
# To work around that issue we either need to remove the index from the table
# bts.ExecutionNode.__table__.indexes.remove(index1)
# Or we need to avoid calling the Index constructor.
for index in bts.ExecutionNode.__table__.indexes:
if index.name == bts.ExecutionNode._IX_EXECUTION_NODE_CACHE_KEY:
index.create(db_engine, checkfirst=True)
break
for index in bts.PipelineRunAnnotation.__table__.indexes:
if index.name == bts.PipelineRunAnnotation._IX_ANNOTATION_RUN_ID_KEY_VALUE:
index.create(db_engine, checkfirst=True)
break