消息队列实现(无锁队列的几种实现及其性能对比)

管理员 2024-11-25 11:34:35 0
一、无锁队列用在什么样的场景?

当需要处理的数据非常多,比如行情数据,一秒处理非常多的数据的时候,可以考虑用无锁队列。但是如果一秒只需要处理几百或者几千的数据,是没有必要考虑用无锁队列的。用互斥锁就能解决问题,数据量相对少的时候互斥锁与无锁队列之间差别并不是很明显。

二、为什么要用无锁队列?

有锁队列会有哪些问题?

1、Cache的损坏,在线程间频繁切换的时候会导致Cache****中数据的丢失;

CPU的运行速度比主存快N倍,所以大量的处理器时间被浪费在处理器与主存的数据传输上,这就是在处理器与主存之间引入Cache的原因。Cache是一种速度更快但容量更小的内存,当处理器要访问主存中的数据时,这些数据首先要被拷贝到Cache中,因为这些数据在不久的将来可能又会被处理器访问。Cache misses对性能有非常大的影响,因为处理器访问Cache中的数据将比直接访问主存快得多。

线程被频繁抢占产生的Cache损坏将导致应用程序性能下降。

2、在同步机制上争抢队列;

CPU会将大量的时间浪费在保护队列数据的互斥锁,而不是处理队列中的数据。

然后非阻塞的机制使用了CAS的特殊操作,使得任务之间可以不争抢任何资源,然后在队列中预定的位置上,插入或者提取数据。

3、多线程动态内存分配性能下降;

多线程同时分配内存时,会涉及到线程分配同一块相同地址内存的问题,这个时候会用锁来进行同步。显然频繁分配内存会导致应用程序性能下降。

三、无锁队列的实现3.1 一读一写的无锁队列

yqueue是用来设计队列,ypipe用来设计队列的写入时机、回滚以及flush

首先我们来看yqueue的设计

3.1.1 yqueue——无锁队列

1、内存分配

首先我们需要考虑队列的内存分配,yqueue中的数据结构使用的chunk块机制,每次批量分配一批元素,这样可以减少内存的分配和释放

template<typename T, int N> // 链表结点称之为chunk_t struct chunk_t { T values[N]; //每个chunk_t可以容纳N个T类型的元素,以后就以一个chunk_t为单位申请内存 chunk_t *prev; chunk_t *next; };无锁队列的几种实现及其性能对比

当队列不足的时候每次分配一个chunk_t,每个chunk_t能存储N个元素。

93 // Adds an element to the back end of the queue.94 inline void push()95 {96 back_chunk = end_chunk;97 back_pos = end_pos; //99 if (++end_pos != N) //end_pos!=N表明这个chunk节点还没有满100 return;101102 chunk_t *sc = spare_chunk.xchg(NULL); // 为什么设置为NULL? 因为如果把之前值取出来了则没有spare chunk了,所以设置为NULL103 if (sc) // 如果有spare chunk则继续复用它104 {105 end_chunk->next = sc;106 sc->prev = end_chunk;107 }108 else // 没有则重新分配109 {110 // static int s_cout = 0;111 // printf("s_cout:%d\n", ++s_cout);112 end_chunk->next = (chunk_t *)malloc(sizeof(chunk_t)); // 分配一个chunk113 alloc_assert(end_chunk->next);114 end_chunk->next->prev = end_chunk;115 }116 end_chunk = end_chunk->next;117 end_pos = 0;118 }

可以看到112行,在要push一个元素的时候,首先看最后一个chunk,也就是back_chunk的back_pos是不是该chunk的最后一个元素,如果是,则重新分配一个chunk,将这个chunk加到chunk链表的下一个节点。

无锁队列的几种实现及其性能对比

这个逻辑相对来说还是比较简单的。唯一需要关注的,就是

102 chunk_t *sc = spare_chunk.xchg(NULL);

这一行,这个spare_chunk是怎么来的?

154 // Removes an element from the front end of the queue.155 inline void pop()156 {157 if (++begin_pos == N) // 删除满一个chunk才回收chunk158 {159 chunk_t *o = begin_chunk;160 begin_chunk = begin_chunk->next;161 begin_chunk->prev = NULL;162 begin_pos = 0;163164 // 'o' has been more recently used than spare_chunk,165 // so for cache reasons we'll get rid of the spare and166 // use 'o' as the spare.167 chunk_t *cs = spare_chunk.xchg(o); //由于局部性原理,总是保存最新的空闲块而释放先前的空闲快168 free(cs);169 }170 }

