mina框架详解

mina框架详解From http blog csdn net w article details

Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。Mina 主要有1.x 和2.x 两个分支,这里我们讲解最新版本2.0,如果你使用的是Mina 1.x,那么可能会有一些功能并不适用。学习本文档,需要你已掌握JAVA IO、JAVA NIO、JAVASocket、JAVA 线程及并发库(java.util.concurrent.*)的知识。Mina 同时提供了网络通信的Server 端、Client 端的封装,无论是哪端,Mina 在整个网通通信结构中都处于如下的位置:可见Mina 的API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、接收的数据以及你的业务逻辑即可。同样的,无论是哪端,Mina 的执行流程如下所示:
(1.) IoService:这个接口在一个线程上负责套接字的建立,拥有自己的Selector,监听是否有连接被建立。

(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。

 


1. 简单的TCPServer:
(1.) 第一步:编写IoService
  按照上面的执行流程,我们首先需要编写IoService,IoService 本身既是服务端,又是客户端,我们这里编写服务端,所以使用IoAcceptor 实现,由于IoAcceptor 是与协议无关的,因为我们要编写TCPServer,所以我们使用IoAcceptor 的实现NioSocketAcceptor,实际上底层就是调用java.nio.channels.ServerSocketChannel 类。当然,如果你使用了Apache 的APR 库,那么你可以选择使AprSocketAcceptor 作为TCPServer 的实现,据传说Apache APR库的性能比JVM 自带的本地库高出很多。那么IoProcessor 是由指定的IoService 内部创建并调用的,我们并不需要关心。

[java] view plain copy

  1. IoAcceptor acceptor=new NioSocketAcceptor();    
  2. acceptor.getSessionConfig().setReadBufferSize(2048);    
  3. acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10);    
  4. acceptor.bind(new InetSocketAddress(9123));   

这段代码我们初始化了服务端的TCP/IP 的基于NIO 的套接字,然后调用IoSessionConfig设置读取数据的缓冲区大小、读写通道均在10 秒内无任何操作就进入空闲状态。

[java] view plain copy

  1. acceptor.getFilterChain().addLast(“codec”,  new ProtocolCodecFilter(new TextLineCodecFactory(    

  2. “white-space:pre”>  Charset.forName(
    “UTF-8”),    

  3. “white-space:pre”>  LineDelimeter.WINDOWS.getValue(),    

  4. “white-space:pre”>  LineDelimiter. WINDOWS.getValue()))    
  5. );    

这段代码要在acceptor.bind()方法之前执行,因为绑定套接字之后就不能再做这些准备工作了。这里先不用清楚编解码器是如何工作的,这个是后面重点说明的内容,这里你只需要清楚,我们传输的以换行符为标识的数据,所以使用了Mina 自带的换行符编解码器工厂。

(3.) 第三步:编写IoHandler

这里我们只是简单的打印Client 传说过来的数据。

[java] view plain copy

  1. public class MyIoHandler extends IoHandlerAdapter {    
  2. // 这里我们使用的SLF4J作为日志门面,至于为什么在后面说明。    
  3. private final static Logger log = LoggerFactory    
  4. .getLogger(MyIoHandler.class);    
  5. @Override    
  6. public void messageReceived(IoSession session, Object message)    
  7. throws Exception {    
  8. String str = message.toString();    
  9. log.info(“The message received is [“ + str + “]”);    
  10. if (str.endsWith(“quit”)) {    
  11. session.close(true);    
  12. return;    
  13. }    
  14. }    
  15. }    

然后我们把这个IoHandler 注册到IoService:

[java] view plain copy

  1. acceptor.setHandler(new MyIoHandler());    

当然这段代码也要在acceptor.bind()方法之前执行。然后我们运行MyServer 中的main 方法,你可以看到控制台一直处于阻塞状态,此时,我们用telnet 127.0.0.1 9123 访问,然后输入一些内容,当按下回车键,你会发现数据在Server 端被输出,但要注意不要输入中文,因为Windows 的命令行窗口不会对传输的数据进行UTF-8 编码。当输入quit 结尾的字符串时,连接被断开。这里注意你如果使用的操作系统,或者使用的Telnet 软件的换行符是什么,如果不清楚,可以删掉第二步中的两个红色的参数,使用TextLineCodec 内部的自动识别机制。

 


2. 简单的TCPClient:
这里我们实现Mina 中的TCPClient,因为前面说过无论是Server 端还是Client 端,在Mina中的执行流程都是一样的。唯一不同的就是IoService 的Client 端实现是IoConnector。

(1.) 第一步:编写IoService并注册过滤器

[java] view plain copy

  1. public class MyClient {    
  2. main方法:    
  3. IoConnector connector=new NioSocketConnector();    
  4. connector.setConnectTimeoutMillis(30000);    
  5. connector.getFilterChain().addLast(“codec”,    
  6. new ProtocolCodecFilter(    
  7. new TextLineCodecFactory(    
  8. Charset.forName(“UTF-8”),    
  9. LineDelimiter.WINDOWS.getValue(),    
  10. LineDelimiter.WINDOWS.getValue()    
  11. )    
  12. )    
  13. );    
  14. connector.connect(new InetSocketAddress(“localhost”9123));    
  15. }    

(2.) 第三步:编写IoHandler

[java] view plain copy

  1. public class ClientHandler extends IoHandlerAdapter {    
  2. private final static Logger LOGGER = LoggerFactory    
  3. .getLogger(ClientHandler.class);    
  4. private final String values;    
  5. public ClientHandler(String values) {    
  6. this.values = values;    
  7. }    
  8. @Override    
  9. public void sessionOpened(IoSession session) {    
  10. session.write(values);    
  11. }    
  12. }    

注册IoHandler:

[java] view plain copy

  1. connector.setHandler(new ClientHandler(“你好!\r\n 大家好!”));    

 


 

3. 介绍Mina的TCP的主要接口:
通过上面的两个示例,你应该对Mina 如何编写TCP/IP 协议栈的网络通信有了一些感性的认识。
(1.)IoService:
这个接口是服务端IoAcceptor、客户端IoConnector 的抽象,提供IO 服务和管理IoSession的功能,它有如下几个常用的方法:
A. TransportMetadata getTransportMetadata():
这个方法获取传输方式的元数据描述信息,也就是底层到底基于什么的实现,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
这个方法可以为IoService 增加一个监听器,用于监听IoService 的创建、活动、失效、空闲、销毁,具体可以参考IoServiceListener 接口中的方法,这为你参与IoService 的生命周期提供了机会。
C. void removeListener(IoServiceListener listener):
这个方法用于移除上面的方法添加的监听器。
D. void setHandler(IoHandler handler):
这个方法用于向IoService 注册IoHandler,同时有getHandler()方法获取Handler。
E. Map

getManagedSessions():

这个方法获取IoService 上管理的所有IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
这个方法用于获取IoSession 的配置对象,通过IoSessionConfig 对象可以设置Socket 连接的一些选项。















 


 

(2.)IoAcceptor:
这个接口是TCPServer 的接口,主要增加了void bind()监听端口、void unbind()解除对套接字的监听等方法。这里与传统的JAVA 中的ServerSocket 不同的是IoAcceptor 可以多次调用bind()方法(或者在一个方法中传入多个SocketAddress 参数)同时监听多个端口。

 


 

3.)IoConnector:
这个接口是TCPClient 的接口, 主要增加了ConnectFuture connect(SocketAddressremoteAddress,SocketAddress localAddress)方法,用于与Server 端建立连接,第二个参数如果不传递则使用本地的一个随机端口访问Server 端。这个方法是异步执行的,同样的,也可以同时连接多个服务端。

 


 

