如何锁定阵列中某个元素的MUTEX,而不是整个阵列

use*_*584 5 c++ multithreading mutex pthreads

问题的简短版本:我有2个共享同一数组的函数,当一个正在编辑它时,另一个正在读取它。但是,向量很长(5000个样本),并且很少发生并发访问。但是启用Mutex的竞争MUTEX1使该程序变慢了。'

我如何锁定内存的某些位置而不是整个块,以减少争用?

编辑:注意:我必须尽可能使用更新的G值。

EDIT2:例如,我的数组G的长度为5000。foo1锁定mutex1编辑索引124。尽管foo2要编辑索引2349,但要等到foo1release 才可以mutex1

有什么办法可以将锁定互斥锁的争论降低到元素级别?含义:仅当他们想要编辑相同的索引时,我才希望foo2foo1仅在相同的互斥体上竞争。例如:foo1想要编辑索引3156,并foo2想要编辑索引3156。

带代码说明的长版:我正在编写用于复杂数学函数的代码,并且使用pthread来并行处理代码并提高性能。代码非常复杂,我可以发布它,但是可以将模型发布到代码中。

基本上,我想使用2个并行运行的线程来编辑2个数组。一个线程运行foo1,另一个线程运行foo2。但是,他们应该在一个特定的顺序运行,我用mutexES( ,_B_A1_A2)以受让人的序列。它如下:

foo1 (first half)
foo2 (first half) and foo1 (second half) (in parallel)
foo1 (first half) and foo2 (second half) (in parallel)
...
foo2(second half)
Run Code Online (Sandbox Code Playgroud)

然后我将检索结果。在第一部分中,foo1我将使用结果,G1因为可能会同时编辑foo2。因此,我Mutex1用来保护它。同样发生在foo2G。但是,将整个向量锁定为1值非常有效,它们几乎永远不会在同一时间编辑相同的存储位置。当我比较结果时,几乎总是一样。我想一次锁定一个元素的方法,以便它们只争用同一元素。

我将为有兴趣了解其工作原理的人介绍该代码:

#include <pthread.h>
#include <iostream>

using namespace std;

#define numThreads 2
#define Length 10000

pthread_t threads[numThreads];

pthread_mutex_t mutex1   = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_B  = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_A1 = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t Mutex_A2 = PTHREAD_MUTEX_INITIALIZER;

struct data_pointers
{
    double  *A;
    double  *B;
    double  *G;
    double  *L;
    int idxThread;
};

void foo1   (data_pointers &data);
void foo2   (data_pointers &data);

void *thread_func(void *arg){
    data_pointers data = *((data_pointers *) arg);
    if (data.idxThread==0)
        foo1 (data);
    else
        foo2 (data);
}
Run Code Online (Sandbox Code Playgroud)

到这里为止是我定义Length 10000和记住的定义和线程调用函数numThreads 2