当pop的时候,如果删除一个chunk里面没有元素了,这个时候会需要将这个chunk所开辟的空间释放掉,但是这里使用了一个技巧即:将这个chunk先不释放,先放到spare_chunk里面,等到下次需要开辟新的空间的时候再把这个spare_chunk拿来用。

我们再来看ypipe

3.1.2 ypipe——yqueue的封装

yqueue负责元素内存的分配与释放,入队以及出队列;ypipe负责yqueue读写指针的变化。

ypipe是在yqueue_t的基础上再构建一个单读单写的无锁队列

这里有三个指针:

T* w:指向第一个未刷新的元素,只被写线程使用T* r:指向第一个没有被预提取的元素,只被读线程使用T*f:指向下一轮要被刷新的一批元素的第一个

ypipe的定义:

37 // Initialises the pipe. 38 inline ypipe_t() 49 // The destructor doesn't have to be virtual. It is mad virtual 50 // just to keep ICC and code checking tools from complaining. 51 inline virtual ~ypipe_t() 52 { 53 } 67 // 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。 68 inline void write(const T &value_, bool incomplete_); 92 inline bool unwrite(T *value_);104 // 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。105 // 批量刷新的机制, 写入批量后唤醒读线程;106 // 反悔机制 unwrite107 inline bool flush(); 136 // Check whether item is available for reading.137 // 这里面有两个点,一个是检查是否有数据可读,一个是预取138 inline bool check_read();163 // Reads an item from the pipe. Returns false if there is no value.164 // available.165 inline bool read(T *value_)178 // Applies the function fn to the first elemenent in the pipe179 // and returns the value returned by the fn.180 // The pipe mustn't be empty or the function crashes.181 inline bool probe(bool (*fn)(T &))189 protected:190 // Allocation-efficient queue to store pipe items.191 // Front of the queue points to the first prefetched item, back of192 // the pipe points to last un-flushed item. Front is used only by193 // reader thread, while back is used only by writer thread.194 yqueue_t<T, N> queue;195 196 // Points to the first un-flushed item. This variable is used197 // exclusively by writer thread.198 T *w; //指向第一个未刷新的元素,只被写线程使用199 200 // Points to the first un-prefetched item. This variable is used201 // exclusively by reader thread.202 T *r; //指向第一个还没预提取的元素,只被读线程使用203 204 // Points to the first item to be flushed in the future.205 T *f; //指向下一轮要被刷新的一批元素中的第一个206 207 // The single point of contention between writer and reader thread.208 // Points past the last flushed item. If it is NULL,209 // reader is asleep. This pointer should be always accessed using210 // atomic operations.211 atomic_ptr_t<T> c; //读写线程共享的指针,指向每一轮刷新的起点(看代码的时候会详细说)。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)212 213 // Disable copying of ypipe object.214 ypipe_t(const ypipe_t &);215 const ypipe_t &operator=(const ypipe_t &);

3.1.3 ypipe设计的目的

为了批量读写,即用户可以自主的决定写了多少数据之后开启读

那因为有了生产者和消费者,就会涉及到同步的问题,ypipe这里测试发现,用锁和条件变量性能最佳。

我们来分两种情况看一下读写的具体步骤。第一种情况:批量写,第一轮写

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

在这个时候才能开始读数据

无锁队列的几种实现及其性能对比

第二种方式:条件变量+互斥锁

无锁队列的几种实现及其性能对比

无锁队列的几种实现及其性能对比

flush函数

101 // Flush all the completed items into the pipe. Returns false if102 // the reader thread is sleeping. In that case, caller is obliged to103 // wake the reader up before using the pipe again.104 // 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程。105 // 批量刷新的机制, 写入批量后唤醒读线程;106 // 反悔机制 unwrite107 inline bool flush()108 {109 // If there are no un-flushed items, do nothing.110 if (w == f) // 不需要刷新,即是还没有新元素加入111 return true;112 113 // Try to set 'c' to 'f'.114 // read时如果没有数据可以读取则c的值会被置为NULL115 if (c.cas(w, f) != w) // 尝试将c设置为f,即是准备更新w的位置116 {117 118 // Compare-and-swap was unseccessful because 'c' is NULL.119 // This means that the reader is asleep. Therefore we don't120 // care about thread-safeness and update c in non-atomic121 // manner. We'll return false to let the caller know122 // that reader is sleeping.123 c.set(f); // 更新为新的f位置124 w = f;125 return false; //线程看到flush返回false之后会发送一个消息给读线程,这需要写业务去做处理126 }127 else // 读端还有数据可读取128 {129 // Reader is alive. Nothing special to do now. Just move130 // the 'first un-flushed item' pointer to 'f'.131 w = f; // 更新f的位置132 return true;133 }134 }

