Mina入门实例

Mina入门实例

大家好,又见面了,我是全栈君。

继续上一篇,这篇主要讲通过minaB端发送消息。并接受消息,mina是一个网络通信框架,封装了javaNIO。简单易用。网上有非常多关于他的介绍,在此不赘述了。

如上篇所介绍,完毕功能,须要五个类:

PoolListener:监听,用来在系统启动的时候创建连接。

SessionPool:连接池。

SendHandler:处理类。

CharsetEncoder:编码;

CharsetDecoder:解码:

B为我们提供了6个port。每一个port可建立3个长连接。因此。在系统时,就要创建长连接,以下是一个监听类:

import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;/** * 初始化连接 * @author yuanfubiao * */public class PoolListener implements ServletContextListener {	@Override	public void contextDestroyed(ServletContextEvent sce) {			}	@Override	public void contextInitialized(ServletContextEvent sce) {		String nds_ip = sce.getServletContext().getInitParameter("nds_ip");		String nds_ports = sce.getServletContext().getInitParameter("nds_ports");		SessionPool pool = new SessionPool();		try {						pool.init(nds_ip, nds_ports);		} catch (Exception e) {			e.printStackTrace();		}	}}

以下是监听配置,是配置在web.xml中:

    <display-name>Apache-Axis2</display-name>    <context-param>    	<param-name>nds_ip</param-name>    	<param-value>XX.XXX.XXX.XXX</param-value>    </context-param>    <context-param>    	<param-name>nds_ports</param-name>    	<param-value>12210,12211,12212,12213,12214,12215</param-value>    </context-param>    <listener>    	<listener-class>cn.net.easyway.nds.PoolListener</listener-class>    </listener>

以下是自己维护的一个连接池,相同使用并发包中的ConcurrentHashMap实现,他也是线程安全的,代码例如以下:

import java.net.InetSocketAddress;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.service.IoConnector;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class SessionPool {		private static Log logger = LogFactory.getLog(SessionPool.class);	private static int connNum = 0;	private static String ip = null;	private static Map<String,Integer> connNumPorts = new HashMap<String, Integer>();	private static ConcurrentHashMap<String, IoSession> pool = new ConcurrentHashMap<String, IoSession>();		/**	 * 初始化:读取配置文件。创建长连接	 * @throws Exception 	 */	public void init(String nds_ip,String nds_ports) throws Exception{		String[] ports = nds_ports.split(",");		ip = nds_ip;				for(int i=0;i<ports.length;i++){						int port = Integer.parseInt(ports[i]);			ConnectFuture future = null;						for(int j=0;j<3;j++){				String connNum = this.getConnNums();				logger.info("创建连接号---->>>>>" + connNum);				connNumPorts.put(connNum, port);				future = SessionPool.createConnect(ip, port);				if(future.isConnected()){					logger.info("创建连接------->" + future.getSession());					pool.put(connNum, future.getSession());				}else{					logger.error("连接创建错误,请检查IP和端口配置!" + future);				}						}		}	}		/**	 * 获取一个连接	 * @param num	 * @return	 */	public static IoSession  getSession(String strNum){				logger.info("IP端口号:" + ip + "连接序列号:" + strNum + "端口号:" + connNumPorts.get(strNum));				IoSession session = pool.get(strNum);				if(null == session || !session.isClosing()){			ConnectFuture newConn = createConnect(ip, connNumPorts.get(strNum));						if(!newConn.isConnected()){				newConn =  createConnect(ip,connNumPorts.get(strNum));			}			session = newConn.getSession();			pool.replace(strNum, session);		}				return session;	}		/**	 * 创建连接	 * @param ip	 * @param port	 * @return	 */	private static ConnectFuture createConnect(String strIp,int intPort){				IoConnector connector = new NioSocketConnector();				connector.getFilterChain().addLast("codec"				,new ProtocolCodecFilter(new CharsetCodecFactory()));				connector.setHandler(new SendHandler());				ConnectFuture future = connector.connect(new InetSocketAddress(strIp,intPort));		connector.getSessionConfig().setReadBufferSize(128);		future.awaitUninterruptibly();				return future;	}		/**	 * 生成连接序列号	 * @return	 */	private synchronized String getConnNums(){				if(18 == connNum){			connNum = 0;		}				connNum++;				return String.format("%02x", connNum);	}}

因此。在项目启动的时候就会有18个连接自己主动创建。并放在pool中等待我们的使用。

以下是业务处理类。须要继承IoHandlerAdapter类。而且实现以下几个方法:

import nds.framework.security.NDSMD5;import org.apache.commons.codec.binary.Hex;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import cm.custom.service.reception.RecResponse;import cm.custom.service.reception.ReceptionResponseServiceStub;/** * 业务处理 * @author yuanfubiao * */public class SendHandler extends IoHandlerAdapter {	private static Log logger = LogFactory.getLog(SendHandler.class);		@Override	public void exceptionCaught(IoSession session, Throwable cause)			throws Exception {		logger.error("连接出错", cause);	}	@Override	/**	 * 设置空暇时间	 */	public void sessionCreated(IoSession session) throws Exception {		session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);	}		/**	 * 接受到消息后,通过WS发送给用户管理系统	 */	@Override	public void messageReceived(IoSession session, Object message)			throws Exception {		String result = message.toString().trim();		String temp = result.substring(0, result.length()-16).trim();		logger.info("接受到的数据:" + result);		//验证签名		String signature = null;		String securityKey = "12345678";		try {			byte binSignature[] = NDSMD5.signPacket(temp.getBytes(), securityKey);			signature = new String(Hex.encodeHex(binSignature));		} catch (Exception e) {			e.printStackTrace();		}				String packet = temp + signature.toUpperCase().trim();				if(!result.equalsIgnoreCase(packet)){			logger.error("数字签名不对。错误指令:" + result);			return;		}		logger.info("接受到的数据:" + packet);		RecResponse res = new RecResponse();		res.setResponse(temp);		ReceptionResponseServiceStub stub = new ReceptionResponseServiceStub();		stub.recResponse(res);	}		/**	 * 连接空暇时。发送心跳包	 */	@Override	public void sessionIdle(IoSession session, IdleStatus status)			throws Exception {		if(status == IdleStatus.BOTH_IDLE){				session.write("heartbeat");		}	}}

一般我们在写socket程序时。用堵塞的方式读取消息,通常是依据消息换行符或者特殊字符,或者对方关闭流来证明一条信息读取完毕,在mina中,有默认的编解码方式。但也能够自己定义,比方以长度来推断一条消息是否读取完毕:

编码

import java.nio.charset.Charset;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolEncoderAdapter;import org.apache.mina.filter.codec.ProtocolEncoderOutput;/** * 编码 * @author yuanfubiao * */public class CharsetEncoder extends ProtocolEncoderAdapter{		private final static Charset charset = Charset.forName("utf-8");		@Override	public void encode(IoSession session, Object message, ProtocolEncoderOutput out)			throws Exception {				IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);		buff.putString(message.toString(), charset.newEncoder());				buff.flip();		out.write(buff);	}}

解码

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
 * 解码
 * @author yuanfubiao
 *
 */
public class CharsetDecoder extends CumulativeProtocolDecoder{
	private static Log logger = LogFactory.getLog(CharsetDecoder.class);
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if(in.remaining() >= 9){ //心跳为最小传输长度
			
			byte[] headBytes = new byte[in.limit()];
			logger.info("接收到消息" + headBytes.toString());
			in.get(headBytes, 0, 9);
			String head = new String(headBytes).trim();
			if("heartbeat".equalsIgnoreCase(head)){
				return true;
			}
			
			int lenPack = Integer.parseInt(head.substring(5, 9), 16)-9;
			
			if(in.remaining() == lenPack){ //验证消息长度
				byte[] bodyBytes = new byte[in.limit()];
				in.get(bodyBytes,0,lenPack);
				String body = new String(bodyBytes);
				out.write(head.trim()+body.trim());
				return true;
			}
			in.flip();
			return false;
		}
		return false;
	}
}

源代码下载:
http://download.csdn.net/detail/stubbornpotatoes/7438435

关于mina发现一个系列好文章:http://xxgblog.com/2014/10/16/mina-netty-twisted-10/

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/115701.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 急!!ftp登录错误,提示 530 not logged in,连接失败 ,,是怎么回事啊

    急!!ftp登录错误,提示 530 not logged in,连接失败 ,,是怎么回事啊

    2021年9月23日
    108
  • nextline函数_在JAVA中Scanner中的next()和nextLine()为什么不能一起使用?

    nextline函数_在JAVA中Scanner中的next()和nextLine()为什么不能一起使用?Java输入一直是一个坑,本来一直用Scanner,但一直搞不懂换行符啥的,就用BufferReader,但前不久大疆笔试需要持续输入,早忘了Scanner怎么写,而那个场景用Scanner很好实现……就继续在这里记录一下Scanner的坑吧一、next&nextLine区别next不能得到带有空格的字符串一定要读到有效字符后才可以结束,结束条件是碰到空格…

    2022年6月2日
    37
  • 2套后台模板HTML+整套Easyui皮肤组件-后台管理系统模板

    2套后台模板HTML+整套Easyui皮肤组件-后台管理系统模板2019年最新easyui主题模板设计:http://www.uimaker.com/easyui本作品仅供学习参考,请勿用于任何商业用途,版权所有:uimaker.com,谢绝任何网站转载,请互相理解!设计业务联系QQ:32534386请注:模板说明:由于效果图比较多,合并成一个图片文件后,文件很大,所以进行了压缩,导致您看到的效果图都比较灰,其实…

    2022年9月11日
    0
  • 数据链路层学习之LLDP「建议收藏」

    一、LLDP协议概述 随着网络技术的发展,接入网络的设备的种类越来越多,配置越来越复杂,来自不同设备厂商的设备也往往会增加自己特有的功能,这就导致在一个网络中往往会有很多具有不同特性的、来自不同厂商的设备,为了方便对这样的网络进行管理,就需要使得不同厂商的设备能够在网络中相互发现并交互各自的系统及配置信息。 LLDP(LinkLayerDiscoveryProtocol,链路层发现协

    2022年4月3日
    104
  • PyQuery 详解「建议收藏」

    PyQuery 详解「建议收藏」PyQuery库是一个非常强大又灵活的网页解析库,如果你有前端开发经验,那么你应该接触过jQuery,那么PyQuery就是你非常绝佳的选择,PyQuery是Python仿照jQuery的严格实现,语法与jQuery几乎完全相同。安装跟安装其他库一样:>>>pip3installpyquery安装了之后,在程序里面就可以引用了,引用方法跟其他…

    2022年5月31日
    35
  • getjson跨域

    getjson跨域

    2021年8月19日
    53

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号