eth, eth/downloader: handle header requests, table driven proto tests
This commit is contained in:
273
eth/handler.go
273
eth/handler.go
@ -36,10 +36,8 @@ import (
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
)
|
||||
|
||||
// This is the target maximum size of returned blocks for the
|
||||
// getBlocks message. The reply message may exceed it
|
||||
// if a single block is larger than the limit.
|
||||
const maxBlockRespSize = 2 * 1024 * 1024
|
||||
// This is the target maximum size of returned blocks, headers or node data.
|
||||
const softResponseLimit = 2 * 1024 * 1024
|
||||
|
||||
func errResp(code errCode, format string, v ...interface{}) error {
|
||||
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
|
||||
@ -59,12 +57,13 @@ func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(has
|
||||
func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) }
|
||||
|
||||
type ProtocolManager struct {
|
||||
protVer, netId int
|
||||
txpool txPool
|
||||
chainman *core.ChainManager
|
||||
downloader *downloader.Downloader
|
||||
fetcher *fetcher.Fetcher
|
||||
peers *peerSet
|
||||
txpool txPool
|
||||
chainman *core.ChainManager
|
||||
chaindb common.Database
|
||||
|
||||
downloader *downloader.Downloader
|
||||
fetcher *fetcher.Fetcher
|
||||
peers *peerSet
|
||||
|
||||
SubProtocols []p2p.Protocol
|
||||
|
||||
@ -85,17 +84,17 @@ type ProtocolManager struct {
|
||||
|
||||
// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
|
||||
// with the ethereum network.
|
||||
func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager {
|
||||
func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager, chaindb common.Database) *ProtocolManager {
|
||||
// Create the protocol manager with the base fields
|
||||
manager := &ProtocolManager{
|
||||
eventMux: mux,
|
||||
txpool: txpool,
|
||||
chainman: chainman,
|
||||
chaindb: chaindb,
|
||||
peers: newPeerSet(),
|
||||
newPeerCh: make(chan *peer, 1),
|
||||
txsyncCh: make(chan *txsync),
|
||||
quitSync: make(chan struct{}),
|
||||
netId: networkId,
|
||||
}
|
||||
// Initiate a sub-protocol for every implemented version we can handle
|
||||
manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions))
|
||||
@ -190,6 +189,9 @@ func (pm *ProtocolManager) handle(p *peer) error {
|
||||
glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
|
||||
return err
|
||||
}
|
||||
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
|
||||
rw.Init(p.version)
|
||||
}
|
||||
// Register the peer locally
|
||||
glog.V(logger.Detail).Infof("%v: adding peer", p)
|
||||
if err := pm.peers.Register(p); err != nil {
|
||||
@ -230,12 +232,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
defer msg.Discard()
|
||||
|
||||
// Handle the message depending on its contents
|
||||
switch msg.Code {
|
||||
case StatusMsg:
|
||||
switch {
|
||||
case msg.Code == StatusMsg:
|
||||
// Status messages should never arrive after the handshake
|
||||
return errResp(ErrExtraStatusMsg, "uncontrolled status message")
|
||||
|
||||
case GetBlockHashesMsg:
|
||||
case p.version < eth62 && msg.Code == GetBlockHashesMsg:
|
||||
// Retrieve the number of hashes to return and from which origin hash
|
||||
var request getBlockHashesData
|
||||
if err := msg.Decode(&request); err != nil {
|
||||
@ -251,7 +253,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
}
|
||||
return p.SendBlockHashes(hashes)
|
||||
|
||||
case GetBlockHashesFromNumberMsg:
|
||||
case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg:
|
||||
// Retrieve and decode the number of hashes to return and from which origin number
|
||||
var request getBlockHashesFromNumberData
|
||||
if err := msg.Decode(&request); err != nil {
|
||||
@ -278,12 +280,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
}
|
||||
return p.SendBlockHashes(hashes)
|
||||
|
||||
case BlockHashesMsg:
|
||||
case p.version < eth62 && msg.Code == BlockHashesMsg:
|
||||
// A batch of hashes arrived to one of our previous requests
|
||||
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
||||
|
||||
var hashes []common.Hash
|
||||
if err := msgStream.Decode(&hashes); err != nil {
|
||||
if err := msg.Decode(&hashes); err != nil {
|
||||
break
|
||||
}
|
||||
// Deliver them all to the downloader for queuing
|
||||
@ -292,7 +292,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
glog.V(logger.Debug).Infoln(err)
|
||||
}
|
||||
|
||||
case GetBlocksMsg:
|
||||
case p.version < eth62 && msg.Code == GetBlocksMsg:
|
||||
// Decode the retrieval message
|
||||
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
||||
if _, err := msgStream.List(); err != nil {
|
||||
@ -302,44 +302,28 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
var (
|
||||
hash common.Hash
|
||||
bytes common.StorageSize
|
||||
hashes []common.Hash
|
||||
blocks []*types.Block
|
||||
)
|
||||
for {
|
||||
for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit {
|
||||
//Retrieve the hash of the next block
|
||||
err := msgStream.Decode(&hash)
|
||||
if err == rlp.EOL {
|
||||
break
|
||||
} else if err != nil {
|
||||
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
||||
}
|
||||
hashes = append(hashes, hash)
|
||||
|
||||
// Retrieve the requested block, stopping if enough was found
|
||||
if block := pm.chainman.GetBlock(hash); block != nil {
|
||||
blocks = append(blocks, block)
|
||||
bytes += block.Size()
|
||||
if len(blocks) >= downloader.MaxBlockFetch || bytes > maxBlockRespSize {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if glog.V(logger.Detail) && len(blocks) == 0 && len(hashes) > 0 {
|
||||
list := "["
|
||||
for _, hash := range hashes {
|
||||
list += fmt.Sprintf("%x, ", hash[:4])
|
||||
}
|
||||
list = list[:len(list)-2] + "]"
|
||||
|
||||
glog.Infof("%v: no blocks found for requested hashes %s", p, list)
|
||||
}
|
||||
return p.SendBlocks(blocks)
|
||||
|
||||
case BlocksMsg:
|
||||
case p.version < eth62 && msg.Code == BlocksMsg:
|
||||
// Decode the arrived block message
|
||||
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
||||
|
||||
var blocks []*types.Block
|
||||
if err := msgStream.Decode(&blocks); err != nil {
|
||||
if err := msg.Decode(&blocks); err != nil {
|
||||
glog.V(logger.Detail).Infoln("Decode error", err)
|
||||
blocks = nil
|
||||
}
|
||||
@ -352,31 +336,196 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
pm.downloader.DeliverBlocks(p.id, blocks)
|
||||
}
|
||||
|
||||
case NewBlockHashesMsg:
|
||||
// Retrieve and deseralize the remote new block hashes notification
|
||||
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
||||
// Block header query, collect the requested headers and reply
|
||||
case p.version >= eth62 && msg.Code == GetBlockHeadersMsg:
|
||||
// Decode the complex header query
|
||||
var query getBlockHeadersData
|
||||
if err := msg.Decode(&query); err != nil {
|
||||
return errResp(ErrDecode, "%v: %v", msg, err)
|
||||
}
|
||||
// Gather blocks until the fetch or network limits is reached
|
||||
var (
|
||||
bytes common.StorageSize
|
||||
headers []*types.Header
|
||||
unknown bool
|
||||
)
|
||||
for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
|
||||
// Retrieve the next block satisfying the query
|
||||
var origin *types.Block
|
||||
if query.Origin.Hash != (common.Hash{}) {
|
||||
origin = pm.chainman.GetBlock(query.Origin.Hash)
|
||||
} else {
|
||||
origin = pm.chainman.GetBlockByNumber(query.Origin.Number)
|
||||
}
|
||||
if origin == nil {
|
||||
break
|
||||
}
|
||||
headers = append(headers, origin.Header())
|
||||
bytes += origin.Size()
|
||||
|
||||
var hashes []common.Hash
|
||||
if err := msgStream.Decode(&hashes); err != nil {
|
||||
break
|
||||
}
|
||||
// Mark the hashes as present at the remote node
|
||||
for _, hash := range hashes {
|
||||
p.MarkBlock(hash)
|
||||
p.SetHead(hash)
|
||||
}
|
||||
// Schedule all the unknown hashes for retrieval
|
||||
unknown := make([]common.Hash, 0, len(hashes))
|
||||
for _, hash := range hashes {
|
||||
if !pm.chainman.HasBlock(hash) {
|
||||
unknown = append(unknown, hash)
|
||||
// Advance to the next block of the query
|
||||
switch {
|
||||
case query.Origin.Hash != (common.Hash{}) && query.Reverse:
|
||||
// Hash based traversal towards the genesis block
|
||||
for i := 0; i < int(query.Skip)+1; i++ {
|
||||
if block := pm.chainman.GetBlock(query.Origin.Hash); block != nil {
|
||||
query.Origin.Hash = block.ParentHash()
|
||||
} else {
|
||||
unknown = true
|
||||
break
|
||||
}
|
||||
}
|
||||
case query.Origin.Hash != (common.Hash{}) && !query.Reverse:
|
||||
// Hash based traversal towards the leaf block
|
||||
if block := pm.chainman.GetBlockByNumber(origin.NumberU64() + query.Skip + 1); block != nil {
|
||||
if pm.chainman.GetBlockHashesFromHash(block.Hash(), query.Skip+1)[query.Skip] == query.Origin.Hash {
|
||||
query.Origin.Hash = block.Hash()
|
||||
} else {
|
||||
unknown = true
|
||||
}
|
||||
} else {
|
||||
unknown = true
|
||||
}
|
||||
case query.Reverse:
|
||||
// Number based traversal towards the genesis block
|
||||
if query.Origin.Number >= query.Skip+1 {
|
||||
query.Origin.Number -= (query.Skip + 1)
|
||||
} else {
|
||||
unknown = true
|
||||
}
|
||||
|
||||
case !query.Reverse:
|
||||
// Number based traversal towards the leaf block
|
||||
query.Origin.Number += (query.Skip + 1)
|
||||
}
|
||||
}
|
||||
for _, hash := range unknown {
|
||||
pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks)
|
||||
return p.SendBlockHeaders(headers)
|
||||
|
||||
case p.version >= eth62 && msg.Code == GetBlockBodiesMsg:
|
||||
// Decode the retrieval message
|
||||
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
||||
if _, err := msgStream.List(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Gather blocks until the fetch or network limits is reached
|
||||
var (
|
||||
hash common.Hash
|
||||
bytes common.StorageSize
|
||||
bodies []*blockBody
|
||||
)
|
||||
for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
|
||||
//Retrieve the hash of the next block
|
||||
if err := msgStream.Decode(&hash); err == rlp.EOL {
|
||||
break
|
||||
} else if err != nil {
|
||||
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
||||
}
|
||||
// Retrieve the requested block, stopping if enough was found
|
||||
if block := pm.chainman.GetBlock(hash); block != nil {
|
||||
bodies = append(bodies, &blockBody{Transactions: block.Transactions(), Uncles: block.Uncles()})
|
||||
bytes += block.Size()
|
||||
}
|
||||
}
|
||||
return p.SendBlockBodies(bodies)
|
||||
|
||||
case p.version >= eth63 && msg.Code == GetNodeDataMsg:
|
||||
// Decode the retrieval message
|
||||
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
||||
if _, err := msgStream.List(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Gather state data until the fetch or network limits is reached
|
||||
var (
|
||||
hash common.Hash
|
||||
bytes int
|
||||
data [][]byte
|
||||
)
|
||||
for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
|
||||
// Retrieve the hash of the next state entry
|
||||
if err := msgStream.Decode(&hash); err == rlp.EOL {
|
||||
break
|
||||
} else if err != nil {
|
||||
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
||||
}
|
||||
// Retrieve the requested state entry, stopping if enough was found
|
||||
if entry, err := pm.chaindb.Get(hash.Bytes()); err == nil {
|
||||
data = append(data, entry)
|
||||
bytes += len(entry)
|
||||
}
|
||||
}
|
||||
return p.SendNodeData(data)
|
||||
|
||||
case p.version >= eth63 && msg.Code == GetReceiptsMsg:
|
||||
// Decode the retrieval message
|
||||
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
|
||||
if _, err := msgStream.List(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Gather state data until the fetch or network limits is reached
|
||||
var (
|
||||
hash common.Hash
|
||||
bytes int
|
||||
receipts []*types.Receipt
|
||||
)
|
||||
for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptsFetch {
|
||||
// Retrieve the hash of the next transaction receipt
|
||||
if err := msgStream.Decode(&hash); err == rlp.EOL {
|
||||
break
|
||||
} else if err != nil {
|
||||
return errResp(ErrDecode, "msg %v: %v", msg, err)
|
||||
}
|
||||
// Retrieve the requested receipt, stopping if enough was found
|
||||
if receipt := core.GetReceipt(pm.chaindb, hash); receipt != nil {
|
||||
receipts = append(receipts, receipt)
|
||||
bytes += len(receipt.RlpEncode())
|
||||
}
|
||||
}
|
||||
return p.SendReceipts(receipts)
|
||||
|
||||
case msg.Code == NewBlockHashesMsg:
|
||||
// Retrieve and deseralize the remote new block hashes notification
|
||||
type announce struct {
|
||||
Hash common.Hash
|
||||
Number uint64
|
||||
}
|
||||
var announces = []announce{}
|
||||
|
||||
if p.version < eth62 {
|
||||
// We're running the old protocol, make block number unknown (0)
|
||||
var hashes []common.Hash
|
||||
if err := msg.Decode(&hashes); err != nil {
|
||||
return errResp(ErrDecode, "%v: %v", msg, err)
|
||||
}
|
||||
for _, hash := range hashes {
|
||||
announces = append(announces, announce{hash, 0})
|
||||
}
|
||||
} else {
|
||||
// Otherwise extract both block hash and number
|
||||
var request newBlockHashesData
|
||||
if err := msg.Decode(&request); err != nil {
|
||||
return errResp(ErrDecode, "%v: %v", msg, err)
|
||||
}
|
||||
for _, block := range request {
|
||||
announces = append(announces, announce{block.Hash, block.Number})
|
||||
}
|
||||
}
|
||||
// Mark the hashes as present at the remote node
|
||||
for _, block := range announces {
|
||||
p.MarkBlock(block.Hash)
|
||||
p.SetHead(block.Hash)
|
||||
}
|
||||
// Schedule all the unknown hashes for retrieval
|
||||
unknown := make([]announce, 0, len(announces))
|
||||
for _, block := range announces {
|
||||
if !pm.chainman.HasBlock(block.Hash) {
|
||||
unknown = append(unknown, block)
|
||||
}
|
||||
}
|
||||
for _, block := range unknown {
|
||||
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks)
|
||||
}
|
||||
|
||||
case NewBlockMsg:
|
||||
case msg.Code == NewBlockMsg:
|
||||
// Retrieve and decode the propagated block
|
||||
var request newBlockData
|
||||
if err := msg.Decode(&request); err != nil {
|
||||
@ -410,7 +559,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
|
||||
}
|
||||
}
|
||||
|
||||
case TxMsg:
|
||||
case msg.Code == TxMsg:
|
||||
// Transactions arrived, parse all of them and deliver to the pool
|
||||
var txs []*types.Transaction
|
||||
if err := msg.Decode(&txs); err != nil {
|
||||
|
Reference in New Issue
Block a user