netty的ChannelPipeline类学习

netty的ChannelPipeline类学习本篇博文从 Netty 处理事件的流程出发 引申出其事件处理的相关接口并对 ChannelHandl ChannelHandl ChannelHandl ChannelHandl 相关接口和默认实现类进行了了解和使用 是一篇不错的 netty 基础学习博文 同时为我们以后探求 netty 实现流程做相关准备操作

1、netty的工作流程

在这里插入图片描述

  1. ChannelPipeline接口:每一个Channel通道都会有一个ChannelPipeline,责任链的核心组件,同时也是HandlerContext的管理容器(链表的形式管理),事件在各个ChannelHandlerContext中传递。我们可以把他理解为管道,netty的读、写、连接等事件在该通道“流动”
  2. ChannelHandlerContext接口:ChannelHandler的上下文环境,所以其封装一个具体的ChannelHandler,同时为ChannelHandler运行提供一个线程环境ChannelHandlerInvoker对象。
  3. ChannelHandlerInvoker接口:ChannelHandler的运行的线程环境所有其持有一个EventExecutor(netty的线程调度对象)。
  4. ChannelHandler接口:真正对读、写等IO事件进行处理的对象,它可以选择性地拦截和处理自己感兴趣地事件,也可以透传和终止事件。IO处理事件不同业务处理模式不同,所以这里是netty暴露给业务代码的扩展点,一般来说使用netty主要是通过实现自定义的ChannelHandler。
  5. ChannelHandlerAdapter:ChannelHandler有很多接口方法,但是我们自定义实现的时候可能只关心自己感兴趣的事件,不必实现ChannelHandler的所有接口,Netty提供了ChannelHandlerAdapter基类它以透传事件的形式实现了所有接口,如果用户关心某个事件,只需要覆盖ChannelHandlerAdapter对应的方法即可。

2、ChannelPipeline接口介绍

2.1、相关概述

​ ChannelPipeline接口的默认实现为DefaultChannelPipeline,该类主要作用如下:

  1. 管理所有的ChannelHandlerContext对象,它是以双向列表的形式管理ChannelHandlerContext,其中提供了两个默认的头(head)尾(tail)ChannelHandlerContext对象,其中该类有大量的方法来添加(addXxx)、获取(getXxx)、移除(removeXxx) ChannelHandlerContext的方法
  2. 持有ChannelHandlerContext的线程运行环境ChannelHandlerInvoker ,他根据ChannelPipeline传入的EventExecutorGroup中的线程池对象创建对应的invoker 存放到Map

    childInvokers中。
  3. ChannelHandlerContext对象还通过名字作为key存放到map集合name2ctx中。

2.2、核心方法

主要方法 描述
addXxx/rmove/replace/get/last/first 管理链表ChannelHandlerContext对象(增、删、改、查)
fireXxx 触发其持有的Handler对应的方法比如fireChannelActive方法最终触发handler的channelActive(当前channel激活的时候触发)
bind/connect/disconnect/close/flush/read/write 此类方法是其进行网络IO操作的功能方法 比如服务端绑定、客户端连接、关闭、读写事件。
2.2.1、addXxx/rmove/replace/get/last/first

​ 这类方法主要是进行ChannelHandlerContext对象的增删查改,逻辑很相似所以这里只分析一个addLast方法,其他的请同学自行分析。addLast有两个重载方法区别是

@Override public ChannelPipeline addLast(String name, ChannelHandler handler) { 
    return addLast((ChannelHandlerInvoker) null, name, handler); } //包含EventExecutorGroup 有则会调用findInvoker为其线程创建一个DefaultChannelHandlerInvoker 如果没有group 则不创建 @Override public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) { 
    return addLast(findInvoker(group), name, handler); } //添加多个handler @Override public ChannelPipeline addLast(EventExecutorGroup group,ChannelHandler ....handler) { 
    //循环遍历添加 并调用generateName方法为handler生产明细 addLast(invoker, generateName(h), h); } 

最终调用如下方法

