最快的内联装配螺旋锁

sig*_*sen 24 c++ x86 assembly spinlock memory-barriers

我正在用c ++编写多线程应用程序,其中性能至关重要.我需要在线程之间复制小结构时使用大量锁定,为此我选择使用自旋锁.

我已经做了一些研究和速度测试,我发现大多数实现大致同样快:

  • MicroFts CRITICAL_SECTION,SpinCount设置为1000,得分约140个单位
  • 使用Microsofts 实现此算法 InterlockedCompareExchange得分约95个时间单位
  • 我也尝试使用一些内联汇编,__asm {}使用类似这样的代码,它得分约70个时间单位,但我不确定是否已创建适当的内存屏障.

编辑:这里给出的时间是2个线程锁定和解锁螺旋锁1,000,000次所需的时间.

我知道这并没有太大的区别,但是由于自旋锁是一个使用频繁的对象,人们会认为程序员会同意以最快的方式制作自旋锁.谷歌搜索导致许多不同的方法.我认为如果使用内联汇编并使用指令而不是比较32位寄存器来实现上述方法将是最快的CMPXCHG8B.此外,必须考虑内存障碍,这可以通过LOCK CMPXHG8B(我认为?)来完成,这保证了内核之间共享内存的"专有权".最后[有人建议]对于繁忙的等待应该伴随NOP:REP,这将使超线程处理器切换到另一个线程,但我不确定这是否是真的?

根据我对不同螺旋锁的性能测试,可以看出没有太大区别,但出于纯粹的学术目的,我想知道哪一个是最快的.但是由于我在汇编语言和内存障碍方面的经验非常有限,如果有人可以为我在LOCK CMPXCHG8B中提供的最后一个示例编写汇编代码并在以下模板中使用适当的内存屏障,我会很高兴:

__asm
{
     spin_lock:
         ;locking code.
     spin_unlock:
         ;unlocking code.
}
Run Code Online (Sandbox Code Playgroud)

Nec*_*lis 10

虽然已经有一个已经接受的答案,但有一些可以用来改进所有答案的东西,从这篇英特尔文章中可以看出,以上都是快速锁定实现:

  1. 在易失性读取上旋转,而不是原子指令,这可以避免不必要的总线锁定,尤其是在高度竞争的锁定上.
  2. 对于竞争激烈的锁使用后退
  3. 内联锁,最好是编译器的内在函数,其中内联asm是有害的(基本上是MSVC).


Olo*_*ell 6

我通常不会抱怨有人努力实现快速代码:这通常是一个非常好的练习,可以更好地理解编程和更快的代码.

我也不会抱怨这里,但我可以毫不含糊地说明快速自旋锁3指令的问题很长或者更多 - 至少在x86架构上 - 是徒劳的追逐.

原因如下:

使用典型代码序列调用自旋锁

lock_variable DW 0    ; 0 <=> free

mov ebx,offset lock_variable
mov eax,1
xchg eax,[ebx]

; if eax contains 0 (no one owned it) you own the lock,
; if eax contains 1 (someone already does) you don't
Run Code Online (Sandbox Code Playgroud)

解放螺旋锁是微不足道的

mov ebx,offset lock_variable
mov dword ptr [ebx],0
Run Code Online (Sandbox Code Playgroud)

xchg指令引发处理器上的锁定引脚,这实际上意味着我希望在接下来的几个时钟周期内使用总线.该信号通过高速缓存进行编织,然后进入最慢的总线主控设备,该设备通常是PCI总线.当每个总线设备完成时,锁定(锁定确认)信号被发回.然后进行实际的交换.问题是锁定/锁定序列需要很长时间.PCI总线可以以33MHz运行,具有几个延迟周期.在3.3 GHz CPU上,这意味着每个PCI总线周期需要100个CPU周期.

根据经验,我认为锁定需要300到3000个CPU周期来完成,最后我不知道我是否会拥有锁.因此,通过"快速"自旋锁可以节省的几个周期将是海市蜃楼,因为没有锁定就像下一个,它将取决于您在短时间内的公交车情况.

________________编辑________________

我刚看到自旋锁是一个"使用频繁的对象".好吧,你显然不明白自旋锁占用了大量的CPU周期.或者,换句话说,每次调用它都会失去大量的处理能力.

使用自旋锁(或其较大的兄弟,关键部分)时的诀窍是尽可能少地使用它们,同时仍然实现预期的程序功能.在整个地方使用它们很容易,因此最终会导致性能低下.

