core: add upper bound on the queued transctions
This commit is contained in:
108
core/tx_pool.go
108
core/tx_pool.go
@ -20,6 +20,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/big"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -44,8 +45,11 @@ var (
|
||||
ErrNegativeValue = errors.New("Negative value")
|
||||
)
|
||||
|
||||
const (
|
||||
maxQueued = 64 // max limit of queued txs per address
|
||||
var (
|
||||
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
|
||||
maxQueuedInTotal = uint64(65536) // Max limit of queued transactions from all accounts
|
||||
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
|
||||
evictionInterval = time.Minute // Time interval to check for evictable transactions
|
||||
)
|
||||
|
||||
type stateFn func() (*state.StateDB, error)
|
||||
@ -71,8 +75,10 @@ type TxPool struct {
|
||||
pending map[common.Address]*txList // All currently processable transactions
|
||||
queue map[common.Address]*txList // Queued but non-processable transactions
|
||||
all map[common.Hash]*types.Transaction // All transactions to allow lookups
|
||||
beats map[common.Address]time.Time // Last heartbeat from each known account
|
||||
|
||||
wg sync.WaitGroup // for shutdown sync
|
||||
wg sync.WaitGroup // for shutdown sync
|
||||
quit chan struct{}
|
||||
|
||||
homestead bool
|
||||
}
|
||||
@ -83,6 +89,7 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
|
||||
pending: make(map[common.Address]*txList),
|
||||
queue: make(map[common.Address]*txList),
|
||||
all: make(map[common.Hash]*types.Transaction),
|
||||
beats: make(map[common.Address]time.Time),
|
||||
eventMux: eventMux,
|
||||
currentState: currentStateFn,
|
||||
gasLimit: gasLimitFn,
|
||||
@ -90,10 +97,12 @@ func NewTxPool(config *ChainConfig, eventMux *event.TypeMux, currentStateFn stat
|
||||
pendingState: nil,
|
||||
localTx: newTxSet(),
|
||||
events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}, RemovedTransactionEvent{}),
|
||||
quit: make(chan struct{}),
|
||||
}
|
||||
|
||||
pool.wg.Add(1)
|
||||
pool.wg.Add(2)
|
||||
go pool.eventLoop()
|
||||
go pool.expirationLoop()
|
||||
|
||||
return pool
|
||||
}
|
||||
@ -154,6 +163,7 @@ func (pool *TxPool) resetState() {
|
||||
|
||||
func (pool *TxPool) Stop() {
|
||||
pool.events.Unsubscribe()
|
||||
close(pool.quit)
|
||||
pool.wg.Wait()
|
||||
glog.V(logger.Info).Infoln("Transaction pool stopped")
|
||||
}
|
||||
@ -290,7 +300,7 @@ func (pool *TxPool) add(tx *types.Transaction) error {
|
||||
if pool.all[hash] != nil {
|
||||
return fmt.Errorf("Known transaction: %x", hash[:4])
|
||||
}
|
||||
// Otherwise ensure basic validation passes nd queue it up
|
||||
// Otherwise ensure basic validation passes and queue it up
|
||||
if err := pool.validateTx(tx); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -308,7 +318,7 @@ func (pool *TxPool) add(tx *types.Transaction) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// enqueueTx inserts a new transction into the non-executable transaction queue.
|
||||
// enqueueTx inserts a new transaction into the non-executable transaction queue.
|
||||
//
|
||||
// Note, this method assumes the pool lock is held!
|
||||
func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) {
|
||||
@ -355,6 +365,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
|
||||
pool.all[hash] = tx // Failsafe to work around direct pending inserts (tests)
|
||||
|
||||
// Set the potentially new pending nonce and notify any subsystems of the new tx
|
||||
pool.beats[addr] = time.Now()
|
||||
pool.pendingState.SetNonce(addr, list.last+1)
|
||||
go pool.eventMux.Post(TxPreEvent{tx})
|
||||
}
|
||||
@ -412,8 +423,8 @@ func (pool *TxPool) RemoveBatch(txs types.Transactions) {
|
||||
}
|
||||
}
|
||||
|
||||
// removeTx iterates removes a single transaction from the queue, moving all
|
||||
// subsequent transactions back to the future queue.
|
||||
// removeTx removes a single transaction from the queue, moving all subsequent
|
||||
// transactions back to the future queue.
|
||||
func (pool *TxPool) removeTx(hash common.Hash) {
|
||||
// Fetch the transaction we wish to delete
|
||||
tx, ok := pool.all[hash]
|
||||
@ -431,6 +442,8 @@ func (pool *TxPool) removeTx(hash common.Hash) {
|
||||
// If no more transactions are left, remove the list and reset the nonce
|
||||
if pending.Empty() {
|
||||
delete(pool.pending, addr)
|
||||
delete(pool.beats, addr)
|
||||
|
||||
pool.pendingState.SetNonce(addr, tx.Nonce())
|
||||
} else {
|
||||
// Otherwise update the nonce and postpone any invalidated transactions
|
||||
@ -465,6 +478,8 @@ func (pool *TxPool) promoteExecutables() {
|
||||
return
|
||||
}
|
||||
// Iterate over all accounts and promote any executable transactions
|
||||
queued := uint64(0)
|
||||
|
||||
for addr, list := range pool.queue {
|
||||
// Drop all transactions that are deemed too old (low nonce)
|
||||
for _, tx := range list.Forward(state.GetNonce(addr)) {
|
||||
@ -489,17 +504,51 @@ func (pool *TxPool) promoteExecutables() {
|
||||
pool.promoteTx(addr, tx.Hash(), tx)
|
||||
}
|
||||
// Drop all transactions over the allowed limit
|
||||
for _, tx := range list.Cap(maxQueued) {
|
||||
for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
|
||||
if glog.V(logger.Core) {
|
||||
glog.Infof("Removed cap-exceeding queued transaction: %v", tx)
|
||||
}
|
||||
delete(pool.all, tx.Hash())
|
||||
}
|
||||
queued += uint64(list.Len())
|
||||
|
||||
// Delete the entire queue entry if it became empty.
|
||||
if list.Empty() {
|
||||
delete(pool.queue, addr)
|
||||
}
|
||||
}
|
||||
// If we've queued more transactions than the hard limit, drop oldest ones
|
||||
if queued > maxQueuedInTotal {
|
||||
// Sort all accounts with queued transactions by heartbeat
|
||||
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
|
||||
for addr, _ := range pool.queue {
|
||||
addresses = append(addresses, addressByHeartbeat{addr, pool.beats[addr]})
|
||||
}
|
||||
sort.Sort(addresses)
|
||||
|
||||
// Drop transactions until the total is below the limit
|
||||
for drop := queued - maxQueuedInTotal; drop > 0; {
|
||||
addr := addresses[len(addresses)-1]
|
||||
list := pool.queue[addr.address]
|
||||
|
||||
addresses = addresses[:len(addresses)-1]
|
||||
|
||||
// Drop all transactions if they are less than the overflow
|
||||
if size := uint64(list.Len()); size <= drop {
|
||||
for _, tx := range list.Flatten() {
|
||||
pool.removeTx(tx.Hash())
|
||||
}
|
||||
drop -= size
|
||||
continue
|
||||
}
|
||||
// Otherwise drop only last few transactions
|
||||
txs := list.Flatten()
|
||||
for i := len(txs) - 1; i >= 0 && drop > 0; i-- {
|
||||
pool.removeTx(txs[i].Hash())
|
||||
drop--
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// demoteUnexecutables removes invalid and processed transactions from the pools
|
||||
@ -540,10 +589,51 @@ func (pool *TxPool) demoteUnexecutables() {
|
||||
// Delete the entire queue entry if it became empty.
|
||||
if list.Empty() {
|
||||
delete(pool.pending, addr)
|
||||
delete(pool.beats, addr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// expirationLoop is a loop that periodically iterates over all accounts with
|
||||
// queued transactions and drop all that have been inactive for a prolonged amount
|
||||
// of time.
|
||||
func (pool *TxPool) expirationLoop() {
|
||||
defer pool.wg.Done()
|
||||
|
||||
evict := time.NewTicker(evictionInterval)
|
||||
defer evict.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-evict.C:
|
||||
pool.mu.Lock()
|
||||
for addr := range pool.queue {
|
||||
if time.Since(pool.beats[addr]) > maxQueuedLifetime {
|
||||
for _, tx := range pool.queue[addr].Flatten() {
|
||||
pool.removeTx(tx.Hash())
|
||||
}
|
||||
}
|
||||
}
|
||||
pool.mu.Unlock()
|
||||
|
||||
case <-pool.quit:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// addressByHeartbeat is an account address tagged with its last activity timestamp.
|
||||
type addressByHeartbeat struct {
|
||||
address common.Address
|
||||
heartbeat time.Time
|
||||
}
|
||||
|
||||
type addresssByHeartbeat []addressByHeartbeat
|
||||
|
||||
func (a addresssByHeartbeat) Len() int { return len(a) }
|
||||
func (a addresssByHeartbeat) Less(i, j int) bool { return a[i].heartbeat.Before(a[j].heartbeat) }
|
||||
func (a addresssByHeartbeat) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
// txSet represents a set of transaction hashes in which entries
|
||||
// are automatically dropped after txSetDuration time
|
||||
type txSet struct {
|
||||
|
Reference in New Issue
Block a user