PoC 6 networking code.

* Added block pool for gathering blocks from the network (chunks)
* Re wrote syncing
This commit is contained in:
obscuren
2014-08-21 14:47:58 +02:00
parent 79c64f6bca
commit eaa2e8900d
7 changed files with 296 additions and 197 deletions

288
peer.go
View File

@ -4,6 +4,8 @@ import (
"bytes"
"container/list"
"fmt"
"math"
"math/big"
"net"
"strconv"
"strings"
@ -22,7 +24,7 @@ const (
// The size of the output buffer for writing messages
outputBufferSize = 50
// Current protocol version
ProtocolVersion = 26
ProtocolVersion = 27
// Interval for ping/pong message
pingPongTimer = 2 * time.Second
)
@ -125,9 +127,13 @@ type Peer struct {
lastPong int64
lastBlockReceived time.Time
host []byte
port uint16
caps Caps
host []byte
port uint16
caps Caps
td *big.Int
bestHash []byte
lastReceivedHash []byte
requestedHashes [][]byte
// This peer's public key
pubkey []byte
@ -345,7 +351,6 @@ func (p *Peer) HandleInbound() {
for _, msg := range msgs {
peerlogger.DebugDetailf("(%v) => %v %v\n", p.conn.RemoteAddr(), msg.Type, msg.Data)
nextMsg:
switch msg.Type {
case ethwire.MsgHandshakeTy:
// Version message
@ -354,6 +359,7 @@ func (p *Peer) HandleInbound() {
if p.caps.IsCap(CapPeerDiscTy) {
p.QueueMessage(ethwire.NewMessage(ethwire.MsgGetPeersTy, ""))
}
case ethwire.MsgDiscTy:
p.Stop()
peerlogger.Infoln("Disconnect peer: ", DiscReason(msg.Data.Get(0).Uint()))
@ -366,117 +372,6 @@ func (p *Peer) HandleInbound() {
// active.
p.lastPong = time.Now().Unix()
p.pingTime = time.Since(p.pingStartTime)
case ethwire.MsgBlockTy:
// Get all blocks and process them
//var block, lastBlock *ethchain.Block
//var err error
var (
block, lastBlock *ethchain.Block
blockChain = p.ethereum.BlockChain()
err error
)
// Make sure we are actually receiving anything
if msg.Data.Len()-1 > 1 && p.diverted {
// We requested blocks and now we need to make sure we have a common ancestor somewhere in these blocks so we can find
// common ground to start syncing from
lastBlock = ethchain.NewBlockFromRlpValue(msg.Data.Get(msg.Data.Len() - 1))
if p.lastRequestedBlock != nil && bytes.Compare(lastBlock.Hash(), p.lastRequestedBlock.Hash()) == 0 {
p.catchingUp = false
continue
}
p.lastRequestedBlock = lastBlock
peerlogger.Infof("Last block: %x. Checking if we have it locally.\n", lastBlock.Hash())
for i := msg.Data.Len() - 1; i >= 0; i-- {
block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
// Do we have this block on our chain? If so we can continue
if !blockChain.HasBlock(block.Hash()) {
// We don't have this block, but we do have a block with the same prevHash, diversion time!
if blockChain.HasBlockWithPrevHash(block.PrevHash) {
p.diverted = false
if !blockChain.FindCanonicalChainFromMsg(msg, block.PrevHash) {
p.SyncWithPeerToLastKnown()
break nextMsg
}
break
}
}
}
if !blockChain.HasBlock(lastBlock.Hash()) {
// If we can't find a common ancenstor we need to request more blocks.
// FIXME: At one point this won't scale anymore since we are not asking for an offset
// we just keep increasing the amount of blocks.
p.blocksRequested = p.blocksRequested * 2
peerlogger.Infof("No common ancestor found, requesting %d more blocks.\n", p.blocksRequested)
p.FindCommonParentBlock()
break nextMsg
}
p.catchingUp = false
}
for i := msg.Data.Len() - 1; i >= 0; i-- {
block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i))
err = p.ethereum.StateManager().Process(block, false)
if err != nil {
if ethutil.Config.Debug {
peerlogger.Infof("Block %x failed\n", block.Hash())
peerlogger.Infof("%v\n", err)
peerlogger.Debugln(block)
}
break
} else {
lastBlock = block
}
p.lastBlockReceived = time.Now()
}
if msg.Data.Len() <= 1 {
// Set catching up to false if
// the peer has nothing left to give
p.catchingUp = false
}
if err != nil {
// If the parent is unknown try to catch up with this peer
if ethchain.IsParentErr(err) {
b := ethchain.NewBlockFromRlpValue(msg.Data.Get(0))
peerlogger.Infof("Attempting to catch (%x). Parent unknown\n", b.Hash())
p.catchingUp = false
p.CatchupWithPeer(b.PrevHash)
peerlogger.Infoln(b)
/*
peerlogger.Infoln("Attempting to catch. Parent known")
p.catchingUp = false
p.CatchupWithPeer(p.ethereum.BlockChain().CurrentBlock.Hash())
*/
} else if ethchain.IsValidationErr(err) {
fmt.Println("Err:", err)
p.catchingUp = false
}
} else {
// If we're catching up, try to catch up further.
if p.catchingUp && msg.Data.Len() > 1 {
if lastBlock != nil {
blockInfo := lastBlock.BlockInfo()
peerlogger.DebugDetailf("Synced chain to #%d %x %x\n", blockInfo.Number, lastBlock.Hash(), blockInfo.Hash)
}
p.catchingUp = false
hash := p.ethereum.BlockChain().CurrentBlock.Hash()
p.CatchupWithPeer(hash)
}
}
case ethwire.MsgTxTy:
// If the message was a transaction queue the transaction
// in the TxPool where it will undergo validation and
@ -501,58 +396,6 @@ func (p *Peer) HandleInbound() {
// Connect to the list of peers
p.ethereum.ProcessPeerList(peers)
case ethwire.MsgGetChainTy:
var parent *ethchain.Block
// Length minus one since the very last element in the array is a count
l := msg.Data.Len() - 1
// Ignore empty get chains
if l == 0 {
break
}
// Amount of parents in the canonical chain
//amountOfBlocks := msg.Data.Get(l).AsUint()
amountOfBlocks := uint64(100)
// Check each SHA block hash from the message and determine whether
// the SHA is in the database
for i := 0; i < l; i++ {
if data := msg.Data.Get(i).Bytes(); p.ethereum.StateManager().BlockChain().HasBlock(data) {
parent = p.ethereum.BlockChain().GetBlock(data)
break
}
}
// If a parent is found send back a reply
if parent != nil {
peerlogger.DebugDetailf("Found canonical block, returning chain from: %x ", parent.Hash())
chain := p.ethereum.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks)
if len(chain) > 0 {
//peerlogger.Debugf("Returning %d blocks: %x ", len(chain), parent.Hash())
p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, chain))
} else {
p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, []interface{}{}))
}
} else {
//peerlogger.Debugf("Could not find a similar block")
// If no blocks are found we send back a reply with msg not in chain
// and the last hash from get chain
if l > 0 {
lastHash := msg.Data.Get(l - 1)
//log.Printf("Sending not in chain with hash %x\n", lastHash.AsRaw())
p.QueueMessage(ethwire.NewMessage(ethwire.MsgNotInChainTy, []interface{}{lastHash.Raw()}))
}
}
case ethwire.MsgNotInChainTy:
peerlogger.DebugDetailf("Not in chain: %x\n", msg.Data.Get(0).Bytes())
if p.diverted == true {
// If were already looking for a common parent and we get here again we need to go deeper
p.blocksRequested = p.blocksRequested * 2
}
p.diverted = true
p.catchingUp = false
p.FindCommonParentBlock()
case ethwire.MsgGetTxsTy:
// Get the current transactions of the pool
txs := p.ethereum.TxPool().CurrentTransactions()
@ -564,15 +407,103 @@ func (p *Peer) HandleInbound() {
// Broadcast it back to the peer
p.QueueMessage(ethwire.NewMessage(ethwire.MsgTxTy, txsInterface))
// Unofficial but fun nonetheless
case ethwire.MsgTalkTy:
peerlogger.Infoln("%v says: %s\n", p.conn.RemoteAddr(), msg.Data.Str())
case ethwire.MsgGetBlockHashesTy:
if msg.Data.Len() < 2 {
peerlogger.Debugln("err: argument length invalid ", msg.Data.Len())
}
hash := msg.Data.Get(0).Bytes()
amount := msg.Data.Get(1).Uint()
hashes := p.ethereum.BlockChain().GetChainHashesFromHash(hash, amount)
p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockHashesTy, ethutil.ByteSliceToInterface(hashes)))
case ethwire.MsgGetBlocksTy:
// Limit to max 300 blocks
max := int(math.Min(float64(msg.Data.Len()), 300.0))
var blocks []interface{}
for i := 0; i < max; i++ {
hash := msg.Data.Get(i).Bytes()
block := p.ethereum.BlockChain().GetBlock(hash)
if block != nil {
blocks = append(blocks, block.Value().Raw())
}
}
p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, blocks))
case ethwire.MsgBlockHashesTy:
blockPool := p.ethereum.blockPool
foundCommonHash := false
it := msg.Data.NewIterator()
for it.Next() {
hash := it.Value().Bytes()
if blockPool.HasCommonHash(hash) {
foundCommonHash = true
break
}
blockPool.AddHash(hash)
p.lastReceivedHash = hash
}
if foundCommonHash {
p.FetchBlocks()
} else {
p.FetchHashes()
}
case ethwire.MsgBlockTy:
blockPool := p.ethereum.blockPool
it := msg.Data.NewIterator()
for it.Next() {
block := ethchain.NewBlockFromRlpValue(it.Value())
blockPool.SetBlock(block)
}
linked := blockPool.CheckLinkAndProcess(func(block *ethchain.Block) {
p.ethereum.StateManager().Process(block, false)
})
if !linked {
p.FetchBlocks()
}
}
}
}
p.Stop()
}
func (self *Peer) FetchBlocks() {
blockPool := self.ethereum.blockPool
hashes := blockPool.Take(100, self)
if len(hashes) > 0 {
self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlocksTy, ethutil.ByteSliceToInterface(hashes)))
}
}
func (self *Peer) FetchHashes() {
blockPool := self.ethereum.blockPool
if self.td.Cmp(blockPool.td) >= 0 {
peerlogger.Debugf("Requesting hashes from %x\n", self.lastReceivedHash)
if !blockPool.HasLatestHash() {
self.QueueMessage(ethwire.NewMessage(ethwire.MsgGetBlockHashesTy, []interface{}{self.lastReceivedHash, uint32(200)}))
}
}
}
// General update method
func (self *Peer) update() {
serviceTimer := time.NewTicker(5 * time.Second)
@ -643,6 +574,7 @@ func (p *Peer) pushHandshake() error {
pubkey := p.ethereum.KeyManager().PublicKey()
msg := ethwire.NewMessage(ethwire.MsgHandshakeTy, []interface{}{
uint32(ProtocolVersion), uint32(0), []byte(p.version), byte(p.caps), p.port, pubkey[1:],
p.ethereum.BlockChain().TD.Uint64(), p.ethereum.BlockChain().CurrentBlock.Hash(),
})
p.QueueMessage(msg)
@ -728,10 +660,15 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
p.SetVersion(c.Get(2).Str())
}
// Get the td and last hash
p.td = c.Get(6).BigInt()
p.bestHash = c.Get(7).Bytes()
p.lastReceivedHash = p.bestHash
p.ethereum.PushPeer(p)
p.ethereum.reactor.Post("peerList", p.ethereum.Peers())
ethlogger.Infof("Added peer (%s) %d / %d\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers)
ethlogger.Infof("Added peer (%s) %d / %d (TD = %v ~ %x)\n", p.conn.RemoteAddr(), p.ethereum.Peers().Len(), p.ethereum.MaxPeers, p.td, p.bestHash)
/*
// Catch up with the connected peer
@ -740,7 +677,12 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) {
time.Sleep(10 * time.Second)
}
*/
p.SyncWithPeerToLastKnown()
//p.SyncWithPeerToLastKnown()
if p.td.Cmp(p.ethereum.BlockChain().TD) > 0 {
p.ethereum.blockPool.AddHash(p.lastReceivedHash)
p.FetchHashes()
}
peerlogger.Debugln(p)
}