spa*_*ker 6 java multithreading json gson
我需要找到一个解决方案来处理包含 100 万个元素的 Json 数组并尽快写入输出。我选择线程来并发处理数据。但最棘手的部分是我需要按照收到的顺序将数据写入输出。让我用例子来解释我的问题。
假设我有 Json 数组作为输入,它有 10 个元素。我需要先检查每个整数是偶数还是奇数,然后如果是偶数,则每个整数生成 2 行,如果是奇数,则每个整数生成 3 行。该行是格式
序列号_整数
而序列号为每一行递增。下面是产生 10 行输出的 4 个元素的 Json 数组的示例。我正在使用
格森
解析和迭代json数组
[ 1, 2, 3, 4 ]
Run Code Online (Sandbox Code Playgroud)
我对并发编程很陌生,但我尝试了自己并设法使其产生结果。下面是我的示例代码。
import com.google.gson.stream.JsonReader;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class SampleCheck {
public static void main(String[] args) throws IOException, InterruptedException {
String jsonStr = "[ 1, 2, 3, 4 ]";
JsonReader jsonReader = new JsonReader(new StringReader(jsonStr));
processJsonArray(jsonReader);
}
private static void processJsonArray(JsonReader jsonReader) throws InterruptedException, IOException {
String newLine = System.getProperty("line.separator");
AtomicInteger writeIndex = new AtomicInteger(0);
AtomicBoolean stop = new AtomicBoolean(false);
ExecutorService executorService = Executors.newFixedThreadPool(4);
ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<>(100);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(byteArrayOutputStream);
for (int i = 0; i < 4; i++) {
executorService.submit(() -> {
StringBuilder sb = new StringBuilder(5);
while (!(stop.get() && queue.isEmpty())) {
Data data = queue.poll();
if (data == null) {
continue;
}
try {
int seq = data.getSeq();
String result = newLine;
if (data.getData() % 2 == 0) { //Even
result += seq++ + "_" + data.getData();
result += newLine;
result += seq + "_" + data.getData();
} else { //odd
result += seq++ + "_" + data.getData();
result += newLine;
result += seq++ + "_" + data.getData();
result += newLine;
result += seq + "_" + data.getData();
}
while (data.getIndex() > writeIndex.get()) {
//Do nothing and wait for other threads to complete
}
out.writeBytes(result);
writeIndex.incrementAndGet();
} catch (Exception ignore) {
}
}
});
}
int seq = 1;
int index = 0;
jsonReader.beginArray();
while (true) {
if(jsonReader.hasNext()) {
int data = jsonReader.nextInt();
queue.add(new Data(data, index, seq));
index++;
seq += (data % 2) == 0 ? 2 : 3;
} else {
break;
}
}
stop.set(true);
executorService.shutdown();
executorService.awaitTermination(20, TimeUnit.MINUTES);
out.close();
System.out.println(new String(byteArrayOutputStream.toByteArray()));
}
private static class Data {
private int data;
private int index;
private int seq;
public Data(int data, int index, int seq) {
this.data = data;
this.index = index;
this.seq = seq;
}
public int getData() {
return data;
}
public int getIndex() {
return index;
}
public int getSeq() {
return seq;
}
}
}
Run Code Online (Sandbox Code Playgroud)
但是我需要专家的建议以不同的方式解决这个问题并获得最大的性能。我的代码看起来非常冗长,如果我可以适应任何或任何更改以获得最大性能,我需要比这更好的解决方案。你们能帮我还是这段代码看起来不错?
PS:上面的例子是为了说明我的问题。在现实世界中,我在 zip 流中获取数据(最多 100 万个)并将这些行写入 zip 输出流
编辑:添加了更现实的例子。处理 Json 数组而不是 List。我需要processJsonArray
方法方面的帮助。在现实世界中 json 阅读器需要处理 100 万个元素
这似乎是并行流的一个很好的用例。Java 将完成拆分为单独线程并按顺序重新组装的所有艰苦工作,您根本不需要处理并发或线程。
您的代码可能很简单:
inputList.parallelStream()
.flatMap(in -> createOutputLines(in))
.forEach(out -> output(out));
Run Code Online (Sandbox Code Playgroud)
话虽如此,如果您的 IO 以外的任何其他因素对性能产生重大影响,我会感到非常惊讶。您需要对输入进行非常复杂的处理,以使其不仅仅是一个舍入误差。