From f22cf28420d3490abc2d381b855c31b065fdcbe8 Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 2 Apr 2022 15:49:59 -0700 Subject: [PATCH 1/3] Render node compatible with rust scheduler. Version https://github.com/ChunkyCloud/scheduler/commit/3e13ef018681945c9718dfe88578d857531b5d1c --- pom.xml | 6 + .../application/RendererApplication.java | 7 +- .../message/AuthenticationMessage.java | 13 ++ .../renderer/message/ErrorMessage.java | 13 ++ .../renderer/message/Message.java | 122 ++++++++++++++++ .../renderer/message/TaskMessage.java | 19 +++ .../renderer/message/WarningMessage.java | 13 ++ .../renderer/rendering/MessageClient.java | 106 ++++++++++++++ .../renderer/rendering/RenderServiceInfo.java | 5 + .../renderer/rendering/RenderWorker.java | 135 +++++++++--------- .../renderer/rendering/Task.java | 9 ++ .../renderer/rendering/TaskWorker.java | 37 ++--- src/main/resources/log4j2.xml | 2 +- 13 files changed, 391 insertions(+), 96 deletions(-) create mode 100644 src/main/java/de/lemaik/renderservice/renderer/message/AuthenticationMessage.java create mode 100644 src/main/java/de/lemaik/renderservice/renderer/message/ErrorMessage.java create mode 100644 src/main/java/de/lemaik/renderservice/renderer/message/Message.java create mode 100644 src/main/java/de/lemaik/renderservice/renderer/message/TaskMessage.java create mode 100644 src/main/java/de/lemaik/renderservice/renderer/message/WarningMessage.java create mode 100644 src/main/java/de/lemaik/renderservice/renderer/rendering/MessageClient.java diff --git a/pom.xml b/pom.xml index 19b9781..62cf83c 100644 --- a/pom.xml +++ b/pom.xml @@ -77,6 +77,12 @@ amqp-client 3.6.5 + + + org.java-websocket + Java-WebSocket + 1.5.2 + com.squareup.okhttp3 okhttp diff --git a/src/main/java/de/lemaik/renderservice/renderer/application/RendererApplication.java b/src/main/java/de/lemaik/renderservice/renderer/application/RendererApplication.java index 66f1d98..8ef405e 100644 --- a/src/main/java/de/lemaik/renderservice/renderer/application/RendererApplication.java +++ b/src/main/java/de/lemaik/renderservice/renderer/application/RendererApplication.java @@ -132,17 +132,14 @@ public void start() { // (username is the first 8 characters of the api key) URI queueUri; try { - URI url = new URI(rsInfo.getRabbitMq()); - queueUri = new URI( - url.getScheme() + "://" + settings.getApiKey().substring(0, 8) + ":" + settings - .getApiKey() + "@" + url.getHost() + ":" + url.getPort() + url.getPath()); + queueUri = new URI(rsInfo.getWsUrl()).resolve("/rendernode"); } catch (URISyntaxException e) { LOGGER.error("Invalid queue url or api key", e); System.exit(-1); return; } - worker = new RenderWorker(queueUri.toString(), getSettings().getThreads().orElse(2), + worker = new RenderWorker(queueUri, settings.getApiKey(), getSettings().getThreads().orElse(2), getSettings().getCpuLoad().orElse(100), getSettings().getName().orElse(null), jobDirectory, texturepacksDirectory, chunkyWrapperFactory, api); diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/AuthenticationMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/AuthenticationMessage.java new file mode 100644 index 0000000..f155804 --- /dev/null +++ b/src/main/java/de/lemaik/renderservice/renderer/message/AuthenticationMessage.java @@ -0,0 +1,13 @@ +package de.lemaik.renderservice.renderer.message; + +public class AuthenticationMessage { + protected String token; + + protected AuthenticationMessage() { + this.token = null; + } + + public String getToken() { + return token; + } +} diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/ErrorMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/ErrorMessage.java new file mode 100644 index 0000000..511a674 --- /dev/null +++ b/src/main/java/de/lemaik/renderservice/renderer/message/ErrorMessage.java @@ -0,0 +1,13 @@ +package de.lemaik.renderservice.renderer.message; + +public class ErrorMessage { + protected String message; + + protected ErrorMessage() { + this.message = null; + } + + public String getMessage() { + return message; + } +} diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/Message.java b/src/main/java/de/lemaik/renderservice/renderer/message/Message.java new file mode 100644 index 0000000..080f7ba --- /dev/null +++ b/src/main/java/de/lemaik/renderservice/renderer/message/Message.java @@ -0,0 +1,122 @@ +package de.lemaik.renderservice.renderer.message; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonSyntaxException; + +public class Message { + private static final Gson GSON = new Gson(); + + private ErrorMessage Error; + private WarningMessage Warning; + private Object[] AuthenticationRequest; + private AuthenticationMessage Authentication; + private Object[] TaskGet; + private TaskMessage Task; + private Object[] TaskComplete; + + public Message() { + this.Error = null; + this.Warning = null; + this.AuthenticationRequest = null; + this.Authentication = null; + this.TaskGet = null; + this.Task = null; + } + + public static Message parse(String message) throws JsonSyntaxException { + return GSON.fromJson(message, Message.class); + } + + public String toJson() { + return toJson(GSON); + } + + private String toJson(Gson gson) { + return gson.toJson(this); + } + + @Override + public String toString() { + return toJson(new GsonBuilder().setPrettyPrinting().create()); + } + + public static Message empty() { + return new Message(); + } + + public static Message error(String message) { + Message m = new Message(); + m.Error = new ErrorMessage(); + m.Error.message = message; + return m; + } + + public ErrorMessage getError() { + return Error; + } + + public static Message warning(String message) { + Message m = new Message(); + m.Warning = new WarningMessage(); + m.Warning.message = message; + return m; + } + + public WarningMessage getWarning() { + return Warning; + } + + public static Message authenticationRequest() { + Message m = new Message(); + m.AuthenticationRequest = new Object[0]; + return m; + } + + public boolean getAuthenticationRequest() { + return AuthenticationRequest != null; + } + + public static Message authentication(String token) { + Message m = new Message(); + m.Authentication = new AuthenticationMessage(); + m.Authentication.token = token; + return m; + } + + public AuthenticationMessage getAuthentication() { + return Authentication; + } + + public static Message taskGet() { + Message m = new Message(); + m.TaskGet = new Object[0]; + return m; + } + + public boolean getTaskGet() { + return TaskGet != null; + } + + public static Message task(String jobId, int spp) { + Message m = new Message(); + m.Task = new TaskMessage(); + m.Task.job_id = jobId; + m.Task.spp = spp; + return m; + } + + public TaskMessage getTask() { + return Task; + } + + public static Message taskComplete() { + Message m = new Message(); + m.TaskComplete = new Object[0]; + return m; + } + + public boolean getTaskComplete() { + return TaskComplete != null; + } +} diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/TaskMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/TaskMessage.java new file mode 100644 index 0000000..4af4ecd --- /dev/null +++ b/src/main/java/de/lemaik/renderservice/renderer/message/TaskMessage.java @@ -0,0 +1,19 @@ +package de.lemaik.renderservice.renderer.message; + +public class TaskMessage { + protected String job_id; + protected int spp; + + protected TaskMessage() { + this.job_id = null; + this.spp = 0; + } + + public String getJobId() { + return this.job_id; + } + + public int getSpp() { + return this.spp; + } +} diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/WarningMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/WarningMessage.java new file mode 100644 index 0000000..00b71fd --- /dev/null +++ b/src/main/java/de/lemaik/renderservice/renderer/message/WarningMessage.java @@ -0,0 +1,13 @@ +package de.lemaik.renderservice.renderer.message; + +public class WarningMessage { + protected String message; + + protected WarningMessage() { + this.message = null; + } + + public String getMessage() { + return message; + } +} diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/MessageClient.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/MessageClient.java new file mode 100644 index 0000000..ddbfa6d --- /dev/null +++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/MessageClient.java @@ -0,0 +1,106 @@ +package de.lemaik.renderservice.renderer.rendering; + +import com.google.gson.JsonSyntaxException; +import de.lemaik.renderservice.renderer.message.Message; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; + +import java.net.URI; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.TimeUnit; + +public class MessageClient extends WebSocketClient { + + private static final Logger LOGGER = LogManager.getLogger(MessageClient.class); + + /** + * We shouldn't ever have more then a handful of messages in waiting. + */ + private static final int QUEUE_CAPACITY = 8; + + private final ArrayBlockingQueue messages = new ArrayBlockingQueue<>(QUEUE_CAPACITY); + + public interface Factory { + MessageClient connect(); + } + + public MessageClient(URI serverUri) { + super(serverUri); + } + + @Override + public void onOpen(ServerHandshake serverHandshake) { + LOGGER.info("Websocket client connected."); + } + + @Override + public void onMessage(String s) { + LOGGER.debug("Received raw message: {}", s); + + Message message; + try { + message = Message.parse(s); + } catch (JsonSyntaxException e) { + LOGGER.info("Received invalid JSON: ", e); + return; + } + + // Error message + if (message.getError() != null) { + LOGGER.error("Received error message: {}", message.getError().getMessage()); + close(); + return; + } + + // Warning message + if (message.getWarning() != null) { + LOGGER.warn("Received warning message: {}", message.getWarning().getMessage()); + return; + } + + LOGGER.info("Received message: {}", message); + if (!messages.offer(message)) { + LOGGER.error("Message queue full."); + close(); + } + } + + @Override + public void onClose(int i, String s, boolean b) { + LOGGER.info("Websocket client closed by {} with code {}: {}", b ? "remote": "us", i, s); + this.messages.offer(Message.empty()); // Force any listeners to close + } + + @Override + public void onError(Exception e) { + LOGGER.error("Encountered error in websocket client: ", e); + } + + /** + * Send a message. + */ + public void send(Message message) { + LOGGER.debug("Sent message: {}", message::toString); + this.send(message.toJson()); + } + + /** + * Wait for the next message. + */ + public Message poll() throws InterruptedException { + return this.messages.take(); + } + + /** + * Wait for the next message. + */ + public Message poll(long timeout, TimeUnit unit) throws InterruptedException { + Message m = this.messages.poll(timeout, unit); + if (m == null) { + return Message.empty(); + } + return m; + } +} diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderServiceInfo.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderServiceInfo.java index c2c2f99..dfb9bb6 100644 --- a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderServiceInfo.java +++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderServiceInfo.java @@ -21,6 +21,7 @@ public class RenderServiceInfo { private int version; private String rabbitMq; + private String wsUrl; public int getVersion() { return version; @@ -29,4 +30,8 @@ public int getVersion() { public String getRabbitMq() { return rabbitMq; } + + public String getWsUrl() { + return wsUrl; + } } diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java index ac96c51..c148bed 100644 --- a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java +++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java @@ -18,22 +18,17 @@ package de.lemaik.renderservice.renderer.rendering; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.Connection; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.QueueingConsumer; -import de.lemaik.renderservice.renderer.Main; import de.lemaik.renderservice.renderer.chunky.ChunkyWrapperFactory; -import java.io.IOException; -import java.net.URISyntaxException; + +import java.net.URI; import java.nio.file.Path; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.TimeUnit; + +import de.lemaik.renderservice.renderer.message.Message; +import de.lemaik.renderservice.renderer.message.TaskMessage; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -43,44 +38,32 @@ public class RenderWorker extends Thread { private static final Logger LOGGER = LogManager.getLogger(RenderWorker.class); - private static final String QUEUE_NAME = "rs_tasks_241"; - private final ExecutorService executorService; + private static final int MAX_RESTART_DELAY_SECONDS = 15 * 60; // 15 minutes + + private final String apiKey; private final Path jobDirectory; private final Path texturepacksDirectory; private final ChunkyWrapperFactory chunkyFactory; private final int threads; private final int cpuLoad; - private final int MAX_RESTART_DELAY_SECONDS = 15 * 60; // 15 minutes private final RenderServerApiClient apiClient; + private int nextRestartDelaySeconds = 1; - private ConnectionFactory factory; - private Connection conn; - private Channel channel; - public RenderWorker(String uri, int threads, int cpuLoad, String name, Path jobDirectory, - Path texturepacksDirectory, ChunkyWrapperFactory chunkyFactory, - RenderServerApiClient apiClient) { + private final MessageClient.Factory connectionFactory; + + public RenderWorker(URI uri, String apiKey, int threads, int cpuLoad, String name, Path jobDirectory, + Path texturepacksDirectory, ChunkyWrapperFactory chunkyFactory, + RenderServerApiClient apiClient) { this.threads = threads; this.cpuLoad = cpuLoad; this.texturepacksDirectory = texturepacksDirectory; - executorService = Executors.newFixedThreadPool(1); this.jobDirectory = jobDirectory; this.chunkyFactory = chunkyFactory; this.apiClient = apiClient; - factory = new ConnectionFactory(); - try { - factory.setUri(uri); - } catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) { - throw new IllegalArgumentException("Invalid RabbitMQ URI", e); - } + this.apiKey = apiKey; - Map connectionProps = factory.getClientProperties(); - if (name != null) { - connectionProps.put("x-rs-name", name); - } - connectionProps.put("x-rs-threads", threads); - connectionProps.put("x-version", Main.VERSION); - factory.setClientProperties(connectionProps); + connectionFactory = () -> new MessageClient(uri); } @Override @@ -88,45 +71,30 @@ public void run() { while (!interrupted()) { LOGGER.info("Connecting"); try { - connect(); - LOGGER.info("Connected"); - nextRestartDelaySeconds = 1; - - QueueingConsumer consumer = new QueueingConsumer(channel); - channel.basicQos(1, false); // only fetch tasks at once - channel.basicConsume(QUEUE_NAME, false, consumer); + MessageClient connection = connectionFactory.connect(); - while (!interrupted() && channel.isOpen()) { + // Connect + if (connection.connectBlocking(30, TimeUnit.SECONDS)) { try { - Path taskPath = jobDirectory.resolve(UUID.randomUUID().toString()); - taskPath.toFile().mkdir(); - executorService.submit( - new TaskWorker(consumer.nextDelivery(), channel, taskPath, - texturepacksDirectory, threads, cpuLoad, chunkyFactory.getChunkyInstance(), - apiClient)); - } catch (InterruptedException e) { - LOGGER.info("Worker loop interrupted", e); - break; + LOGGER.info("Connected"); + nextRestartDelaySeconds = 1; + renderProtocol(connection); + } finally { + connection.close(); } + } else { + LOGGER.info("Timed out while attempting to connect."); } } catch (Exception e) { LOGGER.error("An error occurred in the worker loop", e); } - if (conn != null && conn.isOpen()) { - try { - conn.close(5000); - } catch (IOException e) { - LOGGER.error("An error occurred while shutting down", e); - } - } - if (!interrupted()) { LOGGER.info("Waiting " + nextRestartDelaySeconds + " seconds before restarting..."); try { - Thread.sleep(nextRestartDelaySeconds * 1000); - nextRestartDelaySeconds = Math - .min(MAX_RESTART_DELAY_SECONDS, nextRestartDelaySeconds * 2); + // noinspection BusyWait + Thread.sleep(nextRestartDelaySeconds * 1000L); + nextRestartDelaySeconds = Math.min(MAX_RESTART_DELAY_SECONDS, nextRestartDelaySeconds * 2); } catch (InterruptedException e) { LOGGER.warn("Interrupted while sleeping", e); return; @@ -137,12 +105,43 @@ public void run() { } } - private void connect() throws IOException { - try { - conn = factory.newConnection(); - channel = conn.createChannel(); - } catch (TimeoutException e) { - throw new IOException("Timeout while connecting to RabbitMQ", e); + private void renderProtocol(MessageClient connection) throws InterruptedException { + Message message; + + // Authenticate + message = connection.poll(30, TimeUnit.SECONDS); + if (message.getAuthenticationRequest()) { + connection.send(Message.authentication(apiKey)); + } else { + throw new RuntimeException("Remote did not ask for authentication message!"); + } + + while (!interrupted() && connection.isOpen()) { + try { + Path taskPath = jobDirectory.resolve(UUID.randomUUID().toString()); + if (!taskPath.toFile().mkdir()) { + throw new RuntimeException("Failed to create task folder."); + } + + // Ask for a new task + connection.send(Message.taskGet()); + + // Wait for new task + message = connection.poll(); + TaskMessage task = message.getTask(); + if (task == null) { + if (connection.isClosed()) { + return; + } + throw new RuntimeException("Remote did not send a new task!"); + } + + // Render the task + new TaskWorker(task, connection, taskPath, texturepacksDirectory, threads, cpuLoad, chunkyFactory.getChunkyInstance(), apiClient).run(); + } catch (InterruptedException e) { + LOGGER.info("Worker loop interrupted", e); + break; + } } } } diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/Task.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/Task.java index 129f48b..ac554f4 100644 --- a/src/main/java/de/lemaik/renderservice/renderer/rendering/Task.java +++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/Task.java @@ -18,10 +18,19 @@ package de.lemaik.renderservice.renderer.rendering; +import de.lemaik.renderservice.renderer.message.TaskMessage; + public class Task { private String jobId; private int spp; + public static Task fromMessage(TaskMessage message) { + Task task = new Task(); + task.jobId = message.getJobId(); + task.spp = message.getSpp(); + return task; + } + public String getJobId() { return jobId; } diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/TaskWorker.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/TaskWorker.java index 66de11c..7fe2b08 100644 --- a/src/main/java/de/lemaik/renderservice/renderer/rendering/TaskWorker.java +++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/TaskWorker.java @@ -20,10 +20,10 @@ import com.google.gson.Gson; import com.google.gson.JsonObject; -import com.rabbitmq.client.Channel; -import com.rabbitmq.client.QueueingConsumer; import de.lemaik.renderservice.renderer.chunky.ChunkyRenderDump; import de.lemaik.renderservice.renderer.chunky.ChunkyWrapper; +import de.lemaik.renderservice.renderer.message.Message; +import de.lemaik.renderservice.renderer.message.TaskMessage; import de.lemaik.renderservice.renderer.util.FileUtil; import java.awt.image.BufferedImage; import java.io.ByteArrayInputStream; @@ -47,8 +47,8 @@ public class TaskWorker implements Runnable { private static final Logger LOGGER = LogManager.getLogger(TaskWorker.class); private static final Gson gson = new Gson(); - private final QueueingConsumer.Delivery delivery; - private final Channel channel; + private final TaskMessage task; + private final MessageClient connection; private final Path workingDir; private final Path texturepacksDir; private final int threads; @@ -56,11 +56,11 @@ public class TaskWorker implements Runnable { private final ChunkyWrapper chunky; private final RenderServerApiClient apiClient; - public TaskWorker(QueueingConsumer.Delivery delivery, Channel channel, Path workingDir, - Path texturepacksDir, int threads, int cpuLoad, ChunkyWrapper chunky, - RenderServerApiClient apiClient) { - this.delivery = delivery; - this.channel = channel; + public TaskWorker(TaskMessage task, MessageClient connection, Path workingDir, + Path texturepacksDir, int threads, int cpuLoad, ChunkyWrapper chunky, + RenderServerApiClient apiClient) { + this.task = task; + this.connection = connection; this.workingDir = workingDir; this.texturepacksDir = texturepacksDir; this.threads = threads; @@ -72,25 +72,24 @@ public TaskWorker(QueueingConsumer.Delivery delivery, Channel channel, Path work @Override public void run() { try { - Task task = gson - .fromJson(new String(delivery.getBody(), "UTF-8"), Task.class); + Task task = Task.fromMessage(this.task); LOGGER.info("New task: {} spp for job {}", task.getSpp(), task.getJobId()); Job job = apiClient.getJob(task.getJobId()).get(10, TimeUnit.MINUTES); if (job == null) { LOGGER.info("Job was deleted, skipping the task and removing it from the queue"); - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + connection.send(Message.taskComplete()); return; } if (job.isCancelled()) { LOGGER.info("Job is cancelled, skipping the task and removing it from the queue"); - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + connection.send(Message.taskComplete()); return; } if (job.getSpp() >= job.getTargetSpp()) { LOGGER.info( "Job is effectively finished ({} of {} spp), skipping the task and removing it from the queue", job.getSpp(), job.getTargetSpp()); - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + connection.send(Message.taskComplete()); return; } @@ -161,17 +160,11 @@ public void run() { apiClient.postDump(job.getId(), dump).get(1, TimeUnit.HOURS); } - channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); + connection.send(Message.taskComplete()); LOGGER.info("Done"); } catch (Exception e) { LOGGER.warn("An error occurred while processing a task", e); - if (channel.isOpen()) { - try { - channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true); - } catch (IOException e1) { - LOGGER.error("Could not nack a failed task", e); - } - } + connection.close(); } finally { FileUtil.deleteDirectory(workingDir.toFile()); } diff --git a/src/main/resources/log4j2.xml b/src/main/resources/log4j2.xml index 2a1626c..e56761a 100644 --- a/src/main/resources/log4j2.xml +++ b/src/main/resources/log4j2.xml @@ -25,7 +25,7 @@ - + From 58ed60b1a89a557175bfd98537e97d9f20e826ae Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 2 Apr 2022 16:42:53 -0700 Subject: [PATCH 2/3] Compatibility for protocol version 0. --- .../renderservice/renderer/message/Message.java | 10 ++++++++++ .../renderer/message/ServerInfoMessage.java | 13 +++++++++++++ .../renderer/rendering/RenderWorker.java | 15 +++++++++++++++ 3 files changed, 38 insertions(+) create mode 100644 src/main/java/de/lemaik/renderservice/renderer/message/ServerInfoMessage.java diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/Message.java b/src/main/java/de/lemaik/renderservice/renderer/message/Message.java index 080f7ba..454c25a 100644 --- a/src/main/java/de/lemaik/renderservice/renderer/message/Message.java +++ b/src/main/java/de/lemaik/renderservice/renderer/message/Message.java @@ -9,8 +9,10 @@ public class Message { private ErrorMessage Error; private WarningMessage Warning; + private ServerInfoMessage ServerInfo; private Object[] AuthenticationRequest; private AuthenticationMessage Authentication; + private Object[] AuthenticationOk; private Object[] TaskGet; private TaskMessage Task; private Object[] TaskComplete; @@ -73,6 +75,10 @@ public static Message authenticationRequest() { return m; } + public ServerInfoMessage getServerInfo() { + return ServerInfo; + } + public boolean getAuthenticationRequest() { return AuthenticationRequest != null; } @@ -88,6 +94,10 @@ public AuthenticationMessage getAuthentication() { return Authentication; } + public boolean getAuthenticationOk() { + return AuthenticationOk != null; + } + public static Message taskGet() { Message m = new Message(); m.TaskGet = new Object[0]; diff --git a/src/main/java/de/lemaik/renderservice/renderer/message/ServerInfoMessage.java b/src/main/java/de/lemaik/renderservice/renderer/message/ServerInfoMessage.java new file mode 100644 index 0000000..bc36e4c --- /dev/null +++ b/src/main/java/de/lemaik/renderservice/renderer/message/ServerInfoMessage.java @@ -0,0 +1,13 @@ +package de.lemaik.renderservice.renderer.message; + +public class ServerInfoMessage { + protected int protocol_version; + + protected ServerInfoMessage() { + protocol_version = 0; + } + + public int getProtocolVersion() { + return this.protocol_version; + } +} diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java index c148bed..baaaec6 100644 --- a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java +++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java @@ -39,6 +39,7 @@ public class RenderWorker extends Thread { private static final Logger LOGGER = LogManager.getLogger(RenderWorker.class); private static final int MAX_RESTART_DELAY_SECONDS = 15 * 60; // 15 minutes + private static final int PROTOCOL_VERSION = 0; private final String apiKey; private final Path jobDirectory; @@ -108,6 +109,14 @@ public void run() { private void renderProtocol(MessageClient connection) throws InterruptedException { Message message; + // Server info + message = connection.poll(30, TimeUnit.SECONDS); + if (message.getServerInfo() == null || message.getServerInfo().getProtocolVersion() != 0) { + throw new RuntimeException("Remote is on an incompatible version!"); + } else { + LOGGER.info(message.toString()); + } + // Authenticate message = connection.poll(30, TimeUnit.SECONDS); if (message.getAuthenticationRequest()) { @@ -116,6 +125,12 @@ private void renderProtocol(MessageClient connection) throws InterruptedExceptio throw new RuntimeException("Remote did not ask for authentication message!"); } + // Wait for Ok + message = connection.poll(30, TimeUnit.SECONDS); + if (!message.getAuthenticationOk()) { + throw new RuntimeException("Remote did not respond with authentication success!"); + } + while (!interrupted() && connection.isOpen()) { try { Path taskPath = jobDirectory.resolve(UUID.randomUUID().toString()); From 3fc6f218192dd8adafa181abe561267c1797d516 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 11 Apr 2022 20:48:50 -0700 Subject: [PATCH 3/3] Use protocol version variable instead of hardcoded constant. --- .../lemaik/renderservice/renderer/rendering/RenderWorker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java index baaaec6..23f2ffb 100644 --- a/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java +++ b/src/main/java/de/lemaik/renderservice/renderer/rendering/RenderWorker.java @@ -111,7 +111,7 @@ private void renderProtocol(MessageClient connection) throws InterruptedExceptio // Server info message = connection.poll(30, TimeUnit.SECONDS); - if (message.getServerInfo() == null || message.getServerInfo().getProtocolVersion() != 0) { + if (message.getServerInfo() == null || message.getServerInfo().getProtocolVersion() != PROTOCOL_VERSION) { throw new RuntimeException("Remote is on an incompatible version!"); } else { LOGGER.info(message.toString());