eth, eth/fetcher: separate notification sync mechanism
This commit is contained in:
		@@ -1,3 +1,4 @@
 | 
				
			|||||||
 | 
					// Package downloader contains the manual full chain synchronisation.
 | 
				
			||||||
package downloader
 | 
					package downloader
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import (
 | 
					import (
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										258
									
								
								eth/fetcher/fetcher.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										258
									
								
								eth/fetcher/fetcher.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,258 @@
 | 
				
			|||||||
 | 
					// Package fetcher contains the block announcement based synchonisation.
 | 
				
			||||||
 | 
					package fetcher
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"errors"
 | 
				
			||||||
 | 
						"math/rand"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/common"
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/core/types"
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/logger"
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/logger/glog"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					const (
 | 
				
			||||||
 | 
						arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
 | 
				
			||||||
 | 
						fetchTimeout  = 5 * time.Second        // Maximum alloted time to return an explicitly requested block
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					var (
 | 
				
			||||||
 | 
						errTerminated = errors.New("terminated")
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
 | 
				
			||||||
 | 
					type hashCheckFn func(common.Hash) bool
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// blockRequesterFn is a callback type for sending a block retrieval request.
 | 
				
			||||||
 | 
					type blockRequesterFn func([]common.Hash) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// blockImporterFn is a callback type for trying to inject a block into the local chain.
 | 
				
			||||||
 | 
					type blockImporterFn func(peer string, block *types.Block) error
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// announce is the hash notification of the availability of a new block in the
 | 
				
			||||||
 | 
					// network.
 | 
				
			||||||
 | 
					type announce struct {
 | 
				
			||||||
 | 
						hash common.Hash // Hash of the block being announced
 | 
				
			||||||
 | 
						time time.Time   // Timestamp of the announcement
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						origin string           // Identifier of the peer originating the notification
 | 
				
			||||||
 | 
						fetch  blockRequesterFn // Fetcher function to retrieve
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Fetcher is responsible for accumulating block announcements from various peers
 | 
				
			||||||
 | 
					// and scheduling them for retrieval.
 | 
				
			||||||
 | 
					type Fetcher struct {
 | 
				
			||||||
 | 
						// Various event channels
 | 
				
			||||||
 | 
						notify chan *announce
 | 
				
			||||||
 | 
						filter chan chan []*types.Block
 | 
				
			||||||
 | 
						quit   chan struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Callbacks
 | 
				
			||||||
 | 
						hasBlock    hashCheckFn     // Checks if a block is present in the chain
 | 
				
			||||||
 | 
						importBlock blockImporterFn // Injects a block from an origin peer into the chain
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// New creates a block fetcher to retrieve blocks based on hash announcements.
 | 
				
			||||||
 | 
					func New(hasBlock hashCheckFn, importBlock blockImporterFn) *Fetcher {
 | 
				
			||||||
 | 
						return &Fetcher{
 | 
				
			||||||
 | 
							notify:      make(chan *announce),
 | 
				
			||||||
 | 
							filter:      make(chan chan []*types.Block),
 | 
				
			||||||
 | 
							quit:        make(chan struct{}),
 | 
				
			||||||
 | 
							hasBlock:    hasBlock,
 | 
				
			||||||
 | 
							importBlock: importBlock,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Start boots up the announcement based synchoniser, accepting and processing
 | 
				
			||||||
 | 
					// hash notifications and block fetches until termination requested.
 | 
				
			||||||
 | 
					func (f *Fetcher) Start() {
 | 
				
			||||||
 | 
						go f.loop()
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Stop terminates the announcement based synchroniser, canceling all pending
 | 
				
			||||||
 | 
					// operations.
 | 
				
			||||||
 | 
					func (f *Fetcher) Stop() {
 | 
				
			||||||
 | 
						close(f.quit)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Notify announces the fetcher of the potential availability of a new block in
 | 
				
			||||||
 | 
					// the network.
 | 
				
			||||||
 | 
					func (f *Fetcher) Notify(peer string, hash common.Hash, time time.Time, fetcher blockRequesterFn) error {
 | 
				
			||||||
 | 
						block := &announce{
 | 
				
			||||||
 | 
							hash:   hash,
 | 
				
			||||||
 | 
							time:   time,
 | 
				
			||||||
 | 
							origin: peer,
 | 
				
			||||||
 | 
							fetch:  fetcher,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case f.notify <- block:
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						case <-f.quit:
 | 
				
			||||||
 | 
							return errTerminated
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Filter extracts all the blocks that were explicitly requested by the fetcher,
 | 
				
			||||||
 | 
					// returning those that should be handled differently.
 | 
				
			||||||
 | 
					func (f *Fetcher) Filter(blocks types.Blocks) types.Blocks {
 | 
				
			||||||
 | 
						// Send the filter channel to the fetcher
 | 
				
			||||||
 | 
						filter := make(chan []*types.Block)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case f.filter <- filter:
 | 
				
			||||||
 | 
						case <-f.quit:
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Request the filtering of the block list
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case filter <- blocks:
 | 
				
			||||||
 | 
						case <-f.quit:
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						// Retrieve the blocks remaining after filtering
 | 
				
			||||||
 | 
						select {
 | 
				
			||||||
 | 
						case blocks := <-filter:
 | 
				
			||||||
 | 
							return blocks
 | 
				
			||||||
 | 
						case <-f.quit:
 | 
				
			||||||
 | 
							return nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// Loop is the main fetcher loop, checking and processing various notification
 | 
				
			||||||
 | 
					// events.
 | 
				
			||||||
 | 
					func (f *Fetcher) loop() {
 | 
				
			||||||
 | 
						announced := make(map[common.Hash][]*announce)
 | 
				
			||||||
 | 
						fetching := make(map[common.Hash]*announce)
 | 
				
			||||||
 | 
						fetch := time.NewTimer(0)
 | 
				
			||||||
 | 
						done := make(chan common.Hash)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Iterate the block fetching until a quit is requested
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							// Clean up any expired block fetches
 | 
				
			||||||
 | 
							for hash, announce := range fetching {
 | 
				
			||||||
 | 
								if time.Since(announce.time) > fetchTimeout {
 | 
				
			||||||
 | 
									delete(announced, hash)
 | 
				
			||||||
 | 
									delete(fetching, hash)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							// Wait for an outside event to occur
 | 
				
			||||||
 | 
							select {
 | 
				
			||||||
 | 
							case <-f.quit:
 | 
				
			||||||
 | 
								// Fetcher terminating, abort all operations
 | 
				
			||||||
 | 
								return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							case notification := <-f.notify:
 | 
				
			||||||
 | 
								// A block was announced, schedule if it's not yet downloading
 | 
				
			||||||
 | 
								glog.V(logger.Debug).Infof("Peer %s: scheduling %x", notification.origin, notification.hash[:4])
 | 
				
			||||||
 | 
								if _, ok := fetching[notification.hash]; ok {
 | 
				
			||||||
 | 
									break
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								if len(announced) == 0 {
 | 
				
			||||||
 | 
									fetch.Reset(arriveTimeout)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								announced[notification.hash] = append(announced[notification.hash], notification)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							case hash := <-done:
 | 
				
			||||||
 | 
								// A pending import finished, remove all traces of the notification
 | 
				
			||||||
 | 
								delete(announced, hash)
 | 
				
			||||||
 | 
								delete(fetching, hash)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							case <-fetch.C:
 | 
				
			||||||
 | 
								// At least one block's timer ran out, check for needing retrieval
 | 
				
			||||||
 | 
								request := make(map[string][]common.Hash)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								for hash, announces := range announced {
 | 
				
			||||||
 | 
									if time.Since(announces[0].time) > arriveTimeout {
 | 
				
			||||||
 | 
										announce := announces[rand.Intn(len(announces))]
 | 
				
			||||||
 | 
										if !f.hasBlock(hash) {
 | 
				
			||||||
 | 
											request[announce.origin] = append(request[announce.origin], hash)
 | 
				
			||||||
 | 
											fetching[hash] = announce
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										delete(announced, hash)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								// Send out all block requests
 | 
				
			||||||
 | 
								for peer, hashes := range request {
 | 
				
			||||||
 | 
									glog.V(logger.Debug).Infof("Peer %s: explicitly fetching %d blocks", peer, len(hashes))
 | 
				
			||||||
 | 
									go fetching[hashes[0]].fetch(hashes)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								// Schedule the next fetch if blocks are still pending
 | 
				
			||||||
 | 
								if len(announced) > 0 {
 | 
				
			||||||
 | 
									nearest := time.Now()
 | 
				
			||||||
 | 
									for _, announces := range announced {
 | 
				
			||||||
 | 
										if nearest.Before(announces[0].time) {
 | 
				
			||||||
 | 
											nearest = announces[0].time
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
									fetch.Reset(arriveTimeout + time.Since(nearest))
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							case filter := <-f.filter:
 | 
				
			||||||
 | 
								// Blocks arrived, extract any explicit fetches, return all else
 | 
				
			||||||
 | 
								var blocks types.Blocks
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case blocks = <-filter:
 | 
				
			||||||
 | 
								case <-f.quit:
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								explicit, download := []*types.Block{}, []*types.Block{}
 | 
				
			||||||
 | 
								for _, block := range blocks {
 | 
				
			||||||
 | 
									hash := block.Hash()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
									// Filter explicitly requested blocks from hash announcements
 | 
				
			||||||
 | 
									if _, ok := fetching[hash]; ok {
 | 
				
			||||||
 | 
										// Discard if already imported by other means
 | 
				
			||||||
 | 
										if !f.hasBlock(hash) {
 | 
				
			||||||
 | 
											explicit = append(explicit, block)
 | 
				
			||||||
 | 
										} else {
 | 
				
			||||||
 | 
											delete(fetching, hash)
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									} else {
 | 
				
			||||||
 | 
										download = append(download, block)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								select {
 | 
				
			||||||
 | 
								case filter <- download:
 | 
				
			||||||
 | 
								case <-f.quit:
 | 
				
			||||||
 | 
									return
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								// Create a closure with the retrieved blocks and origin peers
 | 
				
			||||||
 | 
								peers := make([]string, 0, len(explicit))
 | 
				
			||||||
 | 
								blocks = make([]*types.Block, 0, len(explicit))
 | 
				
			||||||
 | 
								for _, block := range explicit {
 | 
				
			||||||
 | 
									hash := block.Hash()
 | 
				
			||||||
 | 
									if announce := fetching[hash]; announce != nil {
 | 
				
			||||||
 | 
										// Drop the block if it surely cannot fit
 | 
				
			||||||
 | 
										if f.hasBlock(hash) || !f.hasBlock(block.ParentHash()) {
 | 
				
			||||||
 | 
											// delete(fetching, hash) // if we drop, it will re-fetch it, wait for timeout?
 | 
				
			||||||
 | 
											continue
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										// Otherwise accumulate for import
 | 
				
			||||||
 | 
										peers = append(peers, announce.origin)
 | 
				
			||||||
 | 
										blocks = append(blocks, block)
 | 
				
			||||||
 | 
									}
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
								// If any explicit fetches were replied to, import them
 | 
				
			||||||
 | 
								if count := len(blocks); count > 0 {
 | 
				
			||||||
 | 
									glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks))
 | 
				
			||||||
 | 
									go func() {
 | 
				
			||||||
 | 
										// Make sure all hashes are cleaned up
 | 
				
			||||||
 | 
										for _, block := range blocks {
 | 
				
			||||||
 | 
											hash := block.Hash()
 | 
				
			||||||
 | 
											defer func() { done <- hash }()
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
										// Try and actually import the blocks
 | 
				
			||||||
 | 
										for i := 0; i < len(blocks); i++ {
 | 
				
			||||||
 | 
											if err := f.importBlock(peers[i], blocks[i]); err != nil {
 | 
				
			||||||
 | 
												glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
 | 
				
			||||||
 | 
												return
 | 
				
			||||||
 | 
											}
 | 
				
			||||||
 | 
										}
 | 
				
			||||||
 | 
									}()
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
@@ -7,6 +7,8 @@ import (
 | 
				
			|||||||
	"sync"
 | 
						"sync"
 | 
				
			||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						"github.com/ethereum/go-ethereum/eth/fetcher"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"github.com/ethereum/go-ethereum/common"
 | 
						"github.com/ethereum/go-ethereum/common"
 | 
				
			||||||
	"github.com/ethereum/go-ethereum/core"
 | 
						"github.com/ethereum/go-ethereum/core"
 | 
				
			||||||
	"github.com/ethereum/go-ethereum/core/types"
 | 
						"github.com/ethereum/go-ethereum/core/types"
 | 
				
			||||||
@@ -45,6 +47,7 @@ type ProtocolManager struct {
 | 
				
			|||||||
	txpool         txPool
 | 
						txpool         txPool
 | 
				
			||||||
	chainman       *core.ChainManager
 | 
						chainman       *core.ChainManager
 | 
				
			||||||
	downloader     *downloader.Downloader
 | 
						downloader     *downloader.Downloader
 | 
				
			||||||
 | 
						fetcher        *fetcher.Fetcher
 | 
				
			||||||
	peers          *peerSet
 | 
						peers          *peerSet
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	SubProtocol p2p.Protocol
 | 
						SubProtocol p2p.Protocol
 | 
				
			||||||
@@ -55,8 +58,6 @@ type ProtocolManager struct {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// channels for fetcher, syncer, txsyncLoop
 | 
						// channels for fetcher, syncer, txsyncLoop
 | 
				
			||||||
	newPeerCh chan *peer
 | 
						newPeerCh chan *peer
 | 
				
			||||||
	newHashCh  chan []*blockAnnounce
 | 
					 | 
				
			||||||
	newBlockCh chan chan []*types.Block
 | 
					 | 
				
			||||||
	txsyncCh  chan *txsync
 | 
						txsyncCh  chan *txsync
 | 
				
			||||||
	quitSync  chan struct{}
 | 
						quitSync  chan struct{}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -69,30 +70,33 @@ type ProtocolManager struct {
 | 
				
			|||||||
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
 | 
					// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
 | 
				
			||||||
// with the ethereum network.
 | 
					// with the ethereum network.
 | 
				
			||||||
func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager {
 | 
					func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, chainman *core.ChainManager) *ProtocolManager {
 | 
				
			||||||
 | 
						// Create the protocol manager and initialize peer handlers
 | 
				
			||||||
	manager := &ProtocolManager{
 | 
						manager := &ProtocolManager{
 | 
				
			||||||
		eventMux:  mux,
 | 
							eventMux:  mux,
 | 
				
			||||||
		txpool:    txpool,
 | 
							txpool:    txpool,
 | 
				
			||||||
		chainman:  chainman,
 | 
							chainman:  chainman,
 | 
				
			||||||
		peers:     newPeerSet(),
 | 
							peers:     newPeerSet(),
 | 
				
			||||||
		newPeerCh: make(chan *peer, 1),
 | 
							newPeerCh: make(chan *peer, 1),
 | 
				
			||||||
		newHashCh:  make(chan []*blockAnnounce, 1),
 | 
					 | 
				
			||||||
		newBlockCh: make(chan chan []*types.Block),
 | 
					 | 
				
			||||||
		txsyncCh:  make(chan *txsync),
 | 
							txsyncCh:  make(chan *txsync),
 | 
				
			||||||
		quitSync:  make(chan struct{}),
 | 
							quitSync:  make(chan struct{}),
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
 | 
					 | 
				
			||||||
	manager.SubProtocol = p2p.Protocol{
 | 
						manager.SubProtocol = p2p.Protocol{
 | 
				
			||||||
		Name:    "eth",
 | 
							Name:    "eth",
 | 
				
			||||||
		Version: uint(protocolVersion),
 | 
							Version: uint(protocolVersion),
 | 
				
			||||||
		Length:  ProtocolLength,
 | 
							Length:  ProtocolLength,
 | 
				
			||||||
		Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
 | 
							Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
 | 
				
			||||||
			peer := manager.newPeer(protocolVersion, networkId, p, rw)
 | 
								peer := manager.newPeer(protocolVersion, networkId, p, rw)
 | 
				
			||||||
 | 
					 | 
				
			||||||
			manager.newPeerCh <- peer
 | 
								manager.newPeerCh <- peer
 | 
				
			||||||
 | 
					 | 
				
			||||||
			return manager.handle(peer)
 | 
								return manager.handle(peer)
 | 
				
			||||||
		},
 | 
							},
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
						// Construct the different synchronisation mechanisms
 | 
				
			||||||
 | 
						manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						importer := func(peer string, block *types.Block) error {
 | 
				
			||||||
 | 
							return manager.importBlock(manager.peers.Peer(peer), block, nil)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						manager.fetcher = fetcher.New(manager.chainman.HasBlock, importer)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	return manager
 | 
						return manager
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
@@ -126,7 +130,6 @@ func (pm *ProtocolManager) Start() {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	// start sync handlers
 | 
						// start sync handlers
 | 
				
			||||||
	go pm.syncer()
 | 
						go pm.syncer()
 | 
				
			||||||
	go pm.fetcher()
 | 
					 | 
				
			||||||
	go pm.txsyncLoop()
 | 
						go pm.txsyncLoop()
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -291,21 +294,10 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 | 
				
			|||||||
			glog.V(logger.Detail).Infoln("Decode error", err)
 | 
								glog.V(logger.Detail).Infoln("Decode error", err)
 | 
				
			||||||
			blocks = nil
 | 
								blocks = nil
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		// Filter out any explicitly requested blocks (cascading select to get blocking back to peer)
 | 
							// Filter out any explicitly requested blocks, deliver the rest to the downloader
 | 
				
			||||||
		filter := make(chan []*types.Block)
 | 
							if blocks := self.fetcher.Filter(blocks); len(blocks) > 0 {
 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case <-self.quitSync:
 | 
					 | 
				
			||||||
		case self.newBlockCh <- filter:
 | 
					 | 
				
			||||||
			select {
 | 
					 | 
				
			||||||
			case <-self.quitSync:
 | 
					 | 
				
			||||||
			case filter <- blocks:
 | 
					 | 
				
			||||||
				select {
 | 
					 | 
				
			||||||
				case <-self.quitSync:
 | 
					 | 
				
			||||||
				case blocks := <-filter:
 | 
					 | 
				
			||||||
			self.downloader.DeliverBlocks(p.id, blocks)
 | 
								self.downloader.DeliverBlocks(p.id, blocks)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
	case NewBlockHashesMsg:
 | 
						case NewBlockHashesMsg:
 | 
				
			||||||
		// Retrieve and deseralize the remote new block hashes notification
 | 
							// Retrieve and deseralize the remote new block hashes notification
 | 
				
			||||||
@@ -327,19 +319,8 @@ func (self *ProtocolManager) handleMsg(p *peer) error {
 | 
				
			|||||||
				unknown = append(unknown, hash)
 | 
									unknown = append(unknown, hash)
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
		announces := make([]*blockAnnounce, len(unknown))
 | 
							for _, hash := range unknown {
 | 
				
			||||||
		for i, hash := range unknown {
 | 
								self.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks)
 | 
				
			||||||
			announces[i] = &blockAnnounce{
 | 
					 | 
				
			||||||
				hash: hash,
 | 
					 | 
				
			||||||
				peer: p,
 | 
					 | 
				
			||||||
				time: time.Now(),
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
		if len(announces) > 0 {
 | 
					 | 
				
			||||||
			select {
 | 
					 | 
				
			||||||
			case self.newHashCh <- announces:
 | 
					 | 
				
			||||||
			case <-self.quitSync:
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	case NewBlockMsg:
 | 
						case NewBlockMsg:
 | 
				
			||||||
 
 | 
				
			|||||||
							
								
								
									
										141
									
								
								eth/sync.go
									
									
									
									
									
								
							
							
						
						
									
										141
									
								
								eth/sync.go
									
									
									
									
									
								
							@@ -13,9 +13,6 @@ import (
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
const (
 | 
					const (
 | 
				
			||||||
	forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
 | 
						forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
 | 
				
			||||||
	notifyCheckCycle    = 100 * time.Millisecond // Time interval to allow hash notifies to fulfill before hard fetching
 | 
					 | 
				
			||||||
	notifyArriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
 | 
					 | 
				
			||||||
	notifyFetchTimeout  = 5 * time.Second        // Maximum alloted time to return an explicitly requested block
 | 
					 | 
				
			||||||
	minDesiredPeerCount = 5                // Amount of peers desired to start syncing
 | 
						minDesiredPeerCount = 5                // Amount of peers desired to start syncing
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// This is the target size for the packs of transactions sent by txsyncLoop.
 | 
						// This is the target size for the packs of transactions sent by txsyncLoop.
 | 
				
			||||||
@@ -119,140 +116,15 @@ func (pm *ProtocolManager) txsyncLoop() {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// fetcher is responsible for collecting hash notifications, and periodically
 | 
					 | 
				
			||||||
// checking all unknown ones and individually fetching them.
 | 
					 | 
				
			||||||
func (pm *ProtocolManager) fetcher() {
 | 
					 | 
				
			||||||
	announces := make(map[common.Hash][]*blockAnnounce)
 | 
					 | 
				
			||||||
	request := make(map[*peer][]common.Hash)
 | 
					 | 
				
			||||||
	pending := make(map[common.Hash]*blockAnnounce)
 | 
					 | 
				
			||||||
	cycle := time.Tick(notifyCheckCycle)
 | 
					 | 
				
			||||||
	done := make(chan common.Hash)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
	// Iterate the block fetching until a quit is requested
 | 
					 | 
				
			||||||
	for {
 | 
					 | 
				
			||||||
		select {
 | 
					 | 
				
			||||||
		case notifications := <-pm.newHashCh:
 | 
					 | 
				
			||||||
			// A batch of hashes the notified, schedule them for retrieval
 | 
					 | 
				
			||||||
			glog.V(logger.Debug).Infof("Scheduling %d hash announcements from %s", len(notifications), notifications[0].peer.id)
 | 
					 | 
				
			||||||
			for _, announce := range notifications {
 | 
					 | 
				
			||||||
				// Skip if it's already pending fetch
 | 
					 | 
				
			||||||
				if _, ok := pending[announce.hash]; ok {
 | 
					 | 
				
			||||||
					continue
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
				// Otherwise queue up the peer as a potential source
 | 
					 | 
				
			||||||
				announces[announce.hash] = append(announces[announce.hash], announce)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		case hash := <-done:
 | 
					 | 
				
			||||||
			// A pending import finished, remove all traces
 | 
					 | 
				
			||||||
			delete(pending, hash)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		case <-cycle:
 | 
					 | 
				
			||||||
			// Clean up any expired block fetches
 | 
					 | 
				
			||||||
			for hash, announce := range pending {
 | 
					 | 
				
			||||||
				if time.Since(announce.time) > notifyFetchTimeout {
 | 
					 | 
				
			||||||
					delete(pending, hash)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			// Check if any notified blocks failed to arrive
 | 
					 | 
				
			||||||
			for hash, all := range announces {
 | 
					 | 
				
			||||||
				if time.Since(all[0].time) > notifyArriveTimeout {
 | 
					 | 
				
			||||||
					announce := all[rand.Intn(len(all))]
 | 
					 | 
				
			||||||
					if !pm.chainman.HasBlock(hash) {
 | 
					 | 
				
			||||||
						request[announce.peer] = append(request[announce.peer], hash)
 | 
					 | 
				
			||||||
						pending[hash] = announce
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
					delete(announces, hash)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			if len(request) == 0 {
 | 
					 | 
				
			||||||
				break
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			// Send out all block requests
 | 
					 | 
				
			||||||
			for peer, hashes := range request {
 | 
					 | 
				
			||||||
				glog.V(logger.Debug).Infof("Explicitly fetching %d blocks from %s", len(hashes), peer.id)
 | 
					 | 
				
			||||||
				go peer.requestBlocks(hashes)
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			request = make(map[*peer][]common.Hash)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		case filter := <-pm.newBlockCh:
 | 
					 | 
				
			||||||
			// Blocks arrived, extract any explicit fetches, return all else
 | 
					 | 
				
			||||||
			var blocks types.Blocks
 | 
					 | 
				
			||||||
			select {
 | 
					 | 
				
			||||||
			case blocks = <-filter:
 | 
					 | 
				
			||||||
			case <-pm.quitSync:
 | 
					 | 
				
			||||||
				return
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			explicit, download := []*types.Block{}, []*types.Block{}
 | 
					 | 
				
			||||||
			for _, block := range blocks {
 | 
					 | 
				
			||||||
				hash := block.Hash()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
				// Filter explicitly requested blocks from hash announcements
 | 
					 | 
				
			||||||
				if _, ok := pending[hash]; ok {
 | 
					 | 
				
			||||||
					// Discard if already imported by other means
 | 
					 | 
				
			||||||
					if !pm.chainman.HasBlock(hash) {
 | 
					 | 
				
			||||||
						explicit = append(explicit, block)
 | 
					 | 
				
			||||||
					} else {
 | 
					 | 
				
			||||||
						delete(pending, hash)
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
				} else {
 | 
					 | 
				
			||||||
					download = append(download, block)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
			select {
 | 
					 | 
				
			||||||
			case filter <- download:
 | 
					 | 
				
			||||||
			case <-pm.quitSync:
 | 
					 | 
				
			||||||
				return
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			// Create a closure with the retrieved blocks and origin peers
 | 
					 | 
				
			||||||
			peers := make([]*peer, 0, len(explicit))
 | 
					 | 
				
			||||||
			blocks = make([]*types.Block, 0, len(explicit))
 | 
					 | 
				
			||||||
			for _, block := range explicit {
 | 
					 | 
				
			||||||
				hash := block.Hash()
 | 
					 | 
				
			||||||
				if announce := pending[hash]; announce != nil {
 | 
					 | 
				
			||||||
					// Drop the block if it surely cannot fit
 | 
					 | 
				
			||||||
					if pm.chainman.HasBlock(hash) || !pm.chainman.HasBlock(block.ParentHash()) {
 | 
					 | 
				
			||||||
						// delete(pending, hash) // if we drop, it will re-fetch it, wait for timeout?
 | 
					 | 
				
			||||||
						continue
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
					// Otherwise accumulate for import
 | 
					 | 
				
			||||||
					peers = append(peers, announce.peer)
 | 
					 | 
				
			||||||
					blocks = append(blocks, block)
 | 
					 | 
				
			||||||
				}
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
			// If any explicit fetches were replied to, import them
 | 
					 | 
				
			||||||
			if count := len(blocks); count > 0 {
 | 
					 | 
				
			||||||
				glog.V(logger.Debug).Infof("Importing %d explicitly fetched blocks", len(blocks))
 | 
					 | 
				
			||||||
				go func() {
 | 
					 | 
				
			||||||
					// Make sure all hashes are cleaned up
 | 
					 | 
				
			||||||
					for _, block := range blocks {
 | 
					 | 
				
			||||||
						hash := block.Hash()
 | 
					 | 
				
			||||||
						defer func() { done <- hash }()
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
					// Try and actually import the blocks
 | 
					 | 
				
			||||||
					for i := 0; i < len(blocks); i++ {
 | 
					 | 
				
			||||||
						if err := pm.importBlock(peers[i], blocks[i], nil); err != nil {
 | 
					 | 
				
			||||||
							glog.V(logger.Detail).Infof("Failed to import explicitly fetched block: %v", err)
 | 
					 | 
				
			||||||
							return
 | 
					 | 
				
			||||||
						}
 | 
					 | 
				
			||||||
					}
 | 
					 | 
				
			||||||
				}()
 | 
					 | 
				
			||||||
			}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
		case <-pm.quitSync:
 | 
					 | 
				
			||||||
			return
 | 
					 | 
				
			||||||
		}
 | 
					 | 
				
			||||||
	}
 | 
					 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
// syncer is responsible for periodically synchronising with the network, both
 | 
					// syncer is responsible for periodically synchronising with the network, both
 | 
				
			||||||
// downloading hashes and blocks as well as retrieving cached ones.
 | 
					// downloading hashes and blocks as well as handling the announcement handler.
 | 
				
			||||||
func (pm *ProtocolManager) syncer() {
 | 
					func (pm *ProtocolManager) syncer() {
 | 
				
			||||||
	// Abort any pending syncs if we terminate
 | 
						// Start and ensure cleanup of sync mechanisms
 | 
				
			||||||
 | 
						pm.fetcher.Start()
 | 
				
			||||||
 | 
						defer pm.fetcher.Stop()
 | 
				
			||||||
	defer pm.downloader.Terminate()
 | 
						defer pm.downloader.Terminate()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// Wait for different events to fire synchronisation operations
 | 
				
			||||||
	forceSync := time.Tick(forceSyncCycle)
 | 
						forceSync := time.Tick(forceSyncCycle)
 | 
				
			||||||
	for {
 | 
						for {
 | 
				
			||||||
		select {
 | 
							select {
 | 
				
			||||||
@@ -273,8 +145,7 @@ func (pm *ProtocolManager) syncer() {
 | 
				
			|||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// synchronise tries to sync up our local block chain with a remote peer, both
 | 
					// synchronise tries to sync up our local block chain with a remote peer.
 | 
				
			||||||
// adding various sanity checks as well as wrapping it with various log entries.
 | 
					 | 
				
			||||||
func (pm *ProtocolManager) synchronise(peer *peer) {
 | 
					func (pm *ProtocolManager) synchronise(peer *peer) {
 | 
				
			||||||
	// Short circuit if no peers are available
 | 
						// Short circuit if no peers are available
 | 
				
			||||||
	if peer == nil {
 | 
						if peer == nil {
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user