Apache Mina Server 是一个网络通信应用框架,也就是说,它主要是对基于TCP/IP、UDP/IP协议栈的通信框架(当然,也可以提供JAVA 对象的序列化服务、虚拟机管道通信服务等),Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。Mina 主要有1.x 和2.x 两个分支,这里我们讲解最新版本2.0,如果你使用的是Mina 1.x,那么可能会有一些功能并不适用。学习本文档,需要你已掌握JAVA IO、JAVA NIO、JAVASocket、JAVA 线程及并发库(java.util.concurrent.*)的知识。Mina 同时提供了网络通信的Server 端、Client 端的封装,无论是哪端,Mina 在整个网通通信结构中都处于如下的位置:可见Mina 的API 将真正的网络通信与我们的应用程序隔离开来,你只需要关心你要发送、接收的数据以及你的业务逻辑即可。同样的,无论是哪端,Mina 的执行流程如下所示:
(1.) IoService:这个接口在一个线程上负责套接字的建立,拥有自己的Selector,监听是否有连接被建立。
(2.) IoProcessor:这个接口在另一个线程上,负责检查是否有数据在通道上读写,也就是说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService与IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上的过滤器,并在过滤器链之后调用IoHandler。
(3.) IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与decode是最为重要的、也是你在使用Mina 时最主要关注的地方。
(4.) IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。
1. 简单的TCPServer:
(1.) 第一步:编写IoService
按照上面的执行流程,我们首先需要编写IoService,IoService 本身既是服务端,又是客户端,我们这里编写服务端,所以使用IoAcceptor 实现,由于IoAcceptor 是与协议无关的,因为我们要编写TCPServer,所以我们使用IoAcceptor 的实现NioSocketAcceptor,实际上底层就是调用java.nio.channels.ServerSocketChannel 类。当然,如果你使用了Apache 的APR 库,那么你可以选择使AprSocketAcceptor 作为TCPServer 的实现,据传说Apache APR库的性能比JVM 自带的本地库高出很多。那么IoProcessor 是由指定的IoService 内部创建并调用的,我们并不需要关心。
- IoAcceptor acceptor=new NioSocketAcceptor();
- acceptor.getSessionConfig().setReadBufferSize(2048);
- acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10);
- acceptor.bind(new InetSocketAddress(9123));
这段代码我们初始化了服务端的TCP/IP 的基于NIO 的套接字,然后调用IoSessionConfig设置读取数据的缓冲区大小、读写通道均在10 秒内无任何操作就进入空闲状态。
- acceptor.getFilterChain().addLast(“codec”, new ProtocolCodecFilter(new TextLineCodecFactory(
-
“white-space:pre”> Charset.forName(
“UTF-8”),
-
“white-space:pre”> LineDelimeter.WINDOWS.getValue(),
-
“white-space:pre”> LineDelimiter. WINDOWS.getValue()))
- );
这段代码要在acceptor.bind()方法之前执行,因为绑定套接字之后就不能再做这些准备工作了。这里先不用清楚编解码器是如何工作的,这个是后面重点说明的内容,这里你只需要清楚,我们传输的以换行符为标识的数据,所以使用了Mina 自带的换行符编解码器工厂。
(3.) 第三步:编写IoHandler
这里我们只是简单的打印Client 传说过来的数据。
- public class MyIoHandler extends IoHandlerAdapter {
- // 这里我们使用的SLF4J作为日志门面,至于为什么在后面说明。
- private final static Logger log = LoggerFactory
- .getLogger(MyIoHandler.class);
- @Override
- public void messageReceived(IoSession session, Object message)
- throws Exception {
- String str = message.toString();
- log.info(“The message received is [“ + str + “]”);
- if (str.endsWith(“quit”)) {
- session.close(true);
- return;
- }
- }
- }
然后我们把这个IoHandler 注册到IoService:
- acceptor.setHandler(new MyIoHandler());
当然这段代码也要在acceptor.bind()方法之前执行。然后我们运行MyServer 中的main 方法,你可以看到控制台一直处于阻塞状态,此时,我们用telnet 127.0.0.1 9123 访问,然后输入一些内容,当按下回车键,你会发现数据在Server 端被输出,但要注意不要输入中文,因为Windows 的命令行窗口不会对传输的数据进行UTF-8 编码。当输入quit 结尾的字符串时,连接被断开。这里注意你如果使用的操作系统,或者使用的Telnet 软件的换行符是什么,如果不清楚,可以删掉第二步中的两个红色的参数,使用TextLineCodec 内部的自动识别机制。
2. 简单的TCPClient:
这里我们实现Mina 中的TCPClient,因为前面说过无论是Server 端还是Client 端,在Mina中的执行流程都是一样的。唯一不同的就是IoService 的Client 端实现是IoConnector。
(1.) 第一步:编写IoService并注册过滤器
- public class MyClient {
- main方法:
- IoConnector connector=new NioSocketConnector();
- connector.setConnectTimeoutMillis(30000);
- connector.getFilterChain().addLast(“codec”,
- new ProtocolCodecFilter(
- new TextLineCodecFactory(
- Charset.forName(“UTF-8”),
- LineDelimiter.WINDOWS.getValue(),
- LineDelimiter.WINDOWS.getValue()
- )
- )
- );
- connector.connect(new InetSocketAddress(“localhost”, 9123));
- }
(2.) 第三步:编写IoHandler
- public class ClientHandler extends IoHandlerAdapter {
- private final static Logger LOGGER = LoggerFactory
- .getLogger(ClientHandler.class);
- private final String values;
- public ClientHandler(String values) {
- this.values = values;
- }
- @Override
- public void sessionOpened(IoSession session) {
- session.write(values);
- }
- }
注册IoHandler:
- connector.setHandler(new ClientHandler(“你好!\r\n 大家好!”));
3. 介绍Mina的TCP的主要接口:
通过上面的两个示例,你应该对Mina 如何编写TCP/IP 协议栈的网络通信有了一些感性的认识。
(1.)IoService:
这个接口是服务端IoAcceptor、客户端IoConnector 的抽象,提供IO 服务和管理IoSession的功能,它有如下几个常用的方法:
A. TransportMetadata getTransportMetadata():
这个方法获取传输方式的元数据描述信息,也就是底层到底基于什么的实现,譬如:nio、apr 等。
B. void addListener(IoServiceListener listener):
这个方法可以为IoService 增加一个监听器,用于监听IoService 的创建、活动、失效、空闲、销毁,具体可以参考IoServiceListener 接口中的方法,这为你参与IoService 的生命周期提供了机会。
C. void removeListener(IoServiceListener listener):
这个方法用于移除上面的方法添加的监听器。
D. void setHandler(IoHandler handler):
这个方法用于向IoService 注册IoHandler,同时有getHandler()方法获取Handler。
E. Map
getManagedSessions():
这个方法获取IoService 上管理的所有IoSession,Map 的key 是IoSession 的id。
F. IoSessionConfig getSessionConfig():
这个方法用于获取IoSession 的配置对象,通过IoSessionConfig 对象可以设置Socket 连接的一些选项。
(2.)IoAcceptor:
这个接口是TCPServer 的接口,主要增加了void bind()监听端口、void unbind()解除对套接字的监听等方法。这里与传统的JAVA 中的ServerSocket 不同的是IoAcceptor 可以多次调用bind()方法(或者在一个方法中传入多个SocketAddress 参数)同时监听多个端口。
3.)IoConnector:
这个接口是TCPClient 的接口, 主要增加了ConnectFuture connect(SocketAddressremoteAddress,SocketAddress localAddress)方法,用于与Server 端建立连接,第二个参数如果不传递则使用本地的一个随机端口访问Server 端。这个方法是异步执行的,同样的,也可以同时连接多个服务端。
(4.)IoSession:
这个接口用于表示Server 端与Client 端的连接,IoAcceptor.accept()的时候返回实例。
这个接口有如下常用的方法:
A. WriteFuture write(Object message):
这个方法用于写数据,该操作是异步的。
B. CloseFuture close(boolean immediately):
这个方法用于关闭IoSession,该操作也是异步的,参数指定true 表示立即关闭,否则就在所有的写操作都flush 之后再关闭。
C. Object setAttribute(Object key,Object value):
这个方法用于给我们向会话中添加一些属性,这样可以在会话过程中都可以使用,类似于HttpSession 的setAttrbute()方法。IoSession 内部使用同步的HashMap 存储你添加的自
定义属性。
D. SocketAddress getRemoteAddress():
这个方法获取远端连接的套接字地址。
E. void suspendWrite():
这个方法用于挂起写操作,那么有void resumeWrite()方法与之配对。对于read()方法同样适用。
F. ReadFuture read():
这个方法用于读取数据, 但默认是不能使用的, 你需要调用IoSessionConfig 的setUseReadOperation(true)才可以使用这个异步读取的方法。一般我们不会用到这个方法,因为这个方法的内部实现是将数据保存到一个BlockingQueue,假如是Server 端,因为大量的Client 端发送的数据在Server 端都这么读取,那么可能会导致内存泄漏,但对于Client,可能有的时候会比较便利。
G. IoService getService():
这个方法返回与当前会话对象关联的IoService 实例。
关于TCP连接的关闭:
无论在客户端还是服务端,IoSession 都用于表示底层的一个TCP 连接,那么你会发现无论是Server 端还是Client 端的IoSession 调用close()方法之后,TCP 连接虽然显示关闭, 但主线程仍然在运行,也就是JVM 并未退出,这是因为IoSession 的close()仅仅是关闭了TCP的连接通道,并没有关闭Server 端、Client 端的程序。你需要调用IoService 的dispose()方法停止Server 端、Client 端。
(5.)IoSessionConfig:
这个方法用于指定此次会话的配置,它有如下常用的方法:
A. void setReadBufferSize(int size):
(6.)IoHandler:
这个接口是你编写业务逻辑的地方,从上面的示例代码可以看出,读取数据、发送数据基本都在这个接口总完成,这个实例是绑定到IoService 上的,有且只有一个实例(没有给一个IoService 注入一个IoHandler 实例会抛出异常)。它有如下几个方法:
A. void sessionCreated(IoSession session):
这个方法当一个Session 对象被创建的时候被调用。对于TCP 连接来说,连接被接受的时候调用,但要注意此时TCP 连接并未建立,此方法仅代表字面含义,也就是连接的对象IoSession 被创建完毕的时候,回调这个方法。对于UDP 来说,当有数据包收到的时候回调这个方法,因为UDP 是无连接的。
B. void sessionOpened(IoSession session):
这个方法在连接被打开时调用,它总是在sessionCreated()方法之后被调用。对于TCP 来说,它是在连接被建立之后调用,你可以在这里执行一些认证操作、发送数据等。对于UDP 来说,这个方法与sessionCreated()没什么区别,但是紧跟其后执行。如果你每隔一段时间,发送一些数据,那么sessionCreated()方法只会在第一次调用,但是sessionOpened()方法每次都会调用。
C. void sessionClosed(IoSession session) :
对于TCP 来说,连接被关闭时,调用这个方法。对于UDP 来说,IoSession 的close()方法被调用时才会毁掉这个方法。
D. void sessionIdle(IoSession session, IdleStatus status) :
这个方法在IoSession 的通道进入空闲状态时调用,对于UDP 协议来说,这个方法始终不会被调用。
E. void exceptionCaught(IoSession session, Throwable cause) :
这个方法在你的程序、Mina 自身出现异常时回调,一般这里是关闭IoSession。
(7.)IoBuffer:
这个接口是对JAVA NIO 的ByteBuffer 的封装,这主要是因为ByteBuffer 只提供了对基本数据类型的读写操作,没有提供对字符串等对象类型的读写方法,使用起来更为方便,另外,ByteBuffer 是定长的,如果想要可变,将很麻烦。IoBuffer 的可变长度的实现类似于StringBuffer。IoBuffer 与ByteBuffer 一样,都是非线程安全的。本节的一些内容如果不清楚,可以参考java.nio.ByteBuffer 接口。这个接口有如下常用的方法:
A. static IoBuffer allocate(int capacity,boolean useDirectBuffer):
这个方法内部通过SimpleBufferAllocator 创建一个实例,第一个参数指定初始化容量,第二个参数指定使用直接缓冲区还是JAVA 内存堆的缓存区,默认为false。
B. void free():
释放缓冲区,以便被一些IoBufferAllocator 的实现重用,一般没有必要调用这个方法,除非你想提升性能(但可能未必效果明显)。
C. IoBuffer setAutoExpand(boolean autoExpand):
这个方法设置IoBuffer 为自动扩展容量,也就是前面所说的长度可变,那么可以看出长度可变这个特性默认是不开启的。
D. IoBuffer setAutoShrink(boolean autoShrink):
这个方法设置IoBuffer 为自动收缩,这样在compact()方法调用之后,可以裁减掉一些没有使用的空间。如果这个方法没有被调用或者设置为false,你也可以通过调用shrink()方法手动收缩空间。
(8.)IoFuture:
在Mina 的很多操作中,你会看到返回值是XXXFuture,实际上他们都是IoFuture 的子类,看到这样的返回值,这个方法就说明是异步执行的,主要的子类有ConnectFuture、CloseFuture 、ReadFuture 、WriteFuture 。这个接口的大部分操作都和
java.util.concurrent.Future 接口是类似的,譬如:await()、awaitUninterruptibly()等,一般我们常用awaitUninterruptibly()方法可以等待异步执行的结果返回。这个接口有如下常用的方法:
A. IoFuture addListener(IoFutureListener
listener):
这个方法用于添加一个监听器, 在异步执行的结果返回时监听器中的回调方法operationComplete(IoFuture future),也就是说,这是替代awaitUninterruptibly()方法另一种等待异步执行结果的方法,它的好处是不会产生阻塞。
B. IoFuture removeListener(IoFutureListener
listener):
这个方法用于移除指定的监听器。
C. IoSession getSession():
这个方法返回当前的IoSession。举个例子,我们在客户端调用connect()方法访问Server 端的时候,实际上这就是一个异步执行的方法,也就是调用connect()方法之后立即返回,执行下面的代码,而不管是否连
接成功。那么如果我想在连接成功之后执行一些事情(譬如:获取连接成功后的IoSession对象),该怎么办呢?按照上面的说明,你有如下两种办法:
第一种:
- ConnectFuture future = connector.connect(new InetSocketAddress(
- HOSTNAME, PORT));
- // 等待是否连接成功,相当于是转异步执行为同步执行。
- future.awaitUninterruptibly();
- // 连接成功后获取会话对象。如果没有上面的等待,由于connect()方法是异步的,session
- 可能会无法获取。
- session = future.getSession();
第二种:
- ConnectFuture future = connector.connect(new InetSocketAddress(
- HOSTNAME, PORT));
- future.addListener(new IoFutureListener
() {
- @Override
- public void operationComplete(ConnectFuture future) {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- IoSession session = future.getSession();
- System.out.println(“++++++++++++++++++++++++++++”);
- }
- });
- System.out.println(“*”);
为了更好的看清楚使用监听器是异步的,而不是像awaitUninterruptibly()那样会阻塞主线程的执行,我们在回调方法中暂停5 秒钟,然后输出+++,在最后输出*。我们执行代码之后,你会发现首先输出*(这证明了监听器是异步执行的),然后IoSession 对象Created,系统暂停5 秒,然后输出+++,最后IoSession 对象Opened,也就是TCP 连接建立。
4.日志配置:
前面的示例代码中提到了使用SLF4J 作为日志门面,这是因为Mina 内部使用的就是SLF4J,你也使用SLF4J 可以与之保持一致性。Mina 如果想启用日志跟踪Mina 的运行细节,你可以配置LoggingFilter 过滤器,这样你可
以看到Session 建立、打开、空闲等一系列细节在日志中输出,默认SJF4J 是按照DEBUG级别输出跟踪信息的,如果你想给某一类别的Mina 运行信息输出指定日志输出级别,可以调用LoggingFilter 的setXXXLogLevel(LogLevel.XXX)。
例:
- LoggingFilter lf = new LoggingFilter();
- lf.setSessionOpenedLogLevel(LogLevel.ERROR);
- acceptor.getFilterChain().addLast(“logger”, lf);
这里IoSession 被打开的跟踪信息将以ERROR 级别输出到日志。
5.过滤器:
前面我们看到了LoggingFilter、ProtocolCodecFilter 两个过滤器,一个负责日志输出,一个负责数据的编解码,通过最前面的Mina 执行流程图,在IoProcessor 与IoHandler 之间可以有很多的过滤器,这种设计方式为你提供可插拔似的扩展功能提供了非常便利的方式,目前的Apache CXF、Apache Struts2 中的拦截器也都是一样的设计思路。Mina 中的IoFilter 是单例的,这与CXF、Apache Struts2 没什么区别。IoService 实例上会绑定一个DefaultIoFilterChainBuilder 实例,DefaultIoFilterChainBuilder 会把使用内部的EntryImpl 类把所有的过滤器按照顺序连在一起,组成一个过滤器链。
DefaultIoFilterChainBuilder 类如下常用的方法:
A. void addFirst(String name,IoFilter filter):
这个方法把过滤器添加到过滤器链的头部,头部就是IoProcessor 之后的第一个过滤器。同样的addLast()方法把过滤器添加到过滤器链的尾部。
B. void addBefore(String baseName,String name,IoFilter filter):
这个方法将过滤器添加到baseName 指定的过滤器的前面,同样的addAfter()方法把过滤器添加到baseName 指定的过滤器的后面。这里要注意无论是那种添加方法,每个过滤器的名字(参数name)必须是唯一的。
C. IoFilter remove(Stirng name):
这个方法移除指定名称的过滤器,你也可以调用另一个重载的remove()方法,指定要移除的IoFilter 的类型。
D. List
getAll():
这个方法返回当前IoService 上注册的所有过滤器。默认情况下,过滤器链中是空的,也就是getAll()方法返回长度为0 的List,但实际Mina内部有两个隐藏的过滤器:HeadFilter、TailFilter,分别在List 的最开始和最末端,很明显,TailFilter 在最末端是为了调用过滤器链之后,调用IoHandler。但这两个过滤器对你来说是透明的,可以忽略它们的存在。编写一个过滤器很简单,你需要实现IoFilter 接口,如果你只关注某几个方法,可以继承IoFilterAdapter 适配器类。IoFilter 接口中主要包含两类方法,一类是与IoHandler 中的方法名一致的方法,相当于拦截IoHandler 中的方法,另一类是IoFilter 的生命周期回调方法,这些回调方法的执行顺序和解释如下所示:
- public class MyIoFilter implements IoFilter {
- @Override
- public void destroy() throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%�stroy”);
- }
- @Override
- public void exceptionCaught(NextFilter nextFilter, IoSession
- session,
- Throwable cause) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught”);
- nextFilter.exceptionCaught(session, cause);
- }
- @Override
- public void filterClose(NextFilter nextFilter, IoSession session)
- throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose”);
- nextFilter.filterClose(session);
- }
- @Override
- public void filterWrite(NextFilter nextFilter, IoSession session,
- WriteRequest writeRequest) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite”);
- nextFilter.filterWrite(session, writeRequest);
- }
- @Override
- public void init() throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%init”);
- }
- @Override
- public void messageReceived(NextFilter nextFilter, IoSession
- session,
- Object message) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived”);
- nextFilter.messageReceived(session, message);
- }
- @Override
- public void messageSent(NextFilter nextFilter, IoSession session,
- WriteRequest writeRequest) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent”);
- nextFilter.messageSent(session, writeRequest);
- }
- @Override
- public void onPostAdd(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd”);
- }
- @Override
- public void onPostRemove(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove”);
- }
- @Override
- public void onPreAdd(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd”);
- }
- @Override
- public void onPreRemove(IoFilterChain parent, String name,
- NextFilter nextFilter) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove”);
- }
- @Override
- public void sessionClosed(NextFilter nextFilter, IoSession session)
- throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed”);
- nextFilter.sessionClosed(session);
- }
- @Override
- public void sessionCreated(NextFilter nextFilter, IoSession session)
- throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated”);
- nextFilter.sessionCreated(session);
- }
- @Override
- public void sessionIdle(NextFilter nextFilter, IoSession session,
- IdleStatus status) throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle”);
- nextFilter.sessionIdle(session, status);
- }
- @Override
- public void sessionOpened(NextFilter nextFilter, IoSession session)
- throws Exception {
- System.out.println(“%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened”);
- nextFilter.sessionOpened(session);
- }
- }
我们将这个拦截器注册到上面的TCPServer 的IoAcceptor 的过滤器链中的最后一个:
- acceptor.getFilterChain().addLast(“myIoFilter”,
- new ReferenceCountingFilter(new MyIoFilter()));
6.协议编解码器:
前面说过,协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据时将JAVA 对象编码二进制数据,而接收数据时将二进制数据解码为JAVA 对象(这个可不是JAVA 对象的序列化、反序列化那么简单的事情)。Mina 中的协议编解码器通过过滤器ProtocolCodecFilter 构造,这个过滤器的构造方法需要一个ProtocolCodecFactory,这从前面注册TextLineCodecFactory 的代码就可以看出来。
ProtocolCodecFactory 中有如下两个方法:
public interface ProtocolCodecFactory {
ProtocolEncoder getEncoder(IoSession session) throws Exception;
ProtocolDecoder getDecoder(IoSession session) throws Exception;
}
因此,构建一个ProtocolCodecFactory 需要ProtocolEncoder、ProtocolDecoder 两个实例。你可能要问JAVA 对象和二进制数据之间如何转换呢?这个要依据具体的通信协议,也就是Server 端要和Client 端约定网络传输的数据是什么样的格式,譬如:第一个字节表示数据长度,第二个字节是数据类型,后面的就是真正的数据(有可能是文字、有可能是图片等等),然后你可以依据长度从第三个字节向后读,直到读取到指定第一个字节指定长度的数据。
简单的说,HTTP 协议就是一种浏览器与Web 服务器之间约定好的通信协议,双方按照指定的协议编解码数据。我们再直观一点儿说,前面一直使用的TextLine 编解码器就是在读取网络上传递过来的数据时,只要发现哪个字节里存放的是ASCII 的10、13 字符(/r、/n),就认为之前的字节就是一个字符串(默认使用UTF-8 编码)。以上所说的就是各种协议实际上就是网络七层结构中的应用层协议,它位于网络层(IP)、传输层(TCP)之上,Mina 的协议编解码器就是让你实现一套自己的应用层协议栈。
(6-1.)简单的编解码器示例:
下面我们举一个模拟电信运营商短信协议的编解码器实现,假设通信协议如下所示:
M sip:wap.fetion.com.cn SIP-C/2.0
S: xxxx
R: xxxx
- public class SmsObject {
- private String sender;// 短信发送者
- private String receiver;// 短信接受者
- private String message;// 短信内容
- public String getSender() {
- return sender;
- }
- public void setSender(String sender) {
- this.sender = sender;
- }
- public String getReceiver() {
- return receiver;
- }
- public void setReceiver(String receiver) {
- this.receiver = receiver;
- }
- public String getMessage() {
- return message;
- }
- public void setMessage(String message) {
- this.message = message;
- }
- }
第二步,编码器:
在Mina 中编写编码器可以实现ProtocolEncoder,其中有encode()、dispose()两个方法需要实现。这里的dispose()方法用于在销毁编码器时释放关联的资源,由于这个方法一般我们并不关心,所以通常我们直接继承适配器ProtocolEncoderAdapter。
- public class CmccSipcEncoder extends ProtocolEncoderAdapter {
- private final Charset charset;
- public CmccSipcEncoder(Charset charset) {
- this.charset = charset;
- }
- @Override
- public void encode(IoSession session, Object message,
- ProtocolEncoderOutput out) throws Exception {
- SmsObject sms = (SmsObject) message;
- CharsetEncoder ce = charset.newEncoder();
- IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
- String statusLine = “M sip:wap.fetion.com.cn SIP-C/2.0”;
- String sender = sms.getSender();
- String receiver = sms.getReceiver();
- String smsContent = sms.getMessage();
- buffer.putString(statusLine + ‘/n’, ce);
- buffer.putString(“S: “ + sender + ‘/n’, ce);
- buffer.putString(“R: “ + receiver + ‘/n’, ce);
- buffer
- .putString(“L: “ + (smsContent.getBytes(charset).length)
- + “/n”,
- ce);
- buffer.putString(smsContent, ce);
- buffer.flip();
- out.write(buffer);
- }
- }
B. 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区。简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。
- public class CmccSipcDecoder extends CumulativeProtocolDecoder {
- private final Charset charset;
- public CmccSipcDecoder(Charset charset) {
- this.charset = charset;
- }
- @Override
- protected boolean doDecode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
- CharsetDecoder cd = charset.newDecoder();
- int matchCount = 0;
- String statusLine = “”, sender = “”, receiver = “”, length = “”,
- sms = “”;
- int i = 1;
- while (in.hasRemaining()) {
- byte b = in.get();
- buffer.put(b);
- if (b == 10 && i < 5) {
- matchCount++;
- if (i == 1) {
- buffer.flip();
- statusLine = buffer.getString(matchCount, cd);
- statusLine = statusLine.substring(0,
- statusLine.length() – 1);
- matchCount = 0;
- buffer.clear();
- }
- if (i == 2) {
- buffer.flip();
- sender = buffer.getString(matchCount, cd);
- sender = sender.substring(0, sender.length() –1);
- matchCount = 0;
- buffer.clear();
- }
- if (i == 3) {
- buffer.flip();
- receiver = buffer.getString(matchCount, cd);
- receiver = receiver.substring(0, receiver.length()
- 1);
- matchCount = 0;
- buffer.clear();
- }
- if (i == 4) {
- buffer.flip();
- length = buffer.getString(matchCount, cd);
- length = length.substring(0, length.length() –1);
- matchCount = 0;
- buffer.clear();
- }
- i++;
- } else if (i == 5) {
- matchCount++;
- if (matchCount == Long.parseLong(length.split(“: “)[1]))
- {
- buffer.flip();
- sms = buffer.getString(matchCount, cd);
- i++;
- break;
- }
- } else {
- matchCount++;
- }
- }
- SmsObject smsObject = new SmsObject();
- smsObject.setSender(sender.split(“: “)[1]);
- smsObject.setReceiver(receiver.split(“: “)[1]);
- smsObject.setMessage(sms);
- out.write(smsObject);
- return false;
- }
- }
我们的这个短信协议解码器使用/n(ASCII 的10 字符)作为分解点,一个字节一个字节的读取,那么第一次发现/n 的字节位置之前的部分,必然就是短信协议的状态行,依次类推,你就可以解析出来发送者、接受者、短信内容长度。然后我们在解析短信内容时,使用获取到的长度进行读取。全部读取完毕之后, 然后构造SmsObject 短信对象, 使用ProtocolDecoderOutput 的write()方法输出,最后返回false,也就是本次数据全部读取完毕,告知CumulativeProtocolDecoder 在本次数据读取中不需要再调用doDecode()方法了。这里需要注意的是两个状态变量i、matchCount,i 用于记录解析到了短信协议中的哪一行(/n),matchCount 记录在当前行中读取到了哪一个字节。状态变量在解码器中经常被使用,我们这里的情况比较简单,因为我们假定短信发送是在一次数据发送中完成的,所以状态变量的使用也比较简单。假如数据的发送被拆成了多次(譬如:短信协议的短信内容、消息报头被拆成了两次数据发送),那么上面的代码势必就会存在问题,因为当第二次调用doDecode()方法时,状态变量i、matchCount 势必会被重置,也就是原来的状态值并没有被保存。那么我们如何解决状态保存的问题呢?答案就是将状态变量保存在IoSession 中或者是Decoder 实例自身,但推荐使用前者,因为虽然Decoder 是单例的,其中的实例变量保存的状态在Decoder 实例销毁前始终保持,但Mina 并不保证每次调用doDecode()方法时都是同一个线程(这也就是说第一次调用doDecode()是IoProcessor-1 线程,第二次有可能就是IoProcessor-2 线程),这就会产生多线程中的实例变量的可视性(Visibility,具体请参考JAVA 的多线程知识)问题。IoSession中使用一个同步的HashMap 保存对象,所以你不需要担心多线程带来的问题。使用IoSession 保存解码器的状态变量通常的写法如下所示:
A. 在解码器中定义私有的内部类Context,然后将需要保存的状态变量定义在Context 中存储。
B. 在解码器中定义方法获取这个Context 的实例,这个方法的实现要优先从IoSession 中获取Context。
具体代码示例如下所示:
// 上下文作为保存状态的内部类的名字,意思很明显,就是让状态跟随上下文,在整个调用过程中都可以被保持。
- public class XXXDecoder extends CumulativeProtocolDecoder{
- private final AttributeKey CONTEXT =
- new AttributeKey(getClass(), “context” );
- public Context getContext(IoSession session){
- Context ctx=(Context)session.getAttribute(CONTEXT);
- if(ctx==null){
- ctx=new Context();
- session.setAttribute(CONTEXT,ctx);
- }
- }
- private class Context {
- //状态变量
- }
- }
注意这里我们使用了Mina 自带的AttributeKey 类来定义保存在IoSession 中的对象的键值,这样可以有效的防止键值重复。另外,要注意在全部处理完毕之后,状态要复位,譬如:聊天室中的一条消息读取完毕之后,状态变量要变为初始值,以便下次处理时重新使用。
第四步,编解码工厂:
- public class CmccSipcCodecFactory implements ProtocolCodecFactory {
- private final CmccSipcEncoder encoder;
- private final CmccSipcDecoder decoder;
- public CmccSipcCodecFactory() {
- this(Charset.defaultCharset());
- }
- public CmccSipcCodecFactory(Charset charSet) {
- this.encoder = new CmccSipcEncoder(charSet);
- this.decoder = new CmccSipcDecoder(charSet);
- }
- @Override
- public ProtocolDecoder getDecoder(IoSession session) throws
- Exception {
- return decoder;
- }
- @Override
- public ProtocolEncoder getEncoder(IoSession session) throws
- Exception {
- return encoder;
- }
- }
实际上这个工厂类就是包装了编码器、解码器,通过接口中的getEncoder()、getDecoder()方法向ProtocolCodecFilter 过滤器返回编解码器实例,以便在过滤器中对数据进行编解码处理。
第五步,运行示例:
下面我们修改最一开始的示例中的MyServer、MyClient 的代码,如下所示:
- acceptor.getFilterChain().addLast(
- “codec”,
- new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset
- .forName(“UTF-8”))));
- connector.getFilterChain().addLast(
- “codec”,
- new ProtocolCodecFilter(new
- CmccSipcCodecFactory(
- Charset.forName(“UTF-8”))));
- 然后我们在ClientHandler 中发送一条短信:
- public void sessionOpened(IoSession session) {
- SmsObject sms = new SmsObject();
- sms.setSender(“”);
- sms.setReceiver(“”);
- sms.setMessage(“你好!Hello World!”);
- session.write(sms);
- }
最后我们在MyIoHandler 中接收这条短信息:
- public void messageReceived(IoSession session, Object message)
- throws Exception {
- SmsObject sms = (SmsObject) message;
- log.info(“The message received is [“ + sms.getMessage() + “]”);
- }
CmccSispcDecoder 类改为如下的写法:
- public class CmccSipcDecoder extends CumulativeProtocolDecoder {
- private final Charset charset;
- private final AttributeKey CONTEXT = new AttributeKey(getClass(),
- “context”);
- public CmccSipcDecoder(Charset charset) {
- this.charset = charset;
- }
- @Override
- protected boolean doDecode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- Context ctx = getContext(session);
- CharsetDecoder cd = charset.newDecoder();
- int matchCount = ctx.getMatchCount();
- int line = ctx.getLine();
- IoBuffer buffer = ctx.innerBuffer;
- String statusLine = ctx.getStatusLine(),
- sender = ctx.getSender(),
- receiver = ctx.getReceiver(),
- length = ctx.getLength(),
- sms = ctx.getSms();
- while (in.hasRemaining()) {
- byte b = in.get();
- matchCount++;
- buffer.put(b);
- if (line < 4 && b == 10) {
- if (line == 0) {
- buffer.flip();
- statusLine = buffer.getString(matchCount, cd);
- statusLine = statusLine.substring(0,
- statusLine.length() – 1);
- matchCount = 0;
- buffer.clear();
- ctx.setStatusLine(statusLine);
- }
- if (line == 1) {
- buffer.flip();
- sender = buffer.getString(matchCount, cd);
- sender = sender.substring(0, sender.length() – 1);
- matchCount = 0;
- buffer.clear();
- ctx.setSender(sender);
- }
- if (line == 2) {
- buffer.flip();
- receiver = buffer.getString(matchCount, cd);
- receiver = receiver.substring(0, receiver.length() –
- 1);
- matchCount = 0;
- buffer.clear();
- ctx.setReceiver(receiver);
- }
- if (line == 3) {
- buffer.flip();
- length = buffer.getString(matchCount, cd);
- length = length.substring(0, length.length() – 1);
- matchCount = 0;
- buffer.clear();
- ctx.setLength(length);
- }
- line++;
- } else if (line == 4) {
- if (matchCount == Long.parseLong(length.split(“: “)[1]))
- {
- buffer.flip();
- sms = buffer.getString(matchCount, cd);
- ctx.setSms(sms);
- // 由于下面的break,这里需要调用else外面的两行代码
- ctx.setMatchCount(matchCount);
- ctx.setLine(line);
- break;
- }
- }
- ctx.setMatchCount(matchCount);
- ctx.setLine(line);
- }
- if (ctx.getLine() == 4
- && Long.parseLong(ctx.getLength().split(“: “)[1]) == ctx
- .getMatchCount()) {
- SmsObject smsObject = new SmsObject();
- smsObject.setSender(sender.split(“: “)[1]);
- smsObject.setReceiver(receiver.split(“: “)[1]);
- smsObject.setMessage(sms);
- out.write(smsObject);
- ctx.reset();
- return true;
- } else {
- return false;
- }
- }
- private Context getContext(IoSession session) {
- Context context = (Context) session.getAttribute(CONTEXT);
- if (context == null){
- context = new Context();
- session.setAttribute(CONTEXT, context);
- }
- return context;
- }
- private class Context {
- private final IoBuffer innerBuffer;
- private String statusLine = “”;
- private String sender = “”;
- private String receiver = “”;
- private String length = “”;
- private String sms = “”;
- public Context() {
- innerBuffer = IoBuffer.allocate(100).setAutoExpand(true);
- }
- private int matchCount = 0;
- private int line = 0;
- public int getMatchCount() {
- return matchCount;
- }
- public void setMatchCount(int matchCount) {
- this.matchCount = matchCount;
- }
- public int getLine() {
- return line;
- }
- public void setLine(int line) {
- this.line = line;
- }
- public String getStatusLine() {
- return statusLine;
- }
- public void setStatusLine(String statusLine) {
- this.statusLine = statusLine;
- }
- public String getSender() {
- return sender;
- }
- public void setSender(String sender) {
- this.sender = sender;
- }
- public String getReceiver() {
- return receiver;
- }
- public void setReceiver(String receiver) {
- this.receiver = receiver;
- }
- public String getLength() {
- return length;
- }
- public void setLength(String length) {
- this.length = length;
- }
- public String getSms() {
- return sms;
- }
- public void setSms(String sms) {
- this.sms = sms;
- }
- public void reset() {
- this.innerBuffer.clear();
- this.matchCount = 0;
- this.line = 0;
- this.statusLine = “”;
- this.sender = “”;
- this.receiver = “”;
- this.length = “”;
- this.sms = “”;
- }
- }
- }
这里我们做了如下的几步操作:
(1.) 所有记录状态的变量移到了Context 内部类中,包括记录读到短信协议的哪一行的line。每一行读取了多少个字节的matchCount,还有记录解析好的状态行、发送者、接受者、短信内容、累积数据的innerBuffer 等。这样就可以在数据不能完全解码,等待下一次doDecode()方法的调用时,还能承接上一次调用的数据。
(2.) 在 doDecode()方法中主要的变化是各种状态变量首先是从Context 中获取,然后操作之后,将最新的值setXXX()到Context 中保存。
(3.) 这里注意doDecode()方法最后的判断,当认为不够解码为一条短信息时,返回false,也就是在本次数据流解码中不要再调用doDecode()方法;当认为已经解码出一条短信息时,输出短消息,然后重置所有的状态变量,返回true,也就是如果本次数据流解码中还有没解码完的数据,继续调用doDecode()方法。下面我们对客户端稍加改造,来模拟上面的红、蓝、绿三次发送聊天短信息的情况:
MyClient:
- ConnectFuture future = connector.connect(new InetSocketAddress(
- HOSTNAME, PORT));
- future.awaitUninterruptibly();
- session = future.getSession();
- for (int i = 0; i < 3; i++) {
- SmsObject sms = new SmsObject();
- session.write(sms);
- System.out.println(“” + i);
- }
这里我们为了方便演示,不在IoHandler 中发送消息,而是直接在MyClient 中发送,你要注意的是三次发送都要使用同一个IoSession,否则就不是从同一个通道发送过去的了。
CmccSipcEncoder:
- public void encode(IoSession session, Object message,
- ProtocolEncoderOutput out) throws Exception {
- SmsObject sms = (SmsObject) message;
- CharsetEncoder ce = charset.newEncoder();
- String statusLine = “M sip:wap.fetion.com.cn SIP-C/2.0”;
- String sender = “”;
- String receiver = “”;
- String smsContent = “你好!Hello World!”;
- IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
- buffer.putString(statusLine + ‘/n’, ce);
- buffer.putString(“S: “ + sender + ‘/n’, ce);
- buffer.putString(“R: “ + receiver + ‘/n’, ce);
- buffer.flip();
- out.write(buffer);
- IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true);
- buffer2.putString(“L: “ + (smsContent.getBytes(charset).length)
- + “/n”,ce);
- buffer2.putString(smsContent, ce);
- buffer2.putString(statusLine + ‘/n’, ce);
- buffer2.flip();
- out.write(buffer2);
- IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true);
- buffer3.putString(“S: “ + sender + ‘/n’, ce);
- buffer3.putString(“R: “ + receiver + ‘/n’, ce);
- buffer3.putString(“L: “ + (smsContent.getBytes(charset).length)
- + “/n”,ce);
- buffer3.putString(smsContent, ce);
- buffer3.putString(statusLine + ‘/n’, ce);
- buffer3.flip();
- out.write(buffer3);
- }
上面的这段代码要配合MyClient来操作,你需要做的是在MyClient中的红色输出语句处设置断点,然后第一调用时CmccSipcEncoder中注释掉蓝、绿色的代码,也就是发送两条短信息的第一部分(红色的代码),依次类推,也就是MyClient的中的三次断点中,分别执行CmccSipcEncoder中的红、蓝、绿三段代码,也就是模拟两条短信的三段发送。你会看到Server端的运行结果是:当MyClient第一次到达断点时,没有短信息被读取到,当MyClient第二次到达断点时,第一条短信息输出,当MyClient第三次到达断点时,第二条短信息输出。
(6-3.)多路分离的解码器:
假设一段数据发送过来之后,需要根据某种条件决定使用哪个解码器,而不是像上面的例子,固定使用一个解码器,那么该如何做呢?幸好Mina 提供了org.apache.mina.filter.codec.demux 包来完成这种多路分离(Demultiplexes)的解码工作,也就是同时注册多个解码器,然后运行时依据传入的数据决定到底使用哪个解码器来工作。所谓多路分离就是依据条件分发到指定的解码器,譬如:上面的短信协议进行扩展,可以依据状态行来判断使用1.0 版本的短信协议解码器还是2.0版本的短信协议解码器。
下面我们使用一个简单的例子,说明这个多路分离的解码器是如何使用的,需求如下所示:
(1.) 客户端传入两个int 类型的数字,还有一个char 类型的符号。
(2.) 如果符号是+,服务端就是用1 号解码器,对两个数字相加,然后把结果返回给客户端。
(3.) 如果符号是-,服务端就使用2 号解码器,将两个数字变为相反数,然后相加,把结果返回给客户端。
Demux 开发编解码器主要有如下几个步骤:
A. 定义Client 端、Server 端发送、接收的数据对象。
B. 使用Demux 编写编码器是实现MessageEncoder
接口,T 是你要编码的数据对象,这个MessageEncoder 会在DemuxingProtocolEncoder 中调用。
C. 使用Demux 编写编码器是实现MessageDecoder 接口,这个MessageDecoder 会在DemuxingProtocolDecoder 中调用。
- public interface MessageEncoder
{
- void encode(IoSession session, T message, ProtocolEncoderOutput out)
- throws Exception;
- }
你注意到消息编码器接口与在ProtocolEncoder 中没什么不同,区别就是Object message被泛型具体化了类型,你不需要手动的类型转换了。
MessageDecoder的接口如下所示:
- public interface MessageDecoder {
- static MessageDecoderResult OK = MessageDecoderResult.OK;
- static MessageDecoderResult NEED_DATA =
- MessageDecoderResult.NEED_DATA;
- static MessageDecoderResult NOT_OK = MessageDecoderResult.NOT_OK;
- MessageDecoderResult decodable(IoSession session, IoBuffer in);
- MessageDecoderResult decode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception;
- void finishDecode(IoSession session, ProtocolDecoderOutput out)
- throws Exception;
- }
- public class SendMessage {
- private int i = 0;
- private int j = 0;
- private char symbol = ‘+’;
- public char getSymbol() {
- return symbol;
- }
- public void setSymbol(char symbol) {
- this.symbol = symbol;
- }
- public int getI() {
- return i;
- }
- public void setI(int i) {
- this.i = i;
- }
- public int getJ() {
- return j;
- }
- public void setJ(int j) {
- this.j = j;
- }
- }
(2.)服务端发送的返回结果对象:
- public class ResultMessage {
- private int result = 0;
- public int getResult() {
- return result;
- }
- public void setResult(int result) {
- this.result = result;
- }
- }
(3.)客户端使用的SendMessage的编码器:
- public class SendMessageEncoder implements MessageEncoder
- {
- @Override
- public void encode(IoSession session, SendMessage message,
- ProtocolEncoderOutput out) throws Exception {
- IoBuffer buffer = IoBuffer.allocate(10);
- buffer.putChar(message.getSymbol());
- buffer.putInt(message.getI());
- buffer.putInt(message.getJ());
- buffer.flip();
- out.write(buffer);
- }
- }
这里我们的SendMessage、ResultMessage 中的字段都是用长度固定的基本数据类型,这样IoBuffer 就不需要自动扩展了,提高性能。按照一个char、两个int 计算,这里的IoBuffer只需要10 个字节的长度就可以了。
(4.)服务端使用的SendMessage的1号解码器:
- public class SendMessageDecoderPositive implements MessageDecoder {
- @Override
- public MessageDecoderResult decodable(IoSession session, IoBuffer in)
- {
- if (in.remaining() < 2)
- return MessageDecoderResult.NEED_DATA;
- else {
- char symbol = in.getChar();
- if (symbol == ‘+’) {
- return MessageDecoderResult.OK;
- } else {
- return MessageDecoderResult.NOT_OK;
- }
- }
- }
- @Override
- public MessageDecoderResult decode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- SendMessage sm = new SendMessage();
- sm.setSymbol(in.getChar());
- sm.setI(in.getInt());
- sm.setJ(in.getInt());
- out.write(sm);
- return MessageDecoderResult.OK;
- }
- @Override
- public void finishDecode(IoSession session, ProtocolDecoderOutput
- out)
- throws Exception {
- // undo
- }
- }
因为客户端发送的SendMessage 的前两个字节(char)就是符号位,所以我们在decodable()方法中对此条件进行了判断,之后读到两个字节,并且这两个字节表示的字符是+时,才认为这个解码器可用。
(5.)服务端使用的SendMessage的2号解码器:
- public class SendMessageDecoderNegative implements MessageDecoder {
- @Override
- public MessageDecoderResult decodable(IoSession session, IoBuffer in)
- {
- if (in.remaining() < 2)
- return MessageDecoderResult.NEED_DATA;
- else {
- char symbol = in.getChar();
- if (symbol == ‘-‘) {
- return MessageDecoderResult.OK;
- } else {
- return MessageDecoderResult.NOT_OK;
- }
- }
- }
- @Override
- public MessageDecoderResult decode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- SendMessage sm = new SendMessage();
- sm.setSymbol(in.getChar());
- sm.setI(-in.getInt());
- sm.setJ(-in.getInt());
- out.write(sm);
- return MessageDecoderResult.OK;
- }
- @Override
- public void finishDecode(IoSession session, ProtocolDecoderOutput
- out)
- throws Exception {
- // undo
- }
- }
(6.)服务端使用的ResultMessage的编码器:
- public class ResultMessageEncoder implements
- MessageEncoder
{
- @Override
- public void encode(IoSession session, ResultMessage message,
- ProtocolEncoderOutput out) throws Exception {
- IoBuffer buffer = IoBuffer.allocate(4);
- buffer.putInt(message.getResult());
- buffer.flip();
- out.write(buffer);
- }
- }
(7.)客户端使用的ResultMessage的解码器:
- public class ResultMessageDecoder implements MessageDecoder {
- @Override
- public MessageDecoderResult decodable(IoSession session, IoBuffer in)
- {
- if (in.remaining() < 4)
- return MessageDecoderResult.NEED_DATA;
- else if (in.remaining() == 4)
- return MessageDecoderResult.OK;
- else
- return MessageDecoderResult.NOT_OK;
- }
- @Override
- public MessageDecoderResult decode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- ResultMessage rm = new ResultMessage();
- rm.setResult(in.getInt());
- out.write(rm);
- return MessageDecoderResult.OK;
- }
- @Override
- public void finishDecode(IoSession session, ProtocolDecoderOutput
- out)
- throws Exception {
- // undo
- }
- }
(8.)组装这些编解码器的工厂:
- public class MathProtocolCodecFactory extends
- DemuxingProtocolCodecFactory {
- public MathProtocolCodecFactory(boolean server) {
- if (server) {
- super.addMessageEncoder(ResultMessage.class,
- ResultMessageEncoder.class);
- super.addMessageDecoder(SendMessageDecoderPositive.class);
- super.addMessageDecoder(SendMessageDecoderNegative.class);
- } else {
- super
- .addMessageEncoder(SendMessage.class,
- SendMessageEncoder.class);
- super.addMessageDecoder(ResultMessageDecoder.class);
- }
- }
- }
这个工厂类我们使用了构造方法的一个布尔类型的参数,以便其可以在Server 端、Client端同时使用。我们以Server 端为例,你可以看到调用两次addMessageDecoder()方法添加了1 号、2 号解码器,其实DemuxingProtocolDecoder 内部在维护了一个MessageDecoder数组,用于保存添加的所有的消息解码器,每次decode()的时候就调用每个MessageDecoder的decodable()方法逐个检查,只要发现一个MessageDecoder 不是对应的解码器,就从数组中移除,直到找到合适的MessageDecoder,如果最后发现数组为空,就表示没找到对应的MessageDecoder,最后抛出异常。
(9.)Server端:
- public class Server {
- public static void main(String[] args) throws Exception {
- IoAcceptor acceptor = new NioSocketAcceptor();
- LoggingFilter lf = new LoggingFilter();
- acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,
- 5);
- acceptor.getFilterChain().addLast(“logger”, lf);
- acceptor.getFilterChain().addLast(“codec”,
- new ProtocolCodecFilter(new
- MathProtocolCodecFactory(true)));
- acceptor.setHandler(new ServerHandler());
- acceptor.bind(new InetSocketAddress(9123));
- }
- }
(10.)Server端使用的IoHandler:
- public class ServerHandler extends IoHandlerAdapter {
- private final static Logger log = LoggerFactory
- .getLogger(ServerHandler.class);
- @Override
- public void sessionIdle(IoSession session, IdleStatus status)
- throws Exception {
- session.close(true);
- }
- @Override
- public void messageReceived(IoSession session, Object message)
- throws Exception {
- SendMessage sm = (SendMessage) message;
- log.info(“The message received is [ “ + sm.getI() + ” “
- + sm.getSymbol() + ” “ + sm.getJ() + ” ]”);
- ResultMessage rm = new ResultMessage();
- rm.setResult(sm.getI() + sm.getJ());
- session.write(rm);
- }
- }
(11.)Client端:
- public class Client {
- public static void main(String[] args) throws Throwable {
- IoConnector connector = new NioSocketConnector();
- connector.setConnectTimeoutMillis(30000);
- connector.getFilterChain().addLast(“logger”, new
- LoggingFilter());
- connector.getFilterChain().addLast(“codec”,
- new ProtocolCodecFilter(new
- MathProtocolCodecFactory(false)));
- connector.setHandler(new ClientHandler());
- connector.connect(new InetSocketAddress(“localhost”, 9123));
- }
- }
(12.)Client端的IoHandler:
- public class ClientHandler extends IoHandlerAdapter {
- private final static Logger LOGGER = LoggerFactory
- .getLogger(ClientHandler.class);
- @Override
- public void sessionOpened(IoSession session) throws Exception {
- SendMessage sm = new SendMessage();
- sm.setI(100);
- sm.setJ(99);
- sm.setSymbol(‘+’);
- session.write(sm);
- }
- @Override
- public void messageReceived(IoSession session, Object message) {
- ResultMessage rs = (ResultMessage) message;
- LOGGER.info(String.valueOf(rs.getResult()));
- }
- }
你尝试改变(12.)中的红色代码中的正负号,会看到服务端使用了两个不同的解码器对其进行处理。
7.线程模型配置:
Mina 中的很多执行环节都使用了多线程机制,用于提高性能。Mina 中默认在三个地方使用了线程:
(1.) IoAcceptor:
这个地方用于接受客户端的连接建立,每监听一个端口(每调用一次bind()方法),都启用一个线程,这个数字我们不能改变。这个线程监听某个端口是否有请求到来,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(2.) IoConnector:
这个地方用于与服务端建立连接,每连接一个服务端(每调用一次connect()方法),就启用一个线程,我们不能改变。同样的,这个线程监听是否有连接被建立,一旦发现,则创建一个IoSession 对象。因为这个动作很快,所以有一个线程就够了。
(3.) IoProcessor:
这个地方用于执行真正的IO 操作,默认启用的线程个数是CPU 的核数+1,譬如:单CPU 双核的电脑,默认的IoProcessor 线程会创建3 个。这也就是说一个IoAcceptor 或者IoConnector 默认会关联一个IoProcessor 池,这个池中有3 个IoProcessor。因为IO 操作耗费资源,所以这里使用IoProcessor 池来完成数据的读写操作,有助于提高性能。这也就是前面说的IoAccetor、IoConnector 使用一个Selector,而IoProcessor 使用自己单独的Selector 的原因。那么为什么IoProcessor 池中的IoProcessor 数量只比CPU 的核数大1 呢?因为IO 读写操作是耗费CPU 的操作,而每一核CPU 同时只能运行一个线程,因此IoProcessor 池中的IoProcessor 的数量并不是越多越好。
- public enum IoEventType {
- SESSION_CREATED,
- SESSION_OPENED,
- SESSION_CLOSED,
- MESSAGE_RECEIVED,
- MESSAGE_SENT,
- SESSION_IDLE,
- EXCEPTION_CAUGHT,
- WRITE,
- CLOSE,
- }
默认情况下,没有配置关注的事件类型,有如下六个事件方法会被自动使用线程池异步执行:
IoEventType.EXCEPTION_CAUGHT,
IoEventType.MESSAGE_RECEIVED,
IoEventType.MESSAGE_SENT,
IoEventType.SESSION_CLOSED,
IoEventType.SESSION_IDLE,
IoEventType.SESSION_OPENED
其实ExecutorFilter 的工作机制很简单,就是在调用下一个过滤器的事件方法时,把其交给Executor 的execute(Runnable runnable)方法来执行,其实你自己在IoHandler 或者某个过滤器的事件方法中开启一个线程,也可以完成同样的功能,只不过这样做,你就失去了程序的可配置性,线程调用的代码也会完全耦合在代码中。但要注意的是绝对不能开启线程让其执行sessionCreated()方法。如果你真的打算使用这个ExecutorFilter,那么最好想清楚它该放在过滤器链的哪个位置,针对哪些事件做异步处理机制。一般ExecutorFilter 都是要放在ProtocolCodecFilter 过滤器的后面,也就是不要让编解码运行在独立的线程上,而是要运行在IoProcessor 所在的线程,因为编解码处理的数据都是由IoProcessor 读取和发送的,没必要开启新的线程,否则性能反而会下降。一般使用ExecutorFilter 的典型场景是将业务逻辑(譬如:耗时的数据库操作)放在单独的线程中运行,也就是说与IO 处理无关的操作可以考虑使用ExecutorFilter 来异步执行。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/221491.html原文链接:https://javaforall.net
