-
Notifications
You must be signed in to change notification settings - Fork 473
Expand file tree
/
Copy pathpowertools_cdk_stack.py
More file actions
144 lines (123 loc) · 4.86 KB
/
powertools_cdk_stack.py
File metadata and controls
144 lines (123 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
from aws_cdk import (
Duration,
RemovalPolicy,
Stack,
)
from aws_cdk import (
aws_apigateway as apigateway,
)
from aws_cdk import (
aws_dynamodb as dynamodb,
)
from aws_cdk import (
aws_lambda as _lambda,
)
from aws_cdk import (
aws_lambda_event_sources as lambda_event_sources,
)
from aws_cdk import (
aws_sqs as sqs,
)
from constructs import Construct
class PowertoolsStack(Stack):
def __init__(self, scope: Construct, construct_id: str, environment: str = "dev", **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
self.deploy_environment = environment
# Shared Powertools Layer (using public layer)
self.powertools_layer = self._create_powertools_layer()
# DynamoDB Table
self.table = self._create_dynamodb_table()
# SQS Queue
self.queue = self._create_sqs_queue()
# Lambda Functions
self.api_function = self._create_api_function()
self.worker_function = self._create_worker_function()
# API Gateway
self.api = self._create_api_gateway()
def _create_powertools_layer(self) -> _lambda.ILayerVersion:
return _lambda.LayerVersion.from_layer_version_arn(
self,
"PowertoolsLayer",
layer_version_arn="arn:aws:lambda:us-east-1:017000801446:layer:AWSLambdaPowertoolsPythonV3-python313-x86_64:28",
)
def _create_dynamodb_table(self) -> dynamodb.Table:
return dynamodb.Table(
self,
"DataTable",
table_name=f"powertools-{self.deploy_environment}-data",
partition_key=dynamodb.Attribute(name="pk", type=dynamodb.AttributeType.STRING),
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
removal_policy=RemovalPolicy.DESTROY if self.deploy_environment != "prod" else RemovalPolicy.RETAIN,
)
def _create_sqs_queue(self) -> sqs.Queue:
return sqs.Queue(
self,
"WorkerQueue",
queue_name=f"powertools-{self.deploy_environment}-worker",
visibility_timeout=Duration.seconds(180),
)
def _create_api_function(self) -> _lambda.Function:
function = _lambda.Function(
self,
"ApiFunction",
runtime=_lambda.Runtime.PYTHON_3_13,
handler="app.lambda_handler",
code=_lambda.Code.from_asset("src/app"),
layers=[self.powertools_layer],
timeout=Duration.seconds(30),
memory_size=512 if self.deploy_environment == "prod" else 256,
environment={
"ENVIRONMENT": self.deploy_environment,
"POWERTOOLS_SERVICE_NAME": f"app-{self.deploy_environment}",
"POWERTOOLS_METRICS_NAMESPACE": f"MyApp/{self.deploy_environment}",
"POWERTOOLS_LOG_LEVEL": "INFO" if self.deploy_environment == "prod" else "DEBUG",
"TABLE_NAME": self.table.table_name,
"QUEUE_URL": self.queue.queue_url,
},
)
# Grant permissions
self.table.grant_read_write_data(function)
self.queue.grant_send_messages(function)
return function
def _create_worker_function(self) -> _lambda.Function:
function = _lambda.Function(
self,
"WorkerFunction",
runtime=_lambda.Runtime.PYTHON_3_13,
handler="worker.lambda_handler",
code=_lambda.Code.from_asset("src/worker"),
layers=[self.powertools_layer],
timeout=Duration.seconds(120),
memory_size=1024 if self.deploy_environment == "prod" else 512,
environment={
"ENVIRONMENT": self.deploy_environment,
"POWERTOOLS_SERVICE_NAME": f"worker-{self.deploy_environment}",
"POWERTOOLS_METRICS_NAMESPACE": f"MyApp/{self.deploy_environment}",
"POWERTOOLS_LOG_LEVEL": "INFO" if self.deploy_environment == "prod" else "DEBUG",
"TABLE_NAME": self.table.table_name,
},
)
# Add SQS event source with partial failure support
function.add_event_source(
lambda_event_sources.SqsEventSource(
self.queue,
batch_size=10,
report_batch_item_failures=True,
),
)
# Grant permissions
self.table.grant_read_write_data(function)
return function
def _create_api_gateway(self) -> apigateway.RestApi:
api = apigateway.RestApi(
self,
"ApiGateway",
rest_api_name=f"Powertools API - {self.deploy_environment}",
description=f"API for {self.deploy_environment} environment",
)
integration = apigateway.LambdaIntegration(self.api_function)
api.root.add_proxy(
default_integration=integration,
any_method=True,
)
return api