图文详解mina框架

图文详解mina框架ApacheMinaSe 是一个网络通信应用框架 也就是说 它主要是对基于 TCP IP UDP IP 协议栈的通信框架 当然 也可以提供 JAVA 对象的序列化服务 虚拟机管道通信服务等 Mina 可以帮助我们快速开发高性能 高扩展性的网络通信应用 Mina 提供了事件驱动 异步 Mina 的异步 IO 默认使用的是 JAVANIO 作为底层支持 操作的编程模型 Mina 主要有 1 x 和

Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。Mina 主要有1.x 和2.x 两个分支,这里我们讲解最新版本2.0,如果你使用的是Mina 1.x,那么可能会有一些功能并不适用。学习本文档,需要你已掌握JAVA IO、JAVA NIO、JAVASocket、JAVA 线程及并发库(java.util.concurrent.*)的知识。Mina 同时提供了网络通信的Server 端、Client 端的封装,无论是哪端,Mina 在整个网通通信结构中都处于如下的位置:可见Mina 的API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、接收的数据以及你的业务逻辑即可。同样的,无论是哪端,Mina 的执行流程如下所示:

图文详解mina框架

Mina的底层依赖的主要是Java NIO库,上层提供的是基于事件的异步接口。其整体的结构如下:

图文详解mina框架

(1.) IoService:最底层的是IOService,负责具体的IO相关工作。这一层的典型代表有IOSocketAcceptor和IOSocketChannel,分别对应TCP协议下的服务端和客户端的IOService。IOService的意义在于隐藏底层IO的细节,对上提供统一的基于事件的异步IO接口。每当有数据到达时,IOService会先调用底层IO接口读取数据,封装成IoBuffer,之后以事件的形式通知上层代码,从而将Java NIO的同步IO接口转化成了异步IO。所以从图上看,进来的low-level IO经过IOService层后变成IO Event。具体的代码可以参考org.apache.mina.core.polling.AbstractPollingIoProcessor的私有内部类Processor。

(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。需要有开发者自己来实现这个接口。IoHandler可以看成是Mina处理流程的终点,每个IoService都需要指定一个IoHandler。




(5.)IoSession:是对底层连接(服务器与客户端的特定连接,该连接由服务器地址、端口以及客户端地址、端口来决定)的封装,一个IoSession对应于一个底层的IO连接(在Mina中UDP也被抽象成了连接)。通过IoSession,可以获取当前连接相关的上下文信息,以及向远程peer发送数据。发送数据其实也是个异步的过程。发送的操作首先会逆向穿过IoFilterChain,到达IoService。但IoService上并不会直接调用底层IO接口来将数据发送出去,而是会将该次调用封装成一个WriteRequest,放入session的writeRequestQueue中,最后由IoProcessor线程统一调度flush出去。所以发送操作并不会引起上层调用线程的阻塞。具体代码可以参考org.apache.mina.core.filterchain.DefaultIoFilterChain的内部类HeadFilter的filterWrite方法。

工作流程:

图文详解mina框架

服务端流程:

1、通过SocketAcceptor 同客户端建立连接;

2、连接建立之后 I/O的读写交给了I/O Processor线程,I/O Processor是多线程的;

3、通过I/O Processor 读取的数据经过IoFilterChain里所有配置的IoFilter,IoFilter进行消息的过滤,格式的转换,在这个层面可以制定一些自定义的协议;

4、最后IoFilter将数据交给 Handler  进行业务处理,完成了整个读取的过程;

写入过程也是类似,只是刚好倒过来,通过IoSession.write 写出数据,然后Handler进行写入的业务处理,处理完成后交给IoFilterChain,进行消息过滤和协议的转换,最后通过 I/O Processor 将数据写出到 socket 通道。

一图胜千言,MINA的核心类图:

图文详解mina框架


IoAcceptor ac ceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); //设置过滤器 //... //设置handler //绑定端口 acceptor.bind(new InetSocketAddress(9124));

这段代码我们初始化了服务端的TCP/IP 的基于NIO 的套接字,然后调用IoSessionConfig设置读取数据的缓冲区大小、读写通道均在10 秒内无任何操作就进入空闲状态。

 // 编写过滤器 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())) );

这段代码要在acceptor.bind()方法之前执行,因为绑定套接字之后就不能再做这些准备工作了。这里先不用清楚编解码器是如何工作的,这个是后面重点说明的内容,这里你只需要清楚,我们传输的以换行符为标识的数据,所以使用了Mina 自带的换行符编解码器工厂。

(3.) 第三步:编写IoHandler

这里我们只是简单的打印Client 传说过来的数据。

package com.dxz.minademo2; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TCPServerHandler extends IoHandlerAdapter { // 这里我们使用的SLF4J作为日志门面,至于为什么在后面说明。 private final static Logger log = LoggerFactory.getLogger(TCPServerHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { String str = message.toString(); System.out.println("The message received is [" + str + "]"); if (str.endsWith("quit")) { session.close(true); return; } } @Override public void sessionCreated(IoSession session) throws Exception { System.out.println("server session created"); super.sessionCreated(session); } @Override public void sessionOpened(IoSession session) throws Exception { System.out.println("server session Opened"); super.sessionOpened(session); } @Override public void sessionClosed(IoSession session) throws Exception { System.out.println("server session Closed"); super.sessionClosed(session); } }

然后我们把这个IoHandler 注册到IoService:

//设置handler acceptor.setHandler(new TCPServerHandler());

当然这段代码也要在acceptor.bind()方法之前执行。然后我们运行MyServer 中的main 方法,你可以看到控制台一直处于阻塞状态,等待客户端连接。

完成的代码:

package com.dxz.minademo2; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class TCPServer { public static void main(String[] args) throws IOException { IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); // 编写过滤器 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())) ); //设置handler acceptor.setHandler(new TCPServerHandler()); //绑定端口 acceptor.bind(new InetSocketAddress(9124)); } }

测试:

此时,我们用telnet 127.0.0.1 9123 访问,然后输入一些内容,当按下回车键,你会发现数据在Server 端被输出,但要注意不要输入中文,因为Windows 的命令行窗口不会对传输的数据进行UTF-8 编码。当输入quit 结尾的字符串时,连接被断开。这里注意你如果使用的操作系统,或者使用的Telnet 软件的换行符是什么,如果不清楚,可以删掉第二步中的两个红色的参数,使用TextLineCodec 内部的自动识别机制。

pom.xml

 
    
    
    
      org.apache.mina 
     
    
      mina-core 
     
    
      2.0.7 
     
    
    
    
      org.apache.mina 
     
    
      mina-integration-spring 
     
    
      1.1.7 
     
   

2. 简单的TCPClient:
这里我们实现Mina 中的TCPClient,因为前面说过无论是Server 端还是Client 端,在Mina中的执行流程都是一样的。唯一不同的就是IoService 的Client 端实现是IoConnector。

(1.) 第一步:编写IoService并注册过滤器

package com.dxz.minademo2; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoConnector; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class TCPClient { public static void main(String[] args) { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); connector.setHandler(new TCPClientHandler("你好!\r\n 大家好!")); connector.connect(new InetSocketAddress("localhost", 9124)); } }

(2.) 第三步:编写IoHandler

package com.dxz.minademo2; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TCPClientHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory.getLogger(TCPClientHandler.class); private final String values; public TCPClientHandler(String values) { this.values = values; } @Override public void sessionOpened(IoSession session) { session.write(values); } }

