Merge branch 'develop' into downloader-proto
This commit is contained in:
@ -4,6 +4,7 @@ import (
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
@ -43,6 +44,9 @@ type Config struct {
|
||||
ProtocolVersion int
|
||||
NetworkId int
|
||||
|
||||
BlockChainVersion int
|
||||
SkipBcVersionCheck bool // e.g. blockchain export
|
||||
|
||||
DataDir string
|
||||
LogFile string
|
||||
LogLevel int
|
||||
@ -151,7 +155,7 @@ type Ethereum struct {
|
||||
}
|
||||
|
||||
func New(config *Config) (*Ethereum, error) {
|
||||
// Boostrap database
|
||||
// Bootstrap database
|
||||
logger.New(config.DataDir, config.LogFile, config.LogLevel)
|
||||
if len(config.LogJSON) > 0 {
|
||||
logger.NewJSONsystem(config.DataDir, config.LogJSON)
|
||||
@ -181,6 +185,16 @@ func New(config *Config) (*Ethereum, error) {
|
||||
saveProtocolVersion(blockDb, config.ProtocolVersion)
|
||||
glog.V(logger.Info).Infof("Protocol Version: %v, Network Id: %v", config.ProtocolVersion, config.NetworkId)
|
||||
|
||||
if !config.SkipBcVersionCheck {
|
||||
b, _ := blockDb.Get([]byte("BlockchainVersion"))
|
||||
bcVersion := int(common.NewValue(b).Uint())
|
||||
if bcVersion != config.BlockChainVersion && bcVersion != 0 {
|
||||
return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, config.BlockChainVersion)
|
||||
}
|
||||
saveBlockchainVersion(blockDb, config.BlockChainVersion)
|
||||
}
|
||||
glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion)
|
||||
|
||||
eth := &Ethereum{
|
||||
shutdownChan: make(chan bool),
|
||||
blockDb: blockDb,
|
||||
@ -439,7 +453,7 @@ func (self *Ethereum) txBroadcastLoop() {
|
||||
// automatically stops if unsubscribe
|
||||
for obj := range self.txSub.Chan() {
|
||||
event := obj.(core.TxPreEvent)
|
||||
self.net.Broadcast("eth", TxMsg, []*types.Transaction{event.Tx})
|
||||
self.net.BroadcastLimited("eth", TxMsg, math.Sqrt, []*types.Transaction{event.Tx})
|
||||
self.syncAccounts(event.Tx)
|
||||
}
|
||||
}
|
||||
@ -463,7 +477,7 @@ func (self *Ethereum) blockBroadcastLoop() {
|
||||
for obj := range self.blockSub.Chan() {
|
||||
switch ev := obj.(type) {
|
||||
case core.ChainHeadEvent:
|
||||
self.net.Broadcast("eth", NewBlockMsg, []interface{}{ev.Block, ev.Block.Td})
|
||||
self.net.BroadcastLimited("eth", NewBlockMsg, math.Sqrt, []interface{}{ev.Block, ev.Block.Td})
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -476,3 +490,12 @@ func saveProtocolVersion(db common.Database, protov int) {
|
||||
db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes())
|
||||
}
|
||||
}
|
||||
|
||||
func saveBlockchainVersion(db common.Database, bcVersion int) {
|
||||
d, _ := db.Get([]byte("BlockchainVersion"))
|
||||
blockchainVersion := common.NewValue(d).Uint()
|
||||
|
||||
if blockchainVersion == 0 {
|
||||
db.Put([]byte("BlockchainVersion"), common.NewValue(bcVersion).Bytes())
|
||||
}
|
||||
}
|
||||
|
@ -54,8 +54,9 @@ type blockPack struct {
|
||||
}
|
||||
|
||||
type syncPack struct {
|
||||
peer *peer
|
||||
hash common.Hash
|
||||
peer *peer
|
||||
hash common.Hash
|
||||
ignoreInitial bool
|
||||
}
|
||||
|
||||
func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader {
|
||||
@ -104,11 +105,13 @@ func (d *Downloader) UnregisterPeer(id string) {
|
||||
|
||||
func (d *Downloader) peerHandler() {
|
||||
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
|
||||
itimer := time.NewTicker(5 * time.Second)
|
||||
//itimer := time.NewTicker(5 * time.Second)
|
||||
itimer := time.NewTimer(5 * time.Second)
|
||||
out:
|
||||
for {
|
||||
select {
|
||||
case <-d.newPeerCh:
|
||||
itimer.Stop()
|
||||
// Meet the `minDesiredPeerCount` before we select our best peer
|
||||
if len(d.peers) < minDesiredPeerCount {
|
||||
break
|
||||
@ -137,7 +140,7 @@ func (d *Downloader) selectPeer(p *peer) {
|
||||
}
|
||||
|
||||
glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
|
||||
d.syncCh <- syncPack{p, p.recentHash}
|
||||
d.syncCh <- syncPack{p, p.recentHash, false}
|
||||
}
|
||||
}
|
||||
|
||||
@ -147,11 +150,11 @@ out:
|
||||
select {
|
||||
case sync := <-d.syncCh:
|
||||
selectedPeer := sync.peer
|
||||
glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id)
|
||||
glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id)
|
||||
// Start the fetcher. This will block the update entirely
|
||||
// interupts need to be send to the appropriate channels
|
||||
// respectively.
|
||||
if err := d.startFetchingHashes(selectedPeer, sync.hash); err != nil {
|
||||
if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil {
|
||||
// handle error
|
||||
glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
|
||||
// XXX Reset
|
||||
@ -178,11 +181,18 @@ out:
|
||||
}
|
||||
|
||||
// XXX Make synchronous
|
||||
func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash) error {
|
||||
glog.V(logger.Debug).Infoln("Downloading hashes")
|
||||
func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
|
||||
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id)
|
||||
|
||||
start := time.Now()
|
||||
|
||||
// We ignore the initial hash in some cases (e.g. we received a block without it's parent)
|
||||
// In such circumstances we don't need to download the block so don't add it to the queue.
|
||||
if !ignoreInitial {
|
||||
// Add the hash to the queue first
|
||||
d.queue.hashPool.Add(hash)
|
||||
}
|
||||
|
||||
// Get the first batch of hashes
|
||||
p.getHashes(hash)
|
||||
atomic.StoreInt32(&d.fetchingHashes, 1)
|
||||
@ -195,7 +205,7 @@ out:
|
||||
hashSet := set.New()
|
||||
for _, hash := range hashes {
|
||||
if d.hasBlock(hash) {
|
||||
glog.V(logger.Debug).Infof("Found common hash %x\n", hash)
|
||||
glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4])
|
||||
|
||||
done = true
|
||||
break
|
||||
@ -207,7 +217,7 @@ out:
|
||||
|
||||
// Add hashes to the chunk set
|
||||
// Check if we're done fetching
|
||||
if !done {
|
||||
if !done && len(hashes) > 0 {
|
||||
//fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
|
||||
// Get the next set of hashes
|
||||
p.getHashes(hashes[len(hashes)-1])
|
||||
@ -218,7 +228,7 @@ out:
|
||||
}
|
||||
}
|
||||
}
|
||||
glog.V(logger.Detail).Infoln("Download hashes: done. Took", time.Since(start))
|
||||
glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start))
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -242,6 +252,10 @@ out:
|
||||
// from the available peers.
|
||||
if d.queue.hashPool.Size() > 0 {
|
||||
availablePeers := d.peers.get(idleState)
|
||||
if len(availablePeers) == 0 {
|
||||
glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers))
|
||||
}
|
||||
|
||||
for _, peer := range availablePeers {
|
||||
// Get a possible chunk. If nil is returned no chunk
|
||||
// could be returned due to no hashes available.
|
||||
@ -317,21 +331,33 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
|
||||
return
|
||||
}
|
||||
|
||||
peer := d.peers.getPeer(id)
|
||||
// if the peer is in our healthy list of peers; update the td
|
||||
// and add the block. Otherwise just ignore it
|
||||
if peer == nil {
|
||||
glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
|
||||
return
|
||||
}
|
||||
|
||||
peer.mu.Lock()
|
||||
peer.td = td
|
||||
peer.recentHash = block.Hash()
|
||||
peer.mu.Unlock()
|
||||
|
||||
glog.V(logger.Detail).Infoln("Inserting new block from:", id)
|
||||
d.queue.addBlock(id, block, td)
|
||||
|
||||
// if the peer is in our healthy list of peers; update the td
|
||||
// here is a good chance to add the peer back to the list
|
||||
if peer := d.peers.getPeer(id); peer != nil {
|
||||
peer.mu.Lock()
|
||||
peer.td = td
|
||||
peer.recentHash = block.Hash()
|
||||
peer.mu.Unlock()
|
||||
}
|
||||
|
||||
// if neither go ahead to process
|
||||
if !(d.isFetchingHashes() || d.isDownloadingBlocks()) {
|
||||
d.process()
|
||||
// Check if the parent of the received block is known.
|
||||
// If the block is not know, request it otherwise, request.
|
||||
phash := block.ParentHash()
|
||||
if !d.hasBlock(phash) {
|
||||
glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
|
||||
d.syncCh <- syncPack{peer, peer.recentHash, true}
|
||||
} else {
|
||||
d.process()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -369,7 +395,7 @@ func (d *Downloader) process() error {
|
||||
// TODO change this. This shite
|
||||
for i, block := range blocks[:max] {
|
||||
if !d.hasBlock(block.ParentHash()) {
|
||||
d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash()}
|
||||
d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true}
|
||||
// remove processed blocks
|
||||
blocks = blocks[i:]
|
||||
|
||||
|
@ -324,7 +324,7 @@ func (self *ethProtocol) handle() error {
|
||||
// to simplify backend interface adding a new block
|
||||
// uses AddPeer followed by AddBlock only if peer is the best peer
|
||||
// (or selected as new best peer)
|
||||
if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best {
|
||||
if _, suspended := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); !suspended {
|
||||
self.blockPool.AddBlock(request.Block, self.id)
|
||||
}
|
||||
|
||||
@ -415,11 +415,9 @@ func (self *ethProtocol) sendStatus() error {
|
||||
}
|
||||
|
||||
func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) {
|
||||
//err.Log(self.peer.Logger)
|
||||
err.Log(glog.V(logger.Info))
|
||||
/*
|
||||
if err.Fatal() {
|
||||
self.peer.Disconnect(p2p.DiscSubprotocolError)
|
||||
}
|
||||
*/
|
||||
if err.Fatal() {
|
||||
self.peer.Disconnect(p2p.DiscSubprotocolError)
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user