Eth*_*001 4 python primes python-3.x
我有一个创建素数的代码,将它们转换为梅森数,然后素数再次检查它是否是梅森素数。它工作得很好,除了它停在 2^31 - 1... (2147483647)
我正在使用一个产生无限数字的函数:
def infinity():
i = 0
while True:
i += 1
yield i
Run Code Online (Sandbox Code Playgroud)
然后将其更改为末尾while True带有 a 的循环i += 1,但仍然不起作用。
def isPrime(i):
isprime = True
for j in range(2, int(math.sqrt(i) + 1)):
if i % j == 0:
return False
if isprime and i!=1:
return True
i=1
while True:
isprime = True
for j in range(2, int(math.sqrt(i) + 1)):
if i % j == 0:
isprime = False
break
if isprime and i!=1:
test = (2**i)-1
result = isPrime(test)
if result:
print (test)
i+=1
Run Code Online (Sandbox Code Playgroud)
你的算法看起来不错并且对我有用。不过,我决定做出一些改进,并实现了更快的寻找梅森素数的算法。仅供参考,这里列出了所有世界知名的梅森素数。
在接下来的所有步骤中,首先是描述,然后是代码,在代码之后您可以看到带有运行时间的控制台输出。
因为在我的答案中,我有很多代码片段,而且很多代码片段都很大,所以您可以使用漂亮的 Chrome 扩展程序 (Clipboardy)(在这个答案中描述了如何安装),它允许轻松地将 StackOverflow 代码片段复制到剪贴板。
步骤1。算法版本 0,仅限 Python。
首先,我改进了你的算法,也美化了你的代码。一个简单的改进是,当检查 is_prime 时,您只能检查奇数除数,这将使速度加倍。这就是我得到的:
import math, time
def print_result(exp, *, tb = time.time()):
print(exp, 'at', round(time.time() - tb, 3), 'secs', flush = True, end = ', ')
def mersennes_v0():
def is_prime(n):
if n <= 2:
return n == 2
if n & 1 == 0:
return False
for j in range(3, math.floor(math.sqrt(n + 0.1)) + 1, 2):
if n % j == 0:
return False
return True
for i in range(2, 63):
if is_prime(i) and is_prime((1 << i) - 1):
print_result(i)
mersennes_v0()
Run Code Online (Sandbox Code Playgroud)
输出:
2 at 0.0 secs, 3 at 0.0 secs, 5 at 0.001 secs, 7 at 0.001 secs,
13 at 0.001 secs, 17 at 0.001 secs, 19 at 0.002 secs, 31 at 0.009 secs,
61 at 243.5412 secs,
Run Code Online (Sandbox Code Playgroud)
第2步。算法版本 0,加上 Numba。
然后,在不改变算法的情况下,我做了非常简单的改进,我使用了Numba JIT 代码优化器/编译器!要在您的程序中使用它,您需要通过安装一次 PIP 模块python -m pip install numba。
在上面的代码中,我只是做了 3 个小更改来使用 Numba - 1) 做了import numba2) 添加了函数装饰器@numba.njit3) 添加了行with numba.objmode():。
使用 Numba 将 AlgoV0 运行时间从 243 秒缩短至 15 秒,即速度提升了 3 倍16x!
重要提示 - 与常规 Python Numba 不同,Numba 不支持大整数运算,这意味着只能测试适合 64 位整数的素数!
import math, time
import numba
def print_result(exp, *, tb = time.time()):
print(exp, 'at', round(time.time() - tb, 3), 'secs', flush = True, end = ', ')
@numba.njit
def mersennes_v0():
def is_prime(n):
if n <= 2:
return n == 2
if n & 1 == 0:
return False
for j in range(3, math.floor(math.sqrt(n + 0.1)) + 1, 2):
if n % j == 0:
return False
return True
for i in range(2, 63):
if is_prime(i) and is_prime((1 << i) - 1):
with numba.objmode():
print_result(i)
mersennes_v0()
Run Code Online (Sandbox Code Playgroud)
输出:
2 at 1.257 secs, 3 at 1.257 secs, 5 at 1.257 secs, 7 at 1.257 secs,
13 at 1.257 secs, 17 at 1.257 secs, 19 at 1.258 secs, 31 at 1.258 secs,
61 at 15.052 secs,
Run Code Online (Sandbox Code Playgroud)
步骤3。算法版本 1,仅限 Python。
然后我决定实施Lucas Lehmer Primality Test,这将更快地解决相同的任务。因为它是完全不同的算法,所以我将其称为版本 1 算法。
Numba 在这里帮不了我们多大的忙,因为它没有长算术,所以我没有使用它,我使用了内置的默认 Python 长算术。Python 默认的长算术实现在速度上足够好,可以按原样使用。此外,下一个代码的主要运行时间是在基于 C 的低级长算术函数内,因此下一个代码是 Lucas Lehmer 测试的非常有效的变体。
请注意,Lucas Lehmer 检验对于从 3 开始的指数有效,因此指数 2 不会显示在控制台输出中。
因此,接下来的代码仅使用带有标准模块的原始 Python,并且不需要安装任何 PIP 模块。
import math, time
def print_result(exp, *, tb = time.time()):
print(exp, 'at', round(time.time() - tb, 3), 'secs', flush = True, end = ', ')
def mersennes_v1():
def is_prime(n):
if n <= 2:
return n == 2
if n & 1 == 0:
return False
for j in range(3, math.floor(math.sqrt(n + 0.1)) + 1, 2):
if n % j == 0:
return False
return True
def lucas_lehmer_test(p):
# Lucas Lehmer Test https://en.wikipedia.org/wiki/Lucas%E2%80%93Lehmer_primality_test
mask = (1 << p) - 1
def mer_rem(x, bits):
# Below is same as: return x % mask
while True:
r = 0
while x != 0:
r += x & mask
x >>= bits
if r == mask:
r = 0
if r < mask:
return r
x = r
s = 4
for k in range(2, p):
s = mer_rem(s * s - 2, p)
return s == 0
for p in range(3, 1 << 30):
if is_prime(p) and lucas_lehmer_test(p):
print_result(p)
mersennes_v1()
Run Code Online (Sandbox Code Playgroud)
输出:
3 at 0.0 secs, 5 at 0.001 secs, 7 at 0.001 secs, 13 at 0.001 secs,
17 at 0.001 secs, 19 at 0.001 secs, 31 at 0.002 secs, 61 at 0.003 secs,
89 at 0.004 secs, 107 at 0.005 secs, 127 at 0.006 secs, 521 at 0.062 secs,
607 at 0.087 secs, 1279 at 0.504 secs, 2203 at 2.338 secs, 2281 at 2.634 secs,
3217 at 7.819 secs, 4253 at 20.578 secs, 4423 at 23.196 secs,
9689 at 386.877 secs, 9941 at 419.243 secs, 11213 at 590.477 secs,
Run Code Online (Sandbox Code Playgroud)
步骤4。算法版本 1,加上 GMPY2。
上述算法的下一个可能的改进是使用GMPY2 Python 库。它是高度优化的长算术库,比长算术的常规 Python 实现要快得多。
但 GMPY2 的安装可能相当具有挑战性。通过此链接可以获得最新的 Windows 预编译版本。下载适合您的 Python 版本的 .whl 文件并通过 pip 安装,例如,对于我的 Windows 64 位 Python 3.7,我下载并安装了 .whl 文件pip install gmpy2-2.0.8-cp37-cp37m-win_amd64.whl。对于 Linux,最简单的安装方式是通过sudo apt install -y python3-gmpy2.
为了在上一步的代码中使用 gmpy2,我添加了行并在多个地方import gmpy2使用。gmpy2.mpz(...)
与仅使用 Python 的版本相比,使用 gmpy2 将5.2x指数 11213 的速度提高了大约几倍,对于更大的指数,速度增益会更高,这可能是由于在 GMPY2 库中使用了快速傅里叶变换乘法算法。
import math, time
import gmpy2
def print_result(exp, *, tb = time.time()):
print(exp, 'at', round(time.time() - tb, 3), 'secs', flush = True, end = ', ')
def mersennes_v1():
def is_prime(n):
if n <= 2:
return n == 2
if n & 1 == 0:
return False
for j in range(3, math.floor(math.sqrt(n + 0.1)) + 1, 2):
if n % j == 0:
return False
return True
def lucas_lehmer_test(p):
# Lucas Lehmer Test https://en.wikipedia.org/wiki/Lucas%E2%80%93Lehmer_primality_test
mask = gmpy2.mpz((1 << p) - 1)
def mer_rem(x, bits):
# Below is same as: return x % mask
while True:
r = gmpy2.mpz(0)
while x != 0:
r += x & mask
x >>= bits
if r == mask:
r = gmpy2.mpz(0)
if r < mask:
return r
x = r
s = 4
s, p = gmpy2.mpz(s), gmpy2.mpz(p)
for k in range(2, p):
s = mer_rem(s * s - 2, p)
return s == 0
for p in range(3, 1 << 30):
if is_prime(p) and lucas_lehmer_test(p):
print_result(p)
mersennes_v1()
Run Code Online (Sandbox Code Playgroud)
输出:
3 at 0.0 secs, 5 at 0.0 secs, 7 at 0.001 secs, 13 at 0.001 secs,
17 at 0.001 secs, 19 at 0.001 secs, 31 at 0.002 secs, 61 at 0.002 secs,
89 at 0.004 secs, 107 at 0.005 secs, 127 at 0.006 secs, 521 at 0.055 secs,
607 at 0.073 secs, 1279 at 0.315 secs, 2203 at 1.055 secs, 2281 at 1.16 secs,
3217 at 2.637 secs, 4253 at 5.674 secs, 4423 at 6.375 secs,
9689 at 72.535 secs, 9941 at 79.605 secs, 11213 at 114.375 secs,
19937 at 796.705 secs, 21701 at 1080.045 secs, 23209 at 1421.392 secs,
Run Code Online (Sandbox Code Playgroud)
步骤5。算法版本 1,以及 GMPY2 + 多核
与上一步相比,一个更明显的改进是使用多核 CPU 功能,即在所有可用的 CPU 核心上并行化 Lucas-Lehmer 测试。
为此,我们使用标准 Python 的多处理模块。
在我的 4 核 CPU 上,2.65x与单核版本相比,速度提升指数为 23209 倍。
import math, time, multiprocessing
import gmpy2
def print_result(exp, *, tb = time.time()):
print(exp, 'at', round(time.time() - tb, 3), 'secs', flush = True, end = ', ')
def lucas_lehmer_test(p):
# Lucas Lehmer Test https://en.wikipedia.org/wiki/Lucas%E2%80%93Lehmer_primality_test
def num(n):
return gmpy2.mpz(n)
mask = num((1 << p) - 1)
def mer_rem(x, bits):
# Below is same as: return x % mask
while True:
r = num(0)
while x != 0:
r += x & mask
x >>= bits
if r == mask:
r = num(0)
if r < mask:
return r
x = r
s, p, two = num(4), num(p), num(2)
for k in range(2, p):
s = mer_rem(s * s - two, p)
return p, s == 0
def mersennes_v1():
def is_prime(n):
if n <= 2:
return n == 2
if n & 1 == 0:
return False
for j in range(3, math.floor(math.sqrt(n + 0.1)) + 1, 2):
if n % j == 0:
return False
return True
print('Num Cores Used:', multiprocessing.cpu_count())
with multiprocessing.Pool(multiprocessing.cpu_count()) as pool:
for p, _ in filter(lambda e: e[1], pool.imap(lucas_lehmer_test, filter(is_prime, range(3, 1 << 30)))):
print_result(p)
if __name__ == '__main__':
mersennes_v1()
Run Code Online (Sandbox Code Playgroud)
输出:
3 at 0.746 secs, 5 at 0.747 secs, 7 at 0.747 secs, 13 at 0.747 secs,
17 at 0.747 secs, 19 at 0.748 secs, 31 at 0.748 secs, 61 at 0.748 secs,
89 at 0.749 secs, 107 at 0.749 secs, 127 at 0.75 secs, 521 at 0.782 secs,
607 at 0.79 secs, 1279 at 0.876 secs, 2203 at 1.09 secs, 2281 at 1.122 secs,
3217 at 1.529 secs, 4253 at 2.329 secs, 4423 at 2.529 secs,
9689 at 25.208 secs, 9941 at 29.454 secs, 11213 at 50.498 secs,
19937 at 337.926 secs, 21701 at 441.264 secs, 23209 at 537.84 secs,
Run Code Online (Sandbox Code Playgroud)
步骤6。高级代码,带有试分测试
提供下一个代码仅供参考,它比上一步复杂得多。主要区别是添加了审判部门测试(现有的卢卡斯-莱默测试)。
要使用代码pip install numpy colorama,您还gmpy2需要按照前面的步骤所述进行安装。与之前的步骤不同的是,这一步骤至少需要 Python 3.8 才能工作(由于共享内存功能)。
在函数开始时,mersennes_v2()您可以设置一些配置变量。有一个重要的变量prime_bits,它应该设置为尽可能适合您的计算机,但不能超过32,该变量控制素数表的大小,32 意味着生成所有 32 位素数。试除测试需要这个素表。如果生成更多,则测试成功的机会就更高,因此可以节省更多时间,而不是使用繁重的 Lucas-Lehmer。如果你没有太多内存,那么将 prime_bits 设置为较小的值,但不能小于 24,对于 24 5MB,仅使用 32 左右的内存650MB。
另一个添加的内容是 1) 使用埃拉托斯特尼筛法生成素数表2) 共享内存的使用 3) 带有统计信息的精美控制台输出。
最后提供的计时并不精确(不要将它们与之前的步骤进行比较),因为我今天的 CPU 过热并且速度降低了一倍。
在控制台输出中,tboost表示额外的 TrialDivision+LucasLehmer 测试所带来的提升量,即与仅使用 LucasLehmer 相比,使用 TrialDivision+LucasLehmer 平均花费的时间少了多少。如果 tboost 值大于1.0x则意味着额外的 TrialDivision 是有益的,否则(如果小于 1.0)则意味着速度变慢,这可能发生在某些系统上,然后代码需要额外的调整。在控制台中tbits:X表示对以下所有素数进行了试除测试2^X。
import math, time, multiprocessing, multiprocessing.pool, multiprocessing.shared_memory, sys, os, faulthandler, traceback, secrets
faulthandler.enable()
import numpy as np, gmpy2, colorama
colorama.init()
def is_mersenne(args, *, data = {'tf': 0}):
try:
def lucas_lehmer_test(exp):
# Lucas Lehmer Test https://en.wikipedia.org/wiki/Lucas%E2%80%93Lehmer_primality_test
def num(n):
return gmpy2.mpz(n)
mask = num((1 << exp) - 1)
def mer_rem(x, bits):
# Below is same as: return x % mask
while True:
r = num(0)
while x != 0:
r += x & mask
x >>= bits
if r == mask:
r = num(0)
if r < mask:
return r
x = r
tb = time.time()
s, exp, two = num(4), num(exp), num(2)
for k in range(2, exp):
s = mer_rem(s * s - two, exp)
#print('ll', '-+'[s==0], '_', int(exp), '_', round(time.time() - tb, 3), sep = '', end = ' ', flush = True)
return s == 0
def trial_div_test(exp):
import numpy as np
if 'ptaba' not in data:
data['shp'] = multiprocessing.shared_memory.SharedMemory(args['all_primes_shname'])
data['ptaba'] = np.ndarray((args['all_primes_size'] // 4,), dtype = np.uint32, buffer = data['shp'].buf)
if exp <= 32:
return True, {'bits': 0}
bits = max(16, min(-19 + math.floor(math.log(exp) / math.log(1.25)), math.ceil(args['prime_bits'])))
if ('ptabb', bits) not in data:
cnt = min(np.searchsorted(data['ptaba'], 1 << bits), len(data['ptaba']))
data[('ptabb', bits)] = np.ndarray((cnt,), dtype = np.uint32, buffer = data['shp'].buf)
sptab = data[('ptabb', bits)]
tb, bl, probably_prime = time.time(), 1 << 13, True
spows = np.empty((bl,), dtype = np.uint64)
for i in range(0, sptab.size, bl):
ptab = sptab[i : i + bl]
pows = spows if spows.size == ptab.size else spows[:ptab.size]
pows[...] = 2
for b in bin(exp)[3:]:
pows *= pows
if b == '1':
pows <<= 1
pows %= ptab
if np.count_nonzero(pows == 1) > 0:
probably_prime = False
break
#print('td', '-+'[probably_prime], '_', int(exp), '_', round(time.time() - tb, 3), sep = '', end = ' ', flush = True)
return probably_prime, {'bits': bits}
p, stats = args['p'], {'tt': time.time()}
r, tinfo = trial_div_test(p)
stats['tt'], stats['tr'], stats['tte'], stats['tinfo'] = time.time() - stats['tt'], r, time.time(), tinfo
if not r:
return p, r, stats
stats['lt'] = time.time()
r = lucas_lehmer_test(p)
stats['lt'], stats['lte'] = time.time() - stats['lt'], time.time()
return p, r, stats
except:
return None, True, '', traceback.format_exc()
def mersennes_v2():
prime_bits = 28
num_cores = multiprocessing.cpu_count()
console_width = os.get_terminal_size().columns
def gen_primes(stop, *, dtype = 'int64'):
# https://en.wikipedia.org/wiki/Sieve_of_Eratosthenes
import math, numpy as np
if stop < 2:
return np.zeros((0,), dtype = dtype)
primes = np.ones((stop >> 1,), dtype = np.uint8)
primes[0] = False
for p in range(3, math.floor(math.sqrt(stop)) + 1, 2):
if primes[p >> 1]:
primes[(p * p) >> 1 :: p] = False
return np.concatenate((np.array([2], dtype = dtype), (np.flatnonzero(primes).astype(dtype) << 1) + 1))
def RoundFix(x, n):
s = str(round(x, n))
return s + '0' * (n - len(s) + ('.' + s).rfind('.'))
prime_bits = max(24, min(prime_bits, math.log(math.sqrt((1 << 63) - 1)) / math.log(2)))
print(f'Generating {round(prime_bits, 2)}-bits primes...', end = ' ', flush = True)
tb = time.time()
all_primes = gen_primes(math.floor(2 ** prime_bits), dtype = 'uint32')
print(f'{round(time.time() - tb)} secs.', flush = True)
all_primes_size = len(all_primes.data) * all_primes.dtype.itemsize
all_primes_shname = secrets.token_hex(16).upper()
shp = multiprocessing.shared_memory.SharedMemory(
name = all_primes_shname, create = True, size = all_primes_size)
shp.buf[:all_primes_size] = all_primes.tobytes()
del all_primes
all_primes = np.ndarray((all_primes_size // 4,), dtype = np.uint32, buffer = shp.buf)
print('Using', num_cores, 'cores.', flush = True)
print('\n\n')
offx, tstart, tlast, ptimes = 0, time.time(), time.time(), []
tstats = {'tt': 0.0, 'tc': 0, 'tts': [], 'tta': 0.0, 'lt': 0.0, 'lc': 0, 'lts': [], 'lta': 0.0, 'st': [(0, 0, time.time())], 'tbits': []}
with multiprocessing.Pool(num_cores) as pool:
for e in pool.imap(is_mersenne, ({
'p': int(e), 'all_primes_size': all_primes_size, 'all_primes_shname': all_primes_shname, 'prime_bits': prime_bits,
} for e in all_primes)):
if e[0] is None:
pool.terminate()
print('!!!Exception!!!\n', e[3], sep = '', flush = True)
break
stats = e[2]
def fill(p):
tstats[f'{p}t'] += stats[f'{p}t']
tstats[f'{p}ts'] += [(stats[f'{p}t'], stats[f'{p}te'])]
while len(tstats[f'{p}ts']) > 20 and tstats[f'{p}ts'][-1][1] - tstats[f'{p}ts'][0][1] > 120:
tstats[f'{p}ts'] = tstats[f'{p}ts'][1:]
tstats[f'{p}c'] += 1
tstats[f'{p}ta'] = sum(e[0] for e in tstats[f'{p}ts']) / len(tstats[f'{p}ts'])
if p == 't':
tstats['st'] += [(stats['tt'] + stats.get('lt', 0), stats.get('lt', tstats['st'][-1][1]), stats.get('lte', stats['tte']))]
while len(tstats['st']) > 50 and tstats['st'][-1][2] - tstats['st'][0][2] > 300:
tstats['st'] = tstats['st'][1:]
tstats['sta'] = sum(e[1] for e in tstats['st']) / max(0.001, sum(e[0] for e in tstats['st']))
tstats['tbits'] = (tstats['tbits'] + [stats['tinfo']['bits']])[-20:]
tstats['tbitsa'] = sum(tstats['tbits']) / len(tstats['tbits'])
fill('t')
if 'lt' in stats:
fill('l')
if not e[1]:
s0 = f'{str(e[0]).rjust(6)}| trial:{RoundFix(stats["tt"], 3)}/lucas:' + (f'{RoundFix(stats["lt"], 3)}' if 'lt' in stats else '-' * len(RoundFix(tstats['lta'], 3))) + f' secs (avg t:{RoundFix(tstats["tta"], 3)}/l:{RoundFix(tstats["lta"], 3)}) '
s1 = f'{"".rjust(6)}| cnt t:{tstats["tc"]}({tstats["tc"] - tstats["lc"]})/l:{tstats["lc"]}, tboost:{RoundFix(tstats["sta"], 3)}x tbits:{RoundFix(tstats["tbitsa"], 2)} '
print('\033[2A' + s0[:console_width - 4] + '\n' + s1[:console_width - 4] + '\n', end = '', flush = True)
else:
s = str(e[0]).rjust(6) + ' at ' + str(round(time.time() - tstart)).rjust(6) + ' secs, '
if offx + len(s) <= console_width - 4:
print('\033[3A\033[' + str(offx) + 'C' + s + '\n\n\n', end = '', flush = True)
else:
print('\033[2A' + (' ' * (console_width - 4)) + '\n\033[1A' + s + '\n\n\n', end = '', flush = True)
offx = 0
offx += len(s)
tlast = time.time()
if __name__ == '__main__':
mersennes_v2()
Run Code Online (Sandbox Code Playgroud)
Generating 28-bits primes... 4 secs.
Using 8 cores.
3 at 1 secs, 5 at 1 secs, 7 at 1 secs,
13 at 1 secs, 17 at 1 secs, 19 at 1 secs,
31 at 1 secs, 61 at 1 secs,
| 归档时间: |
|
| 查看次数: |
376 次 |
| 最近记录: |