io concurrency file-access rust
Context:
I am writing a web server in which we handle different segments. I want to cache these segments in different files (a segment can be up to 10 MB). Something like this:
pub async fn segment_handler(segment: String) {
    if is_cached(&segment) {
        return get_from_cache(segment)
    }
    // Do some computation to get the result.
    let result = do_some_large_computation(segment);
    // Cache this result to a file.
    let file_name = &format!("./cache/{}", &segment);
    fs::create(file_name);
    fs::write(file_name, result).expect("Unable to write file");
    result
}
Since segment_handler can be called from multiple threads with different segment values, is fs::write thread safe? If it is not, we cannot just use a mutex, because the segment: String can be different for every call, and a single global mutex would hurt performance. I need something like a mutex, but scoped to a particular segment: String. What is the solution to this problem?
Answer:
The code you posted does not compile, because there is no such thing as fs::create, but luckily you do not need it at all: the fs::write function creates the file for you.
On Linux at least, calling fs::write concurrently on the same path from several different threads will result in the file containing the contents passed to one of the fs::write calls. Note that if you use the existence of the file to determine whether you need to read the value from the cache or recompute it, you may end up with several threads recomputing the same value and then all of them writing it to the file.
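For illustration, here is a minimal sketch of what the is_cached and get_from_cache helpers from the question might look like when the check is simply "does ./cache/<segment> exist". The signatures are assumptions, since the question does not show them, and as async functions they would need to be .awaited inside segment_handler:
// Hypothetical helpers, not shown in the question. Using the existence of the
// cache file as the check is exactly the pattern that can let several tasks
// recompute the same segment before the first one finishes writing it.
async fn is_cached(segment: &str) -> bool {
    tokio::fs::metadata(format!("./cache/{}", segment)).await.is_ok()
}

async fn get_from_cache(segment: &str) -> Vec<u8> {
    tokio::fs::read(format!("./cache/{}", segment))
        .await
        .expect("Unable to read cache file")
}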
You should also be aware that since you are using async/await, you are not allowed to use the std::fs module, because it blocks the thread. You should use tokio::fs::write like this:
pub async fn segment_handler(segment: String) {
    if is_cached(&segment) {
        return get_from_cache(segment);
    }
    // Do some computation to get the result.
    let result = do_some_large_computation(segment);
    // Cache this result to a file.
    let file_name = &format!("./cache/{}", &segment);
    tokio::fs::write(file_name, &result).await.expect("Unable to write file");
    result
}
Another correct option is to use spawn_blocking like this:
pub async fn segment_handler(segment: String) {
    if is_cached(&segment) {
        return get_from_cache(segment);
    }
    tokio::task::spawn_blocking(move || {
        // Do some computation to get the result.
        let result = do_some_large_computation(segment);
        // Cache this result to a file.
        let file_name = &format!("./cache/{}", &segment);
        // The closure passed to spawn_blocking is synchronous, so the
        // blocking std::fs::write is fine here (and .await is not available).
        std::fs::write(file_name, &result).expect("Unable to write file");
        result
    }).await.expect("Panic in spawn_blocking")
}
You can read more about why you must properly handle blocking like this in CPU-bound tasks and blocking code from Tokio's documentation.
Tokio is able to concurrently run many tasks on a few threads by repeatedly swapping the currently running task on each thread. However, this kind of swapping can only happen at .await points, so code that spends a long time without reaching an .await will prevent other tasks from running. To combat this, Tokio provides two kinds of threads: core threads and blocking threads. The core threads are where all asynchronous code runs, and Tokio will by default spawn one for each CPU core. The blocking threads are spawned on demand, and can be used to run blocking code that would otherwise block other tasks from running. To spawn a blocking task, you should use the spawn_blocking function.
Note that I have linked to Tokio 0.2's documentation as warp does not yet support Tokio 0.3.
To prevent having the value computed several times if the function is called several times before the first call finishes, you can use a technique based on a HashMap stored behind a mutex like this:
use std::collections::HashMap;
use std::sync::Mutex;
use tokio::sync::broadcast;

pub struct Cache {
    inner: Mutex<Inner>,
}

struct Inner {
    cached: HashMap<String, CachedType>,
    pending: HashMap<String, broadcast::Sender<CachedType>>,
}

pub enum TryCached {
    Exists(CachedType),
    Pending(broadcast::Receiver<CachedType>),
    New(),
}

impl Cache {
    pub fn try_get(&self, key: &str) -> TryCached {
        let mut inner = self.inner.lock().unwrap();
        if let Some(value) = inner.cached.get(key) {
            // To avoid cloning the value, use HashMap<String, Arc<CachedType>> and clone the Arc instead.
            TryCached::Exists(value.clone())
        } else if let Some(pending) = inner.pending.get(key) {
            TryCached::Pending(pending.subscribe())
        } else {
            let (channel, _) = broadcast::channel(1);
            inner.pending.insert(key.to_string(), channel);
            TryCached::New()
        }
    }

    pub fn put_computed(&self, key: String, value: CachedType) {
        let mut inner = self.inner.lock().unwrap();
        if let Some(chan) = inner.pending.remove(&key) {
            // Ignore the error: it only means no tasks are currently waiting for this key.
            let _ = chan.send(value.clone());
        }
        inner.cached.insert(key, value);
    }
}
The method can then be implemented as a call to try_get that does different things depending on the value of the returned enum.
pub async fn segment_handler(cache: &Cache, segment: String) -> CachedType {
    match cache.try_get(&segment) {
        TryCached::Exists(value) => value,
        TryCached::Pending(mut chan) => chan.recv().await.expect("Sender dropped without sending"),
        TryCached::New() => {
            let (segment, value) = tokio::task::spawn_blocking(move || {
                // Do some computation to get the result.
                let result = do_some_large_computation(&segment);
                // Cache this result to a file.
                let file_name = &format!("./cache/{}", &segment);
                std::fs::write(file_name, result.to_slice()).expect("Unable to write file");
                (segment, result)
            })
            .await
            .expect("Panic in spawn_blocking");
            cache.put_computed(segment, value.clone());
            value
        }
    }
}
Thanks to the mutex, this approach is fully thread safe. Note that it uses a synchronous mutex rather than an asynchronous one; to read more about why that is fine here, see the shared state chapter of the Tokio tutorial.
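For completeness, here is one way the pieces could be wired together. This is a minimal sketch that assumes a Cache::new constructor is added next to the Cache code above (it is not part of the answer's code) and that it lives in the same module, so the private Inner fields are accessible:
use std::sync::Arc;

impl Cache {
    // Hypothetical constructor, assumed for this sketch.
    pub fn new() -> Self {
        Cache {
            inner: Mutex::new(Inner {
                cached: HashMap::new(),
                pending: HashMap::new(),
            }),
        }
    }
}

#[tokio::main]
async fn main() {
    // One shared cache for the whole server; each spawned task can hold its own Arc clone.
    let cache = Arc::new(Cache::new());
    // &Arc<Cache> deref-coerces to &Cache at the call below.
    let _value = segment_handler(&cache, "some-segment".to_string()).await;
}
Because Cache only takes &self, cloning the Arc into each request handler is all the sharing that is needed; the internal Mutex handles the synchronization.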