eth: don't block if transaction broadcast loop fails (#21255)
* eth: don't block if transaction broadcast loop is returned * eth: kick out peer if we failed to send message * eth: address comment
This commit is contained in:
		@@ -325,7 +325,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Register the peer locally
 | 
						// Register the peer locally
 | 
				
			||||||
	if err := pm.peers.Register(p); err != nil {
 | 
						if err := pm.peers.Register(p, pm.removePeer); err != nil {
 | 
				
			||||||
		p.Log().Error("Ethereum peer registration failed", "err", err)
 | 
							p.Log().Error("Ethereum peer registration failed", "err", err)
 | 
				
			||||||
		return err
 | 
							return err
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										18
									
								
								eth/peer.go
									
									
									
									
									
								
							
							
						
						
									
										18
									
								
								eth/peer.go
									
									
									
									
									
								
							@@ -129,17 +129,19 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(ha
 | 
				
			|||||||
// broadcastBlocks is a write loop that multiplexes blocks and block accouncements
 | 
					// broadcastBlocks is a write loop that multiplexes blocks and block accouncements
 | 
				
			||||||
// to the remote peer. The goal is to have an async writer that does not lock up
 | 
					// to the remote peer. The goal is to have an async writer that does not lock up
 | 
				
			||||||
// node internals and at the same time rate limits queued data.
 | 
					// node internals and at the same time rate limits queued data.
 | 
				
			||||||
func (p *peer) broadcastBlocks() {
 | 
					func (p *peer) broadcastBlocks(removePeer func(string)) {
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
		case prop := <-p.queuedBlocks:
 | 
							case prop := <-p.queuedBlocks:
 | 
				
			||||||
			if err := p.SendNewBlock(prop.block, prop.td); err != nil {
 | 
								if err := p.SendNewBlock(prop.block, prop.td); err != nil {
 | 
				
			||||||
 | 
									removePeer(p.id)
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
 | 
								p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		case block := <-p.queuedBlockAnns:
 | 
							case block := <-p.queuedBlockAnns:
 | 
				
			||||||
			if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
 | 
								if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
 | 
				
			||||||
 | 
									removePeer(p.id)
 | 
				
			||||||
				return
 | 
									return
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
 | 
								p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
 | 
				
			||||||
@@ -153,7 +155,7 @@ func (p *peer) broadcastBlocks() {
 | 
				
			|||||||
// broadcastTransactions is a write loop that schedules transaction broadcasts
 | 
					// broadcastTransactions is a write loop that schedules transaction broadcasts
 | 
				
			||||||
// to the remote peer. The goal is to have an async writer that does not lock up
 | 
					// to the remote peer. The goal is to have an async writer that does not lock up
 | 
				
			||||||
// node internals and at the same time rate limits queued data.
 | 
					// node internals and at the same time rate limits queued data.
 | 
				
			||||||
func (p *peer) broadcastTransactions() {
 | 
					func (p *peer) broadcastTransactions(removePeer func(string)) {
 | 
				
			||||||
	var (
 | 
						var (
 | 
				
			||||||
		queue []common.Hash         // Queue of hashes to broadcast as full transactions
 | 
							queue []common.Hash         // Queue of hashes to broadcast as full transactions
 | 
				
			||||||
		done  chan struct{}         // Non-nil if background broadcaster is running
 | 
							done  chan struct{}         // Non-nil if background broadcaster is running
 | 
				
			||||||
@@ -204,6 +206,7 @@ func (p *peer) broadcastTransactions() {
 | 
				
			|||||||
			done = nil
 | 
								done = nil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		case <-fail:
 | 
							case <-fail:
 | 
				
			||||||
 | 
								removePeer(p.id)
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		case <-p.term:
 | 
							case <-p.term:
 | 
				
			||||||
@@ -215,7 +218,7 @@ func (p *peer) broadcastTransactions() {
 | 
				
			|||||||
// announceTransactions is a write loop that schedules transaction broadcasts
 | 
					// announceTransactions is a write loop that schedules transaction broadcasts
 | 
				
			||||||
// to the remote peer. The goal is to have an async writer that does not lock up
 | 
					// to the remote peer. The goal is to have an async writer that does not lock up
 | 
				
			||||||
// node internals and at the same time rate limits queued data.
 | 
					// node internals and at the same time rate limits queued data.
 | 
				
			||||||
func (p *peer) announceTransactions() {
 | 
					func (p *peer) announceTransactions(removePeer func(string)) {
 | 
				
			||||||
	var (
 | 
						var (
 | 
				
			||||||
		queue []common.Hash         // Queue of hashes to announce as transaction stubs
 | 
							queue []common.Hash         // Queue of hashes to announce as transaction stubs
 | 
				
			||||||
		done  chan struct{}         // Non-nil if background announcer is running
 | 
							done  chan struct{}         // Non-nil if background announcer is running
 | 
				
			||||||
@@ -266,6 +269,7 @@ func (p *peer) announceTransactions() {
 | 
				
			|||||||
			done = nil
 | 
								done = nil
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		case <-fail:
 | 
							case <-fail:
 | 
				
			||||||
 | 
								removePeer(p.id)
 | 
				
			||||||
			return
 | 
								return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		case <-p.term:
 | 
							case <-p.term:
 | 
				
			||||||
@@ -706,7 +710,7 @@ func newPeerSet() *peerSet {
 | 
				
			|||||||
// Register injects a new peer into the working set, or returns an error if the
 | 
					// Register injects a new peer into the working set, or returns an error if the
 | 
				
			||||||
// peer is already known. If a new peer it registered, its broadcast loop is also
 | 
					// peer is already known. If a new peer it registered, its broadcast loop is also
 | 
				
			||||||
// started.
 | 
					// started.
 | 
				
			||||||
func (ps *peerSet) Register(p *peer) error {
 | 
					func (ps *peerSet) Register(p *peer, removePeer func(string)) error {
 | 
				
			||||||
	ps.lock.Lock()
 | 
						ps.lock.Lock()
 | 
				
			||||||
	defer ps.lock.Unlock()
 | 
						defer ps.lock.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -718,10 +722,10 @@ func (ps *peerSet) Register(p *peer) error {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
	ps.peers[p.id] = p
 | 
						ps.peers[p.id] = p
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	go p.broadcastBlocks()
 | 
						go p.broadcastBlocks(removePeer)
 | 
				
			||||||
	go p.broadcastTransactions()
 | 
						go p.broadcastTransactions(removePeer)
 | 
				
			||||||
	if p.version >= eth65 {
 | 
						if p.version >= eth65 {
 | 
				
			||||||
		go p.announceTransactions()
 | 
							go p.announceTransactions(removePeer)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user