cmd, core, eth: configurable txpool parameters
This commit is contained in:
101
core/tx_pool.go
101
core/tx_pool.go
@ -48,14 +48,8 @@ var (
|
||||
)
|
||||
|
||||
var (
|
||||
minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address
|
||||
maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft)
|
||||
maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address
|
||||
maxQueuedTotal = uint64(1024) // Max limit of queued transactions from all accounts
|
||||
maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued
|
||||
minPriceBumpPercent = int64(10) // Minimum price bump needed to replace an old transaction
|
||||
evictionInterval = time.Minute // Time interval to check for evictable transactions
|
||||
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
|
||||
evictionInterval = time.Minute // Time interval to check for evictable transactions
|
||||
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
|
||||
)
|
||||
|
||||
var (
|
||||
@ -78,6 +72,48 @@ var (
|
||||
|
||||
type stateFn func() (*state.StateDB, error)
|
||||
|
||||
// TxPoolConfig are the configuration parameters of the transaction pool.
|
||||
type TxPoolConfig struct {
|
||||
PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
|
||||
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)
|
||||
|
||||
AccountSlots uint64 // Minimum number of executable transaction slots guaranteed per account
|
||||
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
|
||||
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
|
||||
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
|
||||
|
||||
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
|
||||
}
|
||||
|
||||
// DefaultTxPoolConfig contains the default configurations for the transaction
|
||||
// pool.
|
||||
var DefaultTxPoolConfig = TxPoolConfig{
|
||||
PriceLimit: 1,
|
||||
PriceBump: 10,
|
||||
|
||||
AccountSlots: 16,
|
||||
GlobalSlots: 4096,
|
||||
AccountQueue: 64,
|
||||
GlobalQueue: 1024,
|
||||
|
||||
Lifetime: 3 * time.Hour,
|
||||
}
|
||||
|
||||
// sanitize checks the provided user configurations and changes anything that's
|
||||
// unreasonable or unworkable.
|
||||
func (config *TxPoolConfig) sanitize() TxPoolConfig {
|
||||
conf := *config
|
||||
if conf.PriceLimit < 1 {
|
||||
log.Warn("Sanitizing invalid txpool price limit", "provided", conf.PriceLimit, "updated", DefaultTxPoolConfig.PriceLimit)
|
||||
conf.PriceLimit = DefaultTxPoolConfig.PriceLimit
|
||||
}
|
||||
if conf.PriceBump < 1 {
|
||||
log.Warn("Sanitizing invalid txpool price bump", "provided", conf.PriceBump, "updated", DefaultTxPoolConfig.PriceBump)
|
||||
conf.PriceBump = DefaultTxPoolConfig.PriceBump
|
||||
}
|
||||
return conf
|
||||
}
|
||||
|
||||
// TxPool contains all currently known transactions. Transactions
|
||||
// enter the pool when they are received from the network or submitted
|
||||
// locally. They exit the pool when they are included in the blockchain.
|
||||
@ -86,7 +122,8 @@ type stateFn func() (*state.StateDB, error)
|
||||
// current state) and future transactions. Transactions move between those
|
||||
// two states over time as they are received and processed.
|
||||
type TxPool struct {
|
||||
config *params.ChainConfig
|
||||
config TxPoolConfig
|
||||
chainconfig *params.ChainConfig
|
||||
currentState stateFn // The state function which will allow us to do some pre checks
|
||||
pendingState *state.ManagedState
|
||||
gasLimit func() *big.Int // The current gas limit function callback
|
||||
@ -109,10 +146,17 @@ type TxPool struct {
|
||||
homestead bool
|
||||
}
|
||||
|
||||
func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
|
||||
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
|
||||
// trnsactions from the network.
|
||||
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func() *big.Int) *TxPool {
|
||||
// Sanitize the input to ensure no vulnerable gas prices are set
|
||||
config = (&config).sanitize()
|
||||
|
||||
// Create the transaction pool with its initial settings
|
||||
pool := &TxPool{
|
||||
config: config,
|
||||
signer: types.NewEIP155Signer(config.ChainId),
|
||||
chainconfig: chainconfig,
|
||||
signer: types.NewEIP155Signer(chainconfig.ChainId),
|
||||
pending: make(map[common.Address]*txList),
|
||||
queue: make(map[common.Address]*txList),
|
||||
beats: make(map[common.Address]time.Time),
|
||||
@ -120,7 +164,7 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState
|
||||
eventMux: eventMux,
|
||||
currentState: currentStateFn,
|
||||
gasLimit: gasLimitFn,
|
||||
gasPrice: big.NewInt(1),
|
||||
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
|
||||
pendingState: nil,
|
||||
locals: newTxSet(),
|
||||
events: eventMux.Subscribe(ChainHeadEvent{}, RemovedTransactionEvent{}),
|
||||
@ -129,6 +173,7 @@ func NewTxPool(config *params.ChainConfig, eventMux *event.TypeMux, currentState
|
||||
pool.priced = newTxPricedList(&pool.all)
|
||||
pool.resetState()
|
||||
|
||||
// Start the various events loops and return
|
||||
pool.wg.Add(2)
|
||||
go pool.eventLoop()
|
||||
go pool.expirationLoop()
|
||||
@ -159,7 +204,7 @@ func (pool *TxPool) eventLoop() {
|
||||
case ChainHeadEvent:
|
||||
pool.mu.Lock()
|
||||
if ev.Block != nil {
|
||||
if pool.config.IsHomestead(ev.Block.Number()) {
|
||||
if pool.chainconfig.IsHomestead(ev.Block.Number()) {
|
||||
pool.homestead = true
|
||||
}
|
||||
}
|
||||
@ -388,7 +433,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) {
|
||||
return false, err
|
||||
}
|
||||
// If the transaction pool is full, discard underpriced transactions
|
||||
if uint64(len(pool.all)) >= maxPendingTotal+maxQueuedTotal {
|
||||
if uint64(len(pool.all)) >= pool.config.GlobalSlots+pool.config.GlobalQueue {
|
||||
// If the new transaction is underpriced, don't accept it
|
||||
if pool.priced.Underpriced(tx, pool.locals) {
|
||||
log.Trace("Discarding underpriced transaction", "hash", hash, "price", tx.GasPrice())
|
||||
@ -396,7 +441,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) {
|
||||
return false, ErrUnderpriced
|
||||
}
|
||||
// New transaction is better than our worse ones, make room for it
|
||||
drop := pool.priced.Discard(len(pool.all)-int(maxPendingTotal+maxQueuedTotal-1), pool.locals)
|
||||
drop := pool.priced.Discard(len(pool.all)-int(pool.config.GlobalSlots+pool.config.GlobalQueue-1), pool.locals)
|
||||
for _, tx := range drop {
|
||||
log.Trace("Discarding freshly underpriced transaction", "hash", tx.Hash(), "price", tx.GasPrice())
|
||||
underpricedTxCounter.Inc(1)
|
||||
@ -407,7 +452,7 @@ func (pool *TxPool) add(tx *types.Transaction) (bool, error) {
|
||||
from, _ := types.Sender(pool.signer, tx) // already validated
|
||||
if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
|
||||
// Nonce already pending, check if required price bump is met
|
||||
inserted, old := list.Add(tx)
|
||||
inserted, old := list.Add(tx, pool.config.PriceBump)
|
||||
if !inserted {
|
||||
pendingDiscardCounter.Inc(1)
|
||||
return false, ErrReplaceUnderpriced
|
||||
@ -442,7 +487,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
|
||||
if pool.queue[from] == nil {
|
||||
pool.queue[from] = newTxList(false)
|
||||
}
|
||||
inserted, old := pool.queue[from].Add(tx)
|
||||
inserted, old := pool.queue[from].Add(tx, pool.config.PriceBump)
|
||||
if !inserted {
|
||||
// An older transaction was better, discard this
|
||||
queuedDiscardCounter.Inc(1)
|
||||
@ -469,7 +514,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
|
||||
}
|
||||
list := pool.pending[addr]
|
||||
|
||||
inserted, old := list.Add(tx)
|
||||
inserted, old := list.Add(tx, pool.config.PriceBump)
|
||||
if !inserted {
|
||||
// An older transaction was better, discard this
|
||||
delete(pool.all, hash)
|
||||
@ -644,7 +689,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
pool.promoteTx(addr, hash, tx)
|
||||
}
|
||||
// Drop all transactions over the allowed limit
|
||||
for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
|
||||
for _, tx := range list.Cap(int(pool.config.AccountQueue)) {
|
||||
hash := tx.Hash()
|
||||
log.Trace("Removed cap-exceeding queued transaction", "hash", hash)
|
||||
delete(pool.all, hash)
|
||||
@ -663,13 +708,13 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
for _, list := range pool.pending {
|
||||
pending += uint64(list.Len())
|
||||
}
|
||||
if pending > maxPendingTotal {
|
||||
if pending > pool.config.GlobalSlots {
|
||||
pendingBeforeCap := pending
|
||||
// Assemble a spam order to penalize large transactors first
|
||||
spammers := prque.New()
|
||||
for addr, list := range pool.pending {
|
||||
// Only evict transactions from high rollers
|
||||
if uint64(list.Len()) > minPendingPerAccount {
|
||||
if uint64(list.Len()) > pool.config.AccountSlots {
|
||||
// Skip local accounts as pools should maintain backlogs for themselves
|
||||
for _, tx := range list.txs.items {
|
||||
if !pool.locals.contains(tx.Hash()) {
|
||||
@ -681,7 +726,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
}
|
||||
// Gradually drop transactions from offenders
|
||||
offenders := []common.Address{}
|
||||
for pending > maxPendingTotal && !spammers.Empty() {
|
||||
for pending > pool.config.GlobalSlots && !spammers.Empty() {
|
||||
// Retrieve the next offender if not local address
|
||||
offender, _ := spammers.Pop()
|
||||
offenders = append(offenders, offender.(common.Address))
|
||||
@ -692,7 +737,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
threshold := pool.pending[offender.(common.Address)].Len()
|
||||
|
||||
// Iteratively reduce all offenders until below limit or threshold reached
|
||||
for pending > maxPendingTotal && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
|
||||
for pending > pool.config.GlobalSlots && pool.pending[offenders[len(offenders)-2]].Len() > threshold {
|
||||
for i := 0; i < len(offenders)-1; i++ {
|
||||
list := pool.pending[offenders[i]]
|
||||
list.Cap(list.Len() - 1)
|
||||
@ -702,8 +747,8 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
}
|
||||
}
|
||||
// If still above threshold, reduce to limit or min allowance
|
||||
if pending > maxPendingTotal && len(offenders) > 0 {
|
||||
for pending > maxPendingTotal && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > minPendingPerAccount {
|
||||
if pending > pool.config.GlobalSlots && len(offenders) > 0 {
|
||||
for pending > pool.config.GlobalSlots && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > pool.config.AccountSlots {
|
||||
for _, addr := range offenders {
|
||||
list := pool.pending[addr]
|
||||
list.Cap(list.Len() - 1)
|
||||
@ -714,7 +759,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
pendingRLCounter.Inc(int64(pendingBeforeCap - pending))
|
||||
}
|
||||
// If we've queued more transactions than the hard limit, drop oldest ones
|
||||
if queued > maxQueuedTotal {
|
||||
if queued > pool.config.GlobalQueue {
|
||||
// Sort all accounts with queued transactions by heartbeat
|
||||
addresses := make(addresssByHeartbeat, 0, len(pool.queue))
|
||||
for addr := range pool.queue {
|
||||
@ -723,7 +768,7 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
sort.Sort(addresses)
|
||||
|
||||
// Drop transactions until the total is below the limit
|
||||
for drop := queued - maxQueuedTotal; drop > 0; {
|
||||
for drop := queued - pool.config.GlobalQueue; drop > 0; {
|
||||
addr := addresses[len(addresses)-1]
|
||||
list := pool.queue[addr.address]
|
||||
|
||||
@ -800,7 +845,7 @@ func (pool *TxPool) expirationLoop() {
|
||||
case <-evict.C:
|
||||
pool.mu.Lock()
|
||||
for addr := range pool.queue {
|
||||
if time.Since(pool.beats[addr]) > maxQueuedLifetime {
|
||||
if time.Since(pool.beats[addr]) > pool.config.Lifetime {
|
||||
for _, tx := range pool.queue[addr].Flatten() {
|
||||
pool.removeTx(tx.Hash())
|
||||
}
|
||||
|
Reference in New Issue
Block a user