-
Notifications
You must be signed in to change notification settings - Fork 55
Expand file tree
/
Copy pathforwarder.ts
More file actions
550 lines (495 loc) · 17.6 KB
/
forwarder.ts
File metadata and controls
550 lines (495 loc) · 17.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
import Service from "serverless/classes/Service";
import { FunctionInfo } from "./layer";
import { version } from "../package.json";
import Aws = require("serverless/plugins/aws/provider/awsProvider");
// CloudFormation resource type string used to recognise and create log group resources.
const logGroupKey = "AWS::Logs::LogGroup";
// CloudFormation resource type string used for the subscription filter resources this file creates.
const logGroupSubscriptionKey = "AWS::Logs::SubscriptionFilter";
// Number of existing subscription filters at which canSubscribeLogGroup refuses to add another one
// (unless one of the existing filters already matches the expected name).
// NOTE(review): presumably mirrors the CloudWatch Logs per-log-group filter quota — confirm.
const maxAllowableLogGroupSubscriptions: number = 2;
/**
 * Error raised when the configured Datadog forwarder Lambda cannot be looked up.
 */
class DatadogForwarderNotFoundError extends Error {
  /**
   * @param message Human-readable description of the failed lookup.
   */
  constructor(message: string) {
    // Pass the message as a single argument; spreading a string would hand the Error
    // constructor one argument per character.
    super(message);
    this.name = "DatadogForwarderNotFoundError";
  }
}
/** Shape that isLogGroup narrows a compiled template resource to. */
interface LogGroupResource {
Type: typeof logGroupKey;
Properties: {
// Declared as a string, but shouldSubscribe still checks `typeof` at runtime before using it.
LogGroupName: string;
};
}
/** Plugin options consulted when deciding which log groups to subscribe the forwarder to. */
interface ForwarderConfigs {
// When true, shouldSubscribe only accepts API access log groups (Lambda log groups are skipped).
AddExtension: boolean;
// When either testing flag is true, addCloudWatchForwarderSubscriptions skips forwarder ARN validation.
TestingMode: boolean | undefined;
IntegrationTesting: boolean | undefined;
// Passed as the `subscribe` flag to the REST / HTTP / WebSocket access log group checks.
SubToAccessLogGroups: boolean;
// Not read anywhere in the visible part of this file.
SubToExecutionLogGroups: boolean;
}
/**
 * One entry of the `subscriptionFilters` array returned by describeSubscriptionFilters.
 * Only `filterName` is read in this file (by canSubscribeLogGroup).
 */
interface SubscriptionFilter {
creationTime: number;
destinationArn: string;
distribution: string;
filterName: string;
filterPattern: string;
logGroupName: string;
roleArn: string;
}
/** Result type assumed for the CloudWatchLogs `describeSubscriptionFilters` request. */
interface DescribeSubscriptionFiltersResponse {
subscriptionFilters: SubscriptionFilter[];
}
/**
 * Logging setting for a single API kind under `provider.logs`: either a plain boolean
 * or an object with separate access / execution logging flags.
 */
type SubLogsConfig =
| boolean
| {
accessLogging: boolean | undefined;
executionLogging: boolean | undefined;
}
| undefined;
/** The `logs` value read from `service.provider`, with one SubLogsConfig per API kind. */
type LogsConfig =
| {
restApi: SubLogsConfig;
httpApi: SubLogsConfig;
websocket: SubLogsConfig;
}
| undefined;
// Keys under which addExecutionLogGroupsAndSubscriptions stores the REST and WebSocket
// execution log groups, and their subscription filters, in the compiled template's Resources.
const REST_EXECUTION_LOG_GROUP_KEY = "RestExecutionLogGroup";
const REST_EXECUTION_SUBSCRIPTION_KEY = "RestExecutionLogGroupSubscription";
const WEBSOCKETS_EXECUTION_LOG_GROUP_KEY = "WebsocketsExecutionLogGroup";
const WEBSOCKETS_EXECUTION_SUBSCRIPTION_KEY = "WebsocketsExecutionLogGroupSubscription";
/**
 * A single CloudFormation resource.
 * Only `Type` is required; `Properties` and any further keys are left untyped (`unknown`).
 */
