从一个进程到另一个进程的信号 C++

Edu*_*yan 2 c++ boost ipc

我知道标题有点宽泛,所以让我详细说明一下。
我有两个进程正在运行,一个正在写入共享内存,另一个正在从中读取。
为了实现共享内存效果,我使用 boost::interprocess (顺便说一句,让我知道是否有更方便的库)。

所以我实现了以下内容:

//作家

#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>

namespace ip = boost::interprocess;
class SharedMemory
{
public:
    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
    {
    }

    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
    {
    }

    std::shared_ptr<ip::windows_shared_memory> getSM()
    {
        return sm_;
    }
private:
    std::function<void()> destroyer_;
    std::string name_;
    std::shared_ptr<ip::windows_shared_memory> sm_;
};


int main()
{
    SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
    ip::mapped_region region(*creator.getSM(), ip::read_write);
    std::memset(region.get_address(), 1, region.get_size());

    int status = system("reader.exe");
    std::cout << status << std::endl;
}
Run Code Online (Sandbox Code Playgroud)

所以我正在创建共享内存,向其中写入 1,然后调用读取器 exe。(我跳过了读者部分,因为它几乎相同,但不是写而是读)

这段代码工作正常,我写入内存,另一个进程读取它并打印我的 1。
但是,如果我同时运行这 2 个 exe,并且我想写入内存,然后通知其他进程有更新,该怎么办?如何从一个 exe/进程向另一个 exe/进程发出信号?

场景是我正在流式传输一些实时数据,写入内存,然后告诉其他进程有更新。

seh*_*ehe 5

我认为确实有更方便的方法。

原则上,要在进程之间进行同步,您可以使用与进程内部(线程之间)同步相同的方法:使用同步原语(互斥体/临界区、条件变量、信号量、屏障等)。

此外,您需要有一个同步的数据结构。这正是目前的致命弱点。这里完全没有数据结构。

尽管您可以使用自己的逻辑进行原始字节访问,但我认为使用高级库这样做没有吸引力。相反,我会使用托管内存段,它可以让您按名称查找或构造类型化对象。这可能包括您的同步原语。

事实上,您可以通过使用message_queue已内置所有同步功能来加快该过程。

手动同步:使用段管理器的写入器

我将提供可移植代码,因为我没有 Windows 机器。首先让我们考虑一下数据结构。一个简单的例子是消息队列。让我们使用一个deque<string>.

不完全是微不足道的数据结构,但好消息是 Boost Interprocess 附带了使事情正常工作的所有具体细节(使用进程间分配器)。

namespace Shared {

    using Segment = ip::managed_shared_memory;
    using Mgr     = Segment::segment_manager;
    template <typename T>
    using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
    template <typename T> using Deque = bc::deque<T, Alloc<T>>;
    using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;

    using DataStructure = Deque<String>;

    class Memory {
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
        {
        }

        DataStructure&       get()       { return data_; } 
        DataStructure const& get() const { return data_; } 

      private:
        std::string    name_;
        Segment        sm_;
        DataStructure& data_;
    };

} // namespace Shared
Run Code Online (Sandbox Code Playgroud)

在那里,现在我们可以让作者像这样:

int main()
{
    Shared::Memory creator("SharedMemory", 10*1024*1024);

    creator.get().emplace_back("Hello");
    creator.get().emplace_back("World");

    std::cout << "Total queued: " << creator.get().size() << "\n";
}
Run Code Online (Sandbox Code Playgroud)

它将打印例如

Total queued: 2
Total queued: 4
Total queued: 6
Run Code Online (Sandbox Code Playgroud)

取决于您运行的次数。

读者方

现在让我们做读者方面。事实上它非常相似,让我们把它放在同一个主程序中:

int main(int argc, char**)
{
    Shared::Memory mem("SharedMemory", 10*1024*1024);
    auto& data = mem.get();

    bool is_reader = argc > 1;

    if (not is_reader) {
        data.emplace_back("Hello");
        data.emplace_back("World");
        std::cout << "Total queued: " << data.size() << "\n";
    } else {
        std::cout << "Found entries: " << data.size() << "\n";
        while (!data.empty()) {
            std::cout << "Dequeued " << data.front() << "\n";
            data.pop_front();
        }
    }

}
Run Code Online (Sandbox Code Playgroud)

开始很简单。现在运行egtest.exe READER将相反地打印如下内容:

在此输入图像描述

锁定与同步

目标是同时运行编写器和读取器。由于缺乏锁定和同步,这并不像现在这样安全。让我们添加它:

class Memory {
    static constexpr size_t max_capacity = 100;
  public:
    Memory(const char* name, size_t size)
        : name_(name)
        , sm_(ip::open_or_create, name, size)
        , mx_(*sm_.find_or_construct<Mutex>("mutex")())
        , cv_(*sm_.find_or_construct<Cond>("condition")())
        , data_(*sm_.find_or_construct<DataStructure>("data")(
              sm_.get_segment_manager()))
    { }

    // ... 

