Mina入门实例

Mina入门实例

大家好,又见面了,我是全栈君。

继续上一篇,这篇主要讲通过minaB端发送消息。并接受消息,mina是一个网络通信框架,封装了javaNIO。简单易用。网上有非常多关于他的介绍,在此不赘述了。

如上篇所介绍,完毕功能,须要五个类:

PoolListener:监听,用来在系统启动的时候创建连接。

SessionPool:连接池。

SendHandler:处理类。

CharsetEncoder:编码;

CharsetDecoder:解码:

B为我们提供了6个port。每一个port可建立3个长连接。因此。在系统时,就要创建长连接,以下是一个监听类:

import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;/** * 初始化连接 * @author yuanfubiao * */public class PoolListener implements ServletContextListener {	@Override	public void contextDestroyed(ServletContextEvent sce) {			}	@Override	public void contextInitialized(ServletContextEvent sce) {		String nds_ip = sce.getServletContext().getInitParameter("nds_ip");		String nds_ports = sce.getServletContext().getInitParameter("nds_ports");		SessionPool pool = new SessionPool();		try {						pool.init(nds_ip, nds_ports);		} catch (Exception e) {			e.printStackTrace();		}	}}

以下是监听配置,是配置在web.xml中:

    <display-name>Apache-Axis2</display-name>    <context-param>    	<param-name>nds_ip</param-name>    	<param-value>XX.XXX.XXX.XXX</param-value>    </context-param>    <context-param>    	<param-name>nds_ports</param-name>    	<param-value>12210,12211,12212,12213,12214,12215</param-value>    </context-param>    <listener>    	<listener-class>cn.net.easyway.nds.PoolListener</listener-class>    </listener>

以下是自己维护的一个连接池,相同使用并发包中的ConcurrentHashMap实现,他也是线程安全的,代码例如以下:

import java.net.InetSocketAddress;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.service.IoConnector;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class SessionPool {		private static Log logger = LogFactory.getLog(SessionPool.class);	private static int connNum = 0;	private static String ip = null;	private static Map<String,Integer> connNumPorts = new HashMap<String, Integer>();	private static ConcurrentHashMap<String, IoSession> pool = new ConcurrentHashMap<String, IoSession>();		/**	 * 初始化:读取配置文件。创建长连接	 * @throws Exception 	 */	public void init(String nds_ip,String nds_ports) throws Exception{		String[] ports = nds_ports.split(",");		ip = nds_ip;				for(int i=0;i<ports.length;i++){						int port = Integer.parseInt(ports[i]);			ConnectFuture future = null;						for(int j=0;j<3;j++){				String connNum = this.getConnNums();				logger.info("创建连接号---->>>>>" + connNum);				connNumPorts.put(connNum, port);				future = SessionPool.createConnect(ip, port);				if(future.isConnected()){					logger.info("创建连接------->" + future.getSession());					pool.put(connNum, future.getSession());				}else{					logger.error("连接创建错误,请检查IP和端口配置!" + future);				}						}		}	}		/**	 * 获取一个连接	 * @param num	 * @return	 */	public static IoSession  getSession(String strNum){				logger.info("IP端口号:" + ip + "连接序列号:" + strNum + "端口号:" + connNumPorts.get(strNum));				IoSession session = pool.get(strNum);				if(null == session || !session.isClosing()){			ConnectFuture newConn = createConnect(ip, connNumPorts.get(strNum));						if(!newConn.isConnected()){				newConn =  createConnect(ip,connNumPorts.get(strNum));			}			session = newConn.getSession();			pool.replace(strNum, session);		}				return session;	}		/**	 * 创建连接	 * @param ip	 * @param port	 * @return	 */	private static ConnectFuture createConnect(String strIp,int intPort){				IoConnector connector = new NioSocketConnector();				connector.getFilterChain().addLast("codec"				,new ProtocolCodecFilter(new CharsetCodecFactory()));				connector.setHandler(new SendHandler());				ConnectFuture future = connector.connect(new InetSocketAddress(strIp,intPort));		connector.getSessionConfig().setReadBufferSize(128);		future.awaitUninterruptibly();				return future;	}		/**	 * 生成连接序列号	 * @return	 */	private synchronized String getConnNums(){				if(18 == connNum){			connNum = 0;		}				connNum++;				return String.format("%02x", connNum);	}}

因此。在项目启动的时候就会有18个连接自己主动创建。并放在pool中等待我们的使用。

以下是业务处理类。须要继承IoHandlerAdapter类。而且实现以下几个方法:

