Mina入门实例

Mina入门实例

大家好,又见面了,我是全栈君。

继续上一篇,这篇主要讲通过minaB端发送消息。并接受消息,mina是一个网络通信框架,封装了javaNIO。简单易用。网上有非常多关于他的介绍,在此不赘述了。

如上篇所介绍,完毕功能,须要五个类:

PoolListener:监听,用来在系统启动的时候创建连接。

SessionPool:连接池。

SendHandler:处理类。

CharsetEncoder:编码;

CharsetDecoder:解码:

B为我们提供了6个port。每一个port可建立3个长连接。因此。在系统时,就要创建长连接,以下是一个监听类:

import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;/** * 初始化连接 * @author yuanfubiao * */public class PoolListener implements ServletContextListener {	@Override	public void contextDestroyed(ServletContextEvent sce) {			}	@Override	public void contextInitialized(ServletContextEvent sce) {		String nds_ip = sce.getServletContext().getInitParameter("nds_ip");		String nds_ports = sce.getServletContext().getInitParameter("nds_ports");		SessionPool pool = new SessionPool();		try {						pool.init(nds_ip, nds_ports);		} catch (Exception e) {			e.printStackTrace();		}	}}

以下是监听配置,是配置在web.xml中:

    <display-name>Apache-Axis2</display-name>    <context-param>    	<param-name>nds_ip</param-name>    	<param-value>XX.XXX.XXX.XXX</param-value>    </context-param>    <context-param>    	<param-name>nds_ports</param-name>    	<param-value>12210,12211,12212,12213,12214,12215</param-value>    </context-param>    <listener>    	<listener-class>cn.net.easyway.nds.PoolListener</listener-class>    </listener>

以下是自己维护的一个连接池,相同使用并发包中的ConcurrentHashMap实现,他也是线程安全的,代码例如以下:

import java.net.InetSocketAddress;import java.util.HashMap;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.future.ConnectFuture;import org.apache.mina.core.service.IoConnector;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolCodecFilter;import org.apache.mina.transport.socket.nio.NioSocketConnector;public class SessionPool {		private static Log logger = LogFactory.getLog(SessionPool.class);	private static int connNum = 0;	private static String ip = null;	private static Map<String,Integer> connNumPorts = new HashMap<String, Integer>();	private static ConcurrentHashMap<String, IoSession> pool = new ConcurrentHashMap<String, IoSession>();		/**	 * 初始化:读取配置文件。创建长连接	 * @throws Exception 	 */	public void init(String nds_ip,String nds_ports) throws Exception{		String[] ports = nds_ports.split(",");		ip = nds_ip;				for(int i=0;i<ports.length;i++){						int port = Integer.parseInt(ports[i]);			ConnectFuture future = null;						for(int j=0;j<3;j++){				String connNum = this.getConnNums();				logger.info("创建连接号---->>>>>" + connNum);				connNumPorts.put(connNum, port);				future = SessionPool.createConnect(ip, port);				if(future.isConnected()){					logger.info("创建连接------->" + future.getSession());					pool.put(connNum, future.getSession());				}else{					logger.error("连接创建错误,请检查IP和端口配置!" + future);				}						}		}	}		/**	 * 获取一个连接	 * @param num	 * @return	 */	public static IoSession  getSession(String strNum){				logger.info("IP端口号:" + ip + "连接序列号:" + strNum + "端口号:" + connNumPorts.get(strNum));				IoSession session = pool.get(strNum);				if(null == session || !session.isClosing()){			ConnectFuture newConn = createConnect(ip, connNumPorts.get(strNum));						if(!newConn.isConnected()){				newConn =  createConnect(ip,connNumPorts.get(strNum));			}			session = newConn.getSession();			pool.replace(strNum, session);		}				return session;	}		/**	 * 创建连接	 * @param ip	 * @param port	 * @return	 */	private static ConnectFuture createConnect(String strIp,int intPort){				IoConnector connector = new NioSocketConnector();				connector.getFilterChain().addLast("codec"				,new ProtocolCodecFilter(new CharsetCodecFactory()));				connector.setHandler(new SendHandler());				ConnectFuture future = connector.connect(new InetSocketAddress(strIp,intPort));		connector.getSessionConfig().setReadBufferSize(128);		future.awaitUninterruptibly();				return future;	}		/**	 * 生成连接序列号	 * @return	 */	private synchronized String getConnNums(){				if(18 == connNum){			connNum = 0;		}				connNum++;				return String.format("%02x", connNum);	}}