这不仅仅是编写快速代码,也是关于组织数据的.当您编写"在线程之间复制小型结构"时,您应该意识到锁定可能比实际复制需要花费数百倍的时间才能完成.

________________编辑________________

当您计算平均锁定时间时,它可能会说很少,因为它在您的机器上测量可能不是预期目标(可能具有完全不同的总线使用特性).对于您的机器,平均值将由个别非常快的时间(当总线主控活动不干扰时)一直到非常慢的时间(当总线主控干扰很大时).

您可以引入确定最快和最慢情况的代码,并计算商以查看螺旋锁时间可以变化的程度.

________________编辑________________

2016年5月更新.

Peter Cordes提出这样的想法:"在无争用的情况下调整锁定是有意义的",并且在现代CPU上不会发生数百个时钟周期的锁定时间,除非在锁定变量未对齐的情况下.我开始想知道我以前的测试程序 - 用32位Watcom C编写 - 可能会受到WOW64的阻碍,因为它运行在64位操作系统上:Windows 7.

所以我写了一个64位程序并用TDM的gcc 5.3编译它.该程序利用隐式总线锁定指令变量"XCHG r,m"进行锁定,并使用简单分配"MOV m,r"进行解锁.在某些锁定变体中,锁定变量已经过预先测试,以确定是否可以尝试锁定(使用简单的比较"CMP r,m",可能不会在L3外冒险).这里是:

// compiler flags used:

// -O1 -m64 -mthreads -mtune=k8 -march=k8 -fwhole-program -freorder-blocks -fschedule-insns -falign-functions=32 -g3 -Wall -c -fmessage-length=0

#define CLASSIC_BUS_LOCK
#define WHILE_PRETEST
//#define SINGLE_THREAD

typedef unsigned char      u1;
typedef unsigned short     u2;
typedef unsigned long      u4;
typedef unsigned int       ud;
typedef unsigned long long u8;
typedef   signed char      i1;
typedef          short     i2;
typedef          long      i4;
typedef          int       id;
typedef          long long i8;
typedef          float     f4;
typedef          double    f8;

#define usizeof(a) ((ud)sizeof(a))

#define LOOPS 25000000

#include <stdio.h>
#include <windows.h>

#ifndef bool
typedef signed char bool;
#endif

u8 CPU_rdtsc (void)
{
  ud tickl, tickh;
  __asm__ __volatile__("rdtsc":"=a"(tickl),"=d"(tickh));
  return ((u8)tickh << 32)|tickl;
}

volatile u8 bus_lock (volatile u8 * block, u8 value)
{
  __asm__ __volatile__( "xchgq %1,%0" : "=r" (value) : "m" (*block), "0" (value) : "memory");

  return value;
}

void bus_unlock (volatile u8 * block, u8 value)
{
  __asm__ __volatile__( "movq %0,%1" : "=r" (value) : "m" (*block), "0" (value) : "memory");
}

void rfence (void)
{
  __asm__ __volatile__( "lfence" : : : "memory");
}

void rwfence (void)
{
  __asm__ __volatile__( "mfence" : : : "memory");
}

void wfence (void)
{
  __asm__ __volatile__( "sfence" : : : "memory");
}

volatile bool LOCK_spinlockPreTestIfFree (const volatile u8 *lockVariablePointer)
{
  return (bool)(*lockVariablePointer == 0ull);
}

volatile bool LOCK_spinlockFailed (volatile u8 *lockVariablePointer)
{
  return (bool)(bus_lock (lockVariablePointer, 1ull) != 0ull);
}

void LOCK_spinlockLeave (volatile u8 *lockVariablePointer)
{
  *lockVariablePointer = 0ull;
}

static volatile u8 lockVariable = 0ull,
                   lockCounter =  0ull;

static volatile i8 threadHold = 1;

static u8 tstr[4][32];    /* 32*8=256 bytes for each thread's parameters should result in them residing in different cache lines */

struct LOCKING_THREAD_STRUCTURE
{
  u8 numberOfFailures, numberOfPreTests;
  f8 clocksPerLock, failuresPerLock, preTestsPerLock;
  u8 threadId;
  HANDLE threadHandle;
  ud idx;
} *lts[4] = {(void *)tstr[0], (void *)tstr[1], (void *)tstr[2], (void *)tstr[3]};

