标签: rust-tokio

如何将任务添加到在另一个线程上运行的 Tokio 事件循环?

我想在 Rocket 服务器旁边启动 Tokio 事件循环,然后稍后将事件添加到该循环中。我读过有没有办法在新线程上启动 tokio::Delay 以允许主循环继续?,但我仍然不清楚如何实现我的目标。

future rust rust-tokio

6
推荐指数
1
解决办法
4177
查看次数

如何使用 TcpStream 跨 2 个线程进行异步拆分?

我正在尝试在不同的线程中使用 tcp 流的读写。这就是我目前所拥有的:

use tokio::prelude::*;
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut stream = TcpStream::connect("localhost:8080").await?;
    let (mut read, mut write) = stream.split();

    tokio::spawn(async move {
        loop {
            let mut buf = [0u8; 32];
            read.read(&mut buf).await.unwrap();
            println!("{}", std::str::from_utf8(&buf));
        }
    });

    Ok(())
}
Run Code Online (Sandbox Code Playgroud)

我将使用另一个线程进行写入。我的问题是,我收到“流”在借用时被丢弃的错误。

rust rust-tokio

6
推荐指数
1
解决办法
4719
查看次数

如何在另一个 Tokio 运行时内创建 Tokio 运行时而不会出现“无法从运行时内启动运行时”的错误?

rust_bert用于总结文本。我需要用 设置一个模型rust_bert::pipelines::summarization::SummarizationModel::new,它从互联网上获取模型。它以异步方式执行此操作,tokio并且(我认为)我遇到的问题是我正在另一个 Tokio 运行时中运行 Tokio 运行时,如错误消息所示:

Downloading https://cdn.huggingface.co/facebook/bart-large-cnn/config.json to "/home/(censored)/.cache/.rustbert/bart-cnn/config.json"
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/(censored)/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/runtime/enter.rs:38:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Run Code Online (Sandbox Code Playgroud)

我试过与模型同步运行 tokio::task::spawn_blockingtokio::task::block_in_place 但它们都不适合我。block_in_place给出了同样的错误,就像是不存在的,并spawn_blocking没有真正似乎是用我的。我也尝试过summarize_text异步,但这并没有多大帮助。Github 问题 tokio-rs/tokio#2194 和 Reddit 发布 …

rust rust-tokio

6
推荐指数
2
解决办法
3583
查看次数

(tokio::spawn) 借用的值存在的时间不够长——参数要求“sleepy”是为“static”借用的

tokio::spawn此 MWE 显示了in循环的使用for in。注释的代码sleepy_futures.push(sleepy.sleep_n(2));工作正常,但不运行/轮询异步函数。

基本上,我想同时运行一堆异步函数。我很高兴更改实现Sleepy或使用其他库/技术。

pub struct Sleepy;
impl Sleepy {
    pub async fn sleep_n(self: &Self, n: u64) -> String {
        sleep(Duration::from_secs(n));
        "test".to_string()
    }
}

#[tokio::main(core_threads = 4)]
async fn main() {
    let sleepy = Sleepy{};

    let mut sleepy_futures = vec::Vec::new();
    for _ in 0..5 {
        // sleepy_futures.push(sleepy.sleep_n(2));
        sleepy_futures.push(tokio::task::spawn(sleepy.sleep_n(2)));
    }

    let results = futures::future::join_all(sleepy_futures).await;
    for result in results {
        println!("{}", result.unwrap())
    }
}
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

6
推荐指数
1
解决办法
5561
查看次数

写入 websocket 流

tungstenite 我正在使用此示例连接到 websocket 服务器并从那里使用write.send

let connect_addr = env::args()
    .nth(1)
    .unwrap_or_else(|| panic!("this program requires at least one argument"));

let url = url::Url::parse(&connect_addr).unwrap();

let (stdin_tx, stdin_rx) = futures_channel::mpsc::unbounded();
tokio::spawn(read_stdin(stdin_tx));

let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
println!("WebSocket handshake has been successfully completed");

let (write, read) = ws_stream.split();

## THIS DOESNT WORK ###
write.send(Message::Text(format!("{}", "HELLO!")))
            .await
            .expect("Failed to send message");
Run Code Online (Sandbox Code Playgroud)

我无法让写入工作。我得到:

error[E0599]: no method named `send` found for struct `futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio::net::tcp::stream::TcpStream, tokio_native_tls::TlsStream<tokio::net::tcp::stream::TcpStream>>>, tungstenite::protocol::message::Message>` in the current scope
   --> src/main.rs:28:15
    | …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

6
推荐指数
0
解决办法
2351
查看次数

为什么在将 Tokio 与 std::sync::Mutex 一起使用时会出现死锁?

