标签: rust-tokio

使用 tokio 0.1.x 生成具有非静态生命周期的任务

我有一个 tokio 核心,其主要任务是运行 websocket(客户端)。当我从服务器收到一些消息时,我想执行一个新任务来更新一些数据。下面是一个最小的失败示例:

use tokio_core::reactor::{Core, Handle};
use futures::future::Future;
use futures::future;

struct Client {
    handle: Handle,
    data: usize,
}

impl Client {
    fn update_data(&mut self) {
        // spawn a new task that updates the data
        self.handle.spawn(future::ok(()).and_then(|x| {
            self.data += 1; // error here
            future::ok(())
        }));
    }
}

fn main() {
    let mut runtime = Core::new().unwrap();

    let mut client = Client {
        handle: runtime.handle(),
        data: 0,
    };

    let task = future::ok::<(), ()>(()).and_then(|_| {
        // under some conditions (omitted), we update the …
Run Code Online (Sandbox Code Playgroud)

lifetime rust rust-tokio

6
推荐指数
1
解决办法
2667
查看次数

带有特征的 tokio-async-await

我想在 trait 中编写异步函数,但由于async fn尚不支持 in trait,我试图找到等效的方法接口。这是我每晚在 Rust 中尝试过的(2019-01-01):

操场

#![feature(await_macro, async_await, futures_api)]
#[macro_use]
extern crate tokio;
use tokio::prelude::*;

trait T {
    async fn f();
}

fn main() {
}
Run Code Online (Sandbox Code Playgroud)
error[E0706]: trait fns cannot be declared `async`
 --> src/main.rs:7:5
  |
7 |     async fn f();
  |     ^^^^^^^^^^^^^
Run Code Online (Sandbox Code Playgroud)

我在某处读到async只是impl Future.

trait T {
    fn f() -> impl futures::Future<Item = (), Error = ()>;
}
Run Code Online (Sandbox Code Playgroud)
error[E0562]: `impl Trait` not allowed outside of function and inherent method return …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio

6
推荐指数
1
解决办法
2089
查看次数

期货::选择有什么区别!和 tokio::select?

我正在使用 Tokio,我想接收来自两个不同mpsc队列的请求。select!似乎是要走的路,但我不确定futures::select!和之间有什么区别tokio::select!。在哪种情况下,您应该使用一种而不是另一种?

asynchronous rust rust-tokio

6
推荐指数
1
解决办法
983
查看次数

如何将超响应正文写入文件?

我正在尝试使用 tokio 编写一个测试程序,该程序从网站上获取文件并将流式响应写入文件。超级网站显示了一个示例,该示例使用 while 循环并使用.data()响应主体的方法,但我想与.map()其他几个人一起操作流。

我认为下一个合理的尝试是AsyncRead使用.into_async_read()from 方法将流转换为 an TryStreamExt,但这似乎不起作用。我不得不使用地图将 the 转换hyper::error::Error为 astd::error::Error以获取 a TryStream,但现在编译器告诉我AsyncRead没有为转换后的流实现。这是我的 main.rs 文件和错误:

src/main.rs

use futures::stream::{StreamExt, TryStreamExt};
use http::Request;
use hyper::{Body, Client};
use hyper_tls::HttpsConnector;
use tokio::fs::File;
use tokio::io;

use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let https = HttpsConnector::new();
    let client = Client::builder().build::<_, Body>(https);

    let request = Request::get("some file from the internet").body(Body::empty())?;
    let response = client.request(request).await?;

    let …
Run Code Online (Sandbox Code Playgroud)

hyper rust-tokio

6
推荐指数
1
解决办法
779
查看次数

为什么迭代器的实现在异步上下文中不够通用?

在github上交叉发布

鉴于以下代码段:

use futures::stream::{self, StreamExt};

async fn from_bar(bar: &[Vec<&u8>]) {
    let x = bar.iter().flat_map(|i| i.iter().map(|_| async { 42 }));
    let foo: Vec<_> = stream::iter(x).collect().await;
}

#[tokio::main]
async fn main() {
    for bar in vec![] {
        tokio::spawn(async {
            from_bar(bar).await;
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

我收到以下错误:

error[E0308]: mismatched types
  --> src/main.rs:11:9
   |
11 |         tokio::spawn(async {
   |         ^^^^^^^^^^^^ one type is more general than the other
   |
   = note: expected type `std::ops::FnOnce<(&&u8,)>`
              found type `std::ops::FnOnce<(&&u8,)>`

error: implementation of `std::iter::Iterator` is not general …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

6
推荐指数
1
解决办法
454
查看次数

如何在另一个 Tokio 运行时内创建 Tokio 运行时而不会出现“无法从运行时内启动运行时”的错误?

rust_bert用于总结文本。我需要用 设置一个模型rust_bert::pipelines::summarization::SummarizationModel::new,它从互联网上获取模型。它以异步方式执行此操作,tokio并且(我认为)我遇到的问题是我正在另一个 Tokio 运行时中运行 Tokio 运行时,如错误消息所示:

Downloading https://cdn.huggingface.co/facebook/bart-large-cnn/config.json to "/home/(censored)/.cache/.rustbert/bart-cnn/config.json"
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/(censored)/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/runtime/enter.rs:38:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Run Code Online (Sandbox Code Playgroud)

我试过与模型同步运行 tokio::task::spawn_blockingtokio::task::block_in_place 但它们都不适合我。block_in_place给出了同样的错误,就像是不存在的,并spawn_blocking没有真正似乎是用我的。我也尝试过summarize_text异步,但这并没有多大帮助。Github 问题 tokio-rs/tokio#2194 和 Reddit 发布 …

rust rust-tokio

6
推荐指数
2
解决办法
3583
查看次数

我可以通过在两个异步接收器上调用 select 来错过一个值吗?

是否有可能,如果一个任务发送到a另一个(同时)发送到b,通过取消剩余的未来来tokio::select!打开ab删除一个值?还是保证在下一次循环迭代时收到?

use tokio::sync::mpsc::Receiver;

async fn foo(mut a: Receiver<()>, mut b: Receiver<()>) {
    loop {
        tokio::select!{
            _ = a.recv() => {
                println!("A!");
            }
            _ = b.recv() => {
                println!("B!");
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

async在那种情况下,我的思绪无法绕过魔法背后真正发生的事情。

rust async-await rust-tokio

6
推荐指数
1
解决办法
277
查看次数

为什么 Tokio 返回错误“不能在不允许阻塞的上下文中删除运行时”?

我有一个与远程服务器通信的 Tokio 客户端,并且应该保持连接永久有效。我已经实现了初始身份验证握手,发现当我的测试终止时,我得到了一个奇怪的恐慌:

---- test_connect_without_database stdout ----
thread 'test_connect_without_database' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.3.5/src/runtime/blocking/shutdown.rs:51:21
Run Code Online (Sandbox Code Playgroud)

对于可能导致这种情况的原因,我完全不知所措,所以我不知道要为上下文添加哪些其他代码位。

这是我最小的可重现示例(playground):

use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::runtime;

#[derive(PartialEq, Debug)]
pub struct Configuration {
    /// Database username.
    username: String,
    /// Database password.
    password: String,
    /// Database name.
    db_name: String,
    /// IP address for the remove server.
    address: …
Run Code Online (Sandbox Code Playgroud)

multithreading rust rust-tokio

6
推荐指数
1
解决办法
1724
查看次数

如何在特征中定义异步方法?

我有一个特征,我用它来抽象tokio::net::TcpStreamtokio::net::UnixStream

/// Interface for TcpStream and UnixStream.
trait TryRead {
  // overlapping the name makes it hard to work with
  fn do_try_read(&self, buf: &mut [u8]) -> Result<usize, std::io::Error>;
}

impl TryRead for TcpStream {
  fn do_try_read(&self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
      self.try_read(buf)
  }
}
Run Code Online (Sandbox Code Playgroud)

问题是我想pub async fn readable(&self) -> io::Result<()>在这两种方法中都抽象出来,但是无法在特征中实现异步方法。我该如何处理?

rust rust-tokio

6
推荐指数
1
解决办法
1582
查看次数

从 Tokio 应用程序使用 Actix:混合 actix_web::main 和 tokio::main?

目前我有一个像异步示例Reqwest一样编写的主要内容。

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
Run Code Online (Sandbox Code Playgroud)

我们可以使用那里的确切示例。现在我想基本上添加一个-l <port>标志来改变我的应用程序的行为,当以这种方式触发时,我希望它侦听端口并运行 Web 服务器。我想使用这样记录的Actix Web

#[actix_web::main]
async fn main() -> std::io::Result<()> {
Run Code Online (Sandbox Code Playgroud)

我如何综合两个fn main:一个装饰#[actix_web::main]和一个装饰以#[tokio::main]从已经使用 Tokio 的应用程序中使用 Actix Web?我找不到有关此的任何文档?我们如何从 Actix Web 服务器使用 Tokio 的东西,我们如何将 Tokio 应用程序移植到 Actix Web 应用程序?

asynchronous rust rust-tokio actix-web

6
推荐指数
1
解决办法
1937
查看次数