Netty实战《原理》

Netty实战《原理》Netty 介绍官网说明官网说明 1 netty 是由 JBOSS 提供的一个 java 开源框架 Netty 提供异步的 基于事件驱动的网络应用程序框架 用于快速开发高性能 高可靠的网络 IO 程序 2 netty 可以帮助你快速 简单的开发一个网络应用 相当于简化和流程化 NIO 的开发流程 3 netty 目前最流行的 NIO 框架 在互联网 大数据分布式计算领域 游戏行业 通信行业等有广泛的应用 知名的 Es Dubbo 等框架内部都采用 nettyc 官网说明

官网说明

1)netty是由JBOSS提供的一个java开源框架。Netty提供异步的,基于事件驱动的网络应用程序框架,用于快速开发高性能,高可靠的网络IO程序
2)netty可以帮助你快速,简单的开发一个网络应用,相当于简化和流程化NIO的开发流程
3)netty目前最流行的NIO框架,在互联网,大数据分布式计算领域,游戏行业,通信行业等有广泛的应用,知名的Es,Dubbo等框架内部都采用netty

Netty优点

1)设计优雅
2)使用方便
3)高性能,吞吐量高,延迟低,减少资源损耗,减少不必要的内存复制
4)安全,完整的ssl/tls 和start TLS支持
5)社区活跃



Netty工作原理示意图

在这里插入图片描述
说明
1) netty抽象出两组线程池 BossGroup专门负责接受客户端请求,WorkerGroup专门负责网络读写
2) BossGroup和WorkerGroup 类型都是NioEventLoopGroup
3)NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环 ,每个循环相当于NioEventLoop
4)NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
5)NioEventLoopGroup可以有多个NioEventLoop
6)每个Boss NioEventLoop循环执行的步骤






  • 轮询accept事件
  • 处理accept事件,与client建立连接后,生成NioSocketChannel,并将其注册到某个worker NioEventLoop上的selector
  • 处理任务队列的任务,即runAllTasks
  1. 每个Worker NIOEventLoop循环执行的步骤
  • 轮询read,write事件
  • 处理io事件,在对应的NioSocketChannel处理
  • 处理任务队列的任务,即runAllTasks

8)每个Worker NioEventLoop处理业务的时候,会使用pipline,pipline中包含lchannel,即可以通过pipline可以获取对应的通道,管道中维护多个处理器。

9)NioEventLoop内部采用串行化设计,消息的读取-》解码-》处理-》编码-》发送,始终由IO线程的NioEventLoop负责

代码实例1 简单的聊天
实例要求:使用IDEA 创建Netty项目
1)Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
2)服务器可以回复消息给客户端 “hello, 客户端~”


server