(4.)IoSession:
这个接口用于表示Server 端与Client 端的连接,IoAcceptor.accept()的时候返回实例。
这个接口有如下常用的方法:
A. WriteFuture write(Object message):
这个方法用于写数据,该操作是异步的。
B. CloseFuture close(boolean immediately):
这个方法用于关闭IoSession,该操作也是异步的,参数指定true 表示立即关闭,否则就在所有的写操作都flush 之后再关闭。
C. Object setAttribute(Object key,Object value):
这个方法用于给我们向会话中添加一些属性,这样可以在会话过程中都可以使用,类似于HttpSession 的setAttrbute()方法。IoSession 内部使用同步的HashMap 存储你添加的自
定义属性。
D. SocketAddress getRemoteAddress():
这个方法获取远端连接的套接字地址。
E. void suspendWrite():
这个方法用于挂起写操作,那么有void resumeWrite()方法与之配对。对于read()方法同样适用。
F. ReadFuture read():
这个方法用于读取数据, 但默认是不能使用的, 你需要调用IoSessionConfig 的setUseReadOperation(true)才可以使用这个异步读取的方法。一般我们不会用到这个方法,因为这个方法的内部实现是将数据保存到一个BlockingQueue,假如是Server 端,因为大量的Client 端发送的数据在Server 端都这么读取,那么可能会导致内存泄漏,但对于Client,可能有的时候会比较便利。
G. IoService getService():
这个方法返回与当前会话对象关联的IoService 实例。
关于TCP连接的关闭:
无论在客户端还是服务端,IoSession 都用于表示底层的一个TCP 连接,那么你会发现无论是Server 端还是Client 端的IoSession 调用close()方法之后,TCP 连接虽然显示关闭, 但主线程仍然在运行,也就是JVM 并未退出,这是因为IoSession 的close()仅仅是关闭了TCP的连接通道,并没有关闭Server 端、Client 端的程序。你需要调用IoService 的dispose()方法停止Server 端、Client 端。


















 


(5.)IoSessionConfig:
这个方法用于指定此次会话的配置,它有如下常用的方法:
A. void setReadBufferSize(int size):

 


(6.)IoHandler:
这个接口是你编写业务逻辑的地方,从上面的示例代码可以看出,读取数据、发送数据基本都在这个接口总完成,这个实例是绑定到IoService 上的,有且只有一个实例(没有给一个IoService 注入一个IoHandler 实例会抛出异常)。它有如下几个方法:
A. void sessionCreated(IoSession session):
这个方法当一个Session 对象被创建的时候被调用。对于TCP 连接来说,连接被接受的时候调用,但要注意此时TCP 连接并未建立,此方法仅代表字面含义,也就是连接的对象IoSession 被创建完毕的时候,回调这个方法。对于UDP 来说,当有数据包收到的时候回调这个方法,因为UDP 是无连接的。
B. void sessionOpened(IoSession session):
这个方法在连接被打开时调用,它总是在sessionCreated()方法之后被调用。对于TCP 来说,它是在连接被建立之后调用,你可以在这里执行一些认证操作、发送数据等。对于UDP 来说,这个方法与sessionCreated()没什么区别,但是紧跟其后执行。如果你每隔一段时间,发送一些数据,那么sessionCreated()方法只会在第一次调用,但是sessionOpened()方法每次都会调用。
C. void sessionClosed(IoSession session) :
对于TCP 来说,连接被关闭时,调用这个方法。对于UDP 来说,IoSession 的close()方法被调用时才会毁掉这个方法。
D. void sessionIdle(IoSession session, IdleStatus status) :
这个方法在IoSession 的通道进入空闲状态时调用,对于UDP 协议来说,这个方法始终不会被调用。
E. void exceptionCaught(IoSession session, Throwable cause) :
这个方法在你的程序、Mina 自身出现异常时回调,一般这里是关闭IoSession。










 

 

 

 


(7.)IoBuffer:
这个接口是对JAVA NIO 的ByteBuffer 的封装,这主要是因为ByteBuffer 只提供了对基本数据类型的读写操作,没有提供对字符串等对象类型的读写方法,使用起来更为方便,另外,ByteBuffer 是定长的,如果想要可变,将很麻烦。IoBuffer 的可变长度的实现类似于StringBuffer。IoBuffer 与ByteBuffer 一样,都是非线程安全的。本节的一些内容如果不清楚,可以参考java.nio.ByteBuffer 接口。这个接口有如下常用的方法:
A. static IoBuffer allocate(int capacity,boolean useDirectBuffer):
这个方法内部通过SimpleBufferAllocator 创建一个实例,第一个参数指定初始化容量,第二个参数指定使用直接缓冲区还是JAVA 内存堆的缓存区,默认为false。
B. void free():
释放缓冲区,以便被一些IoBufferAllocator 的实现重用,一般没有必要调用这个方法,除非你想提升性能(但可能未必效果明显)。
C. IoBuffer setAutoExpand(boolean autoExpand):
这个方法设置IoBuffer 为自动扩展容量,也就是前面所说的长度可变,那么可以看出长度可变这个特性默认是不开启的。
D. IoBuffer setAutoShrink(boolean autoShrink):
这个方法设置IoBuffer 为自动收缩,这样在compact()方法调用之后,可以裁减掉一些没有使用的空间。如果这个方法没有被调用或者设置为false,你也可以通过调用shrink()方法手动收缩空间。








 


(8.)IoFuture:
在Mina 的很多操作中,你会看到返回值是XXXFuture,实际上他们都是IoFuture 的子类,看到这样的返回值,这个方法就说明是异步执行的,主要的子类有ConnectFuture、CloseFuture 、ReadFuture 、WriteFuture 。这个接口的大部分操作都和
java.util.concurrent.Future 接口是类似的,譬如:await()、awaitUninterruptibly()等,一般我们常用awaitUninterruptibly()方法可以等待异步执行的结果返回。这个接口有如下常用的方法:
A. IoFuture addListener(IoFutureListener
listener):
这个方法用于添加一个监听器, 在异步执行的结果返回时监听器中的回调方法operationComplete(IoFuture future),也就是说,这是替代awaitUninterruptibly()方法另一种等待异步执行结果的方法,它的好处是不会产生阻塞。
B. IoFuture removeListener(IoFutureListener
listener):
这个方法用于移除指定的监听器。
C. IoSession getSession():
这个方法返回当前的IoSession。举个例子,我们在客户端调用connect()方法访问Server 端的时候,实际上这就是一个异步执行的方法,也就是调用connect()方法之后立即返回,执行下面的代码,而不管是否连







接成功。那么如果我想在连接成功之后执行一些事情(譬如:获取连接成功后的IoSession对象),该怎么办呢?按照上面的说明,你有如下两种办法:

第一种:

[java] view plain copy

  1. ConnectFuture future = connector.connect(new InetSocketAddress(    
  2. HOSTNAME, PORT));    
  3. // 等待是否连接成功,相当于是转异步执行为同步执行。    
  4. future.awaitUninterruptibly();    
  5. // 连接成功后获取会话对象。如果没有上面的等待,由于connect()方法是异步的,session    
  6. 可能会无法获取。    
  7. session = future.getSession();    

第二种:

