Jac*_*els · 0 votes · tags: concurrency, crystal-lang
I'm trying to implement a crawler that visits a URL, collects the new relative URLs found on it, and builds a report. I'm trying to do this concurrently using Crystal fibers and channels, like this:
urls = [...] # of String
visited_urls = [] of String   # an empty array literal needs an element type in Crystal
channel = Channel(String).new # assumed declaration; not shown in the original

pool_size.times do
  spawn do
    loop do
      url = urls.shift?
      break if url.nil?
      channel.send(url) if some_condition
    end
  end
end

# TODO: here is the problem!
loop do
  url = channel.receive?
  break if url.nil? || channel.closed?
  visited_urls << url
end

puts visited_urls.inspect
But here I have a problem: the second loop never ends. It calls channel.receive? until the last item has been taken from the channel and then waits for a new message that never arrives. This happens because I never know how many items will actually go through the channel, so I can't do what the Concurrency section of the Crystal language guides proposes.
So are there any good practices for working with a channel when we don't know how many items it will carry, but we still need to receive all of them? Thanks!
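One further option, sketched here as an assumption rather than taken from the answer below: `Channel#receive?` returns `nil` instead of blocking once the channel has been closed, so the consumer loop can end on its own if some fiber closes the channel after every worker has finished. The sample URLs, `pool_size = 3`, and the dropped `some_condition` filter are hypothetical stand-ins. The URL channel is unbuffered, so each `send` completes only after the value has been received, which means nothing is still in flight when `close` runs.

```crystal
urls = ["/a", "/b", "/c", "/d", "/e"] # hypothetical sample input
pool_size = 3                         # hypothetical worker count

channel = Channel(String).new      # unbuffered: a send returns only once it is received
done = Channel(Nil).new(pool_size) # buffered so workers never block when signalling

pool_size.times do
  spawn do
    # shift? returns nil when the array is empty; sharing the array between
    # fibers is fine under the default single-threaded scheduler
    while url = urls.shift?
      channel.send(url) # the original's `if some_condition` filter is omitted
    end
    done.send(nil)
  end
end

# Closer fiber: waits until every worker has signalled, then closes the channel
spawn do
  pool_size.times { done.receive }
  channel.close
end

visited_urls = [] of String
# receive? returns nil once the channel is closed, which ends this loop
while found = channel.receive?
  visited_urls << found
end

puts visited_urls.sort.inspect
```

This prints `["/a", "/b", "/c", "/d", "/e"]`. The result is sorted because the order in which the fibers deliver URLs is not something to rely on.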
A common solution is to use a kill value (a sentinel), sent as part of the main data flow, like this:
results = Channel(String|Symbol).new(POOL_SIZE * 2)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end
    results.send :done
  end
end

done_workers = 0
loop do
  message = results.receive
  if message == :done
    done_workers += 1
    break if done_workers == POOL_SIZE
  elsif message.is_a? String
    puts "Got: #{message}"
  end
end
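A self-contained version of the kill-value approach, for trying it out: the hypothetical `queue` array stands in for `has_work?`, and `POOL_SIZE = 3` is an arbitrary choice. Each worker sends its results and then `:done` on the same channel, and values from a single worker arrive in the order that worker sent them, so every result has been received by the time the last `:done` comes in.

```crystal
POOL_SIZE = 3
queue = ["/a", "/b", "/c", "/d", "/e"] # hypothetical work items

results = Channel(String | Symbol).new(POOL_SIZE * 2)

POOL_SIZE.times do
  spawn do
    while url = queue.shift?
      results.send "visited #{url}"
    end
    results.send :done # sentinel: this worker has nothing more to send
  end
end

collected = [] of String
done_workers = 0
loop do
  message = results.receive
  case message
  when :done
    done_workers += 1
    break if done_workers == POOL_SIZE
  when String
    collected << message
  end
end

puts collected.size
```

This prints `5`. Counting sentinels works because the number of workers is known up front, even though the number of results is not.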
Or signal the event through a secondary channel:
results = Channel(String).new(POOL_SIZE * 2)
done = Channel(Nil).new(POOL_SIZE)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end
    done.send nil
  end
end

done_workers = 0
loop do
  select
  when message = results.receive
    puts "Got: #{message}"
  when done.receive
    done_workers += 1
    break if done_workers == POOL_SIZE
  end
end
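A runnable sketch of the secondary-channel variant, again with a hypothetical `queue` and `POOL_SIZE = 3`. One change from the answer's code is deliberate: `results` is unbuffered here, so a worker's `send` only returns once the main fiber has received the value. That way a worker's `done` signal can never be handled while some of its results are still waiting in a buffer, regardless of the order in which `select` checks its branches.

```crystal
POOL_SIZE = 3
queue = ["/a", "/b", "/c", "/d", "/e"] # hypothetical work items

# Unbuffered: each send completes only after the main fiber receives it
results = Channel(String).new
done = Channel(Nil).new(POOL_SIZE)

POOL_SIZE.times do
  spawn do
    while url = queue.shift?
      results.send "visited #{url}"
    end
    done.send nil # all of this worker's results have already been received
  end
end

collected = [] of String
done_workers = 0
loop do
  select
  when message = results.receive
    collected << message
  when done.receive
    done_workers += 1
    break if done_workers == POOL_SIZE
  end
end

puts collected.size
```

This prints `5`. Keeping the answer's buffered `results` channel also works in many cases, but then correctness depends on results being drained before the final `done` is picked up.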