Memory barrier use in lock-free queues

Tags: c, concurrency, 64-bit, lock-free, memory-barriers

I recently read Paul McKenney's 2010 white paper, "Memory Barriers: a Hardware View for Software Hackers".

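I would very much appreciate feedback, comments, or guidance on the small section of C code given below, which implements the M&S queue enqueue function, in particular with regard to memory and compiler barriers.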

This code uses pointer-counter pairs to handle ABA and, for the sake of this post, should be considered to be written for x86/x64 only.
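As an illustration of the pointer-counter idea (not the code from this post), here is a minimal sketch that packs a 32-bit element index and a 32-bit counter into one 64-bit word, so that a single-word C11 CAS updates both at once. The packing layout, the names `pac_pack`, `pac_cas` and `pac_demo_aba_detected`, and the use of an index in place of a pointer are all assumptions made for this example; the code in this post instead keeps two contiguous pointer-sized words and relies on a double-word CAS.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical layout: low 32 bits = element index, high 32 bits = ABA counter. */
static uint64_t pac_pack(uint32_t index, uint32_t counter)
{
  return ((uint64_t) counter << 32) | index;
}

static uint32_t pac_index(uint64_t pac)   { return (uint32_t) pac; }
static uint32_t pac_counter(uint64_t pac) { return (uint32_t) (pac >> 32); }

/* Try to swing *target from the snapshot `expected` to `new_index`,
   bumping the counter so that every successful update yields a new word. */
static bool pac_cas(_Atomic uint64_t *target, uint64_t expected, uint32_t new_index)
{
  uint64_t desired = pac_pack(new_index, pac_counter(expected) + 1);
  return atomic_compare_exchange_strong(target, &expected, desired);
}

/* Single-threaded walk-through of an A -> B -> A sequence.
   Returns 1 if the stale snapshot is rejected even though the index matches. */
static int pac_demo_aba_detected(void)
{
  _Atomic uint64_t head;
  atomic_init(&head, pac_pack(7, 0));

  /* "Thread 1" takes a snapshot: index 7, counter 0. */
  uint64_t stale = atomic_load(&head);
  assert(pac_index(stale) == 7 && pac_counter(stale) == 0);

  /* "Thread 2" changes 7 -> 9 -> 7; the index is back, the counter is not. */
  pac_cas(&head, atomic_load(&head), 9);
  pac_cas(&head, atomic_load(&head), 7);

  /* Thread 1's CAS compares the whole word, so the stale snapshot fails. */
  bool swapped = pac_cas(&head, stale, 3);
  uint64_t now = atomic_load(&head);
  return !swapped && pac_index(now) == 7 && pac_counter(now) == 2;
}
```

Calling `pac_demo_aba_detected()` from a test harness shows the point of the counter: a compare on the index alone would have succeeded after the A -> B -> A sequence, while the compare on the combined word does not.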

The inline comments have all been written just now, for this post, and are part of it, in that they express what I currently think.

For brevity, I have stripped the code of asserts, of cache-line padding in the structs, and so on.

Currently, I think the code is quite broken, but I want to be sure I think so for the right reasons.


#define PAC_SIZE 2

struct lfds_queue_element
{
  struct lfds_queue_element
    *volatile next[PAC_SIZE];

  void
    *user_data;
};

struct lfds_queue_state
{
  struct lfds_queue_element
    *volatile enqueue[PAC_SIZE];

  struct lfds_queue_element
    *volatile dequeue[PAC_SIZE];

  atom_t volatile
    aba_counter;
};

void lfds_queue_internal_dcas_pac_enqueue( struct lfds_queue_state *lqs, struct lfds_queue_element *lqe )
{
  ALIGN(ALIGN_DOUBLE_POINTER) struct lfds_queue_element
    *local_lqe[PAC_SIZE], *enqueue[PAC_SIZE], *next[PAC_SIZE];
  unsigned char cas_result = 0;
  unsigned int backoff_iteration = 1;

  /* TRD : here we have been passed a new element to place
           into the queue; we initialize it and its next
           pointer/counter pair
  */

  local_lqe[POINTER] = lqe;
  local_lqe[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  local_lqe[POINTER]->next[POINTER] = NULL;
  local_lqe[POINTER]->next[COUNTER] = (struct lfds_queue_element *) lfds_abstraction_atomic_increment( &lqs->aba_counter );

  /* TRD : now, I think there is an issue here, in that these values
           are by no means yet necessarily visible to other cores

           however, they only need to be visible once
           the element has entered the queue, and for that
           to happen, the contiguous double-word CAS must
           have occurred - and on x86/x64, this carries
           with it an mfence

           however, that mfence will only act to empty our
           local store buffer - it will not cause other cores
           to flush their invalidation queues, so surely
           it can all still go horribly wrong?

           ah, but if all other cores are only accessing
           these variables using atomic operations, they
           too will be issuing mfences and so at that
           point flushing their invalidate queues
  */

  do
  {
    enqueue[COUNTER] = lqs->enqueue[COUNTER];
    enqueue[POINTER] = lqs->enqueue[POINTER];

    next[COUNTER] = enqueue[POINTER]->next[COUNTER];
    next[POINTER] = enqueue[POINTER]->next[POINTER];

    /* TRD : now, this is interesting

             we load the enqueue pointer and its next pointer
             we then (immediately below) check to see they're unchanged

             but this check is totally bogus!  we could be reading
             old values from our cache, where our invalidate queue
             has not been processed, so the initial read contains
             old data *and* we then go ahead and check from our cache
             the same old values a second time

             what's worse is that I think we could also read the correct
             values for enqueue but an incorrect (old) value for its
             next pointer...!

             so, in either case, we easily mistakenly pass the if()
             and then enter into code which does things to the queue

             now, in both cases, the CAS will mfence, which will
             cause us to see from the data structure the true
             values, but how much will that help us - we need
             to look to see what is actually being done

             the if() checks next[POINTER] is NULL

             if we have read a NULL for next, then we think
             the enqueue pointer is correctly placed (it's not
             lagging behind) so we don't need to help; we then
             try to add our element to the end of the queue

             now, it may be we have read enqueue properly but
             next improperly and so we now try to add our element
             where it will in fact truncate the queue!

             the CAS however will mfence and so at this point
             we will actually see the correct value for enqueue-next,
             and this will prevent that occurring

             if we look now at the else clause, here we have seen
             that enqueue->next is not NULL, so the enqueue pointer
             is out of place and we need to help, which we do by
             moving it down the queue

             here though we could have read enqueue correctly
             but next incorrectly; the CAS will mfence, which will
             update the cache, but since we're only comparing
             the enqueue pointer with our copy of the enqueue
             pointer, the fact our next pointer is wrong won't
             change!  so here, we move the enqueue pointer to
             some old element - which although it might be in the
             queue (so this would be an inefficiency, you'd have
             to do a bunch more queue walking to get the enqueue
             pointer to the final element) it might not be, too!
             it could in the meantime have been dequeued and
             that of course would be death
    */

    if( lqs->enqueue[POINTER] == enqueue[POINTER] && lqs->enqueue[COUNTER] == enqueue[COUNTER] )
    {
      if( next[POINTER] == NULL )
      {
        local_lqe[COUNTER] = next[COUNTER] + 1;
        cas_result = lfds_abstraction_atomic_dcas_with_backoff( (atom_t volatile *) enqueue[POINTER]->next, (atom_t *) local_lqe, (atom_t *) next, &backoff_iteration );
      }
      else
      {
        next[COUNTER] = enqueue[COUNTER] + 1;
        lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) next, (atom_t *) enqueue );
      }
    }
  }
  while( cas_result == 0 );

  local_lqe[COUNTER] = enqueue[COUNTER] + 1;

  lfds_abstraction_atomic_dcas( (atom_t volatile *) lqs->enqueue, (atom_t *) local_lqe, (atom_t *) enqueue );

  return;
}
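
For comparison, the same enqueue logic can be sketched with C11 `<stdatomic.h>`, where the ordering of each access is stated explicitly instead of being implied by the CAS instruction. This is not the code above: the `msq_*` names are hypothetical, the sketch uses plain single pointers with no ABA counter, and so it is only safe under the assumption that elements are never dequeued and reused while an enqueue is in flight. Treat it as a way to see where acquire and release ordering would sit, not as a replacement.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

struct msq_node
{
  _Atomic(struct msq_node *) next;
  void *user_data;
};

struct msq_queue
{
  _Atomic(struct msq_node *) head;   /* dequeue end (points at the dummy) */
  _Atomic(struct msq_node *) tail;   /* enqueue end, may lag by one node  */
};

static void msq_init(struct msq_queue *q, struct msq_node *dummy)
{
  atomic_init(&dummy->next, NULL);
  dummy->user_data = NULL;
  atomic_init(&q->head, dummy);
  atomic_init(&q->tail, dummy);
}

static void msq_enqueue(struct msq_queue *q, struct msq_node *n, void *user_data)
{
  /* Plain initialisation; it is published by the release CAS below. */
  n->user_data = user_data;
  atomic_store_explicit(&n->next, NULL, memory_order_relaxed);

  for (;;)
  {
    struct msq_node *tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    struct msq_node *next = atomic_load_explicit(&tail->next, memory_order_acquire);

    /* Re-read tail with an atomic load to check the snapshot is consistent. */
    if (tail != atomic_load_explicit(&q->tail, memory_order_acquire))
      continue;

    if (next == NULL)
    {
      /* Tail really is last: try to link the new node after it. */
      if (atomic_compare_exchange_strong_explicit(&tail->next, &next, n,
                                                  memory_order_release,
                                                  memory_order_relaxed))
      {
        /* Linked; swing tail forward. Failure is fine, someone helped. */
        atomic_compare_exchange_strong_explicit(&q->tail, &tail, n,
                                                memory_order_release,
                                                memory_order_relaxed);
        return;
      }
    }
    else
    {
      /* Tail is lagging: help move it forward, then retry. */
      atomic_compare_exchange_strong_explicit(&q->tail, &tail, next,
                                              memory_order_release,
                                              memory_order_relaxed);
    }
  }
}

/* Single-threaded smoke test: enqueue 1, 2, 3 and read them back in order. */
static int msq_demo(void)
{
  struct msq_queue q;
  struct msq_node dummy, n[3];
  int values[3] = { 1, 2, 3 };
  int digits = 0;

  msq_init(&q, &dummy);
  for (int i = 0; i < 3; i++)
    msq_enqueue(&q, &n[i], &values[i]);

  for (struct msq_node *p = atomic_load(&dummy.next); p != NULL; p = atomic_load(&p->next))
    digits = digits * 10 + *(int *) p->user_data;

  assert(atomic_load(&q.tail) == &n[2]);
  return digits;
}
```

The main difference from the code above is that every read of shared state is an atomic load with a stated ordering, so neither the compiler nor the reader has to guess which accesses are meant to synchronise.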

MSN*_*MSN 4

CAS is atomic, so if one thread succeeds while another is trying, the other thread will fail and try again.
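
A minimal sketch of that fail-and-retry behaviour, using C11 `<stdatomic.h>` rather than the `lfds_abstraction_*` wrappers from the question. The names `cas_increment` and `cas_demo_loser_fails` are made up for this example, and the two "threads" are simulated sequentially so that the outcome is deterministic.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Increment via a CAS retry loop; returns the value this call installed. */
static long cas_increment(_Atomic long *target)
{
  long observed = atomic_load(target);

  /* A failed CAS writes the current value into `observed`,
     so each retry works from fresh data. */
  while (!atomic_compare_exchange_weak(target, &observed, observed + 1))
    ;

  return observed + 1;
}

/* Two parties read the same value; only the first CAS can succeed.
   Returns 1 if the loser fails once and then succeeds on retry. */
static int cas_demo_loser_fails(void)
{
  _Atomic long value;
  atomic_init(&value, 10);

  long seen_by_a = atomic_load(&value);   /* both observe 10 */
  long seen_by_b = atomic_load(&value);

  bool a_won   = atomic_compare_exchange_strong(&value, &seen_by_a, seen_by_a + 1);
  bool b_won   = atomic_compare_exchange_strong(&value, &seen_by_b, seen_by_b + 1);

  /* b's failed CAS refreshed seen_by_b to 11, so the retry can succeed. */
  assert(!b_won && seen_by_b == 11);
  bool b_retry = atomic_compare_exchange_strong(&value, &seen_by_b, seen_by_b + 1);

  long after_loop = cas_increment(&value);   /* 12 -> 13 */

  return a_won && !b_won && b_retry && after_loop == 13 && atomic_load(&value) == 13;
}
```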

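This only works if all threads that access the same memory do so using the same mechanism, i.e. they all access it with CAS. If they don't, the guarantees associated with CAS (in this case, memory fences) go out the window.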
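
To make "the same mechanism" concrete, here is a small sketch in C11 where both the writer and the reader go through atomic operations on the shared pointer. The `mailbox_*` names and the `struct message` type are hypothetical, and the example uses an atomic store and load with release/acquire ordering rather than CAS, which is an assumption on my part about how the same rule carries over to C11 atomics.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct message
{
  int payload;
};

/* Writer: fill in the message with plain stores, then publish the pointer
   with a release store so the payload is visible to an acquiring reader. */
static void mailbox_publish(_Atomic(struct message *) *box, struct message *m, int payload)
{
  m->payload = payload;
  atomic_store_explicit(box, m, memory_order_release);
}

/* Reader: must also use an atomic (acquire) load of the pointer. Reading
   the pointer through a plain, non-atomic access would give up the
   ordering guarantee that the writer's release store was meant to provide. */
static bool mailbox_try_consume(_Atomic(struct message *) *box, int *out)
{
  struct message *m = atomic_load_explicit(box, memory_order_acquire);

  if (m == NULL)
    return false;

  *out = m->payload;
  return true;
}

/* Single-threaded walk-through; returns the consumed payload. */
static int mailbox_demo(void)
{
  _Atomic(struct message *) box;
  struct message m;
  int out = 0;

  atomic_init(&box, NULL);

  assert(!mailbox_try_consume(&box, &out));   /* nothing published yet */
  mailbox_publish(&box, &m, 42);
  assert(mailbox_try_consume(&box, &out));

  return out;
}
```

In the question's code, the equivalent concern is the plain reads of `lqs->enqueue` and `enqueue[POINTER]->next` at the top of the loop: they do not go through the same atomic mechanism as the CAS that writes those fields.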