pir*_*pir 5 python performance numpy probability scipy
我们有P avg 的N 个用户。每个用户的点数,其中每个点都是 0 到 1 之间的单个值。我们需要使用已知密度为 0.05 的正态分布来分布每个点的质量,因为这些点具有一些不确定性。此外,我们需要将质量包裹在 0 和 1 左右,例如,0.95 处的点也将分配大约 0 的质量。我在下面提供了一个工作示例,它将正态分布划分为D=50 个容器。该示例使用 Python 类型模块,但如果您愿意,可以忽略它。
from typing import List, Any
import numpy as np
import scipy.stats
import matplotlib.pyplot as plt
D = 50
BINS: List[float] = np.linspace(0, 1, D + 1).tolist()
def probability_mass(distribution: Any, x0: float, x1: float) -> float:
"""
Computes the area under the distribution, wrapping at 1.
The wrapping is done by adding the PDF at +- 1.
"""
assert x1 > x0
return (
(distribution.cdf(x1) - distribution.cdf(x0))
+ (distribution.cdf(x1 + 1) - distribution.cdf(x0 + 1))
+ (distribution.cdf(x1 - 1) - distribution.cdf(x0 - 1))
)
def point_density(x: float) -> List[float]:
distribution: Any = scipy.stats.norm(loc=x, scale=0.05)
density: List[float] = []
for i in range(D):
density.append(probability_mass(distribution, BINS[i], BINS[i + 1]))
return density
def user_density(points: List[float]) -> Any:
# Find the density of each point
density: Any = np.array([point_density(p) for p in points])
# Combine points and normalize
combined = density.sum(axis=0)
return combined / combined.sum()
if __name__ == "__main__":
# Example for one user
data: List[float] = [.05, .3, .5, .5]
density = user_density(data)
# Example for multiple users (N = 2)
print([user_density(x) for x in [[.3, .5], [.7, .7, .7, .9]]])
### NB: THE REMAINING CODE IS FOR ILLUSTRATION ONLY!
### NB: THE IMPORTANT THING IS TO COMPUTE THE DENSITY FAST!
middle: List[float] = []
for i in range(D):
middle.append((BINS[i] + BINS[i + 1]) / 2)
plt.bar(x=middle, height=density, width=1.0 / D + 0.001)
plt.xlim(0, 1)
plt.xlabel("x")
plt.ylabel("Density")
plt.show()
Run Code Online (Sandbox Code Playgroud)
在这个例子中,N=1,D=50,P=4。但是,我们希望在尽可能快的情况下将这种方法扩展到N=10000和P=100。我不清楚我们将如何矢量化这种方法。我们如何最好地加快速度?
编辑
更快的解决方案可能会产生略有不同的结果。例如,它可以近似正态分布,而不是使用精确的正态分布。
编辑2
我们只关心density使用user_density()函数进行计算。该图仅用于帮助解释该方法。我们不关心情节本身:)
编辑3
请注意,P是平均值。每个用户的积分。一些用户可能拥有更多,而一些用户可能拥有更少。如果有帮助,您可以假设我们可以丢弃点数,以便所有用户最多拥有2 * P个点数。只要解决方案可以处理每个用户的灵活点数,就可以在进行基准测试时忽略这一部分。
对于最大情况(N=10000,AVG[P]=100,D=50),您可以通过使用 FFT 并以 numpy 友好格式创建来获得低于 50 毫秒的时间data。否则它将接近 300 毫秒。
这个想法是将一个以 0 为中心的单一正态分布与一系列 Dirac deltas 进行卷积。
使用循环卷积解决了两个问题。
第一个必须创建要复制的分发。函数mk_bell()创建了以 0 为中心的 stddev 0.05 正态分布的直方图。分布围绕 1。在这里可以使用任意分布。计算分布的频谱用于快速卷积。
接下来创建一个类似梳子的函数。峰值位于对应于用户密度峰值的索引处。例如
peaks_location = [0.1, 0.3, 0.7]
D = 10
Run Code Online (Sandbox Code Playgroud)
映射到
peak_index = (D * peak_location).astype(int) = [1, 3, 7]
dist = [0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0] # ones at [1, 3, 7]
Run Code Online (Sandbox Code Playgroud)
您可以通过在np.bincount()函数的帮助下计算每个峰值位置的 bin 索引来快速创建 Diract Delta 的组合。为了加快速度,甚至可以并行计算用户峰值的梳状函数。
数组dist是形状为 的二维数组NxD。它可以线性化为形状的一维数组(N*D)。在此更改之后,位置上的元素[user_id, peak_index]将可以从 index 访问user_id*D + peak_index。使用 numpy 友好的输入格式(如下所述),这个操作很容易矢量化。
卷积定理说两个信号的卷积频谱等于每个信号频谱的乘积。
频谱是用numpy.fft.rfft它计算的,它是专用于实数信号(无虚部)的快速傅立叶变换的变体。
Numpy 允许使用一个命令计算较大矩阵每一行的 FFT。
接下来,通过简单的乘法和广播的使用来计算卷积的频谱。
接下来,通过在 中实现的逆傅立叶变换将频谱计算回“时”域numpy.fft.irfft。
要使用 numpy 的全速,应避免可变大小的数据结构并保持固定大小的数组。我建议将输入数据表示为三个数组。
uids 用户标识符,整数 0..N-1peaks,峰的位置mass, peek 的质量,目前为 1/numer-of-peaks-for-user这种数据表示允许快速矢量化处理。例如:
user_data = [[0.1, 0.3], [0.5]]
Run Code Online (Sandbox Code Playgroud)
映射到:
uids = [0, 0, 1] # 2 points for user_data[0], one from user_data[1]
peaks = [0.1, 0.3, 0.5] # serialized user_data
mass = [0.5, 0.5, 1] # scaling factors for each peak, 0.5 means 2 peaks for user 0
Run Code Online (Sandbox Code Playgroud)
编码:
import numpy as np
import matplotlib.pyplot as plt
import time
def mk_bell(D, SIGMA):
# computes normal distribution wrapped and centered at zero
x = np.linspace(0, 1, D, endpoint=False);
x = (x + 0.5) % 1 - 0.5
bell = np.exp(-0.5*np.square(x / SIGMA))
return bell / bell.sum()
def user_densities_by_fft(uids, peaks, mass, D, N=None):
bell = mk_bell(D, 0.05).astype('f4')
sbell = np.fft.rfft(bell)
if N is None:
N = uids.max() + 1
# ensure that peaks are in [0..1) internal
peaks = peaks - np.floor(peaks)
# convert peak location from 0-1 to the indices
pidx = (D * (peaks + uids)).astype('i4')
dist = np.bincount(pidx, mass, N * D).reshape(N, D)
# process all users at once with Convolution Theorem
sdist = np.fft.rfft(dist)
sdist *= sbell
res = np.fft.irfft(sdist)
return res
def generate_data(N, Pmean):
# generateor for large data
data = []
for n in range(N):
# select P uniformly from 1..2*Pmean
P = np.random.randint(2 * Pmean) + 1
# select peak locations
chunk = np.random.uniform(size=P)
data.append(chunk.tolist())
return data
def make_data_numpy_friendly(data):
uids = []
chunks = []
mass = []
for uid, peaks in enumerate(data):
uids.append(np.full(len(peaks), uid))
mass.append(np.full(len(peaks), 1 / len(peaks)))
chunks.append(peaks)
return np.hstack(uids), np.hstack(chunks), np.hstack(mass)
D = 50
# demo for simple multi-distribution
data, N = [[0, .5], [.7, .7, .7, .9], [0.05, 0.3, 0.5, 0.5]], None
uids, peaks, mass = make_data_numpy_friendly(data)
dist = user_densities_by_fft(uids, peaks, mass, D, N)
plt.plot(dist.T)
plt.show()
# the actual measurement
N = 10000
P = 100
data = generate_data(N, P)
tic = time.time()
uids, peaks, mass = make_data_numpy_friendly(data)
toc = time.time()
print(f"make_data_numpy_friendly: {toc - tic}")
tic = time.time()
dist = user_densities_by_fft(uids, peaks, mass, D, N)
toc = time.time()
print(f"user_densities_by_fft: {toc - tic}")
Run Code Online (Sandbox Code Playgroud)
我的 4 核 Haswell 机器上的结果是:
make_data_numpy_friendly: 0.2733159065246582
user_densities_by_fft: 0.04064297676086426
Run Code Online (Sandbox Code Playgroud)
处理数据需要40ms。请注意,将数据处理为 numpy 友好格式所需的时间是实际计算分布的 6 倍。Python 在循环方面真的很慢。因此,我强烈建议首先以 numpy 友好的方式直接生成输入数据。
有一些问题需要解决:
D和下采样来提高scipy.fft提供可能更快的 FFT 实现的移动变体