diff --git a/alchemist-incarnation-sapere/src/test/java/it/unibo/alchemist/model/sapere/nodes/LsaNodeConcurrencyTest.java b/alchemist-incarnation-sapere/src/test/java/it/unibo/alchemist/model/sapere/nodes/LsaNodeConcurrencyTest.java index 3625454a18..425dee19a6 100644 --- a/alchemist-incarnation-sapere/src/test/java/it/unibo/alchemist/model/sapere/nodes/LsaNodeConcurrencyTest.java +++ b/alchemist-incarnation-sapere/src/test/java/it/unibo/alchemist/model/sapere/nodes/LsaNodeConcurrencyTest.java @@ -18,13 +18,16 @@ import it.unibo.alchemist.model.positions.Euclidean2DPosition; import org.junit.jupiter.api.Test; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeoutException; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -37,10 +40,13 @@ class LsaNodeConcurrencyTest { private static final int MIN_MOLECULES = 5; + private static final int NUMBER_OF_OPERATIONS = 100; + private static final int NUMBER_OF_THREADS = 10; private static final int TIMEOUT_SECONDS = 30; + private static final String SECONDS = " seconds"; @Test - void testConcurrentGetContentsAndModification() throws InterruptedException { + void testConcurrentGetContentsAndModification() { final SAPEREIncarnation incarnation = new SAPEREIncarnation<>(); final Environment, Euclidean2DPosition> environment = new Continuous2DEnvironment<>(incarnation); final LsaNode node = new LsaNode(environment); @@ -48,58 +54,116 @@ void testConcurrentGetContentsAndModification() throws InterruptedException { for (int i = 0; i < 10; i++) { node.setConcentration(new LsaMolecule("molecule" + i)); } - final int numberOfThreads = 10; - final int numberOfOperations = 100; - final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); - final CountDownLatch latch = new CountDownLatch(numberOfThreads); - final AtomicBoolean exceptionOccurred = new AtomicBoolean(false); - // Start threads that modify the node while others read from it - for (int i = 0; i < numberOfThreads; i++) { + final ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS); + final CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS); + final List> tasks = new ArrayList<>(NUMBER_OF_THREADS); + for (int i = 0; i < NUMBER_OF_THREADS; i++) { final int threadId = i; - executor.submit(() -> { - try { - for (int j = 0; j < numberOfOperations; j++) { - if (threadId % 2 == 0) { - // Reader threads - should not throw ConcurrentModificationException - final Map> contents = node.getContents(); - assertNotNull(contents); - final int moleculeCount = node.getMoleculeCount(); - assertTrue(moleculeCount >= 0); - final List lsaSpace = node.getLsaSpace(); - assertNotNull(lsaSpace); - } else { - // Writer threads - final ILsaMolecule newMolecule = new LsaMolecule("thread" + threadId + "_" + j); - node.setConcentration(newMolecule); - // Sometimes remove molecules to simulate real concurrent modification - if (j % 10 == 0 && node.getMoleculeCount() > MIN_MOLECULES) { - try { - node.removeConcentration(newMolecule); - } catch (final IllegalStateException e) { - // Expected if molecule was already removed by another thread - } - } - } - } - } catch (final IllegalStateException | java.util.ConcurrentModificationException e) { - exceptionOccurred.set(true); - e.printStackTrace(); - } finally { - latch.countDown(); - } - }); + tasks.add(executor.submit(() -> runConcurrentTask(node, threadId, latch))); } - // Wait for all threads to complete - assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), "Test should complete within 30 seconds"); - executor.shutdown(); - // No exceptions should have occurred (especially no ConcurrentModificationException) - assertFalse(exceptionOccurred.get(), "No exceptions should occur during concurrent access"); + awaitTaskCompletion(latch, tasks, executor); // Verify the node is still in a valid state final Map> finalContents = node.getContents(); assertNotNull(finalContents); assertTrue(node.getMoleculeCount() >= 0); } + private Void runConcurrentTask(final LsaNode node, final int threadId, final CountDownLatch latch) { + try { + for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) { + if (threadId % 2 == 0) { + // Reader threads - should not throw ConcurrentModificationException + final Map> contents = node.getContents(); + assertNotNull(contents); + final int moleculeCount = node.getMoleculeCount(); + assertTrue(moleculeCount >= 0); + final List lsaSpace = node.getLsaSpace(); + assertNotNull(lsaSpace); + } else { + runWriterIteration(node, threadId, j); + } + } + } finally { + latch.countDown(); + } + return null; + } + + private void runWriterIteration(final LsaNode node, final int threadId, final int operation) { + final ILsaMolecule newMolecule = new LsaMolecule("thread" + threadId + "x" + operation); + node.setConcentration(newMolecule); + // Sometimes remove molecules to simulate real concurrent modification + if (operation % 10 == 0 && node.getMoleculeCount() > MIN_MOLECULES) { + try { + node.removeConcentration(newMolecule); + } catch (final IllegalStateException e) { + // Expected if molecule was already removed by another thread + } + } + } + + private void awaitTaskCompletion( + final CountDownLatch latch, + final List> tasks, + final ExecutorService executor + ) { + AssertionError primaryFailure = null; + try { + try { + assertTrue( + latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), + "Test should complete within " + TIMEOUT_SECONDS + SECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("Test interrupted while waiting for tasks", e); + } + for (final Future task : tasks) { + waitForTask(task); + } + } catch (final AssertionError e) { + primaryFailure = e; + } finally { + executor.shutdownNow(); + try { + if (!executor.awaitTermination(TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + final AssertionError terminationError = new AssertionError( + "Executor did not terminate within " + TIMEOUT_SECONDS + SECONDS); + if (primaryFailure != null) { + primaryFailure.addSuppressed(terminationError); + } else { + throw terminationError; + } + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + final AssertionError terminationError = new AssertionError( + "Interrupted while waiting for executor termination", e); + if (primaryFailure != null) { + primaryFailure.addSuppressed(terminationError); + } else { + throw terminationError; + } + } + } + if (primaryFailure != null) { + throw primaryFailure; + } + } + + private static void waitForTask(final Future task) { + try { + task.get(TIMEOUT_SECONDS, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AssertionError("Test interrupted while waiting for task", e); + } catch (final ExecutionException e) { + throw new AssertionError("Task failed with exception", e.getCause()); + } catch (final TimeoutException e) { + task.cancel(true); + throw new AssertionError("Task timed out after " + TIMEOUT_SECONDS + SECONDS, e); + } + } + @Test void testBasicFunctionalityPreserved() { final SAPEREIncarnation incarnation = new SAPEREIncarnation<>();