使用Python中的谓词对可迭代进行分组

Doc*_*r J 9 python performance iterator

我正在解析这样的文件:

--header--
data1
data2
--header--
data3
data4
data5
--header--
--header--
...

我想要这样的团体:

[ [header, data1, data2], [header, data3, data4, data5], [header], [header], ... ]
Run Code Online (Sandbox Code Playgroud)

所以我可以像这样迭代它们:

for grp in group(open('file.txt'), lambda line: 'header' in line):
    for item in grp:
        process(item)
Run Code Online (Sandbox Code Playgroud)

并将detect-a-group逻辑与process-a-group逻辑分开.

但我需要一个可迭代的迭代,因为这些组可以任意大,我不想存储它们.也就是说,每当遇到"sentinel"或"header"项时,我想将一个iterable分成子组,如谓词所示.看起来这将是一项常见的任务,但我找不到有效的Pythonic实现.

这是一个愚蠢的追加到列表的实现:

def group(iterable, isstart=lambda x: x):
    """Group `iterable` into groups starting with items where `isstart(item)` is true.

    Start items are included in the group.  The first group may or may not have a 
    start item.  An empty `iterable` results in an empty result (zero groups)."""
    items = []
    for item in iterable:
        if isstart(item) and items:
            yield iter(items)
            items = []
        items.append(item)
    if items:
        yield iter(items) 
Run Code Online (Sandbox Code Playgroud)

感觉就像是一个不错的itertools版本,但它让我望而却步."明显的"(?!)groupby解决方案似乎不起作用,因为可能存在相邻的标头,并且它们需要分开组.我能想到的最好的是(ab)使用groupby一个保持计数器的关键功能:

def igroup(iterable, isstart=lambda x: x):
    def keyfunc(item):
        if isstart(item):
            keyfunc.groupnum += 1       # Python 2's closures leave something to be desired
        return keyfunc.groupnum
    keyfunc.groupnum = 0
    return (group for _, group in itertools.groupby(iterable, keyfunc))
Run Code Online (Sandbox Code Playgroud)

但我觉得Python可以做得更好 - 而且遗憾的是,这比哑巴列表更慢:

# ipython
%time deque(group(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0)
CPU times: user 4.20 s, sys: 0.03 s, total: 4.23 s

%time deque(igroup(xrange(10 ** 7), lambda x: x % 1000 == 0), maxlen=0)
CPU times: user 5.45 s, sys: 0.01 s, total: 5.46 s

为了方便您,这里有一些单元测试代码:

class Test(unittest.TestCase):
    def test_group(self):
        MAXINT, MAXLEN, NUMTRIALS = 100, 100000, 21
        isstart = lambda x: x == 0
        self.assertEqual(next(igroup([], isstart), None), None)
        self.assertEqual([list(grp) for grp in igroup([0] * 3, isstart)], [[0]] * 3)
        self.assertEqual([list(grp) for grp in igroup([1] * 3, isstart)], [[1] * 3])
        self.assertEqual(len(list(igroup([0,1,2] * 3, isstart))), 3)        # Catch hangs when groups are not consumed
        for _ in xrange(NUMTRIALS):
            expected, items = itertools.tee(itertools.starmap(random.randint, itertools.repeat((0, MAXINT), random.randint(0, MAXLEN))))
            for grpnum, grp in enumerate(igroup(items, isstart)):
                start = next(grp)
                self.assertTrue(isstart(start) or grpnum == 0)
                self.assertEqual(start, next(expected))
                for item in grp:
                    self.assertFalse(isstart(item))
                    self.assertEqual(item, next(expected))
Run Code Online (Sandbox Code Playgroud)

那么:我如何在Python中优雅高效地通过谓词对可迭代子进行子组化?

jfs*_*jfs 5

如何在Python中优雅高效地通过谓词对可迭代的子类进行子组化?

这是一个简洁,内存有效的实现,与您的问题非常类似:

from itertools import groupby, imap
from operator import itemgetter

def igroup(iterable, isstart):
    def key(item, count=[False]):
        if isstart(item):
           count[0] = not count[0] # start new group
        return count[0]
    return imap(itemgetter(1), groupby(iterable, key))
Run Code Online (Sandbox Code Playgroud)

它支持无限组.

tee基于解决方案的速度略快,但它消耗了当前组的内存(类似于list问题的基于解决方案):

from itertools import islice, tee

def group(iterable, isstart):
    it, it2 = tee(iterable)
    count = 0
    for item in it:
        if isstart(item) and count:
            gr = islice(it2, count)
            yield gr
            for _ in gr:  # skip to the next group
                pass
            count = 0
        count += 1
    if count:
       gr = islice(it2, count)
       yield gr
       for _ in gr:  # skip to the next group
           pass
Run Code Online (Sandbox Code Playgroud)

groupby-solution可以用纯Python实现:

def igroup_inline_key(iterable, isstart):
    it = iter(iterable)

    def grouper():
        """Yield items from a single group."""
        while not p[START]:
            yield p[VALUE]  # each group has at least one element (a header)
            p[VALUE] = next(it)
            p[START] = isstart(p[VALUE])

    p = [None]*2 # workaround the absence of `nonlocal` keyword in Python 2.x
    START, VALUE = 0, 1
    p[VALUE] = next(it)
    while True:
        p[START] = False # to distinguish EOF and a start of new group
        yield grouper()
        while not p[START]: # skip to the next group
            p[VALUE] = next(it)
            p[START] = isstart(p[VALUE])
Run Code Online (Sandbox Code Playgroud)

为避免重复代码,while True循环可写为:

while True:
    p[START] = False  # to distinguish EOF and a start of new group
    g = grouper()
    yield g
    if not p[START]:  # skip to the next group
        for _ in g:
            pass
        if not p[START]:  # EOF
            break
Run Code Online (Sandbox Code Playgroud)

虽然之前的变体可能更明确和可读.

我认为纯Python中一般的内存高效解决方案不会明显快于groupby基于Python的解决方案.

如果process(item)快速比较igroup()并且可以在字符串中有效地找到标头(例如,对于固定的静态标头),那么您可以通过以大块读取文件并拆分标头值来提高性能.它应该使你的任务IO绑定.


pil*_*her 4

我没有完全阅读您的所有代码,但我认为这可能会有所帮助:

from itertools import izip, tee, chain


def pairwise(iterable):
    a, b = tee(iterable)
    return izip(a, chain(b, [next(b, None)]))


def group(iterable, isstart):

    pairs = pairwise(iterable)

    def extract(current, lookahead, pairs=pairs, isstart=isstart):
        yield current
        if isstart(lookahead):
            return
        for current, lookahead in pairs:
            yield current
            if isstart(lookahead):
                return

    for start, lookahead in pairs:
        gen = extract(start, lookahead)
        yield gen
        for _ in gen:
            pass


for gen in group(xrange(4, 16), lambda x: x % 5 == 0):
    print '------------------'
    for n in gen:
        print n

print [list(g) for g in group([], lambda x: x % 5 == 0)]
Run Code Online (Sandbox Code Playgroud)

结果:

$ python gen.py
------------------
4
------------------
5
6
7
8
9
------------------
10
11
12
13
14
------------------
15
[]
Run Code Online (Sandbox Code Playgroud)

编辑:

这是另一个解决方案,与上面类似,但没有pairwise()和 哨兵。我不知道哪个更快:

def group(iterable, isstart):

    sentinel = object()

    def interleave(iterable=iterable, isstart=isstart, sentinel=sentinel):
        for item in iterable:
            if isstart(item):
                yield sentinel
            yield item

    items = interleave()

    def extract(item, items=items, isstart=isstart, sentinel=sentinel):
        if item is not sentinel:
            yield item
        for item in items:
            if item is sentinel:
                return
            yield item

    for lookahead in items:
        gen = extract(lookahead)
        yield gen
        for _ in gen:
            pass
Run Code Online (Sandbox Code Playgroud)

由于 JFSebastians 提出了穷举跳过子组生成器的想法,现在两者都通过了测试用例。