我想轮询一个异步函数:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
some_function().await;
}
Run Code Online (Sandbox Code Playgroud)
我目前正在激活所有功能:
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
some_function().await;
}
Run Code Online (Sandbox Code Playgroud)
其中哪些是必要的?
tokio = { version = "1.4.0", features = ["full"] }
Run Code Online (Sandbox Code Playgroud) 像tokio这样的 Rust 异步运行时提供了许多标准函数的“异步”副本,包括一些文件 IO 函数,它们基本上只是通过调用相应的阻塞任务(在新线程上?)来工作。此类函数的示例有tokio::fs::create_dir_all, tokio::fs::read_dir, tokio::fs::read, ...
所有这些功能有什么优点?为什么我应该更喜欢在异步上下文中使用它们而不是标准阻塞函数?如果我是.await他们的结果,有什么收获吗?
一个示例是异步 Web 路由,它根据查询返回某个文件的内容(使用Rocket):
\n#[get("/foo/<query>")]\nasync fn foo(query: &str) -> Option<String> {\n let file_path = // interpret `query` somehow and find the target file\n tokio::fs::read_to_string(file_path).await.ok()\n // ^ Why not just `std::fs::read_to_string(file_path).ok()`?\n}\nRun Code Online (Sandbox Code Playgroud)\nasync/.await我理解套接字 IO 或延迟任务(线程睡眠)的好处,但在这种情况下,这对我来说似乎毫无意义。但相反的 \xe2\x80\x94 这使得在代码中解决更复杂的任务变得更加困难(例如,在目录列表中搜索文件时使用流)。
我正在尝试用 Rust 制作一个多线程 tcp 通信程序
这个想法是主线程上存在一个侦听套接字,并且当连接进入时,工作由工作线程处理
我之前使用了 Rust 书中找到的 ThreadPool 方法,但据我了解,tokio 能够“自动”将工作分配给池中的线程
我对操作系统线程和 tokio 任务之间的区别感到困惑(主要是因为您用来spawn创建两者)
这是一些代码
fn main() {
println!("Hello World!");
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 103, 7)), 2048);
println!("socket -> {}", socket);
// You need to use the tokio runtime to execute async functions
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let listener = StartListen::new(socket).await.unwrap();
});
}
Run Code Online (Sandbox Code Playgroud)
我StartListen在另一个文件中定义了
// Defines the StartListen class
pub struct StartListen {
listener: TcpListener,
}
// Implementation for …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用Tokio crate 在Rust中编写一个简单的TCP客户端.我的代码非常接近这个例子减去TLS:
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use futures::Future;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Core;
use tokio_io::io;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let connection = TcpStream::connect(&"127.0.0.1:8080".parse().unwrap(), &handle);
let server = connection.and_then(|stream| {
io::write_all(stream, b"hello");
});
core.run(server).unwrap();
}
Run Code Online (Sandbox Code Playgroud)
但是,编译失败并显示错误:
error[E0277]: the trait bound `(): futures::Future` is not satisfied
--> src/main.rs:16:29
|
16 | let server = connection.and_then(|stream| {
| ^^^^^^^^ the trait `futures::Future` is not implemented for `()` …Run Code Online (Sandbox Code Playgroud) 我有一个AsyncRead,想Stream<Item = tokio::io::Result<Bytes>>用 tokio 0.2 和 futures 0.3将它转换为一个。
我能做的最好的是:
use bytes::Bytes; // 0.4.12
use futures::stream::{Stream, TryStreamExt};; // 0.3.1
use tokio::{fs::File, io::Result}; // 0.2.4
use tokio_util::{BytesCodec, FramedRead}; // 0.2.0
#[tokio::main]
async fn main() -> Result<()> {
let file = File::open("some_file.txt").await?;
let stream = FramedRead::new(file, BytesCodec::new()).map_ok(|b| b.freeze());
fn_that_takes_stream(stream)
}
fn fn_that_takes_stream<S, O>(s: S) -> Result<()>
where
S: Stream<Item = Result<Bytes>>,
{
//...
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
似乎应该有一种更简单的方法;我很惊讶 Tokio 没有包含一个编解码器来获取一个流Bytes而不是,BytesMut或者没有一个扩展特性提供一种将 anAsyncRead转换为Stream. 我错过了什么吗?
我遇到了这个问题中描述的问题:如何在另一个 Tokio 运行时中创建 Tokio 运行时,而不会收到错误“无法从运行时内启动运行时”?。
一些好的 Rust crate 没有异步执行器。我决定将所有此类库调用放在一个能够容忍此类操作的线程中。另一个线程应该能够使用 发送非闪烁消息tokio::channel。
我编写了一个演示站来测试实施选项。在每个运行时内部进行调用tokio::spawn是为了了解 tokio 运行时和处理程序中的更多细节 - 这是问题的一部分。
问题。
如果我还有什么误解,请纠正我。
有两个 tokio 运行时。每个都在自己的线程中启动。tokio::spawn在第一个运行时内部调用会first_runtime()生成任务。tokio::spawn内部调用会second_runtime()在第二个运行时生成任务。tokio::channel这两个任务之间有一个。如果通道tx.send(...).await缓冲区未满,即使接收线程被调用阻塞,调用也不会阻塞发送线程thread::sleep()。
我一切都做对了吗?这段代码的输出告诉我我是对的,但我需要确认我的推理。
use std::thread;
use std::time::Duration;
use tokio::sync::mpsc::{Sender, Receiver, channel}; // 1.12.0
#[tokio::main(worker_threads = 1)]
#[allow(unused_must_use)]
async fn first_runtime(tx: Sender<String>) {
thread::sleep(Duration::from_secs(1));
println!("first thread woke up");
tokio::spawn(async move {
for msg_id in 0..10 {
if let Err(e) = tx.send(format!("message {}", msg_id)).await …Run Code Online (Sandbox Code Playgroud) 我正在尝试将使用 reqwest 下载的文件复制到 tokio 文件中。该文件太大,无法存储在内存中,因此需要通过bytes_stream()而不是bytes()
我尝试过以下操作
let mut tmp_file = tokio::fs::File::from(tempfile::tempfile()?);
let byte_stream = reqwest::get(&link).await?.bytes_stream();
tokio::io::copy(&mut byte_stream, &mut tmp_file).await?;
Run Code Online (Sandbox Code Playgroud)
这失败了,因为
|
153 | tokio::io::copy(&mut byte_stream, &mut tmp_file).await?;
| --------------- ^^^^^^^^^^^^^^^^ the trait `tokio::io::AsyncRead` is not implemented for `impl Stream<Item = Result<bytes::bytes::Bytes, reqwest::Error>>`
| |
| required by a bound introduced by this call
Run Code Online (Sandbox Code Playgroud)
有什么方法可以在流上获取特征 AsyncRead 或以其他方式将此数据复制到文件中?我使用 tokio 文件的原因是我稍后需要从中进行 AsyncRead 。也许复制到常规 std::File 然后将其转换为 tokio::fs::File 是有意义的?
在task文档中,有一节讨论了在异步中调用阻塞代码,以及如何避免这种情况,以免过多地阻塞异步线程(https://docs.rs/tokio/1.21.2/tokio/task/ index.html#blocking-and-yielding)。
它还讨论了用于tokio::task::spawn_blocking这些任务的方法,但我想知道在什么时候建议将工作发送到不同的线程?我目前正在编写一个程序,可以恢复大量的 ECDSA 签名,每条消息大约需要 100 微秒,同时进行大量的网络 IO。作为一个具体的例子,这足以使用类似的东西吗spawn_blocking?
我正在使用Tokio框架在Rust中创建重复任务。下面的代码基于完成的更改请求,以将该功能添加到Tokio-Timer板条箱中。
尝试编译时,出现错误消息:
error[E0281]: type mismatch: the type `fn() {my_cron_func}` implements the trait `std::ops::FnMut<()>`, but the trait `std::ops::FnMut<((),)>` is required (expected tuple, found ())
--> src/main.rs:19:36
|
19 | let background_tasks = wakeups.for_each(my_cron_func);
| ^^^^^^^^
error[E0281]: type mismatch: the type `fn() {my_cron_func}` implements the trait `std::ops::FnOnce<()>`, but the trait `std::ops::FnOnce<((),)>` is required (expected tuple, found ())
--> src/main.rs:19:36
|
19 | let background_tasks = wakeups.for_each(my_cron_func);
| ^^^^^^^^
error[E0281]: type mismatch: the type `fn() {my_cron_func}` implements the trait `std::ops::FnMut<()>`, but …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 MPSC 构建多线程应用程序,但遇到了标题中的错误。我不确定这个用例的正确模式是什么 - 我正在寻找一种模式,它允许我克隆生产者通道并将其移动到要使用的新线程中。
这个新线程将保持一个打开的 websocket,并在收到 websocket 消息时通过生产者发送 websocket 消息数据的子集。消费者线程中将需要来自其他线程的数据,这就是为什么我认为 MPSC 模式是一个合适的选择。
除了标题中的消息之外,它还显示以下内容:
`std::sync::mpsc::Sender<i32>` cannot be shared between threads safely
help: the trait `std::marker::Sync` is not implemented for `std::sync::mpsc::Sender`
Run Code Online (Sandbox Code Playgroud)
我可以/应该Send为此实施吗?Rc这是使用or的合适时机吗Pin?我相信这种情况正在发生,因为我试图发送一个未在闭包Send中实现的类型,但我不知道如何利用它或在这种情况下要达到什么目的。.awaitasync
我已经能够将我的问题简化为:
use futures::stream::{self, StreamExt};
use std::sync::mpsc::{channel, Receiver, Sender};
#[tokio::main]
async fn main() {
let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();
tokio::spawn(async move {
let a = [1, 2, 3];
let mut s = stream::iter(a.iter())
.cycle()
.for_each(move |int| async { …Run Code Online (Sandbox Code Playgroud) rust ×10
rust-tokio ×10
async-await ×4
concurrency ×2
asynchronous ×1
reqwest ×1
rust-cargo ×1
tcp ×1
threadpool ×1