[java] view plain copy

  1. ConnectFuture future = connector.connect(new InetSocketAddress(    
  2. HOSTNAME, PORT));    
  3. future.addListener(new IoFutureListener

    () {    
  4. @Override    
  5. public void operationComplete(ConnectFuture future) {    
  6. try {    
  7. Thread.sleep(5000);    
  8. catch (InterruptedException e) {    
  9. e.printStackTrace();    
  10. }    
  11. IoSession session = future.getSession();    
  12. System.out.println(“++++++++++++++++++++++++++++”);    
  13. }    
  14. });    
  15. System.out.println(“*”);    

为了更好的看清楚使用监听器是异步的,而不是像awaitUninterruptibly()那样会阻塞主线程的执行,我们在回调方法中暂停5 秒钟,然后输出+++,在最后输出*。我们执行代码之后,你会发现首先输出*(这证明了监听器是异步执行的),然后IoSession 对象Created,系统暂停5 秒,然后输出+++,最后IoSession 对象Opened,也就是TCP 连接建立。

 


4.日志配置:
前面的示例代码中提到了使用SLF4J 作为日志门面,这是因为Mina 内部使用的就是SLF4J,你也使用SLF4J 可以与之保持一致性。Mina 如果想启用日志跟踪Mina 的运行细节,你可以配置LoggingFilter 过滤器,这样你可
以看到Session 建立、打开、空闲等一系列细节在日志中输出,默认SJF4J 是按照DEBUG级别输出跟踪信息的,如果你想给某一类别的Mina 运行信息输出指定日志输出级别,可以调用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。

例:

[java] view plain copy

  1. LoggingFilter lf = new LoggingFilter();    
  2. lf.setSessionOpenedLogLevel(LogLevel.ERROR);    
  3. acceptor.getFilterChain().addLast(“logger”, lf);    

这里IoSession 被打开的跟踪信息将以ERROR 级别输出到日志。

 


5.过滤器:
前面我们看到了LoggingFilter、ProtocolCodecFilter 两个过滤器,一个负责日志输出,一个负责数据的编解码,通过最前面的Mina 执行流程图,在IoProcessor 与IoHandler 之间可以有很多的过滤器,这种设计方式为你提供可插拔似的扩展功能提供了非常便利的方式,目前的Apache CXF、Apache Struts2 中的拦截器也都是一样的设计思路。Mina 中的IoFilter 是单例的,这与CXF、Apache Struts2 没什么区别。IoService 实例上会绑定一个DefaultIoFilterChainBuilder 实例,DefaultIoFilterChainBuilder 会把使用内部的EntryImpl 类把所有的过滤器按照顺序连在一起,组成一个过滤器链。
DefaultIoFilterChainBuilder 类如下常用的方法:
A. void addFirst(String name,IoFilter filter):
这个方法把过滤器添加到过滤器链的头部,头部就是IoProcessor 之后的第一个过滤器。同样的addLast()方法把过滤器添加到过滤器链的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
这个方法将过滤器添加到baseName 指定的过滤器的前面,同样的addAfter()方法把过滤器添加到baseName 指定的过滤器的后面。这里要注意无论是那种添加方法,每个过滤器的名字(参数name)必须是唯一的。
C. IoFilter remove(Stirng name):
这个方法移除指定名称的过滤器,你也可以调用另一个重载的remove()方法,指定要移除的IoFilter 的类型。
D. List

getAll():

这个方法返回当前IoService 上注册的所有过滤器。默认情况下,过滤器链中是空的,也就是getAll()方法返回长度为0 的List,但实际Mina内部有两个隐藏的过滤器:HeadFilter、TailFilter,分别在List 的最开始和最末端,很明显,TailFilter 在最末端是为了调用过滤器链之后,调用IoHandler。但这两个过滤器对你来说是透明的,可以忽略它们的存在。编写一个过滤器很简单,你需要实现IoFilter 接口,如果你只关注某几个方法,可以继承IoFilterAdapter 适配器类。IoFilter 接口中主要包含两类方法,一类是与IoHandler 中的方法名一致的方法,相当于拦截IoHandler 中的方法,另一类是IoFilter 的生命周期回调方法,这些回调方法的执行顺序和解释如下所示:










[java] view plain copy

  1. public class MyIoFilter implements IoFilter {    
  2. @Override    
  3. public void destroy() throws Exception {    
  4. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy”);    
  5. }    
  6. @Override    
  7. public void exceptionCaught(NextFilter nextFilter, IoSession    
  8. session,    
  9. Throwable cause) throws Exception {    
  10. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught”);    
  11. nextFilter.exceptionCaught(session, cause);    
  12. }    
  13. @Override    
  14. public void filterClose(NextFilter nextFilter, IoSession session)    
  15. throws Exception {    
  16. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose”);    
  17. nextFilter.filterClose(session);    
  18. }    
  19. @Override    
  20. public void filterWrite(NextFilter nextFilter, IoSession session,    
  21. WriteRequest writeRequest) throws Exception {    
  22. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite”);    
  23. nextFilter.filterWrite(session, writeRequest);    
  24. }    
  25. @Override    
  26. public void init() throws Exception {    
  27. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%init”);    
  28. }    
  29. @Override    
  30. public void messageReceived(NextFilter nextFilter, IoSession    
  31. session,    
  32. Object message) throws Exception {    
  33. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived”);    
  34. nextFilter.messageReceived(session, message);    
  35. }    
  36. @Override    
  37. public void messageSent(NextFilter nextFilter, IoSession session,    
  38. WriteRequest writeRequest) throws Exception {    
  39. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent”);    
  40. nextFilter.messageSent(session, writeRequest);    
  41. }    
  42. @Override    
  43. public void onPostAdd(IoFilterChain parent, String name,    
  44. NextFilter nextFilter) throws Exception {    
  45. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd”);    
  46. }    
  47. @Override    
  48. public void onPostRemove(IoFilterChain parent, String name,    
  49. NextFilter nextFilter) throws Exception {    
  50. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove”);    
  51. }    
  52. @Override    
  53. public void onPreAdd(IoFilterChain parent, String name,    
  54. NextFilter nextFilter) throws Exception {    
  55. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd”);    
  56. }    
  57. @Override    
  58. public void onPreRemove(IoFilterChain parent, String name,    
  59. NextFilter nextFilter) throws Exception {    
  60. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove”);    
  61. }    
  62.      
  63. @Override    
  64. public void sessionClosed(NextFilter nextFilter, IoSession session)    
  65. throws Exception {    
  66. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed”);    
  67. nextFilter.sessionClosed(session);    
  68. }    
  69. @Override    
  70. public void sessionCreated(NextFilter nextFilter, IoSession session)    
  71. throws Exception {    
  72. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated”);    
  73. nextFilter.sessionCreated(session);    
  74. }    
  75. @Override    
  76. public void sessionIdle(NextFilter nextFilter, IoSession session,    
  77. IdleStatus status) throws Exception {    
  78. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle”);    
  79. nextFilter.sessionIdle(session, status);    
  80. }    
  81. @Override    
  82. public void sessionOpened(NextFilter nextFilter, IoSession session)    
  83. throws Exception {    
  84. System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened”);    
  85. nextFilter.sessionOpened(session);    
  86. }    
  87. }    

我们将这个拦截器注册到上面的TCPServer 的IoAcceptor 的过滤器链中的最后一个:

[java] view plain copy

  1. acceptor.getFilterChain().addLast(“myIoFilter”,    
  2. new ReferenceCountingFilter(new MyIoFilter()));    

 


6.协议编解码器:
前面说过,协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据时将JAVA 对象编码二进制数据,而接收数据时将二进制数据解码为JAVA 对象(这个可不是JAVA 对象的序列化、反序列化那么简单的事情)。Mina 中的协议编解码器通过过滤器ProtocolCodecFilter 构造,这个过滤器的构造方法需要一个ProtocolCodecFactory,这从前面注册TextLineCodecFactory 的代码就可以看出来。
ProtocolCodecFactory 中有如下两个方法:
public interface ProtocolCodecFactory {

ProtocolEncoder getEncoder(IoSession session) throws Exception;
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
因此,构建一个ProtocolCodecFactory 需要ProtocolEncoder、ProtocolDecoder 两个实例。你可能要问JAVA 对象和二进制数据之间如何转换呢?这个要依据具体的通信协议,也就是Server 端要和Client 端约定网络传输的数据是什么样的格式,譬如:第一个字节表示数据长度,第二个字节是数据类型,后面的就是真正的数据(有可能是文字、有可能是图片等等),然后你可以依据长度从第三个字节向后读,直到读取到指定第一个字节指定长度的数据。
简单的说,HTTP 协议就是一种浏览器与Web 服务器之间约定好的通信协议,双方按照指定的协议编解码数据。我们再直观一点儿说,前面一直使用的TextLine 编解码器就是在读取网络上传递过来的数据时,只要发现哪个字节里存放的是ASCII 的10、13 字符(/r、/n),就认为之前的字节就是一个字符串(默认使用UTF-8 编码)。以上所说的就是各种协议实际上就是网络七层结构中的应用层协议,它位于网络层(IP)、传输层(TCP)之上,Mina 的协议编解码器就是让你实现一套自己的应用层协议栈。








(6-1.)简单的编解码器示例:
下面我们举一个模拟电信运营商短信协议的编解码器实现,假设通信协议如下所示:
M sip:wap.fetion.com.cn SIP-C/2.0
S: xxxx
R: xxxx



 

[java] view plain copy

