我想列举一个大IEnumerable过一次,并观察附有各运营商(枚举Count,Sum,Average等)。显而易见的方法是IObservable使用 method将其转换为 an ToObservable,然后为它订阅观察者。我注意到这比其他方法慢得多,比如做一个简单的循环并在每次迭代时通知观察者,或者使用Observable.Create方法而不是ToObservable. 差异很大:它慢了 20-30 倍。就是这样,还是我做错了什么?
using System;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
public static class Program
{
static void Main(string[] args)
{
const int COUNT = 10_000_000;
Method1(COUNT);
Method2(COUNT);
Method3(COUNT);
}
static void Method1(int count)
{
var source = Enumerable.Range(0, count);
var subject = new Subject<int>();
var stopwatch = Stopwatch.StartNew();
source.ToObservable().Subscribe(subject);
Console.WriteLine($"ToObservable: {stopwatch.ElapsedMilliseconds:#,0} msec");
}
static …Run Code Online (Sandbox Code Playgroud) 我有以下同步代码,运行良好:
private void GenerateExportOutput()
{
using StreamWriter writer = new(Coordinator.OutputDirectory + @"\export.txt");
if (this.WikiPagesToExport.IsEmpty)
{
return;
}
var wanted = new SortedDictionary<string, WikiPage>(this.WikiPagesToExport, StringComparer.Ordinal);
foreach (var title in wanted.Keys)
{
writer.WriteLine(title);
}
}
Run Code Online (Sandbox Code Playgroud)
我想将其更改为异步。所以:
private async Task GenerateExportOutputAsync()
{
using StreamWriter writer = new(Coordinator.OutputDirectory + @"\export.txt");
if (this.WikiPagesToExport.IsEmpty)
{
return;
}
var wanted = new SortedDictionary<string, WikiPage>(this.WikiPagesToExport, StringComparer.Ordinal);
foreach (var title in wanted.Keys)
{
await writer.WriteLineAsync(title).ConfigureAwait(false);
}
await writer.FlushAsync().ConfigureAwait(false);
}
Run Code Online (Sandbox Code Playgroud)
哪个编译。但我使用的分析器之一(Meziantou.Analyzer)现在建议我“更喜欢使用‘await using’”。我从来没有使用过await using(尽管我过去尝试过几次并且总是遇到现在遇到的相同问题)。但我想使用它,所以:
await using StreamWriter writer = …Run Code Online (Sandbox Code Playgroud) .NET 6 引入了PeriodicTimer.
我需要每分钟、每分钟都做点什么。例如:09:23:00,09:24:00,09:25:00, ...
但在一分钟的时间内 -new PeriodicTimer(TimeSpan.FromMinutes(1))从 开始09:23:45,我将在以下位置得到“刻度”:09:24:45, 09:25:45, 09:26:45, ...
所以这取决于开始时间。
我的解决方法是使用一秒周期,并检查当前时间的秒数是否等于0。另一种解决方法是等待下一分钟,然后启动计时器。这两种方法都有效,但都很繁琐,并且使用的分辨率太精细。
是否有内置或更好的方法在每分钟结束时触发而不是在开始后一分钟触发?
ConcurrentQueue<T>我最近使用和构建了一个消费者/生产者系统SemaphoreSlim。然后利用新System.Threading.Channel类制作了另一个替代系统。
使用 BenchmarkDotNet 对两个系统进行基准测试后,将 1000 个项目写入两个系统 1000 次(并等待读者完成),我得到以下结果:
| Method | ItemsCount | Iterations | Mean | Error | StdDev | Median | Allocated |
|------------ |----------- |----------- |------------:|------------:|------------:|------------:|-----------:|
| MyQueue | 1000 | 1000 | 19,379.4 us | 1,230.30 us | 3,569.33 us | 18,735.6 us | 8235.02 KB |
| MyChannel | 1000 | 1000 | 45,858.2 us | 1,298.42 us | 3,704.46 us | 45,689.2 us | 72.11 KB |
Run Code Online (Sandbox Code Playgroud)
实施ConcurrentQueue …
c# performance producer-consumer concurrent-queue system.threading.channels
我正在编写一个通用方法,需要在内部使用Dictionary由通用参数键控的通用方法。
我不想对通用参数本身施加任何约束,特别是没有notnull约束。但是,该方法能够显式处理空值,当然不会向它使用的字典添加任何空键。
这样做有什么最佳实践吗?
默认情况下,编译器会警告通用键类型与 的notnull约束不匹配Dictionary<,>。
请参阅下面的示例。除非我遗漏了什么,否则这应该是绝对安全的。但我怎样才能最好地将这一点传达给编译器呢?除了在这里抑制编译器警告之外,还有其他方法吗?
public static int CountMostFrequentItemOrItems<T>(params T[] values)
{
var counts = new Dictionary<T, int>(); // The type 'T' cannot be used as type parameter 'TKey' in the generic type or method 'Dictionary<TKey, TValue>'
var nullCount = 0;
foreach(var value in values)
if (value is null)
++nullCount;
else if (counts.TryGetValue(value, out var count))
counts[value] = count+1;
else
counts[value] = 1;
// ... combine results etc ...
}
Run Code Online (Sandbox Code Playgroud) 我编写了以下方法来批处理一个巨大的CSV文件.我们的想法是从文件中读取一大块行到内存中,然后将这些行分成固定大小的批量.获得分区后,将这些分区发送到服务器(同步或异步),这可能需要一段时间.
private static void BatchProcess(string filePath, int chunkSize, int batchSize)
{
List<string> chunk = new List<string>(chunkSize);
foreach (var line in File.ReadLines(filePath))
{
if (chunk.Count == chunk.Capacity)
{
// Partition each chunk into smaller chunks grouped on column 1
var partitions = chunk.GroupBy(c => c.Split(',')[0], (key, g) => g);
// Further breakdown the chunks into batch size groups
var groups = partitions.Select(x => x.Select((i, index) =>
new { i, index }).GroupBy(g => g.index / batchSize, e => e.i));
// Get batches …Run Code Online (Sandbox Code Playgroud) c# csv task-parallel-library blockingcollection tpl-dataflow
在我的应用程序中,我正在创建一些并发的Web请求,并且我对其中任何一个请求完成都感到满意,因此我使用的是Task.WhenAny:
var urls = new string[] {
"https://stackoverflow.com",
"https://superuser.com",
"https://www.reddit.com/r/chess",
};
var tasks = urls.Select(async url =>
{
using (var webClient = new WebClient())
{
return (Url: url, Data: await webClient.DownloadStringTaskAsync(url));
}
}).ToArray();
var firstTask = await Task.WhenAny(tasks);
Console.WriteLine($"First Completed Url: {firstTask.Result.Url}");
Console.WriteLine($"Data: {firstTask.Result.Data.Length:#,0} chars");
Run Code Online (Sandbox Code Playgroud)
首次完成的网址:https
://superuser.com数据:121.954个字符
我不喜欢这种实现方式,因为未完成的任务会继续下载不再需要的数据,并且浪费带宽,我希望为下一批请求保留这些带宽。因此,我正在考虑取消其他任务,但是我不确定该怎么做。我发现了如何使用CancellationToken取消特定的Web请求:
public static async Task<(string Url, string Data)> DownloadUrl(
string url, CancellationToken cancellationToken)
{
try
{
using (var webClient = new WebClient())
{
cancellationToken.Register(webClient.CancelAsync);
return (url, await webClient.DownloadStringTaskAsync(url)); …Run Code Online (Sandbox Code Playgroud) 如果可能,我想为并行启动的任务创建一个异步枚举器。所以第一个完成的是枚举的第一个元素,第二个完成的是枚举的第二个元素,依此类推。
public static async IAsyncEnumerable<T> ParallelEnumerateAsync(this IEnumerable<Task<T>> coldAsyncTasks)
{
// ...
}
Run Code Online (Sandbox Code Playgroud)
我打赌有一种使用ContinueWith和 a 的方法Queue<T>,但我并不完全相信自己会实现它。
c# asynchronous task-parallel-library async-await iasyncenumerable
假设我想发出并行 API post 请求。
在 for 循环中,我可以将 http post 调用附加到任务列表中(使用 Task.Run 调用的每个任务),然后等待所有任务完成使用await Task.WhenAll. 因此,在等待网络请求完成时,控制权将交给调用者。实际上,API 请求将并行发出。
同样,我可以使用Parallel.ForEachAsync它将自动执行WhenAll并将控制权返回给调用者。所以我想问是否ForEachAsync可以替换普通的 for 循环列表(async wait Task.Run)和WhenAll?
我参加聚会迟到了,但我最近了解到SemaphoreSlim:
我曾经用于lock同步锁定,并使用busy布尔值用于异步锁定。现在我SemaphoreSlim什么都用。
private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1, 1);
private void DoStuff()
{
semaphoreSlim.Wait();
try
{
DoBlockingStuff();
}
finally
{
semaphoreSlim.Release();
}
}
Run Code Online (Sandbox Code Playgroud)
与
private object locker = new object();
private void DoStuff()
{
lock(locker)
{
DoBlockingStuff();
}
}
Run Code Online (Sandbox Code Playgroud)
是否存在我应该更喜欢使用lockover 的同步情况SemaphoreSlim?如果有,它们是什么?
c# ×10
async-await ×4
.net ×3
asynchronous ×3
.net-6.0 ×1
csv ×1
dictionary ×1
locking ×1
nullable ×1
performance ×1
rx.net ×1
semaphore ×1
timer ×1
tpl-dataflow ×1