Netty介绍
官网说明
1)netty是由JBOSS提供的一个java开源框架。Netty提供异步的,基于事件驱动的网络应用程序框架,用于快速开发高性能,高可靠的网络IO程序
2)netty可以帮助你快速,简单的开发一个网络应用,相当于简化和流程化NIO的开发流程
3)netty目前最流行的NIO框架,在互联网,大数据分布式计算领域,游戏行业,通信行业等有广泛的应用,知名的Es,Dubbo等框架内部都采用netty
Netty优点
1)设计优雅
2)使用方便
3)高性能,吞吐量高,延迟低,减少资源损耗,减少不必要的内存复制
4)安全,完整的ssl/tls 和start TLS支持
5)社区活跃
Netty工作原理示意图
说明
1) netty抽象出两组线程池 BossGroup专门负责接受客户端请求,WorkerGroup专门负责网络读写
2) BossGroup和WorkerGroup 类型都是NioEventLoopGroup
3)NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环 ,每个循环相当于NioEventLoop
4)NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
5)NioEventLoopGroup可以有多个NioEventLoop
6)每个Boss NioEventLoop循环执行的步骤
- 轮询accept事件
- 处理accept事件,与client建立连接后,生成NioSocketChannel,并将其注册到某个worker NioEventLoop上的selector
- 处理任务队列的任务,即runAllTasks
- 每个Worker NIOEventLoop循环执行的步骤
- 轮询read,write事件
- 处理io事件,在对应的NioSocketChannel处理
- 处理任务队列的任务,即runAllTasks
8)每个Worker NioEventLoop处理业务的时候,会使用pipline,pipline中包含lchannel,即可以通过pipline可以获取对应的通道,管道中维护多个处理器。
9)NioEventLoop内部采用串行化设计,消息的读取-》解码-》处理-》编码-》发送,始终由IO线程的NioEventLoop负责
代码实例1 简单的聊天
实例要求:使用IDEA 创建Netty项目
1)Netty 服务器在 6668 端口监听,客户端能发送消息给服务器 “hello, 服务器~”
2)服务器可以回复消息给客户端 “hello, 客户端~”
server
/ * @author gusteu * @date 2022/04/16 21:50:33 */ public class NettyServer {
public static void main(String[] args) throws InterruptedException {
//创建NioEventLoopGroup NioEventLoopGroup bossGroup = new NioEventLoopGroup(); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); //创建netty 引导类 ServerBootstrap serverBootstrap = new ServerBootstrap(); //配置 serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为服务器端通道的实现 .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列中等待连接的个数 .childOption(ChannelOption.SO_KEEPALIVE, true)//保持活动的连接 .childHandler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel serverSocketChannel) throws Exception {
//添加自定义处理器 serverSocketChannel.pipeline().addLast(new NettyServerHandler()); } }); System.out.println("......Server is ready......"); ChannelFuture cf = serverBootstrap.bind(6666).sync(); //绑定端口 bind方法是异步的 sync方法是同步阻塞的 System.out.println("......Server is starting......"); //11. 关闭通道,关闭线程组 cf.channel().closeFuture().sync(); //异步 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
serverHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; / * @author gusteu * @date 2022/04/16 21:51:10 */ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//读取客户端消息 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("服务端:" + ctx); ByteBuf byteBuffer = (ByteBuf) msg; System.out.println("客户端发送的消息:" + byteBuffer.toString(CharsetUtil.UTF_8)); } //读取完成后返回个信息 @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello client(>^ω^<)喵", CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close(); } }
client
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; / * @author gusteu * @date 2022/04/16 21:40:23 */ public class NettyClient {
public static void main(String[] args) throws InterruptedException {
//创建一个线程组 NioEventLoopGroup group = new NioEventLoopGroup(); //创建客户端启动对象 Bootstrap bootstrap = new Bootstrap(); //客户端配置 bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() {
@Override protected void initChannel(SocketChannel socketChannel) throws Exception {
//pipline添加处理器 socketChannel.pipeline().addLast(new NettyClientHandler()); } }); //绑定服务器端口 //7.启动客户端去连接服务器端 connect方法是异步的 sync方法是同步阻塞的 ChannelFuture cf = bootstrap.connect("127.0.0.1", 6666).sync(); //8.关闭连接(异步非阻塞) cf.channel().closeFuture().sync(); } }
clientHandler
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; / * @author gusteu * @date 2022/04/16 21:33:11 * 客户端业务处理类 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//通道就绪事件 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client: " + ctx); ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Server", CharsetUtil.UTF_8)); } / * 读取数据事件 * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuffer = (ByteBuf) msg; System.out.println("服务端返回的数据: " + byteBuffer.toString(CharsetUtil.UTF_8)); } }
Netty异步模型
Future-Listener机制
1)当Future对象刚刚创建的时候,处于非完成的状态,调用者可以通过返回ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作
serverBootstrap.bind(port).addListener(future -> {
if(future.isSuccess()) {
System.out.println(newDate() + ": 端口["+ port + "]绑定成功!"); } else{
System.err.println("端口["+ port + "]绑定失败!"); } });
小结:相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量
代码实例2 Http服务
实例要求:使用IDEA 创建Netty项目
Netty 服务器在 6668 端口监听,浏览器发出请求 “http://localhost:6668/ “
服务器可以回复消息给客户端 “Hello! 我是服务器 5 ” , 并对特定请求资源进行过滤.
server
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; / * @author gusteu * @date 2022/04/16 22:37:14 */ public class HttpNettyServer {
public static void main(String[] args) {
/ * 事件循环组,就是死循环 */ //仅仅接受连接,转给workerGroup,自己不做处理 EventLoopGroup bossGroup = new NioEventLoopGroup(); //真正处理 EventLoopGroup workerGroup = new NioEventLoopGroup(); try {
//很轻松的启动服务端代码 ServerBootstrap serverBootstrap = new ServerBootstrap(); //childHandler子处理器,传入一个初始化器参数TestServerInitializer(这里是自定义) //TestServerInitializer在channel被注册时,就会创建调用 serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class). childHandler(new HttpServerInitializer()); //绑定一个端口并且同步,生成一个ChannelFuture对象 ChannelFuture channelFuture = serverBootstrap.bind(6668).sync(); //对关闭的监听 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) {
e.printStackTrace(); } finally {
//循环组优雅关闭 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
ChildChannel
import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; / * @author gusteu * @date 2022/04/16 22:37:41 */ public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {
@Override protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加解码 socketChannel.pipeline().addLast(new HttpServerCodec()); //添加自定义处理器 socketChannel.pipeline().addLast(new HttpServerHandler()); } }
自定义Handler
/ * @author gusteu * @date 2022/04/16 22:39:49 * 继承InboundHandler类,代表处理进入的请求,还有OutboundHandler,处理出去请求 * 其中里面的泛型表示msg的类型,如果指定了HttpObject,表明相互通讯的数据被封装成HttpObject */ public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
int count = 4; // 通过这个可以看到在服务器 每一个客户端对应一个 独立的handler //channelRead0读取客户端请求,并返回响应的方法 @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
//判断是不是HttpRequest if(msg instanceof HttpRequest){
System.out.println(msg.getClass()); System.out.println(ctx.channel().remoteAddress()); HttpRequest httpRequest= (HttpRequest) msg; URI uri=new URI(httpRequest.uri()); System.out.println("uri----"+uri.toString()); System.out.println("请求方法名:"+httpRequest.method().name()); //ByteBuf,neety中极为重要的概念,代表响应返回的数据 ByteBuf content = Unpooled.copiedBuffer("Hello! 我是服务器" + (++count), CharsetUtil.UTF_8); //构造一个http响应,HttpVersion.HTTP_1_1:采用http1.1协议,HttpResponseStatus.OK:状态码200 FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain"); response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); //如果只是调用write方法,他仅仅是存在缓冲区里,并不会返回客户端 //调用writeAndFlush可以 ctx.writeAndFlush(response); } } / * 处理顺序如下: * handlerAdded * channelRegistered * channelActive * 请求方法名:GET(channelRead0) * (下面的表示的是断开连接后) * 1.如果是使用curl :连接会立刻关闭 * 2.如果是浏览器访问,http1.0:它是短连接,会立刻关闭。http1.1,是长连接,连接保持一段时间 * channelInactive * channelUnregistered * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive"); super.channelActive(ctx); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelRegistered"); super.channelRegistered(ctx); } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerAdded"); super.handlerAdded(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelInactive"); super.channelInactive(ctx); } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelUnregistered"); super.channelUnregistered(ctx); } }
Netty核心组件模块
Bootstrap和ServerBootstrap
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类
Future和ChannelFuture
Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件
Channel
Selector
ChannelHandler 及其实现类
Pipeline 和 ChannelPipeline
ChannelPipeline 是一个重点:
说明
1) 一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler
2)入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰
ChannelHandlerContext
1)保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象,即ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同 时ChannelHandlerContext 中2)也绑定了对应的 pipeline 和 Channel 的信息,方便对 ChannelHandler进行调用.
ChannelOption
ChannelOption.SO_KEEPALIVE
一直保持连接活动状态
Unpooled 类
未完待续。。。。。。
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/233449.html原文链接:https://javaforall.net