fra*_*fra 19 workflow-foundation-4 async-await c#-5.0 azure-servicebus-queues
我想通过一些特定的活动访问工作流程中的服务总线队列和主题.
我找不到适合这种情况的任何东西(这篇MSDN文章和Roman Kiss的这篇文章)是最接近的.
我想设计一个自定义活动,使用QueueClient异步接收代理消息,使用使用async/await模式实现的BeginReceive方法(请参阅我的问题).
首先,我想问一下,为什么我更喜欢建议的方法(改编的WCF)而不是我想要的方法(使用QueueClient).
然后,我将非常感谢以持久友好的方式设计它.
更新:
这是我到目前为止所尝试的:
public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
[RequiredArgument]
public InArgument<string> ConnectionString { get; set; }
[RequiredArgument]
public InArgument<string> Path { get; set; }
protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
{
var connectionString = this.ConnectionString.Get(context);
var path = this.Path.Get(context);
var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
var cts = new CancellationTokenSource();
context.UserState = new ReceiveState
{
CancellationTokenSource = cts,
QueueClient = queueClient
};
var task = ExecuteAsync(context, cts.Token);
var tcs = new TaskCompletionSource<BrokeredMessage>(state);
task.ContinueWith(
t =>
{
if (t.IsFaulted)
{
tcs.TrySetException(t.Exception.InnerExceptions);
}
else if (t.IsCanceled)
{
tcs.TrySetCanceled();
}
else
{
tcs.TrySetResult(t.Result);
}
if (callback != null)
{
callback(tcs.Task);
}
});
return tcs.Task;
}
protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
{
var task = (Task<BrokeredMessage>)result;
try
{
return task.Result;
}
catch (OperationCanceledException)
{
if (context.IsCancellationRequested)
{
context.MarkCanceled();
}
else
{
throw;
}
return null; // or throw?
}
catch (AggregateException exception)
{
if (exception.InnerException is OperationCanceledException)
{
if (context.IsCancellationRequested)
{
context.MarkCanceled();
}
else
{
throw;
}
return null; // or throw?
}
ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
throw;
}
}
protected override void Cancel(AsyncCodeActivityContext context)
{
var state = (ReceiveState)context.UserState;
state.CancellationTokenSource.Cancel();
}
private async Task<BrokeredMessage> ExecuteAsync(
AsyncCodeActivityContext context, CancellationToken cancellationToken)
{
var receiveState = context.UserState as ReceiveState;
var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
var completionTask = receiveTask.ContinueWith(
t =>
{
BrokeredMessage result;
if (t.IsCanceled)
{
context.MarkCanceled();
result = null;
}
else if (t.IsFaulted)
{
result = null;
}
else
{
t.Result.Complete();
result = t.Result;
}
receiveState.QueueClient.Close();
return result;
},
cancellationToken);
return await completionTask;
}
private class ReceiveState
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public QueueClient QueueClient { get; set; }
}
}
Run Code Online (Sandbox Code Playgroud)
并以这种方式测试(使用本地Windows Server Service Bus):
var connectionString = new Variable<string>
{
Default = connectionStringValue
};
var path = new Variable<string>
{
Default = pathValue
};
var test = new While
{
Body =
new Pick
{
Branches =
{
new PickBranch
{
Trigger =
new AsyncReceiveBrokeredMessage
{
ConnectionString = new InArgument<string>(connectionString),
Path = new InArgument<string>(path)
},
Action =
new WriteLine
{
Text =
"Received message"
}
},
new PickBranch
{
Trigger =
new Delay
{
Duration = TimeSpan.FromSeconds(10)
},
Action =
new WriteLine
{
Text =
"Timeout!"
}
}
}
},
Condition = true,
Variables = { connectionString, path }
};
WorkflowInvoker.Invoke(test);
Run Code Online (Sandbox Code Playgroud)
如果我不断发送消息,我会按预期收到消息.第一次超时后出现问题,因为那时我没有收到任何消息.任何澄清表示赞赏.
队列实体提供以下功能:“指定消息添加到队列的时间的能力。”
超时后你可能收不到,因为这个规则?
五月份的决议是:
检测入站消息重复,允许客户端多次发送相同的消息,而不会产生不良后果。