Lon*_*eng 5 cadence-workflow temporal-workflow uber-cadence
当像文档建议那样使用信号时:
public class MyWorkflow{
public Output myWorkflwMethod(Input input){
...
}
public void mySignalMethod(request){
// do actual processing here.
...
}
}
Run Code Online (Sandbox Code Playgroud)
我可能会遇到以下问题:
instance of与强制转换一起使用这四个是在 Cadence/Temporal 工作流程中使用信号时最常见的错误。
\n您可以应用一种设计模式来共同解决所有问题。
\n这个想法是简化信号处理程序以始终将信号放入队列中,并且工作流方法将启动另一个工作流线程来处理队列。
\n\npublic class MyWorkflow{\n private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); \n\n public void mySignalMethod(SignalRequest req){\n signalRequestQueue.add(req);\n }\n\n public Output myWorkflwMethod(Input input){\n //1. do everything necessary/needed before actually processing a signal\n ...\n\n //2. spin up a workflow thread to process \n Async.procedure(\n () -> {\n while (true) {\n Workflow.await(() -> !signalRequestQueue.isEmpty());\n final SignalRequest request = signalRequestQueue.poll();\n processSignal(request);\n }\n });\n\n\n //3. always wait for queue to be empty before completing/failing/continueAsNew the workflow\n Workflow.await(() -> signalRequestQueue.isEmpty());\n return output\n }\n\n private void processSignal(request){\n // do your actual processing here. \n // If a process a single signal may take too much time and you don\'t care about FIFO, you could also start another workflow thread to process signals in parallel.\n ...\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n您应该使用版本控制来迁移。
\n假设您有这样的现有代码;
\npublic class MyWorkflow{\n public Output myWorkflwMethod(Input input){\n ...\n }\n\n public void mySignalMethod(request){\n // do your actual processing here. \n ...\n }\n}\nRun Code Online (Sandbox Code Playgroud)\n那么你应该使用如下所示的版本控制:
\npublic class MyWorkflow{\n private Queue<SignalRequest> signalRequestQueue = new LinkedList<>(); \n\n public void mySignalMethod(SignalRequest req){\n int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);\n if( version == 1){\n signalRequestQueue.add(req);\n }else{\n processSignal(req);\n }\n }\n\n public Output myWorkflwMethod(Input input){\n //1. do everything necessary/needed before actually processing a signal\n ...\n\n int version = Workflow.getVersion("useSignalQueue", Workflow.DEFAULT_VERSION, 1);\n if( version == 1){\n //2. spin up a workflow thread to process \n Async.procedure(\n () -> {\n while (true) {\n Workflow.await(() -> !signalRequestQueue.isEmpty());\n final SignalRequest request = signalRequestQueue.poll();\n processSignal(request);\n }\n });\n }\n\n //3. always wait for queue to be empty before completing/failing/continueAsNeww the workflow\n Workflow.await(() -> signalRequestQueue.isEmpty());\n return output\n }\n\n private void processSignal(request){\n // do your actual processing here. \n // If a process a single signal may take too much time and you don\'t care about FIFO, you could also start another workflow thread to process signals in parallel.\n ...\n }\n}\nRun Code Online (Sandbox Code Playgroud)\nGolang SDK 没有 1/2/3 的相同问题。这是因为Golang SDK提供了完全不同的API来处理信号。
\nGolang SDK 不需要将信号方法定义为处理程序,而是需要工作流侦听通道来处理信号,这正是该答案建议在 Java 中执行的操作。请参阅如何发出信号 API 的示例。(参见节奏/时间)
\n但它有问题#4——工作流程可能会在信号处理之前提前完成。这是 Golang SDK 的常见错误。
\n建议始终在完成或 continueAsNew 工作流程之前排空信号通道。\n请参阅此示例,了解如何在 Golang 中排空信号通道。
\n它\xe2\x80\x99类似于在Java中使用Workflow.await来等待所有信号被处理。但是因为通道没有\xe2\x80\x99t有一个API来获取大小,所以我们必须使用\xe2\x80\x9cdefault\xe2\x80\x9d分支来检查空性。
\n感谢@Maxim指出Temporal go sdk中的API——或者,使用Temporal go-sdk中的“ HasPending ” API来检查是否所有信号都被消耗。
\n另外,建议监视“unhandledSignal”指标。
\n| 归档时间: |
|
| 查看次数: |
2294 次 |
| 最近记录: |