Tendermint P2P Source Code Analysis


Preface

Official documentation: https://docs.tendermint.com/master/spec/p2p/messages/

message

Messages in Tendermint's P2P layer are organized in two parts: channels and messages.

P2P configuration

```toml
# P2P Configuration Options
[p2p]

# Enable the new p2p layer.
disable-legacy = false

# Select the p2p internal queue
queue-type = "priority"

# Address to listen for incoming connections
laddr = "tcp://0.0.0.0:26656"

# Address to advertise to peers for them to dial
# If empty, will use the same port as the laddr,
# and will introspect on the listener or use UPnP
# to figure out the address.
external-address = ""

# Comma separated list of seed nodes to connect to
seeds = ""

# Comma separated list of nodes to keep persistent connections to
persistent-peers = ""

# UPNP port forwarding
upnp = false

# Path to address book
addr-book-file = "config/addrbook.json"

# Set true for strict address routability rules
# Set false for private or local networks
addr-book-strict = true

# Maximum number of inbound peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-inbound-peers = 40

# Maximum number of outbound peers to connect to, excluding persistent peers
#
# TODO: Remove once p2p refactor is complete in favor of MaxConnections.
# ref: https://github.com/tendermint/tendermint/issues/5670
max-num-outbound-peers = 10

# Maximum number of connections (inbound and outbound).
max-connections = 64

# Rate limits the number of incoming connection attempts per IP address.
max-incoming-connection-attempts = 100

# List of node IDs, to which a connection will be (re)established ignoring any existing limits
unconditional-peer-ids = ""

# Maximum pause when redialing a persistent peer (if zero, exponential backoff is used)
persistent-peers-max-dial-period = "0s"

# Time to wait before flushing messages out on the connection
flush-throttle-timeout = "100ms"

# Maximum size of a message packet payload, in bytes
max-packet-msg-payload-size = 1400

# Rate at which packets can be sent, in bytes/second
send-rate =

# Rate at which packets can be received, in bytes/second
recv-rate =

# Set true to enable the peer-exchange reactor
pex = true

# Comma separated list of peer IDs to keep private (will not be gossiped to other peers)
private-peer-ids = ""

# Toggle to disable guard against peers connecting from the same ip.
allow-duplicate-ip = false

# Peer connection configuration.
handshake-timeout = "20s"
dial-timeout = "3s"
```

Enabling P2P

In NewNode in node/node.go, you will find the following code.

```go
// setup Transport and Switch
// create the switch
sw := createSwitch(
	config, transport, p2pMetrics, mpReactorShim, bcReactorForSwitch,
	stateSyncReactorShim, csReactorShim, evReactorShim, proxyApp, nodeInfo,
	nodeKey, p2pLogger,
)

err = sw.AddPersistentPeers(splitAndTrimEmpty(config.P2P.PersistentPeers, ",", " "))
if err != nil {
	return nil, fmt.Errorf("could not add peers from persistent-peers field: %w", err)
}

err = sw.AddUnconditionalPeerIDs(splitAndTrimEmpty(config.P2P.UnconditionalPeerIDs, ",", " "))
if err != nil {
	return nil, fmt.Errorf("could not add peer ids from unconditional_peer_ids field: %w", err)
}

// set up the address book
addrBook, err := createAddrBookAndSetOnSwitch(config, sw, p2pLogger, nodeKey)
if err != nil {
	return nil, fmt.Errorf("could not create addrbook: %w", err)
}

// Optionally, start the pex reactor
//
// TODO:
//
// We need to set Seeds and PersistentPeers on the switch,
// since it needs to be able to use these (and their DNS names)
// even if the PEX is off. We can include the DNS name in the NetAddress,
// but it would still be nice to have a clear list of the current "PersistentPeers"
// somewhere that we can return with net_info.
//
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
var (
	pexReactor   *pex.Reactor
	pexReactorV2 *pex.ReactorV2
)

// create the PEX reactor
if config.P2P.PexReactor {
	pexReactor = createPEXReactorAndAddToSwitch(addrBook, config, sw, logger)
	pexReactorV2, err = createPEXReactorV2(config, logger, peerManager, router)
	if err != nil {
		return nil, err
	}

	router.AddChannelDescriptors(pexReactor.GetChannels())
}
```
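The peer lists above arrive as comma-separated config strings and are cleaned up by splitAndTrimEmpty before being handed to the switch. A plausible reimplementation, inferred only from the call sites above (treat the exact behavior as an assumption, not the upstream code):

```go
package main

import (
	"fmt"
	"strings"
)

// splitAndTrimEmpty splits s on sep, trims cutset from each piece, and
// drops pieces that end up empty. This sketches how persistent-peers
// and unconditional-peer-ids config strings become clean slices.
func splitAndTrimEmpty(s, sep, cutset string) []string {
	if s == "" {
		return []string{}
	}
	out := make([]string, 0)
	for _, piece := range strings.Split(s, sep) {
		trimmed := strings.Trim(piece, cutset)
		if trimmed != "" {
			out = append(out, trimmed)
		}
	}
	return out
}

func main() {
	// Trailing commas and stray spaces are tolerated.
	peers := "id1@10.0.0.1:26656, id2@10.0.0.2:26656, "
	fmt.Println(splitAndTrimEmpty(peers, ",", " "))
}
```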

So the P2P startup path begins by starting a Switch instance.

switch

The Switch handles peer connections and exposes an API for receiving incoming messages. Each Reactor is responsible for handling incoming messages on one or more Channels. So while sending an outgoing message is typically performed on the peer, incoming messages are received on a reactor. In essence, the Switch wires the reactors together and coordinates the exchange of messages between them.

As for reactors, p2p/base_reactor.go explains what a Reactor is and what it does. In short, a reactor is a component that interacts with the rest of the P2P network. Tendermint has six reactors: mempool, blockchain, consensus, evidence, statesync, and pex.

Switch structure

```go
type Switch struct {
	// embeds BaseService for uniform start/stop handling
	service.BaseService

	// the P2P section of the node's configuration
	config *config.P2PConfig

	// all registered reactors
	reactors map[string]Reactor

	// the mapping between reactors and channels; it is handed down to
	// each peer and from there to its MConnection
	chDescs      []*conn.ChannelDescriptor
	reactorsByCh map[byte]Reactor

	// the set of connected peers
	peers        *PeerSet
	dialing      *cmap.CMap
	reconnecting *cmap.CMap

	nodeInfo NodeInfo // our node info
	nodeKey  NodeKey  // our node privkey
	addrBook AddrBook // peers addresses with whom we'll maintain constant connection

	persistentPeersAddrs []*NetAddress
	unconditionalPeerIDs map[NodeID]struct{}

	transport Transport

	filterTimeout time.Duration
	peerFilters   []PeerFilterFunc
	connFilters   []ConnFilterFunc
	conns         ConnSet

	rng *rand.Rand // seed for randomizing dial times and orders

	metrics *Metrics
}
```
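The reactorsByCh map is what lets one connection serve several reactors: every incoming packet carries a channel byte, and the switch looks up the reactor that owns it. A minimal sketch of that dispatch, using a toy Reactor interface rather than the real p2p.Reactor:

```go
package main

import "fmt"

// Reactor is a toy stand-in for p2p.Reactor: just enough to show
// how a channel ID byte is routed to the reactor that owns it.
type Reactor interface {
	Receive(chID byte, msg []byte)
}

type logReactor struct{ name string }

func (r *logReactor) Receive(chID byte, msg []byte) {
	fmt.Printf("%s got %d bytes on channel %#x\n", r.name, len(msg), chID)
}

// dispatch mimics the switch's reactorsByCh lookup: unknown
// channels are an error, known ones go to their reactor.
func dispatch(reactorsByCh map[byte]Reactor, chID byte, msg []byte) error {
	r, ok := reactorsByCh[chID]
	if !ok {
		return fmt.Errorf("unknown channel %#x", chID)
	}
	r.Receive(chID, msg)
	return nil
}

func main() {
	// Channel ID bytes here are illustrative, not the real assignments.
	byCh := map[byte]Reactor{
		0x30: &logReactor{name: "mempool"},
		0x40: &logReactor{name: "consensus"},
	}
	_ = dispatch(byCh, 0x30, []byte("tx"))
	_ = dispatch(byCh, 0x40, []byte("vote"))
}
```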

Creating the Switch

```go
func NewSwitch(
	cfg *config.P2PConfig,
	transport Transport,
	options ...SwitchOption,
) *Switch {
	sw := &Switch{
		config:               cfg,
		reactors:             make(map[string]Reactor),
		chDescs:              make([]*conn.ChannelDescriptor, 0),
		reactorsByCh:         make(map[byte]Reactor),
		peers:                NewPeerSet(),
		dialing:              cmap.NewCMap(),
		reconnecting:         cmap.NewCMap(),
		metrics:              NopMetrics(),
		transport:            transport,
		persistentPeersAddrs: make([]*NetAddress, 0),
		unconditionalPeerIDs: make(map[NodeID]struct{}),
		filterTimeout:        defaultFilterTimeout,
		conns:                NewConnSet(),
	}

	// Ensure we have a completely undeterministic PRNG.
	sw.rng = rand.NewRand()

	sw.BaseService = *service.NewBaseService(nil, "P2P Switch", sw)

	for _, option := range options {
		option(sw)
	}

	return sw
}
```

Starting the Switch

Starting the switch starts all of its reactors and then begins accepting peers.

```go
func (sw *Switch) OnStart() error {
	// FIXME: Temporary hack to pass channel descriptors to MConn transport,
	// since they are not available when it is constructed. This will be
	// fixed when we implement the new router abstraction.
	if t, ok := sw.transport.(*MConnTransport); ok {
		t.channelDescs = sw.chDescs
	}

	// Start all the reactors first.
	for _, reactor := range sw.reactors {
		err := reactor.Start()
		if err != nil {
			return fmt.Errorf("failed to start %v: %w", reactor, err)
		}
	}

	// Start accepting peers.
	go sw.acceptRoutine()

	return nil
}
```

acceptRoutine

```go
func (sw *Switch) acceptRoutine() {
	for {
		var peerNodeInfo NodeInfo
		// accept a newly connected peer
		c, err := sw.transport.Accept()
		if err == nil {
			// Before using the peer, a handshake must be performed on the
			// connection. The previous MConn transport handshook inside
			// Accept(), asynchronously, which avoided head-of-line blocking.
			// Now that handshakes have moved out of the transport, we do
			// them synchronously here. The main purpose is to obtain the
			// remote node's info.
			peerNodeInfo, _, err = sw.handshakePeer(c, "")
		}
		if err == nil {
			err = sw.filterConn(c.(*mConnConnection).conn)
		}
		if err != nil {
			if c != nil {
				_ = c.Close()
			}
			if err == io.EOF {
				err = ErrTransportClosed{}
			}
			switch err := err.(type) {
			case ErrRejected:
				// avoid connecting to ourselves
				if err.IsSelf() {
					// Remove the given address from the address book and add to our addresses
					// to avoid dialing in the future.
					addr := err.Addr()
					sw.addrBook.RemoveAddress(&addr)
					sw.addrBook.AddOurAddress(&addr)
				}

				sw.Logger.Info(
					"Inbound Peer rejected",
					"err", err,
					"numPeers", sw.peers.Size(),
				)

				continue
			// the peer filter timed out
			case ErrFilterTimeout:
				sw.Logger.Error(
					"Peer filter timed out",
					"err", err,
				)

				continue
			// the transport has already been closed
			case ErrTransportClosed:
				sw.Logger.Error(
					"Stopped accept routine, as transport is closed",
					"numPeers", sw.peers.Size(),
				)
			default:
				sw.Logger.Error(
					"Accept on transport errored",
					"err", err,
					"numPeers", sw.peers.Size(),
				)

				// We could instead have a retry loop around the acceptRoutine,
				// but that would need to stop and let the node shutdown eventually.
				// So might as well panic and let process managers restart the node.
				// There's no point in letting the node run without the acceptRoutine,
				// since it won't be able to accept new connections.
				panic(fmt.Errorf("accept routine exited: %v", err))
			}

			break
		}

		isPersistent := false
		addr, err := peerNodeInfo.NetAddress()
		if err == nil {
			isPersistent = sw.IsPeerPersistent(addr)
		}

		// create a new peer instance
		p := newPeer(
			peerNodeInfo,
			newPeerConn(false, isPersistent, c),
			sw.reactorsByCh,
			sw.StopPeerForError,
			PeerMetrics(sw.metrics),
		)

		if !sw.IsPeerUnconditional(p.NodeInfo().ID()) {
			// ignore the connection if we already have enough inbound peers
			_, in, _ := sw.NumPeers()
			if in >= sw.config.MaxNumInboundPeers {
				sw.Logger.Info(
					"Ignoring inbound connection: already have enough inbound peers",
					"address", p.SocketAddr(),
					"have", in,
					"max", sw.config.MaxNumInboundPeers,
				)
				_ = p.CloseConn()
				continue
			}
		}

		// add the peer to the switch
		if err := sw.addPeer(p); err != nil {
			_ = p.CloseConn()
			if p.IsRunning() {
				_ = p.Stop()
			}

			sw.conns.RemoveAddr(p.RemoteAddr())
			sw.Logger.Info(
				"Ignoring inbound connection: error while adding peer",
				"err", err,
				"id", p.ID(),
			)
		}
	}
}
```

peer

A peer represents a remote node in the P2P network. In Tendermint it is also the object through which the application exchanges messages directly with that node. The peer type implements the Peer interface.

Peer structure

```go
type peer struct {
	service.BaseService

	// the raw peerConn and the multiplex connection.
	// peerConns are created via newOutboundPeerConn and
	// newInboundPeerConn, both of which are called from the switch.
	peerConn

	// peer's node info and the channels it knows about
	// channels = nodeInfo.Channels
	// cached to avoid copying nodeInfo in hasChannel
	nodeInfo    NodeInfo
	channels    []byte
	reactors    map[byte]Reactor
	onPeerError func(Peer, interface{})

	// user data
	Data *cmap.CMap

	metrics       *Metrics
	metricsTicker *time.Ticker
}
```

Creating a Peer

newPeer initializes a new peer instance.

```go
func newPeer(
	nodeInfo NodeInfo,
	pc peerConn,
	reactorsByCh map[byte]Reactor,
	onPeerError func(Peer, interface{}),
	options ...PeerOption,
) *peer {
	p := &peer{
		// initialize the peerConn instance
		peerConn:      pc,
		nodeInfo:      nodeInfo,
		channels:      nodeInfo.Channels, // TODO
		reactors:      reactorsByCh,
		onPeerError:   onPeerError,
		Data:          cmap.NewCMap(),
		metricsTicker: time.NewTicker(metricsTickerDuration),
		metrics:       NopMetrics(),
	}

	p.BaseService = *service.NewBaseService(nil, "Peer", p)

	for _, option := range options {
		option(p)
	}

	return p
}
```

Starting a Peer

```go
func (p *peer) OnStart() error {
	// BaseService.OnStart() is currently a no-op, but the error is still checked
	if err := p.BaseService.OnStart(); err != nil {
		return err
	}

	// handle messages received on the connection
	go p.processMessages()

	// heartbeat/metrics: every 10s, read the connection status and
	// associate the send-queue sizes with the peer ID
	go p.metricsReporter()

	return nil
}
```

MConnection

The transport's functionality ultimately bottoms out in MConnection, defined in p2p/conn/connection.go, which makes MConnection the lowest layer of the P2P stack. All message writes and reads go through this component: it owns the network connection and performs the low-level data transfer.

MConnection is a multiplexed connection: it supports multiple independent streams, each with its own quality-of-service guarantees, over a single TCP connection. Each stream is called a channel; every channel has a globally unique byte ID and a priority. The ID and priority of each channel are configured when the connection is first established.

MConnection supports three types of packets:

  • Ping
  • Pong
  • Msg

If we have not received anything within the ping interval, we send a ping. When a peer receives a ping and has nothing else pending, it replies with a pong. If we then fail to receive the pong within the pong timeout, we disconnect.

Creating an MConnection

```go
func NewMConnectionWithConfig(
	conn net.Conn,
	chDescs []*ChannelDescriptor,
	onReceive receiveCbFunc,
	onError errorCbFunc,
	config MConnConfig,
) *MConnection {
	if config.PongTimeout >= config.PingInterval {
		panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
	}

	mconn := &MConnection{
		// the object returned by the successful TCP connection
		conn: conn,
		// wrapping net.Conn in bufio readers/writers lets us treat the
		// TCP stream with convenient, file-like buffered IO
		bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
		bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
		sendMonitor:   flow.New(0, 0),
		recvMonitor:   flow.New(0, 0),
		send:          make(chan struct{}, 1),
		pong:          make(chan struct{}, 1),
		onReceive:     onReceive,
		onError:       onError,
		config:        config,
		created:       time.Now(),
	}

	// create the channels
	var channelsIdx = map[byte]*Channel{}
	var channels = []*Channel{}

	for _, desc := range chDescs {
		channel := newChannel(mconn, *desc)
		channelsIdx[channel.desc.ID] = channel
		channels = append(channels, channel)
	}
	mconn.channels = channels
	mconn.channelsIdx = channelsIdx

	mconn.BaseService = *service.NewBaseService(nil, "MConnection", mconn)

	// maxPacketMsgSize() is a bit heavy, so call just once
	mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()

	return mconn
}
```

channel

```go
type Channel struct {
	conn          *MConnection
	desc          ChannelDescriptor
	sendQueue     chan []byte // send queue
	sendQueueSize int32       // atomic.
	recving       []byte      // receive buffer
	sending       []byte      // send buffer
	recentlySent  int64       // exponential moving average

	maxPacketMsgPayloadSize int

	Logger log.Logger
}
```

When a Peer calls Send, it is really calling MConnection's Send, and MConnection's Send in turn only pushes the bytes into the target Channel's sendQueue. A dedicated routine then reads from the Channel and performs the actual transmission.
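The hand-off between Send and that routine uses a common Go idiom: the c.send channel has capacity 1 and acts as a coalescing wakeup flag rather than a data queue. Enqueueing a message performs a non-blocking send into it; sendRoutine drains it and then scans the channels. The pattern in isolation:

```go
package main

import "fmt"

// signal performs a non-blocking send on a capacity-1 channel.
// If a wakeup is already pending, the extra signal is dropped,
// which resembles how MConnection treats its `send` channel.
func signal(ch chan struct{}) bool {
	select {
	case ch <- struct{}{}:
		return true // wakeup recorded
	default:
		return false // one already pending; coalesced
	}
}

func main() {
	wake := make(chan struct{}, 1)
	fmt.Println(signal(wake)) // true: flag set
	fmt.Println(signal(wake)) // false: coalesced with the pending flag
	<-wake                    // consumer drains the single wakeup
	fmt.Println(signal(wake)) // true again
}
```

Coalescing is safe here because the consumer rescans every channel's queue on each wakeup, so one flag can represent any number of pending messages.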

Starting the MConnection

```go
func (c *MConnection) OnStart() error {
	if err := c.BaseService.OnStart(); err != nil {
		return err
	}

	// flush period
	c.flushTimer = timer.NewThrottleTimer("flush", c.config.FlushThrottle)
	// ping period
	c.pingTimer = time.NewTicker(c.config.PingInterval)
	c.pongTimeoutCh = make(chan bool, 1)
	c.chStatsTimer = time.NewTicker(updateStats)
	c.quitSendRoutine = make(chan struct{})
	c.doneSendRoutine = make(chan struct{})
	c.quitRecvRoutine = make(chan struct{})

	// send loop
	go c.sendRoutine()
	// receive loop
	go c.recvRoutine()

	return nil
}
```

sendRoutine

```go
func (c *MConnection) sendRoutine() {
	defer c._recover()

	protoWriter := protoio.NewDelimitedWriter(c.bufConnWriter)

FOR_LOOP:
	for {
		var _n int
		var err error
	SELECTION:
		select {
		// periodic flush
		case <-c.flushTimer.Ch:
			// NOTE: flushTimer.Set() must be called every time
			// something is written to .bufConnWriter.
			c.flush()
		case <-c.chStatsTimer.C:
			for _, channel := range c.channels {
				channel.updateStats()
			}
		// periodically write a ping message to the TCP connection
		case <-c.pingTimer.C:
			c.Logger.Debug("Send Ping")
			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPing{}))
			if err != nil {
				c.Logger.Error("Failed to send PacketPing", "err", err)
				break SELECTION
			}
			c.sendMonitor.Update(_n)
			c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
			c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
				select {
				case c.pongTimeoutCh <- true:
				default:
				}
			})
			c.flush()
		case timeout := <-c.pongTimeoutCh:
			if timeout {
				c.Logger.Debug("Pong timeout")
				err = errors.New("pong timeout")
			} else {
				c.stopPongTimer()
			}
		// c.pong means a pong reply is due. Unlike pings this is not
		// periodic: it is triggered by a ping from the peer, and the
		// write into this channel happens in recvRoutine.
		case <-c.pong:
			c.Logger.Debug("Send Pong")
			_n, err = protoWriter.WriteMsg(mustWrapPacket(&tmp2p.PacketPong{}))
			if err != nil {
				c.Logger.Error("Failed to send PacketPong", "err", err)
				break SELECTION
			}
			c.sendMonitor.Update(_n)
			c.flush()
		case <-c.quitSendRoutine:
			break FOR_LOOP
		// a write to c.send means there are packets waiting to go out
		case <-c.send:
			// Send some PacketMsgs
			eof := c.sendSomePacketMsgs()
			if !eof {
				// Keep sendRoutine awake.
				select {
				case c.send <- struct{}{}:
				default:
				}
			}
		}

		if !c.IsRunning() {
			break FOR_LOOP
		}
		if err != nil {
			c.Logger.Error("Connection failed @ sendRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	c.stopPongTimer()
	close(c.doneSendRoutine)
}
```

sendPacketMsg

sendPacketMsg returns true once there are no more messages waiting to be sent on any channel.

```go
func (c *MConnection) sendPacketMsg() bool {
	// The chosen channel will be the pending one with the
	// lowest recentlySent/priority ratio.
	var leastRatio float32 = math.MaxFloat32
	var leastChannel *Channel
	for _, channel := range c.channels {
		// isSendPending checks whether channel.sendQueue is empty and the
		// channel.sending buffer is drained; if the buffer is empty, the
		// next queued item from channel.sendQueue is moved into it. It
		// returns true if any PacketMsgs are waiting to be sent.
		if !channel.isSendPending() {
			continue
		}
		// Get ratio, and keep track of lowest ratio.
		ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
		if ratio < leastRatio {
			leastRatio = ratio
			leastChannel = channel
		}
	}

	// Nothing to send?
	if leastChannel == nil {
		return true
	}
	// c.Logger.Info("Found a msgPacket to send")

	// Make & send a PacketMsg from this channel.
	// Reaching this point means some channel has a pending message.
	_n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
	if err != nil {
		c.Logger.Error("Failed to write PacketMsg", "err", err)
		c.stopForError(err)
		return true
	}
	c.sendMonitor.Update(_n)
	c.flushTimer.Set()
	return false
}
```
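The fairness rule above boils down to "pick the pending channel with the smallest recentlySent/Priority ratio". As a pure function over toy channel stats (a sketch, not the real Channel type):

```go
package main

import (
	"fmt"
	"math"
)

// chanStat is a toy stand-in for a Channel's scheduling state.
type chanStat struct {
	id           byte
	priority     int
	recentlySent int64
	pending      bool // has bytes waiting to be sent
}

// pickChannel returns the index of the pending channel with the
// lowest recentlySent/priority ratio, or -1 if nothing is pending.
// This mirrors the selection loop in sendPacketMsg.
func pickChannel(chans []chanStat) int {
	least := float32(math.MaxFloat32)
	idx := -1
	for i, ch := range chans {
		if !ch.pending {
			continue
		}
		ratio := float32(ch.recentlySent) / float32(ch.priority)
		if ratio < least {
			least = ratio
			idx = i
		}
	}
	return idx
}

func main() {
	chans := []chanStat{
		{id: 0x30, priority: 5, recentlySent: 100, pending: true},  // ratio 20
		{id: 0x40, priority: 10, recentlySent: 100, pending: true}, // ratio 10
		{id: 0x50, priority: 1, recentlySent: 0, pending: false},   // nothing queued
	}
	fmt.Printf("next channel: %#x\n", chans[pickChannel(chans)].id)
}
```

Dividing by priority means a high-priority channel can send proportionally more bytes before it loses its turn, while recentlySent decaying over time (an exponential moving average in the real code) keeps starvation bounded.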

recvRoutine

```go
func (c *MConnection) recvRoutine() {
	defer c._recover()

	protoReader := protoio.NewDelimitedReader(c.bufConnReader, c._maxPacketMsgSize)

FOR_LOOP:
	for {
		// Block until .recvMonitor says we can read.
		c.recvMonitor.Limit(c._maxPacketMsgSize, atomic.LoadInt64(&c.config.RecvRate), true)

		// Read packet type
		var packet tmp2p.Packet
		_n, err := protoReader.ReadMsg(&packet)
		c.recvMonitor.Update(_n)
		if err != nil {
			// stopServices was invoked and we are shutting down;
			// receiving is expected to fail since we will close the connection
			select {
			case <-c.quitRecvRoutine:
				break FOR_LOOP
			default:
			}

			if c.IsRunning() {
				if err == io.EOF {
					c.Logger.Info("Connection is closed @ recvRoutine (likely by the other side)", "conn", c)
				} else {
					c.Logger.Debug("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				}
				c.stopForError(err)
			}
			break FOR_LOOP
		}

		// dispatch on the decoded packet type
		switch pkt := packet.Sum.(type) {
		case *tmp2p.Packet_PacketPing:
			// TODO: prevent abuse, as they cause flush()'s.
			// https://github.com/tendermint/tendermint/issues/1190
			c.Logger.Debug("Receive Ping")
			// The peer is asking us for a pong, so set the pong flag;
			// sendRoutine takes care of the reply.
			select {
			case c.pong <- struct{}{}:
			default: // never block
			}
		case *tmp2p.Packet_PacketPong:
			c.Logger.Debug("Receive Pong")
			// clear the pong-timeout state
			select {
			case c.pongTimeoutCh <- false:
			default: // never block
			}
		case *tmp2p.Packet_PacketMsg:
			channel, ok := c.channelsIdx[byte(pkt.PacketMsg.ChannelID)]
			if !ok || channel == nil {
				err := fmt.Errorf("unknown channel %X", pkt.PacketMsg.ChannelID)
				c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(err)
				break FOR_LOOP
			}

			// Pick the channel matching the packet's channel ID and put the
			// fragment into its receive buffer. Why a buffer? As seen on the
			// send side, a single PacketMsg may not carry a complete message;
			// only a packet with EOF set marks the end. So recvPacketMsg
			// accumulates fragments until the full message can be assembled.
			msgBytes, err := channel.recvPacketMsg(*pkt.PacketMsg)
			if err != nil {
				if c.IsRunning() {
					c.Logger.Debug("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			if msgBytes != nil {
				c.Logger.Debug("Received bytes", "chID", pkt.PacketMsg.ChannelID, "msgBytes", msgBytes)
				// This call matters: it answers the earlier question of why
				// there is a Send but no Receive. MConnection hands every
				// complete message back to the layer above through this
				// onReceive callback. How onReceive relates to a Reactor's
				// Receive, and how the ChannelID maps to a reactor, is
				// covered later.
				c.onReceive(byte(pkt.PacketMsg.ChannelID), msgBytes)
			}
		default:
			err := fmt.Errorf("unknown message type %v", reflect.TypeOf(packet))
			c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}

	// Cleanup
	close(c.pong)
	for range c.pong {
		// Drain
	}
}
```
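The reassembly described in the comments, buffering PacketMsg fragments until one arrives with EOF set, reduces to a small accumulator. A sketch with a toy packet type (not the protobuf tmp2p.PacketMsg):

```go
package main

import "fmt"

// packetMsg is a toy stand-in for tmp2p.PacketMsg: a data fragment
// plus an EOF flag marking the last fragment of a logical message.
type packetMsg struct {
	data []byte
	eof  bool
}

// recvPacketMsg appends the fragment to the channel's receive buffer.
// When EOF is set, it returns the complete message and resets the
// buffer; otherwise it returns nil, like Channel.recvPacketMsg.
func recvPacketMsg(recving []byte, pkt packetMsg) (msg, newRecving []byte) {
	recving = append(recving, pkt.data...)
	if pkt.eof {
		return recving, nil
	}
	return nil, recving
}

func main() {
	var recving []byte
	var msg []byte
	for _, pkt := range []packetMsg{
		{data: []byte("hel"), eof: false},
		{data: []byte("lo "), eof: false},
		{data: []byte("peer"), eof: true},
	} {
		msg, recving = recvPacketMsg(recving, pkt)
		if msg != nil {
			fmt.Printf("complete message: %q\n", msg)
		}
	}
}
```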

PEX

The sections above covered how messages are sent to peers; PEX implements node discovery.

```go
// p2p/pex/pex_reactor.go
func NewReactor(b AddrBook, config *ReactorConfig) *Reactor {
	r := &Reactor{
		book:                 b,
		config:               config,
		ensurePeersPeriod:    defaultEnsurePeersPeriod,
		requestsSent:         cmap.NewCMap(),
		lastReceivedRequests: cmap.NewCMap(),
		crawlPeerInfos:       make(map[p2p.NodeID]crawlPeerInfo),
	}
	r.BaseReactor = *p2p.NewBaseReactor("PEX", r)
	return r
}
```

Starting PEX

```go
func (r *Reactor) OnStart() error {
	// start the address book
	err := r.book.Start()
	if err != nil && err != service.ErrAlreadyStarted {
		return err
	}

	// check that the configured seed addresses are well-formed,
	// i.e. of the form <ID>@<IP>:<PORT>
	numOnline, seedAddrs, err := r.checkSeeds()
	if err != nil {
		return err
	} else if numOnline == 0 && r.book.Empty() {
		return errors.New("address book is empty and couldn't resolve any seed nodes")
	}

	r.seedAddrs = seedAddrs

	// decide how the node runs: as a seed (crawler) or as a regular node
	if r.config.SeedMode {
		go r.crawlPeersRoutine()
	} else {
		go r.ensurePeersRoutine()
	}

	return nil
}
```

As you can see, discovery runs in one of two modes: seed (crawler) mode, where the node crawls the network for addresses via crawlPeersRoutine, or regular mode, where ensurePeersRoutine keeps the node connected to enough peers.

Seed (crawler) mode

```go
func (r *Reactor) crawlPeersRoutine() {
	// If we have any seed nodes, dial them once.
	if len(r.seedAddrs) > 0 {
		r.dialSeeds()
	} else {
		// Do an initial crawl to discover peers.
		r.crawlPeers(r.book.GetSelection())
	}

	ticker := time.NewTicker(crawlPeerPeriod)

	for {
		select {
		case <-ticker.C:
			// check how long each peer has been connected and
			// disconnect those that exceed the limit
			r.attemptDisconnects()
			// pick random addresses from the book and dial them
			r.crawlPeers(r.book.GetSelection())
			r.cleanupCrawlPeerInfos()
		case <-r.Quit():
			return
		}
	}
}

func (r *Reactor) crawlPeers(addrs []*p2p.NetAddress) {
	now := time.Now()

	// addrs are the addresses chosen by GetSelection; the peers
	// must support the peer-exchange protocol
	for _, addr := range addrs {
		peerInfo, ok := r.crawlPeerInfos[addr.ID]

		// skip if the last dial attempt was less than two minutes ago
		if ok && now.Sub(peerInfo.LastCrawled) < minTimeBetweenCrawls {
			continue
		}

		// record the time of this attempt
		r.crawlPeerInfos[addr.ID] = crawlPeerInfo{
			Addr:        addr,
			LastCrawled: now,
		}

		// try to dial the address once
		err := r.dialPeer(addr)
		if err != nil {
			switch err.(type) {
			case errMaxAttemptsToDial, errTooEarlyToDial, p2p.ErrCurrentlyDialingOrExistingAddress:
				r.Logger.Debug(err.Error(), "addr", addr)
			default:
				r.Logger.Error(err.Error(), "addr", addr)
			}
			continue
		}

		peer := r.Switch.Peers().Get(addr.ID)
		if peer != nil {
			// If the dial succeeded, send this peer a request for
			// all the peer addresses it knows about.
			r.RequestAddrs(peer)
		}
	}
}
```

Regular mode

```go
// ensurePeersRoutine makes sure we stay connected to enough peers.
func (r *Reactor) ensurePeersRoutine() {
	var (
		seed   = tmrand.NewRand()
		jitter = seed.Int63n(r.ensurePeersPeriod.Nanoseconds())
	)

	// Randomize first round of communication to avoid thundering herd.
	// If no peers are present directly start connecting so we guarantee swift
	// setup with the help of configured seeds.
	if r.nodeHasSomePeersOrDialingAny() {
		time.Sleep(time.Duration(jitter))
	}

	// If the address book is empty, this dials the seeds right away
	// to make sure we have enough nodes to connect to.
	r.ensurePeers()

	// fire periodically
	ticker := time.NewTicker(r.ensurePeersPeriod)
	for {
		select {
		case <-ticker.C:
			r.ensurePeers()
		case <-r.Quit():
			ticker.Stop()
			return
		}
	}
}

func (r *Reactor) ensurePeers() {
	// current counts of connected and in-progress peers
	var (
		out, in, dial = r.Switch.NumPeers()
		numToDial     = r.Switch.MaxNumOutboundPeers() - (out + dial)
	)
	r.Logger.Info(
		"Ensure peers",
		"numOutPeers", out,
		"numInPeers", in,
		"numDialing", dial,
		"numToDial", numToDial,
	)

	if numToDial <= 0 {
		return
	}

	// bias to prefer more vetted peers when we have fewer connections.
	// not perfect, but somewhat ensures that we prioritize connecting to more-vetted peers
	// NOTE: range here is [10, 90]. Too high?
	newBias := tmmath.MinInt(out, 8)*10 + 10

	toDial := make(map[p2p.NodeID]*p2p.NetAddress)

	// Try maxAttempts times to pick numToDial addresses to dial
	maxAttempts := numToDial * 3

	for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
		// randomly pick an address from a bucket, weighted by newBias
		try := r.book.PickAddress(newBias)
		if try == nil {
			continue
		}
		if _, selected := toDial[try.ID]; selected {
			continue
		}
		if r.Switch.IsDialingOrExistingAddress(try) {
			continue
		}
		// TODO: consider moving some checks from toDial into here
		// so we don't even consider dialing peers that we want to wait
		// before dialing again, or have dialed too many times already
		r.Logger.Info("Will dial address", "addr", try)
		toDial[try.ID] = try
	}

	// dial the picked addresses
	for _, addr := range toDial {
		go func(addr *p2p.NetAddress) {
			err := r.dialPeer(addr)
			if err != nil {
				switch err.(type) {
				case errMaxAttemptsToDial, errTooEarlyToDial:
					r.Logger.Debug(err.Error(), "addr", addr)
				default:
					r.Logger.Error(err.Error(), "addr", addr)
				}
			}
		}(addr)
	}

	if r.book.NeedMoreAddrs() {
		// check whether banned peers can be reinstated
		r.book.ReinstateBadPeers()
	}

	if r.book.NeedMoreAddrs() {
		// 1) Pick a random peer and ask for more.
		peers := r.Switch.Peers().List()
		peersCount := len(peers)
		if peersCount > 0 {
			peer := peers[tmrand.Int()%peersCount]
			r.Logger.Info("We need more addresses. Sending pexRequest to random peer", "peer", peer)
			r.RequestAddrs(peer)
		}

		// 2) Dial seeds if we are not dialing anyone.
		// This is done in addition to asking a peer for addresses to work-around
		// peers not participating in PEX.
		if len(toDial) == 0 {
			r.Logger.Info("No addresses to dial. Falling back to seeds")
			r.dialSeeds()
		}
	}
}

func (r *Reactor) dialPeer(addr *p2p.NetAddress) error {
	attempts, lastDialed := r.dialAttemptsInfo(addr)

	// mark non-persistent peers with too many failed attempts as bad
	if !r.Switch.IsPeerPersistent(addr) && attempts > maxAttemptsToDial {
		r.book.MarkBad(addr, defaultBanTime)
		return errMaxAttemptsToDial{}
	}

	// exponential backoff if it's not our first attempt to dial given address
	if attempts > 0 {
		jitter := time.Duration(tmrand.Float64() * float64(time.Second)) // 1s == (1e9 ns)
		backoffDuration := jitter + ((1 << uint(attempts)) * time.Second)
		backoffDuration = r.maxBackoffDurationForPeer(addr, backoffDuration)
		sinceLastDialed := time.Since(lastDialed)
		if sinceLastDialed < backoffDuration {
			return errTooEarlyToDial{backoffDuration, lastDialed}
		}
	}

	// try to connect to this address
	err := r.Switch.DialPeerWithAddress(addr)
	if err != nil {
		if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
			return err
		}

		markAddrInBookBasedOnErr(addr, r.book, err)
		switch err.(type) {
		case p2p.ErrSwitchAuthenticationFailure:
			// NOTE: addr is removed from addrbook in markAddrInBookBasedOnErr
			r.attemptsToDial.Delete(addr.DialString())
		default:
			r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
		}
		return fmt.Errorf("dialing failed (attempts: %d): %w", attempts+1, err)
	}

	// cleanup any history
	r.attemptsToDial.Delete(addr.DialString())
	return nil
}

func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
	// refuse if we are already dialing or connected to this address
	if sw.IsDialingOrExistingAddress(addr) {
		return ErrCurrentlyDialingOrExistingAddress{addr.String()}
	}

	sw.dialing.Set(string(addr.ID), addr)
	defer sw.dialing.Delete(string(addr.ID))

	// add the node as an outbound peer
	return sw.addOutboundPeerWithConfig(addr, sw.config)
}
```

Conclusion

That concludes this analysis of the Tendermint P2P source code. If anything here is wrong, corrections from readers are welcome.


Reference: https://gitee.com/wupeaking/tendermint_code_analysis/blob/master/p2p%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90.md