注册IoHandler:

connector.setHandler(new ClientHandler("你好!\r\n 大家好!")); 
然后我们运行MyClient,你会发现MyServer 输出如下语句: The message received is [你好!] The message received is [大家好!] 我们看到服务端是按照收到两条消息输出的,因为我们用的编解码器是以换行符判断数据是否读取完毕的。

3. 介绍Mina的TCP的主要接口:
通过上面的两个示例,你应该对Mina 如何编写TCP/IP 协议栈的网络通信有了一些感性的认识。
(1.)IoService:
这个接口是服务端IoAcceptor、客户端IoConnector 的抽象,提供IO 服务和管理IoSession的功能,它有如下几个常用的方法:
A. TransportMetadata getTransportMetadata():
这个方法获取传输方式的元数据描述信息,也就是底层到底基于什么的实现,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
这个方法可以为IoService 增加一个监听器,用于监听IoService 的创建、活动、失效、空闲、销毁,具体可以参考IoServiceListener 接口中的方法,这为你参与IoService 的生命周期提供了机会。
C. void removeListener(IoServiceListener listener):
这个方法用于移除上面的方法添加的监听器。
D. void setHandler(IoHandler handler):
这个方法用于向IoService 注册IoHandler,同时有getHandler()方法获取Handler。
E. Map

getManagedSessions():

这个方法获取IoService 上管理的所有IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
这个方法用于获取IoSession 的配置对象,通过IoSessionConfig 对象可以设置Socket 连接的一些选项。































(2.)IoAcceptor:
这个接口是TCPServer 的接口,主要增加了void bind()监听端口、void unbind()解除对套接字的监听等方法。这里与传统的JAVA 中的ServerSocket 不同的是IoAcceptor 可以多次调用bind()方法(或者在一个方法中传入多个SocketAddress 参数)同时监听多个端口。 

3.)IoConnector:
这个接口是TCPClient 的接口, 主要增加了ConnectFuture connect(SocketAddressremoteAddress,SocketAddress localAddress)方法,用于与Server 端建立连接,第二个参数如果不传递则使用本地的一个随机端口访问Server 端。这个方法是异步执行的,同样的,也可以同时连接多个服务端。

(4.)IoSession:
这个接口用于表示Server 端与Client 端的连接,IoAcceptor.accept()的时候返回实例。
这个接口有如下常用的方法:
A. WriteFuture write(Object message):
这个方法用于写数据,该操作是异步的。
B. CloseFuture close(boolean immediately):
这个方法用于关闭IoSession,该操作也是异步的,参数指定true 表示立即关闭,否则就在所有的写操作都flush 之后再关闭。
C. Object setAttribute(Object key,Object value):
这个方法用于给我们向会话中添加一些属性,这样可以在会话过程中都可以使用,类似于HttpSession 的setAttrbute()方法。IoSession 内部使用同步的HashMap 存储你添加的自定义属性。
D. SocketAddress getRemoteAddress():
这个方法获取远端连接的套接字地址。
E. void suspendWrite():
这个方法用于挂起写操作,那么有void resumeWrite()方法与之配对。对于read()方法同样适用。
F. ReadFuture read():
这个方法用于读取数据, 但默认是不能使用的, 你需要调用IoSessionConfig 的setUseReadOperation(true)才可以使用这个异步读取的方法。一般我们不会用到这个方法,因为这个方法的内部实现是将数据保存到一个BlockingQueue,假如是Server 端,因为大量的Client 端发送的数据在Server 端都这么读取,那么可能会导致内存泄漏,但对于Client,可能有的时候会比较便利。
G. IoService getService():
这个方法返回与当前会话对象关联的IoService 实例。
关于TCP连接的关闭:
无论在客户端还是服务端,IoSession 都用于表示底层的一个TCP 连接,那么你会发现无论是Server 端还是Client 端的IoSession 调用close()方法之后,TCP 连接虽然显示关闭, 但主线程仍然在运行,也就是JVM 并未退出,这是因为IoSession 的close()仅仅是关闭了TCP的连接通道,并没有关闭Server 端、Client 端的程序。你需要调用IoService 的dispose()方法停止Server 端、Client 端。




































(5.)IoSessionConfig:
这个方法用于指定此次会话的配置,它有如下常用的方法:
A. void setReadBufferSize(int size):




(6.)IoHandler:
这个接口是你编写业务逻辑的地方,从上面的示例代码可以看出,读取数据、发送数据基本都在这个接口总完成,这个实例是绑定到IoService 上的,有且只有一个实例(没有给一个IoService 注入一个IoHandler 实例会抛出异常)。它有如下几个方法:
A. void sessionCreated(IoSession session):
这个方法当一个Session 对象被创建的时候被调用。对于TCP 连接来说,连接被接受的时候调用,但要注意此时TCP 连接并未建立,此方法仅代表字面含义,也就是连接的对象IoSession 被创建完毕的时候,回调这个方法。对于UDP 来说,当有数据包收到的时候回调这个方法,因为UDP 是无连接的。
B. void sessionOpened(IoSession session):
这个方法在连接被打开时调用,它总是在sessionCreated()方法之后被调用。对于TCP 来说,它是在连接被建立之后调用,你可以在这里执行一些认证操作、发送数据等。对于UDP 来说,这个方法与sessionCreated()没什么区别,但是紧跟其后执行。如果你每隔一段时间,发送一些数据,那么sessionCreated()方法只会在第一次调用,但是sessionOpened()方法每次都会调用。
C. void sessionClosed(IoSession session) :
对于TCP 来说,连接被关闭时,调用这个方法。对于UDP 来说,IoSession 的close()方法被调用时才会毁掉这个方法。
D. void sessionIdle(IoSession session, IdleStatus status) :
这个方法在IoSession 的通道进入空闲状态时调用,对于UDP 协议来说,这个方法始终不会被调用。
E. void exceptionCaught(IoSession session, Throwable cause) :
这个方法在你的程序、Mina 自身出现异常时回调,一般这里是关闭IoSession。






















