小智 22
好的,既然您已经指定了'threadsafe'的实际含义,那么这里有两个可能的实现.以下代码将在MRI和JRuby中永久运行.无锁实现遵循最终的一致性模型,其中如果主服务器处于不稳定状态,则每个线程使用它自己的哈希视图.确保在线程中存储所有信息不会泄漏内存,但需要进行处理和测试需要一些小技巧 - 进程大小不会增加运行此代码.这两种实现都需要更多工作才能"完成",这意味着删除,更新等需要一些思考,但下面两个概念中的任何一个都能满足您的要求.
阅读这个帖子的人非常重要的是要意识到整个问题是JRuby独有的 - 在MRI中,内置Hash就足够了.
module Cash
def Cash.new(*args, &block)
env = ENV['CASH_IMPL']
impl = env ? Cash.const_get(env) : LocklessImpl
klass = defined?(JRUBY_VERSION) ? impl : ::Hash
klass.new(*args)
end
class LocklessImpl
def initialize
@hash = {}
end
def thread_hash
thread = Thread.current
thread[:cash] ||= {}
hash = thread[:cash][thread_key]
if hash
hash
else
hash = thread[:cash][thread_key] = {}
ObjectSpace.define_finalizer(self){ thread[:cash].delete(thread_key) }
hash
end
end
def thread_key
[Thread.current.object_id, object_id]
end
def []=(key, val)
time = Time.now.to_f
tuple = [time, val]
@hash[key] = tuple
thread_hash[key] = tuple
val
end
def [](key)
# check the master value
#
val = @hash[key]
# someone else is either writing the key or it has never been set. we
# need to invalidate our own copy in either case
#
if val.nil?
thread_val = thread_hash.delete(key)
return(thread_val ? thread_val.last : nil)
end
# check our own thread local value
#
thread_val = thread_hash[key]
# in this case someone else has written a value that we have never seen so
# simply return it
#
if thread_val.nil?
return(val.last)
end
# in this case there is a master *and* a thread local value, if the master
# is newer juke our own cached copy
#
if val.first > thread_val.first
thread_hash.delete(key)
return val.last
else
return thread_val.last
end
end
end
class LockingImpl < ::Hash
require 'sync'
def initialize(*args, &block)
super
ensure
extend Sync_m
end
def sync(*args, &block)
sync_synchronize(*args, &block)
end
def [](key)
sync(:SH){ super }
end
def []=(key, val)
sync(:EX){ super }
end
end
end
if $0 == __FILE__
iteration = 0
loop do
n = 42
hash = Cash.new
threads =
Array.new(10) {
Thread.new do
Thread.current.abort_on_exception = true
n.times do |key|
hash[key] = key
raise "#{ key }=nil" if hash[key].nil?
end
end
}
threads.map{|thread| thread.join}
puts "THREADSAFE: #{ iteration += 1 }"
end
end
Run Code Online (Sandbox Code Playgroud)
小智 10
发布基础/天真的解决方案,只是为了提高我的Stack Overflow信誉:
require 'thread'
class ConcurrentHash < Hash
def initialize
super
@mutex = Mutex.new
end
def [](*args)
@mutex.synchronize { super }
end
def []=(*args)
@mutex.synchronize { super }
end
end
Run Code Online (Sandbox Code Playgroud)
小智 7
Yehuda,我想你提到ivar设置是原子的吗?那么简单的复制和交换呢?
require 'thread'
class ConcurrentHash
def initialize
@reader, @writer = {}, {}
@lock = Mutex.new
end
def [](key)
@reader[key]
end
def []=(key, value)
@lock.synchronize {
@writer[key] = value
@reader, @writer = @writer, @reader
@writer[key] = value
}
end
end
Run Code Online (Sandbox Code Playgroud)