Ras*_*los 7 c++ python performance boost
我的目标是在Python中为谱有限元素编写一个小型库,为此我尝试使用Boost扩展python与C++库,希望它能使我的代码更快.
class Quad {
public:
Quad(int, int);
double integrate(boost::function<double(std::vector<double> const&)> const&);
double integrate_wrapper(boost::python::object const&);
std::vector< std::vector<double> > nodes;
std::vector<double> weights;
};
...
namespace std {
typedef std::vector< std::vector< std::vector<double> > > cube;
typedef std::vector< std::vector<double> > mat;
typedef std::vector<double> vec;
}
...
double Quad::integrate(boost::function<double(vec const&)> const& func) {
double result = 0.;
for (unsigned int i = 0; i < nodes.size(); ++i) {
result += func(nodes[i]) * weights[i];
}
return result;
}
// ---- PYTHON WRAPPER ----
double Quad::integrate_wrapper(boost::python::object const& func) {
std::function<double(vec const&)> lambda;
switch (this->nodes[0].size()) {
case 1: lambda = [&func](vec const& v) -> double { return boost::python::extract<double>(func (v[0])); }; break;
case 2: lambda = [&func](vec const& v) -> double { return boost::python::extract<double>(func(v[0], v[1])); }; break;
case 3: lambda = [&func](vec const& v) -> double { return boost::python::extract<double>(func(v[0], v[1], v[2])); }; break;
default: cout << "Dimension must be 1, 2, or 3" << endl; exit(0);
}
return integrate(lambda);
}
// ---- EXPOSE TO PYTHON ----
BOOST_PYTHON_MODULE(hermite)
{
using namespace boost::python;
class_<std::vec>("double_vector")
.def(vector_indexing_suite<std::vec>())
;
class_<std::mat>("double_mat")
.def(vector_indexing_suite<std::mat>())
;
class_<Quad>("Quad", init<int,int>())
.def("integrate", &Quad::integrate_wrapper)
.def_readonly("nodes", &Quad::nodes)
.def_readonly("weights", &Quad::weights)
;
}
Run Code Online (Sandbox Code Playgroud)
我比较了三种不同方法的性能来计算两个函数的积分.这两个功能是:
f1(x,y,z) = x*xf2(x,y,z) = np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z)使用的方法是:
从C++程序调用库:
double func(vector<double> v) {
return F1_OR_F2;
}
int main() {
hermite::Quad quadrature(100, 3);
double result = quadrature.integrate(func);
cout << "Result = " << result << endl;
}
Run Code Online (Sandbox Code Playgroud)从Python脚本调用库:
import hermite
def function(x, y, z): return F1_OR_F2
my_quad = hermite.Quad(100, 3)
result = my_quad.integrate(function)
Run Code Online (Sandbox Code Playgroud)for在Python中使用循环:
import hermite
def function(x, y, z): return F1_OR_F2
my_quad = hermite.Quad(100, 3)
weights = my_quad.weights
nodes = my_quad.nodes
result = 0.
for i in range(len(weights)):
result += weights[i] * function(nodes[i][0], nodes[i][1], nodes[i][2])
Run Code Online (Sandbox Code Playgroud)以下是每种方法的执行时间(时间是使用time方法1 的命令测量的,python模块time是方法2和3,C++代码是使用Cmake编译的set (CMAKE_BUILD_TYPE Release))
用于f1:
0.07s user 0.01s system 99% cpu 0.083 total用于f2:
0.28s user 0.01s system 99% cpu 0.289 total根据这些结果,我的问题如下:
为什么第一种方法比第二种方法快得多?
可以改进python包装器以达到方法1和方法2之间相当的性能吗?
为什么方法2比方法3更敏感,以便集成函数的难度?
编辑:我还尝试定义一个接受字符串作为参数的函数,将其写入文件,然后继续编译文件并动态加载生成的.so文件:
double Quad::integrate_from_string(string const& function_body) {
// Write function to file
ofstream helper_file;
helper_file.open("/tmp/helper_function.cpp");
helper_file << "#include <vector>\n#include <cmath>\n";
helper_file << "extern \"C\" double toIntegrate(std::vector<double> v) {\n";
helper_file << " return " << function_body << ";\n}";
helper_file.close();
// Compile file
system("c++ /tmp/helper_function.cpp -o /tmp/helper_function.so -shared -fPIC");
// Load function dynamically
typedef double (*vec_func)(vec);
void *function_so = dlopen("/tmp/helper_function.so", RTLD_NOW);
vec_func func = (vec_func) dlsym(function_so, "toIntegrate");
double result = integrate(func);
dlclose(function_so);
return result;
}
Run Code Online (Sandbox Code Playgroud)
它很脏,可能不太便携,所以我很乐意找到一个更好的解决方案,但它运行良好,并且可以很好地发挥其ccode功能sympy.
第二次编辑我用纯粹的Python使用Numpy重写了这个函数.
import numpy as np
import numpy.polynomial.hermite_e as herm
import time
def integrate(function, degrees):
dim = len(degrees)
nodes_multidim = []
weights_multidim = []
for i in range(dim):
nodes_1d, weights_1d = herm.hermegauss(degrees[i])
nodes_multidim.append(nodes_1d)
weights_multidim.append(weights_1d)
grid_nodes = np.meshgrid(*nodes_multidim)
grid_weights = np.meshgrid(*weights_multidim)
nodes_flattened = []
weights_flattened = []
for i in range(dim):
nodes_flattened.append(grid_nodes[i].flatten())
weights_flattened.append(grid_weights[i].flatten())
nodes = np.vstack(nodes_flattened)
weights = np.prod(np.vstack(weights_flattened), axis=0)
return np.dot(function(nodes), weights)
def function(v): return F1_OR_F2
result = integrate(function, [100,100,100])
print("-> Result = " + str(result) + ", Time = " + str(end-start))
Run Code Online (Sandbox Code Playgroud)
有点令人惊讶(至少对我而言),此方法与纯C++实现之间的性能没有显着差异.特别是,它需要0.059秒f1和0.36秒f2.
您的函数按值获取向量,这涉及复制向量。integrate_wrapper做额外的副本。
在这些 lambda 表达式中通过引用接受boost::function和func通过引用捕获也是有意义的。
将这些更改为(注意&和const&位):
double integrate(boost::function<double(std::vector<double> const&)> const&);
double Quad::integrate_wrapper(boost::python::object func) {
std::function<double(vec const&)> lambda;
switch (this->nodes[0].size()) {
case 1: lambda = [&func](vec const& v) -> double { return boost::python::extract<double>(func (v[0])); }; break;
case 2: lambda = [&func](vec const& v) -> double { return boost::python::extract<double>(func(v[0], v[1])); }; break;
case 3: lambda = [&func](vec const& v) -> double { return boost::python::extract<double>(func(v[0], v[1], v[2])); }; break;
default: cout << "Dimension must be 1, 2, or 3" << endl; exit(0);
}
return integrate(lambda);
}
Run Code Online (Sandbox Code Playgroud)
尽管如此,从 C++ 调用 Python 函数比调用 C++ 函数更昂贵。
人们通常numpy在 Python 中用于快速线性代数,它使用 SIMD 进行许多常见操作。numpy在推出 C++ 实现之前,您可能应该考虑先使用。在 C++ 中,您必须在 Eigen 上使用英特尔 MKL 进行矢量化。
另一种方式
以一种不太通用的方式,您的问题可以更容易地解决。您可以用纯 python 代码编写集成和函数,并使用 numba 进行编译。
第一种方法(第一次运行后每次积分运行 0.025 秒 (I7-4771))
函数在第一次调用时编译,大约需要 0.5s
函数_2:
@nb.njit(fastmath=True)
def function_to_integrate(x,y,z):
return np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z)
Run Code Online (Sandbox Code Playgroud)
一体化
@nb.jit(fastmath=True)
def integrate3(num_int_Points):
nodes_1d, weights_1d = herm.hermegauss(num_int_Points)
result=0.
for i in range(num_int_Points):
for j in range(num_int_Points):
result+=np.sum(function_to_integrate(nodes_1d[i],nodes_1d[j],nodes_1d[:])*weights_1d[i]*weights_1d[j]*weights_1d[:])
return result
Run Code Online (Sandbox Code Playgroud)
测试
import numpy as np
import numpy.polynomial.hermite_e as herm
import numba as nb
import time
t1=time.time()
nodes_1d, weights_1d = herm.hermegauss(num_int_Points)
for i in range(100):
#result = integrate3(nodes_1d,weights_1d,100)
result = integrate3(100)
print(time.time()-t1)
print(result)
Run Code Online (Sandbox Code Playgroud)
第二种方法
该函数还可以并行运行,当对许多元素进行积分时,高斯点和权重可能只计算一次。这将导致运行时间约为0.005 秒。
@nb.njit(fastmath=True,parallel=True)
def integrate3(nodes_1d,weights_1d,num_int_Points):
result=0.
for i in nb.prange(num_int_Points):
for j in range(num_int_Points):
result+=np.sum(function_to_integrate(nodes_1d[i],nodes_1d[j],nodes_1d[:])*weights_1d[i]*weights_1d[j]*weights_1d[:])
return result
Run Code Online (Sandbox Code Playgroud)
传递任意函数
import numpy as np
import numpy.polynomial.hermite_e as herm
import numba as nb
import time
def f(x,y,z):
return np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z) +np.cos(2*x+2*y+2*z) + x*y + np.exp(-z*z)
def make_integrate3(f):
f_jit=nb.njit(f,fastmath=True)
@nb.njit(fastmath=True,parallel=True)
def integrate_3(nodes_1d,weights_1d,num_int_Points):
result=0.
for i in nb.prange(num_int_Points):
for j in range(num_int_Points):
result+=np.sum(f_jit(nodes_1d[i],nodes_1d[j],nodes_1d[:])*weights_1d[i]*weights_1d[j]*weights_1d[:])
return result
return integrate_3
int_fun=make_integrate3(f)
num_int_Points=100
nodes_1d, weights_1d = herm.hermegauss(num_int_Points)
#Calling it the first time (takes about 1s)
result = int_fun(nodes_1d,weights_1d,100)
t1=time.time()
for i in range(100):
result = int_fun(nodes_1d,weights_1d,100)
print(time.time()-t1)
print(result)
Run Code Online (Sandbox Code Playgroud)
第一次调用后,使用 Numba 0.38 和Intel SVML大约需要0.002 秒