由于蟒蛇list的bytes值:
# actual str values un-important
[
b'foo',
b'bar',
b'baz',
...
]
Run Code Online (Sandbox Code Playgroud)
如何将列表分成块,其中每个块的最大内存大小低于某个上限?
例如:如果上限为 7 个字节,那么原始列表将被分解为一个列表列表
[
[b'foo', b'bar'], # sublist 0
[b'baz'], # sublist 1
...
]
Run Code Online (Sandbox Code Playgroud)
根据列表内容的累积长度,每个子列表最多为 7 个字节。
注意:每个子列表都应该按照原始列表的顺序进行最大程度的打包。在上面的示例中,前 2 个 str 值被分组,因为它是 7 字节限制下的最大可能值。
预先感谢您的考虑和回应。
可以贪婪地解决序列的最优分割问题,使得元素满足给定的最大/最小条件,同时保持元素的顺序。因此,您只需迭代输入序列一次并维护元素缓冲区。在 Python 中,可以使用生成器进行优雅的编码,其优点是不需要创建结果。
您的问题的大部分算法如下:
def split_by_size(items, max_size, get_size=len):
buffer = []
buffer_size = 0
for item in items:
item_size = get_size(item)
if buffer_size + item_size <= max_size:
buffer.append(item)
buffer_size += item_size
else:
yield buffer
buffer = [item]
buffer_size = item_size
if buffer_size > 0:
yield buffer
Run Code Online (Sandbox Code Playgroud)
其中最后一个参数将确定给定项目的大小的问题委托给指定的可调用对象。我不会详细讨论这一点,但我会假设一个简单的方法就len()可以了。此外,这假设每个元素都单独满足条件,否则也应该处理这种情况。
测试上面的代码:
import random
k = 10
n = 15
max_size = 10
random.seed(0)
items = [b'x' * random.randint(1, 2 * k // 3) for _ in range(n)]
print(items)
# [b'xxxx', b'xxxx', b'x', b'xxx', b'xxxxx', b'xxxx', b'xxxx', b'xxx', b'xxxx', b'xxx', b'xxxxx', b'xx', b'xxxxx', b'xx', b'xxx']
print(list(split_by_size(items, k)))
# [[b'xxxx', b'xxxx', b'x'], [b'xxx', b'xxxxx'], [b'xxxx', b'xxxx'], [b'xxx', b'xxxx', b'xxx'], [b'xxxxx', b'xx'], [b'xxxxx', b'xx', b'xxx']]
Run Code Online (Sandbox Code Playgroud)
另外,如果您愿意以某种方式存储分割结果list,则上述方法的代码可以稍微紧凑一些:
def chunks_by_size(items, max_size, get_size=len):
result = []
size = max_size + 1
for item in items:
item_size = get_size(item)
size += item_size
if size > max_size:
result.append([])
size = item_size
result[-1].append(item)
return result
Run Code Online (Sandbox Code Playgroud)
但也稍微慢一些(参见下面的基准)。
您还可以考虑使用(基本上与@NizamMohamed 答案functools.reduce()相同),代码会更短,但可读性可能也较差:
def chunks_by_size_reduce(items, size, get_size=len):
return functools.reduce(
lambda a, b, size=size:
a[-1].append(b) or a
if a and sum(get_size(x) for x in a[-1]) + get_size(b) <= size
else a.append([b]) or a, items, [])
Run Code Online (Sandbox Code Playgroud)
并且肯定效率较低,因为get_size()对所考虑的每个元素的“候选”内部列表的每个元素进行调用,这使得 thisO(n k!)成为k每个子序列中元素的平均数量。对于某些时间,请参阅下面的基准测试。
我不会对使用 的解决方案感到惊讶itertools.accumulate(),但这也必然会很慢。
加快速度的最简单方法是使用Cython或Numba。在这里,这被应用于split_by_size(). 对于他们两个来说,代码都不会改变。
对我们获得的所有这些进行基准测试(_cy代表 Cython 编译版本,而_nb代表 Numba 编译版本):
%timeit list(split_by_size(items * 100000, k + 1))
# 10 loops, best of 3: 281 ms per loop
%timeit list(split_by_size_cy(items * 100000, k + 1))
# 10 loops, best of 3: 181 ms per loop
%timeit list(split_by_size_nb(items * 100000, k + 1))
# 100 loops, best of 3: 5.17 ms per loop
%timeit chunks_by_size(items * 100000, k + 1)
# 10 loops, best of 3: 318 ms per loop
%timeit chunks_by_size_reduce(items * 100000, k + 1)
# 1 loop, best of 3: 1.18 s per loop
Run Code Online (Sandbox Code Playgroud)
请注意,虽然 Numba 编译的版本比替代版本快得多,但它也是最脆弱的,因为它需要将标志forceobj设置为True,这可能会导致执行不稳定。
不管怎样,如果最终目标是通过一些 I/O 操作发送分组的项目,我几乎不相信这会成为瓶颈。
请注意,该算法与其他答案几乎相同,我只是发现这里的代码更干净一些。
该解决方案与functools.reduce.
l = [b'abc', b'def', b'ghi', b'jklm', b'nopqrstuv', b'wx', b'yz']
reduce(lambda a, b, size=7: a[-1].append(b) or a if a and sum(len(x) for x in a[-1]) + len(b) <= size else a.append([b]) or a, l, [])
Run Code Online (Sandbox Code Playgroud)
a是一个空的list并且b是原始的一个项目list。
if a and sum(len(x) for x in a[-1]) + len(b) <= size
检查是否不为空且最后附加的a长度和 的长度之和不超过。 byteslistbsize
a[-1].append(b) or a
追加b到最后追加的list内容,如果条件为 则a返回。 aTrue
a.append([b]) or a
创建一个listwithb并将新的附加list到a并返回a
输出;
[[b'abc', b'def'], [b'ghi', b'jklm'], [b'nopqrstuv'], [b'wx', b'yz']]
Run Code Online (Sandbox Code Playgroud)