(7.)IoBuffer:
这个接口是对JAVA NIO 的ByteBuffer 的封装,这主要是因为ByteBuffer 只提供了对基本数据类型的读写操作,没有提供对字符串等对象类型的读写方法,使用起来更为方便,另外,ByteBuffer 是定长的,如果想要可变,将很麻烦。IoBuffer 的可变长度的实现类似于StringBuffer。IoBuffer 与ByteBuffer 一样,都是非线程安全的。本节的一些内容如果不清楚,可以参考java.nio.ByteBuffer 接口。这个接口有如下常用的方法:
A. static IoBuffer allocate(int capacity,boolean useDirectBuffer):
这个方法内部通过SimpleBufferAllocator 创建一个实例,第一个参数指定初始化容量,第二个参数指定使用直接缓冲区还是JAVA 内存堆的缓存区,默认为false。
B. void free():
释放缓冲区,以便被一些IoBufferAllocator 的实现重用,一般没有必要调用这个方法,除非你想提升性能(但可能未必效果明显)。
C. IoBuffer setAutoExpand(boolean autoExpand):
这个方法设置IoBuffer 为自动扩展容量,也就是前面所说的长度可变,那么可以看出长度可变这个特性默认是不开启的。
D. IoBuffer setAutoShrink(boolean autoShrink):
这个方法设置IoBuffer 为自动收缩,这样在compact()方法调用之后,可以裁减掉一些没有使用的空间。如果这个方法没有被调用或者设置为false,你也可以通过调用shrink()方法手动收缩空间。


















(8.)IoFuture:
在Mina 的很多操作中,你会看到返回值是XXXFuture,实际上他们都是IoFuture 的子类,看到这样的返回值,这个方法就说明是异步执行的,主要的子类有ConnectFuture、CloseFuture 、ReadFuture 、WriteFuture 。这个接口的大部分操作都和
java.util.concurrent.Future 接口是类似的,譬如:await()、awaitUninterruptibly()等,一般我们常用awaitUninterruptibly()方法可以等待异步执行的结果返回。这个接口有如下常用的方法:
A. IoFuture addListener(IoFutureListener
listener):
这个方法用于添加一个监听器, 在异步执行的结果返回时监听器中的回调方法operationComplete(IoFuture future),也就是说,这是替代awaitUninterruptibly()方法另一种等待异步执行结果的方法,它的好处是不会产生阻塞。
B. IoFuture removeListener(IoFutureListener
listener):
这个方法用于移除指定的监听器。
C. IoSession getSession():
这个方法返回当前的IoSession。举个例子,我们在客户端调用connect()方法访问Server 端的时候,实际上这就是一个异步执行的方法,也就是调用connect()方法之后立即返回,执行下面的代码,而不管是否连接成功。那么如果我想在连接成功之后执行一些事情(譬如:获取连接成功后的IoSession对象),该怎么办呢?按照上面的说明,你有如下两种办法:
















第一种:

package com.dxz.minademo3; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.dxz.minademo2.TCPClientHandler; public class TCPClient { public static void main(String[] args) { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); connector.setHandler(new TCPClientHandler("你好!\r\n 大家好!")); ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 9124)); // 等待是否连接成功,相当于是转异步执行为同步执行。 future.awaitUninterruptibly(); // 连接成功后获取会话对象。如果没有上面的等待,由于connect()方法是异步的,session可能会无法获取。 IoSession session = future.getSession(); System.out.println(session); } } 

第二种:

package com.dxz.minademo3; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.dxz.minademo2.TCPClientHandler; public class TCPClient { public static void main(String[] args) { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); connector.setHandler(new TCPClientHandler("你好!\r\n 大家好!")); ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 9124)); future.addListener(new IoFutureListener 
    
      () { public void operationComplete(ConnectFuture future) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } IoSession session = future.getSession(); System.out.println("++++++++++++++++++++++++++++"); } }); System.out.println("*"); } } 
    

结果:

* ++++++++++++++++++++++++++++

为了更好的看清楚使用监听器是异步的,而不是像awaitUninterruptibly()那样会阻塞主线程的执行,我们在回调方法中暂停5 秒钟,然后输出+++,在最后输出*。我们执行代码之后,你会发现首先输出*(这证明了监听器是异步执行的),然后IoSession 对象Created,系统暂停5 秒,然后输出+++,最后IoSession 对象Opened,也就是TCP 连接建立。

4.日志配置:
前面的示例代码中提到了使用SLF4J 作为日志门面,这是因为Mina 内部使用的就是SLF4J,你也使用SLF4J 可以与之保持一致性。Mina 如果想启用日志跟踪Mina 的运行细节,你可以配置LoggingFilter 过滤器,这样你可以看到Session 建立、打开、空闲等一系列细节在日志中输出,默认SJF4J 是按照DEBUG级别输出跟踪信息的,如果你想给某一类别的Mina 运行信息输出指定日志输出级别,可以调用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。

例:

LoggingFilter lf = new LoggingFilter(); lf.setSessionOpenedLogLevel(LogLevel.ERROR); acceptor.getFilterChain().addLast("logger", lf); 

这里IoSession 被打开的跟踪信息将以ERROR 级别输出到日志。


