如何减少二维连接域上的集成时间

SMA*_*A.D 3 python performance integral scipy numerical-methods

我需要在简单连接(并且大部分时间是凸的)的域上计算许多 2D 集成。我正在使用 python 函数scipy.integrate.nquad来做这个集成。但是,与矩形域上的积分相比,此操作所需的时间要长得多。有没有可能更快的实现?

这是一个例子;我首先在圆形域(使用函数内部的约束)上集成常数函数,然后在矩形域(nquad函数的默认域)上集成。

from scipy import integrate
import time

def circular(x,y,a):
  if x**2 + y**2 < a**2/4:
    return 1 
  else:
    return 0

def rectangular(x,y,a):
  return 1

a = 4
start = time.time()
result = integrate.nquad(circular, [[-a/2, a/2],[-a/2, a/2]], args=(a,))
now = time.time()
print(now-start)

start = time.time()
result = integrate.nquad(rectangular, [[-a/2, a/2],[-a/2, a/2]], args=(a,))
now = time.time()
print(now-start)
Run Code Online (Sandbox Code Playgroud)

矩形域只需0.00029几秒钟,而圆形域则需要2.07061几秒钟才能完成。

此外,循环积分给出以下警告:

IntegrationWarning: The maximum number of subdivisions (50) has been achieved.
If increasing the limit yields no improvement it is advised to analyze 
the integrand in order to determine the difficulties.  If the position of a 
local difficulty can be determined (singularity, discontinuity) one will 
probably gain from splitting up the interval and calling the integrator 
on the subranges.  Perhaps a special-purpose integrator should be used.
**opt)
Run Code Online (Sandbox Code Playgroud)

Jac*_*din 6

加快计算速度的一种方法是使用numbaPython 的即时编译器。

@jit装饰

Numba 提供了一个@jit装饰器来编译一些 Python 代码并输出可以在多个 CPU 上并行运行的优化机器代码。Jitting 被积函数只需很少的努力,并且可以节省一些时间,因为代码经过优化以运行得更快。人们甚至不必担心类型,Numba 会在幕后完成所有这些工作。

from scipy import integrate
from numba import jit

@jit
def circular_jit(x, y, a):
    if x**2 + y**2 < a**2 / 4:
        return 1 
    else:
        return 0

a = 4
result = integrate.nquad(circular_jit, [[-a/2, a/2],[-a/2, a/2]], args=(a,))
Run Code Online (Sandbox Code Playgroud)

这确实运行得更快,在我的机器上计时时,我得到:

 Original circular function: 1.599048376083374
 Jitted circular function: 0.8280022144317627
Run Code Online (Sandbox Code Playgroud)

这将减少约 50% 的计算时间。

西比的 LowLevelCallable

由于语言的性质,Python 中的函数调用非常耗时。与 C 等编译语言相比,开销有时会使 Python 代码变慢。

为了缓解这种情况,Scipy 提供了一个LowLevelCallable类,可用于提供对低级编译回调函数的访问。通过这种机制,可以绕过 Python 的函数调用开销并进一步节省时间。

请注意,在 的情况下nquadcfunc传递给的签名LowerLevelCallable必须是以下之一:

double func(int n, double *xx)
double func(int n, double *xx, void *user_data)
Run Code Online (Sandbox Code Playgroud)

其中int是参数的数量,参数的值在第二个参数中。user_data用于需要上下文操作的回调。

因此,我们可以稍微更改 Python 中的循环函数签名以使其兼容。

from scipy import integrate, LowLevelCallable
from numba import cfunc
from numba.types import intc, CPointer, float64


@cfunc(float64(intc, CPointer(float64)))
def circular_cfunc(n, args):
    x, y, a = (args[0], args[1], args[2]) # Cannot do `(args[i] for i in range(n))` as `yield` is not supported
    if x**2 + y**2 < a**2/4:
        return 1 
    else:
        return 0

circular_LLC = LowLevelCallable(circular_cfunc.ctypes)

a = 4
result = integrate.nquad(circular_LLC, [[-a/2, a/2],[-a/2, a/2]], args=(a,))
Run Code Online (Sandbox Code Playgroud)

用这种方法我得到

LowLevelCallable circular function: 0.07962369918823242
Run Code Online (Sandbox Code Playgroud)

与原始函数相比,这减少了 95%,与函数的 jitted 版本相比减少了 90%。

定制装修师

为了使代码更整洁并保持被积函数的签名灵活,可以创建一个定制的装饰器函数。它将 jit 被积函数并将其包装成一个LowLevelCallable对象,然后可以与nquad.

from scipy import integrate, LowLevelCallable
from numba import cfunc, jit
from numba.types import intc, CPointer, float64

def jit_integrand_function(integrand_function):
    jitted_function = jit(integrand_function, nopython=True)

    @cfunc(float64(intc, CPointer(float64)))
    def wrapped(n, xx):
        return jitted_function(xx[0], xx[1], xx[2])
    return LowLevelCallable(wrapped.ctypes)


@jit_integrand_function
def circular(x, y, a):
    if x**2 + y**2 < a**2 / 4:
        return 1
    else:
        return 0

a = 4
result = integrate.nquad(circular, [[-a/2, a/2],[-a/2, a/2]], args=(a,))
Run Code Online (Sandbox Code Playgroud)

任意数量的参数

如果参数的数量未知,那么我们可以使用Numba 提供的便捷carray函数将其转换CPointer(float64)为 Numpy 数组。

import numpy as np
from scipy import integrate, LowLevelCallable
from numba import cfunc, carray, jit
from numba.types import intc, CPointer, float64

def jit_integrand_function(integrand_function):
    jitted_function = jit(integrand_function, nopython=True)

    @cfunc(float64(intc, CPointer(float64)))
    def wrapped(n, xx):
        ar = carray(xx, n)
        return jitted_function(ar[0], ar[1], ar[2:])
    return LowLevelCallable(wrapped.ctypes)


@jit_integrand_function
def circular(x, y, a):
    if x**2 + y**2 < a[-1]**2 / 4:
        return 1
    else:
        return 0

ar = np.array([1, 2, 3, 4])
a = ar[-1]
result = integrate.nquad(circular, [[-a/2, a/2],[-a/2, a/2]], args=ar)
Run Code Online (Sandbox Code Playgroud)