Jafka来源分析——Processor

Jafka来源分析——Processor

大家好,又见面了,我是全栈君,今天给大家准备了Idea注册码。

Jafka Acceptor接受client而建立后的连接请求,Acceptor会将Socket连接交给Processor进行处理。Processor通过下面的处理步骤进行client请求的处理:

1. 读取client请求。

2. 依据client请求类型的不同,调用对应的处理函数进行处理。

Processor读取client请求是一个比較有意思的事情,须要考虑两个方面的事情:第一,请求规则(Processor须要依照一定的规则进行请求的解析)。第二,怎样确定一次请求的读取已经结束(由于是非堵塞连接,很有可能第一次读操作读取了请求的一部分数据,第二次到第N次读取才干把整个client请求读取完整)。以下我们具体解析一下client请求的格式。

client请求首先包括一个int,该int指明本次client请求的大小(size)。随后,请求包括一个两个byte(short)的请求类型(请求类型包括:CreaterRequestDeleterRequestFetchRequestMultiFetchRequestMultiProducerRequestOffsetRequestProducerRequest。然后每种请求类型有固定的格式。下图具体说明了ProducerRequest的格式:

Jafka来源分析——Processor


知道了上面的格式之后,问题二(怎样确定一次请求已经读取完毕)就非常easy攻克了。

首先为“请求长度”分配一个4byteByteBuffer,直到该Buffer读满,否则说明长度一直没有读取完毕。“请求长度”读取完毕后,为请求分配一个“请求长度”大小的ByteBuffer,直到该Buffer读满则说明一次请求读取完毕。读取完毕后,依据“请求类型”调用对应的处理函数(Handler)进行处理。在jafka中,上述的两个Buffer在类BoundedByteBufferReceive中进行声明和管理。Processor接收到Acceptor分配的socket连接后。会为socke连接建立一个BoundedByteBufferReceive并将其与socket连接进行绑定。每当该socket连接“可读”时。将BoundedByteBufferReceive拿出来从上次读取的基础上继续读取。直到一次请求彻底读取完毕,详细过程如以下代码(Processor.read)所看到的:

private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = channelFor(key);
Receive request = null;
request = new BoundedByteBufferReceive(maxRequestSize);
key.attach(request);
} else {
request = (Receive) key.attachment();
}
int read = request.readFrom(socketChannel);
stats.recordBytesRead(read);
if (read < 0) {
close(key);
} else if (request.complete()) {
Send maybeResponse = handle(key, request);
key.attach(null);
// if there is a response, send it, otherwise do nothing
if (maybeResponse != null) {
key.attach(maybeResponse);
key.interestOps(SelectionKey.OP_WRITE);
}
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ);
getSelector().wakeup();
if (logger.isTraceEnabled()) {
logger.trace("reading request not been done. " + request);
}
}
}

BoundedByteBufferReceive.readFrom的实现详细例如以下:主要是申请两个Buffer并不断的读取数据。

public int readFrom(ReadableByteChannel channel) throws IOException {
        expectIncomplete();
        int read = 0;
        if (sizeBuffer.remaining() > 0) {
            read += Utils.read(channel, sizeBuffer);
        }
        if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
            sizeBuffer.rewind();
            int size = sizeBuffer.getInt();
            if (size <= 0) {
                throw new InvalidRequestException(...);
            }
            if (size > maxRequestSize) {
                final String msg = "Request of length %d is not valid, it is larger than the maximum size of %d bytes.";
                throw new InvalidRequestException(format(msg, size, maxRequestSize));
            }
            contentBuffer = byteBufferAllocate(size);
        }
        //
        if (contentBuffer != null) {
            read = Utils.read(channel, contentBuffer);
            //
            if (!contentBuffer.hasRemaining()) {
                contentBuffer.rewind();
                setCompleted();
            }
        }
        return read;
    }

读取完毕后,Processor会解析“请求类型”,依据请求类型的不同调用不同的Handler处理对应于该请求。

版权声明:本文博主原创文章,博客,未经同意不得转载。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/116982.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • linux下java的环境配置

    linux下java的环境配置linux下java的环境配置文章目录linux下java的环境配置1.删除原有的java环境2.去官网下载相应的Java环境3.在Linux上进行解压4.修改~/.bashrc参考链接之前在大数据配置hadoop开发环境的时候,进行了相关的配置,所以还有印象,接下来对虚拟机ubuntu进行java的环境配置1.删除原有的java环境2.去官网下载相应的Java环境我用的是java8的环境,比较经典,另外还有java11也是比较稳定的,相较于java8做了一些改进3.在Linux上进行解

    2022年5月16日
    48
  • 工程师的基本功是什么?如何练习?—学习心得分享「建议收藏」

    开头聊几句1、周末休息,今天下雨了,气温还行,不冷不热2、刚看完《这!就是街舞》,很燃很炸,一些作品表达的东西也很让人感动3、发现真正热爱的事情,并为之付出所有的能量,很让人羡慕开头周四上线到比较晚,好在中间有空,去公司楼下湖边散了散步,上线回到家,已经是凌晨了。周五中午在去公司的路上看到了美团技术团队的一篇文章,觉得很不错,值得学习,也分享到朋友圈了,希望保留下方便自己查阅,也分享给更多的技术伙伴,一起看好的文章。在技术之路上,不断的持续学习,持续进步,一起精进。那天朋友圈分享美团的这

    2022年3月1日
    44
  • laravel 中使用tinker 验证驱动加载是否成功

    laravel 中使用tinker 验证驱动加载是否成功

    2021年10月26日
    53
  • 如何创建conda环境_conda安装Python包

    如何创建conda环境_conda安装Python包Pycharm中如何使用新建的conda环境?

    2022年8月27日
    4
  • 电阻电容电感的常用标注方法(手机电容和电阻的区分)

    认识电容及电容电阻的标注   一、认识电容及电容的标注①电容的功能和表示方法。由两个金属极,中间夹有绝缘介质构成。电容的特性主要是隔直流通交流,因此多用于级间耦合、滤波、去耦、旁路及信号调谐。电容在电路中用“C”加数字表示,比如C8,表示在电路中编号为8的电容。②电容的分类。电容按介质不同分为:气体介质电容,液体介质电容,无机固体介质电容,有机固体介质电容电解电容。按极性分为:有极性电容和无极性电

    2022年4月12日
    139
  • ElasticSearch安装&安装成windows服务

    ElasticSearch安装&安装成windows服务ElasticSearch安装&安装成windows服务

    2022年6月17日
    35

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号