commit 63cdfb40c5d591aa016fcaf784551f138fe42166 Author: Christopher Schnick Date: Wed Dec 1 19:17:54 2021 +0100 Initial commit diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 00000000..b9da2ed5 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,3 @@ +* text=auto +*.png binary +*.xcf binary \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..b1ac10fa --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +.gradle/ +build/ +.idea +dev.properties +extensions.txt \ No newline at end of file diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 00000000..f0603baa --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "deps"] + path = deps + url = https://github.com/xpipe-io/xpipe_java_deps diff --git a/api/build.gradle b/api/build.gradle new file mode 100644 index 00000000..2e5412d7 --- /dev/null +++ b/api/build.gradle @@ -0,0 +1,44 @@ +plugins { + id 'java' + id "org.moditect.gradleplugin" version "1.0.0-rc3" +} + +java { + modularity.inferModulePath = true + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +repositories { + mavenCentral() +} + + +apply from: "$rootDir/deps/commons.gradle" +apply from: "$rootDir/deps/jackson.gradle" + +dependencies { + implementation project(':core') + implementation project(':beacon') + + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.0' + //testImplementation project(':app') + testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.7.0' +} + +plugins.withType(JavaPlugin).configureEach { + java { + modularity.inferModulePath = true + } +} + +test { + useJUnitPlatform() + + //workingDir = project(":app").projectDir + + systemProperty 'io.xpipe.beacon.startInProcess', 'true' + systemProperty "io.xpipe.daemon.mode", 'base' + systemProperty "io.xpipe.storage.dir", "$projectDir/test_env" + systemProperty "io.xpipe.beacon.port", "21722" +} diff --git a/api/src/main/java/io/xpipe/api/DataTable.java b/api/src/main/java/io/xpipe/api/DataTable.java new file mode 100644 index 00000000..ec4cd8cb --- /dev/null +++ b/api/src/main/java/io/xpipe/api/DataTable.java @@ -0,0 +1,28 @@ +package io.xpipe.api; + +import io.xpipe.api.impl.DataTableImpl; +import io.xpipe.core.data.generic.ArrayNode; +import io.xpipe.core.data.generic.TupleNode; +import io.xpipe.core.data.type.DataType; +import io.xpipe.core.source.DataSourceId; + +import java.util.OptionalInt; + +public interface DataTable extends Iterable { + + static DataTable get(String s) { + return DataTableImpl.get(s); + } + + DataSourceId getId(); + + int getRowCount(); + + OptionalInt getRowCountIfPresent(); + + DataType getDataType(); + + ArrayNode readAll(); + + ArrayNode read(int maxRows); +} diff --git a/api/src/main/java/io/xpipe/api/IntConverter.java b/api/src/main/java/io/xpipe/api/IntConverter.java new file mode 100644 index 00000000..61babeac --- /dev/null +++ b/api/src/main/java/io/xpipe/api/IntConverter.java @@ -0,0 +1,21 @@ +package io.xpipe.api; + +import java.util.function.IntConsumer; + +public class IntConverter { + + private IntConsumer consumer; + + public IntConverter(IntConsumer consumer) { + this.consumer = consumer; + } + + public void onValue(byte[] value) { + if (value.length > 4) { + throw new IllegalArgumentException("Unable to fit " + value.length + " bytes into an integer"); + } + + int v = value[0] << 24 | (value[1] & 0xFF) << 16 | (value[2] & 0xFF) << 8 | (value[3] & 0xFF); + consumer.accept(v); + } +} diff --git a/api/src/main/java/io/xpipe/api/XPipeApiConnector.java b/api/src/main/java/io/xpipe/api/XPipeApiConnector.java new file mode 100644 index 00000000..db0cdfef --- /dev/null +++ b/api/src/main/java/io/xpipe/api/XPipeApiConnector.java @@ -0,0 +1,39 @@ +package io.xpipe.api; + +import io.xpipe.beacon.*; +import io.xpipe.beacon.socket.SocketClient; + +public abstract class XPipeApiConnector extends XPipeConnector { + + public void execute() { + try { + var socket = constructSocket(); + handle(socket); + } catch (ConnectorException ce) { + throw new XPipeException("Connection error: " + ce.getMessage()); + } catch (ClientException ce) { + throw new XPipeException("Client error: " + ce.getMessage()); + } catch (ServerException se) { + throw new XPipeException("Server error: " + se.getMessage()); + } catch (Throwable t) { + throw new XPipeException("Unexpected error", t); + } + } + + protected abstract void handle(SocketClient sc) throws Exception; + + @Override + protected void waitForStartup() { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @FunctionalInterface + public static interface Handler { + + void handle(SocketClient sc) throws ClientException, ServerException; + } +} diff --git a/api/src/main/java/io/xpipe/api/XPipeDataStructureSource.java b/api/src/main/java/io/xpipe/api/XPipeDataStructureSource.java new file mode 100644 index 00000000..95a57b30 --- /dev/null +++ b/api/src/main/java/io/xpipe/api/XPipeDataStructureSource.java @@ -0,0 +1,8 @@ +package io.xpipe.api; + +import io.xpipe.core.data.DataStructureNode; + +public interface XPipeDataStructureSource { + + DataStructureNode read(); +} diff --git a/api/src/main/java/io/xpipe/api/XPipeDataTableBuilder.java b/api/src/main/java/io/xpipe/api/XPipeDataTableBuilder.java new file mode 100644 index 00000000..8f99cb96 --- /dev/null +++ b/api/src/main/java/io/xpipe/api/XPipeDataTableBuilder.java @@ -0,0 +1,12 @@ +package io.xpipe.api; + +import io.xpipe.core.source.DataSourceId; + +public abstract class XPipeDataTableBuilder { + + private DataSourceId id; + + public abstract void write(); + + public abstract void commit(); +} diff --git a/api/src/main/java/io/xpipe/api/XPipeException.java b/api/src/main/java/io/xpipe/api/XPipeException.java new file mode 100644 index 00000000..d5b62beb --- /dev/null +++ b/api/src/main/java/io/xpipe/api/XPipeException.java @@ -0,0 +1,23 @@ +package io.xpipe.api; + +public class XPipeException extends RuntimeException { + + public XPipeException() { + } + + public XPipeException(String message) { + super(message); + } + + public XPipeException(String message, Throwable cause) { + super(message, cause); + } + + public XPipeException(Throwable cause) { + super(cause); + } + + public XPipeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java b/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java new file mode 100644 index 00000000..c3c82bfd --- /dev/null +++ b/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java @@ -0,0 +1,164 @@ +package io.xpipe.api.impl; + +import io.xpipe.api.DataTable; +import io.xpipe.api.XPipeApiConnector; +import io.xpipe.beacon.ClientException; +import io.xpipe.beacon.ConnectorException; +import io.xpipe.beacon.ServerException; +import io.xpipe.beacon.socket.SocketClient; +import io.xpipe.beacon.message.impl.ReadTableDataExchange; +import io.xpipe.beacon.message.impl.ReadTableInfoExchange; +import io.xpipe.core.data.DataStructureNode; +import io.xpipe.core.data.generic.ArrayNode; +import io.xpipe.core.data.generic.TupleNode; +import io.xpipe.core.data.type.DataType; +import io.xpipe.core.data.type.TypedDataStreamReader; +import io.xpipe.core.data.type.callback.TypedDataStreamCallback; +import io.xpipe.core.data.type.callback.TypedDataStructureNodeCallback; +import io.xpipe.core.source.DataSourceId; + +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.util.*; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class DataTableImpl implements DataTable { + + public static DataTable get(String s) { + final DataTable[] table = {null}; + + var ds = DataSourceId.fromString(s); + new XPipeApiConnector() { + @Override + protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException { + var req = new ReadTableInfoExchange.Request(ds); + ReadTableInfoExchange.Response res = performSimpleExchange(sc, req); + table[0] = new DataTableImpl(res.sourceId(), res.rowCount(), res.dataType()); + } + }.execute(); + return table[0]; + } + + private final DataSourceId id; + private final int size; + private final DataType dataType; + + public DataTableImpl(DataSourceId id, int size, DataType dataType) { + this.id = id; + this.size = size; + this.dataType = dataType; + } + + public Stream stream() { + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED), false); + } + + @Override + public DataSourceId getId() { + return id; + } + + @Override + public int getRowCount() { + if (size == -1) { + throw new UnsupportedOperationException("Row count is unknown"); + } + + return size; + } + + @Override + public OptionalInt getRowCountIfPresent() { + return size != -1 ? OptionalInt.of(size) : OptionalInt.empty(); + } + + @Override + public DataType getDataType() { + return dataType; + } + + @Override + public ArrayNode readAll() { + return read(Integer.MAX_VALUE); + } + + @Override + public ArrayNode read(int maxRows) { + int maxToRead = size == -1 ? maxRows : Math.min(size, maxRows); + + List nodes = new ArrayList<>(); + new XPipeApiConnector() { + @Override + protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException { + var req = new ReadTableDataExchange.Request(id, maxToRead); + performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> { + TypedDataStreamReader.readStructures(in, new TypedDataStructureNodeCallback(dataType, nodes::add)); + }, false); + } + }.execute(); + return ArrayNode.wrap(nodes); + } + + @Override + public Iterator iterator() { + return new Iterator() { + + private InputStream input; + private int read; + private final int toRead = size; + private TypedDataStreamCallback callback; + private TupleNode current; + + { + new XPipeApiConnector() { + @Override + protected void handle(SocketClient sc) throws ClientException, ServerException, ConnectorException { + var req = new ReadTableDataExchange.Request(id, Integer.MAX_VALUE); + performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> { + input = in; + }, false); + } + }.execute(); + + callback = new TypedDataStructureNodeCallback(dataType, dsn -> { + current = (TupleNode) dsn; + }); + } + + private boolean hasKnownSize() { + return size != -1; + } + + @Override + public boolean hasNext() { + if (hasKnownSize() && read == toRead) { + return false; + } + + if (hasKnownSize() && read < toRead) { + return true; + } + + try { + return TypedDataStreamReader.hasNext(input); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + } + + @Override + public TupleNode next() { + try { + TypedDataStreamReader.readStructure(input, callback); + } catch (IOException ex) { + throw new UncheckedIOException(ex); + } + read++; + return current; + } + }; + } +} diff --git a/api/src/main/java/module-info.java b/api/src/main/java/module-info.java new file mode 100644 index 00000000..b9013427 --- /dev/null +++ b/api/src/main/java/module-info.java @@ -0,0 +1,7 @@ +module io.xpipe.api { + requires io.xpipe.core; + requires io.xpipe.beacon; + requires org.apache.commons.lang; + + exports io.xpipe.api; +} \ No newline at end of file diff --git a/api/src/test/java/io/xpipe/api/test/DataTableTest.java b/api/src/test/java/io/xpipe/api/test/DataTableTest.java new file mode 100644 index 00000000..6f362d2c --- /dev/null +++ b/api/src/test/java/io/xpipe/api/test/DataTableTest.java @@ -0,0 +1,16 @@ +package io.xpipe.api.test; + +import io.xpipe.api.DataTable; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith({XPipeConfig.class}) +public class DataTableTest { + + @Test + public void testGet() { + var table = DataTable.get("new folder:username"); + var r = table.read(2); + var a = 0; + } +} diff --git a/api/src/test/java/io/xpipe/api/test/XPipeConfig.java b/api/src/test/java/io/xpipe/api/test/XPipeConfig.java new file mode 100644 index 00000000..0d4cd005 --- /dev/null +++ b/api/src/test/java/io/xpipe/api/test/XPipeConfig.java @@ -0,0 +1,28 @@ +package io.xpipe.api.test; + +import io.xpipe.beacon.XPipeDaemon; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; + +import static org.junit.jupiter.api.extension.ExtensionContext.Namespace.GLOBAL; + +public class XPipeConfig implements BeforeAllCallback, ExtensionContext.Store.CloseableResource { + + private static boolean started = false; + + @Override + public void beforeAll(ExtensionContext context) throws Exception { + if (!started) { + started = true; + // Your "before all tests" startup logic goes here + // The following line registers a callback hook when the root test context is shut down + context.getRoot().getStore(GLOBAL).put("any unique name", this); + XPipeDaemon.startDaemon(); + } + } + + @Override + public void close() { + // Your "after all tests" logic goes here + } +} diff --git a/api/src/test/java/module-info.java b/api/src/test/java/module-info.java new file mode 100644 index 00000000..401865d7 --- /dev/null +++ b/api/src/test/java/module-info.java @@ -0,0 +1,8 @@ +module io.xpipe.api.test { + exports io.xpipe.api.test; + + requires io.xpipe.api; + requires io.xpipe.beacon; + requires io.xpipe.app; + requires org.junit.jupiter.api; +} \ No newline at end of file diff --git a/api/start_test_daemon.bat b/api/start_test_daemon.bat new file mode 100644 index 00000000..b8286c03 --- /dev/null +++ b/api/start_test_daemon.bat @@ -0,0 +1,4 @@ +cd ..\app\ +SET "dir=%~dp0test_env" +CALL ..\gradlew.bat run -Dio.xpipe.storage.dir=%dir% -Dio.xpipe.beacon.port=21722 -Dio.xpipe.daemon.mode=gui +pause \ No newline at end of file diff --git a/beacon/build.gradle b/beacon/build.gradle new file mode 100644 index 00000000..2b9f94a1 --- /dev/null +++ b/beacon/build.gradle @@ -0,0 +1,27 @@ +plugins { + id 'java' + id "org.moditect.gradleplugin" version "1.0.0-rc3" +} + +java { + modularity.inferModulePath = true + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +repositories { + mavenCentral() +} + +apply from: "$rootDir/deps/slf4j.gradle" +apply from: "$rootDir/deps/websocket.gradle" +apply from: "$rootDir/deps/jackson.gradle" +apply from: "$rootDir/deps/commons.gradle" + +dependencies { + implementation project(':core') +} + +test { + useJUnitPlatform() +} \ No newline at end of file diff --git a/beacon/src/main/java/io/xpipe/beacon/ClientException.java b/beacon/src/main/java/io/xpipe/beacon/ClientException.java new file mode 100644 index 00000000..fd36dd46 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/ClientException.java @@ -0,0 +1,8 @@ +package io.xpipe.beacon; + +public class ClientException extends Exception { + + public ClientException(String message) { + super(message); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/ConnectorException.java b/beacon/src/main/java/io/xpipe/beacon/ConnectorException.java new file mode 100644 index 00000000..397a0d1a --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/ConnectorException.java @@ -0,0 +1,23 @@ +package io.xpipe.beacon; + +public class ConnectorException extends Exception { + + public ConnectorException() { + } + + public ConnectorException(String message) { + super(message); + } + + public ConnectorException(String message, Throwable cause) { + super(message, cause); + } + + public ConnectorException(Throwable cause) { + super(cause); + } + + public ConnectorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/ServerException.java b/beacon/src/main/java/io/xpipe/beacon/ServerException.java new file mode 100644 index 00000000..6ebd9fdc --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/ServerException.java @@ -0,0 +1,8 @@ +package io.xpipe.beacon; + +public class ServerException extends Exception { + + public ServerException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/XPipeConnector.java b/beacon/src/main/java/io/xpipe/beacon/XPipeConnector.java new file mode 100644 index 00000000..923d6227 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/XPipeConnector.java @@ -0,0 +1,59 @@ +package io.xpipe.beacon; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.beacon.socket.SocketClient; +import org.apache.commons.lang3.function.FailableBiConsumer; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.function.Consumer; + +public abstract class XPipeConnector { + + protected abstract void waitForStartup(); + + protected SocketClient constructSocket() throws ConnectorException { + if (!XPipeDaemon.isDaemonRunning()) { + try { + XPipeDaemon.startDaemon(); + waitForStartup(); + if (!XPipeDaemon.isDaemonRunning()) { + throw new ConnectorException("Unable to start xpipe daemon"); + } + } catch (Exception ex) { + throw new ConnectorException("Unable to start xpipe daemon: " + ex.getMessage()); + } + } + + try { + return new SocketClient(); + } catch (Exception ex) { + throw new ConnectorException("Unable to connect to running xpipe daemon: " + ex.getMessage()); + } + } + + protected void performExchange( + SocketClient socket, + REQ req, + FailableBiConsumer responseConsumer, + boolean keepOpen) throws ServerException, ConnectorException, ClientException { + performExchange(socket, req, null, responseConsumer, keepOpen); + } + + protected void performExchange( + SocketClient socket, + REQ req, + Consumer output, + FailableBiConsumer responseConsumer, + boolean keepOpen) throws ServerException, ConnectorException, ClientException { + socket.exchange(req, output, responseConsumer, keepOpen); + } + + protected RES performSimpleExchange( + SocketClient socket, + REQ req) throws ServerException, ConnectorException, ClientException { + return socket.simpleExchange(req); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/XPipeDaemon.java b/beacon/src/main/java/io/xpipe/beacon/XPipeDaemon.java new file mode 100644 index 00000000..fa42203d --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/XPipeDaemon.java @@ -0,0 +1,58 @@ +package io.xpipe.beacon; + +import io.xpipe.app.Main; +import io.xpipe.app.util.ThreadHelper; +import io.xpipe.beacon.socket.SocketServer; + +import java.io.IOException; +import java.net.DatagramSocket; +import java.net.ServerSocket; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; + +public class XPipeDaemon { + + private static final String IN_PROCESS_PROP = "io.xpipe.beacon.startInProcess"; + + public static Path getUserDir() { + return Path.of(System.getProperty("user.home"), ".xpipe"); + } + + private static boolean isPortAvailable(int port) { + try (var ss = new ServerSocket(port); var ds = new DatagramSocket(port)) { + return true; + } catch (IOException e) { + return false; + } + } + + public static boolean isDaemonRunning() { + var port = SocketServer.determineUsedPort(); + return !isPortAvailable(port); + } + + public static void startDaemon() throws Exception { + if (Optional.ofNullable(System.getProperty("io.xpipe.beacon.startInProcess")) + .map(Boolean::parseBoolean).orElse(false)) { + startInProcess(); + return; + } + +// if (System.getenv().containsKey(EXEC_PROPERTY)) { +// Runtime.getRuntime().exec(System.getenv(EXEC_PROPERTY)); +// return; +// } + + var file = getUserDir().resolve("run"); + if (Files.exists(file)) { + Runtime.getRuntime().exec(Files.readString(file)); + } + + throw new IllegalArgumentException("Unable to find xpipe daemon installation"); + } + + private static void startInProcess() { + ThreadHelper.create("XPipe daemon", false, () -> Main.main(new String[0])).start(); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/ClientErrorMessage.java b/beacon/src/main/java/io/xpipe/beacon/message/ClientErrorMessage.java new file mode 100644 index 00000000..e6e8578e --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/ClientErrorMessage.java @@ -0,0 +1,10 @@ +package io.xpipe.beacon.message; + +import io.xpipe.beacon.ClientException; + +public record ClientErrorMessage(String message) { + + public ClientException throwException() { + return new ClientException(message); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/MessageProvider.java b/beacon/src/main/java/io/xpipe/beacon/message/MessageProvider.java new file mode 100644 index 00000000..26eb1fe4 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/MessageProvider.java @@ -0,0 +1,17 @@ +package io.xpipe.beacon.message; + +import io.xpipe.beacon.socket.SocketServer; + +import java.io.InputStream; +import java.net.Socket; + +public interface MessageProvider { + + String getId(); + + Class getRequestClass(); + + Class getResponseClass(); + + default void handleRequest(SocketServer server, RQ msg, InputStream body, Socket clientSocket) throws Exception {} +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/MessageProviders.java b/beacon/src/main/java/io/xpipe/beacon/message/MessageProviders.java new file mode 100644 index 00000000..fca4a74d --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/MessageProviders.java @@ -0,0 +1,42 @@ +package io.xpipe.beacon.message; + +import java.util.Optional; +import java.util.ServiceLoader; +import java.util.Set; +import java.util.stream.Collectors; + +public class MessageProviders { + + private static Set ALL; + + private static void loadAll() { + if (ALL == null) { + ALL = ServiceLoader.load(MessageProvider.class).stream() + .map(ServiceLoader.Provider::get).collect(Collectors.toSet()); + } + } + + public static Optional> byId(String name) { + loadAll(); + var r = ALL.stream().filter(d -> d.getId().equals(name)).findAny(); + return Optional.ofNullable(r.orElse(null)); + } + + + public static Optional> byRequest(RQ req) { + loadAll(); + var r = ALL.stream().filter(d -> d.getRequestClass().equals(req.getClass())).findAny(); + return Optional.ofNullable(r.orElse(null)); + } + + public static Optional> byResponse(RP rep) { + loadAll(); + var r = ALL.stream().filter(d -> d.getResponseClass().equals(rep.getClass())).findAny(); + return Optional.ofNullable(r.orElse(null)); + } + + public static Set getAll() { + loadAll(); + return ALL; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/RequestMessage.java b/beacon/src/main/java/io/xpipe/beacon/message/RequestMessage.java new file mode 100644 index 00000000..945ec169 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/RequestMessage.java @@ -0,0 +1,5 @@ +package io.xpipe.beacon.message; + +public interface RequestMessage { + +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/ResponseMessage.java b/beacon/src/main/java/io/xpipe/beacon/message/ResponseMessage.java new file mode 100644 index 00000000..9cb5a834 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/ResponseMessage.java @@ -0,0 +1,5 @@ +package io.xpipe.beacon.message; + +public interface ResponseMessage { + +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/ServerErrorMessage.java b/beacon/src/main/java/io/xpipe/beacon/message/ServerErrorMessage.java new file mode 100644 index 00000000..0258177b --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/ServerErrorMessage.java @@ -0,0 +1,12 @@ +package io.xpipe.beacon.message; + +import io.xpipe.beacon.ServerException; + +import java.util.UUID; + +public record ServerErrorMessage(UUID requestId, Throwable error) { + + public void throwError() throws ServerException { + throw new ServerException(error.getMessage(), error); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/impl/ListCollectionsExchange.java b/beacon/src/main/java/io/xpipe/beacon/message/impl/ListCollectionsExchange.java new file mode 100644 index 00000000..522e2305 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/impl/ListCollectionsExchange.java @@ -0,0 +1,40 @@ +package io.xpipe.beacon.message.impl; + +import io.xpipe.beacon.socket.SocketServer; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; + +import java.io.InputStream; +import java.net.Socket; +import java.util.List; + +public class ListCollectionsExchange implements MessageProvider { + + @Override + public String getId() { + return "listCollections"; + } + + @Override + public Class getRequestClass() { + return Request.class; + } + + @Override + public Class getResponseClass() { + return Response.class; + } + + public static record Request() implements RequestMessage { + + } + + public static record Entry(String name, int count) { + + } + + public static record Response(List entries) implements ResponseMessage { + + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/impl/ListEntriesExchange.java b/beacon/src/main/java/io/xpipe/beacon/message/impl/ListEntriesExchange.java new file mode 100644 index 00000000..f30edd10 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/impl/ListEntriesExchange.java @@ -0,0 +1,41 @@ +package io.xpipe.beacon.message.impl; + +import io.xpipe.beacon.socket.SocketServer; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.storage.DataSourceStorage; + +import java.io.InputStream; +import java.net.Socket; +import java.util.List; + +public class ListEntriesExchange implements MessageProvider { + + @Override + public String getId() { + return "listEntries"; + } + + @Override + public Class getRequestClass() { + return Request.class; + } + + @Override + public Class getResponseClass() { + return Response.class; + } + + public static record Request(String collection) implements RequestMessage { + + } + + private static record Entry(String name, String type, String description, String date, String size) { + + } + + public static record Response(List entries) implements ResponseMessage { + + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/impl/ModeExchange.java b/beacon/src/main/java/io/xpipe/beacon/message/impl/ModeExchange.java new file mode 100644 index 00000000..520fd3fd --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/impl/ModeExchange.java @@ -0,0 +1,37 @@ +package io.xpipe.beacon.message.impl; + +import io.xpipe.app.core.OperationMode; +import io.xpipe.beacon.socket.SocketServer; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; + +import java.io.InputStream; +import java.net.Socket; +import java.util.stream.Collectors; + +public class ModeExchange implements MessageProvider { + + @Override + public String getId() { + return "mode"; + } + + @Override + public Class getRequestClass() { + return ModeExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return ModeExchange.Response.class; + } + + public static record Request(String modeId) implements RequestMessage { + + } + + public static record Response() implements ResponseMessage { + + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadStructureExchange.java b/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadStructureExchange.java new file mode 100644 index 00000000..85a671f6 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadStructureExchange.java @@ -0,0 +1,37 @@ +package io.xpipe.beacon.message.impl; + +import io.xpipe.beacon.socket.SocketServer; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.storage.DataSourceStorage; + +import java.io.InputStream; +import java.net.Socket; + +public class ReadStructureExchange implements MessageProvider { + + @Override + public String getId() { + return "readStructure"; + } + + @Override + public Class getRequestClass() { + return Request.class; + } + + @Override + public Class getResponseClass() { + return Response.class; + } + + public static record Request(DataSourceId id) implements RequestMessage { + + } + + public static record Response() implements ResponseMessage { + + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadTableDataExchange.java b/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadTableDataExchange.java new file mode 100644 index 00000000..9375abc0 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadTableDataExchange.java @@ -0,0 +1,37 @@ +package io.xpipe.beacon.message.impl; + +import io.xpipe.beacon.socket.SocketServer; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.storage.DataSourceStorage; + +import java.io.InputStream; +import java.net.Socket; + +public class ReadTableDataExchange implements MessageProvider { + + @Override + public String getId() { + return "readTable"; + } + + @Override + public Class getRequestClass() { + return ReadTableDataExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return ReadTableDataExchange.Response.class; + } + + public static record Request(DataSourceId sourceId, int maxLines) implements RequestMessage { + + } + + public static record Response() implements ResponseMessage { + + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadTableInfoExchange.java b/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadTableInfoExchange.java new file mode 100644 index 00000000..c725df80 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/impl/ReadTableInfoExchange.java @@ -0,0 +1,38 @@ +package io.xpipe.beacon.message.impl; + +import io.xpipe.beacon.socket.SocketServer; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.data.type.DataType; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.storage.DataSourceStorage; + +import java.io.InputStream; +import java.net.Socket; + +public class ReadTableInfoExchange implements MessageProvider { + + @Override + public String getId() { + return "readTableInfo"; + } + + @Override + public Class getRequestClass() { + return ReadTableInfoExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return ReadTableInfoExchange.Response.class; + } + + public static record Request(DataSourceId sourceId) implements RequestMessage { + + } + + public static record Response(DataSourceId sourceId, DataType dataType, int rowCount) implements ResponseMessage { + + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/impl/StatusExchange.java b/beacon/src/main/java/io/xpipe/beacon/message/impl/StatusExchange.java new file mode 100644 index 00000000..74a6be68 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/impl/StatusExchange.java @@ -0,0 +1,36 @@ +package io.xpipe.beacon.message.impl; + +import io.xpipe.app.core.OperationMode; +import io.xpipe.beacon.socket.SocketServer; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; + +import java.io.InputStream; +import java.net.Socket; + +public class StatusExchange implements MessageProvider { + + @Override + public String getId() { + return "status"; + } + + @Override + public Class getRequestClass() { + return Request.class; + } + + @Override + public Class getResponseClass() { + return Response.class; + } + + public static record Request() implements RequestMessage { + + } + + public static record Response(String mode) implements ResponseMessage { + + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/message/impl/VersionExchange.java b/beacon/src/main/java/io/xpipe/beacon/message/impl/VersionExchange.java new file mode 100644 index 00000000..e008520d --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/message/impl/VersionExchange.java @@ -0,0 +1,43 @@ +package io.xpipe.beacon.message.impl; + +import io.xpipe.app.core.AppInstallation; +import io.xpipe.beacon.socket.SocketServer; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; + +import java.io.InputStream; +import java.net.Socket; + +public class VersionExchange implements MessageProvider { + + @Override + public String getId() { + return "version"; + } + + @Override + public Class getRequestClass() { + return VersionExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return VersionExchange.Response.class; + } + + public static record Request() implements RequestMessage { + + } + + public static class Response implements ResponseMessage { + + public final String version; + public final String jvmVersion; + + public Response(String version, String jvmVersion) { + this.version = version; + this.jvmVersion = jvmVersion; + } + } +} \ No newline at end of file diff --git a/beacon/src/main/java/io/xpipe/beacon/socket/SocketClient.java b/beacon/src/main/java/io/xpipe/beacon/socket/SocketClient.java new file mode 100644 index 00000000..8f2ca446 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/socket/SocketClient.java @@ -0,0 +1,200 @@ +package io.xpipe.beacon.socket; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import io.xpipe.beacon.ClientException; +import io.xpipe.beacon.ConnectorException; +import io.xpipe.beacon.ServerException; +import io.xpipe.beacon.message.*; +import io.xpipe.core.util.JacksonHelper; +import org.apache.commons.lang3.function.FailableBiConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.util.Arrays; +import java.util.Optional; +import java.util.function.Consumer; + +import static io.xpipe.beacon.socket.Sockets.BODY_SEPARATOR; + +public class SocketClient { + + private static final Logger logger = LoggerFactory.getLogger(SocketClient.class); + + private final Socket socket; + private final InputStream in; + private final OutputStream out; + + public SocketClient() throws IOException { + socket = new Socket(InetAddress.getLoopbackAddress(), SocketServer.determineUsedPort()); + in = socket.getInputStream(); + out = socket.getOutputStream(); + } + + public void close() throws ConnectorException { + try { + socket.close(); + } catch (IOException ex) { + throw new ConnectorException("Couldn't close socket", ex); + } + } + + public void exchange( + REQ req, + Consumer output, + FailableBiConsumer responseConsumer, + boolean keepOpen) throws ConnectorException, ClientException, ServerException { + try { + sendRequest(req); + if (output != null) { + out.write(BODY_SEPARATOR); + output.accept(out); + } + + var res = this.receiveResponse(); + var sep = in.readNBytes(BODY_SEPARATOR.length); + if (!Arrays.equals(BODY_SEPARATOR, sep)) { + throw new ConnectorException("Invalid body separator"); + } + + responseConsumer.accept(res, in); + } catch (IOException ex) { + throw new ConnectorException("Couldn't communicate with socket", ex); + } finally { + if (!keepOpen) { + close(); + } + } + } + + public RES simpleExchange(REQ req) + throws ServerException, ConnectorException, ClientException { + try { + sendRequest(req); + return this.receiveResponse(); + } finally { + close(); + } + } + + private void sendRequest(T req) throws ClientException, ConnectorException { + ObjectNode json = JacksonHelper.newMapper().valueToTree(req); + var prov = MessageProviders.byRequest(req); + if (prov.isEmpty()) { + throw new ClientException("Unknown request class " + req.getClass()); + } + + json.set("type", new TextNode(prov.get().getId())); + json.set("phase", new TextNode("request")); + //json.set("id", new TextNode(UUID.randomUUID().toString())); + var msg = JsonNodeFactory.instance.objectNode(); + msg.set("xPipeMessage", json); + + + try { + var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + var gen = mapper.createGenerator(socket.getOutputStream()); + gen.writeTree(msg); + } catch (IOException ex) { + throw new ConnectorException("Couldn't write to socket", ex); + } + } + + private T receiveResponse() throws ConnectorException, ClientException, ServerException { + JsonNode read; + try { + var in = socket.getInputStream(); + read = JacksonHelper.newMapper().disable(JsonParser.Feature.AUTO_CLOSE_SOURCE).readTree(in); + } catch (IOException ex) { + throw new ConnectorException("Couldn't read from socket", ex); + } + + if (Sockets.debugEnabled()) { + System.out.println("Recieved response:"); + System.out.println(read.toPrettyString()); + } + + var se = parseServerError(read); + if (se.isPresent()) { + se.get().throwError(); + } + + var ce = parseClientError(read); + if (ce.isPresent()) { + throw ce.get().throwException(); + } + + return parseResponse(read); + } + + private Optional parseClientError(JsonNode node) throws ConnectorException { + ObjectNode content = (ObjectNode) node.get("xPipeClientError"); + if (content == null) { + return Optional.empty(); + } + + try { + var reader = JacksonHelper.newMapper().readerFor(ClientErrorMessage.class); + return Optional.of(reader.readValue(content)); + } catch (IOException ex) { + throw new ConnectorException("Couldn't parse client error message", ex); + } + } + + private Optional parseServerError(JsonNode node) throws ConnectorException { + ObjectNode content = (ObjectNode) node.get("xPipeServerError"); + if (content == null) { + return Optional.empty(); + } + + try { + var reader = JacksonHelper.newMapper().readerFor(ServerErrorMessage.class); + return Optional.of(reader.readValue(content)); + } catch (IOException ex) { + throw new ConnectorException("Couldn't parse server error message", ex); + } + } + + private T parseResponse(JsonNode header) throws ConnectorException { + ObjectNode content = (ObjectNode) header.required("xPipeMessage"); + + var type = content.required("type").textValue(); + var phase = content.required("phase").textValue(); + //var requestId = UUID.fromString(content.required("id").textValue()); + if (!phase.equals("response")) { + throw new IllegalArgumentException(); + } + content.remove("type"); + content.remove("phase"); + //content.remove("id"); + + var prov = MessageProviders.byId(type); + if (prov.isEmpty()) { + throw new IllegalArgumentException("Unknown response id " + type); + } + + try { + var reader = JacksonHelper.newMapper().readerFor(prov.get().getResponseClass()); + return reader.readValue(content); + } catch (IOException ex) { + throw new ConnectorException("Couldn't parse response", ex); + } + } + + public InputStream getInputStream() { + return in; + } + + public OutputStream getOutputStream() { + return out; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/socket/SocketServer.java b/beacon/src/main/java/io/xpipe/beacon/socket/SocketServer.java new file mode 100644 index 00000000..6b88a9ad --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/socket/SocketServer.java @@ -0,0 +1,186 @@ +package io.xpipe.beacon.socket; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import io.xpipe.beacon.message.*; +import io.xpipe.core.util.JacksonHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +public class SocketServer { + + private static final String BEACON_PORT_PROP = "io.xpipe.beacon.port"; + private static final Logger logger = LoggerFactory.getLogger(SocketServer.class); + + private static final int DEFAULT_PORT = 21721; + private static SocketServer INSTANCE; + private final int port; + private ServerSocket socket; + private boolean running; + private int connectionCounter; + + private SocketServer(int port) { + this.port = port; + } + + public static Path getUserDir() { + return Path.of(System.getProperty("user.home"), ".xpipe"); + } + + public static int determineUsedPort() { + if (System.getProperty(BEACON_PORT_PROP) != null) { + return Integer.parseInt(System.getProperty(BEACON_PORT_PROP)); + } + + var file = getUserDir().resolve("port"); + if (Files.exists(file)) { + try { + return Integer.parseInt(Files.readString(file)); + } catch (IOException ex) { + ex.printStackTrace(); + } + } + + return DEFAULT_PORT; + } + + public static void init() throws IOException { + var port = determineUsedPort(); + INSTANCE = new SocketServer(port); + INSTANCE.createSocket(); + } + + public static void reset() { + INSTANCE.stop(); + INSTANCE = null; + } + + private void stop() { + + } + + private void createSocket() throws IOException { + socket = new ServerSocket(port, 1000, InetAddress.getLoopbackAddress()); + running = true; + var t = new Thread(() -> { + while (running) { + try { + var clientSocket = socket.accept(); + handleClientConnection(clientSocket); + } catch (Exception ex) { + ex.printStackTrace(); + } + connectionCounter++; + } + }, "socket server"); + t.setDaemon(true); + t.start(); + } + + private void handleClientConnection(Socket clientSocket) { + var t = new Thread(() -> { + try { + var in = clientSocket.getInputStream(); + var read = JacksonHelper.newMapper().disable(JsonParser.Feature.AUTO_CLOSE_SOURCE).readTree(in); + logger.debug("Received request: \n" + read.toPrettyString()); + + var req = parseRequest(read); + var prov = MessageProviders.byRequest(req).get(); + prov.onRequestReceived(this, req, in, clientSocket); + } catch (SocketException ex) { + try { + ex.printStackTrace(); + } catch (Exception ioex) { + ioex.printStackTrace(); + } + } catch (Exception ex) { + try { + ex.printStackTrace(); + sendServerErrorResponse(clientSocket, ex); + } catch (Exception ioex) { + ioex.printStackTrace(); + } + } finally { + try { + clientSocket.close(); + } catch (Exception ioex) { + ioex.printStackTrace(); + } + } + }, "socket connection #" + connectionCounter); + t.setDaemon(true); + t.start(); + } + + public void prepareBody(Socket outSocket) throws IOException { + outSocket.getOutputStream().write(Sockets.BODY_SEPARATOR); + } + + public void sendResponse(Socket outSocket, T obj) throws Exception { + ObjectNode json = JacksonHelper.newMapper().valueToTree(obj); + var prov = MessageProviders.byResponse(obj).get(); + json.set("type", new TextNode(prov.getId())); + json.set("phase", new TextNode("response")); + var msg = JsonNodeFactory.instance.objectNode(); + msg.set("xPipeMessage", json); + + var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + var gen = mapper.createGenerator(outSocket.getOutputStream()); + gen.writeTree(msg); + } + + public void sendClientErrorResponse(Socket outSocket, String message) throws Exception { + var err = new ClientErrorMessage(message); + ObjectNode json = JacksonHelper.newMapper().valueToTree(err); + var msg = JsonNodeFactory.instance.objectNode(); + msg.set("xPipeClientError", json); + + var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + var gen = mapper.createGenerator(outSocket.getOutputStream()); + gen.writeTree(msg); + } + + public void sendServerErrorResponse(Socket outSocket, Throwable ex) throws Exception { + var err = new ServerErrorMessage(UUID.randomUUID(), ex); + ObjectNode json = JacksonHelper.newMapper().valueToTree(err); + var msg = JsonNodeFactory.instance.objectNode(); + msg.set("xPipeServerError", json); + + var mapper = JacksonHelper.newMapper().disable(JsonGenerator.Feature.AUTO_CLOSE_TARGET); + var gen = mapper.createGenerator(outSocket.getOutputStream()); + gen.writeTree(msg); + } + + private T parseRequest(JsonNode header) throws Exception { + ObjectNode content = (ObjectNode) header.required("xPipeMessage"); + + var type = content.required("type").textValue(); + var phase = content.required("phase").textValue(); + if (!phase.equals("request")) { + throw new IllegalArgumentException(); + } + content.remove("type"); + content.remove("phase"); + + var prov = MessageProviders.byId(type); + if (prov.isEmpty()) { + throw new IllegalArgumentException(); + } + + var reader = JacksonHelper.newMapper().readerFor(prov.get().getRequestClass()); + return reader.readValue(content); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/socket/Sockets.java b/beacon/src/main/java/io/xpipe/beacon/socket/Sockets.java new file mode 100644 index 00000000..afe81499 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/socket/Sockets.java @@ -0,0 +1,16 @@ +package io.xpipe.beacon.socket; + +import java.nio.charset.StandardCharsets; + +public class Sockets { + + public static final byte[] BODY_SEPARATOR = "\n\n".getBytes(StandardCharsets.UTF_8); + private static final String DEBUG_PROP = "io.xpipe.beacon.debugOutput"; + + public static boolean debugEnabled() { + if (System.getProperty(DEBUG_PROP) != null) { + return Boolean.parseBoolean(System.getProperty(DEBUG_PROP)); + } + return false; + } +} diff --git a/beacon/src/main/java/module-info.java b/beacon/src/main/java/module-info.java new file mode 100644 index 00000000..861899a3 --- /dev/null +++ b/beacon/src/main/java/module-info.java @@ -0,0 +1,28 @@ +import io.xpipe.app.core.BeaconProvider; +import io.xpipe.beacon.BeaconProviderImpl; +import io.xpipe.beacon.message.MessageProvider; +import io.xpipe.beacon.message.impl.*; + +module io.xpipe.beacon { + exports io.xpipe.beacon; + exports io.xpipe.beacon.message; + exports io.xpipe.beacon.message.impl; + requires org.slf4j; + requires org.slf4j.simple; + + requires com.fasterxml.jackson.core; + requires com.fasterxml.jackson.databind; + requires com.fasterxml.jackson.module.paramnames; + requires io.xpipe.core; + + opens io.xpipe.beacon; + opens io.xpipe.beacon.message; + opens io.xpipe.beacon.message.impl; + exports io.xpipe.beacon.socket; + opens io.xpipe.beacon.socket; + + requires org.apache.commons.lang; + + uses MessageProvider; + provides MessageProvider with ListCollectionsExchange, ListEntriesExchange, ReadTableDataExchange, VersionExchange, StatusExchange, ModeExchange, ReadTableInfoExchange; +} \ No newline at end of file diff --git a/build.gradle b/build.gradle new file mode 100644 index 00000000..e12bb160 --- /dev/null +++ b/build.gradle @@ -0,0 +1,9 @@ +plugins { + id 'java' +} + +group 'io.xpipe' + +repositories { + mavenCentral() +} diff --git a/core/build.gradle b/core/build.gradle new file mode 100644 index 00000000..16f96070 --- /dev/null +++ b/core/build.gradle @@ -0,0 +1,17 @@ +plugins { + id 'java' + id "org.moditect.gradleplugin" version "1.0.0-rc3" +} + +java { + modularity.inferModulePath = true + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 +} + +repositories { + mavenCentral() +} + +apply from: "$rootDir/deps/commons.gradle" +apply from: "$rootDir/deps/jackson.gradle" diff --git a/core/src/main/java/io/xpipe/core/data/DataStructureNode.java b/core/src/main/java/io/xpipe/core/data/DataStructureNode.java new file mode 100644 index 00000000..2122d362 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/DataStructureNode.java @@ -0,0 +1,75 @@ +package io.xpipe.core.data; + +import io.xpipe.core.data.type.DataType; + +import java.util.Iterator; +import java.util.Optional; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.stream.Stream; + +public abstract class DataStructureNode implements Iterable { + + protected abstract String getName(); + + private UnsupportedOperationException unuspported(String s) { + return new UnsupportedOperationException(getName() + " does not support " + s); + } + + public boolean isTuple() { + return false; + } + + public boolean isArray() { + return false; + } + + public boolean isValue() { + return false; + } + + public int size() { + throw unuspported("size computation"); + } + + public abstract DataType getDataType(); + + public DataStructureNode at(int index) { + throw unuspported("integer indexing"); + } + + public DataStructureNode forKey(String name) { + throw unuspported("name indexing"); + } + + public Optional forKeyIfPresent(String name) { + throw unuspported("name indexing"); + } + + public int asInt() { + throw unuspported("integer conversion"); + } + + public String asString() { + throw unuspported("string conversion"); + } + + public Stream stream() { + throw unuspported("stream creation"); + } + + @Override + public void forEach(Consumer action) { + throw unuspported("for each"); + } + + @Override + public Spliterator spliterator() { + throw unuspported("spliterator creation"); + } + + @Override + public Iterator iterator() { + throw unuspported("iterator creation"); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/DataStructureNodeAcceptor.java b/core/src/main/java/io/xpipe/core/data/DataStructureNodeAcceptor.java new file mode 100644 index 00000000..a489bc32 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/DataStructureNodeAcceptor.java @@ -0,0 +1,6 @@ +package io.xpipe.core.data; + +public interface DataStructureNodeAcceptor { + + boolean accept(T node) throws Exception; +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/ArrayNode.java b/core/src/main/java/io/xpipe/core/data/generic/ArrayNode.java new file mode 100644 index 00000000..89bdf72c --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/ArrayNode.java @@ -0,0 +1,71 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; +import io.xpipe.core.data.type.ArrayType; +import io.xpipe.core.data.type.DataType; + +import java.util.*; +import java.util.function.Consumer; +import java.util.stream.Stream; + +public class ArrayNode extends DataStructureNode { + + private final List valueNodes; + + private ArrayNode(List valueNodes) { + this.valueNodes = valueNodes; + } + + public static ArrayNode wrap(List valueNodes) { + return new ArrayNode(valueNodes); + } + + public static ArrayNode copy(List valueNodes) { + return new ArrayNode(new ArrayList<>(valueNodes)); + } + + @Override + public Stream stream() { + return Collections.unmodifiableList(valueNodes).stream(); + } + + @Override + public boolean isArray() { + return true; + } + + @Override + public int size() { + return valueNodes.size(); + } + + @Override + protected String getName() { + return "array node"; + } + + @Override + public DataType getDataType() { + return ArrayType.of(valueNodes.stream().map(DataStructureNode::getDataType).toList()); + } + + @Override + public DataStructureNode at(int index) { + return valueNodes.get(index); + } + + @Override + public void forEach(Consumer action) { + valueNodes.forEach(action); + } + + @Override + public Spliterator spliterator() { + return valueNodes.spliterator(); + } + + @Override + public Iterator iterator() { + return valueNodes.iterator(); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/ArrayReader.java b/core/src/main/java/io/xpipe/core/data/generic/ArrayReader.java new file mode 100644 index 00000000..12271985 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/ArrayReader.java @@ -0,0 +1,55 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; + +import java.util.ArrayList; +import java.util.List; + +public class ArrayReader implements DataStructureNodeReader { + + private final List nodes; + private int length; + private boolean hasSeenEnd; + private int currentIndex = 0; + private DataStructureNodeReader currentReader; + + public ArrayReader(int length) { + this.length = length; + this.nodes = new ArrayList<>(length); + } + + @Override + public void onArrayStart(String name, int length) { + DataStructureNodeReader.super.onArrayStart(name, length); + } + + @Override + public void onArrayEnd() { + DataStructureNodeReader.super.onArrayEnd(); + } + + @Override + public void onTupleStart(String name, int length) { + DataStructureNodeReader.super.onTupleStart(name, length); + } + + @Override + public void onTupleEnd() { + DataStructureNodeReader.super.onTupleEnd(); + } + + @Override + public void onValue(String name, byte[] value) { + DataStructureNodeReader.super.onValue(name, value); + } + + @Override + public boolean isDone() { + return false; + } + + @Override + public DataStructureNode create() { + return null; + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/DataStreamCallback.java b/core/src/main/java/io/xpipe/core/data/generic/DataStreamCallback.java new file mode 100644 index 00000000..05cbd8a2 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/DataStreamCallback.java @@ -0,0 +1,30 @@ +package io.xpipe.core.data.generic; + +import java.util.function.Consumer; + +public interface DataStreamCallback { + + static DataStreamCallback flat(Consumer con) { + return new DataStreamCallback() { + @Override + public void onValue(String name, byte[] value) { + con.accept(value); + } + }; + } + + default void onArrayStart(String name, int length) { + } + + default void onArrayEnd() { + } + + default void onTupleStart(String name, int length) { + } + + default void onTupleEnd() { + } + + default void onValue(String name, byte[] value) { + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/DataStreamReader.java b/core/src/main/java/io/xpipe/core/data/generic/DataStreamReader.java new file mode 100644 index 00000000..c462db87 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/DataStreamReader.java @@ -0,0 +1,59 @@ +package io.xpipe.core.data.generic; + +import java.io.IOException; +import java.io.InputStream; + +public class DataStreamReader { + + private static final int TUPLE_ID = 1; + private static final int ARRAY_ID = 2; + private static final int VALUE_ID = 3; + + public static void read(InputStream in, DataStreamCallback cb) throws IOException { + var b = in.read(); + switch (b) { + case TUPLE_ID -> { + readTuple(in, cb); + } + case ARRAY_ID -> { + readArray(in, cb); + } + case VALUE_ID -> { + readValue(in, cb); + } + default -> throw new IllegalStateException("Unexpected value: " + b); + } + } + + private static String readName(InputStream in) throws IOException { + var nameLength = in.read(); + return new String(in.readNBytes(nameLength)); + } + + private static void readTuple(InputStream in, DataStreamCallback cb) throws IOException { + var name = readName(in); + var size = in.read(); + cb.onTupleStart(name, size); + for (int i = 0; i < size; i++) { + read(in, cb); + } + cb.onTupleEnd(); + } + + private static void readArray(InputStream in, DataStreamCallback cb) throws IOException { + var name = readName(in); + var size = in.read(); + cb.onArrayStart(name, size); + for (int i = 0; i < size; i++) { + read(in, cb); + } + cb.onArrayEnd(); + } + + private static void readValue(InputStream in, DataStreamCallback cb) throws IOException { + var name = readName(in); + var size = in.read(); + var data = in.readNBytes(size); + cb.onValue(name, data); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/DataStreamWriter.java b/core/src/main/java/io/xpipe/core/data/generic/DataStreamWriter.java new file mode 100644 index 00000000..78ccbfd7 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/DataStreamWriter.java @@ -0,0 +1,33 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; + +public class DataStreamWriter { + + private static final int TUPLE_ID = 1; + private static final int ARRAY_ID = 2; + private static final int VALUE_ID = 3; + + public static void write(OutputStream out, DataStructureNode node) throws IOException { + if (node.isTuple()) { + writeTuple(out, (TupleNode) node); + } + } + + private static void writeName(OutputStream out, String s) throws IOException { + out.write(s.length()); + out.write(s.getBytes(StandardCharsets.UTF_8)); + } + + private static void writeTuple(OutputStream out, TupleNode tuple) throws IOException { + out.write(TUPLE_ID); + for (int i = 0; i < tuple.size(); i++) { + writeName(out, tuple.nameAt(i)); + write(out, tuple.at(i)); + } + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/DataStructureNodePointer.java b/core/src/main/java/io/xpipe/core/data/generic/DataStructureNodePointer.java new file mode 100644 index 00000000..066436c1 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/DataStructureNodePointer.java @@ -0,0 +1,239 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class DataStructureNodePointer { + + private final List path; + + public DataStructureNodePointer(List path) { + this.path = path; + + if (path.size() == 0) { + throw new IllegalArgumentException(); + } + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder fromBase(DataStructureNodePointer pointer) { + return new Builder(pointer); + } + + public String toString() { + return "/" + path.stream().map(Element::toString).collect(Collectors.joining("/")); + } + + public int size() { + return path.size(); + } + + public boolean isValid(DataStructureNode input) { + return get(input) != null; + } + + public DataStructureNode get(DataStructureNode root) { + DataStructureNode current = root; + for (Element value : path) { + var found = value.tryMatch(current); + if (found == null) { + return null; + } else { + current = found; + } + } + return current; + } + + public Optional getIfPresent(DataStructureNode root) { + return Optional.ofNullable(get(root)); + } + + public List getPath() { + return path; + } + + public static interface Element { + + DataStructureNode tryMatch(DataStructureNode n); + + default String getKey(DataStructureNode n) { + return null; + } + } + + public static final record NameElement(String name) implements Element { + + @Override + public DataStructureNode tryMatch(DataStructureNode n) { + return n.forKeyIfPresent(name).orElse(null); + } + + @Override + public String getKey(DataStructureNode n) { + return name; + } + + @Override + public String toString() { + return name; + } + } + + public static final record IndexElement(int index) implements Element { + + @Override + public DataStructureNode tryMatch(DataStructureNode n) { + if (n.size() > index) { + return n.at(index); + } + return null; + } + + @Override + public String toString() { + return "[" + index + "]"; + } + } + + public static final record SupplierElement(Supplier keySupplier) implements Element { + + @Override + public DataStructureNode tryMatch(DataStructureNode n) { + var name = keySupplier.get(); + if (name != null) { + return n.forKeyIfPresent(name).orElse(null); + } + return null; + } + + @Override + public String getKey(DataStructureNode n) { + return keySupplier.get(); + } + + @Override + public String toString() { + return "[$s]"; + } + } + + public static final record FunctionElement(Function keyFunc) implements Element { + + @Override + public DataStructureNode tryMatch(DataStructureNode n) { + var name = keyFunc.apply(n); + if (name != null) { + return n.forKeyIfPresent(name).orElse(null); + } + return null; + } + + @Override + public String getKey(DataStructureNode n) { + return keyFunc.apply(n); + } + + @Override + public String toString() { + return "[$s]"; + } + } + + public static final record SelectorElement(Predicate selector) implements Element { + + @Override + public DataStructureNode tryMatch(DataStructureNode n) { + var res = n.stream() + .filter(selector) + .findAny(); + return res.orElse(null); + } + + @Override + public String toString() { + return "[$(...)]"; + } + } + + public static class Builder { + + private final List path; + + public Builder() { + this.path = new ArrayList<>(); + } + + private Builder(List path) { + this.path = path; + } + + public Builder(DataStructureNodePointer pointer) { + this.path = new ArrayList<>(pointer.path); + } + + public Builder copy() { + return new Builder(new ArrayList<>(path)); + } + + + public Builder name(String name) { + path.add(new NameElement(name)); + return this; + } + + public Builder index(int index) { + path.add(new IndexElement(index)); + return this; + } + + public Builder supplier(Supplier keySupplier) { + path.add(new SupplierElement(keySupplier)); + return this; + } + + public Builder function(Function keyFunc) { + path.add(new FunctionElement(keyFunc)); + return this; + } + + public Builder selector(Predicate selector) { + path.add(new SelectorElement(selector)); + return this; + } + + public Builder pointerEvaluation(DataStructureNodePointer pointer) { + return pointerEvaluation(pointer, n -> { + if (!n.isValue()) { + return null; + } + return n.asString(); + }); + } + + public Builder pointerEvaluation(DataStructureNodePointer pointer, Function converter) { + path.add(new FunctionElement((current) -> { + var res = pointer.get(current); + if (res != null) { + return converter.apply(res); + } + return null; + })); + return this; + } + + public DataStructureNodePointer build() { + return new DataStructureNodePointer(path); + } + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/DataStructureNodeReader.java b/core/src/main/java/io/xpipe/core/data/generic/DataStructureNodeReader.java new file mode 100644 index 00000000..8bcf8a72 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/DataStructureNodeReader.java @@ -0,0 +1,10 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; + +public interface DataStructureNodeReader extends DataStreamCallback { + + boolean isDone(); + + DataStructureNode create(); +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/DataStructureReader.java b/core/src/main/java/io/xpipe/core/data/generic/DataStructureReader.java new file mode 100644 index 00000000..ad4bf9cb --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/DataStructureReader.java @@ -0,0 +1,69 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; + +public class DataStructureReader implements DataStreamCallback { + + private boolean isWrapped; + private DataStructureNodeReader reader; + + public DataStructureNode create() { + return null; + } + + + @Override + public void onArrayStart(String name, int length) { + if (reader != null) { + reader.onArrayStart(name, length); + return; + } + + if (name != null) { + reader = new TupleReader(1); + reader.onArrayStart(name, length); + } else { + reader = new ArrayReader(length); + reader.onArrayStart(null, length); + } + } + + @Override + public void onArrayEnd() { + if (reader != null) { + reader.onArrayEnd(); + } + } + + @Override + public void onTupleStart(String name, int length) { + if (reader != null) { + reader.onTupleStart(name, length); + return; + } + + if (name != null) { + reader = new TupleReader(1); + reader.onTupleStart(name, length); + } else { + reader = new TupleReader(length); + } + } + + @Override + public void onTupleEnd() { + if (reader != null) { + reader.onTupleEnd(); + if (reader.isDone()) { + + } + } + + DataStreamCallback.super.onTupleEnd(); + } + + @Override + public void onValue(String name, byte[] value) { + DataStreamCallback.super.onValue(name, value); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/TupleNode.java b/core/src/main/java/io/xpipe/core/data/generic/TupleNode.java new file mode 100644 index 00000000..7b27ea24 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/TupleNode.java @@ -0,0 +1,79 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; +import io.xpipe.core.data.type.DataType; +import io.xpipe.core.data.type.TupleType; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +public class TupleNode extends DataStructureNode { + + private final List names; + private final List nodes; + + private TupleNode(List names, List nodes) { + this.names = names; + this.nodes = nodes; + } + + public static TupleNode wrap(List names, List nodes) { + return new TupleNode(names, nodes); + } + + public static TupleNode copy(List names, List nodes) { + return new TupleNode(new ArrayList<>(names), new ArrayList<>(nodes)); + } + + public boolean isTuple() { + return true; + } + + @Override + public DataType getDataType() { + return TupleType.wrap(names, nodes.stream().map(DataStructureNode::getDataType).toList()); + } + + @Override + protected String getName() { + return "tuple node"; + } + + @Override + public DataStructureNode at(int index) { + return nodes.get(index); + } + + @Override + public DataStructureNode forKey(String name) { + return nodes.get(names.indexOf(name)); + } + + @Override + public Optional forKeyIfPresent(String name) { + if (!names.contains(name)) { + return Optional.empty(); + } + + return Optional.of(nodes.get(names.indexOf(name))); + } + + @Override + public int size() { + return nodes.size(); + } + + public String nameAt(int index) { + return names.get(index); + } + + public List getNames() { + return Collections.unmodifiableList(names); + } + + public List getNodes() { + return Collections.unmodifiableList(nodes); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/TupleReader.java b/core/src/main/java/io/xpipe/core/data/generic/TupleReader.java new file mode 100644 index 00000000..e6801a49 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/TupleReader.java @@ -0,0 +1,119 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; + +import java.util.ArrayList; +import java.util.List; + +public class TupleReader implements DataStructureNodeReader { + + private final int length; + private final List names; + private final List nodes; + private boolean hasSeenEnd; + private int currentIndex = 0; + private DataStructureNodeReader currentReader; + + public TupleReader(int length) { + this.length = length; + this.names = new ArrayList<>(length); + this.nodes = new ArrayList<>(length); + } + + private void put(String name, DataStructureNode node) { + this.names.add(name); + this.nodes.add(node); + currentIndex++; + } + + private void putNode(DataStructureNode node) { + this.nodes.add(node); + currentIndex++; + } + + private boolean filled() { + return currentIndex == length; + } + + @Override + public void onArrayStart(String name, int length) { + if (currentReader != null) { + currentReader.onArrayStart(name, length); + return; + } + + names.add(name); + currentReader = new ArrayReader(length); + } + + @Override + public void onArrayEnd() { + if (currentReader != null) { + currentReader.onArrayEnd(); + if (currentReader.isDone()) { + putNode(currentReader.create()); + currentReader = null; + } + return; + } + + throw new IllegalStateException(); + } + + @Override + public void onTupleStart(String name, int length) { + if (currentReader != null) { + currentReader.onTupleStart(name, length); + return; + } + + names.add(name); + currentReader = new TupleReader(length); + } + + @Override + public void onTupleEnd() { + if (currentReader != null) { + currentReader.onTupleEnd(); + if (currentReader.isDone()) { + putNode(currentReader.create()); + currentReader = null; + } + return; + } + + if (!filled()) { + throw new IllegalStateException(); + } + + hasSeenEnd = true; + } + + @Override + public void onValue(String name, byte[] value) { + if (currentReader != null) { + currentReader.onValue(name, value); + return; + } + + if (filled()) { + throw new IllegalStateException(); + } + + put(name, ValueNode.wrap(value)); + } + + @Override + public boolean isDone() { + return filled() && hasSeenEnd; + } + + @Override + public DataStructureNode create() { + if (!isDone()) { + throw new IllegalStateException(); + } + + return TupleNode.wrap(names, nodes); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/generic/ValueNode.java b/core/src/main/java/io/xpipe/core/data/generic/ValueNode.java new file mode 100644 index 00000000..7581829d --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/generic/ValueNode.java @@ -0,0 +1,47 @@ +package io.xpipe.core.data.generic; + +import io.xpipe.core.data.DataStructureNode; +import io.xpipe.core.data.type.DataType; +import io.xpipe.core.data.type.ValueType; + +public class ValueNode extends DataStructureNode { + + private final byte[] data; + + private ValueNode(byte[] data) { + this.data = data; + } + + public static ValueNode wrap(byte[] data) { + return new ValueNode(data); + } + + @Override + public boolean isValue() { + return true; + } + + @Override + public int asInt() { + return Integer.parseInt(asString()); + } + + @Override + public String asString() { + return new String(data); + } + + @Override + protected String getName() { + return "value node"; + } + + @Override + public DataType getDataType() { + return new ValueType(); + } + + public byte[] getRawData() { + return data; + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/ArrayType.java b/core/src/main/java/io/xpipe/core/data/type/ArrayType.java new file mode 100644 index 00000000..c533fefe --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/ArrayType.java @@ -0,0 +1,58 @@ +package io.xpipe.core.data.type; + +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.xpipe.core.data.type.callback.DataTypeCallback; + +import java.util.List; + +@JsonTypeName("array") +public class ArrayType implements DataType { + + public static ArrayType of(List types) { + if (types.size() == 0) { + return new ArrayType(null); + } + + var first = types.get(0); + var eq = types.stream().allMatch(d -> d.equals(first)); + return new ArrayType(eq ? first : null); + } + + private final DataType sharedType; + + public ArrayType(DataType sharedType) { + this.sharedType = sharedType; + } + + public boolean isSimple() { + return hasSharedType() && getSharedType().isValue(); + } + + public boolean hasSharedType() { + return sharedType != null; + } + + public DataType getSharedType() { + return sharedType; + } + + @Override + public boolean isTuple() { + return false; + } + + @Override + public boolean isArray() { + return true; + } + + @Override + public boolean isValue() { + return false; + } + + @Override + public void traverseType(DataTypeCallback cb) { + cb.onArray(this); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/DataType.java b/core/src/main/java/io/xpipe/core/data/type/DataType.java new file mode 100644 index 00000000..6f443437 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/DataType.java @@ -0,0 +1,16 @@ +package io.xpipe.core.data.type; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.xpipe.core.data.type.callback.DataTypeCallback; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +public interface DataType { + + boolean isTuple(); + + boolean isArray(); + + boolean isValue(); + + void traverseType(DataTypeCallback cb); +} diff --git a/core/src/main/java/io/xpipe/core/data/type/TupleType.java b/core/src/main/java/io/xpipe/core/data/type/TupleType.java new file mode 100644 index 00000000..134373ac --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/TupleType.java @@ -0,0 +1,69 @@ +package io.xpipe.core.data.type; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.xpipe.core.data.type.callback.DataTypeCallback; + +import java.util.Collections; +import java.util.List; + +@JsonTypeName("tuple") +public class TupleType implements DataType { + + private List names; + private List types; + + @JsonCreator + private TupleType(List names, List types) { + this.names = names; + this.types = types; + } + + public static TupleType empty() { + return new TupleType(List.of(), List.of()); + } + + public static TupleType wrap(List names, List types) { + return new TupleType(names, types); + } + + public static TupleType wrapWithoutNames(List types) { + return new TupleType(Collections.nCopies(types.size(), null), types); + } + + @Override + public boolean isTuple() { + return true; + } + + @Override + public boolean isArray() { + return false; + } + + @Override + public boolean isValue() { + return false; + } + + @Override + public void traverseType(DataTypeCallback cb) { + cb.onTupleBegin(this); + for (var t : types) { + t.traverseType(cb); + } + cb.onTupleEnd(); + } + + public int getSize() { + return types.size(); + } + + public List getNames() { + return names; + } + + public List getTypes() { + return types; + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/TypedDataStreamReader.java b/core/src/main/java/io/xpipe/core/data/type/TypedDataStreamReader.java new file mode 100644 index 00000000..e606fdce --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/TypedDataStreamReader.java @@ -0,0 +1,95 @@ +package io.xpipe.core.data.type; + +import io.xpipe.core.data.type.callback.TypedDataStreamCallback; + +import java.io.IOException; +import java.io.InputStream; + +public class TypedDataStreamReader { + + private static final int STRUCTURE_ID = 0; + private static final int TUPLE_ID = 1; + private static final int ARRAY_ID = 2; + private static final int VALUE_ID = 3; + + public static boolean hasNext(InputStream in) throws IOException { + var b = in.read(); + if (b == -1) { + return false; + } + + if (b != STRUCTURE_ID) { + throw new IOException("Unexpected value: " + b); + } + + return true; + } + + public static void readStructures(InputStream in, TypedDataStreamCallback cb) throws IOException { + while (true) { + if (!hasNext(in)) { + break; + } + + cb.onNodeBegin(); + read(in, cb); + cb.onNodeEnd(); + } + } + + public static void readStructure(InputStream in, TypedDataStreamCallback cb) throws IOException { + if (!hasNext(in)) { + throw new IllegalStateException("No structure to read"); + } + + cb.onNodeBegin(); + read(in, cb); + cb.onNodeEnd(); + } + + private static void read(InputStream in, TypedDataStreamCallback cb) throws IOException { + var b = in.read(); + + // Skip + if (b == STRUCTURE_ID) { + b = in.read(); + } + + switch (b) { + case TUPLE_ID -> { + readTuple(in, cb); + } + case ARRAY_ID -> { + readArray(in, cb); + } + case VALUE_ID -> { + readValue(in, cb); + } + default -> throw new IllegalStateException("Unexpected value: " + b); + } + } + + private static void readTuple(InputStream in, TypedDataStreamCallback cb) throws IOException { + var size = in.read(); + cb.onTupleBegin(size); + for (int i = 0; i < size; i++) { + read(in, cb); + } + cb.onTupleEnd(); + } + + private static void readArray(InputStream in, TypedDataStreamCallback cb) throws IOException { + var size = in.read(); + cb.onArrayBegin(size); + for (int i = 0; i < size; i++) { + read(in, cb); + } + cb.onArrayEnd(); + } + + private static void readValue(InputStream in, TypedDataStreamCallback cb) throws IOException { + var size = in.read(); + var data = in.readNBytes(size); + cb.onValue(data); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/TypedDataStreamWriter.java b/core/src/main/java/io/xpipe/core/data/type/TypedDataStreamWriter.java new file mode 100644 index 00000000..e7b639ef --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/TypedDataStreamWriter.java @@ -0,0 +1,58 @@ +package io.xpipe.core.data.type; + +import io.xpipe.core.data.DataStructureNode; +import io.xpipe.core.data.generic.ArrayNode; +import io.xpipe.core.data.generic.TupleNode; +import io.xpipe.core.data.generic.ValueNode; + +import java.io.IOException; +import java.io.OutputStream; + +public class TypedDataStreamWriter { + + private static final int STRUCTURE_ID = 0; + private static final int TUPLE_ID = 1; + private static final int ARRAY_ID = 2; + private static final int VALUE_ID = 3; + + public static void writeStructure(OutputStream out, DataStructureNode node) throws IOException { + out.write(STRUCTURE_ID); + write(out, node); + } + + private static void write(OutputStream out, DataStructureNode node) throws IOException { + if (node.isTuple()) { + writeTuple(out, (TupleNode) node); + } + else if (node.isArray()) { + writeArray(out, (ArrayNode) node); + } + else if (node.isValue()) { + writeValue(out, (ValueNode) node); + } else { + throw new AssertionError(); + } + } + + private static void writeValue(OutputStream out, ValueNode n) throws IOException { + out.write(VALUE_ID); + out.write(n.getRawData().length); + out.write(n.getRawData()); + } + + private static void writeTuple(OutputStream out, TupleNode tuple) throws IOException { + out.write(TUPLE_ID); + out.write(tuple.size()); + for (int i = 0; i < tuple.size(); i++) { + write(out, tuple.at(i)); + } + } + + private static void writeArray(OutputStream out, ArrayNode array) throws IOException { + out.write(ARRAY_ID); + out.write(array.size()); + for (int i = 0; i < array.size(); i++) { + write(out, array.at(i)); + } + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/ValueType.java b/core/src/main/java/io/xpipe/core/data/type/ValueType.java new file mode 100644 index 00000000..c369b979 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/ValueType.java @@ -0,0 +1,30 @@ +package io.xpipe.core.data.type; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonTypeName; +import io.xpipe.core.data.type.callback.DataTypeCallback; + +@JsonTypeName("value") +public class ValueType implements DataType { + + @Override + public boolean isTuple() { + return false; + } + + @JsonIgnore + @Override + public boolean isArray() { + return false; + } + + @Override + public boolean isValue() { + return true; + } + + @Override + public void traverseType(DataTypeCallback cb) { + cb.onValue(); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/callback/DataTypeCallback.java b/core/src/main/java/io/xpipe/core/data/type/callback/DataTypeCallback.java new file mode 100644 index 00000000..10087639 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/callback/DataTypeCallback.java @@ -0,0 +1,42 @@ +package io.xpipe.core.data.type.callback; + +import io.xpipe.core.data.type.ArrayType; +import io.xpipe.core.data.type.DataType; +import io.xpipe.core.data.type.TupleType; +import io.xpipe.core.data.type.ValueType; + +import java.util.function.Consumer; + +public interface DataTypeCallback { + + public static DataTypeCallback flatten(Consumer typeConsumer) { + return new DataTypeCallback() { + @Override + public void onValue() { + typeConsumer.accept(new ValueType()); + } + + @Override + public void onTupleBegin(TupleType tuple) { + typeConsumer.accept(tuple); + } + + @Override + public void onArray(ArrayType type) { + typeConsumer.accept(type); + } + }; + } + + default void onValue() { + } + + default void onTupleBegin(TupleType tuple) { + } + + default void onTupleEnd() { + } + + default void onArray(ArrayType type) { + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/callback/DataTypeCallbacks.java b/core/src/main/java/io/xpipe/core/data/type/callback/DataTypeCallbacks.java new file mode 100644 index 00000000..4ecf24e8 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/callback/DataTypeCallbacks.java @@ -0,0 +1,61 @@ +package io.xpipe.core.data.type.callback; + +import io.xpipe.core.data.generic.DataStructureNodePointer; +import io.xpipe.core.data.type.ArrayType; +import io.xpipe.core.data.type.TupleType; + +import java.util.Stack; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public class DataTypeCallbacks { + + public static DataTypeCallback visitTuples(Consumer newTuple, Runnable endTuple, BiConsumer newValue) { + return new DataTypeCallback() { + + private final Stack keyNames = new Stack<>(); + private final Stack builders = new Stack<>(); + + { + builders.push(DataStructureNodePointer.builder()); + } + + private boolean isOnTopLevel() { + return keyNames.size() == 0; + } + + @Override + public void onTupleBegin(TupleType tuple) { + if (!isOnTopLevel()) { + newTuple.accept(keyNames.peek()); + } + tuple.getNames().forEach(n -> { + keyNames.push(n); + builders.push(builders.peek().copy().name(n)); + tuple.getTypes().forEach(dt -> dt.traverseType(this)); + }); + } + + @Override + public void onValue() { + newValue.accept(keyNames.peek(), builders.peek().build()); + keyNames.pop(); + builders.pop(); + } + + @Override + public void onTupleEnd() { + endTuple.run(); + } + + @Override + public void onArray(ArrayType type) { + if (!type.isSimple()) { + throw new IllegalStateException(); + } + + newValue.accept(keyNames.peek(), builders.peek().build()); + } + }; + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/callback/FlatArrayTypeCallback.java b/core/src/main/java/io/xpipe/core/data/type/callback/FlatArrayTypeCallback.java new file mode 100644 index 00000000..b8d962ef --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/callback/FlatArrayTypeCallback.java @@ -0,0 +1,63 @@ +package io.xpipe.core.data.type.callback; + +import io.xpipe.core.data.type.TupleType; + +public class FlatArrayTypeCallback implements DataTypeCallback { + + private final FlatCallback cb; + private int arrayDepth = 0; + + public FlatArrayTypeCallback(FlatCallback cb) { + this.cb = cb; + } + + private boolean isInArray() { + return arrayDepth > 0; + } + + @Override + public void onValue() { + if (isInArray()) { + return; + } + + cb.onValue(); + } + + @Override + public void onTupleBegin(TupleType tuple) { + if (isInArray()) { + throw new IllegalStateException(); + } + + cb.onTupleBegin(tuple); + } + + @Override + public void onTupleEnd() { + cb.onTupleEnd(); + } + + public void onArray() { + if (isInArray()) { + throw new IllegalStateException(); + } + + arrayDepth++; + } + + public static interface FlatCallback { + + default void onValue() { + } + + default void onTupleBegin(TupleType tuple) { + } + + default void onTupleEnd() { + } + + default void onFlatArray() { + } + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/callback/ReusableTypedDataStructureNodeCallback.java b/core/src/main/java/io/xpipe/core/data/type/callback/ReusableTypedDataStructureNodeCallback.java new file mode 100644 index 00000000..19099e04 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/callback/ReusableTypedDataStructureNodeCallback.java @@ -0,0 +1,29 @@ +package io.xpipe.core.data.type.callback; + +public class ReusableTypedDataStructureNodeCallback implements TypedDataStreamCallback { + + @Override + public void onValue(byte[] data) { + TypedDataStreamCallback.super.onValue(data); + } + + @Override + public void onTupleBegin(int size) { + TypedDataStreamCallback.super.onTupleBegin(size); + } + + @Override + public void onTupleEnd() { + TypedDataStreamCallback.super.onTupleEnd(); + } + + @Override + public void onArrayBegin(int size) { + TypedDataStreamCallback.super.onArrayBegin(size); + } + + @Override + public void onArrayEnd() { + TypedDataStreamCallback.super.onArrayEnd(); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/callback/TableTypeCallback.java b/core/src/main/java/io/xpipe/core/data/type/callback/TableTypeCallback.java new file mode 100644 index 00000000..dbc27bb5 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/callback/TableTypeCallback.java @@ -0,0 +1,80 @@ +package io.xpipe.core.data.type.callback; + +import io.xpipe.core.data.generic.DataStructureNodePointer; +import io.xpipe.core.data.type.ArrayType; +import io.xpipe.core.data.type.TupleType; + +import java.util.Stack; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public class TableTypeCallback implements DataTypeCallback { + + private final Stack tuples = new Stack<>(); + private final Stack keyIndices = new Stack<>(); + private final Consumer newTuple; + private final Runnable endTuple; + private final BiConsumer newValue; + + private TableTypeCallback(Consumer newTuple, Runnable endTuple, BiConsumer newValue) { + this.newTuple = newTuple; + this.endTuple = endTuple; + this.newValue = newValue; + } + + public static DataTypeCallback create(Consumer newTuple, Runnable endTuple, BiConsumer newValue) { + return new TableTypeCallback(newTuple, endTuple, newValue); + } + + private boolean isOnTopLevel() { + return tuples.size() <= 1; + } + + private void onAnyValue() { + var pointer = DataStructureNodePointer.builder(); + for (int index : keyIndices) { + pointer.index(index); + } + var p = pointer.build(); + newValue.accept(tuples.peek().getNames().get(keyIndices.peek()), p); + + moveIndex(); + } + + private void moveIndex() { + var index = keyIndices.pop(); + index++; + keyIndices.push(index); + } + + @Override + public void onValue() { + onAnyValue(); + } + + @Override + public void onTupleBegin(TupleType tuple) { + if (!isOnTopLevel()) { + moveIndex(); + } + + tuples.push(tuple); + keyIndices.push(0); + + if (!isOnTopLevel()) { + newTuple.accept(tuples.peek().getNames().get(keyIndices.peek())); + } + } + + @Override + public void onTupleEnd() { + endTuple.run(); + tuples.pop(); + keyIndices.pop(); + } + + @Override + public void onArray(ArrayType type) { + onAnyValue(); + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/callback/TypedDataStreamCallback.java b/core/src/main/java/io/xpipe/core/data/type/callback/TypedDataStreamCallback.java new file mode 100644 index 00000000..8efbf877 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/callback/TypedDataStreamCallback.java @@ -0,0 +1,25 @@ +package io.xpipe.core.data.type.callback; + +public interface TypedDataStreamCallback { + + default void onValue(byte[] data) { + } + + default void onTupleBegin(int size) { + } + + default void onTupleEnd() { + } + + default void onArrayBegin(int size) { + } + + default void onArrayEnd() { + } + + default void onNodeBegin() { + } + + default void onNodeEnd() { + } +} diff --git a/core/src/main/java/io/xpipe/core/data/type/callback/TypedDataStructureNodeCallback.java b/core/src/main/java/io/xpipe/core/data/type/callback/TypedDataStructureNodeCallback.java new file mode 100644 index 00000000..2061f700 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/data/type/callback/TypedDataStructureNodeCallback.java @@ -0,0 +1,132 @@ +package io.xpipe.core.data.type.callback; + +import io.xpipe.core.data.DataStructureNode; +import io.xpipe.core.data.generic.ArrayNode; +import io.xpipe.core.data.generic.TupleNode; +import io.xpipe.core.data.generic.ValueNode; +import io.xpipe.core.data.type.DataType; +import io.xpipe.core.data.type.TupleType; + +import java.util.ArrayList; +import java.util.List; +import java.util.Stack; +import java.util.function.Consumer; + +public class TypedDataStructureNodeCallback implements TypedDataStreamCallback { + + private final List flattened; + private int dataTypeIndex; + private Stack> children; + private Stack nodes; + private DataStructureNode readNode; + private final Consumer consumer; + + public TypedDataStructureNodeCallback(DataType type, Consumer consumer) { + this.consumer = consumer; + flattened = new ArrayList<>(); + children = new Stack<>(); + nodes = new Stack<>(); + type.traverseType(DataTypeCallback.flatten(d -> flattened.add(d))); + } + + @Override + public void onNodeBegin() { + if (nodes.size() != 0 || children.size() != 0) { + throw new IllegalStateException(); + } + + dataTypeIndex = 0; + readNode = null; + } + + @Override + public void onNodeEnd() { + if (nodes.size() != 0 || children.size() != 0 || readNode == null) { + throw new IllegalStateException(); + } + + consumer.accept(readNode); + } + + @Override + public void onValue(byte[] data) { + children.peek().add(ValueNode.wrap(data)); + if (!flattened.get(dataTypeIndex).isArray()) { + dataTypeIndex++; + } + } + + protected void newTuple() { + TupleType tupleType = (TupleType) flattened.get(dataTypeIndex); + var l = new ArrayList(tupleType.getSize()); + children.push(l); + var newNode = TupleNode.wrap(tupleType.getNames(), l); + nodes.push(newNode); + } + + protected void newArray() { + var l = new ArrayList(); + children.push(new ArrayList<>()); + var newNode = ArrayNode.wrap(l); + nodes.push(newNode); + } + + private void finishTuple() { + children.pop(); + dataTypeIndex++; + var popped = nodes.pop(); + if (!popped.isTuple()) { + throw new IllegalStateException(); + } + + TupleNode tuple = (TupleNode) popped; + if (tuple.getNames().size() != tuple.getNodes().size()) { + throw new IllegalStateException(""); + } + + if (nodes.empty()) { + readNode = popped; + } else { + children.peek().add(popped); + } + } + + private void finishArray() { + children.pop(); + dataTypeIndex++; + var popped = nodes.pop(); + if (nodes.empty()) { + readNode = popped; + } else { + children.peek().add(popped); + } + } + + @Override + public void onTupleBegin(int size) { + if (!flattened.get(dataTypeIndex).isTuple()) { + throw new IllegalStateException(); + } + + newTuple(); + } + + @Override + public void onTupleEnd() { + finishTuple(); + } + + @Override + public void onArrayBegin(int size) { + if (!flattened.get(dataTypeIndex).isArray()) { + throw new IllegalStateException(); + } + + newArray(); + } + + @Override + public void onArrayEnd() { + finishArray(); + } +} diff --git a/core/src/main/java/io/xpipe/core/source/DataSource.java b/core/src/main/java/io/xpipe/core/source/DataSource.java new file mode 100644 index 00000000..b7373d39 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataSource.java @@ -0,0 +1,14 @@ +package io.xpipe.core.source; + +import io.xpipe.core.store.DataStore; + +import java.util.Optional; + +public interface DataSource { + + default Optional determineDefaultName(DS store) { + return Optional.empty(); + } + + DataSourceType getType(); +} diff --git a/core/src/main/java/io/xpipe/core/source/DataSourceConnection.java b/core/src/main/java/io/xpipe/core/source/DataSourceConnection.java new file mode 100644 index 00000000..33e7c384 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataSourceConnection.java @@ -0,0 +1,8 @@ +package io.xpipe.core.source; + +public interface DataSourceConnection extends AutoCloseable { + + void init() throws Exception; + + void close() throws Exception; +} diff --git a/core/src/main/java/io/xpipe/core/source/DataSourceId.java b/core/src/main/java/io/xpipe/core/source/DataSourceId.java new file mode 100644 index 00000000..6f6bc01a --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataSourceId.java @@ -0,0 +1,55 @@ +package io.xpipe.core.source; + +import com.fasterxml.jackson.annotation.JsonCreator; + +public class DataSourceId { + + public static final char SEPARATOR = ':'; + + private final String collectionName; + private final String entryName; + + @JsonCreator + public DataSourceId(String collectionName, String entryName) { + this.collectionName = collectionName; + this.entryName = entryName; + } + + public DataSourceId withEntryName(String newName) { + return new DataSourceId(collectionName, newName); + } + + public static DataSourceId fromString(String s) { + var split = s.split(String.valueOf(SEPARATOR)); + if (split.length != 2) { + throw new IllegalArgumentException(); + } + + if (split[1].length() == 0) { + throw new IllegalArgumentException(); + } + + return new DataSourceId(split[0].length() > 0 ? split[0] : null, split[1]); + } + + public boolean hasCollection() { + return collectionName != null; + } + + @Override + public String toString() { + return (collectionName != null ? collectionName : "") + SEPARATOR + entryName; + } + + public String toReferenceValue() { + return toString().toLowerCase(); + } + + public String getCollectionName() { + return collectionName; + } + + public String getEntryName() { + return entryName; + } +} diff --git a/core/src/main/java/io/xpipe/core/source/DataSourceType.java b/core/src/main/java/io/xpipe/core/source/DataSourceType.java new file mode 100644 index 00000000..e10894aa --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataSourceType.java @@ -0,0 +1,12 @@ +package io.xpipe.core.source; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public enum DataSourceType { + + @JsonProperty("table") + TABLE, + + @JsonProperty("structure") + STRUCTURE +} diff --git a/core/src/main/java/io/xpipe/core/source/DataStructureConnection.java b/core/src/main/java/io/xpipe/core/source/DataStructureConnection.java new file mode 100644 index 00000000..b60a308f --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataStructureConnection.java @@ -0,0 +1,5 @@ +package io.xpipe.core.source; + +public interface DataStructureConnection extends DataSourceConnection { + +} diff --git a/core/src/main/java/io/xpipe/core/source/DataStructureSource.java b/core/src/main/java/io/xpipe/core/source/DataStructureSource.java new file mode 100644 index 00000000..1b3cc67a --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataStructureSource.java @@ -0,0 +1,11 @@ +package io.xpipe.core.source; + +public abstract class DataStructureSource implements DataSource { + + public abstract DataSourceConnection openConnection() throws Exception; + + @Override + public DataSourceType getType() { + return DataSourceType.STRUCTURE; + } +} diff --git a/core/src/main/java/io/xpipe/core/source/DataTableConnection.java b/core/src/main/java/io/xpipe/core/source/DataTableConnection.java new file mode 100644 index 00000000..fe6be84e --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataTableConnection.java @@ -0,0 +1,22 @@ +package io.xpipe.core.source; + + +import io.xpipe.core.data.DataStructureNodeAcceptor; +import io.xpipe.core.data.generic.ArrayNode; +import io.xpipe.core.data.generic.TupleNode; +import io.xpipe.core.data.type.TupleType; + +import java.io.OutputStream; + +public interface DataTableConnection extends DataSourceConnection { + + TupleType determineDataType() throws Exception; + + int determineRowCount() throws Exception; + + void withLines(DataStructureNodeAcceptor lineAcceptor) throws Exception; + + ArrayNode readLines(int maxLines) throws Exception; + + void forwardLines(OutputStream out, int maxLines) throws Exception; +} diff --git a/core/src/main/java/io/xpipe/core/source/DataTableSource.java b/core/src/main/java/io/xpipe/core/source/DataTableSource.java new file mode 100644 index 00000000..b4348a6b --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataTableSource.java @@ -0,0 +1,15 @@ +package io.xpipe.core.source; + +import io.xpipe.core.store.DataStore; + +public abstract class DataTableSource implements DataSource { + + public abstract DataTableWriteConnection openWriteConnection(DS store); + + public abstract DataTableConnection openConnection(DS store); + + @Override + public DataSourceType getType() { + return DataSourceType.TABLE; + } +} diff --git a/core/src/main/java/io/xpipe/core/source/DataTableWriteConnection.java b/core/src/main/java/io/xpipe/core/source/DataTableWriteConnection.java new file mode 100644 index 00000000..4256043f --- /dev/null +++ b/core/src/main/java/io/xpipe/core/source/DataTableWriteConnection.java @@ -0,0 +1,12 @@ +package io.xpipe.core.source; + +import io.xpipe.core.data.DataStructureNodeAcceptor; +import io.xpipe.core.data.generic.ArrayNode; +import io.xpipe.core.data.generic.TupleNode; + +public interface DataTableWriteConnection extends DataSourceConnection { + + DataStructureNodeAcceptor writeLinesAcceptor(); + + void writeLines(ArrayNode lines) throws Exception; +} diff --git a/core/src/main/java/io/xpipe/core/store/DataStore.java b/core/src/main/java/io/xpipe/core/store/DataStore.java new file mode 100644 index 00000000..104586f5 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/DataStore.java @@ -0,0 +1,18 @@ +package io.xpipe.core.store; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.time.Instant; +import java.util.Optional; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +public interface DataStore { + + default Optional determineDefaultName() { + return Optional.empty(); + } + + default Optional getLastModified() { + return Optional.empty(); + } +} diff --git a/core/src/main/java/io/xpipe/core/store/FileDataInput.java b/core/src/main/java/io/xpipe/core/store/FileDataInput.java new file mode 100644 index 00000000..60628c1a --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/FileDataInput.java @@ -0,0 +1,17 @@ +package io.xpipe.core.store; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +public abstract class FileDataInput implements StreamDataStore { + + public abstract String getName(); + + @JsonIgnore + public abstract boolean isLocal(); + + @JsonIgnore + public abstract LocalFileDataInput getLocal(); + + @JsonIgnore + public abstract RemoteFileDataInput getRemote(); +} diff --git a/core/src/main/java/io/xpipe/core/store/InputStreamDataStore.java b/core/src/main/java/io/xpipe/core/store/InputStreamDataStore.java new file mode 100644 index 00000000..f331cedc --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/InputStreamDataStore.java @@ -0,0 +1,23 @@ +package io.xpipe.core.store; + +import java.io.InputStream; +import java.io.OutputStream; + +public abstract class InputStreamDataStore implements StreamDataStore { + + private final InputStream in; + + public InputStreamDataStore(InputStream in) { + this.in = in; + } + + @Override + public InputStream openInput() throws Exception { + return in; + } + + @Override + public OutputStream openOutput() throws Exception { + throw new UnsupportedOperationException(); + } +} diff --git a/core/src/main/java/io/xpipe/core/store/LocalFileDataInput.java b/core/src/main/java/io/xpipe/core/store/LocalFileDataInput.java new file mode 100644 index 00000000..1ea588d5 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/LocalFileDataInput.java @@ -0,0 +1,73 @@ +package io.xpipe.core.store; + +import com.fasterxml.jackson.annotation.*; +import org.apache.commons.io.FilenameUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.Optional; + +@JsonTypeName("local") +public class LocalFileDataInput extends FileDataInput { + + private final Path file; + + @JsonCreator + public LocalFileDataInput(Path file) { + this.file = file; + } + + @Override + public Optional determineDefaultName() { + return Optional.of(FilenameUtils.getBaseName(file.toString())); + } + + @Override + public Optional getLastModified() { + try { + var l = Files.getLastModifiedTime(file); + return Optional.of(l.toInstant()); + } catch (IOException e) { + return Optional.empty(); + } + } + + @Override + public String getName() { + return file.getFileName().toString(); + } + + @Override + @JsonIgnore + public boolean isLocal() { + return true; + } + + @Override + public LocalFileDataInput getLocal() { + return this; + } + + @Override + public RemoteFileDataInput getRemote() { + throw new UnsupportedOperationException(); + } + + public Path getFile() { + return file; + } + + @Override + public InputStream openInput() throws Exception { + return Files.newInputStream(file); + } + + @Override + public OutputStream openOutput() throws Exception { + return Files.newOutputStream(file); + } +} diff --git a/core/src/main/java/io/xpipe/core/store/RemoteFileDataInput.java b/core/src/main/java/io/xpipe/core/store/RemoteFileDataInput.java new file mode 100644 index 00000000..36258101 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/RemoteFileDataInput.java @@ -0,0 +1,49 @@ +package io.xpipe.core.store; + +import java.io.InputStream; +import java.io.OutputStream; +import java.time.Instant; +import java.util.Optional; + +public class RemoteFileDataInput extends FileDataInput { + + @Override + public Optional determineDefaultName() { + return Optional.empty(); + } + + @Override + public Optional getLastModified() { + return Optional.empty(); + } + + @Override + public String getName() { + return null; + } + + @Override + public boolean isLocal() { + return false; + } + + @Override + public LocalFileDataInput getLocal() { + return null; + } + + @Override + public RemoteFileDataInput getRemote() { + return null; + } + + @Override + public InputStream openInput() throws Exception { + return null; + } + + @Override + public OutputStream openOutput() throws Exception { + return null; + } +} diff --git a/core/src/main/java/io/xpipe/core/store/StreamDataStore.java b/core/src/main/java/io/xpipe/core/store/StreamDataStore.java new file mode 100644 index 00000000..2c7396ae --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/StreamDataStore.java @@ -0,0 +1,11 @@ +package io.xpipe.core.store; + +import java.io.InputStream; +import java.io.OutputStream; + +public interface StreamDataStore extends DataStore { + + InputStream openInput() throws Exception; + + OutputStream openOutput() throws Exception; +} diff --git a/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java b/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java new file mode 100644 index 00000000..6a156f60 --- /dev/null +++ b/core/src/main/java/io/xpipe/core/util/CoreJacksonModule.java @@ -0,0 +1,72 @@ +package io.xpipe.core.util; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import io.xpipe.core.data.type.ArrayType; +import io.xpipe.core.data.type.TupleType; +import io.xpipe.core.data.type.ValueType; +import io.xpipe.core.store.LocalFileDataInput; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.Path; + +public class CoreJacksonModule extends SimpleModule { + + public static class CharsetSerializer extends JsonSerializer { + + @Override + public void serialize(Charset value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { + jgen.writeString(value.name()); + } + } + + public static class CharsetDeserializer extends JsonDeserializer { + + @Override + public Charset deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException { + return Charset.forName(p.getValueAsString()); + } + } + + public static class LocalPathSerializer extends JsonSerializer { + + @Override + public void serialize(Path value, JsonGenerator jgen, SerializerProvider provider) + throws IOException, JsonProcessingException { + jgen.writeString(value.toString()); + } + } + + public static class LocalPathDeserializer extends JsonDeserializer { + + @Override + public Path deserialize(JsonParser p, DeserializationContext ctxt) throws IOException, JsonProcessingException { + return Path.of(p.getValueAsString()); + } + } + + @Override + public void setupModule(SetupContext context) { + context.registerSubtypes( + new NamedType(LocalFileDataInput.class), + new NamedType(ValueType.class), + new NamedType(TupleType.class), + new NamedType(ArrayType.class) + ); + + addSerializer(Charset.class, new CharsetSerializer()); + addDeserializer(Charset.class, new CharsetDeserializer()); + + addSerializer(Path.class, new LocalPathSerializer()); + addDeserializer(Path.class, new LocalPathDeserializer()); + } +} diff --git a/core/src/main/java/io/xpipe/core/util/JacksonHelper.java b/core/src/main/java/io/xpipe/core/util/JacksonHelper.java new file mode 100644 index 00000000..cf6ae4be --- /dev/null +++ b/core/src/main/java/io/xpipe/core/util/JacksonHelper.java @@ -0,0 +1,48 @@ +package io.xpipe.core.util; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; + +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; + +public class JacksonHelper { + + private static final ObjectMapper INSTANCE = new ObjectMapper(); + private static boolean init = false; + + public static synchronized void init(ModuleLayer layer) { + ObjectMapper objectMapper = INSTANCE; + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + + objectMapper.registerModules(findModules(layer)); + objectMapper.setVisibility(objectMapper.getSerializationConfig().getDefaultVisibilityChecker() + .withFieldVisibility(JsonAutoDetect.Visibility.ANY) + .withGetterVisibility(JsonAutoDetect.Visibility.NONE) + .withSetterVisibility(JsonAutoDetect.Visibility.NONE) + .withCreatorVisibility(JsonAutoDetect.Visibility.NONE) + .withIsGetterVisibility(JsonAutoDetect.Visibility.NONE)); + init = true; + } + + private static List findModules(ModuleLayer layer) + { + ArrayList modules = new ArrayList(); + ServiceLoader loader = ServiceLoader.load(layer, Module.class); + for (Module module : loader) { + modules.add(module); + } + return modules; + } + + public static ObjectMapper newMapper() { + if (!init) { + throw new IllegalStateException("Not initialized"); + } + + return INSTANCE.copy(); + } +} diff --git a/core/src/main/java/module-info.java b/core/src/main/java/module-info.java new file mode 100644 index 00000000..fe7fcacf --- /dev/null +++ b/core/src/main/java/module-info.java @@ -0,0 +1,28 @@ +import io.xpipe.core.util.CoreJacksonModule; + +module io.xpipe.core { + requires com.fasterxml.jackson.core; + requires com.fasterxml.jackson.databind; + requires com.fasterxml.jackson.module.paramnames; + + exports io.xpipe.core.store; + exports io.xpipe.core.source; + exports io.xpipe.core.data.generic; + exports io.xpipe.core.data.type; + + opens io.xpipe.core.store; + opens io.xpipe.core.source; + opens io.xpipe.core.data.type; + opens io.xpipe.core.data.generic; + exports io.xpipe.core.data.type.callback; + opens io.xpipe.core.data.type.callback; + exports io.xpipe.core.data; + opens io.xpipe.core.data; + exports io.xpipe.core.util; + + uses com.fasterxml.jackson.databind.Module; + provides com.fasterxml.jackson.databind.Module with CoreJacksonModule; + + requires org.apache.commons.lang; + requires org.apache.commons.io; +} \ No newline at end of file diff --git a/deps b/deps new file mode 160000 index 00000000..e7f63e92 --- /dev/null +++ b/deps @@ -0,0 +1 @@ +Subproject commit e7f63e92d05537cee82e320a2017ddc26b9e3d3e diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 00000000..7454180f Binary files /dev/null and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 00000000..e750102e --- /dev/null +++ b/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-7.3-bin.zip +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew new file mode 100644 index 00000000..744e882e --- /dev/null +++ b/gradlew @@ -0,0 +1,185 @@ +#!/usr/bin/env sh + +# +# Copyright 2015 the original author or authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MSYS* | MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin or MSYS, switch paths to Windows format before running java +if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=`expr $i + 1` + done + case $i in + 0) set -- ;; + 1) set -- "$args0" ;; + 2) set -- "$args0" "$args1" ;; + 3) set -- "$args0" "$args1" "$args2" ;; + 4) set -- "$args0" "$args1" "$args2" "$args3" ;; + 5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + 6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + 7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + 8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + 9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=`save "$@"` + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +exec "$JAVACMD" "$@" diff --git a/gradlew.bat b/gradlew.bat new file mode 100644 index 00000000..107acd32 --- /dev/null +++ b/gradlew.bat @@ -0,0 +1,89 @@ +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle b/settings.gradle new file mode 100644 index 00000000..c50fb1fe --- /dev/null +++ b/settings.gradle @@ -0,0 +1,7 @@ +rootProject.name = 'xpipe_java' + +include 'core' +include 'beacon' +include 'api' + +