/ * @author gusteu * @date 2022/04/16 21:50:33 */ public class NettyServer { 
    public static void main(String[] args) throws InterruptedException { 
    //创建NioEventLoopGroup NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //创建netty 引导类 ServerBootstrap serverBootstrap = new ServerBootstrap(); //配置 serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端通道的实现 .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列中等待连接的个数 .childOption(ChannelOption.SO_KEEPALIVE, true)//保持活动的连接 .childHandler(new ChannelInitializer<SocketChannel>() { 
    @Override protected void initChannel(SocketChannel serverSocketChannel) throws Exception { 
    //添加自定义处理器 serverSocketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("......Server is ready......"); ChannelFuture cf = serverBootstrap.bind(6666).sync(); //绑定端口 bind方法是异步的 sync方法是同步阻塞的 System.out.println("......Server is starting......"); //11. 关闭通道,关闭线程组 cf.channel().closeFuture().sync(); //异步 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } 

serverHandler

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; / * @author gusteu * @date 2022/04/16 21:51:10 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter { 
    //读取客户端消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
    System.out.println("服务端:" + ctx); ByteBuf byteBuffer = (ByteBuf) msg; System.out.println("客户端发送的消息:" + byteBuffer.toString(CharsetUtil.UTF_8)); } //读取完成后返回个信息 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
    ctx.writeAndFlush(Unpooled.copiedBuffer("hello client(>^ω^<)喵", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
    ctx.close(); } } 

client

import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; / * @author gusteu * @date 2022/04/16 21:40:23 */ public class NettyClient { 
    public static void main(String[] args) throws InterruptedException { 
    //创建一个线程组 NioEventLoopGroup group = new NioEventLoopGroup(); //创建客户端启动对象 Bootstrap bootstrap = new Bootstrap(); //客户端配置 bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { 
    @Override protected void initChannel(SocketChannel socketChannel) throws Exception { 
    //pipline添加处理器 socketChannel.pipeline().addLast(new NettyClientHandler()); } }); //绑定服务器端口 //7.启动客户端去连接服务器端 connect方法是异步的 sync方法是同步阻塞的 ChannelFuture cf = bootstrap.connect("127.0.0.1", 6666).sync(); //8.关闭连接(异步非阻塞) cf.channel().closeFuture().sync(); } } 

clientHandler

import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; / * @author gusteu * @date 2022/04/16 21:33:11 * 客户端业务处理类 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter { 
    //通道就绪事件 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { 
    System.out.println("client: " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server", CharsetUtil.UTF_8)); } / * 读取数据事件 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
    ByteBuf byteBuffer = (ByteBuf) msg; System.out.println("服务端返回的数据: " + byteBuffer.toString(CharsetUtil.UTF_8)); } } 

Netty异步模型

Future-Listener机制
1)当Future对象刚刚创建的时候,处于非完成的状态,调用者可以通过返回ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作

在这里插入图片描述

serverBootstrap.bind(port).addListener(future -> { 
    if(future.isSuccess()) { 
    System.out.println(newDate() + ": 端口["+ port + "]绑定成功!"); } else{ 
    System.err.println("端口["+ port + "]绑定失败!"); } }); 

小结:相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量

代码实例2 Http服务
实例要求:使用IDEA 创建Netty项目
Netty 服务器在 6668 端口监听,浏览器发出请求 “http://localhost:6668/ “
服务器可以回复消息给客户端 “Hello! 我是服务器 5 ” , 并对特定请求资源进行过滤.


server

import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; / * @author gusteu * @date 2022/04/16 22:37:14 */ public class HttpNettyServer { 
    public static void main(String[] args) { 
    / * 事件循环组,就是死循环 */ //仅仅接受连接,转给workerGroup,自己不做处理 EventLoopGroup bossGroup = new NioEventLoopGroup(); //真正处理 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { 
    //很轻松的启动服务端代码 ServerBootstrap serverBootstrap = new ServerBootstrap(); //childHandler子处理器,传入一个初始化器参数TestServerInitializer(这里是自定义) //TestServerInitializer在channel被注册时,就会创建调用 serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class). childHandler(new HttpServerInitializer()); //绑定一个端口并且同步,生成一个ChannelFuture对象 ChannelFuture channelFuture = serverBootstrap.bind(6668).sync(); //对关闭的监听 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { 
    e.printStackTrace(); } finally { 
    //循环组优雅关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } 

ChildChannel

import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; / * @author gusteu * @date 2022/04/16 22:37:41 */ public class HttpServerInitializer extends ChannelInitializer<SocketChannel> { 
    @Override protected void initChannel(SocketChannel socketChannel) throws Exception { 
    //添加解码 socketChannel.pipeline().addLast(new HttpServerCodec()); //添加自定义处理器 socketChannel.pipeline().addLast(new HttpServerHandler()); } } 

自定义Handler

/ * @author gusteu * @date 2022/04/16 22:39:49 * 继承InboundHandler类,代表处理进入的请求,还有OutboundHandler,处理出去请求 * 其中里面的泛型表示msg的类型,如果指定了HttpObject,表明相互通讯的数据被封装成HttpObject */ public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { 
    int count = 4; // 通过这个可以看到在服务器 每一个客户端对应一个 独立的handler //channelRead0读取客户端请求,并返回响应的方法 @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { 
    //判断是不是HttpRequest if(msg instanceof HttpRequest){ 
    System.out.println(msg.getClass()); System.out.println(ctx.channel().remoteAddress()); HttpRequest httpRequest= (HttpRequest) msg; URI uri=new URI(httpRequest.uri()); System.out.println("uri----"+uri.toString()); System.out.println("请求方法名:"+httpRequest.method().name()); //ByteBuf,neety中极为重要的概念,代表响应返回的数据 ByteBuf content = Unpooled.copiedBuffer("Hello! 我是服务器" + (++count), CharsetUtil.UTF_8); //构造一个http响应,HttpVersion.HTTP_1_1:采用http1.1协议,HttpResponseStatus.OK:状态码200 FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //如果只是调用write方法,他仅仅是存在缓冲区里,并不会返回客户端 //调用writeAndFlush可以 ctx.writeAndFlush(response); } } / * 处理顺序如下: * handlerAdded * channelRegistered * channelActive * 请求方法名:GET(channelRead0) * (下面的表示的是断开连接后) * 1.如果是使用curl :连接会立刻关闭 * 2.如果是浏览器访问,http1.0:它是短连接,会立刻关闭。http1.1,是长连接,连接保持一段时间 * channelInactive * channelUnregistered * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { 
    System.out.println("channelActive"); super.channelActive(ctx); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { 
    System.out.println("channelRegistered"); super.channelRegistered(ctx); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { 
    System.out.println("handlerAdded"); super.handlerAdded(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { 
    System.out.println("channelInactive"); super.channelInactive(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { 
    System.out.println("channelUnregistered"); super.channelUnregistered(ctx); } } 

Netty核心组件模块

Bootstrap和ServerBootstrap

Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类

Future和ChannelFuture

Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件

Channel

Selector

ChannelHandler 及其实现类

Pipeline 和 ChannelPipeline

ChannelPipeline 是一个重点:

在这里插入图片描述
说明
1) 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler
2)入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰


ChannelHandlerContext

1)保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象,即ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同 时ChannelHandlerContext 中2)也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler进行调用.

ChannelOption

ChannelOption.SO_KEEPALIVE
一直保持连接活动状态

Unpooled 类

未完待续。。。。。。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233449.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • GoLand 2022.01.21 激活码_在线激活2022.02.01「建议收藏」

    (GoLand 2022.01.21 激活码)好多小伙伴总是说激活码老是失效,太麻烦,关注/收藏全栈君太难教程,2021永久激活的方法等着你。https://javaforall.net/100143.htmlIntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,上面是详细链接哦~CJM5ZJBPHS-eyJsaWNlbnNlSWQiOi…

    2022年3月31日
    78
  • OnContextMenu事件

    OnContextMenu事件用oncontextmenu事件单禁用右键菜单 一个页面中,BODY中用oncontextmenu='returnfalse'来取消鼠标右键;在JS中设置oncontext

    2022年7月2日
    24
  • wireshark抓包分析UDP

    wireshark抓包分析UDP1 什么是 UDP 传输层有两个协议 之前讨论过的 TCP 协议和现在要说的 UDP 协议 二者互为补充 UDP 是无连接的协议 它无需经过繁琐的握手就能建立连接并且发送已封装的 IP 数据包 它能做的事情很少 而面向连接的 TCP 协议几乎可以做所有事情 特点 UDP 最大的三个特点是无连接 不可靠 快速传输 UDP 提供了无连接通信 且不对传送数据包进行可靠性保证 适合于一次传输少量数据 UDP 传输的可靠性由应用层负责 常用的 UDP 端口号有 53 DNS 69 TFTP 161 SNMP UDP 报文没有可

    2025年6月20日
    0
  • 修改计算机用户名 未识别网络,电脑出现未识别的网络,无Internet访问解决办法 这几步你要了解…[通俗易懂]

    修改计算机用户名 未识别网络,电脑出现未识别的网络,无Internet访问解决办法 这几步你要了解…[通俗易懂]网卡驱动问题01在桌面的计算机图标上右键单击一下,选择【设备管理器】,然后在设备管理器页面下方找到【网络适配器】。02打开网络适配器列表,找到当前的网卡驱动,用鼠标右键点击一下,在弹出的菜单中选择【禁用】,接着再点击【启用】。好了,以上就是大致内容了,(END)更改适配器设置01在电脑桌面上找到网络图标,右键点击一下,在菜单中选择【属性】,然后会进入到网络属性的主界面。02接着,在属性界面的左侧菜…

    2022年10月14日
    0
  • mysql全文索引实现搜索功能(关键词查询)

    mysql全文索引实现搜索功能(关键词查询)最近在做一个关键词查询功能。所以开始了解mysql的全文索引技术。接下来我将一步一步告诉大家。我是如何一步一步实现关键词检索的。1.了解到mysql全文检索是以词为基础的。MySQL默认的分词是所有非字母和数字的特殊符号都是分词符。所以我存在数据库的样子是这样的。(左边的字段用于显示,右边的字段用于全文查询)2.全文检索的sqlSELECT*FROMtbk_item_coupon…

    2022年6月21日
    43
  • ubuntu通过conda安装tensorflow

    ubuntu通过conda安装tensorflowubuntu版本:16.04,Anaconda3版本:5.3.1 tensorflow没有32位的!tensorflow没有32位的!tensorflow没有32位的! 安装Anaconda 通过conda来创建tensorflow虚拟环境:conda-ntensorflowpython=3.6 激活刚刚创建的虚拟环境:condaactiva…

    2022年6月19日
    31

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号