ACE_Message_Queue和spawn实现(生产者/消费者)(V2.00)

ACE_Message_Queue和spawn实现(生产者/消费者)(V2.00)

大家好,又见面了,我是全栈君。

參考这里用到了线程管理。參考:http://blog.csdn.net/calmreason/article/details/36399697

以下的两个线程共享一个消息队列,一个用来放整数到队列,一个从队列里取消息出来。

此程序在控制台不停的输出递增数字,主要是内存不会泄露

用到了多线程、ACE_Message_Queue、ACE_Message_Block、ACE_Thread_Manager::instance()->spawn等

#include <iostream>
using namespace std;
#include "boost/lexical_cast.hpp"
using namespace boost;
#include "ace/Thread_Manager.h" 
#include "ace/Message_Queue.h"

void* create_vairous_record(void* ace_message_queue);

void* get_vairous_record(void* ace_message_queue);

int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) 
{

	ACE_Message_Queue<ACE_MT_SYNCH>* various_record_queue = new ACE_Message_Queue<ACE_MT_SYNCH>;

	ACE_Thread_Manager::instance()->spawn(
		ACE_THR_FUNC(create_vairous_record), 
		various_record_queue, 
		THR_NEW_LWP | THR_DETACHED);

	ACE_Thread_Manager::instance()->spawn(
		ACE_THR_FUNC(get_vairous_record), 
		various_record_queue, 
		THR_NEW_LWP | THR_DETACHED);

	ACE_Thread_Manager::instance()->wait();

	return 0;
}

void* create_vairous_record(void* ace_message_queue)
{

	ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue;
	int i=0;
	while (i<10000000)
	{
		ACE_Message_Block* mbl = new ACE_Message_Block(10);//在这里创建消息
		string temp = lexical_cast<string>(++i);
		mbl->copy(temp.c_str());
		p_queue->enqueue_tail(mbl);//消息被放到队列中(用指针引用消息实体)
	}
	return nullptr;
}

void* get_vairous_record(void* ace_message_queue)
{

	ACE_Message_Queue<ACE_MT_SYNCH>* p_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)ace_message_queue;
	while (true)
	{
		ACE_Message_Block* mbl =nullptr;
		p_queue->dequeue_head(mbl);//消息出队,出队的消息应该在用完之后被释放
		if (mbl)
		{
			cout<<mbl->rd_ptr()<<endl;
			mbl->release();//消息已经用完。释放消息
		}
	}
	return nullptr;

}

以下的程序实现:多个线程将连续整数分批放到ACE_Message_Queue中,一个消费者线程负责从中取出,并验证数据是否完整无误

#include <iostream>
#include <bitset>
#include <vector>
#include <memory>
using namespace std;

#include "ace/Thread_Manager.h" 
#include "ace/Message_Queue.h"
#include "ace/Message_Block.h"
#include "ace/Task.h"
#include "ace/OS.h"

namespace global
{
    const int total_number = 1000000;
    int task_number = 2;
    typedef int number_type;
}

class Generator_Number : public ACE_Task<ACE_MT_SYNCH>
{
public:
    Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq,const int i);
    virtual int open(void *args  = 0 );
    ~Generator_Number(void);
protected:
    Generator_Number(const Generator_Number&);
    Generator_Number& operator=(const Generator_Number&);
private:
    int svc(void);
    int mod_i_;
};

Generator_Number::Generator_Number(ACE_Message_Queue<ACE_MT_SYNCH>* msgq,const int i):mod_i_(i)
{
    this->msg_queue(msgq);
    std::cout<<"Generator_Number(const int "<<i<<")"<<std::endl;
}

int Generator_Number::open(void *args )
{
    return this->activate(THR_NEW_LWP | THR_DETACHED);
}

int Generator_Number::svc(void)
{
    std::cout<<"Generator_Number("<<this->mod_i_<<")::svc()"<<std::endl;
    for (size_t i = this->mod_i_ ; i<global::total_number;i+=global::task_number)
    {
        ACE_Message_Block * blk = new ACE_Message_Block(20);
        blk->copy(reinterpret_cast<const char*>(&i),sizeof(global::number_type));
        this->msg_queue()->enqueue_tail(blk);
    }
    return 0;
}

Generator_Number::~Generator_Number(void)
{
    std::cout<<"~Generator_Number("<<this->mod_i_<<")"<<std::endl;
}

void* out_put_queue(void* all_numbers_queue1)
{
    ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = (ACE_Message_Queue<ACE_MT_SYNCH>*)all_numbers_queue1;
    bitset<global::total_number> all_number_bitset;
    size_t count_got_message=0;
    while(true)
    {
        if(!all_numbers_queue->is_empty())
        {
            ACE_Message_Block* blk = 0;
            all_numbers_queue->dequeue_head(blk);
            all_number_bitset.set(*reinterpret_cast<global::number_type*>(blk->rd_ptr()));
            blk->release();
            if(++count_got_message == global::total_number)
            {
                break;
            }
        }
        else
        {
            std::cout<<"now sleep 1"<<std::endl;
            ACE_Time_Value t(0,3000);
            ACE_OS::sleep(t);
        }
    }
    global::number_type check =0;
    bool wright_flag = true;
    for (size_t j=0; j!= global::total_number;++j)
    {
        if (0 == all_number_bitset[j])
        {
            wright_flag = false;
            break;
        }
    }
    std::cout<<std::endl;
    std::cout<<"check result:"<<wright_flag<<std::endl;
    return 0;
}
#include "boost/timer.hpp"
using namespace boost;

