C++无锁队列

C++无锁队列利用原子操作基于 c 语言实现的无锁队列

无锁队列

添加锁的局限性

什么是原子操作

无锁队列实现原理

CAS

bool compare_and_swap ( int *memory_location, int expected_value, int new_value) { 
    if (*memory_location == expected_value) { 
    *memory_location = new_value; return true; } return false; } 

GCC对CAS的支持

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...); type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...); 

windows对CAS的支持

LONG InterlockedCompareExchange( LONG volatile *Destination, LONG ExChange, LONG Comperand ); 

队列

无锁队列的实现

RingBuffer

开始队列为空的时候,rear和front都指向队列的头部,当有task进入时,先添加task,再将rear++,而当有task取出时,先取出task,再将front++。

single builder —— single consumer

针对这种情况,因为,队列只有一个线程push task, 一个线程pop task,即一个builder维护rear,一个consumer维护front。两个线程之间并没有共同需要维护的数据,所以也就不存在数据的竞争。这种情况下,就不需数据队列进行原子操作,就可以直接进行数据的写入。

其他情况

针对其他情况,就有必要引入原子操作,来保证队列中数据不会产生数据竞争。

CircleQueue实现

头文件
#pragma once #include  
     #include  
     enum CircularQueueValueStatus : unsigned int { 
    STATUS_INVALID = 0, STATUS_LOCKED, STATUS_VALID, }; template<class T> class CircularQueue { 
    public: CircularQueue(); ~CircularQueue(); public: bool init(unsigned int maxNum = 4096); void destroy(); public: bool push(const T& task); bool pop(T& task); public: unsigned int getTaskCount(unsigned int front, unsigned int rear); unsigned int getFreeTaskCount(unsigned int front, unsigned int rear); private: unsigned int _maxNum; T* _array; std::atomic<unsigned int>* _arrayState; volatile unsigned int _front; volatile unsigned int _rear; }; class ScopeLockPush { 
    public: ScopeLockPush(std::atomic<unsigned int>* flag) :_flag(flag) { 
    unsigned int state = STATUS_INVALID; /*原子的比较*_flag 和 state的值,若它们逐位相等,则 *flag = STATUS_LOCKED 。否则,state = *_flag。*/ while (!_flag->compare_exchange_strong(state, STATUS_LOCKED)) { 
    state = STATUS_INVALID; } } ~ScopeLockPush() { 
    (*_flag).store(STATUS_VALID); } private: std::atomic<unsigned int>* _flag; }; class ScopeLockPop { 
    public: ScopeLockPop(std::atomic<unsigned int>* flag) :_flag(flag) { 
    unsigned int state = STATUS_VALID; while (!_flag->compare_exchange_strong(state, STATUS_LOCKED)) { 
    state = STATUS_VALID; } } ~ScopeLockPop() { 
    (*_flag).store(STATUS_INVALID); } private: std::atomic<unsigned int>* _flag; }; 
