1212import com .amazon .dataprepper .plugins .source .codec .Codec ;
1313import com .amazon .dataprepper .plugins .source .compression .CompressionEngine ;
1414import io .micrometer .core .instrument .Counter ;
15+ import io .micrometer .core .instrument .Timer ;
1516import org .junit .jupiter .api .BeforeEach ;
1617import org .junit .jupiter .api .Test ;
1718import org .junit .jupiter .api .extension .ExtendWith ;
3132import java .util .Optional ;
3233import java .util .Random ;
3334import java .util .UUID ;
35+ import java .util .concurrent .Callable ;
3436import java .util .function .Consumer ;
3537
3638import static org .hamcrest .CoreMatchers .equalTo ;
3739import static org .hamcrest .CoreMatchers .notNullValue ;
40+ import static org .hamcrest .CoreMatchers .nullValue ;
3841import static org .hamcrest .CoreMatchers .sameInstance ;
3942import static org .hamcrest .MatcherAssert .assertThat ;
4043import static org .junit .jupiter .api .Assertions .assertThrows ;
4548import static org .mockito .Mockito .mock ;
4649import static org .mockito .Mockito .mockStatic ;
4750import static org .mockito .Mockito .verify ;
51+ import static org .mockito .Mockito .verifyNoInteractions ;
4852import static org .mockito .Mockito .when ;
4953
5054@ ExtendWith (MockitoExtension .class )
@@ -72,13 +76,19 @@ class S3ObjectWorkerTest {
7276 private PluginMetrics pluginMetrics ;
7377 @ Mock
7478 private Counter s3ObjectsFailedCounter ;
79+ @ Mock
80+ private Counter s3ObjectsSucceededCounter ;
81+ @ Mock
82+ private Timer s3ObjectReadTimer ;
7583
7684 private String bucketName ;
7785 private String key ;
7886 private ResponseInputStream <GetObjectResponse > objectInputStream ;
7987
88+ private Exception exceptionThrownByCallable ;
89+
8090 @ BeforeEach
81- void setUp () {
91+ void setUp () throws Exception {
8292 final Random random = new Random ();
8393 bufferTimeout = Duration .ofMillis (random .nextInt (100 ) + 100 );
8494 recordsToAccumulate = random .nextInt (10 ) + 2 ;
@@ -88,7 +98,21 @@ void setUp() {
8898 when (s3ObjectReference .getBucketName ()).thenReturn (bucketName );
8999 when (s3ObjectReference .getKey ()).thenReturn (key );
90100
101+ exceptionThrownByCallable = null ;
102+ when (s3ObjectReadTimer .recordCallable (any (Callable .class )))
103+ .thenAnswer (a -> {
104+ try {
105+ a .getArgument (0 , Callable .class ).call ();
106+ } catch (final Exception ex ) {
107+ exceptionThrownByCallable = ex ;
108+ throw ex ;
109+ }
110+ return null ;
111+ });
112+
91113 when (pluginMetrics .counter (S3ObjectWorker .S3_OBJECTS_FAILED_METRIC_NAME )).thenReturn (s3ObjectsFailedCounter );
114+ when (pluginMetrics .counter (S3ObjectWorker .S3_OBJECTS_SUCCEEDED_METRIC_NAME )).thenReturn (s3ObjectsSucceededCounter );
115+ when (pluginMetrics .timer (S3ObjectWorker .S3_OBJECTS_TIME_ELAPSED_METRIC_NAME )).thenReturn (s3ObjectReadTimer );
92116
93117 objectInputStream = mock (ResponseInputStream .class );
94118 }
@@ -189,6 +213,20 @@ void parseS3Object_calls_BufferAccumulator_flush_after_Codec_parse() throws Exce
189213 inOrder .verify (bufferAccumulator ).flush ();
190214 }
191215
216+ @ Test
217+ void parseS3Object_increments_success_counter_after_parsing_S3_object () throws IOException {
218+ final ResponseInputStream <GetObjectResponse > objectInputStream = mock (ResponseInputStream .class );
219+ when (s3Client .getObject (any (GetObjectRequest .class )))
220+ .thenReturn (objectInputStream );
221+
222+ final S3ObjectWorker objectUnderTest = createObjectUnderTest ();
223+ objectUnderTest .parseS3Object (s3ObjectReference );
224+
225+ verify (s3ObjectsSucceededCounter ).increment ();
226+ verifyNoInteractions (s3ObjectsFailedCounter );
227+ assertThat (exceptionThrownByCallable , nullValue ());
228+ }
229+
192230 @ Test
193231 void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_get_S3_object () {
194232 final RuntimeException expectedException = mock (RuntimeException .class );
@@ -201,10 +239,12 @@ void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_get_S3
201239 assertThat (actualException , sameInstance (expectedException ));
202240
203241 verify (s3ObjectsFailedCounter ).increment ();
242+ verifyNoInteractions (s3ObjectsSucceededCounter );
243+ assertThat (exceptionThrownByCallable , sameInstance (expectedException ));
204244 }
205245
206246 @ Test
207- void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_parse_S3_object () throws IOException {
247+ void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_to_parse_S3_object () throws IOException {
208248 when (compressionEngine .createInputStream (key , objectInputStream )).thenReturn (objectInputStream );
209249 when (s3Client .getObject (any (GetObjectRequest .class )))
210250 .thenReturn (objectInputStream );
@@ -218,5 +258,55 @@ void parseS3Object_throws_Exception_and_increments_counter_when_unable_to_parse_
218258 assertThat (actualException , sameInstance (expectedException ));
219259
220260 verify (s3ObjectsFailedCounter ).increment ();
261+ verifyNoInteractions (s3ObjectsSucceededCounter );
262+ assertThat (exceptionThrownByCallable , sameInstance (expectedException ));
263+ }
264+
265+ @ Test
266+ void parseS3Object_throws_Exception_and_increments_failure_counter_when_unable_to_GetObject_from_S3 () {
267+ final RuntimeException expectedException = mock (RuntimeException .class );
268+ when (s3Client .getObject (any (GetObjectRequest .class )))
269+ .thenThrow (expectedException );
270+
271+ final S3ObjectWorker objectUnderTest = createObjectUnderTest ();
272+ final RuntimeException actualException = assertThrows (RuntimeException .class , () -> objectUnderTest .parseS3Object (s3ObjectReference ));
273+
274+ assertThat (actualException , sameInstance (expectedException ));
275+
276+ verify (s3ObjectsFailedCounter ).increment ();
277+ verifyNoInteractions (s3ObjectsSucceededCounter );
278+ assertThat (exceptionThrownByCallable , sameInstance (expectedException ));
279+ }
280+
281+ @ Test
282+ void parseS3Object_throws_Exception_and_increments_failure_counter_when_CompressionEngine_fails () throws IOException {
283+ when (s3Client .getObject (any (GetObjectRequest .class )))
284+ .thenReturn (objectInputStream );
285+ final IOException expectedException = mock (IOException .class );
286+ when (compressionEngine .createInputStream (key , objectInputStream )).thenThrow (expectedException );
287+
288+ final S3ObjectWorker objectUnderTest = createObjectUnderTest ();
289+ final IOException actualException = assertThrows (IOException .class , () -> objectUnderTest .parseS3Object (s3ObjectReference ));
290+
291+ assertThat (actualException , sameInstance (expectedException ));
292+
293+ verify (s3ObjectsFailedCounter ).increment ();
294+ verifyNoInteractions (s3ObjectsSucceededCounter );
295+ assertThat (exceptionThrownByCallable , sameInstance (expectedException ));
296+ }
297+
298+ @ Test
299+ void parseS3Object_calls_GetObject_after_Callable () throws Exception {
300+ final ResponseInputStream <GetObjectResponse > objectInputStream = mock (ResponseInputStream .class );
301+ when (s3Client .getObject (any (GetObjectRequest .class )))
302+ .thenReturn (objectInputStream );
303+
304+ final S3ObjectWorker objectUnderTest = createObjectUnderTest ();
305+ objectUnderTest .parseS3Object (s3ObjectReference );
306+
307+ final InOrder inOrder = inOrder (s3ObjectReadTimer , s3Client );
308+
309+ inOrder .verify (s3ObjectReadTimer ).recordCallable (any (Callable .class ));
310+ inOrder .verify (s3Client ).getObject (any (GetObjectRequest .class ));
221311 }
222312}
0 commit comments