无法借用“Arc”中的可变数据

Lex*_*Lex 18 rust rust-tokio

我不知道下一步该怎么做。看起来是我误解了某些东西,或者还有一些关键的知识点没有学到。

use std::sync::Arc;

use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0

/// A unit of work: one URL to fetch, tagged with an id.
pub struct Task {
    /// Identifier carried along with the task; not interpreted in this listing.
    pub id: u32,
    /// URL handed to `client.get(..)` when the task is processed.
    pub url: String,
}
/// Outcome of processing a `Task`, sent over the result channel.
pub enum Message {
    /// The request failed; carries the task and the `reqwest` error.
    Failure(Task, Error),
    /// The request succeeded; carries the task and the response.
    Success(Task, Response),
}

/// State shared between the spawned tasks through `Arc<State>`.
struct State {
    /// HTTP client used to issue the requests.
    client: reqwest::Client,
    /// Sending half of the result channel; cloned for every request.
    res_tx: Sender<Message>,
    /// Receiving half of the result channel. `Receiver::recv` takes `&mut self`,
    /// so it cannot be called through a shared `Arc<State>`.
    res_rx: Receiver<Message>,
}

/// Handle holding the shared state plus the configuration passed to `Proxy::new`.
pub struct Proxy {
    /// Shared HTTP client and result channel.
    state: Arc<State>,
    /// NOTE(review): presumably a requests-per-second limit; not read in this listing.
    max_rps: u16,
    /// Used to size the channels (capacity is `max_pending + 1`).
    max_pending: u16,
    /// Identifier given to `Proxy::new`; not read in this listing.
    id: u32,
    /// Sender supplied by the caller; stored but not used in this listing.
    parent_tx: Sender<String>,
}

/// Sends `msg` on `tx` and awaits the result.
///
/// A failed send is not propagated to the caller: the error is printed to
/// stderr and the function returns normally.
async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    if let Err(error) = tx.send(msg).await {
        eprintln!("{}", error)
    }
}

impl Proxy {
    /// Creates the input channel and spawns the loop that consumes it.
    ///
    /// Returns the sending half. Every `Task` received is fetched in its own
    /// spawned task and the outcome is forwarded on a clone of `state.res_tx`.
    async fn start_chin(&mut self) -> Sender<Task> {
        let (chin_tx, mut chin_rx) = mpsc::channel::<Task>(self.max_pending as usize + 1 as usize);
        let state_outer = self.state.clone();

        tokio::spawn(async move {
            loop {
                match chin_rx.recv().await {
                    Some(task) => {
                        // Each request gets its own sender and its own handle to the shared state.
                        let res_tx = state_outer.res_tx.clone();
                        let state = state_outer.clone();
                        tokio::spawn(async move {
                            match state.client.get(&task.url).send().await {
                                Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
                                Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
                            }
                        });
                    }
                    // NOTE(review): `None` is ignored and the loop keeps going; confirm
                    // this is intended once the channel has been closed.
                    None => (),
                }
            }
        });
        chin_tx
    }

    /// Spawns the loop that is meant to consume the result channel.
    ///
    /// This is the method the question is about: it does not compile.
    async fn start_chres(&self) {
        let state = self.state.clone();

        tokio::spawn(async move {
            loop {
                match state.res_rx.recv().await { // LINE PRODUCES ERROR: `recv` needs `&mut` access, but `state` is an `Arc`
                    Some(task) => {}
                    None => (),
                }
            }
        });
    }
}

