Python random sample with a generator / iterable / iterator

Mat*_*yra 38 python random generator

Do you know if there is a way to get Python's random.sample to work with a generator object? I am trying to get a random sample from a very large text corpus. The problem is that random.sample() raises the following error:

TypeError: object of type 'generator' has no len()

I was thinking that maybe there is some way of doing this with something from itertools, but couldn't find anything with a bit of searching.

A somewhat made-up example:

import random
def list_item(ls):
    for item in ls:
        yield item

random.sample( list_item(range(100)), 20 )

UPDATE


As requested by Martijn Pieters, I did some concrete timing of the three methods proposed so far. The results are as follows:

Sampling 1000 from 10000
Using iterSample 0.0163 s
Using sample_from_iterable 0.0098 s
Using iter_sample_fast 0.0148 s

Sampling 10000 from 100000
Using iterSample 0.1786 s
Using sample_from_iterable 0.1320 s
Using iter_sample_fast 0.1576 s

Sampling 100000 from 1000000
Using iterSample 3.2740 s
Using sample_from_iterable 1.9860 s
Using iter_sample_fast 1.4586 s

Sampling 200000 from 1000000
Using iterSample 7.6115 s
Using sample_from_iterable 3.0663 s
Using iter_sample_fast 1.4101 s

Sampling 500000 from 1000000
Using iterSample 39.2595 s
Using sample_from_iterable 4.9994 s
Using iter_sample_fast 1.2178 s

Sampling 2000000 from 5000000
Using iterSample 798.8016 s
Using sample_from_iterable 28.6618 s
Using iter_sample_fast 6.6482 s

It turns out that list.insert has a serious drawback when it comes to large sample sizes. The code I used to time the methods:

# NB: Python 2 code (xrange, iterator.next(), print statement)
from heapq import nlargest
import random
import timeit


def iterSample(iterable, samplesize):
    results = []
    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

def sample_from_iterable(iterable, samplesize):
    return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    for _ in xrange(samplesize):
        results.append(iterator.next())
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    return results

if __name__ == '__main__':
    pop_sizes = [int(10e+3),int(10e+4),int(10e+5),int(10e+5),int(10e+5),int(10e+5)*5]
    k_sizes = [int(10e+2),int(10e+3),int(10e+4),int(10e+4)*2,int(10e+4)*5,int(10e+5)*2]

    for pop_size, k_size in zip(pop_sizes, k_sizes):
        pop = xrange(pop_size)
        k = k_size
        t1 = timeit.Timer(stmt='iterSample(pop, %i)'%(k_size), setup='from __main__ import iterSample,pop')
        t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)'%(k_size), setup='from __main__ import sample_from_iterable,pop')
        t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)'%(k_size), setup='from __main__ import iter_sample_fast,pop')

        print 'Sampling', k, 'from', pop_size
        print 'Using iterSample', '%1.4f s'%(t1.timeit(number=100) / 100.0)
        print 'Using sample_from_iterable', '%1.4f s'%(t2.timeit(number=100) / 100.0)
        print 'Using iter_sample_fast', '%1.4f s'%(t3.timeit(number=100) / 100.0)
        print ''

I also ran a test to check that all of the methods indeed take an unbiased sample of the generator. For all methods, I sampled 1000 elements from 10000, 100000 times, and computed the average frequency of occurrence of each item in the population, which turns out to be ~.1, as one would expect for all three methods.
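
The test code isn't shown above; here is a minimal sketch of such a bias check (the bias_check helper and the Counter tallying are my own reconstruction, with far fewer trials than the 100000 mentioned above to keep it quick):

from collections import Counter
import random

def bias_check(method, population_size=10000, sample_size=1000, trials=1000):
    # Tally how often each element of the population gets sampled.
    counts = Counter()
    for _ in range(trials):
        counts.update(method(iter(range(population_size)), sample_size))
    # An unbiased sampler picks every item in roughly
    # sample_size / population_size (~0.1 here) of the trials.
    return dict((item, counts[item] / float(trials)) for item in counts)

freqs = bias_check(iter_sample_fast)
print(min(freqs.values()), max(freqs.values()))  # both should be close to 0.1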

Dzi*_*inX 22

While the answer of Martijn Pieters is correct, it does slow down when samplesize becomes large, because using list.insert in a loop may have quadratic complexity.
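
To see why: each list.insert at a random position shifts on average half of the existing elements, so the fill phase alone costs O(samplesize**2). A quick illustrative snippet of my own (not from the answer; absolute times will vary by machine):

import random
import timeit

def build_by_insert(k):
    # every insert at a random position shifts ~half the list: O(k*k) overall
    results = []
    for i in range(k):
        results.insert(random.randint(0, i), i)
    return results

def build_by_append(k):
    # amortized O(1) per append: O(k) overall
    results = []
    for i in range(k):
        results.append(i)
    return results

print(timeit.timeit(lambda: build_by_insert(50000), number=1))  # grows roughly quadratically in k
print(timeit.timeit(lambda: build_by_append(50000), number=1))  # stays near-instant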

Here is, in my opinion, an alternative that preserves the uniformity of the sampling while improving performance:

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Fill in the first samplesize elements:
    try:
        for _ in xrange(samplesize):
            results.append(iterator.next())
    except StopIteration:
        raise ValueError("Sample larger than population.")
    random.shuffle(results)  # Randomize their positions
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # at a decreasing rate, replace random items
    return results
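
A usage sketch: because the function keeps only samplesize items around, it can sample lines from a file too large to fit in memory (Python 2, to match the iterator.next() call above; 'huge_corpus.txt' is a hypothetical file name):

with open('huge_corpus.txt') as corpus:
    sample = iter_sample_fast(corpus, 1000)  # 1000 random lines in a single pass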

The difference in speed starts to show for values of samplesize above 10000. Times for calling with (1000000, 100000):

  • iterSample: 5.05 s
  • iter_sample_fast: 2.64 s

  • This is reservoir sampling, right? https://en.wikipedia.org/wiki/Reservoir_sampling (3 upvotes)
  • +1 for this update to the algorithm. I am the original author of `iterSample` (in the earlier answer Martijn Pieters linked to), and while the complexity problem of the initialization code using `list.insert` had occurred to me, I never got around to fixing it myself. (2 upvotes)
  • @larsmans: Python's `random.sample` returns shuffled results (from the docs: "The resulting list is in selection order so that all sub-slices will also be valid random samples."). If you don't need the results shuffled (e.g. for `len(iterable) == samplesize` they would come out in the exact order they came in), then you can skip the initial shuffle. (2 upvotes)

Mar*_*ers 17

You can't.

You have two options: read the whole generator into a list and sample from that list, or use a method that reads the generator one item at a time and picks the sample from it.
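
The first option is a one-liner, at the cost of materializing the whole population in memory; a minimal sketch, reusing the list_item generator from the question:

import random

# fine whenever the population fits in memory
sample = random.sample(list(list_item(range(100))), 20)

The second option picks the sample while iterating: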

import random

def iterSample(iterable, samplesize):
    results = []

    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v) # add first samplesize items in random order
            else:
                results[r] = v # at a decreasing rate, replace random items

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

This method adjusts the chance that the next item becomes part of the sample based on the number of items seen in the iteration so far, and it never needs to hold more than samplesize items in memory. (Item number i+1 of the stream replaces a random existing entry with probability samplesize/(i+1), which works out so that every item ends up in the final sample with equal probability.)

The solution isn't mine; it was provided as part of another answer here on SO.
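
A quick sanity check with the question's generator (assuming iterSample and list_item as defined above; the chosen items vary per run):

sample = iterSample(list_item(range(100)), 20)
print(len(sample))  # -> 20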

  • @MattiLyra: Appending items to a Python list doesn't incur an extra cost as the list grows large. See [Python Time Complexity](http://wiki.python.org/moin/TimeComplexity); append is O(1) amortized cost. (2 upvotes)

Fre*_*Foo 7

Just for the hell of it, here's a one-liner that samples k elements without replacement from the n items generated, in O(n lg k) time:

import random
from heapq import nlargest

def sample_from_iterable(it, k):
    return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))
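
Note that nlargest keeps a heap of only k pairs, which is where the O(n lg k) bound comes from, and the expression returns a generator, so materialize it to see the sample:

print(list(sample_from_iterable(iter(range(1000)), 5)))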