《Dubbo进阶一》——RPC协议底层原理

《Dubbo进阶一》——RPC协议底层原理一RPC协议简介在一个典型的RPC的使用场景中,包含了服务发现、负载、容错、序列化和网络传输等组件,其中RPC协议指明了程序如何进行序列化和网络传输,也就是说一个RPC协议的实现等于一个非透明的RPC调用。简单来说,分布式框架的核心是RPC框架,RPC框架的核心是RPC协议。二协议的基本组成IP:服务提供者的地址端口:协议指定开放端口运行服务(1)netty(2)mima…

大家好,又见面了,我是你们的朋友全栈君。

一 RPC协议简介

在一个典型的RPC的使用场景中,包含了服务发现、负载、容错、序列化和网络传输等组件,其中RPC协议指明了程序如何进行序列化和网络传输,也就是说一个RPC协议的实现等于一个非透明的RPC调用。
在这里插入图片描述
简单来说,分布式框架的核心是RPC框架,RPC框架的核心是RPC协议

dubbo 支持的RPC协议列表

名称 实现描述 连接描述 使用场景
dubbo 传输服务: mina, netty(默认), grizzy; 序列化: dubbo, hessian2(默认), java, fastjson。 自定义报文 单个长连接NIO;异步传输 1.常规RPC调用 2.传输数据量小 3.提供者少于消费者
rmi 传输:java rmi 服务; 序列化:java原生二进制序列化 多个短连接; BIO同步传输 1.常规RPC调用 2.与原RMI客户端集成 3.可传少量文件 4.不支持防火墙穿透
hessian 传输服务:servlet容器; 序列化:hessian二进制序列化 基于Http 协议传输,依懒servlet容器配置 1.提供者多于消费者 2.可传大字段和文件 3.跨语言调用
http 传输服务:servlet容器; 序列化:http表单 依懒servlet容器配置 1、数据包大小混合
thrift 与thrift RPC 实现集成,并在其基础上修改了报文头 长连接、NIO异步传输

(PS:本文只探讨dubbo协议)

二 协议的基本组成

在这里插入图片描述

  1. IP:服务提供者的地址
  2. 端口:协议指定开放端口
  3. 运行服务
    (1)netty
    (2)mima
    (3)rmi
    (4)servlet容器(Jetty、Tomcat、Jboss)
  4. 协议报文编码
  5. 序列化方式
    (1)Hessian2Serialization
    (2)DubboSerialization
    (3)JavaSerialization
    (4)JsonSerialization

三 Duboo的RPC协议报文

先看下http协议报文格式
在这里插入图片描述
在这里插入图片描述
同样,Dubbo也有自己的报文格式
在这里插入图片描述
以head+request body或head+response body的形式存在

  • head
    1标志位:表明是请求还是响应还是事件
    2status:表明状态是OK还是不OK
  • request body
    1Dubbo版本号
    2接口路径
    3接口版本
    4方法名称
    5参数类型
    6参数值
  • response body
    1结果标志(无结果、有结果、异常)
    2结果

协议的编解码过程:
在这里插入图片描述

四 源码探究

以明晰编码解码和序列化反序列化为目的探究源码。其实就是如上图所示的协议的编解码过程。

com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec是很重要的一个类,无论是request还是response,还有编码解码都在这里类进行调度。

DubboCodec:
在这里插入图片描述
其中重点关注三个方法
decodeBody():解码(请求或响应)以及序列化和反序列化
encodeRequestData():编码请求(发生在Consumer)
encodeResponseData():编码响应(发生在Provider)

1.编码序列化request

发生在Consumer发请求之前
encodeRequestData()

protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException {
        RpcInvocation inv = (RpcInvocation)data;
        out.writeUTF(inv.getAttachment("dubbo", DUBBO_VERSION));
        out.writeUTF(inv.getAttachment("path"));
        out.writeUTF(inv.getAttachment("version"));
        out.writeUTF(inv.getMethodName());
        out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
        Object[] args = inv.getArguments();
        if (args != null) {
            for(int i = 0; i < args.length; ++i) {
                out.writeObject(CallbackServiceCodec.encodeInvocationArgument(channel, inv, i));
            }
        }

        out.writeObject(inv.getAttachments());
    }

参数ObjectOutput是序列化接口,具体调用什么实现类有配置决定,如没有则默认是hessian2。能用的子类(序列化方式)如下
在这里插入图片描述

