如何在C中实现循环缓冲区?

pax*_*blo 66 c circular-buffer data-structures

我需要一个固定大小(在创建它时可在运行时选择,而不是编译时)循环缓冲区,它可以容纳任何类型的对象,并且它需要非常高的性能.我不认为会有资源争用问题,因为虽然它是在多任务嵌入式环境中,但它是一个合作的,所以任务本身可以管理它.

我最初的想法是在缓冲区中存储一个简单的结构,它包含类型(简单的枚举/定义)和一个指向有效负载的void指针,但我希望它尽可能快,所以我愿意接受绕过的建议堆.

实际上我很高兴绕过任何标准库的原始速度 - 从我所看到的代码来看,它没有针对CPU进行大量优化:看起来他们只是为了类似的东西编译C代码,strcpy()没有手工编码组装.

任何代码或想法将不胜感激.所需的操作是:

  • 创建具有特定大小的缓冲区.
  • 放在尾巴上.
  • 从头上得到.
  • 归还计数.
  • 删除缓冲区.

Ada*_*eld 73

最简单的解决方案是跟踪项目大小和项目数,然后创建适当字节数的缓冲区:

typedef struct circular_buffer
{
    void *buffer;     // data buffer
    void *buffer_end; // end of data buffer
    size_t capacity;  // maximum number of items in the buffer
    size_t count;     // number of items in the buffer
    size_t sz;        // size of each item in the buffer
    void *head;       // pointer to head
    void *tail;       // pointer to tail
} circular_buffer;

void cb_init(circular_buffer *cb, size_t capacity, size_t sz)
{
    cb->buffer = malloc(capacity * sz);
    if(cb->buffer == NULL)
        // handle error
    cb->buffer_end = (char *)cb->buffer + capacity * sz;
    cb->capacity = capacity;
    cb->count = 0;
    cb->sz = sz;
    cb->head = cb->buffer;
    cb->tail = cb->buffer;
}

void cb_free(circular_buffer *cb)
{
    free(cb->buffer);
    // clear out other fields too, just to be safe
}

void cb_push_back(circular_buffer *cb, const void *item)
{
    if(cb->count == cb->capacity){
        // handle error
    }
    memcpy(cb->head, item, cb->sz);
    cb->head = (char*)cb->head + cb->sz;
    if(cb->head == cb->buffer_end)
        cb->head = cb->buffer;
    cb->count++;
}

void cb_pop_front(circular_buffer *cb, void *item)
{
    if(cb->count == 0){
        // handle error
    }
    memcpy(item, cb->tail, cb->sz);
    cb->tail = (char*)cb->tail + cb->sz;
    if(cb->tail == cb->buffer_end)
        cb->tail = cb->buffer;
    cb->count--;
}
Run Code Online (Sandbox Code Playgroud)

  • 非常标准的解决方案 - 完全符合规范.OP包括他试图避免的内容.:P (5认同)

ale*_*xbw 15

// Note power of two buffer size
#define kNumPointsInMyBuffer 1024 

typedef struct _ringBuffer {
    UInt32 currentIndex;
    UInt32 sizeOfBuffer;
    double data[kNumPointsInMyBuffer];
} ringBuffer;

// Initialize the ring buffer
ringBuffer *myRingBuffer = (ringBuffer *)calloc(1, sizeof(ringBuffer));
myRingBuffer->sizeOfBuffer = kNumPointsInMyBuffer;
myRingBuffer->currentIndex = 0;

// A little function to write into the buffer
// N.B. First argument of writeIntoBuffer() just happens to have the
// same as the one calloc'ed above. It will only point to the same
// space in memory if the calloc'ed pointer is passed to
// writeIntoBuffer() as an arg when the function is called. Consider
// using another name for clarity
void writeIntoBuffer(ringBuffer *myRingBuffer, double *myData, int numsamples) {
    // -1 for our binary modulo in a moment
    int buffLen = myRingBuffer->sizeOfBuffer - 1;
    int lastWrittenSample = myRingBuffer->currentIndex;

    int idx;
    for (int i=0; i < numsamples; ++i) {
        // modulo will automagically wrap around our index
        idx = (i + lastWrittenSample) & buffLen; 
        myRingBuffer->data[idx] = myData[i];
    }

    // Update the current index of our ring buffer.
    myRingBuffer->currentIndex += numsamples;
    myRingBuffer->currentIndex &= myRingBuffer->sizeOfBuffer - 1;
}
Run Code Online (Sandbox Code Playgroud)

只要你的环形缓冲区的长度是2的幂,那么令人难以置信的快速二进制"&"操作将围绕你的索引.对于我的应用程序,我正在从麦克风获取的音频环形缓冲区向用户显示一段音频.

