How to read all the items from the Channel when you don't know their actual count

Jac*_*els 0 concurrency crystal-lang

I'm trying to implement a crawler that visits some URL, collects new relative URLs from it and builds a report. I'm trying to do it concurrently using Crystal fibers and channels, like the following:

urls = [...] # of String
visited_urls = [] of String
channel = Channel(String).new

pool_size.times do
  spawn do
    loop do
      url = urls.shift?
      break if url.nil?

      channel.send(url) if some_condition
    end
  end
end

# TODO: here the problem!
loop do
  url = channel.receive?
  break if url.nil? || channel.closed?

  visited_urls << url
end

puts visited_urls.inspect


But here I have a problem: the second loop never terminates. It calls channel.receive? until the last item in the channel has been read and then waits for a new message that never arrives. The issue is that I never know how many items are actually in the channel, so I can't do what is proposed in the Concurrency section of the Crystal language guides.

So are there any good practices for working with a channel when we don't know how many items it will hold and we need to receive them all? Thanks!

Jon*_*Haß 5

A common solution is to use kill values, either as part of the main data stream, like so:

results = Channel(String|Symbol).new(POOL_SIZE * 2)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end

    results.send :done
  end
end

done_workers = 0

loop do
  message = results.receive
  if message == :done
    done_workers += 1
    break if done_workers == POOL_SIZE
  elsif message.is_a? String
    puts "Got: #{message}"
  end
end

Or by signaling the event through a secondary channel:

results = Channel(String).new(POOL_SIZE * 2)
done = Channel(Nil).new(POOL_SIZE)

POOL_SIZE.times do
  spawn do
    while has_work?
      results.send "some work result"
    end

    done.send nil
  end
end

done_workers = 0
loop do
  select
  when message = results.receive
    puts "Got: #{message}"
  when done.receive
    done_workers += 1
    break if done_workers == POOL_SIZE
  end
end
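
A possible variation on the secondary-channel approach, closer to the receive? loop in the question: once every worker has signaled done, a separate fiber closes the results channel, after which receive? returns nil as soon as the buffered items have been drained. This is only a sketch under assumptions. The urls array and its contents are hypothetical stand-ins for the crawler's work queue, and sharing a plain Array between fibers assumes the default single-threaded runtime.

```crystal
POOL_SIZE = 3

# Hypothetical work queue standing in for the crawler's URL list.
urls = ["/a", "/b", "/c", "/d", "/e"]

results = Channel(String).new(POOL_SIZE * 2)
done = Channel(Nil).new(POOL_SIZE)

POOL_SIZE.times do
  spawn do
    # shift? returns nil once the queue is empty, which ends the loop.
    while next_url = urls.shift?
      results.send next_url
    end

    done.send nil
  end
end

# Closer fiber: waits for every worker to finish, then closes the
# results channel so the receiving loop below can terminate.
spawn do
  POOL_SIZE.times { done.receive }
  results.close
end

visited_urls = [] of String

# receive? keeps returning items until the channel is closed and empty,
# then returns nil.
while url = results.receive?
  visited_urls << url
end

puts visited_urls.inspect
```

Each worker sends all of its results before it sends on done, so nothing can be lost by closing the channel at that point.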