spe*_*der 17 c# dataflow task-parallel-library async-await tpl-dataflow
我知道......我并没有真正使用TplDataflow来发挥它的最大潜力.ATM我只是BufferBlock用作消息传递的安全队列,其中生产者和消费者以不同的速率运行.我看到一些奇怪的行为,让我难以理解如何继续.
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
Run Code Online (Sandbox Code Playgroud)
在上面的代码中(它是2000行分布式解决方案的一部分),Send每100ms左右定期调用一次.这意味着一个项目被Post编到messageQueue在约10次.这已经过验证.但是,偶尔看起来ReceiveAsync在超时内没有完成(即Post没有导致ReceiveAsync完成)并且TimeoutException在30秒后被提升.在这一点上,messageQueue.Count是数百.这是出乎意料的.这个问题已经在较慢的发布率(1个帖子/秒)中观察到,并且通常在1000个项目通过之前发生BufferBlock.
因此,要解决此问题,我使用以下代码,它可以工作,但偶尔会在接收时导致1秒延迟(由于上面发生的错误)
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
Run Code Online (Sandbox Code Playgroud)
这看起来像TDF中的竞争条件,但我无法深究为什么在我BufferBlock以类似方式使用的其他地方不会发生这种情况.从实验上改变ReceiveAsync到Receive没有帮助.我没有检查,但我想孤立地看,上面的代码完美无缺.这是我在"TPL数据流简介" tpldataflow.docx中记录的模式.
我该怎么做才能找到底线?是否有任何指标可能有助于推断正在发生的事情?如果我无法创建可靠的测试用例,我可以提供哪些更多信息?
救命!
斯蒂芬似乎认为以下是解决方案
var m =等待messageQueue.ReceiveAsync();
代替:
var m =等待messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
你能否证实或否认这一点?