一、无锁队列原理
1、队列操作模型
队列是一种非常重要的数据结构,其特性是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信间经常采用队列做缓存,缓解数据处理压力。
根据操作队列的场景分为:单生产者——单消费者、多生产者——单消费者、单生产者——多消费者、多生产者——多消费者四大模型。根据队列中数据分为:队列中的数据是定长的、队列中的数据是变长的。
(1)单生产者——单消费者

(2)多生产者——单消费者

(3)单生产者——多消费者

(4)多生产者——多消费者

(5)数据定长队列

(6)数据变长队列

2、 无锁队列
生产环境中广泛使用生产者和消费者模型,要求生产者在生产的同时,消费者可以进行消费,通常使用互斥锁保证数据同步。但线程互斥锁的开销仍然比较大,因此在要求高性能、低延时场景中,推荐使用无锁队列。
3、CAS操作
bool compare_and_swap ( int *memory_location, int expected_value, int new_value) { if (*memory_location == expected_value) { *memory_location = new_value; return true; } return false; }
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...); type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
(2)Windows对CAS支持
Windows中使用Windows API支持CAS。
LONG InterlockedCompareExchange( LONG volatile *Destination, LONG ExChange, LONG Comperand );
(3)C11对CAS支持
C11 STL中atomic函数支持CAS并可以跨平台。
template< class T > bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired ); template< class T > bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );
二、无锁队列方案
1、boost方案
2、ConcurrentQueue
三、无锁队列实现
1、环形缓冲区

空队列时,front与rear相等;当有元素进队,则rear后移;有元素出队,则front后移。

data[rear] = x; rear = (rear+1)%maxn;
出队操作:
x = data[front]; rear = (front+1)%maxn;
2、单生产者单消费者
3、多生产者单消费者
多生产者和单消费者场景中,由于多个生产者都会修改write_index,所以在不加锁的情况下必须使用原子操作。
四、无锁队列使用
1、ConcurrentQueue无锁队列
实现demo如下:
lock_free_queue_impl.h
#pragma once #include
#include
#include
#include
#include"concurrentqueue.h" class Response { public: ~Response() { std::cout << "~Response()" << std::endl; } int status_code; int content; }; class LockFreeImpl { public: LockFreeImpl() {} ~LockFreeImpl() {} static LockFreeImpl& GetRef(); void Init(); void DoJob(); void OnMessage(std::shared_ptr
response); void HandleMsg(std::shared_ptr
response); void Release(); private: volatile bool inited_ = false; std::thread work_thread_; moodycamel::ConcurrentQueue
>* worker_queue_; };
lock_free_queue_impl.cpp
#include"lock_free_queue_impl.h" #include
LockFreeImpl& LockFreeImpl::GetRef() { static LockFreeImpl impl; return impl; } void LockFreeImpl::Init() { inited_ = true; worker_queue_ = new moodycamel::ConcurrentQueue
>(81920, 1, 1); work_thread_ = std::thread(std::bind(&LockFreeImpl::DoJob, this)); std::cout << "LockFreeImpl init successfully!" << std::endl; } void LockFreeImpl::DoJob() { std::cout << "worker thread, worker thread start!" << std::endl; std::shared_ptr
item = nullptr; while (inited_) { if (worker_queue_->try_dequeue(item) == false) { Sleep(10); continue; } if (item == nullptr) std::cout << "item == nullptr" << std::endl; else { HandleMsg(item); } } std::cout << "worker thread, worker thread exit successfully!" << std::endl; } void LockFreeImpl::OnMessage(std::shared_ptr
response) { try { worker_queue_->try_enqueue(response); } catch (const std::exception& e) { std::cout << "the error is: " << e.what() << std::endl; } } void LockFreeImpl::HandleMsg(std::shared_ptr
response) { std::cout << "\n ----------------------开始解析数据----------------------" << std::endl; std::cout << "response->status_code is: " << response->status_code << std::endl; std::cout << "response->content is: " << response->content << std::endl; } void LockFreeImpl::Release() { if (inited_) inited_ = false; if (work_thread_.joinable()) work_thread_.join(); delete worker_queue_; std::cout << "LockFreeImpl release,release exit successfully!" << std::endl; }
main.cpp
#include
#include
#include"lock_free_queue_impl.h" using namespace std; int main() { LockFreeImpl lock_free_impl; lock_free_impl.Init(); std::shared_ptr
response(new Response()); response->status_code = 0; response->content = 1; for (int i = 0; i < 4; ++i) { std::cout << "开始push数据...." << std::endl; Sleep(1000); lock_free_impl.OnMessage(response); } system("pause"); lock_free_impl.Release(); return 0; }
结果如下:

concurrentqueue.h 文件下载上文链接
参考链接:
https://blog.51cto.com/u_/
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/201670.html原文链接:https://javaforall.net