flush的目的就是将改变w的值,同时改变c的值

这里有两种情况:

1、c的值与w的值相等

说明队列的w值没有更新,不对队列的数据进行读取

无锁队列的几种实现及其性能对比

这发生在flush第一次发生的时候以及w的值还未更新时,此时返回true,表示队列不可读。

2、c的值与w的值不相等

无锁队列的几种实现及其性能对比

这发生在c在w位置后面,此时更新c与w的值,并返回false,表示队列可读

write函数

write函数相对简单

64 // Write an item to the pipe. Don't flush it yet. If incomplete is 65 // set to true the item is assumed to be continued by items 66 // subsequently written to the pipe. Incomplete items are neverflushed down the stream. 67 // 写入数据,incomplete参数表示写入是否还没完成,在没完成的时候不会修改flush指针,即这部分数据不会让读线程看到。 68 inline void write(const T &value_, bool incomplete_) 69 { 70 // Place the value to the queue, add new terminator element. 71 queue.back() = value_; 72 queue.push(); 73 74 // Move the "flush up to here" poiter. 75 if (!incomplete_) 76 { 77 f = &queue.back(); // 记录要刷新的位置 78 // printf("1 f:%p, w:%p\n", f, w); 79 } 80 else 81 { 82 // printf("0 f:%p, w:%p\n", f, w); 83 } 84 }

write只更新f的位置。write并不能决定该队列是否能读,因为write并不能改变w指针,如果要队列能读,需要w指针改变位置才行。

从write和flush可以看出,在更新w和f的时候并没有互斥的保护,所以该无锁队列的设计并不适合多线程场景。

read函数

