无锁队列
无锁队列
添加锁的局限性
什么是原子操作
无锁队列实现原理
CAS
bool compare_and_swap ( int *memory_location, int expected_value, int new_value) {
if (*memory_location == expected_value) {
*memory_location = new_value; return true; } return false; }
GCC对CAS的支持
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...); type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
windows对CAS的支持
LONG InterlockedCompareExchange( LONG volatile *Destination, LONG ExChange, LONG Comperand );
队列
无锁队列的实现
RingBuffer
开始队列为空的时候,rear和front都指向队列的头部,当有task进入时,先添加task,再将rear++,而当有task取出时,先取出task,再将front++。
single builder —— single consumer
针对这种情况,因为,队列只有一个线程push task, 一个线程pop task,即一个builder维护rear,一个consumer维护front。两个线程之间并没有共同需要维护的数据,所以也就不存在数据的竞争。这种情况下,就不需数据队列进行原子操作,就可以直接进行数据的写入。
其他情况
针对其他情况,就有必要引入原子操作,来保证队列中数据不会产生数据竞争。
CircleQueue实现
头文件
#pragma once #include
#include
enum CircularQueueValueStatus : unsigned int {
STATUS_INVALID = 0, STATUS_LOCKED, STATUS_VALID, }; template<class T> class CircularQueue {
public: CircularQueue(); ~CircularQueue(); public: bool init(unsigned int maxNum = 4096); void destroy(); public: bool push(const T& task); bool pop(T& task); public: unsigned int getTaskCount(unsigned int front, unsigned int rear); unsigned int getFreeTaskCount(unsigned int front, unsigned int rear); private: unsigned int _maxNum; T* _array; std::atomic<unsigned int>* _arrayState; volatile unsigned int _front; volatile unsigned int _rear; }; class ScopeLockPush {
public: ScopeLockPush(std::atomic<unsigned int>* flag) :_flag(flag) {
unsigned int state = STATUS_INVALID; /*原子的比较*_flag 和 state的值,若它们逐位相等,则 *flag = STATUS_LOCKED 。否则,state = *_flag。*/ while (!_flag->compare_exchange_strong(state, STATUS_LOCKED)) {
state = STATUS_INVALID; } } ~ScopeLockPush() {
(*_flag).store(STATUS_VALID); } private: std::atomic<unsigned int>* _flag; }; class ScopeLockPop {
public: ScopeLockPop(std::atomic<unsigned int>* flag) :_flag(flag) {
unsigned int state = STATUS_VALID; while (!_flag->compare_exchange_strong(state, STATUS_LOCKED)) {
state = STATUS_VALID; } } ~ScopeLockPop() {
(*_flag).store(STATUS_INVALID); } private: std::atomic<unsigned int>* _flag; };
源文件
#include "circular_queue.h" template<class T> CircularQueue<T>::CircularQueue() :_maxNum(0) ,_array(nullptr) ,_arrayState(nullptr) ,_front(0) ,_rear(0) {
} template<class T> CircularQueue<T>::~CircularQueue() {
destroy(); } template<class T> bool CircularQueue<T>::init(unsigned int maxNum) {
if (maxNum > 0) {
if (nullptr == _array) {
_array = new T[maxNum]; if (nullptr == _array) {
return false; } } if (nullptr == _arrayState) {
_arrayState = new std::atomic<unsigned int>[maxNum]; if (nullptr == _arrayState) {
return false; } } for (unsigned int stateIndex = 0; stateIndex < maxNum; ++stateIndex) {
_arrayState[stateIndex] = CircularQueueValueStatus::STATUS_INVALID; } _maxNum = maxNum; _front = 0; _rear = 0; return true; } return false; } template<class T> void CircularQueue<T>::destroy() {
delete [] _array; _array = nullptr; delete[] _arrayState; _arrayState = nullptr; } template<class T> bool CircularQueue<T>::push(const T& task) {
register unsigned int oldPos = 0; register unsigned int newPos = 0; while (true) {
oldPos = _rear; newPos = oldPos; // 如果队列已满,无法进行添加 if (0 >= getFreeTaskCount(_front, oldPos)) {
return false; } if (_maxNum == oldPos) {
newPos = 0; } else {
newPos++; } //CAS原子操作,如果rear == oldPos, 则rear = newPos, 如果rear != oldPos, 则没任何变化, 都会返返回&rear指向的地址的值,即rear if (oldPos == _InterlockedCompareExchange(&_rear, newPos, oldPos)) {
// 这里进行原子操作,这是判断获取到oldPos后,rear是否发生变化,如果发生变化,则说明有别的线程在_rear处添加进了task,则,重新进入while循环,更新_rear(原_rear++)后,再进行判断。 // 如果没有发生变化,则说明,本线程可以在_rear处添加task register unsigned int index = oldPos % _maxNum; // RingBuffer循环特性,获取在队列中的index; ScopeLockPush guard(&_arrayState[index]); // 防止 _rear 和 _front的值访问同样的index; 防止array[index]出现数据竞争。同样是使用原子操作的原理防止 _array[index] = task; return true; } } } template <class T> bool CircularQueue<T>::pop(T& task) {
register unsigned int oldPos = 0; register unsigned int newPos = 0; while (true) {
oldPos = _front; newPos = oldPos; // 如果队列中没有任务,则不进行pop if (0 >= getTaskCount(oldPos, _rear)) {
return false; } if (_maxNum == oldPos) {
newPos = 0; } else {
newPos++; } //CAS原子操作,如果rear == oldPos, 则rear = newPos, 如果rear != oldPos, 则没任何变化, 都会返返回&rear指向的地址的值,即rear if (oldPos == _InterlockedCompareExchange(&_front, newPos, oldPos)) {
// 这里进行原子操作,这是判断获取到oldPos后,rear是否发生变化,如果发生变化,则说明有别的线程在_rear处添加进了task,则,重新进入while循环,更新_rear(原_rear++)后,再进行判断。 // 如果没有发生变化,则说明,本线程可以在_rear处添加task register unsigned int index = oldPos % _maxNum; // RingBuffer循环特性,获取在队列中的index; ScopeLockPop guard(&_arrayState[index]); // 防止 _rear 和 _front的值访问同样的index; 防止array[index]出现数据竞争。同样是使用原子操作的原理防止 task = _array[index]; break; } } } //队列中任务个数 template <class T> unsigned int CircularQueue<T>::getTaskCount(unsigned int front, unsigned int rear) {
if (front <= rear) {
return (rear - front); } else {
return (rear + _maxNum + 1 - front); } } //队列中空白的位置 template <class T> unsigned int CircularQueue<T>::getFreeTaskCount(unsigned int front, unsigned int rear) {
return _maxNum - getTaskCount(front, rear); }
测试CircularQueue
#include "circular_queue.cpp" #include
#include
#include
using namespace std; unsigned int* taskNum = new unsigned int(0); class Task {
public: Task() {
while (true) {
unsigned int oldTaskNum = *taskNum; unsigned int newTaskNum = oldTaskNum + 1; if (oldTaskNum == InterlockedCompareExchange(taskNum, newTaskNum, oldTaskNum)) {
taskId = *taskNum; break; } } } public: int taskId; }; int main() {
CircularQueue<Task> queue; bool flag = queue.init(); *taskNum = 0; if (flag) {
auto pushTask = [&]() {
for (int i = 0; i < 10; ++i) {
Task task; if (queue.push(task)) {
cout << "PUSH TASK, TASKID = " << task.taskId << endl; } else {
cout << "PUSH FAILED ID = " << task.taskId << endl; } } }; auto popTask = [&]() {
Sleep(1); for (int i = 0; i < 10; ++i) {
Task task; if (queue.pop(task)) {
cout << "POP TASK, TASKID = " << task.taskId << endl; } } }; thread builder1(pushTask); thread consumer1(popTask); thread consumer2(popTask); thread consumer3(popTask); thread consumer4(popTask); builder1.join(); consumer1.join(); consumer2.join(); consumer3.join(); consumer4.join(); } return 0; }
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/209208.html原文链接:https://javaforall.net
