4 c++ multithreading tbb tbb-flow-graph
我目前正在测试 tbb 的流程图功能。为了使用它,我必须能够中止图中某个节点的执行,包括所有依赖它的子节点,但让其他不依赖它的子节点执行。从主体抛出异常或调用 task::cancel_group_execution() 会中止所有节点的执行。
#include <cstdio>
#include "tbb/flow_graph.h"
using namespace tbb::flow;
struct body
{ std::string my_name;
body( const char *name ) : my_name(name)
{
}
void operator()( continue_msg ) const
{ if (my_name == "B")
tbb::task::self().group()->cancel_group_execution();
else
{ sleep(1);
printf("%s\n", my_name.c_str());
}
}
};
int main()
{
graph g;
broadcast_node< continue_msg > start(g);
continue_node<continue_msg> a( g, body("A"));
continue_node<continue_msg> b( g, body("B"));
continue_node<continue_msg> c( g, body("C"));
continue_node<continue_msg> d( g, body("D"));
continue_node<continue_msg> e( g, body("E"));
make_edge( start, a );
make_edge( start, b );
make_edge( a, c );
make_edge( b, c );
make_edge( c, d );
make_edge( a, e );
for (int i = 0; i < 3; ++i )
try
{ start.try_put( continue_msg() );
g.wait_for_all();
} catch (...)
{ printf("Caught exception\n");
}
return 0;
}
Run Code Online (Sandbox Code Playgroud)
如果您希望能够取消部分图形的执行,则需要使用 task_group_contexts。添加以下内容:
#include "tbb/task.h"
Run Code Online (Sandbox Code Playgroud)
并将您的主程序更改为以下内容:
int main()
{
tbb::task_group_context tgc1;
tbb::task_group_context tgc2;
graph g1(tgc1);
graph g2(tgc2);
printf("Constructing graph\n");
broadcast_node< continue_msg > start(g1);
continue_node<continue_msg> a( g1, body("A"));
continue_node<continue_msg> b( g2, body("B"));
continue_node<continue_msg> c( g2, body("C"));
continue_node<continue_msg> d( g2, body("D"));
continue_node<continue_msg> e( g1, body("E"));
make_edge( start, a );
make_edge( start, b );
make_edge( a, c );
make_edge( b, c );
make_edge( c, d );
make_edge( a, e );
for (int i = 0; i < 3; ++i ) {
try
{
printf("broadcasting graph %d\n", i);
start.try_put( continue_msg() );
g1.wait_for_all();
g2.wait_for_all();
} catch (...)
{ printf("Caught exception\n");
}
g1.wait_for_all();
g1.reset();
g2.reset();
}
return 0;
}
Run Code Online (Sandbox Code Playgroud)
每个 task_group_context 是(默认)父上下文的子上下文。取消 g2 不会影响 g1。如果 B 抛出而不是取消,则您的捕获将确保异常不会传递给父级。如果您没有捕获异常,则父上下文也将被取消,A 和 E 的上下文也将被取消。

请注意,您必须等待两个图形完成。您还必须reset()使用图表来重置continue_nodes' 计数器。实际上,在抛出并捕获异常的情况下,完成后并不能保证g1catch(...)完成,因此需要在.outg1.wait_for_all()之外做一个try/catch。我编辑了代码以显示这一点。
不是使用取消来停止部分计算,您可以使 B amultifunction_node具有a的输入continue_msg和 a 的单个输出continue_msg:
typedef multifunction_node<continue_msg, tuple<continue_msg> > mf_type;
struct mf_body {
std::string my_name;
mf_body(const char *name) : my_name(name) {}
void operator()(continue_msg, mf_type::output_ports_type &op) {
if(my_name == "B") {
printf("%s returning without sending\n", my_name.c_str());
return;
}
sleep(1);
get<0>(op).try_put(continue_msg());
return;
}
};
Run Code Online (Sandbox Code Playgroud)
然后创建节点 B:
mf_type b( g, unlimited, mf_body("B"));
Run Code Online (Sandbox Code Playgroud)
并且从 B 到 C 的边缘将像这样设置:
make_edge( output_port<0>(b), c );
Run Code Online (Sandbox Code Playgroud)
在这种情况下,您不需要将图拆分为两个子图。如果节点 B 将取消,则它返回而不将 a 转发continue_msg给其后继节点。如果节点 B 不转发消息,节点 C 将不会执行,因为它需要两个continue_msgs启动。之后您仍然需要重置图形,以重置 C 的计数。
其multifunction_node优点是您可以选择是否转发消息。这里的警告是multifunction_node带有continue_msg输入的 a 不像 a continue_node。在continue_node需要尽可能多的continue_msgs,因为它有前辈(加上建设中的初始值。)的multifunction_node,当它接收到一个执行体continue_msg,不管它有多少前辈了。因此,对于您的图表,您不能只制作所有节点multifunction_nodes。