因此。在项目启动的时候就会有18个连接自己主动创建。并放在pool中等待我们的使用。

以下是业务处理类。须要继承IoHandlerAdapter类。而且实现以下几个方法:

import nds.framework.security.NDSMD5;import org.apache.commons.codec.binary.Hex;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.mina.core.service.IoHandlerAdapter;import org.apache.mina.core.session.IdleStatus;import org.apache.mina.core.session.IoSession;import cm.custom.service.reception.RecResponse;import cm.custom.service.reception.ReceptionResponseServiceStub;/** * 业务处理 * @author yuanfubiao * */public class SendHandler extends IoHandlerAdapter {	private static Log logger = LogFactory.getLog(SendHandler.class);		@Override	public void exceptionCaught(IoSession session, Throwable cause)			throws Exception {		logger.error("连接出错", cause);	}	@Override	/**	 * 设置空暇时间	 */	public void sessionCreated(IoSession session) throws Exception {		session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 60);	}		/**	 * 接受到消息后,通过WS发送给用户管理系统	 */	@Override	public void messageReceived(IoSession session, Object message)			throws Exception {		String result = message.toString().trim();		String temp = result.substring(0, result.length()-16).trim();		logger.info("接受到的数据:" + result);		//验证签名		String signature = null;		String securityKey = "12345678";		try {			byte binSignature[] = NDSMD5.signPacket(temp.getBytes(), securityKey);			signature = new String(Hex.encodeHex(binSignature));		} catch (Exception e) {			e.printStackTrace();		}				String packet = temp + signature.toUpperCase().trim();				if(!result.equalsIgnoreCase(packet)){			logger.error("数字签名不对。错误指令:" + result);			return;		}		logger.info("接受到的数据:" + packet);		RecResponse res = new RecResponse();		res.setResponse(temp);		ReceptionResponseServiceStub stub = new ReceptionResponseServiceStub();		stub.recResponse(res);	}		/**	 * 连接空暇时。发送心跳包	 */	@Override	public void sessionIdle(IoSession session, IdleStatus status)			throws Exception {		if(status == IdleStatus.BOTH_IDLE){				session.write("heartbeat");		}	}}

一般我们在写socket程序时。用堵塞的方式读取消息,通常是依据消息换行符或者特殊字符,或者对方关闭流来证明一条信息读取完毕,在mina中,有默认的编解码方式。但也能够自己定义,比方以长度来推断一条消息是否读取完毕:

编码

import java.nio.charset.Charset;import org.apache.mina.core.buffer.IoBuffer;import org.apache.mina.core.session.IoSession;import org.apache.mina.filter.codec.ProtocolEncoderAdapter;import org.apache.mina.filter.codec.ProtocolEncoderOutput;/** * 编码 * @author yuanfubiao * */public class CharsetEncoder extends ProtocolEncoderAdapter{		private final static Charset charset = Charset.forName("utf-8");		@Override	public void encode(IoSession session, Object message, ProtocolEncoderOutput out)			throws Exception {				IoBuffer buff = IoBuffer.allocate(100).setAutoExpand(true);		buff.putString(message.toString(), charset.newEncoder());				buff.flip();		out.write(buff);	}}

解码

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;

/**
 * 解码
 * @author yuanfubiao
 *
 */
public class CharsetDecoder extends CumulativeProtocolDecoder{
	private static Log logger = LogFactory.getLog(CharsetDecoder.class);
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		
		if(in.remaining() >= 9){ //心跳为最小传输长度
			
			byte[] headBytes = new byte[in.limit()];
			logger.info("接收到消息" + headBytes.toString());
			in.get(headBytes, 0, 9);
			String head = new String(headBytes).trim();
			if("heartbeat".equalsIgnoreCase(head)){
				return true;
			}
			
			int lenPack = Integer.parseInt(head.substring(5, 9), 16)-9;
			
			if(in.remaining() == lenPack){ //验证消息长度
				byte[] bodyBytes = new byte[in.limit()];
				in.get(bodyBytes,0,lenPack);
				String body = new String(bodyBytes);
				out.write(head.trim()+body.trim());
				return true;
			}
			in.flip();
			return false;
		}
		return false;
	}
}

