Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.java-websocket/Java-WebSocket -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,17 +132,14 @@ public void start() {
// (username is the first 8 characters of the api key)
URI queueUri;
try {
URI url = new URI(rsInfo.getRabbitMq());
queueUri = new URI(
url.getScheme() + "://" + settings.getApiKey().substring(0, 8) + ":" + settings
.getApiKey() + "@" + url.getHost() + ":" + url.getPort() + url.getPath());
queueUri = new URI(rsInfo.getWsUrl()).resolve("/rendernode");
} catch (URISyntaxException e) {
LOGGER.error("Invalid queue url or api key", e);
System.exit(-1);
return;
}

worker = new RenderWorker(queueUri.toString(), getSettings().getThreads().orElse(2),
worker = new RenderWorker(queueUri, settings.getApiKey(), getSettings().getThreads().orElse(2),
getSettings().getCpuLoad().orElse(100),
getSettings().getName().orElse(null), jobDirectory, texturepacksDirectory,
chunkyWrapperFactory, api);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package de.lemaik.renderservice.renderer.message;

public class AuthenticationMessage {
protected String token;

protected AuthenticationMessage() {
this.token = null;
}

public String getToken() {
return token;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package de.lemaik.renderservice.renderer.message;

public class ErrorMessage {
protected String message;

protected ErrorMessage() {
this.message = null;
}

public String getMessage() {
return message;
}
}
132 changes: 132 additions & 0 deletions src/main/java/de/lemaik/renderservice/renderer/message/Message.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package de.lemaik.renderservice.renderer.message;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;

public class Message {
private static final Gson GSON = new Gson();

private ErrorMessage Error;
private WarningMessage Warning;
private ServerInfoMessage ServerInfo;
private Object[] AuthenticationRequest;
private AuthenticationMessage Authentication;
private Object[] AuthenticationOk;
private Object[] TaskGet;
private TaskMessage Task;
private Object[] TaskComplete;

public Message() {
this.Error = null;
this.Warning = null;
this.AuthenticationRequest = null;
this.Authentication = null;
this.TaskGet = null;
this.Task = null;
}

public static Message parse(String message) throws JsonSyntaxException {
return GSON.fromJson(message, Message.class);
}

public String toJson() {
return toJson(GSON);
}

private String toJson(Gson gson) {
return gson.toJson(this);
}

@Override
public String toString() {
return toJson(new GsonBuilder().setPrettyPrinting().create());
}

public static Message empty() {
return new Message();
}

public static Message error(String message) {
Message m = new Message();
m.Error = new ErrorMessage();
m.Error.message = message;
return m;
}

public ErrorMessage getError() {
return Error;
}

public static Message warning(String message) {
Message m = new Message();
m.Warning = new WarningMessage();
m.Warning.message = message;
return m;
}

public WarningMessage getWarning() {
return Warning;
}

public static Message authenticationRequest() {
Message m = new Message();
m.AuthenticationRequest = new Object[0];
return m;
}

public ServerInfoMessage getServerInfo() {
return ServerInfo;
}

public boolean getAuthenticationRequest() {
return AuthenticationRequest != null;
}

public static Message authentication(String token) {
Message m = new Message();
m.Authentication = new AuthenticationMessage();
m.Authentication.token = token;
return m;
}

public AuthenticationMessage getAuthentication() {
return Authentication;
}

public boolean getAuthenticationOk() {
return AuthenticationOk != null;
}

public static Message taskGet() {
Message m = new Message();
m.TaskGet = new Object[0];
return m;
}

public boolean getTaskGet() {
return TaskGet != null;
}

public static Message task(String jobId, int spp) {
Message m = new Message();
m.Task = new TaskMessage();
m.Task.job_id = jobId;
m.Task.spp = spp;
return m;
}

public TaskMessage getTask() {
return Task;
}

public static Message taskComplete() {
Message m = new Message();
m.TaskComplete = new Object[0];
return m;
}

public boolean getTaskComplete() {
return TaskComplete != null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package de.lemaik.renderservice.renderer.message;

public class ServerInfoMessage {
protected int protocol_version;

protected ServerInfoMessage() {
protocol_version = 0;
}

public int getProtocolVersion() {
return this.protocol_version;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package de.lemaik.renderservice.renderer.message;

public class TaskMessage {
protected String job_id;
protected int spp;

protected TaskMessage() {
this.job_id = null;
this.spp = 0;
}

public String getJobId() {
return this.job_id;
}

public int getSpp() {
return this.spp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package de.lemaik.renderservice.renderer.message;

public class WarningMessage {
protected String message;

protected WarningMessage() {
this.message = null;
}

public String getMessage() {
return message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package de.lemaik.renderservice.renderer.rendering;

import com.google.gson.JsonSyntaxException;
import de.lemaik.renderservice.renderer.message.Message;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;

import java.net.URI;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MessageClient extends WebSocketClient {

private static final Logger LOGGER = LogManager.getLogger(MessageClient.class);

/**
* We shouldn't ever have more then a handful of messages in waiting.
*/
private static final int QUEUE_CAPACITY = 8;

private final ArrayBlockingQueue<Message> messages = new ArrayBlockingQueue<>(QUEUE_CAPACITY);

public interface Factory {
MessageClient connect();
}

public MessageClient(URI serverUri) {
super(serverUri);
}

@Override
public void onOpen(ServerHandshake serverHandshake) {
LOGGER.info("Websocket client connected.");
}

@Override
public void onMessage(String s) {
LOGGER.debug("Received raw message: {}", s);

Message message;
try {
message = Message.parse(s);
} catch (JsonSyntaxException e) {
LOGGER.info("Received invalid JSON: ", e);
return;
}

// Error message
if (message.getError() != null) {
LOGGER.error("Received error message: {}", message.getError().getMessage());
close();
return;
}

// Warning message
if (message.getWarning() != null) {
LOGGER.warn("Received warning message: {}", message.getWarning().getMessage());
return;
}

LOGGER.info("Received message: {}", message);
if (!messages.offer(message)) {
LOGGER.error("Message queue full.");
close();
}
}

@Override
public void onClose(int i, String s, boolean b) {
LOGGER.info("Websocket client closed by {} with code {}: {}", b ? "remote": "us", i, s);
this.messages.offer(Message.empty()); // Force any listeners to close
}

@Override
public void onError(Exception e) {
LOGGER.error("Encountered error in websocket client: ", e);
}

/**
* Send a message.
*/
public void send(Message message) {
LOGGER.debug("Sent message: {}", message::toString);
this.send(message.toJson());
}

/**
* Wait for the next message.
*/
public Message poll() throws InterruptedException {
return this.messages.take();
}

/**
* Wait for the next message.
*/
public Message poll(long timeout, TimeUnit unit) throws InterruptedException {
Message m = this.messages.poll(timeout, unit);
if (m == null) {
return Message.empty();
}
return m;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
public class RenderServiceInfo {
private int version;
private String rabbitMq;
private String wsUrl;

public int getVersion() {
return version;
Expand All @@ -29,4 +30,8 @@ public int getVersion() {
public String getRabbitMq() {
return rabbitMq;
}

public String getWsUrl() {
return wsUrl;
}
}
Loading