我正在尝试在 Hyper Web 服务器中创建一个计数器来计算它收到的请求数。我正在使用 aArc<Mutex<u64>>来保持计数。但是,我一直无法弄清楚闭包的正确组合move和.clone()满足闭包的类型。这是一些编译代码,但在每个请求上重置计数器:
extern crate hyper;
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Response, Server};
use std::sync::{Arc, Mutex};
fn main() {
let addr = "0.0.0.0:3000".parse().unwrap();
// FIXME want to create the counter here, not below
let server = Server::bind(&addr)
.serve(|| {
service_fn_ok(|_req| {
let counter = Arc::new(Mutex::new(0));
use_counter(counter)
})
})
.map_err(|e| eprintln!("Error: {}", e));
hyper::rt::run(server)
}
fn use_counter(counter: Arc<Mutex<u64>>) -> Response<Body> {
let mut data = counter.lock().unwrap();
*data += 1;
Response::new(Body::from(format!("Counter: {}\n", data))) …Run Code Online (Sandbox Code Playgroud) 我需要探索一个目录及其所有子目录。我可以通过递归以同步方式轻松浏览目录:
use failure::Error;
use std::fs;
use std::path::Path;
fn main() -> Result<(), Error> {
visit(Path::new("."))
}
fn visit(path: &Path) -> Result<(), Error> {
for e in fs::read_dir(path)? {
let e = e?;
let path = e.path();
if path.is_dir() {
visit(&path)?;
} else if path.is_file() {
println!("File: {:?}", path);
}
}
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
当我尝试使用tokio_fs以下异步方式执行相同操作时:
use failure::Error; // 0.1.6
use futures::Future; // 0.1.29
use std::path::PathBuf;
use tokio::{fs, prelude::*}; // 0.1.22
fn visit(path: PathBuf) -> impl Future<Item = (), Error = Error> …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 tokio 复制文件以进行异步操作。我看到 tokio 没有公开任何tokio::fs::copy可以为我完成工作的方法(例如std::fs::copy同步操作的等效方法)。
在尝试实现这样的方法时,我实际上无法使用 来创建文件tokio::fs::File::create,即以下代码不会创建任何文件:
tokio::fs::File::open("src.txt")
.and_then(|mut file| {
let mut content = Vec::new();
file.read_buf(&mut content)
.map(move |_| tokio::fs::File::create("dest.txt"))
})
.map_err(Error::from)
.map(drop);
Run Code Online (Sandbox Code Playgroud)
如何复制src.txt到dest.txt使用 tokio 和 asyncfs方法?
这是游乐场的链接
我想构建一个收集天气更新并将它们表示为流的程序。我想get_weather()在一个无限循环中调用,在finish和start之间有 60 秒的延迟。
简化版本如下所示:
async fn get_weather() -> Weather { /* ... */ }
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
loop {
tokio::timer::delay_for(std::time::Duration::from_secs(60)).await;
let weather = get_weather().await;
yield weather; // This is not supported
// Note: waiting for get_weather() stops the timer and avoids overflows.
}
}
Run Code Online (Sandbox Code Playgroud)
有没有办法轻松做到这一点?
超过 60 秒tokio::timer::Interval时使用将不起作用get_weather():
fn get_weather_stream() -> impl futures::Stream<Item = Weather> {
tokio::timer::Interval::new_with_delay(std::time::Duration::from_secs(60))
.then(|| get_weather())
}
Run Code Online (Sandbox Code Playgroud)
如果发生这种情况,下一个功能将立即启动。我想在上一次get_weather()开始和下一次get_weather()开始之间保持 …
我正在尝试运行基本reqwest 示例:
extern crate reqwest;
extern crate tokio;
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
let res = reqwest::Client::new()
.get("https://hyper.rs")
.send()
.await?;
println!("Status: {}", res.status());
let body = res.text().await?;
println!("Body:\n\n{}", body);
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
我得到的错误:
--> src/main.rs:6:15
|
6 | let res = reqwest::Client::new()
| _______________^
7 | | .get("https://hyper.rs")
8 | | .send()
9 | | .await?;
| |______________^ the trait `std::future::Future` is not implemented for `std::result::Result<reqwest::Response, reqwest::Error>`
Run Code Online (Sandbox Code Playgroud)
锈版: rustc 1.39.0 (4560ea788 2019-11-04)
库版本:
reqwest = "0.9.22" …Run Code Online (Sandbox Code Playgroud) 我正在使用 Tokio 和async/.await来创建一个 UDP 服务器,我可以在其中以异步方式接收和发送数据。
在SendHalf我的UDP套接字的跨多个任务共享。为此,我正在使用Arc<Mutex<SendHalf>>. 这就是为什么Arc<Mutex<_>>存在。
use tokio::net::UdpSocket;
use tokio::net::udp::SendHalf;
use tokio::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::net::SocketAddr;
struct Packet {
sender: Arc<Mutex<SendHalf>>,
buf: [u8; 512],
addr: SocketAddr,
}
#[tokio::main]
async fn main() {
let server = UdpSocket::bind(("0.0.0.0", 44667)).await.unwrap();
let (mut server_rx, mut server_tx) = server.split();
let sender = Arc::new(Mutex::new(server_tx));
let (mut tx, mut rx) = mpsc::channel(100);
tokio::spawn(async move {
loop {
let mut buffer = [0; 512];
let (_, …Run Code Online (Sandbox Code Playgroud) 我无法编译一个简单的应用程序来测试 tokio-codec。tokio::net::tcp::stream::TcpStream 实现了 AsyncRead 和 -Write。但是当我尝试编译下面的代码时,出现以下错误。我还是 Rust 和 Tokio 的新手,所以毫无疑问我错过了一些明显的东西(我希望)......
主.rs:
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio_codec::{ Framed, LinesCodec };
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut listener = TcpListener::bind("127.0.0.1:12321").await?;
loop {
let (socket, _addr) = listener.accept().await?;
tokio::spawn(async move {
let (_sink, mut stream) = Framed::new(socket, LinesCodec::new()).split();
while let Some(Ok(line)) = stream.next().await {
println!("{:?}", line);
}
});
}
}
Run Code Online (Sandbox Code Playgroud)
货物.toml:
[dependencies]
tokio = { version = "0.2.6", features = ["full"] }
tokio-codec = "0.1.1"
Run Code Online (Sandbox Code Playgroud)
输出:
error[E0277]: …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用async_std从网络接收 UDP 数据报。
有一个UdpSocket实现async recv_from,这个方法返回一个未来,但我需要一个async_std::stream::Stream提供UDP数据报流的,因为它是一个更好的抽象。
我发现tokio::net::UdpFramed这正是我需要的,但它在当前版本的 tokio 中不可用。
一般来说,问题是如何将Futures 从给定的异步函数转换为Stream?
我有有一个对象的矢量resolve()的方法,它使用reqwest来查询外部web API。在resolve()每个对象上调用该方法后,我想打印每个请求的结果。
这是我编译和工作的半异步代码(但不是真正异步的):
for mut item in items {
item.resolve().await;
item.print_result();
}
Run Code Online (Sandbox Code Playgroud)
我试图用来tokio::join!产生所有异步调用并等待它们完成,但我可能做错了什么:
tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
Run Code Online (Sandbox Code Playgroud)
这是我得到的错误:
error[E0308]: mismatched types
--> src\main.rs:25:51
|
25 | tokio::join!(items.iter_mut().for_each(|item| item.resolve()));
| ^^^^^^^^^^^^^^ expected `()`, found opaque type
|
::: src\redirect_definition.rs:32:37
|
32 | pub async fn resolve(&mut self) {
| - the `Output` of this `async fn`'s found opaque type
|
= note: expected unit type `()`
found opaque type `impl std::future::Future`
Run Code Online (Sandbox Code Playgroud)
如何一次调用resolve()所有实例的方法?
这段代码反映了答案——现在我正在处理我不太理解的借用检查器错误——我应该用 …
我有一个异步方法应该并行执行一些期货,并且只有在所有期货完成后才返回。但是,它通过引用传递了一些寿命不长的数据'static(它将在主方法中的某个时刻被删除)。从概念上讲,它类似于(Playground):
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in array {
let task = spawn(do_sth(i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
#[tokio::main]
async fn main() {
parallel_stuff(&[3, 1, 4, 2]);
}
Run Code Online (Sandbox Code Playgroud)
现在,tokio 希望传递给的期货spawn在'static一生中有效,因为我可以在不停止未来的情况下放下句柄。这意味着我上面的示例会产生此错误消息:
error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:12:25
| …Run Code Online (Sandbox Code Playgroud) rust ×10
rust-tokio ×10
async-await ×5
asynchronous ×5
file ×1
filesystems ×1
hyper ×1
lifetime ×1
reqwest ×1
traits ×1
udp ×1