标签: rust-tokio

使用 Hyper 同时获取多个 URL

我正在尝试调整Hyper basic 客户端示例以同时获取多个 URL。

这是我目前拥有的代码:

extern crate futures;
extern crate hyper;
extern crate tokio_core;

use std::io::{self, Write};
use std::iter;
use futures::{Future, Stream};
use hyper::Client;
use tokio_core::reactor::Core;

fn get_url() {
    let mut core = Core::new().unwrap();
    let client = Client::new(&core.handle());
    let uris: Vec<_> = iter::repeat("http://httpbin.org/ip".parse().unwrap()).take(50).collect();
    for uri in uris {
        let work = client.get(uri).and_then(|res| {
            println!("Response: {}", res.status());

            res.body().for_each(|chunk| {
                io::stdout()
                    .write_all(&chunk)
                    .map_err(From::from)
            })
        });
        core.run(work).unwrap();
    }
}

fn main() {
    get_url();
}
Run Code Online (Sandbox Code Playgroud)

它似乎没有同时进行(需要很长时间才能完成),我是否以错误的方式将工作交给了核心?

rust hyper rust-tokio

3
推荐指数
1
解决办法
2467
查看次数

在盒装未来中使用引用变量时“需要显式生命周期”

我正在尝试使用创建的结构main()并将其传递给返回盒装Future. 然而,我遇到了终身和借贷问题,似乎无法彻底解决这个问题。

这是我的结构和函数:

extern crate futures; // 0.1.21
extern crate tokio_core; // 0.1.17

use futures::{future::ok, Future};

pub struct SomeStruct {
    some_val: u32,
}

impl SomeStruct {
    pub fn do_something(&self, value: u32) -> u32 {
        // Do some work
        return self.some_val + value;
    }
}

fn main() {
    let core = tokio_core::reactor::Core::new().unwrap();
    let my_struct = SomeStruct { some_val: 10 };

    let future = get_future(&my_struct);
    core.run(future);

    let future2 = get_future(&my_struct);
    core.run(future2);
}

fn get_future(some_struct: &SomeStruct) -> Box<Future<Item = …
Run Code Online (Sandbox Code Playgroud)

future lifetime rust borrowing rust-tokio

3
推荐指数
1
解决办法
7264
查看次数

在未来的实现中手动轮询流

我正在迁移到futures0.3 和tokio0.2,并且有一种重复出现的模式我无法重复使用。我不确定这种模式是否已经过时,或者我是否做错了什么Pin

通常我有一种类型,可以容纳一个套接字和一些通道接收器。此类结构的实现Future包括重复轮询流,直到它们返回PendingNotReady在 0.1 生态系统中)。

然而,在 futures 0.3 中,用Future::polland代替,这种模式不再起作用:Stream::poll_nextself&mut self

use futures::{
    stream::Stream,
    task::{Context, Poll},
    Future,
};
use std::pin::Pin;
use tokio::sync::mpsc::{Receiver, Sender};

/// Dummy structure that represent some state we update when we
/// receive data or events.
struct State;

impl State {
    fn update(&mut self, _data: Vec<u8>) {
        println!("updated state");
    }
    fn handle_event(&mut self, _event: u32) {
        println!("handled event");
    }
}

/// …
Run Code Online (Sandbox Code Playgroud)

asynchronous future rust rust-tokio

3
推荐指数
1
解决办法
2606
查看次数

使用线程和异步/等待时如何解决“无法返回引用本地数据的值”?

我正在学习 Rust,特别是并行的多线程和异步请求。

我阅读了文档,但仍然不明白我在哪里犯了错误。我想我知道在哪里,但不知道如何解决它。

主程序.rs

use std::thread;

struct Request {
    url: String,
}

impl Request {
    fn new(name: &str) -> Request {
        Request {
            url: name.to_string(),
        }
    }