RpcInvocation拿到datadata是请求的基本内容,也就是第三部分所说的request body的六个模块:Dubbo版本号、接口路径、接口版本、方法名称、参数类型、参数值。
writeUTF()将版本号、接口路径、接口版本、方法名和参数称写进序列化类。
最后的writeObject() 通过配置的序列化方式调用相应的实现类进行序列化,如在protocol配置了serialization=“fastjson”,将调用FastJsonObjectOutput实现类的writeObject()
在这里插入图片描述
编码序列化request完成

2.编码序列化response

发生在Provider发出响应之前。
encodeResponseData

protected void encodeResponseData(Channel channel, ObjectOutput out, Object data) throws IOException {
        Result result = (Result)data;
        Throwable th = result.getException();
        if (th == null) {
            Object ret = result.getValue();
            if (ret == null) {
                out.writeByte((byte)2);
            } else {
                out.writeByte((byte)1);
                out.writeObject(ret);
            }
        } else {
            out.writeByte((byte)0);
            out.writeObject(th);
        }

    }

过程与编码序列化request类似且较为简单,不再多说。

3.解码反序列化request和response

解码反序列化request发生在Provider;解码反序列化response发生在Consumer。两个方法在同个方法中,就一起讲了。

protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
       byte flag = header[2];
       byte proto = (byte)(flag & 31);
       Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
       long id = Bytes.bytes2long(header, 4);
       if ((flag & -128) == 0) {
           Response res = new Response(id);
           if ((flag & 32) != 0) {
               res.setEvent(Response.HEARTBEAT_EVENT);
           }

           byte status = header[3];
           res.setStatus(status);
           if (status == 20) {
               try {
                   Object data;
                   if (res.isHeartbeat()) {
                       data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else if (res.isEvent()) {
                       data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
                   } else {
                       DecodeableRpcResult result;
                       if (channel.getUrl().getParameter("decode.in.io", true)) {
                           result = new DecodeableRpcResult(channel, res, is, (Invocation)this.getRequestData(id), proto);
                           result.decode();
                       } else {
                           result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(this.readMessageData(is)), (Invocation)this.getRequestData(id), proto);
                       }

                       data = result;
                   }

                   res.setResult(data);
               } catch (Throwable var13) {
                   if (log.isWarnEnabled()) {
                       log.warn("Decode response failed: " + var13.getMessage(), var13);
                   }

                   res.setStatus((byte)90);
                   res.setErrorMessage(StringUtils.toString(var13));
               }
           } else {
               res.setErrorMessage(this.deserialize(s, channel.getUrl(), is).readUTF());
           }

           return res;
       } else {
           Request req = new Request(id);
           req.setVersion("2.0.0");
           req.setTwoWay((flag & 64) != 0);
           if ((flag & 32) != 0) {
               req.setEvent(Request.HEARTBEAT_EVENT);
           }

           try {
               Object data;
               if (req.isHeartbeat()) {
                   data = this.decodeHeartbeatData(channel, this.deserialize(s, channel.getUrl(), is));
               } else if (req.isEvent()) {
                   data = this.decodeEventData(channel, this.deserialize(s, channel.getUrl(), is));
               } else {
                   DecodeableRpcInvocation inv;
                   if (channel.getUrl().getParameter("decode.in.io", true)) {
                       inv = new DecodeableRpcInvocation(channel, req, is, proto);
                       inv.decode();
                   } else {
                       inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(this.readMessageData(is)), proto);
                   }

                   data = inv;
               }

               req.setData(data);
           } catch (Throwable var14) {
               if (log.isWarnEnabled()) {
                   log.warn("Decode request failed: " + var14.getMessage(), var14);
               }

               req.setBroken(true);
               req.setData(var14);
           }

           return req;
       }
   }

需要注意的是来到这个方法表明请求头已经处理好,现在是处理body。
flag通过header拿到标志位。
第一个if语句(flag & -128) == 0,实际上是在判断是request还是response,若为true为response,也就是Consumer要解码反序列化从Provider发来的响应;若为false为request,也就是Provider要解码反序列化从Consumer发来的请求。

(1)解码反序列化request