  1. public class SmsObject {    
  2. private String sender;// 短信发送者    
  3. private String receiver;// 短信接受者    
  4. private String message;// 短信内容    
  5. public String getSender() {    
  6. return sender;    
  7. }    
  8. public void setSender(String sender) {    
  9. this.sender = sender;    
  10. }    
  11. public String getReceiver() {    
  12. return receiver;    
  13. }    
  14. public void setReceiver(String receiver) {    
  15. this.receiver = receiver;    
  16. }    
  17. public String getMessage() {    
  18. return message;    
  19. }    
  20. public void setMessage(String message) {    
  21. this.message = message;    
  22. }    
  23. }    

第二步,编码器:
在Mina 中编写编码器可以实现ProtocolEncoder,其中有encode()、dispose()两个方法需要实现。这里的dispose()方法用于在销毁编码器时释放关联的资源,由于这个方法一般我们并不关心,所以通常我们直接继承适配器ProtocolEncoderAdapter。

[java] view plain copy

  1. public class CmccSipcEncoder extends ProtocolEncoderAdapter {    
  2. private final Charset charset;    
  3. public CmccSipcEncoder(Charset charset) {    
  4. this.charset = charset;    
  5. }    
  6. @Override    
  7. public void encode(IoSession session, Object message,    
  8. ProtocolEncoderOutput out) throws Exception {    
  9. SmsObject sms = (SmsObject) message;    
  10. CharsetEncoder ce = charset.newEncoder();    
  11. IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);    
  12. String statusLine = “M sip:wap.fetion.com.cn SIP-C/2.0”;    
  13. String sender = sms.getSender();    
  14. String receiver = sms.getReceiver();    
  15. String smsContent = sms.getMessage();    
  16. buffer.putString(statusLine + ‘/n’, ce);    
  17. buffer.putString(“S: “ + sender + ‘/n’, ce);    
  18. buffer.putString(“R: “ + receiver + ‘/n’, ce);    
  19. buffer    
  20. .putString(“L: “ + (smsContent.getBytes(charset).length)    
  21. “/n”,    
  22. ce);    
  23. buffer.putString(smsContent, ce);    
  24. buffer.flip();    
  25. out.write(buffer);    
  26. }    
  27. }    

B. 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区。简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。

[java] view plain copy

  1. public class CmccSipcDecoder extends CumulativeProtocolDecoder {    
  2. private final Charset charset;    
  3. public CmccSipcDecoder(Charset charset) {    
  4. this.charset = charset;    
  5. }    
  6. @Override    
  7. protected boolean doDecode(IoSession session, IoBuffer in,    
  8. ProtocolDecoderOutput out) throws Exception {    
  9. IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);    
  10. CharsetDecoder cd = charset.newDecoder();    
  11. int matchCount = 0;    
  12. String statusLine = “”, sender = “”, receiver = “”, length = “”,    
  13. sms = “”;    
  14. int i = 1;    
  15. while (in.hasRemaining()) {    
  16. byte b = in.get();    
  17. buffer.put(b);    
  18. if (b == 10 && i < 5) {    
  19. matchCount++;    
  20. if (i == 1) {    
  21. buffer.flip();    
  22. statusLine = buffer.getString(matchCount, cd);    
  23. statusLine = statusLine.substring(0,    
  24. statusLine.length() – 1);    
  25. matchCount = 0;    
  26. buffer.clear();    
  27. }    
  28. if (i == 2) {    
  29. buffer.flip();    
  30. sender = buffer.getString(matchCount, cd);    
  31. sender = sender.substring(0, sender.length() –1);    
  32. matchCount = 0;    
  33. buffer.clear();    
  34. }    
  35. if (i == 3) {    
  36. buffer.flip();    
  37. receiver = buffer.getString(matchCount, cd);    
  38. receiver = receiver.substring(0, receiver.length()    
  39. 1);    
  40. matchCount = 0;    
  41. buffer.clear();    
  42. }    
  43. if (i == 4) {    
  44. buffer.flip();    
  45. length = buffer.getString(matchCount, cd);    
  46. length = length.substring(0, length.length() –1);    
  47. matchCount = 0;    
  48. buffer.clear();    
  49. }    
  50. i++;    
  51. else if (i == 5) {    
  52. matchCount++;    
  53. if (matchCount == Long.parseLong(length.split(“: “)[1]))    
  54. {    
  55. buffer.flip();    
  56. sms = buffer.getString(matchCount, cd);    
  57. i++;    
  58. break;    
  59. }    
  60. else {    
  61. matchCount++;    
  62. }    
  63. }    
  64. SmsObject smsObject = new SmsObject();    
  65. smsObject.setSender(sender.split(“: “)[1]);    
  66. smsObject.setReceiver(receiver.split(“: “)[1]);    
  67. smsObject.setMessage(sms);    
  68. out.write(smsObject);    
  69. return false;    
  70. }    
  71. }    

我们的这个短信协议解码器使用/n(ASCII 的10 字符)作为分解点,一个字节一个字节的读取,那么第一次发现/n 的字节位置之前的部分,必然就是短信协议的状态行,依次类推,你就可以解析出来发送者、接受者、短信内容长度。然后我们在解析短信内容时,使用获取到的长度进行读取。全部读取完毕之后, 然后构造SmsObject 短信对象, 使用ProtocolDecoderOutput 的write()方法输出,最后返回false,也就是本次数据全部读取完毕,告知CumulativeProtocolDecoder 在本次数据读取中不需要再调用doDecode()方法了。这里需要注意的是两个状态变量i、matchCount,i 用于记录解析到了短信协议中的哪一行(/n),matchCount 记录在当前行中读取到了哪一个字节。状态变量在解码器中经常被使用,我们这里的情况比较简单,因为我们假定短信发送是在一次数据发送中完成的,所以状态变量的使用也比较简单。假如数据的发送被拆成了多次(譬如:短信协议的短信内容、消息报头被拆成了两次数据发送),那么上面的代码势必就会存在问题,因为当第二次调用doDecode()方法时,状态变量i、matchCount 势必会被重置,也就是原来的状态值并没有被保存。那么我们如何解决状态保存的问题呢?答案就是将状态变量保存在IoSession 中或者是Decoder 实例自身,但推荐使用前者,因为虽然Decoder 是单例的,其中的实例变量保存的状态在Decoder 实例销毁前始终保持,但Mina 并不保证每次调用doDecode()方法时都是同一个线程(这也就是说第一次调用doDecode()是IoProcessor-1 线程,第二次有可能就是IoProcessor-2 线程),这就会产生多线程中的实例变量的可视性(Visibility,具体请参考JAVA 的多线程知识)问题。IoSession中使用一个同步的HashMap 保存对象,所以你不需要担心多线程带来的问题。使用IoSession 保存解码器的状态变量通常的写法如下所示:
A. 在解码器中定义私有的内部类Context,然后将需要保存的状态变量定义在Context 中存储。
B. 在解码器中定义方法获取这个Context 的实例,这个方法的实现要优先从IoSession 中获取Context。
具体代码示例如下所示:
// 上下文作为保存状态的内部类的名字,意思很明显,就是让状态跟随上下文,在整个调用过程中都可以被保持。

[java] view plain copy

  1. public class XXXDecoder extends CumulativeProtocolDecoder{    
  2. private final AttributeKey CONTEXT =    
  3. new AttributeKey(getClass(), “context” );    
  4. public Context getContext(IoSession session){    
  5. Context ctx=(Context)session.getAttribute(CONTEXT);    
  6. if(ctx==null){    
  7. ctx=new Context();    
  8. session.setAttribute(CONTEXT,ctx);    
  9. }    
  10. }    
  11. private class Context {    
  12. //状态变量    
  13. }    
  14. }    

注意这里我们使用了Mina 自带的AttributeKey 类来定义保存在IoSession 中的对象的键值,这样可以有效的防止键值重复。另外,要注意在全部处理完毕之后,状态要复位,譬如:聊天室中的一条消息读取完毕之后,状态变量要变为初始值,以便下次处理时重新使用。
第四步,编解码工厂:

[java] view plain copy

  1. public class CmccSipcCodecFactory implements ProtocolCodecFactory {    
  2. private final CmccSipcEncoder encoder;    
  3. private final CmccSipcDecoder decoder;    
  4. public CmccSipcCodecFactory() {    
  5. this(Charset.defaultCharset());    
  6. }    
  7. public CmccSipcCodecFactory(Charset charSet) {    
  8. this.encoder = new CmccSipcEncoder(charSet);    
  9. this.decoder = new CmccSipcDecoder(charSet);    
  10. }    
  11. @Override    
  12. public ProtocolDecoder getDecoder(IoSession session) throws    
  13. Exception {    
  14. return decoder;    
  15. }    
  16. @Override    
  17. public ProtocolEncoder getEncoder(IoSession session) throws    
  18. Exception {    
  19. return encoder;    
  20. }    
  21. }    

实际上这个工厂类就是包装了编码器、解码器,通过接口中的getEncoder()、getDecoder()方法向ProtocolCodecFilter 过滤器返回编解码器实例,以便在过滤器中对数据进行编解码处理。
第五步,运行示例:
下面我们修改最一开始的示例中的MyServer、MyClient 的代码,如下所示:

[java] view plain copy

  1. acceptor.getFilterChain().addLast(    
  2. “codec”,    
  3. new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset    
  4. .forName(“UTF-8”))));    
  5. connector.getFilterChain().addLast(    
  6. “codec”,    
  7. new ProtocolCodecFilter(new    
  8. CmccSipcCodecFactory(    
  9. Charset.forName(“UTF-8”))));    
  10. 然后我们在ClientHandler 中发送一条短信:    
  11. public void sessionOpened(IoSession session) {    
  12. SmsObject sms = new SmsObject();    
  13. sms.setSender(“”);    
  14. sms.setReceiver(“”);    
  15. sms.setMessage(“你好!Hello World!”);    
  16. session.write(sms);    
  17. }    