    async fn call(&self, x: &str) -> Result<(), Box<dyn std::error::Error>> {
        let resp = reqwest::get(x).await;
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let requests = vec![
        Request::new("https://www.google.com/"),
        Request::new("https://www.google.com/"),
    ];
    let handles: Vec<_> = requests
        .into_iter()
        .map(|request| {
            thread::spawn(move || async {
                request.call(&request.url).await;
            })
        })
        .collect();

    for y in handles {
        println!("{:?}", y);
    } …
Run Code Online (Sandbox Code Playgroud)

rust async-await rust-tokio reqwest

3
推荐指数
1
解决办法
1759
查看次数

使用 tokio-tungstenite 时如何获得标题?

我正在尝试使用tokio-tungstenite crate 创建基于 URL 的聊天室。例如,我有一个客户端连接到ws://localhost:8080/abcd. 我的理解是,我必须使用该tokio_tungstenite::accept_hdr_async函数来访问标头才能获取/abcd路径,但我在使用它时遇到了问题。我的第二个论点应该copy_headers_callback是什么?

我的代码基于这个例子

use std::{
    collections::HashMap,
    env,
    io::Error as IoError,
    net::SocketAddr,
    sync::{Arc, Mutex},
    marker::Unpin,
};

use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};

use tokio::net::{TcpListener, TcpStream};
use tungstenite::{
    protocol::Message,
    handshake::server::{Request},
};

type Sender = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Sender>>>;

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct BroadcastJsonStruct {
    message: String,
    sender_addr: SocketAddr,
}

async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, client_addr: SocketAddr) {
    println!("Incoming TCP connection …
Run Code Online (Sandbox Code Playgroud)

header websocket rust rust-tokio

3
推荐指数
1
解决办法
5212
查看次数

如何从未来的函数中实现流

为了理解流是如何工作的,我试图实现一个使用 random.org 的无限数生成器。我做的第一件事是实现一个版本,在该版本中我将调用一个名为 get_number 的异步函数,它将填充缓冲区并返回下一个可能的数字:


struct RandomGenerator {
    buffer: Vec<u8>,
    position: usize,
}

impl RandomGenerator {
    pub fn new() -> RandomGenerator {
        Self {
            buffer: Vec::new(),
            position: 0,
        }
    }

    pub async fn get_number(&mut self) -> u8 {
        self.fill_buffer().await;

        let value = self.buffer[self.position];
        self.position += 1;

        value
    }

    async fn fill_buffer(&mut self) {
        if self.buffer.is_empty() || self.is_buffer_depleted() {
            let new_numbers = self.fetch_numbers().await;
            drop(replace(&mut self.buffer, new_numbers));
            self.position = 0;
        }
    }

    fn is_buffer_depleted(&self) -> bool {
        self.buffer.len() >= self.position
    }

    async …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio rust-async-std

3
推荐指数
1
解决办法
2745
查看次数

并发执行任务时不能将filter_map与buffer_unordered一起使用吗?

我正在查看这个示例,以便在 Rust 中同时下载内容。

粗略地说,它看起来像这样:

#![feature(async_closure)]

use futures::{stream, StreamExt}; // 0.3.13

async fn foo() {
    let xs_new = stream::once(async { 42 })
        .map(async move |x| {
            Some(x + 1)
        })
        .buffer_unordered(42);
}
Run Code Online (Sandbox Code Playgroud)

但是,我希望用来filter_map做这样的事情:

#![feature(async_closure)]

use futures::{stream, StreamExt}; // 0.3.13

async fn foo() {
    let xs_new = stream::once(async { 42 })
        .filter_map(async move |x| if x % 2 == 0 { Some(x + 1) } else { None })
        .buffer_unordered(42);
}
Run Code Online (Sandbox Code Playgroud)

然而,这会失败并出现错误:“{integer} 不是 Future,该特征...未针对 {integer} 实现”。

有谁知道为什么filter_map …

rust rust-tokio

3
推荐指数
1
解决办法
1573
查看次数

如何从标准库生成的线程向 Tokio 异步任务发送消息?

我有一个设置,我的程序使用std::thread::spawn.

我需要一个 GRPC 服务器来处理传入的命令以及工作线程完成的流输出。我用于tonicGRPC 服务器,它仅在 Tokio future 内提供异步实现。

我需要能够从我的“正常”标准库线程向 Tokio 未来发送消息。

我在这里将我的代码简化为最低限度:

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {}
    });

    let h = thread::spawn(move || {
        // do work
        tx.send(1).await; //<------ error occurs here since I …
Run Code Online (Sandbox Code Playgroud)

multithreading rust rust-tokio rust-tonic

3
推荐指数
1
解决办法
1241
查看次数

我可以克隆未来吗?

我想为未来编写一些通用的重试逻辑。

我知道具体的返回类型,并且想在未来重试相同的操作。

我的代码只能访问未来 - 我不想将每个 fn 调用站点包装在闭包中以启用重新创建它。

看起来“future”是 (fn, args) 的组合,当.await被调用时,它会运行并等待结果。

如果我能够克隆所有参数,是否可以创建未启动的未来的克隆,以便在第一次失败时重试?

rust rust-tokio

3
推荐指数
1
解决办法
1689
查看次数

限制 join_all!() 中并发 future 的数量

如何让 Rust 执行所有给定的 future(例如join_all!)限制为一次执行 10 个 future?

我需要从大量服务器下载文件,但同时查询不超过 10 台服务器(以准确测量它们的超时:如果我一次查询太多服务器,它们就会超时,即使服务器本身很快)。

concurrency rust rust-tokio rust-futures

3
推荐指数
1
解决办法
2865
查看次数