Nic*_*ard 21 python random algorithm distribution probability
我有一堆钥匙,每个钥匙都有一个不可靠的变量.我想随机选择其中一个键,但我希望它不太可能被选中(键,值)而不是不太可能(更可能)的对象.我想知道你是否会有任何建议,最好是我可以使用的现有python模块,否则我需要自己制作.
我检查了随机模块; 它似乎没有提供这个.
我必须为1000个不同的对象集做出数百万次这样的选择,每个对象包含2,455个对象.每个集合将在彼此之间交换对象,因此随机选择器需要是动态的.拥有1000套2,433件物品,即243.3万件物品; 低内存消耗至关重要.由于这些选择不是算法的主要部分,我需要这个过程非常快; CPU时间有限.
谢谢
更新:
好的,我试图明智地考虑你的建议,但时间是如此有限......
我查看了二叉搜索树方法,它看起来风险太大(复杂而复杂).其他建议都类似于ActiveState配方.我拿了它并稍微修改了一下,希望提高效率:
def windex(dict, sum, max):
'''an attempt to make a random.choose() function that makes
weighted choices accepts a dictionary with the item_key and
certainty_value as a pair like:
>>> x = [('one', 20), ('two', 2), ('three', 50)], the
maximum certainty value (max) and the sum of all certainties.'''
n = random.uniform(0, 1)
sum = max*len(list)-sum
for key, certainty in dict.iteritems():
weight = float(max-certainty)/sum
if n < weight:
break
n = n - weight
return key
Run Code Online (Sandbox Code Playgroud)
我希望通过动态保持确定性和最大确定性之和来获得效率增益.欢迎任何进一步的建议.你们节省了我这么多时间和精力,同时提高我的效率,这很疯狂.谢谢!谢谢!谢谢!
UPDATE2:
我决定让它一次选择更多选择,从而提高效率.这将导致我的算法中的精度损失可接受,因为它本质上是动态的.无论如何,这就是我现在拥有的:
def weightedChoices(dict, sum, max, choices=10):
'''an attempt to make a random.choose() function that makes
weighted choices accepts a dictionary with the item_key and
certainty_value as a pair like:
>>> x = [('one', 20), ('two', 2), ('three', 50)], the
maximum certainty value (max) and the sum of all certainties.'''
list = [random.uniform(0, 1) for i in range(choices)]
(n, list) = relavate(list.sort())
keys = []
sum = max*len(list)-sum
for key, certainty in dict.iteritems():
weight = float(max-certainty)/sum
if n < weight:
keys.append(key)
if list: (n, list) = relavate(list)
else: break
n = n - weight
return keys
def relavate(list):
min = list[0]
new = [l - min for l in list[1:]]
return (min, new)
Run Code Online (Sandbox Code Playgroud)
我还没有尝试过.如果您有任何意见/建议,请不要犹豫.谢谢!
UPDATE3:
我一整天都在为Rex Logan的答案定制任务.它实际上是一个特殊的字典类,而不是2个对象和权重数组; 因为Rex的代码生成一个随机索引,所以事情变得非常复杂......我还编写了一个测试用例,它类似于我算法中会发生的事情(但直到我尝试才真正知道!).基本原则是:经常随机生成密钥越多,再次生成密钥的可能性就越小:
import random, time
import psyco
psyco.full()
class ProbDict():
"""
Modified version of Rex Logans RandomObject class. The more a key is randomly
chosen, the more unlikely it will further be randomly chosen.
"""
def __init__(self,keys_weights_values={}):
self._kw=keys_weights_values
self._keys=self._kw.keys()
self._len=len(self._keys)
self._findSeniors()
self._effort = 0.15
self._fails = 0
def __iter__(self):
return self.next()
def __getitem__(self, key):
return self._kw[key]
def __setitem__(self, key, value):
self.append(key, value)
def __len__(self):
return self._len
def next(self):
key=self._key()
while key:
yield key
key = self._key()
def __contains__(self, key):
return key in self._kw
def items(self):
return self._kw.items()
def pop(self, key):
try:
(w, value) = self._kw.pop(key)
self._len -=1
if w == self._seniorW:
self._seniors -= 1
if not self._seniors:
#costly but unlikely:
self._findSeniors()
return [w, value]
except KeyError:
return None
def popitem(self):
return self.pop(self._key())
def values(self):
values = []
for key in self._keys:
try:
values.append(self._kw[key][1])
except KeyError:
pass
return values
def weights(self):
weights = []
for key in self._keys:
try:
weights.append(self._kw[key][0])
except KeyError:
pass
return weights
def keys(self, imperfect=False):
if imperfect: return self._keys
return self._kw.keys()
def append(self, key, value=None):
if key not in self._kw:
self._len +=1
self._kw[key] = [0, value]
self._keys.append(key)
else:
self._kw[key][1]=value
def _key(self):
for i in range(int(self._effort*self._len)):
ri=random.randint(0,self._len-1) #choose a random object
rx=random.uniform(0,self._seniorW)
rkey = self._keys[ri]
try:
w = self._kw[rkey][0]
if rx >= w: # test to see if that is the value we want
w += 1
self._warnSeniors(w)
self._kw[rkey][0] = w
return rkey
except KeyError:
self._keys.pop(ri)
# if you do not find one after 100 tries then just get a random one
self._fails += 1 #for confirming effectiveness only
for key in self._keys:
if key in self._kw:
w = self._kw[key][0] + 1
self._warnSeniors(w)
self._kw[key][0] = w
return key
return None
def _findSeniors(self):
'''this function finds the seniors, counts them and assess their age. It
is costly but unlikely.'''
seniorW = 0
seniors = 0
for w in self._kw.itervalues():
if w >= seniorW:
if w == seniorW:
seniors += 1
else:
seniorsW = w
seniors = 1
self._seniors = seniors
self._seniorW = seniorW
def _warnSeniors(self, w):
#a weight can only be incremented...good
if w >= self._seniorW:
if w == self._seniorW:
self._seniors+=1
else:
self._seniors = 1
self._seniorW = w
def test():
#test code
iterations = 200000
size = 2500
nextkey = size
pd = ProbDict(dict([(i,[0,i]) for i in xrange(size)]))
start = time.clock()
for i in xrange(iterations):
key=pd._key()
w=pd[key][0]
if random.randint(0,1+pd._seniorW-w):
#the heavier the object, the more unlikely it will be removed
pd.pop(key)
probAppend = float(500+(size-len(pd)))/1000
if random.uniform(0,1) < probAppend:
nextkey+=1
pd.append(nextkey)
print (time.clock()-start)*1000/iterations, "msecs / iteration with", pd._fails, "failures /", iterations, "iterations"
weights = pd.weights()
weights.sort()
print "avg weight:", float(sum(weights))/pd._len, max(weights), pd._seniorW, pd._seniors, len(pd), len(weights)
print weights
test()
Run Code Online (Sandbox Code Playgroud)
任何评论仍然欢迎.@Darius:你的二叉树对我来说太复杂和复杂; 而且我不认为它的叶子可以被有效地移除......谢谢所有
Dav*_*vid 26
这个activestate配方提供了一种易于理解的方法,特别是评论中的版本,不需要您预先标准化您的权重:
import random
def weighted_choice(items):
"""items is a list of tuples in the form (item, weight)"""
weight_total = sum((item[1] for item in items))
n = random.uniform(0, weight_total)
for item, weight in items:
if n < weight:
return item
n = n - weight
return item
Run Code Online (Sandbox Code Playgroud)
如果您有大量项目,这将很慢.在这种情况下,二进制搜索可能会更好......但写入也会更复杂,如果样本量很小,则获得的收益很小. 以下是python中二进制搜索方法的一个示例,如果您想要遵循该路由.
(我建议对数据集上的两种方法进行一些快速性能测试.这种算法的不同方法的性能通常有点不直观.)
编辑:我接受了自己的建议,因为我很好奇,并做了一些测试.
我比较了四种方法:
上面的weighted_choice函数.
二进制搜索选择函数如下:
def weighted_choice_bisect(items):
added_weights = []
last_sum = 0
for item, weight in items:
last_sum += weight
added_weights.append(last_sum)
return items[bisect.bisect(added_weights, random.random() * last_sum)][0]
Run Code Online (Sandbox Code Playgroud)
编译版本1:
def weighted_choice_compile(items):
"""returns a function that fetches a random item from items
items is a list of tuples in the form (item, weight)"""
weight_total = sum((item[1] for item in items))
def choice(uniform = random.uniform):
n = uniform(0, weight_total)
for item, weight in items:
if n < weight:
return item
n = n - weight
return item
return choice
Run Code Online (Sandbox Code Playgroud)
2的编译版本:
def weighted_choice_bisect_compile(items):
"""Returns a function that makes a weighted random choice from items."""
added_weights = []
last_sum = 0
for item, weight in items:
last_sum += weight
added_weights.append(last_sum)
def choice(rnd=random.random, bis=bisect.bisect):
return items[bis(added_weights, rnd() * last_sum)][0]
return choice
Run Code Online (Sandbox Code Playgroud)
然后,我建立了一个很大的选择列表,如下所示:
choices = [(random.choice("abcdefg"), random.uniform(0,50)) for i in xrange(2500)]
Run Code Online (Sandbox Code Playgroud)
一个过于简单的分析功能:
def profiler(f, n, *args, **kwargs):
start = time.time()
for i in xrange(n):
f(*args, **kwargs)
return time.time() - start
Run Code Online (Sandbox Code Playgroud)
结果:
(对该函数进行1,000次调用需要几秒钟.)
"编译"结果包括编译选择函数一次所花费的平均时间.(我计算了1000次编译,然后将该时间除以1,000,并将结果添加到选择函数时间.)
所以:如果你有一个项目+这很少改变权重的列表,二进制编译的方法是目前为止最快的.
在对原始帖子的评论中,尼古拉斯伦纳德认为交换和抽样都需要快速.这是一个关于那个案例的想法; 我没试过.
如果只需要快速采样,我们可以使用值的数组以及它们概率的运行总和,并对运行总和进行二进制搜索(键是一个统一的随机数) - 一个O(log( n))操作.但是交换将需要更新在交换条目之后出现的所有运行总和值--O(n)操作.(您可以选择只交换列表末尾附近的项目吗?我会假设没有.)
因此,让我们在两个操作中都以O(log(n))为目标.而不是数组,为每个样本保留一个二叉树来进行采样.叶子保存样本值及其(非标准化)概率.分支节点保存其子节点的总概率.
要进行采样,请x
在0和根的总概率之间生成一个均匀的随机数,然后下降树.在每个分支处,如果左孩子具有总概率,则选择左孩子<= x
.否则从左侧儿童减去概率x
并向右减去.返回您到达的叶子值.
要进行交换,请从树中移除叶子并调整导向它的分支(降低它们的总概率,并删除任何单子分支节点).将叶子插入目标树:您可以选择放置它的位置,因此请保持平衡.在每个级别挑选一个随机的孩子可能已经足够 - 这就是我开始的地方.增加每个父节点的概率,返回到根节点.
现在,采样和交换平均为O(log(n)).(如果需要保证平衡,一种简单的方法是在分支节点中添加另一个字段,保存整个子树中的叶子数.当添加叶子时,在每个级别选择叶子较少的子节点.这样就有可能树只通过删除变得不平衡;如果集合之间存在合理均匀的流量,则这不是问题,但如果是,则在删除期间使用遍历中每个节点上的叶子计数信息选择轮换.)
更新:根据要求,这是一个基本实现.根本没有调整过.用法:
>>> t1 = build_tree([('one', 20), ('two', 2), ('three', 50)])
>>> t1
Branch(Leaf(20, 'one'), Branch(Leaf(2, 'two'), Leaf(50, 'three')))
>>> t1.sample()
Leaf(50, 'three')
>>> t1.sample()
Leaf(20, 'one')
>>> t2 = build_tree([('four', 10), ('five', 30)])
>>> t1a, t2a = transfer(t1, t2)
>>> t1a
Branch(Leaf(20, 'one'), Leaf(2, 'two'))
>>> t2a
Branch(Leaf(10, 'four'), Branch(Leaf(30, 'five'), Leaf(50, 'three')))
Run Code Online (Sandbox Code Playgroud)
码:
import random
def build_tree(pairs):
tree = Empty()
for value, weight in pairs:
tree = tree.add(Leaf(weight, value))
return tree
def transfer(from_tree, to_tree):
"""Given a nonempty tree and a target, move a leaf from the former to
the latter. Return the two updated trees."""
leaf, from_tree1 = from_tree.extract()
return from_tree1, to_tree.add(leaf)
class Tree:
def add(self, leaf):
"Return a new tree holding my leaves plus the given leaf."
abstract
def sample(self):
"Pick one of my leaves at random in proportion to its weight."
return self.sampling(random.uniform(0, self.weight))
def extract(self):
"""Pick one of my leaves and return it along with a new tree
holding my leaves minus that one leaf."""
return self.extracting(random.uniform(0, self.weight))
class Empty(Tree):
weight = 0
def __repr__(self):
return 'Empty()'
def add(self, leaf):
return leaf
def sampling(self, weight):
raise Exception("You can't sample an empty tree")
def extracting(self, weight):
raise Exception("You can't extract from an empty tree")
class Leaf(Tree):
def __init__(self, weight, value):
self.weight = weight
self.value = value
def __repr__(self):
return 'Leaf(%r, %r)' % (self.weight, self.value)
def add(self, leaf):
return Branch(self, leaf)
def sampling(self, weight):
return self
def extracting(self, weight):
return self, Empty()
def combine(left, right):
if isinstance(left, Empty): return right
if isinstance(right, Empty): return left
return Branch(left, right)
class Branch(Tree):
def __init__(self, left, right):
self.weight = left.weight + right.weight
self.left = left
self.right = right
def __repr__(self):
return 'Branch(%r, %r)' % (self.left, self.right)
def add(self, leaf):
# Adding to a random branch as a clumsy way to keep an
# approximately balanced tree.
if random.random() < 0.5:
return combine(self.left.add(leaf), self.right)
return combine(self.left, self.right.add(leaf))
def sampling(self, weight):
if weight < self.left.weight:
return self.left.sampling(weight)
return self.right.sampling(weight - self.left.weight)
def extracting(self, weight):
if weight < self.left.weight:
leaf, left1 = self.left.extracting(weight)
return leaf, combine(left1, self.right)
leaf, right1 = self.right.extracting(weight - self.left.weight)
return leaf, combine(self.left, right1)
Run Code Online (Sandbox Code Playgroud)
更新2:在回答另一个问题时,Jason Orendorff指出二进制树可以通过在数组中表示它们来保持完美平衡,就像经典的堆结构一样.(这也节省了在指针上花费的空间.)请参阅我对该答案的评论,以了解如何使他的代码适应这个问题.