eth: rework tx fetcher to use O(1) ops + manage network requests

This commit is contained in:
Péter Szilágyi
2020-01-22 16:39:43 +02:00
parent 049e17116e
commit 9938d954c8
128 changed files with 2867 additions and 871 deletions

View File

@ -51,7 +51,7 @@ const (
// The number is referenced from the size of tx pool.
txChanSize = 4096
// minimim number of peers to broadcast new blocks to
// minimim number of peers to broadcast entire blocks and transactions too.
minBroadcastPeers = 4
)
@ -192,7 +192,15 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
return n, err
}
manager.blockFetcher = fetcher.NewBlockFetcher(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, manager.removePeer)
fetchTx := func(peer string, hashes []common.Hash) error {
p := manager.peers.Peer(peer)
if p == nil {
return errors.New("unknown peer")
}
return p.RequestTxs(hashes)
}
manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
return manager, nil
}
@ -240,6 +248,8 @@ func (pm *ProtocolManager) removePeer(id string) {
// Unregister the peer from the downloader and Ethereum peer set
pm.downloader.UnregisterPeer(id)
pm.txFetcher.Drop(id)
if err := pm.peers.Unregister(id); err != nil {
log.Error("Peer removal failed", "peer", id, "err", err)
}
@ -263,7 +273,7 @@ func (pm *ProtocolManager) Start(maxPeers int) {
// start sync handlers
go pm.syncer()
go pm.txsyncLoop()
go pm.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64.
}
func (pm *ProtocolManager) Stop() {
@ -292,7 +302,7 @@ func (pm *ProtocolManager) Stop() {
}
func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(hash common.Hash) *types.Transaction) *peer {
return newPeer(pv, p, newMeteredMsgWriter(rw), getPooledTx)
return newPeer(pv, p, rw, getPooledTx)
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
@ -316,9 +326,6 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Debug("Ethereum handshake failed", "err", err)
return err
}
if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}
// Register the peer locally
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Ethereum peer registration failed", "err", err)
@ -740,20 +747,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Schedule all the unknown hashes for retrieval
var unknown []common.Hash
for _, hash := range hashes {
// Mark the hashes as present at the remote node
p.MarkTransaction(hash)
// Filter duplicated transaction announcement.
// Notably we only dedupliate announcement in txpool, check the rationale
// behind in EIP https://github.com/ethereum/EIPs/pull/2464.
if pm.txpool.Has(hash) {
continue
}
unknown = append(unknown, hash)
}
pm.txFetcher.Notify(p.id, unknown, time.Now(), p.AsyncRequestTxs)
pm.txFetcher.Notify(p.id, hashes)
case msg.Code == GetPooledTransactionsMsg && p.version >= eth65:
// Decode the retrieval message
@ -763,9 +760,10 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
// Gather transactions until the fetch or network limits is reached
var (
hash common.Hash
bytes int
txs []rlp.RawValue
hash common.Hash
bytes int
hashes []common.Hash
txs []rlp.RawValue
)
for bytes < softResponseLimit {
// Retrieve the hash of the next block
@ -783,13 +781,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if encoded, err := rlp.EncodeToBytes(tx); err != nil {
log.Error("Failed to encode transaction", "err", err)
} else {
hashes = append(hashes, hash)
txs = append(txs, encoded)
bytes += len(encoded)
}
}
return p.SendTransactionRLP(txs)
return p.SendPooledTransactionsRLP(hashes, txs)
case msg.Code == TxMsg:
case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
break
@ -806,7 +805,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
p.MarkTransaction(tx.Hash())
}
pm.txFetcher.EnqueueTxs(p.id, txs)
pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
@ -854,9 +853,9 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
}
}
// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// BroadcastTransactions will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions, propagate bool) {
func (pm *ProtocolManager) BroadcastTransactions(txs types.Transactions, propagate bool) {
var (
txset = make(map[*peer][]common.Hash)
annos = make(map[*peer][]common.Hash)
@ -894,7 +893,7 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions, propagate bool)
}
for peer, hashes := range annos {
if peer.version >= eth65 {
peer.AsyncSendTransactionHashes(hashes)
peer.AsyncSendPooledTransactionHashes(hashes)
} else {
peer.AsyncSendTransactions(hashes)
}
@ -918,11 +917,11 @@ func (pm *ProtocolManager) txBroadcastLoop() {
case event := <-pm.txsCh:
// For testing purpose only, disable propagation
if pm.broadcastTxAnnouncesOnly {
pm.BroadcastTxs(event.Txs, false)
pm.BroadcastTransactions(event.Txs, false)
continue
}
pm.BroadcastTxs(event.Txs, true) // First propagate transactions to peers
pm.BroadcastTxs(event.Txs, false) // Only then announce to the rest
pm.BroadcastTransactions(event.Txs, true) // First propagate transactions to peers
pm.BroadcastTransactions(event.Txs, false) // Only then announce to the rest
// Err() channel will be closed when unsubscribing.
case <-pm.txsSub.Err():