interface CfnResource {
Type: string;
Properties?: Record<string, unknown>;
[key: string]: unknown;
}
/** The Resources block of a compiled CloudFormation template, keyed by resource name. */
export type CfnResourceMap = Record<string, CfnResource>;
/**
 * A compiled AWS::StepFunctions::StateMachine CFN resource.
 * Both `Properties` and `Tags` are optional; the tag helpers in this file only push a tag
 * when both are present.
 */
export interface StateMachineCfnResource {
Properties?: {
Tags?: { Key: string; Value: string }[];
[key: string]: unknown;
};
[key: string]: unknown;
}
/** Logging config for a step function (from serverless-step-functions plugin) */
interface StepFunctionLoggingConfig {
level: string;
includeExecutionData: boolean;
// addStepFunctionLogGroupSubscription reads the first entry to derive the log group name.
destinations: unknown[];
}
/** A step function entry from stepFunctions.stateMachines */
export interface StepFunctionConfig {
name: string;
// Written by addStepFunctionLogGroup and read by addStepFunctionLogGroupSubscription.
loggingConfig?: StepFunctionLoggingConfig;
}
/** A CloudFormation intrinsic function value, e.g. the `Fn::Join` objects built for execution log group names. */
type CfnIntrinsicValue = Record<string, unknown>;
// When users define the forwarder ARN with CloudFormation functions, the ARN has this object type instead of a string.
// Such values are passed through unchanged as `DestinationArn`, and ARN validation is skipped for them.
export interface CloudFormationObjectArn {
"Fn::Sub"?: string;
"arn:aws"?: string;
}
/**
 * Type guard: reports whether a compiled template resource is a log group.
 *
 * Only the `Type` field is inspected; callers must still validate `Properties` themselves.
 *
 * @param value Any value taken from the template's Resources block
 * @returns true when `value.Type` equals the log group resource type
 */
function isLogGroup(value: unknown): value is LogGroupResource {
  // Property access on null/undefined would throw; such values are simply not log groups.
  if (value === null || value === undefined) {
    return false;
  }
  return (value as LogGroupResource).Type === logGroupKey;
}
/**
 * Checks that the forwarder Lambda can be looked up by issuing a Lambda `getFunction` request.
 * @param aws Serverless framework provided AWS client
 * @param functionArn The forwarder ARN to look up
 * @throws DatadogForwarderNotFoundError when the request fails, whatever the underlying cause
 */
async function validateForwarderArn(aws: Aws, functionArn: CloudFormationObjectArn | string): Promise<void> {
  const lookup = { FunctionName: functionArn };
  try {
    await aws.request("Lambda", "getFunction", lookup);
  } catch {
    // The underlying error is intentionally replaced by a forwarder-specific one.
    const failure = `Could not perform GetFunction on ${functionArn}.`;
    throw new DatadogForwarderNotFoundError(failure);
  }
}
/**
 * Adds API Gateway execution log groups, plus forwarder subscriptions to them, to the compiled
 * CloudFormation template when execution logging is enabled under `provider.logs`.
 *
 * Does nothing when `provider.logs` is not a valid logs config, or when the compiled template
 * has no Resources block to add to.
 *
 * @param service Serverless service whose compiled template Resources are mutated in place
 * @param aws Serverless framework provided AWS client (supplies the stage name)
 * @param functionArn Forwarder ARN, either a string or a CloudFormation intrinsic object
 */
export async function addExecutionLogGroupsAndSubscriptions(
  service: Service,
  aws: Aws,
  functionArn: CloudFormationObjectArn | string,
): Promise<void> {
  const extendedProvider = (service.provider as unknown as { logs?: LogsConfig })?.logs;
  if (!isLogsConfig(extendedProvider)) {
    return;
  }

  const resources = service.provider.compiledCloudFormationTemplate?.Resources;
  // The optional chain above can yield undefined; indexing it below would throw.
  if (resources === undefined || resources === null) {
    return;
  }

  if (restExecutionLoggingIsEnabled(extendedProvider)) {
    // Create the log group, then subscribe the forwarder to it by resource key.
    const logGroupName = await createRestExecutionLogGroupName(aws);
    resources[REST_EXECUTION_LOG_GROUP_KEY] = addExecutionLogGroup(logGroupName);
    resources[REST_EXECUTION_SUBSCRIPTION_KEY] = subscribeToExecutionLogGroup(
      functionArn,
      REST_EXECUTION_LOG_GROUP_KEY,
    );
  }

  if (websocketExecutionLoggingIsEnabled(extendedProvider)) {
    // Same two steps for the WebSocket API.
    const logGroupName = await createWebsocketExecutionLogGroupName(aws);
    resources[WEBSOCKETS_EXECUTION_LOG_GROUP_KEY] = addExecutionLogGroup(logGroupName);
    resources[WEBSOCKETS_EXECUTION_SUBSCRIPTION_KEY] = subscribeToExecutionLogGroup(
      functionArn,
      WEBSOCKETS_EXECUTION_LOG_GROUP_KEY,
    );
  }
}
/**
 * Creates a log group resource for a step function and points the step function's
 * logging config at it.
 *
 * @param aws Serverless framework provided AWS client (supplies the stage name)
 * @param resources Compiled template Resources block; the log group is added to it in place
 * @param stepFunction Step function entry; its `loggingConfig` is overwritten
 */
