标签: rust-tokio

Hyper 中的共享可变状态

我正在尝试在 Hyper Web 服务器中创建一个计数器来计算它收到的请求数。我正在使用 aArc<Mutex<u64>>来保持计数。但是,我一直无法弄清楚闭包的正确组合move.clone()满足闭包的类型。这是一些编译代码,但在每个请求上重置计数器:

extern crate hyper;

use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Response, Server};
use std::sync::{Arc, Mutex};

fn main() {
    let addr = "0.0.0.0:3000".parse().unwrap();
    // FIXME want to create the counter here, not below
    let server = Server::bind(&addr)
        .serve(|| {
            service_fn_ok(|_req| {
                let counter = Arc::new(Mutex::new(0));
                use_counter(counter)
            })
        })
        .map_err(|e| eprintln!("Error: {}", e));
    hyper::rt::run(server)
}

fn use_counter(counter: Arc<Mutex<u64>>) -> Response<Body> {
    let mut data = counter.lock().unwrap();
    *data += 1;
    Response::new(Body::from(format!("Counter: {}\n", data))) …
Run Code Online (Sandbox Code Playgroud)

rust hyper rust-tokio

3
推荐指数
1
解决办法
760
查看次数

如何异步探索目录及其子目录?

我需要探索一个目录及其所有子目录。我可以通过递归以同步方式轻松浏览目录:

use failure::Error;
use std::fs;
use std::path::Path;

fn main() -> Result<(), Error> {
    visit(Path::new("."))
}

fn visit(path: &Path) -> Result<(), Error> {
    for e in fs::read_dir(path)? {
        let e = e?;
        let path = e.path();
        if path.is_dir() {
            visit(&path)?;
        } else if path.is_file() {
            println!("File: {:?}", path);
        }
    }
    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

当我尝试使用tokio_fs以下异步方式执行相同操作时:

use failure::Error; // 0.1.6
use futures::Future; // 0.1.29
use std::path::PathBuf;
use tokio::{fs, prelude::*}; // 0.1.22

fn visit(path: PathBuf) -> impl Future<Item = (), Error = Error> …
Run Code Online (Sandbox Code Playgroud)

filesystems asynchronous rust rust-tokio

3
推荐指数
1
解决办法
1507
查看次数

如何使用 tokio::fs 复制文件

我正在尝试使用 tokio 复制文件以进行异步操作。我看到 tokio 没有公开任何tokio::fs::copy可以为我完成工作的方法(例如std::fs::copy同步操作的等效方法)。

在尝试实现这样的方法时,我实际上无法使用 来创建文件tokio::fs::File::create,即以下代码不会创建任何文件:

tokio::fs::File::open("src.txt")
    .and_then(|mut file| {
        let mut content = Vec::new();
        file.read_buf(&mut content)
            .map(move |_| tokio::fs::File::create("dest.txt"))
    })
    .map_err(Error::from)
    .map(drop);
Run Code Online (Sandbox Code Playgroud)

如何复制src.txtdest.txt使用 tokio 和 asyncfs方法?

这是游乐场的链接

asynchronous file rust rust-tokio

3
推荐指数
1
解决办法
611
查看次数

有没有办法创建一个异步流生成器来产生重复调用函数的结果?

我想构建一个收集天气更新并将它们表示为流的程序。我想get_weather()在一个无限循环中调用,在finishstart之间有 60 秒的延迟。

简化版本如下所示:

async fn get_weather() -> Weather { /* ... */ }

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    loop {
        tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
        let weather = get_weather().await;
        yield weather; // This is not supported
        // Note: waiting for get_weather() stops the timer and avoids overflows.
    }
}
Run Code Online (Sandbox Code Playgroud)

有没有办法轻松做到这一点?

超过 60 秒tokio::timer::Interval时使用将不起作用get_weather()

fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
    tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
        .then(|| get_weather())
}
Run Code Online (Sandbox Code Playgroud)

如果发生这种情况,下一个功能将立即启动。我想在上一次get_weather()开始和下一次get_weather()开始之间保持 …

asynchronous rust async-await rust-tokio

3
推荐指数
1
解决办法
1236
查看次数

特性 `std::future::Future` 没有为 `std::result::Result&lt;reqwest::Response, reqwest::Error&gt;` 实现

我正在尝试运行基本reqwest 示例

extern crate reqwest;
extern crate tokio;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let res = reqwest::Client::new()
        .get("https://hyper.rs")
        .send()
        .await?;

    println!("Status: {}", res.status());

    let body = res.text().await?;

    println!("Body:\n\n{}", body);

    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

我得到的错误:

   --> src/main.rs:6:15
    |
6   |       let res = reqwest::Client::new()
    |  _______________^
7   | |         .get("https://hyper.rs")
8   | |         .send()
9   | |         .await?;
    | |______________^ the trait `std::future::Future` is not implemented for `std::result::Result<reqwest::Response, reqwest::Error>`
Run Code Online (Sandbox Code Playgroud)

锈版: rustc 1.39.0 (4560ea788 2019-11-04)

库版本:

reqwest = "0.9.22" …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio reqwest

3
推荐指数
1
解决办法
1804
查看次数

为什么`tokio::main`会报“处理时检测到循环”的错误?

我正在使用 Tokio 和async/.await来创建一个 UDP 服务器,我可以在其中以异步方式接收和发送数据。

