diff --git a/.gitignore b/.gitignore index b1ac10fa..dbca16b9 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ build/ .idea dev.properties -extensions.txt \ No newline at end of file +extensions.txt +local/ \ No newline at end of file diff --git a/api/build.gradle b/api/build.gradle index a14164ac..58ba1f48 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -29,11 +29,14 @@ test { exceptionFormat = 'full' showStandardStreams = true } + workingDir = rootDir - systemProperty 'io.xpipe.beacon.exec', 'start_test_daemon.bat' - systemProperty "io.xpipe.daemon.mode", 'base' - systemProperty "io.xpipe.storage.dir", "$projectDir/test_env" - systemProperty "io.xpipe.beacon.port", "21722" + systemProperty "io.xpipe.storage.dir", "$projectDir/local/storage" + systemProperty "io.xpipe.storage.persist", "false" systemProperty 'io.xpipe.app.writeSysOut', "true" systemProperty 'io.xpipe.app.logLevel', "trace" + + systemProperty "io.xpipe.beacon.exec", "cmd.exe /c \"$rootDir\\gradlew.bat\" :app:run -Dio.xpipe.daemon.mode=tray -Dio.xpipe.beacon.port=21722 -Dio.xpipe.app.dataDir=$projectDir/local/" + systemProperty 'io.xpipe.beacon.debugOutput', "true" + systemProperty "io.xpipe.beacon.port", "21722" } diff --git a/api/src/main/java/io/xpipe/api/DataSource.java b/api/src/main/java/io/xpipe/api/DataSource.java index d2936876..b7343ba5 100644 --- a/api/src/main/java/io/xpipe/api/DataSource.java +++ b/api/src/main/java/io/xpipe/api/DataSource.java @@ -1,13 +1,14 @@ package io.xpipe.api; import io.xpipe.api.impl.DataSourceImpl; -import io.xpipe.core.source.DataSourceConfig; -import io.xpipe.core.source.DataSourceId; -import io.xpipe.core.source.DataSourceReference; -import io.xpipe.core.source.DataSourceType; +import io.xpipe.core.source.*; +import java.io.IOException; import java.io.InputStream; +import java.io.UncheckedIOException; import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Map; /** @@ -67,7 +68,7 @@ public interface DataSource { } /** - * Retrieves a reference to the given data source. + * Retrieves the data source for a given reference. * * @param ref the data source reference */ @@ -83,54 +84,75 @@ public interface DataSource { throw new UnsupportedOperationException(); } - static DataSource wrap(InputStream in, String type, Map configOptions) { - return DataSourceImpl.wrap(in, type, configOptions); - } - - static DataSource wrap(InputStream in, String type) { - return DataSourceImpl.wrap(in, type, Map.of()); - } - - static DataSource wrap(InputStream in) { - return DataSourceImpl.wrap(in, null, Map.of()); + /** + * Wrapper for {@link #create(DataSourceId, String, Map, InputStream)} that creates an anonymous data source. + */ + public static DataSource createAnonymous(String type, Map config, Path path) { + return create(null, type, config, path); } /** - * Retrieves a reference to the given local data source that is specified by a URL. + * Wrapper for {@link #create(DataSourceId, String, Map, InputStream)}. + */ + public static DataSource create(DataSourceId id, String type, Map config, Path path) { + try (var in = Files.newInputStream(path)) { + return create(id, type, config, in); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Wrapper for {@link #create(DataSourceId, String, Map, InputStream)} that creates an anonymous data source. + */ + public static DataSource createAnonymous(String type, Map config, URL url) { + return create(null, type, config, url); + } + + /** + * Wrapper for {@link #create(DataSourceId, String, Map, InputStream)}. + */ + public static DataSource create(DataSourceId id, String type, Map config, URL url) { + try (var in = url.openStream()) { + return create(id, type, config, in); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + + /** + * Wrapper for {@link #create(DataSourceId, String, Map, InputStream)} that creates an anonymous data source. + */ + public static DataSource createAnonymous(String type, Map config, InputStream in) { + return create(null, type, config, in); + } + + /** + * Creates a new data source from an input stream. * - * This wrapped data source is only available temporarily and locally, - * i.e. it is not added to the XPipe data source storage. - * - * @param url the url that points to the data + * @param id the data source id * @param type the data source type - * @param configOptions additional configuration options for the specific data source type - * @return a reference to the data source that can be used to access the underlying data source + * @param config additional configuration options for the specific data source type + * @param in the input stream to read + * @return a {@link DataSource} instances that can be used to access the underlying data */ - static DataSource wrap(URL url, String type, Map configOptions) { - return DataSourceImpl.wrap(url, type, configOptions); + public static DataSource create(DataSourceId id, String type, Map config, InputStream in) { + return DataSourceImpl.create(id, type, config, in); } /** - * Wrapper for {@link #wrap(URL, String, Map)} that passes no configuration options. - * As a result, the data source configuration is automatically determined by X-Pipe for the given type. + * Returns the id of this data source. */ - static DataSource wrap(URL url, String type) { - return wrap(url, type, Map.of()); - } - - /** - * Wrapper for {@link #wrap(URL, String, Map)} that passes no type and no configuration options. - * As a result, the data source type and configuration is automatically determined by X-Pipe. - */ - static DataSource wrap(URL url) { - return wrap(url, null, Map.of()); - } - DataSourceId getId(); + /** + * Returns the type of this data source. + */ DataSourceType getType(); - DataSourceConfig getConfig(); + + DataSourceConfigInstance getConfig(); /** * Attempts to cast this object to a {@link DataTable}. diff --git a/api/src/main/java/io/xpipe/api/DataTableAccumulator.java b/api/src/main/java/io/xpipe/api/DataTableAccumulator.java index 821f5073..c9581fce 100644 --- a/api/src/main/java/io/xpipe/api/DataTableAccumulator.java +++ b/api/src/main/java/io/xpipe/api/DataTableAccumulator.java @@ -20,9 +20,20 @@ public interface DataTableAccumulator { */ DataTable finish(DataSourceId id); + /** + * Adds a row to the table. + * + * @param row the row to add + */ void add(TupleNode row); + /** + * Creates a tuple acceptor that adds all accepted tuples to the table. + */ DataStructureNodeAcceptor acceptor(); + /** + * Returns the current amount of rows added to the table. + */ int getCurrentRows(); } diff --git a/api/src/main/java/io/xpipe/api/connector/XPipeApiConnector.java b/api/src/main/java/io/xpipe/api/connector/XPipeApiConnector.java deleted file mode 100644 index 9fc3f456..00000000 --- a/api/src/main/java/io/xpipe/api/connector/XPipeApiConnector.java +++ /dev/null @@ -1,75 +0,0 @@ -package io.xpipe.api.connector; - -import io.xpipe.beacon.*; -import io.xpipe.core.util.JacksonHelper; - -import java.util.Optional; - -public abstract class XPipeApiConnector extends BeaconConnector { - - public void execute() { - try { - var socket = constructSocket(); - handle(socket); - } catch (Throwable ce) { - throw new RuntimeException(ce); - } - } - - protected abstract void handle(BeaconClient sc) throws Exception; - - @Override - protected BeaconClient constructSocket() throws ConnectorException { - if (!JacksonHelper.isInit()) { - JacksonHelper.initModularized(ModuleLayer.boot()); - } - - if (!BeaconServer.isRunning()) { - try { - start(); - } catch (Exception ex) { - throw new ConnectorException("Unable to start xpipe daemon", ex); - } - - var r = waitForStartup(); - if (r.isEmpty()) { - throw new ConnectorException("Wait for xpipe daemon timed out"); - } else { - return r.get(); - } - } - - try { - return new BeaconClient(); - } catch (Exception ex) { - throw new ConnectorException("Unable to connect to running xpipe daemon", ex); - } - } - - private void start() throws Exception { - if (!BeaconServer.tryStart()) { - throw new UnsupportedOperationException("Unable to determine xpipe daemon launch command"); - }; - } - - private Optional waitForStartup() { - for (int i = 0; i < 40; i++) { - try { - Thread.sleep(500); - } catch (InterruptedException ignored) { - } - - var s = BeaconClient.tryConnect(); - if (s.isPresent()) { - return s; - } - } - return Optional.empty(); - } - - @FunctionalInterface - public static interface Handler { - - void handle(BeaconClient sc) throws ClientException, ServerException; - } -} diff --git a/api/src/main/java/io/xpipe/api/connector/XPipeConnection.java b/api/src/main/java/io/xpipe/api/connector/XPipeConnection.java new file mode 100644 index 00000000..c9c37e1f --- /dev/null +++ b/api/src/main/java/io/xpipe/api/connector/XPipeConnection.java @@ -0,0 +1,112 @@ +package io.xpipe.api.connector; + +import io.xpipe.beacon.*; +import io.xpipe.core.util.JacksonHelper; + +import java.util.Optional; + +public final class XPipeConnection extends BeaconConnection { + + public static XPipeConnection open() { + var con = new XPipeConnection(); + con.constructSocket(); + return con; + } + + public static void execute(Handler handler) { + try (var con = new XPipeConnection()) { + con.constructSocket(); + handler.handle(con); + } catch (Exception e) { + throw new BeaconException(e); + } + } + + public static T execute(Mapper mapper) { + try (var con = new XPipeConnection()) { + con.constructSocket(); + return mapper.handle(con); + } catch (Exception e) { + throw new BeaconException(e); + } + } + + private XPipeConnection() { + } + + @Override + protected void constructSocket() { + if (!JacksonHelper.isInit()) { + JacksonHelper.initModularized(ModuleLayer.boot()); + } + + if (!BeaconServer.isRunning()) { + try { + start(); + } catch (Exception ex) { + throw new BeaconException("Unable to start xpipe daemon", ex); + } + + var r = waitForStartup(); + if (r.isEmpty()) { + throw new BeaconException("Wait for xpipe daemon timed out"); + } else { + socket = r.get(); + return; + } + } + + try { + socket = new BeaconClient(); + } catch (Exception ex) { + throw new BeaconException("Unable to connect to running xpipe daemon", ex); + } + } + + private void start() throws Exception { + if (!BeaconServer.tryStart()) { + throw new UnsupportedOperationException("Unable to determine xpipe daemon launch command"); + }; + } + + public static Optional waitForStartup() { + for (int i = 0; i < 40; i++) { + try { + Thread.sleep(500); + } catch (InterruptedException ignored) { + } + + var s = BeaconClient.tryConnect(); + if (s.isPresent()) { + return s; + } + } + return Optional.empty(); + } + + public static void waitForShutdown() { + for (int i = 0; i < 40; i++) { + try { + Thread.sleep(500); + } catch (InterruptedException ignored) { + } + + var r = BeaconServer.isRunning(); + if (!r) { + return; + } + } + } + + @FunctionalInterface + public static interface Handler { + + void handle(BeaconConnection con) throws ClientException, ServerException, ConnectorException; + } + + @FunctionalInterface + public static interface Mapper { + + T handle(BeaconConnection con) throws ClientException, ServerException, ConnectorException; + } +} diff --git a/api/src/main/java/io/xpipe/api/impl/DataRawImpl.java b/api/src/main/java/io/xpipe/api/impl/DataRawImpl.java index abef43fe..b23d5945 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataRawImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataRawImpl.java @@ -1,10 +1,7 @@ package io.xpipe.api.impl; import io.xpipe.api.DataRaw; -import io.xpipe.core.source.DataSourceConfig; -import io.xpipe.core.source.DataSourceId; -import io.xpipe.core.source.DataSourceInfo; -import io.xpipe.core.source.DataSourceType; +import io.xpipe.core.source.*; import java.io.InputStream; @@ -12,7 +9,7 @@ public class DataRawImpl extends DataSourceImpl implements DataRaw { private final DataSourceInfo.Raw info; - public DataRawImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Raw info) { + public DataRawImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Raw info) { super(sourceId, sourceConfig); this.info = info; } diff --git a/api/src/main/java/io/xpipe/api/impl/DataSourceImpl.java b/api/src/main/java/io/xpipe/api/impl/DataSourceImpl.java index 57068120..feca4ef7 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataSourceImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataSourceImpl.java @@ -1,101 +1,81 @@ package io.xpipe.api.impl; import io.xpipe.api.DataSource; -import io.xpipe.api.connector.XPipeApiConnector; -import io.xpipe.beacon.BeaconClient; -import io.xpipe.beacon.ClientException; -import io.xpipe.beacon.ConnectorException; -import io.xpipe.beacon.ServerException; +import io.xpipe.api.connector.XPipeConnection; +import io.xpipe.beacon.exchange.PreStoreExchange; import io.xpipe.beacon.exchange.QueryDataSourceExchange; -import io.xpipe.beacon.exchange.StoreResourceExchange; -import io.xpipe.beacon.exchange.StoreStreamExchange; -import io.xpipe.core.source.DataSourceConfig; +import io.xpipe.beacon.exchange.ReadExecuteExchange; +import io.xpipe.beacon.exchange.ReadPreparationExchange; +import io.xpipe.core.source.DataSourceConfigInstance; import io.xpipe.core.source.DataSourceId; import io.xpipe.core.source.DataSourceReference; import java.io.InputStream; -import java.net.URL; import java.util.Map; public abstract class DataSourceImpl implements DataSource { public static DataSource get(DataSourceReference ds) { - final DataSource[] source = new DataSource[1]; - new XPipeApiConnector() { - @Override - protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException { - var req = QueryDataSourceExchange.Request.builder().ref(ds).build(); - QueryDataSourceExchange.Response res = performSimpleExchange(sc, req); - switch (res.getInfo().getType()) { - case TABLE -> { - var data = res.getInfo().asTable(); - source[0] = new DataTableImpl(res.getId(), res.getConfig().getConfig(), data); - } - case STRUCTURE -> { - var info = res.getInfo().asStructure(); - source[0] = new DataStructureImpl(res.getId(), res.getConfig().getConfig(), info); - } - case TEXT -> { - var info = res.getInfo().asText(); - source[0] = new DataTextImpl(res.getId(), res.getConfig().getConfig(), info); - } - case RAW -> { - var info = res.getInfo().asRaw(); - source[0] = new DataRawImpl(res.getId(), res.getConfig().getConfig(), info); - } + return XPipeConnection.execute(con -> { + var req = QueryDataSourceExchange.Request.builder().ref(ds).build(); + QueryDataSourceExchange.Response res = con.performSimpleExchange(req); + switch (res.getInfo().getType()) { + case TABLE -> { + var data = res.getInfo().asTable(); + return new DataTableImpl(res.getId(), res.getConfig(), data); + } + case STRUCTURE -> { + var info = res.getInfo().asStructure(); + return new DataStructureImpl(res.getId(), res.getConfig(), info); + } + case TEXT -> { + var info = res.getInfo().asText(); + return new DataTextImpl(res.getId(), res.getConfig(), info); + } + case RAW -> { + var info = res.getInfo().asRaw(); + return new DataRawImpl(res.getId(), res.getConfig(), info); } } - }.execute(); - return source[0]; + throw new AssertionError(); + }); } - public static DataSource wrap(URL url, String type, Map config) { - final DataSource[] source = new DataSource[1]; - new XPipeApiConnector() { - @Override - protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException { - var req = StoreResourceExchange.Request.builder() - .url(url).providerId(type).build(); - StoreResourceExchange.Response res = performOutputExchange(sc, req, out -> { - try (var s = url.openStream()) { - writeLength(sc, s.available()); - s.transferTo(out); - } - }); - switch (res.getInfo().getType()) { - case TABLE -> { - var data = res.getInfo().asTable(); - source[0] = new DataTableImpl(res.getSourceId(), res.getConfig(), data); - } - case STRUCTURE -> { - } - case RAW -> { - } - } - } - }.execute(); - return source[0]; - } + public static DataSource create(DataSourceId id, String type, Map config, InputStream in) { + var res = XPipeConnection.execute(con -> { + var req = PreStoreExchange.Request.builder().build(); + PreStoreExchange.Response r = con.performOutputExchange(req, in::transferTo); + return r; + }); - public static DataSource wrap(InputStream in, String type, Map config) { - final DataSource[] source = new DataSource[1]; - new XPipeApiConnector() { - @Override - protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException { - var req = StoreStreamExchange.Request.builder().type(type).build(); - StoreStreamExchange.Response res = performOutputExchange(sc, req, in::transferTo); + var store = res.getStore(); - } - }.execute(); - return source[0]; + var startReq = ReadPreparationExchange.Request.builder() + .provider(type) + .store(store) + .build(); + var startRes = XPipeConnection.execute(con -> { + ReadPreparationExchange.Response r = con.performSimpleExchange(startReq); + return r; + }); + + var configInstance = startRes.getConfig(); + configInstance.getCurrentValues().putAll(config); + var endReq = ReadExecuteExchange.Request.builder() + .target(id).dataStore(store).config(configInstance).build(); + XPipeConnection.execute(con -> { + con.performSimpleExchange(endReq); + }); + var ref = id != null ? DataSourceReference.id(id) : DataSourceReference.latest(); + return get(ref); } private final DataSourceId sourceId; - private final DataSourceConfig sourceConfig; + private final DataSourceConfigInstance config; - public DataSourceImpl(DataSourceId sourceId, DataSourceConfig sourceConfig) { + public DataSourceImpl(DataSourceId sourceId, DataSourceConfigInstance config) { this.sourceId = sourceId; - this.sourceConfig = sourceConfig; + this.config = config; } @Override @@ -104,7 +84,7 @@ public abstract class DataSourceImpl implements DataSource { } @Override - public DataSourceConfig getConfig() { - return sourceConfig; + public DataSourceConfigInstance getConfig() { + return config; } } diff --git a/api/src/main/java/io/xpipe/api/impl/DataStructureImpl.java b/api/src/main/java/io/xpipe/api/impl/DataStructureImpl.java index 8e2e624a..f3441e59 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataStructureImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataStructureImpl.java @@ -2,16 +2,13 @@ package io.xpipe.api.impl; import io.xpipe.api.DataStructure; import io.xpipe.core.data.node.DataStructureNode; -import io.xpipe.core.source.DataSourceConfig; -import io.xpipe.core.source.DataSourceId; -import io.xpipe.core.source.DataSourceInfo; -import io.xpipe.core.source.DataSourceType; +import io.xpipe.core.source.*; public class DataStructureImpl extends DataSourceImpl implements DataStructure { private final DataSourceInfo.Structure info; - public DataStructureImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Structure info) { + public DataStructureImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Structure info) { super(sourceId, sourceConfig); this.info = info; } diff --git a/api/src/main/java/io/xpipe/api/impl/DataTableAccumulatorImpl.java b/api/src/main/java/io/xpipe/api/impl/DataTableAccumulatorImpl.java index 56993373..4c6b01f2 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataTableAccumulatorImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataTableAccumulatorImpl.java @@ -1,30 +1,63 @@ package io.xpipe.api.impl; +import io.xpipe.api.DataSource; import io.xpipe.api.DataTable; import io.xpipe.api.DataTableAccumulator; +import io.xpipe.api.connector.XPipeConnection; +import io.xpipe.beacon.exchange.PreStoreExchange; +import io.xpipe.beacon.exchange.ReadExecuteExchange; import io.xpipe.core.data.node.DataStructureNodeAcceptor; import io.xpipe.core.data.node.TupleNode; +import io.xpipe.core.data.type.TupleType; +import io.xpipe.core.data.typed.TypedDataStreamWriter; +import io.xpipe.core.source.DataSourceConfigInstance; import io.xpipe.core.source.DataSourceId; +import io.xpipe.core.source.DataSourceReference; public class DataTableAccumulatorImpl implements DataTableAccumulator { - @Override - public DataTable finish(DataSourceId id) { - return null; + private final XPipeConnection connection; + private final TupleType type; + private int rows; + + public DataTableAccumulatorImpl(TupleType type) { + this.type = type; + connection = XPipeConnection.open(); + connection.sendRequest(PreStoreExchange.Request.builder().build()); + connection.sendBodyStart(); } @Override - public void add(TupleNode row) { + public synchronized DataTable finish(DataSourceId id) { + PreStoreExchange.Response res = connection.receiveResponse(); + connection.close(); + var req = ReadExecuteExchange.Request.builder() + .target(id).dataStore(res.getStore()).config(DataSourceConfigInstance.xpbt()).build(); + XPipeConnection.execute(con -> { + con.performSimpleExchange(req); + }); + return DataSource.get(DataSourceReference.id(id)).asTable(); } @Override - public DataStructureNodeAcceptor acceptor() { - return null; + public synchronized void add(TupleNode row) { + connection.withOutputStream(out -> { + TypedDataStreamWriter.writeStructure(connection.getOutputStream(), row, type); + rows++; + }); } @Override - public int getCurrentRows() { - return 0; + public synchronized DataStructureNodeAcceptor acceptor() { + return node -> { + add(node); + return true; + }; + } + + @Override + public synchronized int getCurrentRows() { + return rows; } } diff --git a/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java b/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java index 66efcc92..4837c911 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataTableImpl.java @@ -1,26 +1,22 @@ package io.xpipe.api.impl; import io.xpipe.api.DataTable; -import io.xpipe.api.connector.XPipeApiConnector; -import io.xpipe.beacon.BeaconClient; -import io.xpipe.beacon.ClientException; -import io.xpipe.beacon.ConnectorException; -import io.xpipe.beacon.ServerException; +import io.xpipe.api.connector.XPipeConnection; +import io.xpipe.beacon.BeaconConnection; +import io.xpipe.beacon.exchange.api.QueryTableDataExchange; import io.xpipe.core.data.node.ArrayNode; import io.xpipe.core.data.node.DataStructureNode; import io.xpipe.core.data.node.TupleNode; import io.xpipe.core.data.typed.TypedAbstractReader; import io.xpipe.core.data.typed.TypedDataStreamParser; +import io.xpipe.core.data.typed.TypedDataStructureNodeReader; import io.xpipe.core.data.typed.TypedReusableDataStructureNodeReader; -import io.xpipe.core.source.DataSourceConfig; -import io.xpipe.core.source.DataSourceId; -import io.xpipe.core.source.DataSourceInfo; -import io.xpipe.core.source.DataSourceType; +import io.xpipe.core.source.*; -import java.io.IOException; import java.io.InputStream; -import java.io.UncheckedIOException; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -28,7 +24,7 @@ public class DataTableImpl extends DataSourceImpl implements DataTable { private final DataSourceInfo.Table info; - DataTableImpl(DataSourceId id, DataSourceConfig sourceConfig, DataSourceInfo.Table info) { + DataTableImpl(DataSourceId id, DataSourceConfigInstance sourceConfig, DataSourceInfo.Table info) { super(id, sourceConfig); this.info = info; } @@ -60,20 +56,15 @@ public class DataTableImpl extends DataSourceImpl implements DataTable { @Override public ArrayNode read(int maxRows) { - int maxToRead = info.getRowCount() == -1 ? maxRows : Math.min(info.getRowCount(), maxRows); - List nodes = new ArrayList<>(); - new XPipeApiConnector() { - @Override - protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException { -// var req = ReadTableDataExchange.Request.builder() -// .sourceId(id).maxRows(maxToRead).build(); -// performInputExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> { -// var r = new TypedDataStreamParser(info.getDataType()); -// r.parseStructures(in, TypedDataStructureNodeReader.immutable(info.getDataType()), nodes::add); -// }); - } - }.execute(); + XPipeConnection.execute(con -> { + var req = QueryTableDataExchange.Request.builder() + .id(getId()).maxRows(maxRows).build(); + con.performInputExchange(req, (QueryTableDataExchange.Response res, InputStream in) -> { + var r = new TypedDataStreamParser(info.getDataType()); + r.parseStructures(in, TypedDataStructureNodeReader.immutable(info.getDataType()), nodes::add); + }); + }); return ArrayNode.of(nodes); } @@ -81,74 +72,49 @@ public class DataTableImpl extends DataSourceImpl implements DataTable { public Iterator iterator() { return new Iterator<>() { - private InputStream input; - private int read; - private final int toRead = info.getRowCount(); + private final BeaconConnection connection; private final TypedDataStreamParser parser; private final TypedAbstractReader nodeReader; { - new XPipeApiConnector() { - @Override - protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException { -// var req = ReadTableDataExchange.Request.builder() -// .sourceId(id).maxRows(Integer.MAX_VALUE).build(); -// performInputExchange(sc, req, -// (ReadTableDataExchange.Response res, InputStream in) -> { -// input = in; -// }); - } - }.execute(); - nodeReader = TypedReusableDataStructureNodeReader.create(info.getDataType()); parser = new TypedDataStreamParser(info.getDataType()); + + connection = XPipeConnection.open(); + var req = QueryTableDataExchange.Request.builder() + .id(getId()).build(); + connection.sendRequest(req); + connection.receiveResponse(); + connection.receiveBody(); } private void finish() { - try { - input.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - private boolean hasKnownSize() { - return info.getRowCount() != -1; + connection.close(); } @Override public boolean hasNext() { - if (hasKnownSize() && read == toRead) { - finish(); - return false; - } + connection.checkClosed(); - if (hasKnownSize() && read < toRead) { - return true; - } - - try { - var hasNext = parser.hasNext(input); - if (!hasNext) { - finish(); - } - return hasNext; - } catch (IOException ex) { + AtomicBoolean hasNext = new AtomicBoolean(false); + connection.withInputStream(in -> { + hasNext.set(parser.hasNext(in)); + }); + if (!hasNext.get()) { finish(); - throw new UncheckedIOException(ex); } + return hasNext.get(); } @Override public TupleNode next() { - TupleNode current; - try { - current = (TupleNode) parser.parseStructure(input, nodeReader); - } catch (IOException ex) { - throw new UncheckedIOException(ex); - } - read++; - return current; + connection.checkClosed(); + + AtomicReference current = new AtomicReference<>(); + connection.withInputStream(in -> { + current.set((TupleNode) parser.parseStructure(connection.getInputStream(), nodeReader)); + }); + return current.get(); } }; } diff --git a/api/src/main/java/io/xpipe/api/impl/DataTextImpl.java b/api/src/main/java/io/xpipe/api/impl/DataTextImpl.java index 390c00eb..ad397b3a 100644 --- a/api/src/main/java/io/xpipe/api/impl/DataTextImpl.java +++ b/api/src/main/java/io/xpipe/api/impl/DataTextImpl.java @@ -1,10 +1,7 @@ package io.xpipe.api.impl; import io.xpipe.api.DataText; -import io.xpipe.core.source.DataSourceConfig; -import io.xpipe.core.source.DataSourceId; -import io.xpipe.core.source.DataSourceInfo; -import io.xpipe.core.source.DataSourceType; +import io.xpipe.core.source.*; import java.util.Iterator; import java.util.List; @@ -13,7 +10,7 @@ public class DataTextImpl extends DataSourceImpl implements DataText { private final DataSourceInfo.Text info; - public DataTextImpl(DataSourceId sourceId, DataSourceConfig sourceConfig, DataSourceInfo.Text info) { + public DataTextImpl(DataSourceId sourceId, DataSourceConfigInstance sourceConfig, DataSourceInfo.Text info) { super(sourceId, sourceConfig); this.info = info; } diff --git a/api/src/test/java/io/xpipe/api/test/ConnectionFactory.java b/api/src/test/java/io/xpipe/api/test/ConnectionFactory.java new file mode 100644 index 00000000..6f5bb728 --- /dev/null +++ b/api/src/test/java/io/xpipe/api/test/ConnectionFactory.java @@ -0,0 +1,31 @@ +package io.xpipe.api.test; + +import io.xpipe.api.connector.XPipeConnection; +import io.xpipe.beacon.BeaconClient; +import io.xpipe.beacon.BeaconServer; + +public class ConnectionFactory { + + public static void start() throws Exception { + if (!BeaconServer.tryStart()) { + throw new AssertionError(); + } + + XPipeConnection.waitForStartup(); + if (!BeaconServer.isRunning()) { + throw new AssertionError(); + } + } + + public static void stop() throws Exception { + if (!BeaconServer.isRunning()) { + return; + } + + var client = new BeaconClient(); + if (!BeaconServer.tryStop(client)) { + throw new AssertionError(); + } + XPipeConnection.waitForShutdown(); + } +} diff --git a/api/src/test/java/io/xpipe/api/test/DaemonControl.java b/api/src/test/java/io/xpipe/api/test/DaemonControl.java new file mode 100644 index 00000000..5c8bdf52 --- /dev/null +++ b/api/src/test/java/io/xpipe/api/test/DaemonControl.java @@ -0,0 +1,17 @@ +package io.xpipe.api.test; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class DaemonControl { + + @BeforeAll + static void setup() throws Exception { + ConnectionFactory.start(); + } + + @AfterAll + static void teardown() throws Exception { + ConnectionFactory.stop(); + } +} diff --git a/api/src/test/java/io/xpipe/api/test/DataTableTest.java b/api/src/test/java/io/xpipe/api/test/DataTableTest.java index f1cb2d77..c882a9f8 100644 --- a/api/src/test/java/io/xpipe/api/test/DataTableTest.java +++ b/api/src/test/java/io/xpipe/api/test/DataTableTest.java @@ -1,16 +1,22 @@ package io.xpipe.api.test; import io.xpipe.api.DataSource; -import io.xpipe.api.DataTable; +import io.xpipe.core.source.DataSourceId; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -@ExtendWith({XPipeConfig.class}) -public class DataTableTest { +import java.util.Map; + +public class DataTableTest extends DaemonControl { + + @BeforeAll + static void setup() throws Exception { + DataSource.create(DataSourceId.fromString(":usernames"), "csv", Map.of(), DataTableTest.class.getResource("username.csv")); + } @Test public void testGet() { - var table = DataSource.get("new folder:username").asTable(); + var table = DataSource.getById(":usernames").asTable(); var r = table.read(2); var a = 0; } diff --git a/api/src/test/java/io/xpipe/api/test/XPipeConfig.java b/api/src/test/java/io/xpipe/api/test/XPipeConfig.java deleted file mode 100644 index 0b634ae9..00000000 --- a/api/src/test/java/io/xpipe/api/test/XPipeConfig.java +++ /dev/null @@ -1,29 +0,0 @@ -package io.xpipe.api.test; - -import io.xpipe.beacon.BeaconClient; -import io.xpipe.beacon.BeaconServer; -import org.junit.jupiter.api.extension.BeforeAllCallback; -import org.junit.jupiter.api.extension.ExtensionContext; - -public class XPipeConfig implements BeforeAllCallback, ExtensionContext.Store.CloseableResource { - - private static boolean started = false; - - @Override - public void beforeAll(ExtensionContext context) throws Exception { - if (!started) { - started = true; - if (BeaconServer.tryStart()) { - throw new AssertionError(); - } - } - } - - @Override - public void close() throws Exception { - var client = new BeaconClient(); - if (BeaconServer.tryStop(client)) { - throw new AssertionError(); - } - } -} diff --git a/api/src/test/java/module-info.java b/api/src/test/java/module-info.java index 4820ba2b..234dfe6d 100644 --- a/api/src/test/java/module-info.java +++ b/api/src/test/java/module-info.java @@ -1,7 +1,9 @@ module io.xpipe.api.test { - exports io.xpipe.api.test; - requires io.xpipe.api; requires io.xpipe.beacon; requires org.junit.jupiter.api; + + opens io.xpipe.api.test; + + exports io.xpipe.api.test; } \ No newline at end of file diff --git a/api/src/test/resources/io/xpipe/api/test/username.csv b/api/src/test/resources/io/xpipe/api/test/username.csv new file mode 100644 index 00000000..87c0f122 --- /dev/null +++ b/api/src/test/resources/io/xpipe/api/test/username.csv @@ -0,0 +1,6 @@ +Username;Identifier ;First name;Last name +booker12;9012;Rachel;Booker +grey07;2070;Laura;Grey +johnson81;4081;Craig;Johnson +jenkins46;9346;Mary;Jenkins +smith79;5079;Jamie;Smith diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java index b9b1f953..f0f2ebc5 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java @@ -68,6 +68,10 @@ public class BeaconClient implements AutoCloseable { out = socket.getOutputStream(); } + public boolean isClosed() { + return socket.isClosed(); + } + public void close() throws ConnectorException { try { socket.close(); @@ -100,13 +104,32 @@ public class BeaconClient implements AutoCloseable { } } + public void receiveBody() throws ConnectorException { + try { + var sep = in.readNBytes(BODY_SEPARATOR.length); + if (sep.length != 0 && !Arrays.equals(BODY_SEPARATOR, sep)) { + throw new ConnectorException("Invalid body separator"); + } + } catch (IOException ex) { + throw new ConnectorException(ex); + } + } + + public void startBody() throws ConnectorException { + try { + out.write(BODY_SEPARATOR); + } catch (IOException ex) { + throw new ConnectorException(ex); + } + } + public RES simpleExchange(REQ req) throws ServerException, ConnectorException, ClientException { sendRequest(req); return this.receiveResponse(); } - private void sendRequest(T req) throws ClientException, ConnectorException { + public void sendRequest(T req) throws ClientException, ConnectorException { ObjectNode json = JacksonHelper.newMapper().valueToTree(req); var prov = MessageExchanges.byRequest(req); if (prov.isEmpty()) { @@ -132,7 +155,7 @@ public class BeaconClient implements AutoCloseable { } } - private T receiveResponse() throws ConnectorException, ClientException, ServerException { + public T receiveResponse() throws ConnectorException, ClientException, ServerException { JsonNode read; try { var in = socket.getInputStream(); diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconConnection.java b/beacon/src/main/java/io/xpipe/beacon/BeaconConnection.java new file mode 100644 index 00000000..99af1153 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconConnection.java @@ -0,0 +1,163 @@ +package io.xpipe.beacon; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public abstract class BeaconConnection implements AutoCloseable { + + protected BeaconClient socket; + + protected abstract void constructSocket(); + + @Override + public void close() { + try { + if (socket != null) { + socket.close(); + } + socket = null; + } catch (Exception e) { + socket = null; + throw new BeaconException("Could not close beacon connection", e); + } + } + + public void closeOutput() { + try { + socket.getOutputStream().close(); + } catch (Exception e) { + throw new BeaconException("Could not close beacon output stream", e); + } + } + + public void withOutputStream(BeaconClient.FailableConsumer ex) { + try { + ex.accept(getOutputStream()); + } catch (IOException e) { + throw new BeaconException("Could not write to beacon output stream", e); + } + } + + public void withInputStream(BeaconClient.FailableConsumer ex) { + try { + ex.accept(getInputStream()); + } catch (IOException e) { + throw new BeaconException("Could not read from beacon output stream", e); + } + } + + public void checkClosed() { + if (socket == null) { + throw new BeaconException("Socket is closed"); + } + } + + public OutputStream getOutputStream() { + checkClosed(); + + return socket.getOutputStream(); + } + + public InputStream getInputStream() { + checkClosed(); + + return socket.getInputStream(); + } + + public void performInputExchange( + REQ req, + BeaconClient.FailableBiConsumer responseConsumer) { + checkClosed(); + + performInputOutputExchange(req, null, responseConsumer); + } + + public void performInputOutputExchange( + REQ req, + BeaconClient.FailableConsumer reqWriter, + BeaconClient.FailableBiConsumer responseConsumer) { + checkClosed(); + + try { + socket.exchange(req, reqWriter, responseConsumer); + } catch (Exception e) { + throw new BeaconException("Could not communicate with beacon", e); + } + } + + public void sendRequest( + REQ req) { + checkClosed(); + + try { + socket.sendRequest(req); + } catch (Exception e) { + throw new BeaconException("Could not communicate with beacon", e); + } + } + + public RES receiveResponse() { + checkClosed(); + + try { + return socket.receiveResponse(); + } catch (Exception e) { + throw new BeaconException("Could not communicate with beacon", e); + } + } + + public void sendBodyStart() { + checkClosed(); + + try { + socket.startBody(); + } catch (Exception e) { + throw new BeaconException("Could not communicate with beacon", e); + } + } + + public void receiveBody() { + checkClosed(); + + try { + socket.receiveBody(); + } catch (Exception e) { + throw new BeaconException("Could not communicate with beacon", e); + } + } + + public RES performOutputExchange( + REQ req, + BeaconClient.FailableConsumer reqWriter) { + checkClosed(); + + try { + socket.sendRequest(req); + socket.startBody(); + reqWriter.accept(socket.getOutputStream()); + return socket.receiveResponse(); + } catch (Exception e) { + throw new BeaconException("Could not communicate with beacon", e); + } + } + +// public void writeLength(int bytes) throws IOException { +// checkClosed(); +// socket.getOutputStream().write(ByteBuffer.allocate(4).putInt(bytes).array()); +// } + + public RES performSimpleExchange( + REQ req) { + checkClosed(); + + try { + return socket.simpleExchange(req); + } catch (Exception e) { + throw new BeaconException("Could not communicate with beacon", e); + } + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java b/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java deleted file mode 100644 index 5e6e96ab..00000000 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconConnector.java +++ /dev/null @@ -1,53 +0,0 @@ -package io.xpipe.beacon; - -import io.xpipe.beacon.message.RequestMessage; -import io.xpipe.beacon.message.ResponseMessage; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicReference; - -public abstract class BeaconConnector { - - protected abstract BeaconClient constructSocket() throws ConnectorException; - - protected void performInputExchange( - BeaconClient socket, - REQ req, - BeaconClient.FailableBiConsumer responseConsumer) throws ServerException, ConnectorException, ClientException { - performInputOutputExchange(socket, req, null, responseConsumer); - } - - protected void performInputOutputExchange( - BeaconClient socket, - REQ req, - BeaconClient.FailableConsumer reqWriter, - BeaconClient.FailableBiConsumer responseConsumer) - throws ServerException, ConnectorException, ClientException { - socket.exchange(req, reqWriter, responseConsumer); - } - - protected RES performOutputExchange( - BeaconClient socket, - REQ req, - BeaconClient.FailableConsumer reqWriter) - throws ServerException, ConnectorException, ClientException { - AtomicReference response = new AtomicReference<>(); - socket.exchange(req, reqWriter, (RES res, InputStream in) -> { - response.set(res); - }); - return response.get(); - } - - protected void writeLength(BeaconClient socket, int bytes) throws IOException { - socket.getOutputStream().write(ByteBuffer.allocate(4).putInt(bytes).array()); - } - - protected RES performSimpleExchange( - BeaconClient socket, - REQ req) throws ServerException, ConnectorException, ClientException { - return socket.simpleExchange(req); - } -} diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconException.java b/beacon/src/main/java/io/xpipe/beacon/BeaconException.java new file mode 100644 index 00000000..fad4bd69 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconException.java @@ -0,0 +1,23 @@ +package io.xpipe.beacon; + +public class BeaconException extends RuntimeException { + + public BeaconException() { + } + + public BeaconException(String message) { + super(message); + } + + public BeaconException(String message, Throwable cause) { + super(message, cause); + } + + public BeaconException(Throwable cause) { + super(cause); + } + + public BeaconException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/PreStoreExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/PreStoreExchange.java index 6a4d4371..1b0baf9b 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/PreStoreExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/PreStoreExchange.java @@ -2,13 +2,11 @@ package io.xpipe.beacon.exchange; import io.xpipe.beacon.message.RequestMessage; import io.xpipe.beacon.message.ResponseMessage; -import io.xpipe.core.store.LocalFileDataStore; +import io.xpipe.core.store.StreamDataStore; import lombok.Builder; import lombok.Value; import lombok.extern.jackson.Jacksonized; -import java.nio.file.Path; - public class PreStoreExchange implements MessageExchange { @Override @@ -36,6 +34,6 @@ public class PreStoreExchange implements MessageExchange descriptor; - @NonNull DataSourceConfigInstance config; } } diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/ReadExecuteExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadExecuteExchange.java index 5ab7b27f..94a4fca9 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/ReadExecuteExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/ReadExecuteExchange.java @@ -3,7 +3,7 @@ package io.xpipe.beacon.exchange; import io.xpipe.beacon.message.RequestMessage; import io.xpipe.beacon.message.ResponseMessage; import io.xpipe.core.source.DataSourceConfigInstance; -import io.xpipe.core.source.DataSourceReference; +import io.xpipe.core.source.DataSourceId; import io.xpipe.core.store.DataStore; import lombok.Builder; import lombok.NonNull; @@ -35,8 +35,8 @@ public class ReadExecuteExchange implements MessageExchange { @Override @@ -33,7 +30,7 @@ public class StoreEditExchange implements MessageExchange { + + @Override + public String getId() { + return "queryTableData"; + } + + @Override + public Class getRequestClass() { + return QueryTableDataExchange.Request.class; + } + + @Override + public Class getResponseClass() { + return QueryTableDataExchange.Response.class; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + DataSourceId id; + + @Builder.Default + int maxRows = -1; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/module-info.java b/beacon/src/main/java/module-info.java index b19af2e1..c3c685e7 100644 --- a/beacon/src/main/java/module-info.java +++ b/beacon/src/main/java/module-info.java @@ -1,20 +1,22 @@ import io.xpipe.beacon.exchange.*; +import io.xpipe.beacon.exchange.api.QueryTableDataExchange; module io.xpipe.beacon { exports io.xpipe.beacon; exports io.xpipe.beacon.exchange; exports io.xpipe.beacon.message; + exports io.xpipe.beacon.exchange.api; + exports io.xpipe.beacon.exchange.data; + + opens io.xpipe.beacon; + opens io.xpipe.beacon.exchange; + opens io.xpipe.beacon.exchange.api; + opens io.xpipe.beacon.message; + opens io.xpipe.beacon.exchange.data; requires com.fasterxml.jackson.core; requires com.fasterxml.jackson.databind; requires transitive io.xpipe.core; - - opens io.xpipe.beacon; - opens io.xpipe.beacon.exchange; - opens io.xpipe.beacon.message; - exports io.xpipe.beacon.exchange.data; - opens io.xpipe.beacon.exchange.data; - requires static lombok; uses MessageExchange; @@ -35,5 +37,6 @@ module io.xpipe.beacon { PreStoreExchange, EditPreparationExchange, EditExecuteExchange, + QueryTableDataExchange, VersionExchange; } \ No newline at end of file diff --git a/core/src/main/java/io/xpipe/core/source/DataSourceConfigInstance.java b/core/src/main/java/io/xpipe/core/source/DataSourceConfigInstance.java index e84a09a1..aad84799 100644 --- a/core/src/main/java/io/xpipe/core/source/DataSourceConfigInstance.java +++ b/core/src/main/java/io/xpipe/core/source/DataSourceConfigInstance.java @@ -7,13 +7,36 @@ import lombok.extern.jackson.Jacksonized; import java.util.Map; +/** + * Represents the current configuration of a data source. + * This configuration can either be in progress or complete. + */ @Value @Builder @Jacksonized @AllArgsConstructor public class DataSourceConfigInstance { + public static DataSourceConfigInstance xpbt() { + return new DataSourceConfigInstance("xpbt", DataSourceConfigOptions.empty(), Map.of()); + } + + /** + * The data source provider id. + */ String provider; - DataSourceConfig config; + + /** + * The available configuration options. + */ + DataSourceConfigOptions configOptions; + + /** + * The current configuration options that are set. + */ Map currentValues; + + public boolean isComplete() { + return currentValues.size() == configOptions.getOptions().size(); + } } diff --git a/core/src/main/java/io/xpipe/core/source/DataSourceConfig.java b/core/src/main/java/io/xpipe/core/source/DataSourceConfigOptions.java similarity index 73% rename from core/src/main/java/io/xpipe/core/source/DataSourceConfig.java rename to core/src/main/java/io/xpipe/core/source/DataSourceConfigOptions.java index 35e08531..495165d8 100644 --- a/core/src/main/java/io/xpipe/core/source/DataSourceConfig.java +++ b/core/src/main/java/io/xpipe/core/source/DataSourceConfigOptions.java @@ -12,7 +12,11 @@ import java.util.List; @Value @Builder @Jacksonized -public class DataSourceConfig { +public class DataSourceConfigOptions { + + public static DataSourceConfigOptions empty() { + return new DataSourceConfigOptions(List.of()); + } @Singular List