export async function addStepFunctionLogGroup(
  aws: Aws,
  resources: CfnResourceMap,
  stepFunction: StepFunctionConfig,
): Promise<void> {
  const { name } = stepFunction;
  const vendedLogGroupName = `/aws/vendedlogs/states/${name}-Logs-${aws.getStage()}`;
  const logicalId = `${normalizeResourceName(name)}LogGroup`;
  const pluginTag = { Key: "dd_sls_plugin", Value: `v${version}` };

  // Register the new log group in the compiled CloudFormation template.
  resources[logicalId] = {
    Type: logGroupKey,
    Properties: {
      LogGroupName: vendedLogGroupName,
      Tags: [pluginTag],
    },
  };

  // Point the step function at the new log group. Per the original note, the
  // serverless-step-functions plugin handles the IAM policy for writing to it.
  const logGroupArn = { "Fn::GetAtt": [logicalId, "Arn"] };
  stepFunction.loggingConfig = {
    level: "ALL",
    includeExecutionData: true,
    destinations: [logGroupArn],
  };
}
/**
 * Appends the `dd_sls_plugin` version tag to a state machine resource.
 * Nothing is added when the resource has no `Properties` or no `Tags` array.
 *
 * @param stateMachineObj Compiled state machine resource, mutated in place
 */
export function addDdSlsPluginTag(stateMachineObj: StateMachineCfnResource): void {
  const tags = stateMachineObj.Properties?.Tags;
  if (tags === undefined || tags === null) {
    return;
  }
  tags.push({ Key: "dd_sls_plugin", Value: `v${version}` });
}
/**
 * Appends a `DD_TRACE_ENABLED=true` tag to a state machine resource when tracing is requested.
 * Nothing is added when tracing is off or the resource has no `Properties` / `Tags` array.
 *
 * @param stateMachineObj Compiled state machine resource, mutated in place
 * @param enableStepFunctionsTracing Whether step function tracing is enabled
 */
export function addDdTraceEnabledTag(
  stateMachineObj: StateMachineCfnResource,
  enableStepFunctionsTracing: undefined | boolean,
): void {
  if (enableStepFunctionsTracing) {
    const tags = stateMachineObj.Properties?.Tags;
    if (tags !== undefined && tags !== null) {
      tags.push({ Key: "DD_TRACE_ENABLED", Value: "true" });
    }
  }
}
/**
 * Adds a subscription filter that forwards a step function's log group to the forwarder.
 *
 * The step function must already have a `loggingConfig`; its first destination is used
 * to derive the log group name.
 *
 * @param resources Compiled template Resources block; the subscription is added in place
 * @param stepFunction Step function entry whose logging destination is subscribed to
 * @param functionArn Forwarder ARN, either a string or a CloudFormation intrinsic object
 */
