nsq部署_andlua辅助源码

nsq部署_andlua辅助源码NSQ源码-Nsq客户端

大家好,又见面了,我是你们的朋友全栈君。

看完lookupd和nsqd之后我们再来看下nsq client端的代码。 我是想把nsq系统完完整整的看一遍,从而对他形成一个更整体的
认识。对message queue来说他的client端就是生产者和消费者,生产者负责想nsq中投递消息,消费者负责从lookupd中获取到
指定nsqd之后,从nsqd中获取消息。

生产者

我们以nsq/apps/to_nsq/to_nsq.go为例,客户端这边的代码逻辑就简单很多,NewProducer实例化一个instance,publish消息
到nsqd。

/// nsq/apps/to_nsq/to_nsq.go
producer, err := nsq.NewProducer(addr, cfg)
err := producer.Publish(*topic, line)

下面来看下Publish里的具体逻辑。

// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
    // 生成具体的cmd
    return w.sendCommand(Publish(topic, body))
}
func (w *Producer) sendCommand(cmd *Command) error {
    doneChan := make(chan *ProducerTransaction)
    err := w.sendCommandAsync(cmd, doneChan, nil)
    if err != nil {
        close(doneChan)
        return err
    }
    t := <-doneChan
    return t.Error
}
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    args []interface{}) error {
    // keep track of how many outstanding producers we're dealing with
    // in order to later ensure that we clean them all up...
    atomic.AddInt32(&w.concurrentProducers, 1)
    defer atomic.AddInt32(&w.concurrentProducers, -1)

    if atomic.LoadInt32(&w.state) != StateConnected {
        // 这里是一个lazily connect
        err := w.connect()
        if err != nil {
            return err
        }
    }

    t := &ProducerTransaction{
        cmd:      cmd,
        doneChan: doneChan,
        Args:     args,
    }

    select {
    case w.transactionChan <- t:
    case <-w.exitChan:
        return ErrStopped
    }

    return nil
}

在connect函数里启动了一个go routine去处理transactionChan对应的东西

func (w *Producer) connect() error {
    w.closeChan = make(chan int)
    w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
    w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
    _, err := w.conn.Connect()
    w.wg.Add(1)
    go w.router()

这里需要注意一下, go-nsq/conn.go是对底层连接的一个抽象,他是不关心你是生产者还是消费者,这里使用到了
delegate 模式,conn.go收到消息的处理放到了producerConnDelegate和consumerConnDelegate中,然后通知到具体的
消费者活着生产者。

消费者

回过头我们再来看下消费者部分的代码,client端我们以nsq/apps/nsq_tail/nsq_tail.go为例,代码的基本逻辑如下:


// 1. new comsunmer instanace 
consumer, err := nsq.NewConsumer(topics[i], *channel, cfg)
// 2. add handler
consumer.AddHandler(&TailHandler{topicName: topics[i], totalMessages: *totalMessages})
// 3. connect to nsqd
consumer.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
    log.Fatal(err)
}
// 4. connect to lookupd
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
    log.Fatal(err)
}
consumers = append(consumers, consumer)

下面来看下每个部分的实际代码:

func (r *Consumer) AddHandler(handler Handler) {
    r.AddConcurrentHandlers(handler, 1)
}
func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
    if atomic.LoadInt32(&r.connectedFlag) == 1 {
        panic("already connected")
    }

    atomic.AddInt32(&r.runningHandlers, int32(concurrency))
    for i := 0; i < concurrency; i++ {
        go r.handlerLoop(handler)
    }
}

至此handler添加完成,起一个单独的go routine来等待消息的到了。

func (r *Consumer) handlerLoop(handler Handler) {
    r.log(LogLevelDebug, "starting Handler")

    for {
        message, ok := <-r.incomingMessages // 有新的消息的到来
        if !ok {
            goto exit
        }

        if r.shouldFailMessage(message, handler) {
            message.Finish()
            continue
        }

        err := handler.HandleMessage(message) // 调用之前注册的handler
        if err != nil {
            r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
            if !message.IsAutoResponseDisabled() {
                message.Requeue(-1)
            }
            continue
        }

        if !message.IsAutoResponseDisabled() {
            message.Finish()
        }
    }

exit:
    r.log(LogLevelDebug, "stopping Handler")
    if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
        r.exit()
    }
}

官方是不推荐只部署nqd而不部署lookupd的,我们直接看下lookup的连接过程:

func (r *Consumer) ConnectToNSQLookupd(addr string) error {
    ...
    r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
    numLookupd := len(r.lookupdHTTPAddrs)
    r.mtx.Unlock()

    // if this is the first one, kick off the go loop
    if numLookupd == 1 {
        r.queryLookupd()
        r.wg.Add(1)
        go r.lookupdLoop()
    }
    return nil
}

在queryLookupd中先去查询lookupd获取最新的nqd地址,然后connect to nsqd.

