Ric*_*ter 7 c# .net-core c#-8.0 .net-core-3.0 system.text.json
可以说,我请求一个包含许多对象列表的大型json文件。我不希望它们一次全部出现在内存中,但我宁愿一个个地读取和处理它们。所以我需要将异步System.IO.Stream流转换为IAsyncEnumerable<T>。我如何使用新的System.Text.JsonAPI来做到这一点?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
Run Code Online (Sandbox Code Playgroud)
Pan*_*vos 11
TL;DR这不是微不足道的
看起来有人已经 发布了一个Utf8JsonStreamReader结构的完整代码,该结构从流中读取缓冲区并将它们提供给 Utf8JsonRreader,允许使用JsonSerializer.Deserialize<T>(ref newJsonReader, options);. 代码也不是微不足道的。相关问题在这里,答案在这里。
但这还不够 -HttpClient.GetAsync只有在收到整个响应后才会返回,基本上是在内存中缓冲所有内容。
为了避免这种情况,HttpClient.GetAsync(string,HttpCompletionOption )应该与HttpCompletionOption.ResponseHeadersRead.
反序列化循环也应该检查取消令牌,如果有信号则退出或抛出。否则循环将继续,直到整个流被接收和处理。
此代码基于相关答案的示例,并使用HttpCompletionOption.ResponseHeadersRead和检查取消令牌。它可以解析包含适当项目数组的 JSON 字符串,例如:
[{"prop1":123},{"prop1":234}]
Run Code Online (Sandbox Code Playgroud)
第一次调用jsonStreamReader.Read()移动到数组的开头,而第二次调用移动到第一个对象的开头。当]检测到数组 ( ) 的结尾时,循环本身终止。
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
//Don't cache the entire response
using var httpResponse = await httpClient.GetAsync(url,
HttpCompletionOption.ResponseHeadersRead,
cancellationToken);
using var stream = await httpResponse.Content.ReadAsStreamAsync();
using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);
jsonStreamReader.Read(); // move to array start
jsonStreamReader.Read(); // move to start of the object
while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
{
//Gracefully return if cancellation is requested.
//Could be cancellationToken.ThrowIfCancellationRequested()
if(cancellationToken.IsCancellationRequested)
{
return;
}
// deserialize object
var obj = jsonStreamReader.Deserialize<T>();
yield return obj;
// JsonSerializer.Deserialize ends on last token of the object parsed,
// move to the first token of next object
jsonStreamReader.Read();
}
}
Run Code Online (Sandbox Code Playgroud)
JSON 片段,又名流 JSON 又名 ...*
在事件流或日志记录场景中,将单个 JSON 对象附加到文件中是很常见的,每行一个元素,例如:
{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}
Run Code Online (Sandbox Code Playgroud)
这不是有效的 JSON文档,但各个片段是有效的。这对于大数据/高并发场景有几个优势。添加新事件只需要在文件中追加一个新行,而不需要解析和重建整个文件。处理,尤其是并行处理更容易,原因有两个:
使用 StreamReader
执行此操作的分配方式是使用 TextReader,一次读取一行并使用JsonSerializer.Deserialize解析它:
using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken
while((line=await reader.ReadLineAsync()) != null)
{
var item=JsonSerializer.Deserialize<T>(line);
yield return item;
if(cancellationToken.IsCancellationRequested)
{
return;
}
}
Run Code Online (Sandbox Code Playgroud)
这比反序列化正确数组的代码简单得多。有两个问题:
ReadLineAsync 不接受取消令牌这可能就足够了,因为尝试生成ReadOnlySpan<Byte>JsonSerializer.Deserialize 所需的缓冲区并非易事。
管道和 SequenceReader
为了避免分配,我们需要ReadOnlySpan<byte>从流中获取 a 。这样做需要使用 System.IO.Pipeline 管道和SequenceReader结构。Steve Gordon 的An Introduction to SequenceReader解释了如何使用这个类使用分隔符从流中读取数据。
不幸的是,SequenceReader是一个 ref 结构,这意味着它不能在异步或本地方法中使用。这就是为什么史蒂夫戈登在他的文章中创建了一个
private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
Run Code Online (Sandbox Code Playgroud)
读取项的方法形成 ReadOnlySequence 并返回结束位置,因此 PipeReader 可以从中恢复。不幸的是,我们想要返回 IEnumerable 或 IAsyncEnumerable,并且迭代器方法也不喜欢in或out参数。
我们可以在 List 或 Queue 中收集反序列化的项目并将它们作为单个结果返回,但这仍然会分配列表、缓冲区或节点,并且必须等待缓冲区中的所有项目在返回之前被反序列化:
private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)
Run Code Online (Sandbox Code Playgroud)
我们需要一些像可枚举一样的东西,不需要迭代器方法,使用异步并且不缓冲所有东西。
添加频道以生成 IAsyncEnumerable
ChannelReader.ReadAllAsync返回一个 IAsyncEnumerable。我们可以从不能作为迭代器工作的方法返回一个 ChannelReader 并且仍然产生一个没有缓存的元素流。
调整 Steve Gordon 的代码以使用通道,我们得到 ReadItems(ChannelWriter...) 和ReadLastItem方法。第一个,一次读取一个项目,使用ReadOnlySpan<byte> itemBytes. 这可以由JsonSerializer.Deserialize. 如果ReadItems找不到分隔符,它将返回其位置,以便 PipelineReader 可以从流中提取下一个块。
当我们到达最后一个块并且没有其他分隔符时, ReadLastItem` 读取剩余的字节并反序列化它们。
代码几乎与 Steve Gordon 的相同。我们不是写入控制台,而是写入 ChannelWriter。
private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;
private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence,
bool isCompleted, CancellationToken token)
{
var reader = new SequenceReader<byte>(sequence);
while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
{
if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
{
var item=JsonSerializer.Deserialize<T>(itemBytes);
writer.TryWrite(item);
}
else if (isCompleted) // read last item which has no final delimiter
{
var item = ReadLastItem<T>(sequence.Slice(reader.Position));
writer.TryWrite(item);
reader.Advance(sequence.Length); // advance reader to the end
}
else // no more items in this sequence
{
break;
}
}
return reader.Position;
}
private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
var length = (int)sequence.Length;
if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
{
Span<byte> byteBuffer = stackalloc byte[length];
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
else // otherwise we'll rent an array to use as the buffer
{
var byteBuffer = ArrayPool<byte>.Shared.Rent(length);
try
{
sequence.CopyTo(byteBuffer);
var item=JsonSerializer.Deserialize<T>(byteBuffer);
return item;
}
finally
{
ArrayPool<byte>.Shared.Return(byteBuffer);
}
}
}
Run Code Online (Sandbox Code Playgroud)
该DeserializeToChannel<T>方法在流的顶部创建一个管道读取器,创建一个通道并启动一个解析块并将它们推送到通道的工作任务:
ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
var pipeReader = PipeReader.Create(stream);
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
while (!token.IsCancellationRequested)
{
var result = await pipeReader.ReadAsync(token); // read from the pipe
var buffer = result.Buffer;
var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer
if (result.IsCompleted)
break; // exit if we've read everything from the pipe
pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
}
pipeReader.Complete();
},token)
.ContinueWith(t=>{
pipeReader.Complete();
writer.TryComplete(t.Exception);
});
return channel.Reader;
}
Run Code Online (Sandbox Code Playgroud)
ChannelReader.ReceiveAllAsync()可用于通过以下方式消耗所有物品IAsyncEnumerable<T>:
var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
//Do something with it
}
Run Code Online (Sandbox Code Playgroud)
是的,在很多地方,真正的流式 JSON(反)序列化器将是一个很好的性能改进。
不幸的是,此时System.Text.Json不这样做。我不确定将来是否会 - 我希望如此!事实证明,JSON 的真正流式反序列化相当具有挑战性。
也许您可以检查极快的Utf8Json 是否支持它。
但是,可能有针对您的特定情况的自定义解决方案,因为您的要求似乎限制了难度。
这个想法是一次从数组中手动读取一个项目。我们正在利用列表中的每个项目本身就是一个有效的 JSON 对象这一事实。
您可以手动跳过[(对于第一项)或,(对于每个下一项)。然后我认为最好的办法是使用 .NET CoreUtf8JsonReader来确定当前对象的结束位置,并将扫描的字节提供给JsonDeserializer.
这样,您一次只对一个对象稍加缓冲。
既然我们在谈论性能,你可以从 a 获得输入PipeReader,而你正在这样做。:-)
| 归档时间: |
|
| 查看次数: |
266 次 |
| 最近记录: |