服务总线工作流程活动

fra*_*fra 19 workflow-foundation-4 async-await c#-5.0 azure-servicebus-queues

我想通过一些特定的活动访问工作流程中的服务总线队列和主题.

我找不到适合这种情况的任何东西(这篇MSDN文章Roman Kiss的这篇文章)是最接近的.

我想设计一个自定义活动,使用QueueClient异步接收代理消息,使用使用async/await模式实现的BeginReceive方法(请参阅我的问题).

首先,我想问一下,为什么我更喜欢建议的方法(改编的WCF)而不是我想要的方法(使用QueueClient).

然后,我将非常感谢以持久友好的方式设计它.

更新:

这是我到目前为止所尝试的:

public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
    [RequiredArgument]
    public InArgument<string> ConnectionString { get; set; }

    [RequiredArgument]
    public InArgument<string> Path { get; set; }

    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var connectionString = this.ConnectionString.Get(context);
        var path = this.Path.Get(context);
        var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
        var cts = new CancellationTokenSource();
        context.UserState = new ReceiveState
                                {
                                    CancellationTokenSource = cts,
                                    QueueClient = queueClient
                                };
        var task = ExecuteAsync(context, cts.Token);
        var tcs = new TaskCompletionSource<BrokeredMessage>(state);
        task.ContinueWith(
            t =>
                {
                    if (t.IsFaulted)
                    {
                        tcs.TrySetException(t.Exception.InnerExceptions);
                    }
                    else if (t.IsCanceled)
                    {
                        tcs.TrySetCanceled();
                    }
                    else
                    {
                        tcs.TrySetResult(t.Result);
                    }

                    if (callback != null)
                    {
                        callback(tcs.Task);
                    }
                });

        return tcs.Task;
    }

    protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        var task = (Task<BrokeredMessage>)result;
        try
        {
            return task.Result;
        }
        catch (OperationCanceledException)
        {
            if (context.IsCancellationRequested)
            {
                context.MarkCanceled();
            }
            else
            {
                throw;
            }

            return null; // or throw?
        }
        catch (AggregateException exception)
        {
            if (exception.InnerException is OperationCanceledException)
            {
                if (context.IsCancellationRequested)
                {
                    context.MarkCanceled();
                }
                else
                {
                    throw;
                }

                return null; // or throw?
            }

            ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
            throw;
        }
    }

    protected override void Cancel(AsyncCodeActivityContext context)
    {
        var state = (ReceiveState)context.UserState;
        state.CancellationTokenSource.Cancel();
    }

    private async Task<BrokeredMessage> ExecuteAsync(
        AsyncCodeActivityContext context, CancellationToken cancellationToken)
    {
        var receiveState = context.UserState as ReceiveState;
        var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
            receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
        var completionTask = receiveTask.ContinueWith(
             t =>
                 {
                     BrokeredMessage result;
                     if (t.IsCanceled)
                     {
                         context.MarkCanceled();
                         result = null;
                     }
                     else if (t.IsFaulted)
                     {
                         result = null;
                     }
                     else
                     {

                         t.Result.Complete();
                         result = t.Result;
                     }

                     receiveState.QueueClient.Close();
                     return result;
                 },
             cancellationToken);
        return await completionTask;
    }

    private class ReceiveState
    {
        public CancellationTokenSource CancellationTokenSource { get; set; }

        public QueueClient QueueClient { get; set; }
    }
}
Run Code Online (Sandbox Code Playgroud)

并以这种方式测试(使用本地Windows Server Service Bus):

var connectionString = new Variable<string>
                                   {
                                       Default = connectionStringValue
                                   };
        var path = new Variable<string>
                       {
                           Default = pathValue
                       };
        var test = new While
                       {
                           Body =
                               new Pick
                                   {
                                       Branches =
                                           {
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new AsyncReceiveBrokeredMessage
                                                               {
                                                                   ConnectionString = new InArgument<string>(connectionString),
                                                                   Path = new InArgument<string>(path)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Received message"
                                                               }
                                                   },
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new Delay
                                                               {
                                                                   Duration = TimeSpan.FromSeconds(10)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Timeout!"
                                                               }
                                                   }
                                           }
                                   },
                           Condition = true,
                           Variables = { connectionString, path }
                       };
        WorkflowInvoker.Invoke(test);
Run Code Online (Sandbox Code Playgroud)

如果我不断发送消息,我会按预期收到消息.第一次超时后出现问题,因为那时我没有收到任何消息.任何澄清表示赞赏.

Pet*_*tar 0

队列实体提供以下功能:“指定消息添加到队列的时间的能力。”

超时后你可能收不到,因为这个规则?

五月份的决议是:

检测入站消息重复,允许客户端多次发送相同的消息,而不会产生不良后果。