@@ -69,7 +69,7 @@ public SqsWorker(final SqsClient sqsClient,
6969 @ Override
7070 public void run () {
7171
72- while (true ) {
72+ while (true ) {
7373 int messagesProcessed = 0 ;
7474 try {
7575 messagesProcessed = processSqsMessages ();
@@ -152,26 +152,35 @@ private ParsedMessage convertS3EventMessages(final Message message) {
152152 }
153153
154154 private List <DeleteMessageBatchRequestEntry > processS3EventNotificationRecords (final Collection <ParsedMessage > s3EventNotificationRecords ) {
155- List <DeleteMessageBatchRequestEntry > deleteMessageBatchRequestEntryCollection = new ArrayList <>();
155+ final List <DeleteMessageBatchRequestEntry > deleteMessageBatchRequestEntryCollection = new ArrayList <>();
156+ final List <ParsedMessage > parsedMessagesToRead = new ArrayList <>();
157+
156158 for (ParsedMessage parsedMessage : s3EventNotificationRecords ) {
157159 if (parsedMessage .failedParsing ) {
158160 sqsMessagesFailedCounter .increment ();
159161 if (s3SourceConfig .getOnErrorOption ().equals (OnErrorOption .DELETE_MESSAGES )) {
160162 deleteMessageBatchRequestEntryCollection .add (buildDeleteMessageBatchRequestEntry (parsedMessage .message ));
161163 }
162- }
163- else {
164+ } else {
164165 final List <S3EventNotification .S3EventNotificationRecord > notificationRecords = parsedMessage .notificationRecords ;
165- if (!notificationRecords .isEmpty () && isEventNameCreated (notificationRecords .get (0 ))) {
166- final S3ObjectReference s3ObjectReference = populateS3Reference (notificationRecords .get (0 ));
167- final Optional <DeleteMessageBatchRequestEntry > deleteMessageBatchRequestEntry = processS3Object (parsedMessage , s3ObjectReference );
168- deleteMessageBatchRequestEntry .ifPresent (deleteMessageBatchRequestEntryCollection ::add );
166+ if (!notificationRecords .isEmpty () && isEventNameCreated (notificationRecords .get (0 ))) {
167+ parsedMessagesToRead .add (parsedMessage );
169168 } else {
170169 // Add SQS message to delete collection if the eventName is not ObjectCreated
171170 deleteMessageBatchRequestEntryCollection .add (buildDeleteMessageBatchRequestEntry (parsedMessage .message ));
172171 }
173172 }
174173 }
174+
175+ LOG .info ("Received {} messages from SQS. Read {} messages from S3." , s3EventNotificationRecords .size (), parsedMessagesToRead .size ());
176+
177+ for (ParsedMessage parsedMessage : parsedMessagesToRead ) {
178+ final List <S3EventNotification .S3EventNotificationRecord > notificationRecords = parsedMessage .notificationRecords ;
179+ final S3ObjectReference s3ObjectReference = populateS3Reference (notificationRecords .get (0 ));
180+ final Optional <DeleteMessageBatchRequestEntry > deleteMessageBatchRequestEntry = processS3Object (parsedMessage , s3ObjectReference );
181+ deleteMessageBatchRequestEntry .ifPresent (deleteMessageBatchRequestEntryCollection ::add );
182+ }
183+
175184 return deleteMessageBatchRequestEntryCollection ;
176185 }
177186
@@ -194,6 +203,7 @@ private Optional<DeleteMessageBatchRequestEntry> processS3Object(
194203 }
195204
196205 private void deleteSqsMessages (final List <DeleteMessageBatchRequestEntry > deleteMessageBatchRequestEntryCollection ) {
206+ LOG .debug ("Deleting {} messages from SQS." , deleteMessageBatchRequestEntryCollection .size ());
197207 final DeleteMessageBatchRequest deleteMessageBatchRequest = buildDeleteMessageBatchRequest (deleteMessageBatchRequestEntryCollection );
198208 final DeleteMessageBatchResponse deleteMessageBatchResponse = sqsClient .deleteMessageBatch (deleteMessageBatchRequest );
199209 if (deleteMessageBatchResponse .hasSuccessful ()) {
@@ -224,7 +234,7 @@ private boolean isEventNameCreated(final S3EventNotification.S3EventNotification
224234 private S3ObjectReference populateS3Reference (final S3EventNotification .S3EventNotificationRecord s3EventNotificationRecord ) {
225235 final S3EventNotification .S3Entity s3Entity = s3EventNotificationRecord .getS3 ();
226236 return S3ObjectReference .bucketAndKey (s3Entity .getBucket ().getName (),
227- s3Entity .getObject ().getUrlDecodedKey ())
237+ s3Entity .getObject ().getUrlDecodedKey ())
228238 .build ();
229239 }
230240
0 commit comments