export async function addStepFunctionLogGroupSubscription(
  resources: CfnResourceMap,
  stepFunction: StepFunctionConfig,
  functionArn: CloudFormationObjectArn | string,
): Promise<void> {
  const subscriptionId = `${normalizeResourceName(stepFunction.name)}LogGroupSubscription`;

  // Parse the log group name out of the ARN in the logging config destination:
  // split on ":" and take element 6.
  const destinationArn = stepFunction.loggingConfig!.destinations[0];
  const logGroupNameFromArn = {
    "Fn::Select": [6, { "Fn::Split": [":", destinationArn] }],
  };

  resources[subscriptionId] = {
    Type: logGroupSubscriptionKey,
    Properties: {
      DestinationArn: functionArn,
      FilterPattern: "",
      LogGroupName: logGroupNameFromArn,
    },
  };
}
/**
 * Subscribes the Datadog forwarder to every eligible log group in the compiled template.
 *
 * The forwarder ARN is validated first, unless it is a CloudFormation intrinsic object or a
 * testing flag is set. Each eligible log group then gets a subscription filter resource,
 * provided it does not already carry too many foreign filters.
 *
 * @param service Serverless service whose compiled template Resources are mutated in place
 * @param aws Serverless framework provided AWS client
 * @param functionArn Forwarder ARN, either a string or a CloudFormation intrinsic object
 * @param forwarderConfigs Plugin options controlling validation and eligibility
 * @param handlers Functions whose Lambda log groups may be subscribed
 * @returns Messages describing skipped steps; empty when nothing was skipped
 */
export async function addCloudWatchForwarderSubscriptions(
  service: Service,
  aws: Aws,
  functionArn: CloudFormationObjectArn | string,
  forwarderConfigs: ForwarderConfigs,
  handlers: FunctionInfo[],
): Promise<string[]> {
  const resources = service.provider.compiledCloudFormationTemplate?.Resources;
  if (resources === undefined) {
    return ["No cloudformation stack available. Skipping subscribing Datadog forwarder."];
  }

  const errors: string[] = [];

  if (typeof functionArn === "string") {
    const inTestingMode = forwarderConfigs.TestingMode === true || forwarderConfigs.IntegrationTesting === true;
    if (inTestingMode) {
      errors.push("Skipping forwarder ARN validation because 'testingMode' is set to true");
    } else {
      await validateForwarderArn(aws, functionArn);
    }
  } else {
    errors.push("Skipping forwarder ARN validation because forwarder string defined with CloudFormation function.");
  }

  for (const [logicalId, candidate] of Object.entries(resources)) {
    if (!shouldSubscribe(logicalId, candidate, forwarderConfigs, handlers, service)) {
      continue;
    }

    const logGroupName = candidate.Properties.LogGroupName;
    const subscriptionId = `${logicalId}Subscription`;

    // An existing filter whose name starts with this prefix counts as ours.
    // The stack name, when available, takes precedence over service + stage.
    const serviceStagePrefix = `${service.getServiceName()}-${aws.getStage()}-${subscriptionId}-`;
    const stackName = aws.naming.getStackName();
    const expectedPrefix = stackName ? `${stackName}-${subscriptionId}-` : serviceStagePrefix;

    if (await canSubscribeLogGroup(aws, logGroupName, expectedPrefix)) {
      // Create the subscription for this log group.
      resources[subscriptionId] = subscribeToLogGroup(functionArn, logicalId);
    } else {
      errors.push(
        `Could not subscribe Datadog Forwarder due to too many existing subscription filter(s) for ${logGroupName}.`,
      );
    }
  }

  return errors;
}
/**
 * Decides whether a forwarder subscription filter may be added to a log group.
 *
 * @param aws Serverless framework provided AWS client
 * @param logGroupName Name of the log group to inspect
 * @param expectedSubName Prefix identifying a filter previously created for this stack
 * @returns true when a filter with the expected prefix already exists, or when the log group
 *          has fewer existing filters than the allowed maximum
 */
export async function canSubscribeLogGroup(aws: Aws, logGroupName: string, expectedSubName: string): Promise<boolean> {
  const existingFilters = await describeSubscriptionFilters(aws, logGroupName);
  const ownFilters = existingFilters.filter(({ filterName }) => filterName.startsWith(expectedSubName));
  const alreadySubscribed = ownFilters.length > 0;
  return alreadySubscribed || existingFilters.length < maxAllowableLogGroupSubscriptions;
}
/**
 * Fetches the subscription filters currently attached to a log group.
 *
 * @param aws Serverless framework provided AWS client
 * @param logGroupName Name of the log group to query
 * @returns The filters reported by CloudWatch Logs, or an empty list when the request fails
 */
export async function describeSubscriptionFilters(aws: Aws, logGroupName: string): Promise<SubscriptionFilter[]> {
  const params = { logGroupName };
  try {
    const { subscriptionFilters }: DescribeSubscriptionFiltersResponse = await aws.request(
      "CloudWatchLogs",
      "describeSubscriptionFilters",
      params,
    );
    return subscriptionFilters;
  } catch {
    // The request errors when the log group doesn't exist; treat any failure as "no filters".
    return [];
  }
}
// Helper functions to validate we have a particular log group and if we should subscribe to it