  private:
    std::string    name_;
    Segment        sm_;
    Mutex&         mx_;
    Cond&          cv_;
    DataStructure& data_;
};
Run Code Online (Sandbox Code Playgroud)

现在让我们小心一点。因为我们希望队列上的所有操作data_都是同步的,所以我们不会像以前那样公开它(使用get()成员函数)。相反,我们公开了我们支持的确切操作接口:

size_t queue_length() const;
void enqueue(std::string message); // blocking when queue at max_capacity
std::string dequeue();             // blocking dequeue
std::optional<std::string> try_dequeue(); // non-blocking dequeue
Run Code Online (Sandbox Code Playgroud)

这些都根据需要进行锁定,就像您所期望的那样:

size_t queue_length() const {
    ip::scoped_lock<Mutex> lk(mx_);
    return data_.size();
}
Run Code Online (Sandbox Code Playgroud)

潜在的阻塞操作变得更有趣。我选择了最大容量,所以enqueue需要等待容量:

// blocking when queue at max_capacity
void enqueue(std::string message) {
    ip::scoped_lock<Mutex> lk(mx_);
    cv_.wait(lk, [this] { return data_.size() < max_capacity; });

    data_.emplace_back(std::move(message));
    cv_.notify_one();
}
Run Code Online (Sandbox Code Playgroud)

相反,dequeue需要等待消息可用:

// blocking dequeue
std::string dequeue() {
    ip::scoped_lock<Mutex> lk(mx_);
    cv_.wait(lk, [this] { return not data_.empty(); });

    return do_pop();
}
Run Code Online (Sandbox Code Playgroud)

或者,您可以使其成为非阻塞,只需选择返回一个值:

// non-blocking dequeue
std::optional<std::string> try_dequeue() {
    ip::scoped_lock<Mutex> lk(mx_);

    if (data_.empty())
        return std::nullopt;
    return do_pop();
}
Run Code Online (Sandbox Code Playgroud)

现在在 main 中我们有三个版本:writer、reader 和 Continuous reader(后者演示了阻塞接口):

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition_any.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>

#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>

#include <iostream>
#include <iomanip>
#include <optional>

namespace ip = boost::interprocess;
namespace bc = boost::container;

namespace Shared {

    using Segment = ip::managed_shared_memory;
    using Mgr     = Segment::segment_manager;
    template <typename T>
    using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
    template <typename T> using Deque = ip::deque<T, Alloc<T>>;
    using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;

    using DataStructure = Deque<String>;
    using Mutex         = ip::interprocess_mutex;
    using Cond          = ip::interprocess_condition;

    class Memory {
        static constexpr size_t max_capacity = 100;
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , mx_(*sm_.find_or_construct<Mutex>("mutex")())
            , cv_(*sm_.find_or_construct<Cond>("condition")())
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
        { }

        size_t queue_length() const {
            ip::scoped_lock<Mutex> lk(mx_);
            return data_.size(); // caution: racy by design!
        }

        // blocking when queue at max_capacity
        void enqueue(std::string message) {
            ip::scoped_lock<Mutex> lk(mx_);
            cv_.wait(lk, [this] { return data_.size() < max_capacity; });

            data_.emplace_back(std::move(message));

            cv_.notify_one();
        }

        // blocking dequeue
        std::string dequeue() {
            ip::scoped_lock<Mutex> lk(mx_);
            cv_.wait(lk, [this] { return not data_.empty(); });

            return do_pop();
        }

        // non-blocking dequeue
        std::optional<std::string> try_dequeue() {
            ip::scoped_lock<Mutex> lk(mx_);

            if (data_.empty())
                return std::nullopt;
            return do_pop();
        }

      private:
        std::string    name_;
        Segment        sm_;
        Mutex&         mx_;
        Cond&          cv_;
        DataStructure& data_;

        // Assumes mx_ locked by current thread!
        std::string do_pop() {
            auto&& tmp = std::move(data_.front());
            data_.pop_front();
            cv_.notify_all(); // any of the waiters might be a/the writer
            return std::string(tmp.begin(), tmp.end());
        }
    };

} // namespace Shared

int main(int argc, char**)
{
    Shared::Memory mem("SharedMemory", 10*1024*1024);

    switch (argc) {
    case 1:
        mem.enqueue("Hello");
        mem.enqueue("World");
        std::cout << "Total queued: " << mem.queue_length() << "\n";
        break;
    case 2:
        std::cout << "Found entries: " << mem.queue_length() << "\n";
        while (auto msg = mem.try_dequeue()) {
            std::cout << "Dequeued " << *msg << "\n";
        }
        break;
    case 3: 
        std::cout << "Continuous reader\n";
        while (true) {
            std::cout << "Dequeued " << mem.dequeue() << "\n";
        }
        break;
    }
}
Run Code Online (Sandbox Code Playgroud)

小演示:

在此输入图像描述

总结、注意事项

请注意,上述内容还有一些未解决的问题。值得注意的是,Boost Interprocess 中缺乏强大的锁,需要额外小心才能在不持有锁的情况下正确关闭。

我建议ip::message_queue也与以下内容进行对比: