加速正态分布概率质量分配

pir*_*pir 5 python performance numpy probability scipy

我们有P avg 的N 个用户。每个用户的点数,其中每个点都是 0 到 1 之间的单个值。我们需要使用已知密度为 0.05 的正态分布来分布每个点的质量,因为这些点具有一些不确定性。此外,我们需要将质量包裹在 0 和 1 左右,例如,0.95 处的点也将分配大约 0 的质量。我在下面提供了一个工作示例,它将正态分布划分为D=50 个容器。该示例使用 Python 类型模块,但如果您愿意,可以忽略它。

from typing import List, Any
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt

D = 50
BINS: List[float] = np.linspace(0, 1, D + 1).tolist()


def probability_mass(distribution: Any, x0: float, x1: float) -> float:
    """
    Computes the area under the distribution, wrapping at 1.
    The wrapping is done by adding the PDF at +- 1.
    """
    assert x1 > x0
    return (
        (distribution.cdf(x1) - distribution.cdf(x0))
        + (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
        + (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
    )


def point_density(x: float) -> List[float]:
    distribution: Any = scipy.stats.norm(loc=x, scale=0.05)
    density: List[float] = []
    for i in range(D):
        density.append(probability_mass(distribution, BINS[i], BINS[i + 1]))
    return density


def user_density(points: List[float]) -> Any:

    # Find the density of each point
    density: Any = np.array([point_density(p) for p in points])

    # Combine points and normalize
    combined = density.sum(axis=0)
    return combined / combined.sum()


if __name__ == "__main__":

    # Example for one user
    data: List[float] = [.05, .3, .5, .5]
    density = user_density(data)

    # Example for multiple users (N = 2)
    print([user_density(x) for x in [[.3, .5], [.7, .7, .7, .9]]])

    ### NB: THE REMAINING CODE IS FOR ILLUSTRATION ONLY!
    ### NB: THE IMPORTANT THING IS TO COMPUTE THE DENSITY FAST!
    middle: List[float] = []
    for i in range(D):
        middle.append((BINS[i] + BINS[i + 1]) / 2)
    plt.bar(x=middle, height=density, width=1.0 / D + 0.001)
    plt.xlim(0, 1)
    plt.xlabel("x")
    plt.ylabel("Density")
    plt.show()
Run Code Online (Sandbox Code Playgroud)

matplotlib 输出

在这个例子中,N=1D=50P=4。但是,我们希望在尽可能快的情况下将这种方法扩展到N=10000P=100。我不清楚我们将如何矢量化这种方法。我们如何最好地加快速度?

编辑

更快的解决方案可能会产生略有不同的结果。例如,它可以近似正态分布,而不是使用精确的正态分布。

编辑2

我们只关心density使用user_density()函数进行计算。该图仅用于帮助解释该方法。我们不关心情节本身:)

编辑3

请注意,P是平均值。每个用户的积分。一些用户可能拥有更多,而一些用户可能拥有更少。如果有帮助,您可以假设我们可以丢弃点数,以便所有用户最多拥有2 * P个点数。只要解决方案可以处理每个用户的灵活点数,就可以在进行基准测试时忽略这一部分。

tst*_*isl 5

对于最大情况(N=10000,AVG[P]=100,D=50),您可以通过使用 FFT 并以 numpy 友好格式创建来获得低于 50 毫秒的时间data。否则它将接近 300 毫秒。

这个想法是将一个以 0 为中心的单一正态分布与一系​​列 Dirac deltas 进行卷积。

见下图: 在此处输入图片说明

使用循环卷积解决了两个问题。

  • 自然处理边缘的包裹
  • 可以用 FFT 和卷积定理有效地计算

第一个必须创建要复制的分发。函数mk_bell()创建了以 0 为中心的 stddev 0.05 正态分布的直方图。分布围绕 1。在这里可以使用任意分布。计算分布的频谱用于快速卷积。

接下来创建一个类似梳子的函数。峰值位于对应于用户密度峰值的索引处。例如

peaks_location = [0.1, 0.3, 0.7]
D = 10
Run Code Online (Sandbox Code Playgroud)

映射到

peak_index = (D * peak_location).astype(int) = [1, 3, 7]
dist = [0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0] # ones at [1, 3, 7]
Run Code Online (Sandbox Code Playgroud)

您可以通过在np.bincount()函数的帮助下计算每个峰值位置的 bin 索引来快速创建 Diract Delta 的组合。为了加快速度,甚至可以并行计算用户峰值的梳状函数。

数组dist是形状为 的二维数组NxD。它可以线性化为形状的一维数组(N*D)。在此更改之后,位置上的元素[user_id, peak_index]将可以从 index 访问user_id*D + peak_index。使用 numpy 友好的输入格式(如下所述),这个操作很容易矢量化。

卷积定理说两个信号的卷积频谱等于每个信号频谱的乘积。

频谱是用numpy.fft.rfft它计算的,它是专用于实数信号(无虚部)的快速傅立叶变换的变体。

Numpy 允许使用一个命令计算较大矩阵每一行的 FFT。

接下来,通过简单的乘法和广播的使用来计算卷积的频谱。

接下来,通过在 中实现的逆傅立叶变换将频谱计算回“时”域numpy.fft.irfft

要使用 numpy 的全速,应避免可变大小的数据结构并保持固定大小的数组。我建议将输入数据表示为三个数组。

  • uids 用户标识符,整数 0..N-1
  • peaks,峰的位置
  • mass, peek 的质量,目前为 1/numer-of-peaks-for-user

这种数据表示允许快速矢量化处理。例如:

user_data = [[0.1, 0.3], [0.5]]
Run Code Online (Sandbox Code Playgroud)

映射到:

uids = [0, 0, 1] # 2 points for user_data[0], one from user_data[1]
peaks = [0.1, 0.3, 0.5] # serialized user_data
mass = [0.5, 0.5, 1] # scaling factors for each peak, 0.5 means 2 peaks for user 0
Run Code Online (Sandbox Code Playgroud)

编码:

import numpy as np
import matplotlib.pyplot as plt
import time

def mk_bell(D, SIGMA):
    # computes normal distribution wrapped and centered at zero
    x = np.linspace(0, 1, D, endpoint=False);
    x = (x + 0.5) % 1 - 0.5
    bell = np.exp(-0.5*np.square(x / SIGMA))
    return bell / bell.sum()

def user_densities_by_fft(uids, peaks, mass, D, N=None):
    bell = mk_bell(D, 0.05).astype('f4')
    sbell = np.fft.rfft(bell)
    if N is None:
        N = uids.max() + 1
    # ensure that peaks are in [0..1) internal
    peaks = peaks - np.floor(peaks)
    # convert peak location from 0-1 to the indices
    pidx = (D * (peaks + uids)).astype('i4')
    dist = np.bincount(pidx, mass, N * D).reshape(N, D)
    # process all users at once with Convolution Theorem
    sdist = np.fft.rfft(dist)
    sdist *= sbell
    res = np.fft.irfft(sdist)

    return res

def generate_data(N, Pmean):
    # generateor for large data
    data = []
    for n in range(N):
        # select P uniformly from 1..2*Pmean
        P = np.random.randint(2 * Pmean) + 1
        # select peak locations
        chunk = np.random.uniform(size=P)
        data.append(chunk.tolist())
    return data

def make_data_numpy_friendly(data):
    uids = []
    chunks = []
    mass = []
    for uid, peaks in enumerate(data):
        uids.append(np.full(len(peaks), uid))
        mass.append(np.full(len(peaks), 1 / len(peaks)))
        chunks.append(peaks)
    return np.hstack(uids), np.hstack(chunks), np.hstack(mass)




D = 50

# demo for simple multi-distribution
data, N = [[0, .5], [.7, .7, .7, .9], [0.05, 0.3, 0.5, 0.5]], None
uids, peaks, mass = make_data_numpy_friendly(data)
dist = user_densities_by_fft(uids, peaks, mass, D, N)
plt.plot(dist.T)
plt.show()

# the actual measurement
N = 10000
P = 100
data = generate_data(N, P)

tic = time.time()
uids, peaks, mass = make_data_numpy_friendly(data)
toc = time.time()
print(f"make_data_numpy_friendly: {toc - tic}")

tic = time.time()
dist = user_densities_by_fft(uids, peaks, mass, D, N)
toc = time.time()
print(f"user_densities_by_fft: {toc - tic}")

Run Code Online (Sandbox Code Playgroud)

我的 4 核 Haswell 机器上的结果是:

make_data_numpy_friendly: 0.2733159065246582
user_densities_by_fft: 0.04064297676086426
Run Code Online (Sandbox Code Playgroud)

处理数据需要40ms。请注意,将数据处理为 numpy 友好格式所需的时间是实际计算分布的 6 倍。Python 在循环方面真的很慢。因此,我强烈建议首先以 numpy 友好的方式直接生成输入数据。

有一些问题需要解决:

  • 精度,可以通过使用更大的D和下采样来提高
  • 通过加宽尖峰可以进一步提高峰值定位的准确性。
  • 性能,scipy.fft提供可能更快的 FFT 实现的移动变体