当我在 tokio 多线程运行时生成一个任务时,是否保证它会选择一个运行时操作系统线程并仅在其上运行?或者在轮询 future 时它可能会在不同的操作系统线程之间跳转?
更新:如果一项任务可以在操作系统线程之间切换,是否可以将其配置为仅使用一个线程?
use std::sync::Arc;
use tokio::sync::mpsc;
async fn student(id : i32,tx : Arc<mpsc::Sender<String>>) {
println!("student {} is getting their hw.",id);
tx.send(format!("student {}'s hw !",id)).await.unwrap();
}
async fn teacher(mut rc : mpsc::Receiver<String>) -> Vec<String> {
let mut homeworks = Vec::new();
while let Some(hw) = rc.recv().await {
println!("{hw}");
homeworks.push(hw);
}
homeworks
}
#[tokio::main]
async fn main() {
let (tx,rc): (mpsc::Sender<String>, mpsc::Receiver<String>) = mpsc::channel(100);
let ch_arc: Arc<mpsc::Sender<String>> = Arc::new(tx);
for i in 0..10 {
tokio::task::spawn(student(i,ch_arc.clone()));
}
let hws = teacher(rc).await;
println!("{:?}",hws);
}
Run Code Online (Sandbox Code Playgroud)
I am currently …
我试图总结我的周围锈期货的头,但我这个代码是应该发送到达的消息弄得rx到sink:
extern crate futures;
extern crate tokio_core;
extern crate websocket;
use websocket::message::OwnedMessage;
use websocket::server::InvalidConnection;
use websocket::async::Server;
use tokio_core::reactor::Core;
use futures::{Future, Sink, Stream};
use futures::sync::mpsc;
use std::{thread, time};
use futures::sync::mpsc::Receiver;
fn main() {
let mut core = Core::new().unwrap();
let (mut tx, rx) = mpsc::channel(5);
thread::spawn(|| worker(rx));
let mut i = 0;
loop {
let res = tx.clone().send(OwnedMessage::Text(format!("Test {}", i)));
core.run(res);
i += 1;
let period = time::Duration::from_millis(200);
thread::sleep(period);
}
}
fn worker(rx: Receiver<OwnedMessage>) {
let mut core …Run Code Online (Sandbox Code Playgroud) 例如:
use futures::future::Future;
fn main() {
let (stop_tokio, time_to_stop) = tokio::sync::oneshot::channel::<()>();
let handler = std::thread::spawn(|| {
tokio::run(
time_to_stop, // .map_err(|_| ())
);
});
handler.join().expect("join failed");
}
Run Code Online (Sandbox Code Playgroud)
编译器显示错误:
use futures::future::Future;
fn main() {
let (stop_tokio, time_to_stop) = tokio::sync::oneshot::channel::<()>();
let handler = std::thread::spawn(|| {
tokio::run(
time_to_stop, // .map_err(|_| ())
);
});
handler.join().expect("join failed");
}
Run Code Online (Sandbox Code Playgroud)
该代码要求使用(),RecvError但改为,但是编译器打印相反的代码。
这是编译器中的错误,还是我错过了什么?
在 Rust 中,我有一堆想要并行执行的异步函数。处理这些函数结果的顺序很重要。我还想在这些函数可用时检索它们的结果。
\n让我不好解释一下。
\n以下是 的描述FuturesOrdered:
\n\n这个“组合器”类似于 FuturesUnordered,但它在 future 集合之上强加了一个顺序。虽然集合中的 future 将并行完成\n,但结果将仅按其原始 future 添加到队列的顺序返回。
\n
到目前为止,一切都很好。现在看这个例子:
\nlet mut ft = FuturesOrdered::new();\nft.push(wait_n(1)); // wait_n sleeps\nft.push(wait_n(2)); // for the given\nft.push(wait_n(4)); // number of secs\nft.push(wait_n(3));\nft.push(wait_n(5));\nlet r = ft.collect::<Vec<u64>>().await;\nRun Code Online (Sandbox Code Playgroud)\n由于FuturesOrdered\xc2\xa0await 直到所有future 都完成;这就是我得到的:
|--| ++\n|----| ++\n|--------| ++\n|------| ++\n|----------|++\n ++-> all results available here \nRun Code Online (Sandbox Code Playgroud)\n这就是我要的:
\n|--|++\n|----|++\n|--------|++\n|------| ++\n|----------| ++\n \nRun Code Online (Sandbox Code Playgroud)\n换句话说; 我要等待下一个未来;随着剩余的期货不断地完成。另请注意,即使任务#4 在任务#3 之前完成;由于最初的订单,它是在#3 之后处理的。
\n我怎样才能获得像这样同时执行的期货流?我希望有这样的事情: …
error: failed to select a version for `syn`. \
... required by package `serde_derive v1.0.125`\
... which satisfies dependency `serde_derive = "=1.0.125"` of package `serde v1.0.125`
... which satisfies dependency `serde = "^1.0.125"` of package `mongodb v2.1.0`\
... which satisfies dependency `mongodb = "^2.1"` of package `wagmeet v0.1.0
\(/mnt/e/College/Eighth Semester/Crypto_Capable/wagmeet_app)`\
versions that meet the requirements `^1.0.60` are: 1.0.86, 1.0.85, 1.0.84, 1.0.83, 1.0.82, 1.0.81, 1.0.80, 1.0.79, 1.0.78, 1.0.77, 1.0.76, 1.0.75, 1.0.74, 1.0.73, 1.0.72, 1.0.71, 1.0.70, 1.0.69, 1.0.68, 1.0.67, 1.0.66, 1.0.65, 1.0.64, …Run Code Online (Sandbox Code Playgroud) 在我的理解中,异步只能处理I/O密集型任务,例如读写套接字或文件,而无法处理CPU密集型任务,例如加密和压缩。
所以在 Rust Tokio Runtime 中,我认为只需要用来spawn_blocking处理 CPU 密集型任务。但我看过这个回购协议,例子是
#[tokio_02::main]
async fn main() -> Result<()> {
let data = b"example";
let compressed_data = compress(data).await?;
let de_compressed_data = decompress(&compressed_data).await?;
assert_eq!(de_compressed_data, data);
println!("{:?}", String::from_utf8(de_compressed_data).unwrap());
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
该库在压缩和异步 I/O 类型之间创建适配器。
我有 3 个问题:
等待压缩/解压缩的目的是什么?
这些适配器是必要的还是我对异步的理解错误?
我可以直接在 Tokio 多线程运行时进行压缩操作吗?像这样
async fn foo() {
let mut buf = ...;
compress_sync(&mut buf);
async_tcp_stream.write(buf).await;
}
Run Code Online (Sandbox Code Playgroud) 目前,当我的流收到新值时,我尝试打印计数器值:
fn add_counter_to_stream<T: std::marker::Send + 'static>(
stream: PinnedStream<T>,
) -> PinnedStream<T> {
let counter = Arc::new(Mutex::new(0)); // Shared counter between threads
futures::StreamExt::boxed(stream.map({
let counter = Arc::clone(&counter);
move |value| {
let counter = Arc::clone(&counter);
async move {
// Print counter value and increment
let mut num = counter.lock().await;
println!("Counter: {}", *num);
*num += 1;
};
value
}
}))
}
Run Code Online (Sandbox Code Playgroud)
Arc::clone(&counter)然而,如上所示,除非我创建两个嵌套闭包并使用两次,否则代码不会编译。有没有一种方法可以重构它,以便在一个克隆操作中完成,为什么在这个示例中我需要两个闭包?
我想运行一系列流,其中多个流进程同时运行。我尝试的测试代码如下:
use tokio;
use tokio_stream::{self as stream, StreamExt};
use std::time::Duration;
#[tokio::main]
async fn main() {
let stream = stream::iter(0..10)
.then(|i| async move {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("i={:?}", i);
})
.chunks_timeout(3, Duration::from_secs(20));
let _result : Vec<_> = stream
.collect().await;
}
Run Code Online (Sandbox Code Playgroud)
此代码运行,但它会一一打印 10 个值,并有 1 秒的延迟。这与并发相反。我期望的是等待 3 秒,打印 3 个数字,然后等待 1 秒,依此类推。我认为这tokio::time::sleep很好,因为join_all我让代码同时工作。
那么,如何解释缺乏并发性以及如何解决呢?
rust ×9
rust-tokio ×9
asynchronous ×2
future ×2
async-await ×1
channel ×1
compression ×1
encryption ×1
mpsc ×1
nearprotocol ×1
rust-cargo ×1
rust-diesel ×1
stream ×1