如何在tbb流程图中中止节点及其子节点的执行

4 c++ multithreading tbb tbb-flow-graph

我目前正在测试 tbb 的流程图功能。为了使用它,我必须能够中止图中某个节点的执行,包括所有依赖它的子节点,但让其他不依赖它的子节点执行。从主体抛出异常或调用 task::cancel_group_execution() 会中止所有节点的执行。

#include <cstdio>
#include "tbb/flow_graph.h"

using namespace tbb::flow;

struct body
{   std::string my_name;
    body( const char *name ) : my_name(name)
    {
    }
    void operator()( continue_msg ) const
    {   if (my_name == "B")
            tbb::task::self().group()->cancel_group_execution();
        else
        {   sleep(1);
            printf("%s\n", my_name.c_str());
        }
    }
};

int main()
{
    graph g;

    broadcast_node< continue_msg > start(g);
    continue_node<continue_msg> a( g, body("A"));
    continue_node<continue_msg> b( g, body("B"));
    continue_node<continue_msg> c( g, body("C"));
    continue_node<continue_msg> d( g, body("D"));
    continue_node<continue_msg> e( g, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i )
        try
        {   start.try_put( continue_msg() );
            g.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

cah*_*son 5

如果您希望能够取消部分图形的执行,则需要使用 task_group_contexts。添加以下内容:

#include "tbb/task.h"
Run Code Online (Sandbox Code Playgroud)

并将您的主程序更改为以下内容:

int main()
{
    tbb::task_group_context tgc1;
    tbb::task_group_context tgc2;
    graph g1(tgc1);
    graph g2(tgc2);
    printf("Constructing graph\n");
    broadcast_node< continue_msg > start(g1);
    continue_node<continue_msg> a( g1, body("A"));
    continue_node<continue_msg> b( g2, body("B"));
    continue_node<continue_msg> c( g2, body("C"));
    continue_node<continue_msg> d( g2, body("D"));
    continue_node<continue_msg> e( g1, body("E"));

    make_edge( start, a );
    make_edge( start, b );
    make_edge( a, c );
    make_edge( b, c );
    make_edge( c, d );
    make_edge( a, e );

    for (int i = 0; i < 3; ++i ) {
        try
        {   
            printf("broadcasting graph %d\n", i);
            start.try_put( continue_msg() );
            g1.wait_for_all();
            g2.wait_for_all();
        } catch (...)
        {   printf("Caught exception\n");
        }
        g1.wait_for_all();
        g1.reset();
        g2.reset();
    }
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

每个 task_group_context 是(默认)父上下文的子上下文。取消 g2 不会影响 g1。如果 B 抛出而不是取消,则您的捕获将确保异常不会传递给父级。如果您没有捕获异常,则父上下文也将被取消,A 和 E 的上下文也将被取消。

具有多个 task_group_contexts 的图

请注意,您必须等待两个图形完成。您还必须reset()使用图表来重置continue_nodes' 计数器。实际上,在抛出并捕获异常的情况下,完成后并不能保证g1catch(...)完成,因此需要在.outg1.wait_for_all()之外做一个try/catch。我编辑了代码以显示这一点。

不是使用取消来停止部分计算,您可以使 B amultifunction_node具有a的输入continue_msg和 a 的单个输出continue_msg

typedef multifunction_node<continue_msg, tuple<continue_msg> > mf_type;

struct mf_body {
    std::string my_name;
    mf_body(const char *name) : my_name(name) {}
    void operator()(continue_msg, mf_type::output_ports_type &op) {
        if(my_name == "B") {
            printf("%s returning without sending\n", my_name.c_str());
            return;
        }
        sleep(1);
        get<0>(op).try_put(continue_msg());
        return;
    }
};
Run Code Online (Sandbox Code Playgroud)

然后创建节点 B:

mf_type b( g, unlimited, mf_body("B"));
Run Code Online (Sandbox Code Playgroud)

并且从 B 到 C 的边缘将像这样设置:

make_edge( output_port<0>(b), c ); 
Run Code Online (Sandbox Code Playgroud)

在这种情况下,您不需要将图拆分为两个子图。如果节点 B 将取消,则它返回而不将 a 转发continue_msg给其后继节点。如果节点 B 不转发消息,节点 C 将不会执行,因为它需要两个continue_msgs启动。之后您仍然需要重置图形,以重置 C 的计数。

multifunction_node优点是您可以选择是否转发消息。这里的警告是multifunction_node带有continue_msg输入的 a 不像 a continue_node。在continue_node需要尽可能多的continue_msgs,因为它有前辈(加上建设中的初始值。)的multifunction_node,当它接收到一个执行体continue_msg,不管它有多少前辈了。因此,对于您的图表,您不能只制作所有节点multifunction_nodes