/**
 * Reports whether a log group is a REST API access log group that should be subscribed.
 *
 * @param resource Log group resource under consideration
 * @param subscribe Whether subscribing to access log groups is requested
 * @param extendedProvider The `provider.logs` configuration
 */
function validateRestApiSubscription(
  resource: LogGroupResource,
  subscribe: boolean,
  extendedProvider: LogsConfig,
): boolean {
  if (!restAccessLoggingIsEnabled(extendedProvider)) {
    return false;
  }
  if (!resource.Properties.LogGroupName.startsWith("/aws/api-gateway/")) {
    return false;
  }
  return subscribe;
}
/**
 * Reports whether a log group is an HTTP API access log group that should be subscribed.
 *
 * @param resource Log group resource under consideration
 * @param subscribe Whether subscribing to access log groups is requested
 * @param extendedProvider The `provider.logs` configuration
 */
function validateHttpApiSubscription(
  resource: LogGroupResource,
  subscribe: boolean,
  extendedProvider: LogsConfig,
): boolean {
  if (!httpAccessLoggingIsEnabled(extendedProvider)) {
    return false;
  }
  if (!resource.Properties.LogGroupName.startsWith("/aws/http-api/")) {
    return false;
  }
  return subscribe;
}
/**
 * Reports whether a log group is a WebSocket access log group that should be subscribed.
 *
 * @param resource Log group resource under consideration
 * @param subscribe Whether subscribing to access log groups is requested
 * @param extendedProvider The `provider.logs` configuration
 */
function validateWebsocketSubscription(
  resource: LogGroupResource,
  subscribe: boolean,
  extendedProvider: LogsConfig,
): boolean {
  if (!websocketAccessLoggingIsEnabled(extendedProvider)) {
    return false;
  }
  if (!resource.Properties.LogGroupName.startsWith("/aws/websocket/")) {
    return false;
  }
  return subscribe;
}
/**
 * Decides whether the forwarder should be subscribed to a compiled template resource.
 *
 * Only log groups with a literal string name are considered. Eligible groups are:
 * - Lambda log groups (`/aws/lambda/`) belonging to one of `handlers`, when the extension is disabled;
 * - REST / HTTP / WebSocket access log groups, when access logging is enabled for that API kind
 *   and `SubToAccessLogGroups` is set.
 * Every other log group is rejected.
 *
 * @param resourceName Logical ID of the resource in the template
 * @param resource The resource value from the template's Resources block
 * @param forwarderConfigs Plugin options controlling eligibility
 * @param handlers Functions whose Lambda log groups may be subscribed
 * @param service Serverless service (supplies `provider.logs`)
 * @returns true when a subscription filter should be created for the resource
 */
function shouldSubscribe(
  resourceName: string,
  resource: unknown,
  forwarderConfigs: ForwarderConfigs,
  handlers: FunctionInfo[],
  service: Service,
): boolean {
  const extendedProvider = (service.provider as unknown as { logs?: LogsConfig })?.logs;
  if (!isLogGroup(resource)) {
    return false;
  }

  // Execution log groups are added manually with an intrinsic-function name, so they are skipped
  // here. The optional chain also covers a log group declared without Properties.
  const logGroupName: unknown = resource.Properties?.LogGroupName;
  if (typeof logGroupName !== "string") {
    return false;
  }

  /*
  Step function log groups created as custom resources in serverless.yml need to be subscribed to using the log group in
  the step function loggingConfig since custom resources are not in the compiled cloudformation template until a later lifecycle event.
  Step function log groups created outside of serverless.yml need to be subscribed to using the log group in
  the step function loggingConfig since these log groups will never be in the compiled cloudformation template.
  Step function log groups created by this plugin are also subscribed to using the log group in the step function loggingConfig
  for consistency with step function log groups created with the above methods.
  */
  if (logGroupName.startsWith("/aws/vendedlogs/states/")) {
    return false;
  }

  const subscribeToAccessLogs = forwarderConfigs.SubToAccessLogGroups;
  const isApiAccessLogGroup =
    validateRestApiSubscription(resource, subscribeToAccessLogs, extendedProvider) ||
    validateHttpApiSubscription(resource, subscribeToAccessLogs, extendedProvider) ||
    validateWebsocketSubscription(resource, subscribeToAccessLogs, extendedProvider);

  // If the extension is enabled, we don't want to subscribe to lambda log groups.
  if (forwarderConfigs.AddExtension && !isApiAccessLogGroup) {
    return false;
  }

  // Anything that is neither a lambda log group nor an enabled API access log group is not ours
  // to subscribe to. (Returning true here would subscribe arbitrary log groups.)
  const isLambdaLogGroup = logGroupName.startsWith("/aws/lambda/");
  if (!isLambdaLogGroup && !isApiAccessLogGroup) {
    return false;
  }

  // If the log group does not belong to our list of handlers, we don't want to subscribe to it.
  if (isLambdaLogGroup && !handlers.some(({ name }) => getLogGroupLogicalId(name) === resourceName)) {
    return false;
  }

  return true;
}
/**
 * Builds a subscription filter resource that forwards a template log group to the forwarder.
 *
 * @param functionArn Forwarder ARN, either a string or a CloudFormation intrinsic object
 * @param name Logical ID of the log group resource, referenced via `Ref`
 * @returns The subscription filter resource with an empty (match-all) filter pattern
 */
