2323import com .google .common .collect .ImmutableSet ;
2424import java .io .ByteArrayOutputStream ;
2525import java .io .File ;
26+ import java .io .IOException ;
2627import java .nio .file .Path ;
2728import java .util .ArrayList ;
2829import java .util .Arrays ;
102103@ RunWith (PowerMockRunner .class )
103104@ PrepareForTest ({ReflectionUtil .class , ContainerStorageManagerRestoreUtil .class })
104105public class TestContainerStorageManager {
105-
106106 private static final String STORE_NAME = "store" ;
107107 private static final String SYSTEM_NAME = "kafka" ;
108108 private static final String STREAM_NAME = "store-stream" ;
@@ -116,6 +116,7 @@ public class TestContainerStorageManager {
116116 private SamzaContainerMetrics samzaContainerMetrics ;
117117 private Map <TaskName , TaskModel > tasks ;
118118 private StandbyTestContext testContext ;
119+ private CheckpointManager checkpointManager ;
119120
120121 private volatile int systemConsumerCreationCount ;
121122 private volatile int systemConsumerStartCount ;
@@ -143,7 +144,7 @@ private void addMockedTask(String taskname, int changelogPartition) {
143144 * Method to create a containerStorageManager with mocked dependencies
144145 */
145146 @ Before
146- public void setUp () throws InterruptedException {
147+ public void setUp () throws InterruptedException , IOException {
147148 taskRestoreMetricGauges = new HashMap <>();
148149 this .tasks = new HashMap <>();
149150 this .taskInstanceMetrics = new HashMap <>();
@@ -248,7 +249,7 @@ public Void answer(InvocationOnMock invocation) {
248249 .thenReturn (
249250 new scala .collection .immutable .Map .Map1 (new SystemStream (SYSTEM_NAME , STREAM_NAME ), systemStreamMetadata ));
250251
251- CheckpointManager checkpointManager = mock (CheckpointManager .class );
252+ this . checkpointManager = mock (CheckpointManager .class );
252253 when (checkpointManager .readLastCheckpoint (any (TaskName .class ))).thenReturn (new CheckpointV1 (new HashMap <>()));
253254
254255 SSPMetadataCache mockSSPMetadataCache = mock (SSPMetadataCache .class );
@@ -320,6 +321,40 @@ public void testParallelismAndMetrics() throws InterruptedException {
320321 Assert .assertEquals ("systemConsumerStartCount count should be 1" , 1 , this .systemConsumerStartCount );
321322 }
322323
324+ /**
325+ * This test will attempt to verify if logged stores are deleted if the input checkpoints are empty.
326+ * */
327+ @ Test
328+ @ SuppressWarnings ("ResultOfMethodCallIgnored" )
329+ public void testDeleteLoggedStoreOnNoCheckpoints () {
330+ // reset the mock to reset the stubs in setup method
331+ reset (this .checkpointManager );
332+ // redo stubbing to return null checkpoints
333+ when (this .checkpointManager .readLastCheckpoint (any ())).thenReturn (null );
334+ // create store under logged stores to demonstrate deletion
335+ final File storeFile = new File (DEFAULT_LOGGED_STORE_BASE_DIR .getPath () + File .separator + STORE_NAME );
336+ // add contents to store
337+ final File storeFilePartition = new File (DEFAULT_LOGGED_STORE_BASE_DIR .getPath () + File .separator + STORE_NAME + File .separator + "Partition_0" );
338+ storeFilePartition .deleteOnExit ();
339+ storeFile .deleteOnExit ();
340+ try {
341+ storeFile .mkdirs ();
342+ storeFilePartition .createNewFile ();
343+ Assert .assertTrue ("Assert that stores are present prior to the test." , storeFile .exists ());
344+ Assert .assertTrue ("Assert that store files are present prior to the test." , storeFilePartition .exists ());
345+ this .containerStorageManager .start ();
346+ this .containerStorageManager .shutdown ();
347+ Assert .assertFalse ("Assert that stores are deleted after the test." , storeFile .exists ());
348+ Assert .assertFalse ("Assert that store files are deleted after the test." , storeFile .exists ());
349+ } catch (Exception e ) {
350+ System .out .printf ("File %s could not be created." , storeFile );
351+ Assert .fail ();
352+ } finally {
353+ storeFilePartition .delete ();
354+ storeFile .delete ();
355+ }
356+ }
357+
323358 @ Test
324359 public void testNoConfiguredDurableStores () throws InterruptedException {
325360 taskRestoreMetricGauges = new HashMap <>();
0 commit comments