5.过滤器:
前面我们看到了LoggingFilter、ProtocolCodecFilter 两个过滤器,一个负责日志输出,一个负责数据的编解码,通过最前面的Mina 执行流程图,在IoProcessor 与IoHandler 之间可以有很多的过滤器,这种设计方式为你提供可插拔似的扩展功能提供了非常便利的方式,目前的Apache CXF、Apache Struts2 中的拦截器也都是一样的设计思路。Mina 中的IoFilter 是单例的,这与CXF、Apache Struts2 没什么区别。IoService 实例上会绑定一个DefaultIoFilterChainBuilder 实例,DefaultIoFilterChainBuilder 会把使用内部的EntryImpl 类把所有的过滤器按照顺序连在一起,组成一个过滤器链。
DefaultIoFilterChainBuilder 类如下常用的方法:
A. void addFirst(String name,IoFilter filter):
这个方法把过滤器添加到过滤器链的头部,头部就是IoProcessor 之后的第一个过滤器。同样的addLast()方法把过滤器添加到过滤器链的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
这个方法将过滤器添加到baseName 指定的过滤器的前面,同样的addAfter()方法把过滤器添加到baseName 指定的过滤器的后面。这里要注意无论是那种添加方法,每个过滤器的名字(参数name)必须是唯一的。
C. IoFilter remove(Stirng name):
这个方法移除指定名称的过滤器,你也可以调用另一个重载的remove()方法,指定要移除的IoFilter 的类型。
D. List

getAll():

这个方法返回当前IoService 上注册的所有过滤器。默认情况下,过滤器链中是空的,也就是getAll()方法返回长度为0 的List,但实际Mina内部有两个隐藏的过滤器:HeadFilter、TailFilter,分别在List 的最开始和最末端,很明显,TailFilter 在最末端是为了调用过滤器链之后,调用IoHandler。但这两个过滤器对你来说是透明的,可以忽略它们的存在。编写一个过滤器很简单,你需要实现IoFilter 接口,如果你只关注某几个方法,可以继承IoFilterAdapter 适配器类。IoFilter 接口中主要包含两类方法,一类是与IoHandler 中的方法名一致的方法,相当于拦截IoHandler 中的方法,另一类是IoFilter 的生命周期回调方法,这些回调方法的执行顺序和解释如下所示:




















package com.dxz.minademo3; import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.core.filterchain.IoFilterChain; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.write.WriteRequest; public class MyIoFilter implements IoFilter { @Override public void destroy() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy"); } @Override public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught"); nextFilter.exceptionCaught(session, cause); } @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose"); nextFilter.filterClose(session); } @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite"); nextFilter.filterWrite(session, writeRequest); } @Override public void init() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init"); } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived"); nextFilter.messageReceived(session, message); } @Override public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent"); nextFilter.messageSent(session, writeRequest); } @Override public void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd"); } @Override public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove"); } @Override public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd"); } @Override public void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove"); } @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed"); nextFilter.sessionClosed(session); } @Override public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated"); nextFilter.sessionCreated(session); } @Override public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle"); nextFilter.sessionIdle(session, status); } @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened"); nextFilter.sessionOpened(session); } }

我们将这个拦截器注册到上面的TCPServer 的IoAcceptor 的过滤器链中的最后一个:

acceptor.getFilterChain().addLast("myIoFilter", new ReferenceCountingFilter(new MyIoFilter()));

这里我们将MyIoFilter 用ReferenceCountingFilter 包装起来,这样你可以看到init()、destroy()方法调用。我们启动客户端访问,然后关闭客户端,你会看到执行顺序如下所示:

%%%%%%%%%%%%%%%%%%%%%%%%%%%init %%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd %%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated server session created %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened server session Opened %%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived The message received is [你好!] %%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived The message received is [ 大家好!] %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle

IoHandler 的对应方法会跟在上面的对应方法之后执行,这也就是说从横向(单独的看一个过滤器中的所有方法的执行顺序)上看,每个过滤器的执行顺序是上面所示的顺序;从纵向(方法链的调用)上看,如果有filter1、filter2 两个过滤器,sessionCreated()方法的执行顺序如下所示:

6.协议编解码器:

multiplex:英 [ˈmʌltɪpleks] 美 [ˈmʌltəˌplɛks] adj.多元的,多倍的,复式的;多部的,复合的,多样的,多重的;;[电讯]多路传输的n.多路;多厅影院,多剧场影剧院v.多路传输,多路复用;多重发讯

图文详解mina框架

public interface ProtocolCodecFactory {   ProtocolEncoder getEncoder(IoSession session) throws Exception;   ProtocolDecoder getDecoder(IoSession session) throws Exception; }

(6-1.)简单的编解码器示例:
下面我们举一个模拟电信运营商短信协议的编解码器实现,假设通信协议如下所示:
M sip:wap.fetion.com.cn SIP-C/2.0
S: xxxx
R: xxxx








