C&低级信号量实现

Jum*_*ays 6 c assembly multithreading mutex semaphore

我正在考虑如何使用尽可能少的asm代码实现信号量(不是二进制).
我没有成功地在没有使用互斥锁的情况下思考和编写它,所以这是迄今为止我能做的最好的事情:

全球:

#include <stdlib.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct
{
    atomic_ullong    value;
    pthread_mutex_t *lock_op;
    bool             ready;
} semaphore_t;

typedef struct
{
    atomic_ullong   value;
    pthread_mutex_t lock_op;
    bool            ready;
} static_semaphore_t;

 /* use with static_semaphore_t */
#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true}
Run Code Online (Sandbox Code Playgroud)


功能:

bool semaphore_init(semaphore_t *semaphore, unsigned long long init_value)
{   
    if(semaphore->ready) if(!(semaphore->lock_op = \
                             calloc(1, sizeof(pthread_mutex_t)))) return false;
    else                 pthread_mutex_destroy(semaphore->lock_op);   

    if(pthread_mutex_init(semaphore->lock_op, NULL))
            return false;

    semaphore->value = init_value;
    semaphore->ready = true;
    return true;
}

bool semaphore_wait(semaphore_t *semaphore)
{
    if(!semaphore->ready) return false;

    pthread_mutex_lock(&(semaphore->lock_op));
    while(!semaphore->value) __asm__ __volatile__("nop");
    (semaphore->value)--;
    pthread_mutex_unlock(&(semaphore->lock_op));
    return true;
}

bool semaphore_post(semaphore_t *semaphore)
{
    if(!semaphore->ready) return false;

    atomic_fetch_add(&(semaphore->value), (unsigned long long) 1);
    return true;
}
Run Code Online (Sandbox Code Playgroud)


是否可以仅使用几行,使用原子内置函数或直接在汇编中实现信号量(例如lock cmpxchg)?

看看<bits/sempahore.h>包含<semaphore.h> 它的sem_t结构在我看来,它被选择了一个非常不同的路径......

typedef union
{
    char __size[__SIZEOF_SEM_T];
    long int __align;
} sem_t;
Run Code Online (Sandbox Code Playgroud)




更新:

@PeterCordes提出了一个更好的解决方案,使用原子,没有互斥,直接对信号量值进行检查.

我仍然希望更好地理解在性能方面改进代码的机会,利用内置的暂停函数或内核调用来避免CPU浪费,等待关键资源可用.

为了进行比较,使用互斥锁和非二进制信号量的标准实现也会很不错.
futex(7)我读到:"Linux内核提供了futexes("快速用户空间互斥")作为快速用户空间锁定和信号量的构建块.Futexes非常基础,非常适合构建更高级别的锁定诸如互斥体,条件变量,读写锁,障碍和信号量之类的抽象."

Pet*_*des 9

请参阅部分内容,了解我的最小天真信号量实现可能有效.它编译并适合x86.对于任何C11实现,我认为这是正确的.


IIRC,可以用一个整数实现计数锁(也称为信号量),您可以使用原子操作访问它.该维基百科链接甚至为up/ 提供算法down.您不需要单独的互斥锁.如果atomic_ullong需要一个互斥锁来支持目标CPU上的原子递增/递减,它将包含一个.(在32位x86上可能就是这种情况,或者实现使用慢速cmpxchg8而不是快速lock xadd.32位计数器对于你的信号量来说真的太小吗?因为64位原子在32位机器上会慢一些.)

<bits/sempahore.h>联盟的定义显然只是一个不透明的POD类型有正确的尺寸,不表示实际执行的.


正如@David Schwartz所说,除非你是专家,否则在实际使用中实现自己的锁定是一件愚蠢的事.然而,这可能是一种有趣的方式来了解原子操作并找出标准实现中的内幕.请注意他的注意事项,锁定实现很难测试.您可以使用当前版本的编译器编写适用于您的测试用例的代码,并使用您选择的编译选项...


ready布尔是空间只是一个总的浪费.如果您可以正确初始化ready标志以使其对于查看它的函数有意义,那么您可以将其他字段初始化为理智的初始状态.

您的代码还有一些我注意到的其他问题:

#define SEMAPHORE_INITIALIZER(value) = {value, PTHREAD_MUTEX_INITIALIZER, true};

static_semaphore_t my_lock = SEMAPHORE_INITIALIZER(1);
// expands to my_lock = = {1, PTHREAD_MUTEX_INITIALIZER, true};;
// you should leave out the = and ; in the macro def so it works like a value
Run Code Online (Sandbox Code Playgroud)

使用动态分配pthread_mutex_t *lock_op只是愚蠢的.使用值,而不是指针.大多数锁定函数都使用互斥锁,因此额外的间接级别会减慢速度.记忆与计数器一起存在会好得多.互斥体不需要很大的空间.


while(!semaphore->value) __asm__ __volatile__("nop");
Run Code Online (Sandbox Code Playgroud)

我们希望这个循环避免浪费功率并减慢其他线程,甚至其他逻辑线程与超线程共享相同的核心.

nop不会使繁忙等待循环减少CPU密集.即使使用超线程,它也许在x86上没有任何区别,因为整个循环体仍然可能适合4个uop,因此每个时钟在一次迭代中发出是否存在nop.在nop不需要执行单元,所以至少它不会伤害.这个自旋循环发生在持有互斥锁的情况下,这似乎很愚蠢.所以第一个服务员将进入这个旋转循环,而服务员之后会旋转互斥锁.