public ChannelPipeline addLast(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) { 
    synchronized (this) { 
    //检查名字是否重复 从name2ctx判断是否存在 checkDuplicateName(name); //Handler 包装成HandlerContext DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, invoker, name, handler); //执行具体的链表添加操作 addLast0(name, newCtx); } return this; } 

addLast0()方法具体链表添加操作

private void addLast0(final String name, DefaultChannelHandlerContext newCtx) { 
    //检查Handler是否共享 如果是共享(ChannelHandler使用@Sharable注解修饰)则重复添加会报错 checkMultiplicity(newCtx); //添加链表 尾插  DefaultChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; //并在map中存放一份 name2ctx.put(name, newCtx); //根据条件触发ChannelHandler的handlerAdded方法(添加标识该ChannelHandler已经添加完成可以使用) callHandlerAdded(newCtx); } 

callHandlerAdded方法

private void callHandlerAdded(final DefaultChannelHandlerContext ctx) { 
    //每创建一个ctx 都会为其添加skipFlags(根据方法名标记其包含的处理方式) //判断skipFlag中是否包含MASK_HANDLER_ADDED 如果有说明ChannelHandler if ((ctx.skipFlags & DefaultChannelHandlerContext.MASK_HANDLER_ADDED) != 0) { 
    return; } //如果当前操作线程和channel对应的线程池对应的线程不一致 则将该操作包装成线程任务放入线程调度中执行 //这里确保channel所有操作都同一个线程中 即使用了多线程又避免了并发访问的问题 if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) { 
    ctx.executor().execute(new Runnable() { 
    @Override public void run() { 
    callHandlerAdded0(ctx); } }); return; } //本身处于当前线程 则直接调用 最终调用ctx.handler().handlerAdded(ctx); callHandlerAdded0(ctx); } 
2.2.2、fireXxx

​ fireXxx 有多个类似的方法逻辑是相似的 调用header或者tail的fireXxx,调用对应的findContextInbound() 根据skipFlags 找到对应的

ChannelHandlerContext对象的handler的相关方法(调用是通过其内部的线程环境对象ChannelHandlerInvoker实现)

这里以设置channel激活为类fireChannelActive方法

public ChannelHandlerContext fireChannelActive() { 
    //根据skipflag 从链表中查到对应的 DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE); //调用相关的ChannelHandler的api next.invoker.invokeChannelActive(next); return this; } 
  • fireChannelActive:设置channel为激活状态
  • fireExceptionCaught:channel IO出现异常触发
  • fireUserEventTriggered:
  • fireChannelRead
  • fireChannelReadComplete
  • fireChannelWritabilityChanged
2.2.3、netty核心服务

​ netty进行网络IO操作的相关核心服务 比如服务端绑定bind、客户端连接connect、关闭disconnect/close、读写事件read/write。

netty核心服务调用方式逻辑和fireXxx逻辑类似其中都是调用其中的head/tail的同名服务(bind、connect),根据skipFlags 找到对应的

ChannelHandlerContext对象的handler的并调用相关方法。类似此处不再复述,但是不同在于fireXxx使用findContextInbound 从header头部进行获取handler,inbound事件是从headr头部进行传递而这里使用findContextOutbound从tail尾部获取Handler,outbound事件从尾部开始传递

//inbound事件 从头部header开始向后next遍历 private DefaultChannelHandlerContext findContextInbound(int mask) { 
    DefaultChannelHandlerContext ctx = this; do { 
    ctx = ctx.next; } while ((ctx.skipFlags & mask) != 0); return ctx; } 
//outbound事件 从尾部tail开始向前prev遍历 private DefaultChannelHandlerContext findContextOutbound(int mask) { 
    DefaultChannelHandlerContext ctx = this; do { 
    ctx = ctx.prev; } while ((ctx.skipFlags & mask) != 0); //如果有skipFlags 对应标识则不调用对应的ChannelHandler return ctx; } 

3、ChannelHandlerContext接口介绍

​ ChannelHandler的上下文环境,默认实现为DefaultChannelHandlerContext,在netty中对于ChannelHandler不能直接调用,而是需要通过ChannelHandlerContext来进行调用,所以在上面学习ChannelPipline接口中所有方法最终调用hanlerChannle的调用过程中都会经过DefaultChannelHandlerContext,DefaultChannelHandlerContext上相关方法和ChannelPipeline雷同(上面已经涉及)。但是在上面的findContextInbound/findContextOutbound 根据mask获取到链表中的DefaultChannelHandlerContext,有一个疑问是当前我们怎么获取到ctx的skipFlags属性

​ DefaultChannelHandlerContext类列举了好多标识(16个)这些标识涵盖了netty 的channel通道中的所有事件。同时提供了一个skipFlagsCache map类型的集合数组(数据个数是依据JVM虚拟机处理器梳理,每一个处理器保存一个map),其只有一个构造函数在构件函数中调用skipFlags = skipFlags(handler); 获取到skipFlags,下面我们看看方法逻辑。

private static int skipFlags(ChannelHandler handler) { 
    //根据当前线程获取到一个map 缓存用于存放 key为handler的class value为handler对应的skipFlags(多个类型) WeakHashMap<Class<?>, Integer> cache = skipFlagsCache[(int) (Thread.currentThread().getId() % skipFlagsCache.length)]; Class<? extends ChannelHandler> handlerType = handler.getClass(); int flagsVal; synchronized (cache) { 
    //缓存已经存在则直接返回 Integer flags = cache.get(handlerType); if (flags != null) { 
    flagsVal = flags; } else { 
    //否则调用skipFlags0 来获取该ChannelHandler对应的所有不感兴趣的事件 并按照位运算存放待skipFlags flagsVal = skipFlags0(handlerType); //放入缓存 cache.put(handlerType, Integer.valueOf(flagsVal)); } } return flagsVal; } 

上述方法逻辑比较简单,从这个简单逻辑中我们可以给skipFlags一个清晰的定义:skipFlags是保存ChannelHandler实现类的所有不感兴趣事件类型标识,一旦channel出现事件则会根据该标识判断对应的ChannelHandler是否感兴趣该事件,从而对事件进行处理。下面关注一下具体如何获取ChannelHandler的所有感兴趣的事件的skipFlags0方法

//该方法传入参数为ChannelHandler(以及子类)的class属性 private static int skipFlags0(Class<? extends ChannelHandler> handlerType) { 
    int flags = 0; try { 
    // 省略部分雷同事件代码.... //所有事件标识获取方式类似 通过方法名和参数反射对应的方法 如果方法使用@Skip注解修饰 //则为flags添加位标示 表明此事件此ChannelHandler不进行处理 if (handlerType.getMethod( "write", ChannelHandlerContext.class, Object.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) { 
    flags |= MASK_WRITE; if (handlerType.getMethod( "flush", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) { 
    flags |= MASK_FLUSH; } } } catch (Exception e) { 
    PlatformDependent.throwException(e); } return flags; } 

上述获取skipFlags标识是通过反射获取对应方法并判断方法是否使用@Skip注解添加标示,重点这里使用位操作 一则可以使用很少的字节标识更多的状态(这里两个字节可以标识16个状态)二是位运算比较高效。这里搭配findContextInbound/findContextOutbound方法很好的判断哪些ChannelHandler不处理对应事件。

4、ChannelHandlerInvoker接口

​ 该接口作用很简单为调用ChannelHandler提供线程运行环境,主要是创建一个DefaultChannelHandlerInvoker 持有一个能提供线程环境的线程调度对象EventExecutor。该实现类中方法都很类似 我们以invokeChannelRegistered举例

//ChannelHandlerContext的该方法触发调用DefaultChannelHandlerInvoker  next.invoker.invokeChannelRegistered(next); public void invokeChannelRegistered(final ChannelHandlerContext ctx) { 
    //如果当前操作在当前线程 不进行线程切换直接操作 if (executor.inEventLoop()) { 
    invokeChannelRegisteredNow(ctx); } else { 
    //否则将操作包装成线程任务 进行后续调度  executor.execute(new Runnable() { 
    @Override public void run() { 
    invokeChannelRegisteredNow(ctx); } }); } } 

