Rework beacon

This commit is contained in:
Christopher Schnick 2021-12-13 15:29:04 +01:00
parent 5356d0d027
commit a523dd210b
18 changed files with 186 additions and 322 deletions

View file

@ -1,9 +1,9 @@
package io.xpipe.api;
import io.xpipe.beacon.*;
import io.xpipe.beacon.socket.SocketClient;
import io.xpipe.beacon.BeaconClient;
public abstract class XPipeApiConnector extends XPipeConnector {
public abstract class XPipeApiConnector extends BeaconConnector {
public void execute() {
try {
@ -20,7 +20,7 @@ public abstract class XPipeApiConnector extends XPipeConnector {
}
}
protected abstract void handle(SocketClient sc) throws Exception;
protected abstract void handle(BeaconClient sc) throws Exception;
@Override
protected void waitForStartup() {
@ -34,6 +34,6 @@ public abstract class XPipeApiConnector extends XPipeConnector {
@FunctionalInterface
public static interface Handler {
void handle(SocketClient sc) throws ClientException, ServerException;
void handle(BeaconClient sc) throws ClientException, ServerException;
}
}

View file

@ -5,7 +5,7 @@ import io.xpipe.api.XPipeApiConnector;
import io.xpipe.beacon.ClientException;
import io.xpipe.beacon.ConnectorException;
import io.xpipe.beacon.ServerException;
import io.xpipe.beacon.socket.SocketClient;
import io.xpipe.beacon.BeaconClient;
import io.xpipe.beacon.exchange.ReadTableDataExchange;
import io.xpipe.beacon.exchange.ReadTableInfoExchange;
import io.xpipe.core.data.DataStructureNode;
@ -32,7 +32,7 @@ public class DataTableImpl implements DataTable {
var ds = DataSourceId.fromString(s);
new XPipeApiConnector() {
@Override
protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException {
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableInfoExchange.Request(ds);
ReadTableInfoExchange.Response res = performSimpleExchange(sc, req);
table[0] = new DataTableImpl(res.sourceId(), res.rowCount(), res.dataType());
@ -92,7 +92,7 @@ public class DataTableImpl implements DataTable {
List<DataStructureNode> nodes = new ArrayList<>();
new XPipeApiConnector() {
@Override
protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException {
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableDataExchange.Request(id, maxToRead);
performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
TypedDataStreamReader.readStructures(in, new TypedDataStructureNodeCallback(dataType, nodes::add));
@ -115,7 +115,7 @@ public class DataTableImpl implements DataTable {
{
new XPipeApiConnector() {
@Override
protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException {
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableDataExchange.Request(id, Integer.MAX_VALUE);
performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
input = in;

View file

@ -1,6 +1,6 @@
package io.xpipe.api.test;
import io.xpipe.beacon.XPipeDaemon;
import io.xpipe.beacon.BeaconServer;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
@ -17,7 +17,7 @@ public class XPipeConfig implements BeforeAllCallback, ExtensionContext.Store.Cl
// Your "before all tests" startup logic goes here
// The following line registers a callback hook when the root test context is shut down
context.getRoot().getStore(GLOBAL).put("any unique name", this);
XPipeDaemon.startDaemon();
BeaconServer.start();
}
}

View file

@ -1,4 +1,4 @@
package io.xpipe.beacon.socket;
package io.xpipe.beacon;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.xpipe.beacon.*;
import io.xpipe.beacon.exchange.MessageExchanges;
import io.xpipe.beacon.message.ClientErrorMessage;
import io.xpipe.beacon.message.RequestMessage;
@ -26,18 +25,18 @@ import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import static io.xpipe.beacon.socket.Sockets.BODY_SEPARATOR;
import static io.xpipe.beacon.BeaconConfig.BODY_SEPARATOR;
public class SocketClient {
public class BeaconClient {
private static final Logger logger = LoggerFactory.getLogger(SocketClient.class);
private static final Logger logger = LoggerFactory.getLogger(BeaconClient.class);
private final Socket socket;
private final InputStream in;
private final OutputStream out;
public SocketClient() throws IOException {
socket = new Socket(InetAddress.getLoopbackAddress(), SocketServer.determineUsedPort());
public BeaconClient() throws IOException {
socket = new Socket(InetAddress.getLoopbackAddress(), BeaconConfig.getUsedPort());
in = socket.getInputStream();
out = socket.getOutputStream();
}
@ -120,7 +119,7 @@ public class SocketClient {
throw new ConnectorException("Couldn't read from socket", ex);
}
if (Sockets.debugEnabled()) {
if (BeaconConfig.debugEnabled()) {
System.out.println("Recieved response:");
System.out.println(read.toPrettyString());
}

View file

@ -0,0 +1,52 @@
package io.xpipe.beacon;
import java.nio.charset.StandardCharsets;
public class BeaconConfig {
public static final byte[] BODY_SEPARATOR = "\n\n".getBytes(StandardCharsets.UTF_8);
private static final String DEBUG_PROP = "io.xpipe.beacon.debugOutput";
public static boolean debugEnabled() {
if (System.getProperty(DEBUG_PROP) != null) {
return Boolean.parseBoolean(System.getProperty(DEBUG_PROP));
}
return false;
}
private static final String BEACON_PORT_PROP = "io.xpipe.beacon.port";
private static final int DEFAULT_PORT = 21721;
public static int getUsedPort() {
if (System.getProperty(BEACON_PORT_PROP) != null) {
return Integer.parseInt(System.getProperty(BEACON_PORT_PROP));
}
return DEFAULT_PORT;
}
private static final String IN_PROCESS_PROP = "io.xpipe.beacon.startInProcess";
public static boolean shouldStartInProcess() {
if (System.getProperty(IN_PROCESS_PROP) != null) {
return Boolean.parseBoolean(System.getProperty(IN_PROCESS_PROP));
}
return false;
}
private static final String EXEC_PROCESS_PROP = "io.xpipe.beacon.exec";
public static String getCustomExecCommand() {
if (System.getProperty(EXEC_PROCESS_PROP) != null) {
return System.getProperty(EXEC_PROCESS_PROP);
}
return null;
}
}

View file

@ -2,7 +2,6 @@ package io.xpipe.beacon;
import io.xpipe.beacon.message.RequestMessage;
import io.xpipe.beacon.message.ResponseMessage;
import io.xpipe.beacon.socket.SocketClient;
import org.apache.commons.lang3.function.FailableBiConsumer;
import java.io.IOException;
@ -10,16 +9,16 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.Consumer;
public abstract class XPipeConnector {
public abstract class BeaconConnector {
protected abstract void waitForStartup();
protected SocketClient constructSocket() throws ConnectorException {
if (!XPipeDaemon.isDaemonRunning()) {
protected BeaconClient constructSocket() throws ConnectorException {
if (!BeaconServer.isRunning()) {
try {
XPipeDaemon.startDaemon();
BeaconServer.start();
waitForStartup();
if (!XPipeDaemon.isDaemonRunning()) {
if (!BeaconServer.isRunning()) {
throw new ConnectorException("Unable to start xpipe daemon");
}
} catch (Exception ex) {
@ -28,14 +27,14 @@ public abstract class XPipeConnector {
}
try {
return new SocketClient();
return new BeaconClient();
} catch (Exception ex) {
throw new ConnectorException("Unable to connect to running xpipe daemon: " + ex.getMessage());
}
}
protected <REQ extends RequestMessage, RES extends ResponseMessage> void performExchange(
SocketClient socket,
BeaconClient socket,
REQ req,
FailableBiConsumer<RES, InputStream, IOException> responseConsumer,
boolean keepOpen) throws ServerException, ConnectorException, ClientException {
@ -43,7 +42,7 @@ public abstract class XPipeConnector {
}
protected <REQ extends RequestMessage, RES extends ResponseMessage> void performExchange(
SocketClient socket,
BeaconClient socket,
REQ req,
Consumer<OutputStream> output,
FailableBiConsumer<RES, InputStream, IOException> responseConsumer,
@ -52,7 +51,7 @@ public abstract class XPipeConnector {
}
protected <REQ extends RequestMessage, RES extends ResponseMessage> RES performSimpleExchange(
SocketClient socket,
BeaconClient socket,
REQ req) throws ServerException, ConnectorException, ClientException {
return socket.simpleExchange(req);
}

View file

@ -0,0 +1,22 @@
package io.xpipe.beacon;
import io.xpipe.beacon.message.ResponseMessage;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
public interface BeaconHandler {
void prepareBody() throws IOException;
public <T extends ResponseMessage> void sendResponse(T obj) throws Exception;
public void sendClientErrorResponse(String message) throws Exception;
public void sendServerErrorResponse(Throwable ex) throws Exception;
InputStream getInputStream() throws Exception;
OutputStream getOutputStream() throws Exception;
}

View file

@ -0,0 +1,49 @@
package io.xpipe.beacon;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.DatagramSocket;
import java.net.ServerSocket;
public class BeaconServer {
private static boolean isPortAvailable(int port) {
try (var ss = new ServerSocket(port); var ds = new DatagramSocket(port)) {
return true;
} catch (IOException e) {
return false;
}
}
public static boolean isRunning() {
var port = BeaconConfig.getUsedPort();
return !isPortAvailable(port);
}
public static void start() throws Exception {
if (BeaconConfig.shouldStartInProcess()) {
startInProcess();
return;
}
var custom = BeaconConfig.getCustomExecCommand();
if (custom != null) {
Runtime.getRuntime().exec(System.getenv(custom));
return;
}
throw new IllegalArgumentException("Unable to start xpipe daemon");
}
private static void startInProcess() throws Exception {
var mainClass = Class.forName("io.xpipe.app.Main");
var method = mainClass.getDeclaredMethod("main", String[].class);
new Thread(() -> {
try {
method.invoke(null, (Object) new String[0]);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}).start();
}
}

View file

@ -1,65 +0,0 @@
package io.xpipe.beacon;
import io.xpipe.beacon.socket.SocketServer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
public class XPipeDaemon {
private static final String IN_PROCESS_PROP = "io.xpipe.beacon.startInProcess";
public static Path getUserDir() {
return Path.of(System.getProperty("user.home"), ".xpipe");
}
private static boolean isPortAvailable(int port) {
try (var ss = new ServerSocket(port); var ds = new DatagramSocket(port)) {
return true;
} catch (IOException e) {
return false;
}
}
public static boolean isDaemonRunning() {
var port = SocketServer.determineUsedPort();
return !isPortAvailable(port);
}
public static void startDaemon() throws Exception {
if (Optional.ofNullable(System.getProperty("io.xpipe.beacon.startInProcess"))
.map(Boolean::parseBoolean).orElse(false)) {
startInProcess();
return;
}
// if (System.getenv().containsKey(EXEC_PROPERTY)) {
// Runtime.getRuntime().exec(System.getenv(EXEC_PROPERTY));
// return;
// }
var file = getUserDir().resolve("run");
if (Files.exists(file)) {
Runtime.getRuntime().exec(Files.readString(file));
}
throw new IllegalArgumentException("Unable to find xpipe daemon installation");
}
private static void startInProcess() throws Exception {
var mainClass = Class.forName("io.xpipe.app.Main");
var method = mainClass.getDeclaredMethod("main", String[].class);
new Thread(() -> {
try {
method.invoke(null, (Object) new String[0]);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}).start();
}
}

View file

@ -27,7 +27,7 @@ public abstract class ListCollectionsExchange implements MessageExchange<ListCol
}
public static record Entry(String name, int count, Instant lastUsed) {
public static record Entry(String name, int size, Instant lastUsed) {
}

View file

@ -27,7 +27,7 @@ public abstract class ListEntriesExchange implements MessageExchange<ListEntries
}
public static record Entry(String name, String type, String description, Instant lastUsed, int size) {
public static record Entry(String name, String type, String description, Instant lastUsed) {
}

View file

@ -2,10 +2,9 @@ package io.xpipe.beacon.exchange;
import io.xpipe.beacon.message.RequestMessage;
import io.xpipe.beacon.message.ResponseMessage;
import io.xpipe.beacon.socket.SocketServer;
import io.xpipe.beacon.BeaconHandler;
import java.io.InputStream;
import java.net.Socket;
public interface MessageExchange<RQ extends RequestMessage, RP extends ResponseMessage> {
@ -15,5 +14,5 @@ public interface MessageExchange<RQ extends RequestMessage, RP extends ResponseM
Class<RP> getResponseClass();
void handleRequest(SocketServer server, RQ msg, InputStream body, Socket clientSocket) throws Exception;
void handleRequest(BeaconHandler handler, RQ msg, InputStream body) throws Exception;
}

View file

@ -8,7 +8,7 @@ public abstract class ReadTableDataExchange implements MessageExchange<ReadTable
@Override
public String getId() {
return "readTable";
return "readTableData";
}
@Override
@ -21,7 +21,7 @@ public abstract class ReadTableDataExchange implements MessageExchange<ReadTable
return ReadTableDataExchange.Response.class;
}
public static record Request(DataSourceId sourceId, int startow, int maxRows) implements RequestMessage {
public static record Request(DataSourceId sourceId, int maxRows) implements RequestMessage {
}

View file

@ -20,11 +20,11 @@ public abstract class StopExchange implements MessageExchange<StopExchange.Reque
return StopExchange.Response.class;
}
public static record Request(int timeout, boolean force) implements RequestMessage {
public static record Request(boolean force) implements RequestMessage {
}
public static record Response() implements ResponseMessage {
public static record Response(boolean success) implements ResponseMessage {
}
}

View file

@ -1,190 +0,0 @@
package io.xpipe.beacon.socket;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import io.xpipe.beacon.exchange.MessageExchanges;
import io.xpipe.beacon.message.ClientErrorMessage;
import io.xpipe.beacon.message.RequestMessage;
import io.xpipe.beacon.message.ResponseMessage;
import io.xpipe.beacon.message.ServerErrorMessage;
import io.xpipe.core.util.JacksonHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
public class SocketServer {
private static final String BEACON_PORT_PROP = "io.xpipe.beacon.port";
private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);
private static final int DEFAULT_PORT = 21721;
private static SocketServer INSTANCE;
private final int port;
private ServerSocket socket;
private boolean running;
private int connectionCounter;
private SocketServer(int port) {
this.port = port;
}
public static Path getUserDir() {
return Path.of(System.getProperty("user.home"), ".xpipe");
}
public static int determineUsedPort() {
if (System.getProperty(BEACON_PORT_PROP) != null) {
return Integer.parseInt(System.getProperty(BEACON_PORT_PROP));
}
var file = getUserDir().resolve("port");
if (Files.exists(file)) {
try {
return Integer.parseInt(Files.readString(file));
} catch (IOException ex) {
ex.printStackTrace();
}
}
return DEFAULT_PORT;
}
public static void init() throws IOException {
var port = determineUsedPort();
INSTANCE = new SocketServer(port);
INSTANCE.createSocket();
}
public static void reset() {
INSTANCE.stop();
INSTANCE = null;
}
private void stop() {
}
private void createSocket() throws IOException {
socket = new ServerSocket(port, 1000, InetAddress.getLoopbackAddress());
running = true;
var t = new Thread(() -> {
while (running) {
try {
var clientSocket = socket.accept();
handleClientConnection(clientSocket);
} catch (Exception ex) {
ex.printStackTrace();
}
connectionCounter++;
}
}, "socket server");
t.setDaemon(true);
t.start();
}
private void handleClientConnection(Socket clientSocket) {
var t = new Thread(() -> {
try {
var in = clientSocket.getInputStream();
var read = JacksonHelper.newMapper().disable(JsonParser.Feature.AUTO_CLOSE_SOURCE).readTree(in);
logger.debug("Received request: \n" + read.toPrettyString());
var req = parseRequest(read);
var prov = MessageExchanges.byRequest(req).get();
prov.handleRequest(this, req, in, clientSocket);
} catch (SocketException ex) {
try {
ex.printStackTrace();
} catch (Exception ioex) {
ioex.printStackTrace();
}
} catch (Exception ex) {
try {
ex.printStackTrace();
sendServerErrorResponse(clientSocket, ex);
} catch (Exception ioex) {
ioex.printStackTrace();
}
} finally {
try {
clientSocket.close();
} catch (Exception ioex) {
ioex.printStackTrace();
}
}
}, "socket connection #" + connectionCounter);
t.setDaemon(true);
t.start();
}
public void prepareBody(Socket outSocket) throws IOException {
outSocket.getOutputStream().write(Sockets.BODY_SEPARATOR);
}
public <T extends ResponseMessage> void sendResponse(Socket outSocket, T obj) throws Exception {
ObjectNode json = JacksonHelper.newMapper().valueToTree(obj);
var prov = MessageExchanges.byResponse(obj).get();
json.set("type", new TextNode(prov.getId()));
json.set("phase", new TextNode("response"));
var msg = JsonNodeFactory.instance.objectNode();
msg.set("xPipeMessage", json);
var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
var gen = mapper.createGenerator(outSocket.getOutputStream());
gen.writeTree(msg);
}
public void sendClientErrorResponse(Socket outSocket, String message) throws Exception {
var err = new ClientErrorMessage(message);
ObjectNode json = JacksonHelper.newMapper().valueToTree(err);
var msg = JsonNodeFactory.instance.objectNode();
msg.set("xPipeClientError", json);
var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
var gen = mapper.createGenerator(outSocket.getOutputStream());
gen.writeTree(msg);
}
public void sendServerErrorResponse(Socket outSocket, Throwable ex) throws Exception {
var err = new ServerErrorMessage(UUID.randomUUID(), ex);
ObjectNode json = JacksonHelper.newMapper().valueToTree(err);
var msg = JsonNodeFactory.instance.objectNode();
msg.set("xPipeServerError", json);
var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET);
var gen = mapper.createGenerator(outSocket.getOutputStream());
gen.writeTree(msg);
}
private <T extends RequestMessage> T parseRequest(JsonNode header) throws Exception {
ObjectNode content = (ObjectNode) header.required("xPipeMessage");
var type = content.required("type").textValue();
var phase = content.required("phase").textValue();
if (!phase.equals("request")) {
throw new IllegalArgumentException();
}
content.remove("type");
content.remove("phase");
var prov = MessageExchanges.byId(type);
if (prov.isEmpty()) {
throw new IllegalArgumentException();
}
var reader = JacksonHelper.newMapper().readerFor(prov.get().getRequestClass());
return reader.readValue(content);
}
}

View file

@ -1,16 +0,0 @@
package io.xpipe.beacon.socket;
import java.nio.charset.StandardCharsets;
public class Sockets {
public static final byte[] BODY_SEPARATOR = "\n\n".getBytes(StandardCharsets.UTF_8);
private static final String DEBUG_PROP = "io.xpipe.beacon.debugOutput";
public static boolean debugEnabled() {
if (System.getProperty(DEBUG_PROP) != null) {
return Boolean.parseBoolean(System.getProperty(DEBUG_PROP));
}
return false;
}
}

View file

@ -14,8 +14,6 @@ module io.xpipe.beacon {
opens io.xpipe.beacon;
opens io.xpipe.beacon.exchange;
exports io.xpipe.beacon.socket;
opens io.xpipe.beacon.socket;
opens io.xpipe.beacon.message;
requires org.apache.commons.lang;

View file

@ -6,34 +6,51 @@ public class DataSourceId {
public static final char SEPARATOR = ':';
public static DataSourceId create(String collectionName, String entryName) {
if (collectionName == null) {
throw new IllegalArgumentException("Collection name is null");
}
if (collectionName.contains("" + SEPARATOR)) {
throw new IllegalArgumentException("Separator character " + SEPARATOR + " is not allowed in the collection name");
}
if (entryName == null) {
throw new IllegalArgumentException("Collection name is null");
}
if (entryName.contains("" + SEPARATOR)) {
throw new IllegalArgumentException("Separator character " + SEPARATOR + " is not allowed in the entry name");
}
return new DataSourceId(collectionName, entryName);
}
private final String collectionName;
private final String entryName;
@JsonCreator
public DataSourceId(String collectionName, String entryName) {
private DataSourceId(String collectionName, String entryName) {
this.collectionName = collectionName;
this.entryName = entryName;
}
public DataSourceId withEntryName(String newName) {
return new DataSourceId(collectionName, newName);
}
public static DataSourceId fromString(String s) {
if (s == null) {
throw new IllegalArgumentException("String is null");
}
var split = s.split(String.valueOf(SEPARATOR));
if (split.length != 2) {
throw new IllegalArgumentException();
throw new IllegalArgumentException("Data source id must contain exactly one " + SEPARATOR);
}
if (split[0].length() == 0) {
throw new IllegalArgumentException("Collection name must not be empty");
}
if (split[1].length() == 0) {
throw new IllegalArgumentException();
throw new IllegalArgumentException("Entry name must not be empty");
}
return new DataSourceId(split[0].length() > 0 ? split[0] : null, split[1]);
}
public boolean hasCollection() {
return collectionName != null;
return new DataSourceId(split[0], split[1]);
}
@Override