SendHalf我的UDP套接字的跨多个任务共享。为此,我正在使用Arc<Mutex<SendHalf>>. 这就是为什么Arc<Mutex<_>>存在。

use tokio::net::UdpSocket;
use tokio::net::udp::SendHalf;
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;

struct Packet {
    sender: Arc<Mutex<SendHalf>>,
    buf: [u8; 512],
    addr: SocketAddr,
}

#[tokio::main]
async fn main() {
    let server = UdpSocket::bind(("0.0.0.0", 44667)).await.unwrap();
    let (mut server_rx, mut server_tx) = server.split();
    let sender = Arc::new(Mutex::new(server_tx));
    let (mut tx, mut rx) = mpsc::channel(100);

    tokio::spawn(async move {
        loop {
            let mut buffer = [0; 512];
            let (_, …
Run Code Online (Sandbox Code Playgroud)

udp rust async-await rust-tokio

3
推荐指数
1
解决办法
695
查看次数

特性绑定`tokio::net::tcp::stream::TcpStream: tokio_io::async_read::AsyncRead` 不满足

我无法编译一个简单的应用程序来测试 tokio-codec。tokio::net::tcp::stream::TcpStream 实现了 AsyncRead 和 -Write。但是当我尝试编译下面的代码时,出现以下错误。我还是 Rust 和 Tokio 的新手,所以毫无疑问我错过了一些明显的东西(我希望)......

主.rs:

use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_codec::{ Framed, LinesCodec };


#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
  let mut listener = TcpListener::bind("127.0.0.1:12321").await?;

  loop {
    let (socket, _addr) = listener.accept().await?;

    tokio::spawn(async move {
      let (_sink, mut stream) = Framed::new(socket, LinesCodec::new()).split();

      while let Some(Ok(line)) = stream.next().await {
        println!("{:?}", line);
      }
    });
  }
}
Run Code Online (Sandbox Code Playgroud)

货物.toml:

[dependencies]
tokio = { version = "0.2.6", features = ["full"] }
tokio-codec = "0.1.1"
Run Code Online (Sandbox Code Playgroud)

输出:

error[E0277]: …
Run Code Online (Sandbox Code Playgroud)

asynchronous compiler-errors traits rust rust-tokio

3
推荐指数
1
解决办法
1544
查看次数

如何将 Future 转换为 Stream?

我正在尝试使用async_std从网络接收 UDP 数据报。

有一个UdpSocket实现async recv_from,这个方法返回一个未来,但我需要一个async_std::stream::Stream提供UDP数据报流的,因为它是一个更好的抽象。

我发现tokio::net::UdpFramed这正是我需要的,但它在当前版本的 tokio 中不可用。

一般来说,问题是如何将Futures 从给定的异步函数转换为Stream

asynchronous rust rust-tokio

3
推荐指数
1
解决办法
1148
查看次数

如何在循环中生成异步方法?

我有有一个对象的矢量resolve()的方法,它使用reqwest来查询外部web API。在resolve()每个对象上调用该方法后,我想打印每个请求的结果。

这是我编译和工作的半异步代码(但不是真正异步的):

for mut item in items {
    item.resolve().await;

    item.print_result();
}
Run Code Online (Sandbox Code Playgroud)

我试图用来tokio::join!产生所有异步调用并等待它们完成,但我可能做错了什么:

tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
Run Code Online (Sandbox Code Playgroud)

这是我得到的错误:

error[E0308]: mismatched types
  --> src\main.rs:25:51
   |
25 |     tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
   |                                                   ^^^^^^^^^^^^^^ expected `()`, found opaque type
   | 
  ::: src\redirect_definition.rs:32:37
   |
32 |     pub async fn resolve(&mut self) {
   |                                     - the `Output` of this `async fn`'s found opaque type
   |
   = note: expected unit type `()`
            found opaque type `impl std::future::Future`
Run Code Online (Sandbox Code Playgroud)

如何一次调用resolve()所有实例的方法?


这段代码反映了答案——现在我正在处理我不太理解的借用检查器错误——我应该用 …

rust async-await rust-tokio

3
推荐指数
1
解决办法
4651
查看次数

使用 Tokio 0.2 生成非静态未来

我有一个异步方法应该并行执行一些期货,并且只有在所有期货完成后才返回。但是,它通过引用传递了一些寿命不长的数据'static(它将在主方法中的某个时刻被删除)。从概念上讲,它类似于(Playground):

async fn do_sth(with: &u64) {
    delay_for(Duration::new(*with, 0)).await;
    println!("{}", with);
}

async fn parallel_stuff(array: &[u64]) {
    let mut tasks: Vec<JoinHandle<()>> = Vec::new();
    for i in array {
        let task = spawn(do_sth(i));
        tasks.push(task);
    }
    for task in tasks {
        task.await;
    }
}

#[tokio::main]
async fn main() {
    parallel_stuff(&[3, 1, 4, 2]);
}
Run Code Online (Sandbox Code Playgroud)

现在,tokio 希望传递给的期货spawn'static一生中有效,因为我可以在不停止未来的情况下放下句柄。这意味着我上面的示例会产生此错误消息:

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
  --> src/main.rs:12:25
   | …
Run Code Online (Sandbox Code Playgroud)

lifetime rust async-await rust-tokio

3
推荐指数
1
解决办法
640
查看次数