function subscribeToLogGroup(functionArn: string | CloudFormationObjectArn, name: string) {
  const logGroupRef = { Ref: name };
  return {
    Type: logGroupSubscriptionKey,
    Properties: {
      DestinationArn: functionArn,
      FilterPattern: "",
      LogGroupName: logGroupRef,
    },
  };
}
/**
 * Builds the `Fn::Join` expression naming the REST API execution log group:
 * `API-Gateway-Execution-Logs_<ApiGatewayRestApi ref>/<stage>`.
 *
 * @param aws Serverless framework provided AWS client (supplies the stage name)
 */
async function createRestExecutionLogGroupName(aws: Aws) {
  const segments = ["API-Gateway-Execution-Logs_", { Ref: "ApiGatewayRestApi" }, "/", aws.getStage()];
  return { "Fn::Join": ["", segments] };
}
/**
 * Builds the `Fn::Join` expression naming the WebSocket API execution log group:
 * `/aws/apigateway/<WebsocketsApi ref>/<stage>`.
 *
 * @param aws Serverless framework provided AWS client (supplies the stage name)
 */
async function createWebsocketExecutionLogGroupName(aws: Aws) {
  const segments = ["/aws/apigateway/", { Ref: "WebsocketsApi" }, "/", aws.getStage()];
  return { "Fn::Join": ["", segments] };
}
/**
 * Builds a log group resource for an API Gateway execution log group.
 * The group is created manually because it is not otherwise part of the compiled template.
 *
 * @param logGroupName Literal name or CloudFormation intrinsic expression for the group name
 * @returns An `AWS::Logs::LogGroup` resource carrying only the given name
 */
function addExecutionLogGroup(logGroupName: string | CfnIntrinsicValue) {
  return {
    Type: "AWS::Logs::LogGroup",
    Properties: { LogGroupName: logGroupName },
  };
}
/**
 * Builds a subscription filter resource that forwards an execution log group to the forwarder.
 *
 * @param functionArn Forwarder ARN, either a string or a CloudFormation intrinsic object
 * @param executionLogGroupKey Logical ID of the execution log group resource, referenced via `Ref`
 * @returns The subscription filter resource with an empty (match-all) filter pattern
 */
function subscribeToExecutionLogGroup(functionArn: string | CloudFormationObjectArn, executionLogGroupKey: string) {
  const logGroupRef = { Ref: executionLogGroupKey };
  return {
    Type: logGroupSubscriptionKey,
    Properties: {
      DestinationArn: functionArn,
      FilterPattern: "",
      LogGroupName: logGroupRef,
    },
  };
}
/**
 * Type guard for the `provider.logs` configuration.
 *
 * @param obj Value to check
 * @returns true when `obj` is a non-null object whose `restApi`, `httpApi` and `websocket`
 *          entries, where present as own properties, are each a valid per-API logging setting
 */
