我想在 Rocket 服务器旁边启动 Tokio 事件循环,然后稍后将事件添加到该循环中。我读过有没有办法在新线程上启动 tokio::Delay 以允许主循环继续?,但我仍然不清楚如何实现我的目标。
我正在尝试在不同的线程中使用 tcp 流的读写。这就是我目前所拥有的:
use tokio::prelude::*;
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut stream = TcpStream::connect("localhost:8080").await?;
let (mut read, mut write) = stream.split();
tokio::spawn(async move {
loop {
let mut buf = [0u8; 32];
read.read(&mut buf).await.unwrap();
println!("{}", std::str::from_utf8(&buf));
}
});
Ok(())
}
Run Code Online (Sandbox Code Playgroud)
我将使用另一个线程进行写入。我的问题是,我收到“流”在借用时被丢弃的错误。
我rust_bert用于总结文本。我需要用 设置一个模型rust_bert::pipelines::summarization::SummarizationModel::new,它从互联网上获取模型。它以异步方式执行此操作,tokio并且(我认为)我遇到的问题是我正在另一个 Tokio 运行时中运行 Tokio 运行时,如错误消息所示:
Downloading https://cdn.huggingface.co/facebook/bart-large-cnn/config.json to "/home/(censored)/.cache/.rustbert/bart-cnn/config.json"
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/(censored)/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/runtime/enter.rs:38:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Run Code Online (Sandbox Code Playgroud)
我试过与模型同步运行
tokio::task::spawn_blocking
,tokio::task::block_in_place
但它们都不适合我。block_in_place给出了同样的错误,就像是不存在的,并spawn_blocking没有真正似乎是用我的。我也尝试过summarize_text异步,但这并没有多大帮助。Github 问题
tokio-rs/tokio#2194
和 Reddit 发布 …
tokio::spawn此 MWE 显示了in循环的使用for in。注释的代码sleepy_futures.push(sleepy.sleep_n(2));工作正常,但不运行/轮询异步函数。
基本上,我想同时运行一堆异步函数。我很高兴更改实现Sleepy或使用其他库/技术。
pub struct Sleepy;
impl Sleepy {
pub async fn sleep_n(self: &Self, n: u64) -> String {
sleep(Duration::from_secs(n));
"test".to_string()
}
}
#[tokio::main(core_threads = 4)]
async fn main() {
let sleepy = Sleepy{};
let mut sleepy_futures = vec::Vec::new();
for _ in 0..5 {
// sleepy_futures.push(sleepy.sleep_n(2));
sleepy_futures.push(tokio::task::spawn(sleepy.sleep_n(2)));
}
let results = futures::future::join_all(sleepy_futures).await;
for result in results {
println!("{}", result.unwrap())
}
}
Run Code Online (Sandbox Code Playgroud) tungstenite 我正在使用此示例连接到 websocket 服务器并从那里使用write.send
let connect_addr = env::args()
.nth(1)
.unwrap_or_else(|| panic!("this program requires at least one argument"));
let url = url::Url::parse(&connect_addr).unwrap();
let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded();
tokio::spawn(read_stdin(stdin_tx));
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
println!("WebSocket handshake has been successfully completed");
let (write, read) = ws_stream.split();
## THIS DOESNT WORK ###
write.send(Message::Text(format!("{}", "HELLO!")))
.await
.expect("Failed to send message");
Run Code Online (Sandbox Code Playgroud)
我无法让写入工作。我得到:
error[E0599]: no method named `send` found for struct `futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio::net::tcp::stream::TcpStream, tokio_native_tls::TlsStream<tokio::net::tcp::stream::TcpStream>>>, tungstenite::protocol::message::Message>` in the current scope
--> src/main.rs:28:15
| …Run Code Online (Sandbox Code Playgroud) 我在使用 Tokio 时偶然发现了一个死锁情况:
use tokio::time::{delay_for, Duration};
use std::sync::Mutex;
#[tokio::main]
async fn main() {
let mtx = Mutex::new(0);
tokio::join!(work(&mtx), work(&mtx));
println!("{}", *mtx.lock().unwrap());
}
async fn work(mtx: &Mutex<i32>) {
println!("lock");
{
let mut v = mtx.lock().unwrap();
println!("locked");
// slow redis network request
delay_for(Duration::from_millis(100)).await;
*v += 1;
}
println!("unlock")
}
Run Code Online (Sandbox Code Playgroud)
产生以下输出,然后永远挂起。
lock
locked
lock
Run Code Online (Sandbox Code Playgroud)
根据Tokio docs,使用std::sync::Mutex是可以的:
与普遍的看法相反,在异步代码中使用标准库中的普通互斥体是可以的,而且通常是首选。
但是,用Mutexa替换tokio::sync::Mutex不会触发死锁,并且一切都“按预期”工作,但仅限于上面列出的示例情况。在现实场景中,如果延迟是由某些 Redis 请求引起的,它仍然会失败。
我认为这可能是因为我实际上根本没有生成线程,因此,即使“并行”执行,我也会锁定同一个线程,因为等待只是产生执行。
在不产生单独线程的情况下实现我想要的目标的 Rustacean 方法是什么?
是否有可能,如果一个任务发送到a另一个(同时)发送到b,通过取消剩余的未来来tokio::select!打开a并b删除一个值?还是保证在下一次循环迭代时收到?
use tokio::sync::mpsc::Receiver;
async fn foo(mut a: Receiver<()>, mut b: Receiver<()>) {
loop {
tokio::select!{
_ = a.recv() => {
println!("A!");
}
_ = b.recv() => {
println!("B!");
}
}
}
}
Run Code Online (Sandbox Code Playgroud)
async在那种情况下,我的思绪无法绕过魔法背后真正发生的事情。
我有一个与远程服务器通信的 Tokio 客户端,并且应该保持连接永久有效。我已经实现了初始身份验证握手,发现当我的测试终止时,我得到了一个奇怪的恐慌:
---- test_connect_without_database stdout ----
thread 'test_connect_without_database' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.3.5/src/runtime/blocking/shutdown.rs:51:21
Run Code Online (Sandbox Code Playgroud)
对于可能导致这种情况的原因,我完全不知所措,所以我不知道要为上下文添加哪些其他代码位。
这是我最小的可重现示例(playground):
use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::runtime;
#[derive(PartialEq, Debug)]
pub struct Configuration {
/// Database username.
username: String,
/// Database password.
password: String,
/// Database name.
db_name: String,
/// IP address for the remove server.
address: …Run Code Online (Sandbox Code Playgroud) 我有一个特征,我用它来抽象tokio::net::TcpStream和tokio::net::UnixStream:
/// Interface for TcpStream and UnixStream.
trait TryRead {
// overlapping the name makes it hard to work with
fn do_try_read(&self, buf: &mut [u8]) -> Result<usize, std::io::Error>;
}
impl TryRead for TcpStream {
fn do_try_read(&self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
self.try_read(buf)
}
}
Run Code Online (Sandbox Code Playgroud)
问题是我想pub async fn readable(&self) -> io::Result<()>在这两种方法中都抽象出来,但是无法在特征中实现异步方法。我该如何处理?
我有一个基于tokio的单线程异步应用程序,其中使用Arcs 或其他Sync类型似乎是一种开销。因为线程之间不需要同步,所以我正在寻找类似tokio::sync::oneshot::channel 的东西,Sender并且Receiver它应该!Sync并且可以被包装到Rc而不是Arc.
Rust 中是否有专门设计的同步原语可用于单线程异步应用程序?