@ -162,15 +162,10 @@ func (pool *TxPool) eventLoop() {
|
||||
func (pool *TxPool) resetState() {
|
||||
currentState, err := pool.currentState()
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("Failed to get current state: %v", err))
|
||||
log.Error("Failed reset txpool state", "err", err)
|
||||
return
|
||||
}
|
||||
managedState := state.ManageState(currentState)
|
||||
if err != nil {
|
||||
log.Error(fmt.Sprintf("Failed to get managed state: %v", err))
|
||||
return
|
||||
}
|
||||
pool.pendingState = managedState
|
||||
pool.pendingState = state.ManageState(currentState)
|
||||
|
||||
// validate the pool of pending transactions, this will remove
|
||||
// any transactions that have been included in the block or
|
||||
@ -192,7 +187,8 @@ func (pool *TxPool) Stop() {
|
||||
pool.events.Unsubscribe()
|
||||
close(pool.quit)
|
||||
pool.wg.Wait()
|
||||
log.Info(fmt.Sprint("Transaction pool stopped"))
|
||||
|
||||
log.Info("Transaction pool stopped")
|
||||
}
|
||||
|
||||
func (pool *TxPool) State() *state.ManagedState {
|
||||
@ -323,24 +319,19 @@ func (pool *TxPool) add(tx *types.Transaction) error {
|
||||
// If the transaction is already known, discard it
|
||||
hash := tx.Hash()
|
||||
if pool.all[hash] != nil {
|
||||
return fmt.Errorf("Known transaction: %x", hash[:4])
|
||||
log.Trace("Discarding already known transaction", "hash", hash)
|
||||
return fmt.Errorf("known transaction: %x", hash)
|
||||
}
|
||||
// Otherwise ensure basic validation passes and queue it up
|
||||
if err := pool.validateTx(tx); err != nil {
|
||||
log.Trace("Discarding invalid transaction", "hash", hash, "err", err)
|
||||
invalidTxCounter.Inc(1)
|
||||
return err
|
||||
}
|
||||
pool.enqueueTx(hash, tx)
|
||||
|
||||
// Print a log message if low enough level is set
|
||||
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
||||
rcpt := "[NEW_CONTRACT]"
|
||||
if to := tx.To(); to != nil {
|
||||
rcpt = common.Bytes2Hex(to[:4])
|
||||
}
|
||||
from, _ := types.Sender(pool.signer, tx) // from already verified during tx validation
|
||||
return fmt.Sprintf("(t) 0x%x => %s (%v) %x\n", from[:4], rcpt, tx.Value(), hash)
|
||||
}})
|
||||
log.Debug("Pooled new transaction", "hash", hash, "from", log.Lazy{Fn: func() common.Address { from, _ := types.Sender(pool.signer, tx); return from }}, "to", tx.To())
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -409,7 +400,6 @@ func (pool *TxPool) Add(tx *types.Transaction) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
pool.promoteExecutables(state)
|
||||
|
||||
return nil
|
||||
@ -420,19 +410,21 @@ func (pool *TxPool) AddBatch(txs []*types.Transaction) error {
|
||||
pool.mu.Lock()
|
||||
defer pool.mu.Unlock()
|
||||
|
||||
// Add the batch of transaction, tracking the accepted ones
|
||||
added := 0
|
||||
for _, tx := range txs {
|
||||
if err := pool.add(tx); err != nil {
|
||||
log.Debug(fmt.Sprint("tx error:", err))
|
||||
if err := pool.add(tx); err == nil {
|
||||
added++
|
||||
}
|
||||
}
|
||||
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
return err
|
||||
// Only reprocess the internal state if something was actually added
|
||||
if added > 0 {
|
||||
state, err := pool.currentState()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pool.promoteExecutables(state)
|
||||
}
|
||||
|
||||
pool.promoteExecutables(state)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -513,33 +505,29 @@ func (pool *TxPool) promoteExecutables(state *state.StateDB) {
|
||||
for addr, list := range pool.queue {
|
||||
// Drop all transactions that are deemed too old (low nonce)
|
||||
for _, tx := range list.Forward(state.GetNonce(addr)) {
|
||||
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
||||
return fmt.Sprintf("Removed old queued transaction: %v", tx)
|
||||
}})
|
||||
delete(pool.all, tx.Hash())
|
||||
hash := tx.Hash()
|
||||
log.Debug("Removed old queued transaction", "hash", hash)
|
||||
delete(pool.all, hash)
|
||||
}
|
||||
// Drop all transactions that are too costly (low balance)
|
||||
drops, _ := list.Filter(state.GetBalance(addr))
|
||||
for _, tx := range drops {
|
||||
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
||||
return fmt.Sprintf("Removed unpayable queued transaction: %v", tx)
|
||||
}})
|
||||
delete(pool.all, tx.Hash())
|
||||
hash := tx.Hash()
|
||||
log.Debug("Removed unpayable queued transaction", "hash", hash)
|
||||
delete(pool.all, hash)
|
||||
queuedNofundsCounter.Inc(1)
|
||||
}
|
||||
// Gather all executable transactions and promote them
|
||||
for _, tx := range list.Ready(pool.pendingState.GetNonce(addr)) {
|
||||
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
||||
return fmt.Sprintf("Promoting queued transaction: %v", tx)
|
||||
}})
|
||||
pool.promoteTx(addr, tx.Hash(), tx)
|
||||
hash := tx.Hash()
|
||||
log.Debug("Promoting queued transaction", "hash", hash)
|
||||
pool.promoteTx(addr, hash, tx)
|
||||
}
|
||||
// Drop all transactions over the allowed limit
|
||||
for _, tx := range list.Cap(int(maxQueuedPerAccount)) {
|
||||
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
||||
return fmt.Sprintf("Removed cap-exceeding queued transaction: %v", tx)
|
||||
}})
|
||||
delete(pool.all, tx.Hash())
|
||||
hash := tx.Hash()
|
||||
log.Debug("Removed cap-exceeding queued transaction", "hash", hash)
|
||||
delete(pool.all, hash)
|
||||
queuedRLCounter.Inc(1)
|
||||
}
|
||||
queued += uint64(list.Len())
|
||||
@ -650,25 +638,22 @@ func (pool *TxPool) demoteUnexecutables(state *state.StateDB) {
|
||||
|
||||
// Drop all transactions that are deemed too old (low nonce)
|
||||
for _, tx := range list.Forward(nonce) {
|
||||
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
||||
return fmt.Sprintf("Removed old pending transaction: %v", tx)
|
||||
}})
|
||||
delete(pool.all, tx.Hash())
|
||||
hash := tx.Hash()
|
||||
log.Debug("Removed old pending transaction", "hash", hash)
|
||||
delete(pool.all, hash)
|
||||
}
|
||||
// Drop all transactions that are too costly (low balance), and queue any invalids back for later
|
||||
drops, invalids := list.Filter(state.GetBalance(addr))
|
||||
for _, tx := range drops {
|
||||
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
||||
return fmt.Sprintf("Removed unpayable pending transaction: %v", tx)
|
||||
}})
|
||||
delete(pool.all, tx.Hash())
|
||||
hash := tx.Hash()
|
||||
log.Debug("Removed unpayable pending transaction", "hash", hash)
|
||||
delete(pool.all, hash)
|
||||
pendingNofundsCounter.Inc(1)
|
||||
}
|
||||
for _, tx := range invalids {
|
||||
log.Debug("", "msg", log.Lazy{Fn: func() string {
|
||||
return fmt.Sprintf("Demoting pending transaction: %v", tx)
|
||||
}})
|
||||
pool.enqueueTx(tx.Hash(), tx)
|
||||
hash := tx.Hash()
|
||||
log.Debug("Demoting pending transaction", "hash", hash)
|
||||
pool.enqueueTx(hash, tx)
|
||||
}
|
||||
// Delete the entire queue entry if it became empty.
|
||||
if list.Empty() {
|
||||
|
Reference in New Issue
Block a user