-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathdata_connections.py
More file actions
100 lines (77 loc) · 3.24 KB
/
data_connections.py
File metadata and controls
100 lines (77 loc) · 3.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
"""
Purpose of script: handles reading data in and writing data back out.
"""
import sqlalchemy as sa
import os
import pandas as pd
import logging
# Module-level logger shared by all helpers below; this module does not attach
# handlers or set levels itself.
logger = logging.getLogger(__name__)
def read_sql_file(sql_file_path: str, sql_file_name: str, database: str, schema: str, table: str,
                  encoding=None) -> str:
    """Read a .sql file and fill in its ``{database}``, ``{schema}`` and ``{table}`` placeholders.

    The file text is rendered with ``str.format``, so any literal curly braces in
    the SQL must be written doubled (``{{`` / ``}}``), and any placeholder other
    than the three listed below raises ``KeyError``. The values are spliced in as
    raw text (no quoting or escaping), so never pass untrusted input here.

    Parameters:
        sql_file_path: directory containing the .sql file (absolute, or relative
            to the current working directory)
        sql_file_name: .sql file name
        database: value substituted for ``{database}``
        schema: value substituted for ``{schema}``
        table: value substituted for ``{table}``
        encoding: text encoding passed to ``open``; ``None`` (default) uses the
            platform's default encoding.

    Returns:
        The SQL text with the placeholders replaced.
    """
    logger.info(f"Reading in SQL script: {sql_file_name} from: {sql_file_path}.")
    full_path = os.path.join(sql_file_path, sql_file_name)
    with open(full_path, 'r', encoding=encoding) as sql_file:
        sql_query = sql_file.read()
    sql_params = {'database': database, 'schema': schema, 'table': table}
    return sql_query.format(**sql_params)
def make_database_connection(server, database, autocommit=True, driver="SQL+Server"):
    """Create a SQLAlchemy engine for a SQL Server database over pyodbc.

    The connection URL requests ``trusted_connection=yes`` (integrated
    authentication), so no user name or password is sent.

    Parameters:
        server: SQL Server host / instance name
        database: database name
        autocommit: value of the engine's ``autocommit`` execution option
        driver: ODBC driver name as it appears in the URL query string, with
            spaces encoded as ``+`` (default: "SQL+Server").

    Returns:
        A SQLAlchemy Engine created with ``fast_executemany=True`` and carrying
        the ``autocommit`` execution option.
    """
    engine = sa.create_engine(
        f"mssql+pyodbc://{server}/{database}?driver={driver}&trusted_connection=yes",
        fast_executemany=True,
    )
    # Engine.execution_options() does not modify the engine in place: it returns
    # a new Engine that carries the options. Returning that object is what makes
    # the autocommit setting actually take effect.
    # NOTE(review): SQLAlchemy 2.0 removed library-level autocommit, so this option
    # is probably ignored there (isolation_level="AUTOCOMMIT" is the 2.0 way) -
    # confirm which SQLAlchemy version the project pins.
    return engine.execution_options(autocommit=autocommit)
def execute_sql(query, conn):
    """Execute a SQL statement through a SQLAlchemy engine and commit it.

    Parameters:
        query: string containing a SQL query; it is wrapped in ``sqlalchemy.text``.
        conn: SQLAlchemy Engine to execute on (e.g. the object returned by
            ``make_database_connection``). There is no default database.

    Returns:
        The result object returned by ``Connection.execute``.

    NOTE(review): the connection is committed and closed before this function
    returns, so rows of a SELECT may not be fetchable from the returned result;
    use ``get_df_from_server`` to read data.
    """
    # Engine.begin() yields a Connection inside a transaction that is committed
    # when the block exits normally and rolled back if it raises. A bare
    # Engine.connect() block does not commit under SQLAlchemy 2.0 (the work is
    # rolled back when the connection is released), which would silently discard
    # INSERT/UPDATE/DELETE/DDL statements run through this helper.
    with conn.begin() as connection:
        logger.info("Executing submitted SQL statetment")
        response = connection.execute(sa.text(query))
        logger.info("Successfully executed submitted SQL statetment")
    return response
def get_df_from_server(conn, server, database, query) -> pd.DataFrame:
    """Run a SQL query and return its result as a pandas DataFrame.

    Parameters:
        conn: SQLAlchemy Engine (or Connection) the query is run on
        server: server name (not used by this function)
        database: database name (only used in log messages)
        query: string containing a SQL query

    Returns:
        pandas DataFrame holding the query result.
    """
    logger.info("Reading in dataframe from SQL Server.")
    # Engine.execution_options() returns a new, configured Engine instead of
    # modifying ``conn`` in place; keep the return value so the option applies.
    conn = conn.execution_options(autocommit=True)
    logger.info(f"Getting dataframe from SQL database {database}")
    logger.info(f"Running query:\n\n {query}")
    return pd.read_sql_query(query, conn)
def write_df_to_server(conn, server, database, df_to_write, table_name,
                       schema=None, if_exists='fail') -> None:
    """Write a pandas DataFrame to a SQL Server table via ``DataFrame.to_sql``.

    Parameters:
        conn: SQLAlchemy Engine (or Connection) to write through
        server: server name (not used by this function)
        database: database name (not used by this function)
        df_to_write: DataFrame to write; its index is not written
        table_name: target table name
        schema: target schema; ``None`` (default) uses the connection's default schema
        if_exists: forwarded to ``to_sql``: 'fail' (default) raises ValueError
            when the table already exists, 'replace' drops it before inserting,
            'append' inserts into the existing table.

    Returns:
        None
    """
    logger.info(f"Writing dataframe to SQL Server designated {table_name}.")
    # Engine.execution_options() returns a new, configured Engine instead of
    # modifying ``conn`` in place; keep the return value so the option applies.
    conn = conn.execution_options(autocommit=True)
    df_to_write.to_sql(name=table_name, con=conn, schema=schema,
                       if_exists=if_exists, index=False)