最后我们在MyIoHandler 中接收这条短信息:

[java] view plain copy

  1. public void messageReceived(IoSession session, Object message)    
  2. throws Exception {    
  3. SmsObject sms = (SmsObject) message;    
  4. log.info(“The message received is [“ + sms.getMessage() + “]”);    
  5. }    

 


CmccSispcDecoder 类改为如下的写法:

[java] view plain copy

  1. public class CmccSipcDecoder extends CumulativeProtocolDecoder {    
  2. private final Charset charset;    
  3. private final AttributeKey CONTEXT = new AttributeKey(getClass(),    
  4. “context”);    
  5. public CmccSipcDecoder(Charset charset) {    
  6. this.charset = charset;    
  7. }    
  8. @Override    
  9. protected boolean doDecode(IoSession session, IoBuffer in,    
  10. ProtocolDecoderOutput out) throws Exception {    
  11. Context ctx = getContext(session);    
  12. CharsetDecoder cd = charset.newDecoder();    
  13. int matchCount = ctx.getMatchCount();    
  14. int line = ctx.getLine();    
  15. IoBuffer buffer = ctx.innerBuffer;    
  16. String statusLine = ctx.getStatusLine(),    
  17. sender = ctx.getSender(),    
  18. receiver = ctx.getReceiver(),    
  19. length = ctx.getLength(),    
  20. sms = ctx.getSms();    
  21. while (in.hasRemaining()) {    
  22. byte b = in.get();    
  23. matchCount++;    
  24. buffer.put(b);    
  25. if (line < 4 && b == 10) {    
  26. if (line == 0) {    
  27. buffer.flip();    
  28. statusLine = buffer.getString(matchCount, cd);    
  29. statusLine = statusLine.substring(0,    
  30. statusLine.length() – 1);    
  31. matchCount = 0;    
  32. buffer.clear();    
  33. ctx.setStatusLine(statusLine);    
  34. }    
  35. if (line == 1) {    
  36. buffer.flip();    
  37. sender = buffer.getString(matchCount, cd);    
  38. sender = sender.substring(0, sender.length() – 1);    
  39. matchCount = 0;    
  40. buffer.clear();    
  41. ctx.setSender(sender);    
  42. }    
  43. if (line == 2) {    
  44. buffer.flip();    
  45. receiver = buffer.getString(matchCount, cd);    
  46. receiver = receiver.substring(0, receiver.length() –    
  47. 1);    
  48. matchCount = 0;    
  49. buffer.clear();    
  50. ctx.setReceiver(receiver);    
  51. }    
  52. if (line == 3) {    
  53. buffer.flip();    
  54. length = buffer.getString(matchCount, cd);    
  55. length = length.substring(0, length.length() – 1);    
  56. matchCount = 0;    
  57. buffer.clear();    
  58. ctx.setLength(length);    
  59. }    
  60. line++;    
  61. else if (line == 4) {    
  62. if (matchCount == Long.parseLong(length.split(“: “)[1]))    
  63. {    
  64. buffer.flip();    
  65. sms = buffer.getString(matchCount, cd);    
  66. ctx.setSms(sms);    
  67. // 由于下面的break,这里需要调用else外面的两行代码    
  68. ctx.setMatchCount(matchCount);    
  69. ctx.setLine(line);    
  70. break;    
  71. }    
  72. }    
  73. ctx.setMatchCount(matchCount);    
  74. ctx.setLine(line);    
  75. }    
  76. if (ctx.getLine() == 4    
  77. && Long.parseLong(ctx.getLength().split(“: “)[1]) == ctx    
  78. .getMatchCount()) {    
  79. SmsObject smsObject = new SmsObject();    
  80. smsObject.setSender(sender.split(“: “)[1]);    
  81. smsObject.setReceiver(receiver.split(“: “)[1]);    
  82. smsObject.setMessage(sms);    
  83. out.write(smsObject);    
  84. ctx.reset();    
  85. return true;    
  86. else {    
  87. return false;    
  88. }    
  89. }    
  90. private Context getContext(IoSession session) {    
  91. Context context = (Context) session.getAttribute(CONTEXT);    
  92. if (context == null){    
  93. context = new Context();    
  94. session.setAttribute(CONTEXT, context);    
  95. }    
  96. return context;    
  97. }    
  98. private class Context {    
  99. private final IoBuffer innerBuffer;    
  100. private String statusLine = “”;    
  101. private String sender = “”;    
  102. private String receiver = “”;    
  103. private String length = “”;    
  104. private String sms = “”;    
  105. public Context() {    
  106. innerBuffer = IoBuffer.allocate(100).setAutoExpand(true);    
  107. }    
  108. private int matchCount = 0;    
  109. private int line = 0;    
  110. public int getMatchCount() {    
  111. return matchCount;    
  112. }    
  113. public void setMatchCount(int matchCount) {    
  114. this.matchCount = matchCount;    
  115. }    
  116. public int getLine() {    
  117. return line;    
  118. }    
  119. public void setLine(int line) {    
  120. this.line = line;    
  121. }    
  122. public String getStatusLine() {    
  123. return statusLine;    
  124. }    
  125. public void setStatusLine(String statusLine) {    
  126. this.statusLine = statusLine;    
  127. }    
  128. public String getSender() {    
  129. return sender;    
  130. }    
  131. public void setSender(String sender) {    
  132. this.sender = sender;    
  133. }    
  134. public String getReceiver() {    
  135. return receiver;    
  136. }    
  137. public void setReceiver(String receiver) {    
  138. this.receiver = receiver;    
  139. }    
  140. public String getLength() {    
  141. return length;    
  142. }    
  143. public void setLength(String length) {    
  144. this.length = length;    
  145. }    
  146. public String getSms() {    
  147. return sms;    
  148. }    
  149. public void setSms(String sms) {    
  150. this.sms = sms;    
  151. }    
  152. public void reset() {    
  153. this.innerBuffer.clear();    
  154. this.matchCount = 0;    
  155. this.line = 0;    
  156. this.statusLine = “”;    
  157. this.sender = “”;    
  158. this.receiver = “”;    
  159. this.length = “”;    
  160. this.sms = “”;    
  161. }    
  162. }    
  163. }    

这里我们做了如下的几步操作:
(1.) 所有记录状态的变量移到了Context 内部类中,包括记录读到短信协议的哪一行的line。每一行读取了多少个字节的matchCount,还有记录解析好的状态行、发送者、接受者、短信内容、累积数据的innerBuffer 等。这样就可以在数据不能完全解码,等待下一次doDecode()方法的调用时,还能承接上一次调用的数据。
(2.) 在 doDecode()方法中主要的变化是各种状态变量首先是从Context 中获取,然后操作之后,将最新的值setXXX()到Context 中保存。
(3.) 这里注意doDecode()方法最后的判断,当认为不够解码为一条短信息时,返回false,也就是在本次数据流解码中不要再调用doDecode()方法;当认为已经解码出一条短信息时,输出短消息,然后重置所有的状态变量,返回true,也就是如果本次数据流解码中还有没解码完的数据,继续调用doDecode()方法。下面我们对客户端稍加改造,来模拟上面的红、蓝、绿三次发送聊天短信息的情况:
MyClient:

[java] view plain copy

  1. ConnectFuture future = connector.connect(new InetSocketAddress(    
  2. HOSTNAME, PORT));    
  3. future.awaitUninterruptibly();    
  4. session = future.getSession();    
  5. for (int i = 0; i < 3; i++) {    
  6. SmsObject sms = new SmsObject();    
  7. session.write(sms);    
  8. System.out.println(“” + i);    
  9. }    

这里我们为了方便演示,不在IoHandler 中发送消息,而是直接在MyClient 中发送,你要注意的是三次发送都要使用同一个IoSession,否则就不是从同一个通道发送过去的了。
CmccSipcEncoder:

[java] view plain copy

  1. public void encode(IoSession session, Object message,    
  2. ProtocolEncoderOutput out) throws Exception {    
  3. SmsObject sms = (SmsObject) message;    
  4. CharsetEncoder ce = charset.newEncoder();    
  5. String statusLine = “M sip:wap.fetion.com.cn SIP-C/2.0”;    
  6. String sender = “”;    
  7. String receiver = “”;    
  8. String smsContent = “你好!Hello World!”;    
  9. IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);    
  10. buffer.putString(statusLine + ‘/n’, ce);    
  11. buffer.putString(“S: “ + sender + ‘/n’, ce);    
  12. buffer.putString(“R: “ + receiver + ‘/n’, ce);    
  13. buffer.flip();    
  14. out.write(buffer);    
  15. IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true);    
  16. buffer2.putString(“L: “ + (smsContent.getBytes(charset).length)    
  17. “/n”,ce);    
  18. buffer2.putString(smsContent, ce);    
  19. buffer2.putString(statusLine + ‘/n’, ce);    
  20. buffer2.flip();    
  21. out.write(buffer2);    
  22. IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true);    
  23. buffer3.putString(“S: “ + sender + ‘/n’, ce);    
  24. buffer3.putString(“R: “ + receiver + ‘/n’, ce);    
  25. buffer3.putString(“L: “ + (smsContent.getBytes(charset).length)    
  26. “/n”,ce);    
  27. buffer3.putString(smsContent, ce);    
  28. buffer3.putString(statusLine + ‘/n’, ce);    
  29. buffer3.flip();    
  30. out.write(buffer3);    
  31. }    

