文章目录
1、netty的工作流程

- ChannelPipeline接口:每一个Channel通道都会有一个ChannelPipeline,责任链的核心组件,同时也是HandlerContext的管理容器(链表的形式管理),事件在各个ChannelHandlerContext中传递。我们可以把他理解为管道,netty的读、写、连接等事件在该通道“流动”。
- ChannelHandlerContext接口:ChannelHandler的上下文环境,所以其封装一个具体的ChannelHandler,同时为ChannelHandler运行提供一个线程环境ChannelHandlerInvoker对象。
- ChannelHandlerInvoker接口:ChannelHandler的运行的线程环境所有其持有一个EventExecutor(netty的线程调度对象)。
- ChannelHandler接口:真正对读、写等IO事件进行处理的对象,它可以选择性地拦截和处理自己感兴趣地事件,也可以透传和终止事件。IO处理事件不同业务处理模式不同,所以这里是netty暴露给业务代码的扩展点,一般来说使用netty主要是通过实现自定义的ChannelHandler。
- ChannelHandlerAdapter:ChannelHandler有很多接口方法,但是我们自定义实现的时候可能只关心自己感兴趣的事件,不必实现ChannelHandler的所有接口,Netty提供了ChannelHandlerAdapter基类它以透传事件的形式实现了所有接口,如果用户关心某个事件,只需要覆盖ChannelHandlerAdapter对应的方法即可。
2、ChannelPipeline接口介绍
2.1、相关概述
ChannelPipeline接口的默认实现为DefaultChannelPipeline,该类主要作用如下:
- 管理所有的ChannelHandlerContext对象,它是以双向列表的形式管理ChannelHandlerContext,其中提供了两个默认的头(head)尾(tail)ChannelHandlerContext对象,其中该类有大量的方法来添加(addXxx)、获取(getXxx)、移除(removeXxx) ChannelHandlerContext的方法
- 持有ChannelHandlerContext的线程运行环境ChannelHandlerInvoker ,他根据ChannelPipeline传入的EventExecutorGroup中的线程池对象创建对应的invoker 存放到Map
childInvokers中。
- ChannelHandlerContext对象还通过名字作为key存放到map集合name2ctx中。
2.2、核心方法
| 主要方法 | 描述 |
|---|---|
| addXxx/rmove/replace/get/last/first | 管理链表ChannelHandlerContext对象(增、删、改、查) |
| fireXxx | 触发其持有的Handler对应的方法比如fireChannelActive方法最终触发handler的channelActive(当前channel激活的时候触发) |
| bind/connect/disconnect/close/flush/read/write | 此类方法是其进行网络IO操作的功能方法 比如服务端绑定、客户端连接、关闭、读写事件。 |
2.2.1、addXxx/rmove/replace/get/last/first
这类方法主要是进行ChannelHandlerContext对象的增删查改,逻辑很相似所以这里只分析一个addLast方法,其他的请同学自行分析。addLast有两个重载方法区别是
@Override public ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast((ChannelHandlerInvoker) null, name, handler); } //包含EventExecutorGroup 有则会调用findInvoker为其线程创建一个DefaultChannelHandlerInvoker 如果没有group 则不创建 @Override public ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
return addLast(findInvoker(group), name, handler); } //添加多个handler @Override public ChannelPipeline addLast(EventExecutorGroup group,ChannelHandler ....handler) {
//循环遍历添加 并调用generateName方法为handler生产明细 addLast(invoker, generateName(h), h); }
最终调用如下方法
public ChannelPipeline addLast(ChannelHandlerInvoker invoker, final String name, ChannelHandler handler) {
synchronized (this) {
//检查名字是否重复 从name2ctx判断是否存在 checkDuplicateName(name); //Handler 包装成HandlerContext DefaultChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, invoker, name, handler); //执行具体的链表添加操作 addLast0(name, newCtx); } return this; }
addLast0()方法具体链表添加操作
private void addLast0(final String name, DefaultChannelHandlerContext newCtx) {
//检查Handler是否共享 如果是共享(ChannelHandler使用@Sharable注解修饰)则重复添加会报错 checkMultiplicity(newCtx); //添加链表 尾插 DefaultChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; //并在map中存放一份 name2ctx.put(name, newCtx); //根据条件触发ChannelHandler的handlerAdded方法(添加标识该ChannelHandler已经添加完成可以使用) callHandlerAdded(newCtx); }
callHandlerAdded方法
private void callHandlerAdded(final DefaultChannelHandlerContext ctx) {
//每创建一个ctx 都会为其添加skipFlags(根据方法名标记其包含的处理方式) //判断skipFlag中是否包含MASK_HANDLER_ADDED 如果有说明ChannelHandler if ((ctx.skipFlags & DefaultChannelHandlerContext.MASK_HANDLER_ADDED) != 0) {
return; } //如果当前操作线程和channel对应的线程池对应的线程不一致 则将该操作包装成线程任务放入线程调度中执行 //这里确保channel所有操作都同一个线程中 即使用了多线程又避免了并发访问的问题 if (ctx.channel().isRegistered() && !ctx.executor().inEventLoop()) {
ctx.executor().execute(new Runnable() {
@Override public void run() {
callHandlerAdded0(ctx); } }); return; } //本身处于当前线程 则直接调用 最终调用ctx.handler().handlerAdded(ctx); callHandlerAdded0(ctx); }
2.2.2、fireXxx
fireXxx 有多个类似的方法逻辑是相似的 调用header或者tail的fireXxx,调用对应的findContextInbound() 根据skipFlags 找到对应的
ChannelHandlerContext对象的handler的相关方法(调用是通过其内部的线程环境对象ChannelHandlerInvoker实现)
这里以设置channel激活为类fireChannelActive方法
public ChannelHandlerContext fireChannelActive() {
//根据skipflag 从链表中查到对应的 DefaultChannelHandlerContext next = findContextInbound(MASK_CHANNEL_ACTIVE); //调用相关的ChannelHandler的api next.invoker.invokeChannelActive(next); return this; }
- fireChannelActive:设置channel为激活状态
- fireExceptionCaught:channel IO出现异常触发
- fireUserEventTriggered:
- fireChannelRead
- fireChannelReadComplete
- fireChannelWritabilityChanged
2.2.3、netty核心服务
netty进行网络IO操作的相关核心服务 比如服务端绑定bind、客户端连接connect、关闭disconnect/close、读写事件read/write。
netty核心服务调用方式逻辑和fireXxx逻辑类似其中都是调用其中的head/tail的同名服务(bind、connect),根据skipFlags 找到对应的
ChannelHandlerContext对象的handler的并调用相关方法。类似此处不再复述,但是不同在于fireXxx使用findContextInbound 从header头部进行获取handler,inbound事件是从headr头部进行传递而这里使用findContextOutbound从tail尾部获取Handler,outbound事件从尾部开始传递
//inbound事件 从头部header开始向后next遍历 private DefaultChannelHandlerContext findContextInbound(int mask) {
DefaultChannelHandlerContext ctx = this; do {
ctx = ctx.next; } while ((ctx.skipFlags & mask) != 0); return ctx; }
//outbound事件 从尾部tail开始向前prev遍历 private DefaultChannelHandlerContext findContextOutbound(int mask) {
DefaultChannelHandlerContext ctx = this; do {
ctx = ctx.prev; } while ((ctx.skipFlags & mask) != 0); //如果有skipFlags 对应标识则不调用对应的ChannelHandler return ctx; }
3、ChannelHandlerContext接口介绍
ChannelHandler的上下文环境,默认实现为DefaultChannelHandlerContext,在netty中对于ChannelHandler不能直接调用,而是需要通过ChannelHandlerContext来进行调用,所以在上面学习ChannelPipline接口中所有方法最终调用hanlerChannle的调用过程中都会经过DefaultChannelHandlerContext,DefaultChannelHandlerContext上相关方法和ChannelPipeline雷同(上面已经涉及)。但是在上面的findContextInbound/findContextOutbound 根据mask获取到链表中的DefaultChannelHandlerContext,有一个疑问是当前我们怎么获取到ctx的skipFlags属性。
DefaultChannelHandlerContext类列举了好多标识(16个)这些标识涵盖了netty 的channel通道中的所有事件。同时提供了一个skipFlagsCache map类型的集合数组(数据个数是依据JVM虚拟机处理器梳理,每一个处理器保存一个map),其只有一个构造函数在构件函数中调用skipFlags = skipFlags(handler); 获取到skipFlags,下面我们看看方法逻辑。
private static int skipFlags(ChannelHandler handler) {
//根据当前线程获取到一个map 缓存用于存放 key为handler的class value为handler对应的skipFlags(多个类型) WeakHashMap<Class<?>, Integer> cache = skipFlagsCache[(int) (Thread.currentThread().getId() % skipFlagsCache.length)]; Class<? extends ChannelHandler> handlerType = handler.getClass(); int flagsVal; synchronized (cache) {
//缓存已经存在则直接返回 Integer flags = cache.get(handlerType); if (flags != null) {
flagsVal = flags; } else {
//否则调用skipFlags0 来获取该ChannelHandler对应的所有不感兴趣的事件 并按照位运算存放待skipFlags flagsVal = skipFlags0(handlerType); //放入缓存 cache.put(handlerType, Integer.valueOf(flagsVal)); } } return flagsVal; }
上述方法逻辑比较简单,从这个简单逻辑中我们可以给skipFlags一个清晰的定义:skipFlags是保存ChannelHandler实现类的所有不感兴趣事件类型标识,一旦channel出现事件则会根据该标识判断对应的ChannelHandler是否感兴趣该事件,从而对事件进行处理。下面关注一下具体如何获取ChannelHandler的所有感兴趣的事件的skipFlags0方法
//该方法传入参数为ChannelHandler(以及子类)的class属性 private static int skipFlags0(Class<? extends ChannelHandler> handlerType) {
int flags = 0; try {
// 省略部分雷同事件代码.... //所有事件标识获取方式类似 通过方法名和参数反射对应的方法 如果方法使用@Skip注解修饰 //则为flags添加位标示 表明此事件此ChannelHandler不进行处理 if (handlerType.getMethod( "write", ChannelHandlerContext.class, Object.class, ChannelPromise.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_WRITE; if (handlerType.getMethod( "flush", ChannelHandlerContext.class).isAnnotationPresent(Skip.class)) {
flags |= MASK_FLUSH; } } } catch (Exception e) {
PlatformDependent.throwException(e); } return flags; }
上述获取skipFlags标识是通过反射获取对应方法并判断方法是否使用@Skip注解添加标示,重点这里使用位操作 一则可以使用很少的字节标识更多的状态(这里两个字节可以标识16个状态)二是位运算比较高效。这里搭配findContextInbound/findContextOutbound方法很好的判断哪些ChannelHandler不处理对应事件。
4、ChannelHandlerInvoker接口
该接口作用很简单为调用ChannelHandler提供线程运行环境,主要是创建一个DefaultChannelHandlerInvoker 持有一个能提供线程环境的线程调度对象EventExecutor。该实现类中方法都很类似 我们以invokeChannelRegistered举例
//ChannelHandlerContext的该方法触发调用DefaultChannelHandlerInvoker next.invoker.invokeChannelRegistered(next); public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
//如果当前操作在当前线程 不进行线程切换直接操作 if (executor.inEventLoop()) {
invokeChannelRegisteredNow(ctx); } else {
//否则将操作包装成线程任务 进行后续调度 executor.execute(new Runnable() {
@Override public void run() {
invokeChannelRegisteredNow(ctx); } }); } }
这样可以保证ChannelHandler的操作始终在同一个线程中处理,即使用了多线程有保证了不出现线程并发问题。
5、ChannelHandler接口
ChannelHandler是Netty中最终进行网络事件处理的接口,也是netty暴露给业务代码的扩展点,一般来说我们使用netty主要是通过实现实现自定义的ChannelHandler去处理业务逻辑。在netty5.0版本之前netty相对于Netty Channel与Java NIO Channel而言抽象出出站和入站。
- 数据入站,指的是数据从底层的Java NIO channel到Netty的Channel。 ChannelInboundHandler
- 数据出站,指的是通过Netty的Channel来操作底层的 Java NIO chanel。ChannelOutboundHandler
netty5.0以后不进行区分了但是为了方便记忆相关事件 我们还是进行一定程度区分
(1)数据入站 当Java NIO事件进站到Channel时,产生一的一系列事件将由ChannelHandler所对应的API处理
- 注册/解除注册事件 channelRegistered/channelUnregistered。
- 连接建立事件 channelActive。
- 读事件和读完成事件 channelRead、channelReadComplete。
- 异常通知事件 exceptionCaught。
- 用户自定义事件 userEventTriggered。
- Channel 可写状态变化事件 channelWritabilityChanged。
- 连接关闭事件 channelInactive。
(2)数据出站:当需要Netty Channel需要操作Java NIO底层Channel时
- . 端口绑定 bind。
- 连接服务端 connect。
- 写事件 write。
- 刷新事件 flush。
- 读事件 read
- 主动断开连接 disconnect
- 关闭 channel 事件 close
ChannelHandlerAdapter是netty为ChannelHandler接口提供的一个适配类,该类实现了ChannelHandler接口的所有方法 都是基于透传事件的形式,我们一般的业务自定义的ChannelHandler 只需要继承该Adapter类实现自己感兴趣的方法再添加到ChannelPipeline中。
//ChannelHandler适配器类有一个方法判断该ChannelHandler是否共享 如果贡献不允许重复添加 public boolean isSharable() { return getClass().isAnnotationPresent(Sharable.class); }
贴一个netty例子中netty客户端的Channelhandler自定义实现 这里只这里只感兴趣连接建立、读事件和异常事件
//继承ChannelHandlerAdapter 重写连接建立、读事件和异常事件 public class TimeClientHandler extends ChannelHandlerAdapter {
//读事件获取服务端响应 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes); String response = new String(bytes,"UTF-8"); System.out.println("获取到服务端响应:"+response); } //连接建立与TCP连接 给服务器发送请求 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception {
byte[] request = "QUERY TIME REQUEST".getBytes(); ByteBuf buffer = Unpooled.buffer(request.length); buffer.writeBytes(request); ctx.writeAndFlush(buffer); } //出现异常则关闭该Channelhandler @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close(); } }
发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/203406.html原文链接:https://javaforall.net
