在 Cadence/Temporal 工作流程中处理信号的最佳方式/模式是什么

Lon*_*eng 5 cadence-workflow temporal-workflow uber-cadence

当像文档建议那样使用信号时:

public class MyWorkflow{
   public Output myWorkflwMethod(Input input){
      ...
   }

   public void mySignalMethod(request){
     // do actual processing here. 
     ...
   }
}
Run Code Online (Sandbox Code Playgroud)

我可能会遇到以下问题:

  1. 我想保证 FIFO 一次处理一个(在同一信号名称内或跨所有信号名称)
  2. 我想处理signalWithStart 的“竞争条件”,其中信号方法调用得太早
  3. 我想安全地重置工作流程。重置后,信号可以在历史早期重新应用
  4. 我想确保工作流程不会在处理信号之前提前完成

Lon*_*eng 4

    \n
  1. 保证 FIFO 按顺序一次处理一个
  2. \n
  3. 处理 signalWithStart 的“竞争条件”,其中信号方法调用过早。或者实际上,对于没有 signalWithStart 的常规信号,信号可能在工作流准备好处理之前来得太早。
  4. \n
  5. 可以安全地重置工作流程。重置后,信号可以在历史早期重新应用
  6. \n
  7. 确保工作流程不会在处理信号之前提前完成
  8. \n
  9. 对于所有信号名称的 FIFO 以避免竞争情况,可以使用 Queue 的同一个队列来存储所有信号,并使用instance of与强制转换一起使用
  10. \n
\n

这四个是在 Cadence/Temporal 工作流程中使用信号时最常见的错误。

\n

您可以应用一种设计模式来共同解决所有问题。

\n

这个想法是简化信号处理程序以始终将信号放入队列中,并且工作流方法将启动另一个工作流线程来处理队列。

\n

它基于样本(节奏时间

\n

爪哇

\n
public class MyWorkflow{\n   private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); \n\n   public void mySignalMethod(SignalRequest req){\n       signalRequestQueue.add(req);\n   }\n\n   public Output myWorkflwMethod(Input input){\n      //1. do everything necessary/needed before actually processing a signal\n      ...\n\n      //2. spin up a workflow thread to process \n      Async.procedure(\n      () -> {\n          while (true) {\n              Workflow.await(() -> !signalRequestQueue.isEmpty());\n              final SignalRequest request = signalRequestQueue.poll();\n              processSignal(request);\n          }\n      });\n\n\n      //3. always wait for queue to be empty before completing/failing/continueAsNew the workflow\n      Workflow.await(() -> signalRequestQueue.isEmpty());\n      return output\n   }\n\n   private void processSignal(request){\n     // do your actual processing here. \n     // If a process a single signal may take too much time and you don\'t care about FIFO, you could also start another workflow thread to process signals in parallel.\n     ...\n   }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

将现有代码迁移到此模式

\n

您应该使用版本控制来迁移。

\n

假设您有这样的现有代码;

\n
public class MyWorkflow{\n   public Output myWorkflwMethod(Input input){\n      ...\n   }\n\n   public void mySignalMethod(request){\n     // do your actual processing here. \n     ...\n   }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

那么你应该使用如下所示的版本控制:

\n
public class MyWorkflow{\n   private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); \n\n   public void mySignalMethod(SignalRequest req){\n       int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);\n       if( version == 1){\n          signalRequestQueue.add(req);\n       }else{\n          processSignal(req);\n       }\n   }\n\n   public Output myWorkflwMethod(Input input){\n      //1. do everything necessary/needed before actually processing a signal\n      ...\n\n       int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);\n       if( version == 1){\n         //2. spin up a workflow thread to process \n         Async.procedure(\n         () -> {\n             while (true) {\n                 Workflow.await(() -> !signalRequestQueue.isEmpty());\n                 final SignalRequest request = signalRequestQueue.poll();\n                 processSignal(request);\n             }\n         });\n       }\n\n      //3. always wait for queue to be empty before completing/failing/continueAsNeww the workflow\n      Workflow.await(() -> signalRequestQueue.isEmpty());\n      return output\n   }\n\n   private void processSignal(request){\n     // do your actual processing here. \n     // If a process a single signal may take too much time and you don\'t care about FIFO, you could also start another workflow thread to process signals in parallel.\n     ...\n   }\n}\n
Run Code Online (Sandbox Code Playgroud)\n

戈兰

\n

Golang SDK 没有 1/2/3 的相同问题。这是因为Golang SDK提供了完全不同的API来处理信号。

\n

Golang SDK 不需要将信号方法定义为处理程序,而是需要工作流侦听通道来处理信号,这正是该答案建议在 Java 中执行的操作。请参阅如何发出信号 API 的示例。(参见节奏/时间

\n

但它有问题#4——工作流程可能会在信号处理之前提前完成。这是 Golang SDK 的常见错误。

\n

建议始终在完成或 continueAsNew 工作流程之前排空信号通道。\n请参阅此示例,了解如何在 Golang 中排空信号通道

\n

它\xe2\x80\x99类似于在Java中使用Workflow.await来等待所有信号被处理。但是因为通道没有\xe2\x80\x99t有一个API来获取大小,所以我们必须使用\xe2\x80\x9cdefault\xe2\x80\x9d分支来检查空性。

\n

感谢@Maxim指出Temporal go sdk中的API——或者,使用Temporal go-sdk中的“ HasPending ” API来检查是否所有信号都被消耗。

\n

另外,建议监视“unhandledSignal”指标

\n