Reactor模型讲解

Reactor模型讲解一 什么是 Reactor 模型反应器设计模式 Reactorpatte 是一种为处理并发服务请求 并将请求提交到一个或者多个服务处理程序的事件设计模式 当客户端请求抵达后 服务处理程序使用多路分配策略 由一个非阻塞的线程来接收所有的请求 然后派发这些请求至相关的工作线程进行处理 关于 reactor 是什么 我们先从 wiki 上看下 Thereactorde

一、什么是Reactor模型

        反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或
者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。

关于reactor 是什么,我们先从wiki上看下:

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

从上述文字中我们可以看出以下关键点 :

  1. 事件驱动(event handling)
  2. 可以处理一个或多个输入源(one or more inputs)
  3. 通过Service Handler同步的将输入事件(Event)采用多路复用分发给相应的Request Handler(多个)处理

Reactor模型讲解

自POSA2 中的关于Reactor Pattern 介绍中,我们了解了Reactor 的处理方式:

  1. 同步的等待多个事件源到达(采用select()实现)
  2. 将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch)
  3. 分解的事件以及对应的事件服务应用从分派服务中分离出去(handler)

二、为什么要用Reactor

        常见的网络服务中,如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的contnect 、read、write ,特别是对于长链接的服务,有多少个c端,就需要在s端维护同等的IO连接。这对服务器来说是一个很大的开销。

三、Reactor的组成

首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:

  • Reactor 将I/O事件分派给对应的Handler
  • Acceptor 处理客户端新连接,并分派请求到处理器链中
  • Handlers 执行非阻塞读/写 任务

这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处理,有IO读写事件之后交给hanlder 处理。

Acceptor主要任务就是构建handler ,在获取到和client相关的SocketChannel之后 ,绑定到相应的hanlder上,对应的SocketChannel有读写事件之后,基于racotor 分发,hanlder就可以处理了(所有的IO事件都绑定到selector上,有Reactor分发)。

该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。

四、Reactor模型的发展与种类

4.1 单Reactor单线程模型(也叫单线程模式)

wpsC334.tmp

        这是最简单的单Reactor单线程模型。Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到Handler处理器中。 

下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多。Reactor和Hander 处于一条线程执行。

wpsC345.tmp

顺便说一下,可以将上图的accepter,看做是一种特殊的handler。

4.1.1 单线程Reactor的参考代码

“Scalable IO in Java”,实现了一个单线程Reactor的参考代码,Reactor的代码如下:

package com.crazymakercircle.ReactorModel; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; class Reactor implements Runnable { final Selector selector; final ServerSocketChannel serverSocket; Reactor(int port) throws IOException { //Reactor初始化 selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocket.configureBlocking(false); //分步处理,第一步,接收accept事件 SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); //attach callback object, Acceptor sk.attach(new Acceptor()); } public void run() { try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 dispatch((SelectionKey) (it.next())); } selected.clear(); } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); //调用之前注册的callback对象 if (r != null) { r.run(); } } // inner class class Acceptor implements Runnable { public void run() { try { SocketChannel channel = serverSocket.accept(); if (channel != null) new Handler(selector, channel); } catch (IOException ex) { /* ... */ } } } }

Handler的代码如下:

package com.crazymakercircle.ReactorModel; import com.crazymakercircle.config.SystemConfig; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; class Handler implements Runnable { final SocketChannel channel; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE); static final int READING = 0, SENDING = 1; int state = READING; Handler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // Optionally try first read now sk = channel.register(selector, 0); //将Handler作为callback对象 sk.attach(this); //第二步,注册Read就绪事件 sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } boolean inputIsComplete() { /* ... */ return false; } boolean outputIsComplete() { /* ... */ return false; } void process() { /* ... */ return; } public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } void read() throws IOException { channel.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now //第三步,接收write就绪事件 sk.interestOps(SelectionKey.OP_WRITE); } } void send() throws IOException { channel.write(output); //write完就结束了, 关闭select key if (outputIsComplete()) { sk.cancel(); } } }

这两段代码,是建立在JAVA NIO的基础上的,这两段代码建议一定要看懂。可以在IDE中去看源码,这样直观感觉更佳。 

4.1.2 单线程模式的缺点

  1. 当其中某个 handler 阻塞时, 会导致其他所有的 client 的 handler 都得不到执行, 并且更严重的是, handler 的阻塞也会导致整个服务不能接收新的 client 请求(因为 acceptor 也被阻塞了)。 因为有这么多的缺陷, 因此单线程Reactor 模型用的比较少。这种单线程模型不能充分利用多核资源,所以实际使用的不多。
  2. 因此,单线程模型仅仅适用于handler 中业务处理组件能快速完成的场景。

4.2 单Reactor多线程模型(也叫多线程模式)

4.2.1 基于线程池的改进

在线程Reactor模式基础上,做如下改进:

  1. 将Handler处理器的执行放入线程池,多线程进行业务处理。
  2. 而对于Reactor而言,可以仍为单个线程。如果服务器为多核的CPU,为充分利用系统资源,可以将Reactor拆分为两个线程。

一个简单的图如下:

image

4.2.2 改进后的完整示意图

下面的图,来自于“Scalable IO in Java”,和上面的图的意思,差不多,只是更加详细。Reactor是一条独立的线程,Hander 处于线程池中执行。

wpsC376.tmp

4.2.3 多线程模式参考代码

“Scalable IO in Java”,的多线程Reactor的参考代码,是基于单线程做一个线程池的改进,改进的Handler的代码如下:

package com.crazymakercircle.ReactorModel; import com.crazymakercircle.config.SystemConfig; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MthreadHandler implements Runnable { final SocketChannel channel; final SelectionKey selectionKey; ByteBuffer input = ByteBuffer.allocate(SystemConfig.INPUT_SIZE); ByteBuffer output = ByteBuffer.allocate(SystemConfig.SEND_SIZE); static final int READING = 0, SENDING = 1; int state = READING; ExecutorService pool = Executors.newFixedThreadPool(2); static final int PROCESSING = 3; MthreadHandler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); // Optionally try first read now selectionKey = channel.register(selector, 0); //将Handler作为callback对象 selectionKey.attach(this); //第二步,注册Read就绪事件 selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } boolean inputIsComplete() { /* ... */ return false; } boolean outputIsComplete() { /* ... */ return false; } void process() { /* ... */ return; } public void run() { try { if (state == READING) { read(); } else if (state == SENDING) { send(); } } catch (IOException ex) { /* ... */ } } synchronized void read() throws IOException { // ... channel.read(input); if (inputIsComplete()) { state = PROCESSING; //使用线程pool异步执行 pool.execute(new Processer()); } } void send() throws IOException { channel.write(output); //write完就结束了, 关闭select key if (outputIsComplete()) { selectionKey.cancel(); } } synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment //process完,开始等待write就绪 selectionKey.interestOps(SelectionKey.OP_WRITE); } class Processer implements Runnable { public void run() { processAndHandOff(); } } }

4.3 多Reactor多线程模型(也叫主从多线程模式)

Reactor模型讲解

第三种模型比起第二种模型,是将Reactor分成两部分,

  1. mainReactor负责监听server socket,用来处理新连接的建立,将建立的socketChannel指定注册给subReactor。
  2. subReactor维护自己的selector, 基于mainReactor 注册的socketChannel多路分离IO读写事件,读写网 络数据,对业务处理的功能,另其扔给worker线程池来完成。

第三种模型中,我们可以看到,mainReactor 主要是用来处理网络IO 连接建立操作,通常一个线程就可以处理,而subReactor主要做和建立起来的socket做数据交互和事件业务处理操作,它的个数上一般是和CPU个数等同,每个subReactor一个县城来处理。

此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别。

关于此种模型的应用,目前有很多优秀的矿建已经在应用了,比如mina 和netty 等。上述中去掉线程池的第三种形式的变种,也 是Netty NIO的默认模式。下一节我们将着重讲解netty的架构模式。

4.3.1 主从多线程模式参考代码

对于多个CPU的机器,为充分利用系统资源,将Reactor拆分为两部分。代码如下:

package com.crazymakercircle.ReactorModel; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; class MthreadReactor implements Runnable { //subReactors集合, 一个selector代表一个subReactor Selector[] selectors=new Selector[2]; int next = 0; final ServerSocketChannel serverSocket; MthreadReactor(int port) throws IOException { //Reactor初始化 selectors[0]=Selector.open(); selectors[1]= Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); //非阻塞 serverSocket.configureBlocking(false); //分步处理,第一步,接收accept事件 SelectionKey sk = serverSocket.register( selectors[0], SelectionKey.OP_ACCEPT); //attach callback object, Acceptor sk.attach(new Acceptor()); } public void run() { try { while (!Thread.interrupted()) { for (int i = 0; i <2 ; i++) { selectors[i].select(); Set selected = selectors[i].selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { //Reactor负责dispatch收到的事件 dispatch((SelectionKey) (it.next())); } selected.clear(); } } } catch (IOException ex) { /* ... */ } } void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); //调用之前注册的callback对象 if (r != null) { r.run(); } } class Acceptor { // ... public synchronized void run() throws IOException { SocketChannel connection = serverSocket.accept(); //主selector负责accept if (connection != null) { new Handler(selectors[next], connection); //选个subReactor去负责接收到的connection } if (++next == selectors.length) next = 0; } } }

4.3.2 主从多线程模式优缺点

优点

  1. 响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
  2. 编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
  3. 可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
  4. 可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

缺点

  1. 相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
  2. Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。
  3. Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用改进版的Reactor模式如Proactor模式。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/220159.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月17日 下午9:06
下一篇 2026年3月17日 下午9:06


相关推荐

  • 运行怎么进入文件路径_cmd命令怎么进入某个文件夹

    运行怎么进入文件路径_cmd命令怎么进入某个文件夹1.通过Windows+R进入命令调出运行2.输入cmd进入命令窗口(默认的一般是c:\Users下的某个文件夹,例如我的是c:\Users\LML)3.若想进入c盘的其他文件路径下,可以通过在目录下输入cd..进入上一层目录,直到进入c盘根目录;通过命令行输入c:\cd+文件或文件夹路径 进入目标文件夹4.若想进入其他盘下的文件路径,通过在命令行默认路径后输入想进入的盘名加上冒号,例如:c:…

    2022年10月15日
    4
  • Python中的lambda表达式

    Python中的lambda表达式目录1.简约而不简单的lambda表达式1.1匿名函数基础1.2为什么要使用匿名函数?1.3Python函数式编程1.简约而不简单的lambda表达式在Python中,除了常规函数,你应该也会在代码中见到一些“非常规”函数,它们往往很简短,就一行,并且有个很酷炫的名字——lambda,没错,这就是匿名函数。匿名函数在实际工作中同样举足轻重,正确地运用匿名函数,能让我们的代码更简洁、易读。让我们一起来看下Python中简约而不简单的匿名函数。1.1匿名函数基础..

    2022年10月18日
    4
  • linux15:TCP端口状态说明「建议收藏」

    linux15:TCP端口状态说明「建议收藏」TCP状态转移要点TCP协议规定,对于已经建立的连接,网络双方要进行四次握手才能成功断开连接,如果缺少了其中某个步骤,将会使连接处于假死状态,连接本身占用的资源不会被释放。服务器程序要同时管理大量连接,所以很有必要保证无用连接完全断开,否则大量僵死的连接会浪费许多服务器资源。在众多TCP状态中,最值得注意的状态有两个:CLOSE_WAIT和TIME_WAIT。1、LISTENING状态FTP服务启动后首先处于监听(LISTENING)状态。2、ESTABLISHED状态ESTABLISH

    2022年8月11日
    8
  • 深信服SCSA安全工程师题库(方便大家复习备考)

    深信服SCSA安全工程师题库(方便大家复习备考)1、【EDR】下列哪个端口是紧急情况下EDR管理平台和客户端通信端口,即紧急情况下用于下发Agent重启、Agent卸载和Agent停止等指令。()A:443.0B:54120.0C:8083.0D:8088.0正确答案B2、【EDR】客户有7000个终端需要安装EDR客户端进行安全防护,请问推荐部署多少个EDR管理平台()A:1个B:2个C:4个D:6个正确答案C3、【EDR】EDR的Agent客户端不支持在以下哪种类型的终端上安装()A:WindowsServerB

    2022年6月20日
    50
  • navicat 15.0激活码【中文破解版】

    (navicat 15.0激活码)好多小伙伴总是说激活码老是失效,太麻烦,关注/收藏全栈君太难教程,2021永久激活的方法等着你。https://javaforall.net/100143.htmlIntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,上面是详细链接哦~MLZPB5EL5Q-eyJsaWNlbnNlSWQiOi…

    2022年3月21日
    49
  • java calendar 设置小时_Java Calendar类的时间操作[通俗易懂]

    java calendar 设置小时_Java Calendar类的时间操作[通俗易懂]JavaCalendar类时间操作,这也许是创建日历和管理最简单的一个方案,示范代码很简单,演示了获取时间,日期时间的累加和累减,以及比较。注意事项:Calendar的month从0开始,也就是全年12个月由0~11进行表示。而Calendar.DAY_OF_WEEK定义和值如下:Calendar.SUNDAY=1Calendar.MONDAY=2Calend…

    2022年4月30日
    216

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号