我有一个异步方法,说:
public async Task<T> GetAsync()
{
}
Run Code Online (Sandbox Code Playgroud)
并将从以下位置调用:
public async Task<IEnumerable<T>> GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
yield return result;
}
}
Run Code Online (Sandbox Code Playgroud)
上面的语法无效,但基本上我是在异步生成器之后。我知道它可以通过 Observables 处理。我对 Rx.NET 进行了实验,并且在一定程度上奏效了。但我试图避免它给代码库带来的复杂性,更重要的是,上述需求本质上仍然不是一个反应式系统(我们的仍然是基于拉取的)。例如,我只会在一段时间内收听传入的异步流,并且我必须从消费者端停止生产者(而不仅仅是取消订阅消费者)。
我可以像这样反转方法签名:
public IEnumerable<Task<T>> GetAllAsync()
Run Code Online (Sandbox Code Playgroud)
但这使得在不阻塞的情况下进行 LINQ 操作有点棘手。我希望它是非阻塞的,并且不将整个内容加载到内存中。这个库:AsyncEnumerable正是我正在寻找的,但如何用Ix.NET做到这一点?它们的用途与我相信的相同。
换句话说,我如何利用 Ix.NETIAsyncEnumerable在处理await? 喜欢,
public async IAsyncEnumerable GetAllAsync()
{
foreach (var item in something)
{
var result = await GetAsync();
return // what?
}
}
Run Code Online (Sandbox Code Playgroud) 我发现自己经常编写这样的代码:
try
{
cancellationTokenSource.Cancel();
await task.ConfigureAwait(false); // this is the task that was cancelled
}
catch(OperationCanceledException)
{
// Cancellation expected and requested
}
Run Code Online (Sandbox Code Playgroud)
鉴于我请求取消,这是预料之中的,而且我真的希望忽略该异常。这似乎是一个常见的案例。
有没有更简洁的方法来做到这一点?我是否错过了有关取消的信息?看来应该有什么task.CancellationExpected()方法什么的。
等待发送非必要指标对我来说毫无意义,因为它会在每次调用后端 dotnet 核心服务时增加延迟(等待服务器的响应),每次客户端调用可能会发生多次。尽管如此,在失败时记录错误仍然很重要(我不需要抛出它,因为指标的失败不应该影响服务)。
我找到了多种方法。这是以 FireAndForget 方式调用的方法:
public async Task FireAndForget()
{
try{ await sendTheMetric(); } // Try catch on the await, to log exceptions
catch(Exception e){ logger.debug(e) }
}
Run Code Online (Sandbox Code Playgroud)
方法一:去掉await。
FireAndForget(); // No await in front of the call
Run Code Online (Sandbox Code Playgroud)
方法 2:对我来说似乎类似于方法 1,因为我们不等待 Task.Factory.StartNew 调用。
Task.Factory.StartNew(async () => await MyAsyncTask());
Run Code Online (Sandbox Code Playgroud)
方法 3:作为工作项在 ThreadPool 上排队。
ThreadPool.QueueUserWorkItem(async o => await FireAndForget());
Run Code Online (Sandbox Code Playgroud)
我很难找到应该将哪个用于 Fire 并忘记调用发送非必要指标。我的目标是在每次发送指标时不增加对我的服务的每次调用的延迟。当指标发送失败时记录错误很重要,但它不应该重新抛出。线程上下文对于任务的执行并不重要。任务应该总是,或至少几乎总是,完成。
哪个是需求的最佳实践?还是他们都一样?
注意:我没有包含async void,因为它看起来有风险(如果发生异常,它可能会崩溃,因为没有 Task 会包装它)。
我想要一种将异步方法转换为 observable 的通用方法。就我而言,我正在处理用于HttpClient从 API 获取数据的方法。
假设我们有一个方法Task<string> GetSomeData()需要成为一个单一的方法,Observable<string>其中的值是作为以下组合生成的:
GetSomeData()(例如每 x 秒)GetSomeData()在任何给定时间手动触发调用(例如当用户点击刷新时)。由于有两种方法可以触发GetSomeData()并发执行可能是一个问题。为了避免要求GetSomeData()线程安全,我想限制并发性,以便只有一个线程同时执行该方法。因此,我需要使用某种策略来处理重叠的请求。我做了一个(某种)大理石图来描述问题和想要的结果
我的直觉告诉我有一个简单的方法可以实现这一点,所以请给我一些见解:)
这是我到目前为止的解决方案。不幸的是,它并没有解决并发问题。
public class ObservableCreationWrapper<T>
{
private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
private Func<Task<T>> _methodToCall;
private IObservable<T> _manualCalls;
public IObservable<T> Stream { get; private set; }
public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
{
_methodToCall = methodToCall;
_manualCalls = _manualCallsSubject.AsObservable()
.Select(x => Observable.FromAsync(x => methodToCall()))
.Merge(1);
Stream = Observable.FromAsync(() => _methodToCall())
.DelayRepeat(period)
.Merge(_manualCalls);
}
public void TriggerAdditionalCall() …Run Code Online (Sandbox Code Playgroud) 我想并行执行操作Span<T>,但这样的操作是不合法的:
void DoSomething(Span<int> buffer, int option1, int option2)
{
.......
}
void ParallelDoSomething(Span<int> buffer)
{
var size = buffer.Length;
Parallel.Invoke(() => DoSomething(buffer, 0, size / 2),
() => DoSomething(buffer, size/2, size)); //not legal
}
Run Code Online (Sandbox Code Playgroud)
因为编译器抱怨:不能在匿名方法、lambda 表达式、查询表达式或本地函数中使用 ref、out 或 in 参数“缓冲区”
如何在以Span<T>参数为参数的并行方法中执行?
当消费时IAsyncEnumerable<T>,是否可以只从流中读取第一项,然后取消操作并返回它?像FirstOrDefaulton之类的东西IEnumerable<T>?
干杯
如果我有大量收藏并且我关心性能,我是否应该相信奇迹并使用
var min = Y.Min();
var max = Y.Max();
Run Code Online (Sandbox Code Playgroud)
或者我最好成为一名优秀的工程师并使用
var max = double.NegativeInfinity;
var min = double.PositiveInfinity;
foreach(var y in Y)
{
if(y > max)
max = y;
if(y < min)
min = y;
}
Run Code Online (Sandbox Code Playgroud)
Y是ICollection<double>,因为我需要Count和foreach。我很好奇类型是否正确,因为最小/最大,并且我需要从末尾迭代集合,所以会有
Y.OrderByDescending((o) => o)...
Run Code Online (Sandbox Code Playgroud) 我正在使用 .Net 使用一个 REST API (GET) HttpClient。我想用long polling调用这个 API 。
我有几个问题:
c# concurrency multithreading long-polling dotnet-httpclient
我有一个这种形状的函数,可以进行一维求根:
public delegate double Fun(double x, object o);
public static void Solve(Fun f, out double y, object o)
{
y = f(1.0, o); // all the irrelevant details of the algorithm omitted
}
Run Code Online (Sandbox Code Playgroud)
这是一个固定的形状,以便使算法可重用。将此视为我无法更改的固定库函数(或者至少需要保持通用性和可重用性,并且不会针对此问题的具体情况进行更改)。
我想传递一个函数,该函数需要Span<T>保存在堆栈上的外部参数以避免分配,但显然不能将Span<T>其推入对象,因为这需要装箱和拆箱。
使用 lambda 表达式,调用代码可能类似于:
void CallingMethod()
{
Span<double> k1 = stackalloc double[n];
double answer;
Solve((x, o) => Wrapper(x, k1, o), out answer, null);
}
double Wrapper(double x, ReadOnlySpan<double> k1, object o)
{
return <some function of x and k1>;
}
Run Code Online (Sandbox Code Playgroud)
但这不起作用,因为您无法Span<T> …
我目前正在测试 C# 8 的异步流,似乎当我尝试使用使用 async/await 并返回 Task> 的旧模式运行应用程序时,它似乎更快。(我使用秒表对其进行测量并尝试多次运行,结果是我提到的旧模式似乎比使用 IAsyncEnumerable 快一些)。
这是我写的一个简单的控制台应用程序(我也在想我可能以错误的方式从数据库加载数据)
class Program
{
static async Task Main(string[] args)
{
// Using the old pattern
//Stopwatch stopwatch = Stopwatch.StartNew();
//foreach (var person in await LoadDataAsync())
//{
// Console.WriteLine($"Id: {person.Id}, Name: {person.Name}");
//}
//stopwatch.Stop();
//Console.WriteLine(stopwatch.ElapsedMilliseconds);
Stopwatch stopwatch = Stopwatch.StartNew();
await foreach (var person in LoadDataAsyncStream())
{
Console.WriteLine($"Id: {person.Id}, Name: {person.Name}");
}
stopwatch.Stop();
Console.WriteLine(stopwatch.ElapsedMilliseconds);
Console.ReadKey();
}
static async Task<IEnumerable<Person>> LoadDataAsync()
{
string connectionString = "Server=localhost; Database=AsyncStreams; Trusted_Connection = True;";
var people = …Run Code Online (Sandbox Code Playgroud) c# ×10
async-await ×3
asynchronous ×3
linq ×3
performance ×2
rx.net ×2
.net ×1
.net-core ×1
cancellation ×1
concurrency ×1
long-polling ×1
sql-server ×1
task ×1