Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package eu.chargetime.ocpp;

/*
ChargeTime.eu - Java-OCA-OCPP

Expand Down Expand Up @@ -29,14 +30,28 @@ of this software and associated documentation files (the "Software"), to deal
import eu.chargetime.ocpp.model.Request;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

public class AsyncPromiseFulfillerDecorator implements PromiseFulfiller {

private final PromiseFulfiller promiseFulfiller;

private static ExecutorService executor = Executors.newCachedThreadPool();
private static ExecutorService executor = createDefaultExecutor();

private static ExecutorService createDefaultExecutor() {
int coreSize = Runtime.getRuntime().availableProcessors();
int maxSize = coreSize * 2;
return new ThreadPoolExecutor(
coreSize,
maxSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy());
}

public static void setExecutor(ExecutorService newExecutor) {
executor = newExecutor;
Expand Down
22 changes: 12 additions & 10 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Communicator.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package eu.chargetime.ocpp;

/*
ChargeTime.eu - Java-OCA-OCPP
Copyright (C) 2015-2016 Thomas Volden <tv@chargetime.eu>
Expand Down Expand Up @@ -29,7 +30,8 @@ of this software and associated documentation files (the "Software"), to deal

import eu.chargetime.ocpp.feature.Feature;
import eu.chargetime.ocpp.model.*;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,11 +44,11 @@ of this software and associated documentation files (the "Software"), to deal
public abstract class Communicator {
private static final Logger logger = LoggerFactory.getLogger(Communicator.class);

private final ArrayDeque<Object> transactionQueue;
private final ConcurrentLinkedDeque<Object> transactionQueue;
private RetryRunner retryRunner;
protected Radio radio;
private CommunicatorEvents events;
private boolean failedFlag;
private final AtomicBoolean failedFlag = new AtomicBoolean(false);

/**
* Convert a formatted string into a {@link Request}/{@link Confirmation}. This is useful for call
Expand Down Expand Up @@ -149,9 +151,8 @@ public Communicator(Radio transmitter) {
*/
public Communicator(Radio transmitter, boolean enableTransactionQueue) {
this.radio = transmitter;
this.transactionQueue = enableTransactionQueue ? new ArrayDeque<>() : null;
this.transactionQueue = enableTransactionQueue ? new ConcurrentLinkedDeque<>() : null;
this.retryRunner = enableTransactionQueue ? new RetryRunner() : null;
this.failedFlag = false;
}

/**
Expand Down Expand Up @@ -267,7 +268,8 @@ public void sendCallResult(String uniqueId, String action, Confirmation confirma
public void sendCallError(
String uniqueId, String action, String errorCode, String errorDescription) {
logger.error(
"An error occurred. Sending this information: uniqueId {}: action: {}, errorCode: {}, errorDescription: {}",
"An error occurred. Sending this information: uniqueId {}: action: {}, errorCode: {},"
+ " errorDescription: {}",
uniqueId,
action,
errorCode,
Expand Down Expand Up @@ -381,7 +383,7 @@ public void receivedMessage(Object input) {
events.onCallResultError(
call.getId(), call.getErrorCode(), call.getErrorDescription(), call.getRawPayload());
} else if (message instanceof CallErrorMessage) {
failedFlag = true;
failedFlag.set(true);
CallErrorMessage call = (CallErrorMessage) message;
events.onError(
call.getId(), call.getErrorCode(), call.getErrorDescription(), call.getRawPayload());
Expand Down Expand Up @@ -417,11 +419,11 @@ private Object getRetryMessage() {
* @return whether a fail flag has been raised.
*/
private boolean hasFailed() {
return failedFlag;
return failedFlag.get();
}

private void popRetryMessage() {
if (transactionQueue != null && !transactionQueue.isEmpty()) transactionQueue.pop();
if (transactionQueue != null) transactionQueue.pollFirst();
}