import nds.framework.security.NDSMD5;import org.apache.commons.codec.binary.Hex;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import cm.custom.service.reception.RecResponse;import cm.custom.service.reception.ReceptionResponseServiceStub;/** * 业务处理 * @author yuanfubiao * */public class SendHandler extends IoHandlerAdapter {	private static Log logger = LogFactory.getLog(SendHandler.class);		@Override	public void exceptionCaught(IoSession session, Throwable cause)			throws Exception {		logger.error("连接出错", cause);	}	@Override	/**	 * 设置空暇时间	 */	public void sessionCreated(IoSession session) throws Exception {		session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);	}		/**	 * 接受到消息后,通过WS发送给用户管理系统	 */	@Override	public void messageReceived(IoSession session, Object message)			throws Exception {		String result = message.toString().trim();		String temp = result.substring(0, result.length()-16).trim();		logger.info("接受到的数据:" + result);		//验证签名		String signature = null;		String securityKey = "12345678";		try {			byte binSignature[] = NDSMD5.signPacket(temp.getBytes(), securityKey);			signature = new String(Hex.encodeHex(binSignature));		} catch (Exception e) {			e.printStackTrace();		}				String packet = temp + signature.toUpperCase().trim();				if(!result.equalsIgnoreCase(packet)){			logger.error("数字签名不对。错误指令:" + result);			return;		}		logger.info("接受到的数据:" + packet);		RecResponse res = new RecResponse();		res.setResponse(temp);		ReceptionResponseServiceStub stub = new ReceptionResponseServiceStub();		stub.recResponse(res);	}		/**	 * 连接空暇时。发送心跳包	 */	@Override	public void sessionIdle(IoSession session, IdleStatus status)			throws Exception {		if(status == IdleStatus.BOTH_IDLE){				session.write("heartbeat");		}	}}

一般我们在写socket程序时。用堵塞的方式读取消息,通常是依据消息换行符或者特殊字符,或者对方关闭流来证明一条信息读取完毕,在mina中,有默认的编解码方式。但也能够自己定义,比方以长度来推断一条消息是否读取完毕:

编码

import java.nio.charset.Charset;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolEncoderAdapter;import org.apache.mina.filter.codec.ProtocolEncoderOutput;/** * 编码 * @author yuanfubiao * */public class CharsetEncoder extends ProtocolEncoderAdapter{		private final static Charset charset = Charset.forName("utf-8");		@Override	public void encode(IoSession session, Object message, ProtocolEncoderOutput out)			throws Exception {				IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);		buff.putString(message.toString(), charset.newEncoder());				buff.flip();		out.write(buff);	}}

解码

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
 * 解码
 * @author yuanfubiao
 *
 */
public class CharsetDecoder extends CumulativeProtocolDecoder{
	private static Log logger = LogFactory.getLog(CharsetDecoder.class);
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if(in.remaining() >= 9){ //心跳为最小传输长度
			
			byte[] headBytes = new byte[in.limit()];
			logger.info("接收到消息" + headBytes.toString());
			in.get(headBytes, 0, 9);
			String head = new String(headBytes).trim();
			if("heartbeat".equalsIgnoreCase(head)){
				return true;
			}
			
			int lenPack = Integer.parseInt(head.substring(5, 9), 16)-9;
			
			if(in.remaining() == lenPack){ //验证消息长度
				byte[] bodyBytes = new byte[in.limit()];
				in.get(bodyBytes,0,lenPack);
				String body = new String(bodyBytes);
				out.write(head.trim()+body.trim());
				return true;
			}
			in.flip();
			return false;
		}
		return false;
	}
}

源代码下载:
http://download.csdn.net/detail/stubbornpotatoes/7438435

关于mina发现一个系列好文章:http://xxgblog.com/2014/10/16/mina-netty-twisted-10/

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/115701.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • PNP和NPN的区别_pnp和npn的二极管图

    PNP和NPN的区别_pnp和npn的二极管图为什么PNP三极管集电极(C)和发射极(E)反着接,却可以当开关使用?理解NPN和PNP两种类型的三极管原理及电流方向就会明白为什么PNP三极管的集电极和发射极反着接当开关使用。NPN和PNP三极

    2022年8月5日
    10
  • 程序员必备的网站推荐软件_程序员解决问题的网站

    程序员必备的网站推荐软件_程序员解决问题的网站程序员必备的网站推荐注:我收集的网站很有限,可能有很多网站没有列出,希望大家可以通过评论告知我,我会尽快添加。一、开源代码托管平台1.GitHub(https://github.com)gitHub是一个面向开源及私有软件项目的托管平台,因为只支持git作为唯一的版本库格式进行托管,故名gitHub。gitHub于2008年4月10日正式上线,除了git代码仓库托管…

    2022年10月11日
    3
  • 画完三角形再画谢尔宾斯基地毯

    画完三角形再画谢尔宾斯基地毯照样废话不说,看代码看注释importjava.awt.Color;importjava.awt.Dimension;importjava.awt.Graphics;importjava.awt.Toolkit;importjava.awt.event.MouseAdapter;importjava.awt.event.MouseEvent;import…

    2022年7月13日
    16
  • 联想拯救者y7000按键功能_联想Y7000P屏幕闪现白色横条

    联想拯救者y7000按键功能_联想Y7000P屏幕闪现白色横条前阶段买了一个拯救者Y7000P,记录一下功能键的使用:1、一些基本的使用就不详细说了Fn+F1-F11(音量亮度调节等等):其中Fn+F4是关闭开启麦克风,Fn+F7是用来设置扩展屏幕的场景Fn+F9进入设置界面Fn+F10关闭开启摄像头Fn+F11关闭开启触摸板开启关闭切换键盘灯:Fn+Space(空格)切换三种工作模式:Fn+Q键开启关闭屏幕上的Y字logo:Fn+L键2、Fn+Q切换的三种模式:(切换时需接通电源)安静模式:

    2022年9月19日
    4
  • JavaScript-ECMAScript5-JS基础语法「建议收藏」

    JavaScript-ECMAScript5-JS基础语法「建议收藏」JavaScript-ECMAScript5-基础语法

    2022年8月11日
    8
  • 学习 Spring Boot 知识看这一篇就够了

    学习 Spring Boot 知识看这一篇就够了从2016年因为工作原因开始研究SpringBoot,先后写了很多关于SpringBoot的文章,发表在技术社区、我的博客和我的公号内。粗略的统计了一下总共的文章加起来大概有六十多篇了,其中一部分是在技术社区做的系列课程。我在写文章的时候将文章中的示例提取出来,作为开源代码分享了出来让大家以更方便的方式去学习(https://github.com/ityouknow/spri…

    2022年7月15日
    20

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号