func (r *Consumer) lookupdLoop() {
    // add some jitter so that multiple consumers discovering the same topic,
    // when restarted at the same time, dont all connect at once.
    ticker = time.NewTicker(r.config.LookupdPollInterval)
    // 每个ticker interval更新nqd的地址信息
    for {
        select {
        case <-ticker.C:
            r.queryLookupd()
        case <-r.lookupdRecheckChan:
            r.queryLookupd()
        case <-r.exitChan:
            goto exit
        }
    }
}
func (r *Consumer) ConnectToNSQD(addr string) error {
    // 1. new connection
    conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
    conn.SetLogger(logger, logLvl,
    fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))

    // 2. connection list
    _, pendingOk := r.pendingConnections[addr]
    _, ok := r.connections[addr]

    r.pendingConnections[addr] = conn
    if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
        r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
    }

    r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)
    // 3. new connect
    //   3.1 go c.readLoop()
    //   3.2 go c.writeLoop()
    resp, err := conn.Connect()
    
    // 4. sub to nsqd
    cmd := Subscribe(r.topic, r.channel)
    err = conn.WriteCommand(cmd)
}

以上就是客户端初始化的一个流程,然后就是接受消息处理了。

->NewConsumer() // 新建一个consumer
->ConnectToNSQLookupds() // 连接到lookupd
  |->ConnectToNSQLookupd() // 连接到lookupd
     |->r.queryLookupd() // 查询lookupd的
         |->apiRequestNegotiateV1() // 调用lookupd的rest api获取nsqd消息
         |->ConnectToNSQD() // 连接到具体nsq
            |->NewConn() // 连接instance
            |->conn.Connect() // 开始连接
                  |->c.readLoop() // 与nqd连接read loop
                  |->c.writeLoop() // 与nqd连接write loop
            |->Subscribe() // consumer发送SUB command
     |->lookupdLoop() // 定时查询lookupd并更新nsqd信息

注:

[1]. 关于delegate模式参考 这里

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请联系我们举报,一经查实,本站将立刻删除。

发布者:全栈程序员-站长,转载请注明出处:https://javaforall.net/107169.html原文链接:https://javaforall.net

(0)
全栈程序员-站长的头像全栈程序员-站长


相关推荐

  • nyaa镜像站 nyaa反向代理站「建议收藏」

    提供一个可以磁力转种子,并且资源中心里边是nyaa镜像站的网站:https://magnet-vip.com资源中心做了登录限定,防止无法访问!

    2022年4月7日
    959
  • 【OpenCV入门教程之九】 非线性滤波专场:中值滤波、双边滤波[通俗易懂]

    正如我们上一篇文章中讲到的,线性滤波可以实现很多种不同的图像变换。然而非线性滤波,如中值滤波器和双边滤波器,有时可以达到更好的实现效果。邻域算子的其他一些例子还有对二值图像进行操作的形态学算子,用于计算距离变换和寻找连通量的半全局算子。先上一张截图:一、理论与概念讲解——从现象到本质1.1非线性滤波概述之前的那篇文章里,我们所考虑的滤波器都是线性的,即两个信号之和的响应和他们各自响应之和相等。换句话说,每个像素的输出值是一些输入像素的加权和,线性滤波器易于构造,并且易于从频率响应角度来进行分

    2022年4月9日
    54
  • 项目实战:ASP.NET:B/S结构 个人空间相册、照片上传下载系统

    项目实战:ASP.NET:B/S结构 个人空间相册、照片上传下载系统项目实战:ASP.NET:B/S结构个人空间相册、照片上传下载系统编辑环境:win10_x64/VS2015/SqlServer2012项目:asp.net项目简介:只是具有基本的登录功能,上传相关信息,图片,提供下载,相册功能,熟悉表格和基本的前后台程序其他:这是写的第一个asp.net,只是为了熟悉和了解asp.net项目和相关的知识。本次只是简单地前台ht…

    2022年7月11日
    17
  • 2020 年最新版 68 道Redis面试题,20000 字干货,赶紧收藏起来备用![通俗易懂]

    2020 年最新版 68 道Redis面试题,20000 字干货,赶紧收藏起来备用!

    2022年2月14日
    77
  • python全国计算机二级报名_python有证书考吗

    python全国计算机二级报名_python有证书考吗第一次参加全国计算机等级考试的考生对于网上报名的流程,对全国计算机考试流程中某些环节并不清楚,小编今天就整理下全国计算机等级考试流程及详细说明,提供网上报名流程示意图,解决大家在全国计算机等级考试报名过程中的疑问。(如有出入,请以官方信息为准)考生需登录各地计算机等级考试官方报名网站,进入“全国计算机等级考试报名系统”进行注册登录。(一)注册账号和登录一、注册ETEST通行证1.考生首次登录系…

    2025年8月28日
    6
  • Dubbo负载均衡策略之 一致性哈希

    Dubbo负载均衡策略之 一致性哈希Dubbo负载均衡策略之一致性哈希1负载均衡在这里引用dubbo官网的一段话——LoadBalance中文意思为负载均衡,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空闲的情况。通过负载均衡,可以让每台服务器获取到适合自己处理能力的负载。在为高负载服务器分流的同时,还可以避免资源浪费,一举两得。负载均衡可分为软件负载均衡和硬件负载均衡。在我们日常开发中,一般很难接触到硬件负载均衡。但软件负载均衡还是可以接触到的,比如Nginx

    2022年7月27日
    9

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注全栈程序员社区公众号