一、什么是Reactor模型
反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或
者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。
关于reactor 是什么,我们先从wiki上看下:
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
从上述文字中我们可以看出以下关键点 :
- 事件驱动(event handling)
- 可以处理一个或多个输入源(one or more inputs)
- 通过Service Handler同步的将输入事件(Event)采用多路复用分发给相应的Request Handler(多个)处理

自POSA2 中的关于Reactor Pattern 介绍中,我们了解了Reactor 的处理方式:
- 同步的等待多个事件源到达(采用select()实现)
- 将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch)
- 分解的事件以及对应的事件服务应用从分派服务中分离出去(handler)
二、为什么要用Reactor
常见的网络服务中,如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的contnect 、read、write ,特别是对于长链接的服务,有多少个c端,就需要在s端维护同等的IO连接。这对服务器来说是一个很大的开销。
三、Reactor的组成
首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:
- Reactor 将I/O事件分派给对应的Handler
- Acceptor 处理客户端新连接,并分派请求到处理器链中
- Handlers 执行非阻塞读/写 任务
这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处理,有IO读写事件之后交给hanlder 处理。
Acceptor主要任务就是构建handler ,在获取到和client相关的SocketChannel之后 ,绑定到相应的hanlder上,对应的SocketChannel有读写事件之后,基于racotor 分发,hanlder就可以处理了(所有的IO事件都绑定到selector上,有Reactor分发)。
该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。
四、Reactor模型的发展与种类
4.1 单Reactor单线程模型(也叫单线程模式)

这是最简单的单Reactor单线程模型。Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中。
下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多。Reactor和Hander 处于一条线程执行。
顺便说一下,可以将上图的accepter,看做是一种特殊的handler。
4.1.1 单线程Reactor的参考代码
“Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码如下:
package com.crazymakercircle.ReactorModel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { //Reactor初始化 selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocket.configureBlocking(false); //分步处理,第一步,接收accept事件 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //attach callback object, Acceptor sk.attach(new Acceptor()); } public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); //调用之前注册的callback对象 if (r != null) { r.run(); } } // inner class class Acceptor implements Runnable { public void run() { try { SocketChannel channel = serverSocket.accept(); if (channel != null) new Handler(selector, channel); } catch (IOException ex) { /* ... */ } } } }
Handler的代码如下:
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.config.SystemConfig; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; class Handler implements Runnable { final SocketChannel channel; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // Optionally try first read now sk = channel.register(selector, 0); //将Handler作为callback对象 sk.attach(this); //第二步,注册Read就绪事件 sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } boolean inputIsComplete() { /* ... */ return false; } boolean outputIsComplete() { /* ... */ return false; } void process() { /* ... */ return; } public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } void read() throws IOException { channel.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now //第三步,接收write就绪事件 sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { channel.write(output); //write完就结束了, 关闭select key if (outputIsComplete()) { sk.cancel(); } } }
这两段代码,是建立在JAVA NIO的基础上的,这两段代码建议一定要看懂。可以在IDE中去看源码,这样直观感觉更佳。
4.1.2 单线程模式的缺点
- 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。
- 因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。
4.2 单Reactor多线程模型(也叫多线程模式)
4.2.1 基于线程池的改进
在线程Reactor模式基础上,做如下改进:
- 将Handler处理器的执行放入线程池,多线程进行业务处理。
- 而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。
一个简单的图如下:

4.2.2 改进后的完整示意图
下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。
4.2.3 多线程模式参考代码
“Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程做一个线程池的改进,改进的Handler的代码如下:
package com.crazymakercircle.ReactorModel; import com.crazymakercircle.config.SystemConfig; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MthreadHandler implements Runnable { final SocketChannel channel; final SelectionKey selectionKey; ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE); static final int READING = 0, SENDING = 1; int state = READING; ExecutorService pool = Executors.newFixedThreadPool(2); static final int PROCESSING = 3; MthreadHandler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // Optionally try first read now selectionKey = channel.register(selector, 0); //将Handler作为callback对象 selectionKey.attach(this); //第二步,注册Read就绪事件 selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } boolean inputIsComplete() { /* ... */ return false; } boolean outputIsComplete() { /* ... */ return false; } void process() { /* ... */ return; } public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } synchronized void read() throws IOException { // ... channel.read(input); if (inputIsComplete()) { state = PROCESSING; //使用线程pool异步执行 pool.execute(new Processer()); } } void send() throws IOException { channel.write(output); //write完就结束了, 关闭select key if (outputIsComplete()) { selectionKey.cancel(); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment //process完,开始等待write就绪 selectionKey.interestOps(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } }
4.3 多Reactor多线程模型(也叫主从多线程模式)

第三种模型比起第二种模型,是将Reactor分成两部分,
- mainReactor负责监听server socket,用来处理新连接的建立,将建立的socketChannel指定注册给subReactor。
- subReactor维护自己的selector, 基于mainReactor 注册的socketChannel多路分离IO读写事件,读写网 络数据,对业务处理的功能,另其扔给worker线程池来完成。
第三种模型中,我们可以看到,mainReactor 主要是用来处理网络IO 连接建立操作,通常一个线程就可以处理,而subReactor主要做和建立起来的socket做数据交互和事件业务处理操作,它的个数上一般是和CPU个数等同,每个subReactor一个县城来处理。
此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别。
关于此种模型的应用,目前有很多优秀的矿建已经在应用了,比如mina 和netty 等。上述中去掉线程池的第三种形式的变种,也 是Netty NIO的默认模式。下一节我们将着重讲解netty的架构模式。
4.3.1 主从多线程模式参考代码
对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码如下:
package com.crazymakercircle.ReactorModel; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; class MthreadReactor implements Runnable { //subReactors集合, 一个selector代表一个subReactor Selector[] selectors=new Selector[2]; int next = 0; final ServerSocketChannel serverSocket; MthreadReactor(int port) throws IOException { //Reactor初始化 selectors[0]=Selector.open(); selectors[1]= Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocket.configureBlocking(false); //分步处理,第一步,接收accept事件 SelectionKey sk = serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT); //attach callback object, Acceptor sk.attach(new Acceptor()); } public void run() { try { while (!Thread.interrupted()) { for (int i = 0; i <2 ; i++) { selectors[i].select(); Set selected = selectors[i].selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 dispatch((SelectionKey) (it.next())); } selected.clear(); } } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); //调用之前注册的callback对象 if (r != null) { r.run(); } } class Acceptor { // ... public synchronized void run() throws IOException { SocketChannel connection = serverSocket.accept(); //主selector负责accept if (connection != null) { new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection } if (++next == selectors.length) next = 0; } } }
4.3.2 主从多线程模式优缺点
优点
- 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
- 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
- 可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
- 可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;
缺点
- 相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
- Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。
- Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用改进版的Reactor模式如Proactor模式。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220159.html原文链接:https://javaforall.net
