Tim*_*onk 4 multithreading rust
我试图通过Arc一个接收器引用一个线程,所以我可以通过调度程序集中发布pub-sub.但是,我收到以下错误:
src/dispatcher.rs:58:11: 58:24 error: the trait `core::marker::Sync` is not implemented for the type `core::cell::UnsafeCell<std::sync::mpsc::Flavor<dispatcher::DispatchMessage>>` [E0277]
src/dispatcher.rs:58 thread::spawn(move || {
^~~~~~~~~~~~~
src/dispatcher.rs:58:11: 58:24 note: `core::cell::UnsafeCell<std::sync::mpsc::Flavor<dispatcher::DispatchMessage>>` cannot be shared between threads safely
src/dispatcher.rs:58 thread::spawn(move || {
Run Code Online (Sandbox Code Playgroud)
笏!我认为只有Send跨越频道才需要?代码DispatchMessage是:
#[derive(PartialEq, Debug, Clone)]
enum DispatchType {
ChangeCurrentChannel,
OutgoingMessage,
IncomingMessage
}
#[derive(Clone)]
struct DispatchMessage {
dispatch_type: DispatchType,
payload: String
}
Run Code Online (Sandbox Code Playgroud)
两者String肯定Enum都是Send,对吧?为什么抱怨Sync?
调度员的相关部分:
pub fn start(&self) {
let shared_subscribers = Arc::new(self.subscribers);
for ref broadcaster in &self.broadcasters {
let shared_broadcaster = Arc::new(Mutex::new(broadcaster));
let broadcaster = shared_broadcaster.clone();
let subscribers = shared_subscribers.clone();
thread::spawn(move || {
loop {
let message = &broadcaster.lock().unwrap().recv().ok().expect("Couldn't receive message in broadcaster");
match subscribers.get(type_to_str(&message.dispatch_type)) {
Some(ref subs) => {
for sub in subs.iter() { sub.send(*message).unwrap(); }
},
None => ()
}
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
完整的调度程序代码在这个要点:https://gist.github.com/timonv/5cdc56bf671cee69d3fa
如果它仍然相关,建立在每晚5-2-2015.
Arc需要Sync,而且在我看来,你正试图将频道放入其中Arc.渠道不是Sync,也不Sender是Receiver.
在不知道你想要做什么的情况下,这里有一些可能对你有帮助的事情:
Sender,所以在这里你可能会Arc一个T和多个线程之间共享它,你可以代替克隆Sender并将其发送到多个线程,因为它是 SendReceiver你无法克隆的),你必须把它粘在一个里面Arc<Mutex<T>>,这就是它Sync.尽管 Jorge 在一般意义上是正确的,但此特定代码的问题在于创建 Arc Mutex 需要参数的所有权,因此不能作为引用。仔细想想,这是有道理的。你怎么能锁定不属于你的东西呢?或者更具体地说,我们需要锁定该内存位置上的所有内容,而不是指向它的指针。
当广播器添加到结构中时,更改代码以创建 Arc Mutex 可以解决问题。这会将这部分代码更改为:
pub fn register_broadcaster(&mut self, broadcaster: &mut Broadcast) {
let handle = Arc::new(Mutex::new(broadcaster.broadcast_handle()));
self.broadcasters.push(handle);
}
Run Code Online (Sandbox Code Playgroud)
然后调度程序的 start 方法将如下所示:
pub fn start(&self) {
// Assuming that broadcasters.clone() copies the vector, but increase ref count on els
for broadcaster in self.broadcasters.clone() {
let subscribers = self.subscribers.clone();
thread::spawn(move || {
loop {
let message = broadcaster.lock().unwrap().recv().ok().expect("Couldn't receive message in broadcaster or channel hung up");
match subscribers.get(type_to_str(&message.dispatch_type)) {
Some(ref subs) => {
for sub in subs.iter() { sub.send(message.clone()).unwrap(); }
},
None => ()
}
}
});
}
}
Run Code Online (Sandbox Code Playgroud)