源文件
#include "circular_queue.h" template<class T> CircularQueue<T>::CircularQueue() :_maxNum(0) ,_array(nullptr) ,_arrayState(nullptr) ,_front(0) ,_rear(0) { 
    } template<class T> CircularQueue<T>::~CircularQueue() { 
    destroy(); } template<class T> bool CircularQueue<T>::init(unsigned int maxNum) { 
    if (maxNum > 0) { 
    if (nullptr == _array) { 
    _array = new T[maxNum]; if (nullptr == _array) { 
    return false; } } if (nullptr == _arrayState) { 
    _arrayState = new std::atomic<unsigned int>[maxNum]; if (nullptr == _arrayState) { 
    return false; } } for (unsigned int stateIndex = 0; stateIndex < maxNum; ++stateIndex) { 
    _arrayState[stateIndex] = CircularQueueValueStatus::STATUS_INVALID; } _maxNum = maxNum; _front = 0; _rear = 0; return true; } return false; } template<class T> void CircularQueue<T>::destroy() { 
    delete [] _array; _array = nullptr; delete[] _arrayState; _arrayState = nullptr; } template<class T> bool CircularQueue<T>::push(const T& task) { 
    register unsigned int oldPos = 0; register unsigned int newPos = 0; while (true) { 
    oldPos = _rear; newPos = oldPos; // 如果队列已满,无法进行添加 if (0 >= getFreeTaskCount(_front, oldPos)) { 
    return false; } if (_maxNum == oldPos) { 
    newPos = 0; } else { 
    newPos++; } //CAS原子操作,如果rear == oldPos, 则rear = newPos, 如果rear != oldPos, 则没任何变化, 都会返返回&rear指向的地址的值,即rear if (oldPos == _InterlockedCompareExchange(&_rear, newPos, oldPos)) { 
    // 这里进行原子操作,这是判断获取到oldPos后,rear是否发生变化,如果发生变化,则说明有别的线程在_rear处添加进了task,则,重新进入while循环,更新_rear(原_rear++)后,再进行判断。 // 如果没有发生变化,则说明,本线程可以在_rear处添加task register unsigned int index = oldPos % _maxNum; // RingBuffer循环特性,获取在队列中的index; ScopeLockPush guard(&_arrayState[index]); // 防止 _rear 和 _front的值访问同样的index; 防止array[index]出现数据竞争。同样是使用原子操作的原理防止 _array[index] = task; return true; } } } template <class T> bool CircularQueue<T>::pop(T& task) { 
    register unsigned int oldPos = 0; register unsigned int newPos = 0; while (true) { 
    oldPos = _front; newPos = oldPos; // 如果队列中没有任务,则不进行pop if (0 >= getTaskCount(oldPos, _rear)) { 
    return false; } if (_maxNum == oldPos) { 
    newPos = 0; } else { 
    newPos++; } //CAS原子操作,如果rear == oldPos, 则rear = newPos, 如果rear != oldPos, 则没任何变化, 都会返返回&rear指向的地址的值,即rear if (oldPos == _InterlockedCompareExchange(&_front, newPos, oldPos)) { 
    // 这里进行原子操作,这是判断获取到oldPos后,rear是否发生变化,如果发生变化,则说明有别的线程在_rear处添加进了task,则,重新进入while循环,更新_rear(原_rear++)后,再进行判断。 // 如果没有发生变化,则说明,本线程可以在_rear处添加task register unsigned int index = oldPos % _maxNum; // RingBuffer循环特性,获取在队列中的index; ScopeLockPop guard(&_arrayState[index]); // 防止 _rear 和 _front的值访问同样的index; 防止array[index]出现数据竞争。同样是使用原子操作的原理防止 task = _array[index]; break; } } } //队列中任务个数 template <class T> unsigned int CircularQueue<T>::getTaskCount(unsigned int front, unsigned int rear) { 
    if (front <= rear) { 
    return (rear - front); } else { 
    return (rear + _maxNum + 1 - front); } } //队列中空白的位置 template <class T> unsigned int CircularQueue<T>::getFreeTaskCount(unsigned int front, unsigned int rear) { 
    return _maxNum - getTaskCount(front, rear); } 
测试CircularQueue
#include "circular_queue.cpp" #include  
     #include  
     #include  
     using namespace std; unsigned int* taskNum = new unsigned int(0); class Task { 
    public: Task() { 
    while (true) { 
    unsigned int oldTaskNum = *taskNum; unsigned int newTaskNum = oldTaskNum + 1; if (oldTaskNum == InterlockedCompareExchange(taskNum, newTaskNum, oldTaskNum)) { 
    taskId = *taskNum; break; } } } public: int taskId; }; int main() { 
    CircularQueue<Task> queue; bool flag = queue.init(); *taskNum = 0; if (flag) { 
    auto pushTask = [&]() { 
    for (int i = 0; i < 10; ++i) { 
    Task task; if (queue.push(task)) { 
    cout << "PUSH TASK, TASKID = " << task.taskId << endl; } else { 
    cout << "PUSH FAILED ID = " << task.taskId << endl; } } }; auto popTask = [&]() { 
    Sleep(1); for (int i = 0; i < 10; ++i) { 
    Task task; if (queue.pop(task)) { 
    cout << "POP TASK, TASKID = " << task.taskId << endl; } } }; thread builder1(pushTask); thread consumer1(popTask); thread consumer2(popTask); thread consumer3(popTask); thread consumer4(popTask); builder1.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join(); } return 0; } 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/209208.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 上午9:47
下一篇 2026年3月19日 上午9:47


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号