Merge branch 'develop' into downloader-proto
Conflicts: eth/downloader/downloader.go
This commit is contained in:
@ -1,6 +1,7 @@
|
||||
package downloader
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math"
|
||||
"math/big"
|
||||
"sync"
|
||||
@ -20,6 +21,12 @@ const (
|
||||
minDesiredPeerCount = 3 // Amount of peers desired to start syncing
|
||||
)
|
||||
|
||||
var (
|
||||
errLowTd = errors.New("peer's TD is too low")
|
||||
errBusy = errors.New("busy")
|
||||
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
||||
)
|
||||
|
||||
type hashCheckFn func(common.Hash) bool
|
||||
type chainInsertFn func(types.Blocks) error
|
||||
type hashIterFn func() (common.Hash, error)
|
||||
@ -82,18 +89,19 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
glog.V(logger.Detail).Infoln("Register peer", id)
|
||||
glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td)
|
||||
|
||||
// Create a new peer and add it to the list of known peers
|
||||
peer := newPeer(id, td, hash, getHashes, getBlocks)
|
||||
// add peer to our peer set
|
||||
d.peers[id] = peer
|
||||
// broadcast new peer
|
||||
d.newPeerCh <- peer
|
||||
//d.newPeerCh <- peer
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnregisterPeer unregister's a peer. This will prevent any action from the specified peer.
|
||||
func (d *Downloader) UnregisterPeer(id string) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
@ -103,6 +111,73 @@ func (d *Downloader) UnregisterPeer(id string) {
|
||||
delete(d.peers, id)
|
||||
}
|
||||
|
||||
// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
|
||||
// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
|
||||
// checks fail an error will be returned. This method is synchronous
|
||||
func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) {
|
||||
// Check if we're busy
|
||||
if d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing() {
|
||||
return nil, errBusy
|
||||
}
|
||||
|
||||
// Attempt to select a peer. This can either be nothing, which returns, best peer
|
||||
// or selected peer. If no peer could be found an error will be returned
|
||||
var p *peer
|
||||
if len(id) == 0 {
|
||||
p = d.peers[id]
|
||||
if p == nil {
|
||||
return nil, errUnknownPeer
|
||||
}
|
||||
} else {
|
||||
p = d.peers.bestPeer()
|
||||
}
|
||||
|
||||
// Make sure our td is lower than the peer's td
|
||||
if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
|
||||
return nil, errLowTd
|
||||
}
|
||||
|
||||
// Get the hash from the peer and initiate the downloading progress.
|
||||
err := d.getFromPeer(p, p.recentHash, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return d.queue.blocks, nil
|
||||
}
|
||||
|
||||
// Synchronise will synchronise using the best peer.
|
||||
func (d *Downloader) Synchronise() (types.Blocks, error) {
|
||||
return d.SynchroniseWithPeer("")
|
||||
}
|
||||
|
||||
func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
|
||||
glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
|
||||
// Start the fetcher. This will block the update entirely
|
||||
// interupts need to be send to the appropriate channels
|
||||
// respectively.
|
||||
if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
|
||||
// handle error
|
||||
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
|
||||
// XXX Reset
|
||||
return err
|
||||
}
|
||||
|
||||
// Start fetching blocks in paralel. The strategy is simple
|
||||
// take any available peers, seserve a chunk for each peer available,
|
||||
// let the peer deliver the chunkn and periodically check if a peer
|
||||
// has timedout. When done downloading, process blocks.
|
||||
if err := d.startFetchingBlocks(p); err != nil {
|
||||
glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
|
||||
// XXX reset
|
||||
return err
|
||||
}
|
||||
|
||||
glog.V(logger.Detail).Infoln("Sync completed")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *Downloader) peerHandler() {
|
||||
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
||||
//itimer := time.NewTicker(5 * time.Second)
|
||||
@ -116,11 +191,18 @@ out:
|
||||
if len(d.peers) < minDesiredPeerCount {
|
||||
break
|
||||
}
|
||||
|
||||
d.selectPeer(d.peers.bestPeer())
|
||||
case <-itimer.C:
|
||||
// The timer will make sure that the downloader keeps an active state
|
||||
// in which it attempts to always check the network for highest td peers
|
||||
d.selectPeer(d.peers.bestPeer())
|
||||
// Either select the peer or restart the timer if no peers could
|
||||
// be selected.
|
||||
if peer := d.peers.bestPeer(); peer != nil {
|
||||
d.selectPeer(d.peers.bestPeer())
|
||||
} else {
|
||||
itimer.Reset(5 * time.Second)
|
||||
}
|
||||
case <-d.quit:
|
||||
break out
|
||||
}
|
||||
@ -142,6 +224,7 @@ func (d *Downloader) selectPeer(p *peer) {
|
||||
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
|
||||
d.syncCh <- syncPack{p, p.recentHash, false}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (d *Downloader) update() {
|
||||
@ -245,8 +328,13 @@ out:
|
||||
for {
|
||||
select {
|
||||
case blockPack := <-d.blockCh:
|
||||
d.queue.deliver(blockPack.peerId, blockPack.blocks)
|
||||
d.peers.setState(blockPack.peerId, idleState)
|
||||
// If the peer was previously banned and failed to deliver it's pack
|
||||
// in a reasonable time frame, ignore it's message.
|
||||
if d.peers[blockPack.peerId] != nil {
|
||||
d.peers[blockPack.peerId].promote()
|
||||
d.queue.deliver(blockPack.peerId, blockPack.blocks)
|
||||
d.peers.setState(blockPack.peerId, idleState)
|
||||
}
|
||||
case <-ticker.C:
|
||||
// If there are unrequested hashes left start fetching
|
||||
// from the available peers.
|
||||
@ -310,6 +398,9 @@ out:
|
||||
// 2) Measure their speed;
|
||||
// 3) Amount and availability.
|
||||
d.queue.deliver(pid, nil)
|
||||
if peer := d.peers[pid]; peer != nil {
|
||||
peer.demote()
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -343,6 +434,7 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
|
||||
peer.td = td
|
||||
peer.recentHash = block.Hash()
|
||||
peer.mu.Unlock()
|
||||
peer.promote()
|
||||
|
||||
glog.V(logger.Detail).Infoln("Inserting new block from:", id)
|
||||
d.queue.addBlock(id, block, td)
|
||||
|
Reference in New Issue
Block a user