我在使用 Tokio 时偶然发现了一个死锁情况:

use tokio::time::{delay_for, Duration};
use std::sync::Mutex;

#[tokio::main]
async fn main() {
    let mtx = Mutex::new(0);

    tokio::join!(work(&mtx), work(&mtx));

    println!("{}", *mtx.lock().unwrap());
}

async fn work(mtx: &Mutex<i32>) {
    println!("lock");
    {
        let mut v = mtx.lock().unwrap();
        println!("locked");
        // slow redis network request
        delay_for(Duration::from_millis(100)).await;
        *v += 1;
    }
    println!("unlock")
}
Run Code Online (Sandbox Code Playgroud)

产生以下输出,然后永远挂起。

lock
locked
lock
Run Code Online (Sandbox Code Playgroud)

根据Tokio docs,使用std::sync::Mutex是可以的:

与普遍的看法相反,在异步代码中使用标准库中的普通互斥体是可以的,而且通常是首选。

但是,用Mutexa替换tokio::sync::Mutex不会触发死锁,并且一切都“按预期”工作,但仅限于上面列出的示例情况。在现实场景中,如果延迟是由某些 Redis 请求引起的,它仍然会失败。

我认为这可能是因为我实际上根本没有生成线程,因此,即使“并行”执行,我也会锁定同一个线程,因为等待只是产生执行。

在不产生单独线程的情况下实现我想要的目标的 Rustacean 方法是什么?

mutex asynchronous rust rust-tokio

6
推荐指数
1
解决办法
3160
查看次数

我可以通过在两个异步接收器上调用 select 来错过一个值吗?

是否有可能,如果一个任务发送到a另一个(同时)发送到b,通过取消剩余的未来来tokio::select!打开ab删除一个值?还是保证在下一次循环迭代时收到?

use tokio::sync::mpsc::Receiver;

async fn foo(mut a: Receiver<()>, mut b: Receiver<()>) {
    loop {
        tokio::select!{
            _ = a.recv() => {
                println!("A!");
            }
            _ = b.recv() => {
                println!("B!");
            }
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

async在那种情况下,我的思绪无法绕过魔法背后真正发生的事情。

rust async-await rust-tokio

6
推荐指数
1
解决办法
277
查看次数

为什么 Tokio 返回错误“不能在不允许阻塞的上下文中删除运行时”?

我有一个与远程服务器通信的 Tokio 客户端,并且应该保持连接永久有效。我已经实现了初始身份验证握手,发现当我的测试终止时,我得到了一个奇怪的恐慌:

---- test_connect_without_database stdout ----
thread 'test_connect_without_database' panicked at 'Cannot drop a runtime in a context where blocking is not allowed. This happens when a runtime is dropped from within an asynchronous context.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.3.5/src/runtime/blocking/shutdown.rs:51:21
Run Code Online (Sandbox Code Playgroud)

对于可能导致这种情况的原因,我完全不知所措,所以我不知道要为上下文添加哪些其他代码位。

这是我最小的可重现示例(playground):

use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr};
use tokio::net::TcpStream;
use tokio::prelude::*;
use tokio::runtime;

#[derive(PartialEq, Debug)]
pub struct Configuration {
    /// Database username.
    username: String,
    /// Database password.
    password: String,
    /// Database name.
    db_name: String,
    /// IP address for the remove server.
    address: …
Run Code Online (Sandbox Code Playgroud)

multithreading rust rust-tokio

6
推荐指数
1
解决办法
1724
查看次数

如何在特征中定义异步方法?

我有一个特征,我用它来抽象tokio::net::TcpStreamtokio::net::UnixStream

/// Interface for TcpStream and UnixStream.
trait TryRead {
  // overlapping the name makes it hard to work with
  fn do_try_read(&self, buf: &mut [u8]) -> Result<usize, std::io::Error>;
}

impl TryRead for TcpStream {
  fn do_try_read(&self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
      self.try_read(buf)
  }
}
Run Code Online (Sandbox Code Playgroud)

问题是我想pub async fn readable(&self) -> io::Result<()>在这两种方法中都抽象出来,但是无法在特征中实现异步方法。我该如何处理?

rust rust-tokio

6
推荐指数
1
解决办法
1582
查看次数

Rust 中单线程异步应用程序的高效同步原语

我有一个基于tokio的单线程异步应用程序,其中使用Arcs 或其他Sync类型似乎是一种开销。因为线程之间不需要同步,所以我正在寻找类似tokio::sync::oneshot::channel 的东西,Sender并且Receiver它应该!Sync并且可以被包装到Rc而不是Arc.

Rust 中是否有专门设计的同步原语可用于单线程异步应用程序?

asynchronous rust rust-tokio

6
推荐指数
1
解决办法
1113
查看次数