C++ 无锁队列
一、atomic原子类型
二、无锁队列
1.代码实现
#include <thread> #include <iostream> #include <atomic> #include <vector> using namespace std; static const int QUEUE_SIZE = 1024; static const int PRODUCES_NUMBER = 3; static const int CONSUMERS_NUMBER = 4; static const int MSG_NUMBER = 200; template<typename T> struct Node {
T data; //数据 atomic_bool hasData; //是否有数据 }; template<typename T> class Queue {
public: Queue():msg(QUEUE_SIZE) {
r_index = 0; w_index = 0; }; bool enqueue(T& value); // 入队列 bool dequeue(T& value); // 出队列 private: vector<Node<T>> msg; // 消息队列 atomic<size_t> r_index; // 读索引(不考虑溢出情况) atomic<size_t> w_index; // 写索引(不考虑溢出情况) }; template<typename T> bool Queue<T>::enqueue(T& value) {
size_t l_w_index = w_index.load(std::memory_order_relaxed); Node<T>* node = NULL; // CAS比较是否和预期一致,一致则对写索引递增1,不一致则重试 do {
//队列是否已满 if (l_w_index >= r_index.load(std::memory_order_relaxed) + msg.size()) {
return false; } // 判断是否有数据 size_t index = l_w_index % msg.size(); node = &msg[index]; if (node->hasData.load(std::memory_order_relaxed)) {
return false; } } while (!w_index.compare_exchange_weak(l_w_index, l_w_index + 1, std::memory_order_relaxed)); //写数据 node->data = std::move(value);//左值转右值,避免拷贝带来的浪费 node->hasData.store(true); return true; } template<typename T> bool Queue<T>::dequeue(T& value) {
size_t l_r_index = r_index.load(std::memory_order_relaxed);; Node<T>* node = NULL; // CAS比较是否和预期一致,一致则对读索引递增1,不一致则重试 do {
//队列是否为空 if (l_r_index > w_index.load(std::memory_order_relaxed)) {
return false; } // 判断是否有数据 size_t index = l_r_index % msg.size(); node = &msg[index]; if (!node->hasData.load(std::memory_order_relaxed)) {
return false; } } while (!r_index.compare_exchange_weak(l_r_index, l_r_index + 1, std::memory_order_relaxed)); //读数据 value = std::move(node->data); node->hasData.store(false); return true; }
2. 多线程读写
int main() {
//控制线程 std::atomic<uint8_t> producer_thread_count = 0; std::atomic<uint8_t> consumer_thread_count = 0; //队列 Queue<uint32_t> queue; //消息序列 std::atomic<uint32_t> sequence = 0; //生产者线程 auto producer = [&queue, &sequence, &producer_thread_count]() {
for (uint32_t i = 0; i < MSG_NUMBER; i++) {
uint32_t num = sequence++; while(!queue.enqueue(num));//入队列[0, (MSG_NUMBER-1) * PRODUCES_NUMBER) } producer_thread_count++; }; //消费者线程 std::atomic<uint32_t> counter[MSG_NUMBER * PRODUCES_NUMBER]; auto consumer = [&queue, &counter,&consumer_thread_count]() {
uint32_t num = 0; while (queue.dequeue(num)) {
counter[num]++;//出队列后把对应索引位的值递增1 } consumer_thread_count++; }; //线程池 std::unique_ptr<std::thread> produce_threads[PRODUCES_NUMBER]; std::unique_ptr<std::thread> consumer_threads[CONSUMERS_NUMBER]; //创建线程 for (int i = 0; i < PRODUCES_NUMBER; i++) {
produce_threads[i].reset(new std::thread(producer)); produce_threads[i]->detach(); } for (int i = 0; i < CONSUMERS_NUMBER; i++) {
consumer_threads[i].reset(new std::thread(consumer)); consumer_threads[i]->detach(); } while (producer_thread_count != PRODUCES_NUMBER || consumer_thread_count != CONSUMERS_NUMBER) {
std::this_thread::sleep_for(std::chrono::seconds(5)); } //判断是否有竞争 for (int i = 0; i < (MSG_NUMBER*PRODUCES_NUMBER); i++) {
if (counter[i] != 1) {
std::cout << "found race condition\t" << i << '\t' << counter[i] << std::endl; break; } } return 0; }
三、总结
无锁队列依靠原子和CAS操作,对队列的读写索引进行判断来入队和出队,它没有使用互斥量mutex来进行加锁,从性能上具有明显的优势,但同时编程的复杂性增加了很多,在编码时也要对内存序有简单的了解。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/231463.html原文链接:https://javaforall.net
