纯Ruby并发哈希

Yeh*_*atz 61 ruby concurrency hash locking

实现可以跨多个线程修改但具有最少数量的锁的Hash的最佳方法是什么.出于这个问题的目的,你可以假设哈希将是重读的.它必须在所有Ruby实现中都是线程安全的,包括以真正同步方式运行的实现,例如JRuby,并且它必须用纯Ruby编写(不允许使用C或Java).

随意提交始终锁定的天真解决方案,但这不太可能是最佳解决方案.优雅的要点,但较小的锁定可能性胜过较小的代码.

小智 22

好的,既然您已经指定了'threadsafe'的实际含义,那么这里有两个可能的实现.以下代码将在MRI和JRuby中永久运行.无锁实现遵循最终的一致性模型,其中如果主服务器处于不稳定状态,则每个线程使用它自己的哈希视图.确保在线程中存储所有信息不会泄漏内存,但需要进行处理和测试需要一些小技巧 - 进程大小不会增加运行此代码.这两种实现都需要更多工作才能"完成",这意味着删除,更新等需要一些思考,但下面两个概念中的任何一个都能满足您的要求.

阅读这个帖子的人非常重要的是要意识到整个问题是JRuby独有的 - 在MRI中,内置Hash就足够了.

module Cash
  def Cash.new(*args, &block)
    env = ENV['CASH_IMPL']
    impl = env ? Cash.const_get(env) : LocklessImpl
    klass = defined?(JRUBY_VERSION) ? impl : ::Hash
    klass.new(*args)
  end

  class LocklessImpl
    def initialize
      @hash = {}
    end

    def thread_hash
      thread = Thread.current
      thread[:cash] ||= {}
      hash = thread[:cash][thread_key]
      if hash
        hash
      else
        hash = thread[:cash][thread_key] = {}
        ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
        hash
      end
    end

    def thread_key
      [Thread.current.object_id, object_id]
    end

    def []=(key, val)
      time = Time.now.to_f
      tuple = [time, val]
      @hash[key] = tuple
      thread_hash[key] = tuple
      val
    end

    def [](key)
    # check the master value
    #
      val = @hash[key]

    # someone else is either writing the key or it has never been set.  we
    # need to invalidate our own copy in either case
    #
      if val.nil?
        thread_val = thread_hash.delete(key)
        return(thread_val ? thread_val.last : nil)
      end

    # check our own thread local value
    #
      thread_val = thread_hash[key]

    # in this case someone else has written a value that we have never seen so
    # simply return it
    #
      if thread_val.nil?
        return(val.last)
      end

    # in this case there is a master *and* a thread local value, if the master
    # is newer juke our own cached copy
    #
      if val.first > thread_val.first
        thread_hash.delete(key)
        return val.last
      else
        return thread_val.last
      end
    end
  end

  class LockingImpl < ::Hash
    require 'sync'

    def initialize(*args, &block)
      super
    ensure
      extend Sync_m
    end

    def sync(*args, &block)
      sync_synchronize(*args, &block)
    end

    def [](key)
      sync(:SH){ super }
    end

    def []=(key, val)
      sync(:EX){ super }
    end
  end
end



if $0 == __FILE__
  iteration = 0

  loop do
    n = 42
    hash = Cash.new

    threads =
      Array.new(10) {
        Thread.new do
          Thread.current.abort_on_exception = true
          n.times do |key|
            hash[key] = key
            raise "#{ key }=nil" if hash[key].nil?
          end
        end
      }

    threads.map{|thread| thread.join}

    puts "THREADSAFE: #{ iteration += 1 }"
  end
end
Run Code Online (Sandbox Code Playgroud)

  • 对不起,我花了很长时间才回去读这篇文章.我其实真的很喜欢这个,但是你的断言"这只是JRuby中的一个问题"让我感到有点不舒服.今天这可能是真的(或者可能不是,一旦像macruby和磁悬浮一样被考虑在内),但它很难被视为未来的一个给定.事实上,如果Ruby 2.0设法完全避免GIL支持更细粒度的锁,我不会感到惊讶.这可能是Maglev,MacRuby和Rubinius的一个问题. (2认同)

小智 10

发布基础/天真的解决方案,只是为了提高我的Stack Overflow信誉:

require 'thread'

class ConcurrentHash < Hash
  def initialize
    super
    @mutex = Mutex.new
  end

  def [](*args)
    @mutex.synchronize { super }
  end

  def []=(*args)
    @mutex.synchronize { super }
  end
end
Run Code Online (Sandbox Code Playgroud)

  • 是不是声誉的目标是悬挂胡萝卜强迫用户给出好的答案? (5认同)
  • 难道你不想,至少想要对Hash上的所有变异操作进行互操作吗? (5认同)

小智 7

Yehuda,我想你提到ivar设置是原子的吗?那么简单的复制和交换呢?

require 'thread'

class ConcurrentHash
  def initialize
    @reader, @writer = {}, {}
    @lock = Mutex.new
  end

  def [](key)
    @reader[key]
  end

  def []=(key, value)
    @lock.synchronize {
      @writer[key] = value
      @reader, @writer = @writer, @reader
      @writer[key] = value
    }
  end
end
Run Code Online (Sandbox Code Playgroud)

  • 其中一个区别.`clone`也会复制单例方法,而`dup`则不会. (4认同)
  • 克隆不起作用.哈希仍然会被冻结,而'h [key] = value`会爆炸. (2认同)