你好:我想知道如何编写一个异步表迭代器。假设输入表由很多行组成,当接收到该表时,它是序列化的格式。当接收到表时,迭代器被调用以一行一行地检索。
它通过以下方式执行读取和反序列化: 1) 它首先读取关于行大小的整数并将其反序列化。2) 然后它读取并反序列化该行的内容,其中,a。时间戳首先通过调用 in.readint(), b 准备好。然后读取和反序列化该行的每个键,c。然后读取和反序列化有关非键列的位图字符串。d. 然后调用 in.readint() 读取并反序列化表示非键列数的整数,然后读取并反序列化每个非键列。3) 最后它读取并反序列化文件结束标记,该标记指示是否到达文件末尾。
最后它返回反序列化的行。
这是代码
enter code here
public Row next() {
/* It first reads the integer about the size of the row and
deserialize it. */
int size = in.readInt();
/*Then it reads and deserialize the contents of the row*/
Row row = Row.deserialize(descriptor, in);
/*Finally it reads and deserializes the file end marker, which
indicates if the end of the file is reached.*/
int signal = in.readInt();
if (signal == FILE.END) {
file_end = true;
return row;
}
return row;
}
public Row deserialize(DataInput in) throws IOException {
/*timestamp is first ready by calling in.readint()*/
long timestamp= in.readLong();
Object[] Key = new Object[KeyColumns().size()];
Map<Column, Object> columns = new HashMap<>();
/*then each key of the row is read and deserialized */
int i = 0;
for (Column<?> col : KeyColumns()) {
Key[i++] = col.type.deserialize(in);
}
/* then the bitmap string about the non-key columns is read and
deserialized. */
int bitstring= in.readInt();
/*then calls in.readint() to read and deserialize the integer which
represents the number of non-key columns, and then it reads and
deserialize each non-key column.*/
i = 0;
for (Column<?> col : rowColumns()) {
if ((bitstring & (1 << i)) != 0){
columns.put(col, col.type.deserialize(in));
}
i++;
}
return new Row(timestamp, Key, columns);
}
Run Code Online (Sandbox Code Playgroud)
为了将此迭代器转换为异步迭代器,我正在考虑在 Java 8 中使用 CompletableFuture 并将读取与反序列化分离。也就是说,使用单独的 and 来处理读取,如下所示
public Row next() {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
int size= 0;
try {
size = in.readInt();
} catch (IOException e) {
e.printStackTrace();
}
});
Row row = Row.deserialize(descriptor, in);
int signal = in.readInt();
if (signal == FILE.END) {
file_end = true;
return row;
}
return row;
}
Run Code Online (Sandbox Code Playgroud)
但在我看来,因为执行“size = in.readInt();”的线程 以及执行“Row row = Row.deserialize(descriptor, in);”的主线程 共享同一个流。他们需要一个接一个地发生。仍然没有实现并行性。有没有更好的方法来实现这个异步迭代器?谢谢。
首先,您的DataInput核心有一个阻塞资源 ( )。因此,无论您做什么,您都会在阅读时同步DataInput。
在 Java 8 中我肯定会用流来实现这个。请参阅以下问题:
最简单的方法是实现 aSpliterator并使用它创建一个流StreamSupport.stream(...)。在 a 中,Spliterator您主要只需要实现tryAdvance基本上是“读取下一行”例程的方法。在那里您需要同步读取DataInput。
一旦你拥有了你的,你Stream<Row>将能够使用map或forEach等对其应用不同的功能。
要实现并行性,您需要trySplit在Spliterator. 问题来了:如果你不能并行读取DataInput,那么分割不会给你带来太多。Spliterator但我仍然认为创建一个新的实例并在读取时同步它们是有意义的DataInput。读取不会被并行化,但进一步的处理可能会被并行化(在并行流中)。