Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
import it.unibo.alchemist.model.positions.Euclidean2DPosition;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeoutException;

import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -37,69 +40,130 @@
class LsaNodeConcurrencyTest {

private static final int MIN_MOLECULES = 5;
private static final int NUMBER_OF_OPERATIONS = 100;
private static final int NUMBER_OF_THREADS = 10;
private static final int TIMEOUT_SECONDS = 30;
private static final String SECONDS = " seconds";

@Test
void testConcurrentGetContentsAndModification() throws InterruptedException {
void testConcurrentGetContentsAndModification() {
final SAPEREIncarnation<Euclidean2DPosition> incarnation = new SAPEREIncarnation<>();
final Environment<List<ILsaMolecule>, Euclidean2DPosition> environment = new Continuous2DEnvironment<>(incarnation);
final LsaNode node = new LsaNode(environment);
// Add some initial molecules
for (int i = 0; i < 10; i++) {
node.setConcentration(new LsaMolecule("molecule" + i));
}
final int numberOfThreads = 10;
final int numberOfOperations = 100;
final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
final AtomicBoolean exceptionOccurred = new AtomicBoolean(false);
// Start threads that modify the node while others read from it
for (int i = 0; i < numberOfThreads; i++) {
final ExecutorService executor = Executors.newFixedThreadPool(NUMBER_OF_THREADS);
final CountDownLatch latch = new CountDownLatch(NUMBER_OF_THREADS);
final List<Future<?>> tasks = new ArrayList<>(NUMBER_OF_THREADS);
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
final int threadId = i;
executor.submit(() -> {
try {
for (int j = 0; j < numberOfOperations; j++) {
if (threadId % 2 == 0) {
// Reader threads - should not throw ConcurrentModificationException
final Map<Molecule, List<ILsaMolecule>> contents = node.getContents();
assertNotNull(contents);
final int moleculeCount = node.getMoleculeCount();
assertTrue(moleculeCount >= 0);
final List<ILsaMolecule> lsaSpace = node.getLsaSpace();
assertNotNull(lsaSpace);
} else {
// Writer threads
final ILsaMolecule newMolecule = new LsaMolecule("thread" + threadId + "_" + j);
node.setConcentration(newMolecule);
// Sometimes remove molecules to simulate real concurrent modification
if (j % 10 == 0 && node.getMoleculeCount() > MIN_MOLECULES) {
try {
node.removeConcentration(newMolecule);
} catch (final IllegalStateException e) {
// Expected if molecule was already removed by another thread
}
}
}
}
} catch (final IllegalStateException | java.util.ConcurrentModificationException e) {
exceptionOccurred.set(true);
e.printStackTrace();
} finally {
latch.countDown();
}
});
tasks.add(executor.submit(() -> runConcurrentTask(node, threadId, latch)));
}
// Wait for all threads to complete
assertTrue(latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS), "Test should complete within 30 seconds");
executor.shutdown();
// No exceptions should have occurred (especially no ConcurrentModificationException)
assertFalse(exceptionOccurred.get(), "No exceptions should occur during concurrent access");
awaitTaskCompletion(latch, tasks, executor);
// Verify the node is still in a valid state
final Map<Molecule, List<ILsaMolecule>> finalContents = node.getContents();
assertNotNull(finalContents);
assertTrue(node.getMoleculeCount() >= 0);
}

private Void runConcurrentTask(final LsaNode node, final int threadId, final CountDownLatch latch) {
try {
for (int j = 0; j < NUMBER_OF_OPERATIONS; j++) {
if (threadId % 2 == 0) {
// Reader threads - should not throw ConcurrentModificationException
final Map<Molecule, List<ILsaMolecule>> contents = node.getContents();
assertNotNull(contents);
final int moleculeCount = node.getMoleculeCount();
assertTrue(moleculeCount >= 0);
final List<ILsaMolecule> lsaSpace = node.getLsaSpace();
assertNotNull(lsaSpace);
} else {
runWriterIteration(node, threadId, j);
}
}
} finally {
latch.countDown();
}
return null;
}

private void runWriterIteration(final LsaNode node, final int threadId, final int operation) {
final ILsaMolecule newMolecule = new LsaMolecule("thread" + threadId + "x" + operation);
node.setConcentration(newMolecule);
// Sometimes remove molecules to simulate real concurrent modification
if (operation % 10 == 0 && node.getMoleculeCount() > MIN_MOLECULES) {
try {
node.removeConcentration(newMolecule);
} catch (final IllegalStateException e) {
// Expected if molecule was already removed by another thread
}
}
}

private void awaitTaskCompletion(
final CountDownLatch latch,
final List<Future<?>> tasks,
final ExecutorService executor
) {
AssertionError primaryFailure = null;
try {
try {
assertTrue(
latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS),
"Test should complete within " + TIMEOUT_SECONDS + SECONDS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("Test interrupted while waiting for tasks", e);
}
for (final Future<?> task : tasks) {
waitForTask(task);
}
} catch (final AssertionError e) {
primaryFailure = e;
} finally {
executor.shutdownNow();
Comment thread
DanySK marked this conversation as resolved.
try {
if (!executor.awaitTermination(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
final AssertionError terminationError = new AssertionError(
"Executor did not terminate within " + TIMEOUT_SECONDS + SECONDS);
if (primaryFailure != null) {
primaryFailure.addSuppressed(terminationError);
} else {
throw terminationError;
}
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
final AssertionError terminationError = new AssertionError(
"Interrupted while waiting for executor termination", e);
if (primaryFailure != null) {
primaryFailure.addSuppressed(terminationError);
} else {
throw terminationError;
}
}
Comment thread
DanySK marked this conversation as resolved.
}
if (primaryFailure != null) {
throw primaryFailure;
}
}

private static void waitForTask(final Future<?> task) {
try {
task.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new AssertionError("Test interrupted while waiting for task", e);
} catch (final ExecutionException e) {
throw new AssertionError("Task failed with exception", e.getCause());
} catch (final TimeoutException e) {
task.cancel(true);
throw new AssertionError("Task timed out after " + TIMEOUT_SECONDS + SECONDS, e);
}
}

@Test
void testBasicFunctionalityPreserved() {
final SAPEREIncarnation<Euclidean2DPosition> incarnation = new SAPEREIncarnation<>();
Expand Down
Loading