这是我对信号量的天真实现,只使用C11原子操作

我认为这是一个很好的实现,它实现了非常有限的目标:正确和小(源代码和机器代码),而不是使用其他实际的锁定原语.我甚至没有尝试解决的主要领域(例如公平/饥饿,将CPU交给其他线程,可能是其他东西).

看看godbolt上asm输出:只有12 x86个insn down,2个用于up(包括rets).Godbolt的非x86编译器(ARM/ARM64/PPC的gcc 4.8)太旧了,无法支持C11 <stdatomic.h>.(但他们确实有C++ std::atomic).所以我很遗憾无法轻易检查非x86上的asm输出.

#include <stdatomic.h>
#define likely(x)       __builtin_expect(!!(x), 1)
#define unlikely(x)     __builtin_expect(!!(x), 0)

typedef struct {
  atomic_int val;   // int is plenty big.  Must be signed, so an extra decrement doesn't make 0 wrap to >= 1
} naive_sem_t;

#if defined(__i386__) || defined(__x86_64__)
#include <immintrin.h>
static inline void spinloop_body(void) { _mm_pause(); }  // PAUSE is "rep nop" in asm output
#else
static inline void spinloop_body(void) { }
#endif

void sem_down(naive_sem_t *sem)
{
  while (1) {
    while (likely(atomic_load_explicit(&(sem->val), memory_order_acquire ) < 1))
        spinloop_body();  // wait for a the semaphore to be available
    int tmp = atomic_fetch_add_explicit( &(sem->val), -1, memory_order_acq_rel );  // try to take the lock.  Might only need mo_acquire
    if (likely(tmp >= 1))
        break;              // we successfully got the lock
    else                    // undo our attempt; another thread's decrement happened first
        atomic_fetch_add_explicit( &(sem->val), 1, memory_order_release ); // could be "relaxed", but we're still busy-waiting and want other thread to see this ASAP
  }
}
// note the release, not seq_cst.  Use a stronger ordering parameter if you want it to be a full barrier.
void sem_up(naive_sem_t *sem) {
    atomic_fetch_add_explicit(&(sem->val), 1, memory_order_release);
}
Run Code Online (Sandbox Code Playgroud)

这里的诀窍是val暂时太低是可以的 ; 这只是让其他线程旋转.还要注意,fetch_add单个原子操作是关键.它返回旧值,因此我们可以检测valwhile循环的load和fetch_add之间的另一个线程何时占用.(注意,我们不需要检查tmp= =到while循环的加载:如果另一个线程up编写了load和fetch_add之间的信号量,那就没问题了.这对使用fetch_add而不是cmpxchg是有好处的).

atomic_load自旋循环刚刚超过其所有的服务员做原子的读-修改-写入性能优化val.(虽然许多服务员试图决定然后撤销公司,但让服务员看到锁解锁可能非常罕见).

一个真正的实现将有更多平台的特殊东西,而不仅仅是x86.对于x86,可能不仅仅是PAUSEspinloop中的指令.这仍然是完全便携式C11实现的玩具示例. PAUSE显然有助于避免对内存排序的错误推测,因此CPU在离开自旋循环后运行效率更高. pause与为操作不同的线程的操作系统产生逻辑CPU不同.它也与正确性和memory_order_???参数选择无关.

在经过一定数量的旋转迭代之后,真正的实现可能会将CPU放弃到操作系统(sched_yield(2)或者更可能是futex系统调用,见下文).也许使用x86 MONITOR/ MWAIT更加超线程友好; 我不确定.我没有实现锁定自己的真实,我只是在查找其他insn时在x86 insn参考中看到所有这些东西.


如前所述,x86的lock xadd指令实现fetch_add(具有顺序一致性语义,因为locked指令始终是完整的内存屏障).在非x86上,仅对fetch_add使用获取+释放语义,而不是完全顺序一致性可能允许更高效的代码.我不确定,但使用它acquire很可能会在ARM64上提供更高效的代码.我想我们只需要acquirefetch_add,而不是acq_rel,但我不确定.在x86上,代码没有任何区别,因为locked指令是进行原子读 - 修改 - 写的唯一方法,所以即使relaxed是相同的seq_cst(编译时重新排序除外).


如果你想要产生CPU而不是旋转,你需要一个系统调用(正如人们所说).显然,在Linux上尽可能高效地进行标准库锁定已经付出了很多努力.有一些专用的系统调用可以帮助内核在释放锁时唤醒正确的线程,并且它们不易使用. 来自futex(7):

注意
重申一下,裸的futexes并不是最终用户易于使用的抽象.(在glibc中没有用于此系统调用的包装函数.)实现者应该具有汇编语言并且已经读取了下面引用的futex用户空间库的源代码.


公平/饥饿(我天真的实施忽略了)

正如维基百科文章所提到的,某种唤醒队列是一个好主意,因此同一个线程不会每次都获得信号量.(释放后快速锁定的代码通常会释放释放线程,而其他线程仍处于休眠状态).

这是该过程中内核协作的另一个主要好处(futex).

  • 我不得不说我不想使用我自己的实现,但我想深入理解这个主题,所以挑战在于找出如何使用尽可能少的代码尽可能更有效地实现我自己的信号量.也许能够将差异与实际标准实施进行比较. (2认同)