eth: implement the NewBlockHashes protocol proposal
This commit is contained in:
		
							
								
								
									
										180
									
								
								eth/handler.go
									
									
									
									
									
								
							
							
						
						
									
										180
									
								
								eth/handler.go
									
									
									
									
									
								
							@@ -2,6 +2,7 @@ package eth
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
	"fmt"
 | 
						"fmt"
 | 
				
			||||||
 | 
						"math"
 | 
				
			||||||
	"math/big"
 | 
						"math/big"
 | 
				
			||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
@@ -20,6 +21,7 @@ import (
 | 
				
			|||||||
const (
 | 
					const (
 | 
				
			||||||
	forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
 | 
						forceSyncCycle      = 10 * time.Second       // Time interval to force syncs, even if few peers are available
 | 
				
			||||||
	blockProcCycle      = 500 * time.Millisecond // Time interval to check for new blocks to process
 | 
						blockProcCycle      = 500 * time.Millisecond // Time interval to check for new blocks to process
 | 
				
			||||||
 | 
						blockArrivalTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
 | 
				
			||||||
	minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
 | 
						minDesiredPeerCount = 5                      // Amount of peers desired to start syncing
 | 
				
			||||||
	blockProcAmount     = 256
 | 
						blockProcAmount     = 256
 | 
				
			||||||
)
 | 
					)
 | 
				
			||||||
@@ -186,7 +188,6 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 | 
				
			|||||||
	defer msg.Discard()
 | 
						defer msg.Discard()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	switch msg.Code {
 | 
						switch msg.Code {
 | 
				
			||||||
	case GetTxMsg: // ignore
 | 
					 | 
				
			||||||
	case StatusMsg:
 | 
						case StatusMsg:
 | 
				
			||||||
		return errResp(ErrExtraStatusMsg, "uncontrolled status message")
 | 
							return errResp(ErrExtraStatusMsg, "uncontrolled status message")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -227,6 +228,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
		// returns either requested hashes or nothing (i.e. not found)
 | 
							// returns either requested hashes or nothing (i.e. not found)
 | 
				
			||||||
		return p.sendBlockHashes(hashes)
 | 
							return p.sendBlockHashes(hashes)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	case BlockHashesMsg:
 | 
						case BlockHashesMsg:
 | 
				
			||||||
		msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
 | 
							msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -266,6 +268,7 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 | 
				
			|||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		return p.sendBlocks(blocks)
 | 
							return p.sendBlocks(blocks)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	case BlocksMsg:
 | 
						case BlocksMsg:
 | 
				
			||||||
		var blocks []*types.Block
 | 
							var blocks []*types.Block
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -274,7 +277,57 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 | 
				
			|||||||
			glog.V(logger.Detail).Infoln("Decode error", err)
 | 
								glog.V(logger.Detail).Infoln("Decode error", err)
 | 
				
			||||||
			blocks = nil
 | 
								blocks = nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		self.downloader.DeliverBlocks(p.id, blocks)
 | 
					
 | 
				
			||||||
 | 
							// Either deliver to the downloader or the importer
 | 
				
			||||||
 | 
							if self.downloader.Synchronising() {
 | 
				
			||||||
 | 
								self.downloader.DeliverBlocks(p.id, blocks)
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								for _, block := range blocks {
 | 
				
			||||||
 | 
									if err := self.importBlock(p, block, nil); err != nil {
 | 
				
			||||||
 | 
										return err
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						case NewBlockHashesMsg:
 | 
				
			||||||
 | 
							// Retrieve and deseralize the remote new block hashes notification
 | 
				
			||||||
 | 
							msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							var hashes []common.Hash
 | 
				
			||||||
 | 
							if err := msgStream.Decode(&hashes); err != nil {
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// Mark the hashes as present at the remote node
 | 
				
			||||||
 | 
							for _, hash := range hashes {
 | 
				
			||||||
 | 
								p.blockHashes.Add(hash)
 | 
				
			||||||
 | 
								p.recentHash = hash
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// Wait a bit for potentially receiving the blocks, fetch if not
 | 
				
			||||||
 | 
							go func() {
 | 
				
			||||||
 | 
								time.Sleep(blockArrivalTimeout)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								// Drop all the hashes that are already known
 | 
				
			||||||
 | 
								unknown := make([]common.Hash, 0, len(hashes))
 | 
				
			||||||
 | 
								for _, hash := range hashes {
 | 
				
			||||||
 | 
									if !self.chainman.HasBlock(hash) {
 | 
				
			||||||
 | 
										unknown = append(unknown, hash)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if len(unknown) == 0 {
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								// Retrieve all the unknown hashes
 | 
				
			||||||
 | 
								if err := p.requestBlocks(unknown); err != nil {
 | 
				
			||||||
 | 
									glog.V(logger.Debug).Infof("%s: failed to request blocks: %v", p.id, err)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if glog.V(logger.Detail) {
 | 
				
			||||||
 | 
									hashes := make([]string, len(unknown))
 | 
				
			||||||
 | 
									for i, hash := range unknown {
 | 
				
			||||||
 | 
										hashes[i] = fmt.Sprintf("%x", hash[:4])
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									glog.Infof("%s: requested blocks explicitly: %v", p.id, hashes)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	case NewBlockMsg:
 | 
						case NewBlockMsg:
 | 
				
			||||||
		var request newBlockMsgData
 | 
							var request newBlockMsgData
 | 
				
			||||||
@@ -286,83 +339,86 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
		request.Block.ReceivedAt = msg.ReceivedAt
 | 
							request.Block.ReceivedAt = msg.ReceivedAt
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		hash := request.Block.Hash()
 | 
							if err := self.importBlock(p, request.Block, request.TD); err != nil {
 | 
				
			||||||
		// Add the block hash as a known hash to the peer. This will later be used to determine
 | 
								return err
 | 
				
			||||||
		// who should receive this.
 | 
					 | 
				
			||||||
		p.blockHashes.Add(hash)
 | 
					 | 
				
			||||||
		// update the peer info
 | 
					 | 
				
			||||||
		p.recentHash = hash
 | 
					 | 
				
			||||||
		p.td = request.TD
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		_, chainHead, _ := self.chainman.Status()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
 | 
					 | 
				
			||||||
			BlockHash:     hash.Hex(),
 | 
					 | 
				
			||||||
			BlockNumber:   request.Block.Number(), // this surely must be zero
 | 
					 | 
				
			||||||
			ChainHeadHash: chainHead.Hex(),
 | 
					 | 
				
			||||||
			BlockPrevHash: request.Block.ParentHash().Hex(),
 | 
					 | 
				
			||||||
			RemoteId:      p.ID().String(),
 | 
					 | 
				
			||||||
		})
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		// Make sure the block isn't already known. If this is the case simply drop
 | 
					 | 
				
			||||||
		// the message and move on. If the TD is < currentTd; drop it as well. If this
 | 
					 | 
				
			||||||
		// chain at some point becomes canonical, the downloader will fetch it.
 | 
					 | 
				
			||||||
		if self.chainman.HasBlock(hash) {
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 {
 | 
					 | 
				
			||||||
			glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, request.Block.Number(), request.TD)
 | 
					 | 
				
			||||||
			break
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		// Attempt to insert the newly received by checking if the parent exists.
 | 
					 | 
				
			||||||
		// if the parent exists we process the block and propagate to our peers
 | 
					 | 
				
			||||||
		// otherwise synchronize with the peer
 | 
					 | 
				
			||||||
		if self.chainman.HasBlock(request.Block.ParentHash()) {
 | 
					 | 
				
			||||||
			if _, err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
 | 
					 | 
				
			||||||
				glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error")
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				self.removePeer(p.id)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				return nil
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			if err := self.verifyTd(p, request); err != nil {
 | 
					 | 
				
			||||||
				glog.V(logger.Error).Infoln(err)
 | 
					 | 
				
			||||||
				// XXX for now return nil so it won't disconnect (we should in the future)
 | 
					 | 
				
			||||||
				return nil
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			self.BroadcastBlock(hash, request.Block)
 | 
					 | 
				
			||||||
		} else {
 | 
					 | 
				
			||||||
			go self.synchronise(p)
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	default:
 | 
						default:
 | 
				
			||||||
		return errResp(ErrInvalidMsgCode, "%v", msg.Code)
 | 
							return errResp(ErrInvalidMsgCode, "%v", msg.Code)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (pm *ProtocolManager) verifyTd(peer *peer, request newBlockMsgData) error {
 | 
					// importBlocks injects a new block retrieved from the given peer into the chain
 | 
				
			||||||
	if request.Block.Td.Cmp(request.TD) != 0 {
 | 
					// manager.
 | 
				
			||||||
		glog.V(logger.Detail).Infoln(peer)
 | 
					func (pm *ProtocolManager) importBlock(p *peer, block *types.Block, td *big.Int) error {
 | 
				
			||||||
 | 
						hash := block.Hash()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		return fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", request.Block.Number(), peer.id, request.Block.Td, request.TD)
 | 
						// Mark the block as present at the remote node (don't duplicate already held data)
 | 
				
			||||||
 | 
						p.blockHashes.Add(hash)
 | 
				
			||||||
 | 
						p.recentHash = hash
 | 
				
			||||||
 | 
						if td != nil {
 | 
				
			||||||
 | 
							p.td = td
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Log the block's arrival
 | 
				
			||||||
 | 
						_, chainHead, _ := pm.chainman.Status()
 | 
				
			||||||
 | 
						jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
 | 
				
			||||||
 | 
							BlockHash:     hash.Hex(),
 | 
				
			||||||
 | 
							BlockNumber:   block.Number(), // this surely must be zero
 | 
				
			||||||
 | 
							ChainHeadHash: chainHead.Hex(),
 | 
				
			||||||
 | 
							BlockPrevHash: block.ParentHash().Hex(),
 | 
				
			||||||
 | 
							RemoteId:      p.ID().String(),
 | 
				
			||||||
 | 
						})
 | 
				
			||||||
 | 
						// If the block's already known or its difficulty is lower than ours, drop
 | 
				
			||||||
 | 
						if pm.chainman.HasBlock(hash) {
 | 
				
			||||||
 | 
							p.td = pm.chainman.GetBlock(hash).Td // update the peer's TD to the real value
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						if td != nil && pm.chainman.Td().Cmp(td) > 0 && new(big.Int).Add(block.Number(), big.NewInt(7)).Cmp(pm.chainman.CurrentBlock().Number()) < 0 {
 | 
				
			||||||
 | 
							glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, block.Number(), td)
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Attempt to insert the newly received block and propagate to our peers
 | 
				
			||||||
 | 
						if pm.chainman.HasBlock(block.ParentHash()) {
 | 
				
			||||||
 | 
							if _, err := pm.chainman.InsertChain(types.Blocks{block}); err != nil {
 | 
				
			||||||
 | 
								glog.V(logger.Error).Infoln("removed peer (", p.id, ") due to block error", err)
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							if td != nil && block.Td.Cmp(td) != 0 {
 | 
				
			||||||
 | 
								err := fmt.Errorf("invalid TD on block(%v) from peer(%s): block.td=%v, request.td=%v", block.Number(), p.id, block.Td, td)
 | 
				
			||||||
 | 
								glog.V(logger.Error).Infoln(err)
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							pm.BroadcastBlock(hash, block)
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Parent of the block is unknown, try to sync with this peer if it seems to be good
 | 
				
			||||||
 | 
						if td != nil {
 | 
				
			||||||
 | 
							go pm.synchronise(p)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// BroadcastBlock will propagate the block to its connected peers. It will sort
 | 
					// BroadcastBlock will propagate the block to a subset of its connected peers,
 | 
				
			||||||
// out which peers do not contain the block in their block set and will do a
 | 
					// only notifying the rest of the block's appearance.
 | 
				
			||||||
// sqrt(peers) to determine the amount of peers we broadcast to.
 | 
					 | 
				
			||||||
func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
 | 
					func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
 | 
				
			||||||
	// Broadcast block to a batch of peers not knowing about it
 | 
						// Retrieve all the target peers and split between full broadcast or only notification
 | 
				
			||||||
	peers := pm.peers.PeersWithoutBlock(hash)
 | 
						peers := pm.peers.PeersWithoutBlock(hash)
 | 
				
			||||||
	//peers = peers[:int(math.Sqrt(float64(len(peers))))]
 | 
						split := int(math.Sqrt(float64(len(peers))))
 | 
				
			||||||
	for _, peer := range peers {
 | 
					
 | 
				
			||||||
 | 
						transfer := peers[:split]
 | 
				
			||||||
 | 
						nofity := peers[split:]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Send out the data transfers and the notifications
 | 
				
			||||||
 | 
						for _, peer := range nofity {
 | 
				
			||||||
 | 
							peer.sendNewBlockHashes([]common.Hash{hash})
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						glog.V(logger.Detail).Infoln("broadcast hash to", len(nofity), "peers.")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, peer := range transfer {
 | 
				
			||||||
		peer.sendNewBlock(block)
 | 
							peer.sendNewBlock(block)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers. Total processing time:", time.Since(block.ReceivedAt))
 | 
						glog.V(logger.Detail).Infoln("broadcast block to", len(transfer), "peers. Total processing time:", time.Since(block.ReceivedAt))
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// BroadcastTx will propagate the block to its connected peers. It will sort
 | 
					// BroadcastTx will propagate the block to its connected peers. It will sort
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -88,6 +88,10 @@ func (p *peer) sendBlocks(blocks []*types.Block) error {
 | 
				
			|||||||
	return p2p.Send(p.rw, BlocksMsg, blocks)
 | 
						return p2p.Send(p.rw, BlocksMsg, blocks)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (p *peer) sendNewBlockHashes(hashes []common.Hash) error {
 | 
				
			||||||
 | 
						return p2p.Send(p.rw, NewBlockHashesMsg, hashes)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (p *peer) sendNewBlock(block *types.Block) error {
 | 
					func (p *peer) sendNewBlock(block *types.Block) error {
 | 
				
			||||||
	p.blockHashes.Add(block.Hash())
 | 
						p.blockHashes.Add(block.Hash())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -17,7 +17,7 @@ const (
 | 
				
			|||||||
// eth protocol message codes
 | 
					// eth protocol message codes
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	StatusMsg = iota
 | 
						StatusMsg = iota
 | 
				
			||||||
	GetTxMsg  // unused
 | 
						NewBlockHashesMsg
 | 
				
			||||||
	TxMsg
 | 
						TxMsg
 | 
				
			||||||
	GetBlockHashesMsg
 | 
						GetBlockHashesMsg
 | 
				
			||||||
	BlockHashesMsg
 | 
						BlockHashesMsg
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user