(flag & -128) == 0为false时,进入else执行体,在服务端进行操作。
if ((flag & 32) != 0)在判断是否时一个心跳事件,心跳事件时为了检测连接是否断开以备重连。
if (req.isHeartbeat())判断是否时一个心跳事件,else if (req.isEvent())判断是否时一个事件
排除了这两个之后就是真正的request。
inv拿到request相关参数,inv.decode()进行解码和反序列化。
调用DecodeableRpcInvocationdecode()方法如下

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        this.setAttachment("dubbo", in.readUTF());
        this.setAttachment("path", in.readUTF());
        this.setAttachment("version", in.readUTF());
        this.setMethodName(in.readUTF());

        try {
            String desc = in.readUTF();
            Object[] args;
            Class[] pts;
            if (desc.length() == 0) {
                pts = DubboCodec.EMPTY_CLASS_ARRAY;
                args = DubboCodec.EMPTY_OBJECT_ARRAY;
            } else {
                pts = ReflectUtils.desc2classArray(desc);
                args = new Object[pts.length];

                for(int i = 0; i < args.length; ++i) {
                    try {
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception var9) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + var9.getMessage(), var9);
                        }
                    }
                }
            }

            this.setParameterTypes(pts);
            Map<String, String> map = (Map)in.readObject(Map.class);
            if (map != null && map.size() > 0) {
                Map<String, String> attachment = this.getAttachments();
                if (attachment == null) {
                    attachment = new HashMap();
                }

                ((Map)attachment).putAll(map);
                this.setAttachments((Map)attachment);
            }

            for(int i = 0; i < args.length; ++i) {
                args[i] = CallbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            this.setArguments(args);
            return this;
        } catch (ClassNotFoundException var10) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", var10));
        }
    }

其中ObjectInput选择的序列化方式实现子类依然时根据配置文件来的,只有与客户端序列化的方式一样才能反序列化成功。接下来是逐个readUTF()解码request body的模块。try代码块里的readUTF()解码出参数类型和参数值。最后将dubbo的隐式参数也一同设置进去Map<String, String> map = (Map)in.readObject(Map.class),到这里DecodeableRpcInvocation拿到所有相关参数,后续可以进行业务操作。
解码反序列化request完成

(2)解码反序列化response

(flag & -128) == 0为true时,进入if执行体,在客户端进行操作。
if ((flag & 32) != 0)在判断是否时一个心跳事件,心跳事件时为了检测连接是否断开以备重连。
status从header拿到状态码,如果不等于20,直接进入else执行错误信息写入到responseres.setErrorMessage()
if (req.isHeartbeat()判断是否时一个心跳事件,else if (req.isEvent()判断是否时一个事件
排除了这两个之后就是真正的response。
result拿到response相关参数,result .decode()进行解码和反序列化。
调用DecodeableRpcResultdecode()方法如下

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), this.serializationType).deserialize(channel.getUrl(), input);
        byte flag = in.readByte();
        switch(flag) {
        case 0:
            try {
                Object obj = in.readObject();
                if (!(obj instanceof Throwable)) {
                    throw new IOException("Response data error, expect Throwable, but get " + obj);
                }

                this.setException((Throwable)obj);
                break;
            } catch (ClassNotFoundException var6) {
                throw new IOException(StringUtils.toString("Read response data failed.", var6));
            }
        case 1:
            try {
                Type[] returnType = RpcUtils.getReturnTypes(this.invocation);
                this.setValue(returnType != null && returnType.length != 0 ? (returnType.length == 1 ? in.readObject((Class)returnType[0]) : in.readObject((Class)returnType[0], returnType[1])) : in.readObject());
            } catch (ClassNotFoundException var7) {
                throw new IOException(StringUtils.toString("Read response data failed.", var7));
            }
        case 2:
            break;
        default:
            throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
        }

        return this;
    }

一开始就调用getSerialization()进行反序列化,然后赋给ObjectInput。
判断flag,0为发生异常,并处理异常信息;2为没值,直接退出方法。
当等于1时对response进行解码,调用setValue()将信息读出来。
解码反序列化response完成

4.业务调用

在这里插入图片描述
了解是如何编码序列化等操作之后,最后看下服务端接收到请求整个流程是如何调用的。(客户端接收到响应类似)
在这里插入图片描述
以dubbo默认的传输服务netty为例,存在一个重要的类:
com\alibaba\dubbo\remoting\transport\netty\NettyServer.class
(客户端为NettyClient)
在这里插入图片描述
其中的doOpen()方法,表示打开服务

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        this.bootstrap = new ClientBootstrap(channelFactory);
        this.bootstrap.setOption("keepAlive", true);
        this.bootstrap.setOption("tcpNoDelay", true);
        this.bootstrap.setOption("connectTimeoutMillis", this.getTimeout());
        final NettyHandler nettyHandler = new NettyHandler(this.getUrl(), this);
        this.bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(NettyClient.this.getCodec(), NettyClient.this.getUrl(), NettyClient.this);
                ChannelPipeline pipeline = Channels.pipeline();
                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
    }

