From 948dcf33ef1e665cafc9dae615fffa649355670b Mon Sep 17 00:00:00 2001 From: Christopher Schnick Date: Sat, 28 May 2022 23:52:08 +0200 Subject: [PATCH] More beacon additions and small fixes --- .../java/io/xpipe/beacon/BeaconClient.java | 4 + .../java/io/xpipe/beacon/BeaconServer.java | 5 + .../beacon/exchange/EditExecuteExchange.java | 16 +--- .../beacon/exchange/MessageExchange.java | 11 ++- .../exchange/RemoveCollectionExchange.java | 30 ++++++ .../beacon/exchange/RemoveEntryExchange.java | 34 +++++++ .../exchange/RenameCollectionExchange.java | 32 +++++++ .../beacon/exchange/RenameEntryExchange.java | 36 +++++++ .../exchange/api/QueryRawDataExchange.java | 34 +++++++ .../exchange/api/QueryTextDataExchange.java | 10 -- .../beacon/exchange/cli/ConvertExchange.java | 20 ++-- .../exchange/cli/ProviderListExchange.java | 35 +++++++ .../exchange/cli/WriteExecuteExchange.java | 12 --- .../cli/WritePreparationExchange.java | 17 +--- .../beacon/exchange/data/ProviderEntry.java | 14 +++ beacon/src/main/java/module-info.java | 6 ++ .../core/data/node/MutableValueNode.java | 5 + .../java/io/xpipe/core/source/DataSource.java | 22 ++++- .../io/xpipe/core/source/DataSourceInfo.java | 4 +- .../xpipe/core/source/RawReadConnection.java | 10 ++ .../io/xpipe/core/source/TextDataSource.java | 13 ++- .../io/xpipe/core/store/StdinDataStore.java | 93 +++++++++++++++++++ .../io/xpipe/core/store/StdoutDataStore.java | 46 +++++++++ .../xpipe/extension/DataSourceProviders.java | 4 +- .../SimpleFileDataSourceProvider.java | 10 +- 25 files changed, 449 insertions(+), 74 deletions(-) create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/RemoveCollectionExchange.java create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/RemoveEntryExchange.java create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/RenameCollectionExchange.java create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/RenameEntryExchange.java create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryRawDataExchange.java create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/cli/ProviderListExchange.java create mode 100644 beacon/src/main/java/io/xpipe/beacon/exchange/data/ProviderEntry.java create mode 100644 core/src/main/java/io/xpipe/core/store/StdinDataStore.java create mode 100644 core/src/main/java/io/xpipe/core/store/StdoutDataStore.java diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java index d1f1f0dc..6ca4ef47 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconClient.java @@ -168,6 +168,10 @@ public class BeaconClient implements AutoCloseable { System.out.println(read.toPrettyString()); } + if (read.isMissingNode()) { + throw new ConnectorException("Received unexpected EOF"); + } + var se = parseServerError(read); if (se.isPresent()) { se.get().throwError(); diff --git a/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java b/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java index be1c15b9..0b9d5324 100644 --- a/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java +++ b/beacon/src/main/java/io/xpipe/beacon/BeaconServer.java @@ -75,12 +75,17 @@ public class BeaconServer { public static boolean tryStart() throws Exception { var custom = BeaconConfig.getCustomExecCommand(); if (custom != null) { + System.out.println("Starting fork: " + custom); startFork(custom); return true; } var daemonExecutable = getDaemonExecutable(); if (daemonExecutable.isPresent()) { + if (BeaconConfig.debugEnabled()) { + System.out.println("Starting daemon executable: " + daemonExecutable.get()); + } + // Tell daemon that we launched from an external tool new ProcessBuilder(daemonExecutable.get().toString(), "--external") .redirectErrorStream(true) diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/EditExecuteExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/EditExecuteExchange.java index 2c65b236..0f107b4d 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/EditExecuteExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/EditExecuteExchange.java @@ -3,6 +3,7 @@ package io.xpipe.beacon.exchange; import io.xpipe.beacon.message.RequestMessage; import io.xpipe.beacon.message.ResponseMessage; import io.xpipe.core.source.DataSourceConfigInstance; +import io.xpipe.core.source.DataSourceId; import io.xpipe.core.source.DataSourceReference; import lombok.Builder; import lombok.NonNull; @@ -19,21 +20,13 @@ public class EditExecuteExchange implements MessageExchange getRequestClass() { - return EditExecuteExchange.Request.class; - } - - @Override - public Class getResponseClass() { - return EditExecuteExchange.Response.class; - } - @Jacksonized @Builder @Value public static class Request implements RequestMessage { - @NonNull DataSourceReference ref; + @NonNull + DataSourceReference ref; + @NonNull DataSourceConfigInstance config; } @@ -42,5 +35,6 @@ public class EditExecuteExchange implements MessageExchange getRequestClass() { - var name = getClass().getName() + "$Request"; + var c = getClass().getSuperclass(); + var name = (MessageExchange.class.isAssignableFrom(c) ? c : getClass()).getName() + "$Request"; return (Class) Class.forName(name); } /** * Returns the response class, needed for serialization. */ - Class getResponseClass(); + @SneakyThrows + @SuppressWarnings("unchecked") + default Class getResponseClass() { + var c = getClass().getSuperclass(); + var name = (MessageExchange.class.isAssignableFrom(c) ? c : getClass()).getName() + "$Response"; + return (Class) Class.forName(name); + } } diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/RemoveCollectionExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/RemoveCollectionExchange.java new file mode 100644 index 00000000..62bc7ebc --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/RemoveCollectionExchange.java @@ -0,0 +1,30 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class RemoveCollectionExchange implements MessageExchange { + + @Override + public String getId() { + return "removeCollection"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + String collectionName; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/RemoveEntryExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/RemoveEntryExchange.java new file mode 100644 index 00000000..a32f4d45 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/RemoveEntryExchange.java @@ -0,0 +1,34 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.core.source.DataSourceReference; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class RemoveEntryExchange implements MessageExchange { + + @Override + public String getId() { + return "removeEntry"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + DataSourceReference ref; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + @NonNull + DataSourceId id; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/RenameCollectionExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/RenameCollectionExchange.java new file mode 100644 index 00000000..5bf6e49f --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/RenameCollectionExchange.java @@ -0,0 +1,32 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class RenameCollectionExchange implements MessageExchange { + + @Override + public String getId() { + return "renameCollection"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + String collectionName; + @NonNull + String newName; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/RenameEntryExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/RenameEntryExchange.java new file mode 100644 index 00000000..aa96de3d --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/RenameEntryExchange.java @@ -0,0 +1,36 @@ +package io.xpipe.beacon.exchange; + +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceId; +import io.xpipe.core.source.DataSourceReference; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class RenameEntryExchange implements MessageExchange { + + @Override + public String getId() { + return "renameEntry"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + DataSourceReference ref; + + @NonNull + String newName; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + @NonNull DataSourceId newId; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryRawDataExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryRawDataExchange.java new file mode 100644 index 00000000..152deaf8 --- /dev/null +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryRawDataExchange.java @@ -0,0 +1,34 @@ +package io.xpipe.beacon.exchange.api; + +import io.xpipe.beacon.exchange.MessageExchange; +import io.xpipe.beacon.message.RequestMessage; +import io.xpipe.beacon.message.ResponseMessage; +import io.xpipe.core.source.DataSourceReference; +import lombok.Builder; +import lombok.NonNull; +import lombok.Value; +import lombok.extern.jackson.Jacksonized; + +public class QueryRawDataExchange implements MessageExchange { + + @Override + public String getId() { + return "queryRawData"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + @NonNull + DataSourceReference ref; + + int maxBytes; + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryTextDataExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryTextDataExchange.java index 5fec6f37..d8070907 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryTextDataExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/api/QueryTextDataExchange.java @@ -16,16 +16,6 @@ public class QueryTextDataExchange implements MessageExchange getRequestClass() { - return QueryTextDataExchange.Request.class; - } - - @Override - public Class getResponseClass() { - return QueryTextDataExchange.Response.class; - } - @Jacksonized @Builder @Value diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ConvertExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ConvertExchange.java index 301a5350..95ccb8a6 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ConvertExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/cli/ConvertExchange.java @@ -6,6 +6,7 @@ import io.xpipe.beacon.message.ResponseMessage; import io.xpipe.core.source.DataSourceConfigInstance; import io.xpipe.core.source.DataSourceId; import io.xpipe.core.source.DataSourceReference; +import io.xpipe.core.source.DataSourceType; import lombok.Builder; import lombok.NonNull; import lombok.Value; @@ -18,16 +19,6 @@ public class ConvertExchange implements MessageExchange getRequestClass() { - return ConvertExchange.Request.class; - } - - @Override - public Class getResponseClass() { - return ConvertExchange.Response.class; - } - @Jacksonized @Builder @Value @@ -35,15 +26,18 @@ public class ConvertExchange implements MessageExchange { + + @Override + public String getId() { + return "providerList"; + } + + @Jacksonized + @Builder + @Value + public static class Request implements RequestMessage { + } + + @Jacksonized + @Builder + @Value + public static class Response implements ResponseMessage { + @NonNull Map> entries; + } +} diff --git a/beacon/src/main/java/io/xpipe/beacon/exchange/cli/WriteExecuteExchange.java b/beacon/src/main/java/io/xpipe/beacon/exchange/cli/WriteExecuteExchange.java index b63bf4f2..7af483cb 100644 --- a/beacon/src/main/java/io/xpipe/beacon/exchange/cli/WriteExecuteExchange.java +++ b/beacon/src/main/java/io/xpipe/beacon/exchange/cli/WriteExecuteExchange.java @@ -5,7 +5,6 @@ import io.xpipe.beacon.message.RequestMessage; import io.xpipe.beacon.message.ResponseMessage; import io.xpipe.core.source.DataSourceConfigInstance; import io.xpipe.core.source.DataSourceReference; -import io.xpipe.core.store.DataStore; import lombok.Builder; import lombok.NonNull; import lombok.Value; @@ -21,16 +20,6 @@ public class WriteExecuteExchange implements MessageExchange getRequestClass() { - return WriteExecuteExchange.Request.class; - } - - @Override - public Class getResponseClass() { - return WriteExecuteExchange.Response.class; - } - @Jacksonized @Builder @Value @@ -38,7 +27,6 @@ public class WriteExecuteExchange implements MessageExchange getRequestClass() { - return WritePreparationExchange.Request.class; - } - - @Override - public Class getResponseClass() { - return WritePreparationExchange.Response.class; - } - @Jacksonized @Builder @Value public static class Request implements RequestMessage { String type; - String output; + @NonNull + StreamDataStore output; @NonNull DataSourceReference ref; } @@ -45,8 +36,6 @@ public class WritePreparationExchange implements MessageExchange { + @NonNull protected DS store; - public DataSource(DS store) { - this.store = store; + @SneakyThrows + @SuppressWarnings("unchecked") + public DataSource copy() { + var mapper = JacksonHelper.newMapper(); + TokenBuffer tb = new TokenBuffer(mapper, false); + mapper.writeValue(tb, this); + return mapper.readValue(tb.asParser(), getClass()); } - public DataSource withStore(DS newStore) { - return null; + public DataSource withStore(DS store) { + var c = copy(); + c.store = store; + return c; } /** diff --git a/core/src/main/java/io/xpipe/core/source/DataSourceInfo.java b/core/src/main/java/io/xpipe/core/source/DataSourceInfo.java index 918ee9a8..5a28d942 100644 --- a/core/src/main/java/io/xpipe/core/source/DataSourceInfo.java +++ b/core/src/main/java/io/xpipe/core/source/DataSourceInfo.java @@ -86,10 +86,12 @@ public abstract class DataSourceInfo { @Value @JsonTypeName("text") public static class Text extends DataSourceInfo { + int characters; int lineCount; @JsonCreator - public Text(int lineCount) { + public Text(int characters, int lineCount) { + this.characters = characters; this.lineCount = lineCount; } diff --git a/core/src/main/java/io/xpipe/core/source/RawReadConnection.java b/core/src/main/java/io/xpipe/core/source/RawReadConnection.java index 725b537f..a70dd146 100644 --- a/core/src/main/java/io/xpipe/core/source/RawReadConnection.java +++ b/core/src/main/java/io/xpipe/core/source/RawReadConnection.java @@ -1,11 +1,21 @@ package io.xpipe.core.source; +import java.io.OutputStream; + public interface RawReadConnection extends DataSourceReadConnection { byte[] readBytes(int max) throws Exception; int BUFFER_SIZE = 8192; + default void forwardBytes(OutputStream out, int maxBytes) throws Exception { + if (maxBytes == 0) { + return; + } + + out.write(readBytes(maxBytes)); + } + default void forward(DataSourceConnection con) throws Exception { try (var tCon = (RawWriteConnection) con) { tCon.init(); diff --git a/core/src/main/java/io/xpipe/core/source/TextDataSource.java b/core/src/main/java/io/xpipe/core/source/TextDataSource.java index a42586f5..9b1cbe18 100644 --- a/core/src/main/java/io/xpipe/core/source/TextDataSource.java +++ b/core/src/main/java/io/xpipe/core/source/TextDataSource.java @@ -2,6 +2,8 @@ package io.xpipe.core.source; import io.xpipe.core.store.DataStore; +import java.util.concurrent.atomic.AtomicInteger; + public abstract class TextDataSource extends DataSource { private static final int MAX_LINE_READ = 1000; @@ -13,9 +15,14 @@ public abstract class TextDataSource extends DataSource { + lineCount.getAndIncrement(); + charCount.addAndGet(s.length()); + }); + boolean limitHit = lineCount.get() == MAX_LINE_READ; + return new DataSourceInfo.Text(limitHit ? -1 : charCount.get(), limitHit ? -1 : lineCount.get()); } } diff --git a/core/src/main/java/io/xpipe/core/store/StdinDataStore.java b/core/src/main/java/io/xpipe/core/store/StdinDataStore.java new file mode 100644 index 00000000..ef63952f --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/StdinDataStore.java @@ -0,0 +1,93 @@ +package io.xpipe.core.store; + +import lombok.EqualsAndHashCode; +import lombok.Value; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +@EqualsAndHashCode +@Value +public class StdinDataStore implements StreamDataStore { + + @Override + public InputStream openInput() throws Exception { + var in = System.in; + return new InputStream() { + @Override + public int read() throws IOException { + return in.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return in.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return in.read(b, off, len); + } + + @Override + public byte[] readAllBytes() throws IOException { + return in.readAllBytes(); + } + + @Override + public byte[] readNBytes(int len) throws IOException { + return in.readNBytes(len); + } + + @Override + public int readNBytes(byte[] b, int off, int len) throws IOException { + return in.readNBytes(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + return in.skip(n); + } + + @Override + public void skipNBytes(long n) throws IOException { + in.skipNBytes(n); + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + } + + @Override + public synchronized void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + in.reset(); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public long transferTo(OutputStream out) throws IOException { + return in.transferTo(out); + } + }; + } + + @Override + public boolean exists() { + return false; + } +} diff --git a/core/src/main/java/io/xpipe/core/store/StdoutDataStore.java b/core/src/main/java/io/xpipe/core/store/StdoutDataStore.java new file mode 100644 index 00000000..e65b9aee --- /dev/null +++ b/core/src/main/java/io/xpipe/core/store/StdoutDataStore.java @@ -0,0 +1,46 @@ +package io.xpipe.core.store; + +import lombok.EqualsAndHashCode; +import lombok.Value; + +import java.io.IOException; +import java.io.OutputStream; + +@EqualsAndHashCode +@Value +public class StdoutDataStore implements StreamDataStore { + + @Override + public OutputStream openOutput() throws Exception { + return new OutputStream() { + @Override + public void write(int b) throws IOException { + System.out.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + System.out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + System.out.write(b, off, len); + } + + @Override + public void flush() throws IOException { + System.out.flush(); + } + + @Override + public void close() throws IOException { + } + }; + } + + @Override + public boolean exists() { + return false; + } +} diff --git a/extension/src/main/java/io/xpipe/extension/DataSourceProviders.java b/extension/src/main/java/io/xpipe/extension/DataSourceProviders.java index 789e458d..0bdd9081 100644 --- a/extension/src/main/java/io/xpipe/extension/DataSourceProviders.java +++ b/extension/src/main/java/io/xpipe/extension/DataSourceProviders.java @@ -107,13 +107,13 @@ public class DataSourceProviders { .anyMatch(s -> s.equalsIgnoreCase(name))).findAny(); } - public static Optional> byStore(DataStore store) { + public static Optional> byPreferredStore(DataStore store) { if (ALL == null) { throw new IllegalStateException("Not initialized"); } return ALL.stream().filter(d -> d.getFileProvider() != null) - .filter(d -> d.couldSupportStore(store)).findAny(); + .filter(d -> d.prefersStore(store)).findAny(); } public static Set> getAll() { diff --git a/extension/src/main/java/io/xpipe/extension/SimpleFileDataSourceProvider.java b/extension/src/main/java/io/xpipe/extension/SimpleFileDataSourceProvider.java index 793006ec..5b61d0ba 100644 --- a/extension/src/main/java/io/xpipe/extension/SimpleFileDataSourceProvider.java +++ b/extension/src/main/java/io/xpipe/extension/SimpleFileDataSourceProvider.java @@ -18,8 +18,14 @@ public interface SimpleFileDataSourceProvider> extends D continue; } - if (store instanceof FileDataStore l) { - return l.getFileName().matches("\\." + e.getValue() + "$"); + for (var ext : e.getValue()) { + if (ext == null) { + continue; + } + + if (store instanceof FileDataStore l) { + return l.getFileName().endsWith("." + ext); + } } } return false;