这样可以保证ChannelHandler的操作始终在同一个线程中处理,即使用了多线程有保证了不出现线程并发问题。

5、ChannelHandler接口

​ ChannelHandler是Netty中最终进行网络事件处理的接口,也是netty暴露给业务代码的扩展点,一般来说我们使用netty主要是通过实现实现自定义的ChannelHandler去处理业务逻辑。在netty5.0版本之前netty相对于Netty Channel与Java NIO Channel而言抽象出出站和入站。

  • 数据入站,指的是数据从底层的Java NIO channel到Netty的Channel。 ChannelInboundHandler
  • 数据出站,指的是通过Netty的Channel来操作底层的 Java NIO chanel。ChannelOutboundHandler

    netty5.0以后不进行区分了但是为了方便记忆相关事件 我们还是进行一定程度区分

    (1)数据入站 当Java NIO事件进站到Channel时,产生一的一系列事件将由ChannelHandler所对应的API处理

  1. 注册/解除注册事件 channelRegistered/channelUnregistered。
  2. 连接建立事件 channelActive。
  3. 读事件和读完成事件 channelRead、channelReadComplete。
  4. 异常通知事件 exceptionCaught。
  5. 用户自定义事件 userEventTriggered。
  6. Channel 可写状态变化事件 channelWritabilityChanged。
  7. 连接关闭事件 channelInactive。

    (2)数据出站:当需要Netty Channel需要操作Java NIO底层Channel时

  8. . 端口绑定 bind。
  9. 连接服务端 connect。
  10. 写事件 write。
  11. 刷新事件 flush。
  12. 读事件 read
  13. 主动断开连接 disconnect
  14. 关闭 channel 事件 close

    ChannelHandlerAdapter是netty为ChannelHandler接口提供的一个适配类,该类实现了ChannelHandler接口的所有方法 都是基于透传事件的形式,我们一般的业务自定义的ChannelHandler 只需要继承该Adapter类实现自己感兴趣的方法再添加到ChannelPipeline中。

    //ChannelHandler适配器类有一个方法判断该ChannelHandler是否共享 如果贡献不允许重复添加 public boolean isSharable() { 
          return getClass().isAnnotationPresent(Sharable.class); } 

贴一个netty例子中netty客户端的Channelhandler自定义实现 这里只这里只感兴趣连接建立、读事件和异常事件

//继承ChannelHandlerAdapter 重写连接建立、读事件和异常事件 public class TimeClientHandler extends ChannelHandlerAdapter { 
    //读事件获取服务端响应 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 
    ByteBuf buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); String response = new String(bytes,"UTF-8"); System.out.println("获取到服务端响应:"+response); } //连接建立与TCP连接 给服务器发送请求 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { 
    byte[] request = "QUERY TIME REQUEST".getBytes(); ByteBuf buffer = Unpooled.buffer(request.length); buffer.writeBytes(request); ctx.writeAndFlush(buffer); } //出现异常则关闭该Channelhandler @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 
    ctx.close(); } } 
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203406.html原文链接:https://javaforall.net

(0)
上一篇 2026年3月19日 下午9:52
下一篇 2026年3月19日 下午9:53


相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号