export function isLogsConfig(obj: unknown): obj is LogsConfig {
  if (obj === null || typeof obj !== "object") {
    return false;
  }
  const candidate = obj as Record<string, unknown>;
  const apiKinds = ["restApi", "httpApi", "websocket"];
  // Absent keys are fine; present keys must validate.
  return apiKinds.every((kind) => !candidate.hasOwnProperty(kind) || isSubLogsConfig(candidate[kind]));
}
/**
 * Type guard for a per-API logging setting (`restApi`, `httpApi` or `websocket`).
 *
 * @param obj Value to check
 * @returns true for a boolean, or for a non-null object whose `accessLogging` and
 *          `executionLogging` entries, where present, are each a boolean or `undefined`
 */
function isSubLogsConfig(obj: unknown): obj is SubLogsConfig {
  if (typeof obj === "boolean") {
    return true;
  }
  if (typeof obj !== "object" || obj === null) {
    return false;
  }
  const record = obj as Record<string, unknown>;
  for (const key of ["accessLogging", "executionLogging"]) {
    // Call through Object.prototype so objects without their own hasOwnProperty still work.
    if (!Object.prototype.hasOwnProperty.call(record, key)) {
      continue;
    }
    const flag = record[key];
    // Compare the value itself to undefined: `typeof x` is a string and never equals `undefined`.
    if (typeof flag !== "boolean" && flag !== undefined) {
      return false;
    }
  }
  return true;
}
/**
 * Reports whether REST API access logging is enabled: `restApi` is `true`, or is an object
 * with `accessLogging: true`.
 */
function restAccessLoggingIsEnabled(obj: LogsConfig): boolean {
  const restApi = obj?.restApi;
  if (typeof restApi === "boolean") {
    return restApi;
  }
  return restApi?.accessLogging === true;
}
/**
 * Reports whether REST API execution logging is enabled: `restApi` is `true`, or is an object
 * with `executionLogging: true`.
 */
function restExecutionLoggingIsEnabled(obj: LogsConfig): boolean {
  const restApi = obj?.restApi;
  if (typeof restApi === "boolean") {
    return restApi;
  }
  return restApi?.executionLogging === true;
}
/**
 * Reports whether HTTP API access logging is enabled: `httpApi` is `true`, or is an object
 * with `accessLogging: true`.
 */
function httpAccessLoggingIsEnabled(obj: LogsConfig): boolean {
  const httpApi = obj?.httpApi;
  if (typeof httpApi === "boolean") {
    return httpApi;
  }
  return httpApi?.accessLogging === true;
}
/**
 * Reports whether WebSocket access logging is enabled: `websocket` is `true`, or is an object
 * with `accessLogging: true`.
 */
function websocketAccessLoggingIsEnabled(obj: LogsConfig): boolean {
  const websocket = obj?.websocket;
  if (typeof websocket === "boolean") {
    return websocket;
  }
  return websocket?.accessLogging === true;
}
/**
 * Reports whether WebSocket execution logging is enabled: `websocket` is `true`, or is an
 * object with `executionLogging: true`.
 */
function websocketExecutionLoggingIsEnabled(obj: LogsConfig): boolean {
  const websocket = obj?.websocket;
  if (typeof websocket === "boolean") {
    return websocket;
  }
  return websocket?.executionLogging === true;
}
// Created from https://github.com/serverless/serverless/blob/master/lib/plugins/aws/lib/naming.js#L125-L127
// Skipped lodash because Lambda Function Names can't include unicode chars or symbols
/**
 * Derives the logical ID of a function's log group resource from the function name:
 * first letter upper-cased, "-" spelled out as "Dash", "_" as "Underscore", then the
 * "LogGroup" suffix.
 *
 * @param functionName Function name as declared in the service
 * @returns The logical ID, or an empty string for an empty name
 */
function getLogGroupLogicalId(functionName: string): string {
  if (!functionName) {
    return "";
  }
  const capitalized = functionName.charAt(0).toUpperCase() + functionName.slice(1);
  const normalized = capitalized.split("-").join("Dash").split("_").join("Underscore");
  return `${normalized}LogGroup`;
}
// Resource names in CloudFormation Templates can only have alphanumeric characters
/**
 * Strips every character that is not an ASCII letter or digit from a resource name.
 *
 * @param resourceName Arbitrary name, e.g. a step function name
 * @returns The name with all non-alphanumeric characters removed
 */
function normalizeResourceName(resourceName: string): string {
  const nonAlphanumeric = /[^0-9a-z]/gi;
  return resourceName.replace(nonAlphanumeric, "");
}