impl Proxy {
    /// Builds the HTTP client and the result channel and wraps them in a `Proxy`.
    ///
    /// `proxy_addr` is applied as a proxy for all requests unless it is the
    /// literal string `"none"`. Errors from `reqwest::Proxy::all` and from
    /// building the client are returned to the caller.
    pub fn new(
        id: u32,
        parent_tx: Sender<String>,
        proxy_addr: &str,
        max_rps: u16,
        max_pending: u16,
    ) -> Result<Self, Error> {
        // NOTE(review): `client` is reassigned below but is not declared `mut`.
        let client = reqwest::Client::builder();
        if proxy_addr != "none" {
            client = client.proxy(reqwest::Proxy::all(proxy_addr)?)
        }
        // Result channel; its capacity is `max_pending + 1`.
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        Ok(Proxy {
            id,
            // Both halves of the channel end up behind the same `Arc`.
            state: Arc::new(State {
                client: client.build()?,
                res_tx,
                res_rx,
            }),
            max_rps,
            max_pending,
            parent_tx,
        })
    }
}
Run Code Online (Sandbox Code Playgroud)
use std::sync::Arc;

use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0

/// A unit of work: one URL to fetch, tagged with an id.
pub struct Task {
    /// Identifier carried along with the task; not interpreted in this listing.
    pub id: u32,
    /// URL handed to `client.get(..)` when the task is processed.
    pub url: String,
}
/// Outcome of processing a `Task`, sent over the result channel.
pub enum Message {
    /// The request failed; carries the task and the `reqwest` error.
    Failure(Task, Error),
    /// The request succeeded; carries the task and the response.
    Success(Task, Response),
}

/// State shared between the spawned tasks through `Arc<State>`.
struct State {
    /// HTTP client used to issue the requests.
    client: reqwest::Client,
    /// Sending half of the result channel; cloned for every request.
    res_tx: Sender<Message>,
    /// Receiving half of the result channel. `Receiver::recv` takes `&mut self`,
    /// so it cannot be called through a shared `Arc<State>`.
    res_rx: Receiver<Message>,
}

/// Handle holding the shared state plus the configuration passed to `Proxy::new`.
pub struct Proxy {
    /// Shared HTTP client and result channel.
    state: Arc<State>,
    /// NOTE(review): presumably a requests-per-second limit; not read in this listing.
    max_rps: u16,
    /// Used to size the channels (capacity is `max_pending + 1`).
    max_pending: u16,
    /// Identifier given to `Proxy::new`; not read in this listing.
    id: u32,
    /// Sender supplied by the caller; stored but not used in this listing.
    parent_tx: Sender<String>,
}

/// Sends `msg` on `tx` and awaits the result.
///
/// A failed send is not propagated to the caller: the error is printed to
/// stderr and the function returns normally.
async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    if let Err(error) = tx.send(msg).await {
        eprintln!("{}", error)
    }
}

impl Proxy {
    /// Creates the input channel and spawns the loop that consumes it.
    ///
    /// Returns the sending half. Every `Task` received is fetched in its own
    /// spawned task and the outcome is forwarded on a clone of `state.res_tx`.
    async fn start_chin(&mut self) -> Sender<Task> {
        let (chin_tx, mut chin_rx) = mpsc::channel::<Task>(self.max_pending as usize + 1 as usize);
        let state_outer = self.state.clone();

        tokio::spawn(async move {
            loop {
                match chin_rx.recv().await {
                    Some(task) => {
                        // Each request gets its own sender and its own handle to the shared state.
                        let res_tx = state_outer.res_tx.clone();
                        let state = state_outer.clone();
                        tokio::spawn(async move {
                            match state.client.get(&task.url).send().await {
                                Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
                                Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
                            }
                        });
                    }
                    // NOTE(review): `None` is ignored and the loop keeps going; confirm
                    // this is intended once the channel has been closed.
                    None => (),
                }
            }
        });
        chin_tx
    }

    /// Spawns the loop that is meant to consume the result channel.
    ///
    /// This is the method the question is about: it does not compile.
    async fn start_chres(&self) {
        let state = self.state.clone();

        tokio::spawn(async move {
            loop {
                match state.res_rx.recv().await { // LINE PRODUCES ERROR: `recv` needs `&mut` access, but `state` is an `Arc`
                    Some(task) => {}
                    None => (),
                }
            }
        });
    }
}

