updated blockpool
This commit is contained in:
@@ -1,12 +1,12 @@
|
||||
package blockpool
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/errs"
|
||||
ethlogger "github.com/ethereum/go-ethereum/logger"
|
||||
@@ -101,7 +101,7 @@ func (self *Config) init() {
|
||||
// node is the basic unit of the internal model of block chain/tree in the blockpool
|
||||
type node struct {
|
||||
lock sync.RWMutex
|
||||
hash []byte
|
||||
hash common.Hash
|
||||
block *types.Block
|
||||
hashBy string
|
||||
blockBy string
|
||||
@@ -123,7 +123,7 @@ type BlockPool struct {
|
||||
Config *Config
|
||||
|
||||
// the minimal interface with blockchain
|
||||
hasBlock func(hash []byte) bool
|
||||
hasBlock func(hash common.Hash) bool
|
||||
insertChain func(types.Blocks) error
|
||||
verifyPoW func(pow.Block) bool
|
||||
|
||||
@@ -133,7 +133,7 @@ type BlockPool struct {
|
||||
lock sync.RWMutex
|
||||
chainLock sync.RWMutex
|
||||
// alloc-easy pool of hash slices
|
||||
hashSlicePool chan [][]byte
|
||||
hashSlicePool chan []common.Hash
|
||||
|
||||
status *status
|
||||
|
||||
@@ -144,7 +144,7 @@ type BlockPool struct {
|
||||
|
||||
// public constructor
|
||||
func New(
|
||||
hasBlock func(hash []byte) bool,
|
||||
hasBlock func(hash common.Hash) bool,
|
||||
insertChain func(types.Blocks) error,
|
||||
verifyPoW func(pow.Block) bool,
|
||||
) *BlockPool {
|
||||
@@ -176,7 +176,7 @@ func (self *BlockPool) Start() {
|
||||
}
|
||||
|
||||
self.Config.init()
|
||||
self.hashSlicePool = make(chan [][]byte, 150)
|
||||
self.hashSlicePool = make(chan []common.Hash, 150)
|
||||
self.status = newStatus()
|
||||
self.quit = make(chan bool)
|
||||
self.pool = make(map[string]*entry)
|
||||
@@ -261,14 +261,13 @@ Peer info is currently not persisted across disconnects (or sessions)
|
||||
*/
|
||||
func (self *BlockPool) AddPeer(
|
||||
|
||||
td *big.Int, currentBlockHash []byte,
|
||||
td *big.Int, currentBlockHash common.Hash,
|
||||
peerId string,
|
||||
requestBlockHashes func([]byte) error,
|
||||
requestBlocks func([][]byte) error,
|
||||
requestBlockHashes func(common.Hash) error,
|
||||
requestBlocks func([]common.Hash) error,
|
||||
peerError func(*errs.Error),
|
||||
|
||||
) (best bool) {
|
||||
|
||||
return self.peers.addPeer(td, currentBlockHash, peerId, requestBlockHashes, requestBlocks, peerError)
|
||||
}
|
||||
|
||||
@@ -289,7 +288,7 @@ launches all block request processes on each chain section
|
||||
|
||||
the first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns.
|
||||
*/
|
||||
func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string) {
|
||||
func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId string) {
|
||||
|
||||
bestpeer, best := self.peers.getPeer(peerId)
|
||||
if !best {
|
||||
@@ -306,7 +305,7 @@ func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string)
|
||||
self.status.lock.Unlock()
|
||||
|
||||
var n int
|
||||
var hash []byte
|
||||
var hash common.Hash
|
||||
var ok, headSection, peerswitch bool
|
||||
var sec, child, parent *section
|
||||
var entry *entry
|
||||
@@ -318,7 +317,7 @@ func (self *BlockPool) AddBlockHashes(next func() ([]byte, bool), peerId string)
|
||||
plog.Debugf("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash))
|
||||
|
||||
// first check if we are building the head section of a peer's chain
|
||||
if bytes.Equal(bestpeer.parentHash, hash) {
|
||||
if bestpeer.parentHash == hash {
|
||||
if self.hasBlock(bestpeer.currentBlockHash) {
|
||||
return
|
||||
}
|
||||
@@ -561,7 +560,7 @@ func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
|
||||
entry := self.get(hash)
|
||||
|
||||
// a peer's current head block is appearing the first time
|
||||
if bytes.Equal(hash, sender.currentBlockHash) {
|
||||
if hash == sender.currentBlockHash {
|
||||
if sender.currentBlock == nil {
|
||||
plog.Debugf("AddBlock: add head block %s for peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
|
||||
sender.setChainInfoFromBlock(block)
|
||||
@@ -664,7 +663,7 @@ LOOP:
|
||||
plog.DebugDetailf("activateChain: section [%s] activated by peer <%s>", sectionhex(sec), p.id)
|
||||
sec.activate(p)
|
||||
if i > 0 && connected != nil {
|
||||
connected[string(sec.top.hash)] = sec
|
||||
connected[sec.top.hash.Str()] = sec
|
||||
}
|
||||
/*
|
||||
we need to relink both complete and incomplete sections
|
||||
@@ -696,7 +695,7 @@ LOOP:
|
||||
|
||||
// must run in separate go routine, otherwise
|
||||
// switchpeer -> activateChain -> activate deadlocks on section process select and peers.lock
|
||||
func (self *BlockPool) requestBlocks(attempts int, hashes [][]byte) {
|
||||
func (self *BlockPool) requestBlocks(attempts int, hashes []common.Hash) {
|
||||
self.wg.Add(1)
|
||||
go func() {
|
||||
self.peers.requestBlocks(attempts, hashes)
|
||||
@@ -718,16 +717,16 @@ func (self *BlockPool) getChild(sec *section) *section {
|
||||
}
|
||||
|
||||
// accessor and setter for entries in the pool
|
||||
func (self *BlockPool) get(hash []byte) *entry {
|
||||
func (self *BlockPool) get(hash common.Hash) *entry {
|
||||
self.lock.RLock()
|
||||
defer self.lock.RUnlock()
|
||||
return self.pool[string(hash)]
|
||||
return self.pool[hash.Str()]
|
||||
}
|
||||
|
||||
func (self *BlockPool) set(hash []byte, e *entry) {
|
||||
func (self *BlockPool) set(hash common.Hash, e *entry) {
|
||||
self.lock.Lock()
|
||||
defer self.lock.Unlock()
|
||||
self.pool[string(hash)] = e
|
||||
self.pool[hash.Str()] = e
|
||||
}
|
||||
|
||||
func (self *BlockPool) remove(sec *section) {
|
||||
@@ -736,7 +735,7 @@ func (self *BlockPool) remove(sec *section) {
|
||||
defer self.lock.Unlock()
|
||||
|
||||
for _, node := range sec.nodes {
|
||||
delete(self.pool, string(node.hash))
|
||||
delete(self.pool, node.hash.Str())
|
||||
}
|
||||
if sec.initialised && sec.poolRootIndex != 0 {
|
||||
self.status.lock.Lock()
|
||||
@@ -745,17 +744,17 @@ func (self *BlockPool) remove(sec *section) {
|
||||
}
|
||||
}
|
||||
|
||||
func (self *BlockPool) getHashSlice() (s [][]byte) {
|
||||
func (self *BlockPool) getHashSlice() (s []common.Hash) {
|
||||
select {
|
||||
case s = <-self.hashSlicePool:
|
||||
default:
|
||||
s = make([][]byte, self.Config.BlockBatchSize)
|
||||
s = make([]common.Hash, self.Config.BlockBatchSize)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Return returns a Client to the pool.
|
||||
func (self *BlockPool) putHashSlice(s [][]byte) {
|
||||
func (self *BlockPool) putHashSlice(s []common.Hash) {
|
||||
if len(s) == self.Config.BlockBatchSize {
|
||||
select {
|
||||
case self.hashSlicePool <- s:
|
||||
@@ -765,8 +764,8 @@ func (self *BlockPool) putHashSlice(s [][]byte) {
|
||||
}
|
||||
|
||||
// pretty prints hash (byte array) with first 4 bytes in hex
|
||||
func hex(hash []byte) (name string) {
|
||||
if hash == nil {
|
||||
func hex(hash common.Hash) (name string) {
|
||||
if (hash == common.Hash{}) {
|
||||
name = ""
|
||||
} else {
|
||||
name = fmt.Sprintf("%x", hash[:4])
|
||||
|
@@ -1,16 +1,15 @@
|
||||
package blockpool
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math/big"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
"github.com/ethereum/go-ethereum/errs"
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
)
|
||||
|
||||
type peer struct {
|
||||
@@ -18,20 +17,20 @@ type peer struct {
|
||||
|
||||
// last known blockchain status
|
||||
td *big.Int
|
||||
currentBlockHash []byte
|
||||
currentBlockHash common.Hash
|
||||
currentBlock *types.Block
|
||||
parentHash []byte
|
||||
parentHash common.Hash
|
||||
headSection *section
|
||||
|
||||
id string
|
||||
|
||||
// peer callbacks
|
||||
requestBlockHashes func([]byte) error
|
||||
requestBlocks func([][]byte) error
|
||||
requestBlockHashes func(common.Hash) error
|
||||
requestBlocks func([]common.Hash) error
|
||||
peerError func(*errs.Error)
|
||||
errors *errs.Errors
|
||||
|
||||
sections [][]byte
|
||||
sections []common.Hash
|
||||
|
||||
// channels to push new head block and head section for peer a
|
||||
currentBlockC chan *types.Block
|
||||
@@ -66,10 +65,10 @@ type peers struct {
|
||||
// peer constructor
|
||||
func (self *peers) newPeer(
|
||||
td *big.Int,
|
||||
currentBlockHash []byte,
|
||||
currentBlockHash common.Hash,
|
||||
id string,
|
||||
requestBlockHashes func([]byte) error,
|
||||
requestBlocks func([][]byte) error,
|
||||
requestBlockHashes func(common.Hash) error,
|
||||
requestBlocks func([]common.Hash) error,
|
||||
peerError func(*errs.Error),
|
||||
) (p *peer) {
|
||||
|
||||
@@ -107,7 +106,7 @@ func (self *peer) addError(code int, format string, params ...interface{}) {
|
||||
self.peerError(err)
|
||||
}
|
||||
|
||||
func (self *peer) setChainInfo(td *big.Int, c []byte) {
|
||||
func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
|
||||
self.lock.Lock()
|
||||
defer self.lock.Unlock()
|
||||
|
||||
@@ -115,7 +114,7 @@ func (self *peer) setChainInfo(td *big.Int, c []byte) {
|
||||
self.currentBlockHash = c
|
||||
|
||||
self.currentBlock = nil
|
||||
self.parentHash = nil
|
||||
self.parentHash = common.Hash{}
|
||||
self.headSection = nil
|
||||
}
|
||||
|
||||
@@ -139,7 +138,7 @@ func (self *peer) setChainInfoFromBlock(block *types.Block) {
|
||||
}()
|
||||
}
|
||||
|
||||
func (self *peers) requestBlocks(attempts int, hashes [][]byte) {
|
||||
func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
|
||||
// distribute block request among known peers
|
||||
self.lock.RLock()
|
||||
defer self.lock.RUnlock()
|
||||
@@ -178,18 +177,18 @@ func (self *peers) requestBlocks(attempts int, hashes [][]byte) {
|
||||
// returns true iff peer is promoted as best peer in the pool
|
||||
func (self *peers) addPeer(
|
||||
td *big.Int,
|
||||
currentBlockHash []byte,
|
||||
currentBlockHash common.Hash,
|
||||
id string,
|
||||
requestBlockHashes func([]byte) error,
|
||||
requestBlocks func([][]byte) error,
|
||||
requestBlockHashes func(common.Hash) error,
|
||||
requestBlocks func([]common.Hash) error,
|
||||
peerError func(*errs.Error),
|
||||
) (best bool) {
|
||||
|
||||
var previousBlockHash []byte
|
||||
var previousBlockHash common.Hash
|
||||
self.lock.Lock()
|
||||
p, found := self.peers[id]
|
||||
if found {
|
||||
if !bytes.Equal(p.currentBlockHash, currentBlockHash) {
|
||||
if p.currentBlockHash != currentBlockHash {
|
||||
previousBlockHash = p.currentBlockHash
|
||||
plog.Debugf("addPeer: Update peer <%s> with td %v and current block %s (was %v)", id, td, hex(currentBlockHash), hex(previousBlockHash))
|
||||
p.setChainInfo(td, currentBlockHash)
|
||||
@@ -221,7 +220,7 @@ func (self *peers) addPeer(
|
||||
// new block update for active current best peer -> request hashes
|
||||
plog.Debugf("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))
|
||||
|
||||
if previousBlockHash != nil {
|
||||
if (previousBlockHash != common.Hash{}) {
|
||||
if entry := self.bp.get(previousBlockHash); entry != nil {
|
||||
p.headSectionC <- nil
|
||||
self.bp.activateChain(entry.section, p, nil)
|
||||
@@ -318,15 +317,15 @@ func (self *BlockPool) switchPeer(oldp, newp *peer) {
|
||||
}
|
||||
|
||||
var connected = make(map[string]*section)
|
||||
var sections [][]byte
|
||||
var sections []common.Hash
|
||||
for _, hash := range newp.sections {
|
||||
plog.DebugDetailf("activate chain starting from section [%s]", hex(hash))
|
||||
// if section not connected (ie, top of a contiguous sequence of sections)
|
||||
if connected[string(hash)] == nil {
|
||||
if connected[hash.Str()] == nil {
|
||||
// if not deleted, then reread from pool (it can be orphaned top half of a split section)
|
||||
if entry := self.get(hash); entry != nil {
|
||||
self.activateChain(entry.section, newp, connected)
|
||||
connected[string(hash)] = entry.section
|
||||
connected[hash.Str()] = entry.section
|
||||
sections = append(sections, hash)
|
||||
}
|
||||
}
|
||||
@@ -396,7 +395,7 @@ func (self *peer) getCurrentBlock(currentBlock *types.Block) {
|
||||
plog.DebugDetailf("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
|
||||
} else {
|
||||
plog.DebugDetailf("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
|
||||
self.requestBlocks([][]byte{self.currentBlockHash})
|
||||
self.requestBlocks([]common.Hash{self.currentBlockHash})
|
||||
self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
|
||||
return
|
||||
}
|
||||
@@ -427,9 +426,9 @@ func (self *peer) getBlockHashes() {
|
||||
self.addError(ErrInvalidBlock, "%v", err)
|
||||
self.bp.status.badPeers[self.id]++
|
||||
} else {
|
||||
headKey := string(self.parentHash)
|
||||
headKey := self.parentHash.Str()
|
||||
height := self.bp.status.chain[headKey] + 1
|
||||
self.bp.status.chain[string(self.currentBlockHash)] = height
|
||||
self.bp.status.chain[self.currentBlockHash.Str()] = height
|
||||
if height > self.bp.status.values.LongestChain {
|
||||
self.bp.status.values.LongestChain = height
|
||||
}
|
||||
|
@@ -4,6 +4,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/core/types"
|
||||
)
|
||||
|
||||
@@ -27,9 +28,9 @@ type section struct {
|
||||
nodes []*node
|
||||
|
||||
peer *peer
|
||||
parentHash []byte
|
||||
parentHash common.Hash
|
||||
|
||||
blockHashes [][]byte
|
||||
blockHashes []common.Hash
|
||||
|
||||
poolRootIndex int
|
||||
|
||||
@@ -115,7 +116,7 @@ func (self *section) addSectionToBlockChain(p *peer) {
|
||||
break
|
||||
}
|
||||
self.poolRootIndex--
|
||||
keys = append(keys, string(node.hash))
|
||||
keys = append(keys, node.hash.Str())
|
||||
blocks = append(blocks, block)
|
||||
}
|
||||
|
||||
@@ -166,9 +167,9 @@ func (self *section) addSectionToBlockChain(p *peer) {
|
||||
|
||||
self.bp.status.lock.Lock()
|
||||
if err == nil {
|
||||
headKey := string(blocks[0].ParentHash())
|
||||
headKey := blocks[0].ParentHash().Str()
|
||||
height := self.bp.status.chain[headKey] + len(blocks)
|
||||
self.bp.status.chain[string(blocks[len(blocks)-1].Hash())] = height
|
||||
self.bp.status.chain[blocks[len(blocks)-1].Hash().Str()] = height
|
||||
if height > self.bp.status.values.LongestChain {
|
||||
self.bp.status.values.LongestChain = height
|
||||
}
|
||||
@@ -316,7 +317,7 @@ LOOP:
|
||||
self.addSectionToBlockChain(self.peer)
|
||||
}
|
||||
} else {
|
||||
if self.parentHash == nil && n == self.bottom {
|
||||
if (self.parentHash == common.Hash{}) && n == self.bottom {
|
||||
self.parentHash = block.ParentHash()
|
||||
plog.DebugDetailf("[%s] got parent head block hash %s...checking", sectionhex(self), hex(self.parentHash))
|
||||
self.blockHashesRequest()
|
||||
@@ -456,7 +457,7 @@ func (self *section) blockHashesRequest() {
|
||||
// a demoted peer's fork will be chosen over the best peer's chain
|
||||
// because relinking the correct chain (activateChain) is overwritten here in
|
||||
// demoted peer's section process just before the section is put to idle mode
|
||||
if self.parentHash != nil {
|
||||
if (self.parentHash != common.Hash{}) {
|
||||
if parent := self.bp.get(self.parentHash); parent != nil {
|
||||
parentSection = parent.section
|
||||
plog.DebugDetailf("[%s] blockHashesRequest: parent section [%s] linked\n", sectionhex(self), sectionhex(parentSection))
|
||||
|
Reference in New Issue
Block a user