我不知道如何在不使用_siftupor 的情况下有效地解决以下问题_siftdown:
当一个元素无序时,如何恢复堆不变式?
换句话说,更新old_value中heap至new_value,并保持heap工作。您可以假设old_value堆中只有一个。功能定义如下:
def update_value_in_heap(heap, old_value, new_value):
Run Code Online (Sandbox Code Playgroud)
这是我的真实场景,有兴趣的可以看看。
你可以想象它是一个小型的自动完成系统。我需要统计单词出现的频率,并保持前k个max-count单词,随时准备输出。所以我heap在这里使用。当一个字数++时,如果它在堆中,我需要更新它。
所有的词和计数都存储在 trie-tree 的叶子中,heaps
存储在 trie-tree 的中间节点中。如果你关心
堆外这个词,别担心,我可以从trie-tree的叶子节点中得到它。
当用户输入一个单词时,它将首先从堆中读取然后更新
它。为了获得更好的性能,我们可以考虑通过批量更新来降低更新频率。
那么当一个特定的字数增加时,如何更新堆呢?
这是 _siftup 或 _siftdown 版本的简单示例(不是我的场景):
>>> from heapq import _siftup, _siftdown, heapify, heappop
>>> data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]
>>> heapify(data)
>>> old, new = 8, 22 # increase the 8 to 22
>>> i = data.index(old)
>>> data[i] = new
>>> _siftup(data, i)
>>> [heappop(data) for i in range(len(data))]
[1, 2, 3, 5, 7, 10, 18, 19, 22, 37]
>>> data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]
>>> heapify(data)
>>> old, new = 8, 4 # decrease the 8 to 4
>>> i = data.index(old)
>>> data[i] = new
>>> _siftdown(data, 0, i)
>>> [heappop(data) for i in range(len(data))]
[1, 2, 3, 4, 5, 7, 10, 18, 19, 37]
Run Code Online (Sandbox Code Playgroud)
索引的成本为 O(n),更新的成本为 O(logn)。heapify是另一种解决方案,但效率低于_siftup或_siftdown。
但是_siftup和_siftdown是 heapq 中的受保护成员,因此不建议从外部访问它们。
那么有没有更好更有效的方法来解决这个问题呢?这种情况的最佳实践?
感谢阅读,我真的很感激能帮助我。:)
已经参考了heapq python - how to modify values for which heap is sorted,但没有回答我的问题
@cglacet 的答案是完全错误的,但看起来非常合法。\n他提供的代码片段完全损坏了!它也很难阅读。\n_siftup()被调用 n//2 次,heapify()因此它不可能比_siftup()它本身更快。
要回答原来的问题,没有更好的办法了。如果您担心方法是私有的,请创建自己的方法来执行相同的操作。
\n我唯一同意的是,如果您长时间不需要从堆中读取数据,那么一旦需要它们,偷懒可能会有所帮助。heapify()问题是您是否应该为此使用堆。
让我们回顾一下他的代码片段中的问题:
\n该heapify()函数在“更新”运行中被多次调用。\n导致此问题的错误链如下:
heap_fix,但也期待着heap,同样的情况也适用sortself.sort总是False,则self.heap总是True__getitem__()and __setitem__(),每次赋值或读取内容时都会调用 and _siftup()(_siftdown()注意:这两个在 C 中没有被调用,所以它们使用__getitem__()and __setitem__())self.heapisTrue和__getitem__()are__setitem__()被调用,则_repair()每次都会调用该函数_siftup()或siftdown()交换元素。但调用heapify()是在 C 中完成的,因此__getitem__()不会被调用,并且不会陷入无限循环self.sort所以调用它,就像他尝试做的那样,会失败nb_updates次,而不是他声称的 1:1我修正了这个例子,我尽力验证它,但我们都会犯错误。请自行检查一下。
\nimport time\nimport random\n\nfrom heapq import _siftup, _siftdown, heapify, heappop\n\nclass UpdateHeap(list):\n def __init__(self, values):\n super().__init__(values)\n heapify(self)\n\n def update(self, index, value):\n old, self[index] = self[index], value\n if value > old:\n _siftup(self, index)\n else:\n _siftdown(self, 0, index)\n\n def pop(self):\n return heappop(self)\n\nclass SlowHeap(list):\n def __init__(self, values):\n super().__init__(values)\n heapify(self)\n self._broken = False\n \n # Solution 2 and 3) repair using sort/heapify in a lazy way:\n def update(self, index, value):\n super().__setitem__(index, value)\n self._broken = True\n \n def __getitem__(self, index):\n if self._broken:\n self._repair()\n self._broken = False\n return super().__getitem__(index)\n\n def _repair(self):\n ...\n\n def pop(self):\n if self._broken:\n self._repair()\n return heappop(self)\n\nclass HeapifyHeap(SlowHeap):\n\n def _repair(self):\n heapify(self)\n\n\nclass SortHeap(SlowHeap):\n\n def _repair(self):\n self.sort()\n\ndef rand_update(heap):\n index = random.randint(0, len(heap)-1)\n new_value = random.randint(max_int+1, max_int*2)\n heap.update(index, new_value)\n \ndef rand_updates(update_count, heap):\n for i in range(update_count):\n rand_update(heap)\n heap[0]\n \ndef verify(heap):\n last = None\n while heap:\n item = heap.pop()\n if last is not None and item < last:\n raise RuntimeError(f"{item} was smaller than last {last}")\n last = item\n\ndef run_perf_test(update_count, data, heap_class):\n test_heap = heap_class(data)\n t0 = time.time()\n rand_updates(update_count, test_heap)\n perf = (time.time() - t0)*1e3\n verify(test_heap)\n return perf\n\n\nresults = []\nmax_int = 500\nupdate_count = 100\n\nfor i in range(2, 7):\n test_size = 10**i\n test_data = [random.randint(0, max_int) for _ in range(test_size)]\n\n perf = run_perf_test(update_count, test_data, UpdateHeap)\n results.append((test_size, "update", perf))\n \n perf = run_perf_test(update_count, test_data, HeapifyHeap)\n results.append((test_size, "heapify", perf))\n\n perf = run_perf_test(update_count, test_data, SortHeap)\n results.append((test_size, "sort", perf))\n\nimport pandas as pd\nimport seaborn as sns\n\ndtf = pd.DataFrame(results, columns=["heap size", "method", "duration (ms)"])\nprint(dtf)\n\nsns.lineplot(\n data=dtf, \n x="heap size", \n y="duration (ms)", \n hue="method",\n)\n\nRun Code Online (Sandbox Code Playgroud)\n_siftdown()正如您所看到的,使用和 的“更新”方法_siftup()渐近更快。
您应该知道您的代码的作用以及运行需要多长时间。如果有疑问,请检查\xc5\xafd。@cglaced 检查了执行需要多长时间,但他没有询问应该需要多长时间。如果他这样做了,他会发现两者不匹配。其他人也上当了。
\n heap size method duration (ms)\n0 100 update 0.219107\n1 100 heapify 0.412703\n2 100 sort 0.242710\n3 1000 update 0.198841\n4 1000 heapify 2.947330\n5 1000 sort 0.605345\n6 10000 update 0.203848\n7 10000 heapify 32.759190\n8 10000 sort 4.621506\n9 100000 update 0.348568\n10 100000 heapify 327.646971\n11 100000 sort 49.481153\n12 1000000 update 0.256062\n13 1000000 heapify 3475.244761\n14 1000000 sort 1106.570005\nRun Code Online (Sandbox Code Playgroud)\n\n
TL;DR使用heapify。
您必须记住的一件重要事情是理论复杂性和性能是两个不同的东西(即使它们是相关的)。换句话说,实施也很重要。渐近复杂性为您提供了一些下限,您可以将其视为保证,例如 O(n) 中的算法确保在最坏的情况下,您将执行许多与输入大小呈线性关系的指令。这里有两件重要的事情:
根据您考虑的主题/问题,第一点可能非常重要。在某些领域中,隐藏在渐近复杂性中的常量非常大,以至于您甚至无法构建大于常量的输入(或者考虑该输入是不现实的)。情况并非如此,但这是您始终必须牢记的事情。
给出这两个观察结果,你真的不能说:实现 B 比 A 快,因为 A 派生自 O(n) 算法而 B 派生自 O(log n) 算法。即使这是一个很好的论据,但它并不总是足够的。当所有输入发生的可能性相同时,理论复杂性特别适合比较算法。换句话说,当您的算法非常通用时。
如果您知道您的用例和输入是什么,您可以直接测试性能。使用测试和渐近复杂度将使您对算法的执行方式有一个很好的了解(在极端情况下和任意实际情况下)。
话虽如此,让我们在以下将实现三种不同策略的类上运行一些性能测试(这里实际上有四种策略,但Invalidate 和 Reinsert在您的情况下似乎不正确,因为您会使每个项目无效的次数与你看到一个给定的词)。我将包括我的大部分代码,以便您可以仔细检查我没有搞砸(您甚至可以检查完整的笔记本):
from heapq import _siftup, _siftdown, heapify, heappop
class Heap(list):
def __init__(self, values, sort=False, heap=False):
super().__init__(values)
heapify(self)
self._broken = False
self.sort = sort
self.heap = heap or not sort
# Solution 1) repair using the knowledge we have after every update:
def update(self, key, value):
old, self[key] = self[key], value
if value > old:
_siftup(self, key)
else:
_siftdown(self, 0, key)
# Solution 2 and 3) repair using sort/heapify in a lazzy way:
def __setitem__(self, key, value):
super().__setitem__(key, value)
self._broken = True
def __getitem__(self, key):
if self._broken:
self._repair()
self._broken = False
return super().__getitem__(key)
def _repair(self):
if self.sort:
self.sort()
elif self.heap:
heapify(self)
# … you'll also need to delegate all other heap functions, for example:
def pop(self):
self._repair()
return heappop(self)
Run Code Online (Sandbox Code Playgroud)
我们可以首先检查所有三种方法是否有效:
data = [10, 5, 18, 2, 37, 3, 8, 7, 19, 1]
heap = Heap(data[:])
heap.update(8, 22)
heap.update(7, 4)
print(heap)
heap = Heap(data[:], sort_fix=True)
heap[8] = 22
heap[7] = 4
print(heap)
heap = Heap(data[:], heap_fix=True)
heap[8] = 22
heap[7] = 4
print(heap)
Run Code Online (Sandbox Code Playgroud)
然后我们可以使用以下函数运行一些性能测试:
import time
import random
def rand_update(heap, lazzy_fix=False, **kwargs):
index = random.randint(0, len(heap)-1)
new_value = random.randint(max_int+1, max_int*2)
if lazzy_fix:
heap[index] = new_value
else:
heap.update(index, new_value)
def rand_updates(n, heap, lazzy_fix=False, **kwargs):
for _ in range(n):
rand_update(heap, lazzy_fix)
def run_perf_test(n, data, **kwargs):
test_heap = Heap(data[:], **kwargs)
t0 = time.time()
rand_updates(n, test_heap, **kwargs)
test_heap[0]
return (time.time() - t0)*1e3
results = []
max_int = 500
nb_updates = 1
for i in range(3, 7):
test_size = 10**i
test_data = [random.randint(0, max_int) for _ in range(test_size)]
perf = run_perf_test(nb_updates, test_data)
results.append((test_size, "update", perf))
perf = run_perf_test(nb_updates, test_data, lazzy_fix=True, heap_fix=True)
results.append((test_size, "heapify", perf))
perf = run_perf_test(nb_updates, test_data, lazzy_fix=True, sort_fix=True)
results.append((test_size, "sort", perf))
Run Code Online (Sandbox Code Playgroud)
结果如下:
import pandas as pd
import seaborn as sns
dtf = pd.DataFrame(results, columns=["heap size", "method", "duration (ms)"])
print(dtf)
sns.lineplot(
data=dtf,
x="heap size",
y="duration (ms)",
hue="method",
)
Run Code Online (Sandbox Code Playgroud)
从这些测试中我们可以看到,这heapify似乎是最合理的选择,它在最坏的情况下具有不错的复杂性:O(n) 并且在实践中表现更好。另一方面,调查其他选项可能是个好主意(例如拥有专用于该特定问题的数据结构,例如使用垃圾箱将单词放入其中,然后将它们从垃圾箱移动到下一个看起来像可能的轨道调查)。
重要说明:这种情况(更新与读取比率为 1:1)对heapify和sort解决方案都不利。所以,如果你管理有AK:1分的比例,这一结论将更加清晰(你可以替换nb_updates = 1与nb_updates = k上面的代码)。
数据框详细信息:
heap size method duration in ms
0 1000 update 0.435114
1 1000 heapify 0.073195
2 1000 sort 0.101089
3 10000 update 1.668930
4 10000 heapify 0.480175
5 10000 sort 1.151085
6 100000 update 13.194084
7 100000 heapify 4.875898
8 100000 sort 11.922121
9 1000000 update 153.587103
10 1000000 heapify 51.237106
11 1000000 sort 145.306110
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
1288 次 |
| 最近记录: |