void foo1 ( data_pointers &data)
{
    double *A           = data.A;
    double *L           = data.L; 
    double *G           = data.G; 
    double U;

    for (int ijk =0;ijk<5;ijk++){
        /* here goes some definitions*/

        pthread_mutex_lock(&Mutex_A1);

        for (int k =0;k<Length;k++){
            pthread_mutex_lock(&mutex1); 
            U = G[k];
            pthread_mutex_unlock(&mutex1);
            /*U undergoes a lot of mathematical operations here


            */
        }

        pthread_mutex_lock(&Mutex_B);
        pthread_mutex_unlock(&Mutex_A2);
        for (int k =0;k<Length;k++){
            /*U another mathematical operations here


            */
            pthread_mutex_lock(&mutex1);
            L[k] = U;
            pthread_mutex_unlock(&mutex1);
            pthread_mutex_unlock(&Mutex_B);
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

在foo1中,我锁定mutexA1并完成了工作,然后锁定MutexB并解锁,MutexA2因此foo2可以开始工作。请注意,main从锁定开始MutexA2。这样,我garantee会foo1mutexB锁定的后半部分开始,这样,foo2直到foo1解锁后才能进入函数的后半部分mutexB

void foo2 (data_pointers &data)
{
    double *A           = data.A;
    double *L           = data.L; 
    double *G           = data.G; 
    double U;

    for (int ijk =0;ijk<5;ijk++){
        /* here goes some definitions*/

        pthread_mutex_lock(&Mutex_A1);

        for (int k =0;k<Length;k++){
            pthread_mutex_lock(&mutex1); 
            U = G[k];
            pthread_mutex_unlock(&mutex1);
            /*U undergoes a lot of mathematical operations here


            */
        }

        pthread_mutex_lock(&Mutex_B);
        pthread_mutex_unlock(&Mutex_A2);
        for (int k =0;k<Length;k++){        
            /*U another mathematical operations here


            */
            pthread_mutex_lock(&mutex1);
            L[k] = U;
            pthread_mutex_unlock(&mutex1);
            pthread_mutex_unlock(&Mutex_B);

        }
    }
}
Run Code Online (Sandbox Code Playgroud)

现在,在foo1解锁时,mutexB它必须等待foo2解锁mutexA1才能工作,foo2只有mutexA2在已经解锁时才解锁mutexB

持续进行5次。

int main(){
    double G1[Length];
    double G2[Length];
    double B1[Length];
    double B2[Length];
    double A2[Length];
    double A1[Length];
    data_pointers data[numThreads];

    data[0].L           = G2;
    data[0].G           = G1;   
    data[0].A           = A1;
    data[0].B           = B1;
    data[0].idxThread   = 0;

    data[1].L           = G1;
    data[1].G           = G2;   
    data[1].A           = A2;
    data[1].B           = B2;
    data[1].idxThread   = 1;

    pthread_mutex_lock(&Mutex_A2);

    pthread_create(&(threads[0]), NULL, thread_func, (void *) &(data[0]));
    pthread_create(&(threads[1]), NULL, thread_func, (void *) &(data[1]));
    pthread_join(threads[1], NULL);
    pthread_join(threads[0], NULL);

    pthread_mutex_unlock(&Mutex_A1);
    pthread_mutex_unlock(&Mutex_A2);

    return 0;
}
Run Code Online (Sandbox Code Playgroud)

请注意,这只是一个示例代码。可以按预期进行编译和工作,但是没有输出。

最后编辑:谢谢大家的好主意,我有很多经验,按照这些建议很有趣。我将对所有有用的答案进行投票,并选择最接近原始问题(原子性)的答案

bku*_*ytt 2

使用原子指针“锁定”内存中某些位置的示例代码:

#include <vector>
#include <atomic>
#include <thread>

using container = std::vector<std::atomic<double>>;
using container_size_type = container::size_type;

container c(300);

std::atomic<container::pointer> p_busy_elem{ nullptr };

void editor()
{
    for (container_size_type i{ 0 }, sz{ c.size() }; i < sz; ++i)
    {
        p_busy_elem.exchange(&c[i]); // c[i] is busy
        // ... edit c[i] ... // E: calculate a value and assign it to c[i]
        p_busy_elem.exchange(nullptr); // c[i] is no longer busy
    }
}

void reader()
{
    for (container_size_type i{ 0 }, sz{ c.size() }; i < sz; ++i)
    {
        // A1: wait for editor thread to finish editing value
        while (p_busy_elem == &c[i])
        {
            // A2: room a better algorithm to prevent blocking/yielding
            std::this_thread::yield();
        }

        // B: if c[i] is updated in between A and B, this will load the latest value
        auto value = c[i].load();

        // C: c[i] might have changed by this time, but we had the most up to date value we could get without checking again
        // ... use value ...
    }
}

int main()
{
    std::thread t_editor{ editor };
    std::thread t_reader{ reader };
    t_editor.join();
    t_reader.join();
}
Run Code Online (Sandbox Code Playgroud)

在编辑器线程中,设置 busy 指针以指示当前正在编辑该内存位置 ( E )。如果线程 B 在设置 busy 指针后尝试读取该值,它将等到编辑完成后再继续 ( A1 )。

A2上的注释:可以在此处放置更好的系统。可以保留尝试读取时繁忙的节点列表,然后我们将添加i到该列表并尝试稍后处理该列表。好处:可以告诉循环执行 a并且读取continue当前正在编辑的索引之后的索引。i

制作要读取的值的副本 ( B ),以便根据需要使用它 ( C )。这是我们最后一次检查 的最新值c[i]