eth: clean up peer struct a bit, fix double txn bcast

This commit is contained in:
Péter Szilágyi
2015-06-29 12:44:00 +03:00
parent 2c8ed76e01
commit 6fc85f1ec2
4 changed files with 69 additions and 71 deletions

View File

@ -158,9 +158,7 @@ func (pm *ProtocolManager) Stop() {
}
func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
td, current, genesis := pm.chainman.Status()
return newPeer(pv, nv, genesis, current, td, p, rw)
return newPeer(pv, nv, p, rw)
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
@ -169,7 +167,8 @@ func (pm *ProtocolManager) handle(p *peer) error {
glog.V(logger.Debug).Infof("%v: peer connected", p)
// Execute the Ethereum handshake
if err := p.handleStatus(); err != nil {
td, head, genesis := pm.chainman.Status()
if err := p.Handshake(td, head, genesis); err != nil {
glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err)
return err
}
@ -182,7 +181,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil {
if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.RequestHashes, p.RequestBlocks); err != nil {
return err
}
// Propagate existing transactions. new transactions appearing
@ -225,9 +224,13 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
propTxnInPacketsMeter.Mark(1)
for i, tx := range txs {
// Validate and mark the remote transaction
if tx == nil {
return errResp(ErrDecode, "transaction %d is nil", i)
}
p.MarkTransaction(tx.Hash())
// Log it's arrival for later analysis
propTxnInTrafficMeter.Mark(tx.Size().Int64())
jsonlogger.LogJson(&logger.EthTxReceived{
TxHash: tx.Hash().Hex(),
@ -255,7 +258,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// returns either requested hashes or nothing (i.e. not found)
return p.sendBlockHashes(hashes)
return p.SendBlockHashes(hashes)
case BlockHashesMsg:
// A batch of hashes arrived to one of our previous requests
@ -314,7 +317,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
glog.Infof("%v: no blocks found for requested hashes %s", p, list)
}
return p.sendBlocks(blocks)
return p.SendBlocks(blocks)
case BlocksMsg:
// Decode the arrived block message
@ -349,7 +352,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Mark the hashes as present at the remote node
for _, hash := range hashes {
p.blockHashes.Add(hash)
p.MarkBlock(hash)
p.SetHead(hash)
}
// Schedule all the unknown hashes for retrieval
@ -360,7 +363,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for _, hash := range unknown {
pm.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks)
pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks)
}
case NewBlockMsg:
@ -387,7 +390,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
RemoteId: p.ID().String(),
})
// Mark the peer as owning the block and schedule it for import
p.blockHashes.Add(request.Block.Hash())
p.MarkBlock(request.Block.Hash())
p.SetHead(request.Block.Hash())
pm.fetcher.Enqueue(p.id, request.Block)
@ -412,14 +415,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
if propagate {
transfer := peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range transfer {
peer.sendNewBlock(block)
peer.SendNewBlock(block)
}
glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt))
}
// Otherwise if the block is indeed in out own chain, announce it
if pm.chainman.HasBlock(hash) {
for _, peer := range peers {
peer.sendNewBlockHashes([]common.Hash{hash})
peer.SendNewBlockHashes([]common.Hash{hash})
}
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
}
@ -432,7 +435,7 @@ func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction)
peers := pm.peers.PeersWithoutTx(hash)
//FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
for _, peer := range peers {
peer.sendTransactions(types.Transactions{tx})
peer.SendTransactions(types.Transactions{tx})
}
glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers")
}