如何最大化二元组合的缓存命中率?

ken*_*ken 16 python algorithm caching combinations

我的问题很简单,但我觉得很难说清楚,所以请允许我一步一步解释。

\n

假设我有N项目和N相应的索引。\n每个项目都可以使用相应的索引加载。

\n
def load_item(index: int) -> ItemType:\n    # Mostly just reading, but very slow.\n    return item\n
Run Code Online (Sandbox Code Playgroud)\n

我还有一个函数,它接受两个(加载的)项目并计算分数。

\n
def calc_score(item_a: ItemType, item_b: ItemType) -> ScoreType:\n    # Much faster than load function.\n    return score\n
Run Code Online (Sandbox Code Playgroud)\n

注意calc_score(a, b) == calc_score(b, a)

\n

我想要做的是计算所有 2 项组合的分数,并找到(至少)一个给出最高分数的组合。

\n

这可以按如下方式实现:

\n
def dumb_solution(n: int) -> Tuple[int, int]:\n    best_score = 0\n    best_combination = None\n    for index_a, index_b in itertools.combinations(range(n), 2):\n        item_a = load_item(index_a)\n        item_b = load_item(index_b)\n        score = calc_score(item_a, item_b)\n        if score > best_score:\n            best_score = score\n            best_combination = (index_a, index_b)\n    return best_combination\n
Run Code Online (Sandbox Code Playgroud)\n

然而,这个解决方案调用了load_item函数2*C(N,2) = N*(N-1)次数,这是这个函数的瓶颈。

\n

这可以通过使用缓存来解决。\n但是不幸的是,项目太大,不可能将所有项目保留在内存中。\n因此,我们需要使用大小有限的缓存。

\n
from functools import lru_cache\n\n@lru_cache(maxsize=M)\ndef load(index: int) -> ItemType:\n    # Very slow process.\n    return item\n
Run Code Online (Sandbox Code Playgroud)\n

