core, eth, internal, miner: optimize txpool for quick ops

This commit is contained in:
Péter Szilágyi
2016-07-01 18:59:55 +03:00
parent 795b70423e
commit 0ef327bbee
14 changed files with 790 additions and 428 deletions

View File

@ -20,7 +20,6 @@ import (
"errors"
"fmt"
"math/big"
"sort"
"sync"
"time"
@ -51,11 +50,6 @@ const (
type stateFn func() (*state.StateDB, error)
// TxList is a "list" of transactions belonging to an account, sorted by account
// nonce. To allow gaps and avoid constant copying, the list is represented as a
// hash map.
type TxList map[uint64]*types.Transaction
// TxPool contains all currently known transactions. Transactions
// enter the pool when they are received from the network or submitted
// locally. They exit the pool when they are included in the blockchain.
@ -74,8 +68,8 @@ type TxPool struct {
localTx *txSet
mu sync.RWMutex
pending map[common.Address]TxList // All currently processable transactions
queue map[common.Address]TxList // Queued but non-processable transactions
pending map[common.Address]*txList // All currently processable transactions
queue map[common.Address]*txList // Queued but non-processable transactions
all map[common.Hash]*types.Transaction // All transactions to allow lookups
wg sync.WaitGroup // for shutdown sync
@ -86,8 +80,8 @@ type TxPool struct {
func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
pool := &TxPool{
config: config,
pending: make(map[common.Address]TxList),
queue: make(map[common.Address]TxList),
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
all: make(map[common.Hash]*types.Transaction),
eventMux: eventMux,
currentState: currentStateFn,
@ -125,7 +119,7 @@ func (pool *TxPool) eventLoop() {
pool.minGasPrice = ev.Price
pool.mu.Unlock()
case RemovedTransactionEvent:
pool.AddTransactions(ev.Txs)
pool.AddBatch(ev.Txs)
}
}
}
@ -133,12 +127,12 @@ func (pool *TxPool) eventLoop() {
func (pool *TxPool) resetState() {
currentState, err := pool.currentState()
if err != nil {
glog.V(logger.Info).Infoln("failed to get current state: %v", err)
glog.V(logger.Error).Infof("Failed to get current state: %v", err)
return
}
managedState := state.ManageState(currentState)
if err != nil {
glog.V(logger.Info).Infoln("failed to get managed state: %v", err)
glog.V(logger.Error).Infof("Failed to get managed state: %v", err)
return
}
pool.pendingState = managedState
@ -147,22 +141,15 @@ func (pool *TxPool) resetState() {
// any transactions that have been included in the block or
// have been invalidated because of another transaction (e.g.
// higher gas price)
pool.validatePool()
pool.demoteUnexecutables()
// Loop over the pending transactions and base the nonce of the new
// pending transaction set.
for addr, txs := range pool.pending {
// Set the nonce. Transaction nonce can never be lower
// than the state nonce; validatePool took care of that.
for nonce, _ := range txs {
if pool.pendingState.GetNonce(addr) <= nonce {
pool.pendingState.SetNonce(addr, nonce+1)
}
}
// Update all accounts to the latest known pending nonce
for addr, list := range pool.pending {
pool.pendingState.SetNonce(addr, list.last+1)
}
// Check the queue and move transactions over to the pending if possible
// or remove those that have become invalid
pool.checkQueue()
pool.promoteExecutables()
}
func (pool *TxPool) Stop() {
@ -178,46 +165,58 @@ func (pool *TxPool) State() *state.ManagedState {
return pool.pendingState
}
// Stats retrieves the current pool stats, namely the number of pending and the
// number of queued (non-executable) transactions.
func (pool *TxPool) Stats() (pending int, queued int) {
pool.mu.RLock()
defer pool.mu.RUnlock()
for _, txs := range pool.pending {
pending += len(txs)
for _, list := range pool.pending {
pending += list.Len()
}
for _, txs := range pool.queue {
queued += len(txs)
for _, list := range pool.queue {
queued += list.Len()
}
return
}
// Content retrieves the data content of the transaction pool, returning all the
// pending as well as queued transactions, grouped by account and nonce.
func (pool *TxPool) Content() (map[common.Address]TxList, map[common.Address]TxList) {
// pending as well as queued transactions, grouped by account and sorted by nonce.
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
pool.mu.RLock()
defer pool.mu.RUnlock()
// Retrieve all the pending transactions and sort by account and by nonce
pending := make(map[common.Address]TxList)
for addr, txs := range pool.pending {
copy := make(TxList)
for nonce, tx := range txs {
copy[nonce] = tx
}
pending[addr] = copy
pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
// Retrieve all the queued transactions and sort by account and by nonce
queued := make(map[common.Address]TxList)
for addr, txs := range pool.queue {
copy := make(TxList)
for nonce, tx := range txs {
copy[nonce] = tx
}
queued[addr] = copy
queued := make(map[common.Address]types.Transactions)
for addr, list := range pool.queue {
queued[addr] = list.Flatten()
}
return pending, queued
}
// Pending retrieves all currently processable transactions, groupped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
func (pool *TxPool) Pending() map[common.Address]types.Transactions {
pool.mu.Lock()
defer pool.mu.Unlock()
// check queue first
pool.promoteExecutables()
// invalidate any txs
pool.demoteUnexecutables()
pending := make(map[common.Address]types.Transactions)
for addr, list := range pool.pending {
pending[addr] = list.Flatten()
}
return pending
}
// SetLocal marks a transaction as local, skipping gas price
// check against local miner minimum in the future
func (pool *TxPool) SetLocal(tx *types.Transaction) {
@ -283,340 +282,268 @@ func (pool *TxPool) validateTx(tx *types.Transaction) error {
return nil
}
// validate and queue transactions.
func (self *TxPool) add(tx *types.Transaction) error {
// add validates a transaction and inserts it into the non-executable queue for
// later pending promotion and execution.
func (pool *TxPool) add(tx *types.Transaction) error {
// If the transaction is alreayd known, discard it
hash := tx.Hash()
if self.all[hash] != nil {
return fmt.Errorf("Known transaction (%x)", hash[:4])
if pool.all[hash] != nil {
return fmt.Errorf("Known transaction: %x", hash[:4])
}
err := self.validateTx(tx)
if err != nil {
// Otherwise ensure basic validation passes nd queue it up
if err := pool.validateTx(tx); err != nil {
return err
}
self.queueTx(hash, tx)
pool.enqueueTx(hash, tx)
// Print a log message if low enough level is set
if glog.V(logger.Debug) {
var toname string
rcpt := "[NEW_CONTRACT]"
if to := tx.To(); to != nil {
toname = common.Bytes2Hex(to[:4])
} else {
toname = "[NEW_CONTRACT]"
rcpt = common.Bytes2Hex(to[:4])
}
// we can ignore the error here because From is
// verified in ValidateTransaction.
f, _ := tx.From()
from := common.Bytes2Hex(f[:4])
glog.Infof("(t) %x => %s (%v) %x\n", from, toname, tx.Value, hash)
from, _ := tx.From() // from already verified during tx validation
glog.Infof("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value, hash)
}
return nil
}
// queueTx will queue an unknown transaction.
func (self *TxPool) queueTx(hash common.Hash, tx *types.Transaction) {
addr, _ := tx.From() // already validated
if self.queue[addr] == nil {
self.queue[addr] = make(TxList)
// enqueueTx inserts a new transction into the non-executable transaction queue.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
// Try to insert the transaction into the future queue
from, _ := tx.From() // already validated
if pool.queue[from] == nil {
pool.queue[from] = newTxList(false)
}
// If the nonce is already used, discard the lower priced transaction
nonce := tx.Nonce()
if old, ok := self.queue[addr][nonce]; ok {
if old.GasPrice().Cmp(tx.GasPrice()) >= 0 {
return // Old was better, discard this
}
delete(self.all, old.Hash()) // New is better, drop and overwrite old one
inserted, old := pool.queue[from].Add(tx)
if !inserted {
return // An older transaction was better, discard this
}
self.queue[addr][nonce] = tx
self.all[hash] = tx
// Discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
}
pool.all[hash] = tx
}
// addTx will moves a transaction from the non-executable queue to the pending
// (processable) list of transactions.
func (pool *TxPool) addTx(addr common.Address, tx *types.Transaction) {
// promoteTx adds a transaction to the pending (processable) list of transactions.
//
// Note, this method assumes the pool lock is held!
func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.Transaction) {
// Init delayed since tx pool could have been started before any state sync
if pool.pendingState == nil {
pool.resetState()
}
// If the nonce is already used, discard the lower priced transaction
hash, nonce := tx.Hash(), tx.Nonce()
if old, ok := pool.pending[addr][nonce]; ok {
oldHash := old.Hash()
switch {
case oldHash == hash: // Nothing changed, noop
return
case old.GasPrice().Cmp(tx.GasPrice()) >= 0: // Old was better, discard this
delete(pool.all, hash)
return
default: // New is better, discard old
delete(pool.all, oldHash)
}
// Try to insert the transaction into the pending queue
if pool.pending[addr] == nil {
pool.pending[addr] = newTxList(true)
}
// The transaction is being kept, insert it into the tx pool
if _, ok := pool.pending[addr]; !ok {
pool.pending[addr] = make(TxList)
list := pool.pending[addr]
inserted, old := list.Add(tx)
if !inserted {
// An older transaction was better, discard this
delete(pool.all, hash)
return
}
pool.pending[addr][nonce] = tx
pool.all[hash] = tx
// Otherwise discard any previous transaction and mark this
if old != nil {
delete(pool.all, old.Hash())
}
pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
// Increment the nonce on the pending state. This can only happen if
// the nonce is +1 to the previous one.
pool.pendingState.SetNonce(addr, nonce+1)
// Notify the subscribers. This event is posted in a goroutine
// because it's possible that somewhere during the post "Remove transaction"
// gets called which will then wait for the global tx pool lock and deadlock.
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.pendingState.SetNonce(addr, list.last+1)
go pool.eventMux.Post(TxPreEvent{tx})
}
// Add queues a single transaction in the pool if it is valid.
func (self *TxPool) Add(tx *types.Transaction) error {
self.mu.Lock()
defer self.mu.Unlock()
func (pool *TxPool) Add(tx *types.Transaction) error {
pool.mu.Lock()
defer pool.mu.Unlock()
if err := self.add(tx); err != nil {
if err := pool.add(tx); err != nil {
return err
}
self.checkQueue()
pool.promoteExecutables()
return nil
}
// AddTransactions attempts to queue all valid transactions in txs.
func (self *TxPool) AddTransactions(txs []*types.Transaction) {
self.mu.Lock()
defer self.mu.Unlock()
// AddBatch attempts to queue a batch of transactions.
func (pool *TxPool) AddBatch(txs []*types.Transaction) {
pool.mu.Lock()
defer pool.mu.Unlock()
for _, tx := range txs {
if err := self.add(tx); err != nil {
if err := pool.add(tx); err != nil {
glog.V(logger.Debug).Infoln("tx error:", err)
} else {
h := tx.Hash()
glog.V(logger.Debug).Infof("tx %x\n", h[:4])
}
}
// check and validate the queue
self.checkQueue()
pool.promoteExecutables()
}
// GetTransaction returns a transaction if it is contained in the pool
// Get returns a transaction if it is contained in the pool
// and nil otherwise.
func (tp *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
tp.mu.RLock()
defer tp.mu.RUnlock()
func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
pool.mu.RLock()
defer pool.mu.RUnlock()
return tp.all[hash]
return pool.all[hash]
}
// GetTransactions returns all currently processable transactions.
// The returned slice may be modified by the caller.
func (self *TxPool) GetTransactions() types.Transactions {
self.mu.Lock()
defer self.mu.Unlock()
// check queue first
self.checkQueue()
// invalidate any txs
self.validatePool()
count := 0
for _, txs := range self.pending {
count += len(txs)
}
pending := make(types.Transactions, 0, count)
for _, txs := range self.pending {
for _, tx := range txs {
pending = append(pending, tx)
}
}
return pending
}
// RemoveTransactions removes all given transactions from the pool.
func (self *TxPool) RemoveTransactions(txs types.Transactions) {
self.mu.Lock()
defer self.mu.Unlock()
for _, tx := range txs {
self.removeTx(tx.Hash())
}
}
// RemoveTx removes the transaction with the given hash from the pool.
func (pool *TxPool) RemoveTx(hash common.Hash) {
// Remove removes the transaction with the given hash from the pool.
func (pool *TxPool) Remove(hash common.Hash) {
pool.mu.Lock()
defer pool.mu.Unlock()
pool.removeTx(hash)
}
// RemoveBatch removes all given transactions from the pool.
func (pool *TxPool) RemoveBatch(txs types.Transactions) {
pool.mu.Lock()
defer pool.mu.Unlock()
for _, tx := range txs {
pool.removeTx(tx.Hash())
}
}
// removeTx iterates removes a single transaction from the queue, moving all
// subsequent transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash) {
// Fetch the transaction we wish to delete
tx, ok := pool.all[hash]
if !ok {
return
}
addr, _ := tx.From()
addr, _ := tx.From() // already validated during insertion
// Remove it from all internal lists
// Remove it from the list of known transactions
delete(pool.all, hash)
delete(pool.pending[addr], tx.Nonce())
if len(pool.pending[addr]) == 0 {
delete(pool.pending, addr)
// Remove the transaction from the pending lists and reset the account nonce
if pending := pool.pending[addr]; pending != nil {
if removed, invalids := pending.Remove(tx); removed {
// If no more transactions are left, remove the list and reset the nonce
if pending.Empty() {
delete(pool.pending, addr)
pool.pendingState.SetNonce(addr, tx.Nonce())
} else {
// Otherwise update the nonce and postpone any invalidated transactions
pool.pendingState.SetNonce(addr, pending.last)
for _, tx := range invalids {
pool.enqueueTx(tx.Hash(), tx)
}
}
}
}
delete(pool.queue[addr], tx.Nonce())
if len(pool.queue[addr]) == 0 {
delete(pool.queue, addr)
// Transaction is in the future queue
if future := pool.queue[addr]; future != nil {
future.Remove(tx)
if future.Empty() {
delete(pool.queue, addr)
}
}
}
// checkQueue moves transactions that have become processable from the pool's
// queue to the set of pending transactions.
func (pool *TxPool) checkQueue() {
// promoteExecutables moves transactions that have become processable from the
// future queue to the set of pending transactions. During this process, all
// invalidated transactions (low nonce, low balance) are deleted.
func (pool *TxPool) promoteExecutables() {
// Init delayed since tx pool could have been started before any state sync
if pool.pendingState == nil {
pool.resetState()
}
var promote txQueue
for address, txs := range pool.queue {
currentState, err := pool.currentState()
if err != nil {
glog.Errorf("could not get current state: %v", err)
return
// Retrieve the current state to allow nonce and balance checking
state, err := pool.currentState()
if err != nil {
glog.Errorf("Could not get current state: %v", err)
return
}
// Iterate over all accounts and promote any executable transactions
for addr, list := range pool.queue {
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(state.GetNonce(addr)) {
if glog.V(logger.Core) {
glog.Infof("Removed old queued transaction: %v", tx)
}
delete(pool.all, tx.Hash())
}
balance := currentState.GetBalance(address)
var (
guessedNonce = pool.pendingState.GetNonce(address) // nonce currently kept by the tx pool (pending state)
trueNonce = currentState.GetNonce(address) // nonce known by the last state
)
promote = promote[:0]
for nonce, tx := range txs {
// Drop processed or out of fund transactions
if nonce < trueNonce || balance.Cmp(tx.Cost()) < 0 {
if glog.V(logger.Core) {
glog.Infof("removed tx (%v) from pool queue: low tx nonce or out of funds\n", tx)
}
delete(txs, nonce)
delete(pool.all, tx.Hash())
continue
// Drop all transactions that are too costly (low balance)
drops, _ := list.Filter(state.GetBalance(addr))
for _, tx := range drops {
if glog.V(logger.Core) {
glog.Infof("Removed unpayable queued transaction: %v", tx)
}
// Collect the remaining transactions for the next pass.
promote = append(promote, txQueueEntry{address, tx})
delete(pool.all, tx.Hash())
}
// Find the next consecutive nonce range starting at the current account nonce,
// pushing the guessed nonce forward if we add consecutive transactions.
sort.Sort(promote)
for i, entry := range promote {
// If we reached a gap in the nonces, enforce transaction limit and stop
if entry.Nonce() > guessedNonce {
if len(promote)-i > maxQueued {
if glog.V(logger.Debug) {
glog.Infof("Queued tx limit exceeded for %s. Tx %s removed\n", common.PP(address[:]), common.PP(entry.Hash().Bytes()))
}
for _, drop := range promote[i+maxQueued:] {
delete(txs, drop.Nonce())
delete(pool.all, drop.Hash())
}
}
break
// Gather all executable transactions and promote them
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
if glog.V(logger.Core) {
glog.Infof("Promoting queued transaction: %v", tx)
}
// Otherwise promote the transaction and move the guess nonce if needed
pool.addTx(address, entry.Transaction)
delete(txs, entry.Nonce())
if entry.Nonce() == guessedNonce {
guessedNonce++
pool.promoteTx(addr, tx.Hash(), tx)
}
// Drop all transactions over the allowed limit
for _, tx := range list.Cap(maxQueued) {
if glog.V(logger.Core) {
glog.Infof("Removed cap-exceeding queued transaction: %v", tx)
}
delete(pool.all, tx.Hash())
}
// Delete the entire queue entry if it became empty.
if len(txs) == 0 {
delete(pool.queue, address)
if list.Empty() {
delete(pool.queue, addr)
}
}
}
// validatePool removes invalid and processed transactions from the main pool.
// If a transaction is removed for being invalid (e.g. out of funds), all sub-
// sequent (Still valid) transactions are moved back into the future queue. This
// is important to prevent a drained account from DOSing the network with non
// executable transactions.
func (pool *TxPool) validatePool() {
// demoteUnexecutables removes invalid and processed transactions from the pools
// executable/pending queue and any subsequent transactions that become unexecutable
// are moved back into the future queue.
func (pool *TxPool) demoteUnexecutables() {
// Retrieve the current state to allow nonce and balance checking
state, err := pool.currentState()
if err != nil {
glog.V(logger.Info).Infoln("failed to get current state: %v", err)
return
}
balanceCache := make(map[common.Address]*big.Int)
// Iterate over all accounts and demote any non-executable transactions
for addr, list := range pool.pending {
nonce := state.GetNonce(addr)
// Clean up the pending pool, accumulating invalid nonces
gaps := make(map[common.Address]uint64)
for addr, txs := range pool.pending {
for nonce, tx := range txs {
// Perform light nonce and balance validation
balance := balanceCache[addr]
if balance == nil {
balance = state.GetBalance(addr)
balanceCache[addr] = balance
}
if past := state.GetNonce(addr) > nonce; past || balance.Cmp(tx.Cost()) < 0 {
// Remove an already past it invalidated transaction
if glog.V(logger.Core) {
glog.Infof("removed tx (%v) from pool: low tx nonce or out of funds\n", tx)
}
delete(pool.pending[addr], nonce)
if len(pool.pending[addr]) == 0 {
delete(pool.pending, addr)
}
delete(pool.all, tx.Hash())
// Track the smallest invalid nonce to postpone subsequent transactions
if !past {
if prev, ok := gaps[addr]; !ok || nonce < prev {
gaps[addr] = nonce
}
}
// Drop all transactions that are deemed too old (low nonce)
for _, tx := range list.Forward(nonce) {
if glog.V(logger.Core) {
glog.Infof("Removed old pending transaction: %v", tx)
}
delete(pool.all, tx.Hash())
}
}
// Move all transactions after a gap back to the future queue
if len(gaps) > 0 {
for addr, txs := range pool.pending {
for nonce, tx := range txs {
if gap, ok := gaps[addr]; ok && nonce >= gap {
if glog.V(logger.Core) {
glog.Infof("postponed tx (%v) due to introduced gap\n", tx)
}
delete(pool.pending[addr], nonce)
if len(pool.pending[addr]) == 0 {
delete(pool.pending, addr)
}
pool.queueTx(tx.Hash(), tx)
}
// Drop all transactions that are too costly (low balance), and queue any invalids back for later
drops, invalids := list.Filter(state.GetBalance(addr))
for _, tx := range drops {
if glog.V(logger.Core) {
glog.Infof("Removed unpayable pending transaction: %v", tx)
}
delete(pool.all, tx.Hash())
}
for _, tx := range invalids {
if glog.V(logger.Core) {
glog.Infof("Demoting pending transaction: %v", tx)
}
pool.enqueueTx(tx.Hash(), tx)
}
// Delete the entire queue entry if it became empty.
if list.Empty() {
delete(pool.pending, addr)
}
}
}
type txQueue []txQueueEntry
type txQueueEntry struct {
addr common.Address
*types.Transaction
}
func (q txQueue) Len() int { return len(q) }
func (q txQueue) Swap(i, j int) { q[i], q[j] = q[j], q[i] }
func (q txQueue) Less(i, j int) bool { return q[i].Nonce() < q[j].Nonce() }
// txSet represents a set of transaction hashes in which entries
// are automatically dropped after txSetDuration time
type txSet struct {