138 inline bool check_read()139 {140 // Was the value prefetched already? If so, return.141 if (&queue.front() != r && r) //判断是否在前几次调用read函数时已经预取数据了return true;142 return true;143 144 // There's no prefetched value, so let us prefetch more values.145 // Prefetching is to simply retrieve the146 // pointer from c in atomic fashion. If there are no147 // items to prefetch, set c to NULL (using compare-and-swap).148 // 两种情况149 // 1. 如果c值和queue.front(), 返回c值并将c值置为NULL,此时没有数据可读150 // 2. 如果c值和queue.front(), 返回c值,此时可能有数据度的去151 r = c.cas(&queue.front(), NULL); //尝试预取数据152 153 // If there are no elements prefetched, exit.154 // During pipe's lifetime r should never be NULL, however,155 // it can happen during pipe shutdown when items are being deallocated.156 if (&queue.front() == r || !r) //判断是否成功预取数据157 return false;158 159 // There was at least one value prefetched.160 return true;161 }162 163 // Reads an item from the pipe. Returns false if there is no value.164 // available.165 inline bool read(T *value_)166 {167 // Try to prefetch a value.168 if (!check_read())169 return false;170 171 // There was at least one value prefetched.172 // Return it to the caller.173 *value_ = queue.front();174 queue.pop();175 return true;176 }

这里也是有两种情况:

1、r不为空且r不等于&queue.front()

说明此时队列中有可读数据,直接读取即可

2、r指针指向队头元素(r==&queue.front())或者r为空

说明队列中并没有可读的数据,此时将r指针更新成c的值,这个过程我们叫做预取。预取的指令就是:

r=c;

c在flush的时候会被设置为w。而w与&queue.front()之间都是有距离的。这一段距离中间的数据就是预取数据,所以每次read都能取出一段数据。

无锁队列的几种实现及其性能对比

当&queue.front() == c时,代表数据被取完了,这时把c指向NULL,接着读线程会睡眠,这也是给写线程检查读线程是否睡眠的标志。

我们可以测试一下结果,对一个数据加200万次,分别用环形数组、链表、互斥锁、ypipe队列分别是什么样的性能

无锁队列的几种实现及其性能对比

通过测试发现在一读一写的情况下,ypipe的优势是非常大的。

那多读多写的场景呢?

四、多读多写的无锁队列实现

上面我们介绍的是一读一写的场景,用ypipe的方式会性能比较快,但是ypipe不适用于多读多写的场景,因为在读的时候是没有对r指针加锁,在写的时候也没有对w指针加锁。

多读多写的线程安全队列有以下几种实现方式:

1、互斥锁

2、互斥锁+条件变量:BlockQueue

3、内存屏障:SimpleLockFreeQueue

4、CAS原子操作:ArrayLockFreeQueue(也可以理解成RingBuffer)

其中互斥锁的性能是几种方式里面性能最低的,没什么讲的必要,这里就不对比这种实现方式了。

4.1 RingBuffer(ArrayLockFreeQueue)

下面我们来看基于循环数组的无锁队列,也就是RingBuffer如何解决多线程竞争的问题。

首先看下RingBuffer的数据结构如下:

14 template <typename ELEM_T, QUEUE_INT Q_SIZE = ARRAY_LOCK_FREE_Q_DEFAULT_SIZE> 15 class ArrayLockFreeQueue 16 { 17 public: 18 19 ArrayLockFreeQueue(); 20 virtual ~ArrayLockFreeQueue(); 21 22 QUEUE_INT size(); 23 24 bool enqueue(const ELEM_T &a_data);//入队列 25 26 bool dequeue(ELEM_T &a_data);//出队列 27 28 bool try_dequeue(ELEM_T &a_data); 29 30 private: 31 32 ELEM_T m_thequeue[Q_SIZE]; 33 34 volatile QUEUE_INT m_count; 35 volatile QUEUE_INT m_writeIndex; 36 37 volatile QUEUE_INT m_readIndex; 38 39 volatile QUEUE_INT m_maximumReadIndex; 40 41 inline QUEUE_INT countToIndex(QUEUE_INT a_count); 42 };

m_count: // 队列的元素个数

我们先来看三种不同的下标:

m_writeIndex: // 新元素入队列时存放位置在数组中的下标m_readIndex: // 下一个出列的元素在数组中的下标m_maximumReadIndex: // 这个值非常关键,表示最后一个已经完成入队列操作的元素在数组中的下标。如果它的值跟m_writeIndex不一致,表明有写请求尚未完成。这意味着,有写请求成功申请了空间但数据还没完全写进队列。所以如果有线程要读取,必须要等到写线程将数据完全写入到队列之后。

以上三种不同的下标都是必须的,因为队列允许任意数量的生产者和消费者围绕着它工作。已经存在一种基于循环数组的无锁队列,使得唯一的生产者和唯一的消费者可以良好的工作。它的实现相当简洁非常值得阅读。该程序使用gcc内置的__sync_bool_compare_and_swap,但重新做了宏定义封装。

#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)无锁队列的几种实现及其性能对比

队列已满判断:

(m_writeIndex+1) % Q_SIZE == m_readIndex

对应代码:

countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)

队列为空判断:

m_readIndex == m_maximumReadIndex

该RingBuffer的重点主要是以下四个方面的问题:

1、多线程写入的时候,m_writeIndex如何更新?

2、m_maximumReadIndex这个变量为什么会需要?它有什么作用?

3、多线程读的恶时候,m_readIndex如何更新?

4、m_maximumReadIndex在什么时候改变?

** **

4.2 enqueue入队列42 template <typename ELEM_T, QUEUE_INT Q_SIZE> 43 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::enqueue(const ELEM_T &a_data) 44 { 45 QUEUE_INT currentWriteIndex; // 获取写指针的位置 46 QUEUE_INT currentReadIndex; 47 // 1. 获取可写入的位置 48 do 49 { 50 currentWriteIndex = m_writeIndex; 51 currentReadIndex = m_readIndex; 52 if(countToIndex(currentWriteIndex + 1) == 53 countToIndex(currentReadIndex)) 54 { 55 return false; // 队列已经满了 56 } 57 // 目的是为了获取一个能写入的位置 58 } while(!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex+1))); 59 // 获取写入位置后 currentWriteIndex 是一个临时变量,保存我们写入的位置 60 // We know now that this index is reserved for us. Use it to save the data 61 m_thequeue[countToIndex(currentWriteIndex)] = a_data; // 把数据更新到对应的位置 62 63 // 2. 更新可读的位置,按着m_maximumReadIndex+1的操作 64 // update the maximum read index after saving the data. It wouldn't fail if there is only one thread 65 // inserting in the queue. It might fail if there are more than 1 producer threads because this 66 // operation has to be done in the same order as the previous CAS 67 while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) 68 { 69 // this is a good place to yield the thread in case there are more 70 // software threads than hardware processors and you have more 71 // than 1 producer thread 72 // have a look at sched_yield (POSIX.1b) 73 sched_yield(); // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。 74 } 75 76 AtomicAdd(&m_count, 1); 77 78 return true; 79 80 }

图示(非常重要):

以下插图展示了对队列执行操作时各个下标时如何变化的。如果一个位置被标记为X,表示这个位置里面存放了数据。空白表示位置是空的。对于下图的情况,队列中存放了两个元素。WriteIndex指示的位置是新元素将会被插入的位置。ReadIndex指向的位置中的元素将会在下一次pop操作中被弹出。

无锁队列的几种实现及其性能对比

当生产者准备将数据插入到队列中时,它首先通过增加WriteIndex的值来申请空间。MaximumReadIndex指向最后一个存放有效数据的位置(也就是实际的读的队列尾)。

无锁队列的几种实现及其性能对比

一旦空间的申请完成,生产者就可以将数据拷贝到刚刚申请的位置中。完成之后增加MaximumReadIndex使得它与WriteIndex一致。

无锁队列的几种实现及其性能对比

现在队列中有3个元素,接着又有一个生产者尝试向队列中插入元素。

无锁队列的几种实现及其性能对比

在第一个生产者完成数据拷贝之前,又有另外一个生产者申请了一个新的空间准备拷贝元素。现在有两个生产者同时向队列插入数据。

无锁队列的几种实现及其性能对比

现在生产者开始拷贝数据,在完成拷贝之后,对MaximumReadIndex的递增操作必须严格遵循一个顺序:第一个生产者线程首先递增MaximumReadIndex,接着才轮到第二个生产者。这个顺序必须被严格遵守的原因是,我们必须保证数据被完全拷贝到队列之后才允许消费者线程将其出列。

while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)){sched_yield(); // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。}无锁队列的几种实现及其性能对比

第一个生产者完成了数据拷贝,并对MaximumReadIndex完成了递增,现在第二个生产者可以递增MaximumReadIndex了。

无锁队列的几种实现及其性能对比

第二个生产者完成了对MaximumReadIndex的递增,现在队列中有5个元素。

4.3 dequeue出队列88 template <typename ELEM_T, QUEUE_INT Q_SIZE> 89 bool ArrayLockFreeQueue<ELEM_T, Q_SIZE>::dequeue(ELEM_T &a_data) 90 { 91 QUEUE_INT currentMaximumReadIndex; 92 QUEUE_INT currentReadIndex; 93 94 do 95 { 96 // to ensure thread-safety when there is more than 1 producer thread 97 // a second index is defined (m_maximumReadIndex) 98 currentReadIndex = m_readIndex; 99 currentMaximumReadIndex = m_maximumReadIndex;100 101 if(countToIndex(currentReadIndex) ==102 countToIndex(currentMaximumReadIndex)) // 如果不为空,获取到读索引的位置103 {104 // the queue is empty or105 // a producer thread has allocate space in the queue but is 106 // waiting to commit the data into it107 return false;108 }109 // retrieve the data from the queue110 a_data = m_thequeue[countToIndex(currentReadIndex)]; // 从临时位置读取的111 112 // try to perfrom now the CAS operation on the read index. If we succeed113 // a_data already contains what m_readIndex pointed to before we 114 // increased it115 if(CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))116 {117 AtomicSub(&m_count, 1); // 真正读取到了数据,元素-1118 return true;119 }120 } while(true);121 122 assert(0);123 // Add this return statement to avoid compiler warnings124 return false;125 126 }

以下插入展示了元素出列的时候各种下标是如何变化的,队列中初始有2个元素。WriteIndex指示的位置是新元素将会被插入的位置。ReadIndex指向的位置中的元素将会在下一次pop操作中被弹出。

无锁队列的几种实现及其性能对比

消费者线程拷贝数组ReadIndex位置的元素,然后尝试CAS操作将ReadIndex加1.如果操作成功消费者成功地将数据出列。因为CAS操作是原子的,所以只有唯一的线程可以在同一时刻更新ReadIndex的值。

如果操作失败,读取新的ReadIndex的值,重复以上操作(copy数据,CAS)。

无锁队列的几种实现及其性能对比

现在又有一个消费者将元素出列,队列变成空。

无锁队列的几种实现及其性能对比

现在有一个生产者正在向队列中添加元素。它已经成功的申请了空间,但尚未完成数据拷贝。任何其他企图从队列中移除元素的消费者都会发现队列非空(因为writeIndex不等于readIndex)。但它不能读取readIndex所指向位置中的数据,因为readIndex与MaximumReadIndex相等。这个时候读数据失败,需要等到生产者完成数据拷贝增加MaximumReadIndex的值才可以读。

当生产者完成数据拷贝,队列的大小是1,消费者线程就可以读取这个数据了。

4.4 yielding处理器的必要性 67 while(!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) 68 { 69 // this is a good place to yield the thread in case there are more 70 // software threads than hardware processors and you have more 71 // than 1 producer thread 72 // have a look at sched_yield (POSIX.1b) 73 sched_yield(); // 当线程超过cpu核数的时候如果不让出cpu导致一直循环在此。 74 }

在enqueue的第二个CAS里面有一个sched_yield()来主动让出处理器的操作,对于一个声称无锁的算法而言,这个调用看起来有点儿奇怪。多线程环境下影响性能的其中一个因素就是Cache损坏。而产生Cache损坏的一种情况就是一个线程被抢占,操作系统需要保存被抢占线程的上下文,然后被选中作为下一个调度线程的上下文载入。此时Cache中缓存的数据都会失效,因为它是被抢占线程的数据而不是新线程的数据。

无锁算法和通过阻塞机制同步的算法的一个主要区别在于无锁算法不会阻塞在线程同步上。那这里的让出CPU,与阻塞在线程同步上有啥区别?为什么不直接自旋?

首先说下sched_yield的必要性:sched_yield的调用与有多少个生产者线程在并发地往队列中存放数据有关:每个生产者线程所执行的CAS操作都必须严格遵循FIFO次序,一个用于申请空间,另一个用于通知消费者数据已经写入完成可以被读取了.如果我们的应用程序只有唯一的生产者这个操作队列,sched_yield将永远没有机会被调用,因为enqueue的第二个CAS操作永远不会失败。因为一个生产者的情况下没人能破坏生产者执行这两个CAS操作的FIFO顺序。

而对于多个生产者线程往队列中存放数据的时候,问题就出现了。概括来说,一个生产者通过第1个CAS操作申请空间,然后将数据写入到申请到的空间中,然后执行第2个CAS操作通知消费者数据准备完毕可供读取了.这第2个CAS操作必须遵循FIFO顺序,也就是说,如果A线程第首先执行完第一个CAS操作,那么它也要第1个执行完第2个CAS操作,如果A线程在执行完第一个CAS操作之后停止,然后B线程执行完第1个CAS操作,那么B线程将无法完成第2个CAS操作,因为它要等待A先完成第2个CAS操作.而这就是问题产生的根源.让我们考虑如下场景,3个消费者线程和1个消费者线程:

线程1,2,3按顺序调用第1个CAS操作申请了空间.那么它们完成第2个CAS操作的顺序也应该与这个顺序一致,1,2,3.线程2首先尝试执行第2个CAS,但它会失败,因为线程1还没完成它的第2此CAS操作呢.同样对于线程3也是一样的.线程2和3将会不断的调用它们的第2个CAS操作,直到线程1完成它的第2个CAS操作为止.线程1最终完成了它的第2个CAS,现在线程3必须等线程2先完成它的第2个CAS.线程2也完成了,最终线程3也完成.

在上面的场景中,生产者可能会在第2个CAS操作上自旋一段时间,用于等待先于它执行第1个CAS操作的线程完成它的第2次CAS操作.在一个物理处理器数量大于操作队列线程数量的系统上,这不会有太严重的问题:因为每个线程都可以分配在自己的处理器上执行,它们最终都会很快完成各自的第2次CAS操作.虽然算法导致线程处理忙等状态,但这正是我们所期望的,因为这使得操作更快的完成.也就是说在这种情况下我们是不需要sche_yield()的,它完全可以从代码中删除.

但是,在一个物理处理器数量少于线程数量的系统上,sche_yield()就变得至关重要了.让我们再次考查上面3个线程的场景,当线程3准备向队列中插入数据:如果线程1在执行完第1个CAS操作,在执行第2个CAS操作之前被抢占,那么线程2,3就会一直在它们的第2个CAS操作上忙等(它们忙等,不让出处理器,线程1也就没机会执行,它们就只能继续忙等),直到线程1重新被唤醒,完成它的第2个CAS操作.这就是需要sche_yield()的场合了,操作系统应该避免让线程2,3处于忙等状态.它们应该尽快的让出处理器让线程1执行,使得线程1可以把它的第2个CAS操作完成.这样线程2和3才能继续完成它们的操作.

也就是说,如果不适用sched_yield,一直自旋,那么可能多个线程同时阻塞在第二个CAS那儿。

4.5多读多写的RingBuffer存在的问题

1、多于一个生产者线程性能提升不明显

如果有多于一个的生产者线程,那么将它们很可能花费大量的时间用于等待更新MaximumReadIndex(第2个CAS).这个队列最初的设计场景是满足单一消费者,所以不用怀疑在多生产者的情形下会比单一生产者有大幅的性能下降.

另外如果你只打算将此队列用于单一生产者的场合,那么第2个CAS操作可以去除.同样m_maximumReadIndex也可以一同被移除了,所有对m_maximumReadIndex的引用都改成m_writeIndex.所以,在这样的场合下push和pop可以被改写如下:

template <typename ELEM_T> bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) { uint32_t currentReadIndex; uint32_t currentWriteIndex; currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) { // the queue is full return false; } // save the date into the q m_theQueue[countToIndex(currentWriteIndex)] = a_data; // increment atomically write index. Now a consumer thread can read // the piece of data that was just stored AtomicAdd(&m_writeIndex, 1); return true; } template <typename ELEM_T> bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) { uint32_t currentMaximumReadIndex; uint32_t currentReadIndex;do { // m_maximumReadIndex doesn't exist when the queue is set up as // single-producer. The maximum read index is described by the current // write index currentReadIndex = m_readIndex; currentMaximumReadIndex = m_writeIndex; if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it return false; } // retrieve the data from the queue a_data = m_theQueue[countToIndex(currentReadIndex)]; // try to perfrom now the CAS operation on the read index. If we succeed // a_data already contains what m_readIndex pointed to before we // increased it if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { return true; } // it failed retrieving the element off the queue. Someone else must // have read the element stored at countToIndex(currentReadIndex) // before we could perform the CAS operation } while(1); // keep looping to try again! // Something went wrong. it shouldn't be possible to reach here assert(0); // Add this return statement to avoid compiler warnings return false; }

但是如果是单读单写的场景,没有必要用这个无锁队列,可以看以上单读单写的无锁队列。

2、与智能指针一起使用,内存无法得到释放

如果你打算用这个队列来存放智能指针对象.需要注意,将一个智能指针存入队列之后,如果它所占用的位置没有被另一个智能指针覆盖,那么它所指向的内存是无法被释放的(因为它的引用计数器无法下降为0).这对于一个操作频繁的队列来说没有什么问题,但是程序员需要注意的是,一旦队列被填满过一次那么应用程序所占用的内存就不会下降,即使队列被清空.除非自己做改动,每次pop手动delete。

3、计算队列的大小存在ABA问题

size函数可能会返回一个不正确的值,size的实现如下:

template <typename ELEM_T> inline uint32_t ArrayLockFreeQueue<ELEM_T>::size() { uint32_t currentWriteIndex = m_writeIndex; uint32_t currentReadIndex = m_readIndex; if (currentWriteIndex >= currentReadIndex) { return (currentWriteIndex - currentReadIndex); } else { return (m_totalSize + currentWriteIndex - currentReadIndex); } }

下面的场景描述了size为何会返回一个不正确的值:

当currentWriteIndex = m_writeIndex执行之后,m_writeIndex=3,m_readIndex = 2那么实际size是1之后操作线程被抢占,且在它停止运行的这段时间内,有2个元素被插入和从队列中移除.所以m_writeIndex=5,m_readIndex = 4,而size还是1现在被抢占的线程恢复执行,读取m_readIndex值,这个时候currentReadIndex=4,currentWriteIndex=3.currentReadIndex > currentWriteIndex'所以m_totalSize + currentWriteIndex - currentReadIndex`被返回,这个值意味着队列几乎是满的,而实际上队列几乎是空的

实际上也就是ABA的一个场景。

与本文一起上传的代码中包含了处理这个问题的解决方案.

解决方案:添加一个用于保存队列中元素数量的成员count.这个成员可以通过AtomicAdd/AtomicSub来实现原子的递增和递减.

但需要注意的是这增加了一定开销,因为原子递增,递减操作比较昂贵也很难被编译器优化.

例如,在core 2 duo E6400 2.13 Ghz 的机器上,单生产者单消费者,队列数组的初始大小是1000,测试执行10,000k次的插入,没有count成员的版本用时2.64秒,而维护了count成员的版本用时3.42秒.而对于2消费者,1生产者的情况,没有count成员的版本用时3.98秒,维护count的版本用时5.15秒.

这也就是为什么我把是否启用此成员变量的选择交给实际的使用者.使用者可以根据自己的使用场合选择是否承受额外的运行时开销. 在array_lock_free_queue.h中有一个名为ARRAY_LOCK_FREE_Q_KEEP_REAL_SIZE的宏变量,如果它被定义那么将启用count变量,否则将size函数将有可能返回不正确的值.

4.6多读多写RingBuffer的性能

无锁 vs 阻塞队列

并发的插入和移除100W元素所花费的时间(越小越好,队列的数组大小初始为16384).

在单生产者的情况下,无锁队列战胜了阻塞队列.而随着生产者数量的增加,无锁队列的效率迅速下降.

因为在多个生产者的情况下,第2个CAS将对性能产生影响。

无锁队列的几种实现及其性能对比

然后我们来看代码中的情况:

再来看看消费者线程数量对性能的影响

1、一个生产者线程

无锁队列的几种实现及其性能对比

2、两个生产者

无锁队列的几种实现及其性能对比

3、三个生产者

无锁队列的几种实现及其性能对比

4.7 RingBuffer结论

1、CAS操作是原子的,线程并行执行push/pop不会导致死锁

2、多生产者同时向队列push数据的时候不会将数据写入到同一个位置,产生数据覆盖

3、多消费者同时执行pop不会导致一个元素被出列多于1次

4、线程不能将数据push进已经满的队列中,不能从空的队列中pop元素

5、push和pop都没有ABA问题

但是,虽然这个队列是线程安全的,但是在多生产者线程的环境下它的性能还是不如阻塞队列.因此,在符合下述条件的情况下可以考虑使用这个队列来代替阻塞队列:

1、只有一个生产者线程

2、只有一个频繁操作队列的生产者,但偶尔会有其它生产者向队列push数据

在reactor网络框架中,如果只有一个reactor在处理client的话,用数组实现的RingBuffer来存储消息是比较合适的。

4.8 四种线程安全队列实现性能对比

互斥锁队列 vs 互斥锁+条件变量队列 vs 内存屏障链表 vs RingBuffer CAS实现

1、4写1读无锁队列的几种实现及其性能对比

2、4写4读

无锁队列的几种实现及其性能对比

3、1写4读

无锁队列的几种实现及其性能对比

可以发现RingBuffer的实现性能在几个场景中都是比较好的,但是相对而言,在1写4读的场景下性能是最明显的,几乎是内存屏障的3倍性能了。

为什么链表的方式性能相对BlockQueue没有很大的提升呢?

1、链表的方式需要不断的申请和释放元素。当然,用内存池可以适当改善这个影响,但是内存池在分配内存与释放内存的时候也会涉及到线程间的数据竞争,所以用链表的方式性能相对提升不多。

入队:

74 template <typename U> 75 inline bool enqueue(U &&item) 76 { 77 idx_t nodeIdx = allocate_node_for(std::forward<U>(item)); 78 79 auto tail_ = tail.load(std::memory_order_relaxed); 80 while (!tail.compare_exchange_weak(tail_, nodeIdx, std::memory_order_release, std::memory_order_relaxed)) 81 continue; 82 get_node_at(tail_)->next.store(nodeIdx, std::memory_order_release); 83 84 return true; 85 }

出队:

87 inline bool try_dequeue(T &item) {…….125 add_node_to_free_list(head_, headNode);}

2、链表需要不断地去更新头节点和尾节点指针的位置,在一个while循环里面反复去执行

80 while (!tail.compare_exchange_weak(tail_, nodeIdx, std::memory_order_release, std::memory_order_relaxed))81 continue;

相关文章