Rework table mappings And various bug fixes

This commit is contained in:
Christopher Schnick 2022-10-29 03:59:03 +02:00
parent 38efb368ec
commit d5566cc662
24 changed files with 342 additions and 87 deletions

View file

@ -4,6 +4,7 @@ import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.source.TableMapping;
import io.xpipe.core.source.TableWriteConnection;
import java.util.ArrayList;
@ -15,9 +16,11 @@ public abstract class BatchTableWriteConnection implements TableWriteConnection
protected final int batchSize = DEFAULT_BATCH_SIZE;
private final List<DataStructureNode> batch = new ArrayList<>();
private TableMapping mapping;
@Override
public final DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor() {
public final DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor(TableMapping mapping) {
this.mapping = mapping;
return node -> {
if (batch.size() < batchSize) {
batch.add(node);
@ -27,7 +30,7 @@ public abstract class BatchTableWriteConnection implements TableWriteConnection
}
var array = ArrayNode.of(batch);
var returned = writeBatchLinesAcceptor().accept(array);
var returned = writeBatchLinesAcceptor(mapping).accept(array);
batch.clear();
return returned;
};
@ -38,15 +41,15 @@ public abstract class BatchTableWriteConnection implements TableWriteConnection
try {
if (batch.size() > 0) {
var array = ArrayNode.of(batch);
var returned = writeBatchLinesAcceptor().accept(array);
var returned = writeBatchLinesAcceptor(mapping).accept(array);
batch.clear();
}
} finally {
onClose();
onClose(mapping);
}
}
protected abstract void onClose() throws Exception;
protected abstract void onClose(TableMapping mapping) throws Exception;
protected abstract DataStructureNodeAcceptor<ArrayNode> writeBatchLinesAcceptor();
protected abstract DataStructureNodeAcceptor<ArrayNode> writeBatchLinesAcceptor(TableMapping mapping);
}

View file

@ -41,6 +41,11 @@ public class BinarySource extends RawDataSource<StreamDataStore> {
protected RawReadConnection newReadConnection() {
return new RawReadConnection() {
@Override
public boolean canRead() throws Exception {
return getStore().canOpen();
}
private InputStream inputStream;
@Override

View file

@ -55,4 +55,9 @@ public class LimitTableReadConnection implements TableReadConnection {
});
return localCounter.get();
}
@Override
public boolean canRead() throws Exception {
return connection.canRead();
}
}

View file

