eth, eth/downloader: move peer removal into downloader

This commit is contained in:
Péter Szilágyi
2015-06-11 15:56:08 +03:00
parent 6f5c6150b7
commit 66d3dc8690
6 changed files with 83 additions and 82 deletions

View File

@ -32,28 +32,32 @@ var (
var (
errLowTd = errors.New("peers TD is too low")
ErrBusy = errors.New("busy")
errBusy = errors.New("busy")
errUnknownPeer = errors.New("peer is unknown or unhealthy")
ErrBadPeer = errors.New("action from bad peer ignored")
ErrStallingPeer = errors.New("peer is stalling")
errBadPeer = errors.New("action from bad peer ignored")
errStallingPeer = errors.New("peer is stalling")
errBannedHead = errors.New("peer head hash already banned")
errNoPeers = errors.New("no peers to keep download active")
ErrPendingQueue = errors.New("pending items in queue")
ErrTimeout = errors.New("timeout")
ErrEmptyHashSet = errors.New("empty hash set by peer")
errPendingQueue = errors.New("pending items in queue")
errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer")
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
errAlreadyInPool = errors.New("hash already in pool")
ErrInvalidChain = errors.New("retrieved hash chain is invalid")
ErrCrossCheckFailed = errors.New("block cross-check failed")
errInvalidChain = errors.New("retrieved hash chain is invalid")
errCrossCheckFailed = errors.New("block cross-check failed")
errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
errNoSyncActive = errors.New("no sync active")
)
// hashCheckFn is a callback type for verifying a hash's presence in the local chain.
type hashCheckFn func(common.Hash) bool
type getBlockFn func(common.Hash) *types.Block
type chainInsertFn func(types.Blocks) (int, error)
type hashIterFn func() (common.Hash, error)
// blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
type blockPack struct {
peerId string
@ -85,8 +89,9 @@ type Downloader struct {
importLock sync.Mutex
// Callbacks
hasBlock hashCheckFn
getBlock getBlockFn
hasBlock hashCheckFn // Checks if a block is present in the chain
getBlock blockRetrievalFn // Retrieves a block from the chain
dropPeer peerDropFn // Retrieved the TD of our own chain
// Status
synchronising int32
@ -107,7 +112,8 @@ type Block struct {
OriginPeer string
}
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, dropPeer peerDropFn) *Downloader {
// Create the base downloader
downloader := &Downloader{
mux: mux,
@ -115,6 +121,7 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock getBlockFn) *Downloa
peers: newPeerSet(),
hasBlock: hasBlock,
getBlock: getBlock,
dropPeer: dropPeer,
newPeerCh: make(chan *peer, 1),
hashCh: make(chan hashPack, 1),
blockCh: make(chan blockPack, 1),
@ -183,19 +190,43 @@ func (d *Downloader) UnregisterPeer(id string) error {
return nil
}
// Synchronise will select the peer and use it for synchronising. If an empty string is given
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
func (d *Downloader) Synchronise(id string, head common.Hash) {
glog.V(logger.Detail).Infof("Attempting synchronisation: %v, 0x%x", id, head)
switch err := d.synchronise(id, head); err {
case nil:
glog.V(logger.Detail).Infof("Synchronisation completed")
case errBusy:
glog.V(logger.Detail).Infof("Synchronisation already in progress")
case errTimeout, errBadPeer, errEmptyHashSet, errInvalidChain, errCrossCheckFailed:
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
d.dropPeer(id)
case errPendingQueue:
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
default:
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
}
}
// synchronise will select the peer and use it for synchronising. If an empty string is given
// it will use the best peer possible and synchronize if it's TD is higher than our own. If any of the
// checks fail an error will be returned. This method is synchronous
func (d *Downloader) Synchronise(id string, hash common.Hash) error {
func (d *Downloader) synchronise(id string, hash common.Hash) error {
// Make sure only one goroutine is ever allowed past this point at once
if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
return ErrBusy
return errBusy
}
defer atomic.StoreInt32(&d.synchronising, 0)
// If the head hash is banned, terminate immediately
if d.banned.Has(hash) {
return ErrInvalidChain
return errInvalidChain
}
// Post a user notification of the sync (only once per session)
if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
@ -209,7 +240,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
// Abort if the queue still contains some leftover data
if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
return ErrPendingQueue
return errPendingQueue
}
// Reset the queue and peer set to clean any internal leftover state
d.queue.Reset()
@ -342,7 +373,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// Make sure the peer actually gave something valid
if len(hashPack.hashes) == 0 {
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set", active.id)
return ErrEmptyHashSet
return errEmptyHashSet
}
for index, hash := range hashPack.hashes {
if d.banned.Has(hash) {
@ -352,7 +383,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
if err := d.banBlocks(active.id, hash); err != nil {
glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err)
}
return ErrInvalidChain
return errInvalidChain
}
}
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
@ -369,12 +400,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
inserts := d.queue.Insert(hashPack.hashes)
if len(inserts) == 0 && !done {
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id)
return ErrBadPeer
return errBadPeer
}
if !done {
// Check that the peer is not stalling the sync
if len(inserts) < MinHashFetch {
return ErrStallingPeer
return errStallingPeer
}
// Try and fetch a random block to verify the hash batch
// Skip the last hash as the cross check races with the next hash fetch
@ -408,7 +439,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
block := blockPack.blocks[0]
if check, ok := d.checks[block.Hash()]; ok {
if block.ParentHash() != check.parent {
return ErrCrossCheckFailed
return errCrossCheckFailed
}
delete(d.checks, block.Hash())
}
@ -418,7 +449,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
for hash, check := range d.checks {
if time.Now().After(check.expire) {
glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
return ErrCrossCheckFailed
return errCrossCheckFailed
}
}
@ -438,7 +469,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
// if all peers have been tried, abort the process entirely or if the hash is
// the zero hash.
if p == nil || (head == common.Hash{}) {
return ErrTimeout
return errTimeout
}
// set p to the active peer. this will invalidate any hashes that may be returned
// by our previous (delayed) peer.
@ -500,7 +531,7 @@ out:
peer.SetIdle()
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks))
case ErrInvalidChain:
case errInvalidChain:
// The hash chain is invalid (blocks are not ordered properly), abort
return err
@ -617,7 +648,7 @@ func (d *Downloader) banBlocks(peerId string, head common.Hash) error {
return errCancelBlockFetch
case <-timeout:
return ErrTimeout
return errTimeout
case <-d.hashCh:
// Out of bounds hashes received, ignore them