上面的这段代码要配合MyClient来操作,你需要做的是在MyClient中的红色输出语句处设置断点,然后第一调用时CmccSipcEncoder中注释掉蓝、绿色的代码,也就是发送两条短信息的第一部分(红色的代码),依次类推,也就是MyClient的中的三次断点中,分别执行CmccSipcEncoder中的红、蓝、绿三段代码,也就是模拟两条短信的三段发送。你会看到Server端的运行结果是:当MyClient第一次到达断点时,没有短信息被读取到,当MyClient第二次到达断点时,第一条短信息输出,当MyClient第三次到达断点时,第二条短信息输出。

 


(6-3.)多路分离的解码器:
假设一段数据发送过来之后,需要根据某种条件决定使用哪个解码器,而不是像上面的例子,固定使用一个解码器,那么该如何做呢?幸好Mina 提供了org.apache.mina.filter.codec.demux 包来完成这种多路分离(Demultiplexes)的解码工作,也就是同时注册多个解码器,然后运行时依据传入的数据决定到底使用哪个解码器来工作。所谓多路分离就是依据条件分发到指定的解码器,譬如:上面的短信协议进行扩展,可以依据状态行来判断使用1.0 版本的短信协议解码器还是2.0版本的短信协议解码器。
下面我们使用一个简单的例子,说明这个多路分离的解码器是如何使用的,需求如下所示:
(1.) 客户端传入两个int 类型的数字,还有一个char 类型的符号。
(2.) 如果符号是+,服务端就是用1 号解码器,对两个数字相加,然后把结果返回给客户端。
(3.) 如果符号是-,服务端就使用2 号解码器,将两个数字变为相反数,然后相加,把结果返回给客户端。
Demux 开发编解码器主要有如下几个步骤:
A. 定义Client 端、Server 端发送、接收的数据对象。
B. 使用Demux 编写编码器是实现MessageEncoder

接口,T 是你要编码的数据对象,这个MessageEncoder 会在DemuxingProtocolEncoder 中调用。

C. 使用Demux 编写编码器是实现MessageDecoder 接口,这个MessageDecoder 会在DemuxingProtocolDecoder 中调用。









[java] view plain copy

  1. public interface MessageEncoder

     {    
  2. void encode(IoSession session, T message, ProtocolEncoderOutput out)    
  3. throws Exception;    
  4. }    

你注意到消息编码器接口与在ProtocolEncoder 中没什么不同,区别就是Object message被泛型具体化了类型,你不需要手动的类型转换了。
MessageDecoder的接口如下所示:

[java] view plain copy

  1. public interface MessageDecoder {    
  2. static MessageDecoderResult OK = MessageDecoderResult.OK;    
  3. static MessageDecoderResult NEED_DATA =    
  4. MessageDecoderResult.NEED_DATA;    
  5. static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK;    
  6. MessageDecoderResult decodable(IoSession session, IoBuffer in);    
  7. MessageDecoderResult decode(IoSession session, IoBuffer in,    
  8. ProtocolDecoderOutput out) throws Exception;    
  9. void finishDecode(IoSession session, ProtocolDecoderOutput out)    
  10. throws Exception;    
  11. }    

[java] view plain copy

  1. public class SendMessage {    
  2. private int i = 0;    
  3. private int j = 0;    
  4. private char symbol = ‘+’;    
  5. public char getSymbol() {    
  6. return symbol;    
  7. }    
  8. public void setSymbol(char symbol) {    
  9. this.symbol = symbol;    
  10. }    
  11. public int getI() {    
  12. return i;    
  13. }    
  14. public void setI(int i) {    
  15. this.i = i;    
  16. }    
  17. public int getJ() {    
  18. return j;    
  19. }    
  20. public void setJ(int j) {    
  21. this.j = j;    
  22. }    
  23. }    

