使用并行查找素数

win*_*sgm 3 c# math parallel-processing primes

所以我创建了下面的方法来查找直到某个数字的所有素数。有关如何加快速度的任何建议?

我这样称呼它;

interval = (value + NOOFTHREADS - 1) / NOOFTHREADS;
int max = interval * NOOFTHREADS;

tickets = new List<int>(NOOFTHREADS);
for (int i = 1; i <= NOOFTHREADS; i++)
{
    tickets.Add(i * (max / NOOFTHREADS));
}

Enumerable.Range(1, NOOFTHREADS)
.AsParallel()
.ForAll(_ => findPrimes());
Run Code Online (Sandbox Code Playgroud)

带有一些全局变量;

static List<int> vals = new List<int>();
static List<int> tickets;
static int interval = new int();
Run Code Online (Sandbox Code Playgroud)

以及方法;

public static void findPrimes()
    {
        int myTicket;
        lock (tickets)
        {
            myTicket = (int)tickets.Last();
            tickets.RemoveAt(tickets.Count - 1);
        }
        var max = myTicket;
        int min = max - interval +1;
        int num;

        var maxSquareRoot = Math.Sqrt(max);
        var eliminated = new System.Collections.BitArray(max + 1);
        eliminated[0] = true;
        eliminated[1] = true;
        for (int i = 2; i < (max) / 2; i++)
        {
            if (!eliminated[i])
            {
                if (i < maxSquareRoot)
                {
                    num = ((min + i -1 )/i)*i;

                    if (num == i)
                        num = num + i;

                    for (int j =num; j <= max; j += i)
                        eliminated[j] = true;
                }
            }
        }
        for (int b = (int)min; b < max; b++)
        {
            if (!eliminated[b])
                lock(vals)
                    vals.Add(b);
        }
    }
Run Code Online (Sandbox Code Playgroud)

Dan*_*her 5

Eratosthenes 的筛选可以很容易地并行化,您只需要将其分成单独的块并单独筛选每个块。您已经开始进行拆分,但还没有走得足够远,无法获得良好的结果。让我们看看有什么问题findPrimes()

var max = myTicket;
int min = max - interval +1;
int num;

var maxSquareRoot = Math.Sqrt(max);
var eliminated = new System.Collections.BitArray(max + 1);
Run Code Online (Sandbox Code Playgroud)

BitArray为每个线程创建一个新线程,涵盖从 0 到 的所有数字max。对于筛选第一个块的线程,这很好,但是对于后面的线程,您分配的内存比需要的要多得多。使用高上限和许多线程,这本身就是一个问题,您(NOOFTHREADS + 1) * limit / 2在只limit需要大约位的地方分配粗略的位。对于更少的线程和/或更低的限制,您仍然在恶化局部性并且会有更多的缓存未命中。

eliminated[0] = true;
eliminated[1] = true;
for (int i = 2; i < (max) / 2; i++)
Run Code Online (Sandbox Code Playgroud)

您应该在 时停止外循环i > maxSquareRoot。然后循环体不再做任何有成效的事情,它只执行一次读取和一两次检查。这并不需要很长时间每次迭代,但这样做对所有i来自?maxmax加起来,如果max是如10 11。单独为最后一个块执行此操作可能比单线程单块筛子花费的时间更长。

{
    if (!eliminated[i])
Run Code Online (Sandbox Code Playgroud)

eliminated[i]只能对i >= min(或i < 2)为真,这种情况您只会在第一个块中遇到i <= maxSquareRoot(除非限制低得离谱)。因此,对于其他块,您还要消除 4, 6, 8, 9, 10, 12, 14, ... 的倍数。浪费了很多工作。

    {
        if (i < maxSquareRoot)
Run Code Online (Sandbox Code Playgroud)

maxSquareRoot恰好是素数的情况下,您没有消除它的平方,比较应该是<=

        {
            num = ((min + i -1 )/i)*i;

            if (num == i)
                num = num + i;

            for (int j =num; j <= max; j += i)
                eliminated[j] = true;
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,当筛分完成时,您将穿过大块 BitArray

for (int b = (int)min; b < max; b++)
{
    if (!eliminated[b])
        lock(vals)
            vals.Add(b);
}
Run Code Online (Sandbox Code Playgroud)

每当你找到一个素数时,你就锁定列表vals并将这个素数添加到其中。如果有两个或更多线程同时完成筛分,它们将在那里相互踩踏,锁定和等待将进一步减慢进程。

为了减少空间使用,每个线程应该创建一个素数列表到maxSquareRoot,并使用它来消除其块中的组合,以便BitArray只需要max - min + 1位。每个线程创建自己的列表都会重复一些工作,但由于这里的上限很小,因此没有太多额外的工作。我不知道如何处理并发读取访问,如果这不会增加同步开销,您也可以只处理所有线程的一个列表,但我怀疑这会有所收获。

代码大致如下:

List<int> sievePrimes = simpleSieve(maxSquareRoot);
// simpleSieve is a standard SoE returning a list of primes not exceeding its argument
var sieve = new System.Collections.BitArray(max - min + 1);
int minSquareRoot = (int)Math.Sqrt(min);
foreach(int p in sievePrimes)
{
    int num = p > minSquareRoot ? p*p : ((min + p - 1)/p)*p;
    num -= min;
    for(; num <= max-min; num += p)
    {
        sieve[num] =true;
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,为了避免线程在将素数添加到列表时相互踩到脚趾,每个线程都应该创建自己的素数列表并在一个步骤中附加它(我不是 100% 确定这比添加每个素数更快它自己的锁,但如果没有我会感到惊讶)

List<int> primes = new List<int>();
for(int offset = 0; offset <= max-min; ++offset)
{
    if (!sieve[offset])
    {
        primes.Add(min + offset);
    }
}
lock(vals) vals.AddRange(primes);
Run Code Online (Sandbox Code Playgroud)

(并且vals应该以大约预期素数的初始容量创建,以避免为每个块重新分配)