package com.dxz.minademo3; public class SmsObject { private String sender;// 短信发送者 private String receiver;// 短信接受者 private String message;// 短信内容 public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public String getReceiver() { return receiver; } public void setReceiver(String receiver) { this.receiver = receiver; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } }
package com.dxz.minademo3; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderAdapter; import org.apache.mina.filter.codec.ProtocolEncoderOutput; public class CmccSipcEncoder extends ProtocolEncoderAdapter { private final Charset charset; public CmccSipcEncoder(Charset charset) { this.charset = charset; } @Override public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { SmsObject sms = (SmsObject) message; CharsetEncoder ce = charset.newEncoder(); IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; String sender = sms.getSender(); String receiver = sms.getReceiver(); String smsContent = sms.getMessage(); buffer.putString(statusLine + '\n', ce); buffer.putString("S: " + sender + '\n', ce); buffer.putString("R: " + receiver + '\n', ce); buffer.putString("L: " + (smsContent.getBytes(charset).length) + "\n", ce); buffer.putString(smsContent, ce); buffer.flip(); out.write(buffer); } }

B. 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区。简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。

package com.dxz.minademo3; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; public class CmccSipcDecoder extends CumulativeProtocolDecoder { private final Charset charset; public CmccSipcDecoder(Charset charset) { this.charset = charset; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); CharsetDecoder cd = charset.newDecoder(); int matchCount = 0; String statusLine = "", sender = "", receiver = "", length = "", sms = ""; int i = 1; while (in.hasRemaining()) { byte b = in.get(); buffer.put(b); if (b == 10 && i < 5) { matchCount++; if (i == 1) { buffer.flip(); statusLine = buffer.getString(matchCount, cd); statusLine = statusLine.substring(0, statusLine.length() - 1); matchCount = 0; buffer.clear(); } if (i == 2) { buffer.flip(); sender = buffer.getString(matchCount, cd); sender = sender.substring(0, sender.length() - 1); matchCount = 0; buffer.clear(); } if (i == 3) { buffer.flip(); receiver = buffer.getString(matchCount, cd); receiver = receiver.substring(0, receiver.length() - 1); matchCount = 0; buffer.clear(); } if (i == 4) { buffer.flip(); length = buffer.getString(matchCount, cd); length = length.substring(0, length.length() - 1); matchCount = 0; buffer.clear(); } i++; } else if (i == 5) { matchCount++; if (matchCount == Long.parseLong(length.split(": ")[1])) { buffer.flip(); sms = buffer.getString(matchCount, cd); i++; break; } } else { matchCount++; } } SmsObject smsObject = new SmsObject(); smsObject.setSender(sender.split(": ")[1]); smsObject.setReceiver(receiver.split(": ")[1]); smsObject.setMessage(sms); out.write(smsObject); return false; } }
package com.dxz.minademo3; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.AttributeKey; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.CumulativeProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; public class XXXDecoder extends CumulativeProtocolDecoder { private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context"); public Context getContext(IoSession session) { Context ctx = (Context) session.getAttribute(CONTEXT); if (ctx == null) { ctx = new Context(); session.setAttribute(CONTEXT, ctx); } return ctx; } private class Context { // 状态变量 } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub return false; } }
package com.dxz.minademo3; import java.nio.charset.Charset; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; public class CmccSipcCodecFactory implements ProtocolCodecFactory { private final CmccSipcEncoder encoder; private final CmccSipcDecoder decoder; public CmccSipcCodecFactory() { this(Charset.defaultCharset()); } public CmccSipcCodecFactory(Charset charSet) { this.encoder = new CmccSipcEncoder(charSet); this.decoder = new CmccSipcDecoder(charSet); } @Override public ProtocolDecoder getDecoder(IoSession session) throws Exception { return decoder; } @Override public ProtocolEncoder getEncoder(IoSession session) throws Exception { return encoder; } }
package com.dxz.minademo3; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.util.ReferenceCountingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.dxz.minademo2.TCPServerHandler; public class TCPServer3 { public static void main(String[] args) throws IOException { IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); // 编写过滤器 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset.forName("UTF-8")))); // 设置handler acceptor.getFilterChain().addLast("myIoFilter", new ReferenceCountingFilter(new MyIoFilter())); // 设置handler acceptor.setHandler(new TCPServerHandler()); // 绑定端口 acceptor.bind(new InetSocketAddress(9124)); System.out.println(acceptor.getSessionConfig()); } }

client

package com.dxz.minademo3; import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.future.IoFutureListener; import org.apache.mina.core.service.IoConnector; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.dxz.minademo2.TCPClientHandler; public class TCPClient3 { public static void main(String[] args) { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset.forName("UTF-8")))); connector.setHandler(new TCPClientHandler("你好!\r\n 大家好!")); ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 9124)); future.addListener(new IoFutureListener 
     
       () { public void operationComplete(ConnectFuture future) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } IoSession session = future.getSession(); System.out.println("++++++++++++++++++++++++++++"); } }); System.out.println("*"); } } 
     

最后我们在MyIoHandler 中接收这条短信息:

package com.dxz.minademo3; import org.apache.mina.core.filterchain.IoFilter; import org.apache.mina.core.filterchain.IoFilterChain; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.apache.mina.core.write.WriteRequest; public class MyIoFilter implements IoFilter { @Override public void destroy() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy"); } @Override public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught"); nextFilter.exceptionCaught(session, cause); } @Override public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose"); nextFilter.filterClose(session); } @Override public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite"); nextFilter.filterWrite(session, writeRequest); } @Override public void init() throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init"); } @Override public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { System.out.println("-%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived"); SmsObject sms = (SmsObject) message; System.out.println("The message received is [" + sms.getMessage() + "]"); System.out.println("-----------------messageReceived"); nextFilter.messageReceived(session, message); } @Override public void messageSent(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent"); nextFilter.messageSent(session, writeRequest); } @Override public void onPostAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd"); } @Override public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove"); } @Override public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd"); } @Override public void onPreRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove"); } @Override public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed"); nextFilter.sessionClosed(session); } @Override public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated"); nextFilter.sessionCreated(session); } @Override public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle"); nextFilter.sessionIdle(session, status); } @Override public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened"); nextFilter.sessionOpened(session); } }

你会看到Server 端的控制台输出如下信息:

 %%%%%%%%%%%%%%%%%%%%%%%%%%%init %%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd %%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated server session created %%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened server session Opened -%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived The message received is [你好!Hello World!] -----------------messageReceived The message received is [com.dxz.minademo3.SmsObject@2b]

CmccSispcDecoder 类改为如下的写法:

