Jafka来源分析——Processor

Jafka来源分析——Processor

大家好,又见面了,我是全栈君,今天给大家准备了Idea注册码。

Jafka Acceptor接受client而建立后的连接请求,Acceptor会将Socket连接交给Processor进行处理。Processor通过下面的处理步骤进行client请求的处理:

1. 读取client请求。

2. 依据client请求类型的不同,调用对应的处理函数进行处理。

Processor读取client请求是一个比較有意思的事情,须要考虑两个方面的事情:第一,请求规则(Processor须要依照一定的规则进行请求的解析)。第二,怎样确定一次请求的读取已经结束(由于是非堵塞连接,很有可能第一次读操作读取了请求的一部分数据,第二次到第N次读取才干把整个client请求读取完整)。以下我们具体解析一下client请求的格式。

client请求首先包括一个int,该int指明本次client请求的大小(size)。随后,请求包括一个两个byte(short)的请求类型(请求类型包括:CreaterRequestDeleterRequestFetchRequestMultiFetchRequestMultiProducerRequestOffsetRequestProducerRequest。然后每种请求类型有固定的格式。下图具体说明了ProducerRequest的格式:

Jafka来源分析——Processor


知道了上面的格式之后,问题二(怎样确定一次请求已经读取完毕)就非常easy攻克了。

首先为“请求长度”分配一个4byteByteBuffer,直到该Buffer读满,否则说明长度一直没有读取完毕。“请求长度”读取完毕后,为请求分配一个“请求长度”大小的ByteBuffer,直到该Buffer读满则说明一次请求读取完毕。读取完毕后,依据“请求类型”调用对应的处理函数(Handler)进行处理。在jafka中,上述的两个Buffer在类BoundedByteBufferReceive中进行声明和管理。Processor接收到Acceptor分配的socket连接后。会为socke连接建立一个BoundedByteBufferReceive并将其与socket连接进行绑定。每当该socket连接“可读”时。将BoundedByteBufferReceive拿出来从上次读取的基础上继续读取。直到一次请求彻底读取完毕,详细过程如以下代码(Processor.read)所看到的:

private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = channelFor(key);
Receive request = null;
request = new BoundedByteBufferReceive(maxRequestSize);
key.attach(request);
} else {
request = (Receive) key.attachment();
}
int read = request.readFrom(socketChannel);
stats.recordBytesRead(read);
if (read < 0) {
close(key);
} else if (request.complete()) {
Send maybeResponse = handle(key, request);
key.attach(null);
// if there is a response, send it, otherwise do nothing
if (maybeResponse != null) {
key.attach(maybeResponse);
key.interestOps(SelectionKey.OP_WRITE);
}
} else {
// more reading to be done
key.interestOps(SelectionKey.OP_READ);
getSelector().wakeup();
if (logger.isTraceEnabled()) {
logger.trace("reading request not been done. " + request);
}
}
}

BoundedByteBufferReceive.readFrom的实现详细例如以下:主要是申请两个Buffer并不断的读取数据。

public int readFrom(ReadableByteChannel channel) throws IOException {
        expectIncomplete();
        int read = 0;
        if (sizeBuffer.remaining() > 0) {
            read += Utils.read(channel, sizeBuffer);
        }
        if (contentBuffer == null && !sizeBuffer.hasRemaining()) {
            sizeBuffer.rewind();
            int size = sizeBuffer.getInt();
            if (size <= 0) {
                throw new InvalidRequestException(...);
            }
            if (size > maxRequestSize) {
                final String msg = "Request of length %d is not valid, it is larger than the maximum size of %d bytes.";
                throw new InvalidRequestException(format(msg, size, maxRequestSize));
            }
            contentBuffer = byteBufferAllocate(size);
        }
        //
        if (contentBuffer != null) {
            read = Utils.read(channel, contentBuffer);
            //
            if (!contentBuffer.hasRemaining()) {
                contentBuffer.rewind();
                setCompleted();
            }
        }
        return read;
    }

读取完毕后,Processor会解析“请求类型”,依据请求类型的不同调用不同的Handler处理对应于该请求。

版权声明:本文博主原创文章,博客,未经同意不得转载。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/116982.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • 关于component-scan中base-package包含通配符的问题探究

    关于component-scan中base-package包含通配符的问题探究今天在配置Spring的component-scan时,发现了一个有趣的问题。就是在指定base-package时,如果使用了星号通配符*,有时会出现类扫描不到的情况。下面研究一下这个问题。先介绍一下项目结构: 为了演示,我在java文件夹下创建名为controller的包,并在该包下创建了一个名为IndexController的类。如图所示: 先来看正常情况: 在Spring配置…

    2022年6月13日
    88
  • fstream 中文路径_gradle files have changed

    fstream 中文路径_gradle files have changed在C++的标准库中,std::fstream是个挺好用的文件读写流,操作文件很方便,因为是C++标准库,所以没有其它的环境依赖。在使用fstream过程中,有个打开中文路径文件会失败的问题,自己的代码中一直没处理好,这几天终于有点闲心,把这里改透。涉及很多知识点,也是个遗留已久的问题,特此做个记录。在最后用了个一劳永逸的解决此问题方法:将fstream、FILE再包装下。中文路径使用fstream调试程序过程中,发现打开含中文路径的文件时,会打开失败。查了一些资料,说在VS2008、vs200..

    2022年9月19日
    2
  • jsonschema校验json数据_xml schema校验

    jsonschema校验json数据_xml schema校验ajv使用在使用前,需要知道json-schema是什么。json-schemajson-schema是一个用来描述json数据格式。ajvajv是一个校验json-schem

    2022年8月2日
    18
  • pytest fixtures_pytest allure

    pytest fixtures_pytest allurefixture的优势Pytest的fixture相对于传统的xUnit的setup/teardown函数做了显著的改进:命名方式灵活,不局限于setup和teardown这几个命名conf

    2022年7月29日
    7
  • c语言中位运算符_位运算符的用法

    c语言中位运算符_位运算符的用法C语言的运算符是一个很有意思的东西,运用起来可以解决很多麻烦的事,但是想要灵活应用也有一定的难度,总结一下c语言运算符的用法和一些常用技巧.一.C语言位运算符简介C语言的位运算符有六种,分别是:>>  右移运算符&   按位与运算符|   按位或运算符^   按位异或运算符~   按位取反运算符这些运算符都是对于基本数据类型的二进制位进行操作的,这

    2022年10月4日
    5
  • js匿名函数和命名函数_javascript中的函数

    js匿名函数和命名函数_javascript中的函数匿名函数里的对象,跟定义匿名函数页面的上下文有关,而与调用匿名函数的页面无关。很拗口。举例说明://在某个“容器”页面functionshowWorkSpace(callback,h){varheight=500;callback(“map_workspace”,height);}//在该“容器”页面中的某个iframe页面parent.showWorkSpa

    2022年9月1日
    3

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号