请注意,M(缓存大小)远小于N(大约N // 10N // 2)。

\n

问题是典型的组合顺序对于 LRU 缓存来说并不理想。

\n

例如,当 时N=6, M=3itertools.combinations生成以下序列,并且调用的次数load_item为 17。

\n
[\n    (0, 1),  # 1, 2\n    (0, 2),  # -, 3\n    (0, 3),  # -, 4\n    (0, 4),  # -, 5\n    (0, 5),  # -, 6\n    (1, 2),  # 7, 8\n    (1, 3),  # -, 9\n    (1, 4),  # -, 10\n    (1, 5),  # -, 11\n    (2, 3),  # 12, 13\n    (2, 4),  # -, 14\n    (2, 5),  # -, 15\n    (3, 4),  # 16, 17\n    (3, 5),  # -, -\n    (4, 5),  # -, -\n]\n
Run Code Online (Sandbox Code Playgroud)\n

但是,如果我将上述顺序重新排列如下,则调用次数将为 10。

\n
[\n    (0, 1),  # 1, 2\n    (0, 2),  # -, 3\n    (1, 2),  # -, -\n    (0, 3),  # -, 4\n    (2, 3),  # -, -\n    (0, 4),  # -, 5\n    (3, 4),  # -, -\n    (0, 5),  # -, 6\n    (4, 5),  # -, -\n    (1, 4),  # 7, -\n    (1, 5),  # -, -\n    (1, 3),  # -, 8\n    (3, 5),  # -, -\n    (2, 5),  # 9, -\n    (2, 4),  # -, 10\n]\n
Run Code Online (Sandbox Code Playgroud)\n

问题:

\n

如何生成一系列 2 项组合以最大化缓存命中率?

\n
\n

我尝试过的:

\n

我提出的解决方案是优先考虑缓存中已有的项目。

\n
from collections import OrderedDict\n\n\ndef prioritizes_item_already_in_cache(n, cache_size):\n    items = list(itertools.combinations(range(n), 2))\n    cache = OrderedDict()\n    reordered = []\n\n    def update_cache(x, y):\n        cache[x] = cache[y] = None\n        cache.move_to_end(x)\n        cache.move_to_end(y)\n        while len(cache) > cache_size:\n            cache.popitem(last=False)\n\n    while items:\n        # Find a pair where both are cached.\n        for i, (a, b) in enumerate(items):\n            if a in cache and b in cache:\n                reordered.append((a, b))\n                update_cache(a, b)\n                del items[i]\n                break\n        else:\n            # Find a pair where one of them is cached.\n            for i, (a, b) in enumerate(items):\n                if a in cache or b in cache:\n                    reordered.append((a, b))\n                    update_cache(a, b)\n                    del items[i]\n                    break\n            else:\n                # Cannot find item in cache.\n                a, b = items.pop(0)\n                reordered.append((a, b))\n                update_cache(a, b)\n\n    return reordered\n
Run Code Online (Sandbox Code Playgroud)\n

对于N=100, M=10,此序列导致 1660 个调用,大约是典型序列的 1/3。因为N=100, M=50只有 155 个呼叫。所以我想我可以说这是一个有前途的方法。

\n

不幸的是,这个函数对于大的 来说太慢而且没用N。\n我没能完成 for N=1000,但实际数据有几万。\n而且,它没有考虑到没有缓存项时如何选择项\n因此,即使它很快,它是否是理论上的最佳解决方案也是值得怀疑的(所以请注意我的问题不是如何使上述函数更快)。

\n

(已编辑)这是完整的代码,包括每个人的答案以及测试和基准代码。

\n
import functools\nimport itertools\nimport math\nimport time\nfrom collections import Counter, OrderedDict\nfrom itertools import chain, combinations, product\nfrom pathlib import Path\nfrom typing import Callable, Iterable, Tuple\n\nimport joblib\nimport matplotlib.pyplot as plt\nimport numpy as np\nimport pandas as pd\nfrom PIL import Image, ImageDraw\n\nItemType = int\nScoreType = int\n\n\ndef load_item(index: int) -> ItemType:\n    return int(index)\n\n\ndef calc_score(item_a: ItemType, item_b: ItemType) -> ScoreType:\n    return abs(item_a - item_b)\n\n\nclass LRUCacheWithCounter:\n    def __init__(self, maxsize: int):\n        def wrapped_func(key):\n            self.load_count += 1\n            return load_item(key)\n\n        self.__cache = functools.lru_cache(maxsize=maxsize)(wrapped_func)\n        self.load_count = 0\n\n    def __call__(self, key: int) -> int:\n        return self.__cache(key)\n\n\ndef basic_loop(iterator: Iterable[Tuple[int, int]], cached_load: Callable[[int], int]):\n    best_score = 0\n    best_combination = None\n    for i, j in iterator:\n        a = cached_load(i)\n        b = cached_load(j)\n        score = calc_score(a, b)\n        if score > best_score:\n            best_score = score\n            best_combination = (i, j)\n    return best_score, best_combination\n\n\ndef baseline(n, _):\n    return itertools.combinations(range(n), 2)\n\n\ndef prioritizes(n, cache_size):\n    items = list(itertools.combinations(range(n), 2))\n    cache = OrderedDict()\n    reordered = []\n\n    def update_cache(x, y):\n        cache[x] = cache[y] = None\n        cache.move_to_end(x)\n        cache.move_to_end(y)\n        while len(cache) > cache_size:\n            cache.popitem(last=False)\n\n    while items:\n        # Find a pair where both are cached.\n        for i, (a, b) in enumerate(items):\n            if a in cache and b in cache:\n                reordered.append((a, b))\n                update_cache(a, b)\n                del items[i]\n                break\n        else:\n            # Find a pair where one of them is cached.\n            for i, (a, b) in enumerate(items):\n                if a in cache or b in cache:\n                    reordered.append((a, b))\n                    update_cache(a, b)\n                    del items[i]\n                    break\n            else:\n                # Cannot find item in cache.\n                a, b = items.pop(0)\n                reordered.append((a, b))\n                update_cache(a, b)\n\n    return reordered\n\n\ndef Matt_solution(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:\n    dest = []\n\n    def findPairs(lo1: int, n1: int, lo2: int, n2: int):\n        if n1 < 1 or n2 < 1:\n            return\n        if n1 == 1:\n            for i in range(max(lo1 + 1, lo2), lo2 + n2):\n                dest.append((lo1, i))\n        elif n2 == 1:\n            for i in range(lo1, min(lo1 + n1, lo2)):\n                dest.append((i, lo2))\n        elif n1 >= n2:\n            half = n1 // 2\n            findPairs(lo1, half, lo2, n2)\n            findPairs(lo1 + half, n1 - half, lo2, n2)\n        else:\n            half = n2 // 2\n            findPairs(lo1, n1, lo2, half)\n            findPairs(lo1, n1, lo2 + half, n2 - half)\n\n    findPairs(0, n, 0, n)\n    return dest\n\n\ndef Kelly_solution(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:\n    k = cache_size // 2\n    r = range(n)\n    return chain.from_iterable(combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k]) for i in r[::k] for j in r[i::k])\n\n\ndef Kelly_solution2(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:\n    k = cache_size - 2\n    r = range(n)\n    return chain.from_iterable(combinations(r[i : i + k], 2) if i == j else product(r[i : i + k], r[j : j + k]) for i in r[::k] for j in r[i::k])\n\n\ndef diagonal_block(lower, upper):\n    for i in range(lower, upper + 1):\n        for j in range(i + 1, upper + 1):\n            yield i, j\n\n\ndef strip(i_lower, i_upper, j_lower, j_upper):\n    for i in range(i_lower, i_upper + 1):\n        for j in range(j_lower, j_upper + 1):\n            yield i, j\n\n\ndef btilly_solution(n: int, cache_size: int):\n    i_lower = 0\n    i_upper = n - 1\n    k = cache_size - 2\n    is_asc = True\n    while i_lower <= i_upper:\n        # Handle a k*k block first. At the end that is likely loaded.\n        if is_asc:\n            upper = min(i_lower + k - 1, i_upper)\n            yield from diagonal_block(i_lower, upper)\n            j_lower = i_lower\n            j_upper = upper\n            i_lower = upper + 1\n        else:\n            lower = max(i_lower, i_upper - k + 1)\n            yield from diagonal_block(lower, i_upper)\n            j_lower = lower\n            j_upper = i_upper\n            i_upper = lower - 1\n        yield from strip(i_lower, i_upper, j_lower, j_upper)\n        is_asc = not is_asc\n\n\ndef btilly_solution2(n: int, cache_size: int):\n    k = cache_size - 2\n    for top in range(0, n, k):\n        bottom = top + k\n        # Diagonal part.\n        for y in range(top, min(bottom, n)):  # Y-axis Top to Bottom\n            for x in range(y + 1, min(bottom, n)):  # X-axis Left to Right\n                yield y, x\n        # Strip part.\n        # Stripping right to left works well when cache_size is very small, but makes little difference when it is not.\n        for x in range(n - 1, bottom - 1, -1):  # X-axis Right to Left\n            for y in range(top, min(bottom, n)):  # Y-axis Top to Bottom\n                yield y, x\n\n\ndef btilly_solution3(n: int, cache_size: int):\n    k = cache_size - 2\n    r = range(n)\n    for i in r[::k]:\n        yield from combinations(r[i : i + k], 2)\n        yield from product(r[i + k :], r[i : i + k])\n\n\ndef btilly_solution4(n: int, cache_size: int):\n    def parts():\n        k = cache_size - 2\n        r = range(n)\n        for i in r[::k]:\n            yield combinations(r[i : i + k], 2)\n            yield product(r[i + k :], r[i : i + k])\n\n    return chain.from_iterable(parts())\n\n\ndef plot(df, series, ignore, y, label, title):\n    df = df[df["name"].isin(series)]\n    # plt.figure(figsize=(10, 10))\n    for name, group in df.groupby("name"):\n        plt.plot(group["n"], group[y], label=name)\n\n    y_max = df[~df["name"].isin(ignore)][y].max()\n    plt.ylim(0, y_max * 1.1)\n\n    plt.xlabel("n")\n    plt.ylabel(label)\n    plt.title(title)\n    plt.legend(loc="upper left")\n    plt.tight_layout()\n    plt.grid()\n    plt.show()\n\n\ndef run(func, n, cache_ratio, output_dir: Path):\n    cache_size = int(n * cache_ratio / 100)\n    output_path = output_dir / f"{n}_{cache_ratio}_{func.__name__}.csv"\n    if output_path.exists():\n        return\n\n    started = time.perf_counter()\n    for a, b in func(n, cache_size):\n        pass\n    elapsed_iterate = time.perf_counter() - started\n\n    # test_combinations(func(n, cache_size), n)\n\n    started = time.perf_counter()\n    cache = LRUCacheWithCounter(cache_size)\n    basic_loop(iterator=func(n, cache_size), cached_load=cache)\n    elapsed_cache = time.perf_counter() - started\n\n    output_path.write_text(f"{func.__name__},{n},{cache_ratio},{cache_size},{cache.load_count},{elapsed_iterate},{elapsed_cache}")\n\n\ndef add_lower_bound(df):\n    def calc_lower_bound(ni, mi):\n        n = ni\n        m = n * mi // 100\n        return m + math.ceil((math.comb(n, 2) - math.comb(m, 2)) / (m - 1))\n\n    return pd.concat(\n        [\n            df,\n            pd.DataFrame(\n                [\n                    {"name": "lower_bound", "n": ni, "m": mi, "count": calc_lower_bound(ni, mi)}\n                    for ni, mi in itertools.product(df["n"].unique(), df["m"].unique())\n                ]\n            ),\n        ]\n    )\n\n\ndef benchmark(output_dir: Path):\n    log_dir = output_dir / "log"\n    log_dir.mkdir(parents=True, exist_ok=True)\n\n    candidates = [\n        baseline,\n        prioritizes,\n        Matt_solution,\n        Kelly_solution,\n        Kelly_solution2,\n        btilly_solution,\n        btilly_solution2,\n        btilly_solution3,\n        btilly_solution4,\n    ]\n\n    nc = np.linspace(100, 500, num=9).astype(int)\n    # nc = np.linspace(500, 10000, num=9).astype(int)[1:]\n    # nc = np.linspace(10000, 100000, num=9).astype(int).tolist()[1:]\n    print(nc)\n\n    mc = np.linspace(10, 50, num=2).astype(int)\n    print(mc)\n\n    joblib.Parallel(n_jobs=1, verbose=5, batch_size=1)([joblib.delayed(run)(func, ni, mi, log_dir) for ni in nc for mi in mc for func in candidates])\n\n\ndef plot_graphs(output_dir: Path):\n    log_dir = output_dir / "log"\n\n    results = []\n    for path in log_dir.glob("*.csv"):\n        results.append(path.read_text().strip())\n    (output_dir / "stat.csv").write_text("\\n".join(results))\n\n    df = pd.read_csv(output_dir / "stat.csv", header=None, names=["name", "n", "m", "size", "count", "time", "time_full"])\n    df = add_lower_bound(df)\n    df = df.sort_values(["name", "n", "m"])\n\n    for m in [10, 50]:\n        plot(\n            df[df["m"] == m],\n            series=[\n                baseline.__name__,\n                prioritizes.__name__,\n                Matt_solution.__name__,\n                Kelly_solution.__name__,\n                Kelly_solution2.__name__,\n                btilly_solution.__name__,\n                "lower_bound",\n            ],\n            ignore=[\n                baseline.__name__,\n                prioritizes.__name__,\n            ],\n            y="count",\n            label="load count",\n            title=f"cache_size = {m}% of N",\n        )\n\n    plot(\n        df[df["m"] == 10],\n        series=[\n            baseline.__name__,\n            prioritizes.__name__,\n            Matt_solution.__name__,\n            Kelly_solution.__name__,\n            Kelly_solution2.__name__,\n            btilly_solution.__name__,\n            btilly_solution2.__name__,\n            btilly_solution3.__name__,\n            btilly_solution4.__name__,\n        ],\n        ignore=[\n            prioritizes.__name__,\n            Matt_solution.__name__,\n        ],\n        y="time",\n        label="time (sec)",\n        title=f"cache_size = {10}% of N",\n    )\n\n\nclass LRUCacheForTest:\n    def __init__(self, maxsize: int):\n        self.cache = OrderedDict()\n        self.maxsize = maxsize\n        self.load_count = 0\n\n    def __call__(self, key: int) -> int:\n        if key in self.cache:\n            value = self.cache[key]\n            self.cache.move_to_end(key)\n        else:\n            if len(self.cache) == self.maxsize:\n                self.cache.popitem(last=False)\n            value = load_item(key)\n            self.cache[key] = value\n            self.load_count += 1\n        return value\n\n    def hit(self, i, j):\n        count = int(i in self.cache)\n        self(i)\n        count += int(j in self.cache)\n        self(j)\n        return count\n\n\ndef visualize():\n    # Taken from https://stackoverflow.com/a/77024514/18125313 and modified.\n    n, m = 100, 30\n    func = btilly_solution2\n\n    pairs = func(n, m)\n    cache = LRUCacheForTest(m)\n\n    # Create the images, save as animated png.\n    images = []\n    s = 5\n    img = Image.new("RGB", (s * n, s * n), (255, 255, 255))\n    draw = ImageDraw.Draw(img)\n\n    colors = [(255, 0, 0), (255, 255, 0), (0, 255, 0)]\n    for step, (i, j) in enumerate(pairs):\n        draw.rectangle((s * j, s * i, s * j + s - 2, s * i + s - 2), colors[cache.hit(i, j)])\n        if not step % 17:\n            images.append(img.copy())\n\n    images += [img] * 40\n\n    images[0].save(f"{func.__name__}_{m}.gif", save_all=True, append_images=images[1:], optimize=False, duration=30, loop=0)\n\n\ndef test_combinations(iterator: Iterable[Tuple[int, int]], n: int):\n    # Note that this function is not suitable for large N.\n    expected = set(frozenset(pair) for pair in itertools.combinations(range(n), 2))\n    items = list(iterator)\n    actual = set(frozenset(pair) for pair in items)\n    assert len(actual) == len(items), f"{[item for item, count in Counter(items).items() if count > 1]}"\n    assert actual == expected, f"dup={actual - expected}, missing={expected - actual}"\n\n\ndef test():\n    n = 100  # N\n    cache_size = 30  # M\n\n    def run(func):\n        func(n, cache_size)\n\n        # Measure generation performance.\n        started = time.perf_counter()\n        for a, b in func(n, cache_size):\n            pass\n        elapsed = time.perf_counter() - started\n\n        # Test generated combinations.\n        test_combinations(func(n, cache_size), n)\n\n        # Measure cache hit (load count) performance.\n        cache = LRUCacheWithCounter(cache_size)\n        _ = basic_loop(iterator=func(n, cache_size), cached_load=cache)\n        print(f"{func.__name__}: {cache.load_count=}, {elapsed=}")\n\n    candidates = [\n        baseline,\n        prioritizes,\n        Matt_solution,\n        Kelly_solution,\n        Kelly_solution2,\n        btilly_solution,\n        btilly_solution2,\n        btilly_solution3,\n        btilly_solution4,\n    ]\n    for f in candidates:\n        run(f)\n\n\ndef main():\n    test()\n    visualize()\n\n    output_dir = Path("./temp2")\n    benchmark(output_dir)\n    plot_graphs(output_dir)\n\n\nif __name__ == "__main__":\n    main()\n
Run Code Online (Sandbox Code Playgroud)\n

basic_loop我对您不使用上述测试代码或更改或的行为没有问题LRUCacheWithCounter

\n

附加说明:

\n
    \n
  • 无法使用邻居分数来修剪分数计算。
  • \n
  • 无法仅使用项目的一部分来修剪分数计算。
  • \n
  • 不可能猜测最佳组合在哪里。
  • \n
  • 使用更快的媒体是一种选择,但我已经达到了极限,所以我正在寻找一种软件解决方案。
  • \n
\n

感谢您读完这篇长文。

\n
\n

编辑:

\n

感谢 btilly 的回答以及 Kelly 可视化的帮助,我得出的结论是 btilly 的解决方案是最好的并且(可能)最优的解决方案。

\n

这是一个理论上的解释(虽然我数学不太好,所以可能是错误的)。

\n
\n

N表示索引数量、M缓存大小和C组合数量(与 相同math.comb)。

\n

考虑这样一种情况:缓存已满,在不加载的情况下无法生成进一步的组合。\n如果此时添加新索引,则唯一可以生成的组合是新添加的索引与缓存中剩余索引的组合。\n此模式适用于每次后续迭代。\n因此,当缓存已满时,每次加载可生成的最大组合数为M - 1

\n

如果缓存未满,此逻辑也成立。\n如果M\'索引当前在缓存中,则下一个索引最多可以生成M\'组合。\n后续索引最多可以生成组合M\' + 1,依此类推。\n总共, 最多C(M,2),在缓存满之前

\n

因此,要生成C(N,2)组合,至少M需要加载来填充缓存,至少(C(N,2) - C(M,2)) / (M - 1)需要在缓存被填充后进行加载。

\n

由上可知,该问题的负载计算复杂度为\xce\xa9(N^2 / M)

\n
\n

我已将此公式绘制为下图中的 a lower_bound。\n请注意,这只是一个下限,并不能保证它实际上可以实现。

\n

顺便说一句,Kelly 的解决方案需要进行配置k以最大化其性能。\n对于M = 50% of N,它大约是M * 2/3。\n对于M = 30% of N,它大约是M * 5/6。\n虽然我不知道如何计算它。\作为一般配置,我在下图k = M - 2中使用(这不是最好的,但相对较好) 。Kelly_solution2

\n

为了M = 10% of N

\n

n_to_load_count_graph_10

\n

为了M = 50% of N

\n

n_to_load_count_graph_50

\n

请注意,在这些图中,它看起来像O(N),但这是因为我是M根据 确定的NM不改变时,O(N^2)如上所述。

\n

这是一个动画,可视化 的缓存命中率btilly_solution2,由 Kelly 代码的修改版本组成。\n每个像素代表一个组合,红色代表加载两个索引的组合,黄色代表加载一个索引,绿色代表加载两个索引的组合。两个索引均未加载。

\n

可视化_of_btilly_solution2

\n

此外,由于我正在寻找最佳序列,因此执行时间并不重要。\n但为了以防万一有人好奇,这里是执行时间的比较(仅迭代)。

\n

bti*_*lly 11

这是一个简单的方法,它依赖于缓存并在您的基准测试中获得 230。

def diagonal_block (lower, upper):
    for i in range(lower, upper + 1):
        for j in range(i, upper + 1):
            yield (i, j)

def strip (i_lower, i_upper, j_lower, j_upper):
    for i in range(i_lower, i_upper+1):
        for j in range (j_lower, j_upper + 1):
            yield (i, j)

# def your_solution_here(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
def your_solution_here(n: int, cache_size: int):
    i_lower = 0
    i_upper = n-1
    k = cache_size - 2
    is_asc = True
    while i_lower <= i_upper:
        # Handle a k*k block first. At the end that is likely loaded.
        if is_asc:
            upper = min(i_lower + k - 1, i_upper)
            yield from diagonal_block(i_lower, upper)
            j_lower = i_lower
            j_upper = upper
            i_lower = upper + 1
        else:
            lower = max(i_lower, i_upper - k + 1)
            yield from diagonal_block(lower, i_upper)
            j_lower = lower
            j_upper = i_upper
            i_upper = lower - 1
        yield from strip(i_lower, i_upper, j_lower, j_upper)
        is_asc = not is_asc
Run Code Online (Sandbox Code Playgroud)

关于我如何想到这个的评论。

我们想要将一组对象与其他所有未比较的对象进行比较。该组应该是除了一个之外适合缓存的所有内容。

因此,我们从第一个对象开始k,将它们相互比较,然后沿着条带一直走到最后。

现在我们需要第二组。好吧,我们已经有了最后一个对象,不需要剩下的了。所以我们k从末尾取出对象,将其作为一个组。将组与其自身进行比较,然后沿着条带前进到原始组之外的第一个对象。

现在反转方向,依此类推。

在所有点上,i_lower代表仍需要比较的第一个对象,i_upper代表最后一个。如果我们继续前进,我们会k从 开始获取对象i_lower。如果我们要倒退,我们就k从 开始获取物体i_upper并倒退。

当我实施它时,有两个复杂的情况。首先是当我们在中间相遇时我们必须担心边缘条件。第二个是我们可能必须在两个方向上进行剥离。

我选择只进行带状上升。这实际上是一个错误。在大多数上升负载中,我没有获得缓存中的第一个元素。哎呀。但它仍然相当不错。

  • 谢谢您的回答。您的代码运行良好,但它也会生成“i == j”的情况。也就是说,这相当于“itertools.combinations_with_replacement”。为了使其等同于“itertools.combinations”,我必须更改“diagonal_block”的内部循环以从“i + 1”开始。除此之外,我已经验证没有生成丢失或重复的组合。 (2认同)

Kel*_*ndy 7

进入 k\xc3\x97k 块。当 k=25 时,它在基准测试中获得 372 个负载。当 k=cache_size/2 时,它获得 570 次加载。

\n
from itertools import combinations, product, chain\n\ndef your_solution_here(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:\n    k = cache_size // 2\n    r = range(n)\n    return list(chain.from_iterable(\n        combinations(r[i:i+k], 2) if i == j else product(r[i:i+k], r[j:j+k])\n        for i in r[::k]\n        for j in r[i::k]\n    ))\n
Run Code Online (Sandbox Code Playgroud)\n


Mat*_*ans 5

下面是一个简单的递归定义的排序,它不依赖于缓存大小,并且在基准测试中获得 566 次负载:

def cache_oblivious(n: int, cache_size: int) -> Iterable[Tuple[int, int]]:
    dest = []
    def findPairs(lo1: int, n1: int, lo2: int, n2: int):
        if n1 < 1 or n2 < 1:
            return
        if n1 == 1:
            for i in range(max(lo1+1,lo2), lo2+n2):
                dest.append((lo1, i))
        elif n2 == 1:
            for i in range(lo1, min(lo1+n1, lo2)):
                dest.append((i, lo2))
        elif n1 >= n2:
            half = n1//2
            findPairs(lo1, half, lo2, n2)
            findPairs(lo1+half, n1-half, lo2, n2)
        else:
            half = n2//2
            findPairs(lo1, n1, lo2, half)
            findPairs(lo1, n1, lo2+half, n2-half)
    findPairs(0,n,0,n)
    return dest
Run Code Online (Sandbox Code Playgroud)