public class CmccSipcDecoder extends CumulativeProtocolDecoder { private final Charset charset; private final AttributeKey CONTEXT = new AttributeKey(getClass(), "context"); public CmccSipcDecoder(Charset charset) { this.charset = charset; } @Override protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Context ctx = getContext(session); CharsetDecoder cd = charset.newDecoder(); int matchCount = ctx.getMatchCount(); int line = ctx.getLine(); IoBuffer buffer = ctx.innerBuffer; String statusLine = ctx.getStatusLine(), sender = ctx.getSender(), receiver = ctx.getReceiver(), length = ctx.getLength(), sms = ctx.getSms(); while (in.hasRemaining()) { byte b = in.get(); matchCount++; buffer.put(b); if (line < 4 && b == 10) { if (line == 0) { buffer.flip(); statusLine = buffer.getString(matchCount, cd); statusLine = statusLine.substring(0, statusLine.length() - 1); matchCount = 0; buffer.clear(); ctx.setStatusLine(statusLine); } if (line == 1) { buffer.flip(); sender = buffer.getString(matchCount, cd); sender = sender.substring(0, sender.length() - 1); matchCount = 0; buffer.clear(); ctx.setSender(sender); } if (line == 2) { buffer.flip(); receiver = buffer.getString(matchCount, cd); receiver = receiver.substring(0, receiver.length() - 1); matchCount = 0; buffer.clear(); ctx.setReceiver(receiver); } if (line == 3) { buffer.flip(); length = buffer.getString(matchCount, cd); length = length.substring(0, length.length() - 1); matchCount = 0; buffer.clear(); ctx.setLength(length); } line++; } else if (line == 4) { if (matchCount == Long.parseLong(length.split(": ")[1])) { buffer.flip(); sms = buffer.getString(matchCount, cd); ctx.setSms(sms); // 由于下面的break,这里需要调用else外面的两行代码 ctx.setMatchCount(matchCount); ctx.setLine(line); break; } } ctx.setMatchCount(matchCount); ctx.setLine(line); } if (ctx.getLine() == 4 && Long.parseLong(ctx.getLength().split(": ")[1]) == ctx .getMatchCount()) { SmsObject smsObject = new SmsObject(); smsObject.setSender(sender.split(": ")[1]); smsObject.setReceiver(receiver.split(": ")[1]); smsObject.setMessage(sms); out.write(smsObject); ctx.reset(); return true; } else { return false; } } private Context getContext(IoSession session) { Context context = (Context) session.getAttribute(CONTEXT); if (context == null){ context = new Context(); session.setAttribute(CONTEXT, context); } return context; } private class Context { private final IoBuffer innerBuffer; private String statusLine = ""; private String sender = ""; private String receiver = ""; private String length = ""; private String sms = ""; public Context() { innerBuffer = IoBuffer.allocate(100).setAutoExpand(true); } private int matchCount = 0; private int line = 0; public int getMatchCount() { return matchCount; } public void setMatchCount(int matchCount) { this.matchCount = matchCount; } public int getLine() { return line; } public void setLine(int line) { this.line = line; } public String getStatusLine() { return statusLine; } public void setStatusLine(String statusLine) { this.statusLine = statusLine; } public String getSender() { return sender; } public void setSender(String sender) { this.sender = sender; } public String getReceiver() { return receiver; } public void setReceiver(String receiver) { this.receiver = receiver; } public String getLength() { return length; } public void setLength(String length) { this.length = length; } public String getSms() { return sms; } public void setSms(String sms) { this.sms = sms; } public void reset() { this.innerBuffer.clear(); this.matchCount = 0; this.line = 0; this.statusLine = ""; this.sender = ""; this.receiver = ""; this.length = ""; this.sms = ""; } } } 
ConnectFuture future = connector.connect(new InetSocketAddress( HOSTNAME, PORT)); future.awaitUninterruptibly(); session = future.getSession(); for (int i = 0; i < 3; i++) { SmsObject sms = new SmsObject(); session.write(sms); System.out.println("" + i); } 
public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { SmsObject sms = (SmsObject) message; CharsetEncoder ce = charset.newEncoder(); String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0"; String sender = ""; String receiver = ""; String smsContent = "你好!Hello World!"; IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); buffer.putString(statusLine + '/n', ce); buffer.putString("S: " + sender + '/n', ce); buffer.putString("R: " + receiver + '/n', ce); buffer.flip(); out.write(buffer); IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true); buffer2.putString("L: " + (smsContent.getBytes(charset).length) + "/n",ce); buffer2.putString(smsContent, ce); buffer2.putString(statusLine + '/n', ce); buffer2.flip(); out.write(buffer2); IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true); buffer3.putString("S: " + sender + '/n', ce); buffer3.putString("R: " + receiver + '/n', ce); buffer3.putString("L: " + (smsContent.getBytes(charset).length) + "/n",ce); buffer3.putString(smsContent, ce); buffer3.putString(statusLine + '/n', ce); buffer3.flip(); out.write(buffer3); } 

上面的这段代码要配合MyClient来操作,你需要做的是在MyClient中的红色输出语句处设置断点,然后第一调用时CmccSipcEncoder中注释掉蓝、绿色的代码,也就是发送两条短信息的第一部分(红色的代码),依次类推,也就是MyClient的中的三次断点中,分别执行CmccSipcEncoder中的红、蓝、绿三段代码,也就是模拟两条短信的三段发送。你会看到Server端的运行结果是:当MyClient第一次到达断点时,没有短信息被读取到,当MyClient第二次到达断点时,第一条短信息输出,当MyClient第三次到达断点时,第二条短信息输出。

(6-3.)多路分离的解码器:
假设一段数据发送过来之后,需要根据某种条件决定使用哪个解码器,而不是像上面的例子,固定使用一个解码器,那么该如何做呢?幸好Mina 提供了org.apache.mina.filter.codec.demux 包来完成这种多路分离(Demultiplexes)的解码工作,也就是同时注册多个解码器,然后运行时依据传入的数据决定到底使用哪个解码器来工作。所谓多路分离就是依据条件分发到指定的解码器,譬如:上面的短信协议进行扩展,可以依据状态行来判断使用1.0 版本的短信协议解码器还是2.0版本的短信协议解码器。
下面我们使用一个简单的例子,说明这个多路分离的解码器是如何使用的,需求如下所示:
(1.) 客户端传入两个int 类型的数字,还有一个char 类型的符号。
(2.) 如果符号是+,服务端就是用1 号解码器,对两个数字相加,然后把结果返回给客户端。
(3.) 如果符号是-,服务端就使用2 号解码器,将两个数字变为相反数,然后相加,把结果返回给客户端。
Demux 开发编解码器主要有如下几个步骤:
A. 定义Client 端、Server 端发送、接收的数据对象。
B. 使用Demux 编写编码器是实现MessageEncoder

接口,T 是你要编码的数据对象,这个MessageEncoder 会在DemuxingProtocolEncoder 中调用。

C. 使用Demux 编写编码器是实现MessageDecoder 接口,这个MessageDecoder 会在DemuxingProtocolDecoder 中调用。


















public interface MessageEncoder 
     
       { void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception; } 
     
public interface MessageDecoder { static MessageDecoderResult OK = MessageDecoderResult.OK; static MessageDecoderResult NEED_DATA = MessageDecoderResult.NEED_DATA; static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK; MessageDecoderResult decodable(IoSession session, IoBuffer in); MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception; void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception; }

(1.)decodable()方法有三个返回值,分别表示如下的含义:
A. MessageDecoderResult.NOT_OK:表示这个解码器不适合解码数据,然后检查其它解码器,如果都不满足会抛异常;
B. MessageDecoderResult.NEED_DATA:表示当前的读入的数据不够判断是否能够使用这个解码器解码,然后再次调用decodable()方法检查其它解码器,如果都是NEED_DATA,则等待下次输入;
C. MessageDecoderResult.OK: 表示这个解码器可以解码读入的数据, 然后则调用MessageDecoder 的decode()方法。这里注意decodable()方法对参数IoBuffer in 的任何操作在方法结束之后,都会复原,也就是你不必担心在调用decode()方法时,position 已经不在缓冲区的起始位置。这个方法相当于是预读取,用于判断是否是可用的解码器。
(2.)decode()方法有三个返回值,分别表示如下的含义:
A. MessageDecoderResult.NOT_OK:表示解码失败,会抛异常;










B. MessageDecoderResult.NEED_DATA:表示数据不够,需要读到新的数据后,再次调用decode()方法。
C. MessageDecoderResult.OK:表示解码成功。

代码演示:
(1.)客户端发送的数据对象:

package com.dxz.minademo4; public class SendMessage { private int i = 0; private int j = 0; private char symbol = '+'; public char getSymbol() { return symbol; } public void setSymbol(char symbol) { this.symbol = symbol; } public int getI() { return i; } public void setI(int i) { this.i = i; } public int getJ() { return j; } public void setJ(int j) { this.j = j; } }

(2.)服务端发送的返回结果对象:

package com.dxz.minademo4; public class ResultMessage { private int result = 0; public int getResult() { return result; } public void setResult(int result) { this.result = result; } }

(3.)客户端使用的SendMessage的编码器:

package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; public class SendMessageEncoder implements MessageEncoder 
     
       { @Override public void encode(IoSession session, SendMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(10); buffer.putChar(message.getSymbol()); buffer.putInt(message.getI()); buffer.putInt(message.getJ()); buffer.flip(); out.write(buffer); } } 
     
package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; public class SendMessageDecoderPositive implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 2) return MessageDecoderResult.NEED_DATA; else { char symbol = in.getChar(); if (symbol == '+') { return MessageDecoderResult.OK; } else { return MessageDecoderResult.NOT_OK; } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { SendMessage sm = new SendMessage(); sm.setSymbol(in.getChar()); sm.setI(in.getInt()); sm.setJ(in.getInt()); out.write(sm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }
package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; public class SendMessageDecoderNegative implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 2) return MessageDecoderResult.NEED_DATA; else { char symbol = in.getChar(); if (symbol == '-') { return MessageDecoderResult.OK; } else { return MessageDecoderResult.NOT_OK; } } } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { SendMessage sm = new SendMessage(); sm.setSymbol(in.getChar()); sm.setI(-in.getInt()); sm.setJ(-in.getInt()); out.write(sm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }

(6.)服务端使用的ResultMessage的编码器:

package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; public class ResultMessageEncoder implements MessageEncoder 
     
       { @Override public void encode(IoSession session, ResultMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buffer = IoBuffer.allocate(4); buffer.putInt(message.getResult()); buffer.flip(); out.write(buffer); } } 
     

(7.)客户端使用的ResultMessage的解码器:

package com.dxz.minademo4; import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; public class ResultMessageDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 4) return MessageDecoderResult.NEED_DATA; else if (in.remaining() == 4) return MessageDecoderResult.OK; else return MessageDecoderResult.NOT_OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { ResultMessage rm = new ResultMessage(); rm.setResult(in.getInt()); out.write(rm); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // undo } }

(8.)组装这些编解码器的工厂:

package com.dxz.minademo4; import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory; public class MathProtocolCodecFactory extends DemuxingProtocolCodecFactory { public MathProtocolCodecFactory(boolean server) { if (server) { super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class); super.addMessageDecoder(SendMessageDecoderPositive.class); super.addMessageDecoder(SendMessageDecoderNegative.class); } else { super.addMessageEncoder(SendMessage.class, SendMessageEncoder.class); super.addMessageDecoder(ResultMessageDecoder.class); } } }
package com.dxz.minademo4; import java.net.InetSocketAddress; import org.apache.mina.core.service.IoAcceptor; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class Server { public static void main(String[] args) throws Exception { IoAcceptor acceptor = new NioSocketAcceptor(); LoggingFilter lf = new LoggingFilter(); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 5); acceptor.getFilterChain().addLast("logger", lf); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MathProtocolCodecFactory(true))); acceptor.setHandler(new ServerHandler()); acceptor.bind(new InetSocketAddress(9123)); } }

(10.)Server端使用的IoHandler:

package com.dxz.minademo4; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IdleStatus; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ServerHandler extends IoHandlerAdapter { private final static Logger log = LoggerFactory.getLogger(ServerHandler.class); @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { SendMessage sm = (SendMessage) message; log.info("The message received is [ " + sm.getI() + " " + sm.getSymbol() + " " + sm.getJ() + " ]"); ResultMessage rm = new ResultMessage(); rm.setResult(sm.getI() + sm.getJ()); session.write(rm); } }

(11.)Client端:

package com.dxz.minademo4; import java.net.InetSocketAddress; import org.apache.mina.core.service.IoConnector; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; public class Client { public static void main(String[] args) throws Throwable { IoConnector connector = new NioSocketConnector(); connector.setConnectTimeoutMillis(30000); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MathProtocolCodecFactory(false))); connector.setHandler(new ClientHandler()); connector.connect(new InetSocketAddress("localhost", 9123)); } }

