Websocket client message payload "does not live long enough"

Nat*_*hen 2 rust

I modified an existing websocket client to save the message payload received from the websocket server:

fn main()
{
    let mut globaltest = "";

    // ...some other code


    let receive_loop = thread::spawn(move || {
        // Receive loop
        for message in receiver.incoming_messages() {
            let message: Message = match message {
                Ok(m) => m,
                Err(e) => {
                    println!("Receive Loop: {:?}", e);
                    let _ = tx_1.send(Message::close());
                    return;
                }
            };
            match message.opcode {
                Type::Close => {
                    // Got a close message, so send a close message and return
                    let _ = tx_1.send(Message::close());
                    return;
                }
                Type::Ping => match tx_1.send(Message::pong(message.payload)) {
                    // Send a pong in response
                    Ok(()) => (),
                    Err(e) => {
                        println!("Receive Loop: {:?}", e);
                        return;
                    }
                },
                // Say what we received
                _ => {
                    println!("Receive Loop: {:?}", message);
                        // receive from motion
                        let buf = &message.payload;
                        {
                            let s = match str::from_utf8(buf) {
                                Ok(v) => v,
                                Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
                            };
                            println!("{}",s);

                            ////>>> problem here <<<
                            globaltest = s;
                        }

                    },
            }
        }
    });


    // ...some other code

}

When I build, I get an error message:

nathaniel@nathaniel-virtual-machine:~/rustcoderep/rsdummywsclient$ sudo cargo build
   Compiling rsdummywsclient v0.1.0 (file:///home/nathaniel/rustcoderep/rsdummywsclient)
src/main.rs:94:36: 94:51 error: `message.payload` does not live long enough
src/main.rs:94                         let buf = &message.payload;
                                                  ^~~~~~~~~~~~~~~
note: reference must be valid for the static lifetime...
src/main.rs:75:6: 106:4 note: ...but borrowed value is only valid for the block suffix following statement 0 at 75:5
src/main.rs:75          };
src/main.rs:76          match message.opcode {
src/main.rs:77              Type::Close => {
src/main.rs:78                  // Got a close message, so send a close message and return
src/main.rs:79                  let _ = tx_1.send(Message::close());
src/main.rs:80                  return;
               ...
error: aborting due to previous error

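I don't know why. I tried many solutions such as Arc and Mutex, but none of them worked :(

When I remove globaltest = s, the code builds and runs without problems. So I tried writing a simpler example: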

use std::str;
use std::thread;

fn main() {

    let mut y = 2;

    let receive_loop = thread::spawn(move || {
         let x = 1;

         y = x;
          println!("tt{:?}",y);
    });

    let receive_loop2 = thread::spawn(move || {

          println!("tt2{:?}",y);
    });

  println!("{:?}",y);

}

This works... with almost the same structure.

Here is the complete code, which differs only slightly from the rust-websocket client example:

extern crate websocket;

fn main() {
    use std::thread;
    use std::sync::mpsc::channel;
    use std::io::stdin;
    use std::str;
    use websocket::{Message, Sender, Receiver};
    use websocket::message::Type;
    use websocket::client::request::Url;
    use websocket::Client;

    let mut globaltest ="";

    let url = Url::parse("ws://127.0.0.1:2794").unwrap();

    println!("Connecting to {}", url);

    let request = Client::connect(url).unwrap();

    let response = request.send().unwrap(); // Send the request and retrieve a response

    println!("Validating response...");

    response.validate().unwrap(); // Validate the response

    println!("Successfully connected");

    let (mut sender, mut receiver) = response.begin().split();

    let (tx, rx) = channel();

    let tx_1 = tx.clone();

    let send_loop = thread::spawn(move || {
        loop {
            // Send loop
            let message: Message = match rx.recv() {
                Ok(m) => m,
                Err(e) => {
                    println!("Send Loop: {:?}", e);
                    return;
                }
            };
            match message.opcode {
                Type::Close => {
                    let _ = sender.send_message(&message);
                    // If it's a close message, just send it and then return.
                    return;
                },
                _ => (),
            }
            // Send the message
            match sender.send_message(&message) {
                Ok(()) => (),
                Err(e) => {
                    println!("Send Loop: {:?}", e);
                    let _ = sender.send_message(&Message::close());
                    return;
                }
            }
        }
    });

    let receive_loop = thread::spawn(move || {
        // Receive loop
        for message in receiver.incoming_messages() {
            let message: Message = match message {
                Ok(m) => m,
                Err(e) => {
                    println!("Receive Loop: {:?}", e);
                    let _ = tx_1.send(Message::close());
                    return;
                }
            };
            match message.opcode {
                Type::Close => {
                    // Got a close message, so send a close message and return
                    let _ = tx_1.send(Message::close());
                    return;
                }
                Type::Ping => match tx_1.send(Message::pong(message.payload)) {
                    // Send a pong in response
                    Ok(()) => (),
                    Err(e) => {
                        println!("Receive Loop: {:?}", e);
                        return;
                    }
                },
                // Say what we received
                _ => {
                    println!("Receive Loop: {:?}", message);
                        // receive from motion
                        let buf = &message.payload;
                        {
                            let s = match str::from_utf8(buf) {
                                Ok(v) => v,
                                Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
                            };
                            println!("{}",s);
                            globaltest = s;
                        }

                    },
            }
        }
    });

    loop {
        let mut input = String::new();

        stdin().read_line(&mut input).unwrap();

        let trimmed = input.trim();

        let message = match trimmed {
            "/close" => {
                // Close the connection
                let _ = tx.send(Message::close());
                break;
            }
            // Send a ping
            "/ping" => Message::ping(b"PING".to_vec()),
            // Otherwise, just send text
            _ => Message::text(trimmed.to_string()),
        };

        match tx.send(message) {
            Ok(()) => (),
            Err(e) => {
                println!("Main Loop: {:?}", e);
                break;
            }
        }
    }

    // We're exiting

    println!("Waiting for child threads to exit");

    let _ = send_loop.join();
    let _ = receive_loop.join();

    println!("Exited");
}
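
@fjh Thank you for your reply!

I changed the code as follows, so that globaltest is modified in receive_loop and read from the main thread's loop. I still get a confusing error, and even after spending three hours on it I can't solve it :(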

fn main() {

    let mut globaltest:Arc<Mutex<String>> = Arc::new(Mutex::new(String::from("")));

    //some other code...

    let receive_loop = thread::spawn(move || {

        // Receive loop
        for message in receiver.incoming_messages() {
            let message: Message = match message {
                Ok(m) => m,
                Err(e) => {
                    println!("Receive Loop: {:?}", e);
                    let _ = tx_1.send(Message::close());
                    return;
                }
            };
            match message.opcode {
                Type::Close => {
                    // Got a close message, so send a close message and return
                    let _ = tx_1.send(Message::close());
                    return;
                }
                Type::Ping => match tx_1.send(Message::pong(message.payload)) {
                    // Send a pong in response
                    Ok(()) => (),
                    Err(e) => {
                        println!("Receive Loop: {:?}", e);
                        return;
                    }
                },
                // Say what we received
                _ => { 
                        let mut globaltest_child = globaltest.lock().unwrap();

                        println!("Receive Loop: {:?}", message);
                        // receive from motion
                        let buf = &message.payload;
                        {
                            let s = match str::from_utf8(buf) {
                                Ok(v) => v,
                                Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
                            };

                            {
                                //>>> if I do it like this, will globaltest hold the same value as globaltest_child??
                                *globaltest_child = String::from(s); 
                                println!("{:?}",globaltest_child.clone()); 
                            }
                        } 
                    },
            }
        }
    });

    loop { 
        let message = Message::text("mtconnect");

        match tx.send(message) {
            Ok(()) => (),
            Err(e) => {
                println!("Main Loop: {:?}", e);
                break;
            }
        }

        ///>>> problem here////
        println!("{:?}",globaltest.clone()); 
        thread::sleep(time::Duration::from_millis(3000)); 
    } 
}

The compiler always tells me:

athaniel@nathaniel-virtual-machine:~/rustcoderep/rsadapter$ sudo cargo run
   Compiling rsadapter v0.1.0 (file:///home/nathaniel/rustcoderep/rsadapter)
src/main.rs:166:25: 166:35 error: use of moved value: `globaltest` [E0382]
src/main.rs:166         println!("{:?}",globaltest.clone()); 
                                        ^~~~~~~~~~
<std macros>:2:25: 2:56 note: in this expansion of format_args!
<std macros>:3:1: 3:54 note: in this expansion of print! (defined in <std macros>)
src/main.rs:166:9: 166:45 note: in this expansion of println! (defined in <std macros>)
src/main.rs:166:25: 166:35 help: run `rustc --explain E0382` to see a detailed explanation
src/main.rs:102:35: 153:3 note: `globaltest` moved into closure environment here because it has type `alloc::arc::Arc<std::sync::mutex::Mutex<collections::string::String>>`, which is non-copyable
src/main.rs:102     let receive_loop = thread::spawn(move || {
src/main.rs:103         
src/main.rs:104       
src/main.rs:105         // Receive loop
src/main.rs:106         for message in receiver.incoming_messages() {
src/main.rs:107             let message: Message = match message {
                ...
src/main.rs:102:35: 153:3 help: perhaps you meant to use `clone()`?
error: aborting due to previous error

I still can't access globaltest from another thread.

fjh*_*fjh 5

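"When I remove globaltest = s, the code builds and runs without problems."

Yes, because that assignment isn't safe. You're trying to modify a local variable declared in the main thread from a different thread. That could lead to all sorts of problems, such as data races, which is why the compiler won't let you do it.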
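This probably also explains why the simplified example in the question compiles even though it looks similar: y is an i32, which is Copy, so each move closure gets its own private copy and nothing is actually shared between the threads. A minimal sketch of that behaviour follows; the function name copy_into_thread is hypothetical and only there for illustration.

```rust
use std::thread;

// Hypothetical helper: returns (value seen inside the thread,
// value seen by the caller afterwards).
fn copy_into_thread() -> (i32, i32) {
    let mut y = 2;

    // `move` copies `y` into the closure because i32 is `Copy`.
    let handle = thread::spawn(move || {
        y += 1; // modifies the closure's private copy only
        y
    });

    let inside = handle.join().unwrap();
    (inside, y) // the caller's `y` was never touched
}

fn main() {
    let (inside, outside) = copy_into_thread();
    println!("inside thread: {}, in main: {}", inside, outside);
}
```

A &str pointing into message.payload cannot be copied out this way, because the payload it borrows from is dropped at the end of each loop iteration.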

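It's hard to say what the best way to fix this is without knowing more about what you want to do. That being said, you could probably fix this by making globaltest an Arc<Mutex<String>> instead of a &str, so that you can safely access it from both threads.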
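A minimal sketch of that suggestion, under stated assumptions: the websocket receive loop is replaced by a plain Vec of byte payloads so the example is self-contained, and the names store_payloads and payloads are hypothetical. The key points are that the Arc is cloned before the move closure, so the original handle stays usable in the calling thread, and that the borrowed &str is copied into an owned String before it is stored.

```rust
use std::str;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for the receive loop: stores each payload in the
// shared string from a worker thread, then returns what the caller reads back.
fn store_payloads(payloads: Vec<Vec<u8>>) -> String {
    let globaltest = Arc::new(Mutex::new(String::new()));

    // Clone the Arc *before* the `move` closure. The closure takes the
    // clone; the original handle remains usable in this thread.
    let globaltest_child = Arc::clone(&globaltest);

    let receive_loop = thread::spawn(move || {
        for payload in payloads {
            // `s` borrows from `payload`, which is dropped at the end of
            // this iteration, so it cannot be stored as a &str.
            if let Ok(s) = str::from_utf8(&payload) {
                // Copy the text into an owned String inside the mutex.
                *globaltest_child.lock().unwrap() = s.to_string();
            }
        }
    });

    receive_loop.join().unwrap();

    // Lock from the calling thread to read the latest value.
    let latest = globaltest.lock().unwrap().clone();
    latest
}

fn main() {
    let latest = store_payloads(vec![b"first".to_vec(), b"second".to_vec()]);
    println!("{}", latest);
}
```

In the real client the main loop would presumably lock globaltest on each pass instead of joining the thread first; the join here only keeps the sketch deterministic.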