DWORD WINAPI locking_thread (struct LOCKING_THREAD_STRUCTURE *ltsp)
{
  ud n = LOOPS;
  u8 clockCycles;

  SetThreadAffinityMask (ltsp->threadHandle, 1ull<<ltsp->idx);

  while (threadHold) {}

  clockCycles = CPU_rdtsc ();
  while (n)
  {
    Sleep (0);

#ifdef CLASSIC_BUS_LOCK
    while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
    while (1)
    {
      do
      {
        ++ltsp->numberOfPreTests;
      } while (!LOCK_spinlockPreTestIfFree (&lockVariable));

      if (!LOCK_spinlockFailed (&lockVariable)) break;
      ++ltsp->numberOfFailures;
    }
#else
    while (1)
    {
      ++ltsp->numberOfPreTests;
      if (LOCK_spinlockPreTestIfFree (&lockVariable))
      {
        if (!LOCK_spinlockFailed (&lockVariable)) break;
        ++ltsp->numberOfFailures;
      }
    }
#endif
#endif
    ++lockCounter;
    LOCK_spinlockLeave (&lockVariable);

#ifdef CLASSIC_BUS_LOCK
    while (LOCK_spinlockFailed (&lockVariable)) {++ltsp->numberOfFailures;}
#else
#ifdef WHILE_PRETEST
    while (1)
    {
      do
      {
        ++ltsp->numberOfPreTests;
      } while (!LOCK_spinlockPreTestIfFree (&lockVariable));

      if (!LOCK_spinlockFailed (&lockVariable)) break;
      ++ltsp->numberOfFailures;
    }
#else
    while (1)
    {
      ++ltsp->numberOfPreTests;
      if (LOCK_spinlockPreTestIfFree (&lockVariable))
      {
        if (!LOCK_spinlockFailed (&lockVariable)) break;
        ++ltsp->numberOfFailures;
      }
    }
#endif
#endif
    --lockCounter;
    LOCK_spinlockLeave (&lockVariable);

    n-=2;
  }
  clockCycles = CPU_rdtsc ()-clockCycles;

  ltsp->clocksPerLock =   (f8)clockCycles/           (f8)LOOPS;
  ltsp->failuresPerLock = (f8)ltsp->numberOfFailures/(f8)LOOPS;
  ltsp->preTestsPerLock = (f8)ltsp->numberOfPreTests/(f8)LOOPS;

//rwfence ();

  ltsp->idx = 4u;

  ExitThread (0);
  return 0;
}

int main (int argc, char *argv[])
{
  u8 processAffinityMask, systemAffinityMask;

  memset (tstr, 0u, usizeof(tstr));

  lts[0]->idx = 3;
  lts[1]->idx = 2;
  lts[2]->idx = 1;
  lts[3]->idx = 0;

  GetProcessAffinityMask (GetCurrentProcess(), &processAffinityMask, &systemAffinityMask);

  SetPriorityClass (GetCurrentProcess(), HIGH_PRIORITY_CLASS);
  SetThreadAffinityMask (GetCurrentThread (), 1ull);

  lts[0]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[0], 0, (void *)&lts[0]->threadId);
#ifndef SINGLE_THREAD
  lts[1]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[1], 0, (void *)&lts[1]->threadId);
  lts[2]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[2], 0, (void *)&lts[2]->threadId);
  lts[3]->threadHandle = CreateThread (NULL, 65536u, (void *)locking_thread, (void *)lts[3], 0, (void *)&lts[3]->threadId);
#endif

  SetThreadAffinityMask (GetCurrentThread (), processAffinityMask);

  threadHold = 0;

#ifdef SINGLE_THREAD
  while (lts[0]->idx<4u) {Sleep (1);}
#else
  while (lts[0]->idx+lts[1]->idx+lts[2]->idx+lts[3]->idx<16u) {Sleep (1);}
#endif

  printf ("T0:%1.1f,%1.1f,%1.1f\n", lts[0]->clocksPerLock, lts[0]->failuresPerLock, lts[0]->preTestsPerLock);
  printf ("T1:%1.1f,%1.1f,%1.1f\n", lts[1]->clocksPerLock, lts[1]->failuresPerLock, lts[1]->preTestsPerLock);
  printf ("T2:%1.1f,%1.1f,%1.1f\n", lts[2]->clocksPerLock, lts[2]->failuresPerLock, lts[2]->preTestsPerLock);
  printf ("T3:%1.1f,%1.1f,%1.1f\n", lts[3]->clocksPerLock, lts[3]->failuresPerLock, lts[3]->preTestsPerLock);

  printf ("T*:%1.1f,%1.1f,%1.1f\n", (lts[0]->clocksPerLock+  lts[1]->clocksPerLock+  lts[2]->clocksPerLock+  lts[3]->clocksPerLock)/  4.,
                                    (lts[0]->failuresPerLock+lts[1]->failuresPerLock+lts[2]->failuresPerLock+lts[3]->failuresPerLock)/4.,
                                    (lts[0]->preTestsPerLock+lts[1]->preTestsPerLock+lts[2]->preTestsPerLock+lts[3]->preTestsPerLock)/4.);

  printf ("LC:%u\n", (ud)lockCounter);

  return 0;
}
Run Code Online (Sandbox Code Playgroud)

