-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
116 lines (103 loc) · 3.82 KB
/
lambda_function.py
File metadata and controls
116 lines (103 loc) · 3.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import json
import os
from nypl_py_utils.classes.kms_client import KmsClient
from nypl_py_utils.classes.redshift_client import RedshiftClient
from nypl_py_utils.functions.log_helper import create_log
logger = create_log("lambda_function")
_MAIN_DELETION_QUERY = """
DELETE FROM {main_table}
USING {staging_table}
WHERE {main_table}.patron_id = {staging_table}.patron_id;"""
_DUPLICATE_DELETION_QUERY = """
DELETE FROM {staging_table}
WHERE patron_id IN ({duplicate_ids});"""
_DUPLICATES_QUERY = """
SELECT *
FROM {staging_table} JOIN
(
SELECT patron_id, COUNT(patron_id) AS patron_count
FROM {staging_table}
GROUP BY patron_id
) t
ON t.patron_id = {staging_table}.patron_id
WHERE patron_count > 1;"""
def lambda_handler(event, context):
    """Replace the contents of the main Redshift table with the staging table.

    Steps:
      1) Build a Redshift client from KMS-decrypted environment credentials.
      2) De-duplicate the staging table: exact duplicate rows are collapsed
         to a single copy; the same patron_id with *differing* rows is fatal.
      3) In one transaction: delete main-table rows whose patron_id appears
         in staging, copy all staging rows into main, then empty staging.

    Args:
        event: Lambda invocation event (unused).
        context: Lambda runtime context (unused).

    Returns:
        dict: a 200 status payload with a JSON success message.

    Raises:
        ReplaceRedshiftDataError: if a patron_id appears in the staging
            table with two different rows.
    """
    logger.info("Starting lambda processing")
    kms_client = KmsClient()
    redshift_client = RedshiftClient(
        kms_client.decrypt(os.environ["REDSHIFT_DB_HOST"]),
        os.environ["REDSHIFT_DB_NAME"],
        kms_client.decrypt(os.environ["REDSHIFT_DB_USER"]),
        kms_client.decrypt(os.environ["REDSHIFT_DB_PASSWORD"]),
    )
    kms_client.close()

    redshift_client.connect()
    try:
        _deduplicate_staging_table(redshift_client)
        _replace_main_table(redshift_client)
    finally:
        # Always release the connection — previously it leaked whenever
        # de-duplication raised ReplaceRedshiftDataError or a query failed.
        redshift_client.close_connection()

    logger.info("Finished lambda processing")
    return {
        "statusCode": 200,
        "body": json.dumps({"message": "Job ran successfully."}),
    }


def _deduplicate_staging_table(redshift_client):
    """Collapse exact duplicate staging rows down to a single copy each.

    Raises ReplaceRedshiftDataError if one patron_id maps to two different
    rows, since there is no way to know which row is correct.
    """
    logger.info("Checking for duplicate records")
    raw_duplicates = redshift_client.execute_query(
        _DUPLICATES_QUERY.format(staging_table=os.environ["STAGING_TABLE"])
    )
    unique_map = {}
    for row in raw_duplicates:
        patron_id = row[0]
        if patron_id not in unique_map:
            unique_map[patron_id] = row
        elif unique_map[patron_id] != row:
            logger.error("Duplicate patron ids with different values found")
            raise ReplaceRedshiftDataError(
                "Duplicate patron ids with different values found"
            )

    if not unique_map:
        return

    # If there are duplicate rows, delete all of them and insert each row
    # back individually into the staging table.
    # NOTE(review): patron ids are interpolated directly into the SQL text.
    # They come from our own staging table, but a parameterized query would
    # be safer if the client supports it for IN-lists.
    duplicate_ids = "'" + "','".join(unique_map.keys()) + "'"
    queries = [
        (
            _DUPLICATE_DELETION_QUERY.format(
                staging_table=os.environ["STAGING_TABLE"],
                duplicate_ids=duplicate_ids,
            ),
            None,
        )
    ]
    # Need one %s placeholder per inserted column. This is len(row)-2
    # because the duplicates query's JOIN appends two extra fields
    # (t.patron_id and patron_count) to every row.
    placeholder_length = len(next(iter(unique_map.values()))) - 2
    placeholder = ", ".join(["%s"] * placeholder_length)
    insert_query = "INSERT INTO {staging_table} VALUES ({placeholder});".format(
        staging_table=os.environ["STAGING_TABLE"], placeholder=placeholder
    )
    queries.append((insert_query, [v[:-2] for v in unique_map.values()]))
    redshift_client.execute_transaction(queries)


def _replace_main_table(redshift_client):
    """Atomically swap the main table's contents for the staging rows."""
    redshift_client.execute_transaction(
        [
            (
                _MAIN_DELETION_QUERY.format(
                    main_table=os.environ["MAIN_TABLE"],
                    staging_table=os.environ["STAGING_TABLE"],
                ),
                None,
            ),
            (
                "INSERT INTO {main_table} SELECT * FROM {staging_table};".format(
                    main_table=os.environ["MAIN_TABLE"],
                    staging_table=os.environ["STAGING_TABLE"],
                ),
                None,
            ),
            (
                "DELETE FROM {staging_table};".format(
                    staging_table=os.environ["STAGING_TABLE"]
                ),
                None,
            ),
        ]
    )
class ReplaceRedshiftDataError(Exception):
    """Raised when staging data cannot safely replace the main table —
    e.g. the same patron_id appears with two conflicting rows."""

    def __init__(self, message=None):
        # Explicitly delegate to Exception so args/str()/pickling follow the
        # standard protocol, instead of relying on BaseException.__new__ to
        # populate args implicitly. Passing no args when message is None
        # keeps str(exc) == "" exactly as before.
        super().__init__(*(() if message is None else (message,)))
        self.message = message