标签: rust-tokio

有没有办法在新线程上启动 tokio::Delay 以允许主循环继续?

如果计时器未取消,我试图在延迟结束时运行一个函数。用例是按住/双击以进行用户输入。

我遇到的主要问题是停止tokio::run(task);主循环执行,从而阻止我评估用户控件的状态。

fn start_timer(&self) {
    let function_name = self.combo.function_name.clone();
    let when = Instant::now() + Duration::from_millis(self.combo.timer_value as u64);
    let task = Delay::new(when)
        .and_then(move |_| {
            call_function(&function_name, InteropParams::Button);
            Ok(())
        })
        .map_err(|e| panic!("delay errored; err={:?}", e));

    tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud)

future rust rust-tokio

1
推荐指数
1
解决办法
2226
查看次数

在异步 Rust 中包装阻塞 mpsc (Tokio)

我正在尝试使用 Tokio 包装同步 MQTT 客户端。该代码需要通过 std::sync::mpsc通道不断接收消息并将其发送到异步代码中。我了解如何用于spawn_blocking包装返回单个值的代码。但是如何将其应用于包装从std::sync::mpsc通道连续接收消息的循环呢?

这是我用来向频道发送消息的代码。

let (mut tx, mut rx) = std::sync::mpsc::channel();

tokio::spawn(async move {
            let mut mqtt_options = MqttOptions::new("bot", settings.mqtt.host, settings.mqtt.port);
            let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap();

            mqtt_client.subscribe(settings.mqtt.topic_name, QoS::AtLeastOnce).unwrap();

            tokio::task::spawn_blocking(move || {
                println!("Waiting for notifications");
                for notification in notifications {
                    match notification {
                        rumqtt::Notification::Publish(publish) => {
                            let payload = Arc::try_unwrap(publish.payload).unwrap();
                            let text: String = String::from_utf8(payload).expect("Can't decode payload for notification");
                            println!("Recieved message: {}", text);
                            let msg: Message = serde_json::from_str(&text).expect("Error while deserializing …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

1
推荐指数
1
解决办法
2212
查看次数

每次调用 Future::poll 时都需要注册一个新的唤醒器吗?

我正在制作自己的通道实现,但std::task::Context没有明确唤醒器是如何生成的。

我的假代码:

struct MyAtomicWaker {
    lock: SpinLock,
    is_waked: AtomicBool,
    waker: std::task::Waker, 
}

struct WeakAtomicWaker (Weak<MyAtomicWaker>)

impl MyAtomicWaker {
    fn is_waked(&self) -> bool {}
    fn weak(self: Arc<MyAtomicWaker>) -> WeakAtomicWaker;
    fn cancel(&self) {}  // nullify WeakAtomicWaker, means the waker is not waked by a future
}

impl WeakAtomicWaker {
    fn wake(self) {}  // upgrade to arc and can wake only once when waker not cancelled
}

struct ReceiveFuture<T> {
    waker: Option<Arc<MyAtomicWaker>>,
}

impl<T> Drop for ReceiveFuture<T> {
    fn drop(&mut self) …
Run Code Online (Sandbox Code Playgroud)

future rust async-await rust-tokio

1
推荐指数
1
解决办法
2220
查看次数

如何使用 Tokio oneshot 发送器和接收器执行具有内循环的不同任务?

不知道如何处理这里的借用检查器。

use tokio::sync::oneshot; // 1.0.2

fn main() {
    let (sender, receiver) = oneshot::channel::<u8>();
    tokio::spawn(async move {
        loop {
            sender.send(3).unwrap();
        }
    });
}
Run Code Online (Sandbox Code Playgroud)

创建此错误:

error[E0382]: use of moved value: `sender`
 --> src/main.rs:7:13
  |
7 |             sender.send(3).unwrap();
  |             ^^^^^^ value moved here, in previous iteration of loop
  |
  = note: move occurs because `sender` has type `tokio::sync::oneshot::Sender<u8>`, which does not implement the `Copy` trait
Run Code Online (Sandbox Code Playgroud)

future rust rust-tokio

1
推荐指数
1
解决办法
5726
查看次数

为什么异步 TcpStream 会阻塞?

我正在开发一个项目,用 Rust 实现分布式键值存储。我使用 Tokio 的异步运行时编写了服务器端代码。我遇到了一个问题,我的异步代码似乎被阻塞,因此当我与服务器有多个连接时,仅处理一个 TcpStream。我对实现代码很陌生async,无论是一般的还是关于 Rust,但我认为如果给定的 tcp 流上没有活动,其他流将被接受和处理。

是我对 async 的理解错误还是我错误地使用了 tokio ?

这是我的切入点:

use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};

use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;

extern crate blue;

use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

    let opt = args::Opt::from_args();
    let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
    let role = NodeRole::from_str(opt.role.as_str()).unwrap();
    let leader_addr …
Run Code Online (Sandbox Code Playgroud)

asynchronous rust rust-tokio

1
推荐指数
1
解决办法
770
查看次数

为什么 tokio 线程在继续之前要等待阻塞线程?

为什么我的阻塞线程在另一个线程输出一次之前输出两次?我希望能够生成任务以在后台运行(立即),但在主代码继续之前不等待它们完成。有没有一种简单的方法可以做到这一点tokio

代码:

use std::{thread, time};
use chrono;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Starting up");
    tokio::spawn(blocking_thread()).await.unwrap();
    Ok(())
}

async fn blocking_thread() {
    for i in 1..5 {
        println!("{} Blocking thread {}", chrono::offset::Utc::now(), i);
        tokio::spawn(other_thread(i));
        thread::sleep(time::Duration::from_millis(100)); //In my real code this is a blocking syscall
    }
}

async fn other_thread(i: u8) {
    println!("{} Other thread {}", chrono::offset::Utc::now(), i);
}
Run Code Online (Sandbox Code Playgroud)

输出:

Starting up
2022-01-21 09:03:36.662248332 UTC Blocking thread 1
2022-01-21 09:03:36.762375086 UTC Blocking thread 2
2022-01-21 09:03:36.762617994 …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

1
推荐指数
1
解决办法
2155
查看次数

如何将 tokio TcpStream 转换为可序列化/反序列化值的接收器/流?

我有一个东京TcpStream。我想T通过这个流传递一些类型。这种类型的T实现SerializeDeserialize. 我怎样才能获得 aSink<T>和 a Stream<T>

我找到了板条箱tokio_utiltokio_serde,但我不知道如何使用它们来做我想做的事情。

rust serde rust-tokio

1
推荐指数
1
解决办法
2046
查看次数

`take()` 在这里做什么,为什么我需要它?

此代码生成一个子进程,逐行使用其 stderr 和 stdout,并适当地记录每个进程。它可以编译并运行。

use std::error::Error;
use std::process::{Stdio};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Command, Child};
use tracing::{info, warn};

macro_rules! relay_pipe_lines {
    ($pipe:expr, $handler:expr) => {
        tokio::spawn(async move {
            let mut reader = BufReader::new($pipe).lines();

            loop {
                let line = reader
                    .next_line()
                    .await
                    .unwrap_or_else(|_| Some(String::new()));

                match line {
                    None => break,
                    Some(line) => $handler(line)
                }

            }
        });
    };
}

pub fn start_and_log_command(mut command: Command) -> Result<Child, Box<dyn Error>> {
    command.stdout(Stdio::piped()).stderr(Stdio::piped());

    let mut child = command.spawn()?;

    let child_stdout = child.stdout.take().unwrap(); // remove `take` from …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

1
推荐指数
1
解决办法
331
查看次数

为什么 tokio::spawn 不执行我的代码?

我的 Rust 代码如下所示。

#[tokio::main]
pub async fn main() {
    for i in 1..10 {
        tokio::spawn(async move {
            println!("{}", i);
        });
    }
}
Run Code Online (Sandbox Code Playgroud)

运行代码时,我希望它以随机顺序打印 1 到 10。但它只是打印一些随机数:

1
3
2
终端将被任务重复使用,按任意键关闭它。

为什么会发生这种情况?

asynchronous spawn rust async-await rust-tokio

1
推荐指数
1
解决办法
641
查看次数

使用“or_insert_with”对 HashMap 进行 rust 异步更新

当映射还没有条目时,我尝试使用异步调用从数据库延迟填充 HashMap。

Rust 编译器警告异步闭包不稳定,但我应该尝试async {

我正在尝试遵循该建议,但我在评论中收到expected a FnOnce<()>错误: closure

use std::collections::HashMap;
use tokio::runtime::Runtime;

async fn get_from_store(key: String) -> String {
    // pretend to get this from an async sqlx db call
    String::from(format!("value-for-{key}"))
}

async fn do_work() {
    let mut map: HashMap<String, String> = HashMap::new();

    let key = String::from("key1");

    // the compiler advised async closures were unstable and...
    // to use an async block, remove the `||`: `async {` (rustc E0658)
    map.entry(key.clone())
        .or_insert_with(async { get_from_store(key) }.await);

    // the …
Run Code Online (Sandbox Code Playgroud)

rust rust-tokio

1
推荐指数
1
解决办法
545
查看次数

标签 统计

rust ×10

rust-tokio ×10

future ×3

async-await ×2

asynchronous ×2

serde ×1

spawn ×1