(2.)服务端发送的返回结果对象:

[java] view plain copy

  1. public class ResultMessage {    
  2. private int result = 0;    
  3. public int getResult() {    
  4. return result;    
  5. }    
  6. public void setResult(int result) {    
  7. this.result = result;    
  8. }    
  9. }    

(3.)客户端使用的SendMessage的编码器:

[java] view plain copy

  1. public class SendMessageEncoder implements MessageEncoder

        
  2. {    
  3. @Override    
  4. public void encode(IoSession session, SendMessage message,    
  5. ProtocolEncoderOutput out) throws Exception {    
  6. IoBuffer buffer = IoBuffer.allocate(10);    
  7. buffer.putChar(message.getSymbol());    
  8. buffer.putInt(message.getI());    
  9. buffer.putInt(message.getJ());    
  10. buffer.flip();    
  11. out.write(buffer);    
  12. }    
  13. }    

这里我们的SendMessage、ResultMessage 中的字段都是用长度固定的基本数据类型,这样IoBuffer 就不需要自动扩展了,提高性能。按照一个char、两个int 计算,这里的IoBuffer只需要10 个字节的长度就可以了。
(4.)服务端使用的SendMessage的1号解码器:

[java] view plain copy

  1. public class SendMessageDecoderPositive implements MessageDecoder {    
  2. @Override    
  3. public MessageDecoderResult decodable(IoSession session, IoBuffer in)    
  4. {    
  5. if (in.remaining() < 2)    
  6. return MessageDecoderResult.NEED_DATA;    
  7. else {    
  8. char symbol = in.getChar();    
  9. if (symbol == ‘+’) {    
  10. return MessageDecoderResult.OK;    
  11. else {    
  12. return MessageDecoderResult.NOT_OK;    
  13. }    
  14. }    
  15. }    
  16. @Override    
  17. public MessageDecoderResult decode(IoSession session, IoBuffer in,    
  18. ProtocolDecoderOutput out) throws Exception {    
  19. SendMessage sm = new SendMessage();    
  20. sm.setSymbol(in.getChar());    
  21. sm.setI(in.getInt());    
  22. sm.setJ(in.getInt());    
  23. out.write(sm);    
  24. return MessageDecoderResult.OK;    
  25. }    
  26. @Override    
  27. public void finishDecode(IoSession session, ProtocolDecoderOutput    
  28. out)    
  29. throws Exception {    
  30. // undo    
  31. }    
  32. }    

因为客户端发送的SendMessage 的前两个字节(char)就是符号位,所以我们在decodable()方法中对此条件进行了判断,之后读到两个字节,并且这两个字节表示的字符是+时,才认为这个解码器可用。
(5.)服务端使用的SendMessage的2号解码器:

[java] view plain copy

  1. public class SendMessageDecoderNegative implements MessageDecoder {    
  2. @Override    
  3. public MessageDecoderResult decodable(IoSession session, IoBuffer in)    
  4. {    
  5. if (in.remaining() < 2)    
  6. return MessageDecoderResult.NEED_DATA;    
  7. else {    
  8. char symbol = in.getChar();    
  9. if (symbol == ‘-‘) {    
  10. return MessageDecoderResult.OK;    
  11. else {    
  12. return MessageDecoderResult.NOT_OK;    
  13. }    
  14. }    
  15. }    
  16. @Override    
  17. public MessageDecoderResult decode(IoSession session, IoBuffer in,    
  18. ProtocolDecoderOutput out) throws Exception {    
  19. SendMessage sm = new SendMessage();    
  20. sm.setSymbol(in.getChar());    
  21. sm.setI(-in.getInt());    
  22. sm.setJ(-in.getInt());    
  23. out.write(sm);    
  24. return MessageDecoderResult.OK;    
  25. }    
  26. @Override    
  27. public void finishDecode(IoSession session, ProtocolDecoderOutput    
  28. out)    
  29. throws Exception {    
  30. // undo    
  31. }    
  32. }    

(6.)服务端使用的ResultMessage的编码器:

[java] view plain copy

  1. public class ResultMessageEncoder implements    
  2. MessageEncoder

     {    
  3. @Override    
  4. public void encode(IoSession session, ResultMessage message,    
  5. ProtocolEncoderOutput out) throws Exception {    
  6. IoBuffer buffer = IoBuffer.allocate(4);    
  7. buffer.putInt(message.getResult());    
  8. buffer.flip();    
  9. out.write(buffer);    
  10. }    
  11. }    

(7.)客户端使用的ResultMessage的解码器:

[java] view plain copy

  1. public class ResultMessageDecoder implements MessageDecoder {    
  2. @Override    
  3. public MessageDecoderResult decodable(IoSession session, IoBuffer in)    
  4. {    
  5. if (in.remaining() < 4)    
  6. return MessageDecoderResult.NEED_DATA;    
  7. else if (in.remaining() == 4)    
  8. return MessageDecoderResult.OK;    
  9. else    
  10. return MessageDecoderResult.NOT_OK;    
  11. }    
  12. @Override    
  13. public MessageDecoderResult decode(IoSession session, IoBuffer in,    
  14. ProtocolDecoderOutput out) throws Exception {    
  15. ResultMessage rm = new ResultMessage();    
  16. rm.setResult(in.getInt());    
  17. out.write(rm);    
  18. return MessageDecoderResult.OK;    
  19. }    
  20. @Override    
  21. public void finishDecode(IoSession session, ProtocolDecoderOutput    
  22. out)    
  23. throws Exception {    
  24. // undo    
  25. }    
  26. }    

(8.)组装这些编解码器的工厂:

[java] view plain copy

  1. public class MathProtocolCodecFactory extends    
  2. DemuxingProtocolCodecFactory {    
  3. public MathProtocolCodecFactory(boolean server) {    
  4. if (server) {    
  5. super.addMessageEncoder(ResultMessage.class,    
  6. ResultMessageEncoder.class);    
  7. super.addMessageDecoder(SendMessageDecoderPositive.class);    
  8. super.addMessageDecoder(SendMessageDecoderNegative.class);    
  9. else {    
  10. super    
  11. .addMessageEncoder(SendMessage.class,    
  12. SendMessageEncoder.class);    
  13. super.addMessageDecoder(ResultMessageDecoder.class);    
  14. }    
  15. }    
  16. }    

这个工厂类我们使用了构造方法的一个布尔类型的参数,以便其可以在Server 端、Client端同时使用。我们以Server 端为例,你可以看到调用两次addMessageDecoder()方法添加了1 号、2 号解码器,其实DemuxingProtocolDecoder 内部在维护了一个MessageDecoder数组,用于保存添加的所有的消息解码器,每次decode()的时候就调用每个MessageDecoder的decodable()方法逐个检查,只要发现一个MessageDecoder 不是对应的解码器,就从数组中移除,直到找到合适的MessageDecoder,如果最后发现数组为空,就表示没找到对应的MessageDecoder,最后抛出异常。
(9.)Server端:

[java] view plain copy

  1. public class Server {    
  2. public static void main(String[] args) throws Exception {    
  3. IoAcceptor acceptor = new NioSocketAcceptor();    
  4. LoggingFilter lf = new LoggingFilter();    
  5. acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,    
  6. 5);    
  7. acceptor.getFilterChain().addLast(“logger”, lf);    
  8. acceptor.getFilterChain().addLast(“codec”,    
  9. new ProtocolCodecFilter(new    
  10. MathProtocolCodecFactory(true)));    
  11. acceptor.setHandler(new ServerHandler());    
  12. acceptor.bind(new InetSocketAddress(9123));    
  13. }    
  14. }    

(10.)Server端使用的IoHandler:

[java] view plain copy

  1. public class ServerHandler extends IoHandlerAdapter {    
  2. private final static Logger log = LoggerFactory    
  3. .getLogger(ServerHandler.class);    
  4. @Override    
  5. public void sessionIdle(IoSession session, IdleStatus status)    
  6. throws Exception {    
  7. session.close(true);    
  8. }    
  9. @Override    
  10. public void messageReceived(IoSession session, Object message)    
  11. throws Exception {    
  12. SendMessage sm = (SendMessage) message;    
  13. log.info(“The message received is [ “ + sm.getI() + ” “    
  14. + sm.getSymbol() + ” “ + sm.getJ() + ” ]”);    
  15. ResultMessage rm = new ResultMessage();    
  16. rm.setResult(sm.getI() + sm.getJ());    
  17. session.write(rm);    
  18. }    
  19. }    

(11.)Client端:

[java] view plain copy

  1. public class Client {    
  2. public static void main(String[] args) throws Throwable {    
  3. IoConnector connector = new NioSocketConnector();    
  4. connector.setConnectTimeoutMillis(30000);    
  5. connector.getFilterChain().addLast(“logger”new    
  6. LoggingFilter());    
  7. connector.getFilterChain().addLast(“codec”,    
  8. new ProtocolCodecFilter(new    
  9. MathProtocolCodecFactory(false)));    
  10. connector.setHandler(new ClientHandler());    
  11. connector.connect(new InetSocketAddress(“localhost”9123));    
  12. }    
  13. }    

(12.)Client端的IoHandler:

[java] view plain copy

  1. public class ClientHandler extends IoHandlerAdapter {    
  2. private final static Logger LOGGER = LoggerFactory    
  3. .getLogger(ClientHandler.class);    
  4. @Override    
  5. public void sessionOpened(IoSession session) throws Exception {    
  6. SendMessage sm = new SendMessage();    
  7. sm.setI(100);    
  8. sm.setJ(99);    
  9. sm.setSymbol(‘+’);    
  10. session.write(sm);    
  11. }    
  12. @Override    
  13. public void messageReceived(IoSession session, Object message) {    
  14. ResultMessage rs = (ResultMessage) message;    
  15. LOGGER.info(String.valueOf(rs.getResult()));    
  16. }    
  17. }    

你尝试改变(12.)中的红色代码中的正负号,会看到服务端使用了两个不同的解码器对其进行处理。

 


7.线程模型配置:
Mina 中的很多执行环节都使用了多线程机制,用于提高性能。Mina 中默认在三个地方使用了线程:
(1.) IoAcceptor:
这个地方用于接受客户端的连接建立,每监听一个端口(每调用一次bind()方法),都启用一个线程,这个数字我们不能改变。这个线程监听某个端口是否有请求到来,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(2.) IoConnector:
这个地方用于与服务端建立连接,每连接一个服务端(每调用一次connect()方法),就启用一个线程,我们不能改变。同样的,这个线程监听是否有连接被建立,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(3.) IoProcessor:
这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的Selector 的原因。那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的IoProcessor 的数量并不是越多越好。






[java] view plain copy

  1. public enum IoEventType {    
  2. SESSION_CREATED,    
  3. SESSION_OPENED,    
  4. SESSION_CLOSED,    
  5. MESSAGE_RECEIVED,    
  6. MESSAGE_SENT,    
  7. SESSION_IDLE,    
  8. EXCEPTION_CAUGHT,    
  9. WRITE,    
  10. CLOSE,    
  11. }    

默认情况下,没有配置关注的事件类型,有如下六个事件方法会被自动使用线程池异步执行:
IoEventType.EXCEPTION_CAUGHT,
IoEventType.MESSAGE_RECEIVED,
IoEventType.MESSAGE_SENT,
IoEventType.SESSION_CLOSED,
IoEventType.SESSION_IDLE,
IoEventType.SESSION_OPENED
其实ExecutorFilter 的工作机制很简单,就是在调用下一个过滤器的事件方法时,把其交给Executor 的execute(Runnable runnable)方法来执行,其实你自己在IoHandler 或者某个过滤器的事件方法中开启一个线程,也可以完成同样的功能,只不过这样做,你就失去了程序的可配置性,线程调用的代码也会完全耦合在代码中。但要注意的是绝对不能开启线程让其执行sessionCreated()方法。如果你真的打算使用这个ExecutorFilter,那么最好想清楚它该放在过滤器链的哪个位置,针对哪些事件做异步处理机制。一般ExecutorFilter 都是要放在ProtocolCodecFilter 过滤器的后面,也就是不要让编解码运行在独立的线程上,而是要运行在IoProcessor 所在的线程,因为编解码处理的数据都是由IoProcessor 读取和发送的,没必要开启新的线程,否则性能反而会下降。一般使用ExecutorFilter 的典型场景是将业务逻辑(譬如:耗时的数据库操作)放在单独的线程中运行,也就是说与IO 处理无关的操作可以考虑使用ExecutorFilter 来异步执行。






















































































版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/221491.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午5:58
下一篇 2026年3月17日 下午5:59


相关推荐

  • 元素守恒计算方法_leetcode免费吗

    元素守恒计算方法_leetcode免费吗给定一个整数数组 nums,按要求返回一个新数组 counts。数组 counts 有该性质: counts[i] 的值是 nums[i] 右侧小于 nums[i] 的元素的数量。示例:输入:nums = [5,2,6,1]输出:[2,1,1,0] 解释:5 的右侧有 2 个更小的元素 (2 和 1)2 的右侧仅有 1 个更小的元素 (1)6 的右侧有 1 个更小的元素 (1)1 的右侧有 0 个更小的元素提示:0 <= nums.length <= 10^5-10^4

    2022年8月8日
    7
  • OAM协议详解_服务期限怎么填写

    OAM协议详解_服务期限怎么填写OAMPDU消息格式及定义OAMPDU消息的格式如下图6-8所示。各个字段的详细定义如下:a) 目的地址(DA):Slow_Protocols_Multicast地址,使用和编码规定见IEEE802.3-2005Annex43B;b) 源地址(SA):OAMPDU中的SA是独立的MAC地址,该地址与发送OAMPDU的端口相关联;c) Length/Type:OAMPDU采

    2025年8月8日
    2
  • 奔图 Pantum P2206NW 打印机驱动

    奔图 Pantum P2206NW 打印机驱动奔图 PantumP2206N 打印机驱动是官方提供的一款打印机驱动 本站收集提供高速下载 用于解决打印机与电脑连接不了 无法正常使用的问题 本动适用于 WindowsXP Windows7 Windows8 Windows1032 64 位操作系统 有需要的朋友可以来本站下载安装 奔图 PantumP2206N 打印机驱动 http www equdong net qudong bt Pantum 7872 html

    2026年3月26日
    3
  • 支付宝授权登录淘宝_vue的登录实现

    支付宝授权登录淘宝_vue的登录实现api接口文档:https://docs.open.alipay.com/289/105656后台管理系统原本是用账号密码登录的,不过需求要改成支付宝授权,前端仅仅需要改登录页,以及添加一个授权返

    2022年8月1日
    11
  • 算法题做到崩溃?刷了几千道算法题,关于如何刷题有些话我想对你说

    算法题做到崩溃?刷了几千道算法题,关于如何刷题有些话我想对你说算法刷到最后 最后记在脑子里的不是代码 是思路 如果你有思路 那你一定能把代码写出来 你不可能记住所有题的代码 唯一可以记住的是解题思路 所以怎么码代码不是一个问题 怎么解题才是一个问题 建议刷题的时候这样刷 看到一道题 先想想怎么解 如果是你的话你会用什么方法去解 想好了以后用代码实现一遍 看能不能行 一般把想法用代码实现后 你的代码跑不通 以上的原因不是你的编程问题 而是你的思路在某个点

    2026年2月23日
    2
  • java.lang.assertionerror_java parseint

    java.lang.assertionerror_java parseintMicrosoftVisualStudioSolutionFile,FormatVersion12.00#VisualStudio15VisualStudioVersion=15.0.26730.16MinimumVisualStudioVersion=10.0.40219.1Project(“{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC…

    2025年10月6日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号