(12.)Client端的IoHandler:

package com.dxz.minademo4; import org.apache.mina.core.service.IoHandlerAdapter; import org.apache.mina.core.session.IoSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ClientHandler extends IoHandlerAdapter { private final static Logger LOGGER = LoggerFactory.getLogger(ClientHandler.class); @Override public void sessionOpened(IoSession session) throws Exception { SendMessage sm = new SendMessage(); sm.setI(100); sm.setJ(99); sm.setSymbol('+'); session.write(sm); } @Override public void messageReceived(IoSession session, Object message) { ResultMessage rs = (ResultMessage) message; LOGGER.info(String.valueOf(rs.getResult())); } }

你尝试改变ClientHandler中的Synbol中的红色代码中的正负号,会看到服务端使用了两个不同的解码器对其进行处理。

The message received is [ 100 + 99 ] The message received is [ -100 - -99 ]

7.线程模型配置:
Mina 中的很多执行环节都使用了多线程机制,用于提高性能。Mina 中默认在三个地方使用了线程:
(1.) IoAcceptor:
这个地方用于接受客户端的连接建立,每监听一个端口(每调用一次bind()方法),都启用一个线程,这个数字我们不能改变。这个线程监听某个端口是否有请求到来,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(2.) IoConnector:
这个地方用于与服务端建立连接,每连接一个服务端(每调用一次connect()方法),就启用一个线程,我们不能改变。同样的,这个线程监听是否有连接被建立,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(3.) IoProcessor:
这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的Selector 的原因。那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的IoProcessor 的数量并不是越多越好。














