使用 tokio 0.1.x 生成具有非静态生命周期的任务

Ben*_*ijl 6 lifetime rust rust-tokio

我有一个 tokio 核心,其主要任务是运行 websocket(客户端)。当我从服务器收到一些消息时,我想执行一个新任务来更新一些数据。下面是一个最小的失败示例:

use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;

struct Client {
    handle: Handle,
    data: usize,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        self.handle.spawn(future::ok(()).and_then(|x| {
            self.data += 1; // error here
            future::ok(())
        }));
    }
}

fn main() {
    let mut runtime = Core::new().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: 0,
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.run(task).unwrap();
}
Run Code Online (Sandbox Code Playgroud)

产生此错误:

use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;

struct Client {
    handle: Handle,
    data: usize,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        self.handle.spawn(future::ok(()).and_then(|x| {
            self.data += 1; // error here
            future::ok(())
        }));
    }
}

fn main() {
    let mut runtime = Core::new().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: 0,
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.run(task).unwrap();
}
Run Code Online (Sandbox Code Playgroud)

问题是通过句柄产生的新任务需要是静态的。此处描述相同的问题。可悲的是,我不清楚如何解决这个问题。即使使用 andArc和 a 进行了一些尝试Mutex(单线程应用程序确实不需要),我也没有成功。

由于 tokio 领域的发展很快,我想知道当前的最佳解决方案是什么。你有什么建议吗?

编辑

Peter Hall的解决方案适用于上述示例。可悲的是,当我构建失败的示例时,我更改了 tokio reactor,认为它们会相似。使用tokio::runtime::current_thread

use futures::future;
use futures::future::Future;
use futures::stream::Stream;
use std::cell::Cell;
use std::rc::Rc;
use tokio::runtime::current_thread::{Builder, Handle};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        let mut data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}

fn main() {
    // let mut runtime = Core::new().unwrap();

    let mut runtime = Builder::new().build().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: Rc::new(Cell::new(1)),
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the data
        client.update_data();
        future::ok::<(), ()>(())
    });
    runtime.block_on(task).unwrap();
}
Run Code Online (Sandbox Code Playgroud)

我获得:

error[E0477]: the type `futures::future::and_then::AndThen<futures::future::result_::FutureResult<(), ()>, futures::future::result_::FutureResult<(), ()>, [closure@src/main.rs:13:51: 16:10 self:&mut &mut Client]>` does not fulfill the required lifetime
  --> src/main.rs:13:21                                                                                                                                                                
   |                                                                                                                                                                                   
13 |         self.handle.spawn(future::ok(()).and_then(|x| {                                                                                                                           
   |                     ^^^^^                                                                                                                                                         
   |                                                                                                                                                                                   
   = note: type must satisfy the static lifetime      
Run Code Online (Sandbox Code Playgroud)

所以在这种情况下ArcMutex即使整个代码是单线程的,我也需要一个和一个?

Pet*_*all 5

在单线程程序中,不需要使用Arc; Rc足够了:

use std::{rc::Rc, cell::Cell};

struct Client {
    handle: Handle,
    data: Rc<Cell<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Rc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            data.set(data.get() + 1);
            future::ok(())
        }));
    }
}
Run Code Online (Sandbox Code Playgroud)

关键是您不再需要担心生命周期,因为每个克隆的行为Rc就好像它拥有数据一样,而不是通过对 的引用来访问它self。需要内部Cell(或RefCell对于非类型),因为它已被克隆,因此不能可变地取消引用。CopyRc


spawn的方法要求tokio::runtime::current_thread::Handle未来是Send,这就是导致问题更新的原因。在这个 Tokio Github 问题中对为什么会出现这种情况有(某种程度的)解释。

您可以使用tokio::runtime::current_thread::spawn代替 的方法Handle,该方法将始终在当前线程中运行 future,并且不要求future 是Send。您可以替换self.handle.spawn上面的代码,它会正常工作。

如果您需要使用 on 方法,Handle那么您还需要诉诸Arcand Mutex(或RwLock) 来满足Send要求:

use std::sync::{Mutex, Arc};

struct Client {
    handle: Handle,
    data: Arc<Mutex<usize>>,
}

impl Client {
    fn update_data(&mut self) {
        let data = Arc::clone(&self.data);
        self.handle.spawn(future::ok(()).and_then(move |_x| {
            *data.lock().unwrap() += 1;
            future::ok(())
        }));
    }
}
Run Code Online (Sandbox Code Playgroud)

如果您的数据确实是usize,您也可以使用AtomicUsize代替Mutex<usize>,但我个人发现它同样难以使用。