三个pipeline.addLast()操作对应解码、编码以及解码后的操作。编解码上面已经说过,这里主要探究解码后的操作。

解码完成后带着参数发起对AllDispatcher类的调用

public class AllDispatcher implements Dispatcher {
    public static final String NAME = "all";

    public AllDispatcher() {
    }

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }
}

可以看到它又调用了ChannelHandler接口来处理,最终是返回调用AllChannelHandler实现类。
在这里插入图片描述
其中在received()方法中进行线程派发

public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = this.getExecutorService();

        try {
            cexecutor.execute(new ChannelEventRunnable(channel, this.handler, ChannelState.RECEIVED, message));
        } catch (Throwable var8) {
            if (message instanceof Request && var8 instanceof RejectedExecutionException) {
                Request request = (Request)message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + this.url.getIp() + "," + this.url.getPort() + ") threadpool is exhausted ,detail msg:" + var8.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus((byte)100);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }

            throw new ExecutionException(message, channel, this.getClass() + " error when process received event .", var8);
        }
    }

传进来的参数Object message包含request。
ExecutorService cexecutor拿到对应的线程池。
调用cexecutor.execute()执行,执行时调用了ChannelEventRunnable,在ChannelEventRunnable这个类的run()方法就调用了我们自己写的业务方法。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/144671.html原文链接:https://javaforall.net

(0)
上一篇 2022年5月19日 上午6:00
下一篇 2022年5月19日 上午6:00


相关推荐

  • 使用内存盘加快开发效率 (UltraRAMDisk,Jetbrains Idea,java)

    使用内存盘加快开发效率 (UltraRAMDisk,Jetbrains Idea,java)环境:Windows1020H2IntelliJIDEA2020.2.4x64JDK1.8内存盘软件:内存盘软件ultraramdisk官方地址CSDN资源链接://TODO硬件:i797001TB机械盘ddr426668G*2步骤0:内存大小根据实际内存去分配合理的大小动态分配内存根据需要备份和恢复根据自身需要,(关机速度会很慢内存盘内所有数据会写入到该镜像文件内)步骤1:我这边是选择直接将已有的idea软件…

    2022年5月7日
    51
  • Linux系统下使用gfortran

    Linux系统下使用gfortranLinux 系统下使用 gfortran 之前习惯了在 win 下用 fortran 写一些简单的专业课作业 切换到 linux 一脸懵逼 在此一一记录下踩过的坑 Linux 基本命令不完全总结第一次接触 linux 甚至如何关机都要问度娘 我实在是太菜了 我先把我迄今为止用到的命令做一个小结作为第一部分 gfortran 的用法在第二部分 超基础命令 shutdown hnow 立即关机 定时关机就把 now 改成时间 例如 10 53 10min reboot 重启 ls dir 查看文件夹下的全部文件

    2026年3月26日
    2
  • 人脸识别系统如何建模_3dmax人脸建模

    人脸识别系统如何建模_3dmax人脸建模本发明涉及生物特征识别,特别是涉及人脸识别中的特征建模方法。背景技术:人脸识别技术一般包括四个组成部分,分别为人脸图像采集、人脸图像预处理、人脸图像特征提取以及匹配与识别,具体来说:人脸图像采集及检测是指通过摄像镜头等视频图像采集装置采集包括有人脸的视频或图像数据,可以是采集对象的静态图像、动态图像、不同的位置、不同表情等。人脸图像预处理是指从采集的图像数据中确定人脸的部分,并进行灰度校正、噪声过…

    2025年12月10日
    5
  • tcp/ip协议包含哪几层_ip协议提供的是一种什么服务

    tcp/ip协议包含哪几层_ip协议提供的是一种什么服务在OSI模型中ARP协议属于链路层;而在TCP/IP模型中,ARP协议属于网络层。1)ARP分层的位置是TCP/IP的网络层2)ARP报文是由以太网帧进行封装传输的。没有封装进IP包。3)实际上

    2022年8月5日
    10
  • 如何搭建本地知识库使用豆包大模型

    如何搭建本地知识库使用豆包大模型

    2026年3月12日
    2
  • ntp时间同步端口_网络客户端协议没有了

    ntp时间同步端口_网络客户端协议没有了用Java实现的一个简单的NTP客户端

    2022年10月9日
    4

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号