我正在尝试调整Hyper basic 客户端示例以同时获取多个 URL。
这是我目前拥有的代码:
extern crate futures;
extern crate hyper;
extern crate tokio_core;
use std::io::{self, Write};
use std::iter;
use futures::{Future, Stream};
use hyper::Client;
use tokio_core::reactor::Core;
fn get_url() {
let mut core = Core::new().unwrap();
let client = Client::new(&core.handle());
let uris: Vec<_> = iter::repeat("http://httpbin.org/ip".parse().unwrap()).take(50).collect();
for uri in uris {
let work = client.get(uri).and_then(|res| {
println!("Response: {}", res.status());
res.body().for_each(|chunk| {
io::stdout()
.write_all(&chunk)
.map_err(From::from)
})
});
core.run(work).unwrap();
}
}
fn main() {
get_url();
}
Run Code Online (Sandbox Code Playgroud)
它似乎没有同时进行(需要很长时间才能完成),我是否以错误的方式将工作交给了核心?
我正在尝试使用创建的结构main()并将其传递给返回盒装Future. 然而,我遇到了终身和借贷问题,似乎无法彻底解决这个问题。
这是我的结构和函数:
extern crate futures; // 0.1.21
extern crate tokio_core; // 0.1.17
use futures::{future::ok, Future};
pub struct SomeStruct {
some_val: u32,
}
impl SomeStruct {
pub fn do_something(&self, value: u32) -> u32 {
// Do some work
return self.some_val + value;
}
}
fn main() {
let core = tokio_core::reactor::Core::new().unwrap();
let my_struct = SomeStruct { some_val: 10 };
let future = get_future(&my_struct);
core.run(future);
let future2 = get_future(&my_struct);
core.run(future2);
}
fn get_future(some_struct: &SomeStruct) -> Box<Future<Item = …Run Code Online (Sandbox Code Playgroud) 我正在迁移到futures0.3 和tokio0.2,并且有一种重复出现的模式我无法重复使用。我不确定这种模式是否已经过时,或者我是否做错了什么Pin。
通常我有一种类型,可以容纳一个套接字和一些通道接收器。此类结构的实现Future包括重复轮询流,直到它们返回Pending(NotReady在 0.1 生态系统中)。
然而,在 futures 0.3 中,用Future::polland代替,这种模式不再起作用:Stream::poll_nextself&mut self
use futures::{
stream::Stream,
task::{Context, Poll},
Future,
};
use std::pin::Pin;
use tokio::sync::mpsc::{Receiver, Sender};
/// Dummy structure that represent some state we update when we
/// receive data or events.
struct State;
impl State {
fn update(&mut self, _data: Vec<u8>) {
println!("updated state");
}
fn handle_event(&mut self, _event: u32) {
println!("handled event");
}
}
/// …Run Code Online (Sandbox Code Playgroud) 我正在学习 Rust,特别是并行的多线程和异步请求。
我阅读了文档,但仍然不明白我在哪里犯了错误。我想我知道在哪里,但不知道如何解决它。
主程序.rs
use std::thread;
struct Request {
url: String,
}
impl Request {
fn new(name: &str) -> Request {
Request {
url: name.to_string(),
}
}
async fn call(&self, x: &str) -> Result<(), Box<dyn std::error::Error>> {
let resp = reqwest::get(x).await;
Ok(())
}
}
#[tokio::main]
async fn main() {
let requests = vec![
Request::new("https://www.google.com/"),
Request::new("https://www.google.com/"),
];
let handles: Vec<_> = requests
.into_iter()
.map(|request| {
thread::spawn(move || async {
request.call(&request.url).await;
})
})
.collect();
for y in handles {
println!("{:?}", y);
} …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用tokio-tungstenite crate 创建基于 URL 的聊天室。例如,我有一个客户端连接到ws://localhost:8080/abcd. 我的理解是,我必须使用该tokio_tungstenite::accept_hdr_async函数来访问标头才能获取/abcd路径,但我在使用它时遇到了问题。我的第二个论点应该copy_headers_callback是什么?
我的代码基于这个例子:
use std::{
collections::HashMap,
env,
io::Error as IoError,
net::SocketAddr,
sync::{Arc, Mutex},
marker::Unpin,
};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tungstenite::{
protocol::Message,
handshake::server::{Request},
};
type Sender = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Sender>>>;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
struct BroadcastJsonStruct {
message: String,
sender_addr: SocketAddr,
}
async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, client_addr: SocketAddr) {
println!("Incoming TCP connection …Run Code Online (Sandbox Code Playgroud) 为了理解流是如何工作的,我试图实现一个使用 random.org 的无限数生成器。我做的第一件事是实现一个版本,在该版本中我将调用一个名为 get_number 的异步函数,它将填充缓冲区并返回下一个可能的数字:
struct RandomGenerator {
buffer: Vec<u8>,
position: usize,
}
impl RandomGenerator {
pub fn new() -> RandomGenerator {
Self {
buffer: Vec::new(),
position: 0,
}
}
pub async fn get_number(&mut self) -> u8 {
self.fill_buffer().await;
let value = self.buffer[self.position];
self.position += 1;
value
}
async fn fill_buffer(&mut self) {
if self.buffer.is_empty() || self.is_buffer_depleted() {
let new_numbers = self.fetch_numbers().await;
drop(replace(&mut self.buffer, new_numbers));
self.position = 0;
}
}
fn is_buffer_depleted(&self) -> bool {
self.buffer.len() >= self.position
}
async …Run Code Online (Sandbox Code Playgroud) 我正在查看这个示例,以便在 Rust 中同时下载内容。
粗略地说,它看起来像这样:
#![feature(async_closure)]
use futures::{stream, StreamExt}; // 0.3.13
async fn foo() {
let xs_new = stream::once(async { 42 })
.map(async move |x| {
Some(x + 1)
})
.buffer_unordered(42);
}
Run Code Online (Sandbox Code Playgroud)
但是,我希望用来filter_map做这样的事情:
#![feature(async_closure)]
use futures::{stream, StreamExt}; // 0.3.13
async fn foo() {
let xs_new = stream::once(async { 42 })
.filter_map(async move |x| if x % 2 == 0 { Some(x + 1) } else { None })
.buffer_unordered(42);
}
Run Code Online (Sandbox Code Playgroud)
然而,这会失败并出现错误:“{integer} 不是 Future,该特征...未针对 {integer} 实现”。
有谁知道为什么filter_map …
我有一个设置,我的程序使用std::thread::spawn.
我需要一个 GRPC 服务器来处理传入的命令以及工作线程完成的流输出。我用于tonicGRPC 服务器,它仅在 Tokio future 内提供异步实现。
我需要能够从我的“正常”标准库线程向 Tokio 未来发送消息。
我在这里将我的代码简化为最低限度:
use std::thread;
use tokio::sync::mpsc; // 1.9.0
fn main() {
let (tx, mut rx) = mpsc::channel(1);
let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
tokio_runtime.spawn(async move {
// the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
while let Some(v) = rx.recv().await {}
});
let h = thread::spawn(move || {
// do work
tx.send(1).await; //<------ error occurs here since I …Run Code Online (Sandbox Code Playgroud) 我想为未来编写一些通用的重试逻辑。
我知道具体的返回类型,并且想在未来重试相同的操作。
我的代码只能访问未来 - 我不想将每个 fn 调用站点包装在闭包中以启用重新创建它。
看起来“future”是 (fn, args) 的组合,当.await被调用时,它会运行并等待结果。
如果我能够克隆所有参数,是否可以创建未启动的未来的克隆,以便在第一次失败时重试?
如何让 Rust 执行所有给定的 future(例如join_all!)限制为一次执行 10 个 future?
我需要从大量服务器下载文件,但同时查询不超过 10 台服务器(以准确测量它们的超时:如果我一次查询太多服务器,它们就会超时,即使服务器本身很快)。
rust ×10
rust-tokio ×10
future ×2
async-await ×1
asynchronous ×1
borrowing ×1
concurrency ×1
header ×1
hyper ×1
lifetime ×1
reqwest ×1
rust-futures ×1
rust-tonic ×1
websocket ×1