下面我们完整的综述一下Mina 的工作流程:
(1.) 当 IoService 实例创建的时候,同时一个关联在IoService上的IoProcessor 池、线程池也被创建;
(2.) 当 IoService 建立套接字(IoAcceptor 的bind()或者是IoConnector 的connect()方法被调用)时,IoService 从线程池中取出一个线程,监听套接字端口;
(3.) 当 IoService 监听到套接字上有连接请求时,建立IoSession 对象,从IoProcessor池中取出一个IoProcessor 实例执行这个会话通道上的过滤器、IoHandler;
(4.) 当这条IoSession 通道进入空闲状态或者关闭时,IoProcessor 被回收。上面说的是Mina 默认的线程工作方式,那么我们这里要讲的是如何配置IoProcessor 的多线程工作方式。因为一个IoProcessor 负责执行一个会话上的所有过滤器、IoHandler,也就是对于IO 读写操作来说,是单线程工作方式(就是按照顺序逐个执行)。假如你想让某个事件方法(譬如:sessionIdle()、sessionOpened()等)在单独的线程中运行(也就是非IoProcessor 所在的线程),那么这里就需要用到一个ExecutorFilter 的过滤器。你可以看到IoProcessor 的构造方法中有一个参数是java.util.concurrent.Executor,也就是可以让IoProcessor 调用的过滤器、IoHandler 中的某些事件方法在线程池中分配的线程上独立运行,而不是运行在IoProcessor 所在的线程。








public enum IoEventType { SESSION_CREATED, SESSION_OPENED, SESSION_CLOSED, MESSAGE_RECEIVED, MESSAGE_SENT, SESSION_IDLE, EXCEPTION_CAUGHT, WRITE, CLOSE, } 

默认情况下,没有配置关注的事件类型,有如下六个事件方法会被自动使用线程池异步执行:

IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED, IoEventType.MESSAGE_SENT, IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE, IoEventType.SESSION_OPENED

其实ExecutorFilter 的工作机制很简单,就是在调用下一个过滤器的事件方法时,把其交给Executor 的execute(Runnable runnable)方法来执行,其实你自己在IoHandler 或者某个过滤器的事件方法中开启一个线程,也可以完成同样的功能,只不过这样做,你就失去了程序的可配置性,线程调用的代码也会完全耦合在代码中。但要注意的是绝对不能开启线程让其执行sessionCreated()方法。如果你真的打算使用这个ExecutorFilter,那么最好想清楚它该放在过滤器链的哪个位置,针对哪些事件做异步处理机制。一般ExecutorFilter 都是要放在ProtocolCodecFilter 过滤器的后面,也就是不要让编解码运行在独立的线程上,而是要运行在IoProcessor 所在的线程,因为编解码处理的数据都是由IoProcessor 读取和发送的,没必要开启新的线程,否则性能反而会下降。一般使用ExecutorFilter 的典型场景是将业务逻辑(譬如:耗时的数据库操作)放在单独的线程中运行,也就是说与IO 处理无关的操作可以考虑使用ExecutorFilter 来异步执行。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/208771.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 上午10:48
下一篇 2026年3月19日 上午10:48


相关推荐

  • matlab之length函数[通俗易懂]

    matlab之length函数[通俗易懂]1、size获取数组的行数和列数2、length数组长度,即行数和列数中的较大值,相当于max(size(a))3、numel返回元素总数

    2022年4月27日
    102
  • sublime激活码-激活码分享

    (sublime激活码)这是一篇idea技术相关文章,由全栈君为大家提供,主要知识点是关于2021JetBrains全家桶永久激活码的内容IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.net/100143.htmlS32PGH0SQB-eyJsaWN…

    2022年3月26日
    313
  • 非阻塞recvfrom的设置[通俗易懂]

    非阻塞recvfrom的设置[通俗易懂]非阻塞recvfrom的设置

    2022年7月23日
    45
  • spssfisher判别分析步骤_SPSS判别分析

    spssfisher判别分析步骤_SPSS判别分析说明 非原创 借鉴网上多方材料整理 其中包括 CSDN 博主路易三十六 TOMOCAT 及百度资源整合做的一份学习笔记 与大家共享 一 定义判别分析又称 分辨法 是在分类确定的条件下 根据某一研究对象的各种特征值判别其类型归属问题的一种多变量统计分析方法 二 判别分析的一般形式 y a1x1 a2x2 anxn a1 为系数 Xn 为变量 事先非常明确共有几个类别 目的是从已知样本中训练出判别函数三

    2026年3月19日
    1
  • 免费拿走Vivado2017.4安装包及其license(附带安装教程)

    免费拿走Vivado2017.4安装包及其license(附带安装教程)免费拿走Vivado2017.4安装包及其license(附带安装教程)安装包下载地址:添加链接描述license:在文尾。。。安装教程:1.双击安装包文件夹中的xsetup文件2.运行安装:点击next。。3.三个IAgree都选上。。4.选择第二个,包含VIVADO设计的所有部件。。5.选择功能,一般选默认就好。。6.选择安装路径,保证容量足够即可,路径名不要出现非法字符。。7.点击Install,开始安装。。8.开始后,等几分钟,中间会跳出一两个安装确认,都点

    2022年7月26日
    96
  • DHCP协议工作流程

    DHCP协议工作流程一个全新主机的请求一次DHCP服务的过程主要包含以下四个步骤第一步:客户机操作系统生成一个DHCPdiscover发现报文,被放置在一个具有广播IP目的地址(255.255.255.255)和源IP地址为0.0.0.0的IP数据报中(网络层),发送到以太网,试图找到网络中DHCP服务器以获取一个IP地址。这一步可以简化为DHCPdiscover,源IP:0.0.0.0,目的IP:255.255.255.255,表示客户机说我需要一个IP,DHCP你在哪,能不能分我一个?第二步:当运行在路

    2022年5月23日
    47

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号