该程序在基于DELL i5-4310U的计算机上运行,​​该计算机具有DDR3-800,2核/ 2 HT,能够达到2.7GHz,并具有通用L3缓存.

首先看来,WOW64的影响可以忽略不计.

执行无竞争锁定/解锁的单个线程能够每110个周期执行一次.调整无竞争锁是没用的:为增强单个XCHG指令而添加的任何代码只会使其变慢.

有四个HT用锁定尝试轰击锁变量,情况发生了根本性的变化.实现成功锁定所需的时间跳转到994个周期,其中很大一部分可归因于2.2失败的锁定尝试.换句话说,在高争用的情况下,必须尝试3.2锁才能实现锁定成功.显然110个周期没有变成110*3.2但接近110*9.所以其他机制在这里起作用就像旧机器上的测试一样.此外,平均994个循环包括716和1157之间的范围

实施预测试的锁定变体需要最简单的变体(XCHG)所需的约95%的循环.平均而言,他们将执行17次CMP,以发现尝试1.75锁定是可行的,其中1次成功.我建议使用预测试不仅因为它更快:它对总线锁定机制施加的压力更小(3.2-1.75 = 1.45减少锁定尝试),即使它稍微增加了复杂性.

  • @Peter Cordes:那么为什么 AF 在 instructions_tables.pdf 第 1 页的底部写道:“即使在单处理器系统上,一个 LOCK 前缀通常也会花费超过一百个时钟周期。这也适用于具有内存操作数。”?我想阿格纳·福格既可以被认为是对的,也可以被认为是错的……是哪一个呢? (2认同)
  • OP仍然写道:"我需要在线程之间复制小结构时使用大量锁定,为此我选择使用自旋锁".虽然我的代码显示四个内核中的每一个都有可能每秒实现270万次锁定/解锁,但是OP使用这么多的内核并不是不可思议的,以至于他的多线程代码在没有锁定的情况下会更好. (2认同)

amd*_*mdn 5

维基百科有一篇关于自旋锁的好文章,这里是x86实现

http://en.wikipedia.org/wiki/Spinlock#Example_implementation

注意它们的实现不使用"lock"前缀,因为它在x86上对于"xchg"指令来说是冗余的 - 它隐含地具有锁定语义,如Stackoverflow讨论中所讨论的:

在多核x86上,是否需要作为XCHG前缀的LOCK?

REP:NOP是PAUSE指令的别名,您可以在此处了解更多相关信息

x86如何在spinlock*中暂停指令,*可以在其他场景中使用吗?

关于记忆障碍问题,这里有你可能想知道的一切

内存障碍:Paul E. McKenney为软件黑客提供的硬件视图

http://irl.cs.ucla.edu/~yingdi/paperreading/whymb.2010.06.07c.pdf


hus*_*sik 3

看看这里: x86 spinlock using cmpxchg

感谢科里·尼尔森

__asm{
spin_lock:
xorl %ecx, %ecx
incl %ecx
spin_lock_retry:
xorl %eax, %eax
lock; cmpxchgl %ecx, (lock_addr)
jnz spin_lock_retry
ret

spin_unlock:
movl $0 (lock_addr)
ret
}
Run Code Online (Sandbox Code Playgroud)

另一个消息来源说: http ://www.geoffchappell.com/studies/windows/km/cpu/cx8.htm

       lock    cmpxchg8b qword ptr [esi]
is replaceable with the following sequence

try:
        lock    bts dword ptr [edi],0
        jnb     acquired
wait:
        test    dword ptr [edi],1
        je      try
        pause                   ; if available
        jmp     wait

acquired:
        cmp     eax,[esi]
        jne     fail
        cmp     edx,[esi+4]
        je      exchange

fail:
        mov     eax,[esi]
        mov     edx,[esi+4]
        jmp     done

exchange:
        mov     [esi],ebx
        mov     [esi+4],ecx

done:
        mov     byte ptr [edi],0
Run Code Online (Sandbox Code Playgroud)

这里是关于无锁与锁实现的讨论: http://newsgroups.derkeiler.com/Archive/Comp/comp.programming.threads/2011-10/msg00009.html


归档时间:

查看次数:

10834 次

最近记录:

9 年,3 月 前