int ACE_TMAIN (int argc, ACE_TCHAR *argv[]) 
{
    cout<<"total_number:"<<global::total_number<<endl;
    timer t;
    ACE_Message_Queue<ACE_MT_SYNCH>* all_numbers_queue = new ACE_Message_Queue<ACE_MT_SYNCH>;

    vector<shared_ptr<Generator_Number>> gener_array;

    for (int i=0;i<global::task_number;++i)
    {
        gener_array.push_back(shared_ptr<Generator_Number>(new Generator_Number(all_numbers_queue,i)));
    }
    for (vector<shared_ptr<Generator_Number>>::const_iterator citer = gener_array.cbegin();
        citer!=gener_array.cend();
        ++citer)
    {
        (*citer)->open();
    }

    ACE_Thread_Manager::instance()->spawn(
        ACE_THR_FUNC(out_put_queue),   
        all_numbers_queue,   
        THR_NEW_LWP | THR_DETACHED);

    ACE_Thread_Manager::instance()->wait();
    cout<<t.elapsed()<<"s"<<endl;
    return 0;
}


输出例如以下:

total_number:1000000
Generator_Number(const int 0)
Generator_Number(const int 1)
Generator_Number(0)::svc()
Generator_Number(1now sleep 1
)::svc()
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1
now sleep 1

check result:1
0.944s
~Generator_Number(0)
~Generator_Number(1)
请按随意键继续. . .

ACE_Message_Queue

高水位低水位

http://blog.163.com/ecy_fu/blog/static/4445126200964115620862/

注意事项

http://blog.chinaunix.net/uid-20453737-id-37118.html

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/115237.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • linux防火墙放行端口命令_防火墙端口查看

    linux防火墙放行端口命令_防火墙端口查看首先确保防火墙是开着的#查看防火墙状态systemctlstatusfirewalld#开启防火墙systemctlstartfirewalld防火墙放行端口1.添加端口6666代表端口号firewall-cmd–zone=public–add-port=6666/tcp–permanent2.刷新生效firewall-cm…

    2022年9月22日
    0
  • cns服务搭建+手机ml,百度直连「建议收藏」

    cns服务搭建+手机ml,百度直连「建议收藏」1买个服务器,阿里云或腾讯云2执行代码GitHub:githubLinux一键:安装:`typecurl&>/dev/null&&echo’curl-O’||echo’wget-Ocns.sh’`http://pros.cutebi.taobao69.cn:666/cns/cns.sh&&shcns.sh卸载:`typecurl&>/dev/null&&echo’curl-

    2025年7月5日
    0
  • Oracle数据库学习之数据类型和表的操作「建议收藏」

    Oracle数据库学习之数据类型和表的操作「建议收藏」Oracle的数据类型:字符型数据类型charvarcharvarchar2long这几个字段之间的区别:char的长度是固定的,而varchar2的长度是可以变化的。也就是char(20)和varchar2(20)都存储”abc”,char是占用20个字符的,而varchar2是占用3个字符空间的。但是char的效率要高与varchar。这也就是平时说的以空间换效率。如果有一

    2022年8月30日
    2
  • vue富文本编辑器的使用_elementui富文本

    vue富文本编辑器的使用_elementui富文本写好的富文本编辑器,附带功能齐全,复制即用!!!(Quill官方中文文档)一、安装二、注册1.在.main.js中注册富文本编辑器三、使用1.以下是写好的富文本编辑器,附带功能齐全(Quill官方中文文档)2.新建一个Editor文件夹,文件夹下创建一个index.vue文件,将此复制到vue文件里3.将Editor文件夹放入Vue项目的components组件包里方便其他页面直接引用富文本编辑器5.页面引入刚刚写好的富文本编辑器组件6.效果:………

    2022年10月14日
    0
  • JavaWeb专栏之(三):Eclipse创建JavaWeb项目「建议收藏」

    JavaWeb专栏之(三):Eclipse创建JavaWeb项目「建议收藏」JavaWeb专栏之(三):Eclipse创建JavaWeb项目前言:关注:《遇见小Du说》微信公众号,分享更多Java知识,不负每一次相遇。更多内容请访问:www.dushunchang.top在上一篇文章中,小Du猿带大家使用Idea创建JavaWeb项目,相比之下Idea作为当前非常主流的开发IDE,深受Java后端程序员使用。市面上约75%开发者使用Idea,一代开发神器Eclipse就此没落。小Du猿第一次使用的开发IDE就是Eclipse,也算是我的启蒙神器。今天就带了使

    2022年6月18日
    21
  • docker开放2375端口,并添加安全传输层协议(TLS)和CA认证

    docker开放2375端口,并添加安全传输层协议(TLS)和CA认证为了更便捷地打包和部署,服务器需要开放2375端口才能连接docker,但如果开放了端口没有做任何安全保护,会引起安全漏洞,被人入侵、挖矿、CPU飙升这些情况都有发生,任何知道你IP的人,都可以管理这台主机上的容器和镜像,真的可怕。为了解决安全问题,只要使用安全传输层协议(TLS)进行传输并使用CA认证即可。制作证书及秘钥我们需要使用OpenSSL制作CA机构证书、服务端证书和客户端证书,以下操作均在安装Docker的Linux服务器上进行。创建一个目录用于存储生成的证书和秘钥mkdir

    2022年6月3日
    174

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号