源代码下载:
http://download.csdn.net/detail/stubbornpotatoes/7438435

关于mina发现一个系列好文章:http://xxgblog.com/2014/10/16/mina-netty-twisted-10/

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/115701.html原文链接:https://javaforall.net

(0)
上一篇 2022年1月31日 下午8:00
下一篇 2022年1月31日 下午8:00


相关推荐

  • C#ThreadPool.QueueUserWorkItem实例「建议收藏」

    C#ThreadPool.QueueUserWorkItem实例「建议收藏」今天学习线程池的时候发现,网上能搜到的都是很久以前的文档了,大家都是照搬过去,有没有考证都是问题。经过测试结果已经和他们说的不一样了,比如 Listactions=newList(){()=>{Console.WriteLine(“A-1”);},()=>{Conso

    2026年3月5日
    7
  • Springboot文件上传_maven上传jar包到远程仓库

    Springboot文件上传_maven上传jar包到远程仓库springboot文件上传机制:1.访问路径2. 上传完成后返回访问文件地址3. 我们只需要访问返回的地址就可以访问到图片4. yaml配置文件(localpath是实际存储的地址)5. 添加配置类,进行访问地址和存储地址映射 @Value(“${file.upload.suffixPath}”) private String uploadSuffixPath; @Value(“${file.upload.localPath}”) private Strin

    2022年8月8日
    9
  • 拉普拉斯分布

    拉普拉斯分布如果随机变量的概率密度函数分布如下图所示 那么它就是拉普拉斯分布 记为 x Laplace 其中 是位置参数 是尺度参数 如果 0 那么 正半部分恰好是尺度为 1 或者 看具体指数分布的尺度参数形式 的指数分布的一半 1 定

    2026年3月20日
    2
  • 双系统 Win10下安装Linux(单/双硬盘)

    双系统 Win10下安装Linux(单/双硬盘)双系统Win10下安装Linux(单/双硬盘)

    2022年7月24日
    37
  • java.math.BigDecimal转换double double转换java.math.BigDecimal

    java.math.BigDecimal转换double double转换java.math.BigDecimal有方法 java math BigDecimal doubleValue BigDecimala newBigDecima 1000 returna doubleValue nbsp public nbsp static nbsp void nbsp printDoubleT double nbsp v1 nbsp double nbsp v2 nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp nbsp BigDecimal nbsp d1TobigDe nbsp

    2026年3月18日
    2
  • 矿机和普通电脑区别_计算机挖矿是什么意思

    矿机和普通电脑区别_计算机挖矿是什么意思原标题:处理的挖矿机能当家用电脑用吗?现在购买划算吗?目前市场上的矿机主要分为两类,一类是PC架构的矿机,另一类是基于ASIC芯片的专业矿机。一般PC架构的矿机可以当做家用电脑用,但cpu性能较弱,功耗较高。基于ASIC芯片的专业矿机由于没有显卡,不支持主流的桌面操作系统,,所以无法当普通电脑使用。基于PC架构的矿机可以作为普通电脑使用,但cpu性能较弱,功耗较高!一般基于pc架构的矿机,cpu…

    2022年9月29日
    4

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号