/** Will resend transaction related requests. */
Expand All @@ -433,7 +435,7 @@ public void run() {
Object call;
try {
while ((call = getRetryMessage()) != null) {
failedFlag = false;
failedFlag.set(false);
radio.send(call);
Thread.sleep(DELAY_IN_MILLISECONDS);
if (!hasFailed()) popRetryMessage();
Expand Down
10 changes: 7 additions & 3 deletions ocpp-common/src/main/java/eu/chargetime/ocpp/Session.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package eu.chargetime.ocpp;

/*
ChargeTime.eu - Java-OCA-OCPP
Copyright (C) 2015-2016 Thomas Volden <tv@chargetime.eu>
Expand Down Expand Up @@ -204,11 +205,13 @@ public void accept(SessionEvents eventHandler) {

private class CommunicatorEventHandler implements CommunicatorEvents {
private static final String OCCURRENCE_CONSTRAINT_VIOLATION =
"Payload for Action is syntactically correct but at least one of the fields violates occurrence constraints";
"Payload for Action is syntactically correct but at least one of the fields violates"
+ " occurrence constraints";
private static final String PROPERTY_CONSTRAINT_VIOLATION =
"Payload is syntactically correct but at least one field contains an invalid value";
private static final String INTERNAL_ERROR =
"An internal error occurred and the receiver was not able to process the requested Action successfully";
"An internal error occurred and the receiver was not able to process the requested Action"
+ " successfully";
private static final String UNABLE_TO_PROCESS = "Unable to process action";

@Override
Expand Down Expand Up @@ -266,7 +269,7 @@ public void onCallResult(String id, String action, Object payload) {
}

@Override
public synchronized void onCall(String id, String action, Object payload) {
public void onCall(String id, String action, Object payload) {
Optional<Feature> featureOptional = featureRepository.findFeature(action);
if (!featureOptional.isPresent() || featureOptional.get().getConfirmationType() == null) {
communicator.sendCallError(
Expand All @@ -279,6 +282,7 @@ public synchronized void onCall(String id, String action, Object payload) {
if (request.validate()) {
CompletableFuture<Confirmation> promise = new CompletableFuture<>();
promise.whenComplete(new ConfirmationHandler(id, action, communicator));
promise.whenComplete((result, error) -> pendingPromises.remove(id));
addPendingPromise(id, action, promise);
dispatcher.handleRequest(promise, request);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package eu.chargetime.ocpp.test;

/*
ChargeTime.eu - Java-OCA-OCPP

MIT License

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

import eu.chargetime.ocpp.AsyncPromiseFulfillerDecorator;
import eu.chargetime.ocpp.PromiseFulfiller;
import eu.chargetime.ocpp.SessionEvents;
import eu.chargetime.ocpp.model.Confirmation;
import eu.chargetime.ocpp.model.Request;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.class)
public class AsyncPromiseFulfillerDecoratorTest {

@Mock private PromiseFulfiller innerFulfiller;
@Mock private SessionEvents sessionEvents;
@Mock private Request request;

private AsyncPromiseFulfillerDecorator decorator;

@Before
public void setup() {
// Reset to a fresh default executor before each test to avoid cross-test pollution
AsyncPromiseFulfillerDecorator.setExecutor(createFreshExecutor());
decorator = new AsyncPromiseFulfillerDecorator(innerFulfiller);
}

private static ExecutorService createFreshExecutor() {
int coreSize = Runtime.getRuntime().availableProcessors();
int maxSize = coreSize * 2;
return new java.util.concurrent.ThreadPoolExecutor(
coreSize,
maxSize,
60L,
TimeUnit.SECONDS,
new java.util.concurrent.LinkedBlockingQueue<>(1000),
new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
}

@Test
public void fulfill_delegatesToInnerFulfiller() throws Exception {
CompletableFuture<Confirmation> promise = new CompletableFuture<>();
CountDownLatch latch = new CountDownLatch(1);

doAnswer(
invocation -> {
latch.countDown();
return null;
})
.when(innerFulfiller)
.fulfill(any(), any(), any());

decorator.fulfill(promise, sessionEvents, request);

assertTrue("Inner fulfiller was not called within timeout", latch.await(5, TimeUnit.SECONDS));
verify(innerFulfiller).fulfill(promise, sessionEvents, request);
}

@Test
public void fulfill_executesAsynchronously_doesNotBlockCaller() throws Exception {
CountDownLatch blockingLatch = new CountDownLatch(1);
CountDownLatch callerReturned = new CountDownLatch(1);

doAnswer(
invocation -> {
// Block the inner fulfiller until we signal
blockingLatch.await(5, TimeUnit.SECONDS);
return null;
})
.when(innerFulfiller)
.fulfill(any(), any(), any());

// fulfill() should return immediately without blocking
decorator.fulfill(null, sessionEvents, request);
callerReturned.countDown();

assertTrue("fulfill() blocked the caller thread", callerReturned.getCount() == 0);

// Release the blocked task
blockingLatch.countDown();
}

@Test
public void fulfill_withCustomExecutor_usesCustomExecutor() throws Exception {
ExecutorService customExecutor = Executors.newSingleThreadExecutor();
AsyncPromiseFulfillerDecorator.setExecutor(customExecutor);

CountDownLatch latch = new CountDownLatch(1);
doAnswer(
invocation -> {
latch.countDown();
return null;
})
.when(innerFulfiller)
.fulfill(any(), any(), any());

decorator.fulfill(null, sessionEvents, request);

assertTrue("Task was not executed on custom executor", latch.await(5, TimeUnit.SECONDS));
verify(innerFulfiller).fulfill(null, sessionEvents, request);

customExecutor.shutdown();
}

@Test
public void fulfill_multipleConcurrentCalls_allExecuted() throws Exception {
int callCount = 20;
CountDownLatch latch = new CountDownLatch(callCount);

doAnswer(
invocation -> {
latch.countDown();
return null;
})
.when(innerFulfiller)
.fulfill(any(), any(), any());

for (int i = 0; i < callCount; i++) {
decorator.fulfill(null, sessionEvents, request);
}

assertTrue("Not all concurrent fulfill calls were executed", latch.await(10, TimeUnit.SECONDS));
verify(innerFulfiller, times(callCount)).fulfill(any(), any(), any());
}

@Test
public void fulfill_whenInnerFulfillerThrows_doesNotCrashExecutor() throws Exception {
doThrow(new RuntimeException("handler error"))
.when(innerFulfiller)
.fulfill(any(), any(), any());

// First call triggers exception
decorator.fulfill(null, sessionEvents, request);
Thread.sleep(200);

// Second call should still work (executor not dead)
CountDownLatch latch = new CountDownLatch(1);
doAnswer(
invocation -> {
latch.countDown();
return null;
})
.when(innerFulfiller)
.fulfill(any(), any(), any());

decorator.fulfill(null, sessionEvents, request);

assertTrue("Executor died after exception in fulfiller", latch.await(5, TimeUnit.SECONDS));
}
}
Loading