diff --git a/pom.xml b/pom.xml
index 19b9781..62cf83c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,12 @@
amqp-client
3.6.5
+
+
+ org.java-websocket
+ Java-WebSocket
+ 1.5.2
+
com.squareup.okhttp3
okhttp
diff --git a/src/main/java/de/lemaik/renderservice/renderer/application/RendererApplication.java b/src/main/java/de/lemaik/renderservice/renderer/application/RendererApplication.java
index 66f1d98..8ef405e 100644
--- a/src/main/java/de/lemaik/renderservice/renderer/application/RendererApplication.java
+++ b/src/main/java/de/lemaik/renderservice/renderer/application/RendererApplication.java
@@ -132,17 +132,14 @@ public void start() {
// (username is the first 8 characters of the api key)
URI queueUri;
try {
- URI url = new URI(rsInfo.getRabbitMq());
- queueUri = new URI(
- url.getScheme() + "://" + settings.getApiKey().substring(0, 8) + ":" + settings
- .getApiKey() + "@" + url.getHost() + ":" + url.getPort() + url.getPath());
+ queueUri = new URI(rsInfo.getWsUrl()).resolve("/rendernode");
} catch (URISyntaxException e) {
LOGGER.error("Invalid queue url or api key", e);
System.exit(-1);
return;
}
- worker = new RenderWorker(queueUri.toString(), getSettings().getThreads().orElse(2),
+ worker = new RenderWorker(queueUri, settings.getApiKey(), getSettings().getThreads().orElse(2),
getSettings().getCpuLoad().orElse(100),
getSettings().getName().orElse(null), jobDirectory, texturepacksDirectory,
chunkyWrapperFactory, api);
diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/AuthenticationMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/AuthenticationMessage.java
new file mode 100644
index 0000000..f155804
--- /dev/null
+++ b/src/main/java/de/lemaik/renderservice/renderer/message/AuthenticationMessage.java
@@ -0,0 +1,13 @@
+package de.lemaik.renderservice.renderer.message;
+
+public class AuthenticationMessage {
+ protected String token;
+
+ protected AuthenticationMessage() {
+ this.token = null;
+ }
+
+ public String getToken() {
+ return token;
+ }
+}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/ErrorMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/ErrorMessage.java
new file mode 100644
index 0000000..511a674
--- /dev/null
+++ b/src/main/java/de/lemaik/renderservice/renderer/message/ErrorMessage.java
@@ -0,0 +1,13 @@
+package de.lemaik.renderservice.renderer.message;
+
+public class ErrorMessage {
+ protected String message;
+
+ protected ErrorMessage() {
+ this.message = null;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/Message.java b/src/main/java/de/lemaik/renderservice/renderer/message/Message.java
new file mode 100644
index 0000000..454c25a
--- /dev/null
+++ b/src/main/java/de/lemaik/renderservice/renderer/message/Message.java
@@ -0,0 +1,132 @@
+package de.lemaik.renderservice.renderer.message;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonSyntaxException;
+
+public class Message {
+ private static final Gson GSON = new Gson();
+
+ private ErrorMessage Error;
+ private WarningMessage Warning;
+ private ServerInfoMessage ServerInfo;
+ private Object[] AuthenticationRequest;
+ private AuthenticationMessage Authentication;
+ private Object[] AuthenticationOk;
+ private Object[] TaskGet;
+ private TaskMessage Task;
+ private Object[] TaskComplete;
+
+ public Message() {
+ this.Error = null;
+ this.Warning = null;
+ this.AuthenticationRequest = null;
+ this.Authentication = null;
+ this.TaskGet = null;
+ this.Task = null;
+ }
+
+ public static Message parse(String message) throws JsonSyntaxException {
+ return GSON.fromJson(message, Message.class);
+ }
+
+ public String toJson() {
+ return toJson(GSON);
+ }
+
+ private String toJson(Gson gson) {
+ return gson.toJson(this);
+ }
+
+ @Override
+ public String toString() {
+ return toJson(new GsonBuilder().setPrettyPrinting().create());
+ }
+
+ public static Message empty() {
+ return new Message();
+ }
+
+ public static Message error(String message) {
+ Message m = new Message();
+ m.Error = new ErrorMessage();
+ m.Error.message = message;
+ return m;
+ }
+
+ public ErrorMessage getError() {
+ return Error;
+ }
+
+ public static Message warning(String message) {
+ Message m = new Message();
+ m.Warning = new WarningMessage();
+ m.Warning.message = message;
+ return m;
+ }
+
+ public WarningMessage getWarning() {
+ return Warning;
+ }
+
+ public static Message authenticationRequest() {
+ Message m = new Message();
+ m.AuthenticationRequest = new Object[0];
+ return m;
+ }
+
+ public ServerInfoMessage getServerInfo() {
+ return ServerInfo;
+ }
+
+ public boolean getAuthenticationRequest() {
+ return AuthenticationRequest != null;
+ }
+
+ public static Message authentication(String token) {
+ Message m = new Message();
+ m.Authentication = new AuthenticationMessage();
+ m.Authentication.token = token;
+ return m;
+ }
+
+ public AuthenticationMessage getAuthentication() {
+ return Authentication;
+ }
+
+ public boolean getAuthenticationOk() {
+ return AuthenticationOk != null;
+ }
+
+ public static Message taskGet() {
+ Message m = new Message();
+ m.TaskGet = new Object[0];
+ return m;
+ }
+
+ public boolean getTaskGet() {
+ return TaskGet != null;
+ }
+
+ public static Message task(String jobId, int spp) {
+ Message m = new Message();
+ m.Task = new TaskMessage();
+ m.Task.job_id = jobId;
+ m.Task.spp = spp;
+ return m;
+ }
+
+ public TaskMessage getTask() {
+ return Task;
+ }
+
+ public static Message taskComplete() {
+ Message m = new Message();
+ m.TaskComplete = new Object[0];
+ return m;
+ }
+
+ public boolean getTaskComplete() {
+ return TaskComplete != null;
+ }
+}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/ServerInfoMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/ServerInfoMessage.java
new file mode 100644
index 0000000..bc36e4c
--- /dev/null
+++ b/src/main/java/de/lemaik/renderservice/renderer/message/ServerInfoMessage.java
@@ -0,0 +1,13 @@
+package de.lemaik.renderservice.renderer.message;
+
+public class ServerInfoMessage {
+ protected int protocol_version;
+
+ protected ServerInfoMessage() {
+ protocol_version = 0;
+ }
+
+ public int getProtocolVersion() {
+ return this.protocol_version;
+ }
+}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/TaskMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/TaskMessage.java
new file mode 100644
index 0000000..4af4ecd
--- /dev/null
+++ b/src/main/java/de/lemaik/renderservice/renderer/message/TaskMessage.java
@@ -0,0 +1,19 @@
+package de.lemaik.renderservice.renderer.message;
+
+public class TaskMessage {
+ protected String job_id;
+ protected int spp;
+
+ protected TaskMessage() {
+ this.job_id = null;
+ this.spp = 0;
+ }
+
+ public String getJobId() {
+ return this.job_id;
+ }
+
+ public int getSpp() {
+ return this.spp;
+ }
+}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/WarningMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/WarningMessage.java
new file mode 100644
index 0000000..00b71fd
--- /dev/null
+++ b/src/main/java/de/lemaik/renderservice/renderer/message/WarningMessage.java
@@ -0,0 +1,13 @@
+package de.lemaik.renderservice.renderer.message;
+
+public class WarningMessage {
+ protected String message;
+
+ protected WarningMessage() {
+ this.message = null;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/MessageClient.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/MessageClient.java
new file mode 100644
index 0000000..ddbfa6d
--- /dev/null
+++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/MessageClient.java
@@ -0,0 +1,106 @@
+package de.lemaik.renderservice.renderer.rendering;
+
+import com.google.gson.JsonSyntaxException;
+import de.lemaik.renderservice.renderer.message.Message;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+
+import java.net.URI;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class MessageClient extends WebSocketClient {
+
+ private static final Logger LOGGER = LogManager.getLogger(MessageClient.class);
+
+ /**
+ * We shouldn't ever have more then a handful of messages in waiting.
+ */
+ private static final int QUEUE_CAPACITY = 8;
+
+ private final ArrayBlockingQueue messages = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
+
+ public interface Factory {
+ MessageClient connect();
+ }
+
+ public MessageClient(URI serverUri) {
+ super(serverUri);
+ }
+
+ @Override
+ public void onOpen(ServerHandshake serverHandshake) {
+ LOGGER.info("Websocket client connected.");
+ }
+
+ @Override
+ public void onMessage(String s) {
+ LOGGER.debug("Received raw message: {}", s);
+
+ Message message;
+ try {
+ message = Message.parse(s);
+ } catch (JsonSyntaxException e) {
+ LOGGER.info("Received invalid JSON: ", e);
+ return;
+ }
+
+ // Error message
+ if (message.getError() != null) {
+ LOGGER.error("Received error message: {}", message.getError().getMessage());
+ close();
+ return;
+ }
+
+ // Warning message
+ if (message.getWarning() != null) {
+ LOGGER.warn("Received warning message: {}", message.getWarning().getMessage());
+ return;
+ }
+
+ LOGGER.info("Received message: {}", message);
+ if (!messages.offer(message)) {
+ LOGGER.error("Message queue full.");
+ close();
+ }
+ }
+
+ @Override
+ public void onClose(int i, String s, boolean b) {
+ LOGGER.info("Websocket client closed by {} with code {}: {}", b ? "remote": "us", i, s);
+ this.messages.offer(Message.empty()); // Force any listeners to close
+ }
+
+ @Override
+ public void onError(Exception e) {
+ LOGGER.error("Encountered error in websocket client: ", e);
+ }
+
+ /**
+ * Send a message.
+ */
+ public void send(Message message) {
+ LOGGER.debug("Sent message: {}", message::toString);
+ this.send(message.toJson());
+ }
+
+ /**
+ * Wait for the next message.
+ */
+ public Message poll() throws InterruptedException {
+ return this.messages.take();
+ }
+
+ /**
+ * Wait for the next message.
+ */
+ public Message poll(long timeout, TimeUnit unit) throws InterruptedException {
+ Message m = this.messages.poll(timeout, unit);
+ if (m == null) {
+ return Message.empty();
+ }
+ return m;
+ }
+}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderServiceInfo.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderServiceInfo.java
index c2c2f99..dfb9bb6 100644
--- a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderServiceInfo.java
+++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderServiceInfo.java
@@ -21,6 +21,7 @@
public class RenderServiceInfo {
private int version;
private String rabbitMq;
+ private String wsUrl;
public int getVersion() {
return version;
@@ -29,4 +30,8 @@ public int getVersion() {
public String getRabbitMq() {
return rabbitMq;
}
+
+ public String getWsUrl() {
+ return wsUrl;
+ }
}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java
index ac96c51..23f2ffb 100644
--- a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java
+++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java
@@ -18,22 +18,17 @@
package de.lemaik.renderservice.renderer.rendering;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.QueueingConsumer;
-import de.lemaik.renderservice.renderer.Main;
import de.lemaik.renderservice.renderer.chunky.ChunkyWrapperFactory;
-import java.io.IOException;
-import java.net.URISyntaxException;
+
+import java.net.URI;
import java.nio.file.Path;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+
+import de.lemaik.renderservice.renderer.message.Message;
+import de.lemaik.renderservice.renderer.message.TaskMessage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -43,44 +38,33 @@
public class RenderWorker extends Thread {
private static final Logger LOGGER = LogManager.getLogger(RenderWorker.class);
- private static final String QUEUE_NAME = "rs_tasks_241";
- private final ExecutorService executorService;
+ private static final int MAX_RESTART_DELAY_SECONDS = 15 * 60; // 15 minutes
+ private static final int PROTOCOL_VERSION = 0;
+
+ private final String apiKey;
private final Path jobDirectory;
private final Path texturepacksDirectory;
private final ChunkyWrapperFactory chunkyFactory;
private final int threads;
private final int cpuLoad;
- private final int MAX_RESTART_DELAY_SECONDS = 15 * 60; // 15 minutes
private final RenderServerApiClient apiClient;
+
private int nextRestartDelaySeconds = 1;
- private ConnectionFactory factory;
- private Connection conn;
- private Channel channel;
- public RenderWorker(String uri, int threads, int cpuLoad, String name, Path jobDirectory,
- Path texturepacksDirectory, ChunkyWrapperFactory chunkyFactory,
- RenderServerApiClient apiClient) {
+ private final MessageClient.Factory connectionFactory;
+
+ public RenderWorker(URI uri, String apiKey, int threads, int cpuLoad, String name, Path jobDirectory,
+ Path texturepacksDirectory, ChunkyWrapperFactory chunkyFactory,
+ RenderServerApiClient apiClient) {
this.threads = threads;
this.cpuLoad = cpuLoad;
this.texturepacksDirectory = texturepacksDirectory;
- executorService = Executors.newFixedThreadPool(1);
this.jobDirectory = jobDirectory;
this.chunkyFactory = chunkyFactory;
this.apiClient = apiClient;
- factory = new ConnectionFactory();
- try {
- factory.setUri(uri);
- } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
- throw new IllegalArgumentException("Invalid RabbitMQ URI", e);
- }
+ this.apiKey = apiKey;
- Map connectionProps = factory.getClientProperties();
- if (name != null) {
- connectionProps.put("x-rs-name", name);
- }
- connectionProps.put("x-rs-threads", threads);
- connectionProps.put("x-version", Main.VERSION);
- factory.setClientProperties(connectionProps);
+ connectionFactory = () -> new MessageClient(uri);
}
@Override
@@ -88,45 +72,30 @@ public void run() {
while (!interrupted()) {
LOGGER.info("Connecting");
try {
- connect();
- LOGGER.info("Connected");
- nextRestartDelaySeconds = 1;
+ MessageClient connection = connectionFactory.connect();
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicQos(1, false); // only fetch tasks at once
- channel.basicConsume(QUEUE_NAME, false, consumer);
-
- while (!interrupted() && channel.isOpen()) {
+ // Connect
+ if (connection.connectBlocking(30, TimeUnit.SECONDS)) {
try {
- Path taskPath = jobDirectory.resolve(UUID.randomUUID().toString());
- taskPath.toFile().mkdir();
- executorService.submit(
- new TaskWorker(consumer.nextDelivery(), channel, taskPath,
- texturepacksDirectory, threads, cpuLoad, chunkyFactory.getChunkyInstance(),
- apiClient));
- } catch (InterruptedException e) {
- LOGGER.info("Worker loop interrupted", e);
- break;
+ LOGGER.info("Connected");
+ nextRestartDelaySeconds = 1;
+ renderProtocol(connection);
+ } finally {
+ connection.close();
}
+ } else {
+ LOGGER.info("Timed out while attempting to connect.");
}
} catch (Exception e) {
LOGGER.error("An error occurred in the worker loop", e);
}
- if (conn != null && conn.isOpen()) {
- try {
- conn.close(5000);
- } catch (IOException e) {
- LOGGER.error("An error occurred while shutting down", e);
- }
- }
-
if (!interrupted()) {
LOGGER.info("Waiting " + nextRestartDelaySeconds + " seconds before restarting...");
try {
- Thread.sleep(nextRestartDelaySeconds * 1000);
- nextRestartDelaySeconds = Math
- .min(MAX_RESTART_DELAY_SECONDS, nextRestartDelaySeconds * 2);
+ // noinspection BusyWait
+ Thread.sleep(nextRestartDelaySeconds * 1000L);
+ nextRestartDelaySeconds = Math.min(MAX_RESTART_DELAY_SECONDS, nextRestartDelaySeconds * 2);
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while sleeping", e);
return;
@@ -137,12 +106,57 @@ public void run() {
}
}
- private void connect() throws IOException {
- try {
- conn = factory.newConnection();
- channel = conn.createChannel();
- } catch (TimeoutException e) {
- throw new IOException("Timeout while connecting to RabbitMQ", e);
+ private void renderProtocol(MessageClient connection) throws InterruptedException {
+ Message message;
+
+ // Server info
+ message = connection.poll(30, TimeUnit.SECONDS);
+ if (message.getServerInfo() == null || message.getServerInfo().getProtocolVersion() != PROTOCOL_VERSION) {
+ throw new RuntimeException("Remote is on an incompatible version!");
+ } else {
+ LOGGER.info(message.toString());
+ }
+
+ // Authenticate
+ message = connection.poll(30, TimeUnit.SECONDS);
+ if (message.getAuthenticationRequest()) {
+ connection.send(Message.authentication(apiKey));
+ } else {
+ throw new RuntimeException("Remote did not ask for authentication message!");
+ }
+
+ // Wait for Ok
+ message = connection.poll(30, TimeUnit.SECONDS);
+ if (!message.getAuthenticationOk()) {
+ throw new RuntimeException("Remote did not respond with authentication success!");
+ }
+
+ while (!interrupted() && connection.isOpen()) {
+ try {
+ Path taskPath = jobDirectory.resolve(UUID.randomUUID().toString());
+ if (!taskPath.toFile().mkdir()) {
+ throw new RuntimeException("Failed to create task folder.");
+ }
+
+ // Ask for a new task
+ connection.send(Message.taskGet());
+
+ // Wait for new task
+ message = connection.poll();
+ TaskMessage task = message.getTask();
+ if (task == null) {
+ if (connection.isClosed()) {
+ return;
+ }
+ throw new RuntimeException("Remote did not send a new task!");
+ }
+
+ // Render the task
+ new TaskWorker(task, connection, taskPath, texturepacksDirectory, threads, cpuLoad, chunkyFactory.getChunkyInstance(), apiClient).run();
+ } catch (InterruptedException e) {
+ LOGGER.info("Worker loop interrupted", e);
+ break;
+ }
}
}
}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/Task.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/Task.java
index 129f48b..ac554f4 100644
--- a/src/main/java/de/lemaik/renderservice/renderer/rendering/Task.java
+++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/Task.java
@@ -18,10 +18,19 @@
package de.lemaik.renderservice.renderer.rendering;
+import de.lemaik.renderservice.renderer.message.TaskMessage;
+
public class Task {
private String jobId;
private int spp;
+ public static Task fromMessage(TaskMessage message) {
+ Task task = new Task();
+ task.jobId = message.getJobId();
+ task.spp = message.getSpp();
+ return task;
+ }
+
public String getJobId() {
return jobId;
}
diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/TaskWorker.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/TaskWorker.java
index 66de11c..7fe2b08 100644
--- a/src/main/java/de/lemaik/renderservice/renderer/rendering/TaskWorker.java
+++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/TaskWorker.java
@@ -20,10 +20,10 @@
import com.google.gson.Gson;
import com.google.gson.JsonObject;
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.QueueingConsumer;
import de.lemaik.renderservice.renderer.chunky.ChunkyRenderDump;
import de.lemaik.renderservice.renderer.chunky.ChunkyWrapper;
+import de.lemaik.renderservice.renderer.message.Message;
+import de.lemaik.renderservice.renderer.message.TaskMessage;
import de.lemaik.renderservice.renderer.util.FileUtil;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
@@ -47,8 +47,8 @@ public class TaskWorker implements Runnable {
private static final Logger LOGGER = LogManager.getLogger(TaskWorker.class);
private static final Gson gson = new Gson();
- private final QueueingConsumer.Delivery delivery;
- private final Channel channel;
+ private final TaskMessage task;
+ private final MessageClient connection;
private final Path workingDir;
private final Path texturepacksDir;
private final int threads;
@@ -56,11 +56,11 @@ public class TaskWorker implements Runnable {
private final ChunkyWrapper chunky;
private final RenderServerApiClient apiClient;
- public TaskWorker(QueueingConsumer.Delivery delivery, Channel channel, Path workingDir,
- Path texturepacksDir, int threads, int cpuLoad, ChunkyWrapper chunky,
- RenderServerApiClient apiClient) {
- this.delivery = delivery;
- this.channel = channel;
+ public TaskWorker(TaskMessage task, MessageClient connection, Path workingDir,
+ Path texturepacksDir, int threads, int cpuLoad, ChunkyWrapper chunky,
+ RenderServerApiClient apiClient) {
+ this.task = task;
+ this.connection = connection;
this.workingDir = workingDir;
this.texturepacksDir = texturepacksDir;
this.threads = threads;
@@ -72,25 +72,24 @@ public TaskWorker(QueueingConsumer.Delivery delivery, Channel channel, Path work
@Override
public void run() {
try {
- Task task = gson
- .fromJson(new String(delivery.getBody(), "UTF-8"), Task.class);
+ Task task = Task.fromMessage(this.task);
LOGGER.info("New task: {} spp for job {}", task.getSpp(), task.getJobId());
Job job = apiClient.getJob(task.getJobId()).get(10, TimeUnit.MINUTES);
if (job == null) {
LOGGER.info("Job was deleted, skipping the task and removing it from the queue");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+ connection.send(Message.taskComplete());
return;
}
if (job.isCancelled()) {
LOGGER.info("Job is cancelled, skipping the task and removing it from the queue");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+ connection.send(Message.taskComplete());
return;
}
if (job.getSpp() >= job.getTargetSpp()) {
LOGGER.info(
"Job is effectively finished ({} of {} spp), skipping the task and removing it from the queue",
job.getSpp(), job.getTargetSpp());
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+ connection.send(Message.taskComplete());
return;
}
@@ -161,17 +160,11 @@ public void run() {
apiClient.postDump(job.getId(), dump).get(1, TimeUnit.HOURS);
}
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
+ connection.send(Message.taskComplete());
LOGGER.info("Done");
} catch (Exception e) {
LOGGER.warn("An error occurred while processing a task", e);
- if (channel.isOpen()) {
- try {
- channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
- } catch (IOException e1) {
- LOGGER.error("Could not nack a failed task", e);
- }
- }
+ connection.close();
} finally {
FileUtil.deleteDirectory(workingDir.toFile());
}
diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml
index 2a1626c..e56761a 100644
--- a/src/main/resources/log4j2.xml
+++ b/src/main/resources/log4j2.xml
@@ -25,7 +25,7 @@
-
+