Jul*_*iet 5 f# multithreading async-workflow
您总是听说功能代码本质上比非功能代码更容易并行化,因此我决定编写一个执行以下操作的函数:
给定字符串输入,总计每个字符串的唯一字符数.因此,给定输入[ "aaaaa"; "bbb"; "ccccccc"; "abbbc" ],我们的方法将返回a: 6; b: 6; c: 8.
这是我写的:
(* seq<#seq<char>> -> Map<char,int> *)
let wordFrequency input =
input
|> Seq.fold (fun acc text ->
(* This inner loop can be processed on its own thread *)
text
|> Seq.choose (fun char -> if Char.IsLetter char then Some(char) else None)
|> Seq.fold (fun (acc : Map<_,_>) item ->
match acc.TryFind(item) with
| Some(count) -> acc.Add(item, count + 1)
| None -> acc.Add(item, 1))
acc
) Map.empty
Run Code Online (Sandbox Code Playgroud)
这段代码理想上是可并行化的,因为每个字符串input都可以在自己的线程上处理.它不像它看起来那么简单,因为内部循环将项添加到所有输入之间共享的Map.
我想将内部循环考虑到自己的线程中,我不想使用任何可变状态.如何使用Async工作流程重新编写此功能?
正如已经指出的,如果您尝试让不同的线程处理不同的输入字符串,则会出现更新争用,因为每个线程都可以增加每个字母的计数。您可以让每个线程生成自己的映射,然后“将所有映射相加”,但最后一步可能会很昂贵(并且由于共享数据而不太适合利用线程)。我认为使用像下面这样的算法,大输入可能会运行得更快,其中每个线程处理不同的字母计数(对于输入中的所有字符串)。因此,每个线程都有自己独立的计数器,因此没有更新争用,也没有最后一步来组合结果。然而,我们需要预处理来发现“唯一字母集”,并且此步骤确实存在相同的争用问题。(实际上,您可能预先知道字符的范围,例如字母表,然后可以创建 26 个线程来处理 az,并绕过此问题。)无论如何,大概问题主要是关于探索“如何编写 F#”异步代码跨线程划分工作,因此下面的代码演示了它。
#light
let input = [| "aaaaa"; "bbb"; "ccccccc"; "abbbc" |]
// first discover all unique letters used
let Letters str =
str |> Seq.fold (fun set c -> Set.add c set) Set.empty
let allLetters =
input |> Array.map (fun str ->
async { return Letters str })
|> Async.Parallel
|> Async.Run
|> Set.union_all // note, this step is single-threaded,
// if input has many strings, can improve this
// Now count each letter on a separate thread
let CountLetter letter =
let mutable count = 0
for str in input do
for c in str do
if letter = c then
count <- count + 1
letter, count
let result =
allLetters |> Seq.map (fun c ->
async { return CountLetter c })
|> Async.Parallel
|> Async.Run
// print results
for letter,count in result do
printfn "%c : %d" letter count
Run Code Online (Sandbox Code Playgroud)
我确实“完全改变了算法”,主要是因为我原来的算法由于更新争用而不太适合直接数据并行化。根据您要学习的内容,这个答案可能会让您特别满意,也可能不会让您特别满意。