Rework nodes

This commit is contained in:
Christopher Schnick 2021-12-17 06:43:50 +01:00
parent 8a36027a31
commit cf89b2ff18
30 changed files with 701 additions and 324 deletions

View file

@ -2,13 +2,13 @@ package io.xpipe.api;
import io.xpipe.api.impl.DataTableImpl;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.SimpleTupleNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.source.DataSourceId;
import java.util.OptionalInt;
public interface DataTable extends Iterable<SimpleTupleNode> {
public interface DataTable extends Iterable<TupleNode> {
static DataTable get(String s) {
return DataTableImpl.get(s);

View file

@ -2,18 +2,17 @@ package io.xpipe.api.impl;
import io.xpipe.api.DataTable;
import io.xpipe.api.XPipeApiConnector;
import io.xpipe.beacon.BeaconClient;
import io.xpipe.beacon.ClientException;
import io.xpipe.beacon.ConnectorException;
import io.xpipe.beacon.ServerException;
import io.xpipe.beacon.BeaconClient;
import io.xpipe.beacon.exchange.ReadTableDataExchange;
import io.xpipe.beacon.exchange.ReadTableInfoExchange;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.SimpleTupleNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.typed.TypedDataStreamReader;
import io.xpipe.core.data.typed.TypedDataStreamCallback;
import io.xpipe.core.data.typed.TypedDataStreamParser;
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
import io.xpipe.core.source.DataSourceId;
@ -51,7 +50,7 @@ public class DataTableImpl implements DataTable {
this.dataType = dataType;
}
public Stream<SimpleTupleNode> stream() {
public Stream<TupleNode> stream() {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator(), Spliterator.ORDERED), false);
}
@ -95,7 +94,8 @@ public class DataTableImpl implements DataTable {
protected void handle(BeaconClient sc) throws ClientException, ServerException, ConnectorException {
var req = new ReadTableDataExchange.Request(id, maxToRead);
performExchange(sc, req, (ReadTableDataExchange.Response res, InputStream in) -> {
TypedDataStreamReader.readStructures(in, new TypedDataStructureNodeReader(dataType, nodes::add));
var r = new TypedDataStreamParser(dataType);
r.readStructures(in, new TypedDataStructureNodeReader(dataType), nodes::add);
}, false);
}
}.execute();
@ -103,14 +103,15 @@ public class DataTableImpl implements DataTable {
}
@Override
public Iterator<SimpleTupleNode> iterator() {
return new Iterator<SimpleTupleNode>() {
public Iterator<TupleNode> iterator() {
return new Iterator<TupleNode>() {
private InputStream input;
private int read;
private final int toRead = size;
private TypedDataStreamCallback callback;
private SimpleTupleNode current;
private TypedDataStreamParser reader;
private TypedDataStructureNodeReader nodeReader;
private TupleNode current;
{
new XPipeApiConnector() {
@ -123,9 +124,8 @@ public class DataTableImpl implements DataTable {
}
}.execute();
callback = new TypedDataStructureNodeReader(dataType, dsn -> {
current = (SimpleTupleNode) dsn;
});
nodeReader = new TypedDataStructureNodeReader(dataType);
reader = new TypedDataStreamParser(dataType);
}
private boolean hasKnownSize() {
@ -143,16 +143,16 @@ public class DataTableImpl implements DataTable {
}
try {
return TypedDataStreamReader.hasNext(input);
return reader.hasNext(input);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
@Override
public SimpleTupleNode next() {
public TupleNode next() {
try {
TypedDataStreamReader.readStructure(input, callback);
current = (TupleNode) reader.readStructure(input, nodeReader);
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}

View file

@ -12,7 +12,7 @@ public abstract class DataStructureNode implements Iterable<DataStructureNode> {
protected abstract String getName();
protected UnsupportedOperationException unuspported(String s) {
protected UnsupportedOperationException unsupported(String s) {
return new UnsupportedOperationException(getName() + " does not support " + s);
}
@ -21,6 +21,18 @@ public abstract class DataStructureNode implements Iterable<DataStructureNode> {
return toString(0);
}
public DataStructureNode clear() {
throw unsupported("clear");
}
public DataStructureNode setRawData(byte[] data) {
throw unsupported("set raw data");
}
public DataStructureNode set(int index, DataStructureNode node) {
throw unsupported("set at index");
}
public abstract String toString(int indent);
public boolean isTuple() {
@ -35,48 +47,64 @@ public abstract class DataStructureNode implements Iterable<DataStructureNode> {
return false;
}
public DataStructureNode put(String keyName, DataStructureNode node) {
throw unsupported("put node with key");
}
public DataStructureNode put(DataStructureNode node) {
throw unsupported("put node");
}
public DataStructureNode remove(int index) {
throw unsupported("index remove");
}
public DataStructureNode remove(String keyName) {
throw unsupported("key remove");
}
public int size() {
throw unuspported("size computation");
throw unsupported("size computation");
}
public abstract DataType getDataType();
public DataStructureNode at(int index) {
throw unuspported("integer indexing");
throw unsupported("integer indexing");
}
public DataStructureNode forKey(String name) {
throw unuspported("name indexing");
throw unsupported("name indexing");
}
public Optional<DataStructureNode> forKeyIfPresent(String name) {
throw unuspported("name indexing");
throw unsupported("name indexing");
}
public int asInt() {
throw unuspported("integer conversion");
throw unsupported("integer conversion");
}
public String asString() {
throw unuspported("string conversion");
throw unsupported("string conversion");
}
public Stream<DataStructureNode> stream() {
throw unuspported("stream creation");
throw unsupported("stream creation");
}
@Override
public void forEach(Consumer<? super DataStructureNode> action) {
throw unuspported("for each");
throw unsupported("for each");
}
@Override
public Spliterator<DataStructureNode> spliterator() {
throw unuspported("spliterator creation");
throw unsupported("spliterator creation");
}
@Override
public Iterator<DataStructureNode> iterator() {
throw unuspported("iterator creation");
throw unsupported("iterator creation");
}
}

View file

@ -2,9 +2,14 @@ package io.xpipe.core.data;
public class DataStructureNodeIO {
public static final int STRUCTURE_ID = 0;
public static final int TUPLE_ID = 1;
public static final int ARRAY_ID = 2;
public static final int VALUE_ID = 3;
public static final int NAME_ID = 4;
public static final int GENERIC_STRUCTURE_ID = 0;
public static final int GENERIC_TUPLE_ID = 1;
public static final int GENERIC_ARRAY_ID = 2;
public static final int GENERIC_VALUE_ID = 3;
public static final int GENERIC_NAME_ID = 4;
public static final int TYPED_STRUCTURE_ID = 0;
public static final int TYPED_TUPLE_ID = 5;
public static final int TYPED_ARRAY_ID = 6;
public static final int TYPED_VALUE_ID = 7;
}

View file

@ -0,0 +1,10 @@
package io.xpipe.core.data.generic;
import io.xpipe.core.data.DataStructureNode;
public interface GenericAbstractReader extends GenericDataStreamCallback {
boolean isDone();
DataStructureNode create();
}

View file

@ -7,7 +7,7 @@ import io.xpipe.core.data.node.ValueNode;
import java.util.ArrayList;
import java.util.List;
public class GenericArrayReader implements GenericDataStructureNodeReader {
public class GenericArrayReader implements GenericAbstractReader {
public static GenericArrayReader newReader(int length) {
var ar = new GenericArrayReader();
@ -19,7 +19,7 @@ public class GenericArrayReader implements GenericDataStructureNodeReader {
private List<DataStructureNode> nodes;
private int length;
private int currentIndex = 0;
private GenericDataStructureNodeReader currentReader;
private GenericAbstractReader currentReader;
private DataStructureNode created;
public GenericArrayReader() {

View file

@ -1,14 +1,30 @@
package io.xpipe.core.data.generic;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.DataStructureNodeIO;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
public class GenericDataStreamReader {
public class GenericDataStreamParser {
private static final int TUPLE_ID = 1;
private static final int ARRAY_ID = 2;
private static final int VALUE_ID = 3;
private static final int NAME_ID = 4;
public static DataStructureNode read(InputStream in) throws IOException {
var reader = new GenericDataStructureNodeReader();
read(in, reader);
return reader.create();
}
public static List<DataStructureNode> readN(InputStream in, int n) throws IOException {
var list = new ArrayList<DataStructureNode>();
var reader = new GenericDataStructureNodeReader();
for (int i = 0; i < n ; i++) {
read(in, reader);
list.add(reader.create());
}
return list;
}
public static void read(InputStream in, GenericDataStreamCallback cb) throws IOException {
var b = in.read();
@ -17,20 +33,20 @@ public class GenericDataStreamReader {
}
switch (b) {
case TUPLE_ID -> {
case DataStructureNodeIO.GENERIC_TUPLE_ID -> {
readTuple(in, cb);
}
case ARRAY_ID -> {
case DataStructureNodeIO.GENERIC_ARRAY_ID -> {
readArray(in, cb);
}
case VALUE_ID -> {
case DataStructureNodeIO.GENERIC_VALUE_ID -> {
readValue(in, cb);
}
case NAME_ID -> {
case DataStructureNodeIO.GENERIC_NAME_ID -> {
readName(in, cb);
read(in, cb);
}
default -> throw new IllegalStateException("Unexpected value: " + b);
default -> throw new IllegalStateException("Unexpected type id: " + b);
}
}

View file

@ -1,10 +1,89 @@
package io.xpipe.core.data.generic;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.node.ValueNode;
public interface GenericDataStructureNodeReader extends GenericDataStreamCallback {
public class GenericDataStructureNodeReader implements GenericDataStreamCallback {
boolean isDone();
private DataStructureNode node;
private GenericAbstractReader reader;
DataStructureNode create();
public DataStructureNode create() {
if (node == null) {
throw new IllegalStateException("No node has been created yet");
}
reader = null;
return node;
}
private boolean hasReader() {
return reader != null;
}
@Override
public void onName(String name) {
if (hasReader()) {
reader.onName(name);
return;
}
throw new IllegalStateException("Expected node start but got name");
}
@Override
public void onArrayStart(int length) {
if (hasReader()) {
reader.onArrayStart(length);
return;
}
reader = GenericArrayReader.newReader(length);
}
@Override
public void onArrayEnd() {
if (!hasReader()) {
throw new IllegalStateException("No array to close");
}
reader.onArrayEnd();
if (reader.isDone()) {
node = reader.create();
reader = null;
}
}
@Override
public void onTupleStart(int length) {
if (hasReader()) {
reader.onTupleStart(length);
return;
}
reader = GenericTupleReader.newReader(length);
}
@Override
public void onTupleEnd() {
if (!hasReader()) {
throw new IllegalStateException("No tuple to close");
}
reader.onTupleEnd();
if (reader.isDone()) {
node = reader.create();
reader = null;
}
}
@Override
public void onValue(byte[] value) {
if (hasReader()) {
reader.onValue(value);
return;
}
node = ValueNode.wrap(value);
}
}

View file

@ -1,84 +0,0 @@
package io.xpipe.core.data.generic;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.node.ValueNode;
public class GenericDataStructureReader implements GenericDataStreamCallback {
private DataStructureNode node;
private GenericDataStructureNodeReader reader;
public DataStructureNode create() {
if (node == null) {
throw new IllegalStateException("No node has been created yet");
}
return node;
}
private boolean hasReader() {
return reader != null;
}
@Override
public void onName(String name) {
if (hasReader()) {
reader.onName(name);
return;
}
throw new IllegalStateException("Expected node start but got name");
}
@Override
public void onArrayStart(int length) {
if (hasReader()) {
reader.onArrayStart(length);
return;
}
reader = GenericArrayReader.newReader(length);
}
@Override
public void onArrayEnd() {
if (!hasReader()) {
throw new IllegalStateException("No array to close");
}
reader.onArrayEnd();
}
@Override
public void onTupleStart(int length) {
if (hasReader()) {
reader.onTupleStart(length);
return;
}
reader = GenericTupleReader.newReader(length);
}
@Override
public void onTupleEnd() {
if (!hasReader()) {
throw new IllegalStateException("No tuple to close");
}
reader.onTupleEnd();
if (reader.isDone()) {
node = reader.create();
reader = null;
}
}
@Override
public void onValue(byte[] value) {
if (hasReader()) {
reader.onValue(value);
return;
}
node = ValueNode.wrap(value);
}
}

View file

@ -8,7 +8,7 @@ import io.xpipe.core.data.node.SimpleTupleNode;
import java.util.ArrayList;
import java.util.List;
public class GenericTupleReader implements GenericDataStructureNodeReader {
public class GenericTupleReader implements GenericAbstractReader {
public static GenericTupleReader newReader(int length) {
var tr = new GenericTupleReader();
@ -21,7 +21,7 @@ public class GenericTupleReader implements GenericDataStructureNodeReader {
private List<String> names;
private List<DataStructureNode> nodes;
private int currentIndex = 0;
private GenericDataStructureNodeReader currentReader;
private GenericAbstractReader currentReader;
private DataStructureNode created;
public GenericTupleReader() {

View file

@ -2,7 +2,6 @@ package io.xpipe.core.data.node;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.type.ArrayType;
import io.xpipe.core.data.type.DataType;
import lombok.EqualsAndHashCode;
import java.util.*;
@ -31,6 +30,18 @@ public class ArrayNode extends DataStructureNode {
return new ArrayNode(new ArrayList<>(valueNodes));
}
@Override
public DataStructureNode put(DataStructureNode node) {
valueNodes.add(node);
return this;
}
@Override
public DataStructureNode set(int index, DataStructureNode node) {
valueNodes.add(index, node);
return this;
}
@Override
public Stream<DataStructureNode> stream() {
return Collections.unmodifiableList(valueNodes).stream();
@ -58,10 +69,16 @@ public class ArrayNode extends DataStructureNode {
}
@Override
public DataType getDataType() {
public ArrayType getDataType() {
return ArrayType.of(valueNodes.stream().map(DataStructureNode::getDataType).toList());
}
@Override
public DataStructureNode clear() {
valueNodes.clear();
return this;
}
@Override
public DataStructureNode at(int index) {
return valueNodes.get(index);

View file

@ -33,7 +33,7 @@ public class NoKeyTupleNode extends TupleNode {
@Override
public DataStructureNode forKey(String name) {
throw unuspported("key indexing");
throw unsupported("key indexing");
}
@Override
@ -47,7 +47,7 @@ public class NoKeyTupleNode extends TupleNode {
}
public String nameAt(int index) {
throw unuspported("name getter");
throw unsupported("name getter");
}
@Override

View file

@ -47,6 +47,13 @@ public class SimpleTupleNode extends TupleNode {
return Optional.of(nodes.get(names.indexOf(name)));
}
@Override
public DataStructureNode clear() {
nodes.clear();
return this;
}
@Override
public int size() {
return nodes.size();

View file

@ -10,7 +10,7 @@ import java.nio.charset.StandardCharsets;
@EqualsAndHashCode(callSuper = false)
public class ValueNode extends DataStructureNode {
private final byte[] data;
private byte[] data;
private ValueNode(byte[] data) {
this.data = data;
@ -24,6 +24,12 @@ public class ValueNode extends DataStructureNode {
return new ValueNode(o.toString().getBytes(StandardCharsets.UTF_8));
}
@Override
public DataStructureNode setRawData(byte[] data) {
this.data = data;
return this;
}
@Override
public boolean isValue() {
return true;

View file

@ -1,21 +1,24 @@
package io.xpipe.core.data.type;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.type.callback.DataTypeCallback;
import lombok.EqualsAndHashCode;
import java.util.List;
@JsonTypeName("array")
@EqualsAndHashCode
public class ArrayType implements DataType {
public static ArrayType of(List<DataType> types) {
if (types.size() == 0) {
return new ArrayType(null);
return new ArrayType(new WildcardType());
}
var first = types.get(0);
var eq = types.stream().allMatch(d -> d.equals(first));
return new ArrayType(eq ? first : null);
return new ArrayType(eq ? first : new WildcardType());
}
private final DataType sharedType;
@ -37,8 +40,13 @@ public class ArrayType implements DataType {
}
@Override
public boolean isTuple() {
return false;
public String getName() {
return "array";
}
@Override
public boolean matches(DataStructureNode node) {
return node.isArray();
}
@Override
@ -46,11 +54,6 @@ public class ArrayType implements DataType {
return true;
}
@Override
public boolean isValue() {
return false;
}
@Override
public void traverseType(DataTypeCallback cb) {
cb.onArray(this);

View file

@ -1,16 +1,31 @@
package io.xpipe.core.data.type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.type.callback.DataTypeCallback;
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
public interface DataType {
boolean isTuple();
String getName();
boolean isArray();
boolean matches(DataStructureNode node);
boolean isValue();
default boolean isTuple() {
return false;
}
default boolean isWildcard() {
return false;
}
default boolean isArray() {
return false;
}
default boolean isValue() {
return false;
}
void traverseType(DataTypeCallback cb);
}

View file

@ -2,12 +2,15 @@ package io.xpipe.core.data.type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.type.callback.DataTypeCallback;
import lombok.EqualsAndHashCode;
import java.util.Collections;
import java.util.List;
@JsonTypeName("tuple")
@EqualsAndHashCode
public class TupleType implements DataType {
private List<String> names;
@ -31,21 +34,21 @@ public class TupleType implements DataType {
return new TupleType(Collections.nCopies(types.size(), null), types);
}
@Override
public String getName() {
return "tuple";
}
@Override
public boolean matches(DataStructureNode node) {
return node.isTuple();
}
@Override
public boolean isTuple() {
return true;
}
@Override
public boolean isArray() {
return false;
}
@Override
public boolean isValue() {
return false;
}
@Override
public void traverseType(DataTypeCallback cb) {
cb.onTupleBegin(this);

View file

@ -1,21 +1,22 @@
package io.xpipe.core.data.type;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeName;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.type.callback.DataTypeCallback;
import lombok.EqualsAndHashCode;
@JsonTypeName("value")
@EqualsAndHashCode
public class ValueType implements DataType {
@Override
public boolean isTuple() {
return false;
public String getName() {
return "value";
}
@JsonIgnore
@Override
public boolean isArray() {
return false;
public boolean matches(DataStructureNode node) {
return node.isValue();
}
@Override

View file

@ -0,0 +1,27 @@
package io.xpipe.core.data.type;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.type.callback.DataTypeCallback;
public class WildcardType implements DataType {
@Override
public String getName() {
return "wildcard";
}
@Override
public boolean matches(DataStructureNode node) {
return true;
}
@Override
public boolean isWildcard() {
return true;
}
@Override
public void traverseType(DataTypeCallback cb) {
}
}

View file

@ -1,29 +0,0 @@
package io.xpipe.core.data.typed;
public class ReusableTypedDataStructureNodeCallback implements TypedDataStreamCallback {
@Override
public void onValue(byte[] data) {
TypedDataStreamCallback.super.onValue(data);
}
@Override
public void onTupleBegin(int size) {
TypedDataStreamCallback.super.onTupleBegin(size);
}
@Override
public void onTupleEnd() {
TypedDataStreamCallback.super.onTupleEnd();
}
@Override
public void onArrayBegin(int size) {
TypedDataStreamCallback.super.onArrayBegin(size);
}
@Override
public void onArrayEnd() {
TypedDataStreamCallback.super.onArrayEnd();
}
}

View file

@ -0,0 +1,10 @@
package io.xpipe.core.data.typed;
import io.xpipe.core.data.DataStructureNode;
public interface TypedAbstractReader extends TypedDataStreamCallback {
boolean isDone();
DataStructureNode create();
}

View file

@ -1,17 +1,25 @@
package io.xpipe.core.data.typed;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.type.TupleType;
import java.io.IOException;
public interface TypedDataStreamCallback {
default void onValue(byte[] data) {
}
default void onTupleBegin(int size) {
default void onGenericNode(DataStructureNode node) {
}
default void onTupleBegin(TupleType type) {
}
default void onTupleEnd() {
}
default void onArrayBegin(int size) {
default void onArrayBegin(int size) throws IOException {
}
default void onArrayEnd() {

View file

@ -0,0 +1,136 @@
package io.xpipe.core.data.typed;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.DataStructureNodeIO;
import io.xpipe.core.data.generic.GenericDataStreamParser;
import io.xpipe.core.data.generic.GenericDataStructureNodeReader;
import io.xpipe.core.data.type.ArrayType;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.type.TupleType;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Consumer;
public class TypedDataStreamParser {
public boolean hasNext(InputStream in) throws IOException {
var b = in.read();
if (b == -1) {
return false;
}
if (b != DataStructureNodeIO.TYPED_STRUCTURE_ID) {
throw new IllegalStateException("Unexpected value: " + b);
}
return true;
}
public void readStructures(InputStream in, TypedAbstractReader cb, Consumer<DataStructureNode> consumer) throws IOException {
while (hasNext(in)) {
cb.onNodeBegin();
read(in, cb, dataType);
cb.onNodeEnd();
consumer.accept(cb.create());
}
}
public DataStructureNode readStructure(InputStream in, TypedAbstractReader cb) throws IOException {
if (!hasNext(in)) {
throw new IllegalStateException("No structure to read");
}
cb.onNodeBegin();
read(in, cb, dataType);
cb.onNodeEnd();
return cb.create();
}
private void read(InputStream in, TypedDataStreamCallback cb, DataType type) throws IOException {
var b = in.read();
// Skip
if (b == DataStructureNodeIO.TYPED_STRUCTURE_ID) {
b = in.read();
}
switch (b) {
case DataStructureNodeIO.TYPED_TUPLE_ID -> {
if (!type.isTuple()) {
throw new IllegalStateException("Got tuple but expected " + type.getName());
}
var tt = (TupleType) type;
readTypedTuple(in, cb, tt);
}
case DataStructureNodeIO.TYPED_ARRAY_ID -> {
if (!type.isArray()) {
throw new IllegalStateException("Got array but expected " + type.getName());
}
var at = (ArrayType) type;
readTypedArray(in, cb, at);
}
case DataStructureNodeIO.TYPED_VALUE_ID -> {
if (!type.isValue()) {
throw new IllegalStateException("Got value but expected " + type.getName());
}
readValue(in, cb);
}
default -> throw new IllegalStateException("Unexpected value: " + b);
}
}
private void readTypedTuple(InputStream in, TypedDataStreamCallback cb, TupleType type) throws IOException {
cb.onTupleBegin(type);
for (int i = 0; i < type.getSize(); i++) {
if (type.getTypes().get(i).isWildcard()) {
var r = getGenericReader();
GenericDataStreamParser.read(in, r);
var node = r.create();
cb.onGenericNode(node);
} else {
read(in, cb, type.getTypes().get(i));
}
}
cb.onTupleEnd();
}
private DataType dataType;
private GenericDataStructureNodeReader genericReader;
public TypedDataStreamParser(DataType dataType) {
this.dataType = dataType;
}
private GenericDataStructureNodeReader getGenericReader() {
if (genericReader == null) {
genericReader = new GenericDataStructureNodeReader();
}
return genericReader;
}
private void readTypedArray(InputStream in, TypedDataStreamCallback cb, ArrayType type) throws IOException {
var size = in.read();
cb.onArrayBegin(size);
for (int i = 0; i < size; i++) {
if (type.getSharedType().isWildcard()) {
var r = getGenericReader();
GenericDataStreamParser.read(in, r);
var node = r.create();
cb.onGenericNode(node);
} else {
read(in, cb, type.getSharedType());
}
}
cb.onArrayEnd();
}
private void readValue(InputStream in, TypedDataStreamCallback cb) throws IOException {
var size = in.read();
var data = in.readNBytes(size);
cb.onValue(data);
}
}

View file

@ -1,89 +0,0 @@
package io.xpipe.core.data.typed;
import java.io.IOException;
import java.io.InputStream;
public class TypedDataStreamReader {
private static final int STRUCTURE_ID = 0;
private static final int TUPLE_ID = 1;
private static final int ARRAY_ID = 2;
private static final int VALUE_ID = 3;
public static boolean hasNext(InputStream in) throws IOException {
var b = in.read();
if (b == -1) {
return false;
}
if (b != STRUCTURE_ID) {
throw new IllegalStateException("Unexpected value: " + b);
}
return true;
}
public static void readStructures(InputStream in, TypedDataStreamCallback cb) throws IOException {
while (hasNext(in)) {
cb.onNodeBegin();
read(in, cb);
cb.onNodeEnd();
}
}
public static void readStructure(InputStream in, TypedDataStreamCallback cb) throws IOException {
if (!hasNext(in)) {
throw new IllegalStateException("No structure to read");
}
cb.onNodeBegin();
read(in, cb);
cb.onNodeEnd();
}
private static void read(InputStream in, TypedDataStreamCallback cb) throws IOException {
var b = in.read();
// Skip
if (b == STRUCTURE_ID) {
b = in.read();
}
switch (b) {
case TUPLE_ID -> {
readTuple(in, cb);
}
case ARRAY_ID -> {
readArray(in, cb);
}
case VALUE_ID -> {
readValue(in, cb);
}
default -> throw new IllegalStateException("Unexpected value: " + b);
}
}
private static void readTuple(InputStream in, TypedDataStreamCallback cb) throws IOException {
var size = in.read();
cb.onTupleBegin(size);
for (int i = 0; i < size; i++) {
read(in, cb);
}
cb.onTupleEnd();
}
private static void readArray(InputStream in, TypedDataStreamCallback cb) throws IOException {
var size = in.read();
cb.onArrayBegin(size);
for (int i = 0; i < size; i++) {
read(in, cb);
}
cb.onArrayEnd();
}
private static void readValue(InputStream in, TypedDataStreamCallback cb) throws IOException {
var size = in.read();
var data = in.readNBytes(size);
cb.onValue(data);
}
}

View file

@ -1,33 +1,33 @@
package io.xpipe.core.data.typed;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.DataStructureNodeIO;
import io.xpipe.core.data.generic.GenericDataStreamWriter;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.SimpleTupleNode;
import io.xpipe.core.data.node.ValueNode;
import io.xpipe.core.data.type.ArrayType;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.type.TupleType;
import java.io.IOException;
import java.io.OutputStream;
public class TypedDataStreamWriter {
private static final int STRUCTURE_ID = 0;
private static final int TUPLE_ID = 1;
private static final int ARRAY_ID = 2;
private static final int VALUE_ID = 3;
public static void writeStructure(OutputStream out, DataStructureNode node) throws IOException {
out.write(STRUCTURE_ID);
write(out, node);
public static void writeStructure(OutputStream out, DataStructureNode node, DataType type) throws IOException {
out.write(DataStructureNodeIO.TYPED_STRUCTURE_ID);
write(out, node, type);
}
private static void write(OutputStream out, DataStructureNode node) throws IOException {
if (node.isTuple()) {
writeTuple(out, (SimpleTupleNode) node);
private static void write(OutputStream out, DataStructureNode node, DataType type) throws IOException {
if (type.isTuple() && node.isTuple()) {
writeTuple(out, (SimpleTupleNode) node, (TupleType) type);
}
else if (node.isArray()) {
writeArray(out, (ArrayNode) node);
else if (node.isArray() && type.isArray()) {
writeArray(out, (ArrayNode) node, (ArrayType) type);
}
else if (node.isValue()) {
else if (node.isValue() && type.isValue()) {
writeValue(out, (ValueNode) node);
} else {
throw new AssertionError();
@ -35,24 +35,35 @@ public class TypedDataStreamWriter {
}
private static void writeValue(OutputStream out, ValueNode n) throws IOException {
out.write(VALUE_ID);
out.write(DataStructureNodeIO.TYPED_VALUE_ID);
out.write(n.getRawData().length);
out.write(n.getRawData());
}
private static void writeTuple(OutputStream out, SimpleTupleNode tuple) throws IOException {
out.write(TUPLE_ID);
out.write(tuple.size());
private static void writeTuple(OutputStream out, SimpleTupleNode tuple, TupleType type) throws IOException {
if (tuple.size() != type.getSize()) {
throw new IllegalArgumentException("Tuple size mismatch");
}
out.write(DataStructureNodeIO.TYPED_TUPLE_ID);
for (int i = 0; i < tuple.size(); i++) {
write(out, tuple.at(i));
if (!type.getTypes().get(i).isWildcard()) {
write(out, tuple.at(i), type.getTypes().get(i));
} else {
GenericDataStreamWriter.write(out, tuple);
}
}
}
private static void writeArray(OutputStream out, ArrayNode array) throws IOException {
out.write(ARRAY_ID);
private static void writeArray(OutputStream out, ArrayNode array, ArrayType type) throws IOException {
out.write(DataStructureNodeIO.TYPED_ARRAY_ID);
out.write(array.size());
for (int i = 0; i < array.size(); i++) {
write(out, array.at(i));
if (!type.getSharedType().isWildcard()) {
write(out, array.at(i), type.getSharedType());
} else {
GenericDataStreamWriter.write(out, array.at(i));
}
}
}
}

View file

@ -9,11 +9,12 @@ import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.type.callback.DataTypeCallback;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
public class TypedDataStructureNodeReader implements TypedDataStreamCallback {
public class TypedDataStructureNodeReader implements TypedAbstractReader {
private int currentDataTypeIndex;
private final List<DataType> flattened;
@ -22,6 +23,7 @@ public class TypedDataStructureNodeReader implements TypedDataStreamCallback {
private DataStructureNode readNode;
private boolean initialized;
private int arrayDepth;
private boolean makeImmutable;
public TypedDataStructureNodeReader(DataType type) {
flattened = new ArrayList<>();
@ -39,6 +41,11 @@ public class TypedDataStructureNodeReader implements TypedDataStreamCallback {
readNode = null;
}
@Override
public boolean isDone() {
return readNode != null;
}
public DataStructureNode create() {
return readNode;
}
@ -104,11 +111,15 @@ public class TypedDataStructureNodeReader implements TypedDataStreamCallback {
}
@Override
public void onTupleBegin(int size) {
if (flattened.size() == currentDataTypeIndex) {
int a = 0;
public void onGenericNode(DataStructureNode node) {
children.peek().add(node);
if (!isInArray()) {
currentDataTypeIndex++;
}
}
@Override
public void onTupleBegin(TupleType type) {
if (!isInArray() && !flattened.get(currentDataTypeIndex).isTuple()) {
throw new IllegalStateException();
}
@ -133,7 +144,7 @@ public class TypedDataStructureNodeReader implements TypedDataStreamCallback {
}
@Override
public void onArrayBegin(int size) {
public void onArrayBegin(int size) throws IOException {
if (!flattened.get(currentDataTypeIndex).isArray()) {
throw new IllegalStateException();
}

View file

@ -0,0 +1,161 @@
package io.xpipe.core.data.typed;
import io.xpipe.core.data.DataStructureNode;
import io.xpipe.core.data.node.ValueNode;
import io.xpipe.core.data.type.DataType;
import io.xpipe.core.data.type.TupleType;
import io.xpipe.core.data.type.callback.DataTypeCallback;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
public class TypedReusableDataStructureNodeReader implements TypedDataStreamCallback {
private TypedDataStructureNodeReader initialReader;
private DataStructureNode node;
private final List<DataType> flattened;
private Stack<Integer> indices;
private int arrayDepth;
public TypedReusableDataStructureNodeReader(DataType type) {
flattened = new ArrayList<>();
indices = new Stack<>();
initialReader = new TypedDataStructureNodeReader(type);
type.traverseType(DataTypeCallback.flatten(d -> flattened.add(d)));
}
public DataStructureNode create() {
return node;
}
private boolean isInArray() {
return arrayDepth >= 1;
}
private boolean initialized() {
return node != null;
}
@Override
public void onValue(byte[] data) {
if (!initialized()) {
initialReader.onValue(data);
return;
}
if (isInArray()) {
getCurrentParent().set(indices.peek(), ValueNode.wrap(data));
indices.push(indices.pop() + 1);
} else {
getCurrent().setRawData(data);
indices.push(indices.pop() + 1);
}
}
@Override
public void onGenericNode(DataStructureNode node) {
if (!initialized()) {
initialReader.onGenericNode(node);
return;
}
if (isInArray()) {
getCurrentParent().set(indices.peek(), node);
indices.push(indices.pop() + 1);
} else {
getCurrent().set(indices.peek(), node);
indices.push(indices.pop() + 1);
}
}
private DataStructureNode getCurrentParent() {
var current = node;
for (var index : indices.subList(0, indices.size() - 1)) {
current = current.at(index);
}
return current;
}
private DataStructureNode getCurrent() {
var current = node;
for (var index : indices) {
current = current.at(index);
}
return current;
}
private void setValue(byte[] data) {
var current = node;
for (var index : indices) {
current = current.at(index);
}
var value = (ValueNode) current;
value.setRawData(data);
}
@Override
public void onTupleBegin(TupleType type) {
if (!initialized()) {
initialReader.onTupleBegin(type);
return;
}
indices.push(0);
}
@Override
public void onTupleEnd() {
if (!initialized()) {
initialReader.onTupleEnd();
return;
}
indices.pop();
if (!indices.isEmpty()) {
indices.push(indices.pop() + 1);
}
}
@Override
public void onArrayBegin(int size) throws IOException {
if (!initialized()) {
initialReader.onArrayBegin(size);
return;
}
getCurrent().clear();
indices.push(0);
arrayDepth++;
}
@Override
public void onArrayEnd() {
if (!initialized()) {
initialReader.onArrayEnd();
return;
}
indices.pop();
arrayDepth--;
indices.push(indices.pop() + 1);
}
@Override
public void onNodeBegin() {
if (!initialized()) {
initialReader.onNodeBegin();
return;
}
}
@Override
public void onNodeEnd() {
if (!initialized()) {
initialReader.onNodeEnd();
node = initialReader.create();
}
}
}

View file

@ -3,7 +3,7 @@ package io.xpipe.core.source;
import io.xpipe.core.data.DataStructureNodeAcceptor;
import io.xpipe.core.data.node.ArrayNode;
import io.xpipe.core.data.node.SimpleTupleNode;
import io.xpipe.core.data.node.TupleNode;
import io.xpipe.core.data.type.TupleType;
import java.io.OutputStream;
@ -14,7 +14,7 @@ public interface TableDataReadConnection extends DataSourceConnection {
int determineRowCount() throws Exception;
void withLines(DataStructureNodeAcceptor<SimpleTupleNode> lineAcceptor) throws Exception;
void withLines(DataStructureNodeAcceptor<TupleNode> lineAcceptor) throws Exception;
ArrayNode readLines(int maxLines) throws Exception;

View file

@ -1,8 +1,8 @@
package io.xpipe.core.test;
import io.xpipe.core.data.generic.GenericDataStreamReader;
import io.xpipe.core.data.generic.GenericDataStreamParser;
import io.xpipe.core.data.generic.GenericDataStreamWriter;
import io.xpipe.core.data.generic.GenericDataStructureReader;
import io.xpipe.core.data.generic.GenericDataStructureNodeReader;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -25,8 +25,8 @@ public class GenericDataStructureIoTest {
var format = HexFormat.of().withPrefix("0x").withDelimiter(" ");
System.out.println(format.formatHex(data));
var reader = new GenericDataStructureReader();
GenericDataStreamReader.read(new ByteArrayInputStream(data), reader);
var reader = new GenericDataStructureNodeReader();
GenericDataStreamParser.read(new ByteArrayInputStream(data), reader);
var node = reader.create();
Assertions.assertEquals(obj, node);

View file

@ -1,8 +1,9 @@
package io.xpipe.core.test;
import io.xpipe.core.data.typed.TypedDataStreamReader;
import io.xpipe.core.data.typed.TypedDataStreamParser;
import io.xpipe.core.data.typed.TypedDataStreamWriter;
import io.xpipe.core.data.typed.TypedDataStructureNodeReader;
import io.xpipe.core.data.typed.TypedReusableDataStructureNodeReader;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -20,17 +21,41 @@ public class TypedDataStructureIoTest {
var obj = createTestData();
var type = obj.getDataType();
var dataOut = new ByteArrayOutputStream();
TypedDataStreamWriter.writeStructure(dataOut, obj);
TypedDataStreamWriter.writeStructure(dataOut, obj, type);
var data = dataOut.toByteArray();
var format = HexFormat.of().withPrefix("0x").withDelimiter(" ");
System.out.println(format.formatHex(data));
var reader = new TypedDataStructureNodeReader(type);
TypedDataStreamReader.readStructure(new ByteArrayInputStream(data), reader);
new TypedDataStreamParser(type).readStructure(new ByteArrayInputStream(data), reader);
var node = reader.create();
Assertions.assertEquals(obj, node);
System.out.println(node);
}
@Test
public void testBasicReusableIo() throws IOException {
var obj = createTestData();
var type = obj.getDataType();
var dataOut = new ByteArrayOutputStream();
TypedDataStreamWriter.writeStructure(dataOut, obj, type);
TypedDataStreamWriter.writeStructure(dataOut, obj, type);
var data = dataOut.toByteArray();
var format = HexFormat.of().withPrefix("0x").withDelimiter(" ");
System.out.println(format.formatHex(data));
var in = new ByteArrayInputStream(data);
var reader = new TypedReusableDataStructureNodeReader(type);
new TypedDataStreamParser(type).readStructure(in, reader);
var firstNode = reader.create();
new TypedDataStreamParser(type).readStructure(in, reader);
var secondNode = reader.create();
System.out.println(firstNode);
Assertions.assertEquals(obj, firstNode);
Assertions.assertEquals(obj, secondNode);
}
}