如果计时器未取消,我试图在延迟结束时运行一个函数。用例是按住/双击以进行用户输入。
我遇到的主要问题是停止tokio::run(task);主循环执行,从而阻止我评估用户控件的状态。
fn start_timer(&self) {
let function_name = self.combo.function_name.clone();
let when = Instant::now() + Duration::from_millis(self.combo.timer_value as u64);
let task = Delay::new(when)
.and_then(move |_| {
call_function(&function_name, InteropParams::Button);
Ok(())
})
.map_err(|e| panic!("delay errored; err={:?}", e));
tokio::run(task);
}
Run Code Online (Sandbox Code Playgroud) 我正在尝试使用 Tokio 包装同步 MQTT 客户端库。该代码需要通过 std::sync::mpsc通道不断接收消息并将其发送到异步代码中。我了解如何用于spawn_blocking包装返回单个值的代码。但是如何将其应用于包装从std::sync::mpsc通道连续接收消息的循环呢?
这是我用来向频道发送消息的代码。
let (mut tx, mut rx) = std::sync::mpsc::channel();
tokio::spawn(async move {
let mut mqtt_options = MqttOptions::new("bot", settings.mqtt.host, settings.mqtt.port);
let (mut mqtt_client, notifications) = MqttClient::start(mqtt_options).unwrap();
mqtt_client.subscribe(settings.mqtt.topic_name, QoS::AtLeastOnce).unwrap();
tokio::task::spawn_blocking(move || {
println!("Waiting for notifications");
for notification in notifications {
match notification {
rumqtt::Notification::Publish(publish) => {
let payload = Arc::try_unwrap(publish.payload).unwrap();
let text: String = String::from_utf8(payload).expect("Can't decode payload for notification");
println!("Recieved message: {}", text);
let msg: Message = serde_json::from_str(&text).expect("Error while deserializing …Run Code Online (Sandbox Code Playgroud) 我正在制作自己的通道实现,但std::task::Context没有明确唤醒器是如何生成的。
我的假代码:
struct MyAtomicWaker {
lock: SpinLock,
is_waked: AtomicBool,
waker: std::task::Waker,
}
struct WeakAtomicWaker (Weak<MyAtomicWaker>)
impl MyAtomicWaker {
fn is_waked(&self) -> bool {}
fn weak(self: Arc<MyAtomicWaker>) -> WeakAtomicWaker;
fn cancel(&self) {} // nullify WeakAtomicWaker, means the waker is not waked by a future
}
impl WeakAtomicWaker {
fn wake(self) {} // upgrade to arc and can wake only once when waker not cancelled
}
struct ReceiveFuture<T> {
waker: Option<Arc<MyAtomicWaker>>,
}
impl<T> Drop for ReceiveFuture<T> {
fn drop(&mut self) …Run Code Online (Sandbox Code Playgroud) 不知道如何处理这里的借用检查器。
use tokio::sync::oneshot; // 1.0.2
fn main() {
let (sender, receiver) = oneshot::channel::<u8>();
tokio::spawn(async move {
loop {
sender.send(3).unwrap();
}
});
}
Run Code Online (Sandbox Code Playgroud)
创建此错误:
error[E0382]: use of moved value: `sender`
--> src/main.rs:7:13
|
7 | sender.send(3).unwrap();
| ^^^^^^ value moved here, in previous iteration of loop
|
= note: move occurs because `sender` has type `tokio::sync::oneshot::Sender<u8>`, which does not implement the `Copy` trait
Run Code Online (Sandbox Code Playgroud) 我正在开发一个项目,用 Rust 实现分布式键值存储。我使用 Tokio 的异步运行时编写了服务器端代码。我遇到了一个问题,我的异步代码似乎被阻塞,因此当我与服务器有多个连接时,仅处理一个 TcpStream。我对实现代码很陌生async,无论是一般的还是关于 Rust,但我认为如果给定的 tcp 流上没有活动,其他流将被接受和处理。
是我对 async 的理解错误还是我错误地使用了 tokio ?
这是我的切入点:
use std::error::Error;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use env_logger;
use log::{debug, info};
use structopt::StructOpt;
use tokio::net::TcpListener;
extern crate blue;
use blue::ipc::message;
use blue::store::args;
use blue::store::cluster::{Cluster, NodeRole};
use blue::store::deserialize::deserialize_store;
use blue::store::handler::handle_stream;
use blue::store::wal::WriteAheadLog;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let opt = args::Opt::from_args();
let addr = SocketAddr::from_str(format!("{}:{}", opt.host, opt.port).as_str())?;
let role = NodeRole::from_str(opt.role.as_str()).unwrap();
let leader_addr …Run Code Online (Sandbox Code Playgroud) 为什么我的阻塞线程在另一个线程输出一次之前输出两次?我希望能够生成任务以在后台运行(立即),但在主代码继续之前不等待它们完成。有没有一种简单的方法可以做到这一点tokio?
代码:
use std::{thread, time};
use chrono;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Starting up");
tokio::spawn(blocking_thread()).await.unwrap();
Ok(())
}
async fn blocking_thread() {
for i in 1..5 {
println!("{} Blocking thread {}", chrono::offset::Utc::now(), i);
tokio::spawn(other_thread(i));
thread::sleep(time::Duration::from_millis(100)); //In my real code this is a blocking syscall
}
}
async fn other_thread(i: u8) {
println!("{} Other thread {}", chrono::offset::Utc::now(), i);
}
Run Code Online (Sandbox Code Playgroud)
输出:
Starting up
2022-01-21 09:03:36.662248332 UTC Blocking thread 1
2022-01-21 09:03:36.762375086 UTC Blocking thread 2
2022-01-21 09:03:36.762617994 …Run Code Online (Sandbox Code Playgroud) 我有一个东京TcpStream。我想T通过这个流传递一些类型。这种类型的T实现Serialize和Deserialize. 我怎样才能获得 aSink<T>和 a Stream<T>?
我找到了板条箱tokio_util和tokio_serde,但我不知道如何使用它们来做我想做的事情。
此代码生成一个子进程,逐行使用其 stderr 和 stdout,并适当地记录每个进程。它可以编译并运行。
use std::error::Error;
use std::process::{Stdio};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Command, Child};
use tracing::{info, warn};
macro_rules! relay_pipe_lines {
($pipe:expr, $handler:expr) => {
tokio::spawn(async move {
let mut reader = BufReader::new($pipe).lines();
loop {
let line = reader
.next_line()
.await
.unwrap_or_else(|_| Some(String::new()));
match line {
None => break,
Some(line) => $handler(line)
}
}
});
};
}
pub fn start_and_log_command(mut command: Command) -> Result<Child, Box<dyn Error>> {
command.stdout(Stdio::piped()).stderr(Stdio::piped());
let mut child = command.spawn()?;
let child_stdout = child.stdout.take().unwrap(); // remove `take` from …Run Code Online (Sandbox Code Playgroud) 我的 Rust 代码如下所示。
#[tokio::main]
pub async fn main() {
for i in 1..10 {
tokio::spawn(async move {
println!("{}", i);
});
}
}
Run Code Online (Sandbox Code Playgroud)
运行代码时,我希望它以随机顺序打印 1 到 10。但它只是打印一些随机数:
1
3
2
终端将被任务重复使用,按任意键关闭它。
为什么会发生这种情况?
当映射还没有条目时,我尝试使用异步调用从数据库延迟填充 HashMap。
Rust 编译器警告异步闭包不稳定,但我应该尝试async {。
我正在尝试遵循该建议,但我在评论中收到expected a FnOnce<()>错误: closure
use std::collections::HashMap;
use tokio::runtime::Runtime;
async fn get_from_store(key: String) -> String {
// pretend to get this from an async sqlx db call
String::from(format!("value-for-{key}"))
}
async fn do_work() {
let mut map: HashMap<String, String> = HashMap::new();
let key = String::from("key1");
// the compiler advised async closures were unstable and...
// to use an async block, remove the `||`: `async {` (rustc E0658)
map.entry(key.clone())
.or_insert_with(async { get_from_store(key) }.await);
// the …Run Code Online (Sandbox Code Playgroud)