@ -2,19 +2,23 @@ package io.xpipe.core.impl;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.DataSourceConnection;
import io.xpipe.core.source.DataSourceType;
import io.xpipe.core.source.TableWriteConnection;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.source.*;
import java.util.Optional;
public class PreservingTableWriteConnection extends PreservingWriteConnection implements TableWriteConnection {
public PreservingTableWriteConnection(DataSource<?> source, DataSourceConnection connection, boolean append) {
super(DataSourceType.TABLE, source, append, connection);
}
@Override
public Optional<TableMapping> createMapping(TupleType inputType) throws Exception {
return ((TableWriteConnection) connection).createMapping(inputType);
}
@Override
public DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor() {
return ((TableWriteConnection) connection).writeLinesAcceptor();
public DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor(TableMapping mapping) {
return ((TableWriteConnection) connection).writeLinesAcceptor(mapping);
}
}

View file

@ -0,0 +1,15 @@
package io.xpipe.core.impl;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.DataSourceConnection;
import lombok.Getter;
public class SimpleDataSourceConnection<T extends DataSource<?>> implements DataSourceConnection {
@Getter
protected final T source;
public SimpleDataSourceConnection(T source) {
this.source = source;
}
}

View file

@ -0,0 +1,27 @@
package io.xpipe.core.impl;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.source.TableDataSource;
import io.xpipe.core.source.TableMapping;
import io.xpipe.core.source.TableWriteConnection;
import java.util.Optional;
public interface SimpleTableWriteConnection<T extends TableDataSource<?>> extends TableWriteConnection {
public T getSource();
public default Optional<TupleType> getType() throws Exception {
return getSource().determineDataType();
}
public default Optional<TableMapping> createMapping(TupleType inputType) throws Exception {
var outputType = getType();
if (outputType.isEmpty() || outputType.get().getSize() == 0){
return Optional.of(TableMapping.createIdentity(inputType));
}
return TableMapping.createBasic(inputType, outputType.get());
}
}

View file

@ -7,36 +7,38 @@ import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.typed.TypedDataStreamParser;
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
import io.xpipe.core.source.StreamReadConnection;
import io.xpipe.core.source.TableReadConnection;
import io.xpipe.core.store.StreamDataStore;
import io.xpipe.core.util.JacksonMapper;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class XpbtReadConnection implements TableReadConnection {
public class XpbtReadConnection extends StreamReadConnection implements TableReadConnection {
private final StreamDataStore store;
private TupleType dataType;
private InputStream inputStream;
private TypedDataStreamParser parser;
private boolean empty;
protected XpbtReadConnection(StreamDataStore store) {
this.store = store;
protected XpbtReadConnection(XpbtSource source) {
super(source.getStore(), null);
this.store = source.getStore();
}
@Override
public void init() throws Exception {
this.inputStream = store.openBufferedInput();
super.init();
this.inputStream.mark(8192);
var header = new BufferedReader(new InputStreamReader(inputStream)).readLine();
this.inputStream.reset();
if (header == null || header.trim().length() == 0) {
this.dataType = TupleType.empty();
empty = true;
return;
}
@ -51,12 +53,7 @@ public class XpbtReadConnection implements TableReadConnection {
this.dataType = dataType;
this.parser = new TypedDataStreamParser(dataType);
}
@Override
public void close() throws Exception {
inputStream.close();
}
@Override
public TupleType getDataType() {
return dataType;
@ -95,4 +92,9 @@ public class XpbtReadConnection implements TableReadConnection {
}
return counter;
}
@Override
public boolean canRead() throws Exception {
return store.canOpen();
}
}

View file

@ -15,11 +15,11 @@ public class XpbtSource extends TableDataSource<StreamDataStore> {
@Override
protected TableWriteConnection newWriteConnection() {
return new XpbtWriteConnection(store);
return new XpbtWriteConnection(this);
}
@Override
protected TableReadConnection newReadConnection() {
return new XpbtReadConnection(store);
return new XpbtReadConnection(this);
}
}

View file

@ -7,39 +7,28 @@ import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.typed.TypedDataStreamWriter;
import io.xpipe.core.source.TableWriteConnection;
import io.xpipe.core.store.StreamDataStore;
import io.xpipe.core.source.StreamWriteConnection;
import io.xpipe.core.source.TableMapping;
import io.xpipe.core.util.JacksonMapper;
import lombok.Getter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
public class XpbtWriteConnection implements TableWriteConnection {
public class XpbtWriteConnection extends StreamWriteConnection implements SimpleTableWriteConnection<XpbtSource> {
private final StreamDataStore store;
private OutputStream outputStream;
@Getter
private final XpbtSource source;
private TupleType writtenDescriptor;
public XpbtWriteConnection(StreamDataStore store) {
this.store = store;
public XpbtWriteConnection(XpbtSource source) {
super(source.getStore(), null);
this.source = source;
}
@Override
public void init() throws Exception {
outputStream = store.openOutput();
}
@Override
public void close() throws Exception {
if (outputStream != null) {
outputStream.close();
}
}
@Override
public DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor() {
public DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor(TableMapping mapping) {
return t -> {
writeDescriptor(t);
TypedDataStreamWriter.writeStructure(outputStream, t, writtenDescriptor);
@ -65,4 +54,5 @@ public class XpbtWriteConnection implements TableWriteConnection {
}
writer.flush();
}
}

View file

@ -49,7 +49,7 @@ public abstract class DataSource<DS extends DataStore> extends JacksonizedValue
store.validate();
}
public void validate() throws Exception {
public void checkComplete() throws Exception {
if (store == null) {
throw new IllegalStateException("Store cannot be null");
}

View file

@ -2,5 +2,7 @@ package io.xpipe.core.source;
public interface DataSourceReadConnection extends DataSourceConnection {
boolean canRead() throws Exception;
void forward(DataSourceConnection con) throws Exception;
}

View file

@ -19,13 +19,18 @@ public abstract class StreamReadConnection implements DataSourceReadConnection {
this.charset = charset;
}
@Override
public boolean canRead() throws Exception {
return store.canOpen();
}
@Override
public void init() throws Exception {
if (inputStream != null) {
throw new IllegalStateException("Already initialized");
}
inputStream = store.openInput();
inputStream = store.openBufferedInput();
if (charset != null) {
reader = Charsetter.get().reader(inputStream, charset);
}

View file

@ -1,12 +1,27 @@
package io.xpipe.core.source;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.impl.PreservingTableWriteConnection;
import io.xpipe.core.store.DataStore;
import lombok.experimental.SuperBuilder;
import java.util.Optional;
@SuperBuilder
public abstract class TableDataSource<DS extends DataStore> extends DataSource<DS> {
public Optional<TupleType> determineDataType() throws Exception {
try (var readConnection = newReadConnection()) {
var canRead = readConnection != null && readConnection.canRead();
if (canRead) {
readConnection.init();
return Optional.ofNullable(readConnection.getDataType());
} else {
return Optional.empty();
}
}
}
@Override
public final DataSourceInfo determineInfo() throws Exception {
if (!getFlow().hasInput()) {
@ -22,7 +37,7 @@ public abstract class TableDataSource<DS extends DataStore> extends DataSource<D
public final TableReadConnection openReadConnection() throws Exception {
try {
validate();
checkComplete();
} catch (Exception e) {
return TableReadConnection.empty();
}

View file

@ -0,0 +1,106 @@
package io.xpipe.core.source;
import io.xpipe.core.data.type.TupleType;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.IntStream;
@Getter
public class TableMapping {
private static Integer[] range(int size) {
var array = new Integer[size];
for (int i = 0; i < size; i++) {
array[i] = i;
}
return array;
}
public static TableMapping empty(TupleType inputType) {
return new TableMapping(inputType, TupleType.empty(), new Integer[inputType.getSize()]);
}
public static TableMapping createIdentity(TupleType inputType) {
return new TableMapping(inputType, inputType, range(inputType.getSize()));
}
public static Optional<TableMapping> createBasic(TupleType inputType, TupleType outputType) {
// Name based mapping
if (inputType.hasAllNames()) {
var array = new Integer[inputType.getSize()];
for (int i = 0; i < inputType.getNames().size(); i++) {
var map = mapColumnName(inputType.getNames().get(i), outputType.getNames());
array[i] = map.isPresent() ? map.getAsInt() : null;
}
return Optional.of(new TableMapping(inputType, outputType, array));
}
// Index based mapping
if ((!inputType.hasAllNames() || outputType.hasAllNames()) && inputType.getSize() == outputType.getSize()) {
return Optional.of(new TableMapping(inputType, outputType, range(inputType.getSize())));
}
return Optional.empty();
}
private static OptionalInt mapColumnName(String inputName, List<String> outputNames) {
for (int i = 0; i < outputNames.size(); i++) {
if (outputNames.get(i) != null && outputNames.get(i).trim().equalsIgnoreCase(inputName.trim())) {
return OptionalInt.of(i);
}
}
return OptionalInt.empty();
}
private final TupleType inputType;
private final TupleType outputType;
protected final Integer[] columMap;
public TableMapping(TupleType inputType, TupleType outputType, Integer[] columMap) {
this.inputType = inputType;
this.outputType = outputType;
this.columMap = columMap;
}
public boolean isIdentity() {
return inputType.equals(outputType) && Arrays.equals(columMap, range(getInputType().getSize()));
}
public boolean isComplete() {
return IntStream.range(0, outputType.getSize())
.allMatch(value -> inverseMap(value).isPresent());
}
public TableMapping sub(List<String> outputNames) {
var array = new Integer[inputType.getSize()];
for (int i = 0; i < outputNames.size(); i++) {
var index = inverseMap(outputType.getNames().indexOf(outputNames.get(i)));
if (index.isPresent()) {
array[index.getAsInt()] = i;
} else {
throw new IllegalStateException();
}
}
return new TableMapping(inputType, outputType, array);
}
public OptionalInt inverseMap(int outputColumn) {
for (int i = 0; i < inputType.getNames().size(); i++) {
if (map(i).orElse(-1) == outputColumn) {
return OptionalInt.of(i);
}
}
return OptionalInt.empty();
}
public OptionalInt map(int inputColumn) {
return columMap[inputColumn] != null ? OptionalInt.of(columMap[inputColumn]) : OptionalInt.empty();
}
}

View file

@ -20,6 +20,11 @@ public interface TableReadConnection extends DataSourceReadConnection {
public static TableReadConnection empty() {
return new TableReadConnection() {
@Override
public boolean canRead() throws Exception {
return true;
}
@Override
public TupleType getDataType() {
return TupleType.empty();
@ -102,7 +107,9 @@ public interface TableReadConnection extends DataSourceReadConnection {
}
default int forwardAndCount(DataSourceConnection con) throws Exception {
var inputType = getDataType();
var tCon = (TableWriteConnection) con;
return withRows(tCon.writeLinesAcceptor());
var mapping = tCon.createMapping(inputType);
return withRows(tCon.writeLinesAcceptor(mapping.orElseThrow()));
}
}

View file

@ -1,9 +1,10 @@
package io.xpipe.core.source;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.DataStructureNode;
import io.xpipe.core.data.node.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import java.util.Optional;
/**
* A connection for sequentially writing data to a table data source.
@ -13,7 +14,12 @@ public interface TableWriteConnection extends DataSourceConnection {
public static TableWriteConnection empty() {
return new TableWriteConnection() {
@Override
public DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor() {
public Optional<TableMapping> createMapping(TupleType inputType) throws Exception {
return Optional.of(TableMapping.empty(inputType));
}
@Override
public DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor(TableMapping mapping) {
return node -> {
return true;
};
@ -21,12 +27,7 @@ public interface TableWriteConnection extends DataSourceConnection {
};
}
DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor();
Optional<TableMapping> createMapping(TupleType inputType) throws Exception;
default void writeLines(ArrayNode lines) throws Exception {
var consumer = writeLinesAcceptor();
for (DataStructureNode dataStructureNode : lines.getNodes()) {
consumer.accept(dataStructureNode.asTuple());
}
}
DataStructureNodeAcceptor<TupleNode> writeLinesAcceptor(TableMapping mapping);
}

View file

@ -3,6 +3,10 @@ package io.xpipe.core.store;
public class DataStoreFormatter {
public static String ellipsis(String input, int length) {
if (input == null) {
return "";
}
var end = Math.min(input.length(), length);
if (end < input.length()) {
return input.substring(0, end) + "...";

View file

@ -7,7 +7,6 @@ import io.xpipe.core.util.SecretValue;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
@ -34,7 +33,7 @@ public class LocalStore extends JacksonizedValue implements MachineFileStore, St
@Override
public NewLine getNewLine() {
return ShellTypes.getDefault().getNewLine();
return ShellTypes.getPlatformDefault().getNewLine();
}
@Override
@ -63,7 +62,7 @@ public class LocalStore extends JacksonizedValue implements MachineFileStore, St
@Override
public ShellType determineType() throws Exception {
return ShellTypes.getDefault();
return ShellTypes.getPlatformDefault();
}
class LocalProcessControl extends ProcessControl {
@ -85,7 +84,7 @@ public class LocalStore extends JacksonizedValue implements MachineFileStore, St
private InputStream createInputStream() {
var string =
input.stream().map(secret -> secret.getSecretValue()).collect(Collectors.joining("\n")) + "\r\n";
return new ByteArrayInputStream(string.getBytes(StandardCharsets.US_ASCII));
return new ByteArrayInputStream(string.getBytes(charset));
}
@Override
@ -105,7 +104,7 @@ public class LocalStore extends JacksonizedValue implements MachineFileStore, St
}
});
t.setDaemon(true);
t.start();
//t.start();
}
@Override

View file

@ -52,7 +52,7 @@ public abstract class ProcessControl {
var pc = this;
pc.start();
AtomicReference<String> readError = new AtomicReference<>();
AtomicReference<String> readError = new AtomicReference<>("");
var errorThread = new Thread(() -> {
try {
@ -64,7 +64,7 @@ public abstract class ProcessControl {
errorThread.setDaemon(true);
errorThread.start();
AtomicReference<String> read = new AtomicReference<>();
AtomicReference<String> read = new AtomicReference<>("");
var t = new Thread(() -> {
try {
read.set(new String(pc.getStdout().readAllBytes(), pc.getCharset()));

View file

@ -49,7 +49,7 @@ public class ShellTypes {
}
}
public static StandardShellStore.ShellType getDefault() {
public static StandardShellStore.ShellType getRecommendedDefault() {
if (System.getProperty("os.name").startsWith("Windows")) {
return POWERSHELL;
} else {
@ -57,6 +57,14 @@ public class ShellTypes {
}
}
public static StandardShellStore.ShellType getPlatformDefault() {
if (System.getProperty("os.name").startsWith("Windows")) {
return CMD;
} else {
return SH;
}
}
public static StandardShellStore.ShellType[] getWindowsShells() {
return new StandardShellStore.ShellType[] {CMD, POWERSHELL};
}

View file

@ -55,11 +55,13 @@ public class DynamicOptionsComp extends Comp<CompStructure<FlowPane>> {
line.getChildren().add(name);
}
var r = entry.comp().createRegion();
compRegions.add(r);
line.getChildren().add(r);
if (!wrap) {
HBox.setHgrow(r, Priority.ALWAYS);
if (entry.comp() != null) {
var r = entry.comp().createRegion();
compRegions.add(r);
line.getChildren().add(r);
if (!wrap) {
HBox.setHgrow(r, Priority.ALWAYS);
}
}
flow.getChildren().add(line);
@ -82,20 +84,22 @@ public class DynamicOptionsComp extends Comp<CompStructure<FlowPane>> {
compRegions.forEach(r -> r.prefWidthProperty().bind(compWidthBinding));
}
var nameWidthBinding = Bindings.createDoubleBinding(
() -> {
if (nameRegions.stream().anyMatch(r -> r.getWidth() == 0)) {
return Region.USE_COMPUTED_SIZE;
}
if (entries.stream().anyMatch(entry -> entry.name() != null)) {
var nameWidthBinding = Bindings.createDoubleBinding(
() -> {
if (nameRegions.stream().anyMatch(r -> r.getWidth() == 0)) {
return Region.USE_COMPUTED_SIZE;
}
var m = nameRegions.stream()
.map(Region::getWidth)
.max(Double::compareTo)
.orElse(0.0);
return m;
},
nameRegions.stream().map(Region::widthProperty).toList().toArray(new Observable[0]));
nameRegions.forEach(r -> r.prefWidthProperty().bind(nameWidthBinding));
var m = nameRegions.stream()
.map(Region::getWidth)
.max(Double::compareTo)
.orElse(0.0);
return m;
},
nameRegions.stream().map(Region::widthProperty).toList().toArray(new Observable[0]));
nameRegions.forEach(r -> r.prefWidthProperty().bind(nameWidthBinding));
}
return new SimpleCompStructure<>(flow);
}

View file

@ -0,0 +1,40 @@
package io.xpipe.extension.util;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
public class ExpectHelper {
private static Path extractedExecutable = null;
private static Path getExtractedExecutable() {
if (extractedExecutable == null) {
XPipeDaemon.getInstance().withResource("io.xpipe.extension", "bin/expect.exe", path -> {
extractedExecutable = Files.createTempFile(null, ".exe");
Files.copy(path, extractedExecutable, StandardCopyOption.REPLACE_EXISTING);
});
}
return extractedExecutable;
}
public static List<String> executeExpect(List<String> command, String password) throws IOException {
var file = Files.createTempFile(null, ".lua");
Files.writeString(file, expectFile(command, password));
return List.of(getExtractedExecutable().toString(), file.toString());
}
private static String expectFile(List<String> command, String password) {
return String.format("""
echo(false)
if spawn(%s) then
sendln("%s")
echo(true)
end
""", command.stream().map(s -> "\"" + s + "\"").collect(Collectors.joining(", ")), password);
}
}

View file

@ -1,10 +1,19 @@
package io.xpipe.extension.util;
import org.apache.commons.lang3.function.FailableRunnable;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
public class ThreadHelper {
public static void await(FailableRunnable<InterruptedException> r) {
try {
r.run();
} catch (InterruptedException e) {
}
}
public static Thread run(Runnable r) {
var t = new Thread(r);
t.setDaemon(true);

View file

@ -1,5 +1,6 @@
package io.xpipe.extension.util;
import io.xpipe.core.charsetter.Charsetter;
import io.xpipe.core.source.DataSource;
import io.xpipe.core.source.DataSourceType;
import io.xpipe.core.store.DataStore;
@ -10,6 +11,8 @@ import javafx.beans.property.Property;
import javafx.beans.value.ObservableValue;
import javafx.scene.image.Image;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.ServiceLoader;
@ -21,6 +24,7 @@ public interface XPipeDaemon {
return ServiceLoader.load(XPipeDaemon.class).findFirst().orElseThrow();
}
void withResource(String module, String file, Charsetter.FailableConsumer<Path, IOException> con);
List<DataStore> getNamedStores();
Image image(String file);