diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/AsyncPromiseFulfillerDecorator.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/AsyncPromiseFulfillerDecorator.java index 71a7ca9b..56eefb91 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/AsyncPromiseFulfillerDecorator.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/AsyncPromiseFulfillerDecorator.java @@ -1,4 +1,5 @@ package eu.chargetime.ocpp; + /* ChargeTime.eu - Java-OCA-OCPP @@ -29,14 +30,28 @@ of this software and associated documentation files (the "Software"), to deal import eu.chargetime.ocpp.model.Request; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; public class AsyncPromiseFulfillerDecorator implements PromiseFulfiller { private final PromiseFulfiller promiseFulfiller; - private static ExecutorService executor = Executors.newCachedThreadPool(); + private static ExecutorService executor = createDefaultExecutor(); + + private static ExecutorService createDefaultExecutor() { + int coreSize = Runtime.getRuntime().availableProcessors(); + int maxSize = coreSize * 2; + return new ThreadPoolExecutor( + coreSize, + maxSize, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1000), + new ThreadPoolExecutor.CallerRunsPolicy()); + } public static void setExecutor(ExecutorService newExecutor) { executor = newExecutor; diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java index 77c71d72..43a2d000 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java @@ -1,4 +1,5 @@ package eu.chargetime.ocpp; + /* ChargeTime.eu - Java-OCA-OCPP Copyright (C) 2015-2016 Thomas Volden @@ -29,7 +30,8 @@ of this software and associated documentation files (the "Software"), to deal import eu.chargetime.ocpp.feature.Feature; import eu.chargetime.ocpp.model.*; -import java.util.ArrayDeque; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,11 +44,11 @@ of this software and associated documentation files (the "Software"), to deal public abstract class Communicator { private static final Logger logger = LoggerFactory.getLogger(Communicator.class); - private final ArrayDeque transactionQueue; + private final ConcurrentLinkedDeque transactionQueue; private RetryRunner retryRunner; protected Radio radio; private CommunicatorEvents events; - private boolean failedFlag; + private final AtomicBoolean failedFlag = new AtomicBoolean(false); /** * Convert a formatted string into a {@link Request}/{@link Confirmation}. This is useful for call @@ -149,9 +151,8 @@ public Communicator(Radio transmitter) { */ public Communicator(Radio transmitter, boolean enableTransactionQueue) { this.radio = transmitter; - this.transactionQueue = enableTransactionQueue ? new ArrayDeque<>() : null; + this.transactionQueue = enableTransactionQueue ? new ConcurrentLinkedDeque<>() : null; this.retryRunner = enableTransactionQueue ? new RetryRunner() : null; - this.failedFlag = false; } /** @@ -267,7 +268,8 @@ public void sendCallResult(String uniqueId, String action, Confirmation confirma public void sendCallError( String uniqueId, String action, String errorCode, String errorDescription) { logger.error( - "An error occurred. Sending this information: uniqueId {}: action: {}, errorCode: {}, errorDescription: {}", + "An error occurred. Sending this information: uniqueId {}: action: {}, errorCode: {}," + + " errorDescription: {}", uniqueId, action, errorCode, @@ -381,7 +383,7 @@ public void receivedMessage(Object input) { events.onCallResultError( call.getId(), call.getErrorCode(), call.getErrorDescription(), call.getRawPayload()); } else if (message instanceof CallErrorMessage) { - failedFlag = true; + failedFlag.set(true); CallErrorMessage call = (CallErrorMessage) message; events.onError( call.getId(), call.getErrorCode(), call.getErrorDescription(), call.getRawPayload()); @@ -417,11 +419,11 @@ private Object getRetryMessage() { * @return whether a fail flag has been raised. */ private boolean hasFailed() { - return failedFlag; + return failedFlag.get(); } private void popRetryMessage() { - if (transactionQueue != null && !transactionQueue.isEmpty()) transactionQueue.pop(); + if (transactionQueue != null) transactionQueue.pollFirst(); } /** Will resend transaction related requests. */ @@ -433,7 +435,7 @@ public void run() { Object call; try { while ((call = getRetryMessage()) != null) { - failedFlag = false; + failedFlag.set(false); radio.send(call); Thread.sleep(DELAY_IN_MILLISECONDS); if (!hasFailed()) popRetryMessage(); diff --git a/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java b/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java index 4873a243..55e221c7 100644 --- a/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java +++ b/ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java @@ -1,4 +1,5 @@ package eu.chargetime.ocpp; + /* ChargeTime.eu - Java-OCA-OCPP Copyright (C) 2015-2016 Thomas Volden @@ -204,11 +205,13 @@ public void accept(SessionEvents eventHandler) { private class CommunicatorEventHandler implements CommunicatorEvents { private static final String OCCURRENCE_CONSTRAINT_VIOLATION = - "Payload for Action is syntactically correct but at least one of the fields violates occurrence constraints"; + "Payload for Action is syntactically correct but at least one of the fields violates" + + " occurrence constraints"; private static final String PROPERTY_CONSTRAINT_VIOLATION = "Payload is syntactically correct but at least one field contains an invalid value"; private static final String INTERNAL_ERROR = - "An internal error occurred and the receiver was not able to process the requested Action successfully"; + "An internal error occurred and the receiver was not able to process the requested Action" + + " successfully"; private static final String UNABLE_TO_PROCESS = "Unable to process action"; @Override @@ -266,7 +269,7 @@ public void onCallResult(String id, String action, Object payload) { } @Override - public synchronized void onCall(String id, String action, Object payload) { + public void onCall(String id, String action, Object payload) { Optional featureOptional = featureRepository.findFeature(action); if (!featureOptional.isPresent() || featureOptional.get().getConfirmationType() == null) { communicator.sendCallError( @@ -279,6 +282,7 @@ public synchronized void onCall(String id, String action, Object payload) { if (request.validate()) { CompletableFuture promise = new CompletableFuture<>(); promise.whenComplete(new ConfirmationHandler(id, action, communicator)); + promise.whenComplete((result, error) -> pendingPromises.remove(id)); addPendingPromise(id, action, promise); dispatcher.handleRequest(promise, request); } else { diff --git a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/AsyncPromiseFulfillerDecoratorTest.java b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/AsyncPromiseFulfillerDecoratorTest.java new file mode 100644 index 00000000..6a6f1166 --- /dev/null +++ b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/AsyncPromiseFulfillerDecoratorTest.java @@ -0,0 +1,184 @@ +package eu.chargetime.ocpp.test; + +/* + ChargeTime.eu - Java-OCA-OCPP + + MIT License + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +import eu.chargetime.ocpp.AsyncPromiseFulfillerDecorator; +import eu.chargetime.ocpp.PromiseFulfiller; +import eu.chargetime.ocpp.SessionEvents; +import eu.chargetime.ocpp.model.Confirmation; +import eu.chargetime.ocpp.model.Request; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class AsyncPromiseFulfillerDecoratorTest { + + @Mock private PromiseFulfiller innerFulfiller; + @Mock private SessionEvents sessionEvents; + @Mock private Request request; + + private AsyncPromiseFulfillerDecorator decorator; + + @Before + public void setup() { + // Reset to a fresh default executor before each test to avoid cross-test pollution + AsyncPromiseFulfillerDecorator.setExecutor(createFreshExecutor()); + decorator = new AsyncPromiseFulfillerDecorator(innerFulfiller); + } + + private static ExecutorService createFreshExecutor() { + int coreSize = Runtime.getRuntime().availableProcessors(); + int maxSize = coreSize * 2; + return new java.util.concurrent.ThreadPoolExecutor( + coreSize, + maxSize, + 60L, + TimeUnit.SECONDS, + new java.util.concurrent.LinkedBlockingQueue<>(1000), + new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Test + public void fulfill_delegatesToInnerFulfiller() throws Exception { + CompletableFuture promise = new CompletableFuture<>(); + CountDownLatch latch = new CountDownLatch(1); + + doAnswer( + invocation -> { + latch.countDown(); + return null; + }) + .when(innerFulfiller) + .fulfill(any(), any(), any()); + + decorator.fulfill(promise, sessionEvents, request); + + assertTrue("Inner fulfiller was not called within timeout", latch.await(5, TimeUnit.SECONDS)); + verify(innerFulfiller).fulfill(promise, sessionEvents, request); + } + + @Test + public void fulfill_executesAsynchronously_doesNotBlockCaller() throws Exception { + CountDownLatch blockingLatch = new CountDownLatch(1); + CountDownLatch callerReturned = new CountDownLatch(1); + + doAnswer( + invocation -> { + // Block the inner fulfiller until we signal + blockingLatch.await(5, TimeUnit.SECONDS); + return null; + }) + .when(innerFulfiller) + .fulfill(any(), any(), any()); + + // fulfill() should return immediately without blocking + decorator.fulfill(null, sessionEvents, request); + callerReturned.countDown(); + + assertTrue("fulfill() blocked the caller thread", callerReturned.getCount() == 0); + + // Release the blocked task + blockingLatch.countDown(); + } + + @Test + public void fulfill_withCustomExecutor_usesCustomExecutor() throws Exception { + ExecutorService customExecutor = Executors.newSingleThreadExecutor(); + AsyncPromiseFulfillerDecorator.setExecutor(customExecutor); + + CountDownLatch latch = new CountDownLatch(1); + doAnswer( + invocation -> { + latch.countDown(); + return null; + }) + .when(innerFulfiller) + .fulfill(any(), any(), any()); + + decorator.fulfill(null, sessionEvents, request); + + assertTrue("Task was not executed on custom executor", latch.await(5, TimeUnit.SECONDS)); + verify(innerFulfiller).fulfill(null, sessionEvents, request); + + customExecutor.shutdown(); + } + + @Test + public void fulfill_multipleConcurrentCalls_allExecuted() throws Exception { + int callCount = 20; + CountDownLatch latch = new CountDownLatch(callCount); + + doAnswer( + invocation -> { + latch.countDown(); + return null; + }) + .when(innerFulfiller) + .fulfill(any(), any(), any()); + + for (int i = 0; i < callCount; i++) { + decorator.fulfill(null, sessionEvents, request); + } + + assertTrue("Not all concurrent fulfill calls were executed", latch.await(10, TimeUnit.SECONDS)); + verify(innerFulfiller, times(callCount)).fulfill(any(), any(), any()); + } + + @Test + public void fulfill_whenInnerFulfillerThrows_doesNotCrashExecutor() throws Exception { + doThrow(new RuntimeException("handler error")) + .when(innerFulfiller) + .fulfill(any(), any(), any()); + + // First call triggers exception + decorator.fulfill(null, sessionEvents, request); + Thread.sleep(200); + + // Second call should still work (executor not dead) + CountDownLatch latch = new CountDownLatch(1); + doAnswer( + invocation -> { + latch.countDown(); + return null; + }) + .when(innerFulfiller) + .fulfill(any(), any(), any()); + + decorator.fulfill(null, sessionEvents, request); + + assertTrue("Executor died after exception in fulfiller", latch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/CommunicatorConcurrencyTest.java b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/CommunicatorConcurrencyTest.java new file mode 100644 index 00000000..4d2de9b8 --- /dev/null +++ b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/CommunicatorConcurrencyTest.java @@ -0,0 +1,195 @@ +package eu.chargetime.ocpp.test; + +/* + ChargeTime.eu - Java-OCA-OCPP + + MIT License + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.lenient; + +import eu.chargetime.ocpp.*; +import eu.chargetime.ocpp.model.Message; +import eu.chargetime.ocpp.model.Request; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * Tests for thread safety of Communicator after ConcurrentLinkedDeque and AtomicBoolean changes. + */ +@RunWith(MockitoJUnitRunner.class) +public class CommunicatorConcurrencyTest { + + private Communicator communicator; + private RadioEvents eventHandler; + private AtomicInteger sendCount; + + @Mock private Receiver receiver; + @Mock private CommunicatorEvents events; + + @Before + public void setup() throws Exception { + sendCount = new AtomicInteger(0); + doAnswer(invocation -> eventHandler = invocation.getArgument(0, RadioEvents.class)) + .when(receiver) + .accept(any()); + + communicator = + new Communicator(receiver, true) { + @Override + public T unpackPayload(Object payload, Class type) { + return null; + } + + @Override + public Object packPayload(Object payload) { + return null; + } + + @Override + protected Object makeCallResult(String uniqueId, String action, Object payload) { + return null; + } + + @Override + protected Object makeCall(String uniqueId, String action, Object payload) { + return uniqueId; + } + + @Override + protected Object makeCallError( + String uniqueId, String action, String errorCode, String errorDescription) { + return null; + } + + @Override + protected Object makeCallResultError( + String uniqueId, String action, String errorCode, String errorDescription) { + return null; + } + + @Override + protected Object makeSend(String uniqueId, String action, Object payload) { + return uniqueId; + } + + @Override + protected Message parse(Object message) { + return null; + } + }; + communicator.accept(events); + } + + @Test + public void concurrentSendCall_transactionRelated_noExceptions() throws Exception { + // Radio is closed, so all transaction-related requests go to queue + when(receiver.isClosed()).thenReturn(true); + + int threadCount = 50; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + List errors = Collections.synchronizedList(new ArrayList<>()); + + for (int i = 0; i < threadCount; i++) { + final int idx = i; + new Thread( + () -> { + try { + Request txRequest = mock(Request.class); + when(txRequest.transactionRelated()).thenReturn(true); + startLatch.await(); + communicator.sendCall("id-" + idx, "action", txRequest); + } catch (Throwable t) { + errors.add(t); + } finally { + doneLatch.countDown(); + } + }) + .start(); + } + + startLatch.countDown(); + assertTrue("Threads did not complete", doneLatch.await(10, TimeUnit.SECONDS)); + assertTrue("Concurrent sendCall threw exceptions: " + errors, errors.isEmpty()); + } + + @Test + public void concurrentSendCallAndConnect_noExceptions() throws Exception { + // Start with closed radio so requests queue up + when(receiver.isClosed()).thenReturn(true); + + int threadCount = 20; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount + 1); + List errors = Collections.synchronizedList(new ArrayList<>()); + + // Threads that queue transaction-related requests + for (int i = 0; i < threadCount; i++) { + final int idx = i; + new Thread( + () -> { + try { + Request txRequest = mock(Request.class); + when(txRequest.transactionRelated()).thenReturn(true); + startLatch.await(); + communicator.sendCall("id-" + idx, "action", txRequest); + } catch (Throwable t) { + errors.add(t); + } finally { + doneLatch.countDown(); + } + }) + .start(); + } + + // Thread that triggers connection (processes queue) + new Thread( + () -> { + try { + startLatch.await(); + Thread.sleep(50); // Let some requests queue first + lenient().when(receiver.isClosed()).thenReturn(false); + eventHandler.connected(); + } catch (Throwable t) { + errors.add(t); + } finally { + doneLatch.countDown(); + } + }) + .start(); + + startLatch.countDown(); + assertTrue("Threads did not complete", doneLatch.await(10, TimeUnit.SECONDS)); + assertTrue("Concurrent sendCall+connect threw exceptions: " + errors, errors.isEmpty()); + } +} diff --git a/ocpp-common/src/test/java/eu/chargetime/ocpp/test/SessionConcurrencyTest.java b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/SessionConcurrencyTest.java new file mode 100644 index 00000000..6781f728 --- /dev/null +++ b/ocpp-common/src/test/java/eu/chargetime/ocpp/test/SessionConcurrencyTest.java @@ -0,0 +1,219 @@ +package eu.chargetime.ocpp.test; + +/* + ChargeTime.eu - Java-OCA-OCPP + + MIT License + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.*; + +import eu.chargetime.ocpp.*; +import eu.chargetime.ocpp.feature.Feature; +import eu.chargetime.ocpp.model.Confirmation; +import eu.chargetime.ocpp.model.TestRequest; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +/** + * Tests for Session thread safety after removing synchronized from onCall() and adding + * pendingPromises auto-cleanup. + */ +@RunWith(MockitoJUnitRunner.class) +public class SessionConcurrencyTest { + + private Session session; + private CommunicatorEvents eventHandler; + + @Mock private Communicator communicator; + @Mock private Queue queue; + @Mock private SessionEvents sessionEvents; + @Mock private Feature feature; + @Mock private FeatureRepository featureRepository; + @Mock private PromiseFulfiller fulfiller; + + @Before + public void setup() throws Exception { + when(featureRepository.findFeature(any())).thenReturn(Optional.of(feature)); + when(feature.getConfirmationType()).thenAnswer(invocation -> Confirmation.class); + when(communicator.unpackPayload(any(), any())).thenReturn(new TestRequest()); + + session = new Session(communicator, queue, fulfiller, featureRepository); + + doAnswer(invocation -> eventHandler = invocation.getArgument(1, CommunicatorEvents.class)) + .when(communicator) + .connect(any(), any()); + session.open(null, sessionEvents); + } + + @Test + public void concurrentOnCall_allRequestsProcessed() throws Exception { + int threadCount = 50; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount); + List errors = Collections.synchronizedList(new ArrayList<>()); + + // Fulfiller completes the promise immediately + doAnswer( + invocation -> { + CompletableFuture future = invocation.getArgument(0); + if (future != null) { + Confirmation conf = + new Confirmation() { + @Override + public boolean validate() { + return true; + } + }; + future.complete(conf); + } + return null; + }) + .when(fulfiller) + .fulfill(any(), any(), any()); + + for (int i = 0; i < threadCount; i++) { + final String id = "call-" + i; + new Thread( + () -> { + try { + startLatch.await(); + eventHandler.onCall(id, "TestAction", "{}"); + } catch (Throwable t) { + errors.add(t); + } finally { + doneLatch.countDown(); + } + }) + .start(); + } + + startLatch.countDown(); + assertTrue("Threads did not complete", doneLatch.await(10, TimeUnit.SECONDS)); + assertTrue("Concurrent onCall threw exceptions: " + errors, errors.isEmpty()); + + // All calls should have been dispatched + verify(fulfiller, times(threadCount)).fulfill(any(), any(), any()); + } + + @Test + public void onCall_pendingPromiseCleanedUpAfterCompletion() throws Exception { + // Fulfiller completes the promise immediately + doAnswer( + invocation -> { + CompletableFuture future = invocation.getArgument(0); + if (future != null) { + Confirmation conf = + new Confirmation() { + @Override + public boolean validate() { + return true; + } + }; + future.complete(conf); + } + return null; + }) + .when(fulfiller) + .fulfill(any(), any(), any()); + + eventHandler.onCall("test-id", "TestAction", "{}"); + + // Give whenComplete callback time to run + Thread.sleep(100); + + // Attempting to complete an already-cleaned promise should return false + boolean result = session.completePendingPromise("test-id", mock(Confirmation.class)); + assertTrue("pendingPromise should have been auto-removed after completion", !result); + } + + @Test + public void concurrentOnCallAndCompletePendingPromise_noExceptions() throws Exception { + int threadCount = 30; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(threadCount * 2); + List errors = Collections.synchronizedList(new ArrayList<>()); + + // Fulfiller stores promise but does NOT complete it immediately + doNothing().when(fulfiller).fulfill(any(), any(), any()); + + // Threads that send onCall + for (int i = 0; i < threadCount; i++) { + final String id = "async-" + i; + new Thread( + () -> { + try { + startLatch.await(); + eventHandler.onCall(id, "TestAction", "{}"); + } catch (Throwable t) { + errors.add(t); + } finally { + doneLatch.countDown(); + } + }) + .start(); + } + + // Threads that try to complete promises + for (int i = 0; i < threadCount; i++) { + final String id = "async-" + i; + new Thread( + () -> { + try { + startLatch.await(); + Thread.sleep(50); // Let onCall add the promise first + Confirmation conf = + new Confirmation() { + @Override + public boolean validate() { + return true; + } + }; + try { + session.completePendingPromise(id, conf); + } catch (Exception e) { + // May fail if promise not yet added — that's OK + } + } catch (Throwable t) { + errors.add(t); + } finally { + doneLatch.countDown(); + } + }) + .start(); + } + + startLatch.countDown(); + assertTrue("Threads did not complete", doneLatch.await(10, TimeUnit.SECONDS)); + assertTrue("Concurrent operations threw exceptions: " + errors, errors.isEmpty()); + } +}