impl Proxy {
    /// Builds the HTTP client and the result channel and wraps them in a `Proxy`.
    ///
    /// `proxy_addr` is applied as a proxy for all requests unless it is the
    /// literal string `"none"`. Errors from `reqwest::Proxy::all` and from
    /// building the client are returned to the caller.
    pub fn new(
        id: u32,
        parent_tx: Sender<String>,
        proxy_addr: &str,
        max_rps: u16,
        max_pending: u16,
    ) -> Result<Self, Error> {
        // NOTE(review): `client` is reassigned below but is not declared `mut`.
        let client = reqwest::Client::builder();
        if proxy_addr != "none" {
            client = client.proxy(reqwest::Proxy::all(proxy_addr)?)
        }
        // Result channel; its capacity is `max_pending + 1`.
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        Ok(Proxy {
            id,
            // Both halves of the channel end up behind the same `Arc`.
            state: Arc::new(State {
                client: client.build()?,
                res_tx,
                res_rx,
            }),
            max_rps,
            max_pending,
            parent_tx,
        })
    }
}
Run Code Online (Sandbox Code Playgroud)

Luc*_*lla 12

use std::sync::Arc;

struct Something {
    size: usize // counter bumped by `increase`
}

impl Something {
    fn increase(&mut self) { // requires mutable access to the value
        self.size += 1;
    }
}

fn main() {
    let something = Something{size: 1};
    let arc = Arc::new(something);
    arc.increase(); // does not compile: `increase` takes `&mut self`, which cannot be borrowed through `Arc`
}
Run Code Online (Sandbox Code Playgroud)

给出

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:16:5
   |
