移动接收器线程抱怨同步,但预期发送

Tim*_*onk 4 multithreading rust

我试图通过Arc一个接收器引用一个线程,所以我可以通过调度程序集中发布pub-sub.但是,我收到以下错误:

src/dispatcher.rs:58:11: 58:24 error: the trait `core::marker::Sync` is not implemented for the type `core::cell::UnsafeCell<std::sync::mpsc::Flavor<dispatcher::DispatchMessage>>` [E0277]
src/dispatcher.rs:58           thread::spawn(move || {
                               ^~~~~~~~~~~~~
src/dispatcher.rs:58:11: 58:24 note: `core::cell::UnsafeCell<std::sync::mpsc::Flavor<dispatcher::DispatchMessage>>` cannot be shared between threads safely
src/dispatcher.rs:58           thread::spawn(move || {
Run Code Online (Sandbox Code Playgroud)

笏!我认为只有Send跨越频道才需要?代码DispatchMessage是:

#[derive(PartialEq, Debug, Clone)]
enum DispatchType {
    ChangeCurrentChannel,
    OutgoingMessage,
    IncomingMessage
}

#[derive(Clone)]
struct DispatchMessage {
   dispatch_type: DispatchType,
   payload: String
}
Run Code Online (Sandbox Code Playgroud)

两者String肯定Enum都是Send,对吧?为什么抱怨Sync

调度员的相关部分:

pub fn start(&self) {
   let shared_subscribers = Arc::new(self.subscribers);
   for ref broadcaster in &self.broadcasters {
      let shared_broadcaster = Arc::new(Mutex::new(broadcaster));

      let broadcaster = shared_broadcaster.clone();
      let subscribers = shared_subscribers.clone();
      thread::spawn(move || {
         loop {
            let message = &broadcaster.lock().unwrap().recv().ok().expect("Couldn't receive message in broadcaster");
            match subscribers.get(type_to_str(&message.dispatch_type)) {
              Some(ref subs) => { 
                  for sub in subs.iter() { sub.send(*message).unwrap(); }
              },
              None => ()
            }

         }
      });
   }
}
Run Code Online (Sandbox Code Playgroud)

完整的调度程序代码在这个要点:https://gist.github.com/timonv/5cdc56bf671cee69d3fa

如果它仍然相关,建立在每晚5-2-2015.

Jor*_*eña 6

Arc需要Sync,而且在我看来,你正试图将频道放入其中Arc.渠道不是Sync,也不SenderReceiver.

在不知道你想要做什么的情况下,这里有一些可能对你有帮助的事情:

  • 这是可能的克隆Sender,所以在这里你可能会Arc一个T和多个线程之间共享它,你可以代替克隆Sender并将其发送到多个线程,因为它 Send
  • 否则(特别是对于Receiver你无法克隆的),你必须把它粘在一个里面Arc<Mutex<T>>,这就是它Sync.


Tim*_*onk 0

尽管 Jorge 在一般意义上是正确的,但此特定代码的问题在于创建 Arc Mutex 需要参数的所有权,因此不能作为引用。仔细想想,这是有道理的。你怎么能锁定不属于你的东西呢?或者更具体地说,我们需要锁定该内存位置上的所有内容,而不是指向它的指针。

当广播器添加到结构中时,更改代码以创建 Arc Mutex 可以解决问题。这会将这部分代码更改为:

pub fn register_broadcaster(&mut self, broadcaster: &mut Broadcast) {
   let handle = Arc::new(Mutex::new(broadcaster.broadcast_handle()));
   self.broadcasters.push(handle);
}
Run Code Online (Sandbox Code Playgroud)

然后调度程序的 start 方法将如下所示:

pub fn start(&self) {
   // Assuming that broadcasters.clone() copies the vector, but increase ref count on els
   for broadcaster in self.broadcasters.clone() {
      let subscribers = self.subscribers.clone();
      thread::spawn(move || {
         loop {
            let message = broadcaster.lock().unwrap().recv().ok().expect("Couldn't receive message in broadcaster or channel hung up");
            match subscribers.get(type_to_str(&message.dispatch_type)) {
              Some(ref subs) => { 
                  for sub in subs.iter() { sub.send(message.clone()).unwrap(); }
              },
              None => ()
            }

         }
      });
   }
}
Run Code Online (Sandbox Code Playgroud)