Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ jobs:
packages/parameters,
packages/event-handler,
packages/tracer,
packages/batch,
layers,
]
version: [20, 22, 24]
Expand Down
23,705 changes: 9,642 additions & 14,063 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions packages/batch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
"test:unit": "vitest --run",
"test:unit:coverage": "vitest --run tests/unit --coverage.enabled --coverage.thresholds.100 --coverage.include='src/**'",
"test:unit:types": "echo 'Not Implemented'",
"test:e2e:nodejs20x": "echo 'Not Implemented'",
"test:e2e:nodejs22x": "echo 'Not Implemented'",
"test:e2e:nodejs24x": "echo 'Not Implemented'",
"test:e2e": "echo 'Not Implemented'",
"test:e2e:nodejs20x": "RUNTIME=nodejs20x vitest --run tests/e2e",
"test:e2e:nodejs22x": "RUNTIME=nodejs22x vitest --run tests/e2e",
"test:e2e:nodejs24x": "RUNTIME=nodejs24x vitest --run tests/e2e",
"test:e2e": "vitest --run tests/e2e",
"build:cjs": "tsc --build tsconfig.cjs.json && echo '{ \"type\": \"commonjs\" }' > lib/cjs/package.json",
"build:esm": "tsc --build tsconfig.json && echo '{ \"type\": \"module\" }' > lib/esm/package.json",
"build": "npm run build:esm & npm run build:cjs",
Expand Down Expand Up @@ -90,6 +90,7 @@
"devDependencies": {
"@aws-lambda-powertools/parser": "2.30.2",
"@aws-lambda-powertools/testing-utils": "file:../testing",
"@aws-sdk/client-sqs": "^3.971.0",
"zod": "^4.3.6"
}
}
3 changes: 3 additions & 0 deletions packages/batch/tests/e2e/constants.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
const RESOURCE_NAME_PREFIX = 'Batch';

export { RESOURCE_NAME_PREFIX };
52 changes: 52 additions & 0 deletions packages/batch/tests/e2e/sqsStandard.test.FunctionCode.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import {
BatchProcessor,
EventType,
processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import type { ParsedRecord } from '@aws-lambda-powertools/batch/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { SQSHandler, SQSRecord } from 'aws-lambda';
import { z } from 'zod/mini';

const logger = new Logger({ serviceName: 'sqsStandardBatchProcessor' });
const innerSchema = z.object({ message: z.string(), shouldFail: z.boolean() });
const processor = new BatchProcessor(EventType.SQS, {
logger,
parser,
innerSchema,
transformer: 'json',
});

const recordHandler = ({
body: { shouldFail },
}: ParsedRecord<SQSRecord, z.infer<typeof innerSchema>>): void => {
if (shouldFail) {
throw new Error('Simulated processing failure');
}
};

export const handler: SQSHandler = async (event, context) => {
logger.addContext(context);
logger.info('messages', {
messageCount: event.Records.length,
messagesWillFail: event.Records.filter((r) => {
const parsed = JSON.parse(r.body);
return parsed.shouldFail === true || parsed.shouldFail === undefined;
}).map((r) => r.messageId),
messagesWillSucceed: event.Records.filter(
(r) => JSON.parse(r.body).shouldFail === false
).map((r) => r.messageId),
});
const response = await processPartialResponse(
event,
recordHandler,
processor,
{ context, throwOnFullBatchFailure: false }
);
logger.info('response', {
batchItemFailures: response.batchItemFailures,
});

return response;
};
135 changes: 135 additions & 0 deletions packages/batch/tests/e2e/sqsStandard.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { join } from 'node:path';
import { LogTailer, TestStack } from '@aws-lambda-powertools/testing-utils';
import { TestNodejsFunction } from '@aws-lambda-powertools/testing-utils/resources/lambda';
import type { ParsedLog } from '@aws-lambda-powertools/testing-utils/types';
import { CfnOutput, Duration, RemovalPolicy } from 'aws-cdk-lib';
import { LoggingFormat } from 'aws-cdk-lib/aws-lambda';
import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { Queue } from 'aws-cdk-lib/aws-sqs';
import { afterAll, beforeAll, describe, expect, it } from 'vitest';
import { sendMessagesToQueue } from '../helpers/publishers.js';
import { RESOURCE_NAME_PREFIX } from './constants.js';

const lambdaFunctionCodeFilePath = join(
__dirname,
'sqsStandard.test.FunctionCode.ts'
);

describe('SQS Standard Queue Batch Processing', () => {
const testName = 'SqsStandard';
const testStack = new TestStack({
stackNameProps: {
stackNamePrefix: RESOURCE_NAME_PREFIX,
testName,
},
});

let logGroupArn: string;
let queueUrl: string;

const lambdaFunction = new TestNodejsFunction(
testStack,
{
entry: lambdaFunctionCodeFilePath,
environment: {
POWERTOOLS_SERVICE_NAME: 'batch-e2e-test',
},
loggingFormat: LoggingFormat.JSON,
},
{
nameSuffix: 'fn',
}
);
new CfnOutput(testStack.stack, 'LogGroupArn', {
value: lambdaFunction.logGroup.logGroupArn,
});

const queue = new Queue(testStack.stack, 'BatchTestQueue', {
removalPolicy: RemovalPolicy.DESTROY,
deadLetterQueue: {
queue: new Queue(testStack.stack, 'BatchTestDLQ', {
removalPolicy: RemovalPolicy.DESTROY,
}),
maxReceiveCount: 1,
},
});
lambdaFunction.addEventSource(
new SqsEventSource(queue, {
batchSize: 10,
reportBatchItemFailures: true,
maxConcurrency: 2,
maxBatchingWindow: Duration.seconds(10),
})
);
new CfnOutput(testStack.stack, 'BatchTestQueueUrl', {
value: queue.queueUrl,
});

beforeAll(async () => {
await testStack.deploy();

logGroupArn = testStack.findAndGetStackOutputValue('LogGroupArn');
queueUrl = testStack.findAndGetStackOutputValue('BatchTestQueueUrl');
});

it('processes mixed success and failure messages correctly', async () => {
const logTailer = new LogTailer(logGroupArn);

const requestsLogs = await logTailer.collectLogs({
testFn: async () => {
await sendMessagesToQueue(queueUrl, [
{ message: 'success-1', shouldFail: false },
{ message: 'failure-1', shouldFail: true },
{ message: 'success-2', shouldFail: false },
{ foo: 'bar' }, // shouldFail undefined, treated as failure
]);
},
waitForIdle: true,
});

const logs = {
messagesIdToFail: [] as string[],
messagesIdToSucceed: [] as string[],
response: { batchItemFailures: [] as Array<{ itemIdentifier: string }> },
};
for (const logEntries of requestsLogs.values()) {
const messageLog = logEntries.find(
(entry: ParsedLog) => entry.message === 'messages'
);
(messageLog as { messagesWillFail: string[] }).messagesWillFail.forEach(
(id: string) => {
logs.messagesIdToFail.push(id);
}
);
(
messageLog as { messagesWillSucceed: string[] }
).messagesWillSucceed.forEach((id: string) => {
logs.messagesIdToSucceed.push(id);
});
const responseLog = logEntries.find(
(entry: ParsedLog) => entry.message === 'response'
);
logs.response.batchItemFailures.push(
...(
responseLog as {
batchItemFailures: Array<{ itemIdentifier: string }>;
}
).batchItemFailures
);
}

expect(logs.messagesIdToFail.length).toBe(2);
expect(logs.messagesIdToSucceed.length).toBe(2);
expect(logs.response.batchItemFailures.length).toBe(2);
expect(logs.response.batchItemFailures).toEqual([
{ itemIdentifier: logs.messagesIdToFail[0] },
{ itemIdentifier: logs.messagesIdToFail[1] },
]);
});

afterAll(async () => {
if (!process.env.DISABLE_TEARDOWN) {
await testStack.destroy();
}
});
});
26 changes: 26 additions & 0 deletions packages/batch/tests/helpers/publishers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { SendMessageBatchCommand, SQSClient } from '@aws-sdk/client-sqs';

const sqsClient = new SQSClient({});

/**
* Send a batch of messages to an SQS queue.
*
* @param queueUrl - The URL of the SQS queue
* @param messages - An array of messages to send, each with a message body and a failure flag
*/
const sendMessagesToQueue = async (
queueUrl: string,
messages: Record<string, unknown>[]
) => {
await sqsClient.send(
new SendMessageBatchCommand({
QueueUrl: queueUrl,
Entries: messages.map((message, index) => ({
Id: `msg-${index}`,
MessageBody: JSON.stringify(message),
})),
})
);
};

export { sendMessagesToQueue };
2 changes: 2 additions & 0 deletions packages/batch/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,7 @@ export default defineProject({
test: {
environment: 'node',
setupFiles: ['../testing/src/setupEnv.ts'],
hookTimeout: 1_000 * 60 * 10, // 10 minutes
testTimeout: 1_000 * 60 * 3, // 3 minutes
},
});
1 change: 1 addition & 0 deletions packages/testing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
"dependencies": {
"@aws-cdk/toolkit-lib": "^1.14.1",
"@aws-sdk/client-lambda": "^3.975.0",
"@aws-sdk/client-cloudwatch-logs": "^3.975.0",
"@aws/lambda-invoke-store": "0.2.3",
"@smithy/util-utf8": "^4.0.0",
"aws-cdk-lib": "^2.236.0",
Expand Down
Loading
Loading