16 |     arc.increase();
   |     ^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Something>`

error: aborting due to previous error; 1 warning emitted
Run Code Online (Sandbox Code Playgroud)

因为它试图以可变方式借用 `arc`。要做到这一点,`Arc` 必须实现 `DerefMut`,但它并没有实现,因为 `Arc` 本来就不允许直接修改其内部数据。

把你的对象包裹在 `Mutex` 中就可以了:

use std::sync::{Arc, Mutex};

/// Minimal value whose counter can only be changed through `&mut self`.
struct Something {
    /// Current counter value.
    size: usize,
}

impl Something {
    /// Increments the counter by one.
    fn increase(&mut self) {
        self.size += 1;
    }
}

/// Shares a `Something` behind `Arc<Mutex<_>>` and mutates it through the lock guard.
fn main() {
    let shared = Arc::new(Mutex::new(Something { size: 1 }));
    // `lock()` returns a guard giving mutable access to the inner value;
    // `unwrap()` panics if the mutex is poisoned.
    let mut guard = shared.lock().unwrap();
    guard.increase();
}
Run Code Online (Sandbox Code Playgroud)

现在可以共享并且可以增加。

  • 根据[此](https://github.com/actix/actix-website/issues/201),Actix 团队建议使用标准库互斥体。 (3认同)
  • 最好不要对异步应用程序使用“std::sync::Mutex”,因为它会阻塞整个系统线程,导致程序死锁。如果您使用“tokio”,那么最好使用“tokio::sync::Mutex”——它不会阻塞系统线程。 (2认同)

Lex*_*Lex 4

Lucas Zanella 的回答和 Shepmaster 的评论对重构和简化代码有很大帮助。我决定在 `Proxy::new()` 函数内部转移所有权,而不是使用共享引用。代码变得更易读,同时也避免了对 `tokio::sync::mpsc::Receiver` 的共享可变引用。也许这个问题提得不够有条理,但多亏了社区,我找到了一种新方法。重构后的代码如下。

use reqwest::{Client, Error, Response};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};


/// A unit of work: one URL to fetch, tagged with an id.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so tasks can be logged,
/// duplicated and compared by callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Task {
    /// Identifier carried along with the task; not interpreted in this listing.
    pub id: u32,
    /// URL handed to `Client::get` when the task is processed.
    pub url: String,
}
/// Outcome of processing a `Task`, sent from the request tasks to the result loop.
pub enum Message{
    /// The request failed; carries the task and the `reqwest` error.
    Failure(Task, Error),
    /// The request succeeded; carries the task and the response.
    Success(Task, Response),
}
/// Handle returned by `Proxy::new`; the worker loops own everything else.
pub struct Proxy{
    /// Identifier given to `Proxy::new`; not read in this listing.
    id: u32,
    /// NOTE(review): presumably a requests-per-second limit; not read in this listing.
    max_rps: u16,
    /// Value used to size the channels in `Proxy::new`.
    max_pending: u16,
    /// Sending half of the input channel; cloned by `get_in_tx`.
    in_tx: Sender<Task>,
}


/// Sends `msg` on `tx` and awaits the result.
///
/// A failed send is not propagated to the caller: the error is printed to
/// stderr and the function returns normally.
async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    if let Err(error) = tx.send(msg).await {
        eprintln!("{}", error)
    }
}


/// Receives tasks from `in_rx` and issues one HTTP GET per task.
///
/// Each task is fetched in its own spawned Tokio task using a clone of
/// `client`; the outcome is reported on a clone of `res_tx` as
/// `Message::Success` or `Message::Failure`.
///
/// Returns once `in_rx` is closed and drained. Dropping `res_tx` at that
/// point lets the result channel close after the in-flight requests finish.
async fn start_loop_in(client: Client, mut in_rx: Receiver<Task>, res_tx: Sender<Message>) {
    // `recv()` yields `None` once every sender is gone and the queue is empty.
    // `while let` stops there; a bare `loop` would poll the closed channel
    // forever and burn CPU.
    while let Some(task) = in_rx.recv().await {
        let client_clone = client.clone();
        let res_tx_clone = res_tx.clone();
        tokio::spawn(async move {
            println!("SENDING: {}", &task.url); // TODO: DELETE DEBUG
            let message = match client_clone.get(&task.url).send().await {
                Ok(res) => Message::Success(task, res),
                Err(err) => Message::Failure(task, err),
            };
            send_msg(&res_tx_clone, message).await;
        });
    }
}


/// Receives request outcomes from `res_rx` and forwards one `String` per
/// outcome on `out_tx`.
///
/// * `Message::Success` – the response body is read and sent, formatted with `{:#?}`.
/// * `Message::Failure` – the error's display text is sent.
///
/// A failure while reading the body is forwarded as the error's display text
/// instead of panicking the task. Returns once `res_rx` is closed and drained.
async fn start_loop_res(mut res_rx: Receiver<Message>, out_tx: Sender<String>) {
    // `recv()` yields `None` once every sender is gone; end the loop instead
    // of polling a closed channel forever.
    while let Some(message) = res_rx.recv().await {
        let text = match message {
            Message::Success(_task, res) => match res.text().await {
                Ok(body) => format!("{:#?}", body), // TODO: change in release!
                Err(err) => err.to_string(),
            },
            Message::Failure(_task, err) => err.to_string(),
        };
        send_msg(&out_tx, text).await;
    }
}


impl Proxy{

    pub fn new(id: u32, parent_tx: Sender<String>, proxy_addr: &str, max_rps: u16, max_pending: u16) -> Result<Self, Error> {
        
        let mut client = Client::builder();
        if proxy_addr != "none" { client = client.proxy(reqwest::Proxy::all(proxy_addr)?) }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        let client = client.build()?;
        let (in_tx, in_rx) = mpsc::channel::<Task>(max_pending as usize + 1 as usize);
        let res_tx_clone = res_tx.clone();
        tokio::spawn(async move { start_loop_in(client, in_rx, res_tx_clone).await });

        tokio::spawn(async move { start_loop_res(res_rx, parent_tx).await });
        
        Ok(Proxy{
            id,
            max_rps,
            max_pending,
            in_tx,
        })
    }

    pub fn get_in_tx(&self) -> Sender<Task> {
        self.in_tx.clone()
    }
}
Run Code Online (Sandbox Code Playgroud)