我总是确保屏幕上显示的最大音频量远小于环形缓冲区的大小.否则你可能正在从同一个块读取和写入.这可能会给你带来奇怪的显示效果.


小智 11

首先,标题.如果使用位整数来保存头部和尾部"指针",并且调整它们以使它们完全同步,则不需要模运算来包装缓冲区.IE:4096塞进一个12位无符号int本身就是0,无论如何都不受干扰.即使对于2的幂,消除模运算也会使速度加倍 - 几乎完全相同.

使用带有默认内联的Visual Studio 2010 C++编译器,以及为服务数据提供的1/8192,我的第3代i7 Dell XPS 8500上填充和耗尽任何类型数据元素的4096缓冲区需要花费52秒.

我在RX中重写了main()中的测试循环,所以它们不再控制流 - 这应该是由指示缓冲区已满或空的返回值控制的,并且是随之而来的; 声明.IE:填料和沥水器应该能够相互撞击而不会腐败或不稳定.在某些时候,我希望多线程这个代码,因此这种行为将是至关重要的.

QUEUE_DESC(队列描述符)和初始化函数强制此代码中的所有缓冲区都是2的幂.否则上述方案将不起作用.关于这个问题,请注意QUEUE_DESC不是硬编码的,它使用清单常量(#define BITS_ELE_KNT)来构造它.(我假设2的幂是足够的灵活性)

为了使缓冲区大小运行时可选,我尝试了不同的方法(这里没有显示),并使用USHRTs来解决Head,Tail,EleKnt能够管理FIFO缓冲区[USHRT]的问题.为了避免模运算,我使用Head,Tail为&&创建了一个掩码,但该掩码原来是(EleKnt -1),所以只需使用它.在安静的机器上使用USHRTS而不是bit ints可以提高性能~15%.英特尔CPU内核总是比它们的总线更快,因此在繁忙的共享机器上,打包数据结构可以让您在其他竞争线程之前加载和执行.权衡.

请注意,缓冲区的实际存储空间是使用calloc()在堆上分配的,并且指针位于结构的基础上,因此struct和指针的地址完全相同.IE浏览器; 不需要将偏移量添加到结构地址以绑定寄存器.

同样,服务于缓冲区的所有变量都与缓冲区物理上相邻,绑定到同一个结构中,因此编译器可以制作漂亮的汇编语言.你必须杀死内联优化才能看到任何程序集,否则它会被摧毁掉.

为了支持任何数据类型的多态性,我使用了memcpy()而不是赋值.如果您只需要灵活地为每个编译支持一个随机变量类型,那么此代码可以完美地运行.

对于多态性,您只需要知道类型及其存储要求.DATA_DESC描述符数组提供了一种跟踪放入QUEUE_DESC.pB​​uffer的每个数据的方法,以便可以正确检索它.我只是分配足够的pBuffer内存来保存最大数据类型的所有元素,但是跟踪给定数据在DATA_DESC.dBytes中实际使用的存储量.另一种方法是重新发明堆管理器.

这意味着QUEUE_DESC的UCHAR*pBuffer将有一个并行伴随数组来跟踪数据类型和大小,而pBuffer中的数据存储位置将保持原样.新成员可能是DATA_DESC*pDataDesc,或者也许是DATA_DESC DataDesc [2 ^ BITS_ELE_KNT],如果你能找到一种方法来通过这样的前向引用来击败你的编译器.在这些情况下,Calloc()总是更灵活.

您仍然在Q_Put(),Q_Get中使用memcpy(),但实际复制的字节数将由DATA_DESC.dBytes确定,而不是QUEUE_DESC.EleBytes.对于任何给定的put或get,元素可能是所有不同类型/大小的.

我相信这段代码满足速度和缓冲区大小的要求,可以满足6种不同数据类型的要求.我已经以printf()语句的形式留下了许多测试装置,因此您可以满足(或不)您的代码正常工作.随机数生成器演示该代码适用于任何随机头/尾组合.

enter code here
// Queue_Small.cpp : Defines the entry point for the console application.
//
#include "stdafx.h"
#include <stdio.h>
#include <time.h>
#include <limits.h>
#include <stdlib.h>
#include <malloc.h>
#include <memory.h>
#include <math.h>

#define UCHAR unsigned char
#define ULONG unsigned long
#define USHRT unsigned short
#define dbl   double
/* Queue structure */
#define QUEUE_FULL_FLAG 1
#define QUEUE_EMPTY_FLAG -1
#define QUEUE_OK 0
//  
#define BITS_ELE_KNT    12  //12 bits will create 4.096 elements numbered 0-4095
//
//typedef struct    {
//  USHRT dBytes:8;     //amount of QUEUE_DESC.EleBytes storage used by datatype
//  USHRT dType :3; //supports 8 possible data types (0-7)
//  USHRT dFoo  :5; //unused bits of the unsigned short host's storage
// }    DATA_DESC;
//  This descriptor gives a home to all the housekeeping variables
typedef struct  {
    UCHAR   *pBuffer;   //  pointer to storage, 16 to 4096 elements
    ULONG Tail  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG Head  :BITS_ELE_KNT;  //  # elements, with range of 0-4095
    ULONG EleBytes  :8;     //  sizeof(elements) with range of 0-256 bytes
    // some unused bits will be left over if BITS_ELE_KNT < 12
    USHRT EleKnt    :BITS_ELE_KNT +1;// 1 extra bit for # elements (1-4096)
    //USHRT Flags   :(8*sizeof(USHRT) - BITS_ELE_KNT +1);   //  flags you can use
    USHRT   IsFull  :1;     // queue is full
    USHRT   IsEmpty :1;     // queue is empty
    USHRT   Unused  :1;     // 16th bit of USHRT
}   QUEUE_DESC;

//  ---------------------------------------------------------------------------
//  Function prototypes
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz);
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew);
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q);
//  ---------------------------------------------------------------------------
QUEUE_DESC *Q_Init(QUEUE_DESC *Q, int BitsForEleKnt, int DataTypeSz)    {
    memset((void *)Q, 0, sizeof(QUEUE_DESC));//init flags and bit integers to zero
    //select buffer size from powers of 2 to receive modulo 
    //                arithmetic benefit of bit uints overflowing
    Q->EleKnt   =   (USHRT)pow(2.0, BitsForEleKnt);
    Q->EleBytes =   DataTypeSz; // how much storage for each element?
    //  Randomly generated head, tail a test fixture only. 
    //      Demonstrates that the queue can be entered at a random point 
    //      and still perform properly. Normally zero
    srand(unsigned(time(NULL)));    // seed random number generator with current time
    Q->Head = Q->Tail = rand(); // supposed to be set to zero here, or by memset
    Q->Head = Q->Tail = 0;
    //  allocate queue's storage
    if(NULL == (Q->pBuffer = (UCHAR *)calloc(Q->EleKnt, Q->EleBytes)))  {
        return NULL;
    }   else    {
        return Q;
    }
}
//  ---------------------------------------------------------------------------
int Q_Put(QUEUE_DESC *Q, UCHAR *pNew)   
{
    memcpy(Q->pBuffer + (Q->Tail * Q->EleBytes), pNew, Q->EleBytes);
    if(Q->Tail == (Q->Head + Q->EleKnt)) {
        //  Q->IsFull = 1;
        Q->Tail += 1;   
        return QUEUE_FULL_FLAG; //  queue is full
    }
    Q->Tail += 1;   //  the unsigned bit int MUST wrap around, just like modulo
    return QUEUE_OK; // No errors
}
//  ---------------------------------------------------------------------------
int Q_Get(UCHAR *pOld, QUEUE_DESC *Q)   
{
    memcpy(pOld, Q->pBuffer + (Q->Head * Q->EleBytes), Q->EleBytes);
    Q->Head += 1;   //  the bit int MUST wrap around, just like modulo

    if(Q->Head == Q->Tail)      {
        //  Q->IsEmpty = 1;
        return QUEUE_EMPTY_FLAG; // queue Empty - nothing to get
    }
    return QUEUE_OK; // No errors
}
//
//  ---------------------------------------------------------------------------
int _tmain(int argc, _TCHAR* argv[])    {
//  constrain buffer size to some power of 2 to force faux modulo arithmetic
    int LoopKnt = 1000000;  //  for benchmarking purposes only
    int k, i=0, Qview=0;
    time_t start;
    QUEUE_DESC Queue, *Q;
    if(NULL == (Q = Q_Init(&Queue, BITS_ELE_KNT, sizeof(int)))) {
        printf("\nProgram failed to initialize. Aborting.\n\n");
        return 0;
    }

    start = clock();
    for(k=0; k<LoopKnt; k++)    {
        //printf("\n\n Fill'er up please...\n");
        //Q->Head = Q->Tail = rand();
        for(i=1; i<= Q->EleKnt; i++)    {
            Qview = i*i;
            if(QUEUE_FULL_FLAG == Q_Put(Q, (UCHAR *)&Qview))    {
                //printf("\nQueue is full at %i \n", i);
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //  Get data from queue until completely drained (empty)
        //
        //printf("\n\n Step into the lab, and see what's on the slab... \n");
        Qview = 0;
        for(i=1; i; i++)    {
            if(QUEUE_EMPTY_FLAG == Q_Get((UCHAR *)&Qview, Q))   {
                //printf("\nQueue value of %i should be %i squared", Qview, i);
                //printf("\nQueue is empty at %i", i);
                break;
            }
            //printf("\nQueue value of %i should be %i squared", Qview, i);
        }
        //printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    }
    printf("\nQueue time was %5.3f to fill & drain %i element queue  %i times \n", 
                     (dbl)(clock()-start)/(dbl)CLOCKS_PER_SEC,Q->EleKnt, LoopKnt);
    printf("\nQueue head value is %i, tail is %i\n", Q->Head, Q->Tail);
    getchar();
    return 0;
}
Run Code Online (Sandbox Code Playgroud)

  • 您的代码因各种原因而中断.首先,你的`Q_Put`方法中的比较`Q-> Tail ==(Q-> Head + Q-> EleKnt)`将永远不会返回true,因为`Q-> Head + Q-> EleKnt`不是模数另外,这意味着你只需在下次写入时覆盖头部.如果`EleKnt`是'4096`,这是你的`Tail`永远不会达到的值.接下来,使用它作为生产者/消费者队列会造成严重破坏,因为你的`Q_Put`"首先写入,稍后提出问题"并且即使在意识到队列已满之后也更新`Tail`.接下来调用`Q_Put`将简单地覆盖头部,就像从未发生任何事情一样. (5认同)
  • @RocketRoy:我刚刚做了,Visual Studio 2012.在这里,你也可以[在线检查结果](http://ideone.com/doPsaB)(标准输出在底部),这样我们就可以确定看着相同的代码.测试程序的输出位于页面底部,并确认我写的内容:只要您不尝试填写它,它就"完美地运行".这就是为什么它作为FIFO缓冲区无用的原因.:) (5认同)
  • 具有讽刺意味的是,我通过你对[这个其他答案](http://stackoverflow.com/a/215575/69809)的评论得到了这篇文章,你声称@AdamDavis的片段不起作用,它实际上是其他方式.另请注意,一旦Adam检测到它已满,就会从"Put"返回,而您仍然会复制数据然后进行检查. (5认同)
  • @RocketRoy:所以你实际上是在告诉我你仍然不同意你的代码被破坏了?是的,我很确定你的代码不会"移动间隙",因为它只是完美地工作并覆盖头部而无法检测何时满.我希望你不要在任何生命关键系统中使用它,你可能会遇到严重的麻烦. (4认同)
  • 您应该分析维基百科页面上针对[循环缓冲区](http://en.wikipedia.org/wiki/Circular_buffer)提供的各种算法,即[全缓存或空缓冲区分](http://en.wikipedia) .org/wiki/Circular_buffer#Full_.2F_Empty_Buffer_Distinction)问题.使用当前的方法,您需要在缓冲区中保留一个元素,以了解full和empty之间的区别. (2认同)

dew*_*ell 9

您可以枚举编码缓冲区时所需的类型,还是需要能够通过动态调用在运行时添加类型?如果是前者,那么我将创建缓冲区作为n个结构的堆分配数组,其中每个结构由两个元素组成:标识数据类型的枚举标记和所有数据类型的并集.你在小元素的额外存储方面失去了什么,你可以弥补不必处理分配/释放以及由此产生的内存碎片.然后,您只需要跟踪定义缓冲区的head和tail元素的开始和结束索引,并确保在递增/递减索引时计算mod n.


Sol*_*lot 8

这是C中的一个简单解决方案.假设每个功能都关闭中断.没有多态性和东西,只是常识.


#define BUFSIZE 128
char buf[BUFSIZE];
char *pIn, *pOut, *pEnd;
char full;

// init
void buf_init()
{
    pIn = pOut = buf;       // init to any slot in buffer
    pEnd = &buf[BUFSIZE];   // past last valid slot in buffer
    full = 0;               // buffer is empty
}

// add char 'c' to buffer
int buf_put(char c)
{
    if (pIn == pOut  &&  full)
        return 0;           // buffer overrun

    *pIn++ = c;             // insert c into buffer
    if (pIn >= pEnd)        // end of circular buffer?
        pIn = buf;          // wrap around

    if (pIn == pOut)        // did we run into the output ptr?
        full = 1;           // can't add any more data into buffer
    return 1;               // all OK
}

// get a char from circular buffer
int buf_get(char *pc)
{
    if (pIn == pOut  &&  !full)
        return 0;           // buffer empty  FAIL

    *pc = *pOut++;              // pick up next char to be returned
    if (pOut >= pEnd)       // end of circular buffer?
        pOut = buf;         // wrap around

    full = 0;               // there is at least 1 slot
    return 1;               // *pc has the data to be returned
}
Run Code Online (Sandbox Code Playgroud)